e390a621cb9e573575aeaf5c1c3e107d3d901c27
[oweals/gnunet.git] / src / multicast / multicast_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/multicast_api.c
23  * @brief Multicast service; implements multicast groups using CADET connections.
24  * @author Christian Grothoff
25  * @author Gabor X Toth
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_multicast_service.h"
31 #include "multicast.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
34
35
36 /**
37  * Handle for a request to send a message to all multicast group members
38  * (from the origin).
39  */
40 struct GNUNET_MULTICAST_OriginTransmitHandle
41 {
42   GNUNET_MULTICAST_OriginTransmitNotify notify;
43   void *notify_cls;
44   struct GNUNET_MULTICAST_Origin *origin;
45
46   uint64_t message_id;
47   uint64_t group_generation;
48   uint64_t fragment_offset;
49 };
50
51
52 /**
53  * Handle for a message to be delivered from a member to the origin.
54  */
55 struct GNUNET_MULTICAST_MemberTransmitHandle
56 {
57   GNUNET_MULTICAST_MemberTransmitNotify notify;
58   void *notify_cls;
59   struct GNUNET_MULTICAST_Member *member;
60
61   uint64_t request_id;
62   uint64_t fragment_offset;
63 };
64
65
66 struct GNUNET_MULTICAST_Group
67 {
68   /**
69    * Configuration to use.
70    */
71   const struct GNUNET_CONFIGURATION_Handle *cfg;
72
73   /**
74    * Client connection to the service.
75    */
76   struct GNUNET_CLIENT_MANAGER_Connection *client;
77
78   /**
79    * Message to send on reconnect.
80    */
81   struct GNUNET_MessageHeader *connect_msg;
82
83   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
84   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
85   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
86   GNUNET_MULTICAST_MessageCallback message_cb;
87   void *cb_cls;
88
89   /**
90    * Function called after disconnected from the service.
91    */
92   GNUNET_ContinuationCallback disconnect_cb;
93
94   /**
95    * Closure for @a disconnect_cb.
96    */
97   void *disconnect_cls;
98
99   /**
100    * Are we currently transmitting a message?
101    */
102   uint8_t in_transmit;
103
104   /**
105    * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
106    */
107   uint8_t acks_pending;
108
109   /**
110    * Is this the origin or a member?
111    */
112   uint8_t is_origin;
113
114   /**
115    * Is this channel in the process of disconnecting from the service?
116    * #GNUNET_YES or #GNUNET_NO
117    */
118   uint8_t is_disconnecting;
119 };
120
121
122 /**
123  * Handle for the origin of a multicast group.
124  */
125 struct GNUNET_MULTICAST_Origin
126 {
127   struct GNUNET_MULTICAST_Group grp;
128   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
129
130   GNUNET_MULTICAST_RequestCallback request_cb;
131 };
132
133
134 /**
135  * Handle for a multicast group member.
136  */
137 struct GNUNET_MULTICAST_Member
138 {
139   struct GNUNET_MULTICAST_Group grp;
140   struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
141
142   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
143
144   /**
145    * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
146    */
147   struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
148
149   uint64_t next_fragment_id;
150 };
151
152
153 /**
154  * Handle that identifies a join request.
155  *
156  * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
157  * corresponding calls to #GNUNET_MULTICAST_join_decision().
158  */
159 struct GNUNET_MULTICAST_JoinHandle
160 {
161   struct GNUNET_MULTICAST_Group *group;
162
163   /**
164    * Public key of the member requesting join.
165    */
166   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
167
168   /**
169    * Peer identity of the member requesting join.
170    */
171   struct GNUNET_PeerIdentity peer;
172 };
173
174
175 /**
176  * Opaque handle to a replay request from the multicast service.
177  */
178 struct GNUNET_MULTICAST_ReplayHandle
179 {
180   struct GNUNET_MULTICAST_Group *grp;
181   struct MulticastReplayRequestMessage req;
182 };
183
184
185 /**
186  * Handle for a replay request.
187  */
188 struct GNUNET_MULTICAST_MemberReplayHandle
189 {
190 };
191
192
193 static void
194 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
195
196 static void
197 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
198
199
200 /**
201  * Send first message to the service after connecting.
202  */
203 static void
204 group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
205 {
206   uint16_t cmsg_size = ntohs (grp->connect_msg->size);
207   struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
208   GNUNET_memcpy (cmsg, grp->connect_msg, cmsg_size);
209   GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
210   GNUNET_free (cmsg);
211 }
212
213
214 /**
215  * Got disconnected from service.  Reconnect.
216  */
217 static void
218 group_recv_disconnect (void *cls,
219                         struct GNUNET_CLIENT_MANAGER_Connection *client,
220                         const struct GNUNET_MessageHeader *msg)
221 {
222   struct GNUNET_MULTICAST_Group *
223     grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
224   GNUNET_CLIENT_MANAGER_reconnect (client);
225   group_send_connect_msg (grp);
226 }
227
228
229 /**
230  * Receive join request from service.
231  */
232 static void
233 group_recv_join_request (void *cls,
234                           struct GNUNET_CLIENT_MANAGER_Connection *client,
235                           const struct GNUNET_MessageHeader *msg)
236 {
237   struct GNUNET_MULTICAST_Group *grp;
238   const struct MulticastJoinRequestMessage *jreq;
239   struct GNUNET_MULTICAST_JoinHandle *jh;
240   const struct GNUNET_MessageHeader *jmsg;
241
242   grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
243   if (NULL == grp)
244   {
245     GNUNET_break (0);
246     return;
247   }
248   if (NULL == grp->join_req_cb)
249     return;
250   /* FIXME: this fails to check that 'msg' is well-formed! */
251   jreq = (const struct MulticastJoinRequestMessage *) msg;
252   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
253     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
254   else
255     jmsg = NULL;
256   jh = GNUNET_malloc (sizeof (*jh));
257   jh->group = grp;
258   jh->member_pub_key = jreq->member_pub_key;
259   jh->peer = jreq->peer;
260   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
261 }
262
263
264 /**
265  * Receive multicast message from service.
266  */
267 static void
268 group_recv_message (void *cls,
269                     struct GNUNET_CLIENT_MANAGER_Connection *client,
270                     const struct GNUNET_MessageHeader *msg)
271 {
272   struct GNUNET_MULTICAST_Group *
273     grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
274   struct GNUNET_MULTICAST_MessageHeader *
275     mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
276
277   if (GNUNET_YES == grp->is_disconnecting)
278     return;
279
280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
281               "Calling message callback with a message of size %u.\n",
282               ntohs (mmsg->header.size));
283
284   if (NULL != grp->message_cb)
285     grp->message_cb (grp->cb_cls, mmsg);
286 }
287
288
289 /**
290  * Receive message/request fragment acknowledgement from service.
291  */
292 static void
293 group_recv_fragment_ack (void *cls,
294                          struct GNUNET_CLIENT_MANAGER_Connection *client,
295                          const struct GNUNET_MessageHeader *msg)
296 {
297   struct GNUNET_MULTICAST_Group *
298     grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
299
300   LOG (GNUNET_ERROR_TYPE_DEBUG,
301        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
302        grp, grp->in_transmit, grp->acks_pending);
303
304   if (0 == grp->acks_pending)
305   {
306     LOG (GNUNET_ERROR_TYPE_DEBUG,
307          "%p Ignoring extraneous fragment ACK.\n", grp);
308     return;
309   }
310   grp->acks_pending--;
311
312   if (GNUNET_YES != grp->in_transmit)
313     return;
314
315   if (GNUNET_YES == grp->is_origin)
316     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
317   else
318     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
319 }
320
321 /**
322  * Origin receives uniquest request from a member.
323  */
324 static void
325 origin_recv_request (void *cls,
326                      struct GNUNET_CLIENT_MANAGER_Connection *client,
327                      const struct GNUNET_MessageHeader *msg)
328 {
329   struct GNUNET_MULTICAST_Group *grp;
330   struct GNUNET_MULTICAST_Origin *
331     orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
332   grp = &orig->grp;
333   struct GNUNET_MULTICAST_RequestHeader *
334     req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
335
336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337               "Calling request callback with a request of size %u.\n",
338               ntohs (req->header.size));
339
340   if (NULL != orig->request_cb)
341     orig->request_cb (grp->cb_cls, req);
342 }
343
344
345 /**
346  * Receive multicast replay request from service.
347  */
348 static void
349 group_recv_replay_request (void *cls,
350                            struct GNUNET_CLIENT_MANAGER_Connection *client,
351                            const struct GNUNET_MessageHeader *msg)
352 {
353   struct GNUNET_MULTICAST_Group *
354     grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
355   struct MulticastReplayRequestMessage *
356     rep = (struct MulticastReplayRequestMessage *) msg;
357
358   if (GNUNET_YES == grp->is_disconnecting)
359     return;
360
361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
362
363   if (0 != rep->fragment_id)
364   {
365     if (NULL != grp->replay_frag_cb)
366     {
367       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
368       rh->grp = grp;
369       rh->req = *rep;
370       grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
371                            GNUNET_ntohll (rep->fragment_id),
372                            GNUNET_ntohll (rep->flags), rh);
373     }
374   }
375   else if (0 != rep->message_id)
376   {
377     if (NULL != grp->replay_msg_cb)
378     {
379       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
380       rh->grp = grp;
381       rh->req = *rep;
382       grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
383                           GNUNET_ntohll (rep->message_id),
384                           GNUNET_ntohll (rep->fragment_offset),
385                           GNUNET_ntohll (rep->flags), rh);
386     }
387   }
388 }
389
390
391 /**
392  * Receive multicast replay request from service.
393  */
394 static void
395 member_recv_replay_response (void *cls,
396                             struct GNUNET_CLIENT_MANAGER_Connection *client,
397                             const struct GNUNET_MessageHeader *msg)
398 {
399   struct GNUNET_MULTICAST_Group *grp;
400   struct GNUNET_MULTICAST_Member *
401     mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
402   grp = &mem->grp;
403   // FIXME: Something is missing here for the code to make sense
404   //struct MulticastReplayResponseMessage *
405   //  res = (struct MulticastReplayResponseMessage *) msg;
406   if (GNUNET_YES == grp->is_disconnecting)
407     return;
408
409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
410 }
411
412 /**
413  * Member receives join decision.
414  */
415 static void
416 member_recv_join_decision (void *cls,
417                            struct GNUNET_CLIENT_MANAGER_Connection *client,
418                            const struct GNUNET_MessageHeader *msg)
419 {
420   struct GNUNET_MULTICAST_Group *grp;
421   struct GNUNET_MULTICAST_Member *
422     mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
423   grp = &mem->grp;
424
425   const struct MulticastJoinDecisionMessageHeader *
426     hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
427   const struct MulticastJoinDecisionMessage *
428     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
429
430   uint16_t dcsn_size = ntohs (dcsn->header.size);
431   int is_admitted = ntohl (dcsn->is_admitted);
432
433   LOG (GNUNET_ERROR_TYPE_DEBUG,
434        "%p Member got join decision from multicast: %d\n",
435        mem, is_admitted);
436
437   const struct GNUNET_MessageHeader *join_resp = NULL;
438   uint16_t join_resp_size = 0;
439
440   uint16_t relay_count = ntohl (dcsn->relay_count);
441   const struct GNUNET_PeerIdentity *relays = NULL;
442   uint16_t relay_size = relay_count * sizeof (*relays);
443   if (0 < relay_count)
444   {
445     if (dcsn_size < sizeof (*dcsn) + relay_size)
446     {
447       GNUNET_break_op (0);
448       is_admitted = GNUNET_SYSERR;
449     }
450     else
451     {
452       relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
453     }
454   }
455
456   if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
457   {
458     join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
459     join_resp_size = ntohs (join_resp->size);
460   }
461   if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
462   {
463     LOG (GNUNET_ERROR_TYPE_DEBUG,
464          "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
465          dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
466     GNUNET_break_op (0);
467     is_admitted = GNUNET_SYSERR;
468   }
469
470   if (NULL != mem->join_dcsn_cb)
471     mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
472                        relay_count, relays, join_resp);
473
474   // FIXME:
475   //if (GNUNET_YES != is_admitted)
476   //  GNUNET_MULTICAST_member_part (mem);
477 }
478
479
480 /**
481  * Message handlers for an origin.
482  */
483 static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
484 {
485   { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
486
487   { group_recv_message, NULL,
488     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
489     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
490
491   { origin_recv_request, NULL,
492     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
493     sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
494
495   { group_recv_fragment_ack, NULL,
496     GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
497     sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
498
499   { group_recv_join_request, NULL,
500     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
501     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
502
503   { group_recv_replay_request, NULL,
504     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
505     sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
506
507   { NULL, NULL, 0, 0, GNUNET_NO }
508 };
509
510
511 /**
512  * Message handlers for a member.
513  */
514 static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
515 {
516   { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
517
518   { group_recv_message, NULL,
519     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
520     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
521
522   { group_recv_fragment_ack, NULL,
523     GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
524     sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
525
526   { group_recv_join_request, NULL,
527     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
528     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
529
530   { member_recv_join_decision, NULL,
531     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
532     sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
533
534   { group_recv_replay_request, NULL,
535     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
536     sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
537
538   { member_recv_replay_response, NULL,
539     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
540     sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
541
542   { NULL, NULL, 0, 0, GNUNET_NO }
543 };
544
545
546 static void
547 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
548 {
549   GNUNET_free (grp->connect_msg);
550   if (NULL != grp->disconnect_cb)
551     grp->disconnect_cb (grp->disconnect_cls);
552 }
553
554
555 static void
556 origin_cleanup (void *cls)
557 {
558   struct GNUNET_MULTICAST_Origin *orig = cls;
559   group_cleanup (&orig->grp);
560   GNUNET_free (orig);
561 }
562
563
564 static void
565 member_cleanup (void *cls)
566 {
567   struct GNUNET_MULTICAST_Member *mem = cls;
568   group_cleanup (&mem->grp);
569   GNUNET_free (mem);
570 }
571
572
573 /**
574  * Function to call with the decision made for a join request.
575  *
576  * Must be called once and only once in response to an invocation of the
577  * #GNUNET_MULTICAST_JoinRequestCallback.
578  *
579  * @param join
580  *        Join request handle.
581  * @param is_admitted
582  *        #GNUNET_YES    if the join is approved,
583  *        #GNUNET_NO     if it is disapproved,
584  *        #GNUNET_SYSERR if we cannot answer the request.
585  * @param relay_count
586  *        Number of relays given.
587  * @param relays
588  *        Array of suggested peers that might be useful relays to use
589  *        when joining the multicast group (essentially a list of peers that
590  *        are already part of the multicast group and might thus be willing
591  *        to help with routing).  If empty, only this local peer (which must
592  *        be the multicast origin) is a good candidate for building the
593  *        multicast tree.  Note that it is unnecessary to specify our own
594  *        peer identity in this array.
595  * @param join_resp
596  *        Message to send in response to the joining peer;
597  *        can also be used to redirect the peer to a different group at the
598  *        application layer; this response is to be transmitted to the
599  *        peer that issued the request even if admission is denied.
600  */
601 struct GNUNET_MULTICAST_ReplayHandle *
602 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
603                                 int is_admitted,
604                                 uint16_t relay_count,
605                                 const struct GNUNET_PeerIdentity *relays,
606                                 const struct GNUNET_MessageHeader *join_resp)
607 {
608   struct GNUNET_MULTICAST_Group *grp = join->group;
609   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
610   uint16_t relay_size = relay_count * sizeof (*relays);
611
612   struct MulticastJoinDecisionMessageHeader * hdcsn;
613   struct MulticastJoinDecisionMessage *dcsn;
614   hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
615                          + relay_size + join_resp_size);
616   hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
617                               + relay_size + join_resp_size);
618   hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
619   hdcsn->member_pub_key = join->member_pub_key;
620   hdcsn->peer = join->peer;
621
622   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
623   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
624   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
625   dcsn->is_admitted = htonl (is_admitted);
626   dcsn->relay_count = htonl (relay_count);
627   if (0 < relay_size)
628     GNUNET_memcpy (&dcsn[1], relays, relay_size);
629   if (0 < join_resp_size)
630     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
631
632   GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
633   GNUNET_free (hdcsn);
634   GNUNET_free (join);
635   return NULL;
636 }
637
638
639 /**
640  * Replay a message fragment for the multicast group.
641  *
642  * @param rh
643  *        Replay handle identifying which replay operation was requested.
644  * @param msg
645  *        Replayed message fragment, NULL if not found / an error occurred.
646  * @param ec
647  *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
648  *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
649  */
650 void
651 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
652                                   const struct GNUNET_MessageHeader *msg,
653                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
654 {
655   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
656   struct MulticastReplayResponseMessage *
657     res = GNUNET_malloc (sizeof (*res) + msg_size);
658   *res = (struct MulticastReplayResponseMessage) {
659     .header = {
660       .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
661       .size = htons (sizeof (*res) + msg_size),
662     },
663     .fragment_id = rh->req.fragment_id,
664     .message_id = rh->req.message_id,
665     .fragment_offset = rh->req.fragment_offset,
666     .flags = rh->req.flags,
667     .error_code = htonl (ec),
668   };
669
670   if (GNUNET_MULTICAST_REC_OK == ec)
671   {
672     GNUNET_assert (NULL != msg);
673     GNUNET_memcpy (&res[1], msg, msg_size);
674   }
675
676   GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
677   GNUNET_free (res);
678
679   if (GNUNET_MULTICAST_REC_OK != ec)
680     GNUNET_free (rh);
681 }
682
683
684 /**
685  * Indicate the end of the replay session.
686  *
687  * Invalidates the replay handle.
688  *
689  * @param rh
690  *        Replay session to end.
691  */
692 void
693 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
694 {
695   struct MulticastReplayResponseMessage end = {
696     .header = {
697       .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
698       .size = htons (sizeof (end)),
699     },
700     .fragment_id = rh->req.fragment_id,
701     .message_id = rh->req.message_id,
702     .fragment_offset = rh->req.fragment_offset,
703     .flags = rh->req.flags,
704   };
705
706   GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
707   GNUNET_free (rh);
708 }
709
710
711 /**
712  * Replay a message for the multicast group.
713  *
714  * @param rh
715  *        Replay handle identifying which replay operation was requested.
716  * @param notify
717  *        Function to call to get the message.
718  * @param notify_cls
719  *        Closure for @a notify.
720  */
721 void
722 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
723                                    GNUNET_MULTICAST_ReplayTransmitNotify notify,
724                                    void *notify_cls)
725 {
726 }
727
728
729 /**
730  * Start a multicast group.
731  *
732  * Will advertise the origin in the P2P overlay network under the respective
733  * public key so that other peer can find this peer to join it.  Peers that
734  * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
735  * either an existing group member or to the origin.  If the joining is
736  * approved, the member is cleared for @e replay and will begin to receive
737  * messages transmitted to the group.  If joining is disapproved, the failed
738  * candidate will be given a response.  Members in the group can send messages
739  * to the origin (one at a time).
740  *
741  * @param cfg
742  *        Configuration to use.
743  * @param priv_key
744  *        ECC key that will be used to sign messages for this
745  *        multicast session; public key is used to identify the multicast group;
746  * @param max_fragment_id
747  *        Maximum fragment ID already sent to the group.
748  *        0 for a new group.
749  * @param join_request_cb
750  *        Function called to approve / disapprove joining of a peer.
751  * @param replay_frag_cb
752  *        Function that can be called to replay a message fragment.
753  * @param replay_msg_cb
754  *        Function that can be called to replay a message.
755  * @param request_cb
756  *        Function called with message fragments from group members.
757  * @param message_cb
758  *        Function called with the message fragments sent to the
759  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
760  *        should be stored for answering replay requests later.
761  * @param cls
762  *        Closure for the various callbacks that follow.
763  *
764  * @return Handle for the origin, NULL on error.
765  */
766 struct GNUNET_MULTICAST_Origin *
767 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
768                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
769                                uint64_t max_fragment_id,
770                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
771                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
772                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
773                                GNUNET_MULTICAST_RequestCallback request_cb,
774                                GNUNET_MULTICAST_MessageCallback message_cb,
775                                void *cls)
776 {
777   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
778   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
779   struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
780
781   start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
782   start->header.size = htons (sizeof (*start));
783   start->max_fragment_id = max_fragment_id;
784   GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
785
786   grp->connect_msg = (struct GNUNET_MessageHeader *) start;
787   grp->is_origin = GNUNET_YES;
788   grp->cfg = cfg;
789
790   grp->cb_cls = cls;
791   grp->join_req_cb = join_request_cb;
792   grp->replay_frag_cb = replay_frag_cb;
793   grp->replay_msg_cb = replay_msg_cb;
794   grp->message_cb = message_cb;
795
796   orig->request_cb = request_cb;
797
798   grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
799   GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
800   group_send_connect_msg (grp);
801
802   return orig;
803 }
804
805
806 /**
807  * Stop a multicast group.
808  *
809  * @param origin
810  *        Multicast group to stop.
811  */
812 void
813 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
814                               GNUNET_ContinuationCallback stop_cb,
815                               void *stop_cls)
816 {
817   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
818
819   grp->is_disconnecting = GNUNET_YES;
820   grp->disconnect_cb = stop_cb;
821   grp->disconnect_cls = stop_cls;
822
823   GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES,
824                                     &origin_cleanup, orig);
825 }
826
827
828 static void
829 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
830 {
831   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
832   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
833   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
834   GNUNET_assert (GNUNET_YES == grp->in_transmit);
835
836   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
837   struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
838   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
839
840   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
841       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
842   {
843     LOG (GNUNET_ERROR_TYPE_ERROR,
844          "%p OriginTransmitNotify() returned error or invalid message size.\n",
845          orig);
846     /* FIXME: handle error */
847     GNUNET_free (msg);
848     return;
849   }
850
851   if (GNUNET_NO == ret && 0 == buf_size)
852   {
853     LOG (GNUNET_ERROR_TYPE_DEBUG,
854          "%p OriginTransmitNotify() - transmission paused.\n", orig);
855     GNUNET_free (msg);
856     return; /* Transmission paused. */
857   }
858
859   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
860   msg->header.size = htons (sizeof (*msg) + buf_size);
861   msg->message_id = GNUNET_htonll (tmit->message_id);
862   msg->group_generation = tmit->group_generation;
863   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
864   tmit->fragment_offset += sizeof (*msg) + buf_size;
865
866   grp->acks_pending++;
867   GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
868   GNUNET_free (msg);
869
870   if (GNUNET_YES == ret)
871     grp->in_transmit = GNUNET_NO;
872 }
873
874
875 /**
876  * Send a message to the multicast group.
877  *
878  * @param orig
879  *        Handle to the multicast group.
880  * @param message_id
881  *        Application layer ID for the message.  Opaque to multicast.
882  * @param group_generation
883  *        Group generation of the message.
884  *        Documented in struct GNUNET_MULTICAST_MessageHeader.
885  * @param notify
886  *        Function to call to get the message.
887  * @param notify_cls
888  *        Closure for @a notify.
889  *
890  * @return Message handle on success,
891  *         NULL on error (i.e. another request is already pending).
892  */
893 struct GNUNET_MULTICAST_OriginTransmitHandle *
894 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
895                                 uint64_t message_id,
896                                 uint64_t group_generation,
897                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
898                                 void *notify_cls)
899 {
900   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
901   if (GNUNET_YES == grp->in_transmit)
902     return NULL;
903   grp->in_transmit = GNUNET_YES;
904
905   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
906   tmit->origin = orig;
907   tmit->message_id = message_id;
908   tmit->fragment_offset = 0;
909   tmit->group_generation = group_generation;
910   tmit->notify = notify;
911   tmit->notify_cls = notify_cls;
912
913   origin_to_all (orig);
914   return tmit;
915 }
916
917
918 /**
919  * Resume message transmission to multicast group.
920  *
921  * @param th
922  *        Transmission to cancel.
923  */
924 void
925 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
926 {
927   struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
928   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
929     return;
930   origin_to_all (th->origin);
931 }
932
933
934 /**
935  * Cancel request for message transmission to multicast group.
936  *
937  * @param th
938  *        Transmission to cancel.
939  */
940 void
941 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
942 {
943   th->origin->grp.in_transmit = GNUNET_NO;
944 }
945
946
947 /**
948  * Join a multicast group.
949  *
950  * The entity joining is always the local peer.  Further information about the
951  * candidate can be provided in the @a join_request message.  If the join fails, the
952  * @a message_cb is invoked with a (failure) response and then with NULL.  If
953  * the join succeeds, outstanding (state) messages and ongoing multicast
954  * messages will be given to the @a message_cb until the member decides to part
955  * the group.  The @a replay_cb function may be called at any time by the
956  * multicast service to support relaying messages to other members of the group.
957  *
958  * @param cfg
959  *        Configuration to use.
960  * @param group_key
961  *        ECC public key that identifies the group to join.
962  * @param member_key
963  *        ECC key that identifies the member
964  *        and used to sign requests sent to the origin.
965  * @param origin
966  *        Peer ID of the origin to send unicast requsets to.  If NULL,
967  *        unicast requests are sent back via multiple hops on the reverse path
968  *        of multicast messages.
969  * @param relay_count
970  *        Number of peers in the @a relays array.
971  * @param relays
972  *        Peer identities of members of the group, which serve as relays
973  *        and can be used to join the group at. and send the @a join_request to.
974  *        If empty, the @a join_request is sent directly to the @a origin.
975  * @param join_msg
976  *        Application-dependent join message to be passed to the peer @a origin.
977  * @param join_request_cb
978  *        Function called to approve / disapprove joining of a peer.
979  * @param join_decision_cb
980  *        Function called to inform about the join decision.
981  * @param replay_frag_cb
982  *        Function that can be called to replay message fragments
983  *        this peer already knows from this group. NULL if this
984  *        client is unable to support replay.
985  * @param replay_msg_cb
986  *        Function that can be called to replay message fragments
987  *        this peer already knows from this group. NULL if this
988  *        client is unable to support replay.
989  * @param message_cb
990  *        Function to be called for all message fragments we
991  *        receive from the group, excluding those our @a replay_cb
992  *        already has.
993  * @param cls
994  *        Closure for callbacks.
995  *
996  * @return Handle for the member, NULL on error.
997  */
998 struct GNUNET_MULTICAST_Member *
999 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1000                               const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1001                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1002                               const struct GNUNET_PeerIdentity *origin,
1003                               uint16_t relay_count,
1004                               const struct GNUNET_PeerIdentity *relays,
1005                               const struct GNUNET_MessageHeader *join_msg,
1006                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1007                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1008                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1009                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1010                               GNUNET_MULTICAST_MessageCallback message_cb,
1011                               void *cls)
1012 {
1013   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1014   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1015
1016   uint16_t relay_size = relay_count * sizeof (*relays);
1017   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1018   struct MulticastMemberJoinMessage *
1019     join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
1020   join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
1021   join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1022   join->group_pub_key = *group_pub_key;
1023   join->member_key = *member_key;
1024   join->origin = *origin;
1025   join->relay_count = ntohl (relay_count);
1026   if (0 < relay_size)
1027     GNUNET_memcpy (&join[1], relays, relay_size);
1028   if (0 < join_msg_size)
1029     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1030
1031   grp->connect_msg = (struct GNUNET_MessageHeader *) join;
1032   grp->is_origin = GNUNET_NO;
1033   grp->cfg = cfg;
1034
1035   mem->join_dcsn_cb = join_decision_cb;
1036   grp->join_req_cb = join_request_cb;
1037   grp->replay_frag_cb = replay_frag_cb;
1038   grp->replay_msg_cb = replay_msg_cb;
1039   grp->message_cb = message_cb;
1040   grp->cb_cls = cls;
1041
1042   grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
1043   GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
1044   group_send_connect_msg (grp);
1045
1046   return mem;
1047 }
1048
1049
1050 /**
1051  * Part a multicast group.
1052  *
1053  * Disconnects from all group members and invalidates the @a member handle.
1054  *
1055  * An application-dependent part message can be transmitted beforehand using
1056  * #GNUNET_MULTICAST_member_to_origin())
1057  *
1058  * @param member
1059  *        Membership handle.
1060  */
1061 void
1062 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1063                               GNUNET_ContinuationCallback part_cb,
1064                               void *part_cls)
1065 {
1066   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
1067   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1068
1069   grp->is_disconnecting = GNUNET_YES;
1070   grp->disconnect_cb = part_cb;
1071   grp->disconnect_cls = part_cls;
1072
1073   mem->join_dcsn_cb = NULL;
1074   grp->join_req_cb = NULL;
1075   grp->message_cb = NULL;
1076   grp->replay_msg_cb = NULL;
1077   grp->replay_frag_cb = NULL;
1078
1079   GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
1080                                     member_cleanup, mem);
1081 }
1082
1083
1084 void
1085 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1086                        uint64_t fragment_id,
1087                        uint64_t message_id,
1088                        uint64_t fragment_offset,
1089                        uint64_t flags)
1090 {
1091   struct MulticastReplayRequestMessage rep = {
1092     .header = {
1093       .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
1094       .size = htons (sizeof (rep)),
1095     },
1096     .fragment_id = GNUNET_htonll (fragment_id),
1097     .message_id = GNUNET_htonll (message_id),
1098     .fragment_offset = GNUNET_htonll (fragment_offset),
1099     .flags = GNUNET_htonll (flags),
1100   };
1101   GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
1102 }
1103
1104
1105 /**
1106  * Request a fragment to be replayed by fragment ID.
1107  *
1108  * Useful if messages below the @e max_known_fragment_id given when joining are
1109  * needed and not known to the client.
1110  *
1111  * @param member
1112  *        Membership handle.
1113  * @param fragment_id
1114  *        ID of a message fragment that this client would like to see replayed.
1115  * @param flags
1116  *        Additional flags for the replay request.
1117  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1118  *
1119  * @return Replay request handle.
1120  */
1121 struct GNUNET_MULTICAST_MemberReplayHandle *
1122 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1123                                          uint64_t fragment_id,
1124                                          uint64_t flags)
1125 {
1126   member_replay_request (mem, fragment_id, 0, 0, flags);
1127   // FIXME: return something useful
1128   return NULL;
1129 }
1130
1131
1132 /**
1133  * Request a message fragment to be replayed.
1134  *
1135  * Useful if messages below the @e max_known_fragment_id given when joining are
1136  * needed and not known to the client.
1137  *
1138  * @param member
1139  *        Membership handle.
1140  * @param message_id
1141  *        ID of the message this client would like to see replayed.
1142  * @param fragment_offset
1143  *        Offset of the fragment within the message to replay.
1144  * @param flags
1145  *        Additional flags for the replay request.
1146  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1147  *
1148  * @return Replay request handle, NULL on error.
1149  */
1150 struct GNUNET_MULTICAST_MemberReplayHandle *
1151 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1152                                         uint64_t message_id,
1153                                         uint64_t fragment_offset,
1154                                         uint64_t flags)
1155 {
1156   member_replay_request (mem, 0, message_id, fragment_offset, flags);
1157   // FIXME: return something useful
1158   return NULL;
1159 }
1160
1161
1162 static void
1163 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1164 {
1165   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1166   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1167   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1168   GNUNET_assert (GNUNET_YES == grp->in_transmit);
1169
1170   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1171   struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
1172   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1173
1174   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1175       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1176   {
1177     LOG (GNUNET_ERROR_TYPE_ERROR,
1178          "MemberTransmitNotify() returned error or invalid message size. "
1179          "ret=%d, buf_size=%u\n", ret, buf_size);
1180     /* FIXME: handle error */
1181     GNUNET_free (req);
1182     return;
1183   }
1184
1185   if (GNUNET_NO == ret && 0 == buf_size)
1186   {
1187     /* Transmission paused. */
1188     GNUNET_free (req);
1189     return;
1190   }
1191
1192   req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1193   req->header.size = htons (sizeof (*req) + buf_size);
1194   req->request_id = GNUNET_htonll (tmit->request_id);
1195   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1196   tmit->fragment_offset += sizeof (*req) + buf_size;
1197
1198   GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
1199   GNUNET_free (req);
1200
1201   if (GNUNET_YES == ret)
1202     grp->in_transmit = GNUNET_NO;
1203 }
1204
1205
1206 /**
1207  * Send a message to the origin of the multicast group.
1208  *
1209  * @param mem
1210  *        Membership handle.
1211  * @param request_id
1212  *        Application layer ID for the request.  Opaque to multicast.
1213  * @param notify
1214  *        Callback to call to get the message.
1215  * @param notify_cls
1216  *        Closure for @a notify.
1217  *
1218  * @return Handle to cancel request, NULL on error (i.e. request already pending).
1219  */
1220 struct GNUNET_MULTICAST_MemberTransmitHandle *
1221 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1222                                    uint64_t request_id,
1223                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
1224                                    void *notify_cls)
1225 {
1226   if (GNUNET_YES == mem->grp.in_transmit)
1227     return NULL;
1228   mem->grp.in_transmit = GNUNET_YES;
1229
1230   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1231   tmit->member = mem;
1232   tmit->request_id = request_id;
1233   tmit->fragment_offset = 0;
1234   tmit->notify = notify;
1235   tmit->notify_cls = notify_cls;
1236
1237   member_to_origin (mem);
1238   return tmit;
1239 }
1240
1241
1242 /**
1243  * Resume message transmission to origin.
1244  *
1245  * @param th
1246  *        Transmission to cancel.
1247  */
1248 void
1249 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1250 {
1251   struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1252   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1253     return;
1254   member_to_origin (th->member);
1255 }
1256
1257
1258 /**
1259  * Cancel request for message transmission to origin.
1260  *
1261  * @param th
1262  *        Transmission to cancel.
1263  */
1264 void
1265 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1266 {
1267   th->member->grp.in_transmit = GNUNET_NO;
1268 }
1269
1270
1271 /* end of multicast_api.c */