More API function tests...
[oweals/gnunet.git] / src / multicast / gnunet-service-multicast.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file multicast/gnunet-service-multicast.c
23  * @brief program that does multicast
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_signatures.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "multicast.h"
34
35 /**
36  * Handle to our current configuration.
37  */
38 static const struct GNUNET_CONFIGURATION_Handle *cfg;
39
40 /**
41  * Service handle.
42  */
43 static struct GNUNET_SERVICE_Handle *service;
44
45 /**
46  * CADET handle.
47  */
48 static struct GNUNET_CADET_Handle *cadet;
49
50 /**
51  * Identity of this peer.
52  */
53 static struct GNUNET_PeerIdentity this_peer;
54
55 /**
56  * Handle to the statistics service.
57  */
58 static struct GNUNET_STATISTICS_Handle *stats;
59
60 /**
61  * All connected origin clients.
62  * Group's pub_key_hash -> struct Origin * (uniq)
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *origins;
65
66 /**
67  * All connected member clients.
68  * Group's pub_key_hash -> struct Member * (multi)
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *members;
71
72 /**
73  * Connected member clients per group.
74  * Group's pub_key_hash -> Member's pub_key_hash (uniq) -> struct Member * (uniq)
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
77
78 /**
79  * Incoming CADET channels with connected children in the tree.
80  * Group's pub_key_hash -> struct Channel * (multi)
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
83
84 /**
85  * Outgoing CADET channels connecting to parents in the tree.
86  * Group's pub_key_hash -> struct Channel * (multi)
87  */
88 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
89
90 /**
91  * Incoming replay requests from CADET.
92  * Group's pub_key_hash ->
93  *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
96
97 /**
98  * Incoming replay requests from clients.
99  * Group's pub_key_hash ->
100  *   H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVICE_Client *
101  */
102 static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
103
104
105 /**
106  * Join status of a remote peer.
107  */
108 enum JoinStatus
109 {
110   JOIN_REFUSED  = -1,
111   JOIN_NOT_ASKED = 0,
112   JOIN_WAITING   = 1,
113   JOIN_ADMITTED  = 2,
114 };
115
116 enum ChannelDirection
117 {
118   DIR_INCOMING = 0,
119   DIR_OUTGOING = 1,
120 };
121
122
123 /**
124  * Context for a CADET channel.
125  */
126 struct Channel
127 {
128   /**
129    * Group the channel belongs to.
130    *
131    * Only set for outgoing channels.
132    */
133   struct Group *group;
134
135   /**
136    * CADET channel.
137    */
138   struct GNUNET_CADET_Channel *channel;
139
140   /**
141    * CADET transmission handle.
142    */
143   struct GNUNET_CADET_TransmitHandle *tmit_handle;
144
145   /**
146    * Public key of the target group.
147    */
148   struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
149
150   /**
151    * Hash of @a group_pub_key.
152    */
153   struct GNUNET_HashCode group_pub_hash;
154
155   /**
156    * Public key of the joining member.
157    */
158   struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
159
160   /**
161    * Remote peer identity.
162    */
163   struct GNUNET_PeerIdentity peer;
164
165   /**
166    * Is the remote peer admitted to the group?
167    * @see enum JoinStatus
168    */
169   int8_t join_status;
170
171   /**
172    * Number of messages waiting to be sent to CADET.
173    */
174   uint8_t msgs_pending;
175
176   /**
177    * Channel direction.
178    * @see enum ChannelDirection
179    */
180   uint8_t direction;
181 };
182
183
184 /**
185  * List of connected clients.
186  */
187 struct ClientList
188 {
189   struct ClientList *prev;
190   struct ClientList *next;
191   struct GNUNET_SERVICE_Client *client;
192 };
193
194
195 /**
196  * Client context for an origin or member.
197  */
198 struct Group
199 {
200   struct ClientList *clients_head;
201   struct ClientList *clients_tail;
202
203   /**
204    * Public key of the group.
205    */
206   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
207
208   /**
209    * Hash of @a pub_key.
210    */
211   struct GNUNET_HashCode pub_key_hash;
212
213   /**
214    * CADET port hash.
215    */
216   struct GNUNET_HashCode cadet_port_hash;
217
218   /**
219    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
220    */
221   uint8_t disconnected;
222
223   /**
224    * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
225    */
226   uint8_t is_origin;
227
228   union {
229     struct Origin *origin;
230     struct Member *member;
231   };
232 };
233
234
235 /**
236 * Client context for a group's origin.
237  */
238 struct Origin
239 {
240   struct Group group;
241
242   /**
243    * Private key of the group.
244    */
245   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
246
247   /**
248    * CADET port.
249    */
250   struct GNUNET_CADET_Port *cadet_port;
251
252   /**
253    * Last message fragment ID sent to the group.
254    */
255   uint64_t max_fragment_id;
256 };
257
258
259 /**
260  * Client context for a group member.
261  */
262 struct Member
263 {
264   struct Group group;
265
266   /**
267    * Private key of the member.
268    */
269   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
270
271   /**
272    * Public key of the member.
273    */
274   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
275
276   /**
277    * Hash of @a pub_key.
278    */
279   struct GNUNET_HashCode pub_key_hash;
280
281   /**
282    * Join request sent to the origin / members.
283    */
284   struct MulticastJoinRequestMessage *join_req;
285
286   /**
287    * Join decision sent in reply to our request.
288    *
289    * Only a positive decision is stored here, in case of a negative decision the
290    * client is disconnected.
291    */
292   struct MulticastJoinDecisionMessageHeader *join_dcsn;
293
294   /**
295    * CADET channel to the origin.
296    */
297   struct Channel *origin_channel;
298
299   /**
300    * Peer identity of origin.
301    */
302   struct GNUNET_PeerIdentity origin;
303
304   /**
305    * Peer identity of relays (other members to connect).
306    */
307   struct GNUNET_PeerIdentity *relays;
308
309   /**
310    * Last request fragment ID sent to the origin.
311    */
312   uint64_t max_fragment_id;
313
314   /**
315    * Number of @a relays.
316    */
317   uint32_t relay_count;
318 };
319
320
321 /**
322  * Client context.
323  */
324 struct Client {
325   struct GNUNET_SERVICE_Client *client;
326   struct Group *group;
327 };
328
329
330 struct ReplayRequestKey
331 {
332   uint64_t fragment_id;
333   uint64_t message_id;
334   uint64_t fragment_offset;
335   uint64_t flags;
336 };
337
338
339 /**
340  * Task run during shutdown.
341  *
342  * @param cls unused
343  */
344 static void
345 shutdown_task (void *cls)
346 {
347   if (NULL != cadet)
348   {
349     GNUNET_CADET_disconnect (cadet);
350     cadet = NULL;
351   }
352   if (NULL != stats)
353   {
354     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
355     stats = NULL;
356   }
357   /* FIXME: do more clean up here */
358 }
359
360
361 /**
362  * Clean up origin data structures after a client disconnected.
363  */
364 static void
365 cleanup_origin (struct Origin *orig)
366 {
367   struct Group *grp = &orig->group;
368   GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
369   if (NULL != orig->cadet_port)
370   {
371     GNUNET_CADET_close_port (orig->cadet_port);
372     orig->cadet_port = NULL;
373   }
374   GNUNET_free (orig);
375 }
376
377
378 /**
379  * Clean up member data structures after a client disconnected.
380  */
381 static void
382 cleanup_member (struct Member *mem)
383 {
384   struct Group *grp = &mem->group;
385   struct GNUNET_CONTAINER_MultiHashMap *
386     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
387                                                  &grp->pub_key_hash);
388   GNUNET_assert (NULL != grp_mem);
389   GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
390
391   if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
392   {
393     GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
394                                           grp_mem);
395     GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
396   }
397   if (NULL != mem->join_dcsn)
398   {
399     GNUNET_free (mem->join_dcsn);
400     mem->join_dcsn = NULL;
401   }
402   GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
403   GNUNET_free (mem);
404 }
405
406
407 /**
408  * Clean up group data structures after a client disconnected.
409  */
410 static void
411 cleanup_group (struct Group *grp)
412 {
413   (GNUNET_YES == grp->is_origin)
414     ? cleanup_origin (grp->origin)
415     : cleanup_member (grp->member);
416 }
417
418
419 void
420 replay_key_hash (uint64_t fragment_id, uint64_t message_id,
421                  uint64_t fragment_offset, uint64_t flags,
422                  struct GNUNET_HashCode *key_hash)
423 {
424   struct ReplayRequestKey key = {
425     .fragment_id = fragment_id,
426     .message_id = message_id,
427     .fragment_offset = fragment_offset,
428     .flags = flags,
429   };
430   GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
431 }
432
433
434 /**
435  * Remove channel from replay request hashmap.
436  *
437  * @param chn
438  *        Channel to remove.
439  *
440  * @return #GNUNET_YES if there are more entries to process,
441  *         #GNUNET_NO when reached end of hashmap.
442  */
443 static int
444 replay_req_remove_cadet (struct Channel *chn)
445 {
446   struct GNUNET_CONTAINER_MultiHashMap *
447     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
448                                                         &chn->group->pub_key_hash);
449   if (NULL == grp_replay_req)
450     return GNUNET_NO;
451
452   struct GNUNET_CONTAINER_MultiHashMapIterator *
453     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
454   struct GNUNET_HashCode key;
455   const struct Channel *c;
456   while (GNUNET_YES
457          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
458                                                          (const void **) &c))
459   {
460     if (c == chn)
461     {
462       GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
463       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
464       return GNUNET_YES;
465     }
466   }
467   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
468   return GNUNET_NO;
469 }
470
471
472 /**
473  * Remove client from replay request hashmap.
474  *
475  * @param client
476  *        Client to remove.
477  *
478  * @return #GNUNET_YES if there are more entries to process,
479  *         #GNUNET_NO when reached end of hashmap.
480  */
481 static int
482 replay_req_remove_client (struct Group *grp, struct GNUNET_SERVICE_Client *client)
483 {
484   struct GNUNET_CONTAINER_MultiHashMap *
485     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
486                                                         &grp->pub_key_hash);
487   if (NULL == grp_replay_req)
488     return GNUNET_NO;
489
490   struct GNUNET_CONTAINER_MultiHashMapIterator *
491     it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
492   struct GNUNET_HashCode key;
493   const struct GNUNET_SERVICE_Client *c;
494   while (GNUNET_YES
495          == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
496                                                          (const void **) &c))
497   {
498     if (c == client)
499     {
500       GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
501       GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
502       return GNUNET_YES;
503     }
504   }
505   GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
506   return GNUNET_NO;
507 }
508
509
510 /**
511  * Send message to a client.
512  */
513 static void
514 client_send (struct GNUNET_SERVICE_Client *client,
515              const struct GNUNET_MessageHeader *msg)
516 {
517   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
518               "%p Sending message to client.\n", client);
519
520   struct GNUNET_MQ_Envelope *
521     env = GNUNET_MQ_msg_copy (msg);
522
523   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
524                   env);
525 }
526
527
528 /**
529  * Send message to all clients connected to the group.
530  */
531 static void
532 client_send_group (const struct Group *grp,
533                    const struct GNUNET_MessageHeader *msg)
534 {
535   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
536               "%p Sending message to all clients of the group.\n", grp);
537
538   struct ClientList *cl = grp->clients_head;
539   while (NULL != cl)
540   {
541     struct GNUNET_MQ_Envelope *
542       env = GNUNET_MQ_msg_copy (msg);
543
544     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
545                     env);
546     cl = cl->next;
547   }
548 }
549
550
551 /**
552  * Iterator callback for sending a message to origin clients.
553  */
554 static int
555 client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
556                        void *origin)
557 {
558   const struct GNUNET_MessageHeader *msg = cls;
559   struct Member *orig = origin;
560
561   client_send_group (&orig->group, msg);
562   return GNUNET_YES;
563 }
564
565
566 /**
567  * Iterator callback for sending a message to member clients.
568  */
569 static int
570 client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
571                        void *member)
572 {
573   const struct GNUNET_MessageHeader *msg = cls;
574   struct Member *mem = member;
575
576   if (NULL != mem->join_dcsn)
577   { /* Only send message to admitted members */
578     client_send_group (&mem->group, msg);
579   }
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Send message to all origin and member clients connected to the group.
586  *
587  * @param pub_key_hash
588  *        H(key_pub) of the group.
589  * @param msg
590  *        Message to send.
591  */
592 static int
593 client_send_all (struct GNUNET_HashCode *pub_key_hash,
594                  const struct GNUNET_MessageHeader *msg)
595 {
596   int n = 0;
597   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
598                                                    client_send_origin_cb,
599                                                    (void *) msg);
600   n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
601                                                    client_send_member_cb,
602                                                    (void *) msg);
603   return n;
604 }
605
606
607 /**
608  * Send message to a random origin client or a random member client.
609  *
610  * @param grp  The group to send @a msg to.
611  * @param msg  Message to send.
612  */
613 static int
614 client_send_random (struct GNUNET_HashCode *pub_key_hash,
615                     const struct GNUNET_MessageHeader *msg)
616 {
617   int n = 0;
618   n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
619                                                  (void *) msg);
620   if (n <= 0)
621     n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
622                                                    (void *) msg);
623   return n;
624 }
625
626
627 /**
628  * Send message to all origin clients connected to the group.
629  *
630  * @param pub_key_hash
631  *        H(key_pub) of the group.
632  * @param msg
633  *        Message to send.
634  */
635 static int
636 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
637                     const struct GNUNET_MessageHeader *msg)
638 {
639   int n = 0;
640   n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
641                                                    client_send_origin_cb,
642                                                    (void *) msg);
643   return n;
644 }
645
646
647 /**
648  * Send fragment acknowledgement to all clients of the channel.
649  *
650  * @param pub_key_hash
651  *        H(key_pub) of the group.
652  */
653 static void
654 client_send_ack (struct GNUNET_HashCode *pub_key_hash)
655 {
656   static struct GNUNET_MessageHeader *msg = NULL;
657   if (NULL == msg)
658   {
659     msg = GNUNET_malloc (sizeof (*msg));
660     msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
661     msg->size = htons (sizeof (*msg));
662   }
663   client_send_all (pub_key_hash, msg);
664 }
665
666
667 struct CadetTransmitClosure
668 {
669   struct Channel *chn;
670   const struct GNUNET_MessageHeader *msg;
671 };
672
673
674 /**
675  * CADET is ready to transmit a message.
676  */
677 size_t
678 cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
679 {
680   if (0 == buf_size)
681   {
682     /* FIXME: connection closed */
683     return 0;
684   }
685   struct CadetTransmitClosure *tcls = cls;
686   struct Channel *chn = tcls->chn;
687   uint16_t msg_size = ntohs (tcls->msg->size);
688   GNUNET_assert (msg_size <= buf_size);
689   GNUNET_memcpy (buf, tcls->msg, msg_size);
690   GNUNET_free (tcls);
691
692   if (0 == chn->msgs_pending)
693   {
694     GNUNET_break (0);
695   }
696   else if (0 == --chn->msgs_pending)
697   {
698     client_send_ack (&chn->group_pub_hash);
699   }
700   return msg_size;
701 }
702
703
704 /**
705  * Send a message to a CADET channel.
706  *
707  * @param chn  Channel.
708  * @param msg  Message.
709  */
710 static void
711 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
712 {
713   uint16_t msg_size = ntohs (msg->size);
714   struct GNUNET_MessageHeader *msg_copy = GNUNET_malloc (msg_size);
715   GNUNET_memcpy (msg_copy, msg, msg_size);
716
717   struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
718   tcls->chn = chn;
719   tcls->msg = msg_copy;
720
721   chn->msgs_pending++;
722   chn->tmit_handle
723     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
724                                           GNUNET_TIME_UNIT_FOREVER_REL,
725                                           msg_size,
726                                           &cadet_notify_transmit_ready,
727                                           tcls);
728   GNUNET_assert (NULL != chn->tmit_handle);
729 }
730
731
732 /**
733  * Create new outgoing CADET channel.
734  *
735  * @param peer
736  *        Peer to connect to.
737  * @param group_pub_key
738  *        Public key of group the channel belongs to.
739  * @param group_pub_hash
740  *        Hash of @a group_pub_key.
741  *
742  * @return Channel.
743  */
744 static struct Channel *
745 cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
746 {
747   struct Channel *chn = GNUNET_malloc (sizeof (*chn));
748   chn->group = grp;
749   chn->group_pub_key = grp->pub_key;
750   chn->group_pub_hash = grp->pub_key_hash;
751   chn->peer = *peer;
752   chn->direction = DIR_OUTGOING;
753   chn->join_status = JOIN_WAITING;
754   chn->channel = GNUNET_CADET_channel_create (cadet, chn, &chn->peer,
755                                               &grp->cadet_port_hash,
756                                               GNUNET_CADET_OPTION_RELIABLE);
757   GNUNET_CONTAINER_multihashmap_put (channels_out, &chn->group_pub_hash, chn,
758                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
759   return chn;
760 }
761
762
763 /**
764  * Create CADET channel and send a join request.
765  */
766 static void
767 cadet_send_join_request (struct Member *mem)
768 {
769   mem->origin_channel = cadet_channel_create (&mem->group, &mem->origin);
770   cadet_send_channel (mem->origin_channel, &mem->join_req->header);
771
772   uint32_t i;
773   for (i = 0; i < mem->relay_count; i++)
774   {
775     struct Channel *
776       chn = cadet_channel_create (&mem->group, &mem->relays[i]);
777     cadet_send_channel (chn, &mem->join_req->header);
778   }
779 }
780
781
782 static int
783 cadet_send_join_decision_cb (void *cls,
784                              const struct GNUNET_HashCode *group_pub_hash,
785                              void *channel)
786 {
787   const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
788   struct Channel *chn = channel;
789
790   const struct MulticastJoinDecisionMessage *dcsn = 
791     (struct MulticastJoinDecisionMessage *) &hdcsn[1];
792
793   if (0 == memcmp (&hdcsn->member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key))
794       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
795   {
796     if (GNUNET_YES == ntohl (dcsn->is_admitted))
797     {
798       chn->join_status = JOIN_ADMITTED;
799     }
800     else
801     {
802       chn->join_status = JOIN_REFUSED;
803     }
804
805     cadet_send_channel (chn, &hdcsn->header);
806     return GNUNET_NO;
807   }
808   return GNUNET_YES;
809 }
810
811
812 /**
813  * Send join decision to a remote peer.
814  */
815 static void
816 cadet_send_join_decision (struct Group *grp,
817                           const struct MulticastJoinDecisionMessageHeader *hdcsn)
818 {
819   GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, &grp->pub_key_hash,
820                                               &cadet_send_join_decision_cb,
821                                               (void *) hdcsn);
822 }
823
824
825 /**
826  * Iterator callback for sending a message to origin clients.
827  */
828 static int
829 cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
830                void *channel)
831 {
832   const struct GNUNET_MessageHeader *msg = cls;
833   struct Channel *chn = channel;
834   if (JOIN_ADMITTED == chn->join_status)
835     cadet_send_channel (chn, msg);
836   return GNUNET_YES;
837 }
838
839
840 /**
841  * Send message to all connected children.
842  */
843 static int
844 cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
845                      const struct GNUNET_MessageHeader *msg)
846 {
847   int n = 0;
848   if (channels_in != NULL)
849     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
850                                                      cadet_send_cb, (void *) msg);
851   return n;
852 }
853
854
855 #if 0       // unused as yet
856 /**
857  * Send message to all connected parents.
858  */
859 static int
860 cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
861                     const struct GNUNET_MessageHeader *msg)
862 {
863   int n = 0;
864   if (channels_in != NULL)
865     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
866                                                      cadet_send_cb, (void *) msg);
867   return n;
868 }
869 #endif
870
871
872 /**
873  * New incoming CADET channel.
874  */
875 static void *
876 cadet_notify_channel_new (void *cls,
877                           struct GNUNET_CADET_Channel *channel,
878                           const struct GNUNET_PeerIdentity *initiator,
879                           const struct GNUNET_HashCode *port,
880                           enum GNUNET_CADET_ChannelOption options)
881 {
882   return NULL;
883 }
884
885
886 /**
887  * CADET channel is being destroyed.
888  */
889 static void
890 cadet_notify_channel_end (void *cls,
891                           const struct GNUNET_CADET_Channel *channel,
892                           void *ctx)
893 {
894   if (NULL == ctx)
895     return;
896
897   struct Channel *chn = ctx;
898   if (NULL != chn->group)
899   {
900     if (GNUNET_NO == chn->group->is_origin)
901     {
902       struct Member *mem = (struct Member *) chn->group;
903       if (chn == mem->origin_channel)
904         mem->origin_channel = NULL;
905     }
906   }
907
908   while (GNUNET_YES == replay_req_remove_cadet (chn));
909
910   GNUNET_free (chn);
911 }
912
913
914 static void
915 group_set_cadet_port_hash (struct Group *grp)
916 {
917   struct CadetPort {
918     struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
919     uint32_t app_type;
920   } port = {
921     grp->pub_key,
922     GNUNET_APPLICATION_TYPE_MULTICAST,
923   };
924   GNUNET_CRYPTO_hash (&port, sizeof (port), &grp->cadet_port_hash);
925 }
926
927
928 /**
929  * Handle a connecting client starting an origin.
930  */
931 static void
932 handle_client_origin_start (void *cls,
933                             const struct MulticastOriginStartMessage *msg)
934 {
935   struct Client *c = cls;
936   struct GNUNET_SERVICE_Client *client = c->client;
937
938   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
939   struct GNUNET_HashCode pub_key_hash;
940
941   GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
942   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
943
944   struct Origin *
945     orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
946   struct Group *grp;
947
948   if (NULL == orig)
949   {
950     orig = GNUNET_new (struct Origin);
951     orig->priv_key = msg->group_key;
952     orig->max_fragment_id = GNUNET_ntohll (msg->max_fragment_id);
953
954     grp = c->group = &orig->group;
955     grp->origin = orig;
956     grp->is_origin = GNUNET_YES;
957     grp->pub_key = pub_key;
958     grp->pub_key_hash = pub_key_hash;
959
960     GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
961                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
962
963     group_set_cadet_port_hash (grp);
964     orig->cadet_port = GNUNET_CADET_open_port (cadet, &grp->cadet_port_hash,
965                                                cadet_notify_channel_new, NULL);
966   }
967   else
968   {
969     grp = &orig->group;
970   }
971
972   struct ClientList *cl = GNUNET_new (struct ClientList);
973   cl->client = client;
974   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
975
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "%p Client connected as origin to group %s.\n",
978               orig, GNUNET_h2s (&grp->pub_key_hash));
979   GNUNET_SERVICE_client_continue (client);
980 }
981
982
983 static int
984 check_client_member_join (void *cls,
985                           const struct MulticastMemberJoinMessage *msg)
986 {
987   uint16_t msg_size = ntohs (msg->header.size);
988   struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
989   uint32_t relay_count = ntohl (msg->relay_count);
990   uint16_t relay_size = relay_count * sizeof (*relays);
991   struct GNUNET_MessageHeader *join_msg = NULL;
992   uint16_t join_msg_size = 0;
993   if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
994       <= msg_size)
995   {
996     join_msg = (struct GNUNET_MessageHeader *)
997       (((char *) &msg[1]) + relay_size);
998     join_msg_size = ntohs (join_msg->size);
999   }
1000   return
1001     msg_size == (sizeof (*msg) + relay_size + join_msg_size)
1002     ? GNUNET_OK
1003     : GNUNET_SYSERR;
1004 }
1005
1006
1007 /**
1008  * Handle a connecting client joining a group.
1009  */
1010 static void
1011 handle_client_member_join (void *cls,
1012                            const struct MulticastMemberJoinMessage *msg)
1013 {
1014   struct Client *c = cls;
1015   struct GNUNET_SERVICE_Client *client = c->client;
1016
1017   uint16_t msg_size = ntohs (msg->header.size);
1018
1019   struct GNUNET_CRYPTO_EcdsaPublicKey mem_pub_key;
1020   struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
1021
1022   GNUNET_CRYPTO_ecdsa_key_get_public (&msg->member_key, &mem_pub_key);
1023   GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
1024   GNUNET_CRYPTO_hash (&msg->group_pub_key, sizeof (msg->group_pub_key), &pub_key_hash);
1025
1026   struct GNUNET_CONTAINER_MultiHashMap *
1027     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
1028   struct Member *mem = NULL;
1029   struct Group *grp;
1030
1031   if (NULL != grp_mem)
1032   {
1033     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
1034   }
1035   if (NULL == mem)
1036   {
1037     mem = GNUNET_new (struct Member);
1038     mem->origin = msg->origin;
1039     mem->priv_key = msg->member_key;
1040     mem->pub_key = mem_pub_key;
1041     mem->pub_key_hash = mem_pub_key_hash;
1042     mem->max_fragment_id = 0; // FIXME
1043
1044     grp = c->group = &mem->group;
1045     grp->member = mem;
1046     grp->is_origin = GNUNET_NO;
1047     grp->pub_key = msg->group_pub_key;
1048     grp->pub_key_hash = pub_key_hash;
1049     group_set_cadet_port_hash (grp);
1050
1051     if (NULL == grp_mem)
1052     {
1053       grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1054       GNUNET_CONTAINER_multihashmap_put (group_members, &grp->pub_key_hash, grp_mem,
1055                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1056     }
1057     GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
1058                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1059     GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1060                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1061   }
1062   else
1063   {
1064     grp = &mem->group;
1065   }
1066
1067   struct ClientList *cl = GNUNET_new (struct ClientList);
1068   cl->client = client;
1069   GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
1070
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072               "%p Client connected to group %s..\n",
1073               mem, GNUNET_h2s (&grp->pub_key_hash));
1074   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
1075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076               "%p ..as member %s (%s).\n",
1077               mem, GNUNET_h2s (&mem->pub_key_hash), str);
1078   GNUNET_free (str);
1079
1080   if (NULL != mem->join_dcsn)
1081   { /* Already got a join decision, send it to client. */
1082     struct GNUNET_MQ_Envelope *
1083       env = GNUNET_MQ_msg_copy (&mem->join_dcsn->header);
1084
1085     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
1086                     env);
1087   }
1088   else
1089   { /* First client of the group, send join request. */
1090     struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
1091     uint32_t relay_count = ntohl (msg->relay_count);
1092     uint16_t relay_size = relay_count * sizeof (*relays);
1093     struct GNUNET_MessageHeader *join_msg = NULL;
1094     uint16_t join_msg_size = 0;
1095     if (sizeof (*msg) + relay_size + sizeof (struct GNUNET_MessageHeader)
1096         <= msg_size)
1097     {
1098       join_msg = (struct GNUNET_MessageHeader *)
1099         (((char *) &msg[1]) + relay_size);
1100       join_msg_size = ntohs (join_msg->size);
1101     }
1102
1103     uint16_t req_msg_size = sizeof (struct MulticastJoinRequestMessage) + join_msg_size;
1104     struct MulticastJoinRequestMessage *
1105       req = GNUNET_malloc (req_msg_size);
1106     req->header.size = htons (req_msg_size);
1107     req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
1108     req->group_pub_key = grp->pub_key;
1109     req->peer = this_peer;
1110     GNUNET_CRYPTO_ecdsa_key_get_public (&mem->priv_key, &req->member_pub_key);
1111     if (0 < join_msg_size)
1112       GNUNET_memcpy (&req[1], join_msg, join_msg_size);
1113
1114     req->member_pub_key = mem->pub_key;
1115     req->purpose.size = htonl (req_msg_size
1116                                - sizeof (req->header)
1117                                - sizeof (req->reserved)
1118                                - sizeof (req->signature));
1119     req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1120
1121     if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &req->purpose,
1122                                                &req->signature))
1123     {
1124       /* FIXME: handle error */
1125       GNUNET_assert (0);
1126     }
1127
1128     if (NULL != mem->join_req)
1129       GNUNET_free (mem->join_req);
1130     mem->join_req = req;
1131
1132     if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
1133     { /* No local origins, send to remote origin */
1134       cadet_send_join_request (mem);
1135     }
1136   }
1137   GNUNET_SERVICE_client_continue (client);
1138 }
1139
1140
1141 static void
1142 client_send_join_decision (struct Member *mem,
1143                            const struct MulticastJoinDecisionMessageHeader *hdcsn)
1144 {
1145   client_send_group (&mem->group, &hdcsn->header);
1146
1147   const struct MulticastJoinDecisionMessage *
1148     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1149   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1150   { /* Member admitted, store join_decision. */
1151     uint16_t dcsn_size = ntohs (dcsn->header.size);
1152     mem->join_dcsn = GNUNET_malloc (dcsn_size);
1153     GNUNET_memcpy (mem->join_dcsn, dcsn, dcsn_size);
1154   }
1155   else
1156   { /* Refused entry, but replay would be still possible for past members. */
1157   }
1158 }
1159
1160
1161 static int
1162 check_client_join_decision (void *cls,
1163                             const struct MulticastJoinDecisionMessageHeader *hdcsn)
1164 {
1165   return GNUNET_OK;
1166 }
1167
1168
1169 /**
1170  * Join decision from client.
1171  */
1172 static void
1173 handle_client_join_decision (void *cls,
1174                              const struct MulticastJoinDecisionMessageHeader *hdcsn)
1175 {
1176   struct Client *c = cls;
1177   struct GNUNET_SERVICE_Client *client = c->client;
1178   struct Group *grp = c->group;
1179
1180   if (NULL == grp)
1181   {
1182     GNUNET_break (0);
1183     GNUNET_SERVICE_client_drop (client);
1184     return;
1185   }
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "%p Got join decision from client for group %s..\n",
1188               grp, GNUNET_h2s (&grp->pub_key_hash));
1189
1190   struct GNUNET_CONTAINER_MultiHashMap *
1191     grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
1192                                                  &grp->pub_key_hash);
1193   struct Member *mem = NULL;
1194   if (NULL != grp_mem)
1195   {
1196     struct GNUNET_HashCode member_key_hash;
1197     GNUNET_CRYPTO_hash (&hdcsn->member_pub_key, sizeof (hdcsn->member_pub_key),
1198                         &member_key_hash);
1199     mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
1200     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201                 "%p ..and member %s: %p\n",
1202                 grp, GNUNET_h2s (&member_key_hash), mem);
1203   }
1204   if (NULL != mem)
1205   { /* Found local member */
1206     client_send_join_decision (mem, hdcsn);
1207   }
1208   else
1209   { /* Look for remote member */
1210     cadet_send_join_decision (grp, hdcsn);
1211   }
1212   GNUNET_SERVICE_client_continue (client);
1213 }
1214
1215
1216 static int
1217 check_client_multicast_message (void *cls,
1218                                 const struct GNUNET_MULTICAST_MessageHeader *msg)
1219 {
1220   return GNUNET_OK;
1221 }
1222
1223
1224 /**
1225  * Incoming message from a client.
1226  */
1227 static void
1228 handle_client_multicast_message (void *cls,
1229                                  const struct GNUNET_MULTICAST_MessageHeader *msg)
1230 {
1231   struct Client *c = cls;
1232   struct GNUNET_SERVICE_Client *client = c->client;
1233   struct Group *grp = c->group;
1234
1235   if (NULL == grp)
1236   {
1237     GNUNET_break (0);
1238     GNUNET_SERVICE_client_drop (client);
1239     return;
1240   }
1241   GNUNET_assert (GNUNET_YES == grp->is_origin);
1242   struct Origin *orig = grp->origin;
1243
1244   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1245   struct GNUNET_MULTICAST_MessageHeader *
1246     out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
1247   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
1248   out->purpose.size = htonl (ntohs (out->header.size)
1249                              - sizeof (out->header)
1250                              - sizeof (out->hop_counter)
1251                              - sizeof (out->signature));
1252   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
1253
1254   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &out->purpose,
1255                                              &out->signature))
1256   {
1257     GNUNET_assert (0);
1258   }
1259
1260   client_send_all (&grp->pub_key_hash, &out->header);
1261   if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
1262   {
1263     client_send_ack (&grp->pub_key_hash);
1264   }
1265   GNUNET_free (out);
1266
1267   GNUNET_SERVICE_client_continue (client);
1268 }
1269
1270
1271 static int
1272 check_client_multicast_request (void *cls,
1273                                 const struct GNUNET_MULTICAST_RequestHeader *req)
1274 {
1275   return GNUNET_OK;
1276 }
1277
1278
1279 /**
1280  * Incoming request from a client.
1281  */
1282 static void
1283 handle_client_multicast_request (void *cls,
1284                                  const struct GNUNET_MULTICAST_RequestHeader *req)
1285 {
1286   struct Client *c = cls;
1287   struct GNUNET_SERVICE_Client *client = c->client;
1288   struct Group *grp = c->group;
1289
1290   if (NULL == grp)
1291   {
1292     GNUNET_break (0);
1293     GNUNET_SERVICE_client_drop (client);
1294     return;
1295   }
1296   GNUNET_assert (GNUNET_NO == grp->is_origin);
1297   struct Member *mem = grp->member;
1298
1299   /* FIXME: yucky, should use separate message structs for P2P and CS! */
1300   struct GNUNET_MULTICAST_RequestHeader *
1301     out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (&req->header);
1302   out->member_pub_key = mem->pub_key;
1303   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1304   out->purpose.size = htonl (ntohs (out->header.size)
1305                              - sizeof (out->header)
1306                              - sizeof (out->member_pub_key)
1307                              - sizeof (out->signature));
1308   out->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
1309
1310   if (GNUNET_OK != GNUNET_CRYPTO_ecdsa_sign (&mem->priv_key, &out->purpose,
1311                                              &out->signature))
1312   {
1313     GNUNET_assert (0);
1314   }
1315
1316   uint8_t send_ack = GNUNET_YES;
1317   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
1318   { /* No local origins, send to remote origin */
1319     if (NULL != mem->origin_channel)
1320     {
1321       cadet_send_channel (mem->origin_channel, &out->header);
1322       send_ack = GNUNET_NO;
1323     }
1324     else
1325     {
1326       /* FIXME: not yet connected to origin */
1327       GNUNET_SERVICE_client_drop (client);
1328       GNUNET_free (out);
1329       return;
1330     }
1331   }
1332   if (GNUNET_YES == send_ack)
1333   {
1334     client_send_ack (&grp->pub_key_hash);
1335   }
1336   GNUNET_free (out);
1337   GNUNET_SERVICE_client_continue (client);
1338 }
1339
1340
1341 /**
1342  * Incoming replay request from a client.
1343  */
1344 static void
1345 handle_client_replay_request (void *cls,
1346                               const struct MulticastReplayRequestMessage *rep)
1347 {
1348   struct Client *c = cls;
1349   struct GNUNET_SERVICE_Client *client = c->client;
1350   struct Group *grp = c->group;
1351
1352   if (NULL == grp)
1353   {
1354     GNUNET_break (0);
1355     GNUNET_SERVICE_client_drop (client);
1356     return;
1357   }
1358   GNUNET_assert (GNUNET_NO == grp->is_origin);
1359   struct Member *mem = grp->member;
1360
1361   struct GNUNET_CONTAINER_MultiHashMap *
1362     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1363                                                         &grp->pub_key_hash);
1364   if (NULL == grp_replay_req)
1365   {
1366     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1367     GNUNET_CONTAINER_multihashmap_put (replay_req_client,
1368                                        &grp->pub_key_hash, grp_replay_req,
1369                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1370   }
1371
1372   struct GNUNET_HashCode key_hash;
1373   replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
1374                    rep->flags, &key_hash);
1375   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
1376                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1377
1378   if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
1379   { /* No local origin, replay from remote members / origin. */
1380     if (NULL != mem->origin_channel)
1381     {
1382       cadet_send_channel (mem->origin_channel, &rep->header);
1383     }
1384     else
1385     {
1386       /* FIXME: not yet connected to origin */
1387       GNUNET_SERVICE_client_drop (client);
1388       return;
1389     }
1390   }
1391   GNUNET_SERVICE_client_continue (client);
1392 }
1393
1394
1395 static int
1396 cadet_send_replay_response_cb (void *cls,
1397                                const struct GNUNET_HashCode *key_hash,
1398                                void *value)
1399 {
1400   struct Channel *chn = value;
1401   struct GNUNET_MessageHeader *msg = cls;
1402
1403   cadet_send_channel (chn, msg);
1404   return GNUNET_OK;
1405 }
1406
1407
1408 static int
1409 client_send_replay_response_cb (void *cls,
1410                                 const struct GNUNET_HashCode *key_hash,
1411                                 void *value)
1412 {
1413   struct GNUNET_SERVICE_Client *client = value;
1414   struct GNUNET_MessageHeader *msg = cls;
1415
1416   client_send (client, msg);
1417   return GNUNET_OK;
1418 }
1419
1420
1421 static int
1422 check_client_replay_response_end (void *cls,
1423                                   const struct MulticastReplayResponseMessage *res)
1424 {
1425   return GNUNET_OK;
1426 }
1427
1428
1429 /**
1430  * End of replay response from a client.
1431  */
1432 static void
1433 handle_client_replay_response_end (void *cls,
1434                                    const struct MulticastReplayResponseMessage *res)
1435 {
1436   struct Client *c = cls;
1437   struct GNUNET_SERVICE_Client *client = c->client;
1438   struct Group *grp = c->group;
1439
1440   if (NULL == grp)
1441   {
1442     GNUNET_break (0);
1443     GNUNET_SERVICE_client_drop (client);
1444     return;
1445   }
1446
1447   struct GNUNET_HashCode key_hash;
1448   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1449                    res->flags, &key_hash);
1450
1451   struct GNUNET_CONTAINER_MultiHashMap *
1452     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1453                                                               &grp->pub_key_hash);
1454   if (NULL != grp_replay_req_cadet)
1455   {
1456     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
1457   }
1458   struct GNUNET_CONTAINER_MultiHashMap *
1459     grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1460                                                                &grp->pub_key_hash);
1461   if (NULL != grp_replay_req_client)
1462   {
1463     GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
1464   }
1465   GNUNET_SERVICE_client_continue (client);
1466 }
1467
1468
1469 static int
1470 check_client_replay_response (void *cls,
1471                               const struct MulticastReplayResponseMessage *res)
1472 {
1473   const struct GNUNET_MessageHeader *msg = &res->header;
1474   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1475   {
1476     msg = GNUNET_MQ_extract_nested_mh (res);
1477     if (NULL == msg)
1478     {
1479       return GNUNET_SYSERR;
1480     }
1481   }
1482   return GNUNET_OK;
1483 }
1484
1485
1486 /**
1487  * Incoming replay response from a client.
1488  *
1489  * Respond with a multicast message on success, or otherwise with an error code.
1490  */
1491 static void
1492 handle_client_replay_response (void *cls,
1493                                const struct MulticastReplayResponseMessage *res)
1494 {
1495   struct Client *c = cls;
1496   struct GNUNET_SERVICE_Client *client = c->client;
1497   struct Group *grp = c->group;
1498
1499   if (NULL == grp)
1500   {
1501     GNUNET_break (0);
1502     GNUNET_SERVICE_client_drop (client);
1503     return;
1504   }
1505
1506   const struct GNUNET_MessageHeader *msg = &res->header;
1507   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1508   {
1509     msg = GNUNET_MQ_extract_nested_mh (res);
1510   }
1511
1512   struct GNUNET_HashCode key_hash;
1513   replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
1514                    res->flags, &key_hash);
1515
1516   struct GNUNET_CONTAINER_MultiHashMap *
1517     grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1518                                                               &grp->pub_key_hash);
1519   if (NULL != grp_replay_req_cadet)
1520   {
1521     GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
1522                                                 cadet_send_replay_response_cb,
1523                                                 (void *) msg);
1524   }
1525   if (GNUNET_MULTICAST_REC_OK == res->error_code)
1526   {
1527     struct GNUNET_CONTAINER_MultiHashMap *
1528       grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
1529                                                                  &grp->pub_key_hash);
1530     if (NULL != grp_replay_req_client)
1531     {
1532       GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
1533                                                   client_send_replay_response_cb,
1534                                                   (void *) msg);
1535     }
1536   }
1537   else
1538   {
1539     handle_client_replay_response_end (c, res);
1540     return;
1541   }
1542   GNUNET_SERVICE_client_continue (client);
1543 }
1544
1545
1546 /**
1547  * Incoming join request message from CADET.
1548  */
1549 int
1550 cadet_recv_join_request (void *cls,
1551                          struct GNUNET_CADET_Channel *channel,
1552                          void **ctx,
1553                          const struct GNUNET_MessageHeader *m)
1554 {
1555   GNUNET_CADET_receive_done(channel);
1556   const struct MulticastJoinRequestMessage *
1557     req = (const struct MulticastJoinRequestMessage *) m;
1558   uint16_t size = ntohs (m->size);
1559   if (size < sizeof (*req))
1560   {
1561     GNUNET_break_op (0);
1562     return GNUNET_SYSERR;
1563   }
1564   if (NULL != *ctx)
1565   {
1566     GNUNET_break_op (0);
1567     return GNUNET_SYSERR;
1568   }
1569   if (ntohl (req->purpose.size) != (size
1570                                     - sizeof (req->header)
1571                                     - sizeof (req->reserved)
1572                                     - sizeof (req->signature)))
1573   {
1574     GNUNET_break_op (0);
1575     return GNUNET_SYSERR;
1576   }
1577   if (GNUNET_OK !=
1578       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1579                                   &req->purpose, &req->signature,
1580                                   &req->member_pub_key))
1581   {
1582     GNUNET_break_op (0);
1583     return GNUNET_SYSERR;
1584   }
1585
1586   struct GNUNET_HashCode group_pub_hash;
1587   GNUNET_CRYPTO_hash (&req->group_pub_key, sizeof (req->group_pub_key), &group_pub_hash);
1588
1589   struct Channel *chn = GNUNET_malloc (sizeof *chn);
1590   chn->channel = channel;
1591   chn->group_pub_key = req->group_pub_key;
1592   chn->group_pub_hash = group_pub_hash;
1593   chn->member_pub_key = req->member_pub_key;
1594   chn->peer = req->peer;
1595   chn->join_status = JOIN_WAITING;
1596   GNUNET_CONTAINER_multihashmap_put (channels_in, &chn->group_pub_hash, chn,
1597                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1598   *ctx = chn;
1599
1600   client_send_all (&group_pub_hash, m);
1601   return GNUNET_OK;
1602 }
1603
1604
1605 /**
1606  * Incoming join decision message from CADET.
1607  */
1608 int
1609 cadet_recv_join_decision (void *cls,
1610                           struct GNUNET_CADET_Channel *channel,
1611                           void **ctx,
1612                           const struct GNUNET_MessageHeader *m)
1613 {
1614   GNUNET_CADET_receive_done (channel);
1615   const struct MulticastJoinDecisionMessageHeader *
1616     hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
1617   const struct MulticastJoinDecisionMessage *
1618     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
1619   uint16_t size = ntohs (m->size);
1620   if (size < sizeof (struct MulticastJoinDecisionMessageHeader) +
1621              sizeof (struct MulticastJoinDecisionMessage))
1622   {
1623     GNUNET_break_op (0);
1624     return GNUNET_SYSERR;
1625   }
1626   struct Channel *chn = *ctx;
1627   if (NULL == chn)
1628   {
1629     GNUNET_break_op (0);
1630     return GNUNET_SYSERR;
1631   }
1632   if (NULL == chn->group || GNUNET_NO != chn->group->is_origin)
1633   {
1634     GNUNET_break_op (0);
1635     return GNUNET_SYSERR;
1636   }
1637   switch (chn->join_status)
1638   {
1639   case JOIN_REFUSED:
1640     return GNUNET_SYSERR;
1641
1642   case JOIN_ADMITTED:
1643     return GNUNET_OK;
1644
1645   case JOIN_NOT_ASKED:
1646   case JOIN_WAITING:
1647     break;
1648   }
1649
1650   // FIXME: do we need to copy chn->peer or compare it with hdcsn->peer?
1651   struct Member *mem = (struct Member *) chn->group;
1652   client_send_join_decision (mem, hdcsn);
1653   if (GNUNET_YES == ntohl (dcsn->is_admitted))
1654   {
1655     chn->join_status = JOIN_ADMITTED;
1656     return GNUNET_OK;
1657   }
1658   else
1659   {
1660     chn->join_status = JOIN_REFUSED;
1661     return GNUNET_SYSERR;
1662   }
1663 }
1664
1665 /**
1666  * Incoming multicast message from CADET.
1667  */
1668 int
1669 cadet_recv_message (void *cls,
1670                     struct GNUNET_CADET_Channel *channel,
1671                     void **ctx,
1672                     const struct GNUNET_MessageHeader *m)
1673 {
1674   GNUNET_CADET_receive_done(channel);
1675   const struct GNUNET_MULTICAST_MessageHeader *
1676     msg = (const struct GNUNET_MULTICAST_MessageHeader *) m;
1677   uint16_t size = ntohs (m->size);
1678   if (size < sizeof (*msg))
1679   {
1680     GNUNET_break_op (0);
1681     return GNUNET_SYSERR;
1682   }
1683   struct Channel *chn = *ctx;
1684   if (NULL == chn)
1685   {
1686     GNUNET_break_op (0);
1687     return GNUNET_SYSERR;
1688   }
1689   if (ntohl (msg->purpose.size) != (size
1690                                     - sizeof (msg->header)
1691                                     - sizeof (msg->hop_counter)
1692                                     - sizeof (msg->signature)))
1693   {
1694     GNUNET_break_op (0);
1695     return GNUNET_SYSERR;
1696   }
1697   if (GNUNET_OK !=
1698       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE,
1699                                   &msg->purpose, &msg->signature,
1700                                   &chn->group_pub_key))
1701   {
1702     GNUNET_break_op (0);
1703     return GNUNET_SYSERR;
1704   }
1705
1706   client_send_all (&chn->group_pub_hash, m);
1707   return GNUNET_OK;
1708 }
1709
1710
1711 /**
1712  * Incoming multicast request message from CADET.
1713  */
1714 int
1715 cadet_recv_request (void *cls,
1716                     struct GNUNET_CADET_Channel *channel,
1717                     void **ctx,
1718                     const struct GNUNET_MessageHeader *m)
1719 {
1720   GNUNET_CADET_receive_done(channel);
1721   const struct GNUNET_MULTICAST_RequestHeader *
1722     req = (const struct GNUNET_MULTICAST_RequestHeader *) m;
1723   uint16_t size = ntohs (m->size);
1724   if (size < sizeof (*req))
1725   {
1726     GNUNET_break_op (0);
1727     return GNUNET_SYSERR;
1728   }
1729   struct Channel *chn = *ctx;
1730   if (NULL == chn)
1731   {
1732     GNUNET_break_op (0);
1733     return GNUNET_SYSERR;
1734   }
1735   if (ntohl (req->purpose.size) != (size
1736                                     - sizeof (req->header)
1737                                     - sizeof (req->member_pub_key)
1738                                     - sizeof (req->signature)))
1739   {
1740     GNUNET_break_op (0);
1741     return GNUNET_SYSERR;
1742   }
1743   if (GNUNET_OK !=
1744       GNUNET_CRYPTO_ecdsa_verify (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST,
1745                                   &req->purpose, &req->signature,
1746                                   &req->member_pub_key))
1747   {
1748     GNUNET_break_op (0);
1749     return GNUNET_SYSERR;
1750   }
1751
1752   client_send_origin (&chn->group_pub_hash, m);
1753   return GNUNET_OK;
1754 }
1755
1756
1757 /**
1758  * Incoming multicast replay request from CADET.
1759  */
1760 int
1761 cadet_recv_replay_request (void *cls,
1762                            struct GNUNET_CADET_Channel *channel,
1763                            void **ctx,
1764                            const struct GNUNET_MessageHeader *m)
1765 {
1766   GNUNET_CADET_receive_done(channel);
1767   struct MulticastReplayRequestMessage rep;
1768   uint16_t size = ntohs (m->size);
1769   if (size < sizeof (rep))
1770   {
1771     GNUNET_break_op (0);
1772     return GNUNET_SYSERR;
1773   }
1774   struct Channel *chn = *ctx;
1775
1776   GNUNET_memcpy (&rep, m, sizeof (rep));
1777   GNUNET_memcpy (&rep.member_pub_key, &chn->member_pub_key, sizeof (chn->member_pub_key));
1778
1779   struct GNUNET_CONTAINER_MultiHashMap *
1780     grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
1781                                                         &chn->group->pub_key_hash);
1782   if (NULL == grp_replay_req)
1783   {
1784     grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1785     GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
1786                                        &chn->group->pub_key_hash, grp_replay_req,
1787                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1788   }
1789   struct GNUNET_HashCode key_hash;
1790   replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
1791                    rep.flags, &key_hash);
1792   GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
1793                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1794
1795   client_send_random (&chn->group_pub_hash, &rep.header);
1796   return GNUNET_OK;
1797 }
1798
1799
1800 /**
1801  * Incoming multicast replay response from CADET.
1802  */
1803 int
1804 cadet_recv_replay_response (void *cls,
1805                             struct GNUNET_CADET_Channel *channel,
1806                             void **ctx,
1807                             const struct GNUNET_MessageHeader *m)
1808 {
1809   GNUNET_CADET_receive_done(channel);
1810   //struct Channel *chn = *ctx;
1811
1812   /* @todo FIXME: got replay error response, send request to other members */
1813
1814   return GNUNET_OK;
1815 }
1816
1817
1818 /**
1819  * A new client connected.
1820  *
1821  * @param cls NULL
1822  * @param client client to add
1823  * @param mq message queue for @a client
1824  * @return @a client
1825  */
1826 static void *
1827 client_notify_connect (void *cls,
1828                        struct GNUNET_SERVICE_Client *client,
1829                        struct GNUNET_MQ_Handle *mq)
1830 {
1831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
1832   /* FIXME: send connect ACK */
1833
1834   struct Client *c = GNUNET_new (struct Client);
1835   c->client = client;
1836
1837   return c;
1838 }
1839
1840
1841 /**
1842  * Called whenever a client is disconnected.
1843  * Frees our resources associated with that client.
1844  *
1845  * @param cls closure
1846  * @param client identification of the client
1847  * @param app_ctx must match @a client
1848  */
1849 static void
1850 client_notify_disconnect (void *cls,
1851                           struct GNUNET_SERVICE_Client *client,
1852                           void *app_ctx)
1853 {
1854   struct Client *c = app_ctx;
1855   struct Group *grp = c->group;
1856   GNUNET_free (c);
1857
1858   if (NULL == grp)
1859   {
1860     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1861                 "%p User context is NULL in client_disconnect()\n", grp);
1862     GNUNET_break (0);
1863     return;
1864   }
1865
1866   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1867               "%p Client (%s) disconnected from group %s\n",
1868               grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
1869               GNUNET_h2s (&grp->pub_key_hash));
1870
1871   struct ClientList *cl = grp->clients_head;
1872   while (NULL != cl)
1873   {
1874     if (cl->client == client)
1875     {
1876       GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
1877       GNUNET_free (cl);
1878       break;
1879     }
1880     cl = cl->next;
1881   }
1882
1883   while (GNUNET_YES == replay_req_remove_client (grp, client));
1884
1885   if (NULL == grp->clients_head)
1886   { /* Last client disconnected. */
1887 #if FIXME
1888     if (NULL != grp->tmit_head)
1889     { /* Send pending messages via CADET before cleanup. */
1890       transmit_message (grp);
1891     }
1892     else
1893 #endif
1894     {
1895       cleanup_group (grp);
1896     }
1897   }
1898 }
1899
1900
1901 /**
1902  * Message handlers for CADET.
1903  */
1904 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1905   { cadet_recv_join_request,
1906     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
1907
1908   { cadet_recv_join_decision,
1909     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
1910
1911   { cadet_recv_message,
1912     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
1913
1914   { cadet_recv_request,
1915     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
1916
1917   { cadet_recv_replay_request,
1918     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
1919
1920   { cadet_recv_replay_response,
1921     GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
1922
1923   { NULL, 0, 0 }
1924 };
1925
1926
1927 /**
1928  * Service started.
1929  *
1930  * @param cls closure
1931  * @param server the initialized server
1932  * @param cfg configuration to use
1933  */
1934 static void
1935 run (void *cls,
1936      const struct GNUNET_CONFIGURATION_Handle *c,
1937      struct GNUNET_SERVICE_Handle *svc)
1938 {
1939   cfg = c;
1940   service = svc;
1941   GNUNET_CRYPTO_get_peer_identity (cfg, &this_peer);
1942
1943   stats = GNUNET_STATISTICS_create ("multicast", cfg);
1944   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1945   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1946   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1947   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1948   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1949   replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1950   replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1951
1952   cadet = GNUNET_CADET_connect (cfg, NULL,
1953                                 cadet_notify_channel_end,
1954                                 cadet_handlers);
1955   GNUNET_assert (NULL != cadet);
1956
1957   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1958                                  NULL);
1959 }
1960
1961
1962 /**
1963  * Define "main" method using service macro.
1964  */
1965 GNUNET_SERVICE_MAIN
1966 ("multicast",
1967  GNUNET_SERVICE_OPTION_NONE,
1968  run,
1969  client_notify_connect,
1970  client_notify_disconnect,
1971  NULL,
1972  GNUNET_MQ_hd_fixed_size (client_origin_start,
1973                           GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
1974                           struct MulticastOriginStartMessage,
1975                           NULL),
1976  GNUNET_MQ_hd_var_size (client_member_join,
1977                         GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN,
1978                         struct MulticastMemberJoinMessage,
1979                         NULL),
1980  GNUNET_MQ_hd_var_size (client_join_decision,
1981                         GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1982                         struct MulticastJoinDecisionMessageHeader,
1983                         NULL),
1984  GNUNET_MQ_hd_var_size (client_multicast_message,
1985                         GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1986                         struct GNUNET_MULTICAST_MessageHeader,
1987                         NULL),
1988  GNUNET_MQ_hd_var_size (client_multicast_request,
1989                         GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
1990                         struct GNUNET_MULTICAST_RequestHeader,
1991                         NULL),
1992  GNUNET_MQ_hd_fixed_size (client_replay_request,
1993                           GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1994                           struct MulticastReplayRequestMessage,
1995                           NULL),
1996  GNUNET_MQ_hd_var_size (client_replay_response,
1997                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1998                         struct MulticastReplayResponseMessage,
1999                         NULL),
2000  GNUNET_MQ_hd_var_size (client_replay_response_end,
2001                         GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END,
2002                         struct MulticastReplayResponseMessage,
2003                         NULL));
2004
2005 /* end of gnunet-service-multicast.c */