glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / multicast / multicast_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file multicast/multicast_api.c
18  * @brief Multicast service; implements multicast groups using CADET connections.
19  * @author Christian Grothoff
20  * @author Gabor X Toth
21  */
22
23 #include "platform.h"
24 #include "gnunet_util_lib.h"
25 #include "gnunet_multicast_service.h"
26 #include "multicast.h"
27
28 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
29
30
31 /**
32  * Handle for a request to send a message to all multicast group members
33  * (from the origin).
34  */
35 struct GNUNET_MULTICAST_OriginTransmitHandle
36 {
37   GNUNET_MULTICAST_OriginTransmitNotify notify;
38   void *notify_cls;
39   struct GNUNET_MULTICAST_Origin *origin;
40
41   uint64_t message_id;
42   uint64_t group_generation;
43   uint64_t fragment_offset;
44 };
45
46
47 /**
48  * Handle for a message to be delivered from a member to the origin.
49  */
50 struct GNUNET_MULTICAST_MemberTransmitHandle
51 {
52   GNUNET_MULTICAST_MemberTransmitNotify notify;
53   void *notify_cls;
54   struct GNUNET_MULTICAST_Member *member;
55
56   uint64_t request_id;
57   uint64_t fragment_offset;
58 };
59
60
61 struct GNUNET_MULTICAST_Group
62 {
63   /**
64    * Configuration to use.
65    */
66   const struct GNUNET_CONFIGURATION_Handle *cfg;
67
68   /**
69    * Client connection to the service.
70    */
71   struct GNUNET_MQ_Handle *mq;
72
73   /**
74    * Message to send on connect.
75    */
76   struct GNUNET_MQ_Envelope *connect_env;
77
78   /**
79    * Time to wait until we try to reconnect on failure.
80    */
81   struct GNUNET_TIME_Relative reconnect_delay;
82
83   /**
84    * Task for reconnecting when the listener fails.
85    */
86   struct GNUNET_SCHEDULER_Task *reconnect_task;
87
88   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
89   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
90   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
91   GNUNET_MULTICAST_MessageCallback message_cb;
92   void *cb_cls;
93
94   /**
95    * Function called after disconnected from the service.
96    */
97   GNUNET_ContinuationCallback disconnect_cb;
98
99   /**
100    * Closure for @a disconnect_cb.
101    */
102   void *disconnect_cls;
103
104   /**
105    * Are we currently transmitting a message?
106    */
107   uint8_t in_transmit;
108
109   /**
110    * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
111    */
112   uint8_t acks_pending;
113
114   /**
115    * Is this the origin or a member?
116    */
117   uint8_t is_origin;
118
119   /**
120    * Is this channel in the process of disconnecting from the service?
121    * #GNUNET_YES or #GNUNET_NO
122    */
123   uint8_t is_disconnecting;
124 };
125
126
127 /**
128  * Handle for the origin of a multicast group.
129  */
130 struct GNUNET_MULTICAST_Origin
131 {
132   struct GNUNET_MULTICAST_Group grp;
133   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
134
135   GNUNET_MULTICAST_RequestCallback request_cb;
136 };
137
138
139 /**
140  * Handle for a multicast group member.
141  */
142 struct GNUNET_MULTICAST_Member
143 {
144   struct GNUNET_MULTICAST_Group grp;
145   struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
146
147   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
148
149   /**
150    * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
151    */
152   struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
153
154   uint64_t next_fragment_id;
155 };
156
157
158 /**
159  * Handle that identifies a join request.
160  *
161  * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
162  * corresponding calls to #GNUNET_MULTICAST_join_decision().
163  */
164 struct GNUNET_MULTICAST_JoinHandle
165 {
166   struct GNUNET_MULTICAST_Group *group;
167
168   /**
169    * Public key of the member requesting join.
170    */
171   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
172
173   /**
174    * Peer identity of the member requesting join.
175    */
176   struct GNUNET_PeerIdentity peer;
177 };
178
179
180 /**
181  * Opaque handle to a replay request from the multicast service.
182  */
183 struct GNUNET_MULTICAST_ReplayHandle
184 {
185   struct GNUNET_MULTICAST_Group *grp;
186   struct MulticastReplayRequestMessage req;
187 };
188
189
190 /**
191  * Handle for a replay request.
192  */
193 struct GNUNET_MULTICAST_MemberReplayHandle
194 {
195 };
196
197
198 static void
199 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
200
201 static void
202 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
203
204
205 /**
206  * Check join request message.
207  */
208 static int
209 check_group_join_request (void *cls,
210                           const struct MulticastJoinRequestMessage *jreq)
211 {
212   uint16_t size = ntohs (jreq->header.size);
213
214   if (sizeof (*jreq) == size)
215     return GNUNET_OK;
216
217   if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
218     return GNUNET_OK;
219
220   return GNUNET_SYSERR;
221 }
222
223
224 /**
225  * Receive join request from service.
226  */
227 static void
228 handle_group_join_request (void *cls,
229                            const struct MulticastJoinRequestMessage *jreq)
230 {
231   struct GNUNET_MULTICAST_Group *grp = cls;
232   struct GNUNET_MULTICAST_JoinHandle *jh;
233   const struct GNUNET_MessageHeader *jmsg = NULL;
234
235   if (NULL == grp)
236   {
237     GNUNET_break (0);
238     return;
239   }
240   if (NULL == grp->join_req_cb)
241     return;
242
243   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
244     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
245
246   jh = GNUNET_malloc (sizeof (*jh));
247   jh->group = grp;
248   jh->member_pub_key = jreq->member_pub_key;
249   jh->peer = jreq->peer;
250   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
251
252   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
253 }
254
255
256 /**
257  * Check multicast message.
258  */
259 static int
260 check_group_message (void *cls,
261                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
262 {
263   return GNUNET_OK;
264 }
265
266
267 /**
268  * Receive multicast message from service.
269  */
270 static void
271 handle_group_message (void *cls,
272                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
273 {
274   struct GNUNET_MULTICAST_Group *grp = cls;
275
276   if (GNUNET_YES == grp->is_disconnecting)
277     return;
278
279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
280               "Calling message callback with a message of size %u.\n",
281               ntohs (mmsg->header.size));
282
283   if (NULL != grp->message_cb)
284     grp->message_cb (grp->cb_cls, mmsg);
285
286   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
287 }
288
289
290 /**
291  * Receive message/request fragment acknowledgement from service.
292  */
293 static void
294 handle_group_fragment_ack (void *cls,
295                            const struct GNUNET_MessageHeader *msg)
296 {
297   struct GNUNET_MULTICAST_Group *grp = cls;
298
299   LOG (GNUNET_ERROR_TYPE_DEBUG,
300        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
301        grp, grp->in_transmit, grp->acks_pending);
302
303   if (0 == grp->acks_pending)
304   {
305     LOG (GNUNET_ERROR_TYPE_DEBUG,
306          "%p Ignoring extraneous fragment ACK.\n", grp);
307     return;
308   }
309   grp->acks_pending--;
310
311   if (GNUNET_YES != grp->in_transmit)
312     return;
313
314   if (GNUNET_YES == grp->is_origin)
315     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
316   else
317     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
318
319   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
320 }
321
322
323 /**
324  * Check unicast request.
325  */
326 static int
327 check_origin_request (void *cls,
328                       const struct GNUNET_MULTICAST_RequestHeader *req)
329 {
330   return GNUNET_OK;
331 }
332
333
334 /**
335  * Origin receives unicast request from a member.
336  */
337 static void
338 handle_origin_request (void *cls,
339                        const struct GNUNET_MULTICAST_RequestHeader *req)
340 {
341   struct GNUNET_MULTICAST_Group *grp;
342   struct GNUNET_MULTICAST_Origin *orig = cls;
343   grp = &orig->grp;
344
345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346               "Calling request callback with a request of size %u.\n",
347               ntohs (req->header.size));
348
349   if (NULL != orig->request_cb)
350     orig->request_cb (grp->cb_cls, req);
351
352   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
353 }
354
355
356 /**
357  * Receive multicast replay request from service.
358  */
359 static void
360 handle_group_replay_request (void *cls,
361                              const struct MulticastReplayRequestMessage *rep)
362
363 {
364   struct GNUNET_MULTICAST_Group *grp = cls;
365
366   if (GNUNET_YES == grp->is_disconnecting)
367     return;
368
369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
370
371   if (0 != rep->fragment_id)
372   {
373     if (NULL != grp->replay_frag_cb)
374     {
375       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
376       rh->grp = grp;
377       rh->req = *rep;
378       grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
379                            GNUNET_ntohll (rep->fragment_id),
380                            GNUNET_ntohll (rep->flags), rh);
381     }
382   }
383   else if (0 != rep->message_id)
384   {
385     if (NULL != grp->replay_msg_cb)
386     {
387       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
388       rh->grp = grp;
389       rh->req = *rep;
390       grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
391                           GNUNET_ntohll (rep->message_id),
392                           GNUNET_ntohll (rep->fragment_offset),
393                           GNUNET_ntohll (rep->flags), rh);
394     }
395   }
396
397   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
398 }
399
400
401 /**
402  * Check replay response.
403  */
404 static int
405 check_member_replay_response (void *cls,
406                               const struct MulticastReplayResponseMessage *res)
407 {
408   uint16_t size = ntohs (res->header.size);
409
410   if (sizeof (*res) == size)
411     return GNUNET_OK;
412
413   if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
414     return GNUNET_OK;
415
416   return GNUNET_SYSERR;
417 }
418
419
420 /**
421  * Receive replay response from service.
422  */
423 static void
424 handle_member_replay_response (void *cls,
425                                const struct MulticastReplayResponseMessage *res)
426 {
427   struct GNUNET_MULTICAST_Group *grp;
428   struct GNUNET_MULTICAST_Member *mem = cls;
429   grp = &mem->grp;
430
431   if (GNUNET_YES == grp->is_disconnecting)
432     return;
433
434   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
435
436   // FIXME: return result
437 }
438
439
440 /**
441  * Check join decision.
442  */
443 static int
444 check_member_join_decision (void *cls,
445                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
446 {
447   return GNUNET_OK; // checked in handle below
448 }
449
450
451 /**
452  * Member receives join decision.
453  */
454 static void
455 handle_member_join_decision (void *cls,
456                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
457 {
458   struct GNUNET_MULTICAST_Group *grp;
459   struct GNUNET_MULTICAST_Member *mem = cls;
460   grp = &mem->grp;
461
462   const struct MulticastJoinDecisionMessage *
463     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
464
465   uint16_t dcsn_size = ntohs (dcsn->header.size);
466   int is_admitted = ntohl (dcsn->is_admitted);
467
468   LOG (GNUNET_ERROR_TYPE_DEBUG,
469        "%p Member got join decision from multicast: %d\n",
470        mem, is_admitted);
471
472   const struct GNUNET_MessageHeader *join_resp = NULL;
473   uint16_t join_resp_size = 0;
474
475   uint16_t relay_count = ntohl (dcsn->relay_count);
476   const struct GNUNET_PeerIdentity *relays = NULL;
477   uint16_t relay_size = relay_count * sizeof (*relays);
478   if (0 < relay_count)
479   {
480     if (dcsn_size < sizeof (*dcsn) + relay_size)
481     {
482       GNUNET_break_op (0);
483       is_admitted = GNUNET_SYSERR;
484     }
485     else
486     {
487       relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
488     }
489   }
490
491   if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
492   {
493     join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
494     join_resp_size = ntohs (join_resp->size);
495   }
496   if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
497   {
498     LOG (GNUNET_ERROR_TYPE_DEBUG,
499          "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
500          dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
501     GNUNET_break_op (0);
502     is_admitted = GNUNET_SYSERR;
503   }
504
505   if (NULL != mem->join_dcsn_cb)
506     mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
507                        relay_count, relays, join_resp);
508
509   // FIXME:
510   //if (GNUNET_YES != is_admitted)
511   //  GNUNET_MULTICAST_member_part (mem);
512
513   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
514 }
515
516
517 static void
518 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
519 {
520   if (NULL != grp->connect_env)
521   {
522     GNUNET_MQ_discard (grp->connect_env);
523     grp->connect_env = NULL;
524   }
525   if (NULL != grp->mq)
526   {
527     GNUNET_MQ_destroy (grp->mq);
528     grp->mq = NULL;
529   }
530   if (NULL != grp->disconnect_cb)
531   {
532     grp->disconnect_cb (grp->disconnect_cls);
533     grp->disconnect_cb = NULL;
534   }
535   GNUNET_free (grp);
536 }
537
538
539 static void
540 handle_group_part_ack (void *cls,
541                        const struct GNUNET_MessageHeader *msg)
542 {
543   struct GNUNET_MULTICAST_Group *grp = cls;
544
545   group_cleanup (grp);
546 }
547
548
549 /**
550  * Function to call with the decision made for a join request.
551  *
552  * Must be called once and only once in response to an invocation of the
553  * #GNUNET_MULTICAST_JoinRequestCallback.
554  *
555  * @param join
556  *        Join request handle.
557  * @param is_admitted
558  *        #GNUNET_YES    if the join is approved,
559  *        #GNUNET_NO     if it is disapproved,
560  *        #GNUNET_SYSERR if we cannot answer the request.
561  * @param relay_count
562  *        Number of relays given.
563  * @param relays
564  *        Array of suggested peers that might be useful relays to use
565  *        when joining the multicast group (essentially a list of peers that
566  *        are already part of the multicast group and might thus be willing
567  *        to help with routing).  If empty, only this local peer (which must
568  *        be the multicast origin) is a good candidate for building the
569  *        multicast tree.  Note that it is unnecessary to specify our own
570  *        peer identity in this array.
571  * @param join_resp
572  *        Message to send in response to the joining peer;
573  *        can also be used to redirect the peer to a different group at the
574  *        application layer; this response is to be transmitted to the
575  *        peer that issued the request even if admission is denied.
576  */
577 struct GNUNET_MULTICAST_ReplayHandle *
578 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
579                                 int is_admitted,
580                                 uint16_t relay_count,
581                                 const struct GNUNET_PeerIdentity *relays,
582                                 const struct GNUNET_MessageHeader *join_resp)
583 {
584   struct GNUNET_MULTICAST_Group *grp = join->group;
585   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
586   uint16_t relay_size = relay_count * sizeof (*relays);
587
588   struct MulticastJoinDecisionMessageHeader *hdcsn;
589   struct MulticastJoinDecisionMessage *dcsn;
590   struct GNUNET_MQ_Envelope *
591     env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
592                                GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
593   hdcsn->member_pub_key = join->member_pub_key;
594   hdcsn->peer = join->peer;
595
596   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
597   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
598   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
599   dcsn->is_admitted = htonl (is_admitted);
600   dcsn->relay_count = htonl (relay_count);
601   if (0 < relay_size)
602     GNUNET_memcpy (&dcsn[1], relays, relay_size);
603   if (0 < join_resp_size)
604     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
605
606   GNUNET_MQ_send (grp->mq, env);
607   GNUNET_free (join);
608   return NULL;
609 }
610
611
612 /**
613  * Replay a message fragment for the multicast group.
614  *
615  * @param rh
616  *        Replay handle identifying which replay operation was requested.
617  * @param msg
618  *        Replayed message fragment, NULL if not found / an error occurred.
619  * @param ec
620  *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
621  *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
622  */
623 void
624 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
625                                   const struct GNUNET_MessageHeader *msg,
626                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
627 {
628   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
629   struct MulticastReplayResponseMessage *res;
630   struct GNUNET_MQ_Envelope *
631     env = GNUNET_MQ_msg_extra (res, msg_size,
632                                GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
633   res->fragment_id = rh->req.fragment_id;
634   res->message_id = rh->req.message_id;
635   res->fragment_offset = rh->req.fragment_offset;
636   res->flags = rh->req.flags;
637   res->error_code = htonl (ec);
638
639   if (GNUNET_MULTICAST_REC_OK == ec)
640   {
641     GNUNET_assert (NULL != msg);
642     GNUNET_memcpy (&res[1], msg, msg_size);
643   }
644
645   GNUNET_MQ_send (rh->grp->mq, env);
646
647   if (GNUNET_MULTICAST_REC_OK != ec)
648     GNUNET_free (rh);
649 }
650
651
652 /**
653  * Indicate the end of the replay session.
654  *
655  * Invalidates the replay handle.
656  *
657  * @param rh
658  *        Replay session to end.
659  */
660 void
661 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
662 {
663   struct MulticastReplayResponseMessage *end;
664   struct GNUNET_MQ_Envelope *
665     env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
666
667   end->fragment_id = rh->req.fragment_id;
668   end->message_id = rh->req.message_id;
669   end->fragment_offset = rh->req.fragment_offset;
670   end->flags = rh->req.flags;
671
672   GNUNET_MQ_send (rh->grp->mq, env);
673   GNUNET_free (rh);
674 }
675
676
677 /**
678  * Replay a message for the multicast group.
679  *
680  * @param rh
681  *        Replay handle identifying which replay operation was requested.
682  * @param notify
683  *        Function to call to get the message.
684  * @param notify_cls
685  *        Closure for @a notify.
686  */
687 void
688 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
689                                    GNUNET_MULTICAST_ReplayTransmitNotify notify,
690                                    void *notify_cls)
691 {
692 }
693
694
695 static void
696 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
697
698
699 static void
700 origin_reconnect (void *cls)
701 {
702   origin_connect (cls);
703 }
704
705
706 /**
707  * Origin client disconnected from service.
708  *
709  * Reconnect after backoff period.
710  */
711 static void
712 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
713 {
714   struct GNUNET_MULTICAST_Origin *orig = cls;
715   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
716
717   LOG (GNUNET_ERROR_TYPE_DEBUG,
718        "Origin client disconnected (%d), re-connecting\n",
719        (int) error);
720   if (NULL != grp->mq)
721   {
722     GNUNET_MQ_destroy (grp->mq);
723     grp->mq = NULL;
724   }
725
726   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
727                                                       origin_reconnect,
728                                                       orig);
729   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
730 }
731
732
733 /**
734  * Connect to service as origin.
735  */
736 static void
737 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
738 {
739   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
740
741   struct GNUNET_MQ_MessageHandler handlers[] = {
742     GNUNET_MQ_hd_var_size (group_message,
743                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
744                            struct GNUNET_MULTICAST_MessageHeader,
745                            grp),
746     GNUNET_MQ_hd_var_size (origin_request,
747                            GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
748                            struct GNUNET_MULTICAST_RequestHeader,
749                            orig),
750     GNUNET_MQ_hd_fixed_size (group_fragment_ack,
751                              GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
752                              struct GNUNET_MessageHeader,
753                              grp),
754     GNUNET_MQ_hd_var_size (group_join_request,
755                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
756                            struct MulticastJoinRequestMessage,
757                            grp),
758     GNUNET_MQ_hd_fixed_size (group_part_ack,
759                              GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
760                              struct GNUNET_MessageHeader,
761                              grp),
762     GNUNET_MQ_hd_fixed_size (group_replay_request,
763                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
764                              struct MulticastReplayRequestMessage,
765                              grp),
766     GNUNET_MQ_handler_end ()
767   };
768
769   grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
770                                    handlers, origin_disconnected, orig);
771   GNUNET_assert (NULL != grp->mq);
772   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
773 }
774
775
776 /**
777  * Start a multicast group.
778  *
779  * Will advertise the origin in the P2P overlay network under the respective
780  * public key so that other peer can find this peer to join it.  Peers that
781  * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
782  * either an existing group member or to the origin.  If the joining is
783  * approved, the member is cleared for @e replay and will begin to receive
784  * messages transmitted to the group.  If joining is disapproved, the failed
785  * candidate will be given a response.  Members in the group can send messages
786  * to the origin (one at a time).
787  *
788  * @param cfg
789  *        Configuration to use.
790  * @param priv_key
791  *        ECC key that will be used to sign messages for this
792  *        multicast session; public key is used to identify the multicast group;
793  * @param max_fragment_id
794  *        Maximum fragment ID already sent to the group.
795  *        0 for a new group.
796  * @param join_request_cb
797  *        Function called to approve / disapprove joining of a peer.
798  * @param replay_frag_cb
799  *        Function that can be called to replay a message fragment.
800  * @param replay_msg_cb
801  *        Function that can be called to replay a message.
802  * @param request_cb
803  *        Function called with message fragments from group members.
804  * @param message_cb
805  *        Function called with the message fragments sent to the
806  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
807  *        should be stored for answering replay requests later.
808  * @param cls
809  *        Closure for the various callbacks that follow.
810  *
811  * @return Handle for the origin, NULL on error.
812  */
813 struct GNUNET_MULTICAST_Origin *
814 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
815                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
816                                uint64_t max_fragment_id,
817                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
818                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
819                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
820                                GNUNET_MULTICAST_RequestCallback request_cb,
821                                GNUNET_MULTICAST_MessageCallback message_cb,
822                                void *cls)
823 {
824   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
825   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
826
827   struct MulticastOriginStartMessage *start;
828   grp->connect_env = GNUNET_MQ_msg (start,
829                                     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
830   start->max_fragment_id = max_fragment_id;
831   start->group_key = *priv_key;
832
833   grp->cfg = cfg;
834   grp->is_origin = GNUNET_YES;
835   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
836
837   grp->cb_cls = cls;
838   grp->join_req_cb = join_request_cb;
839   grp->replay_frag_cb = replay_frag_cb;
840   grp->replay_msg_cb = replay_msg_cb;
841   grp->message_cb = message_cb;
842
843   orig->request_cb = request_cb;
844
845   origin_connect (orig);
846   return orig;
847 }
848
849
850 /**
851  * Stop a multicast group.
852  *
853  * @param origin
854  *        Multicast group to stop.
855  */
856 void
857 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
858                               GNUNET_ContinuationCallback stop_cb,
859                               void *stop_cls)
860 {
861   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
862   struct GNUNET_MQ_Envelope *env;
863
864   grp->is_disconnecting = GNUNET_YES;
865   grp->disconnect_cb = stop_cb;
866   grp->disconnect_cls = stop_cls;
867   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
868   GNUNET_MQ_send (grp->mq, env);
869 }
870
871
872 static void
873 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
874 {
875   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
876   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
877   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
878   GNUNET_assert (GNUNET_YES == grp->in_transmit);
879
880   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
881   struct GNUNET_MULTICAST_MessageHeader *msg;
882   struct GNUNET_MQ_Envelope *
883     env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
884                                GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
885
886   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
887
888   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
889       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
890   {
891     LOG (GNUNET_ERROR_TYPE_ERROR,
892          "%p OriginTransmitNotify() returned error or invalid message size.\n",
893          orig);
894     /* FIXME: handle error */
895     GNUNET_MQ_discard (env);
896     return;
897   }
898
899   if (GNUNET_NO == ret && 0 == buf_size)
900   {
901     LOG (GNUNET_ERROR_TYPE_DEBUG,
902          "%p OriginTransmitNotify() - transmission paused.\n", orig);
903     GNUNET_MQ_discard (env);
904     return; /* Transmission paused. */
905   }
906
907   msg->header.size = htons (sizeof (*msg) + buf_size);
908   msg->message_id = GNUNET_htonll (tmit->message_id);
909   msg->group_generation = tmit->group_generation;
910   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
911   tmit->fragment_offset += sizeof (*msg) + buf_size;
912
913   grp->acks_pending++;
914   GNUNET_MQ_send (grp->mq, env);
915
916   if (GNUNET_YES == ret)
917     grp->in_transmit = GNUNET_NO;
918 }
919
920
921 /**
922  * Send a message to the multicast group.
923  *
924  * @param orig
925  *        Handle to the multicast group.
926  * @param message_id
927  *        Application layer ID for the message.  Opaque to multicast.
928  * @param group_generation
929  *        Group generation of the message.
930  *        Documented in struct GNUNET_MULTICAST_MessageHeader.
931  * @param notify
932  *        Function to call to get the message.
933  * @param notify_cls
934  *        Closure for @a notify.
935  *
936  * @return Message handle on success,
937  *         NULL on error (i.e. another request is already pending).
938  */
939 struct GNUNET_MULTICAST_OriginTransmitHandle *
940 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
941                                 uint64_t message_id,
942                                 uint64_t group_generation,
943                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
944                                 void *notify_cls)
945 {
946   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
947   if (GNUNET_YES == grp->in_transmit)
948     return NULL;
949   grp->in_transmit = GNUNET_YES;
950
951   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
952   tmit->origin = orig;
953   tmit->message_id = message_id;
954   tmit->fragment_offset = 0;
955   tmit->group_generation = group_generation;
956   tmit->notify = notify;
957   tmit->notify_cls = notify_cls;
958
959   origin_to_all (orig);
960   return tmit;
961 }
962
963
964 /**
965  * Resume message transmission to multicast group.
966  *
967  * @param th
968  *        Transmission to cancel.
969  */
970 void
971 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
972 {
973   struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
974   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
975     return;
976   origin_to_all (th->origin);
977 }
978
979
980 /**
981  * Cancel request for message transmission to multicast group.
982  *
983  * @param th
984  *        Transmission to cancel.
985  */
986 void
987 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
988 {
989   th->origin->grp.in_transmit = GNUNET_NO;
990 }
991
992
993 static void
994 member_connect (struct GNUNET_MULTICAST_Member *mem);
995
996
997 static void
998 member_reconnect (void *cls)
999 {
1000   member_connect (cls);
1001 }
1002
1003
1004 /**
1005  * Member client disconnected from service.
1006  *
1007  * Reconnect after backoff period.
1008  */
1009 static void
1010 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1011 {
1012   struct GNUNET_MULTICAST_Member *mem = cls;
1013   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1014
1015   LOG (GNUNET_ERROR_TYPE_DEBUG,
1016        "Member client disconnected (%d), re-connecting\n",
1017        (int) error);
1018   GNUNET_MQ_destroy (grp->mq);
1019   grp->mq = NULL;
1020
1021   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1022                                                       member_reconnect,
1023                                                       mem);
1024   grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1025 }
1026
1027
1028 /**
1029  * Connect to service as member.
1030  */
1031 static void
1032 member_connect (struct GNUNET_MULTICAST_Member *mem)
1033 {
1034   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1035
1036   struct GNUNET_MQ_MessageHandler handlers[] = {
1037     GNUNET_MQ_hd_var_size (group_message,
1038                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1039                            struct GNUNET_MULTICAST_MessageHeader,
1040                            grp),
1041     GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1042                              GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1043                              struct GNUNET_MessageHeader,
1044                              grp),
1045     GNUNET_MQ_hd_var_size (group_join_request,
1046                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1047                            struct MulticastJoinRequestMessage,
1048                            grp),
1049     GNUNET_MQ_hd_var_size (member_join_decision,
1050                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1051                            struct MulticastJoinDecisionMessageHeader,
1052                            mem),
1053     GNUNET_MQ_hd_fixed_size (group_part_ack,
1054                              GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
1055                              struct GNUNET_MessageHeader,
1056                              grp),
1057     GNUNET_MQ_hd_fixed_size (group_replay_request,
1058                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1059                              struct MulticastReplayRequestMessage,
1060                              grp),
1061     GNUNET_MQ_hd_var_size (member_replay_response,
1062                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1063                            struct MulticastReplayResponseMessage,
1064                            mem),
1065     GNUNET_MQ_handler_end ()
1066   };
1067
1068   grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1069                                    handlers, member_disconnected, mem);
1070   GNUNET_assert (NULL != grp->mq);
1071   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1072 }
1073
1074
1075 /**
1076  * Join a multicast group.
1077  *
1078  * The entity joining is always the local peer.  Further information about the
1079  * candidate can be provided in the @a join_request message.  If the join fails, the
1080  * @a message_cb is invoked with a (failure) response and then with NULL.  If
1081  * the join succeeds, outstanding (state) messages and ongoing multicast
1082  * messages will be given to the @a message_cb until the member decides to part
1083  * the group.  The @a replay_cb function may be called at any time by the
1084  * multicast service to support relaying messages to other members of the group.
1085  *
1086  * @param cfg
1087  *        Configuration to use.
1088  * @param group_key
1089  *        ECC public key that identifies the group to join.
1090  * @param member_key
1091  *        ECC key that identifies the member
1092  *        and used to sign requests sent to the origin.
1093  * @param origin
1094  *        Peer ID of the origin to send unicast requsets to.  If NULL,
1095  *        unicast requests are sent back via multiple hops on the reverse path
1096  *        of multicast messages.
1097  * @param relay_count
1098  *        Number of peers in the @a relays array.
1099  * @param relays
1100  *        Peer identities of members of the group, which serve as relays
1101  *        and can be used to join the group at. and send the @a join_request to.
1102  *        If empty, the @a join_request is sent directly to the @a origin.
1103  * @param join_msg
1104  *        Application-dependent join message to be passed to the peer @a origin.
1105  * @param join_request_cb
1106  *        Function called to approve / disapprove joining of a peer.
1107  * @param join_decision_cb
1108  *        Function called to inform about the join decision.
1109  * @param replay_frag_cb
1110  *        Function that can be called to replay message fragments
1111  *        this peer already knows from this group. NULL if this
1112  *        client is unable to support replay.
1113  * @param replay_msg_cb
1114  *        Function that can be called to replay message fragments
1115  *        this peer already knows from this group. NULL if this
1116  *        client is unable to support replay.
1117  * @param message_cb
1118  *        Function to be called for all message fragments we
1119  *        receive from the group, excluding those our @a replay_cb
1120  *        already has.
1121  * @param cls
1122  *        Closure for callbacks.
1123  *
1124  * @return Handle for the member, NULL on error.
1125  */
1126 struct GNUNET_MULTICAST_Member *
1127 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1128                               const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1129                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1130                               const struct GNUNET_PeerIdentity *origin,
1131                               uint16_t relay_count,
1132                               const struct GNUNET_PeerIdentity *relays,
1133                               const struct GNUNET_MessageHeader *join_msg,
1134                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1135                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1136                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1137                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1138                               GNUNET_MULTICAST_MessageCallback message_cb,
1139                               void *cls)
1140 {
1141   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1142   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1143
1144   uint16_t relay_size = relay_count * sizeof (*relays);
1145   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1146   struct MulticastMemberJoinMessage *join;
1147   grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1148                                           GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1149   join->group_pub_key = *group_pub_key;
1150   join->member_key = *member_key;
1151   join->origin = *origin;
1152   join->relay_count = ntohl (relay_count);
1153   if (0 < relay_size)
1154     GNUNET_memcpy (&join[1], relays, relay_size);
1155   if (0 < join_msg_size)
1156     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1157
1158   grp->cfg = cfg;
1159   grp->is_origin = GNUNET_NO;
1160   grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1161
1162   mem->join_dcsn_cb = join_decision_cb;
1163   grp->join_req_cb = join_request_cb;
1164   grp->replay_frag_cb = replay_frag_cb;
1165   grp->replay_msg_cb = replay_msg_cb;
1166   grp->message_cb = message_cb;
1167   grp->cb_cls = cls;
1168
1169   member_connect (mem);
1170   return mem;
1171 }
1172
1173
1174 /**
1175  * Part a multicast group.
1176  *
1177  * Disconnects from all group members and invalidates the @a member handle.
1178  *
1179  * An application-dependent part message can be transmitted beforehand using
1180  * #GNUNET_MULTICAST_member_to_origin())
1181  *
1182  * @param member
1183  *        Membership handle.
1184  */
1185 void
1186 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1187                               GNUNET_ContinuationCallback part_cb,
1188                               void *part_cls)
1189 {
1190   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1191   struct GNUNET_MQ_Envelope *env;
1192
1193   mem->join_dcsn_cb = NULL;
1194   grp->join_req_cb = NULL;
1195   grp->message_cb = NULL;
1196   grp->replay_msg_cb = NULL;
1197   grp->replay_frag_cb = NULL;
1198   grp->is_disconnecting = GNUNET_YES;
1199   grp->disconnect_cb = part_cb;
1200   grp->disconnect_cls = part_cls;
1201   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
1202   GNUNET_MQ_send (grp->mq, env);
1203 }
1204
1205
1206 void
1207 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1208                        uint64_t fragment_id,
1209                        uint64_t message_id,
1210                        uint64_t fragment_offset,
1211                        uint64_t flags)
1212 {
1213   struct MulticastReplayRequestMessage *rep;
1214   struct GNUNET_MQ_Envelope *
1215     env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1216
1217   rep->fragment_id = GNUNET_htonll (fragment_id);
1218   rep->message_id = GNUNET_htonll (message_id);
1219   rep->fragment_offset = GNUNET_htonll (fragment_offset);
1220   rep->flags = GNUNET_htonll (flags);
1221
1222   GNUNET_MQ_send (mem->grp.mq, env);
1223 }
1224
1225
1226 /**
1227  * Request a fragment to be replayed by fragment ID.
1228  *
1229  * Useful if messages below the @e max_known_fragment_id given when joining are
1230  * needed and not known to the client.
1231  *
1232  * @param member
1233  *        Membership handle.
1234  * @param fragment_id
1235  *        ID of a message fragment that this client would like to see replayed.
1236  * @param flags
1237  *        Additional flags for the replay request.
1238  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1239  *
1240  * @return Replay request handle.
1241  */
1242 struct GNUNET_MULTICAST_MemberReplayHandle *
1243 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1244                                          uint64_t fragment_id,
1245                                          uint64_t flags)
1246 {
1247   member_replay_request (mem, fragment_id, 0, 0, flags);
1248   // FIXME: return something useful
1249   return NULL;
1250 }
1251
1252
1253 /**
1254  * Request a message fragment to be replayed.
1255  *
1256  * Useful if messages below the @e max_known_fragment_id given when joining are
1257  * needed and not known to the client.
1258  *
1259  * @param member
1260  *        Membership handle.
1261  * @param message_id
1262  *        ID of the message this client would like to see replayed.
1263  * @param fragment_offset
1264  *        Offset of the fragment within the message to replay.
1265  * @param flags
1266  *        Additional flags for the replay request.
1267  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1268  *
1269  * @return Replay request handle, NULL on error.
1270  */
1271 struct GNUNET_MULTICAST_MemberReplayHandle *
1272 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1273                                         uint64_t message_id,
1274                                         uint64_t fragment_offset,
1275                                         uint64_t flags)
1276 {
1277   member_replay_request (mem, 0, message_id, fragment_offset, flags);
1278   // FIXME: return something useful
1279   return NULL;
1280 }
1281
1282
1283 static void
1284 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1285 {
1286   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1287   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1288   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1289   GNUNET_assert (GNUNET_YES == grp->in_transmit);
1290
1291   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1292   struct GNUNET_MULTICAST_RequestHeader *req;
1293   struct GNUNET_MQ_Envelope *
1294     env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1295                                GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1296
1297   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1298
1299   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1300       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1301   {
1302     LOG (GNUNET_ERROR_TYPE_ERROR,
1303          "MemberTransmitNotify() returned error or invalid message size. "
1304          "ret=%d, buf_size=%u\n", ret, buf_size);
1305     /* FIXME: handle error */
1306     GNUNET_MQ_discard (env);
1307     return;
1308   }
1309
1310   if (GNUNET_NO == ret && 0 == buf_size)
1311   {
1312     /* Transmission paused. */
1313     GNUNET_MQ_discard (env);
1314     return;
1315   }
1316
1317   req->header.size = htons (sizeof (*req) + buf_size);
1318   req->request_id = GNUNET_htonll (tmit->request_id);
1319   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1320   tmit->fragment_offset += sizeof (*req) + buf_size;
1321
1322   GNUNET_MQ_send (grp->mq, env);
1323
1324   if (GNUNET_YES == ret)
1325     grp->in_transmit = GNUNET_NO;
1326 }
1327
1328
1329 /**
1330  * Send a message to the origin of the multicast group.
1331  *
1332  * @param mem
1333  *        Membership handle.
1334  * @param request_id
1335  *        Application layer ID for the request.  Opaque to multicast.
1336  * @param notify
1337  *        Callback to call to get the message.
1338  * @param notify_cls
1339  *        Closure for @a notify.
1340  *
1341  * @return Handle to cancel request, NULL on error (i.e. request already pending).
1342  */
1343 struct GNUNET_MULTICAST_MemberTransmitHandle *
1344 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1345                                    uint64_t request_id,
1346                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
1347                                    void *notify_cls)
1348 {
1349   if (GNUNET_YES == mem->grp.in_transmit)
1350     return NULL;
1351   mem->grp.in_transmit = GNUNET_YES;
1352
1353   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1354   tmit->member = mem;
1355   tmit->request_id = request_id;
1356   tmit->fragment_offset = 0;
1357   tmit->notify = notify;
1358   tmit->notify_cls = notify_cls;
1359
1360   member_to_origin (mem);
1361   return tmit;
1362 }
1363
1364
1365 /**
1366  * Resume message transmission to origin.
1367  *
1368  * @param th
1369  *        Transmission to cancel.
1370  */
1371 void
1372 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1373 {
1374   struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1375   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1376     return;
1377   member_to_origin (th->member);
1378 }
1379
1380
1381 /**
1382  * Cancel request for message transmission to origin.
1383  *
1384  * @param th
1385  *        Transmission to cancel.
1386  */
1387 void
1388 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1389 {
1390   th->member->grp.in_transmit = GNUNET_NO;
1391 }
1392
1393
1394 /* end of multicast_api.c */