multicast: switch to MQ
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_core_service.h"
32 #include "gnunet_cadet_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "multicast.h"
35
36 /**
37  * Handle to our current configuration.
38  */
39 static const struct GNUNET_CONFIGURATION_Handle *cfg;
40
41 /**
42  * Server handle.
43  */
44 static struct GNUNET_SERVER_Handle *server;
45
46 /**
47  * Core handle.
48  * Only used during initialization.
49  */
50 static struct GNUNET_CORE_Handle *core;
51
52 /**
53  * CADET handle.
54  */
55 static struct GNUNET_CADET_Handle *cadet;
56
57 /**
58  * Identity of this peer.
59  */
60 static struct GNUNET_PeerIdentity this_peer;
61
62 /**
63  * Handle to the statistics service.
64  */
65 static struct GNUNET_STATISTICS_Handle *stats;
66
67 /**
68  * Notification context, simplifies client broadcasts.
69  */
70 static struct GNUNET_SERVER_NotificationContext *nc;
71
72 /**
73  * All connected origin clients.
74  * Group's pub_key_hash -> struct Origin * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *origins;
77
78 /**
79  * All connected member clients.
80  * Group's pub_key_hash -> struct Member * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *members;
83
84 /**
85  * Connected member clients per group.
86  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
89
90 /**
91  * Incoming CADET channels with connected children in the tree.
92  * Group's pub_key_hash -> struct Channel * (multi)
93  */
94 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
95
96 /**
97  * Outgoing CADET channels connecting to parents in the tree.
98  * Group's pub_key_hash -> struct Channel * (multi)
99  */
100 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
101
102 /**
103  * Incoming replay requests from CADET.
104  * Group's pub_key_hash ->
105  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
106  */
107 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
108
109 /**
110  * Incoming replay requests from clients.
111  * Group's pub_key_hash ->
112  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVER_Client *
113  */
114 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
115
116
117 /**
118  * Join status of a remote peer.
119  */
120 enum JoinStatus
121 {
122   JOIN_REFUSED  = -1,
123   JOIN_NOT_ASKED = 0,
124   JOIN_WAITING   = 1,
125   JOIN_ADMITTED  = 2,
126 };
127
128 enum ChannelDirection
129 {
130   DIR_INCOMING = 0,
131   DIR_OUTGOING = 1,
132 };
133
134
135 /**
136  * Context for a CADET channel.
137  */
138 struct Channel
139 {
140   /**
141    * Group the channel belongs to.
142    *
143    * Only set for outgoing channels.
144    */
145   struct Group *grp;
146
147   /**
148    * CADET channel.
149    */
150   struct GNUNET_CADET_Channel *channel;
151
152   /**
153    * CADET transmission handle.
154    */
155   struct GNUNET_CADET_TransmitHandle *tmit_handle;
156
157   /**
158    * Public key of the target group.
159    */
160   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
161
162   /**
163    * Hash of @a group_pub_key.
164    */
165   struct GNUNET_HashCode group_pub_hash;
166
167   /**
168    * Public key of the joining member.
169    */
170   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
171
172   /**
173    * Remote peer identity.
174    */
175   struct GNUNET_PeerIdentity peer;
176
177   /**
178    * Is the remote peer admitted to the group?
179    * @see enum JoinStatus
180    */
181   int8_t join_status;
182
183   /**
184    * Number of messages waiting to be sent to CADET.
185    */
186   uint8_t msgs_pending;
187
188   /**
189    * Channel direction.
190    * @see enum ChannelDirection
191    */
192   uint8_t direction;
193 };
194
195
196 /**
197  * List of connected clients.
198  */
199 struct ClientList
200 {
201   struct ClientList *prev;
202   struct ClientList *next;
203   struct GNUNET_SERVER_Client *client;
204 };
205
206 /**
207  * Common part of the client context for both an origin and member.
208  */
209 struct Group
210 {
211   struct ClientList *clients_head;
212   struct ClientList *clients_tail;
213
214   /**
215    * Public key of the group.
216    */
217   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
218
219   /**
220    * Hash of @a pub_key.
221    */
222   struct GNUNET_HashCode pub_key_hash;
223
224   /**
225    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
226    */
227   uint8_t is_origin;
228
229   /**
230    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
231    */
232   uint8_t disconnected;
233 };
234
235
236 /**
237  * Client context for a group's origin.
238  */
239 struct Origin
240 {
241   struct Group grp;
242
243   /**
244    * Private key of the group.
245    */
246   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
247
248   /**
249    * Last message fragment ID sent to the group.
250    */
251   uint64_t max_fragment_id;
252 };
253
254
255 /**
256  * Client context for a group member.
257  */
258 struct Member
259 {
260   struct Group grp;
261
262   /**
263    * Private key of the member.
264    */
265   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
266
267   /**
268    * Public key of the member.
269    */
270   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
271
272   /**
273    * Hash of @a pub_key.
274    */
275   struct GNUNET_HashCode pub_key_hash;
276
277   /**
278    * Join request sent to the origin / members.
279    */
280   struct MulticastJoinRequestMessage *join_req;
281
282   /**
283    * Join decision sent in reply to our request.
284    *
285    * Only a positive decision is stored here, in case of a negative decision the
286    * client is disconnected.
287    */
288   struct MulticastJoinDecisionMessageHeader *join_dcsn;
289
290   /**
291    * CADET channel to the origin.
292    */
293   struct Channel *origin_channel;
294
295   /**
296    * Peer identity of origin.
297    */
298   struct GNUNET_PeerIdentity origin;
299
300   /**
301    * Peer identity of relays (other members to connect).
302    */
303   struct GNUNET_PeerIdentity *relays;
304
305   /**
306    * Last request fragment ID sent to the origin.
307    */
308   uint64_t max_fragment_id;
309
310   /**
311    * Number of @a relays.
312    */
313   uint32_t relay_count;
314 };
315
316
317 struct ReplayRequestKey
318 {
319   uint64_t fragment_id;
320   uint64_t message_id;
321   uint64_t fragment_offset;
322   uint64_t flags;
323 };
324
325
326 /**
327  * Task run during shutdown.
328  *
329  * @param cls unused
330  */
331 static void
332 shutdown_task (void *cls)
333 {
334   if (NULL != core)
335   {
336     GNUNET_CORE_disconnecT (core);
337     core = NULL;
338   }
339   if (NULL != cadet)
340   {
341     GNUNET_CADET_disconnect (cadet);
342     cadet = NULL;
343   }
344   if (NULL != stats)
345   {
346     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
347     stats = NULL;
348   }
349   /* FIXME: do more clean up here */
350 }
351
352
353 /**
354  * Clean up origin data structures after a client disconnected.
355  */
356 static void
357 cleanup_origin (struct Origin *orig)
358 {
359   struct Group *grp = &orig->grp;
360   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
361 }
362
363
364 /**
365  * Clean up member data structures after a client disconnected.
366  */
367 static void
368 cleanup_member (struct Member *mem)
369 {
370   struct Group *grp = &mem->grp;
371   struct GNUNET_CONTAINER_MultiHashMap *
372     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
373                                                  &grp->pub_key_hash);
374   GNUNET_assert (NULL != grp_mem);
375   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
376
377   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
378   {
379     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
380                                           grp_mem);
381     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
382   }
383   if (NULL != mem->join_dcsn)
384   {
385     GNUNET_free (mem->join_dcsn);
386     mem->join_dcsn = NULL;
387   }
388   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
389 }
390
391
392 /**
393  * Clean up group data structures after a client disconnected.
394  */
395 static void
396 cleanup_group (struct Group *grp)
397 {
398   (GNUNET_YES == grp->is_origin)
399     ? cleanup_origin ((struct Origin *) grp)
400     : cleanup_member ((struct Member *) grp);
401
402   GNUNET_free (grp);
403 }
404
405
406 void
407 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
408                  uint64_t fragment_offset, uint64_t flags,
409                  struct GNUNET_HashCode *key_hash)
410 {
411   struct ReplayRequestKey key = {
412     .fragment_id = fragment_id,
413     .message_id = message_id,
414     .fragment_offset = fragment_offset,
415     .flags = flags,
416   };
417   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
418 }
419
420
421 /**
422  * Remove channel from replay request hashmap.
423  *
424  * @param chn
425  *        Channel to remove.
426  *
427  * @return #GNUNET_YES if there are more entries to process,
428  *         #GNUNET_NO when reached end of hashmap.
429  */
430 static int
431 replay_req_remove_cadet (struct Channel *chn)
432 {
433   struct GNUNET_CONTAINER_MultiHashMap *
434     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
435                                                         &chn->grp->pub_key_hash);
436   if (NULL == grp_replay_req)
437     return GNUNET_NO;
438
439   struct GNUNET_CONTAINER_MultiHashMapIterator *
440     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
441   struct GNUNET_HashCode key;
442   const struct Channel *c;
443   while (GNUNET_YES
444          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
445                                                          (const void **) &c))
446   {
447     if (c == chn)
448     {
449       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
450       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
451       return GNUNET_YES;
452     }
453   }
454   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
455   return GNUNET_NO;
456 }
457
458
459 /**
460  * Remove client from replay request hashmap.
461  *
462  * @param client
463  *        Client to remove.
464  *
465  * @return #GNUNET_YES if there are more entries to process,
466  *         #GNUNET_NO when reached end of hashmap.
467  */
468 static int
469 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client)
470 {
471   struct GNUNET_CONTAINER_MultiHashMap *
472     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
473                                                         &grp->pub_key_hash);
474   if (NULL == grp_replay_req)
475     return GNUNET_NO;
476
477   struct GNUNET_CONTAINER_MultiHashMapIterator *
478     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
479   struct GNUNET_HashCode key;
480   const struct GNUNET_SERVER_Client *c;
481   while (GNUNET_YES
482          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
483                                                          (const void **) &c))
484   {
485     if (c == client)
486     {
487       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
488       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
489       return GNUNET_YES;
490     }
491   }
492   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
493   return GNUNET_NO;
494 }
495
496
497 /**
498  * Called whenever a client is disconnected.
499  *
500  * Frees our resources associated with that client.
501  *
502  * @param cls  Closure.
503  * @param client  Client handle.
504  */
505 static void
506 client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
507 {
508   if (NULL == client)
509     return;
510
511   struct Group *grp
512     = GNUNET_SERVER_client_get_user_context (client, struct Group);
513
514   if (NULL == grp)
515   {
516     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
517                 "%p User context is NULL in client_disconnect()\n", grp);
518     GNUNET_break (0);
519     return;
520   }
521
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "%p Client (%s) disconnected from group %s\n",
524               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
525               GNUNET_h2s (&grp->pub_key_hash));
526
527   struct ClientList *cl = grp->clients_head;
528   while (NULL != cl)
529   {
530     if (cl->client == client)
531     {
532       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
533       GNUNET_free (cl);
534       break;
535     }
536     cl = cl->next;
537   }
538
539   while (GNUNET_YES == replay_req_remove_client (grp, client));
540
541   if (NULL == grp->clients_head)
542   { /* Last client disconnected. */
543 #if FIXME
544     if (NULL != grp->tmit_head)
545     { /* Send pending messages via CADET before cleanup. */
546       transmit_message (grp);
547     }
548     else
549 #endif
550     {
551       cleanup_group (grp);
552     }
553   }
554 }
555
556
557 /**
558  * Send message to a client.
559  */
560 static void
561 client_send (struct GNUNET_SERVER_Client *client,
562              const struct GNUNET_MessageHeader *msg)
563 {
564   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
565               "%p Sending message to client.\n", client);
566
567   GNUNET_SERVER_notification_context_add (nc, client);
568   GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO);
569 }
570
571
572 /**
573  * Send message to all clients connected to the group.
574  */
575 static void
576 client_send_group (const struct Group *grp,
577                    const struct GNUNET_MessageHeader *msg)
578 {
579   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
580               "%p Sending message to all clients of the group.\n", grp);
581
582   struct ClientList *cl = grp->clients_head;
583   while (NULL != cl)
584   {
585     GNUNET_SERVER_notification_context_add (nc, cl->client);
586     GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
587     cl = cl->next;
588   }
589 }
590
591
592 /**
593  * Iterator callback for sending a message to origin clients.
594  */
595 static int
596 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
597                        void *origin)
598 {
599   const struct GNUNET_MessageHeader *msg = cls;
600   struct Member *orig = origin;
601
602   client_send_group (&orig->grp, msg);
603   return GNUNET_YES;
604 }
605
606
607 /**
608  * Iterator callback for sending a message to member clients.
609  */
610 static int
611 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
612                        void *member)
613 {
614   const struct GNUNET_MessageHeader *msg = cls;
615   struct Member *mem = member;
616
617   if (NULL != mem->join_dcsn)
618   { /* Only send message to admitted members */
619     client_send_group (&mem->grp, msg);
620   }
621   return GNUNET_YES;
622 }
623
624
625 /**
626  * Send message to all origin and member clients connected to the group.
627  *
628  * @param pub_key_hash
629  *        H(key_pub) of the group.
630  * @param msg
631  *        Message to send.
632  */
633 static int
634 client_send_all (struct GNUNET_HashCode *pub_key_hash,
635                  const struct GNUNET_MessageHeader *msg)
636 {
637   int n = 0;
638   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
639                                                    client_send_origin_cb,
640                                                    (void *) msg);
641   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
642                                                    client_send_member_cb,
643                                                    (void *) msg);
644   return n;
645 }
646
647
648 /**
649  * Send message to a random origin client or a random member client.
650  *
651  * @param grp  The group to send @a msg to.
652  * @param msg  Message to send.
653  */
654 static int
655 client_send_random (struct GNUNET_HashCode *pub_key_hash,
656                     const struct GNUNET_MessageHeader *msg)
657 {
658   int n = 0;
659   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
660                                                  (void *) msg);
661   if (n <= 0)
662     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
663                                                    (void *) msg);
664   return n;
665 }
666
667
668 /**
669  * Send message to all origin clients connected to the group.
670  *
671  * @param pub_key_hash
672  *        H(key_pub) of the group.
673  * @param msg
674  *        Message to send.
675  */
676 static int
677 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
678                     const struct GNUNET_MessageHeader *msg)
679 {
680   int n = 0;
681   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
682                                                    client_send_origin_cb,
683                                                    (void *) msg);
684   return n;
685 }
686
687
688 /**
689  * Send fragment acknowledgement to all clients of the channel.
690  *
691  * @param pub_key_hash
692  *        H(key_pub) of the group.
693  */
694 static void
695 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
696 {
697   static struct GNUNET_MessageHeader *msg = NULL;
698   if (NULL == msg)
699   {
700     msg = GNUNET_malloc (sizeof (*msg));
701     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
702     msg->size = htons (sizeof (*msg));
703   }
704   client_send_all (pub_key_hash, msg);
705 }
706
707
708 struct CadetTransmitClosure
709 {
710   struct Channel *chn;
711   const struct GNUNET_MessageHeader *msg;
712 };
713
714
715 /**
716  * CADET is ready to transmit a message.
717  */
718 size_t
719 cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
720 {
721   if (0 == buf_size)
722   {
723     /* FIXME: connection closed */
724     return 0;
725   }
726   struct CadetTransmitClosure *tcls = cls;
727   struct Channel *chn = tcls->chn;
728   uint16_t msg_size = ntohs (tcls->msg->size);
729   GNUNET_assert (msg_size <= buf_size);
730   GNUNET_memcpy (buf, tcls->msg, msg_size);
731   GNUNET_free (tcls);
732
733   if (0 == chn->msgs_pending)
734   {
735     GNUNET_break (0);
736   }
737   else if (0 == --chn->msgs_pending)
738   {
739     client_send_ack (&chn->group_pub_hash);
740   }
741   return msg_size;
742 }
743
744
745 /**
746  * Send a message to a CADET channel.
747  *
748  * @param chn  Channel.
749  * @param msg  Message.
750  */
751 static void
752 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
753 {
754   struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
755   tcls->chn = chn;
756   tcls->msg = msg;
757
758   chn->msgs_pending++;
759   chn->tmit_handle
760     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
761                                           GNUNET_TIME_UNIT_FOREVER_REL,
762                                           ntohs (msg->size),
763                                           &cadet_notify_transmit_ready,
764                                           tcls);
765   GNUNET_assert (NULL != chn->tmit_handle);
766 }
767
768
769 /**
770  * Create new outgoing CADET channel.
771  *
772  * @param peer
773  *        Peer to connect to.
774  * @param group_pub_key
775  *        Public key of group the channel belongs to.
776  * @param group_pub_hash
777  *        Hash of @a group_pub_key.
778  *
779  * @return Channel.
780  */
781 static struct Channel *
782 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
783 {
784   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
785   chn->grp = grp;
786   chn->group_pub_key = grp->pub_key;
787   chn->group_pub_hash = grp->pub_key_hash;
788   chn->peer = *peer;
789   chn->direction = DIR_OUTGOING;
790   chn->join_status = JOIN_WAITING;
791   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
792                                               GC_u2h (GNUNET_APPLICATION_TYPE_MULTICAST),
793                                               GNUNET_CADET_OPTION_RELIABLE);
794   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
795                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
796   return chn;
797 }
798
799
800 /**
801  * Create CADET channel and send a join request.
802  */
803 static void
804 cadet_send_join_request (struct Member *mem)
805 {
806   mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin);
807   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
808
809   uint32_t i;
810   for (i = 0; i < mem->relay_count; i++)
811   {
812     struct Channel *
813       chn = cadet_channel_create (&mem->grp, &mem->relays[i]);
814     cadet_send_channel (chn, &mem->join_req->header);
815   }
816 }
817
818
819 static int
820 cadet_send_join_decision_cb (void *cls,
821                              const struct GNUNET_HashCode *group_pub_hash,
822                              void *channel)
823 {
824   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
825   struct Channel *chn = channel;
826
827   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
828       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
829   {
830     cadet_send_channel (chn, &hdcsn->header);
831     return GNUNET_NO;
832   }
833   return GNUNET_YES;
834 }
835
836
837 /**
838  * Send join decision to a remote peer.
839  */
840 static void
841 cadet_send_join_decision (struct Group *grp,
842                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
843 {
844   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
845                                               &cadet_send_join_decision_cb,
846                                               (void *) hdcsn);
847 }
848
849
850 /**
851  * Iterator callback for sending a message to origin clients.
852  */
853 static int
854 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
855                void *channel)
856 {
857   const struct GNUNET_MessageHeader *msg = cls;
858   struct Channel *chn = channel;
859   if (JOIN_ADMITTED == chn->join_status)
860     cadet_send_channel (chn, msg);
861   return GNUNET_YES;
862 }
863
864
865 /**
866  * Send message to all connected children.
867  */
868 static int
869 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
870                      const struct GNUNET_MessageHeader *msg)
871 {
872   int n = 0;
873   if (channels_in != NULL)
874     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
875                                                      cadet_send_cb, (void *) msg);
876   return n;
877 }
878
879
880 #if 0       // unused as yet
881 /**
882  * Send message to all connected parents.
883  */
884 static int
885 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
886                     const struct GNUNET_MessageHeader *msg)
887 {
888   int n = 0;
889   if (channels_in != NULL)
890     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
891                                                      cadet_send_cb, (void *) msg);
892   return n;
893 }
894 #endif
895
896
897 /**
898  * Handle a connecting client starting an origin.
899  */
900 static void
901 client_recv_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
902                           const struct GNUNET_MessageHeader *m)
903 {
904   const struct MulticastOriginStartMessage *
905     msg = (const struct MulticastOriginStartMessage *) m;
906
907   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
908   struct GNUNET_HashCode pub_key_hash;
909
910   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
911   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
912
913   struct Origin *
914     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
915   struct Group *grp;
916
917   if (NULL == orig)
918   {
919     orig = GNUNET_new (struct Origin);
920     orig->priv_key = msg->group_key;
921     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
922     grp = &orig->grp;
923     grp->is_origin = GNUNET_YES;
924     grp->pub_key = pub_key;
925     grp->pub_key_hash = pub_key_hash;
926
927     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
928                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
929   }
930   else
931   {
932     grp = &orig->grp;
933   }
934
935   struct ClientList *cl = GNUNET_new (struct ClientList);
936   cl->client = client;
937   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
938
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "%p Client connected as origin to group %s.\n",
941               orig, GNUNET_h2s (&grp->pub_key_hash));
942
943   GNUNET_SERVER_client_set_user_context (client, grp);
944   GNUNET_SERVER_receive_done (client, GNUNET_OK);
945 }
946
947
948 /**
949  * Handle a connecting client joining a group.
950  */
951 static void
952 client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
953                          const struct GNUNET_MessageHeader *m)
954 {
955   const struct MulticastMemberJoinMessage *
956     msg = (const struct MulticastMemberJoinMessage *) m;
957   uint16_t msg_size = ntohs (msg->header.size);
958
959   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
960   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
961
962   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
963   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
964   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
965
966   struct GNUNET_CONTAINER_MultiHashMap *
967     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
968   struct Member *mem = NULL;
969   struct Group *grp;
970
971   if (NULL != grp_mem)
972   {
973     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
974   }
975   if (NULL == mem)
976   {
977     mem = GNUNET_new (struct Member);
978     mem->priv_key = msg->member_key;
979     mem->pub_key = mem_pub_key;
980     mem->pub_key_hash = mem_pub_key_hash;
981     mem->max_fragment_id = 0; // FIXME
982
983     grp = &mem->grp;
984     grp->is_origin = GNUNET_NO;
985     grp->pub_key = msg->group_pub_key;
986     grp->pub_key_hash = pub_key_hash;
987
988     if (NULL == grp_mem)
989     {
990       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
991       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
992                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
993     }
994     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
995                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
996     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
997                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
998   }
999   else
1000   {
1001     grp = &mem->grp;
1002   }
1003
1004   struct ClientList *cl = GNUNET_new (struct ClientList);
1005   cl->client = client;
1006   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1007
1008   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1009               "%p Client connected to group %s..\n",
1010               mem, GNUNET_h2s (&grp->pub_key_hash));
1011   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1013               "%p ..as member %s (%s).\n",
1014               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1015   GNUNET_free (str);
1016
1017   GNUNET_SERVER_client_set_user_context (client, grp);
1018
1019   if (NULL != mem->join_dcsn)
1020   { /* Already got a join decision, send it to client. */
1021     GNUNET_SERVER_notification_context_add (nc, client);
1022     GNUNET_SERVER_notification_context_unicast (nc, client,
1023                                                 (struct GNUNET_MessageHeader *)
1024                                                 mem->join_dcsn,
1025                                                 GNUNET_NO);
1026   }
1027   else
1028   { /* First client of the group, send join request. */
1029     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1030     uint32_t relay_count = ntohl (msg->relay_count);
1031     uint16_t relay_size = relay_count * sizeof (*relays);
1032     struct GNUNET_MessageHeader *join_msg = NULL;
1033     uint16_t join_msg_size = 0;
1034     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1035         <= msg_size)
1036     {
1037       join_msg = (struct GNUNET_MessageHeader *)
1038         (((char *) &msg[1]) + relay_size);
1039       join_msg_size = ntohs (join_msg->size);
1040     }
1041     if (sizeof (*msg) + relay_size + join_msg_size != msg_size)
1042     {
1043       GNUNET_break (0);
1044       GNUNET_SERVER_client_disconnect (client);
1045       return;
1046     }
1047
1048     struct MulticastJoinRequestMessage *
1049       req = GNUNET_malloc (sizeof (*req) + join_msg_size);
1050     req->header.size = htons (sizeof (*req) + join_msg_size);
1051     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1052     req->group_pub_key = grp->pub_key;
1053     req->peer = this_peer;
1054     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1055     if (0 < join_msg_size)
1056       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1057
1058     req->member_pub_key = mem->pub_key;
1059     req->purpose.size = htonl (msg_size
1060                                - sizeof (req->header)
1061                                - sizeof (req->reserved)
1062                                - sizeof (req->signature));
1063     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1064
1065     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1066                                                &req->signature))
1067     {
1068       /* FIXME: handle error */
1069       GNUNET_assert (0);
1070     }
1071
1072     if (NULL != mem->join_req)
1073       GNUNET_free (mem->join_req);
1074     mem->join_req = req;
1075
1076     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1077     { /* No local origins, send to remote origin */
1078       cadet_send_join_request (mem);
1079     }
1080   }
1081   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1082 }
1083
1084
1085 static void
1086 client_send_join_decision (struct Member *mem,
1087                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1088 {
1089   client_send_group (&mem->grp, &hdcsn->header);
1090
1091   const struct MulticastJoinDecisionMessage *
1092     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1093   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1094   { /* Member admitted, store join_decision. */
1095     uint16_t dcsn_size = ntohs (dcsn->header.size);
1096     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1097     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1098   }
1099   else
1100   { /* Refused entry, but replay would be still possible for past members. */
1101   }
1102 }
1103
1104
1105 /**
1106  * Join decision from client.
1107  */
1108 static void
1109 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1110                            const struct GNUNET_MessageHeader *m)
1111 {
1112   struct Group *
1113     grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1114   const struct MulticastJoinDecisionMessageHeader *
1115     hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
1116
1117   if (NULL == grp)
1118   {
1119     GNUNET_break (0);
1120     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1121     return;
1122   }
1123   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124               "%p Got join decision from client for group %s..\n",
1125               grp, GNUNET_h2s (&grp->pub_key_hash));
1126
1127   struct GNUNET_CONTAINER_MultiHashMap *
1128     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1129                                                  &grp->pub_key_hash);
1130   struct Member *mem = NULL;
1131   if (NULL != grp_mem)
1132   {
1133     struct GNUNET_HashCode member_key_hash;
1134     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1135                         &member_key_hash);
1136     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1137     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138                 "%p ..and member %s: %p\n",
1139                 grp, GNUNET_h2s (&member_key_hash), mem);
1140   }
1141   if (NULL != mem)
1142   { /* Found local member */
1143     client_send_join_decision (mem, hdcsn);
1144   }
1145   else
1146   { /* Look for remote member */
1147     cadet_send_join_decision (grp, hdcsn);
1148   }
1149   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1150 }
1151
1152
1153 /**
1154  * Incoming message from a client.
1155  */
1156 static void
1157 client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
1158                                const struct GNUNET_MessageHeader *m)
1159 {
1160   struct Group *
1161     grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1162   struct GNUNET_MULTICAST_MessageHeader *out;
1163   struct Origin *orig;
1164
1165   if (NULL == grp)
1166   {
1167     GNUNET_break (0);
1168     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1169     return;
1170   }
1171   GNUNET_assert (GNUNET_YES == grp->is_origin);
1172   orig = (struct Origin *) grp;
1173
1174   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1175   out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m);
1176   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1177   out->purpose.size = htonl (ntohs (out->header.size)
1178                              - sizeof (out->header)
1179                              - sizeof (out->hop_counter)
1180                              - sizeof (out->signature));
1181   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1182
1183   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1184                                              &out->signature))
1185   {
1186     GNUNET_assert (0);
1187   }
1188
1189   client_send_all (&grp->pub_key_hash, &out->header);
1190   if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1191   {
1192     client_send_ack (&grp->pub_key_hash);
1193   }
1194   GNUNET_free (out);
1195
1196   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1197 }
1198
1199
1200 /**
1201  * Incoming request from a client.
1202  */
1203 static void
1204 client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1205                                const struct GNUNET_MessageHeader *m)
1206 {
1207   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1208   struct Member *mem;
1209   struct GNUNET_MULTICAST_RequestHeader *out;
1210   if (NULL == grp)
1211   {
1212     GNUNET_break (0);
1213     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1214     return;
1215   }
1216   GNUNET_assert (GNUNET_NO == grp->is_origin);
1217   mem = (struct Member *) grp;
1218
1219   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1220   out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
1221   out->member_pub_key = mem->pub_key;
1222   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1223   out->purpose.size = htonl (ntohs (out->header.size)
1224                              - sizeof (out->header)
1225                              - sizeof (out->member_pub_key)
1226                              - sizeof (out->signature));
1227   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1228
1229   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1230                                              &out->signature))
1231   {
1232     GNUNET_assert (0);
1233   }
1234
1235   uint8_t send_ack = GNUNET_YES;
1236   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1237   { /* No local origins, send to remote origin */
1238     if (NULL != mem->origin_channel)
1239     {
1240       cadet_send_channel (mem->origin_channel, &out->header);
1241       send_ack = GNUNET_NO;
1242     }
1243     else
1244     {
1245       /* FIXME: not yet connected to origin */
1246       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1247       GNUNET_free (out);
1248       return;
1249     }
1250   }
1251   if (GNUNET_YES == send_ack)
1252   {
1253     client_send_ack (&grp->pub_key_hash);
1254   }
1255   GNUNET_free (out);
1256   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1257 }
1258
1259
1260 /**
1261  * Incoming replay request from a client.
1262  */
1263 static void
1264 client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
1265                             const struct GNUNET_MessageHeader *m)
1266 {
1267   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1268   struct Member *mem;
1269   if (NULL == grp)
1270   {
1271     GNUNET_break (0);
1272     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1273     return;
1274   }
1275   GNUNET_assert (GNUNET_NO == grp->is_origin);
1276   mem = (struct Member *) grp;
1277
1278   struct GNUNET_CONTAINER_MultiHashMap *
1279     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1280                                                         &grp->pub_key_hash);
1281   if (NULL == grp_replay_req)
1282   {
1283     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1284     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1285                                        &grp->pub_key_hash, grp_replay_req,
1286                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1287   }
1288   struct MulticastReplayRequestMessage *
1289     rep = (struct MulticastReplayRequestMessage *) m;
1290   struct GNUNET_HashCode key_hash;
1291   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1292                    rep->flags, &key_hash);
1293   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1294                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1295
1296   if (0 == client_send_origin (&grp->pub_key_hash, m))
1297   { /* No local origin, replay from remote members / origin. */
1298     if (NULL != mem->origin_channel)
1299     {
1300       cadet_send_channel (mem->origin_channel, m);
1301     }
1302     else
1303     {
1304       /* FIXME: not yet connected to origin */
1305       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1306       return;
1307     }
1308   }
1309   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1310 }
1311
1312
1313 static int
1314 cadet_send_replay_response_cb (void *cls,
1315                                const struct GNUNET_HashCode *key_hash,
1316                                void *value)
1317 {
1318   struct Channel *chn = value;
1319   struct GNUNET_MessageHeader *msg = cls;
1320
1321   cadet_send_channel (chn, msg);
1322   return GNUNET_OK;
1323 }
1324
1325
1326 static int
1327 client_send_replay_response_cb (void *cls,
1328                                 const struct GNUNET_HashCode *key_hash,
1329                                 void *value)
1330 {
1331   struct GNUNET_SERVER_Client *client = value;
1332   struct GNUNET_MessageHeader *msg = cls;
1333
1334   client_send (client, msg);
1335   return GNUNET_OK;
1336 }
1337
1338
1339 /**
1340  * End of replay response from a client.
1341  */
1342 static void
1343 client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client *client,
1344                                  const struct GNUNET_MessageHeader *m)
1345 {
1346   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1347   if (NULL == grp)
1348   {
1349     GNUNET_break (0);
1350     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1351     return;
1352   }
1353
1354   struct MulticastReplayResponseMessage *
1355     res = (struct MulticastReplayResponseMessage *) m;
1356
1357   struct GNUNET_HashCode key_hash;
1358   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1359                    res->flags, &key_hash);
1360
1361   struct GNUNET_CONTAINER_MultiHashMap *
1362     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1363                                                                 &grp->pub_key_hash);
1364   if (NULL != grp_replay_req_cadet)
1365   {
1366     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1367   }
1368   struct GNUNET_CONTAINER_MultiHashMap *
1369     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1370                                                                &grp->pub_key_hash);
1371   if (NULL != grp_replay_req_client)
1372   {
1373     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1374   }
1375   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1376 }
1377
1378
1379 /**
1380  * Incoming replay response from a client.
1381  *
1382  * Respond with a multicast message on success, or otherwise with an error code.
1383  */
1384 static void
1385 client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client,
1386                              const struct GNUNET_MessageHeader *m)
1387 {
1388   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1389   if (NULL == grp)
1390   {
1391     GNUNET_break (0);
1392     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1393     return;
1394   }
1395
1396   struct MulticastReplayResponseMessage *
1397     res = (struct MulticastReplayResponseMessage *) m;
1398
1399   const struct GNUNET_MessageHeader *msg = m;
1400   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1401   {
1402     msg = (struct GNUNET_MessageHeader *) &res[1];
1403   }
1404
1405   struct GNUNET_HashCode key_hash;
1406   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1407                    res->flags, &key_hash);
1408
1409   struct GNUNET_CONTAINER_MultiHashMap *
1410     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1411                                                               &grp->pub_key_hash);
1412   if (NULL != grp_replay_req_cadet)
1413   {
1414     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1415                                                 cadet_send_replay_response_cb,
1416                                                 (void *) msg);
1417   }
1418   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1419   {
1420     struct GNUNET_CONTAINER_MultiHashMap *
1421       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1422                                                                  &grp->pub_key_hash);
1423     if (NULL != grp_replay_req_client)
1424     {
1425       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1426                                                   client_send_replay_response_cb,
1427                                                   (void *) msg);
1428     }
1429   }
1430   else
1431   {
1432     client_recv_replay_response_end (cls, client, m);
1433     return;
1434   }
1435   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1436 }
1437
1438
1439 /**
1440  * A new client connected.
1441  */
1442 static void
1443 client_notify_connect (void *cls, struct GNUNET_SERVER_Client *client)
1444 {
1445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1446   /* FIXME: send connect ACK */
1447 }
1448
1449
1450 /**
1451  * Message handlers for the server.
1452  */
1453 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1454   { client_recv_origin_start, NULL,
1455     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
1456
1457   { client_recv_member_join, NULL,
1458     GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
1459
1460   { client_recv_join_decision, NULL,
1461     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
1462
1463   { client_recv_multicast_message, NULL,
1464     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1465
1466   { client_recv_multicast_request, NULL,
1467     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1468
1469   { client_recv_replay_request, NULL,
1470     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1471
1472   { client_recv_replay_response, NULL,
1473     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1474
1475   { client_recv_replay_response_end, NULL,
1476     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 },
1477
1478   { NULL, NULL, 0, 0 }
1479 };
1480
1481
1482 /**
1483  * New incoming CADET channel.
1484  */
1485 static void *
1486 cadet_notify_channel_new (void *cls,
1487                           struct GNUNET_CADET_Channel *channel,
1488                           const struct GNUNET_PeerIdentity *initiator,
1489                           const struct GNUNET_HashCode *port,
1490                           enum GNUNET_CADET_ChannelOption options)
1491 {
1492   return NULL;
1493 }
1494
1495
1496 /**
1497  * CADET channel is being destroyed.
1498  */
1499 static void
1500 cadet_notify_channel_end (void *cls,
1501                           const struct GNUNET_CADET_Channel *channel,
1502                           void *ctx)
1503 {
1504   if (NULL == ctx)
1505     return;
1506
1507   struct Channel *chn = ctx;
1508   if (NULL != chn->grp)
1509   {
1510     if (GNUNET_NO == chn->grp->is_origin)
1511     {
1512       struct Member *mem = (struct Member *) chn->grp;
1513       if (chn == mem->origin_channel)
1514         mem->origin_channel = NULL;
1515     }
1516   }
1517
1518   while (GNUNET_YES == replay_req_remove_cadet (chn));
1519
1520   GNUNET_free (chn);
1521 }
1522
1523
1524 /**
1525  * Incoming join request message from CADET.
1526  */
1527 int
1528 cadet_recv_join_request (void *cls,
1529                          struct GNUNET_CADET_Channel *channel,
1530                          void **ctx,
1531                          const struct GNUNET_MessageHeader *m)
1532 {
1533   const struct MulticastJoinRequestMessage *
1534     req = (const struct MulticastJoinRequestMessage *) m;
1535   uint16_t size = ntohs (m->size);
1536   if (size < sizeof (*req))
1537   {
1538     GNUNET_break_op (0);
1539     return GNUNET_SYSERR;
1540   }
1541   if (NULL != *ctx)
1542   {
1543     GNUNET_break_op (0);
1544     return GNUNET_SYSERR;
1545   }
1546   if (ntohl (req->purpose.size) != (size
1547                                     - sizeof (req->header)
1548                                     - sizeof (req->reserved)
1549                                     - sizeof (req->signature)))
1550   {
1551     GNUNET_break_op (0);
1552     return GNUNET_SYSERR;
1553   }
1554   if (GNUNET_OK !=
1555       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1556                                   &req->purpose, &req->signature,
1557                                   &req->member_pub_key))
1558   {
1559     GNUNET_break_op (0);
1560     return GNUNET_SYSERR;
1561   }
1562
1563   struct GNUNET_HashCode group_pub_hash;
1564   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
1565
1566   struct Channel *chn = GNUNET_malloc (sizeof *chn);
1567   chn->channel = channel;
1568   chn->group_pub_key = req->group_pub_key;
1569   chn->group_pub_hash = group_pub_hash;
1570   chn->member_pub_key = req->member_pub_key;
1571   chn->peer = req->peer;
1572   chn->join_status = JOIN_WAITING;
1573   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
1574                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1575
1576   client_send_all (&group_pub_hash, m);
1577   return GNUNET_OK;
1578 }
1579
1580
1581 /**
1582  * Incoming join decision message from CADET.
1583  */
1584 int
1585 cadet_recv_join_decision (void *cls,
1586                           struct GNUNET_CADET_Channel *channel,
1587                           void **ctx,
1588                           const struct GNUNET_MessageHeader *m)
1589 {
1590   const struct MulticastJoinDecisionMessage *
1591     dcsn = (const struct MulticastJoinDecisionMessage *) m;
1592   uint16_t size = ntohs (m->size);
1593   if (size < sizeof (*dcsn))
1594   {
1595     GNUNET_break_op (0);
1596     return GNUNET_SYSERR;
1597   }
1598   struct Channel *chn = *ctx;
1599   if (NULL == chn)
1600   {
1601     GNUNET_break_op (0);
1602     return GNUNET_SYSERR;
1603   }
1604   if (NULL == chn->grp || GNUNET_NO != chn->grp->is_origin)
1605   {
1606     GNUNET_break_op (0);
1607     return GNUNET_SYSERR;
1608   }
1609   switch (chn->join_status)
1610   {
1611   case JOIN_REFUSED:
1612     return GNUNET_SYSERR;
1613
1614   case JOIN_ADMITTED:
1615     return GNUNET_OK;
1616
1617   case JOIN_NOT_ASKED:
1618   case JOIN_WAITING:
1619     break;
1620   }
1621
1622   struct MulticastJoinDecisionMessageHeader *
1623     hdcsn = GNUNET_malloc (sizeof (*hdcsn) + size);
1624   hdcsn->peer = chn->peer;
1625   GNUNET_memcpy (&hdcsn[1], dcsn, sizeof (*hdcsn) + size);
1626
1627   struct Member *mem = (struct Member *) chn->grp;
1628   client_send_join_decision (mem, hdcsn);
1629   GNUNET_free (hdcsn);
1630   if (GNUNET_YES == ntohs (dcsn->is_admitted))
1631   {
1632     chn->join_status = JOIN_ADMITTED;
1633     return GNUNET_OK;
1634   }
1635   else
1636   {
1637     chn->join_status = JOIN_REFUSED;
1638     return GNUNET_SYSERR;
1639   }
1640 }
1641
1642 /**
1643  * Incoming multicast message from CADET.
1644  */
1645 int
1646 cadet_recv_message (void *cls,
1647                     struct GNUNET_CADET_Channel *channel,
1648                     void **ctx,
1649                     const struct GNUNET_MessageHeader *m)
1650 {
1651   const struct GNUNET_MULTICAST_MessageHeader *
1652     msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
1653   uint16_t size = ntohs (m->size);
1654   if (size < sizeof (*msg))
1655   {
1656     GNUNET_break_op (0);
1657     return GNUNET_SYSERR;
1658   }
1659   struct Channel *chn = *ctx;
1660   if (NULL == chn)
1661   {
1662     GNUNET_break_op (0);
1663     return GNUNET_SYSERR;
1664   }
1665   if (ntohl (msg->purpose.size) != (size
1666                                     - sizeof (msg->header)
1667                                     - sizeof (msg->hop_counter)
1668                                     - sizeof (msg->signature)))
1669   {
1670     GNUNET_break_op (0);
1671     return GNUNET_SYSERR;
1672   }
1673   if (GNUNET_OK !=
1674       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1675                                   &msg->purpose, &msg->signature,
1676                                   &chn->group_pub_key))
1677   {
1678     GNUNET_break_op (0);
1679     return GNUNET_SYSERR;
1680   }
1681
1682   client_send_all (&chn->group_pub_hash, m);
1683   return GNUNET_OK;
1684 }
1685
1686
1687 /**
1688  * Incoming multicast request message from CADET.
1689  */
1690 int
1691 cadet_recv_request (void *cls,
1692                     struct GNUNET_CADET_Channel *channel,
1693                     void **ctx,
1694                     const struct GNUNET_MessageHeader *m)
1695 {
1696   const struct GNUNET_MULTICAST_RequestHeader *
1697     req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
1698   uint16_t size = ntohs (m->size);
1699   if (size < sizeof (*req))
1700   {
1701     GNUNET_break_op (0);
1702     return GNUNET_SYSERR;
1703   }
1704   struct Channel *chn = *ctx;
1705   if (NULL == chn)
1706   {
1707     GNUNET_break_op (0);
1708     return GNUNET_SYSERR;
1709   }
1710   if (ntohl (req->purpose.size) != (size
1711                                     - sizeof (req->header)
1712                                     - sizeof (req->member_pub_key)
1713                                     - sizeof (req->signature)))
1714   {
1715     GNUNET_break_op (0);
1716     return GNUNET_SYSERR;
1717   }
1718   if (GNUNET_OK !=
1719       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1720                                   &req->purpose, &req->signature,
1721                                   &req->member_pub_key))
1722   {
1723     GNUNET_break_op (0);
1724     return GNUNET_SYSERR;
1725   }
1726
1727   client_send_origin (&chn->group_pub_hash, m);
1728   return GNUNET_OK;
1729 }
1730
1731
1732 /**
1733  * Incoming multicast replay request from CADET.
1734  */
1735 int
1736 cadet_recv_replay_request (void *cls,
1737                            struct GNUNET_CADET_Channel *channel,
1738                            void **ctx,
1739                            const struct GNUNET_MessageHeader *m)
1740 {
1741   struct MulticastReplayRequestMessage rep;
1742   uint16_t size = ntohs (m->size);
1743   if (size < sizeof (rep))
1744   {
1745     GNUNET_break_op (0);
1746     return GNUNET_SYSERR;
1747   }
1748   struct Channel *chn = *ctx;
1749
1750   GNUNET_memcpy (&rep, m, sizeof (rep));
1751   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1752
1753   struct GNUNET_CONTAINER_MultiHashMap *
1754     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1755                                                         &chn->grp->pub_key_hash);
1756   if (NULL == grp_replay_req)
1757   {
1758     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1759     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1760                                        &chn->grp->pub_key_hash, grp_replay_req,
1761                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1762   }
1763   struct GNUNET_HashCode key_hash;
1764   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1765                    rep.flags, &key_hash);
1766   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1767                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1768
1769   client_send_random (&chn->group_pub_hash, &rep.header);
1770   return GNUNET_OK;
1771 }
1772
1773
1774 /**
1775  * Incoming multicast replay response from CADET.
1776  */
1777 int
1778 cadet_recv_replay_response (void *cls,
1779                             struct GNUNET_CADET_Channel *channel,
1780                             void **ctx,
1781                             const struct GNUNET_MessageHeader *m)
1782 {
1783   //struct Channel *chn = *ctx;
1784
1785   /* @todo FIXME: got replay error response, send request to other members */
1786
1787   return GNUNET_OK;
1788 }
1789
1790
1791 /**
1792  * Message handlers for CADET.
1793  */
1794 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1795   { cadet_recv_join_request,
1796     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
1797
1798   { cadet_recv_message,
1799     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1800
1801   { cadet_recv_request,
1802     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1803
1804   { cadet_recv_replay_request,
1805     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1806
1807   { cadet_recv_replay_response,
1808     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1809
1810   { NULL, 0, 0 }
1811 };
1812
1813
1814 /**
1815  * Connected to core service.
1816  */
1817 static void
1818 core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1819 {
1820   this_peer = *my_identity;
1821
1822   stats = GNUNET_STATISTICS_create ("multicast", cfg);
1823   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1824   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1825   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1826   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1827   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1828   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1829   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1830
1831   cadet = GNUNET_CADET_connect (cfg, NULL,
1832                                 &cadet_notify_channel_end,
1833                                 cadet_handlers);
1834   GNUNET_assert (NULL != cadet);
1835   GNUNET_CADET_open_port (cadet, GC_u2h (GNUNET_APPLICATION_TYPE_MULTICAST),
1836                           &cadet_notify_channel_new, NULL);
1837
1838   nc = GNUNET_SERVER_notification_context_create (server, 1);
1839   GNUNET_SERVER_add_handlers (server, server_handlers);
1840   GNUNET_SERVER_disconnect_notify (server,
1841                                    &client_notify_disconnect, NULL);
1842   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1843                                  NULL);
1844 }
1845
1846
1847 /**
1848  * Service started.
1849  *
1850  * @param cls closure
1851  * @param server the initialized server
1852  * @param cfg configuration to use
1853  */
1854 static void
1855 run (void *cls,
1856      struct GNUNET_SERVER_Handle *srv,
1857      const struct GNUNET_CONFIGURATION_Handle *c)
1858 {
1859   cfg = c;
1860   server = srv;
1861   GNUNET_SERVER_connect_notify (server, &client_notify_connect, NULL);
1862   core = GNUNET_CORE_connecT (cfg, NULL, &core_connected_cb, NULL, NULL, NULL);
1863 }
1864
1865
1866 /**
1867  * The main function for the multicast service.
1868  *
1869  * @param argc number of arguments from the command line
1870  * @param argv command line arguments
1871  * @return 0 ok, 1 on error
1872  */
1873 int
1874 main (int argc, char *const *argv)
1875 {
1876   return (GNUNET_OK ==
1877           GNUNET_SERVICE_run (argc, argv, "multicast",
1878                               GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
1879 }
1880
1881 /* end of gnunet-service-multicast.c */