guix-env: some update.
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "multicast.h"
34
35 /**
36  * Handle to our current configuration.
37  */
38 static const struct GNUNET_CONFIGURATION_Handle *cfg;
39
40 /**
41  * Service handle.
42  */
43 static struct GNUNET_SERVICE_Handle *service;
44
45 /**
46  * CADET handle.
47  */
48 static struct GNUNET_CADET_Handle *cadet;
49
50 /**
51  * Identity of this peer.
52  */
53 static struct GNUNET_PeerIdentity this_peer;
54
55 /**
56  * Handle to the statistics service.
57  */
58 static struct GNUNET_STATISTICS_Handle *stats;
59
60 /**
61  * All connected origin clients.
62  * Group's pub_key_hash -> struct Origin * (uniq)
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *origins;
65
66 /**
67  * All connected member clients.
68  * Group's pub_key_hash -> struct Member * (multi)
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *members;
71
72 /**
73  * Connected member clients per group.
74  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
77
78 /**
79  * Incoming CADET channels with connected children in the tree.
80  * Group's pub_key_hash -> struct Channel * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
83
84 /**
85  * Outgoing CADET channels connecting to parents in the tree.
86  * Group's pub_key_hash -> struct Channel * (multi)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
89
90 /**
91  * Incoming replay requests from CADET.
92  * Group's pub_key_hash ->
93  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
96
97 /**
98  * Incoming replay requests from clients.
99  * Group's pub_key_hash ->
100  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
101  */
102 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
103
104
105 /**
106  * Join status of a remote peer.
107  */
108 enum JoinStatus
109 {
110   JOIN_REFUSED  = -1,
111   JOIN_NOT_ASKED = 0,
112   JOIN_WAITING   = 1,
113   JOIN_ADMITTED  = 2,
114 };
115
116 enum ChannelDirection
117 {
118   DIR_INCOMING = 0,
119   DIR_OUTGOING = 1,
120 };
121
122
123 /**
124  * Context for a CADET channel.
125  */
126 struct Channel
127 {
128   /**
129    * Group the channel belongs to.
130    *
131    * Only set for outgoing channels.
132    */
133   struct Group *group;
134
135   /**
136    * CADET channel.
137    */
138   struct GNUNET_CADET_Channel *channel;
139
140   /**
141    * CADET transmission handle.
142    */
143   struct GNUNET_CADET_TransmitHandle *tmit_handle;
144
145   /**
146    * Public key of the target group.
147    */
148   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
149
150   /**
151    * Hash of @a group_pub_key.
152    */
153   struct GNUNET_HashCode group_pub_hash;
154
155   /**
156    * Public key of the joining member.
157    */
158   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
159
160   /**
161    * Remote peer identity.
162    */
163   struct GNUNET_PeerIdentity peer;
164
165   /**
166    * Current window size, set by cadet_notify_window_change()
167    */
168   int32_t window_size;
169
170   /**
171    * Is the connection established?
172    */
173   int8_t is_connected;
174
175   /**
176    * Is the remote peer admitted to the group?
177    * @see enum JoinStatus
178    */
179   int8_t join_status;
180
181   /**
182    * Number of messages waiting to be sent to CADET.
183    */
184   uint8_t msgs_pending;
185
186   /**
187    * Channel direction.
188    * @see enum ChannelDirection
189    */
190   uint8_t direction;
191 };
192
193
194 /**
195  * List of connected clients.
196  */
197 struct ClientList
198 {
199   struct ClientList *prev;
200   struct ClientList *next;
201   struct GNUNET_SERVICE_Client *client;
202 };
203
204
205 /**
206  * Client context for an origin or member.
207  */
208 struct Group
209 {
210   struct ClientList *clients_head;
211   struct ClientList *clients_tail;
212
213   /**
214    * Public key of the group.
215    */
216   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
217
218   /**
219    * Hash of @a pub_key.
220    */
221   struct GNUNET_HashCode pub_key_hash;
222
223   /**
224    * CADET port hash.
225    */
226   struct GNUNET_HashCode cadet_port_hash;
227
228   /**
229    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
230    */
231   uint8_t disconnected;
232
233   /**
234    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
235    */
236   uint8_t is_origin;
237
238   union {
239     struct Origin *origin;
240     struct Member *member;
241   };
242 };
243
244
245 /**
246 * Client context for a group's origin.
247  */
248 struct Origin
249 {
250   struct Group group;
251
252   /**
253    * Private key of the group.
254    */
255   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
256
257   /**
258    * CADET port.
259    */
260   struct GNUNET_CADET_Port *cadet_port;
261
262   /**
263    * Last message fragment ID sent to the group.
264    */
265   uint64_t max_fragment_id;
266 };
267
268
269 /**
270  * Client context for a group member.
271  */
272 struct Member
273 {
274   struct Group group;
275
276   /**
277    * Private key of the member.
278    */
279   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
280
281   /**
282    * Public key of the member.
283    */
284   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
285
286   /**
287    * Hash of @a pub_key.
288    */
289   struct GNUNET_HashCode pub_key_hash;
290
291   /**
292    * Join request sent to the origin / members.
293    */
294   struct MulticastJoinRequestMessage *join_req;
295
296   /**
297    * Join decision sent in reply to our request.
298    *
299    * Only a positive decision is stored here, in case of a negative decision the
300    * client is disconnected.
301    */
302   struct MulticastJoinDecisionMessageHeader *join_dcsn;
303
304   /**
305    * CADET channel to the origin.
306    */
307   struct Channel *origin_channel;
308
309   /**
310    * Peer identity of origin.
311    */
312   struct GNUNET_PeerIdentity origin;
313
314   /**
315    * Peer identity of relays (other members to connect).
316    */
317   struct GNUNET_PeerIdentity *relays;
318
319   /**
320    * Last request fragment ID sent to the origin.
321    */
322   uint64_t max_fragment_id;
323
324   /**
325    * Number of @a relays.
326    */
327   uint32_t relay_count;
328 };
329
330
331 /**
332  * Client context.
333  */
334 struct Client {
335   struct GNUNET_SERVICE_Client *client;
336   struct Group *group;
337 };
338
339
340 struct ReplayRequestKey
341 {
342   uint64_t fragment_id;
343   uint64_t message_id;
344   uint64_t fragment_offset;
345   uint64_t flags;
346 };
347
348
349 static struct Channel *
350 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
351
352 static void
353 cadet_channel_destroy (struct Channel *chn);
354
355 static void
356 client_send_join_decision (struct Member *mem,
357                            const struct MulticastJoinDecisionMessageHeader *hdcsn);
358
359
360 /**
361  * Task run during shutdown.
362  *
363  * @param cls unused
364  */
365 static void
366 shutdown_task (void *cls)
367 {
368   if (NULL != cadet)
369   {
370     GNUNET_CADET_disconnect (cadet);
371     cadet = NULL;
372   }
373   if (NULL != stats)
374   {
375     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
376     stats = NULL;
377   }
378   /* FIXME: do more clean up here */
379 }
380
381
382 /**
383  * Clean up origin data structures after a client disconnected.
384  */
385 static void
386 cleanup_origin (struct Origin *orig)
387 {
388   struct Group *grp = &orig->group;
389   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
390   if (NULL != orig->cadet_port)
391   {
392     GNUNET_CADET_close_port (orig->cadet_port);
393     orig->cadet_port = NULL;
394   }
395   GNUNET_free (orig);
396 }
397
398
399 /**
400  * Clean up member data structures after a client disconnected.
401  */
402 static void
403 cleanup_member (struct Member *mem)
404 {
405   struct Group *grp = &mem->group;
406   struct GNUNET_CONTAINER_MultiHashMap *
407     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
408                                                  &grp->pub_key_hash);
409   GNUNET_assert (NULL != grp_mem);
410   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
411
412   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
413   {
414     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
415                                           grp_mem);
416     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
417   }
418   if (NULL != mem->join_dcsn)
419   {
420     GNUNET_free (mem->join_dcsn);
421     mem->join_dcsn = NULL;
422   }
423   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
424   GNUNET_free (mem);
425 }
426
427
428 /**
429  * Clean up group data structures after a client disconnected.
430  */
431 static void
432 cleanup_group (struct Group *grp)
433 {
434   (GNUNET_YES == grp->is_origin)
435     ? cleanup_origin (grp->origin)
436     : cleanup_member (grp->member);
437 }
438
439
440 void
441 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
442                  uint64_t fragment_offset, uint64_t flags,
443                  struct GNUNET_HashCode *key_hash)
444 {
445   struct ReplayRequestKey key = {
446     .fragment_id = fragment_id,
447     .message_id = message_id,
448     .fragment_offset = fragment_offset,
449     .flags = flags,
450   };
451   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
452 }
453
454
455 /**
456  * Remove channel from replay request hashmap.
457  *
458  * @param chn
459  *        Channel to remove.
460  *
461  * @return #GNUNET_YES if there are more entries to process,
462  *         #GNUNET_NO when reached end of hashmap.
463  */
464 static int
465 replay_req_remove_cadet (struct Channel *chn)
466 {
467   if (NULL == chn || NULL == chn->group)
468     return GNUNET_SYSERR;
469
470   struct GNUNET_CONTAINER_MultiHashMap *
471     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
472                                                         &chn->group->pub_key_hash);
473   if (NULL == grp_replay_req)
474     return GNUNET_NO;
475
476   struct GNUNET_CONTAINER_MultiHashMapIterator *
477     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
478   struct GNUNET_HashCode key;
479   const struct Channel *c;
480   while (GNUNET_YES
481          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
482                                                          (const void **) &c))
483   {
484     if (c == chn)
485     {
486       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
487       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
488       return GNUNET_YES;
489     }
490   }
491   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
492   return GNUNET_NO;
493 }
494
495
496 /**
497  * Remove client from replay request hashmap.
498  *
499  * @param client
500  *        Client to remove.
501  *
502  * @return #GNUNET_YES if there are more entries to process,
503  *         #GNUNET_NO when reached end of hashmap.
504  */
505 static int
506 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
507 {
508   struct GNUNET_CONTAINER_MultiHashMap *
509     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
510                                                         &grp->pub_key_hash);
511   if (NULL == grp_replay_req)
512     return GNUNET_NO;
513
514   struct GNUNET_CONTAINER_MultiHashMapIterator *
515     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
516   struct GNUNET_HashCode key;
517   const struct GNUNET_SERVICE_Client *c;
518   while (GNUNET_YES
519          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
520                                                          (const void **) &c))
521   {
522     if (c == client)
523     {
524       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, client);
525       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
526       return GNUNET_YES;
527     }
528   }
529   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
530   return GNUNET_NO;
531 }
532
533
534 /**
535  * Send message to a client.
536  */
537 static void
538 client_send (struct GNUNET_SERVICE_Client *client,
539              const struct GNUNET_MessageHeader *msg)
540 {
541   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
542               "%p Sending message to client.\n", client);
543
544   struct GNUNET_MQ_Envelope *
545     env = GNUNET_MQ_msg_copy (msg);
546
547   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
548                   env);
549 }
550
551
552 /**
553  * Send message to all clients connected to the group.
554  */
555 static void
556 client_send_group (const struct Group *grp,
557                    const struct GNUNET_MessageHeader *msg)
558 {
559   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
560               "%p Sending message to all clients of the group.\n", grp);
561
562   struct ClientList *cl = grp->clients_head;
563   while (NULL != cl)
564   {
565     struct GNUNET_MQ_Envelope *
566       env = GNUNET_MQ_msg_copy (msg);
567
568     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
569                     env);
570     cl = cl->next;
571   }
572 }
573
574
575 /**
576  * Iterator callback for sending a message to origin clients.
577  */
578 static int
579 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
580                        void *origin)
581 {
582   const struct GNUNET_MessageHeader *msg = cls;
583   struct Member *orig = origin;
584
585   client_send_group (&orig->group, msg);
586   return GNUNET_YES;
587 }
588
589
590 /**
591  * Iterator callback for sending a message to member clients.
592  */
593 static int
594 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
595                        void *member)
596 {
597   const struct GNUNET_MessageHeader *msg = cls;
598   struct Member *mem = member;
599
600   if (NULL != mem->join_dcsn)
601   { /* Only send message to admitted members */
602     client_send_group (&mem->group, msg);
603   }
604   return GNUNET_YES;
605 }
606
607
608 /**
609  * Send message to all origin and member clients connected to the group.
610  *
611  * @param pub_key_hash
612  *        H(key_pub) of the group.
613  * @param msg
614  *        Message to send.
615  */
616 static int
617 client_send_all (struct GNUNET_HashCode *pub_key_hash,
618                  const struct GNUNET_MessageHeader *msg)
619 {
620   int n = 0;
621   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
622                                                    client_send_origin_cb,
623                                                    (void *) msg);
624   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
625                                                    client_send_member_cb,
626                                                    (void *) msg);
627   return n;
628 }
629
630
631 /**
632  * Send message to a random origin client or a random member client.
633  *
634  * @param grp  The group to send @a msg to.
635  * @param msg  Message to send.
636  */
637 static int
638 client_send_random (struct GNUNET_HashCode *pub_key_hash,
639                     const struct GNUNET_MessageHeader *msg)
640 {
641   int n = 0;
642   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
643                                                  (void *) msg);
644   if (n <= 0)
645     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
646                                                    (void *) msg);
647   return n;
648 }
649
650
651 /**
652  * Send message to all origin clients connected to the group.
653  *
654  * @param pub_key_hash
655  *        H(key_pub) of the group.
656  * @param msg
657  *        Message to send.
658  */
659 static int
660 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
661                     const struct GNUNET_MessageHeader *msg)
662 {
663   int n = 0;
664   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
665                                                    client_send_origin_cb,
666                                                    (void *) msg);
667   return n;
668 }
669
670
671 /**
672  * Send fragment acknowledgement to all clients of the channel.
673  *
674  * @param pub_key_hash
675  *        H(key_pub) of the group.
676  */
677 static void
678 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
679 {
680   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
681               "Sending message ACK to client.\n");
682
683   static struct GNUNET_MessageHeader *msg = NULL;
684   if (NULL == msg)
685   {
686     msg = GNUNET_malloc (sizeof (*msg));
687     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
688     msg->size = htons (sizeof (*msg));
689   }
690   client_send_all (pub_key_hash, msg);
691 }
692
693
694 struct CadetTransmitClosure
695 {
696   struct Channel *chn;
697   const struct GNUNET_MessageHeader *msg;
698 };
699
700
701 /**
702  * Send a message to a CADET channel.
703  *
704  * @param chn  Channel.
705  * @param msg  Message.
706  */
707 static void
708 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
709 {
710   struct GNUNET_MQ_Envelope *
711     env = GNUNET_MQ_msg_copy (msg);
712
713   GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
714
715   if (0 < chn->window_size)
716   {
717     client_send_ack (&chn->group_pub_hash);
718   }
719   else
720   {
721     chn->msgs_pending++;
722     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
723                 "%p Queuing message. Pending messages: %u\n",
724                 chn, chn->msgs_pending);
725   }
726 }
727
728
729 /**
730  * Create CADET channel and send a join request.
731  */
732 static void
733 cadet_send_join_request (struct Member *mem)
734 {
735   mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
736   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
737
738   uint32_t i;
739   for (i = 0; i < mem->relay_count; i++)
740   {
741     struct Channel *
742       chn = cadet_channel_create (&mem->group, &mem->relays[i]);
743     cadet_send_channel (chn, &mem->join_req->header);
744   }
745 }
746
747
748 static int
749 cadet_send_join_decision_cb (void *cls,
750                              const struct GNUNET_HashCode *group_pub_hash,
751                              void *channel)
752 {
753   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
754   struct Channel *chn = channel;
755
756   const struct MulticastJoinDecisionMessage *dcsn =
757     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
758
759   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
760       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
761   {
762     if (GNUNET_YES == ntohl (dcsn->is_admitted))
763     {
764       chn->join_status = JOIN_ADMITTED;
765     }
766     else
767     {
768       chn->join_status = JOIN_REFUSED;
769     }
770
771     cadet_send_channel (chn, &hdcsn->header);
772     return GNUNET_NO;
773   }
774   return GNUNET_YES;
775 }
776
777
778 /**
779  * Send join decision to a remote peer.
780  */
781 static void
782 cadet_send_join_decision (struct Group *grp,
783                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
784 {
785   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
786                                               &cadet_send_join_decision_cb,
787                                               (void *) hdcsn);
788 }
789
790
791 /**
792  * Iterator callback for sending a message to origin clients.
793  */
794 static int
795 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
796                void *channel)
797 {
798   const struct GNUNET_MessageHeader *msg = cls;
799   struct Channel *chn = channel;
800   if (JOIN_ADMITTED == chn->join_status)
801     cadet_send_channel (chn, msg);
802   return GNUNET_YES;
803 }
804
805
806 /**
807  * Send message to all connected children.
808  */
809 static int
810 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
811                      const struct GNUNET_MessageHeader *msg)
812 {
813   int n = 0;
814   if (channels_in != NULL)
815     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
816                                                      cadet_send_cb, (void *) msg);
817   return n;
818 }
819
820
821 #if 0       // unused as yet
822 /**
823  * Send message to all connected parents.
824  */
825 static int
826 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
827                     const struct GNUNET_MessageHeader *msg)
828 {
829   int n = 0;
830   if (channels_in != NULL)
831     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
832                                                      cadet_send_cb, (void *) msg);
833   return n;
834 }
835 #endif
836
837
838 /**
839  * CADET channel connect handler.
840  *
841  * @see GNUNET_CADET_ConnectEventHandler()
842  */
843 static void *
844 cadet_notify_connect (void *cls,
845                       struct GNUNET_CADET_Channel *channel,
846                       const struct GNUNET_PeerIdentity *source)
847 {
848   struct Channel *chn = GNUNET_malloc (sizeof *chn);
849   chn->group = cls;
850   chn->channel = channel;
851   chn->direction = DIR_INCOMING;
852   chn->join_status = JOIN_NOT_ASKED;
853
854   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
855                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
856   return chn;
857 }
858
859
860 /**
861  * CADET window size change handler.
862  *
863  * @see GNUNET_CADET_WindowSizeEventHandler()
864  */
865 static void
866 cadet_notify_window_change (void *cls,
867                             const struct GNUNET_CADET_Channel *channel,
868                             int window_size)
869 {
870   struct Channel *chn = cls;
871
872   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
873               "%p Window size changed to %d.  Pending messages: %u\n",
874               chn, window_size, chn->msgs_pending);
875
876   chn->is_connected = GNUNET_YES;
877   chn->window_size = (int32_t) window_size;
878
879   for (int i = 0; i < window_size; i++)
880   {
881     if (0 < chn->msgs_pending)
882     {
883       client_send_ack (&chn->group_pub_hash);
884       chn->msgs_pending--;
885     }
886     else
887     {
888       break;
889     }
890   }
891 }
892
893
894 /**
895  * CADET channel disconnect handler.
896  *
897  * @see GNUNET_CADET_DisconnectEventHandler()
898  */
899 static void
900 cadet_notify_disconnect (void *cls,
901                          const struct GNUNET_CADET_Channel *channel)
902 {
903   if (NULL == cls)
904     return;
905
906   struct Channel *chn = cls;
907   if (NULL != chn->group)
908   {
909     if (GNUNET_NO == chn->group->is_origin)
910     {
911       struct Member *mem = (struct Member *) chn->group;
912       if (chn == mem->origin_channel)
913         mem->origin_channel = NULL;
914     }
915   }
916
917   int ret;
918   do
919   {
920     ret = replay_req_remove_cadet (chn);
921   }
922   while (GNUNET_YES == ret);
923
924   GNUNET_free (chn);
925 }
926
927
928 static int
929 check_cadet_join_request (void *cls,
930                           const struct MulticastJoinRequestMessage *req)
931 {
932   struct Channel *chn = cls;
933
934   if (NULL == chn
935       || JOIN_NOT_ASKED != chn->join_status)
936   {
937     return GNUNET_SYSERR;
938   }
939
940   uint16_t size = ntohs (req->header.size);
941   if (size < sizeof (*req))
942   {
943     GNUNET_break_op (0);
944     return GNUNET_SYSERR;
945   }
946   if (ntohl (req->purpose.size) != (size
947                                     - sizeof (req->header)
948                                     - sizeof (req->reserved)
949                                     - sizeof (req->signature)))
950   {
951     GNUNET_break_op (0);
952     return GNUNET_SYSERR;
953   }
954   if (GNUNET_OK !=
955       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
956                                   &req->purpose, &req->signature,
957                                   &req->member_pub_key))
958   {
959     GNUNET_break_op (0);
960     return GNUNET_SYSERR;
961   }
962
963   return GNUNET_OK;
964 }
965
966
967 /**
968  * Incoming join request message from CADET.
969  */
970 static void
971 handle_cadet_join_request (void *cls,
972                            const struct MulticastJoinRequestMessage *req)
973 {
974   struct Channel *chn = cls;
975   GNUNET_CADET_receive_done (chn->channel);
976
977   struct GNUNET_HashCode group_pub_hash;
978   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
979   chn->group_pub_key = req->group_pub_key;
980   chn->group_pub_hash = group_pub_hash;
981   chn->member_pub_key = req->member_pub_key;
982   chn->peer = req->peer;
983   chn->join_status = JOIN_WAITING;
984
985   client_send_all (&group_pub_hash, &req->header);
986 }
987
988
989 static int
990 check_cadet_join_decision (void *cls,
991                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
992 {
993   uint16_t size = ntohs (hdcsn->header.size);
994   if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
995              sizeof (struct MulticastJoinDecisionMessage))
996   {
997     GNUNET_break_op (0);
998     return GNUNET_SYSERR;
999   }
1000
1001   struct Channel *chn = cls;
1002   if (NULL == chn)
1003   {
1004     GNUNET_break (0);
1005     return GNUNET_SYSERR;
1006   }
1007   if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
1008   {
1009     GNUNET_break (0);
1010     return GNUNET_SYSERR;
1011   }
1012   switch (chn->join_status)
1013   {
1014   case JOIN_REFUSED:
1015     return GNUNET_SYSERR;
1016
1017   case JOIN_ADMITTED:
1018     return GNUNET_OK;
1019
1020   case JOIN_NOT_ASKED:
1021   case JOIN_WAITING:
1022     break;
1023   }
1024
1025   return GNUNET_OK;
1026 }
1027
1028
1029 /**
1030  * Incoming join decision message from CADET.
1031  */
1032 static void
1033 handle_cadet_join_decision (void *cls,
1034                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1035 {
1036   const struct MulticastJoinDecisionMessage *
1037     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1038
1039   struct Channel *chn = cls;
1040   GNUNET_CADET_receive_done (chn->channel);
1041
1042   // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
1043   struct Member *mem = (struct Member *) chn->group;
1044   client_send_join_decision (mem, hdcsn);
1045   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1046   {
1047     chn->join_status = JOIN_ADMITTED;
1048   }
1049   else
1050   {
1051     chn->join_status = JOIN_REFUSED;
1052     cadet_channel_destroy (chn);
1053   }
1054 }
1055
1056
1057 static int
1058 check_cadet_message (void *cls,
1059                      const struct GNUNET_MULTICAST_MessageHeader *msg)
1060 {
1061   uint16_t size = ntohs (msg->header.size);
1062   if (size < sizeof (*msg))
1063   {
1064     GNUNET_break_op (0);
1065     return GNUNET_SYSERR;
1066   }
1067
1068   struct Channel *chn = cls;
1069   if (NULL == chn)
1070   {
1071     GNUNET_break (0);
1072     return GNUNET_SYSERR;
1073   }
1074   if (ntohl (msg->purpose.size) != (size
1075                                     - sizeof (msg->header)
1076                                     - sizeof (msg->hop_counter)
1077                                     - sizeof (msg->signature)))
1078   {
1079     GNUNET_break_op (0);
1080     return GNUNET_SYSERR;
1081   }
1082   if (GNUNET_OK !=
1083       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1084                                   &msg->purpose, &msg->signature,
1085                                   &chn->group_pub_key))
1086   {
1087     GNUNET_break_op (0);
1088     return GNUNET_SYSERR;
1089   }
1090
1091   return GNUNET_OK;
1092 }
1093
1094
1095 /**
1096  * Incoming multicast message from CADET.
1097  */
1098 static void
1099 handle_cadet_message (void *cls,
1100                       const struct GNUNET_MULTICAST_MessageHeader *msg)
1101 {
1102   struct Channel *chn = cls;
1103   GNUNET_CADET_receive_done (chn->channel);
1104   client_send_all (&chn->group_pub_hash, &msg->header);
1105 }
1106
1107
1108 static int
1109 check_cadet_request (void *cls,
1110                      const struct GNUNET_MULTICAST_RequestHeader *req)
1111 {
1112   uint16_t size = ntohs (req->header.size);
1113   if (size < sizeof (*req))
1114   {
1115     GNUNET_break_op (0);
1116     return GNUNET_SYSERR;
1117   }
1118
1119   struct Channel *chn = cls;
1120   if (NULL == chn)
1121   {
1122     GNUNET_break (0);
1123     return GNUNET_SYSERR;
1124   }
1125   if (ntohl (req->purpose.size) != (size
1126                                     - sizeof (req->header)
1127                                     - sizeof (req->member_pub_key)
1128                                     - sizeof (req->signature)))
1129   {
1130     GNUNET_break_op (0);
1131     return GNUNET_SYSERR;
1132   }
1133   if (GNUNET_OK !=
1134       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1135                                   &req->purpose, &req->signature,
1136                                   &req->member_pub_key))
1137   {
1138     GNUNET_break_op (0);
1139     return GNUNET_SYSERR;
1140   }
1141
1142   return GNUNET_OK;
1143 }
1144
1145
1146 /**
1147  * Incoming multicast request message from CADET.
1148  */
1149 static void
1150 handle_cadet_request (void *cls,
1151                       const struct GNUNET_MULTICAST_RequestHeader *req)
1152 {
1153   struct Channel *chn = cls;
1154   GNUNET_CADET_receive_done (chn->channel);
1155   client_send_origin (&chn->group_pub_hash, &req->header);
1156 }
1157
1158
1159 static int
1160 check_cadet_replay_request (void *cls,
1161                             const struct MulticastReplayRequestMessage *req)
1162 {
1163   uint16_t size = ntohs (req->header.size);
1164   if (size < sizeof (*req))
1165   {
1166     GNUNET_break_op (0);
1167     return GNUNET_SYSERR;
1168   }
1169
1170   struct Channel *chn = cls;
1171   if (NULL == chn)
1172   {
1173     GNUNET_break_op (0);
1174     return GNUNET_SYSERR;
1175   }
1176
1177   return GNUNET_OK;
1178 }
1179
1180
1181 /**
1182  * Incoming multicast replay request from CADET.
1183  */
1184 static void
1185 handle_cadet_replay_request (void *cls,
1186                              const struct MulticastReplayRequestMessage *req)
1187 {
1188   struct Channel *chn = cls;
1189   GNUNET_CADET_receive_done (chn->channel);
1190
1191   struct MulticastReplayRequestMessage rep = *req;
1192   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1193
1194   struct GNUNET_CONTAINER_MultiHashMap *
1195     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1196                                                         &chn->group->pub_key_hash);
1197   if (NULL == grp_replay_req)
1198   {
1199     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1200     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1201                                        &chn->group->pub_key_hash, grp_replay_req,
1202                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1203   }
1204   struct GNUNET_HashCode key_hash;
1205   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1206                    rep.flags, &key_hash);
1207   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1208                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1209
1210   client_send_random (&chn->group_pub_hash, &rep.header);
1211 }
1212
1213
1214 static int
1215 check_cadet_replay_response (void *cls,
1216                              const struct MulticastReplayResponseMessage *res)
1217 {
1218   struct Channel *chn = cls;
1219   if (NULL == chn)
1220   {
1221     GNUNET_break (0);
1222     return GNUNET_SYSERR;
1223   }
1224   return GNUNET_OK;
1225 }
1226
1227
1228 /**
1229  * Incoming multicast replay response from CADET.
1230  */
1231 static void
1232 handle_cadet_replay_response (void *cls,
1233                               const struct MulticastReplayResponseMessage *res)
1234 {
1235   struct Channel *chn = cls;
1236   GNUNET_CADET_receive_done (chn->channel);
1237
1238   /* @todo FIXME: got replay error response, send request to other members */
1239 }
1240
1241
1242 static void
1243 group_set_cadet_port_hash (struct Group *grp)
1244 {
1245   struct CadetPort {
1246     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1247     uint32_t app_type;
1248   } port = {
1249     grp->pub_key,
1250     GNUNET_APPLICATION_TYPE_MULTICAST,
1251   };
1252   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
1253 }
1254
1255
1256
1257 /**
1258  * Create new outgoing CADET channel.
1259  *
1260  * @param peer
1261  *        Peer to connect to.
1262  * @param group_pub_key
1263  *        Public key of group the channel belongs to.
1264  * @param group_pub_hash
1265  *        Hash of @a group_pub_key.
1266  *
1267  * @return Channel.
1268  */
1269 static struct Channel *
1270 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
1271 {
1272   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
1273   chn->group = grp;
1274   chn->group_pub_key = grp->pub_key;
1275   chn->group_pub_hash = grp->pub_key_hash;
1276   chn->peer = *peer;
1277   chn->direction = DIR_OUTGOING;
1278   chn->is_connected = GNUNET_NO;
1279   chn->join_status = JOIN_WAITING;
1280
1281   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1282     GNUNET_MQ_hd_var_size (cadet_message,
1283                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1284                            struct GNUNET_MULTICAST_MessageHeader,
1285                            chn),
1286
1287     GNUNET_MQ_hd_var_size (cadet_join_decision,
1288                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1289                            struct MulticastJoinDecisionMessageHeader,
1290                            chn),
1291
1292     GNUNET_MQ_hd_var_size (cadet_replay_request,
1293                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1294                            struct MulticastReplayRequestMessage,
1295                            chn),
1296
1297     GNUNET_MQ_hd_var_size (cadet_replay_response,
1298                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1299                            struct MulticastReplayResponseMessage,
1300                            chn),
1301
1302     GNUNET_MQ_handler_end ()
1303   };
1304
1305   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
1306                                               &grp->cadet_port_hash,
1307                                               GNUNET_CADET_OPTION_RELIABLE,
1308                                               cadet_notify_window_change,
1309                                               cadet_notify_disconnect,
1310                                               cadet_handlers);
1311   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
1312                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1313   return chn;
1314 }
1315
1316
1317 /**
1318  * Destroy outgoing CADET channel.
1319  */
1320 static void
1321 cadet_channel_destroy (struct Channel *chn)
1322 {
1323   GNUNET_CADET_channel_destroy (chn->channel);
1324   GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
1325   GNUNET_free (chn);
1326 }
1327
1328 /**
1329  * Handle a connecting client starting an origin.
1330  */
1331 static void
1332 handle_client_origin_start (void *cls,
1333                             const struct MulticastOriginStartMessage *msg)
1334 {
1335   struct Client *c = cls;
1336   struct GNUNET_SERVICE_Client *client = c->client;
1337
1338   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1339   struct GNUNET_HashCode pub_key_hash;
1340
1341   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
1342   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1343
1344   struct Origin *
1345     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
1346   struct Group *grp;
1347
1348   if (NULL == orig)
1349   {
1350     orig = GNUNET_new (struct Origin);
1351     orig->priv_key = msg->group_key;
1352     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
1353
1354     grp = c->group = &orig->group;
1355     grp->origin = orig;
1356     grp->is_origin = GNUNET_YES;
1357     grp->pub_key = pub_key;
1358     grp->pub_key_hash = pub_key_hash;
1359
1360     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
1361                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1362
1363     group_set_cadet_port_hash (grp);
1364
1365     struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1366       GNUNET_MQ_hd_var_size (cadet_message,
1367                              GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1368                              struct GNUNET_MULTICAST_MessageHeader,
1369                              grp),
1370
1371       GNUNET_MQ_hd_var_size (cadet_request,
1372                              GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
1373                              struct GNUNET_MULTICAST_RequestHeader,
1374                              grp),
1375
1376       GNUNET_MQ_hd_var_size (cadet_join_request,
1377                              GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1378                              struct MulticastJoinRequestMessage,
1379                              grp),
1380
1381       GNUNET_MQ_hd_var_size (cadet_replay_request,
1382                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1383                              struct MulticastReplayRequestMessage,
1384                              grp),
1385
1386       GNUNET_MQ_hd_var_size (cadet_replay_response,
1387                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1388                              struct MulticastReplayResponseMessage,
1389                              grp),
1390
1391       GNUNET_MQ_handler_end ()
1392     };
1393
1394
1395     orig->cadet_port = GNUNET_CADET_open_port (cadet,
1396                                                &grp->cadet_port_hash,
1397                                                cadet_notify_connect,
1398                                                NULL,
1399                                                cadet_notify_window_change,
1400                                                cadet_notify_disconnect,
1401                                                cadet_handlers);
1402   }
1403   else
1404   {
1405     grp = &orig->group;
1406   }
1407
1408   struct ClientList *cl = GNUNET_new (struct ClientList);
1409   cl->client = client;
1410   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1411
1412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1413               "%p Client connected as origin to group %s.\n",
1414               orig, GNUNET_h2s (&grp->pub_key_hash));
1415   GNUNET_SERVICE_client_continue (client);
1416 }
1417
1418
1419 static int
1420 check_client_member_join (void *cls,
1421                           const struct MulticastMemberJoinMessage *msg)
1422 {
1423   uint16_t msg_size = ntohs (msg->header.size);
1424   struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1425   uint32_t relay_count = ntohl (msg->relay_count);
1426   uint16_t relay_size = relay_count * sizeof (*relays);
1427   struct GNUNET_MessageHeader *join_msg = NULL;
1428   uint16_t join_msg_size = 0;
1429   if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1430       <= msg_size)
1431   {
1432     join_msg = (struct GNUNET_MessageHeader *)
1433       (((char *) &msg[1]) + relay_size);
1434     join_msg_size = ntohs (join_msg->size);
1435   }
1436   return
1437     msg_size == (sizeof (*msg) + relay_size + join_msg_size)
1438     ? GNUNET_OK
1439     : GNUNET_SYSERR;
1440 }
1441
1442
1443 /**
1444  * Handle a connecting client joining a group.
1445  */
1446 static void
1447 handle_client_member_join (void *cls,
1448                            const struct MulticastMemberJoinMessage *msg)
1449 {
1450   struct Client *c = cls;
1451   struct GNUNET_SERVICE_Client *client = c->client;
1452
1453   uint16_t msg_size = ntohs (msg->header.size);
1454
1455   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1456   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1457
1458   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1459   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1460   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1461
1462   struct GNUNET_CONTAINER_MultiHashMap *
1463     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1464   struct Member *mem = NULL;
1465   struct Group *grp;
1466
1467   if (NULL != grp_mem)
1468   {
1469     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1470   }
1471   if (NULL == mem)
1472   {
1473     mem = GNUNET_new (struct Member);
1474     mem->origin = msg->origin;
1475     mem->priv_key = msg->member_key;
1476     mem->pub_key = mem_pub_key;
1477     mem->pub_key_hash = mem_pub_key_hash;
1478     mem->max_fragment_id = 0; // FIXME
1479
1480     grp = c->group = &mem->group;
1481     grp->member = mem;
1482     grp->is_origin = GNUNET_NO;
1483     grp->pub_key = msg->group_pub_key;
1484     grp->pub_key_hash = pub_key_hash;
1485     group_set_cadet_port_hash (grp);
1486
1487     if (NULL == grp_mem)
1488     {
1489       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1490       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1491                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1492     }
1493     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1494                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1495     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1496                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1497   }
1498   else
1499   {
1500     grp = &mem->group;
1501   }
1502
1503   struct ClientList *cl = GNUNET_new (struct ClientList);
1504   cl->client = client;
1505   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1506
1507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1508               "%p Client connected to group %s..\n",
1509               mem, GNUNET_h2s (&grp->pub_key_hash));
1510   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "%p ..as member %s (%s).\n",
1513               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1514   GNUNET_free (str);
1515
1516   if (NULL != mem->join_dcsn)
1517   { /* Already got a join decision, send it to client. */
1518     struct GNUNET_MQ_Envelope *
1519       env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
1520
1521     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
1522                     env);
1523   }
1524   else
1525   { /* First client of the group, send join request. */
1526     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1527     uint32_t relay_count = ntohl (msg->relay_count);
1528     uint16_t relay_size = relay_count * sizeof (*relays);
1529     struct GNUNET_MessageHeader *join_msg = NULL;
1530     uint16_t join_msg_size = 0;
1531     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1532         <= msg_size)
1533     {
1534       join_msg = (struct GNUNET_MessageHeader *)
1535         (((char *) &msg[1]) + relay_size);
1536       join_msg_size = ntohs (join_msg->size);
1537     }
1538
1539     uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
1540     struct MulticastJoinRequestMessage *
1541       req = GNUNET_malloc (req_msg_size);
1542     req->header.size = htons (req_msg_size);
1543     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1544     req->group_pub_key = grp->pub_key;
1545     req->peer = this_peer;
1546     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1547     if (0 < join_msg_size)
1548       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1549
1550     req->member_pub_key = mem->pub_key;
1551     req->purpose.size = htonl (req_msg_size
1552                                - sizeof (req->header)
1553                                - sizeof (req->reserved)
1554                                - sizeof (req->signature));
1555     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1556
1557     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1558                                                &req->signature))
1559     {
1560       /* FIXME: handle error */
1561       GNUNET_assert (0);
1562     }
1563
1564     if (NULL != mem->join_req)
1565       GNUNET_free (mem->join_req);
1566     mem->join_req = req;
1567
1568     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1569     { /* No local origins, send to remote origin */
1570       cadet_send_join_request (mem);
1571     }
1572   }
1573   GNUNET_SERVICE_client_continue (client);
1574 }
1575
1576
1577 static void
1578 client_send_join_decision (struct Member *mem,
1579                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1580 {
1581   client_send_group (&mem->group, &hdcsn->header);
1582
1583   const struct MulticastJoinDecisionMessage *
1584     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1585   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1586   { /* Member admitted, store join_decision. */
1587     uint16_t dcsn_size = ntohs (dcsn->header.size);
1588     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1589     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1590   }
1591   else
1592   { /* Refused entry, but replay would be still possible for past members. */
1593   }
1594 }
1595
1596
1597 static int
1598 check_client_join_decision (void *cls,
1599                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1600 {
1601   return GNUNET_OK;
1602 }
1603
1604
1605 /**
1606  * Join decision from client.
1607  */
1608 static void
1609 handle_client_join_decision (void *cls,
1610                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
1611 {
1612   struct Client *c = cls;
1613   struct GNUNET_SERVICE_Client *client = c->client;
1614   struct Group *grp = c->group;
1615
1616   if (NULL == grp)
1617   {
1618     GNUNET_break (0);
1619     GNUNET_SERVICE_client_drop (client);
1620     return;
1621   }
1622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1623               "%p Got join decision from client for group %s..\n",
1624               grp, GNUNET_h2s (&grp->pub_key_hash));
1625
1626   struct GNUNET_CONTAINER_MultiHashMap *
1627     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1628                                                  &grp->pub_key_hash);
1629   struct Member *mem = NULL;
1630   if (NULL != grp_mem)
1631   {
1632     struct GNUNET_HashCode member_key_hash;
1633     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1634                         &member_key_hash);
1635     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1636     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                 "%p ..and member %s: %p\n",
1638                 grp, GNUNET_h2s (&member_key_hash), mem);
1639   }
1640   if (NULL != mem)
1641   { /* Found local member */
1642     client_send_join_decision (mem, hdcsn);
1643   }
1644   else
1645   { /* Look for remote member */
1646     cadet_send_join_decision (grp, hdcsn);
1647   }
1648   GNUNET_SERVICE_client_continue (client);
1649 }
1650
1651
1652 static int
1653 check_client_multicast_message (void *cls,
1654                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
1655 {
1656   return GNUNET_OK;
1657 }
1658
1659
1660 /**
1661  * Incoming message from a client.
1662  */
1663 static void
1664 handle_client_multicast_message (void *cls,
1665                                  const struct GNUNET_MULTICAST_MessageHeader *msg)
1666 {
1667   struct Client *c = cls;
1668   struct GNUNET_SERVICE_Client *client = c->client;
1669   struct Group *grp = c->group;
1670
1671   if (NULL == grp)
1672   {
1673     GNUNET_break (0);
1674     GNUNET_SERVICE_client_drop (client);
1675     return;
1676   }
1677   GNUNET_assert (GNUNET_YES == grp->is_origin);
1678   struct Origin *orig = grp->origin;
1679
1680   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1681   struct GNUNET_MULTICAST_MessageHeader *
1682     out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
1683   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1684   out->purpose.size = htonl (ntohs (out->header.size)
1685                              - sizeof (out->header)
1686                              - sizeof (out->hop_counter)
1687                              - sizeof (out->signature));
1688   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1689
1690   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1691                                              &out->signature))
1692   {
1693     GNUNET_assert (0);
1694   }
1695
1696   client_send_all (&grp->pub_key_hash, &out->header);
1697   cadet_send_children (&grp->pub_key_hash, &out->header);
1698   client_send_ack (&grp->pub_key_hash);
1699   GNUNET_free (out);
1700
1701   GNUNET_SERVICE_client_continue (client);
1702 }
1703
1704
1705 static int
1706 check_client_multicast_request (void *cls,
1707                                 const struct GNUNET_MULTICAST_RequestHeader *req)
1708 {
1709   return GNUNET_OK;
1710 }
1711
1712
1713 /**
1714  * Incoming request from a client.
1715  */
1716 static void
1717 handle_client_multicast_request (void *cls,
1718                                  const struct GNUNET_MULTICAST_RequestHeader *req)
1719 {
1720   struct Client *c = cls;
1721   struct GNUNET_SERVICE_Client *client = c->client;
1722   struct Group *grp = c->group;
1723
1724   if (NULL == grp)
1725   {
1726     GNUNET_break (0);
1727     GNUNET_SERVICE_client_drop (client);
1728     return;
1729   }
1730   GNUNET_assert (GNUNET_NO == grp->is_origin);
1731   struct Member *mem = grp->member;
1732
1733   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1734   struct GNUNET_MULTICAST_RequestHeader *
1735     out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
1736   out->member_pub_key = mem->pub_key;
1737   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1738   out->purpose.size = htonl (ntohs (out->header.size)
1739                              - sizeof (out->header)
1740                              - sizeof (out->member_pub_key)
1741                              - sizeof (out->signature));
1742   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1743
1744   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1745                                              &out->signature))
1746   {
1747     GNUNET_assert (0);
1748   }
1749
1750   uint8_t send_ack = GNUNET_YES;
1751   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1752   { /* No local origins, send to remote origin */
1753     if (NULL != mem->origin_channel)
1754     {
1755       cadet_send_channel (mem->origin_channel, &out->header);
1756       send_ack = GNUNET_NO;
1757     }
1758     else
1759     {
1760       /* FIXME: not yet connected to origin */
1761       GNUNET_SERVICE_client_drop (client);
1762       GNUNET_free (out);
1763       return;
1764     }
1765   }
1766   if (GNUNET_YES == send_ack)
1767   {
1768     client_send_ack (&grp->pub_key_hash);
1769   }
1770   GNUNET_free (out);
1771   GNUNET_SERVICE_client_continue (client);
1772 }
1773
1774
1775 /**
1776  * Incoming replay request from a client.
1777  */
1778 static void
1779 handle_client_replay_request (void *cls,
1780                               const struct MulticastReplayRequestMessage *rep)
1781 {
1782   struct Client *c = cls;
1783   struct GNUNET_SERVICE_Client *client = c->client;
1784   struct Group *grp = c->group;
1785
1786   if (NULL == grp)
1787   {
1788     GNUNET_break (0);
1789     GNUNET_SERVICE_client_drop (client);
1790     return;
1791   }
1792   GNUNET_assert (GNUNET_NO == grp->is_origin);
1793   struct Member *mem = grp->member;
1794
1795   struct GNUNET_CONTAINER_MultiHashMap *
1796     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1797                                                         &grp->pub_key_hash);
1798   if (NULL == grp_replay_req)
1799   {
1800     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1801     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1802                                        &grp->pub_key_hash, grp_replay_req,
1803                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1804   }
1805
1806   struct GNUNET_HashCode key_hash;
1807   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1808                    rep->flags, &key_hash);
1809   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1810                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1811
1812   if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
1813   { /* No local origin, replay from remote members / origin. */
1814     if (NULL != mem->origin_channel)
1815     {
1816       cadet_send_channel (mem->origin_channel, &rep->header);
1817     }
1818     else
1819     {
1820       /* FIXME: not yet connected to origin */
1821       GNUNET_SERVICE_client_drop (client);
1822       return;
1823     }
1824   }
1825   GNUNET_SERVICE_client_continue (client);
1826 }
1827
1828
1829 static int
1830 cadet_send_replay_response_cb (void *cls,
1831                                const struct GNUNET_HashCode *key_hash,
1832                                void *value)
1833 {
1834   struct Channel *chn = value;
1835   struct GNUNET_MessageHeader *msg = cls;
1836
1837   cadet_send_channel (chn, msg);
1838   return GNUNET_OK;
1839 }
1840
1841
1842 static int
1843 client_send_replay_response_cb (void *cls,
1844                                 const struct GNUNET_HashCode *key_hash,
1845                                 void *value)
1846 {
1847   struct GNUNET_SERVICE_Client *client = value;
1848   struct GNUNET_MessageHeader *msg = cls;
1849
1850   client_send (client, msg);
1851   return GNUNET_OK;
1852 }
1853
1854
1855 static int
1856 check_client_replay_response_end (void *cls,
1857                                   const struct MulticastReplayResponseMessage *res)
1858 {
1859   return GNUNET_OK;
1860 }
1861
1862
1863 /**
1864  * End of replay response from a client.
1865  */
1866 static void
1867 handle_client_replay_response_end (void *cls,
1868                                    const struct MulticastReplayResponseMessage *res)
1869 {
1870   struct Client *c = cls;
1871   struct GNUNET_SERVICE_Client *client = c->client;
1872   struct Group *grp = c->group;
1873
1874   if (NULL == grp)
1875   {
1876     GNUNET_break (0);
1877     GNUNET_SERVICE_client_drop (client);
1878     return;
1879   }
1880
1881   struct GNUNET_HashCode key_hash;
1882   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1883                    res->flags, &key_hash);
1884
1885   struct GNUNET_CONTAINER_MultiHashMap *
1886     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1887                                                               &grp->pub_key_hash);
1888   if (NULL != grp_replay_req_cadet)
1889   {
1890     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1891   }
1892   struct GNUNET_CONTAINER_MultiHashMap *
1893     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1894                                                                &grp->pub_key_hash);
1895   if (NULL != grp_replay_req_client)
1896   {
1897     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1898   }
1899   GNUNET_SERVICE_client_continue (client);
1900 }
1901
1902
1903 static int
1904 check_client_replay_response (void *cls,
1905                               const struct MulticastReplayResponseMessage *res)
1906 {
1907   const struct GNUNET_MessageHeader *msg = &res->header;
1908   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1909   {
1910     msg = GNUNET_MQ_extract_nested_mh (res);
1911     if (NULL == msg)
1912     {
1913       return GNUNET_SYSERR;
1914     }
1915   }
1916   return GNUNET_OK;
1917 }
1918
1919
1920 /**
1921  * Incoming replay response from a client.
1922  *
1923  * Respond with a multicast message on success, or otherwise with an error code.
1924  */
1925 static void
1926 handle_client_replay_response (void *cls,
1927                                const struct MulticastReplayResponseMessage *res)
1928 {
1929   struct Client *c = cls;
1930   struct GNUNET_SERVICE_Client *client = c->client;
1931   struct Group *grp = c->group;
1932
1933   if (NULL == grp)
1934   {
1935     GNUNET_break (0);
1936     GNUNET_SERVICE_client_drop (client);
1937     return;
1938   }
1939
1940   const struct GNUNET_MessageHeader *msg = &res->header;
1941   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1942   {
1943     msg = GNUNET_MQ_extract_nested_mh (res);
1944   }
1945
1946   struct GNUNET_HashCode key_hash;
1947   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1948                    res->flags, &key_hash);
1949
1950   struct GNUNET_CONTAINER_MultiHashMap *
1951     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1952                                                               &grp->pub_key_hash);
1953   if (NULL != grp_replay_req_cadet)
1954   {
1955     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1956                                                 cadet_send_replay_response_cb,
1957                                                 (void *) msg);
1958   }
1959   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1960   {
1961     struct GNUNET_CONTAINER_MultiHashMap *
1962       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1963                                                                  &grp->pub_key_hash);
1964     if (NULL != grp_replay_req_client)
1965     {
1966       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1967                                                   client_send_replay_response_cb,
1968                                                   (void *) msg);
1969     }
1970   }
1971   else
1972   {
1973     handle_client_replay_response_end (c, res);
1974     return;
1975   }
1976   GNUNET_SERVICE_client_continue (client);
1977 }
1978
1979
1980 /**
1981  * A new client connected.
1982  *
1983  * @param cls NULL
1984  * @param client client to add
1985  * @param mq message queue for @a client
1986  * @return @a client
1987  */
1988 static void *
1989 client_notify_connect (void *cls,
1990                        struct GNUNET_SERVICE_Client *client,
1991                        struct GNUNET_MQ_Handle *mq)
1992 {
1993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1994   /* FIXME: send connect ACK */
1995
1996   struct Client *c = GNUNET_new (struct Client);
1997   c->client = client;
1998
1999   return c;
2000 }
2001
2002
2003 /**
2004  * Called whenever a client is disconnected.
2005  * Frees our resources associated with that client.
2006  *
2007  * @param cls closure
2008  * @param client identification of the client
2009  * @param app_ctx must match @a client
2010  */
2011 static void
2012 client_notify_disconnect (void *cls,
2013                           struct GNUNET_SERVICE_Client *client,
2014                           void *app_ctx)
2015 {
2016   struct Client *c = app_ctx;
2017   struct Group *grp = c->group;
2018   GNUNET_free (c);
2019
2020   if (NULL == grp)
2021   {
2022     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2023                 "%p User context is NULL in client_disconnect()\n", grp);
2024     GNUNET_break (0);
2025     return;
2026   }
2027
2028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2029               "%p Client (%s) disconnected from group %s\n",
2030               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
2031               GNUNET_h2s (&grp->pub_key_hash));
2032
2033   struct ClientList *cl = grp->clients_head;
2034   while (NULL != cl)
2035   {
2036     if (cl->client == client)
2037     {
2038       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
2039       GNUNET_free (cl);
2040       break;
2041     }
2042     cl = cl->next;
2043   }
2044
2045   while (GNUNET_YES == replay_req_remove_client (grp, client));
2046
2047   if (NULL == grp->clients_head)
2048   { /* Last client disconnected. */
2049 #if FIXME
2050     if (NULL != grp->tmit_head)
2051     { /* Send pending messages via CADET before cleanup. */
2052       transmit_message (grp);
2053     }
2054     else
2055 #endif
2056     {
2057       cleanup_group (grp);
2058     }
2059   }
2060 }
2061
2062
2063 /**
2064  * Service started.
2065  *
2066  * @param cls closure
2067  * @param server the initialized server
2068  * @param cfg configuration to use
2069  */
2070 static void
2071 run (void *cls,
2072      const struct GNUNET_CONFIGURATION_Handle *c,
2073      struct GNUNET_SERVICE_Handle *svc)
2074 {
2075   cfg = c;
2076   service = svc;
2077   GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
2078
2079   stats = GNUNET_STATISTICS_create ("multicast", cfg);
2080   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2081   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2082   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2083   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2084   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2085   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2086   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2087
2088   cadet = GNUNET_CADET_connect (cfg);
2089
2090   GNUNET_assert (NULL != cadet);
2091
2092   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2093                                  NULL);
2094 }
2095
2096
2097 /**
2098  * Define "main" method using service macro.
2099  */
2100 GNUNET_SERVICE_MAIN
2101 ("multicast",
2102  GNUNET_SERVICE_OPTION_NONE,
2103  run,
2104  client_notify_connect,
2105  client_notify_disconnect,
2106  NULL,
2107  GNUNET_MQ_hd_fixed_size (client_origin_start,
2108                           GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
2109                           struct MulticastOriginStartMessage,
2110                           NULL),
2111  GNUNET_MQ_hd_var_size (client_member_join,
2112                         GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
2113                         struct MulticastMemberJoinMessage,
2114                         NULL),
2115  GNUNET_MQ_hd_var_size (client_join_decision,
2116                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
2117                         struct MulticastJoinDecisionMessageHeader,
2118                         NULL),
2119  GNUNET_MQ_hd_var_size (client_multicast_message,
2120                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
2121                         struct GNUNET_MULTICAST_MessageHeader,
2122                         NULL),
2123  GNUNET_MQ_hd_var_size (client_multicast_request,
2124                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
2125                         struct GNUNET_MULTICAST_RequestHeader,
2126                         NULL),
2127  GNUNET_MQ_hd_fixed_size (client_replay_request,
2128                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
2129                           struct MulticastReplayRequestMessage,
2130                           NULL),
2131  GNUNET_MQ_hd_var_size (client_replay_response,
2132                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
2133                         struct MulticastReplayResponseMessage,
2134                         NULL),
2135  GNUNET_MQ_hd_var_size (client_replay_response_end,
2136                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
2137                         struct MulticastReplayResponseMessage,
2138                         NULL));
2139
2140 /* end of gnunet-service-multicast.c */