ad9929dca1050f51ba1b628e9e053e2dc71881dd
[oweals/gnunet.git] / src / multicast / multicast_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/multicast_api.c
23  * @brief Multicast service; implements multicast groups using CADET connections.
24  * @author Christian Grothoff
25  * @author Gabor X Toth
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_multicast_service.h"
31 #include "multicast.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
34
35
36 /**
37  * Handle for a request to send a message to all multicast group members
38  * (from the origin).
39  */
40 struct GNUNET_MULTICAST_OriginTransmitHandle
41 {
42   GNUNET_MULTICAST_OriginTransmitNotify notify;
43   void *notify_cls;
44   struct GNUNET_MULTICAST_Origin *origin;
45
46   uint64_t message_id;
47   uint64_t group_generation;
48   uint64_t fragment_offset;
49 };
50
51
52 /**
53  * Handle for a message to be delivered from a member to the origin.
54  */
55 struct GNUNET_MULTICAST_MemberTransmitHandle
56 {
57   GNUNET_MULTICAST_MemberTransmitNotify notify;
58   void *notify_cls;
59   struct GNUNET_MULTICAST_Member *member;
60
61   uint64_t request_id;
62   uint64_t fragment_offset;
63 };
64
65
66 struct GNUNET_MULTICAST_Group
67 {
68   /**
69    * Configuration to use.
70    */
71   const struct GNUNET_CONFIGURATION_Handle *cfg;
72
73   /**
74    * Client connection to the service.
75    */
76   struct GNUNET_MQ_Handle *mq;
77
78   /**
79    * Message to send on connect.
80    */
81   struct GNUNET_MQ_Envelope *connect_env;
82
83   /**
84    * Time to wait until we try to reconnect on failure.
85    */
86   struct GNUNET_TIME_Relative reconnect_delay;
87
88   /**
89    * Task for reconnecting when the listener fails.
90    */
91   struct GNUNET_SCHEDULER_Task *reconnect_task;
92
93   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
94   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
95   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
96   GNUNET_MULTICAST_MessageCallback message_cb;
97   void *cb_cls;
98
99   /**
100    * Function called after disconnected from the service.
101    */
102   GNUNET_ContinuationCallback disconnect_cb;
103
104   /**
105    * Closure for @a disconnect_cb.
106    */
107   void *disconnect_cls;
108
109   /**
110    * Are we currently transmitting a message?
111    */
112   uint8_t in_transmit;
113
114   /**
115    * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
116    */
117   uint8_t acks_pending;
118
119   /**
120    * Is this the origin or a member?
121    */
122   uint8_t is_origin;
123
124   /**
125    * Is this channel in the process of disconnecting from the service?
126    * #GNUNET_YES or #GNUNET_NO
127    */
128   uint8_t is_disconnecting;
129 };
130
131
132 /**
133  * Handle for the origin of a multicast group.
134  */
135 struct GNUNET_MULTICAST_Origin
136 {
137   struct GNUNET_MULTICAST_Group grp;
138   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
139
140   GNUNET_MULTICAST_RequestCallback request_cb;
141 };
142
143
144 /**
145  * Handle for a multicast group member.
146  */
147 struct GNUNET_MULTICAST_Member
148 {
149   struct GNUNET_MULTICAST_Group grp;
150   struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
151
152   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
153
154   /**
155    * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
156    */
157   struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
158
159   uint64_t next_fragment_id;
160 };
161
162
163 /**
164  * Handle that identifies a join request.
165  *
166  * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
167  * corresponding calls to #GNUNET_MULTICAST_join_decision().
168  */
169 struct GNUNET_MULTICAST_JoinHandle
170 {
171   struct GNUNET_MULTICAST_Group *group;
172
173   /**
174    * Public key of the member requesting join.
175    */
176   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
177
178   /**
179    * Peer identity of the member requesting join.
180    */
181   struct GNUNET_PeerIdentity peer;
182 };
183
184
185 /**
186  * Opaque handle to a replay request from the multicast service.
187  */
188 struct GNUNET_MULTICAST_ReplayHandle
189 {
190   struct GNUNET_MULTICAST_Group *grp;
191   struct MulticastReplayRequestMessage req;
192 };
193
194
195 /**
196  * Handle for a replay request.
197  */
198 struct GNUNET_MULTICAST_MemberReplayHandle
199 {
200 };
201
202
203 static void
204 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
205
206 static void
207 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
208
209
210 /**
211  * Check join request message.
212  */
213 static int
214 check_group_join_request (void *cls,
215                           const struct MulticastJoinRequestMessage *jreq)
216 {
217   uint16_t size = ntohs (jreq->header.size);
218
219   if (sizeof (*jreq) == size)
220     return GNUNET_OK;
221
222   if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
223     return GNUNET_OK;
224
225   return GNUNET_SYSERR;
226 }
227
228
229 /**
230  * Receive join request from service.
231  */
232 static void
233 handle_group_join_request (void *cls,
234                            const struct MulticastJoinRequestMessage *jreq)
235 {
236   struct GNUNET_MULTICAST_Group *grp = cls;
237   struct GNUNET_MULTICAST_JoinHandle *jh;
238   const struct GNUNET_MessageHeader *jmsg = NULL;
239
240   if (NULL == grp)
241   {
242     GNUNET_break (0);
243     return;
244   }
245   if (NULL == grp->join_req_cb)
246     return;
247
248   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
249     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
250
251   jh = GNUNET_malloc (sizeof (*jh));
252   jh->group = grp;
253   jh->member_pub_key = jreq->member_pub_key;
254   jh->peer = jreq->peer;
255   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
256
257   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
258 }
259
260
261 /**
262  * Check multicast message.
263  */
264 static int
265 check_group_message (void *cls,
266                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
267 {
268   return GNUNET_OK;
269 }
270
271
272 /**
273  * Receive multicast message from service.
274  */
275 static void
276 handle_group_message (void *cls,
277                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
278 {
279   struct GNUNET_MULTICAST_Group *grp = cls;
280
281   if (GNUNET_YES == grp->is_disconnecting)
282     return;
283
284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285               "Calling message callback with a message of size %u.\n",
286               ntohs (mmsg->header.size));
287
288   if (NULL != grp->message_cb)
289     grp->message_cb (grp->cb_cls, mmsg);
290
291   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
292 }
293
294
295 /**
296  * Receive message/request fragment acknowledgement from service.
297  */
298 static void
299 handle_group_fragment_ack (void *cls,
300                            const struct GNUNET_MessageHeader *msg)
301 {
302   struct GNUNET_MULTICAST_Group *grp = cls;
303
304   LOG (GNUNET_ERROR_TYPE_DEBUG,
305        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
306        grp, grp->in_transmit, grp->acks_pending);
307
308   if (0 == grp->acks_pending)
309   {
310     LOG (GNUNET_ERROR_TYPE_DEBUG,
311          "%p Ignoring extraneous fragment ACK.\n", grp);
312     return;
313   }
314   grp->acks_pending--;
315
316   if (GNUNET_YES != grp->in_transmit)
317     return;
318
319   if (GNUNET_YES == grp->is_origin)
320     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
321   else
322     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
323
324   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
325 }
326
327
328 /**
329  * Check unicast request.
330  */
331 static int
332 check_origin_request (void *cls,
333                       const struct GNUNET_MULTICAST_RequestHeader *req)
334 {
335   return GNUNET_OK;
336 }
337
338
339 /**
340  * Origin receives unicast request from a member.
341  */
342 static void
343 handle_origin_request (void *cls,
344                        const struct GNUNET_MULTICAST_RequestHeader *req)
345 {
346   struct GNUNET_MULTICAST_Group *grp;
347   struct GNUNET_MULTICAST_Origin *orig = cls;
348   grp = &orig->grp;
349
350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351               "Calling request callback with a request of size %u.\n",
352               ntohs (req->header.size));
353
354   if (NULL != orig->request_cb)
355     orig->request_cb (grp->cb_cls, req);
356
357   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
358 }
359
360
361 /**
362  * Receive multicast replay request from service.
363  */
364 static void
365 handle_group_replay_request (void *cls,
366                              const struct MulticastReplayRequestMessage *rep)
367
368 {
369   struct GNUNET_MULTICAST_Group *grp = cls;
370
371   if (GNUNET_YES == grp->is_disconnecting)
372     return;
373
374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
375
376   if (0 != rep->fragment_id)
377   {
378     if (NULL != grp->replay_frag_cb)
379     {
380       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
381       rh->grp = grp;
382       rh->req = *rep;
383       grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
384                            GNUNET_ntohll (rep->fragment_id),
385                            GNUNET_ntohll (rep->flags), rh);
386     }
387   }
388   else if (0 != rep->message_id)
389   {
390     if (NULL != grp->replay_msg_cb)
391     {
392       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
393       rh->grp = grp;
394       rh->req = *rep;
395       grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
396                           GNUNET_ntohll (rep->message_id),
397                           GNUNET_ntohll (rep->fragment_offset),
398                           GNUNET_ntohll (rep->flags), rh);
399     }
400   }
401
402   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
403 }
404
405
406 /**
407  * Check replay response.
408  */
409 static int
410 check_member_replay_response (void *cls,
411                               const struct MulticastReplayResponseMessage *res)
412 {
413   uint16_t size = ntohs (res->header.size);
414
415   if (sizeof (*res) == size)
416     return GNUNET_OK;
417
418   if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
419     return GNUNET_OK;
420
421   return GNUNET_SYSERR;
422 }
423
424
425 /**
426  * Receive replay response from service.
427  */
428 static void
429 handle_member_replay_response (void *cls,
430                                const struct MulticastReplayResponseMessage *res)
431 {
432   struct GNUNET_MULTICAST_Group *grp;
433   struct GNUNET_MULTICAST_Member *mem = cls;
434   grp = &mem->grp;
435
436   if (GNUNET_YES == grp->is_disconnecting)
437     return;
438
439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
440
441   // FIXME: return result
442 }
443
444
445 /**
446  * Check join decision.
447  */
448 static int
449 check_member_join_decision (void *cls,
450                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
451 {
452   return GNUNET_OK; // checked in handle below
453 }
454
455
456 /**
457  * Member receives join decision.
458  */
459 static void
460 handle_member_join_decision (void *cls,
461                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
462 {
463   struct GNUNET_MULTICAST_Group *grp;
464   struct GNUNET_MULTICAST_Member *mem = cls;
465   grp = &mem->grp;
466
467   const struct MulticastJoinDecisionMessage *
468     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
469
470   uint16_t dcsn_size = ntohs (dcsn->header.size);
471   int is_admitted = ntohl (dcsn->is_admitted);
472
473   LOG (GNUNET_ERROR_TYPE_DEBUG,
474        "%p Member got join decision from multicast: %d\n",
475        mem, is_admitted);
476
477   const struct GNUNET_MessageHeader *join_resp = NULL;
478   uint16_t join_resp_size = 0;
479
480   uint16_t relay_count = ntohl (dcsn->relay_count);
481   const struct GNUNET_PeerIdentity *relays = NULL;
482   uint16_t relay_size = relay_count * sizeof (*relays);
483   if (0 < relay_count)
484   {
485     if (dcsn_size < sizeof (*dcsn) + relay_size)
486     {
487       GNUNET_break_op (0);
488       is_admitted = GNUNET_SYSERR;
489     }
490     else
491     {
492       relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
493     }
494   }
495
496   if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
497   {
498     join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
499     join_resp_size = ntohs (join_resp->size);
500   }
501   if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
502   {
503     LOG (GNUNET_ERROR_TYPE_DEBUG,
504          "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
505          dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
506     GNUNET_break_op (0);
507     is_admitted = GNUNET_SYSERR;
508   }
509
510   if (NULL != mem->join_dcsn_cb)
511     mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
512                        relay_count, relays, join_resp);
513
514   // FIXME:
515   //if (GNUNET_YES != is_admitted)
516   //  GNUNET_MULTICAST_member_part (mem);
517
518   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
519 }
520
521
522 static void
523 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
524 {
525   if (NULL != grp->connect_env)
526   {
527     GNUNET_MQ_discard (grp->connect_env);
528     grp->connect_env = NULL;
529   }
530   if (NULL != grp->mq)
531   {
532     GNUNET_MQ_destroy (grp->mq);
533     grp->mq = NULL;
534   }
535   if (NULL != grp->disconnect_cb)
536   {
537     grp->disconnect_cb (grp->disconnect_cls);
538     grp->disconnect_cb = NULL;
539   }
540   GNUNET_free (grp);
541 }
542
543
544 static void
545 group_disconnect (struct GNUNET_MULTICAST_Group *grp,
546                   GNUNET_ContinuationCallback cb,
547                   void *cls)
548 {
549   grp->is_disconnecting = GNUNET_YES;
550   grp->disconnect_cb = cb;
551   grp->disconnect_cls = cls;
552
553   if (NULL != grp->mq)
554   {
555     struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
556     if (NULL != last)
557     {
558       GNUNET_MQ_notify_sent (last,
559                              (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
560     }
561     else
562     {
563       group_cleanup (grp);
564     }
565   }
566   else
567   {
568     group_cleanup (grp);
569   }
570 }
571
572
573 /**
574  * Function to call with the decision made for a join request.
575  *
576  * Must be called once and only once in response to an invocation of the
577  * #GNUNET_MULTICAST_JoinRequestCallback.
578  *
579  * @param join
580  *        Join request handle.
581  * @param is_admitted
582  *        #GNUNET_YES    if the join is approved,
583  *        #GNUNET_NO     if it is disapproved,
584  *        #GNUNET_SYSERR if we cannot answer the request.
585  * @param relay_count
586  *        Number of relays given.
587  * @param relays
588  *        Array of suggested peers that might be useful relays to use
589  *        when joining the multicast group (essentially a list of peers that
590  *        are already part of the multicast group and might thus be willing
591  *        to help with routing).  If empty, only this local peer (which must
592  *        be the multicast origin) is a good candidate for building the
593  *        multicast tree.  Note that it is unnecessary to specify our own
594  *        peer identity in this array.
595  * @param join_resp
596  *        Message to send in response to the joining peer;
597  *        can also be used to redirect the peer to a different group at the
598  *        application layer; this response is to be transmitted to the
599  *        peer that issued the request even if admission is denied.
600  */
601 struct GNUNET_MULTICAST_ReplayHandle *
602 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
603                                 int is_admitted,
604                                 uint16_t relay_count,
605                                 const struct GNUNET_PeerIdentity *relays,
606                                 const struct GNUNET_MessageHeader *join_resp)
607 {
608   struct GNUNET_MULTICAST_Group *grp = join->group;
609   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
610   uint16_t relay_size = relay_count * sizeof (*relays);
611
612   struct MulticastJoinDecisionMessageHeader *hdcsn;
613   struct MulticastJoinDecisionMessage *dcsn;
614   struct GNUNET_MQ_Envelope *
615     env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
616                                GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
617   hdcsn->member_pub_key = join->member_pub_key;
618   hdcsn->peer = join->peer;
619
620   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
621   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
622   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
623   dcsn->is_admitted = htonl (is_admitted);
624   dcsn->relay_count = htonl (relay_count);
625   if (0 < relay_size)
626     GNUNET_memcpy (&dcsn[1], relays, relay_size);
627   if (0 < join_resp_size)
628     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
629
630   GNUNET_MQ_send (grp->mq, env);
631   GNUNET_free (join);
632   return NULL;
633 }
634
635
636 /**
637  * Replay a message fragment for the multicast group.
638  *
639  * @param rh
640  *        Replay handle identifying which replay operation was requested.
641  * @param msg
642  *        Replayed message fragment, NULL if not found / an error occurred.
643  * @param ec
644  *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
645  *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
646  */
647 void
648 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
649                                   const struct GNUNET_MessageHeader *msg,
650                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
651 {
652   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
653   struct MulticastReplayResponseMessage *res;
654   struct GNUNET_MQ_Envelope *
655     env = GNUNET_MQ_msg_extra (res, msg_size,
656                                GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
657   res->fragment_id = rh->req.fragment_id;
658   res->message_id = rh->req.message_id;
659   res->fragment_offset = rh->req.fragment_offset;
660   res->flags = rh->req.flags;
661   res->error_code = htonl (ec);
662
663   if (GNUNET_MULTICAST_REC_OK == ec)
664   {
665     GNUNET_assert (NULL != msg);
666     GNUNET_memcpy (&res[1], msg, msg_size);
667   }
668
669   GNUNET_MQ_send (rh->grp->mq, env);
670
671   if (GNUNET_MULTICAST_REC_OK != ec)
672     GNUNET_free (rh);
673 }
674
675
676 /**
677  * Indicate the end of the replay session.
678  *
679  * Invalidates the replay handle.
680  *
681  * @param rh
682  *        Replay session to end.
683  */
684 void
685 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
686 {
687   struct MulticastReplayResponseMessage *end;
688   struct GNUNET_MQ_Envelope *
689     env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
690
691   end->fragment_id = rh->req.fragment_id;
692   end->message_id = rh->req.message_id;
693   end->fragment_offset = rh->req.fragment_offset;
694   end->flags = rh->req.flags;
695
696   GNUNET_MQ_send (rh->grp->mq, env);
697   GNUNET_free (rh);
698 }
699
700
701 /**
702  * Replay a message for the multicast group.
703  *
704  * @param rh
705  *        Replay handle identifying which replay operation was requested.
706  * @param notify
707  *        Function to call to get the message.
708  * @param notify_cls
709  *        Closure for @a notify.
710  */
711 void
712 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
713                                    GNUNET_MULTICAST_ReplayTransmitNotify notify,
714                                    void *notify_cls)
715 {
716 }
717
718
719 static void
720 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
721
722
723 static void
724 origin_reconnect (void *cls)
725 {
726   origin_connect (cls);
727 }
728
729
730 /**
731  * Origin client disconnected from service.
732  *
733  * Reconnect after backoff period.
734  */
735 static void
736 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
737 {
738   struct GNUNET_MULTICAST_Origin *orig = cls;
739   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
740
741   LOG (GNUNET_ERROR_TYPE_DEBUG,
742        "Origin client disconnected (%d), re-connecting\n",
743        (int) error);
744   if (NULL != grp->mq)
745   {
746     GNUNET_MQ_destroy (grp->mq);
747     grp->mq = NULL;
748   }
749
750   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
751                                                       origin_reconnect,
752                                                       orig);
753   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
754 }
755
756
757 /**
758  * Connect to service as origin.
759  */
760 static void
761 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
762 {
763   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
764
765   GNUNET_MQ_hd_var_size (group_message,
766                          GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
767                          struct GNUNET_MULTICAST_MessageHeader);
768
769   GNUNET_MQ_hd_var_size (origin_request,
770                          GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
771                          struct GNUNET_MULTICAST_RequestHeader);
772
773   GNUNET_MQ_hd_fixed_size (group_fragment_ack,
774                            GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
775                            struct GNUNET_MessageHeader);
776
777   GNUNET_MQ_hd_var_size (group_join_request,
778                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
779                          struct MulticastJoinRequestMessage);
780
781   GNUNET_MQ_hd_fixed_size (group_replay_request,
782                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
783                            struct MulticastReplayRequestMessage);
784
785   struct GNUNET_MQ_MessageHandler handlers[] = {
786     make_group_message_handler (grp),
787     make_origin_request_handler (orig),
788     make_group_fragment_ack_handler (grp),
789     make_group_join_request_handler (grp),
790     make_group_replay_request_handler (grp),
791     GNUNET_MQ_handler_end ()
792   };
793
794   grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
795                                    handlers, origin_disconnected, orig);
796   GNUNET_assert (NULL != grp->mq);
797   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
798 }
799
800
801 /**
802  * Start a multicast group.
803  *
804  * Will advertise the origin in the P2P overlay network under the respective
805  * public key so that other peer can find this peer to join it.  Peers that
806  * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
807  * either an existing group member or to the origin.  If the joining is
808  * approved, the member is cleared for @e replay and will begin to receive
809  * messages transmitted to the group.  If joining is disapproved, the failed
810  * candidate will be given a response.  Members in the group can send messages
811  * to the origin (one at a time).
812  *
813  * @param cfg
814  *        Configuration to use.
815  * @param priv_key
816  *        ECC key that will be used to sign messages for this
817  *        multicast session; public key is used to identify the multicast group;
818  * @param max_fragment_id
819  *        Maximum fragment ID already sent to the group.
820  *        0 for a new group.
821  * @param join_request_cb
822  *        Function called to approve / disapprove joining of a peer.
823  * @param replay_frag_cb
824  *        Function that can be called to replay a message fragment.
825  * @param replay_msg_cb
826  *        Function that can be called to replay a message.
827  * @param request_cb
828  *        Function called with message fragments from group members.
829  * @param message_cb
830  *        Function called with the message fragments sent to the
831  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
832  *        should be stored for answering replay requests later.
833  * @param cls
834  *        Closure for the various callbacks that follow.
835  *
836  * @return Handle for the origin, NULL on error.
837  */
838 struct GNUNET_MULTICAST_Origin *
839 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
840                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
841                                uint64_t max_fragment_id,
842                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
843                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
844                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
845                                GNUNET_MULTICAST_RequestCallback request_cb,
846                                GNUNET_MULTICAST_MessageCallback message_cb,
847                                void *cls)
848 {
849   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
850   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
851
852   struct MulticastOriginStartMessage *start;
853   grp->connect_env = GNUNET_MQ_msg (start,
854                                     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
855   start->max_fragment_id = max_fragment_id;
856   start->group_key = *priv_key;
857
858   grp->cfg = cfg;
859   grp->is_origin = GNUNET_YES;
860   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
861
862   grp->cb_cls = cls;
863   grp->join_req_cb = join_request_cb;
864   grp->replay_frag_cb = replay_frag_cb;
865   grp->replay_msg_cb = replay_msg_cb;
866   grp->message_cb = message_cb;
867
868   orig->request_cb = request_cb;
869
870   origin_connect (orig);
871   return orig;
872 }
873
874
875 /**
876  * Stop a multicast group.
877  *
878  * @param origin
879  *        Multicast group to stop.
880  */
881 void
882 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
883                               GNUNET_ContinuationCallback stop_cb,
884                               void *stop_cls)
885 {
886   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
887
888   group_disconnect (grp, stop_cb, stop_cls);
889 }
890
891
892 static void
893 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
894 {
895   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
896   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
897   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
898   GNUNET_assert (GNUNET_YES == grp->in_transmit);
899
900   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
901   struct GNUNET_MULTICAST_MessageHeader *msg;
902   struct GNUNET_MQ_Envelope *
903     env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
904                                GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
905
906   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
907
908   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
909       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
910   {
911     LOG (GNUNET_ERROR_TYPE_ERROR,
912          "%p OriginTransmitNotify() returned error or invalid message size.\n",
913          orig);
914     /* FIXME: handle error */
915     GNUNET_MQ_discard (env);
916     return;
917   }
918
919   if (GNUNET_NO == ret && 0 == buf_size)
920   {
921     LOG (GNUNET_ERROR_TYPE_DEBUG,
922          "%p OriginTransmitNotify() - transmission paused.\n", orig);
923     GNUNET_MQ_discard (env);
924     return; /* Transmission paused. */
925   }
926
927   msg->header.size = htons (sizeof (*msg) + buf_size);
928   msg->message_id = GNUNET_htonll (tmit->message_id);
929   msg->group_generation = tmit->group_generation;
930   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
931   tmit->fragment_offset += sizeof (*msg) + buf_size;
932
933   grp->acks_pending++;
934   GNUNET_MQ_send (grp->mq, env);
935
936   if (GNUNET_YES == ret)
937     grp->in_transmit = GNUNET_NO;
938 }
939
940
941 /**
942  * Send a message to the multicast group.
943  *
944  * @param orig
945  *        Handle to the multicast group.
946  * @param message_id
947  *        Application layer ID for the message.  Opaque to multicast.
948  * @param group_generation
949  *        Group generation of the message.
950  *        Documented in struct GNUNET_MULTICAST_MessageHeader.
951  * @param notify
952  *        Function to call to get the message.
953  * @param notify_cls
954  *        Closure for @a notify.
955  *
956  * @return Message handle on success,
957  *         NULL on error (i.e. another request is already pending).
958  */
959 struct GNUNET_MULTICAST_OriginTransmitHandle *
960 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
961                                 uint64_t message_id,
962                                 uint64_t group_generation,
963                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
964                                 void *notify_cls)
965 {
966   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
967   if (GNUNET_YES == grp->in_transmit)
968     return NULL;
969   grp->in_transmit = GNUNET_YES;
970
971   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
972   tmit->origin = orig;
973   tmit->message_id = message_id;
974   tmit->fragment_offset = 0;
975   tmit->group_generation = group_generation;
976   tmit->notify = notify;
977   tmit->notify_cls = notify_cls;
978
979   origin_to_all (orig);
980   return tmit;
981 }
982
983
984 /**
985  * Resume message transmission to multicast group.
986  *
987  * @param th
988  *        Transmission to cancel.
989  */
990 void
991 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
992 {
993   struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
994   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
995     return;
996   origin_to_all (th->origin);
997 }
998
999
1000 /**
1001  * Cancel request for message transmission to multicast group.
1002  *
1003  * @param th
1004  *        Transmission to cancel.
1005  */
1006 void
1007 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
1008 {
1009   th->origin->grp.in_transmit = GNUNET_NO;
1010 }
1011
1012
1013 static void
1014 member_connect (struct GNUNET_MULTICAST_Member *mem);
1015
1016
1017 static void
1018 member_reconnect (void *cls)
1019 {
1020   member_connect (cls);
1021 }
1022
1023
1024 /**
1025  * Member client disconnected from service.
1026  *
1027  * Reconnect after backoff period.
1028  */
1029 static void
1030 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1031 {
1032   struct GNUNET_MULTICAST_Member *mem = cls;
1033   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1034
1035   LOG (GNUNET_ERROR_TYPE_DEBUG,
1036        "Member client disconnected (%d), re-connecting\n",
1037        (int) error);
1038   GNUNET_MQ_destroy (grp->mq);
1039   grp->mq = NULL;
1040
1041   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1042                                                       member_reconnect,
1043                                                       mem);
1044   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1045 }
1046
1047
1048 /**
1049  * Connect to service as member.
1050  */
1051 static void
1052 member_connect (struct GNUNET_MULTICAST_Member *mem)
1053 {
1054   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1055
1056   GNUNET_MQ_hd_var_size (group_message,
1057                          GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1058                          struct GNUNET_MULTICAST_MessageHeader);
1059
1060   GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1061                            GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1062                            struct GNUNET_MessageHeader);
1063
1064   GNUNET_MQ_hd_var_size (group_join_request,
1065                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1066                          struct MulticastJoinRequestMessage);
1067
1068   GNUNET_MQ_hd_var_size (member_join_decision,
1069                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1070                          struct MulticastJoinDecisionMessageHeader);
1071
1072   GNUNET_MQ_hd_fixed_size (group_replay_request,
1073                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1074                            struct MulticastReplayRequestMessage);
1075
1076   GNUNET_MQ_hd_var_size (member_replay_response,
1077                          GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1078                          struct MulticastReplayResponseMessage);
1079
1080   struct GNUNET_MQ_MessageHandler handlers[] = {
1081     make_group_message_handler (grp),
1082     make_group_fragment_ack_handler (grp),
1083     make_group_join_request_handler (grp),
1084     make_member_join_decision_handler (mem),
1085     make_group_replay_request_handler (grp),
1086     make_member_replay_response_handler (mem),
1087     GNUNET_MQ_handler_end ()
1088   };
1089
1090   grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
1091                                    handlers, member_disconnected, mem);
1092   GNUNET_assert (NULL != grp->mq);
1093   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1094 }
1095
1096
1097 /**
1098  * Join a multicast group.
1099  *
1100  * The entity joining is always the local peer.  Further information about the
1101  * candidate can be provided in the @a join_request message.  If the join fails, the
1102  * @a message_cb is invoked with a (failure) response and then with NULL.  If
1103  * the join succeeds, outstanding (state) messages and ongoing multicast
1104  * messages will be given to the @a message_cb until the member decides to part
1105  * the group.  The @a replay_cb function may be called at any time by the
1106  * multicast service to support relaying messages to other members of the group.
1107  *
1108  * @param cfg
1109  *        Configuration to use.
1110  * @param group_key
1111  *        ECC public key that identifies the group to join.
1112  * @param member_key
1113  *        ECC key that identifies the member
1114  *        and used to sign requests sent to the origin.
1115  * @param origin
1116  *        Peer ID of the origin to send unicast requsets to.  If NULL,
1117  *        unicast requests are sent back via multiple hops on the reverse path
1118  *        of multicast messages.
1119  * @param relay_count
1120  *        Number of peers in the @a relays array.
1121  * @param relays
1122  *        Peer identities of members of the group, which serve as relays
1123  *        and can be used to join the group at. and send the @a join_request to.
1124  *        If empty, the @a join_request is sent directly to the @a origin.
1125  * @param join_msg
1126  *        Application-dependent join message to be passed to the peer @a origin.
1127  * @param join_request_cb
1128  *        Function called to approve / disapprove joining of a peer.
1129  * @param join_decision_cb
1130  *        Function called to inform about the join decision.
1131  * @param replay_frag_cb
1132  *        Function that can be called to replay message fragments
1133  *        this peer already knows from this group. NULL if this
1134  *        client is unable to support replay.
1135  * @param replay_msg_cb
1136  *        Function that can be called to replay message fragments
1137  *        this peer already knows from this group. NULL if this
1138  *        client is unable to support replay.
1139  * @param message_cb
1140  *        Function to be called for all message fragments we
1141  *        receive from the group, excluding those our @a replay_cb
1142  *        already has.
1143  * @param cls
1144  *        Closure for callbacks.
1145  *
1146  * @return Handle for the member, NULL on error.
1147  */
1148 struct GNUNET_MULTICAST_Member *
1149 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1150                               const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1151                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1152                               const struct GNUNET_PeerIdentity *origin,
1153                               uint16_t relay_count,
1154                               const struct GNUNET_PeerIdentity *relays,
1155                               const struct GNUNET_MessageHeader *join_msg,
1156                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1157                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1158                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1159                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1160                               GNUNET_MULTICAST_MessageCallback message_cb,
1161                               void *cls)
1162 {
1163   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1164   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1165
1166   uint16_t relay_size = relay_count * sizeof (*relays);
1167   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1168   struct MulticastMemberJoinMessage *join;
1169   grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1170                                           GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1171   join->group_pub_key = *group_pub_key;
1172   join->member_key = *member_key;
1173   join->origin = *origin;
1174   join->relay_count = ntohl (relay_count);
1175   if (0 < relay_size)
1176     GNUNET_memcpy (&join[1], relays, relay_size);
1177   if (0 < join_msg_size)
1178     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1179
1180   grp->cfg = cfg;
1181   grp->is_origin = GNUNET_NO;
1182   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1183
1184   mem->join_dcsn_cb = join_decision_cb;
1185   grp->join_req_cb = join_request_cb;
1186   grp->replay_frag_cb = replay_frag_cb;
1187   grp->replay_msg_cb = replay_msg_cb;
1188   grp->message_cb = message_cb;
1189   grp->cb_cls = cls;
1190
1191   member_connect (mem);
1192   return mem;
1193 }
1194
1195
1196 /**
1197  * Part a multicast group.
1198  *
1199  * Disconnects from all group members and invalidates the @a member handle.
1200  *
1201  * An application-dependent part message can be transmitted beforehand using
1202  * #GNUNET_MULTICAST_member_to_origin())
1203  *
1204  * @param member
1205  *        Membership handle.
1206  */
1207 void
1208 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1209                               GNUNET_ContinuationCallback part_cb,
1210                               void *part_cls)
1211 {
1212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
1213   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1214
1215   mem->join_dcsn_cb = NULL;
1216   grp->join_req_cb = NULL;
1217   grp->message_cb = NULL;
1218   grp->replay_msg_cb = NULL;
1219   grp->replay_frag_cb = NULL;
1220
1221   group_disconnect (grp, part_cb, part_cls);
1222 }
1223
1224
1225 void
1226 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1227                        uint64_t fragment_id,
1228                        uint64_t message_id,
1229                        uint64_t fragment_offset,
1230                        uint64_t flags)
1231 {
1232   struct MulticastReplayRequestMessage *rep;
1233   struct GNUNET_MQ_Envelope *
1234     env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1235
1236   rep->fragment_id = GNUNET_htonll (fragment_id);
1237   rep->message_id = GNUNET_htonll (message_id);
1238   rep->fragment_offset = GNUNET_htonll (fragment_offset);
1239   rep->flags = GNUNET_htonll (flags);
1240
1241   GNUNET_MQ_send (mem->grp.mq, env);
1242 }
1243
1244
1245 /**
1246  * Request a fragment to be replayed by fragment ID.
1247  *
1248  * Useful if messages below the @e max_known_fragment_id given when joining are
1249  * needed and not known to the client.
1250  *
1251  * @param member
1252  *        Membership handle.
1253  * @param fragment_id
1254  *        ID of a message fragment that this client would like to see replayed.
1255  * @param flags
1256  *        Additional flags for the replay request.
1257  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1258  *
1259  * @return Replay request handle.
1260  */
1261 struct GNUNET_MULTICAST_MemberReplayHandle *
1262 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1263                                          uint64_t fragment_id,
1264                                          uint64_t flags)
1265 {
1266   member_replay_request (mem, fragment_id, 0, 0, flags);
1267   // FIXME: return something useful
1268   return NULL;
1269 }
1270
1271
1272 /**
1273  * Request a message fragment to be replayed.
1274  *
1275  * Useful if messages below the @e max_known_fragment_id given when joining are
1276  * needed and not known to the client.
1277  *
1278  * @param member
1279  *        Membership handle.
1280  * @param message_id
1281  *        ID of the message this client would like to see replayed.
1282  * @param fragment_offset
1283  *        Offset of the fragment within the message to replay.
1284  * @param flags
1285  *        Additional flags for the replay request.
1286  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1287  *
1288  * @return Replay request handle, NULL on error.
1289  */
1290 struct GNUNET_MULTICAST_MemberReplayHandle *
1291 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1292                                         uint64_t message_id,
1293                                         uint64_t fragment_offset,
1294                                         uint64_t flags)
1295 {
1296   member_replay_request (mem, 0, message_id, fragment_offset, flags);
1297   // FIXME: return something useful
1298   return NULL;
1299 }
1300
1301
1302 static void
1303 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1304 {
1305   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1306   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1307   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1308   GNUNET_assert (GNUNET_YES == grp->in_transmit);
1309
1310   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1311   struct GNUNET_MULTICAST_RequestHeader *req;
1312   struct GNUNET_MQ_Envelope *
1313     env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1314                                GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1315
1316   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1317
1318   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1319       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1320   {
1321     LOG (GNUNET_ERROR_TYPE_ERROR,
1322          "MemberTransmitNotify() returned error or invalid message size. "
1323          "ret=%d, buf_size=%u\n", ret, buf_size);
1324     /* FIXME: handle error */
1325     GNUNET_MQ_discard (env);
1326     return;
1327   }
1328
1329   if (GNUNET_NO == ret && 0 == buf_size)
1330   {
1331     /* Transmission paused. */
1332     GNUNET_MQ_discard (env);
1333     return;
1334   }
1335
1336   req->header.size = htons (sizeof (*req) + buf_size);
1337   req->request_id = GNUNET_htonll (tmit->request_id);
1338   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1339   tmit->fragment_offset += sizeof (*req) + buf_size;
1340
1341   GNUNET_MQ_send (grp->mq, env);
1342
1343   if (GNUNET_YES == ret)
1344     grp->in_transmit = GNUNET_NO;
1345 }
1346
1347
1348 /**
1349  * Send a message to the origin of the multicast group.
1350  *
1351  * @param mem
1352  *        Membership handle.
1353  * @param request_id
1354  *        Application layer ID for the request.  Opaque to multicast.
1355  * @param notify
1356  *        Callback to call to get the message.
1357  * @param notify_cls
1358  *        Closure for @a notify.
1359  *
1360  * @return Handle to cancel request, NULL on error (i.e. request already pending).
1361  */
1362 struct GNUNET_MULTICAST_MemberTransmitHandle *
1363 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1364                                    uint64_t request_id,
1365                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
1366                                    void *notify_cls)
1367 {
1368   if (GNUNET_YES == mem->grp.in_transmit)
1369     return NULL;
1370   mem->grp.in_transmit = GNUNET_YES;
1371
1372   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1373   tmit->member = mem;
1374   tmit->request_id = request_id;
1375   tmit->fragment_offset = 0;
1376   tmit->notify = notify;
1377   tmit->notify_cls = notify_cls;
1378
1379   member_to_origin (mem);
1380   return tmit;
1381 }
1382
1383
1384 /**
1385  * Resume message transmission to origin.
1386  *
1387  * @param th
1388  *        Transmission to cancel.
1389  */
1390 void
1391 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1392 {
1393   struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1394   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1395     return;
1396   member_to_origin (th->member);
1397 }
1398
1399
1400 /**
1401  * Cancel request for message transmission to origin.
1402  *
1403  * @param th
1404  *        Transmission to cancel.
1405  */
1406 void
1407 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1408 {
1409   th->member->grp.in_transmit = GNUNET_NO;
1410 }
1411
1412
1413 /* end of multicast_api.c */