multicast: switch to MQ
[oweals/gnunet.git] / src / multicast / multicast_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2012, 2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/multicast_api.c
23  * @brief Multicast service; implements multicast groups using CADET connections.
24  * @author Christian Grothoff
25  * @author Gabor X Toth
26  */
27
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_mq_lib.h"
31 #include "gnunet_multicast_service.h"
32 #include "multicast.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
35
36
37 /**
38  * Handle for a request to send a message to all multicast group members
39  * (from the origin).
40  */
41 struct GNUNET_MULTICAST_OriginTransmitHandle
42 {
43   GNUNET_MULTICAST_OriginTransmitNotify notify;
44   void *notify_cls;
45   struct GNUNET_MULTICAST_Origin *origin;
46
47   uint64_t message_id;
48   uint64_t group_generation;
49   uint64_t fragment_offset;
50 };
51
52
53 /**
54  * Handle for a message to be delivered from a member to the origin.
55  */
56 struct GNUNET_MULTICAST_MemberTransmitHandle
57 {
58   GNUNET_MULTICAST_MemberTransmitNotify notify;
59   void *notify_cls;
60   struct GNUNET_MULTICAST_Member *member;
61
62   uint64_t request_id;
63   uint64_t fragment_offset;
64 };
65
66
67 struct GNUNET_MULTICAST_Group
68 {
69   /**
70    * Configuration to use.
71    */
72   const struct GNUNET_CONFIGURATION_Handle *cfg;
73
74   /**
75    * Client connection to the service.
76    */
77   struct GNUNET_MQ_Handle *mq;
78
79   /**
80    * Time to wait until we try to reconnect on failure.
81    */
82   struct GNUNET_TIME_Relative reconnect_backoff;
83
84   /**
85    * Task for reconnecting when the listener fails.
86    */
87   struct GNUNET_SCHEDULER_Task *reconnect_task;
88
89   /**
90    * Message to send on connect.
91    */
92   struct GNUNET_MQ_Envelope *connect_env;
93
94   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
95   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
96   GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
97   GNUNET_MULTICAST_MessageCallback message_cb;
98   void *cb_cls;
99
100   /**
101    * Function called after disconnected from the service.
102    */
103   GNUNET_ContinuationCallback disconnect_cb;
104
105   /**
106    * Closure for @a disconnect_cb.
107    */
108   void *disconnect_cls;
109
110   /**
111    * Are we currently transmitting a message?
112    */
113   uint8_t in_transmit;
114
115   /**
116    * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
117    */
118   uint8_t acks_pending;
119
120   /**
121    * Is this the origin or a member?
122    */
123   uint8_t is_origin;
124
125   /**
126    * Is this channel in the process of disconnecting from the service?
127    * #GNUNET_YES or #GNUNET_NO
128    */
129   uint8_t is_disconnecting;
130 };
131
132
133 /**
134  * Handle for the origin of a multicast group.
135  */
136 struct GNUNET_MULTICAST_Origin
137 {
138   struct GNUNET_MULTICAST_Group grp;
139   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
140
141   GNUNET_MULTICAST_RequestCallback request_cb;
142 };
143
144
145 /**
146  * Handle for a multicast group member.
147  */
148 struct GNUNET_MULTICAST_Member
149 {
150   struct GNUNET_MULTICAST_Group grp;
151   struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
152
153   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
154
155   /**
156    * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
157    */
158   struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
159
160   uint64_t next_fragment_id;
161 };
162
163
164 /**
165  * Handle that identifies a join request.
166  *
167  * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
168  * corresponding calls to #GNUNET_MULTICAST_join_decision().
169  */
170 struct GNUNET_MULTICAST_JoinHandle
171 {
172   struct GNUNET_MULTICAST_Group *group;
173
174   /**
175    * Public key of the member requesting join.
176    */
177   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
178
179   /**
180    * Peer identity of the member requesting join.
181    */
182   struct GNUNET_PeerIdentity peer;
183 };
184
185
186 /**
187  * Opaque handle to a replay request from the multicast service.
188  */
189 struct GNUNET_MULTICAST_ReplayHandle
190 {
191   struct GNUNET_MULTICAST_Group *grp;
192   struct MulticastReplayRequestMessage req;
193 };
194
195
196 /**
197  * Handle for a replay request.
198  */
199 struct GNUNET_MULTICAST_MemberReplayHandle
200 {
201 };
202
203
204 static void
205 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
206
207 static void
208 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
209
210
211 /**
212  * Check join request message.
213  */
214 static int
215 check_group_join_request (void *cls,
216                           const struct MulticastJoinRequestMessage *jreq)
217 {
218   uint16_t size = ntohs (jreq->header.size);
219
220   if (sizeof (*jreq) == size)
221     return GNUNET_OK;
222
223   if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
224     return GNUNET_OK;
225
226   return GNUNET_SYSERR;
227 }
228
229
230 /**
231  * Receive join request from service.
232  */
233 static void
234 handle_group_join_request (void *cls,
235                            const struct MulticastJoinRequestMessage *jreq)
236 {
237   struct GNUNET_MULTICAST_Group *grp = cls;
238   struct GNUNET_MULTICAST_JoinHandle *jh;
239   const struct GNUNET_MessageHeader *jmsg = NULL;
240
241   if (NULL == grp)
242   {
243     GNUNET_break (0);
244     return;
245   }
246   if (NULL == grp->join_req_cb)
247     return;
248
249   if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
250     jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
251
252   jh = GNUNET_malloc (sizeof (*jh));
253   jh->group = grp;
254   jh->member_pub_key = jreq->member_pub_key;
255   jh->peer = jreq->peer;
256   grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
257
258   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
259 }
260
261
262 /**
263  * Check multicast message.
264  */
265 static int
266 check_group_message (void *cls,
267                      const struct GNUNET_MULTICAST_MessageHeader *mmsg)
268 {
269   return GNUNET_OK;
270 }
271
272
273 /**
274  * Receive multicast message from service.
275  */
276 static void
277 handle_group_message (void *cls,
278                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
279 {
280   struct GNUNET_MULTICAST_Group *grp = cls;
281
282   if (GNUNET_YES == grp->is_disconnecting)
283     return;
284
285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
286               "Calling message callback with a message of size %u.\n",
287               ntohs (mmsg->header.size));
288
289   if (NULL != grp->message_cb)
290     grp->message_cb (grp->cb_cls, mmsg);
291
292   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
293 }
294
295
296 /**
297  * Receive message/request fragment acknowledgement from service.
298  */
299 static void
300 handle_group_fragment_ack (void *cls,
301                            const struct GNUNET_MessageHeader *msg)
302 {
303   struct GNUNET_MULTICAST_Group *grp = cls;
304
305   LOG (GNUNET_ERROR_TYPE_DEBUG,
306        "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
307        grp, grp->in_transmit, grp->acks_pending);
308
309   if (0 == grp->acks_pending)
310   {
311     LOG (GNUNET_ERROR_TYPE_DEBUG,
312          "%p Ignoring extraneous fragment ACK.\n", grp);
313     return;
314   }
315   grp->acks_pending--;
316
317   if (GNUNET_YES != grp->in_transmit)
318     return;
319
320   if (GNUNET_YES == grp->is_origin)
321     origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
322   else
323     member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
324
325   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
326 }
327
328
329 /**
330  * Check unicast request.
331  */
332 static int
333 check_origin_request (void *cls,
334                       const struct GNUNET_MULTICAST_RequestHeader *req)
335 {
336   return GNUNET_OK;
337 }
338
339
340 /**
341  * Origin receives unicast request from a member.
342  */
343 static void
344 handle_origin_request (void *cls,
345                        const struct GNUNET_MULTICAST_RequestHeader *req)
346 {
347   struct GNUNET_MULTICAST_Group *grp;
348   struct GNUNET_MULTICAST_Origin *orig = cls;
349   grp = &orig->grp;
350
351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352               "Calling request callback with a request of size %u.\n",
353               ntohs (req->header.size));
354
355   if (NULL != orig->request_cb)
356     orig->request_cb (grp->cb_cls, req);
357
358   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
359 }
360
361
362 /**
363  * Receive multicast replay request from service.
364  */
365 static void
366 handle_group_replay_request (void *cls,
367                              const struct MulticastReplayRequestMessage *rep)
368
369 {
370   struct GNUNET_MULTICAST_Group *grp = cls;
371
372   if (GNUNET_YES == grp->is_disconnecting)
373     return;
374
375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
376
377   if (0 != rep->fragment_id)
378   {
379     if (NULL != grp->replay_frag_cb)
380     {
381       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
382       rh->grp = grp;
383       rh->req = *rep;
384       grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
385                            GNUNET_ntohll (rep->fragment_id),
386                            GNUNET_ntohll (rep->flags), rh);
387     }
388   }
389   else if (0 != rep->message_id)
390   {
391     if (NULL != grp->replay_msg_cb)
392     {
393       struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
394       rh->grp = grp;
395       rh->req = *rep;
396       grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
397                           GNUNET_ntohll (rep->message_id),
398                           GNUNET_ntohll (rep->fragment_offset),
399                           GNUNET_ntohll (rep->flags), rh);
400     }
401   }
402
403   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
404 }
405
406
407 /**
408  * Check replay response.
409  */
410 static int
411 check_member_replay_response (void *cls,
412                               const struct MulticastReplayResponseMessage *res)
413 {
414   uint16_t size = ntohs (res->header.size);
415
416   if (sizeof (*res) == size)
417     return GNUNET_OK;
418
419   if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
420     return GNUNET_OK;
421
422   return GNUNET_SYSERR;
423 }
424
425
426 /**
427  * Receive replay response from service.
428  */
429 static void
430 handle_member_replay_response (void *cls,
431                                const struct MulticastReplayResponseMessage *res)
432 {
433   struct GNUNET_MULTICAST_Group *grp;
434   struct GNUNET_MULTICAST_Member *mem = cls;
435   grp = &mem->grp;
436
437   if (GNUNET_YES == grp->is_disconnecting)
438     return;
439
440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
441
442   // FIXME: return result
443 }
444
445
446 /**
447  * Check join decision.
448  */
449 static int
450 check_member_join_decision (void *cls,
451                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
452 {
453   return GNUNET_OK; // checked in handle below
454 }
455
456
457 /**
458  * Member receives join decision.
459  */
460 static void
461 handle_member_join_decision (void *cls,
462                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
463 {
464   struct GNUNET_MULTICAST_Group *grp;
465   struct GNUNET_MULTICAST_Member *mem = cls;
466   grp = &mem->grp;
467
468   const struct MulticastJoinDecisionMessage *
469     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
470
471   uint16_t dcsn_size = ntohs (dcsn->header.size);
472   int is_admitted = ntohl (dcsn->is_admitted);
473
474   LOG (GNUNET_ERROR_TYPE_DEBUG,
475        "%p Member got join decision from multicast: %d\n",
476        mem, is_admitted);
477
478   const struct GNUNET_MessageHeader *join_resp = NULL;
479   uint16_t join_resp_size = 0;
480
481   uint16_t relay_count = ntohl (dcsn->relay_count);
482   const struct GNUNET_PeerIdentity *relays = NULL;
483   uint16_t relay_size = relay_count * sizeof (*relays);
484   if (0 < relay_count)
485   {
486     if (dcsn_size < sizeof (*dcsn) + relay_size)
487     {
488       GNUNET_break_op (0);
489       is_admitted = GNUNET_SYSERR;
490     }
491     else
492     {
493       relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
494     }
495   }
496
497   if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
498   {
499     join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
500     join_resp_size = ntohs (join_resp->size);
501   }
502   if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
503   {
504     LOG (GNUNET_ERROR_TYPE_DEBUG,
505          "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
506          dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
507     GNUNET_break_op (0);
508     is_admitted = GNUNET_SYSERR;
509   }
510
511   if (NULL != mem->join_dcsn_cb)
512     mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
513                        relay_count, relays, join_resp);
514
515   // FIXME:
516   //if (GNUNET_YES != is_admitted)
517   //  GNUNET_MULTICAST_member_part (mem);
518
519   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
520 }
521
522
523 static void
524 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
525 {
526   GNUNET_free (grp->connect_env);
527   if (NULL != grp->disconnect_cb)
528     grp->disconnect_cb (grp->disconnect_cls);
529 }
530
531
532 static void
533 origin_cleanup (void *cls)
534 {
535   struct GNUNET_MULTICAST_Origin *orig = cls;
536   group_cleanup (&orig->grp);
537   GNUNET_free (orig);
538 }
539
540
541 static void
542 member_cleanup (void *cls)
543 {
544   struct GNUNET_MULTICAST_Member *mem = cls;
545   group_cleanup (&mem->grp);
546   GNUNET_free (mem);
547 }
548
549
550 /**
551  * Function to call with the decision made for a join request.
552  *
553  * Must be called once and only once in response to an invocation of the
554  * #GNUNET_MULTICAST_JoinRequestCallback.
555  *
556  * @param join
557  *        Join request handle.
558  * @param is_admitted
559  *        #GNUNET_YES    if the join is approved,
560  *        #GNUNET_NO     if it is disapproved,
561  *        #GNUNET_SYSERR if we cannot answer the request.
562  * @param relay_count
563  *        Number of relays given.
564  * @param relays
565  *        Array of suggested peers that might be useful relays to use
566  *        when joining the multicast group (essentially a list of peers that
567  *        are already part of the multicast group and might thus be willing
568  *        to help with routing).  If empty, only this local peer (which must
569  *        be the multicast origin) is a good candidate for building the
570  *        multicast tree.  Note that it is unnecessary to specify our own
571  *        peer identity in this array.
572  * @param join_resp
573  *        Message to send in response to the joining peer;
574  *        can also be used to redirect the peer to a different group at the
575  *        application layer; this response is to be transmitted to the
576  *        peer that issued the request even if admission is denied.
577  */
578 struct GNUNET_MULTICAST_ReplayHandle *
579 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
580                                 int is_admitted,
581                                 uint16_t relay_count,
582                                 const struct GNUNET_PeerIdentity *relays,
583                                 const struct GNUNET_MessageHeader *join_resp)
584 {
585   struct GNUNET_MULTICAST_Group *grp = join->group;
586   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
587   uint16_t relay_size = relay_count * sizeof (*relays);
588
589   struct MulticastJoinDecisionMessageHeader *hdcsn;
590   struct MulticastJoinDecisionMessage *dcsn;
591   struct GNUNET_MQ_Envelope *
592     env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
593                                GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
594   hdcsn->member_pub_key = join->member_pub_key;
595   hdcsn->peer = join->peer;
596
597   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
598   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
599   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
600   dcsn->is_admitted = htonl (is_admitted);
601   dcsn->relay_count = htonl (relay_count);
602   if (0 < relay_size)
603     GNUNET_memcpy (&dcsn[1], relays, relay_size);
604   if (0 < join_resp_size)
605     GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
606
607   GNUNET_MQ_send (grp->mq, env);
608   GNUNET_free (join);
609   return NULL;
610 }
611
612
613 /**
614  * Replay a message fragment for the multicast group.
615  *
616  * @param rh
617  *        Replay handle identifying which replay operation was requested.
618  * @param msg
619  *        Replayed message fragment, NULL if not found / an error occurred.
620  * @param ec
621  *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
622  *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
623  */
624 void
625 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
626                                   const struct GNUNET_MessageHeader *msg,
627                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
628 {
629   uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
630   struct MulticastReplayResponseMessage *res;
631   struct GNUNET_MQ_Envelope *
632     env = GNUNET_MQ_msg_extra (res, msg_size,
633                                GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
634   res->fragment_id = rh->req.fragment_id;
635   res->message_id = rh->req.message_id;
636   res->fragment_offset = rh->req.fragment_offset;
637   res->flags = rh->req.flags;
638   res->error_code = htonl (ec);
639
640   if (GNUNET_MULTICAST_REC_OK == ec)
641   {
642     GNUNET_assert (NULL != msg);
643     GNUNET_memcpy (&res[1], msg, msg_size);
644   }
645
646   GNUNET_MQ_send (rh->grp->mq, env);
647
648   if (GNUNET_MULTICAST_REC_OK != ec)
649     GNUNET_free (rh);
650 }
651
652
653 /**
654  * Indicate the end of the replay session.
655  *
656  * Invalidates the replay handle.
657  *
658  * @param rh
659  *        Replay session to end.
660  */
661 void
662 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
663 {
664   struct MulticastReplayResponseMessage *end;
665   struct GNUNET_MQ_Envelope *
666     env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
667
668   end->fragment_id = rh->req.fragment_id;
669   end->message_id = rh->req.message_id;
670   end->fragment_offset = rh->req.fragment_offset;
671   end->flags = rh->req.flags;
672
673   GNUNET_MQ_send (rh->grp->mq, env);
674   GNUNET_free (rh);
675 }
676
677
678 /**
679  * Replay a message for the multicast group.
680  *
681  * @param rh
682  *        Replay handle identifying which replay operation was requested.
683  * @param notify
684  *        Function to call to get the message.
685  * @param notify_cls
686  *        Closure for @a notify.
687  */
688 void
689 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
690                                    GNUNET_MULTICAST_ReplayTransmitNotify notify,
691                                    void *notify_cls)
692 {
693 }
694
695
696 void
697 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
698
699
700 static void
701 origin_reconnect (void *cls)
702 {
703   origin_connect (cls);
704 }
705
706
707 /**
708  * Origin client disconnected from service.
709  *
710  * Reconnect after backoff period.=
711  */
712 void
713 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
714 {
715   struct GNUNET_MULTICAST_Origin *orig = cls;
716   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
717
718   LOG (GNUNET_ERROR_TYPE_DEBUG,
719        "Origin client disconnected (%d), re-connecting\n",
720        (int) error);
721   if (NULL != grp->mq)
722   {
723     GNUNET_MQ_destroy (grp->mq);
724     grp->mq = NULL;
725   }
726
727   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
728                                                       &origin_reconnect,
729                                                       orig);
730   grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
731 }
732
733
734 /**
735  * Connect to service as origin.
736  */
737 void
738 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
739 {
740   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
741
742   GNUNET_MQ_hd_var_size (group_message,
743                          GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
744                          struct GNUNET_MULTICAST_MessageHeader);
745
746   GNUNET_MQ_hd_var_size (origin_request,
747                          GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
748                          struct GNUNET_MULTICAST_RequestHeader);
749
750   GNUNET_MQ_hd_fixed_size (group_fragment_ack,
751                            GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
752                            struct GNUNET_MessageHeader);
753
754   GNUNET_MQ_hd_var_size (group_join_request,
755                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
756                          struct MulticastJoinRequestMessage);
757
758   GNUNET_MQ_hd_fixed_size (group_replay_request,
759                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
760                            struct MulticastReplayRequestMessage);
761
762   struct GNUNET_MQ_MessageHandler handlers[] = {
763     make_group_message_handler (grp),
764     make_origin_request_handler (orig),
765     make_group_fragment_ack_handler (grp),
766     make_group_join_request_handler (grp),
767     make_group_replay_request_handler (grp),
768     GNUNET_MQ_handler_end ()
769   };
770
771   grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
772                                    handlers, origin_disconnected, orig);
773   if (NULL == grp->mq)
774   {
775     GNUNET_break (0);
776     return;
777   }
778   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
779 }
780
781
782 /**
783  * Start a multicast group.
784  *
785  * Will advertise the origin in the P2P overlay network under the respective
786  * public key so that other peer can find this peer to join it.  Peers that
787  * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
788  * either an existing group member or to the origin.  If the joining is
789  * approved, the member is cleared for @e replay and will begin to receive
790  * messages transmitted to the group.  If joining is disapproved, the failed
791  * candidate will be given a response.  Members in the group can send messages
792  * to the origin (one at a time).
793  *
794  * @param cfg
795  *        Configuration to use.
796  * @param priv_key
797  *        ECC key that will be used to sign messages for this
798  *        multicast session; public key is used to identify the multicast group;
799  * @param max_fragment_id
800  *        Maximum fragment ID already sent to the group.
801  *        0 for a new group.
802  * @param join_request_cb
803  *        Function called to approve / disapprove joining of a peer.
804  * @param replay_frag_cb
805  *        Function that can be called to replay a message fragment.
806  * @param replay_msg_cb
807  *        Function that can be called to replay a message.
808  * @param request_cb
809  *        Function called with message fragments from group members.
810  * @param message_cb
811  *        Function called with the message fragments sent to the
812  *        network by GNUNET_MULTICAST_origin_to_all().  These message fragments
813  *        should be stored for answering replay requests later.
814  * @param cls
815  *        Closure for the various callbacks that follow.
816  *
817  * @return Handle for the origin, NULL on error.
818  */
819 struct GNUNET_MULTICAST_Origin *
820 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
821                                const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
822                                uint64_t max_fragment_id,
823                                GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
824                                GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
825                                GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
826                                GNUNET_MULTICAST_RequestCallback request_cb,
827                                GNUNET_MULTICAST_MessageCallback message_cb,
828                                void *cls)
829 {
830   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
831   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
832
833   struct MulticastOriginStartMessage *start;
834   grp->connect_env = GNUNET_MQ_msg (start,
835                                     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
836   start->max_fragment_id = max_fragment_id;
837   GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
838
839   grp->is_origin = GNUNET_YES;
840   grp->cfg = cfg;
841
842   grp->cb_cls = cls;
843   grp->join_req_cb = join_request_cb;
844   grp->replay_frag_cb = replay_frag_cb;
845   grp->replay_msg_cb = replay_msg_cb;
846   grp->message_cb = message_cb;
847
848   orig->request_cb = request_cb;
849
850   origin_connect (orig);
851   return orig;
852 }
853
854
855 /**
856  * Stop a multicast group.
857  *
858  * @param origin
859  *        Multicast group to stop.
860  */
861 void
862 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
863                               GNUNET_ContinuationCallback stop_cb,
864                               void *stop_cls)
865 {
866   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
867
868   grp->is_disconnecting = GNUNET_YES;
869   grp->disconnect_cb = stop_cb;
870   grp->disconnect_cls = stop_cls;
871
872   // FIXME: wait till queued messages are sent
873   if (NULL != grp->mq)
874   {
875     GNUNET_MQ_destroy (grp->mq);
876     grp->mq = NULL;
877   }
878   origin_cleanup (orig);
879 }
880
881
882 static void
883 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
884 {
885   LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
886   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
887   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
888   GNUNET_assert (GNUNET_YES == grp->in_transmit);
889
890   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
891   struct GNUNET_MULTICAST_MessageHeader *msg;
892   struct GNUNET_MQ_Envelope *
893     env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
894                                GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
895
896   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
897
898   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
899       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
900   {
901     LOG (GNUNET_ERROR_TYPE_ERROR,
902          "%p OriginTransmitNotify() returned error or invalid message size.\n",
903          orig);
904     /* FIXME: handle error */
905     GNUNET_free (env);
906     return;
907   }
908
909   if (GNUNET_NO == ret && 0 == buf_size)
910   {
911     LOG (GNUNET_ERROR_TYPE_DEBUG,
912          "%p OriginTransmitNotify() - transmission paused.\n", orig);
913     GNUNET_free (env);
914     return; /* Transmission paused. */
915   }
916
917   msg->header.size = htons (sizeof (*msg) + buf_size);
918   msg->message_id = GNUNET_htonll (tmit->message_id);
919   msg->group_generation = tmit->group_generation;
920   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
921   tmit->fragment_offset += sizeof (*msg) + buf_size;
922
923   grp->acks_pending++;
924   GNUNET_MQ_send (grp->mq, env);
925
926   if (GNUNET_YES == ret)
927     grp->in_transmit = GNUNET_NO;
928 }
929
930
931 /**
932  * Send a message to the multicast group.
933  *
934  * @param orig
935  *        Handle to the multicast group.
936  * @param message_id
937  *        Application layer ID for the message.  Opaque to multicast.
938  * @param group_generation
939  *        Group generation of the message.
940  *        Documented in struct GNUNET_MULTICAST_MessageHeader.
941  * @param notify
942  *        Function to call to get the message.
943  * @param notify_cls
944  *        Closure for @a notify.
945  *
946  * @return Message handle on success,
947  *         NULL on error (i.e. another request is already pending).
948  */
949 struct GNUNET_MULTICAST_OriginTransmitHandle *
950 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
951                                 uint64_t message_id,
952                                 uint64_t group_generation,
953                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
954                                 void *notify_cls)
955 {
956   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
957   if (GNUNET_YES == grp->in_transmit)
958     return NULL;
959   grp->in_transmit = GNUNET_YES;
960
961   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
962   tmit->origin = orig;
963   tmit->message_id = message_id;
964   tmit->fragment_offset = 0;
965   tmit->group_generation = group_generation;
966   tmit->notify = notify;
967   tmit->notify_cls = notify_cls;
968
969   origin_to_all (orig);
970   return tmit;
971 }
972
973
974 /**
975  * Resume message transmission to multicast group.
976  *
977  * @param th
978  *        Transmission to cancel.
979  */
980 void
981 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
982 {
983   struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
984   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
985     return;
986   origin_to_all (th->origin);
987 }
988
989
990 /**
991  * Cancel request for message transmission to multicast group.
992  *
993  * @param th
994  *        Transmission to cancel.
995  */
996 void
997 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
998 {
999   th->origin->grp.in_transmit = GNUNET_NO;
1000 }
1001
1002
1003  void
1004 member_connect (struct GNUNET_MULTICAST_Member *mem);
1005
1006
1007 static void
1008 member_reconnect (void *cls)
1009 {
1010   member_connect (cls);
1011 }
1012
1013
1014 /**
1015  * Member client disconnected from service.
1016  *
1017  * Reconnect after backoff period.
1018  */
1019 void
1020 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1021 {
1022   struct GNUNET_MULTICAST_Member *mem = cls;
1023   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1024
1025   LOG (GNUNET_ERROR_TYPE_DEBUG,
1026        "Member client disconnected (%d), re-connecting\n",
1027        (int) error);
1028   GNUNET_MQ_destroy (grp->mq);
1029   grp->mq = NULL;
1030
1031   grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
1032                                                       &member_reconnect,
1033                                                       mem);
1034   grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
1035 }
1036
1037
1038 /**
1039  * Connect to service as member.
1040  */
1041 void
1042 member_connect (struct GNUNET_MULTICAST_Member *mem)
1043 {
1044   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1045
1046   GNUNET_MQ_hd_var_size (group_message,
1047                          GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1048                          struct GNUNET_MULTICAST_MessageHeader);
1049
1050   GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1051                            GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1052                            struct GNUNET_MessageHeader);
1053
1054   GNUNET_MQ_hd_var_size (group_join_request,
1055                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1056                          struct MulticastJoinRequestMessage);
1057
1058   GNUNET_MQ_hd_var_size (member_join_decision,
1059                          GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1060                          struct MulticastJoinDecisionMessageHeader);
1061
1062   GNUNET_MQ_hd_fixed_size (group_replay_request,
1063                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1064                            struct MulticastReplayRequestMessage);
1065
1066   GNUNET_MQ_hd_var_size (member_replay_response,
1067                          GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1068                          struct MulticastReplayResponseMessage);
1069
1070   struct GNUNET_MQ_MessageHandler handlers[] = {
1071     make_group_message_handler (grp),
1072     make_group_fragment_ack_handler (grp),
1073     make_group_join_request_handler (grp),
1074     make_member_join_decision_handler (mem),
1075     make_group_replay_request_handler (grp),
1076     make_member_replay_response_handler (mem),
1077     GNUNET_MQ_handler_end ()
1078   };
1079
1080   grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
1081                                    handlers, member_disconnected, mem);
1082   if (NULL == grp->mq)
1083   {
1084     GNUNET_break (0);
1085     return;
1086   }
1087   GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1088 }
1089
1090
1091 /**
1092  * Join a multicast group.
1093  *
1094  * The entity joining is always the local peer.  Further information about the
1095  * candidate can be provided in the @a join_request message.  If the join fails, the
1096  * @a message_cb is invoked with a (failure) response and then with NULL.  If
1097  * the join succeeds, outstanding (state) messages and ongoing multicast
1098  * messages will be given to the @a message_cb until the member decides to part
1099  * the group.  The @a replay_cb function may be called at any time by the
1100  * multicast service to support relaying messages to other members of the group.
1101  *
1102  * @param cfg
1103  *        Configuration to use.
1104  * @param group_key
1105  *        ECC public key that identifies the group to join.
1106  * @param member_key
1107  *        ECC key that identifies the member
1108  *        and used to sign requests sent to the origin.
1109  * @param origin
1110  *        Peer ID of the origin to send unicast requsets to.  If NULL,
1111  *        unicast requests are sent back via multiple hops on the reverse path
1112  *        of multicast messages.
1113  * @param relay_count
1114  *        Number of peers in the @a relays array.
1115  * @param relays
1116  *        Peer identities of members of the group, which serve as relays
1117  *        and can be used to join the group at. and send the @a join_request to.
1118  *        If empty, the @a join_request is sent directly to the @a origin.
1119  * @param join_msg
1120  *        Application-dependent join message to be passed to the peer @a origin.
1121  * @param join_request_cb
1122  *        Function called to approve / disapprove joining of a peer.
1123  * @param join_decision_cb
1124  *        Function called to inform about the join decision.
1125  * @param replay_frag_cb
1126  *        Function that can be called to replay message fragments
1127  *        this peer already knows from this group. NULL if this
1128  *        client is unable to support replay.
1129  * @param replay_msg_cb
1130  *        Function that can be called to replay message fragments
1131  *        this peer already knows from this group. NULL if this
1132  *        client is unable to support replay.
1133  * @param message_cb
1134  *        Function to be called for all message fragments we
1135  *        receive from the group, excluding those our @a replay_cb
1136  *        already has.
1137  * @param cls
1138  *        Closure for callbacks.
1139  *
1140  * @return Handle for the member, NULL on error.
1141  */
1142 struct GNUNET_MULTICAST_Member *
1143 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1144                               const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1145                               const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1146                               const struct GNUNET_PeerIdentity *origin,
1147                               uint16_t relay_count,
1148                               const struct GNUNET_PeerIdentity *relays,
1149                               const struct GNUNET_MessageHeader *join_msg,
1150                               GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1151                               GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1152                               GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1153                               GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1154                               GNUNET_MULTICAST_MessageCallback message_cb,
1155                               void *cls)
1156 {
1157   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1158   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1159
1160   uint16_t relay_size = relay_count * sizeof (*relays);
1161   uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1162   struct MulticastMemberJoinMessage *join;
1163   grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1164                                           GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1165   join->group_pub_key = *group_pub_key;
1166   join->member_key = *member_key;
1167   join->origin = *origin;
1168   join->relay_count = ntohl (relay_count);
1169   if (0 < relay_size)
1170     GNUNET_memcpy (&join[1], relays, relay_size);
1171   if (0 < join_msg_size)
1172     GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1173
1174   grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1175   grp->is_origin = GNUNET_NO;
1176   grp->cfg = cfg;
1177
1178   mem->join_dcsn_cb = join_decision_cb;
1179   grp->join_req_cb = join_request_cb;
1180   grp->replay_frag_cb = replay_frag_cb;
1181   grp->replay_msg_cb = replay_msg_cb;
1182   grp->message_cb = message_cb;
1183   grp->cb_cls = cls;
1184
1185   member_connect (mem);
1186   return mem;
1187 }
1188
1189
1190 /**
1191  * Part a multicast group.
1192  *
1193  * Disconnects from all group members and invalidates the @a member handle.
1194  *
1195  * An application-dependent part message can be transmitted beforehand using
1196  * #GNUNET_MULTICAST_member_to_origin())
1197  *
1198  * @param member
1199  *        Membership handle.
1200  */
1201 void
1202 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1203                               GNUNET_ContinuationCallback part_cb,
1204                               void *part_cls)
1205 {
1206   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
1207   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1208
1209   grp->is_disconnecting = GNUNET_YES;
1210   grp->disconnect_cb = part_cb;
1211   grp->disconnect_cls = part_cls;
1212
1213   mem->join_dcsn_cb = NULL;
1214   grp->join_req_cb = NULL;
1215   grp->message_cb = NULL;
1216   grp->replay_msg_cb = NULL;
1217   grp->replay_frag_cb = NULL;
1218
1219   // FIXME: wait till queued messages are sent
1220   if (NULL != grp->mq)
1221   {
1222     GNUNET_MQ_destroy (grp->mq);
1223     grp->mq = NULL;
1224   }
1225   member_cleanup (mem);
1226 }
1227
1228
1229 void
1230 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1231                        uint64_t fragment_id,
1232                        uint64_t message_id,
1233                        uint64_t fragment_offset,
1234                        uint64_t flags)
1235 {
1236   struct MulticastReplayRequestMessage *rep;
1237   struct GNUNET_MQ_Envelope *
1238     env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1239
1240   rep->fragment_id = GNUNET_htonll (fragment_id);
1241   rep->message_id = GNUNET_htonll (message_id);
1242   rep->fragment_offset = GNUNET_htonll (fragment_offset);
1243   rep->flags = GNUNET_htonll (flags);
1244
1245   GNUNET_MQ_send (mem->grp.mq, env);
1246 }
1247
1248
1249 /**
1250  * Request a fragment to be replayed by fragment ID.
1251  *
1252  * Useful if messages below the @e max_known_fragment_id given when joining are
1253  * needed and not known to the client.
1254  *
1255  * @param member
1256  *        Membership handle.
1257  * @param fragment_id
1258  *        ID of a message fragment that this client would like to see replayed.
1259  * @param flags
1260  *        Additional flags for the replay request.
1261  *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1262  *
1263  * @return Replay request handle.
1264  */
1265 struct GNUNET_MULTICAST_MemberReplayHandle *
1266 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1267                                          uint64_t fragment_id,
1268                                          uint64_t flags)
1269 {
1270   member_replay_request (mem, fragment_id, 0, 0, flags);
1271   // FIXME: return something useful
1272   return NULL;
1273 }
1274
1275
1276 /**
1277  * Request a message fragment to be replayed.
1278  *
1279  * Useful if messages below the @e max_known_fragment_id given when joining are
1280  * needed and not known to the client.
1281  *
1282  * @param member
1283  *        Membership handle.
1284  * @param message_id
1285  *        ID of the message this client would like to see replayed.
1286  * @param fragment_offset
1287  *        Offset of the fragment within the message to replay.
1288  * @param flags
1289  *        Additional flags for the replay request.
1290  *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1291  *
1292  * @return Replay request handle, NULL on error.
1293  */
1294 struct GNUNET_MULTICAST_MemberReplayHandle *
1295 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1296                                         uint64_t message_id,
1297                                         uint64_t fragment_offset,
1298                                         uint64_t flags)
1299 {
1300   member_replay_request (mem, 0, message_id, fragment_offset, flags);
1301   // FIXME: return something useful
1302   return NULL;
1303 }
1304
1305
1306 static void
1307 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1308 {
1309   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1310   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1311   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1312   GNUNET_assert (GNUNET_YES == grp->in_transmit);
1313
1314   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1315   struct GNUNET_MULTICAST_RequestHeader *req;
1316   struct GNUNET_MQ_Envelope *
1317     env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1318                                GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1319
1320   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1321
1322   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1323       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_ERROR,
1326          "MemberTransmitNotify() returned error or invalid message size. "
1327          "ret=%d, buf_size=%u\n", ret, buf_size);
1328     /* FIXME: handle error */
1329     GNUNET_free (req);
1330     return;
1331   }
1332
1333   if (GNUNET_NO == ret && 0 == buf_size)
1334   {
1335     /* Transmission paused. */
1336     GNUNET_free (req);
1337     return;
1338   }
1339
1340   req->header.size = htons (sizeof (*req) + buf_size);
1341   req->request_id = GNUNET_htonll (tmit->request_id);
1342   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1343   tmit->fragment_offset += sizeof (*req) + buf_size;
1344
1345   GNUNET_MQ_send (grp->mq, env);
1346
1347   if (GNUNET_YES == ret)
1348     grp->in_transmit = GNUNET_NO;
1349 }
1350
1351
1352 /**
1353  * Send a message to the origin of the multicast group.
1354  *
1355  * @param mem
1356  *        Membership handle.
1357  * @param request_id
1358  *        Application layer ID for the request.  Opaque to multicast.
1359  * @param notify
1360  *        Callback to call to get the message.
1361  * @param notify_cls
1362  *        Closure for @a notify.
1363  *
1364  * @return Handle to cancel request, NULL on error (i.e. request already pending).
1365  */
1366 struct GNUNET_MULTICAST_MemberTransmitHandle *
1367 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1368                                    uint64_t request_id,
1369                                    GNUNET_MULTICAST_MemberTransmitNotify notify,
1370                                    void *notify_cls)
1371 {
1372   if (GNUNET_YES == mem->grp.in_transmit)
1373     return NULL;
1374   mem->grp.in_transmit = GNUNET_YES;
1375
1376   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1377   tmit->member = mem;
1378   tmit->request_id = request_id;
1379   tmit->fragment_offset = 0;
1380   tmit->notify = notify;
1381   tmit->notify_cls = notify_cls;
1382
1383   member_to_origin (mem);
1384   return tmit;
1385 }
1386
1387
1388 /**
1389  * Resume message transmission to origin.
1390  *
1391  * @param th
1392  *        Transmission to cancel.
1393  */
1394 void
1395 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1396 {
1397   struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1398   if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1399     return;
1400   member_to_origin (th->member);
1401 }
1402
1403
1404 /**
1405  * Cancel request for message transmission to origin.
1406  *
1407  * @param th
1408  *        Transmission to cancel.
1409  */
1410 void
1411 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1412 {
1413   th->member->grp.in_transmit = GNUNET_NO;
1414 }
1415
1416
1417 /* end of multicast_api.c */