multicast: close cadet port
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_core_service.h"
32 #include "gnunet_cadet_service.h"
33 #include "gnunet_multicast_service.h"
34 #include "multicast.h"
35
36 /**
37  * Handle to our current configuration.
38  */
39 static const struct GNUNET_CONFIGURATION_Handle *cfg;
40
41 /**
42  * Server handle.
43  */
44 static struct GNUNET_SERVER_Handle *server;
45
46 /**
47  * Core handle.
48  * Only used during initialization.
49  */
50 static struct GNUNET_CORE_Handle *core;
51
52 /**
53  * CADET handle.
54  */
55 static struct GNUNET_CADET_Handle *cadet;
56
57 /**
58  * Identity of this peer.
59  */
60 static struct GNUNET_PeerIdentity this_peer;
61
62 /**
63  * Handle to the statistics service.
64  */
65 static struct GNUNET_STATISTICS_Handle *stats;
66
67 /**
68  * Notification context, simplifies client broadcasts.
69  */
70 static struct GNUNET_SERVER_NotificationContext *nc;
71
72 /**
73  * All connected origin clients.
74  * Group's pub_key_hash -> struct Origin * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *origins;
77
78 /**
79  * All connected member clients.
80  * Group's pub_key_hash -> struct Member * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *members;
83
84 /**
85  * Connected member clients per group.
86  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
89
90 /**
91  * Incoming CADET channels with connected children in the tree.
92  * Group's pub_key_hash -> struct Channel * (multi)
93  */
94 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
95
96 /**
97  * Outgoing CADET channels connecting to parents in the tree.
98  * Group's pub_key_hash -> struct Channel * (multi)
99  */
100 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
101
102 /**
103  * Incoming replay requests from CADET.
104  * Group's pub_key_hash ->
105  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
106  */
107 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
108
109 /**
110  * Incoming replay requests from clients.
111  * Group's pub_key_hash ->
112  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVER_Client *
113  */
114 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
115
116
117 /**
118  * Join status of a remote peer.
119  */
120 enum JoinStatus
121 {
122   JOIN_REFUSED  = -1,
123   JOIN_NOT_ASKED = 0,
124   JOIN_WAITING   = 1,
125   JOIN_ADMITTED  = 2,
126 };
127
128 enum ChannelDirection
129 {
130   DIR_INCOMING = 0,
131   DIR_OUTGOING = 1,
132 };
133
134
135 /**
136  * Context for a CADET channel.
137  */
138 struct Channel
139 {
140   /**
141    * Group the channel belongs to.
142    *
143    * Only set for outgoing channels.
144    */
145   struct Group *grp;
146
147   /**
148    * CADET channel.
149    */
150   struct GNUNET_CADET_Channel *channel;
151
152   /**
153    * CADET transmission handle.
154    */
155   struct GNUNET_CADET_TransmitHandle *tmit_handle;
156
157   /**
158    * Public key of the target group.
159    */
160   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
161
162   /**
163    * Hash of @a group_pub_key.
164    */
165   struct GNUNET_HashCode group_pub_hash;
166
167   /**
168    * Public key of the joining member.
169    */
170   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
171
172   /**
173    * Remote peer identity.
174    */
175   struct GNUNET_PeerIdentity peer;
176
177   /**
178    * Is the remote peer admitted to the group?
179    * @see enum JoinStatus
180    */
181   int8_t join_status;
182
183   /**
184    * Number of messages waiting to be sent to CADET.
185    */
186   uint8_t msgs_pending;
187
188   /**
189    * Channel direction.
190    * @see enum ChannelDirection
191    */
192   uint8_t direction;
193 };
194
195
196 /**
197  * List of connected clients.
198  */
199 struct ClientList
200 {
201   struct ClientList *prev;
202   struct ClientList *next;
203   struct GNUNET_SERVER_Client *client;
204 };
205
206 /**
207  * Common part of the client context for both an origin and member.
208  */
209 struct Group
210 {
211   struct ClientList *clients_head;
212   struct ClientList *clients_tail;
213
214   /**
215    * Public key of the group.
216    */
217   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
218
219   /**
220    * Hash of @a pub_key.
221    */
222   struct GNUNET_HashCode pub_key_hash;
223
224   /**
225    * CADET port hash.
226    */
227   struct GNUNET_HashCode cadet_port_hash;
228
229   /**
230    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
231    */
232   uint8_t is_origin;
233
234   /**
235    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
236    */
237   uint8_t disconnected;
238 };
239
240
241 /**
242  * Client context for a group's origin.
243  */
244 struct Origin
245 {
246   struct Group grp;
247
248   /**
249    * Private key of the group.
250    */
251   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
252
253   /**
254    * CADET port.
255    */
256   struct GNUNET_CADET_Port *cadet_port;
257
258   /**
259    * Last message fragment ID sent to the group.
260    */
261   uint64_t max_fragment_id;
262 };
263
264
265 /**
266  * Client context for a group member.
267  */
268 struct Member
269 {
270   struct Group grp;
271
272   /**
273    * Private key of the member.
274    */
275   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
276
277   /**
278    * Public key of the member.
279    */
280   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
281
282   /**
283    * Hash of @a pub_key.
284    */
285   struct GNUNET_HashCode pub_key_hash;
286
287   /**
288    * Join request sent to the origin / members.
289    */
290   struct MulticastJoinRequestMessage *join_req;
291
292   /**
293    * Join decision sent in reply to our request.
294    *
295    * Only a positive decision is stored here, in case of a negative decision the
296    * client is disconnected.
297    */
298   struct MulticastJoinDecisionMessageHeader *join_dcsn;
299
300   /**
301    * CADET channel to the origin.
302    */
303   struct Channel *origin_channel;
304
305   /**
306    * Peer identity of origin.
307    */
308   struct GNUNET_PeerIdentity origin;
309
310   /**
311    * Peer identity of relays (other members to connect).
312    */
313   struct GNUNET_PeerIdentity *relays;
314
315   /**
316    * Last request fragment ID sent to the origin.
317    */
318   uint64_t max_fragment_id;
319
320   /**
321    * Number of @a relays.
322    */
323   uint32_t relay_count;
324 };
325
326
327 struct ReplayRequestKey
328 {
329   uint64_t fragment_id;
330   uint64_t message_id;
331   uint64_t fragment_offset;
332   uint64_t flags;
333 };
334
335
336 /**
337  * Task run during shutdown.
338  *
339  * @param cls unused
340  */
341 static void
342 shutdown_task (void *cls)
343 {
344   if (NULL != core)
345   {
346     GNUNET_CORE_disconnecT (core);
347     core = NULL;
348   }
349   if (NULL != cadet)
350   {
351     GNUNET_CADET_disconnect (cadet);
352     cadet = NULL;
353   }
354   if (NULL != stats)
355   {
356     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
357     stats = NULL;
358   }
359   /* FIXME: do more clean up here */
360 }
361
362
363 /**
364  * Clean up origin data structures after a client disconnected.
365  */
366 static void
367 cleanup_origin (struct Origin *orig)
368 {
369   struct Group *grp = &orig->grp;
370   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
371   if (NULL != orig->cadet_port)
372   {
373     GNUNET_CADET_close_port (orig->cadet_port);
374     orig->cadet_port = NULL;
375   }
376   GNUNET_free (orig);
377 }
378
379
380 /**
381  * Clean up member data structures after a client disconnected.
382  */
383 static void
384 cleanup_member (struct Member *mem)
385 {
386   struct Group *grp = &mem->grp;
387   struct GNUNET_CONTAINER_MultiHashMap *
388     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
389                                                  &grp->pub_key_hash);
390   GNUNET_assert (NULL != grp_mem);
391   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
392
393   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
394   {
395     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
396                                           grp_mem);
397     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
398   }
399   if (NULL != mem->join_dcsn)
400   {
401     GNUNET_free (mem->join_dcsn);
402     mem->join_dcsn = NULL;
403   }
404   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
405   GNUNET_free (mem);
406 }
407
408
409 /**
410  * Clean up group data structures after a client disconnected.
411  */
412 static void
413 cleanup_group (struct Group *grp)
414 {
415   (GNUNET_YES == grp->is_origin)
416     ? cleanup_origin ((struct Origin *) grp)
417     : cleanup_member ((struct Member *) grp);
418 }
419
420
421 void
422 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
423                  uint64_t fragment_offset, uint64_t flags,
424                  struct GNUNET_HashCode *key_hash)
425 {
426   struct ReplayRequestKey key = {
427     .fragment_id = fragment_id,
428     .message_id = message_id,
429     .fragment_offset = fragment_offset,
430     .flags = flags,
431   };
432   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
433 }
434
435
436 /**
437  * Remove channel from replay request hashmap.
438  *
439  * @param chn
440  *        Channel to remove.
441  *
442  * @return #GNUNET_YES if there are more entries to process,
443  *         #GNUNET_NO when reached end of hashmap.
444  */
445 static int
446 replay_req_remove_cadet (struct Channel *chn)
447 {
448   struct GNUNET_CONTAINER_MultiHashMap *
449     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
450                                                         &chn->grp->pub_key_hash);
451   if (NULL == grp_replay_req)
452     return GNUNET_NO;
453
454   struct GNUNET_CONTAINER_MultiHashMapIterator *
455     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
456   struct GNUNET_HashCode key;
457   const struct Channel *c;
458   while (GNUNET_YES
459          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
460                                                          (const void **) &c))
461   {
462     if (c == chn)
463     {
464       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
465       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
466       return GNUNET_YES;
467     }
468   }
469   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
470   return GNUNET_NO;
471 }
472
473
474 /**
475  * Remove client from replay request hashmap.
476  *
477  * @param client
478  *        Client to remove.
479  *
480  * @return #GNUNET_YES if there are more entries to process,
481  *         #GNUNET_NO when reached end of hashmap.
482  */
483 static int
484 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client)
485 {
486   struct GNUNET_CONTAINER_MultiHashMap *
487     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
488                                                         &grp->pub_key_hash);
489   if (NULL == grp_replay_req)
490     return GNUNET_NO;
491
492   struct GNUNET_CONTAINER_MultiHashMapIterator *
493     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
494   struct GNUNET_HashCode key;
495   const struct GNUNET_SERVER_Client *c;
496   while (GNUNET_YES
497          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
498                                                          (const void **) &c))
499   {
500     if (c == client)
501     {
502       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
503       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
504       return GNUNET_YES;
505     }
506   }
507   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
508   return GNUNET_NO;
509 }
510
511
512 /**
513  * Called whenever a client is disconnected.
514  *
515  * Frees our resources associated with that client.
516  *
517  * @param cls  Closure.
518  * @param client  Client handle.
519  */
520 static void
521 client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
522 {
523   if (NULL == client)
524     return;
525
526   struct Group *grp
527     = GNUNET_SERVER_client_get_user_context (client, struct Group);
528
529   if (NULL == grp)
530   {
531     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
532                 "%p User context is NULL in client_disconnect()\n", grp);
533     GNUNET_break (0);
534     return;
535   }
536
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538               "%p Client (%s) disconnected from group %s\n",
539               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
540               GNUNET_h2s (&grp->pub_key_hash));
541
542   struct ClientList *cl = grp->clients_head;
543   while (NULL != cl)
544   {
545     if (cl->client == client)
546     {
547       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
548       GNUNET_free (cl);
549       break;
550     }
551     cl = cl->next;
552   }
553
554   while (GNUNET_YES == replay_req_remove_client (grp, client));
555
556   if (NULL == grp->clients_head)
557   { /* Last client disconnected. */
558 #if FIXME
559     if (NULL != grp->tmit_head)
560     { /* Send pending messages via CADET before cleanup. */
561       transmit_message (grp);
562     }
563     else
564 #endif
565     {
566       cleanup_group (grp);
567     }
568   }
569 }
570
571
572 /**
573  * Send message to a client.
574  */
575 static void
576 client_send (struct GNUNET_SERVER_Client *client,
577              const struct GNUNET_MessageHeader *msg)
578 {
579   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
580               "%p Sending message to client.\n", client);
581
582   GNUNET_SERVER_notification_context_add (nc, client);
583   GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO);
584 }
585
586
587 /**
588  * Send message to all clients connected to the group.
589  */
590 static void
591 client_send_group (const struct Group *grp,
592                    const struct GNUNET_MessageHeader *msg)
593 {
594   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
595               "%p Sending message to all clients of the group.\n", grp);
596
597   struct ClientList *cl = grp->clients_head;
598   while (NULL != cl)
599   {
600     GNUNET_SERVER_notification_context_add (nc, cl->client);
601     GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
602     cl = cl->next;
603   }
604 }
605
606
607 /**
608  * Iterator callback for sending a message to origin clients.
609  */
610 static int
611 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
612                        void *origin)
613 {
614   const struct GNUNET_MessageHeader *msg = cls;
615   struct Member *orig = origin;
616
617   client_send_group (&orig->grp, msg);
618   return GNUNET_YES;
619 }
620
621
622 /**
623  * Iterator callback for sending a message to member clients.
624  */
625 static int
626 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
627                        void *member)
628 {
629   const struct GNUNET_MessageHeader *msg = cls;
630   struct Member *mem = member;
631
632   if (NULL != mem->join_dcsn)
633   { /* Only send message to admitted members */
634     client_send_group (&mem->grp, msg);
635   }
636   return GNUNET_YES;
637 }
638
639
640 /**
641  * Send message to all origin and member clients connected to the group.
642  *
643  * @param pub_key_hash
644  *        H(key_pub) of the group.
645  * @param msg
646  *        Message to send.
647  */
648 static int
649 client_send_all (struct GNUNET_HashCode *pub_key_hash,
650                  const struct GNUNET_MessageHeader *msg)
651 {
652   int n = 0;
653   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
654                                                    client_send_origin_cb,
655                                                    (void *) msg);
656   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
657                                                    client_send_member_cb,
658                                                    (void *) msg);
659   return n;
660 }
661
662
663 /**
664  * Send message to a random origin client or a random member client.
665  *
666  * @param grp  The group to send @a msg to.
667  * @param msg  Message to send.
668  */
669 static int
670 client_send_random (struct GNUNET_HashCode *pub_key_hash,
671                     const struct GNUNET_MessageHeader *msg)
672 {
673   int n = 0;
674   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
675                                                  (void *) msg);
676   if (n <= 0)
677     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
678                                                    (void *) msg);
679   return n;
680 }
681
682
683 /**
684  * Send message to all origin clients connected to the group.
685  *
686  * @param pub_key_hash
687  *        H(key_pub) of the group.
688  * @param msg
689  *        Message to send.
690  */
691 static int
692 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
693                     const struct GNUNET_MessageHeader *msg)
694 {
695   int n = 0;
696   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
697                                                    client_send_origin_cb,
698                                                    (void *) msg);
699   return n;
700 }
701
702
703 /**
704  * Send fragment acknowledgement to all clients of the channel.
705  *
706  * @param pub_key_hash
707  *        H(key_pub) of the group.
708  */
709 static void
710 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
711 {
712   static struct GNUNET_MessageHeader *msg = NULL;
713   if (NULL == msg)
714   {
715     msg = GNUNET_malloc (sizeof (*msg));
716     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
717     msg->size = htons (sizeof (*msg));
718   }
719   client_send_all (pub_key_hash, msg);
720 }
721
722
723 struct CadetTransmitClosure
724 {
725   struct Channel *chn;
726   const struct GNUNET_MessageHeader *msg;
727 };
728
729
730 /**
731  * CADET is ready to transmit a message.
732  */
733 size_t
734 cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
735 {
736   if (0 == buf_size)
737   {
738     /* FIXME: connection closed */
739     return 0;
740   }
741   struct CadetTransmitClosure *tcls = cls;
742   struct Channel *chn = tcls->chn;
743   uint16_t msg_size = ntohs (tcls->msg->size);
744   GNUNET_assert (msg_size <= buf_size);
745   GNUNET_memcpy (buf, tcls->msg, msg_size);
746   GNUNET_free (tcls);
747
748   if (0 == chn->msgs_pending)
749   {
750     GNUNET_break (0);
751   }
752   else if (0 == --chn->msgs_pending)
753   {
754     client_send_ack (&chn->group_pub_hash);
755   }
756   return msg_size;
757 }
758
759
760 /**
761  * Send a message to a CADET channel.
762  *
763  * @param chn  Channel.
764  * @param msg  Message.
765  */
766 static void
767 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
768 {
769   struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
770   tcls->chn = chn;
771   tcls->msg = msg;
772
773   chn->msgs_pending++;
774   chn->tmit_handle
775     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
776                                           GNUNET_TIME_UNIT_FOREVER_REL,
777                                           ntohs (msg->size),
778                                           &cadet_notify_transmit_ready,
779                                           tcls);
780   GNUNET_assert (NULL != chn->tmit_handle);
781 }
782
783
784 /**
785  * Create new outgoing CADET channel.
786  *
787  * @param peer
788  *        Peer to connect to.
789  * @param group_pub_key
790  *        Public key of group the channel belongs to.
791  * @param group_pub_hash
792  *        Hash of @a group_pub_key.
793  *
794  * @return Channel.
795  */
796 static struct Channel *
797 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
798 {
799   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
800   chn->grp = grp;
801   chn->group_pub_key = grp->pub_key;
802   chn->group_pub_hash = grp->pub_key_hash;
803   chn->peer = *peer;
804   chn->direction = DIR_OUTGOING;
805   chn->join_status = JOIN_WAITING;
806   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
807                                               &grp->cadet_port_hash,
808                                               GNUNET_CADET_OPTION_RELIABLE);
809   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
810                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
811   return chn;
812 }
813
814
815 /**
816  * Create CADET channel and send a join request.
817  */
818 static void
819 cadet_send_join_request (struct Member *mem)
820 {
821   mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin);
822   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
823
824   uint32_t i;
825   for (i = 0; i < mem->relay_count; i++)
826   {
827     struct Channel *
828       chn = cadet_channel_create (&mem->grp, &mem->relays[i]);
829     cadet_send_channel (chn, &mem->join_req->header);
830   }
831 }
832
833
834 static int
835 cadet_send_join_decision_cb (void *cls,
836                              const struct GNUNET_HashCode *group_pub_hash,
837                              void *channel)
838 {
839   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
840   struct Channel *chn = channel;
841
842   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
843       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
844   {
845     cadet_send_channel (chn, &hdcsn->header);
846     return GNUNET_NO;
847   }
848   return GNUNET_YES;
849 }
850
851
852 /**
853  * Send join decision to a remote peer.
854  */
855 static void
856 cadet_send_join_decision (struct Group *grp,
857                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
858 {
859   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
860                                               &cadet_send_join_decision_cb,
861                                               (void *) hdcsn);
862 }
863
864
865 /**
866  * Iterator callback for sending a message to origin clients.
867  */
868 static int
869 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
870                void *channel)
871 {
872   const struct GNUNET_MessageHeader *msg = cls;
873   struct Channel *chn = channel;
874   if (JOIN_ADMITTED == chn->join_status)
875     cadet_send_channel (chn, msg);
876   return GNUNET_YES;
877 }
878
879
880 /**
881  * Send message to all connected children.
882  */
883 static int
884 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
885                      const struct GNUNET_MessageHeader *msg)
886 {
887   int n = 0;
888   if (channels_in != NULL)
889     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
890                                                      cadet_send_cb, (void *) msg);
891   return n;
892 }
893
894
895 #if 0       // unused as yet
896 /**
897  * Send message to all connected parents.
898  */
899 static int
900 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
901                     const struct GNUNET_MessageHeader *msg)
902 {
903   int n = 0;
904   if (channels_in != NULL)
905     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
906                                                      cadet_send_cb, (void *) msg);
907   return n;
908 }
909 #endif
910
911
912 /**
913  * New incoming CADET channel.
914  */
915 static void *
916 cadet_notify_channel_new (void *cls,
917                           struct GNUNET_CADET_Channel *channel,
918                           const struct GNUNET_PeerIdentity *initiator,
919                           const struct GNUNET_HashCode *port,
920                           enum GNUNET_CADET_ChannelOption options)
921 {
922   return NULL;
923 }
924
925
926 /**
927  * CADET channel is being destroyed.
928  */
929 static void
930 cadet_notify_channel_end (void *cls,
931                           const struct GNUNET_CADET_Channel *channel,
932                           void *ctx)
933 {
934   if (NULL == ctx)
935     return;
936
937   struct Channel *chn = ctx;
938   if (NULL != chn->grp)
939   {
940     if (GNUNET_NO == chn->grp->is_origin)
941     {
942       struct Member *mem = (struct Member *) chn->grp;
943       if (chn == mem->origin_channel)
944         mem->origin_channel = NULL;
945     }
946   }
947
948   while (GNUNET_YES == replay_req_remove_cadet (chn));
949
950   GNUNET_free (chn);
951 }
952
953
954 static void
955 group_set_cadet_port_hash (struct Group *grp)
956 {
957   struct CadetPort {
958     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
959     uint32_t app_type;
960   } port = {
961     grp->pub_key,
962     GNUNET_APPLICATION_TYPE_MULTICAST,
963   };
964   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
965 }
966
967
968 /**
969  * Handle a connecting client starting an origin.
970  */
971 static void
972 client_recv_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
973                           const struct GNUNET_MessageHeader *m)
974 {
975   const struct MulticastOriginStartMessage *
976     msg = (const struct MulticastOriginStartMessage *) m;
977
978   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
979   struct GNUNET_HashCode pub_key_hash;
980
981   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
982   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
983
984   struct Origin *
985     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
986   struct Group *grp;
987
988   if (NULL == orig)
989   {
990     orig = GNUNET_new (struct Origin);
991     orig->priv_key = msg->group_key;
992     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
993     grp = &orig->grp;
994     grp->is_origin = GNUNET_YES;
995     grp->pub_key = pub_key;
996     grp->pub_key_hash = pub_key_hash;
997
998     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
999                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1000
1001     group_set_cadet_port_hash (grp);
1002     orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash,
1003                                                cadet_notify_channel_new, NULL);
1004   }
1005   else
1006   {
1007     grp = &orig->grp;
1008   }
1009
1010   struct ClientList *cl = GNUNET_new (struct ClientList);
1011   cl->client = client;
1012   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1013
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015               "%p Client connected as origin to group %s.\n",
1016               orig, GNUNET_h2s (&grp->pub_key_hash));
1017
1018   GNUNET_SERVER_client_set_user_context (client, grp);
1019   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1020 }
1021
1022
1023 /**
1024  * Handle a connecting client joining a group.
1025  */
1026 static void
1027 client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
1028                          const struct GNUNET_MessageHeader *m)
1029 {
1030   const struct MulticastMemberJoinMessage *
1031     msg = (const struct MulticastMemberJoinMessage *) m;
1032   uint16_t msg_size = ntohs (msg->header.size);
1033
1034   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1035   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1036
1037   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1038   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1039   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1040
1041   struct GNUNET_CONTAINER_MultiHashMap *
1042     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1043   struct Member *mem = NULL;
1044   struct Group *grp;
1045
1046   if (NULL != grp_mem)
1047   {
1048     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1049   }
1050   if (NULL == mem)
1051   {
1052     mem = GNUNET_new (struct Member);
1053     mem->priv_key = msg->member_key;
1054     mem->pub_key = mem_pub_key;
1055     mem->pub_key_hash = mem_pub_key_hash;
1056     mem->max_fragment_id = 0; // FIXME
1057
1058     grp = &mem->grp;
1059     grp->is_origin = GNUNET_NO;
1060     grp->pub_key = msg->group_pub_key;
1061     grp->pub_key_hash = pub_key_hash;
1062     group_set_cadet_port_hash (grp);
1063
1064     if (NULL == grp_mem)
1065     {
1066       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1067       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1068                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1069     }
1070     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1071                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1072     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1073                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1074   }
1075   else
1076   {
1077     grp = &mem->grp;
1078   }
1079
1080   struct ClientList *cl = GNUNET_new (struct ClientList);
1081   cl->client = client;
1082   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1083
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "%p Client connected to group %s..\n",
1086               mem, GNUNET_h2s (&grp->pub_key_hash));
1087   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1088   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089               "%p ..as member %s (%s).\n",
1090               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1091   GNUNET_free (str);
1092
1093   GNUNET_SERVER_client_set_user_context (client, grp);
1094
1095   if (NULL != mem->join_dcsn)
1096   { /* Already got a join decision, send it to client. */
1097     GNUNET_SERVER_notification_context_add (nc, client);
1098     GNUNET_SERVER_notification_context_unicast (nc, client,
1099                                                 (struct GNUNET_MessageHeader *)
1100                                                 mem->join_dcsn,
1101                                                 GNUNET_NO);
1102   }
1103   else
1104   { /* First client of the group, send join request. */
1105     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1106     uint32_t relay_count = ntohl (msg->relay_count);
1107     uint16_t relay_size = relay_count * sizeof (*relays);
1108     struct GNUNET_MessageHeader *join_msg = NULL;
1109     uint16_t join_msg_size = 0;
1110     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1111         <= msg_size)
1112     {
1113       join_msg = (struct GNUNET_MessageHeader *)
1114         (((char *) &msg[1]) + relay_size);
1115       join_msg_size = ntohs (join_msg->size);
1116     }
1117     if (sizeof (*msg) + relay_size + join_msg_size != msg_size)
1118     {
1119       GNUNET_break (0);
1120       GNUNET_SERVER_client_disconnect (client);
1121       return;
1122     }
1123
1124     struct MulticastJoinRequestMessage *
1125       req = GNUNET_malloc (sizeof (*req) + join_msg_size);
1126     req->header.size = htons (sizeof (*req) + join_msg_size);
1127     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1128     req->group_pub_key = grp->pub_key;
1129     req->peer = this_peer;
1130     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1131     if (0 < join_msg_size)
1132       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1133
1134     req->member_pub_key = mem->pub_key;
1135     req->purpose.size = htonl (msg_size
1136                                - sizeof (req->header)
1137                                - sizeof (req->reserved)
1138                                - sizeof (req->signature));
1139     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1140
1141     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1142                                                &req->signature))
1143     {
1144       /* FIXME: handle error */
1145       GNUNET_assert (0);
1146     }
1147
1148     if (NULL != mem->join_req)
1149       GNUNET_free (mem->join_req);
1150     mem->join_req = req;
1151
1152     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1153     { /* No local origins, send to remote origin */
1154       cadet_send_join_request (mem);
1155     }
1156   }
1157   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1158 }
1159
1160
1161 static void
1162 client_send_join_decision (struct Member *mem,
1163                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1164 {
1165   client_send_group (&mem->grp, &hdcsn->header);
1166
1167   const struct MulticastJoinDecisionMessage *
1168     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1169   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1170   { /* Member admitted, store join_decision. */
1171     uint16_t dcsn_size = ntohs (dcsn->header.size);
1172     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1173     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1174   }
1175   else
1176   { /* Refused entry, but replay would be still possible for past members. */
1177   }
1178 }
1179
1180
1181 /**
1182  * Join decision from client.
1183  */
1184 static void
1185 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1186                            const struct GNUNET_MessageHeader *m)
1187 {
1188   struct Group *
1189     grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1190   const struct MulticastJoinDecisionMessageHeader *
1191     hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
1192
1193   if (NULL == grp)
1194   {
1195     GNUNET_break (0);
1196     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1197     return;
1198   }
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200               "%p Got join decision from client for group %s..\n",
1201               grp, GNUNET_h2s (&grp->pub_key_hash));
1202
1203   struct GNUNET_CONTAINER_MultiHashMap *
1204     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1205                                                  &grp->pub_key_hash);
1206   struct Member *mem = NULL;
1207   if (NULL != grp_mem)
1208   {
1209     struct GNUNET_HashCode member_key_hash;
1210     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1211                         &member_key_hash);
1212     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1213     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214                 "%p ..and member %s: %p\n",
1215                 grp, GNUNET_h2s (&member_key_hash), mem);
1216   }
1217   if (NULL != mem)
1218   { /* Found local member */
1219     client_send_join_decision (mem, hdcsn);
1220   }
1221   else
1222   { /* Look for remote member */
1223     cadet_send_join_decision (grp, hdcsn);
1224   }
1225   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1226 }
1227
1228
1229 /**
1230  * Incoming message from a client.
1231  */
1232 static void
1233 client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
1234                                const struct GNUNET_MessageHeader *m)
1235 {
1236   struct Group *
1237     grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1238   struct GNUNET_MULTICAST_MessageHeader *out;
1239   struct Origin *orig;
1240
1241   if (NULL == grp)
1242   {
1243     GNUNET_break (0);
1244     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1245     return;
1246   }
1247   GNUNET_assert (GNUNET_YES == grp->is_origin);
1248   orig = (struct Origin *) grp;
1249
1250   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1251   out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m);
1252   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1253   out->purpose.size = htonl (ntohs (out->header.size)
1254                              - sizeof (out->header)
1255                              - sizeof (out->hop_counter)
1256                              - sizeof (out->signature));
1257   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1258
1259   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1260                                              &out->signature))
1261   {
1262     GNUNET_assert (0);
1263   }
1264
1265   client_send_all (&grp->pub_key_hash, &out->header);
1266   if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1267   {
1268     client_send_ack (&grp->pub_key_hash);
1269   }
1270   GNUNET_free (out);
1271
1272   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1273 }
1274
1275
1276 /**
1277  * Incoming request from a client.
1278  */
1279 static void
1280 client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1281                                const struct GNUNET_MessageHeader *m)
1282 {
1283   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1284   struct Member *mem;
1285   struct GNUNET_MULTICAST_RequestHeader *out;
1286   if (NULL == grp)
1287   {
1288     GNUNET_break (0);
1289     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1290     return;
1291   }
1292   GNUNET_assert (GNUNET_NO == grp->is_origin);
1293   mem = (struct Member *) grp;
1294
1295   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1296   out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
1297   out->member_pub_key = mem->pub_key;
1298   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1299   out->purpose.size = htonl (ntohs (out->header.size)
1300                              - sizeof (out->header)
1301                              - sizeof (out->member_pub_key)
1302                              - sizeof (out->signature));
1303   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1304
1305   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1306                                              &out->signature))
1307   {
1308     GNUNET_assert (0);
1309   }
1310
1311   uint8_t send_ack = GNUNET_YES;
1312   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1313   { /* No local origins, send to remote origin */
1314     if (NULL != mem->origin_channel)
1315     {
1316       cadet_send_channel (mem->origin_channel, &out->header);
1317       send_ack = GNUNET_NO;
1318     }
1319     else
1320     {
1321       /* FIXME: not yet connected to origin */
1322       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1323       GNUNET_free (out);
1324       return;
1325     }
1326   }
1327   if (GNUNET_YES == send_ack)
1328   {
1329     client_send_ack (&grp->pub_key_hash);
1330   }
1331   GNUNET_free (out);
1332   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1333 }
1334
1335
1336 /**
1337  * Incoming replay request from a client.
1338  */
1339 static void
1340 client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
1341                             const struct GNUNET_MessageHeader *m)
1342 {
1343   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1344   struct Member *mem;
1345   if (NULL == grp)
1346   {
1347     GNUNET_break (0);
1348     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1349     return;
1350   }
1351   GNUNET_assert (GNUNET_NO == grp->is_origin);
1352   mem = (struct Member *) grp;
1353
1354   struct GNUNET_CONTAINER_MultiHashMap *
1355     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1356                                                         &grp->pub_key_hash);
1357   if (NULL == grp_replay_req)
1358   {
1359     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1360     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1361                                        &grp->pub_key_hash, grp_replay_req,
1362                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1363   }
1364   struct MulticastReplayRequestMessage *
1365     rep = (struct MulticastReplayRequestMessage *) m;
1366   struct GNUNET_HashCode key_hash;
1367   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1368                    rep->flags, &key_hash);
1369   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1370                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1371
1372   if (0 == client_send_origin (&grp->pub_key_hash, m))
1373   { /* No local origin, replay from remote members / origin. */
1374     if (NULL != mem->origin_channel)
1375     {
1376       cadet_send_channel (mem->origin_channel, m);
1377     }
1378     else
1379     {
1380       /* FIXME: not yet connected to origin */
1381       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1382       return;
1383     }
1384   }
1385   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1386 }
1387
1388
1389 static int
1390 cadet_send_replay_response_cb (void *cls,
1391                                const struct GNUNET_HashCode *key_hash,
1392                                void *value)
1393 {
1394   struct Channel *chn = value;
1395   struct GNUNET_MessageHeader *msg = cls;
1396
1397   cadet_send_channel (chn, msg);
1398   return GNUNET_OK;
1399 }
1400
1401
1402 static int
1403 client_send_replay_response_cb (void *cls,
1404                                 const struct GNUNET_HashCode *key_hash,
1405                                 void *value)
1406 {
1407   struct GNUNET_SERVER_Client *client = value;
1408   struct GNUNET_MessageHeader *msg = cls;
1409
1410   client_send (client, msg);
1411   return GNUNET_OK;
1412 }
1413
1414
1415 /**
1416  * End of replay response from a client.
1417  */
1418 static void
1419 client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client *client,
1420                                  const struct GNUNET_MessageHeader *m)
1421 {
1422   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1423   if (NULL == grp)
1424   {
1425     GNUNET_break (0);
1426     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1427     return;
1428   }
1429
1430   struct MulticastReplayResponseMessage *
1431     res = (struct MulticastReplayResponseMessage *) m;
1432
1433   struct GNUNET_HashCode key_hash;
1434   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1435                    res->flags, &key_hash);
1436
1437   struct GNUNET_CONTAINER_MultiHashMap *
1438     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1439                                                                 &grp->pub_key_hash);
1440   if (NULL != grp_replay_req_cadet)
1441   {
1442     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1443   }
1444   struct GNUNET_CONTAINER_MultiHashMap *
1445     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1446                                                                &grp->pub_key_hash);
1447   if (NULL != grp_replay_req_client)
1448   {
1449     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1450   }
1451   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1452 }
1453
1454
1455 /**
1456  * Incoming replay response from a client.
1457  *
1458  * Respond with a multicast message on success, or otherwise with an error code.
1459  */
1460 static void
1461 client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client,
1462                              const struct GNUNET_MessageHeader *m)
1463 {
1464   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
1465   if (NULL == grp)
1466   {
1467     GNUNET_break (0);
1468     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1469     return;
1470   }
1471
1472   struct MulticastReplayResponseMessage *
1473     res = (struct MulticastReplayResponseMessage *) m;
1474
1475   const struct GNUNET_MessageHeader *msg = m;
1476   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1477   {
1478     msg = (struct GNUNET_MessageHeader *) &res[1];
1479   }
1480
1481   struct GNUNET_HashCode key_hash;
1482   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1483                    res->flags, &key_hash);
1484
1485   struct GNUNET_CONTAINER_MultiHashMap *
1486     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1487                                                               &grp->pub_key_hash);
1488   if (NULL != grp_replay_req_cadet)
1489   {
1490     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1491                                                 cadet_send_replay_response_cb,
1492                                                 (void *) msg);
1493   }
1494   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1495   {
1496     struct GNUNET_CONTAINER_MultiHashMap *
1497       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1498                                                                  &grp->pub_key_hash);
1499     if (NULL != grp_replay_req_client)
1500     {
1501       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1502                                                   client_send_replay_response_cb,
1503                                                   (void *) msg);
1504     }
1505   }
1506   else
1507   {
1508     client_recv_replay_response_end (cls, client, m);
1509     return;
1510   }
1511   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1512 }
1513
1514
1515 /**
1516  * A new client connected.
1517  */
1518 static void
1519 client_notify_connect (void *cls, struct GNUNET_SERVER_Client *client)
1520 {
1521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1522   /* FIXME: send connect ACK */
1523 }
1524
1525
1526 /**
1527  * Message handlers for the server.
1528  */
1529 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1530   { client_recv_origin_start, NULL,
1531     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
1532
1533   { client_recv_member_join, NULL,
1534     GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
1535
1536   { client_recv_join_decision, NULL,
1537     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
1538
1539   { client_recv_multicast_message, NULL,
1540     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1541
1542   { client_recv_multicast_request, NULL,
1543     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1544
1545   { client_recv_replay_request, NULL,
1546     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1547
1548   { client_recv_replay_response, NULL,
1549     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1550
1551   { client_recv_replay_response_end, NULL,
1552     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 },
1553
1554   { NULL, NULL, 0, 0 }
1555 };
1556
1557
1558 /**
1559  * Incoming join request message from CADET.
1560  */
1561 int
1562 cadet_recv_join_request (void *cls,
1563                          struct GNUNET_CADET_Channel *channel,
1564                          void **ctx,
1565                          const struct GNUNET_MessageHeader *m)
1566 {
1567   const struct MulticastJoinRequestMessage *
1568     req = (const struct MulticastJoinRequestMessage *) m;
1569   uint16_t size = ntohs (m->size);
1570   if (size < sizeof (*req))
1571   {
1572     GNUNET_break_op (0);
1573     return GNUNET_SYSERR;
1574   }
1575   if (NULL != *ctx)
1576   {
1577     GNUNET_break_op (0);
1578     return GNUNET_SYSERR;
1579   }
1580   if (ntohl (req->purpose.size) != (size
1581                                     - sizeof (req->header)
1582                                     - sizeof (req->reserved)
1583                                     - sizeof (req->signature)))
1584   {
1585     GNUNET_break_op (0);
1586     return GNUNET_SYSERR;
1587   }
1588   if (GNUNET_OK !=
1589       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1590                                   &req->purpose, &req->signature,
1591                                   &req->member_pub_key))
1592   {
1593     GNUNET_break_op (0);
1594     return GNUNET_SYSERR;
1595   }
1596
1597   struct GNUNET_HashCode group_pub_hash;
1598   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
1599
1600   struct Channel *chn = GNUNET_malloc (sizeof *chn);
1601   chn->channel = channel;
1602   chn->group_pub_key = req->group_pub_key;
1603   chn->group_pub_hash = group_pub_hash;
1604   chn->member_pub_key = req->member_pub_key;
1605   chn->peer = req->peer;
1606   chn->join_status = JOIN_WAITING;
1607   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
1608                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1609
1610   client_send_all (&group_pub_hash, m);
1611   return GNUNET_OK;
1612 }
1613
1614
1615 /**
1616  * Incoming join decision message from CADET.
1617  */
1618 int
1619 cadet_recv_join_decision (void *cls,
1620                           struct GNUNET_CADET_Channel *channel,
1621                           void **ctx,
1622                           const struct GNUNET_MessageHeader *m)
1623 {
1624   const struct MulticastJoinDecisionMessage *
1625     dcsn = (const struct MulticastJoinDecisionMessage *) m;
1626   uint16_t size = ntohs (m->size);
1627   if (size < sizeof (*dcsn))
1628   {
1629     GNUNET_break_op (0);
1630     return GNUNET_SYSERR;
1631   }
1632   struct Channel *chn = *ctx;
1633   if (NULL == chn)
1634   {
1635     GNUNET_break_op (0);
1636     return GNUNET_SYSERR;
1637   }
1638   if (NULL == chn->grp || GNUNET_NO != chn->grp->is_origin)
1639   {
1640     GNUNET_break_op (0);
1641     return GNUNET_SYSERR;
1642   }
1643   switch (chn->join_status)
1644   {
1645   case JOIN_REFUSED:
1646     return GNUNET_SYSERR;
1647
1648   case JOIN_ADMITTED:
1649     return GNUNET_OK;
1650
1651   case JOIN_NOT_ASKED:
1652   case JOIN_WAITING:
1653     break;
1654   }
1655
1656   struct MulticastJoinDecisionMessageHeader *
1657     hdcsn = GNUNET_malloc (sizeof (*hdcsn) + size);
1658   hdcsn->peer = chn->peer;
1659   GNUNET_memcpy (&hdcsn[1], dcsn, sizeof (*hdcsn) + size);
1660
1661   struct Member *mem = (struct Member *) chn->grp;
1662   client_send_join_decision (mem, hdcsn);
1663   GNUNET_free (hdcsn);
1664   if (GNUNET_YES == ntohs (dcsn->is_admitted))
1665   {
1666     chn->join_status = JOIN_ADMITTED;
1667     return GNUNET_OK;
1668   }
1669   else
1670   {
1671     chn->join_status = JOIN_REFUSED;
1672     return GNUNET_SYSERR;
1673   }
1674 }
1675
1676 /**
1677  * Incoming multicast message from CADET.
1678  */
1679 int
1680 cadet_recv_message (void *cls,
1681                     struct GNUNET_CADET_Channel *channel,
1682                     void **ctx,
1683                     const struct GNUNET_MessageHeader *m)
1684 {
1685   const struct GNUNET_MULTICAST_MessageHeader *
1686     msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
1687   uint16_t size = ntohs (m->size);
1688   if (size < sizeof (*msg))
1689   {
1690     GNUNET_break_op (0);
1691     return GNUNET_SYSERR;
1692   }
1693   struct Channel *chn = *ctx;
1694   if (NULL == chn)
1695   {
1696     GNUNET_break_op (0);
1697     return GNUNET_SYSERR;
1698   }
1699   if (ntohl (msg->purpose.size) != (size
1700                                     - sizeof (msg->header)
1701                                     - sizeof (msg->hop_counter)
1702                                     - sizeof (msg->signature)))
1703   {
1704     GNUNET_break_op (0);
1705     return GNUNET_SYSERR;
1706   }
1707   if (GNUNET_OK !=
1708       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1709                                   &msg->purpose, &msg->signature,
1710                                   &chn->group_pub_key))
1711   {
1712     GNUNET_break_op (0);
1713     return GNUNET_SYSERR;
1714   }
1715
1716   client_send_all (&chn->group_pub_hash, m);
1717   return GNUNET_OK;
1718 }
1719
1720
1721 /**
1722  * Incoming multicast request message from CADET.
1723  */
1724 int
1725 cadet_recv_request (void *cls,
1726                     struct GNUNET_CADET_Channel *channel,
1727                     void **ctx,
1728                     const struct GNUNET_MessageHeader *m)
1729 {
1730   const struct GNUNET_MULTICAST_RequestHeader *
1731     req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
1732   uint16_t size = ntohs (m->size);
1733   if (size < sizeof (*req))
1734   {
1735     GNUNET_break_op (0);
1736     return GNUNET_SYSERR;
1737   }
1738   struct Channel *chn = *ctx;
1739   if (NULL == chn)
1740   {
1741     GNUNET_break_op (0);
1742     return GNUNET_SYSERR;
1743   }
1744   if (ntohl (req->purpose.size) != (size
1745                                     - sizeof (req->header)
1746                                     - sizeof (req->member_pub_key)
1747                                     - sizeof (req->signature)))
1748   {
1749     GNUNET_break_op (0);
1750     return GNUNET_SYSERR;
1751   }
1752   if (GNUNET_OK !=
1753       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1754                                   &req->purpose, &req->signature,
1755                                   &req->member_pub_key))
1756   {
1757     GNUNET_break_op (0);
1758     return GNUNET_SYSERR;
1759   }
1760
1761   client_send_origin (&chn->group_pub_hash, m);
1762   return GNUNET_OK;
1763 }
1764
1765
1766 /**
1767  * Incoming multicast replay request from CADET.
1768  */
1769 int
1770 cadet_recv_replay_request (void *cls,
1771                            struct GNUNET_CADET_Channel *channel,
1772                            void **ctx,
1773                            const struct GNUNET_MessageHeader *m)
1774 {
1775   struct MulticastReplayRequestMessage rep;
1776   uint16_t size = ntohs (m->size);
1777   if (size < sizeof (rep))
1778   {
1779     GNUNET_break_op (0);
1780     return GNUNET_SYSERR;
1781   }
1782   struct Channel *chn = *ctx;
1783
1784   GNUNET_memcpy (&rep, m, sizeof (rep));
1785   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1786
1787   struct GNUNET_CONTAINER_MultiHashMap *
1788     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1789                                                         &chn->grp->pub_key_hash);
1790   if (NULL == grp_replay_req)
1791   {
1792     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1793     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1794                                        &chn->grp->pub_key_hash, grp_replay_req,
1795                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1796   }
1797   struct GNUNET_HashCode key_hash;
1798   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1799                    rep.flags, &key_hash);
1800   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1801                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1802
1803   client_send_random (&chn->group_pub_hash, &rep.header);
1804   return GNUNET_OK;
1805 }
1806
1807
1808 /**
1809  * Incoming multicast replay response from CADET.
1810  */
1811 int
1812 cadet_recv_replay_response (void *cls,
1813                             struct GNUNET_CADET_Channel *channel,
1814                             void **ctx,
1815                             const struct GNUNET_MessageHeader *m)
1816 {
1817   //struct Channel *chn = *ctx;
1818
1819   /* @todo FIXME: got replay error response, send request to other members */
1820
1821   return GNUNET_OK;
1822 }
1823
1824
1825 /**
1826  * Message handlers for CADET.
1827  */
1828 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1829   { cadet_recv_join_request,
1830     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
1831
1832   { cadet_recv_message,
1833     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1834
1835   { cadet_recv_request,
1836     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1837
1838   { cadet_recv_replay_request,
1839     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1840
1841   { cadet_recv_replay_response,
1842     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1843
1844   { NULL, 0, 0 }
1845 };
1846
1847
1848 /**
1849  * Connected to core service.
1850  */
1851 static void
1852 core_connected_cb  (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1853 {
1854   this_peer = *my_identity;
1855
1856   stats = GNUNET_STATISTICS_create ("multicast", cfg);
1857   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1858   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1859   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1860   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1861   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1862   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1863   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1864
1865   cadet = GNUNET_CADET_connect (cfg, NULL,
1866                                 &cadet_notify_channel_end,
1867                                 cadet_handlers);
1868   GNUNET_assert (NULL != cadet);
1869
1870   nc = GNUNET_SERVER_notification_context_create (server, 1);
1871   GNUNET_SERVER_add_handlers (server, server_handlers);
1872   GNUNET_SERVER_disconnect_notify (server,
1873                                    &client_notify_disconnect, NULL);
1874   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1875                                  NULL);
1876 }
1877
1878
1879 /**
1880  * Service started.
1881  *
1882  * @param cls closure
1883  * @param server the initialized server
1884  * @param cfg configuration to use
1885  */
1886 static void
1887 run (void *cls,
1888      struct GNUNET_SERVER_Handle *srv,
1889      const struct GNUNET_CONFIGURATION_Handle *c)
1890 {
1891   cfg = c;
1892   server = srv;
1893   GNUNET_SERVER_connect_notify (server, &client_notify_connect, NULL);
1894   core = GNUNET_CORE_connecT (cfg, NULL, &core_connected_cb, NULL, NULL, NULL);
1895 }
1896
1897
1898 /**
1899  * The main function for the multicast service.
1900  *
1901  * @param argc number of arguments from the command line
1902  * @param argv command line arguments
1903  * @return 0 ok, 1 on error
1904  */
1905 int
1906 main (int argc, char *const *argv)
1907 {
1908   return (GNUNET_OK ==
1909           GNUNET_SERVICE_run (argc, argv, "multicast",
1910                               GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
1911 }
1912
1913 /* end of gnunet-service-multicast.c */