multicast: logging, test fix
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "multicast.h"
34
35 /**
36  * Handle to our current configuration.
37  */
38 static const struct GNUNET_CONFIGURATION_Handle *cfg;
39
40 /**
41  * Service handle.
42  */
43 static struct GNUNET_SERVICE_Handle *service;
44
45 /**
46  * CADET handle.
47  */
48 static struct GNUNET_CADET_Handle *cadet;
49
50 /**
51  * Identity of this peer.
52  */
53 static struct GNUNET_PeerIdentity this_peer;
54
55 /**
56  * Handle to the statistics service.
57  */
58 static struct GNUNET_STATISTICS_Handle *stats;
59
60 /**
61  * All connected origin clients.
62  * Group's pub_key_hash -> struct Origin * (uniq)
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *origins;
65
66 /**
67  * All connected member clients.
68  * Group's pub_key_hash -> struct Member * (multi)
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *members;
71
72 /**
73  * Connected member clients per group.
74  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
77
78 /**
79  * Incoming CADET channels with connected children in the tree.
80  * Group's pub_key_hash -> struct Channel * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
83
84 /**
85  * Outgoing CADET channels connecting to parents in the tree.
86  * Group's pub_key_hash -> struct Channel * (multi)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
89
90 /**
91  * Incoming replay requests from CADET.
92  * Group's pub_key_hash ->
93  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
96
97 /**
98  * Incoming replay requests from clients.
99  * Group's pub_key_hash ->
100  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
101  */
102 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
103
104
105 /**
106  * Join status of a remote peer.
107  */
108 enum JoinStatus
109 {
110   JOIN_REFUSED  = -1,
111   JOIN_NOT_ASKED = 0,
112   JOIN_WAITING   = 1,
113   JOIN_ADMITTED  = 2,
114 };
115
116 enum ChannelDirection
117 {
118   DIR_INCOMING = 0,
119   DIR_OUTGOING = 1,
120 };
121
122
123 /**
124  * Context for a CADET channel.
125  */
126 struct Channel
127 {
128   /**
129    * Group the channel belongs to.
130    *
131    * Only set for outgoing channels.
132    */
133   struct Group *group;
134
135   /**
136    * CADET channel.
137    */
138   struct GNUNET_CADET_Channel *channel;
139
140   /**
141    * CADET transmission handle.
142    */
143   struct GNUNET_CADET_TransmitHandle *tmit_handle;
144
145   /**
146    * Public key of the target group.
147    */
148   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
149
150   /**
151    * Hash of @a group_pub_key.
152    */
153   struct GNUNET_HashCode group_pub_hash;
154
155   /**
156    * Public key of the joining member.
157    */
158   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
159
160   /**
161    * Remote peer identity.
162    */
163   struct GNUNET_PeerIdentity peer;
164
165   /**
166    * Current window size, set by cadet_notify_window_change()
167    */
168   int32_t window_size;
169
170   /**
171    * Is the connection established?
172    */
173   int8_t is_connected;
174
175   /**
176    * Is the remote peer admitted to the group?
177    * @see enum JoinStatus
178    */
179   int8_t join_status;
180
181   /**
182    * Number of messages waiting to be sent to CADET.
183    */
184   uint8_t msgs_pending;
185
186   /**
187    * Channel direction.
188    * @see enum ChannelDirection
189    */
190   uint8_t direction;
191 };
192
193
194 /**
195  * List of connected clients.
196  */
197 struct ClientList
198 {
199   struct ClientList *prev;
200   struct ClientList *next;
201   struct GNUNET_SERVICE_Client *client;
202 };
203
204
205 /**
206  * Client context for an origin or member.
207  */
208 struct Group
209 {
210   struct ClientList *clients_head;
211   struct ClientList *clients_tail;
212
213   /**
214    * Public key of the group.
215    */
216   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
217
218   /**
219    * Hash of @a pub_key.
220    */
221   struct GNUNET_HashCode pub_key_hash;
222
223   /**
224    * CADET port hash.
225    */
226   struct GNUNET_HashCode cadet_port_hash;
227
228   /**
229    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
230    */
231   uint8_t disconnected;
232
233   /**
234    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
235    */
236   uint8_t is_origin;
237
238   union {
239     struct Origin *origin;
240     struct Member *member;
241   };
242 };
243
244
245 /**
246 * Client context for a group's origin.
247  */
248 struct Origin
249 {
250   struct Group group;
251
252   /**
253    * Private key of the group.
254    */
255   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
256
257   /**
258    * CADET port.
259    */
260   struct GNUNET_CADET_Port *cadet_port;
261
262   /**
263    * Last message fragment ID sent to the group.
264    */
265   uint64_t max_fragment_id;
266 };
267
268
269 /**
270  * Client context for a group member.
271  */
272 struct Member
273 {
274   struct Group group;
275
276   /**
277    * Private key of the member.
278    */
279   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
280
281   /**
282    * Public key of the member.
283    */
284   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
285
286   /**
287    * Hash of @a pub_key.
288    */
289   struct GNUNET_HashCode pub_key_hash;
290
291   /**
292    * Join request sent to the origin / members.
293    */
294   struct MulticastJoinRequestMessage *join_req;
295
296   /**
297    * Join decision sent in reply to our request.
298    *
299    * Only a positive decision is stored here, in case of a negative decision the
300    * client is disconnected.
301    */
302   struct MulticastJoinDecisionMessageHeader *join_dcsn;
303
304   /**
305    * CADET channel to the origin.
306    */
307   struct Channel *origin_channel;
308
309   /**
310    * Peer identity of origin.
311    */
312   struct GNUNET_PeerIdentity origin;
313
314   /**
315    * Peer identity of relays (other members to connect).
316    */
317   struct GNUNET_PeerIdentity *relays;
318
319   /**
320    * Last request fragment ID sent to the origin.
321    */
322   uint64_t max_fragment_id;
323
324   /**
325    * Number of @a relays.
326    */
327   uint32_t relay_count;
328 };
329
330
331 /**
332  * Client context.
333  */
334 struct Client {
335   struct GNUNET_SERVICE_Client *client;
336   struct Group *group;
337 };
338
339
340 struct ReplayRequestKey
341 {
342   uint64_t fragment_id;
343   uint64_t message_id;
344   uint64_t fragment_offset;
345   uint64_t flags;
346 };
347
348
349 static struct Channel *
350 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer);
351
352 static void
353 cadet_channel_destroy (struct Channel *chn);
354
355 static void
356 client_send_join_decision (struct Member *mem,
357                            const struct MulticastJoinDecisionMessageHeader *hdcsn);
358
359
360 /**
361  * Task run during shutdown.
362  *
363  * @param cls unused
364  */
365 static void
366 shutdown_task (void *cls)
367 {
368   if (NULL != cadet)
369   {
370     GNUNET_CADET_disconnect (cadet);
371     cadet = NULL;
372   }
373   if (NULL != stats)
374   {
375     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
376     stats = NULL;
377   }
378   /* FIXME: do more clean up here */
379 }
380
381
382 /**
383  * Clean up origin data structures after a client disconnected.
384  */
385 static void
386 cleanup_origin (struct Origin *orig)
387 {
388   struct Group *grp = &orig->group;
389   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
390   if (NULL != orig->cadet_port)
391   {
392     GNUNET_CADET_close_port (orig->cadet_port);
393     orig->cadet_port = NULL;
394   }
395   GNUNET_free (orig);
396 }
397
398
399 /**
400  * Clean up member data structures after a client disconnected.
401  */
402 static void
403 cleanup_member (struct Member *mem)
404 {
405   struct Group *grp = &mem->group;
406   struct GNUNET_CONTAINER_MultiHashMap *
407     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
408                                                  &grp->pub_key_hash);
409   GNUNET_assert (NULL != grp_mem);
410   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
411
412   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
413   {
414     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
415                                           grp_mem);
416     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
417   }
418   if (NULL != mem->join_dcsn)
419   {
420     GNUNET_free (mem->join_dcsn);
421     mem->join_dcsn = NULL;
422   }
423   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
424   GNUNET_free (mem);
425 }
426
427
428 /**
429  * Clean up group data structures after a client disconnected.
430  */
431 static void
432 cleanup_group (struct Group *grp)
433 {
434   (GNUNET_YES == grp->is_origin)
435     ? cleanup_origin (grp->origin)
436     : cleanup_member (grp->member);
437 }
438
439
440 void
441 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
442                  uint64_t fragment_offset, uint64_t flags,
443                  struct GNUNET_HashCode *key_hash)
444 {
445   struct ReplayRequestKey key = {
446     .fragment_id = fragment_id,
447     .message_id = message_id,
448     .fragment_offset = fragment_offset,
449     .flags = flags,
450   };
451   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
452 }
453
454
455 /**
456  * Remove channel from replay request hashmap.
457  *
458  * @param chn
459  *        Channel to remove.
460  *
461  * @return #GNUNET_YES if there are more entries to process,
462  *         #GNUNET_NO when reached end of hashmap.
463  */
464 static int
465 replay_req_remove_cadet (struct Channel *chn)
466 {
467   struct GNUNET_CONTAINER_MultiHashMap *
468     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
469                                                         &chn->group->pub_key_hash);
470   if (NULL == grp_replay_req)
471     return GNUNET_NO;
472
473   struct GNUNET_CONTAINER_MultiHashMapIterator *
474     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
475   struct GNUNET_HashCode key;
476   const struct Channel *c;
477   while (GNUNET_YES
478          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
479                                                          (const void **) &c))
480   {
481     if (c == chn)
482     {
483       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
484       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
485       return GNUNET_YES;
486     }
487   }
488   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
489   return GNUNET_NO;
490 }
491
492
493 /**
494  * Remove client from replay request hashmap.
495  *
496  * @param client
497  *        Client to remove.
498  *
499  * @return #GNUNET_YES if there are more entries to process,
500  *         #GNUNET_NO when reached end of hashmap.
501  */
502 static int
503 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
504 {
505   struct GNUNET_CONTAINER_MultiHashMap *
506     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
507                                                         &grp->pub_key_hash);
508   if (NULL == grp_replay_req)
509     return GNUNET_NO;
510
511   struct GNUNET_CONTAINER_MultiHashMapIterator *
512     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
513   struct GNUNET_HashCode key;
514   const struct GNUNET_SERVICE_Client *c;
515   while (GNUNET_YES
516          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
517                                                          (const void **) &c))
518   {
519     if (c == client)
520     {
521       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
522       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
523       return GNUNET_YES;
524     }
525   }
526   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
527   return GNUNET_NO;
528 }
529
530
531 /**
532  * Send message to a client.
533  */
534 static void
535 client_send (struct GNUNET_SERVICE_Client *client,
536              const struct GNUNET_MessageHeader *msg)
537 {
538   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
539               "%p Sending message to client.\n", client);
540
541   struct GNUNET_MQ_Envelope *
542     env = GNUNET_MQ_msg_copy (msg);
543
544   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
545                   env);
546 }
547
548
549 /**
550  * Send message to all clients connected to the group.
551  */
552 static void
553 client_send_group (const struct Group *grp,
554                    const struct GNUNET_MessageHeader *msg)
555 {
556   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
557               "%p Sending message to all clients of the group.\n", grp);
558
559   struct ClientList *cl = grp->clients_head;
560   while (NULL != cl)
561   {
562     struct GNUNET_MQ_Envelope *
563       env = GNUNET_MQ_msg_copy (msg);
564
565     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
566                     env);
567     cl = cl->next;
568   }
569 }
570
571
572 /**
573  * Iterator callback for sending a message to origin clients.
574  */
575 static int
576 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
577                        void *origin)
578 {
579   const struct GNUNET_MessageHeader *msg = cls;
580   struct Member *orig = origin;
581
582   client_send_group (&orig->group, msg);
583   return GNUNET_YES;
584 }
585
586
587 /**
588  * Iterator callback for sending a message to member clients.
589  */
590 static int
591 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
592                        void *member)
593 {
594   const struct GNUNET_MessageHeader *msg = cls;
595   struct Member *mem = member;
596
597   if (NULL != mem->join_dcsn)
598   { /* Only send message to admitted members */
599     client_send_group (&mem->group, msg);
600   }
601   return GNUNET_YES;
602 }
603
604
605 /**
606  * Send message to all origin and member clients connected to the group.
607  *
608  * @param pub_key_hash
609  *        H(key_pub) of the group.
610  * @param msg
611  *        Message to send.
612  */
613 static int
614 client_send_all (struct GNUNET_HashCode *pub_key_hash,
615                  const struct GNUNET_MessageHeader *msg)
616 {
617   int n = 0;
618   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
619                                                    client_send_origin_cb,
620                                                    (void *) msg);
621   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
622                                                    client_send_member_cb,
623                                                    (void *) msg);
624   return n;
625 }
626
627
628 /**
629  * Send message to a random origin client or a random member client.
630  *
631  * @param grp  The group to send @a msg to.
632  * @param msg  Message to send.
633  */
634 static int
635 client_send_random (struct GNUNET_HashCode *pub_key_hash,
636                     const struct GNUNET_MessageHeader *msg)
637 {
638   int n = 0;
639   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
640                                                  (void *) msg);
641   if (n <= 0)
642     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
643                                                    (void *) msg);
644   return n;
645 }
646
647
648 /**
649  * Send message to all origin clients connected to the group.
650  *
651  * @param pub_key_hash
652  *        H(key_pub) of the group.
653  * @param msg
654  *        Message to send.
655  */
656 static int
657 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
658                     const struct GNUNET_MessageHeader *msg)
659 {
660   int n = 0;
661   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
662                                                    client_send_origin_cb,
663                                                    (void *) msg);
664   return n;
665 }
666
667
668 /**
669  * Send fragment acknowledgement to all clients of the channel.
670  *
671  * @param pub_key_hash
672  *        H(key_pub) of the group.
673  */
674 static void
675 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
676 {
677   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
678               "Sending message ACK to client.\n");
679
680   static struct GNUNET_MessageHeader *msg = NULL;
681   if (NULL == msg)
682   {
683     msg = GNUNET_malloc (sizeof (*msg));
684     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
685     msg->size = htons (sizeof (*msg));
686   }
687   client_send_all (pub_key_hash, msg);
688 }
689
690
691 struct CadetTransmitClosure
692 {
693   struct Channel *chn;
694   const struct GNUNET_MessageHeader *msg;
695 };
696
697
698 /**
699  * Send a message to a CADET channel.
700  *
701  * @param chn  Channel.
702  * @param msg  Message.
703  */
704 static void
705 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
706 {
707   struct GNUNET_MQ_Envelope *
708     env = GNUNET_MQ_msg_copy (msg);
709
710   GNUNET_MQ_send (GNUNET_CADET_get_mq (chn->channel), env);
711
712   if (0 < chn->window_size)
713   {
714     client_send_ack (&chn->group_pub_hash);
715   }
716   else
717   {
718     chn->msgs_pending++;
719     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
720                 "%p Queuing message. Pending messages: %u\n",
721                 chn, chn->msgs_pending);
722   }
723 }
724
725
726 /**
727  * Create CADET channel and send a join request.
728  */
729 static void
730 cadet_send_join_request (struct Member *mem)
731 {
732   mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
733   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
734
735   uint32_t i;
736   for (i = 0; i < mem->relay_count; i++)
737   {
738     struct Channel *
739       chn = cadet_channel_create (&mem->group, &mem->relays[i]);
740     cadet_send_channel (chn, &mem->join_req->header);
741   }
742 }
743
744
745 static int
746 cadet_send_join_decision_cb (void *cls,
747                              const struct GNUNET_HashCode *group_pub_hash,
748                              void *channel)
749 {
750   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
751   struct Channel *chn = channel;
752
753   const struct MulticastJoinDecisionMessage *dcsn =
754     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
755
756   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
757       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
758   {
759     if (GNUNET_YES == ntohl (dcsn->is_admitted))
760     {
761       chn->join_status = JOIN_ADMITTED;
762     }
763     else
764     {
765       chn->join_status = JOIN_REFUSED;
766     }
767
768     cadet_send_channel (chn, &hdcsn->header);
769     return GNUNET_NO;
770   }
771   return GNUNET_YES;
772 }
773
774
775 /**
776  * Send join decision to a remote peer.
777  */
778 static void
779 cadet_send_join_decision (struct Group *grp,
780                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
781 {
782   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
783                                               &cadet_send_join_decision_cb,
784                                               (void *) hdcsn);
785 }
786
787
788 /**
789  * Iterator callback for sending a message to origin clients.
790  */
791 static int
792 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
793                void *channel)
794 {
795   const struct GNUNET_MessageHeader *msg = cls;
796   struct Channel *chn = channel;
797   if (JOIN_ADMITTED == chn->join_status)
798     cadet_send_channel (chn, msg);
799   return GNUNET_YES;
800 }
801
802
803 /**
804  * Send message to all connected children.
805  */
806 static int
807 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
808                      const struct GNUNET_MessageHeader *msg)
809 {
810   int n = 0;
811   if (channels_in != NULL)
812     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
813                                                      cadet_send_cb, (void *) msg);
814   return n;
815 }
816
817
818 #if 0       // unused as yet
819 /**
820  * Send message to all connected parents.
821  */
822 static int
823 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
824                     const struct GNUNET_MessageHeader *msg)
825 {
826   int n = 0;
827   if (channels_in != NULL)
828     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
829                                                      cadet_send_cb, (void *) msg);
830   return n;
831 }
832 #endif
833
834
835 /**
836  * CADET channel connect handler.
837  *
838  * @see GNUNET_CADET_ConnectEventHandler()
839  */
840 static void *
841 cadet_notify_connect (void *cls,
842                       struct GNUNET_CADET_Channel *channel,
843                       const struct GNUNET_PeerIdentity *source)
844 {
845   struct Channel *chn = GNUNET_malloc (sizeof *chn);
846   chn->group = cls;
847   chn->channel = channel;
848   chn->direction = DIR_INCOMING;
849   chn->join_status = JOIN_NOT_ASKED;
850
851   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
852                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
853   return chn;
854 }
855
856
857 /**
858  * CADET window size change handler.
859  *
860  * @see GNUNET_CADET_WindowSizeEventHandler()
861  */
862 static void
863 cadet_notify_window_change (void *cls,
864                             const struct GNUNET_CADET_Channel *channel,
865                             int window_size)
866 {
867   struct Channel *chn = cls;
868
869   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
870               "%p Window size changed to %d.  Pending messages: %u\n",
871               chn, window_size, chn->msgs_pending);
872
873   chn->is_connected = GNUNET_YES;
874   chn->window_size = (int32_t) window_size;
875
876   for (int i = 0; i < window_size; i++)
877   {
878     if (0 < chn->msgs_pending)
879     {
880       client_send_ack (&chn->group_pub_hash);
881       chn->msgs_pending--;
882     }
883     else
884     {
885       break;
886     }
887   }
888 }
889
890
891 /**
892  * CADET channel disconnect handler.
893  *
894  * @see GNUNET_CADET_DisconnectEventHandler()
895  */
896 static void
897 cadet_notify_disconnect (void *cls,
898                          const struct GNUNET_CADET_Channel *channel)
899 {
900   if (NULL == cls)
901     return;
902
903   struct Channel *chn = cls;
904   if (NULL != chn->group)
905   {
906     if (GNUNET_NO == chn->group->is_origin)
907     {
908       struct Member *mem = (struct Member *) chn->group;
909       if (chn == mem->origin_channel)
910         mem->origin_channel = NULL;
911     }
912   }
913
914   int ret;
915   do
916   {
917     ret = replay_req_remove_cadet (chn);
918   }
919   while (GNUNET_YES == ret);
920
921   GNUNET_free (chn);
922 }
923
924
925 static int
926 check_cadet_join_request (void *cls,
927                           const struct MulticastJoinRequestMessage *req)
928 {
929   struct Channel *chn = cls;
930
931   if (NULL == chn
932       || JOIN_NOT_ASKED != chn->join_status)
933   {
934     return GNUNET_SYSERR;
935   }
936
937   uint16_t size = ntohs (req->header.size);
938   if (size < sizeof (*req))
939   {
940     GNUNET_break_op (0);
941     return GNUNET_SYSERR;
942   }
943   if (ntohl (req->purpose.size) != (size
944                                     - sizeof (req->header)
945                                     - sizeof (req->reserved)
946                                     - sizeof (req->signature)))
947   {
948     GNUNET_break_op (0);
949     return GNUNET_SYSERR;
950   }
951   if (GNUNET_OK !=
952       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
953                                   &req->purpose, &req->signature,
954                                   &req->member_pub_key))
955   {
956     GNUNET_break_op (0);
957     return GNUNET_SYSERR;
958   }
959
960   return GNUNET_OK;
961 }
962
963
964 /**
965  * Incoming join request message from CADET.
966  */
967 static void
968 handle_cadet_join_request (void *cls,
969                            const struct MulticastJoinRequestMessage *req)
970 {
971   struct Channel *chn = cls;
972   GNUNET_CADET_receive_done (chn->channel);
973
974   struct GNUNET_HashCode group_pub_hash;
975   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
976   chn->group_pub_key = req->group_pub_key;
977   chn->group_pub_hash = group_pub_hash;
978   chn->member_pub_key = req->member_pub_key;
979   chn->peer = req->peer;
980   chn->join_status = JOIN_WAITING;
981
982   client_send_all (&group_pub_hash, &req->header);
983 }
984
985
986 static int
987 check_cadet_join_decision (void *cls,
988                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
989 {
990   uint16_t size = ntohs (hdcsn->header.size);
991   if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
992              sizeof (struct MulticastJoinDecisionMessage))
993   {
994     GNUNET_break_op (0);
995     return GNUNET_SYSERR;
996   }
997
998   struct Channel *chn = cls;
999   if (NULL == chn)
1000   {
1001     GNUNET_break (0);
1002     return GNUNET_SYSERR;
1003   }
1004   if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
1005   {
1006     GNUNET_break (0);
1007     return GNUNET_SYSERR;
1008   }
1009   switch (chn->join_status)
1010   {
1011   case JOIN_REFUSED:
1012     return GNUNET_SYSERR;
1013
1014   case JOIN_ADMITTED:
1015     return GNUNET_OK;
1016
1017   case JOIN_NOT_ASKED:
1018   case JOIN_WAITING:
1019     break;
1020   }
1021
1022   return GNUNET_OK;
1023 }
1024
1025
1026 /**
1027  * Incoming join decision message from CADET.
1028  */
1029 static void
1030 handle_cadet_join_decision (void *cls,
1031                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1032 {
1033   const struct MulticastJoinDecisionMessage *
1034     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1035
1036   struct Channel *chn = cls;
1037   GNUNET_CADET_receive_done (chn->channel);
1038
1039   // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
1040   struct Member *mem = (struct Member *) chn->group;
1041   client_send_join_decision (mem, hdcsn);
1042   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1043   {
1044     chn->join_status = JOIN_ADMITTED;
1045   }
1046   else
1047   {
1048     chn->join_status = JOIN_REFUSED;
1049     cadet_channel_destroy (chn);
1050   }
1051 }
1052
1053
1054 static int
1055 check_cadet_message (void *cls,
1056                      const struct GNUNET_MULTICAST_MessageHeader *msg)
1057 {
1058   uint16_t size = ntohs (msg->header.size);
1059   if (size < sizeof (*msg))
1060   {
1061     GNUNET_break_op (0);
1062     return GNUNET_SYSERR;
1063   }
1064
1065   struct Channel *chn = cls;
1066   if (NULL == chn)
1067   {
1068     GNUNET_break (0);
1069     return GNUNET_SYSERR;
1070   }
1071   if (ntohl (msg->purpose.size) != (size
1072                                     - sizeof (msg->header)
1073                                     - sizeof (msg->hop_counter)
1074                                     - sizeof (msg->signature)))
1075   {
1076     GNUNET_break_op (0);
1077     return GNUNET_SYSERR;
1078   }
1079   if (GNUNET_OK !=
1080       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1081                                   &msg->purpose, &msg->signature,
1082                                   &chn->group_pub_key))
1083   {
1084     GNUNET_break_op (0);
1085     return GNUNET_SYSERR;
1086   }
1087
1088   return GNUNET_OK;
1089 }
1090
1091
1092 /**
1093  * Incoming multicast message from CADET.
1094  */
1095 static void
1096 handle_cadet_message (void *cls,
1097                       const struct GNUNET_MULTICAST_MessageHeader *msg)
1098 {
1099   struct Channel *chn = cls;
1100   GNUNET_CADET_receive_done (chn->channel);
1101   client_send_all (&chn->group_pub_hash, &msg->header);
1102 }
1103
1104
1105 static int
1106 check_cadet_request (void *cls,
1107                      const struct GNUNET_MULTICAST_RequestHeader *req)
1108 {
1109   uint16_t size = ntohs (req->header.size);
1110   if (size < sizeof (*req))
1111   {
1112     GNUNET_break_op (0);
1113     return GNUNET_SYSERR;
1114   }
1115
1116   struct Channel *chn = cls;
1117   if (NULL == chn)
1118   {
1119     GNUNET_break (0);
1120     return GNUNET_SYSERR;
1121   }
1122   if (ntohl (req->purpose.size) != (size
1123                                     - sizeof (req->header)
1124                                     - sizeof (req->member_pub_key)
1125                                     - sizeof (req->signature)))
1126   {
1127     GNUNET_break_op (0);
1128     return GNUNET_SYSERR;
1129   }
1130   if (GNUNET_OK !=
1131       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1132                                   &req->purpose, &req->signature,
1133                                   &req->member_pub_key))
1134   {
1135     GNUNET_break_op (0);
1136     return GNUNET_SYSERR;
1137   }
1138
1139   return GNUNET_OK;
1140 }
1141
1142
1143 /**
1144  * Incoming multicast request message from CADET.
1145  */
1146 static void
1147 handle_cadet_request (void *cls,
1148                       const struct GNUNET_MULTICAST_RequestHeader *req)
1149 {
1150   struct Channel *chn = cls;
1151   GNUNET_CADET_receive_done (chn->channel);
1152   client_send_origin (&chn->group_pub_hash, &req->header);
1153 }
1154
1155
1156 static int
1157 check_cadet_replay_request (void *cls,
1158                             const struct MulticastReplayRequestMessage *req)
1159 {
1160   uint16_t size = ntohs (req->header.size);
1161   if (size < sizeof (*req))
1162   {
1163     GNUNET_break_op (0);
1164     return GNUNET_SYSERR;
1165   }
1166
1167   struct Channel *chn = cls;
1168   if (NULL == chn)
1169   {
1170     GNUNET_break_op (0);
1171     return GNUNET_SYSERR;
1172   }
1173
1174   return GNUNET_OK;
1175 }
1176
1177
1178 /**
1179  * Incoming multicast replay request from CADET.
1180  */
1181 static void
1182 handle_cadet_replay_request (void *cls,
1183                              const struct MulticastReplayRequestMessage *req)
1184 {
1185   struct Channel *chn = cls;
1186   GNUNET_CADET_receive_done (chn->channel);
1187
1188   struct MulticastReplayRequestMessage rep = *req;
1189   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1190
1191   struct GNUNET_CONTAINER_MultiHashMap *
1192     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1193                                                         &chn->group->pub_key_hash);
1194   if (NULL == grp_replay_req)
1195   {
1196     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1197     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1198                                        &chn->group->pub_key_hash, grp_replay_req,
1199                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1200   }
1201   struct GNUNET_HashCode key_hash;
1202   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1203                    rep.flags, &key_hash);
1204   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1205                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1206
1207   client_send_random (&chn->group_pub_hash, &rep.header);
1208 }
1209
1210
1211 static int
1212 check_cadet_replay_response (void *cls,
1213                              const struct MulticastReplayResponseMessage *res)
1214 {
1215   struct Channel *chn = cls;
1216   if (NULL == chn)
1217   {
1218     GNUNET_break (0);
1219     return GNUNET_SYSERR;
1220   }
1221   return GNUNET_OK;
1222 }
1223
1224
1225 /**
1226  * Incoming multicast replay response from CADET.
1227  */
1228 static void
1229 handle_cadet_replay_response (void *cls,
1230                               const struct MulticastReplayResponseMessage *res)
1231 {
1232   struct Channel *chn = cls;
1233   GNUNET_CADET_receive_done (chn->channel);
1234
1235   /* @todo FIXME: got replay error response, send request to other members */
1236 }
1237
1238
1239 static void
1240 group_set_cadet_port_hash (struct Group *grp)
1241 {
1242   struct CadetPort {
1243     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1244     uint32_t app_type;
1245   } port = {
1246     grp->pub_key,
1247     GNUNET_APPLICATION_TYPE_MULTICAST,
1248   };
1249   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
1250 }
1251
1252
1253
1254 /**
1255  * Create new outgoing CADET channel.
1256  *
1257  * @param peer
1258  *        Peer to connect to.
1259  * @param group_pub_key
1260  *        Public key of group the channel belongs to.
1261  * @param group_pub_hash
1262  *        Hash of @a group_pub_key.
1263  *
1264  * @return Channel.
1265  */
1266 static struct Channel *
1267 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
1268 {
1269   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
1270   chn->group = grp;
1271   chn->group_pub_key = grp->pub_key;
1272   chn->group_pub_hash = grp->pub_key_hash;
1273   chn->peer = *peer;
1274   chn->direction = DIR_OUTGOING;
1275   chn->is_connected = GNUNET_NO;
1276   chn->join_status = JOIN_WAITING;
1277
1278   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1279     GNUNET_MQ_hd_var_size (cadet_message,
1280                            GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1281                            struct GNUNET_MULTICAST_MessageHeader,
1282                            chn),
1283
1284     GNUNET_MQ_hd_var_size (cadet_join_decision,
1285                            GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1286                            struct MulticastJoinDecisionMessageHeader,
1287                            chn),
1288
1289     GNUNET_MQ_hd_var_size (cadet_replay_request,
1290                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1291                            struct MulticastReplayRequestMessage,
1292                            chn),
1293
1294     GNUNET_MQ_hd_var_size (cadet_replay_response,
1295                            GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1296                            struct MulticastReplayResponseMessage,
1297                            chn),
1298
1299     GNUNET_MQ_handler_end ()
1300   };
1301
1302   chn->channel = GNUNET_CADET_channel_creatE (cadet, chn, &chn->peer,
1303                                               &grp->cadet_port_hash,
1304                                               GNUNET_CADET_OPTION_RELIABLE,
1305                                               cadet_notify_window_change,
1306                                               cadet_notify_disconnect,
1307                                               cadet_handlers);
1308   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
1309                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1310   return chn;
1311 }
1312
1313
1314 /**
1315  * Destroy outgoing CADET channel.
1316  */
1317 static void
1318 cadet_channel_destroy (struct Channel *chn)
1319 {
1320   GNUNET_CADET_channel_destroy (chn->channel);
1321   GNUNET_CONTAINER_multihashmap_remove_all (channels_out, &chn->group_pub_hash);
1322   GNUNET_free (chn);
1323 }
1324
1325 /**
1326  * Handle a connecting client starting an origin.
1327  */
1328 static void
1329 handle_client_origin_start (void *cls,
1330                             const struct MulticastOriginStartMessage *msg)
1331 {
1332   struct Client *c = cls;
1333   struct GNUNET_SERVICE_Client *client = c->client;
1334
1335   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1336   struct GNUNET_HashCode pub_key_hash;
1337
1338   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
1339   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1340
1341   struct Origin *
1342     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
1343   struct Group *grp;
1344
1345   if (NULL == orig)
1346   {
1347     orig = GNUNET_new (struct Origin);
1348     orig->priv_key = msg->group_key;
1349     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
1350
1351     grp = c->group = &orig->group;
1352     grp->origin = orig;
1353     grp->is_origin = GNUNET_YES;
1354     grp->pub_key = pub_key;
1355     grp->pub_key_hash = pub_key_hash;
1356
1357     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
1358                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1359
1360     group_set_cadet_port_hash (grp);
1361
1362     struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1363       GNUNET_MQ_hd_var_size (cadet_message,
1364                              GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1365                              struct GNUNET_MULTICAST_MessageHeader,
1366                              grp),
1367
1368       GNUNET_MQ_hd_var_size (cadet_request,
1369                              GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
1370                              struct GNUNET_MULTICAST_RequestHeader,
1371                              grp),
1372
1373       GNUNET_MQ_hd_var_size (cadet_join_request,
1374                              GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1375                              struct MulticastJoinRequestMessage,
1376                              grp),
1377
1378       GNUNET_MQ_hd_var_size (cadet_replay_request,
1379                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1380                              struct MulticastReplayRequestMessage,
1381                              grp),
1382
1383       GNUNET_MQ_hd_var_size (cadet_replay_response,
1384                              GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1385                              struct MulticastReplayResponseMessage,
1386                              grp),
1387
1388       GNUNET_MQ_handler_end ()
1389     };
1390
1391
1392     orig->cadet_port = GNUNET_CADET_open_porT (cadet,
1393                                                &grp->cadet_port_hash,
1394                                                cadet_notify_connect,
1395                                                NULL,
1396                                                cadet_notify_window_change,
1397                                                cadet_notify_disconnect,
1398                                                cadet_handlers);
1399   }
1400   else
1401   {
1402     grp = &orig->group;
1403   }
1404
1405   struct ClientList *cl = GNUNET_new (struct ClientList);
1406   cl->client = client;
1407   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1408
1409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410               "%p Client connected as origin to group %s.\n",
1411               orig, GNUNET_h2s (&grp->pub_key_hash));
1412   GNUNET_SERVICE_client_continue (client);
1413 }
1414
1415
1416 static int
1417 check_client_member_join (void *cls,
1418                           const struct MulticastMemberJoinMessage *msg)
1419 {
1420   uint16_t msg_size = ntohs (msg->header.size);
1421   struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1422   uint32_t relay_count = ntohl (msg->relay_count);
1423   uint16_t relay_size = relay_count * sizeof (*relays);
1424   struct GNUNET_MessageHeader *join_msg = NULL;
1425   uint16_t join_msg_size = 0;
1426   if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1427       <= msg_size)
1428   {
1429     join_msg = (struct GNUNET_MessageHeader *)
1430       (((char *) &msg[1]) + relay_size);
1431     join_msg_size = ntohs (join_msg->size);
1432   }
1433   return
1434     msg_size == (sizeof (*msg) + relay_size + join_msg_size)
1435     ? GNUNET_OK
1436     : GNUNET_SYSERR;
1437 }
1438
1439
1440 /**
1441  * Handle a connecting client joining a group.
1442  */
1443 static void
1444 handle_client_member_join (void *cls,
1445                            const struct MulticastMemberJoinMessage *msg)
1446 {
1447   struct Client *c = cls;
1448   struct GNUNET_SERVICE_Client *client = c->client;
1449
1450   uint16_t msg_size = ntohs (msg->header.size);
1451
1452   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1453   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1454
1455   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1456   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1457   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1458
1459   struct GNUNET_CONTAINER_MultiHashMap *
1460     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1461   struct Member *mem = NULL;
1462   struct Group *grp;
1463
1464   if (NULL != grp_mem)
1465   {
1466     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1467   }
1468   if (NULL == mem)
1469   {
1470     mem = GNUNET_new (struct Member);
1471     mem->origin = msg->origin;
1472     mem->priv_key = msg->member_key;
1473     mem->pub_key = mem_pub_key;
1474     mem->pub_key_hash = mem_pub_key_hash;
1475     mem->max_fragment_id = 0; // FIXME
1476
1477     grp = c->group = &mem->group;
1478     grp->member = mem;
1479     grp->is_origin = GNUNET_NO;
1480     grp->pub_key = msg->group_pub_key;
1481     grp->pub_key_hash = pub_key_hash;
1482     group_set_cadet_port_hash (grp);
1483
1484     if (NULL == grp_mem)
1485     {
1486       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1487       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1488                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1489     }
1490     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1491                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1492     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1493                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1494   }
1495   else
1496   {
1497     grp = &mem->group;
1498   }
1499
1500   struct ClientList *cl = GNUNET_new (struct ClientList);
1501   cl->client = client;
1502   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1503
1504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505               "%p Client connected to group %s..\n",
1506               mem, GNUNET_h2s (&grp->pub_key_hash));
1507   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509               "%p ..as member %s (%s).\n",
1510               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1511   GNUNET_free (str);
1512
1513   if (NULL != mem->join_dcsn)
1514   { /* Already got a join decision, send it to client. */
1515     struct GNUNET_MQ_Envelope *
1516       env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
1517
1518     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
1519                     env);
1520   }
1521   else
1522   { /* First client of the group, send join request. */
1523     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1524     uint32_t relay_count = ntohl (msg->relay_count);
1525     uint16_t relay_size = relay_count * sizeof (*relays);
1526     struct GNUNET_MessageHeader *join_msg = NULL;
1527     uint16_t join_msg_size = 0;
1528     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1529         <= msg_size)
1530     {
1531       join_msg = (struct GNUNET_MessageHeader *)
1532         (((char *) &msg[1]) + relay_size);
1533       join_msg_size = ntohs (join_msg->size);
1534     }
1535
1536     uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
1537     struct MulticastJoinRequestMessage *
1538       req = GNUNET_malloc (req_msg_size);
1539     req->header.size = htons (req_msg_size);
1540     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1541     req->group_pub_key = grp->pub_key;
1542     req->peer = this_peer;
1543     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1544     if (0 < join_msg_size)
1545       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1546
1547     req->member_pub_key = mem->pub_key;
1548     req->purpose.size = htonl (req_msg_size
1549                                - sizeof (req->header)
1550                                - sizeof (req->reserved)
1551                                - sizeof (req->signature));
1552     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1553
1554     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1555                                                &req->signature))
1556     {
1557       /* FIXME: handle error */
1558       GNUNET_assert (0);
1559     }
1560
1561     if (NULL != mem->join_req)
1562       GNUNET_free (mem->join_req);
1563     mem->join_req = req;
1564
1565     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1566     { /* No local origins, send to remote origin */
1567       cadet_send_join_request (mem);
1568     }
1569   }
1570   GNUNET_SERVICE_client_continue (client);
1571 }
1572
1573
1574 static void
1575 client_send_join_decision (struct Member *mem,
1576                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1577 {
1578   client_send_group (&mem->group, &hdcsn->header);
1579
1580   const struct MulticastJoinDecisionMessage *
1581     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1582   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1583   { /* Member admitted, store join_decision. */
1584     uint16_t dcsn_size = ntohs (dcsn->header.size);
1585     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1586     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1587   }
1588   else
1589   { /* Refused entry, but replay would be still possible for past members. */
1590   }
1591 }
1592
1593
1594 static int
1595 check_client_join_decision (void *cls,
1596                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1597 {
1598   return GNUNET_OK;
1599 }
1600
1601
1602 /**
1603  * Join decision from client.
1604  */
1605 static void
1606 handle_client_join_decision (void *cls,
1607                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
1608 {
1609   struct Client *c = cls;
1610   struct GNUNET_SERVICE_Client *client = c->client;
1611   struct Group *grp = c->group;
1612
1613   if (NULL == grp)
1614   {
1615     GNUNET_break (0);
1616     GNUNET_SERVICE_client_drop (client);
1617     return;
1618   }
1619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620               "%p Got join decision from client for group %s..\n",
1621               grp, GNUNET_h2s (&grp->pub_key_hash));
1622
1623   struct GNUNET_CONTAINER_MultiHashMap *
1624     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1625                                                  &grp->pub_key_hash);
1626   struct Member *mem = NULL;
1627   if (NULL != grp_mem)
1628   {
1629     struct GNUNET_HashCode member_key_hash;
1630     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1631                         &member_key_hash);
1632     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1633     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1634                 "%p ..and member %s: %p\n",
1635                 grp, GNUNET_h2s (&member_key_hash), mem);
1636   }
1637   if (NULL != mem)
1638   { /* Found local member */
1639     client_send_join_decision (mem, hdcsn);
1640   }
1641   else
1642   { /* Look for remote member */
1643     cadet_send_join_decision (grp, hdcsn);
1644   }
1645   GNUNET_SERVICE_client_continue (client);
1646 }
1647
1648
1649 static int
1650 check_client_multicast_message (void *cls,
1651                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
1652 {
1653   return GNUNET_OK;
1654 }
1655
1656
1657 /**
1658  * Incoming message from a client.
1659  */
1660 static void
1661 handle_client_multicast_message (void *cls,
1662                                  const struct GNUNET_MULTICAST_MessageHeader *msg)
1663 {
1664   struct Client *c = cls;
1665   struct GNUNET_SERVICE_Client *client = c->client;
1666   struct Group *grp = c->group;
1667
1668   if (NULL == grp)
1669   {
1670     GNUNET_break (0);
1671     GNUNET_SERVICE_client_drop (client);
1672     return;
1673   }
1674   GNUNET_assert (GNUNET_YES == grp->is_origin);
1675   struct Origin *orig = grp->origin;
1676
1677   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1678   struct GNUNET_MULTICAST_MessageHeader *
1679     out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
1680   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1681   out->purpose.size = htonl (ntohs (out->header.size)
1682                              - sizeof (out->header)
1683                              - sizeof (out->hop_counter)
1684                              - sizeof (out->signature));
1685   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1686
1687   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1688                                              &out->signature))
1689   {
1690     GNUNET_assert (0);
1691   }
1692
1693   client_send_all (&grp->pub_key_hash, &out->header);
1694   cadet_send_children (&grp->pub_key_hash, &out->header);
1695   client_send_ack (&grp->pub_key_hash);
1696   GNUNET_free (out);
1697
1698   GNUNET_SERVICE_client_continue (client);
1699 }
1700
1701
1702 static int
1703 check_client_multicast_request (void *cls,
1704                                 const struct GNUNET_MULTICAST_RequestHeader *req)
1705 {
1706   return GNUNET_OK;
1707 }
1708
1709
1710 /**
1711  * Incoming request from a client.
1712  */
1713 static void
1714 handle_client_multicast_request (void *cls,
1715                                  const struct GNUNET_MULTICAST_RequestHeader *req)
1716 {
1717   struct Client *c = cls;
1718   struct GNUNET_SERVICE_Client *client = c->client;
1719   struct Group *grp = c->group;
1720
1721   if (NULL == grp)
1722   {
1723     GNUNET_break (0);
1724     GNUNET_SERVICE_client_drop (client);
1725     return;
1726   }
1727   GNUNET_assert (GNUNET_NO == grp->is_origin);
1728   struct Member *mem = grp->member;
1729
1730   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1731   struct GNUNET_MULTICAST_RequestHeader *
1732     out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
1733   out->member_pub_key = mem->pub_key;
1734   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1735   out->purpose.size = htonl (ntohs (out->header.size)
1736                              - sizeof (out->header)
1737                              - sizeof (out->member_pub_key)
1738                              - sizeof (out->signature));
1739   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1740
1741   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1742                                              &out->signature))
1743   {
1744     GNUNET_assert (0);
1745   }
1746
1747   uint8_t send_ack = GNUNET_YES;
1748   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1749   { /* No local origins, send to remote origin */
1750     if (NULL != mem->origin_channel)
1751     {
1752       cadet_send_channel (mem->origin_channel, &out->header);
1753       send_ack = GNUNET_NO;
1754     }
1755     else
1756     {
1757       /* FIXME: not yet connected to origin */
1758       GNUNET_SERVICE_client_drop (client);
1759       GNUNET_free (out);
1760       return;
1761     }
1762   }
1763   if (GNUNET_YES == send_ack)
1764   {
1765     client_send_ack (&grp->pub_key_hash);
1766   }
1767   GNUNET_free (out);
1768   GNUNET_SERVICE_client_continue (client);
1769 }
1770
1771
1772 /**
1773  * Incoming replay request from a client.
1774  */
1775 static void
1776 handle_client_replay_request (void *cls,
1777                               const struct MulticastReplayRequestMessage *rep)
1778 {
1779   struct Client *c = cls;
1780   struct GNUNET_SERVICE_Client *client = c->client;
1781   struct Group *grp = c->group;
1782
1783   if (NULL == grp)
1784   {
1785     GNUNET_break (0);
1786     GNUNET_SERVICE_client_drop (client);
1787     return;
1788   }
1789   GNUNET_assert (GNUNET_NO == grp->is_origin);
1790   struct Member *mem = grp->member;
1791
1792   struct GNUNET_CONTAINER_MultiHashMap *
1793     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1794                                                         &grp->pub_key_hash);
1795   if (NULL == grp_replay_req)
1796   {
1797     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1798     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1799                                        &grp->pub_key_hash, grp_replay_req,
1800                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1801   }
1802
1803   struct GNUNET_HashCode key_hash;
1804   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1805                    rep->flags, &key_hash);
1806   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1807                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1808
1809   if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
1810   { /* No local origin, replay from remote members / origin. */
1811     if (NULL != mem->origin_channel)
1812     {
1813       cadet_send_channel (mem->origin_channel, &rep->header);
1814     }
1815     else
1816     {
1817       /* FIXME: not yet connected to origin */
1818       GNUNET_SERVICE_client_drop (client);
1819       return;
1820     }
1821   }
1822   GNUNET_SERVICE_client_continue (client);
1823 }
1824
1825
1826 static int
1827 cadet_send_replay_response_cb (void *cls,
1828                                const struct GNUNET_HashCode *key_hash,
1829                                void *value)
1830 {
1831   struct Channel *chn = value;
1832   struct GNUNET_MessageHeader *msg = cls;
1833
1834   cadet_send_channel (chn, msg);
1835   return GNUNET_OK;
1836 }
1837
1838
1839 static int
1840 client_send_replay_response_cb (void *cls,
1841                                 const struct GNUNET_HashCode *key_hash,
1842                                 void *value)
1843 {
1844   struct GNUNET_SERVICE_Client *client = value;
1845   struct GNUNET_MessageHeader *msg = cls;
1846
1847   client_send (client, msg);
1848   return GNUNET_OK;
1849 }
1850
1851
1852 static int
1853 check_client_replay_response_end (void *cls,
1854                                   const struct MulticastReplayResponseMessage *res)
1855 {
1856   return GNUNET_OK;
1857 }
1858
1859
1860 /**
1861  * End of replay response from a client.
1862  */
1863 static void
1864 handle_client_replay_response_end (void *cls,
1865                                    const struct MulticastReplayResponseMessage *res)
1866 {
1867   struct Client *c = cls;
1868   struct GNUNET_SERVICE_Client *client = c->client;
1869   struct Group *grp = c->group;
1870
1871   if (NULL == grp)
1872   {
1873     GNUNET_break (0);
1874     GNUNET_SERVICE_client_drop (client);
1875     return;
1876   }
1877
1878   struct GNUNET_HashCode key_hash;
1879   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1880                    res->flags, &key_hash);
1881
1882   struct GNUNET_CONTAINER_MultiHashMap *
1883     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1884                                                               &grp->pub_key_hash);
1885   if (NULL != grp_replay_req_cadet)
1886   {
1887     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1888   }
1889   struct GNUNET_CONTAINER_MultiHashMap *
1890     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1891                                                                &grp->pub_key_hash);
1892   if (NULL != grp_replay_req_client)
1893   {
1894     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1895   }
1896   GNUNET_SERVICE_client_continue (client);
1897 }
1898
1899
1900 static int
1901 check_client_replay_response (void *cls,
1902                               const struct MulticastReplayResponseMessage *res)
1903 {
1904   const struct GNUNET_MessageHeader *msg = &res->header;
1905   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1906   {
1907     msg = GNUNET_MQ_extract_nested_mh (res);
1908     if (NULL == msg)
1909     {
1910       return GNUNET_SYSERR;
1911     }
1912   }
1913   return GNUNET_OK;
1914 }
1915
1916
1917 /**
1918  * Incoming replay response from a client.
1919  *
1920  * Respond with a multicast message on success, or otherwise with an error code.
1921  */
1922 static void
1923 handle_client_replay_response (void *cls,
1924                                const struct MulticastReplayResponseMessage *res)
1925 {
1926   struct Client *c = cls;
1927   struct GNUNET_SERVICE_Client *client = c->client;
1928   struct Group *grp = c->group;
1929
1930   if (NULL == grp)
1931   {
1932     GNUNET_break (0);
1933     GNUNET_SERVICE_client_drop (client);
1934     return;
1935   }
1936
1937   const struct GNUNET_MessageHeader *msg = &res->header;
1938   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1939   {
1940     msg = GNUNET_MQ_extract_nested_mh (res);
1941   }
1942
1943   struct GNUNET_HashCode key_hash;
1944   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1945                    res->flags, &key_hash);
1946
1947   struct GNUNET_CONTAINER_MultiHashMap *
1948     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1949                                                               &grp->pub_key_hash);
1950   if (NULL != grp_replay_req_cadet)
1951   {
1952     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1953                                                 cadet_send_replay_response_cb,
1954                                                 (void *) msg);
1955   }
1956   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1957   {
1958     struct GNUNET_CONTAINER_MultiHashMap *
1959       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1960                                                                  &grp->pub_key_hash);
1961     if (NULL != grp_replay_req_client)
1962     {
1963       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1964                                                   client_send_replay_response_cb,
1965                                                   (void *) msg);
1966     }
1967   }
1968   else
1969   {
1970     handle_client_replay_response_end (c, res);
1971     return;
1972   }
1973   GNUNET_SERVICE_client_continue (client);
1974 }
1975
1976
1977 /**
1978  * A new client connected.
1979  *
1980  * @param cls NULL
1981  * @param client client to add
1982  * @param mq message queue for @a client
1983  * @return @a client
1984  */
1985 static void *
1986 client_notify_connect (void *cls,
1987                        struct GNUNET_SERVICE_Client *client,
1988                        struct GNUNET_MQ_Handle *mq)
1989 {
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1991   /* FIXME: send connect ACK */
1992
1993   struct Client *c = GNUNET_new (struct Client);
1994   c->client = client;
1995
1996   return c;
1997 }
1998
1999
2000 /**
2001  * Called whenever a client is disconnected.
2002  * Frees our resources associated with that client.
2003  *
2004  * @param cls closure
2005  * @param client identification of the client
2006  * @param app_ctx must match @a client
2007  */
2008 static void
2009 client_notify_disconnect (void *cls,
2010                           struct GNUNET_SERVICE_Client *client,
2011                           void *app_ctx)
2012 {
2013   struct Client *c = app_ctx;
2014   struct Group *grp = c->group;
2015   GNUNET_free (c);
2016
2017   if (NULL == grp)
2018   {
2019     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2020                 "%p User context is NULL in client_disconnect()\n", grp);
2021     GNUNET_break (0);
2022     return;
2023   }
2024
2025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2026               "%p Client (%s) disconnected from group %s\n",
2027               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
2028               GNUNET_h2s (&grp->pub_key_hash));
2029
2030   struct ClientList *cl = grp->clients_head;
2031   while (NULL != cl)
2032   {
2033     if (cl->client == client)
2034     {
2035       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
2036       GNUNET_free (cl);
2037       break;
2038     }
2039     cl = cl->next;
2040   }
2041
2042   while (GNUNET_YES == replay_req_remove_client (grp, client));
2043
2044   if (NULL == grp->clients_head)
2045   { /* Last client disconnected. */
2046 #if FIXME
2047     if (NULL != grp->tmit_head)
2048     { /* Send pending messages via CADET before cleanup. */
2049       transmit_message (grp);
2050     }
2051     else
2052 #endif
2053     {
2054       cleanup_group (grp);
2055     }
2056   }
2057 }
2058
2059
2060 /**
2061  * Service started.
2062  *
2063  * @param cls closure
2064  * @param server the initialized server
2065  * @param cfg configuration to use
2066  */
2067 static void
2068 run (void *cls,
2069      const struct GNUNET_CONFIGURATION_Handle *c,
2070      struct GNUNET_SERVICE_Handle *svc)
2071 {
2072   cfg = c;
2073   service = svc;
2074   GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
2075
2076   stats = GNUNET_STATISTICS_create ("multicast", cfg);
2077   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2078   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2079   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2080   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2081   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2082   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2083   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2084
2085   cadet = GNUNET_CADET_connecT (cfg);
2086
2087   GNUNET_assert (NULL != cadet);
2088
2089   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2090                                  NULL);
2091 }
2092
2093
2094 /**
2095  * Define "main" method using service macro.
2096  */
2097 GNUNET_SERVICE_MAIN
2098 ("multicast",
2099  GNUNET_SERVICE_OPTION_NONE,
2100  run,
2101  client_notify_connect,
2102  client_notify_disconnect,
2103  NULL,
2104  GNUNET_MQ_hd_fixed_size (client_origin_start,
2105                           GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
2106                           struct MulticastOriginStartMessage,
2107                           NULL),
2108  GNUNET_MQ_hd_var_size (client_member_join,
2109                         GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
2110                         struct MulticastMemberJoinMessage,
2111                         NULL),
2112  GNUNET_MQ_hd_var_size (client_join_decision,
2113                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
2114                         struct MulticastJoinDecisionMessageHeader,
2115                         NULL),
2116  GNUNET_MQ_hd_var_size (client_multicast_message,
2117                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
2118                         struct GNUNET_MULTICAST_MessageHeader,
2119                         NULL),
2120  GNUNET_MQ_hd_var_size (client_multicast_request,
2121                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
2122                         struct GNUNET_MULTICAST_RequestHeader,
2123                         NULL),
2124  GNUNET_MQ_hd_fixed_size (client_replay_request,
2125                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
2126                           struct MulticastReplayRequestMessage,
2127                           NULL),
2128  GNUNET_MQ_hd_var_size (client_replay_response,
2129                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
2130                         struct MulticastReplayResponseMessage,
2131                         NULL),
2132  GNUNET_MQ_hd_var_size (client_replay_response_end,
2133                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
2134                         struct MulticastReplayResponseMessage,
2135                         NULL));
2136
2137 /* end of gnunet-service-multicast.c */