paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / multicast / multicast_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file multicast/multicast_api.c
21  * @brief Multicast service; implements multicast groups using CADET connections.
22  * @author Christian Grothoff
23  * @author Gabor X Toth
24  */
25
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_multicast_service.h"
29 #include "multicast.h"
30
31 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
32
33
34 /**
35  * Handle for a request to send a message to all multicast group members
36  * (from the origin).
37  */
38 struct GNUNET_MULTICAST_OriginTransmitHandle
39 {
40   GNUNET_MULTICAST_OriginTransmitNotify notify;
41   void *notify_cls;
42   struct GNUNET_MULTICAST_Origin *origin;
43
44   uint64_t message_id;
45   uint64_t group_generation;
46   uint64_t fragment_offset;
47 };
48
49
50 /**
51  * Handle for a message to be delivered from a member to the origin.
52  */
53 struct GNUNET_MULTICAST_MemberTransmitHandle
54 {
55   GNUNET_MULTICAST_MemberTransmitNotify notify;
56   void *notify_cls;
57   struct GNUNET_MULTICAST_Member *member;
58
59   uint64_t request_id;
60   uint64_t fragment_offset;
61 };
62
63
64 struct GNUNET_MULTICAST_Group
65 {
66   /**
67    * Configuration to use.
68    */
69   const struct GNUNET_CONFIGURATION_Handle *cfg;
70
71   /**
72    * Client connection to the service.
73    */
74   struct GNUNET_MQ_Handle *mq;
75
76   /**
77    * Message to send on connect.
78    */
79   struct GNUNET_MQ_Envelope *connect_env;
80
81   /**
82    * Time to wait until we try to reconnect on failure.
83    */
84   struct GNUNET_TIME_Relative reconnect_delay;
85
86   /**
87    * Task for reconnecting when the listener fails.
88    */
89   struct GNUNET_SCHEDULER_Task *reconnect_task;
90
91   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
92   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
93   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
94   GNUNET_MULTICAST_MessageCallback message_cb;
95   void *cb_cls;
96
97   /**
98    * Function called after disconnected from the service.
99    */
100   GNUNET_ContinuationCallback disconnect_cb;
101
102   /**
103    * Closure for @a disconnect_cb.
104    */
105   void *disconnect_cls;
106
107   /**
108    * Are we currently transmitting a message?
109    */
110   uint8_t in_transmit;
111
112   /**
113    * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
114    */
115   uint8_t acks_pending;
116
117   /**
118    * Is this the origin or a member?
119    */
120   uint8_t is_origin;
121
122   /**
123    * Is this channel in the process of disconnecting from the service?
124    * #GNUNET_YES or #GNUNET_NO
125    */
126   uint8_t is_disconnecting;
127 };
128
129
130 /**
131  * Handle for the origin of a multicast group.
132  */
133 struct GNUNET_MULTICAST_Origin
134 {
135   struct GNUNET_MULTICAST_Group grp;
136   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
137
138   GNUNET_MULTICAST_RequestCallback request_cb;
139 };
140
141
142 /**
143  * Handle for a multicast group member.
144  */
145 struct GNUNET_MULTICAST_Member
146 {
147   struct GNUNET_MULTICAST_Group grp;
148   struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
149
150   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
151
152   /**
153    * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
154    */
155   struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
156
157   uint64_t next_fragment_id;
158 };
159
160
161 /**
162  * Handle that identifies a join request.
163  *
164  * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
165  * corresponding calls to #GNUNET_MULTICAST_join_decision().
166  */
167 struct GNUNET_MULTICAST_JoinHandle
168 {
169   struct GNUNET_MULTICAST_Group *group;
170
171   /**
172    * Public key of the member requesting join.
173    */
174   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
175
176   /**
177    * Peer identity of the member requesting join.
178    */
179   struct GNUNET_PeerIdentity peer;
180 };
181
182
183 /**
184  * Opaque handle to a replay request from the multicast service.
185  */
186 struct GNUNET_MULTICAST_ReplayHandle
187 {
188   struct GNUNET_MULTICAST_Group *grp;
189   struct MulticastReplayRequestMessage req;
190 };
191
192
193 /**
194  * Handle for a replay request.
195  */
196 struct GNUNET_MULTICAST_MemberReplayHandle
197 {
198 };
199
200
201 static void
202 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
203
204 static void
205 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
206
207
208 /**
209  * Check join request message.
210  */
211 static int
212 check_group_join_request (void *cls,
213                           const struct MulticastJoinRequestMessage *jreq)
214 {
215   uint16_t size = ntohs (jreq->header.size);
216
217   if (sizeof (*jreq) == size)
218     return GNUNET_OK;
219
220   if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
221     return GNUNET_OK;
222
223   return GNUNET_SYSERR;
224 }
225
226
227 /**
228  * Receive join request from service.
229  */
230 static void
231 handle_group_join_request (void *cls,
232                            const struct MulticastJoinRequestMessage *jreq)
233 {
234   struct GNUNET_MULTICAST_Group *grp = cls;
235   struct GNUNET_MULTICAST_JoinHandle *jh;
236   const struct GNUNET_MessageHeader *jmsg = NULL;
237
238   if (NULL == grp)
239   {
240     GNUNET_break (0);
241     return;
242   }
243   if (NULL == grp->join_req_cb)
244     return;
245
246   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
247     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
248
249   jh = GNUNET_malloc (sizeof (*jh));
250   jh->group = grp;
251   jh->member_pub_key = jreq->member_pub_key;
252   jh->peer = jreq->peer;
253   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
254
255   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
256 }
257
258
259 /**
260  * Check multicast message.
261  */
262 static int
263 check_group_message (void *cls,
264                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
265 {
266   return GNUNET_OK;
267 }
268
269
270 /**
271  * Receive multicast message from service.
272  */
273 static void
274 handle_group_message (void *cls,
275                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
276 {
277   struct GNUNET_MULTICAST_Group *grp = cls;
278
279   if (GNUNET_YES == grp->is_disconnecting)
280     return;
281
282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
283               "Calling message callback with a message of size %u.\n",
284               ntohs (mmsg->header.size));
285
286   if (NULL != grp->message_cb)
287     grp->message_cb (grp->cb_cls, mmsg);
288
289   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
290 }
291
292
293 /**
294  * Receive message/request fragment acknowledgement from service.
295  */
296 static void
297 handle_group_fragment_ack (void *cls,
298                            const struct GNUNET_MessageHeader *msg)
299 {
300   struct GNUNET_MULTICAST_Group *grp = cls;
301
302   LOG (GNUNET_ERROR_TYPE_DEBUG,
303        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
304        grp, grp->in_transmit, grp->acks_pending);
305
306   if (0 == grp->acks_pending)
307   {
308     LOG (GNUNET_ERROR_TYPE_DEBUG,
309          "%p Ignoring extraneous fragment ACK.\n", grp);
310     return;
311   }
312   grp->acks_pending--;
313
314   if (GNUNET_YES != grp->in_transmit)
315     return;
316
317   if (GNUNET_YES == grp->is_origin)
318     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
319   else
320     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
321
322   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
323 }
324
325
326 /**
327  * Check unicast request.
328  */
329 static int
330 check_origin_request (void *cls,
331                       const struct GNUNET_MULTICAST_RequestHeader *req)
332 {
333   return GNUNET_OK;
334 }
335
336
337 /**
338  * Origin receives unicast request from a member.
339  */
340 static void
341 handle_origin_request (void *cls,
342                        const struct GNUNET_MULTICAST_RequestHeader *req)
343 {
344   struct GNUNET_MULTICAST_Group *grp;
345   struct GNUNET_MULTICAST_Origin *orig = cls;
346   grp = &orig->grp;
347
348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
349               "Calling request callback with a request of size %u.\n",
350               ntohs (req->header.size));
351
352   if (NULL != orig->request_cb)
353     orig->request_cb (grp->cb_cls, req);
354
355   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
356 }
357
358
359 /**
360  * Receive multicast replay request from service.
361  */
362 static void
363 handle_group_replay_request (void *cls,
364                              const struct MulticastReplayRequestMessage *rep)
365
366 {
367   struct GNUNET_MULTICAST_Group *grp = cls;
368
369   if (GNUNET_YES == grp->is_disconnecting)
370     return;
371
372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
373
374   if (0 != rep->fragment_id)
375   {
376     if (NULL != grp->replay_frag_cb)
377     {
378       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
379       rh->grp = grp;
380       rh->req = *rep;
381       grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
382                            GNUNET_ntohll (rep->fragment_id),
383                            GNUNET_ntohll (rep->flags), rh);
384     }
385   }
386   else if (0 != rep->message_id)
387   {
388     if (NULL != grp->replay_msg_cb)
389     {
390       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
391       rh->grp = grp;
392       rh->req = *rep;
393       grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
394                           GNUNET_ntohll (rep->message_id),
395                           GNUNET_ntohll (rep->fragment_offset),
396                           GNUNET_ntohll (rep->flags), rh);
397     }
398   }
399
400   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
401 }
402
403
404 /**
405  * Check replay response.
406  */
407 static int
408 check_member_replay_response (void *cls,
409                               const struct MulticastReplayResponseMessage *res)
410 {
411   uint16_t size = ntohs (res->header.size);
412
413   if (sizeof (*res) == size)
414     return GNUNET_OK;
415
416   if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
417     return GNUNET_OK;
418
419   return GNUNET_SYSERR;
420 }
421
422
423 /**
424  * Receive replay response from service.
425  */
426 static void
427 handle_member_replay_response (void *cls,
428                                const struct MulticastReplayResponseMessage *res)
429 {
430   struct GNUNET_MULTICAST_Group *grp;
431   struct GNUNET_MULTICAST_Member *mem = cls;
432   grp = &mem->grp;
433
434   if (GNUNET_YES == grp->is_disconnecting)
435     return;
436
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
438
439   // FIXME: return result
440 }
441
442
443 /**
444  * Check join decision.
445  */
446 static int
447 check_member_join_decision (void *cls,
448                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
449 {
450   return GNUNET_OK; // checked in handle below
451 }
452
453
454 /**
455  * Member receives join decision.
456  */
457 static void
458 handle_member_join_decision (void *cls,
459                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
460 {
461   struct GNUNET_MULTICAST_Group *grp;
462   struct GNUNET_MULTICAST_Member *mem = cls;
463   grp = &mem->grp;
464
465   const struct MulticastJoinDecisionMessage *
466     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
467
468   uint16_t dcsn_size = ntohs (dcsn->header.size);
469   int is_admitted = ntohl (dcsn->is_admitted);
470
471   LOG (GNUNET_ERROR_TYPE_DEBUG,
472        "%p Member got join decision from multicast: %d\n",
473        mem, is_admitted);
474
475   const struct GNUNET_MessageHeader *join_resp = NULL;
476   uint16_t join_resp_size = 0;
477
478   uint16_t relay_count = ntohl (dcsn->relay_count);
479   const struct GNUNET_PeerIdentity *relays = NULL;
480   uint16_t relay_size = relay_count * sizeof (*relays);
481   if (0 < relay_count)
482   {
483     if (dcsn_size < sizeof (*dcsn) + relay_size)
484     {
485       GNUNET_break_op (0);
486       is_admitted = GNUNET_SYSERR;
487     }
488     else
489     {
490       relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
491     }
492   }
493
494   if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
495   {
496     join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
497     join_resp_size = ntohs (join_resp->size);
498   }
499   if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
500   {
501     LOG (GNUNET_ERROR_TYPE_DEBUG,
502          "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
503          dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
504     GNUNET_break_op (0);
505     is_admitted = GNUNET_SYSERR;
506   }
507
508   if (NULL != mem->join_dcsn_cb)
509     mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
510                        relay_count, relays, join_resp);
511
512   // FIXME:
513   //if (GNUNET_YES != is_admitted)
514   //  GNUNET_MULTICAST_member_part (mem);
515
516   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
517 }
518
519
520 static void
521 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
522 {
523   if (NULL != grp->connect_env)
524   {
525     GNUNET_MQ_discard (grp->connect_env);
526     grp->connect_env = NULL;
527   }
528   if (NULL != grp->mq)
529   {
530     GNUNET_MQ_destroy (grp->mq);
531     grp->mq = NULL;
532   }
533   if (NULL != grp->disconnect_cb)
534   {
535     grp->disconnect_cb (grp->disconnect_cls);
536     grp->disconnect_cb = NULL;
537   }
538   GNUNET_free (grp);
539 }
540
541
542 static void
543 handle_group_part_ack (void *cls,
544                        const struct GNUNET_MessageHeader *msg)
545 {
546   struct GNUNET_MULTICAST_Group *grp = cls;
547
548   group_cleanup (grp);
549 }
550
551
552 /**
553  * Function to call with the decision made for a join request.
554  *
555  * Must be called once and only once in response to an invocation of the
556  * #GNUNET_MULTICAST_JoinRequestCallback.
557  *
558  * @param join
559  *        Join request handle.
560  * @param is_admitted
561  *        #GNUNET_YES    if the join is approved,
562  *        #GNUNET_NO     if it is disapproved,
563  *        #GNUNET_SYSERR if we cannot answer the request.
564  * @param relay_count
565  *        Number of relays given.
566  * @param relays
567  *        Array of suggested peers that might be useful relays to use
568  *        when joining the multicast group (essentially a list of peers that
569  *        are already part of the multicast group and might thus be willing
570  *        to help with routing).  If empty, only this local peer (which must
571  *        be the multicast origin) is a good candidate for building the
572  *        multicast tree.  Note that it is unnecessary to specify our own
573  *        peer identity in this array.
574  * @param join_resp
575  *        Message to send in response to the joining peer;
576  *        can also be used to redirect the peer to a different group at the
577  *        application layer; this response is to be transmitted to the
578  *        peer that issued the request even if admission is denied.
579  */
580 struct GNUNET_MULTICAST_ReplayHandle *
581 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
582                                 int is_admitted,
583                                 uint16_t relay_count,
584                                 const struct GNUNET_PeerIdentity *relays,
585                                 const struct GNUNET_MessageHeader *join_resp)
586 {
587   struct GNUNET_MULTICAST_Group *grp = join->group;
588   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
589   uint16_t relay_size = relay_count * sizeof (*relays);
590
591   struct MulticastJoinDecisionMessageHeader *hdcsn;
592   struct MulticastJoinDecisionMessage *dcsn;
593   struct GNUNET_MQ_Envelope *
594     env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
595                                GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
596   hdcsn->member_pub_key = join->member_pub_key;
597   hdcsn->peer = join->peer;
598
599   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
600   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
601   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
602   dcsn->is_admitted = htonl (is_admitted);
603   dcsn->relay_count = htonl (relay_count);
604   if (0 < relay_size)
605     GNUNET_memcpy (&dcsn[1], relays, relay_size);
606   if (0 < join_resp_size)
607     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
608
609   GNUNET_MQ_send (grp->mq, env);
610   GNUNET_free (join);
611   return NULL;
612 }
613
614
615 /**
616  * Replay a message fragment for the multicast group.
617  *
618  * @param rh
619  *        Replay handle identifying which replay operation was requested.
620  * @param msg
621  *        Replayed message fragment, NULL if not found / an error occurred.
622  * @param ec
623  *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
624  *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
625  */
626 void
627 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
628                                   const struct GNUNET_MessageHeader *msg,
629                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
630 {
631   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
632   struct MulticastReplayResponseMessage *res;
633   struct GNUNET_MQ_Envelope *
634     env = GNUNET_MQ_msg_extra (res, msg_size,
635                                GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
636   res->fragment_id = rh->req.fragment_id;
637   res->message_id = rh->req.message_id;
638   res->fragment_offset = rh->req.fragment_offset;
639   res->flags = rh->req.flags;
640   res->error_code = htonl (ec);
641
642   if (GNUNET_MULTICAST_REC_OK == ec)
643   {
644     GNUNET_assert (NULL != msg);
645     GNUNET_memcpy (&res[1], msg, msg_size);
646   }
647
648   GNUNET_MQ_send (rh->grp->mq, env);
649
650   if (GNUNET_MULTICAST_REC_OK != ec)
651     GNUNET_free (rh);
652 }
653
654
655 /**
656  * Indicate the end of the replay session.
657  *
658  * Invalidates the replay handle.
659  *
660  * @param rh
661  *        Replay session to end.
662  */
663 void
664 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
665 {
666   struct MulticastReplayResponseMessage *end;
667   struct GNUNET_MQ_Envelope *
668     env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
669
670   end->fragment_id = rh->req.fragment_id;
671   end->message_id = rh->req.message_id;
672   end->fragment_offset = rh->req.fragment_offset;
673   end->flags = rh->req.flags;
674
675   GNUNET_MQ_send (rh->grp->mq, env);
676   GNUNET_free (rh);
677 }
678
679
680 /**
681  * Replay a message for the multicast group.
682  *
683  * @param rh
684  *        Replay handle identifying which replay operation was requested.
685  * @param notify
686  *        Function to call to get the message.
687  * @param notify_cls
688  *        Closure for @a notify.
689  */
690 void
691 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
692                                    GNUNET_MULTICAST_ReplayTransmitNotify notify,
693                                    void *notify_cls)
694 {
695 }
696
697
698 static void
699 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
700
701
702 static void
703 origin_reconnect (void *cls)
704 {
705   origin_connect (cls);
706 }
707
708
709 /**
710  * Origin client disconnected from service.
711  *
712  * Reconnect after backoff period.
713  */
714 static void
715 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
716 {
717   struct GNUNET_MULTICAST_Origin *orig = cls;
718   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
719
720   LOG (GNUNET_ERROR_TYPE_DEBUG,
721        "Origin client disconnected (%d), re-connecting\n",
722        (int) error);
723   if (NULL != grp->mq)
724   {
725     GNUNET_MQ_destroy (grp->mq);
726     grp->mq = NULL;
727   }
728
729   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
730                                                       origin_reconnect,
731                                                       orig);
732   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
733 }
734
735
736 /**
737  * Connect to service as origin.
738  */
739 static void
740 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
741 {
742   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
743
744   struct GNUNET_MQ_MessageHandler handlers[] = {
745     GNUNET_MQ_hd_var_size (group_message,
746                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
747                            struct GNUNET_MULTICAST_MessageHeader,
748                            grp),
749     GNUNET_MQ_hd_var_size (origin_request,
750                            GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
751                            struct GNUNET_MULTICAST_RequestHeader,
752                            orig),
753     GNUNET_MQ_hd_fixed_size (group_fragment_ack,
754                              GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
755                              struct GNUNET_MessageHeader,
756                              grp),
757     GNUNET_MQ_hd_var_size (group_join_request,
758                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
759                            struct MulticastJoinRequestMessage,
760                            grp),
761     GNUNET_MQ_hd_fixed_size (group_part_ack,
762                              GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
763                              struct GNUNET_MessageHeader,
764                              grp),
765     GNUNET_MQ_hd_fixed_size (group_replay_request,
766                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
767                              struct MulticastReplayRequestMessage,
768                              grp),
769     GNUNET_MQ_handler_end ()
770   };
771
772   grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
773                                    handlers, origin_disconnected, orig);
774   GNUNET_assert (NULL != grp->mq);
775   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
776 }
777
778
779 /**
780  * Start a multicast group.
781  *
782  * Will advertise the origin in the P2P overlay network under the respective
783  * public key so that other peer can find this peer to join it.  Peers that
784  * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
785  * either an existing group member or to the origin.  If the joining is
786  * approved, the member is cleared for @e replay and will begin to receive
787  * messages transmitted to the group.  If joining is disapproved, the failed
788  * candidate will be given a response.  Members in the group can send messages
789  * to the origin (one at a time).
790  *
791  * @param cfg
792  *        Configuration to use.
793  * @param priv_key
794  *        ECC key that will be used to sign messages for this
795  *        multicast session; public key is used to identify the multicast group;
796  * @param max_fragment_id
797  *        Maximum fragment ID already sent to the group.
798  *        0 for a new group.
799  * @param join_request_cb
800  *        Function called to approve / disapprove joining of a peer.
801  * @param replay_frag_cb
802  *        Function that can be called to replay a message fragment.
803  * @param replay_msg_cb
804  *        Function that can be called to replay a message.
805  * @param request_cb
806  *        Function called with message fragments from group members.
807  * @param message_cb
808  *        Function called with the message fragments sent to the
809  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
810  *        should be stored for answering replay requests later.
811  * @param cls
812  *        Closure for the various callbacks that follow.
813  *
814  * @return Handle for the origin, NULL on error.
815  */
816 struct GNUNET_MULTICAST_Origin *
817 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
818                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
819                                uint64_t max_fragment_id,
820                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
821                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
822                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
823                                GNUNET_MULTICAST_RequestCallback request_cb,
824                                GNUNET_MULTICAST_MessageCallback message_cb,
825                                void *cls)
826 {
827   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
828   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
829
830   struct MulticastOriginStartMessage *start;
831   grp->connect_env = GNUNET_MQ_msg (start,
832                                     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
833   start->max_fragment_id = max_fragment_id;
834   start->group_key = *priv_key;
835
836   grp->cfg = cfg;
837   grp->is_origin = GNUNET_YES;
838   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
839
840   grp->cb_cls = cls;
841   grp->join_req_cb = join_request_cb;
842   grp->replay_frag_cb = replay_frag_cb;
843   grp->replay_msg_cb = replay_msg_cb;
844   grp->message_cb = message_cb;
845
846   orig->request_cb = request_cb;
847
848   origin_connect (orig);
849   return orig;
850 }
851
852
853 /**
854  * Stop a multicast group.
855  *
856  * @param origin
857  *        Multicast group to stop.
858  */
859 void
860 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
861                               GNUNET_ContinuationCallback stop_cb,
862                               void *stop_cls)
863 {
864   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
865   struct GNUNET_MQ_Envelope *env;
866
867   grp->is_disconnecting = GNUNET_YES;
868   grp->disconnect_cb = stop_cb;
869   grp->disconnect_cls = stop_cls;
870   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
871   GNUNET_MQ_send (grp->mq, env);
872 }
873
874
875 static void
876 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
877 {
878   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
879   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
880   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
881   GNUNET_assert (GNUNET_YES == grp->in_transmit);
882
883   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
884   struct GNUNET_MULTICAST_MessageHeader *msg;
885   struct GNUNET_MQ_Envelope *
886     env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
887                                GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
888
889   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
890
891   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
892       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
893   {
894     LOG (GNUNET_ERROR_TYPE_ERROR,
895          "%p OriginTransmitNotify() returned error or invalid message size.\n",
896          orig);
897     /* FIXME: handle error */
898     GNUNET_MQ_discard (env);
899     return;
900   }
901
902   if (GNUNET_NO == ret && 0 == buf_size)
903   {
904     LOG (GNUNET_ERROR_TYPE_DEBUG,
905          "%p OriginTransmitNotify() - transmission paused.\n", orig);
906     GNUNET_MQ_discard (env);
907     return; /* Transmission paused. */
908   }
909
910   msg->header.size = htons (sizeof (*msg) + buf_size);
911   msg->message_id = GNUNET_htonll (tmit->message_id);
912   msg->group_generation = tmit->group_generation;
913   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
914   tmit->fragment_offset += sizeof (*msg) + buf_size;
915
916   grp->acks_pending++;
917   GNUNET_MQ_send (grp->mq, env);
918
919   if (GNUNET_YES == ret)
920     grp->in_transmit = GNUNET_NO;
921 }
922
923
924 /**
925  * Send a message to the multicast group.
926  *
927  * @param orig
928  *        Handle to the multicast group.
929  * @param message_id
930  *        Application layer ID for the message.  Opaque to multicast.
931  * @param group_generation
932  *        Group generation of the message.
933  *        Documented in struct GNUNET_MULTICAST_MessageHeader.
934  * @param notify
935  *        Function to call to get the message.
936  * @param notify_cls
937  *        Closure for @a notify.
938  *
939  * @return Message handle on success,
940  *         NULL on error (i.e. another request is already pending).
941  */
942 struct GNUNET_MULTICAST_OriginTransmitHandle *
943 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
944                                 uint64_t message_id,
945                                 uint64_t group_generation,
946                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
947                                 void *notify_cls)
948 {
949   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
950   if (GNUNET_YES == grp->in_transmit)
951     return NULL;
952   grp->in_transmit = GNUNET_YES;
953
954   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
955   tmit->origin = orig;
956   tmit->message_id = message_id;
957   tmit->fragment_offset = 0;
958   tmit->group_generation = group_generation;
959   tmit->notify = notify;
960   tmit->notify_cls = notify_cls;
961
962   origin_to_all (orig);
963   return tmit;
964 }
965
966
967 /**
968  * Resume message transmission to multicast group.
969  *
970  * @param th
971  *        Transmission to cancel.
972  */
973 void
974 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
975 {
976   struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
977   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
978     return;
979   origin_to_all (th->origin);
980 }
981
982
983 /**
984  * Cancel request for message transmission to multicast group.
985  *
986  * @param th
987  *        Transmission to cancel.
988  */
989 void
990 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
991 {
992   th->origin->grp.in_transmit = GNUNET_NO;
993 }
994
995
996 static void
997 member_connect (struct GNUNET_MULTICAST_Member *mem);
998
999
1000 static void
1001 member_reconnect (void *cls)
1002 {
1003   member_connect (cls);
1004 }
1005
1006
1007 /**
1008  * Member client disconnected from service.
1009  *
1010  * Reconnect after backoff period.
1011  */
1012 static void
1013 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1014 {
1015   struct GNUNET_MULTICAST_Member *mem = cls;
1016   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1017
1018   LOG (GNUNET_ERROR_TYPE_DEBUG,
1019        "Member client disconnected (%d), re-connecting\n",
1020        (int) error);
1021   GNUNET_MQ_destroy (grp->mq);
1022   grp->mq = NULL;
1023
1024   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1025                                                       member_reconnect,
1026                                                       mem);
1027   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1028 }
1029
1030
1031 /**
1032  * Connect to service as member.
1033  */
1034 static void
1035 member_connect (struct GNUNET_MULTICAST_Member *mem)
1036 {
1037   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1038
1039   struct GNUNET_MQ_MessageHandler handlers[] = {
1040     GNUNET_MQ_hd_var_size (group_message,
1041                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1042                            struct GNUNET_MULTICAST_MessageHeader,
1043                            grp),
1044     GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1045                              GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1046                              struct GNUNET_MessageHeader,
1047                              grp),
1048     GNUNET_MQ_hd_var_size (group_join_request,
1049                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1050                            struct MulticastJoinRequestMessage,
1051                            grp),
1052     GNUNET_MQ_hd_var_size (member_join_decision,
1053                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1054                            struct MulticastJoinDecisionMessageHeader,
1055                            mem),
1056     GNUNET_MQ_hd_fixed_size (group_part_ack,
1057                              GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
1058                              struct GNUNET_MessageHeader,
1059                              grp),
1060     GNUNET_MQ_hd_fixed_size (group_replay_request,
1061                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1062                              struct MulticastReplayRequestMessage,
1063                              grp),
1064     GNUNET_MQ_hd_var_size (member_replay_response,
1065                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1066                            struct MulticastReplayResponseMessage,
1067                            mem),
1068     GNUNET_MQ_handler_end ()
1069   };
1070
1071   grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1072                                    handlers, member_disconnected, mem);
1073   GNUNET_assert (NULL != grp->mq);
1074   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1075 }
1076
1077
1078 /**
1079  * Join a multicast group.
1080  *
1081  * The entity joining is always the local peer.  Further information about the
1082  * candidate can be provided in the @a join_request message.  If the join fails, the
1083  * @a message_cb is invoked with a (failure) response and then with NULL.  If
1084  * the join succeeds, outstanding (state) messages and ongoing multicast
1085  * messages will be given to the @a message_cb until the member decides to part
1086  * the group.  The @a replay_cb function may be called at any time by the
1087  * multicast service to support relaying messages to other members of the group.
1088  *
1089  * @param cfg
1090  *        Configuration to use.
1091  * @param group_key
1092  *        ECC public key that identifies the group to join.
1093  * @param member_key
1094  *        ECC key that identifies the member
1095  *        and used to sign requests sent to the origin.
1096  * @param origin
1097  *        Peer ID of the origin to send unicast requsets to.  If NULL,
1098  *        unicast requests are sent back via multiple hops on the reverse path
1099  *        of multicast messages.
1100  * @param relay_count
1101  *        Number of peers in the @a relays array.
1102  * @param relays
1103  *        Peer identities of members of the group, which serve as relays
1104  *        and can be used to join the group at. and send the @a join_request to.
1105  *        If empty, the @a join_request is sent directly to the @a origin.
1106  * @param join_msg
1107  *        Application-dependent join message to be passed to the peer @a origin.
1108  * @param join_request_cb
1109  *        Function called to approve / disapprove joining of a peer.
1110  * @param join_decision_cb
1111  *        Function called to inform about the join decision.
1112  * @param replay_frag_cb
1113  *        Function that can be called to replay message fragments
1114  *        this peer already knows from this group. NULL if this
1115  *        client is unable to support replay.
1116  * @param replay_msg_cb
1117  *        Function that can be called to replay message fragments
1118  *        this peer already knows from this group. NULL if this
1119  *        client is unable to support replay.
1120  * @param message_cb
1121  *        Function to be called for all message fragments we
1122  *        receive from the group, excluding those our @a replay_cb
1123  *        already has.
1124  * @param cls
1125  *        Closure for callbacks.
1126  *
1127  * @return Handle for the member, NULL on error.
1128  */
1129 struct GNUNET_MULTICAST_Member *
1130 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1131                               const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1132                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1133                               const struct GNUNET_PeerIdentity *origin,
1134                               uint16_t relay_count,
1135                               const struct GNUNET_PeerIdentity *relays,
1136                               const struct GNUNET_MessageHeader *join_msg,
1137                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1138                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1139                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1140                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1141                               GNUNET_MULTICAST_MessageCallback message_cb,
1142                               void *cls)
1143 {
1144   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1145   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1146
1147   uint16_t relay_size = relay_count * sizeof (*relays);
1148   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1149   struct MulticastMemberJoinMessage *join;
1150   grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1151                                           GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1152   join->group_pub_key = *group_pub_key;
1153   join->member_key = *member_key;
1154   join->origin = *origin;
1155   join->relay_count = ntohl (relay_count);
1156   if (0 < relay_size)
1157     GNUNET_memcpy (&join[1], relays, relay_size);
1158   if (0 < join_msg_size)
1159     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1160
1161   grp->cfg = cfg;
1162   grp->is_origin = GNUNET_NO;
1163   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1164
1165   mem->join_dcsn_cb = join_decision_cb;
1166   grp->join_req_cb = join_request_cb;
1167   grp->replay_frag_cb = replay_frag_cb;
1168   grp->replay_msg_cb = replay_msg_cb;
1169   grp->message_cb = message_cb;
1170   grp->cb_cls = cls;
1171
1172   member_connect (mem);
1173   return mem;
1174 }
1175
1176
1177 /**
1178  * Part a multicast group.
1179  *
1180  * Disconnects from all group members and invalidates the @a member handle.
1181  *
1182  * An application-dependent part message can be transmitted beforehand using
1183  * #GNUNET_MULTICAST_member_to_origin())
1184  *
1185  * @param member
1186  *        Membership handle.
1187  */
1188 void
1189 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1190                               GNUNET_ContinuationCallback part_cb,
1191                               void *part_cls)
1192 {
1193   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1194   struct GNUNET_MQ_Envelope *env;
1195
1196   mem->join_dcsn_cb = NULL;
1197   grp->join_req_cb = NULL;
1198   grp->message_cb = NULL;
1199   grp->replay_msg_cb = NULL;
1200   grp->replay_frag_cb = NULL;
1201   grp->is_disconnecting = GNUNET_YES;
1202   grp->disconnect_cb = part_cb;
1203   grp->disconnect_cls = part_cls;
1204   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
1205   GNUNET_MQ_send (grp->mq, env);
1206 }
1207
1208
1209 void
1210 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1211                        uint64_t fragment_id,
1212                        uint64_t message_id,
1213                        uint64_t fragment_offset,
1214                        uint64_t flags)
1215 {
1216   struct MulticastReplayRequestMessage *rep;
1217   struct GNUNET_MQ_Envelope *
1218     env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1219
1220   rep->fragment_id = GNUNET_htonll (fragment_id);
1221   rep->message_id = GNUNET_htonll (message_id);
1222   rep->fragment_offset = GNUNET_htonll (fragment_offset);
1223   rep->flags = GNUNET_htonll (flags);
1224
1225   GNUNET_MQ_send (mem->grp.mq, env);
1226 }
1227
1228
1229 /**
1230  * Request a fragment to be replayed by fragment ID.
1231  *
1232  * Useful if messages below the @e max_known_fragment_id given when joining are
1233  * needed and not known to the client.
1234  *
1235  * @param member
1236  *        Membership handle.
1237  * @param fragment_id
1238  *        ID of a message fragment that this client would like to see replayed.
1239  * @param flags
1240  *        Additional flags for the replay request.
1241  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1242  *
1243  * @return Replay request handle.
1244  */
1245 struct GNUNET_MULTICAST_MemberReplayHandle *
1246 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1247                                          uint64_t fragment_id,
1248                                          uint64_t flags)
1249 {
1250   member_replay_request (mem, fragment_id, 0, 0, flags);
1251   // FIXME: return something useful
1252   return NULL;
1253 }
1254
1255
1256 /**
1257  * Request a message fragment to be replayed.
1258  *
1259  * Useful if messages below the @e max_known_fragment_id given when joining are
1260  * needed and not known to the client.
1261  *
1262  * @param member
1263  *        Membership handle.
1264  * @param message_id
1265  *        ID of the message this client would like to see replayed.
1266  * @param fragment_offset
1267  *        Offset of the fragment within the message to replay.
1268  * @param flags
1269  *        Additional flags for the replay request.
1270  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1271  *
1272  * @return Replay request handle, NULL on error.
1273  */
1274 struct GNUNET_MULTICAST_MemberReplayHandle *
1275 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1276                                         uint64_t message_id,
1277                                         uint64_t fragment_offset,
1278                                         uint64_t flags)
1279 {
1280   member_replay_request (mem, 0, message_id, fragment_offset, flags);
1281   // FIXME: return something useful
1282   return NULL;
1283 }
1284
1285
1286 static void
1287 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1288 {
1289   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1290   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1291   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1292   GNUNET_assert (GNUNET_YES == grp->in_transmit);
1293
1294   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1295   struct GNUNET_MULTICAST_RequestHeader *req;
1296   struct GNUNET_MQ_Envelope *
1297     env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1298                                GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1299
1300   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1301
1302   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1303       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1304   {
1305     LOG (GNUNET_ERROR_TYPE_ERROR,
1306          "MemberTransmitNotify() returned error or invalid message size. "
1307          "ret=%d, buf_size=%u\n", ret, buf_size);
1308     /* FIXME: handle error */
1309     GNUNET_MQ_discard (env);
1310     return;
1311   }
1312
1313   if (GNUNET_NO == ret && 0 == buf_size)
1314   {
1315     /* Transmission paused. */
1316     GNUNET_MQ_discard (env);
1317     return;
1318   }
1319
1320   req->header.size = htons (sizeof (*req) + buf_size);
1321   req->request_id = GNUNET_htonll (tmit->request_id);
1322   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1323   tmit->fragment_offset += sizeof (*req) + buf_size;
1324
1325   GNUNET_MQ_send (grp->mq, env);
1326
1327   if (GNUNET_YES == ret)
1328     grp->in_transmit = GNUNET_NO;
1329 }
1330
1331
1332 /**
1333  * Send a message to the origin of the multicast group.
1334  *
1335  * @param mem
1336  *        Membership handle.
1337  * @param request_id
1338  *        Application layer ID for the request.  Opaque to multicast.
1339  * @param notify
1340  *        Callback to call to get the message.
1341  * @param notify_cls
1342  *        Closure for @a notify.
1343  *
1344  * @return Handle to cancel request, NULL on error (i.e. request already pending).
1345  */
1346 struct GNUNET_MULTICAST_MemberTransmitHandle *
1347 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1348                                    uint64_t request_id,
1349                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
1350                                    void *notify_cls)
1351 {
1352   if (GNUNET_YES == mem->grp.in_transmit)
1353     return NULL;
1354   mem->grp.in_transmit = GNUNET_YES;
1355
1356   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1357   tmit->member = mem;
1358   tmit->request_id = request_id;
1359   tmit->fragment_offset = 0;
1360   tmit->notify = notify;
1361   tmit->notify_cls = notify_cls;
1362
1363   member_to_origin (mem);
1364   return tmit;
1365 }
1366
1367
1368 /**
1369  * Resume message transmission to origin.
1370  *
1371  * @param th
1372  *        Transmission to cancel.
1373  */
1374 void
1375 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1376 {
1377   struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1378   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1379     return;
1380   member_to_origin (th->member);
1381 }
1382
1383
1384 /**
1385  * Cancel request for message transmission to origin.
1386  *
1387  * @param th
1388  *        Transmission to cancel.
1389  */
1390 void
1391 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1392 {
1393   th->member->grp.in_transmit = GNUNET_NO;
1394 }
1395
1396
1397 /* end of multicast_api.c */