bugfixes in the multicast service
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "multicast.h"
34
35 /**
36  * Handle to our current configuration.
37  */
38 static const struct GNUNET_CONFIGURATION_Handle *cfg;
39
40 /**
41  * Service handle.
42  */
43 static struct GNUNET_SERVICE_Handle *service;
44
45 /**
46  * CADET handle.
47  */
48 static struct GNUNET_CADET_Handle *cadet;
49
50 /**
51  * Identity of this peer.
52  */
53 static struct GNUNET_PeerIdentity this_peer;
54
55 /**
56  * Handle to the statistics service.
57  */
58 static struct GNUNET_STATISTICS_Handle *stats;
59
60 /**
61  * All connected origin clients.
62  * Group's pub_key_hash -> struct Origin * (uniq)
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *origins;
65
66 /**
67  * All connected member clients.
68  * Group's pub_key_hash -> struct Member * (multi)
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *members;
71
72 /**
73  * Connected member clients per group.
74  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
77
78 /**
79  * Incoming CADET channels with connected children in the tree.
80  * Group's pub_key_hash -> struct Channel * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
83
84 /**
85  * Outgoing CADET channels connecting to parents in the tree.
86  * Group's pub_key_hash -> struct Channel * (multi)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
89
90 /**
91  * Incoming replay requests from CADET.
92  * Group's pub_key_hash ->
93  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
96
97 /**
98  * Incoming replay requests from clients.
99  * Group's pub_key_hash ->
100  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
101  */
102 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
103
104
105 /**
106  * Join status of a remote peer.
107  */
108 enum JoinStatus
109 {
110   JOIN_REFUSED  = -1,
111   JOIN_NOT_ASKED = 0,
112   JOIN_WAITING   = 1,
113   JOIN_ADMITTED  = 2,
114 };
115
116 enum ChannelDirection
117 {
118   DIR_INCOMING = 0,
119   DIR_OUTGOING = 1,
120 };
121
122
123 /**
124  * Context for a CADET channel.
125  */
126 struct Channel
127 {
128   /**
129    * Group the channel belongs to.
130    *
131    * Only set for outgoing channels.
132    */
133   struct Group *group;
134
135   /**
136    * CADET channel.
137    */
138   struct GNUNET_CADET_Channel *channel;
139
140   /**
141    * CADET transmission handle.
142    */
143   struct GNUNET_CADET_TransmitHandle *tmit_handle;
144
145   /**
146    * Public key of the target group.
147    */
148   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
149
150   /**
151    * Hash of @a group_pub_key.
152    */
153   struct GNUNET_HashCode group_pub_hash;
154
155   /**
156    * Public key of the joining member.
157    */
158   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
159
160   /**
161    * Remote peer identity.
162    */
163   struct GNUNET_PeerIdentity peer;
164
165   /**
166    * Is the remote peer admitted to the group?
167    * @see enum JoinStatus
168    */
169   int8_t join_status;
170
171   /**
172    * Number of messages waiting to be sent to CADET.
173    */
174   uint8_t msgs_pending;
175
176   /**
177    * Channel direction.
178    * @see enum ChannelDirection
179    */
180   uint8_t direction;
181 };
182
183
184 /**
185  * List of connected clients.
186  */
187 struct ClientList
188 {
189   struct ClientList *prev;
190   struct ClientList *next;
191   struct GNUNET_SERVICE_Client *client;
192 };
193
194
195 /**
196  * Client context for an origin or member.
197  */
198 struct Group
199 {
200   struct ClientList *clients_head;
201   struct ClientList *clients_tail;
202
203   /**
204    * Public key of the group.
205    */
206   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
207
208   /**
209    * Hash of @a pub_key.
210    */
211   struct GNUNET_HashCode pub_key_hash;
212
213   /**
214    * CADET port hash.
215    */
216   struct GNUNET_HashCode cadet_port_hash;
217
218   /**
219    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
220    */
221   uint8_t disconnected;
222
223   /**
224    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
225    */
226   uint8_t is_origin;
227
228   union {
229     struct Origin *origin;
230     struct Member *member;
231   };
232 };
233
234
235 /**
236 * Client context for a group's origin.
237  */
238 struct Origin
239 {
240   struct Group group;
241
242   /**
243    * Private key of the group.
244    */
245   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
246
247   /**
248    * CADET port.
249    */
250   struct GNUNET_CADET_Port *cadet_port;
251
252   /**
253    * Last message fragment ID sent to the group.
254    */
255   uint64_t max_fragment_id;
256 };
257
258
259 /**
260  * Client context for a group member.
261  */
262 struct Member
263 {
264   struct Group group;
265
266   /**
267    * Private key of the member.
268    */
269   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
270
271   /**
272    * Public key of the member.
273    */
274   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
275
276   /**
277    * Hash of @a pub_key.
278    */
279   struct GNUNET_HashCode pub_key_hash;
280
281   /**
282    * Join request sent to the origin / members.
283    */
284   struct MulticastJoinRequestMessage *join_req;
285
286   /**
287    * Join decision sent in reply to our request.
288    *
289    * Only a positive decision is stored here, in case of a negative decision the
290    * client is disconnected.
291    */
292   struct MulticastJoinDecisionMessageHeader *join_dcsn;
293
294   /**
295    * CADET channel to the origin.
296    */
297   struct Channel *origin_channel;
298
299   /**
300    * Peer identity of origin.
301    */
302   struct GNUNET_PeerIdentity origin;
303
304   /**
305    * Peer identity of relays (other members to connect).
306    */
307   struct GNUNET_PeerIdentity *relays;
308
309   /**
310    * Last request fragment ID sent to the origin.
311    */
312   uint64_t max_fragment_id;
313
314   /**
315    * Number of @a relays.
316    */
317   uint32_t relay_count;
318 };
319
320
321 /**
322  * Client context.
323  */
324 struct Client {
325   struct GNUNET_SERVICE_Client *client;
326   struct Group *group;
327 };
328
329
330 struct ReplayRequestKey
331 {
332   uint64_t fragment_id;
333   uint64_t message_id;
334   uint64_t fragment_offset;
335   uint64_t flags;
336 };
337
338
339 /**
340  * Task run during shutdown.
341  *
342  * @param cls unused
343  */
344 static void
345 shutdown_task (void *cls)
346 {
347   if (NULL != cadet)
348   {
349     GNUNET_CADET_disconnect (cadet);
350     cadet = NULL;
351   }
352   if (NULL != stats)
353   {
354     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
355     stats = NULL;
356   }
357   /* FIXME: do more clean up here */
358 }
359
360
361 /**
362  * Clean up origin data structures after a client disconnected.
363  */
364 static void
365 cleanup_origin (struct Origin *orig)
366 {
367   struct Group *grp = &orig->group;
368   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
369   if (NULL != orig->cadet_port)
370   {
371     GNUNET_CADET_close_port (orig->cadet_port);
372     orig->cadet_port = NULL;
373   }
374   GNUNET_free (orig);
375 }
376
377
378 /**
379  * Clean up member data structures after a client disconnected.
380  */
381 static void
382 cleanup_member (struct Member *mem)
383 {
384   struct Group *grp = &mem->group;
385   struct GNUNET_CONTAINER_MultiHashMap *
386     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
387                                                  &grp->pub_key_hash);
388   GNUNET_assert (NULL != grp_mem);
389   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
390
391   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
392   {
393     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
394                                           grp_mem);
395     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
396   }
397   if (NULL != mem->join_dcsn)
398   {
399     GNUNET_free (mem->join_dcsn);
400     mem->join_dcsn = NULL;
401   }
402   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
403   GNUNET_free (mem);
404 }
405
406
407 /**
408  * Clean up group data structures after a client disconnected.
409  */
410 static void
411 cleanup_group (struct Group *grp)
412 {
413   (GNUNET_YES == grp->is_origin)
414     ? cleanup_origin (grp->origin)
415     : cleanup_member (grp->member);
416 }
417
418
419 void
420 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
421                  uint64_t fragment_offset, uint64_t flags,
422                  struct GNUNET_HashCode *key_hash)
423 {
424   struct ReplayRequestKey key = {
425     .fragment_id = fragment_id,
426     .message_id = message_id,
427     .fragment_offset = fragment_offset,
428     .flags = flags,
429   };
430   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
431 }
432
433
434 /**
435  * Remove channel from replay request hashmap.
436  *
437  * @param chn
438  *        Channel to remove.
439  *
440  * @return #GNUNET_YES if there are more entries to process,
441  *         #GNUNET_NO when reached end of hashmap.
442  */
443 static int
444 replay_req_remove_cadet (struct Channel *chn)
445 {
446   struct GNUNET_CONTAINER_MultiHashMap *
447     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
448                                                         &chn->group->pub_key_hash);
449   if (NULL == grp_replay_req)
450     return GNUNET_NO;
451
452   struct GNUNET_CONTAINER_MultiHashMapIterator *
453     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
454   struct GNUNET_HashCode key;
455   const struct Channel *c;
456   while (GNUNET_YES
457          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
458                                                          (const void **) &c))
459   {
460     if (c == chn)
461     {
462       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
463       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
464       return GNUNET_YES;
465     }
466   }
467   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
468   return GNUNET_NO;
469 }
470
471
472 /**
473  * Remove client from replay request hashmap.
474  *
475  * @param client
476  *        Client to remove.
477  *
478  * @return #GNUNET_YES if there are more entries to process,
479  *         #GNUNET_NO when reached end of hashmap.
480  */
481 static int
482 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
483 {
484   struct GNUNET_CONTAINER_MultiHashMap *
485     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
486                                                         &grp->pub_key_hash);
487   if (NULL == grp_replay_req)
488     return GNUNET_NO;
489
490   struct GNUNET_CONTAINER_MultiHashMapIterator *
491     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
492   struct GNUNET_HashCode key;
493   const struct GNUNET_SERVICE_Client *c;
494   while (GNUNET_YES
495          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
496                                                          (const void **) &c))
497   {
498     if (c == client)
499     {
500       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
501       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
502       return GNUNET_YES;
503     }
504   }
505   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
506   return GNUNET_NO;
507 }
508
509
510 /**
511  * Send message to a client.
512  */
513 static void
514 client_send (struct GNUNET_SERVICE_Client *client,
515              const struct GNUNET_MessageHeader *msg)
516 {
517   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
518               "%p Sending message to client.\n", client);
519
520   struct GNUNET_MQ_Envelope *
521     env = GNUNET_MQ_msg_copy (msg);
522
523   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
524                   env);
525 }
526
527
528 /**
529  * Send message to all clients connected to the group.
530  */
531 static void
532 client_send_group (const struct Group *grp,
533                    const struct GNUNET_MessageHeader *msg)
534 {
535   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
536               "%p Sending message to all clients of the group.\n", grp);
537
538   struct ClientList *cl = grp->clients_head;
539   while (NULL != cl)
540   {
541     struct GNUNET_MQ_Envelope *
542       env = GNUNET_MQ_msg_copy (msg);
543
544     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
545                     env);
546     cl = cl->next;
547   }
548 }
549
550
551 /**
552  * Iterator callback for sending a message to origin clients.
553  */
554 static int
555 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
556                        void *origin)
557 {
558   const struct GNUNET_MessageHeader *msg = cls;
559   struct Member *orig = origin;
560
561   client_send_group (&orig->group, msg);
562   return GNUNET_YES;
563 }
564
565
566 /**
567  * Iterator callback for sending a message to member clients.
568  */
569 static int
570 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
571                        void *member)
572 {
573   const struct GNUNET_MessageHeader *msg = cls;
574   struct Member *mem = member;
575
576   if (NULL != mem->join_dcsn)
577   { /* Only send message to admitted members */
578     client_send_group (&mem->group, msg);
579   }
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Send message to all origin and member clients connected to the group.
586  *
587  * @param pub_key_hash
588  *        H(key_pub) of the group.
589  * @param msg
590  *        Message to send.
591  */
592 static int
593 client_send_all (struct GNUNET_HashCode *pub_key_hash,
594                  const struct GNUNET_MessageHeader *msg)
595 {
596   int n = 0;
597   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
598                                                    client_send_origin_cb,
599                                                    (void *) msg);
600   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
601                                                    client_send_member_cb,
602                                                    (void *) msg);
603   return n;
604 }
605
606
607 /**
608  * Send message to a random origin client or a random member client.
609  *
610  * @param grp  The group to send @a msg to.
611  * @param msg  Message to send.
612  */
613 static int
614 client_send_random (struct GNUNET_HashCode *pub_key_hash,
615                     const struct GNUNET_MessageHeader *msg)
616 {
617   int n = 0;
618   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
619                                                  (void *) msg);
620   if (n <= 0)
621     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
622                                                    (void *) msg);
623   return n;
624 }
625
626
627 /**
628  * Send message to all origin clients connected to the group.
629  *
630  * @param pub_key_hash
631  *        H(key_pub) of the group.
632  * @param msg
633  *        Message to send.
634  */
635 static int
636 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
637                     const struct GNUNET_MessageHeader *msg)
638 {
639   int n = 0;
640   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
641                                                    client_send_origin_cb,
642                                                    (void *) msg);
643   return n;
644 }
645
646
647 /**
648  * Send fragment acknowledgement to all clients of the channel.
649  *
650  * @param pub_key_hash
651  *        H(key_pub) of the group.
652  */
653 static void
654 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
655 {
656   static struct GNUNET_MessageHeader *msg = NULL;
657   if (NULL == msg)
658   {
659     msg = GNUNET_malloc (sizeof (*msg));
660     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
661     msg->size = htons (sizeof (*msg));
662   }
663   client_send_all (pub_key_hash, msg);
664 }
665
666
667 struct CadetTransmitClosure
668 {
669   struct Channel *chn;
670   const struct GNUNET_MessageHeader *msg;
671 };
672
673
674 /**
675  * CADET is ready to transmit a message.
676  */
677 size_t
678 cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
679 {
680   if (0 == buf_size)
681   {
682     /* FIXME: connection closed */
683     return 0;
684   }
685   struct CadetTransmitClosure *tcls = cls;
686   struct Channel *chn = tcls->chn;
687   uint16_t msg_size = ntohs (tcls->msg->size);
688   GNUNET_assert (msg_size <= buf_size);
689   GNUNET_memcpy (buf, tcls->msg, msg_size);
690   GNUNET_free (tcls);
691
692   if (0 == chn->msgs_pending)
693   {
694     GNUNET_break (0);
695   }
696   else if (0 == --chn->msgs_pending)
697   {
698     client_send_ack (&chn->group_pub_hash);
699   }
700   return msg_size;
701 }
702
703
704 /**
705  * Send a message to a CADET channel.
706  *
707  * @param chn  Channel.
708  * @param msg  Message.
709  */
710 static void
711 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
712 {
713   struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
714   tcls->chn = chn;
715   tcls->msg = msg;
716
717   chn->msgs_pending++;
718   chn->tmit_handle
719     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
720                                           GNUNET_TIME_UNIT_FOREVER_REL,
721                                           ntohs (msg->size),
722                                           &cadet_notify_transmit_ready,
723                                           tcls);
724   GNUNET_assert (NULL != chn->tmit_handle);
725 }
726
727
728 /**
729  * Create new outgoing CADET channel.
730  *
731  * @param peer
732  *        Peer to connect to.
733  * @param group_pub_key
734  *        Public key of group the channel belongs to.
735  * @param group_pub_hash
736  *        Hash of @a group_pub_key.
737  *
738  * @return Channel.
739  */
740 static struct Channel *
741 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
742 {
743   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
744   chn->group = grp;
745   chn->group_pub_key = grp->pub_key;
746   chn->group_pub_hash = grp->pub_key_hash;
747   chn->peer = *peer;
748   chn->direction = DIR_OUTGOING;
749   chn->join_status = JOIN_WAITING;
750   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
751                                               &grp->cadet_port_hash,
752                                               GNUNET_CADET_OPTION_RELIABLE);
753   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
754                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
755   return chn;
756 }
757
758
759 /**
760  * Create CADET channel and send a join request.
761  */
762 static void
763 cadet_send_join_request (struct Member *mem)
764 {
765   mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
766   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
767
768   uint32_t i;
769   for (i = 0; i < mem->relay_count; i++)
770   {
771     struct Channel *
772       chn = cadet_channel_create (&mem->group, &mem->relays[i]);
773     cadet_send_channel (chn, &mem->join_req->header);
774   }
775 }
776
777
778 static int
779 cadet_send_join_decision_cb (void *cls,
780                              const struct GNUNET_HashCode *group_pub_hash,
781                              void *channel)
782 {
783   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
784   struct Channel *chn = channel;
785
786   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
787       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
788   {
789     cadet_send_channel (chn, &hdcsn->header);
790     return GNUNET_NO;
791   }
792   return GNUNET_YES;
793 }
794
795
796 /**
797  * Send join decision to a remote peer.
798  */
799 static void
800 cadet_send_join_decision (struct Group *grp,
801                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
802 {
803   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
804                                               &cadet_send_join_decision_cb,
805                                               (void *) hdcsn);
806 }
807
808
809 /**
810  * Iterator callback for sending a message to origin clients.
811  */
812 static int
813 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
814                void *channel)
815 {
816   const struct GNUNET_MessageHeader *msg = cls;
817   struct Channel *chn = channel;
818   if (JOIN_ADMITTED == chn->join_status)
819     cadet_send_channel (chn, msg);
820   return GNUNET_YES;
821 }
822
823
824 /**
825  * Send message to all connected children.
826  */
827 static int
828 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
829                      const struct GNUNET_MessageHeader *msg)
830 {
831   int n = 0;
832   if (channels_in != NULL)
833     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
834                                                      cadet_send_cb, (void *) msg);
835   return n;
836 }
837
838
839 #if 0       // unused as yet
840 /**
841  * Send message to all connected parents.
842  */
843 static int
844 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
845                     const struct GNUNET_MessageHeader *msg)
846 {
847   int n = 0;
848   if (channels_in != NULL)
849     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
850                                                      cadet_send_cb, (void *) msg);
851   return n;
852 }
853 #endif
854
855
856 /**
857  * New incoming CADET channel.
858  */
859 static void *
860 cadet_notify_channel_new (void *cls,
861                           struct GNUNET_CADET_Channel *channel,
862                           const struct GNUNET_PeerIdentity *initiator,
863                           const struct GNUNET_HashCode *port,
864                           enum GNUNET_CADET_ChannelOption options)
865 {
866   return NULL;
867 }
868
869
870 /**
871  * CADET channel is being destroyed.
872  */
873 static void
874 cadet_notify_channel_end (void *cls,
875                           const struct GNUNET_CADET_Channel *channel,
876                           void *ctx)
877 {
878   if (NULL == ctx)
879     return;
880
881   struct Channel *chn = ctx;
882   if (NULL != chn->group)
883   {
884     if (GNUNET_NO == chn->group->is_origin)
885     {
886       struct Member *mem = (struct Member *) chn->group;
887       if (chn == mem->origin_channel)
888         mem->origin_channel = NULL;
889     }
890   }
891
892   while (GNUNET_YES == replay_req_remove_cadet (chn));
893
894   GNUNET_free (chn);
895 }
896
897
898 static void
899 group_set_cadet_port_hash (struct Group *grp)
900 {
901   struct CadetPort {
902     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
903     uint32_t app_type;
904   } port = {
905     grp->pub_key,
906     GNUNET_APPLICATION_TYPE_MULTICAST,
907   };
908   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
909 }
910
911
912 /**
913  * Handle a connecting client starting an origin.
914  */
915 static void
916 handle_client_origin_start (void *cls,
917                             const struct MulticastOriginStartMessage *msg)
918 {
919   struct Client *c = cls;
920   struct GNUNET_SERVICE_Client *client = c->client;
921
922   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
923   struct GNUNET_HashCode pub_key_hash;
924
925   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
926   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
927
928   struct Origin *
929     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
930   struct Group *grp;
931
932   if (NULL == orig)
933   {
934     orig = GNUNET_new (struct Origin);
935     orig->priv_key = msg->group_key;
936     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
937
938     grp = c->group = &orig->group;
939     grp->origin = orig;
940     grp->is_origin = GNUNET_YES;
941     grp->pub_key = pub_key;
942     grp->pub_key_hash = pub_key_hash;
943
944     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
945                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
946
947     group_set_cadet_port_hash (grp);
948     orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash,
949                                                cadet_notify_channel_new, NULL);
950   }
951   else
952   {
953     grp = &orig->group;
954   }
955
956   struct ClientList *cl = GNUNET_new (struct ClientList);
957   cl->client = client;
958   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
959
960   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961               "%p Client connected as origin to group %s.\n",
962               orig, GNUNET_h2s (&grp->pub_key_hash));
963   GNUNET_SERVICE_client_continue (client);
964 }
965
966
967 static int
968 check_client_member_join (void *cls,
969                           const struct MulticastMemberJoinMessage *msg)
970 {
971   uint16_t msg_size = ntohs (msg->header.size);
972   struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
973   uint32_t relay_count = ntohl (msg->relay_count);
974   uint16_t relay_size = relay_count * sizeof (*relays);
975   struct GNUNET_MessageHeader *join_msg = NULL;
976   uint16_t join_msg_size = 0;
977   if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
978       <= msg_size)
979   {
980     join_msg = (struct GNUNET_MessageHeader *)
981       (((char *) &msg[1]) + relay_size);
982     join_msg_size = ntohs (join_msg->size);
983   }
984   return
985     msg_size == (sizeof (*msg) + relay_size + join_msg_size)
986     ? GNUNET_OK
987     : GNUNET_SYSERR;
988 }
989
990
991 /**
992  * Handle a connecting client joining a group.
993  */
994 static void
995 handle_client_member_join (void *cls,
996                            const struct MulticastMemberJoinMessage *msg)
997 {
998   struct Client *c = cls;
999   struct GNUNET_SERVICE_Client *client = c->client;
1000
1001   uint16_t msg_size = ntohs (msg->header.size);
1002
1003   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1004   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1005
1006   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1007   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1008   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1009
1010   struct GNUNET_CONTAINER_MultiHashMap *
1011     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1012   struct Member *mem = NULL;
1013   struct Group *grp;
1014
1015   if (NULL != grp_mem)
1016   {
1017     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1018   }
1019   if (NULL == mem)
1020   {
1021     mem = GNUNET_new (struct Member);
1022     mem->origin = msg->origin;
1023     mem->priv_key = msg->member_key;
1024     mem->pub_key = mem_pub_key;
1025     mem->pub_key_hash = mem_pub_key_hash;
1026     mem->max_fragment_id = 0; // FIXME
1027
1028     grp = c->group = &mem->group;
1029     grp->member = mem;
1030     grp->is_origin = GNUNET_NO;
1031     grp->pub_key = msg->group_pub_key;
1032     grp->pub_key_hash = pub_key_hash;
1033     group_set_cadet_port_hash (grp);
1034
1035     if (NULL == grp_mem)
1036     {
1037       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1038       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1039                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1040     }
1041     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1042                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1043     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1044                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1045   }
1046   else
1047   {
1048     grp = &mem->group;
1049   }
1050
1051   struct ClientList *cl = GNUNET_new (struct ClientList);
1052   cl->client = client;
1053   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1054
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "%p Client connected to group %s..\n",
1057               mem, GNUNET_h2s (&grp->pub_key_hash));
1058   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "%p ..as member %s (%s).\n",
1061               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1062   GNUNET_free (str);
1063
1064   if (NULL != mem->join_dcsn)
1065   { /* Already got a join decision, send it to client. */
1066     struct GNUNET_MQ_Envelope *
1067       env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
1068
1069     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
1070                     env);
1071   }
1072   else
1073   { /* First client of the group, send join request. */
1074     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1075     uint32_t relay_count = ntohl (msg->relay_count);
1076     uint16_t relay_size = relay_count * sizeof (*relays);
1077     struct GNUNET_MessageHeader *join_msg = NULL;
1078     uint16_t join_msg_size = 0;
1079     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1080         <= msg_size)
1081     {
1082       join_msg = (struct GNUNET_MessageHeader *)
1083         (((char *) &msg[1]) + relay_size);
1084       join_msg_size = ntohs (join_msg->size);
1085     }
1086
1087     uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
1088     struct MulticastJoinRequestMessage *
1089       req = GNUNET_malloc (req_msg_size);
1090     req->header.size = htons (req_msg_size);
1091     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1092     req->group_pub_key = grp->pub_key;
1093     req->peer = this_peer;
1094     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1095     if (0 < join_msg_size)
1096       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1097
1098     req->member_pub_key = mem->pub_key;
1099     req->purpose.size = htonl (req_msg_size
1100                                - sizeof (req->header)
1101                                - sizeof (req->reserved)
1102                                - sizeof (req->signature));
1103     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1104
1105     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1106                                                &req->signature))
1107     {
1108       /* FIXME: handle error */
1109       GNUNET_assert (0);
1110     }
1111
1112     if (NULL != mem->join_req)
1113       GNUNET_free (mem->join_req);
1114     mem->join_req = req;
1115
1116     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1117     { /* No local origins, send to remote origin */
1118       cadet_send_join_request (mem);
1119     }
1120   }
1121   GNUNET_SERVICE_client_continue (client);
1122 }
1123
1124
1125 static void
1126 client_send_join_decision (struct Member *mem,
1127                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1128 {
1129   client_send_group (&mem->group, &hdcsn->header);
1130
1131   const struct MulticastJoinDecisionMessage *
1132     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1133   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1134   { /* Member admitted, store join_decision. */
1135     uint16_t dcsn_size = ntohs (dcsn->header.size);
1136     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1137     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1138   }
1139   else
1140   { /* Refused entry, but replay would be still possible for past members. */
1141   }
1142 }
1143
1144
1145 static int
1146 check_client_join_decision (void *cls,
1147                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1148 {
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Join decision from client.
1155  */
1156 static void
1157 handle_client_join_decision (void *cls,
1158                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
1159 {
1160   struct Client *c = cls;
1161   struct GNUNET_SERVICE_Client *client = c->client;
1162   struct Group *grp = c->group;
1163
1164   if (NULL == grp)
1165   {
1166     GNUNET_break (0);
1167     GNUNET_SERVICE_client_drop (client);
1168     return;
1169   }
1170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171               "%p Got join decision from client for group %s..\n",
1172               grp, GNUNET_h2s (&grp->pub_key_hash));
1173
1174   struct GNUNET_CONTAINER_MultiHashMap *
1175     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1176                                                  &grp->pub_key_hash);
1177   struct Member *mem = NULL;
1178   if (NULL != grp_mem)
1179   {
1180     struct GNUNET_HashCode member_key_hash;
1181     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1182                         &member_key_hash);
1183     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1184     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185                 "%p ..and member %s: %p\n",
1186                 grp, GNUNET_h2s (&member_key_hash), mem);
1187   }
1188   if (NULL != mem)
1189   { /* Found local member */
1190     client_send_join_decision (mem, hdcsn);
1191   }
1192   else
1193   { /* Look for remote member */
1194     cadet_send_join_decision (grp, hdcsn);
1195   }
1196   GNUNET_SERVICE_client_continue (client);
1197 }
1198
1199
1200 static int
1201 check_client_multicast_message (void *cls,
1202                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
1203 {
1204   return GNUNET_OK;
1205 }
1206
1207
1208 /**
1209  * Incoming message from a client.
1210  */
1211 static void
1212 handle_client_multicast_message (void *cls,
1213                                  const struct GNUNET_MULTICAST_MessageHeader *msg)
1214 {
1215   struct Client *c = cls;
1216   struct GNUNET_SERVICE_Client *client = c->client;
1217   struct Group *grp = c->group;
1218
1219   if (NULL == grp)
1220   {
1221     GNUNET_break (0);
1222     GNUNET_SERVICE_client_drop (client);
1223     return;
1224   }
1225   GNUNET_assert (GNUNET_YES == grp->is_origin);
1226   struct Origin *orig = grp->origin;
1227
1228   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1229   struct GNUNET_MULTICAST_MessageHeader *
1230     out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
1231   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1232   out->purpose.size = htonl (ntohs (out->header.size)
1233                              - sizeof (out->header)
1234                              - sizeof (out->hop_counter)
1235                              - sizeof (out->signature));
1236   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1237
1238   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1239                                              &out->signature))
1240   {
1241     GNUNET_assert (0);
1242   }
1243
1244   client_send_all (&grp->pub_key_hash, &out->header);
1245   if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1246   {
1247     client_send_ack (&grp->pub_key_hash);
1248   }
1249   GNUNET_free (out);
1250
1251   GNUNET_SERVICE_client_continue (client);
1252 }
1253
1254
1255 static int
1256 check_client_multicast_request (void *cls,
1257                                 const struct GNUNET_MULTICAST_RequestHeader *req)
1258 {
1259   return GNUNET_OK;
1260 }
1261
1262
1263 /**
1264  * Incoming request from a client.
1265  */
1266 static void
1267 handle_client_multicast_request (void *cls,
1268                                  const struct GNUNET_MULTICAST_RequestHeader *req)
1269 {
1270   struct Client *c = cls;
1271   struct GNUNET_SERVICE_Client *client = c->client;
1272   struct Group *grp = c->group;
1273
1274   if (NULL == grp)
1275   {
1276     GNUNET_break (0);
1277     GNUNET_SERVICE_client_drop (client);
1278     return;
1279   }
1280   GNUNET_assert (GNUNET_NO == grp->is_origin);
1281   struct Member *mem = grp->member;
1282
1283   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1284   struct GNUNET_MULTICAST_RequestHeader *
1285     out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
1286   out->member_pub_key = mem->pub_key;
1287   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1288   out->purpose.size = htonl (ntohs (out->header.size)
1289                              - sizeof (out->header)
1290                              - sizeof (out->member_pub_key)
1291                              - sizeof (out->signature));
1292   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1293
1294   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1295                                              &out->signature))
1296   {
1297     GNUNET_assert (0);
1298   }
1299
1300   uint8_t send_ack = GNUNET_YES;
1301   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1302   { /* No local origins, send to remote origin */
1303     if (NULL != mem->origin_channel)
1304     {
1305       cadet_send_channel (mem->origin_channel, &out->header);
1306       send_ack = GNUNET_NO;
1307     }
1308     else
1309     {
1310       /* FIXME: not yet connected to origin */
1311       GNUNET_SERVICE_client_drop (client);
1312       GNUNET_free (out);
1313       return;
1314     }
1315   }
1316   if (GNUNET_YES == send_ack)
1317   {
1318     client_send_ack (&grp->pub_key_hash);
1319   }
1320   GNUNET_free (out);
1321   GNUNET_SERVICE_client_continue (client);
1322 }
1323
1324
1325 /**
1326  * Incoming replay request from a client.
1327  */
1328 static void
1329 handle_client_replay_request (void *cls,
1330                               const struct MulticastReplayRequestMessage *rep)
1331 {
1332   struct Client *c = cls;
1333   struct GNUNET_SERVICE_Client *client = c->client;
1334   struct Group *grp = c->group;
1335
1336   if (NULL == grp)
1337   {
1338     GNUNET_break (0);
1339     GNUNET_SERVICE_client_drop (client);
1340     return;
1341   }
1342   GNUNET_assert (GNUNET_NO == grp->is_origin);
1343   struct Member *mem = grp->member;
1344
1345   struct GNUNET_CONTAINER_MultiHashMap *
1346     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1347                                                         &grp->pub_key_hash);
1348   if (NULL == grp_replay_req)
1349   {
1350     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1351     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1352                                        &grp->pub_key_hash, grp_replay_req,
1353                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1354   }
1355
1356   struct GNUNET_HashCode key_hash;
1357   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1358                    rep->flags, &key_hash);
1359   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1360                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1361
1362   if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
1363   { /* No local origin, replay from remote members / origin. */
1364     if (NULL != mem->origin_channel)
1365     {
1366       cadet_send_channel (mem->origin_channel, &rep->header);
1367     }
1368     else
1369     {
1370       /* FIXME: not yet connected to origin */
1371       GNUNET_SERVICE_client_drop (client);
1372       return;
1373     }
1374   }
1375   GNUNET_SERVICE_client_continue (client);
1376 }
1377
1378
1379 static int
1380 cadet_send_replay_response_cb (void *cls,
1381                                const struct GNUNET_HashCode *key_hash,
1382                                void *value)
1383 {
1384   struct Channel *chn = value;
1385   struct GNUNET_MessageHeader *msg = cls;
1386
1387   cadet_send_channel (chn, msg);
1388   return GNUNET_OK;
1389 }
1390
1391
1392 static int
1393 client_send_replay_response_cb (void *cls,
1394                                 const struct GNUNET_HashCode *key_hash,
1395                                 void *value)
1396 {
1397   struct GNUNET_SERVICE_Client *client = value;
1398   struct GNUNET_MessageHeader *msg = cls;
1399
1400   client_send (client, msg);
1401   return GNUNET_OK;
1402 }
1403
1404
1405 static int
1406 check_client_replay_response_end (void *cls,
1407                                   const struct MulticastReplayResponseMessage *res)
1408 {
1409   return GNUNET_OK;
1410 }
1411
1412
1413 /**
1414  * End of replay response from a client.
1415  */
1416 static void
1417 handle_client_replay_response_end (void *cls,
1418                                    const struct MulticastReplayResponseMessage *res)
1419 {
1420   struct Client *c = cls;
1421   struct GNUNET_SERVICE_Client *client = c->client;
1422   struct Group *grp = c->group;
1423
1424   if (NULL == grp)
1425   {
1426     GNUNET_break (0);
1427     GNUNET_SERVICE_client_drop (client);
1428     return;
1429   }
1430
1431   struct GNUNET_HashCode key_hash;
1432   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1433                    res->flags, &key_hash);
1434
1435   struct GNUNET_CONTAINER_MultiHashMap *
1436     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1437                                                               &grp->pub_key_hash);
1438   if (NULL != grp_replay_req_cadet)
1439   {
1440     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1441   }
1442   struct GNUNET_CONTAINER_MultiHashMap *
1443     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1444                                                                &grp->pub_key_hash);
1445   if (NULL != grp_replay_req_client)
1446   {
1447     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1448   }
1449   GNUNET_SERVICE_client_continue (client);
1450 }
1451
1452
1453 static int
1454 check_client_replay_response (void *cls,
1455                               const struct MulticastReplayResponseMessage *res)
1456 {
1457   const struct GNUNET_MessageHeader *msg = &res->header;
1458   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1459   {
1460     msg = GNUNET_MQ_extract_nested_mh (res);
1461     if (NULL == msg)
1462     {
1463       return GNUNET_SYSERR;
1464     }
1465   }
1466   return GNUNET_OK;
1467 }
1468
1469
1470 /**
1471  * Incoming replay response from a client.
1472  *
1473  * Respond with a multicast message on success, or otherwise with an error code.
1474  */
1475 static void
1476 handle_client_replay_response (void *cls,
1477                                const struct MulticastReplayResponseMessage *res)
1478 {
1479   struct Client *c = cls;
1480   struct GNUNET_SERVICE_Client *client = c->client;
1481   struct Group *grp = c->group;
1482
1483   if (NULL == grp)
1484   {
1485     GNUNET_break (0);
1486     GNUNET_SERVICE_client_drop (client);
1487     return;
1488   }
1489
1490   const struct GNUNET_MessageHeader *msg = &res->header;
1491   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1492   {
1493     msg = GNUNET_MQ_extract_nested_mh (res);
1494   }
1495
1496   struct GNUNET_HashCode key_hash;
1497   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1498                    res->flags, &key_hash);
1499
1500   struct GNUNET_CONTAINER_MultiHashMap *
1501     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1502                                                               &grp->pub_key_hash);
1503   if (NULL != grp_replay_req_cadet)
1504   {
1505     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1506                                                 cadet_send_replay_response_cb,
1507                                                 (void *) msg);
1508   }
1509   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1510   {
1511     struct GNUNET_CONTAINER_MultiHashMap *
1512       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1513                                                                  &grp->pub_key_hash);
1514     if (NULL != grp_replay_req_client)
1515     {
1516       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1517                                                   client_send_replay_response_cb,
1518                                                   (void *) msg);
1519     }
1520   }
1521   else
1522   {
1523     handle_client_replay_response_end (c, res);
1524     return;
1525   }
1526   GNUNET_SERVICE_client_continue (client);
1527 }
1528
1529
1530 /**
1531  * Incoming join request message from CADET.
1532  */
1533 int
1534 cadet_recv_join_request (void *cls,
1535                          struct GNUNET_CADET_Channel *channel,
1536                          void **ctx,
1537                          const struct GNUNET_MessageHeader *m)
1538 {
1539   const struct MulticastJoinRequestMessage *
1540     req = (const struct MulticastJoinRequestMessage *) m;
1541   uint16_t size = ntohs (m->size);
1542   if (size < sizeof (*req))
1543   {
1544     GNUNET_break_op (0);
1545     return GNUNET_SYSERR;
1546   }
1547   if (NULL != *ctx)
1548   {
1549     GNUNET_break_op (0);
1550     return GNUNET_SYSERR;
1551   }
1552   if (ntohl (req->purpose.size) != (size
1553                                     - sizeof (req->header)
1554                                     - sizeof (req->reserved)
1555                                     - sizeof (req->signature)))
1556   {
1557     GNUNET_break_op (0);
1558     return GNUNET_SYSERR;
1559   }
1560   if (GNUNET_OK !=
1561       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1562                                   &req->purpose, &req->signature,
1563                                   &req->member_pub_key))
1564   {
1565     GNUNET_break_op (0);
1566     return GNUNET_SYSERR;
1567   }
1568
1569   struct GNUNET_HashCode group_pub_hash;
1570   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
1571
1572   struct Channel *chn = GNUNET_malloc (sizeof *chn);
1573   chn->channel = channel;
1574   chn->group_pub_key = req->group_pub_key;
1575   chn->group_pub_hash = group_pub_hash;
1576   chn->member_pub_key = req->member_pub_key;
1577   chn->peer = req->peer;
1578   chn->join_status = JOIN_WAITING;
1579   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
1580                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1581
1582   client_send_all (&group_pub_hash, m);
1583   return GNUNET_OK;
1584 }
1585
1586
1587 /**
1588  * Incoming join decision message from CADET.
1589  */
1590 int
1591 cadet_recv_join_decision (void *cls,
1592                           struct GNUNET_CADET_Channel *channel,
1593                           void **ctx,
1594                           const struct GNUNET_MessageHeader *m)
1595 {
1596   const struct MulticastJoinDecisionMessage *
1597     dcsn = (const struct MulticastJoinDecisionMessage *) m;
1598   uint16_t size = ntohs (m->size);
1599   if (size < sizeof (*dcsn))
1600   {
1601     GNUNET_break_op (0);
1602     return GNUNET_SYSERR;
1603   }
1604   struct Channel *chn = *ctx;
1605   if (NULL == chn)
1606   {
1607     GNUNET_break_op (0);
1608     return GNUNET_SYSERR;
1609   }
1610   if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
1611   {
1612     GNUNET_break_op (0);
1613     return GNUNET_SYSERR;
1614   }
1615   switch (chn->join_status)
1616   {
1617   case JOIN_REFUSED:
1618     return GNUNET_SYSERR;
1619
1620   case JOIN_ADMITTED:
1621     return GNUNET_OK;
1622
1623   case JOIN_NOT_ASKED:
1624   case JOIN_WAITING:
1625     break;
1626   }
1627
1628   struct MulticastJoinDecisionMessageHeader *
1629     hdcsn = GNUNET_malloc (size);
1630   GNUNET_memcpy (hdcsn, dcsn, size);
1631   hdcsn->peer = chn->peer;
1632
1633   struct Member *mem = (struct Member *) chn->group;
1634   client_send_join_decision (mem, hdcsn);
1635   GNUNET_free (hdcsn);
1636   if (GNUNET_YES == ntohs (dcsn->is_admitted))
1637   {
1638     chn->join_status = JOIN_ADMITTED;
1639     return GNUNET_OK;
1640   }
1641   else
1642   {
1643     chn->join_status = JOIN_REFUSED;
1644     return GNUNET_SYSERR;
1645   }
1646 }
1647
1648 /**
1649  * Incoming multicast message from CADET.
1650  */
1651 int
1652 cadet_recv_message (void *cls,
1653                     struct GNUNET_CADET_Channel *channel,
1654                     void **ctx,
1655                     const struct GNUNET_MessageHeader *m)
1656 {
1657   const struct GNUNET_MULTICAST_MessageHeader *
1658     msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
1659   uint16_t size = ntohs (m->size);
1660   if (size < sizeof (*msg))
1661   {
1662     GNUNET_break_op (0);
1663     return GNUNET_SYSERR;
1664   }
1665   struct Channel *chn = *ctx;
1666   if (NULL == chn)
1667   {
1668     GNUNET_break_op (0);
1669     return GNUNET_SYSERR;
1670   }
1671   if (ntohl (msg->purpose.size) != (size
1672                                     - sizeof (msg->header)
1673                                     - sizeof (msg->hop_counter)
1674                                     - sizeof (msg->signature)))
1675   {
1676     GNUNET_break_op (0);
1677     return GNUNET_SYSERR;
1678   }
1679   if (GNUNET_OK !=
1680       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1681                                   &msg->purpose, &msg->signature,
1682                                   &chn->group_pub_key))
1683   {
1684     GNUNET_break_op (0);
1685     return GNUNET_SYSERR;
1686   }
1687
1688   client_send_all (&chn->group_pub_hash, m);
1689   return GNUNET_OK;
1690 }
1691
1692
1693 /**
1694  * Incoming multicast request message from CADET.
1695  */
1696 int
1697 cadet_recv_request (void *cls,
1698                     struct GNUNET_CADET_Channel *channel,
1699                     void **ctx,
1700                     const struct GNUNET_MessageHeader *m)
1701 {
1702   const struct GNUNET_MULTICAST_RequestHeader *
1703     req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
1704   uint16_t size = ntohs (m->size);
1705   if (size < sizeof (*req))
1706   {
1707     GNUNET_break_op (0);
1708     return GNUNET_SYSERR;
1709   }
1710   struct Channel *chn = *ctx;
1711   if (NULL == chn)
1712   {
1713     GNUNET_break_op (0);
1714     return GNUNET_SYSERR;
1715   }
1716   if (ntohl (req->purpose.size) != (size
1717                                     - sizeof (req->header)
1718                                     - sizeof (req->member_pub_key)
1719                                     - sizeof (req->signature)))
1720   {
1721     GNUNET_break_op (0);
1722     return GNUNET_SYSERR;
1723   }
1724   if (GNUNET_OK !=
1725       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1726                                   &req->purpose, &req->signature,
1727                                   &req->member_pub_key))
1728   {
1729     GNUNET_break_op (0);
1730     return GNUNET_SYSERR;
1731   }
1732
1733   client_send_origin (&chn->group_pub_hash, m);
1734   return GNUNET_OK;
1735 }
1736
1737
1738 /**
1739  * Incoming multicast replay request from CADET.
1740  */
1741 int
1742 cadet_recv_replay_request (void *cls,
1743                            struct GNUNET_CADET_Channel *channel,
1744                            void **ctx,
1745                            const struct GNUNET_MessageHeader *m)
1746 {
1747   struct MulticastReplayRequestMessage rep;
1748   uint16_t size = ntohs (m->size);
1749   if (size < sizeof (rep))
1750   {
1751     GNUNET_break_op (0);
1752     return GNUNET_SYSERR;
1753   }
1754   struct Channel *chn = *ctx;
1755
1756   GNUNET_memcpy (&rep, m, sizeof (rep));
1757   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1758
1759   struct GNUNET_CONTAINER_MultiHashMap *
1760     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1761                                                         &chn->group->pub_key_hash);
1762   if (NULL == grp_replay_req)
1763   {
1764     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1765     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1766                                        &chn->group->pub_key_hash, grp_replay_req,
1767                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1768   }
1769   struct GNUNET_HashCode key_hash;
1770   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1771                    rep.flags, &key_hash);
1772   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1773                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1774
1775   client_send_random (&chn->group_pub_hash, &rep.header);
1776   return GNUNET_OK;
1777 }
1778
1779
1780 /**
1781  * Incoming multicast replay response from CADET.
1782  */
1783 int
1784 cadet_recv_replay_response (void *cls,
1785                             struct GNUNET_CADET_Channel *channel,
1786                             void **ctx,
1787                             const struct GNUNET_MessageHeader *m)
1788 {
1789   //struct Channel *chn = *ctx;
1790
1791   /* @todo FIXME: got replay error response, send request to other members */
1792
1793   return GNUNET_OK;
1794 }
1795
1796
1797 /**
1798  * A new client connected.
1799  *
1800  * @param cls NULL
1801  * @param client client to add
1802  * @param mq message queue for @a client
1803  * @return @a client
1804  */
1805 static void *
1806 client_notify_connect (void *cls,
1807                        struct GNUNET_SERVICE_Client *client,
1808                        struct GNUNET_MQ_Handle *mq)
1809 {
1810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1811   /* FIXME: send connect ACK */
1812
1813   struct Client *c = GNUNET_new (struct Client);
1814   c->client = client;
1815
1816   return c;
1817 }
1818
1819
1820 /**
1821  * Called whenever a client is disconnected.
1822  * Frees our resources associated with that client.
1823  *
1824  * @param cls closure
1825  * @param client identification of the client
1826  * @param app_ctx must match @a client
1827  */
1828 static void
1829 client_notify_disconnect (void *cls,
1830                           struct GNUNET_SERVICE_Client *client,
1831                           void *app_ctx)
1832 {
1833   struct Client *c = app_ctx;
1834   struct Group *grp = c->group;
1835   GNUNET_free (c);
1836
1837   if (NULL == grp)
1838   {
1839     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1840                 "%p User context is NULL in client_disconnect()\n", grp);
1841     GNUNET_break (0);
1842     return;
1843   }
1844
1845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1846               "%p Client (%s) disconnected from group %s\n",
1847               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
1848               GNUNET_h2s (&grp->pub_key_hash));
1849
1850   struct ClientList *cl = grp->clients_head;
1851   while (NULL != cl)
1852   {
1853     if (cl->client == client)
1854     {
1855       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
1856       GNUNET_free (cl);
1857       break;
1858     }
1859     cl = cl->next;
1860   }
1861
1862   while (GNUNET_YES == replay_req_remove_client (grp, client));
1863
1864   if (NULL == grp->clients_head)
1865   { /* Last client disconnected. */
1866 #if FIXME
1867     if (NULL != grp->tmit_head)
1868     { /* Send pending messages via CADET before cleanup. */
1869       transmit_message (grp);
1870     }
1871     else
1872 #endif
1873     {
1874       cleanup_group (grp);
1875     }
1876   }
1877 }
1878
1879
1880 /**
1881  * Message handlers for CADET.
1882  */
1883 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1884   { cadet_recv_join_request,
1885     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
1886
1887   { cadet_recv_join_decision,
1888     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
1889
1890   { cadet_recv_message,
1891     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1892
1893   { cadet_recv_request,
1894     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1895
1896   { cadet_recv_replay_request,
1897     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1898
1899   { cadet_recv_replay_response,
1900     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1901
1902   { NULL, 0, 0 }
1903 };
1904
1905
1906 /**
1907  * Service started.
1908  *
1909  * @param cls closure
1910  * @param server the initialized server
1911  * @param cfg configuration to use
1912  */
1913 static void
1914 run (void *cls,
1915      const struct GNUNET_CONFIGURATION_Handle *c,
1916      struct GNUNET_SERVICE_Handle *svc)
1917 {
1918   cfg = c;
1919   service = svc;
1920   GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
1921
1922   stats = GNUNET_STATISTICS_create ("multicast", cfg);
1923   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1924   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1925   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1926   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1927   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1928   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1929   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1930
1931   cadet = GNUNET_CADET_connect (cfg, NULL,
1932                                 cadet_notify_channel_end,
1933                                 cadet_handlers);
1934   GNUNET_assert (NULL != cadet);
1935
1936   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1937                                  NULL);
1938 }
1939
1940
1941 /**
1942  * Define "main" method using service macro.
1943  */
1944 GNUNET_SERVICE_MAIN
1945 ("multicast",
1946  GNUNET_SERVICE_OPTION_NONE,
1947  run,
1948  client_notify_connect,
1949  client_notify_disconnect,
1950  NULL,
1951  GNUNET_MQ_hd_fixed_size (client_origin_start,
1952                           GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
1953                           struct MulticastOriginStartMessage,
1954                           NULL),
1955  GNUNET_MQ_hd_var_size (client_member_join,
1956                         GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
1957                         struct MulticastMemberJoinMessage,
1958                         NULL),
1959  GNUNET_MQ_hd_var_size (client_join_decision,
1960                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1961                         struct MulticastJoinDecisionMessageHeader,
1962                         NULL),
1963  GNUNET_MQ_hd_var_size (client_multicast_message,
1964                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1965                         struct GNUNET_MULTICAST_MessageHeader,
1966                         NULL),
1967  GNUNET_MQ_hd_var_size (client_multicast_request,
1968                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
1969                         struct GNUNET_MULTICAST_RequestHeader,
1970                         NULL),
1971  GNUNET_MQ_hd_fixed_size (client_replay_request,
1972                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1973                           struct MulticastReplayRequestMessage,
1974                           NULL),
1975  GNUNET_MQ_hd_var_size (client_replay_response,
1976                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1977                         struct MulticastReplayResponseMessage,
1978                         NULL),
1979  GNUNET_MQ_hd_var_size (client_replay_response_end,
1980                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
1981                         struct MulticastReplayResponseMessage,
1982                         NULL));
1983
1984 /* end of gnunet-service-multicast.c */