Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "multicast.h"
34
35 /**
36  * Handle to our current configuration.
37  */
38 static const struct GNUNET_CONFIGURATION_Handle *cfg;
39
40 /**
41  * Service handle.
42  */
43 static struct GNUNET_SERVICE_Handle *service;
44
45 /**
46  * CADET handle.
47  */
48 static struct GNUNET_CADET_Handle *cadet;
49
50 /**
51  * Identity of this peer.
52  */
53 static struct GNUNET_PeerIdentity this_peer;
54
55 /**
56  * Handle to the statistics service.
57  */
58 static struct GNUNET_STATISTICS_Handle *stats;
59
60 /**
61  * All connected origin clients.
62  * Group's pub_key_hash -> struct Origin * (uniq)
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *origins;
65
66 /**
67  * All connected member clients.
68  * Group's pub_key_hash -> struct Member * (multi)
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *members;
71
72 /**
73  * Connected member clients per group.
74  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
77
78 /**
79  * Incoming CADET channels with connected children in the tree.
80  * Group's pub_key_hash -> struct Channel * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
83
84 /**
85  * Outgoing CADET channels connecting to parents in the tree.
86  * Group's pub_key_hash -> struct Channel * (multi)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
89
90 /**
91  * Incoming replay requests from CADET.
92  * Group's pub_key_hash ->
93  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
96
97 /**
98  * Incoming replay requests from clients.
99  * Group's pub_key_hash ->
100  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
101  */
102 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
103
104
105 /**
106  * Join status of a remote peer.
107  */
108 enum JoinStatus
109 {
110   JOIN_REFUSED  = -1,
111   JOIN_NOT_ASKED = 0,
112   JOIN_WAITING   = 1,
113   JOIN_ADMITTED  = 2,
114 };
115
116 enum ChannelDirection
117 {
118   DIR_INCOMING = 0,
119   DIR_OUTGOING = 1,
120 };
121
122
123 /**
124  * Context for a CADET channel.
125  */
126 struct Channel
127 {
128   /**
129    * Group the channel belongs to.
130    *
131    * Only set for outgoing channels.
132    */
133   struct Group *group;
134
135   /**
136    * CADET channel.
137    */
138   struct GNUNET_CADET_Channel *channel;
139
140   /**
141    * CADET transmission handle.
142    */
143   struct GNUNET_CADET_TransmitHandle *tmit_handle;
144
145   /**
146    * Public key of the target group.
147    */
148   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
149
150   /**
151    * Hash of @a group_pub_key.
152    */
153   struct GNUNET_HashCode group_pub_hash;
154
155   /**
156    * Public key of the joining member.
157    */
158   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
159
160   /**
161    * Remote peer identity.
162    */
163   struct GNUNET_PeerIdentity peer;
164
165   /**
166    * Current window size, set by cadet_notify_window_change()
167    */
168   int32_t window_size;
169
170   /**
171    * Is the connection established?
172    */
173   int8_t is_connected;
174
175   /**
176    * Is the remote peer admitted to the group?
177    * @see enum JoinStatus
178    */
179   int8_t join_status;
180
181   /**
182    * Number of messages waiting to be sent to CADET.
183    */
184   uint8_t msgs_pending;
185
186   /**
187    * Channel direction.
188    * @see enum ChannelDirection
189    */
190   uint8_t direction;
191 };
192
193
194 /**
195  * List of connected clients.
196  */
197 struct ClientList
198 {
199   struct ClientList *prev;
200   struct ClientList *next;
201   struct GNUNET_SERVICE_Client *client;
202 };
203
204
205 /**
206  * Client context for an origin or member.
207  */
208 struct Group
209 {
210   struct ClientList *clients_head;
211   struct ClientList *clients_tail;
212
213   /**
214    * Public key of the group.
215    */
216   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
217
218   /**
219    * Hash of @a pub_key.
220    */
221   struct GNUNET_HashCode pub_key_hash;
222
223   /**
224    * CADET port hash.
225    */
226   struct GNUNET_HashCode cadet_port_hash;
227
228   /**
229    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
230    */
231   uint8_t disconnected;
232
233   /**
234    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
235    */
236   uint8_t is_origin;
237
238   union {
239     struct Origin *origin;
240     struct Member *member;
241   };
242 };
243
244
245 /**
246 * Client context for a group's origin.
247  */
248 struct Origin
249 {
250   struct Group group;
251
252   /**
253    * Private key of the group.
254    */
255   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
256
257   /**
258    * CADET port.
259    */
260   struct GNUNET_CADET_Port *cadet_port;
261
262   /**
263    * Last message fragment ID sent to the group.
264    */
265   uint64_t max_fragment_id;
266 };
267
268
269 /**
270  * Client context for a group member.
271  */
272 struct Member
273 {
274   struct Group group;
275
276   /**
277    * Private key of the member.
278    */
279   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
280
281   /**
282    * Public key of the member.
283    */
284   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
285
286   /**
287    * Hash of @a pub_key.
288    */
289   struct GNUNET_HashCode pub_key_hash;
290
291   /**
292    * Join request sent to the origin / members.
293    */
294   struct MulticastJoinRequestMessage *join_req;
295
296   /**
297    * Join decision sent in reply to our request.
298    *
299    * Only a positive decision is stored here, in case of a negative decision the
300    * client is disconnected.
301    */
302   struct MulticastJoinDecisionMessageHeader *join_dcsn;
303
304   /**
305    * CADET channel to the origin.
306    */
307   struct Channel *origin_channel;
308
309   /**
310    * Peer identity of origin.
311    */
312   struct GNUNET_PeerIdentity origin;
313
314   /**
315    * Peer identity of relays (other members to connect).
316    */
317   struct GNUNET_PeerIdentity *relays;
318
319   /**
320    * Last request fragment ID sent to the origin.
321    */
322   uint64_t max_fragment_id;
323
324   /**
325    * Number of @a relays.
326    */
327   uint32_t relay_count;
328 };
329
330
331 /**
332  * Client context.
333  */
334 struct Client {
335   struct GNUNET_SERVICE_Client *client;
336   struct Group *group;
337 };
338
339
340 struct ReplayRequestKey
341 {
342   uint64_t fragment_id;
343   uint64_t message_id;
344   uint64_t fragment_offset;
345   uint64_t flags;
346 };
347
348
349 static struct Channel *
350 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
351
352 static void
353 cadet_channel_destroy (struct Channel *chn);
354
355 static void
356 client_send_join_decision (struct Member *mem,
357                            const struct MulticastJoinDecisionMessageHeader *hdcsn);
358
359
360 /**
361  * Task run during shutdown.
362  *
363  * @param cls unused
364  */
365 static void
366 shutdown_task (void *cls)
367 {
368   if (NULL != cadet)
369   {
370     GNUNET_CADET_disconnect (cadet);
371     cadet = NULL;
372   }
373   if (NULL != stats)
374   {
375     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
376     stats = NULL;
377   }
378   /* FIXME: do more clean up here */
379 }
380
381
382 /**
383  * Clean up origin data structures after a client disconnected.
384  */
385 static void
386 cleanup_origin (struct Origin *orig)
387 {
388   struct Group *grp = &orig->group;
389   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
390   if (NULL != orig->cadet_port)
391   {
392     GNUNET_CADET_close_port (orig->cadet_port);
393     orig->cadet_port = NULL;
394   }
395   GNUNET_free (orig);
396 }
397
398
399 /**
400  * Clean up member data structures after a client disconnected.
401  */
402 static void
403 cleanup_member (struct Member *mem)
404 {
405   struct Group *grp = &mem->group;
406   struct GNUNET_CONTAINER_MultiHashMap *
407     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
408                                                  &grp->pub_key_hash);
409   GNUNET_assert (NULL != grp_mem);
410   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
411
412   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
413   {
414     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
415                                           grp_mem);
416     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
417   }
418   if (NULL != mem->join_dcsn)
419   {
420     GNUNET_free (mem->join_dcsn);
421     mem->join_dcsn = NULL;
422   }
423   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
424   GNUNET_free (mem);
425 }
426
427
428 /**
429  * Clean up group data structures after a client disconnected.
430  */
431 static void
432 cleanup_group (struct Group *grp)
433 {
434   (GNUNET_YES == grp->is_origin)
435     ? cleanup_origin (grp->origin)
436     : cleanup_member (grp->member);
437 }
438
439
440 void
441 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
442                  uint64_t fragment_offset, uint64_t flags,
443                  struct GNUNET_HashCode *key_hash)
444 {
445   struct ReplayRequestKey key = {
446     .fragment_id = fragment_id,
447     .message_id = message_id,
448     .fragment_offset = fragment_offset,
449     .flags = flags,
450   };
451   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
452 }
453
454
455 /**
456  * Remove channel from replay request hashmap.
457  *
458  * @param chn
459  *        Channel to remove.
460  *
461  * @return #GNUNET_YES if there are more entries to process,
462  *         #GNUNET_NO when reached end of hashmap.
463  */
464 static int
465 replay_req_remove_cadet (struct Channel *chn)
466 {
467   if (NULL == chn || NULL == chn->group)
468     return GNUNET_SYSERR;
469
470   struct GNUNET_CONTAINER_MultiHashMap *
471     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
472                                                         &chn->group->pub_key_hash);
473   if (NULL == grp_replay_req)
474     return GNUNET_NO;
475
476   struct GNUNET_CONTAINER_MultiHashMapIterator *
477     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
478   struct GNUNET_HashCode key;
479   const struct Channel *c;
480   while (GNUNET_YES
481          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
482                                                          (const void **) &c))
483   {
484     if (c == chn)
485     {
486       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
487       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
488       return GNUNET_YES;
489     }
490   }
491   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
492   return GNUNET_NO;
493 }
494
495
496 /**
497  * Remove client from replay request hashmap.
498  *
499  * @param client
500  *        Client to remove.
501  *
502  * @return #GNUNET_YES if there are more entries to process,
503  *         #GNUNET_NO when reached end of hashmap.
504  */
505 static int
506 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
507 {
508   struct GNUNET_CONTAINER_MultiHashMap *
509     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
510                                                         &grp->pub_key_hash);
511   if (NULL == grp_replay_req)
512     return GNUNET_NO;
513
514   struct GNUNET_CONTAINER_MultiHashMapIterator *
515     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
516   struct GNUNET_HashCode key;
517   const struct GNUNET_SERVICE_Client *c;
518   while (GNUNET_YES
519          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
520                                                          (const void **) &c))
521   {
522     if (c == client)
523     {
524       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client);
525       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
526       return GNUNET_YES;
527     }
528   }
529   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
530   return GNUNET_NO;
531 }
532
533
534 /**
535  * Send message to a client.
536  */
537 static void
538 client_send (struct GNUNET_SERVICE_Client *client,
539              const struct GNUNET_MessageHeader *msg)
540 {
541   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
542               "%p Sending message to client.\n", client);
543
544   struct GNUNET_MQ_Envelope *
545     env = GNUNET_MQ_msg_copy (msg);
546
547   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
548                   env);
549 }
550
551
552 /**
553  * Send message to all clients connected to the group.
554  */
555 static void
556 client_send_group (const struct Group *grp,
557                    const struct GNUNET_MessageHeader *msg)
558 {
559   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
560               "%p Sending message to all clients of the group.\n", grp);
561
562   struct ClientList *cl = grp->clients_head;
563   while (NULL != cl)
564   {
565     struct GNUNET_MQ_Envelope *
566       env = GNUNET_MQ_msg_copy (msg);
567
568     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
569                     env);
570     cl = cl->next;
571   }
572 }
573
574
575 /**
576  * Iterator callback for sending a message to origin clients.
577  */
578 static int
579 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
580                        void *origin)
581 {
582   const struct GNUNET_MessageHeader *msg = cls;
583   struct Member *orig = origin;
584
585   client_send_group (&orig->group, msg);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Iterator callback for sending a message to member clients.
592  */
593 static int
594 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
595                        void *member)
596 {
597   const struct GNUNET_MessageHeader *msg = cls;
598   struct Member *mem = member;
599
600   if (NULL != mem->join_dcsn)
601   { /* Only send message to admitted members */
602     client_send_group (&mem->group, msg);
603   }
604   return GNUNET_YES;
605 }
606
607
608 /**
609  * Send message to all origin and member clients connected to the group.
610  *
611  * @param pub_key_hash
612  *        H(key_pub) of the group.
613  * @param msg
614  *        Message to send.
615  */
616 static int
617 client_send_all (struct GNUNET_HashCode *pub_key_hash,
618                  const struct GNUNET_MessageHeader *msg)
619 {
620   int n = 0;
621   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
622                                                    client_send_origin_cb,
623                                                    (void *) msg);
624   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
625                                                    client_send_member_cb,
626                                                    (void *) msg);
627   return n;
628 }
629
630
631 /**
632  * Send message to a random origin client or a random member client.
633  *
634  * @param grp  The group to send @a msg to.
635  * @param msg  Message to send.
636  */
637 static int
638 client_send_random (struct GNUNET_HashCode *pub_key_hash,
639                     const struct GNUNET_MessageHeader *msg)
640 {
641   int n = 0;
642   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
643                                                  (void *) msg);
644   if (n <= 0)
645     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
646                                                    (void *) msg);
647   return n;
648 }
649
650
651 /**
652  * Send message to all origin clients connected to the group.
653  *
654  * @param pub_key_hash
655  *        H(key_pub) of the group.
656  * @param msg
657  *        Message to send.
658  */
659 static int
660 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
661                     const struct GNUNET_MessageHeader *msg)
662 {
663   int n = 0;
664   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
665                                                    client_send_origin_cb,
666                                                    (void *) msg);
667   return n;
668 }
669
670
671 /**
672  * Send fragment acknowledgement to all clients of the channel.
673  *
674  * @param pub_key_hash
675  *        H(key_pub) of the group.
676  */
677 static void
678 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
679 {
680   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
681               "Sending message ACK to client.\n");
682
683   static struct GNUNET_MessageHeader *msg = NULL;
684   if (NULL == msg)
685   {
686     msg = GNUNET_malloc (sizeof (*msg));
687     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
688     msg->size = htons (sizeof (*msg));
689   }
690   client_send_all (pub_key_hash, msg);
691 }
692
693
694 struct CadetTransmitClosure
695 {
696   struct Channel *chn;
697   const struct GNUNET_MessageHeader *msg;
698 };
699
700
701 /**
702  * Send a message to a CADET channel.
703  *
704  * @param chn  Channel.
705  * @param msg  Message.
706  */
707 static void
708 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
709 {
710   struct GNUNET_MQ_Envelope *
711     env = GNUNET_MQ_msg_copy (msg);
712
713   GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
714
715   if (0 < chn->window_size)
716   {
717     client_send_ack (&chn->group_pub_hash);
718   }
719   else
720   {
721     chn->msgs_pending++;
722     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
723                 "%p Queuing message. Pending messages: %u\n",
724                 chn, chn->msgs_pending);
725   }
726 }
727
728
729 /**
730  * Create CADET channel and send a join request.
731  */
732 static void
733 cadet_send_join_request (struct Member *mem)
734 {
735   mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
736   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
737
738   uint32_t i;
739   for (i = 0; i < mem->relay_count; i++)
740   {
741     struct Channel *
742       chn = cadet_channel_create (&mem->group, &mem->relays[i]);
743     cadet_send_channel (chn, &mem->join_req->header);
744   }
745 }
746
747
748 static int
749 cadet_send_join_decision_cb (void *cls,
750                              const struct GNUNET_HashCode *group_pub_hash,
751                              void *channel)
752 {
753   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
754   struct Channel *chn = channel;
755
756   const struct MulticastJoinDecisionMessage *dcsn =
757     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
758
759   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
760       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
761   {
762     if (GNUNET_YES == ntohl (dcsn->is_admitted))
763     {
764       chn->join_status = JOIN_ADMITTED;
765     }
766     else
767     {
768       chn->join_status = JOIN_REFUSED;
769     }
770     cadet_send_channel (chn, &hdcsn->header);
771     return GNUNET_YES;
772   }
773
774   // return GNUNET_YES to continue the multihashmap_get iteration
775   return GNUNET_YES;
776 }
777
778
779 /**
780  * Send join decision to a remote peer.
781  */
782 static void
783 cadet_send_join_decision (struct Group *grp,
784                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
785 {
786   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
787                                               &cadet_send_join_decision_cb,
788                                               (void *) hdcsn);
789 }
790
791
792 /**
793  * Iterator callback for sending a message to origin clients.
794  */
795 static int
796 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
797                void *channel)
798 {
799   const struct GNUNET_MessageHeader *msg = cls;
800   struct Channel *chn = channel;
801   if (JOIN_ADMITTED == chn->join_status)
802     cadet_send_channel (chn, msg);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * Send message to all connected children.
809  */
810 static int
811 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
812                      const struct GNUNET_MessageHeader *msg)
813 {
814   int n = 0;
815   if (channels_in != NULL)
816     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
817                                                      cadet_send_cb, (void *) msg);
818   return n;
819 }
820
821
822 #if 0       // unused as yet
823 /**
824  * Send message to all connected parents.
825  */
826 static int
827 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
828                     const struct GNUNET_MessageHeader *msg)
829 {
830   int n = 0;
831   if (channels_in != NULL)
832     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
833                                                      cadet_send_cb, (void *) msg);
834   return n;
835 }
836 #endif
837
838
839 /**
840  * CADET channel connect handler.
841  *
842  * @see GNUNET_CADET_ConnectEventHandler()
843  */
844 static void *
845 cadet_notify_connect (void *cls,
846                       struct GNUNET_CADET_Channel *channel,
847                       const struct GNUNET_PeerIdentity *source)
848 {
849   struct Channel *chn = GNUNET_malloc (sizeof (struct Channel));
850   chn->group = cls;
851   chn->channel = channel;
852   chn->direction = DIR_INCOMING;
853   chn->join_status = JOIN_NOT_ASKED;
854       
855   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group->pub_key_hash, chn,
856                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
857   return chn;
858 }
859
860
861 /**
862  * CADET window size change handler.
863  *
864  * @see GNUNET_CADET_WindowSizeEventHandler()
865  */
866 static void
867 cadet_notify_window_change (void *cls,
868                             const struct GNUNET_CADET_Channel *channel,
869                             int window_size)
870 {
871   struct Channel *chn = cls;
872
873   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
874               "%p Window size changed to %d.  Pending messages: %u\n",
875               chn, window_size, chn->msgs_pending);
876
877   chn->is_connected = GNUNET_YES;
878   chn->window_size = (int32_t) window_size;
879
880   for (int i = 0; i < window_size; i++)
881   {
882     if (0 < chn->msgs_pending)
883     {
884       client_send_ack (&chn->group_pub_hash);
885       chn->msgs_pending--;
886     }
887     else
888     {
889       break;
890     }
891   }
892 }
893
894
895 /**
896  * CADET channel disconnect handler.
897  *
898  * @see GNUNET_CADET_DisconnectEventHandler()
899  */
900 static void
901 cadet_notify_disconnect (void *cls,
902                          const struct GNUNET_CADET_Channel *channel)
903 {
904   if (NULL == cls)
905     return;
906
907   struct Channel *chn = cls;
908   if (NULL != chn->group)
909   {
910     if (GNUNET_NO == chn->group->is_origin)
911     {
912       struct Member *mem = (struct Member *) chn->group;
913       if (chn == mem->origin_channel)
914         mem->origin_channel = NULL;
915     }
916   }
917
918   int ret;
919   do
920   {
921     ret = replay_req_remove_cadet (chn);
922   }
923   while (GNUNET_YES == ret);
924
925   GNUNET_free (chn);
926 }
927
928
929 static int
930 check_cadet_join_request (void *cls,
931                           const struct MulticastJoinRequestMessage *req)
932 {
933   struct Channel *chn = cls;
934
935   if (NULL == chn
936       || JOIN_NOT_ASKED != chn->join_status)
937   {
938     return GNUNET_SYSERR;
939   }
940
941   uint16_t size = ntohs (req->header.size);
942   if (size < sizeof (*req))
943   {
944     GNUNET_break_op (0);
945     return GNUNET_SYSERR;
946   }
947   if (ntohl (req->purpose.size) != (size
948                                     - sizeof (req->header)
949                                     - sizeof (req->reserved)
950                                     - sizeof (req->signature)))
951   {
952     GNUNET_break_op (0);
953     return GNUNET_SYSERR;
954   }
955   if (GNUNET_OK !=
956       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
957                                   &req->purpose, &req->signature,
958                                   &req->member_pub_key))
959   {
960     GNUNET_break_op (0);
961     return GNUNET_SYSERR;
962   }
963
964   return GNUNET_OK;
965 }
966
967
968 /**
969  * Incoming join request message from CADET.
970  */
971 static void
972 handle_cadet_join_request (void *cls,
973                            const struct MulticastJoinRequestMessage *req)
974 {
975   struct Channel *chn = cls;
976   GNUNET_CADET_receive_done (chn->channel);
977
978   struct GNUNET_HashCode group_pub_hash;
979   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
980   chn->group_pub_key = req->group_pub_key;
981   chn->group_pub_hash = group_pub_hash;
982   chn->member_pub_key = req->member_pub_key;
983   chn->peer = req->peer;
984   chn->join_status = JOIN_WAITING;
985
986   client_send_all (&group_pub_hash, &req->header);
987 }
988
989
990 static int
991 check_cadet_join_decision (void *cls,
992                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
993 {
994   uint16_t size = ntohs (hdcsn->header.size);
995   if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
996              sizeof (struct MulticastJoinDecisionMessage))
997   {
998     GNUNET_break_op (0);
999     return GNUNET_SYSERR;
1000   }
1001
1002   struct Channel *chn = cls;
1003   if (NULL == chn)
1004   {
1005     GNUNET_break (0);
1006     return GNUNET_SYSERR;
1007   }
1008   if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
1009   {
1010     GNUNET_break (0);
1011     return GNUNET_SYSERR;
1012   }
1013   switch (chn->join_status)
1014   {
1015   case JOIN_REFUSED:
1016     return GNUNET_SYSERR;
1017
1018   case JOIN_ADMITTED:
1019     return GNUNET_OK;
1020
1021   case JOIN_NOT_ASKED:
1022   case JOIN_WAITING:
1023     break;
1024   }
1025
1026   return GNUNET_OK;
1027 }
1028
1029
1030 /**
1031  * Incoming join decision message from CADET.
1032  */
1033 static void
1034 handle_cadet_join_decision (void *cls,
1035                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1036 {
1037   const struct MulticastJoinDecisionMessage *
1038     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1039
1040   struct Channel *chn = cls;
1041   GNUNET_CADET_receive_done (chn->channel);
1042
1043   // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
1044   struct Member *mem = (struct Member *) chn->group;
1045   client_send_join_decision (mem, hdcsn);
1046   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1047   {
1048     chn->join_status = JOIN_ADMITTED;
1049   }
1050   else
1051   {
1052     chn->join_status = JOIN_REFUSED;
1053     cadet_channel_destroy (chn);
1054   }
1055 }
1056
1057
1058 static int
1059 check_cadet_message (void *cls,
1060                      const struct GNUNET_MULTICAST_MessageHeader *msg)
1061 {
1062   uint16_t size = ntohs (msg->header.size);
1063   if (size < sizeof (*msg))
1064   {
1065     GNUNET_break_op (0);
1066     return GNUNET_SYSERR;
1067   }
1068
1069   struct Channel *chn = cls;
1070   if (NULL == chn)
1071   {
1072     GNUNET_break (0);
1073     return GNUNET_SYSERR;
1074   }
1075   if (ntohl (msg->purpose.size) != (size
1076                                     - sizeof (msg->header)
1077                                     - sizeof (msg->hop_counter)
1078                                     - sizeof (msg->signature)))
1079   {
1080     GNUNET_break_op (0);
1081     return GNUNET_SYSERR;
1082   }
1083   if (GNUNET_OK !=
1084       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1085                                   &msg->purpose, &msg->signature,
1086                                   &chn->group_pub_key))
1087   {
1088     GNUNET_break_op (0);
1089     return GNUNET_SYSERR;
1090   }
1091
1092   return GNUNET_OK;
1093 }
1094
1095
1096 /**
1097  * Incoming multicast message from CADET.
1098  */
1099 static void
1100 handle_cadet_message (void *cls,
1101                       const struct GNUNET_MULTICAST_MessageHeader *msg)
1102 {
1103   struct Channel *chn = cls;
1104   GNUNET_CADET_receive_done (chn->channel);
1105   client_send_all (&chn->group_pub_hash, &msg->header);
1106 }
1107
1108
1109 static int
1110 check_cadet_request (void *cls,
1111                      const struct GNUNET_MULTICAST_RequestHeader *req)
1112 {
1113   uint16_t size = ntohs (req->header.size);
1114   if (size < sizeof (*req))
1115   {
1116     GNUNET_break_op (0);
1117     return GNUNET_SYSERR;
1118   }
1119
1120   struct Channel *chn = cls;
1121   if (NULL == chn)
1122   {
1123     GNUNET_break (0);
1124     return GNUNET_SYSERR;
1125   }
1126   if (ntohl (req->purpose.size) != (size
1127                                     - sizeof (req->header)
1128                                     - sizeof (req->member_pub_key)
1129                                     - sizeof (req->signature)))
1130   {
1131     GNUNET_break_op (0);
1132     return GNUNET_SYSERR;
1133   }
1134   if (GNUNET_OK !=
1135       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1136                                   &req->purpose, &req->signature,
1137                                   &req->member_pub_key))
1138   {
1139     GNUNET_break_op (0);
1140     return GNUNET_SYSERR;
1141   }
1142
1143   return GNUNET_OK;
1144 }
1145
1146
1147 /**
1148  * Incoming multicast request message from CADET.
1149  */
1150 static void
1151 handle_cadet_request (void *cls,
1152                       const struct GNUNET_MULTICAST_RequestHeader *req)
1153 {
1154   struct Channel *chn = cls;
1155   GNUNET_CADET_receive_done (chn->channel);
1156   client_send_origin (&chn->group_pub_hash, &req->header);
1157 }
1158
1159
1160 static int
1161 check_cadet_replay_request (void *cls,
1162                             const struct MulticastReplayRequestMessage *req)
1163 {
1164   uint16_t size = ntohs (req->header.size);
1165   if (size < sizeof (*req))
1166   {
1167     GNUNET_break_op (0);
1168     return GNUNET_SYSERR;
1169   }
1170
1171   struct Channel *chn = cls;
1172   if (NULL == chn)
1173   {
1174     GNUNET_break_op (0);
1175     return GNUNET_SYSERR;
1176   }
1177
1178   return GNUNET_OK;
1179 }
1180
1181
1182 /**
1183  * Incoming multicast replay request from CADET.
1184  */
1185 static void
1186 handle_cadet_replay_request (void *cls,
1187                              const struct MulticastReplayRequestMessage *req)
1188 {
1189   struct Channel *chn = cls;
1190   GNUNET_CADET_receive_done (chn->channel);
1191
1192   struct MulticastReplayRequestMessage rep = *req;
1193   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1194
1195   struct GNUNET_CONTAINER_MultiHashMap *
1196     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1197                                                         &chn->group->pub_key_hash);
1198   if (NULL == grp_replay_req)
1199   {
1200     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1201     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1202                                        &chn->group->pub_key_hash, grp_replay_req,
1203                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1204   }
1205   struct GNUNET_HashCode key_hash;
1206   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1207                    rep.flags, &key_hash);
1208   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1209                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1210
1211   client_send_random (&chn->group_pub_hash, &rep.header);
1212 }
1213
1214
1215 static int
1216 check_cadet_replay_response (void *cls,
1217                              const struct MulticastReplayResponseMessage *res)
1218 {
1219   struct Channel *chn = cls;
1220   if (NULL == chn)
1221   {
1222     GNUNET_break (0);
1223     return GNUNET_SYSERR;
1224   }
1225   return GNUNET_OK;
1226 }
1227
1228
1229 /**
1230  * Incoming multicast replay response from CADET.
1231  */
1232 static void
1233 handle_cadet_replay_response (void *cls,
1234                               const struct MulticastReplayResponseMessage *res)
1235 {
1236   struct Channel *chn = cls;
1237   GNUNET_CADET_receive_done (chn->channel);
1238
1239   /* @todo FIXME: got replay error response, send request to other members */
1240 }
1241
1242
1243 static void
1244 group_set_cadet_port_hash (struct Group *grp)
1245 {
1246   struct CadetPort {
1247     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1248     uint32_t app_type;
1249   } port = {
1250     grp->pub_key,
1251     GNUNET_APPLICATION_TYPE_MULTICAST,
1252   };
1253   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
1254 }
1255
1256
1257
1258 /**
1259  * Create new outgoing CADET channel.
1260  *
1261  * @param peer
1262  *        Peer to connect to.
1263  * @param group_pub_key
1264  *        Public key of group the channel belongs to.
1265  * @param group_pub_hash
1266  *        Hash of @a group_pub_key.
1267  *
1268  * @return Channel.
1269  */
1270 static struct Channel *
1271 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
1272 {
1273   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
1274   chn->group = grp;
1275   chn->group_pub_key = grp->pub_key;
1276   chn->group_pub_hash = grp->pub_key_hash;
1277   chn->peer = *peer;
1278   chn->direction = DIR_OUTGOING;
1279   chn->is_connected = GNUNET_NO;
1280   chn->join_status = JOIN_WAITING;
1281
1282   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1283     GNUNET_MQ_hd_var_size (cadet_message,
1284                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1285                            struct GNUNET_MULTICAST_MessageHeader,
1286                            chn),
1287
1288     GNUNET_MQ_hd_var_size (cadet_join_decision,
1289                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1290                            struct MulticastJoinDecisionMessageHeader,
1291                            chn),
1292
1293     GNUNET_MQ_hd_var_size (cadet_replay_request,
1294                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1295                            struct MulticastReplayRequestMessage,
1296                            chn),
1297
1298     GNUNET_MQ_hd_var_size (cadet_replay_response,
1299                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1300                            struct MulticastReplayResponseMessage,
1301                            chn),
1302
1303     GNUNET_MQ_handler_end ()
1304   };
1305
1306   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
1307                                               &grp->cadet_port_hash,
1308                                               GNUNET_CADET_OPTION_RELIABLE,
1309                                               cadet_notify_window_change,
1310                                               cadet_notify_disconnect,
1311                                               cadet_handlers);
1312   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
1313                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1314   return chn;
1315 }
1316
1317
1318 /**
1319  * Destroy outgoing CADET channel.
1320  */
1321 static void
1322 cadet_channel_destroy (struct Channel *chn)
1323 {
1324   GNUNET_CADET_channel_destroy (chn->channel);
1325   GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
1326   GNUNET_free (chn);
1327 }
1328
1329 /**
1330  * Handle a connecting client starting an origin.
1331  */
1332 static void
1333 handle_client_origin_start (void *cls,
1334                             const struct MulticastOriginStartMessage *msg)
1335 {
1336   struct Client *c = cls;
1337   struct GNUNET_SERVICE_Client *client = c->client;
1338
1339   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1340   struct GNUNET_HashCode pub_key_hash;
1341
1342   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
1343   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1344
1345   struct Origin *
1346     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
1347   struct Group *grp;
1348
1349   if (NULL == orig)
1350   {
1351     orig = GNUNET_new (struct Origin);
1352     orig->priv_key = msg->group_key;
1353     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
1354
1355     grp = c->group = &orig->group;
1356     grp->origin = orig;
1357     grp->is_origin = GNUNET_YES;
1358     grp->pub_key = pub_key;
1359     grp->pub_key_hash = pub_key_hash;
1360
1361     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
1362                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1363
1364     group_set_cadet_port_hash (grp);
1365
1366     struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1367       GNUNET_MQ_hd_var_size (cadet_message,
1368                              GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1369                              struct GNUNET_MULTICAST_MessageHeader,
1370                              grp),
1371
1372       GNUNET_MQ_hd_var_size (cadet_request,
1373                              GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
1374                              struct GNUNET_MULTICAST_RequestHeader,
1375                              grp),
1376
1377       GNUNET_MQ_hd_var_size (cadet_join_request,
1378                              GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1379                              struct MulticastJoinRequestMessage,
1380                              grp),
1381
1382       GNUNET_MQ_hd_var_size (cadet_replay_request,
1383                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1384                              struct MulticastReplayRequestMessage,
1385                              grp),
1386
1387       GNUNET_MQ_hd_var_size (cadet_replay_response,
1388                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1389                              struct MulticastReplayResponseMessage,
1390                              grp),
1391
1392       GNUNET_MQ_handler_end ()
1393     };
1394
1395
1396     orig->cadet_port = GNUNET_CADET_open_port (cadet,
1397                                                &grp->cadet_port_hash,
1398                                                cadet_notify_connect,
1399                                                grp,
1400                                                cadet_notify_window_change,
1401                                                cadet_notify_disconnect,
1402                                                cadet_handlers);
1403   }
1404   else
1405   {
1406     grp = &orig->group;
1407   }
1408
1409   struct ClientList *cl = GNUNET_new (struct ClientList);
1410   cl->client = client;
1411   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1412
1413   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1414               "%p Client connected as origin to group %s.\n",
1415               orig, GNUNET_h2s (&grp->pub_key_hash));
1416   GNUNET_SERVICE_client_continue (client);
1417 }
1418
1419
1420 static int
1421 check_client_member_join (void *cls,
1422                           const struct MulticastMemberJoinMessage *msg)
1423 {
1424   uint16_t msg_size = ntohs (msg->header.size);
1425   struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1426   uint32_t relay_count = ntohl (msg->relay_count);
1427   uint16_t relay_size = relay_count * sizeof (*relays);
1428   struct GNUNET_MessageHeader *join_msg = NULL;
1429   uint16_t join_msg_size = 0;
1430   if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1431       <= msg_size)
1432   {
1433     join_msg = (struct GNUNET_MessageHeader *)
1434       (((char *) &msg[1]) + relay_size);
1435     join_msg_size = ntohs (join_msg->size);
1436   }
1437   return
1438     msg_size == (sizeof (*msg) + relay_size + join_msg_size)
1439     ? GNUNET_OK
1440     : GNUNET_SYSERR;
1441 }
1442
1443
1444 /**
1445  * Handle a connecting client joining a group.
1446  */
1447 static void
1448 handle_client_member_join (void *cls,
1449                            const struct MulticastMemberJoinMessage *msg)
1450 {
1451   struct Client *c = cls;
1452   struct GNUNET_SERVICE_Client *client = c->client;
1453
1454   uint16_t msg_size = ntohs (msg->header.size);
1455
1456   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1457   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1458
1459   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1460   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1461   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1462   
1463   struct GNUNET_CONTAINER_MultiHashMap *
1464     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1465   struct Member *mem = NULL;
1466   struct Group *grp;
1467
1468   if (NULL != grp_mem)
1469   {
1470     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1471   }
1472   
1473   if (NULL == mem)
1474   {
1475     mem = GNUNET_new (struct Member);
1476     mem->origin = msg->origin;
1477     mem->priv_key = msg->member_key;
1478     mem->pub_key = mem_pub_key;
1479     mem->pub_key_hash = mem_pub_key_hash;
1480     mem->max_fragment_id = 0; // FIXME
1481
1482     grp = c->group = &mem->group;
1483     grp->member = mem;
1484     grp->is_origin = GNUNET_NO;
1485     grp->pub_key = msg->group_pub_key;
1486     grp->pub_key_hash = pub_key_hash;
1487     group_set_cadet_port_hash (grp);
1488   
1489     if (NULL == grp_mem)
1490     {
1491       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1492       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1493                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1494     }
1495     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1496                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1497
1498     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1499                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1500   }
1501   else
1502   {
1503     grp = &mem->group;
1504   }
1505
1506   struct ClientList *cl = GNUNET_new (struct ClientList);
1507   cl->client = client;
1508   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1509
1510   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "Client connected to group %s as member %s (%s).\n",
1513               GNUNET_h2s (&grp->pub_key_hash),
1514               GNUNET_h2s2 (&mem->pub_key_hash),
1515               str);
1516   GNUNET_free (str);
1517
1518   if (NULL != mem->join_dcsn)
1519   { /* Already got a join decision, send it to client. */
1520     struct GNUNET_MQ_Envelope *
1521       env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
1522
1523     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
1524                     env);
1525   }
1526   else
1527   { /* First client of the group, send join request. */
1528     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1529     uint32_t relay_count = ntohl (msg->relay_count);
1530     uint16_t relay_size = relay_count * sizeof (*relays);
1531     struct GNUNET_MessageHeader *join_msg = NULL;
1532     uint16_t join_msg_size = 0;
1533     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1534         <= msg_size)
1535     {
1536       join_msg = (struct GNUNET_MessageHeader *)
1537         (((char *) &msg[1]) + relay_size);
1538       join_msg_size = ntohs (join_msg->size);
1539     }
1540
1541     uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
1542     struct MulticastJoinRequestMessage *
1543       req = GNUNET_malloc (req_msg_size);
1544     req->header.size = htons (req_msg_size);
1545     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1546     req->group_pub_key = grp->pub_key;
1547     req->peer = this_peer;
1548     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1549     if (0 < join_msg_size)
1550       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1551
1552     req->member_pub_key = mem->pub_key;
1553     req->purpose.size = htonl (req_msg_size
1554                                - sizeof (req->header)
1555                                - sizeof (req->reserved)
1556                                - sizeof (req->signature));
1557     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1558
1559     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1560                                                &req->signature))
1561     {
1562       /* FIXME: handle error */
1563       GNUNET_assert (0);
1564     }
1565
1566     if (NULL != mem->join_req)
1567       GNUNET_free (mem->join_req);
1568     mem->join_req = req;
1569
1570     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1571     { /* No local origins, send to remote origin */
1572       cadet_send_join_request (mem);
1573     }
1574   }
1575   GNUNET_SERVICE_client_continue (client);
1576 }
1577
1578
1579 static void
1580 client_send_join_decision (struct Member *mem,
1581                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1582 {
1583   client_send_group (&mem->group, &hdcsn->header);
1584
1585   const struct MulticastJoinDecisionMessage *
1586     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1587   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1588   { /* Member admitted, store join_decision. */
1589     uint16_t dcsn_size = ntohs (dcsn->header.size);
1590     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1591     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1592   }
1593   else
1594   { /* Refused entry, but replay would be still possible for past members. */
1595   }
1596 }
1597
1598
1599 static int
1600 check_client_join_decision (void *cls,
1601                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1602 {
1603   return GNUNET_OK;
1604 }
1605
1606
1607 /**
1608  * Join decision from client.
1609  */
1610 static void
1611 handle_client_join_decision (void *cls,
1612                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
1613 {
1614   struct Client *c = cls;
1615   struct GNUNET_SERVICE_Client *client = c->client;
1616   struct Group *grp = c->group;
1617
1618   if (NULL == grp)
1619   {
1620     GNUNET_break (0);
1621     GNUNET_SERVICE_client_drop (client);
1622     return;
1623   }
1624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625               "%p Got join decision from client for group %s..\n",
1626               grp, GNUNET_h2s (&grp->pub_key_hash));
1627
1628   struct GNUNET_CONTAINER_MultiHashMap *
1629     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1630                                                  &grp->pub_key_hash);
1631   struct Member *mem = NULL;
1632   if (NULL != grp_mem)
1633   {
1634     struct GNUNET_HashCode member_key_hash;
1635     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1636                         &member_key_hash);
1637     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1638     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639                 "%p ..and member %s: %p\n",
1640                 grp, GNUNET_h2s (&member_key_hash), mem);
1641   }
1642   
1643   if (NULL != mem) 
1644   { /* Found local member */
1645     client_send_join_decision (mem, hdcsn);
1646   }
1647   else
1648   { /* Look for remote member */
1649     cadet_send_join_decision (grp, hdcsn);
1650   }
1651   GNUNET_SERVICE_client_continue (client);
1652 }
1653
1654
1655 static int
1656 check_client_multicast_message (void *cls,
1657                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
1658 {
1659   return GNUNET_OK;
1660 }
1661
1662
1663 /**
1664  * Incoming message from a client.
1665  */
1666 static void
1667 handle_client_multicast_message (void *cls,
1668                                  const struct GNUNET_MULTICAST_MessageHeader *msg)
1669 {
1670   struct Client *c = cls;
1671   struct GNUNET_SERVICE_Client *client = c->client;
1672   struct Group *grp = c->group;
1673
1674   if (NULL == grp)
1675   {
1676     GNUNET_break (0);
1677     GNUNET_SERVICE_client_drop (client);
1678     return;
1679   }
1680   GNUNET_assert (GNUNET_YES == grp->is_origin);
1681   struct Origin *orig = grp->origin;
1682
1683   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1684   struct GNUNET_MULTICAST_MessageHeader *
1685     out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
1686   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1687   out->purpose.size = htonl (ntohs (out->header.size)
1688                              - sizeof (out->header)
1689                              - sizeof (out->hop_counter)
1690                              - sizeof (out->signature));
1691   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1692
1693   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1694                                              &out->signature))
1695   {
1696     GNUNET_assert (0);
1697   }
1698
1699   client_send_all (&grp->pub_key_hash, &out->header);
1700   cadet_send_children (&grp->pub_key_hash, &out->header);
1701   client_send_ack (&grp->pub_key_hash);
1702   GNUNET_free (out);
1703
1704   GNUNET_SERVICE_client_continue (client);
1705 }
1706
1707
1708 static int
1709 check_client_multicast_request (void *cls,
1710                                 const struct GNUNET_MULTICAST_RequestHeader *req)
1711 {
1712   return GNUNET_OK;
1713 }
1714
1715
1716 /**
1717  * Incoming request from a client.
1718  */
1719 static void
1720 handle_client_multicast_request (void *cls,
1721                                  const struct GNUNET_MULTICAST_RequestHeader *req)
1722 {
1723   struct Client *c = cls;
1724   struct GNUNET_SERVICE_Client *client = c->client;
1725   struct Group *grp = c->group;
1726
1727   if (NULL == grp)
1728   {
1729     GNUNET_break (0);
1730     GNUNET_SERVICE_client_drop (client);
1731     return;
1732   }
1733   GNUNET_assert (GNUNET_NO == grp->is_origin);
1734   struct Member *mem = grp->member;
1735
1736   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1737   struct GNUNET_MULTICAST_RequestHeader *
1738     out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
1739   out->member_pub_key = mem->pub_key;
1740   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1741   out->purpose.size = htonl (ntohs (out->header.size)
1742                              - sizeof (out->header)
1743                              - sizeof (out->member_pub_key)
1744                              - sizeof (out->signature));
1745   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1746
1747   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1748                                              &out->signature))
1749   {
1750     GNUNET_assert (0);
1751   }
1752
1753   uint8_t send_ack = GNUNET_YES;
1754   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1755   { /* No local origins, send to remote origin */
1756     if (NULL != mem->origin_channel)
1757     {
1758       cadet_send_channel (mem->origin_channel, &out->header);
1759       send_ack = GNUNET_NO;
1760     }
1761     else
1762     {
1763       /* FIXME: not yet connected to origin */
1764       GNUNET_SERVICE_client_drop (client);
1765       GNUNET_free (out);
1766       return;
1767     }
1768   }
1769   if (GNUNET_YES == send_ack)
1770   {
1771     client_send_ack (&grp->pub_key_hash);
1772   }
1773   GNUNET_free (out);
1774   GNUNET_SERVICE_client_continue (client);
1775 }
1776
1777
1778 /**
1779  * Incoming replay request from a client.
1780  */
1781 static void
1782 handle_client_replay_request (void *cls,
1783                               const struct MulticastReplayRequestMessage *rep)
1784 {
1785   struct Client *c = cls;
1786   struct GNUNET_SERVICE_Client *client = c->client;
1787   struct Group *grp = c->group;
1788
1789   if (NULL == grp)
1790   {
1791     GNUNET_break (0);
1792     GNUNET_SERVICE_client_drop (client);
1793     return;
1794   }
1795   GNUNET_assert (GNUNET_NO == grp->is_origin);
1796   struct Member *mem = grp->member;
1797
1798   struct GNUNET_CONTAINER_MultiHashMap *
1799     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1800                                                         &grp->pub_key_hash);
1801   if (NULL == grp_replay_req)
1802   {
1803     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1804     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1805                                        &grp->pub_key_hash, grp_replay_req,
1806                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1807   }
1808
1809   struct GNUNET_HashCode key_hash;
1810   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1811                    rep->flags, &key_hash);
1812   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1813                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1814
1815   if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
1816   { /* No local origin, replay from remote members / origin. */
1817     if (NULL != mem->origin_channel)
1818     {
1819       cadet_send_channel (mem->origin_channel, &rep->header);
1820     }
1821     else
1822     {
1823       /* FIXME: not yet connected to origin */
1824       GNUNET_SERVICE_client_drop (client);
1825       return;
1826     }
1827   }
1828   GNUNET_SERVICE_client_continue (client);
1829 }
1830
1831
1832 static int
1833 cadet_send_replay_response_cb (void *cls,
1834                                const struct GNUNET_HashCode *key_hash,
1835                                void *value)
1836 {
1837   struct Channel *chn = value;
1838   struct GNUNET_MessageHeader *msg = cls;
1839
1840   cadet_send_channel (chn, msg);
1841   return GNUNET_OK;
1842 }
1843
1844
1845 static int
1846 client_send_replay_response_cb (void *cls,
1847                                 const struct GNUNET_HashCode *key_hash,
1848                                 void *value)
1849 {
1850   struct GNUNET_SERVICE_Client *client = value;
1851   struct GNUNET_MessageHeader *msg = cls;
1852
1853   client_send (client, msg);
1854   return GNUNET_OK;
1855 }
1856
1857
1858 static int
1859 check_client_replay_response_end (void *cls,
1860                                   const struct MulticastReplayResponseMessage *res)
1861 {
1862   return GNUNET_OK;
1863 }
1864
1865
1866 /**
1867  * End of replay response from a client.
1868  */
1869 static void
1870 handle_client_replay_response_end (void *cls,
1871                                    const struct MulticastReplayResponseMessage *res)
1872 {
1873   struct Client *c = cls;
1874   struct GNUNET_SERVICE_Client *client = c->client;
1875   struct Group *grp = c->group;
1876
1877   if (NULL == grp)
1878   {
1879     GNUNET_break (0);
1880     GNUNET_SERVICE_client_drop (client);
1881     return;
1882   }
1883
1884   struct GNUNET_HashCode key_hash;
1885   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1886                    res->flags, &key_hash);
1887
1888   struct GNUNET_CONTAINER_MultiHashMap *
1889     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1890                                                               &grp->pub_key_hash);
1891   if (NULL != grp_replay_req_cadet)
1892   {
1893     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1894   }
1895   struct GNUNET_CONTAINER_MultiHashMap *
1896     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1897                                                                &grp->pub_key_hash);
1898   if (NULL != grp_replay_req_client)
1899   {
1900     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1901   }
1902   GNUNET_SERVICE_client_continue (client);
1903 }
1904
1905
1906 static int
1907 check_client_replay_response (void *cls,
1908                               const struct MulticastReplayResponseMessage *res)
1909 {
1910   const struct GNUNET_MessageHeader *msg = &res->header;
1911   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1912   {
1913     msg = GNUNET_MQ_extract_nested_mh (res);
1914     if (NULL == msg)
1915     {
1916       return GNUNET_SYSERR;
1917     }
1918   }
1919   return GNUNET_OK;
1920 }
1921
1922
1923 /**
1924  * Incoming replay response from a client.
1925  *
1926  * Respond with a multicast message on success, or otherwise with an error code.
1927  */
1928 static void
1929 handle_client_replay_response (void *cls,
1930                                const struct MulticastReplayResponseMessage *res)
1931 {
1932   struct Client *c = cls;
1933   struct GNUNET_SERVICE_Client *client = c->client;
1934   struct Group *grp = c->group;
1935
1936   if (NULL == grp)
1937   {
1938     GNUNET_break (0);
1939     GNUNET_SERVICE_client_drop (client);
1940     return;
1941   }
1942
1943   const struct GNUNET_MessageHeader *msg = &res->header;
1944   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1945   {
1946     msg = GNUNET_MQ_extract_nested_mh (res);
1947   }
1948
1949   struct GNUNET_HashCode key_hash;
1950   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1951                    res->flags, &key_hash);
1952
1953   struct GNUNET_CONTAINER_MultiHashMap *
1954     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1955                                                               &grp->pub_key_hash);
1956   if (NULL != grp_replay_req_cadet)
1957   {
1958     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1959                                                 cadet_send_replay_response_cb,
1960                                                 (void *) msg);
1961   }
1962   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1963   {
1964     struct GNUNET_CONTAINER_MultiHashMap *
1965       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1966                                                                  &grp->pub_key_hash);
1967     if (NULL != grp_replay_req_client)
1968     {
1969       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1970                                                   client_send_replay_response_cb,
1971                                                   (void *) msg);
1972     }
1973   }
1974   else
1975   {
1976     handle_client_replay_response_end (c, res);
1977     return;
1978   }
1979   GNUNET_SERVICE_client_continue (client);
1980 }
1981
1982
1983 /**
1984  * A new client connected.
1985  *
1986  * @param cls NULL
1987  * @param client client to add
1988  * @param mq message queue for @a client
1989  * @return @a client
1990  */
1991 static void *
1992 client_notify_connect (void *cls,
1993                        struct GNUNET_SERVICE_Client *client,
1994                        struct GNUNET_MQ_Handle *mq)
1995 {
1996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1997   /* FIXME: send connect ACK */
1998
1999   struct Client *c = GNUNET_new (struct Client);
2000   c->client = client;
2001
2002   return c;
2003 }
2004
2005
2006 /**
2007  * Called whenever a client is disconnected.
2008  * Frees our resources associated with that client.
2009  *
2010  * @param cls closure
2011  * @param client identification of the client
2012  * @param app_ctx must match @a client
2013  */
2014 static void
2015 client_notify_disconnect (void *cls,
2016                           struct GNUNET_SERVICE_Client *client,
2017                           void *app_ctx)
2018 {
2019   struct Client *c = app_ctx;
2020   struct Group *grp = c->group;
2021   GNUNET_free (c);
2022
2023   if (NULL == grp)
2024   {
2025     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2026                 "%p User context is NULL in client_disconnect()\n", grp);
2027     GNUNET_break (0);
2028     return;
2029   }
2030
2031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2032               "%p Client (%s) disconnected from group %s\n",
2033               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
2034               GNUNET_h2s (&grp->pub_key_hash));
2035
2036   struct ClientList *cl = grp->clients_head;
2037   while (NULL != cl)
2038   {
2039     if (cl->client == client)
2040     {
2041       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
2042       GNUNET_free (cl);
2043       break;
2044     }
2045     cl = cl->next;
2046   }
2047
2048   while (GNUNET_YES == replay_req_remove_client (grp, client));
2049
2050   if (NULL == grp->clients_head)
2051   { /* Last client disconnected. */
2052 #if FIXME
2053     if (NULL != grp->tmit_head)
2054     { /* Send pending messages via CADET before cleanup. */
2055       transmit_message (grp);
2056     }
2057     else
2058 #endif
2059     {
2060       cleanup_group (grp);
2061     }
2062   }
2063 }
2064
2065
2066 /**
2067  * Service started.
2068  *
2069  * @param cls closure
2070  * @param server the initialized server
2071  * @param cfg configuration to use
2072  */
2073 static void
2074 run (void *cls,
2075      const struct GNUNET_CONFIGURATION_Handle *c,
2076      struct GNUNET_SERVICE_Handle *svc)
2077 {
2078   cfg = c;
2079   service = svc;
2080   GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
2081
2082   stats = GNUNET_STATISTICS_create ("multicast", cfg);
2083   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2084   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2085   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2086   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2087   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2088   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2089   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2090
2091   cadet = GNUNET_CADET_connect (cfg);
2092
2093   GNUNET_assert (NULL != cadet);
2094
2095   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2096                                  NULL);
2097 }
2098
2099
2100 /**
2101  * Define "main" method using service macro.
2102  */
2103 GNUNET_SERVICE_MAIN
2104 ("multicast",
2105  GNUNET_SERVICE_OPTION_NONE,
2106  run,
2107  client_notify_connect,
2108  client_notify_disconnect,
2109  NULL,
2110  GNUNET_MQ_hd_fixed_size (client_origin_start,
2111                           GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
2112                           struct MulticastOriginStartMessage,
2113                           NULL),
2114  GNUNET_MQ_hd_var_size (client_member_join,
2115                         GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
2116                         struct MulticastMemberJoinMessage,
2117                         NULL),
2118  GNUNET_MQ_hd_var_size (client_join_decision,
2119                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
2120                         struct MulticastJoinDecisionMessageHeader,
2121                         NULL),
2122  GNUNET_MQ_hd_var_size (client_multicast_message,
2123                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
2124                         struct GNUNET_MULTICAST_MessageHeader,
2125                         NULL),
2126  GNUNET_MQ_hd_var_size (client_multicast_request,
2127                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
2128                         struct GNUNET_MULTICAST_RequestHeader,
2129                         NULL),
2130  GNUNET_MQ_hd_fixed_size (client_replay_request,
2131                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
2132                           struct MulticastReplayRequestMessage,
2133                           NULL),
2134  GNUNET_MQ_hd_var_size (client_replay_response,
2135                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
2136                         struct MulticastReplayResponseMessage,
2137                         NULL),
2138  GNUNET_MQ_hd_var_size (client_replay_response_end,
2139                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
2140                         struct MulticastReplayResponseMessage,
2141                         NULL));
2142
2143 /* end of gnunet-service-multicast.c */