psyc, multicast: join decision
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "psyc.h"
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * Handle to the PSYCstore.
57  */
58 static struct GNUNET_PSYCSTORE_Handle *store;
59
60 /**
61  * All connected masters.
62  * Channel's pub_key_hash -> struct Master
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *masters;
65
66 /**
67  * All connected slaves.
68  * Channel's pub_key_hash -> struct Slave
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
71
72 /**
73  * Connected slaves per channel.
74  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
77
78
79 /**
80  * Message in the transmission queue.
81  */
82 struct TransmitMessage
83 {
84   struct TransmitMessage *prev;
85   struct TransmitMessage *next;
86
87   struct GNUNET_SERVER_Client *client;
88
89   /**
90    * ID assigned to the message.
91    */
92   uint64_t id;
93
94   /**
95    * Size of @a buf
96    */
97   uint16_t size;
98
99   /**
100    * @see enum MessageState
101    */
102   uint8_t state;
103
104   /* Followed by message */
105 };
106
107
108 /**
109  * Cache for received message fragments.
110  * Message fragments are only sent to clients after all modifiers arrived.
111  *
112  * chan_key -> MultiHashMap chan_msgs
113  */
114 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
115
116
117 /**
118  * Entry in the chan_msgs hashmap of @a recv_cache:
119  * fragment_id -> RecvCacheEntry
120  */
121 struct RecvCacheEntry
122 {
123   struct GNUNET_MULTICAST_MessageHeader *mmsg;
124   uint16_t ref_count;
125 };
126
127
128 /**
129  * Entry in the @a recv_frags hash map of a @a Channel.
130  * message_id -> FragmentQueue
131  */
132 struct FragmentQueue
133 {
134   /**
135    * Fragment IDs stored in @a recv_cache.
136    */
137   struct GNUNET_CONTAINER_Heap *fragments;
138
139   /**
140    * Total size of received fragments.
141    */
142   uint64_t size;
143
144   /**
145    * Total size of received header fragments (METHOD & MODIFIERs)
146    */
147   uint64_t header_size;
148
149   /**
150    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
151    */
152   uint64_t state_delta;
153
154   /**
155    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
156    */
157   uint32_t flags;
158
159   /**
160    * Receive state of message.
161    *
162    * @see MessageFragmentState
163    */
164   uint8_t state;
165
166   /**
167    * Is the message queued for delivery to the client?
168    * i.e. added to the recv_msgs queue
169    */
170   uint8_t queued;
171 };
172
173
174 /**
175  * List of connected clients.
176  */
177 struct ClientList
178 {
179   struct ClientList *prev;
180   struct ClientList *next;
181   struct GNUNET_SERVER_Client *client;
182 };
183
184
185 /**
186  * Common part of the client context for both a channel master and slave.
187  */
188 struct Channel
189 {
190   struct ClientList *clients_head;
191   struct ClientList *clients_tail;
192
193   struct TransmitMessage *tmit_head;
194   struct TransmitMessage *tmit_tail;
195
196   /**
197    * Current PSYCstore operation.
198    */
199   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
200
201   /**
202    * Received fragments not yet sent to the client.
203    * message_id -> FragmentQueue
204    */
205   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
206
207   /**
208    * Received message IDs not yet sent to the client.
209    */
210   struct GNUNET_CONTAINER_Heap *recv_msgs;
211
212   /**
213    * FIXME: needed?
214    */
215   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
216
217   /**
218    * Public key of the channel.
219    */
220   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
221
222   /**
223    * Hash of @a pub_key.
224    */
225   struct GNUNET_HashCode pub_key_hash;
226
227   /**
228    * Last message ID sent to the client.
229    * 0 if there is no such message.
230    */
231   uint64_t max_message_id;
232
233   /**
234    * ID of the last stateful message, where the state operations has been
235    * processed and saved to PSYCstore and which has been sent to the client.
236    * 0 if there is no such message.
237    */
238   uint64_t max_state_message_id;
239
240   /**
241    * Expected value size for the modifier being received from the PSYC service.
242    */
243   uint32_t tmit_mod_value_size_expected;
244
245   /**
246    * Actual value size for the modifier being received from the PSYC service.
247    */
248   uint32_t tmit_mod_value_size;
249
250   /**
251    * @see enum MessageState
252    */
253   uint8_t tmit_state;
254
255   /**
256    * FIXME: needed?
257    */
258   uint8_t in_transmit;
259
260   /**
261    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
262    */
263   uint8_t is_master;
264
265   /**
266    * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
267    */
268   uint8_t ready;
269
270   /**
271    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
272    */
273   uint8_t disconnected;
274 };
275
276
277 /**
278  * Client context for a channel master.
279  */
280 struct Master
281 {
282   /**
283    * Channel struct common for Master and Slave
284    */
285   struct Channel ch;
286
287   /**
288    * Private key of the channel.
289    */
290   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
291
292   /**
293    * Handle for the multicast origin.
294    */
295   struct GNUNET_MULTICAST_Origin *origin;
296
297   /**
298    * Transmit handle for multicast.
299    */
300   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
301
302   /**
303    * Incoming join requests from multicast.
304    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
305    */
306   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
307
308   /**
309    * Last message ID transmitted to this channel.
310    *
311    * Incremented before sending a message, thus the message_id in messages sent
312    * starts from 1.
313    */
314   uint64_t max_message_id;
315
316   /**
317    * ID of the last message with state operations transmitted to the channel.
318    * 0 if there is no such message.
319    */
320   uint64_t max_state_message_id;
321
322   /**
323    * Maximum group generation transmitted to the channel.
324    */
325   uint64_t max_group_generation;
326
327   /**
328    * @see enum GNUNET_PSYC_Policy
329    */
330   uint32_t policy;
331 };
332
333
334 /**
335  * Client context for a channel slave.
336  */
337 struct Slave
338 {
339   /**
340    * Channel struct common for Master and Slave
341    */
342   struct Channel ch;
343
344   /**
345    * Private key of the slave.
346    */
347   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
348
349   /**
350    * Public key of the slave.
351    */
352   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
353
354   /**
355    * Hash of @a pub_key.
356    */
357   struct GNUNET_HashCode pub_key_hash;
358
359   /**
360    * Handle for the multicast member.
361    */
362   struct GNUNET_MULTICAST_Member *member;
363
364   /**
365    * Transmit handle for multicast.
366    */
367   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
368
369   /**
370    * Peer identity of the origin.
371    */
372   struct GNUNET_PeerIdentity origin;
373
374   /**
375    * Number of items in @a relays.
376    */
377   uint32_t relay_count;
378
379   /**
380    * Relays that multicast can use to connect.
381    */
382   struct GNUNET_PeerIdentity *relays;
383
384   /**
385    * Join request to be transmitted to the master on join.
386    */
387   struct GNUNET_MessageHeader *join_req;
388
389   /**
390    * Maximum request ID for this channel.
391    */
392   uint64_t max_request_id;
393 };
394
395
396 static inline void
397 transmit_message (struct Channel *ch);
398
399
400 /**
401  * Task run during shutdown.
402  *
403  * @param cls unused
404  * @param tc unused
405  */
406 static void
407 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
408 {
409   if (NULL != nc)
410   {
411     GNUNET_SERVER_notification_context_destroy (nc);
412     nc = NULL;
413   }
414   if (NULL != stats)
415   {
416     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
417     stats = NULL;
418   }
419 }
420
421
422 /**
423  * Clean up master data structures after a client disconnected.
424  */
425 static void
426 cleanup_master (struct Master *mst)
427 {
428   struct Channel *ch = &mst->ch;
429
430   if (NULL != mst->origin)
431     GNUNET_MULTICAST_origin_stop (mst->origin);
432   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
433   GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
434 }
435
436
437 /**
438  * Clean up slave data structures after a client disconnected.
439  */
440 static void
441 cleanup_slave (struct Slave *slv)
442 {
443   struct Channel *ch = &slv->ch;
444   struct GNUNET_CONTAINER_MultiHashMap *
445     ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
446                                                 &ch->pub_key_hash);
447   GNUNET_assert (NULL != ch_slv);
448   GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
449
450   if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
451   {
452     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
453                                           ch_slv);
454     GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
455   }
456   GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
457
458   if (NULL != slv->join_req)
459     GNUNET_free (slv->join_req);
460   if (NULL != slv->relays)
461     GNUNET_free (slv->relays);
462   if (NULL != slv->member)
463     GNUNET_MULTICAST_member_part (slv->member);
464   GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
465 }
466
467
468 /**
469  * Clean up channel data structures after a client disconnected.
470  */
471 static void
472 cleanup_channel (struct Channel *ch)
473 {
474   /* FIXME: fragment_cache_clear */
475
476   if (NULL != ch->store_op)
477     GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
478
479   (GNUNET_YES == ch->is_master)
480     ? cleanup_master ((struct Master *) ch)
481     : cleanup_slave ((struct Slave *) ch);
482   GNUNET_free (ch);
483 }
484
485
486 /**
487  * Called whenever a client is disconnected.
488  * Frees our resources associated with that client.
489  *
490  * @param cls Closure.
491  * @param client Identification of the client.
492  */
493 static void
494 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
495 {
496   if (NULL == client)
497     return;
498
499   struct Channel *
500     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502               "%p Client (%s) disconnected from channel %s\n",
503               ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
504               GNUNET_h2s (&ch->pub_key_hash));
505
506   if (NULL == ch)
507   {
508     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
509                 "%p User context is NULL in client_disconnect()\n", ch);
510     GNUNET_break (0);
511     return;
512   }
513
514   struct ClientList *cl = ch->clients_head;
515   while (NULL != cl)
516   {
517     if (cl->client == client)
518     {
519       GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
520       GNUNET_free (cl);
521       break;
522     }
523     cl = cl->next;
524   }
525
526   if (NULL == ch->clients_head)
527   { /* Last client disconnected. */
528     if (NULL != ch->tmit_head)
529     { /* Send pending messages to multicast before cleanup. */
530       transmit_message (ch);
531     }
532     else
533     {
534       cleanup_channel (ch);
535     }
536   }
537 }
538
539
540 /**
541  * Send message to all clients connected to the channel.
542  */
543 static void
544 msg_to_clients (const struct Channel *ch,
545                 const struct GNUNET_MessageHeader *msg)
546 {
547   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
548               "%p Sending message to clients.\n", ch);
549
550   struct ClientList *cl = ch->clients_head;
551   while (NULL != cl)
552   {
553     GNUNET_SERVER_notification_context_add (nc, cl->client);
554     GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
555     cl = cl->next;
556   }
557 }
558
559
560 /**
561  * Closure for join_mem_test_cb()
562  */
563 struct JoinMemTestClosure
564 {
565   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
566   struct Channel *ch;
567   struct GNUNET_MULTICAST_JoinHandle *jh;
568   struct MasterJoinRequest *master_join_req;
569 };
570
571
572 /**
573  * Membership test result callback used for join requests.m
574  */
575 static void
576 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
577 {
578   struct JoinMemTestClosure *jcls = cls;
579
580   if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
581   { /* Pass on join request to client if this is a master channel */
582     struct Master *mst = (struct Master *) jcls->ch;
583     struct GNUNET_HashCode slave_key_hash;
584     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
585                         &slave_key_hash);
586     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
587                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
588     msg_to_clients (jcls->ch,
589                     (struct GNUNET_MessageHeader *) jcls->master_join_req);
590   }
591   else
592   {
593     // FIXME: add relays
594     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
595   }
596   GNUNET_free (jcls->master_join_req);
597   GNUNET_free (jcls);
598 }
599
600
601 /**
602  * Incoming join request from multicast.
603  */
604 static void
605 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
606          const struct GNUNET_MessageHeader *join_msg,
607          struct GNUNET_MULTICAST_JoinHandle *jh)
608 {
609   struct Channel *ch = cls;
610   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
611
612   uint16_t join_msg_size = 0;
613   if (NULL != join_msg)
614   {
615     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
616     {
617       join_msg_size = ntohs (join_msg->size);
618     }
619     else
620     {
621       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
622                   "%p Got join message with invalid type %u.\n",
623                   ch, ntohs (join_msg->type));
624     }
625   }
626
627   struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
628   req->header.size = htons (sizeof (*req) + join_msg_size);
629   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
630   req->slave_key = *slave_key;
631   memcpy (&req[1], join_msg, join_msg_size);
632
633   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
634   jcls->slave_key = *slave_key;
635   jcls->ch = ch;
636   jcls->jh = jh;
637   jcls->master_join_req = req;
638
639   GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
640                                     ch->max_message_id, 0,
641                                     &join_mem_test_cb, jcls);
642 }
643
644
645 static void
646 membership_test_cb (void *cls,
647                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
648                     uint64_t message_id, uint64_t group_generation,
649                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
650 {
651
652 }
653
654
655 static void
656 replay_fragment_cb (void *cls,
657                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
658                     uint64_t fragment_id, uint64_t flags,
659                     struct GNUNET_MULTICAST_ReplayHandle *rh)
660
661 {
662
663 }
664
665
666 static void
667 replay_message_cb (void *cls,
668                    const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
669                    uint64_t message_id,
670                    uint64_t fragment_offset,
671                    uint64_t flags,
672                    struct GNUNET_MULTICAST_ReplayHandle *rh)
673 {
674
675 }
676
677
678 static void
679 fragment_store_result (void *cls, int64_t result, const char *err_msg)
680 {
681   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682               "fragment_store() returned %l (%s)\n", result, err_msg);
683 }
684
685
686 /**
687  * Convert an uint64_t in network byte order to a HashCode
688  * that can be used as key in a MultiHashMap
689  */
690 static inline void
691 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
692 {
693   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
694   /* TODO: use built-in byte swap functions if available */
695
696   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
697   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
698
699   *key = (struct GNUNET_HashCode) {{ 0 }};
700   *((uint64_t *) key)
701     = (n << 32) | (n >> 32);
702 }
703
704
705 /**
706  * Convert an uint64_t in host byte order to a HashCode
707  * that can be used as key in a MultiHashMap
708  */
709 static inline void
710 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
711 {
712 #if __BYTE_ORDER == __BIG_ENDIAN
713   hash_key_from_nll (key, n);
714 #elif __BYTE_ORDER == __LITTLE_ENDIAN
715   *key = (struct GNUNET_HashCode) {{ 0 }};
716   *((uint64_t *) key) = n;
717 #else
718   #error byteorder undefined
719 #endif
720 }
721
722
723 /**
724  * Send multicast message to all clients connected to the channel.
725  */
726 static void
727 mmsg_to_clients (struct Channel *ch,
728                  const struct GNUNET_MULTICAST_MessageHeader *mmsg)
729 {
730   uint16_t size = ntohs (mmsg->header.size);
731   struct GNUNET_PSYC_MessageHeader *pmsg;
732   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
733
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "%p Sending message to client. "
736               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
737               ch, GNUNET_ntohll (mmsg->fragment_id),
738               GNUNET_ntohll (mmsg->message_id));
739
740   pmsg = GNUNET_malloc (psize);
741   pmsg->header.size = htons (psize);
742   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
743   pmsg->message_id = mmsg->message_id;
744
745   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
746   msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
747   GNUNET_free (pmsg);
748 }
749
750
751 /**
752  * Insert a multicast message fragment into the queue belonging to the message.
753  *
754  * @param ch           Channel.
755  * @param mmsg         Multicast message fragment.
756  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
757  * @param first_ptype  First PSYC message part type in @a mmsg.
758  * @param last_ptype   Last PSYC message part type in @a mmsg.
759  */
760 static void
761 fragment_queue_insert (struct Channel *ch,
762                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
763                        uint16_t first_ptype, uint16_t last_ptype)
764 {
765   const uint16_t size = ntohs (mmsg->header.size);
766   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
767   struct GNUNET_CONTAINER_MultiHashMap
768     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
769                                                     &ch->pub_key_hash);
770
771   struct GNUNET_HashCode msg_id_hash;
772   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
773
774   struct FragmentQueue
775     *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
776
777   if (NULL == fragq)
778   {
779     fragq = GNUNET_new (struct FragmentQueue);
780     fragq->state = MSG_FRAG_STATE_HEADER;
781     fragq->fragments
782       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
783
784     GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
785                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
786
787     if (NULL == chan_msgs)
788     {
789       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
790       GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
791                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
792     }
793   }
794
795   struct GNUNET_HashCode frag_id_hash;
796   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
797   struct RecvCacheEntry
798     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
799   if (NULL == cache_entry)
800   {
801     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802                 "%p Adding message fragment to cache. "
803                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
804                 "header_size: %" PRIu64 " + %u).\n",
805                 ch, GNUNET_ntohll (mmsg->message_id),
806                 GNUNET_ntohll (mmsg->fragment_id),
807                 fragq->header_size, size);
808     cache_entry = GNUNET_new (struct RecvCacheEntry);
809     cache_entry->ref_count = 1;
810     cache_entry->mmsg = GNUNET_malloc (size);
811     memcpy (cache_entry->mmsg, mmsg, size);
812     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
813                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
814   }
815   else
816   {
817     cache_entry->ref_count++;
818     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819                 "%p Message fragment is already in cache. "
820                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
821                 ", ref_count: %u\n",
822                 ch, GNUNET_ntohll (mmsg->message_id),
823                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
824   }
825
826   if (MSG_FRAG_STATE_HEADER == fragq->state)
827   {
828     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
829     {
830       struct GNUNET_PSYC_MessageMethod *
831         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
832       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
833       fragq->flags = ntohl (pmeth->flags);
834     }
835
836     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
837     {
838       fragq->header_size += size;
839     }
840     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
841              || frag_offset == fragq->header_size)
842     { /* header is now complete */
843       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
844                   "%p Header of message %" PRIu64 " is complete.\n",
845                   ch, GNUNET_ntohll (mmsg->message_id));
846
847       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
848                   "%p Adding message %" PRIu64 " to queue.\n",
849                   ch, GNUNET_ntohll (mmsg->message_id));
850       fragq->state = MSG_FRAG_STATE_DATA;
851     }
852     else
853     {
854       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
855                   "%p Header of message %" PRIu64 " is NOT complete yet: "
856                   "%" PRIu64 " != %" PRIu64 "\n",
857                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
858                   fragq->header_size);
859     }
860   }
861
862   switch (last_ptype)
863   {
864   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
865     if (frag_offset == fragq->size)
866       fragq->state = MSG_FRAG_STATE_END;
867     else
868       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
869                   "%p Message %" PRIu64 " is NOT complete yet: "
870                   "%" PRIu64 " != %" PRIu64 "\n",
871                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
872                   fragq->size);
873     break;
874
875   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
876     /* Drop message without delivering to client if it's a single fragment */
877     fragq->state =
878       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
879       ? MSG_FRAG_STATE_DROP
880       : MSG_FRAG_STATE_CANCEL;
881   }
882
883   switch (fragq->state)
884   {
885   case MSG_FRAG_STATE_DATA:
886   case MSG_FRAG_STATE_END:
887   case MSG_FRAG_STATE_CANCEL:
888     if (GNUNET_NO == fragq->queued)
889     {
890       GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
891                                     GNUNET_ntohll (mmsg->message_id));
892       fragq->queued = GNUNET_YES;
893     }
894   }
895
896   fragq->size += size;
897   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
898                                 GNUNET_ntohll (mmsg->fragment_id));
899 }
900
901
902 /**
903  * Run fragment queue of a message.
904  *
905  * Send fragments of a message in order to client, after all modifiers arrived
906  * from multicast.
907  *
908  * @param ch      Channel.
909  * @param msg_id  ID of the message @a fragq belongs to.
910  * @param fragq   Fragment queue of the message.
911  * @param drop    Drop message without delivering to client?
912  *                #GNUNET_YES or #GNUNET_NO.
913  */
914 static void
915 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
916                     struct FragmentQueue *fragq, uint8_t drop)
917 {
918   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
919               "%p Running message fragment queue for message %" PRIu64
920               " (state: %u).\n",
921               ch, msg_id, fragq->state);
922
923   struct GNUNET_CONTAINER_MultiHashMap
924     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
925                                                     &ch->pub_key_hash);
926   GNUNET_assert (NULL != chan_msgs);
927   uint64_t frag_id;
928
929   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
930                                                     &frag_id))
931   {
932     struct GNUNET_HashCode frag_id_hash;
933     hash_key_from_hll (&frag_id_hash, frag_id);
934     struct RecvCacheEntry *cache_entry
935       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
936     if (cache_entry != NULL)
937     {
938       if (GNUNET_NO == drop)
939       {
940         mmsg_to_clients (ch, cache_entry->mmsg);
941       }
942       if (cache_entry->ref_count <= 1)
943       {
944         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
945                                               cache_entry);
946         GNUNET_free (cache_entry->mmsg);
947         GNUNET_free (cache_entry);
948       }
949       else
950       {
951         cache_entry->ref_count--;
952       }
953     }
954 #if CACHE_AGING_IMPLEMENTED
955     else if (GNUNET_NO == drop)
956     {
957       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
958     }
959 #endif
960
961     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
962   }
963
964   if (MSG_FRAG_STATE_END <= fragq->state)
965   {
966     struct GNUNET_HashCode msg_id_hash;
967     hash_key_from_nll (&msg_id_hash, msg_id);
968
969     GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
970     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
971     GNUNET_free (fragq);
972   }
973   else
974   {
975     fragq->queued = GNUNET_NO;
976   }
977 }
978
979
980 /**
981  * Run message queue.
982  *
983  * Send messages in queue to client in order after a message has arrived from
984  * multicast, according to the following:
985  * - A message is only sent if all of its modifiers arrived.
986  * - A stateful message is only sent if the previous stateful message
987  *   has already been delivered to the client.
988  *
989  * @param ch  Channel.
990  * @return Number of messages removed from queue and sent to client.
991  */
992 static uint64_t
993 message_queue_run (struct Channel *ch)
994 {
995   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
996               "%p Running message queue.\n", ch);
997   uint64_t n = 0;
998   uint64_t msg_id;
999   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1000                                                     &msg_id))
1001   {
1002     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1003                 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
1004     struct GNUNET_HashCode msg_id_hash;
1005     hash_key_from_hll (&msg_id_hash, msg_id);
1006
1007     struct FragmentQueue *
1008       fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1009
1010     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1011     {
1012       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1013                   "%p No fragq (%p) or header not complete.\n",
1014                   ch, fragq);
1015       break;
1016     }
1017
1018     if (MSG_FRAG_STATE_HEADER == fragq->state)
1019     {
1020       /* Check if there's a missing message before the current one */
1021       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1022       {
1023         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1024             && msg_id - 1 != ch->max_message_id)
1025         {
1026           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1027                       "%p Out of order message. "
1028                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1029                       ch, msg_id, ch->max_message_id);
1030           break;
1031         }
1032       }
1033       else
1034       {
1035         if (msg_id - fragq->state_delta != ch->max_state_message_id)
1036         {
1037           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1038                       "%p Out of order stateful message. "
1039                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1040                       ch, msg_id, fragq->state_delta, ch->max_state_message_id);
1041           break;
1042         }
1043 #if TODO
1044         /* FIXME: apply modifiers to state in PSYCstore */
1045         GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
1046                                        state_modify_result_cb, cls);
1047 #endif
1048         ch->max_state_message_id = msg_id;
1049       }
1050       ch->max_message_id = msg_id;
1051     }
1052     fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1053     GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1054     n++;
1055   }
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1058   return n;
1059 }
1060
1061
1062 /**
1063  * Handle incoming message from multicast.
1064  *
1065  * @param ch   Channel.
1066  * @param mmsg Multicast message.
1067  *
1068  * @return #GNUNET_OK or #GNUNET_SYSERR
1069  */
1070 static int
1071 handle_multicast_message (struct Channel *ch,
1072                           const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1073 {
1074   GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
1075
1076   uint16_t size = ntohs (mmsg->header.size);
1077   uint16_t first_ptype = 0, last_ptype = 0;
1078
1079   if (GNUNET_SYSERR
1080       == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
1081                                           (const char *) &mmsg[1],
1082                                           &first_ptype, &last_ptype))
1083   {
1084     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1085                 "%p Received message with invalid parts from multicast. "
1086                 "Dropping message.\n", ch);
1087     GNUNET_break_op (0);
1088     return GNUNET_SYSERR;
1089   }
1090
1091   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092               "Message parts: first: type %u, last: type %u\n",
1093               first_ptype, last_ptype);
1094
1095   fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
1096   message_queue_run (ch);
1097
1098   return GNUNET_OK;
1099 }
1100
1101
1102 /**
1103  * Incoming message fragment from multicast.
1104  *
1105  * Store it using PSYCstore and send it to the client of the channel.
1106  */
1107 static void
1108 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
1109 {
1110   struct Channel *ch = cls;
1111   uint16_t type = ntohs (msg->type);
1112   uint16_t size = ntohs (msg->size);
1113
1114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115               "%p Received message of type %u and size %u from multicast.\n",
1116               ch, type, size);
1117
1118   switch (type)
1119   {
1120   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
1121   {
1122     handle_multicast_message (ch, (const struct
1123                                    GNUNET_MULTICAST_MessageHeader *) msg);
1124     break;
1125   }
1126   default:
1127     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1128                 "%p Dropping unknown message of type %u and size %u.\n",
1129                 ch, type, size);
1130   }
1131 }
1132
1133
1134 /**
1135  * Incoming request fragment from multicast for a master.
1136  *
1137  * @param cls           Master.
1138  * @param slave_key     Sending slave's public key.
1139  * @param msg           The message.
1140  * @param flags         Request flags.
1141  */
1142 static void
1143 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1144             const struct GNUNET_MessageHeader *msg,
1145             enum GNUNET_MULTICAST_MessageFlags flags)
1146 {
1147   struct Master *mst = cls;
1148   struct Channel *ch = &mst->ch;
1149
1150   uint16_t type = ntohs (msg->type);
1151   uint16_t size = ntohs (msg->size);
1152
1153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154               "%p Received request of type %u and size %u from multicast.\n",
1155               ch, type, size);
1156
1157   switch (type)
1158   {
1159   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
1160   {
1161     const struct GNUNET_MULTICAST_RequestHeader *req
1162       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
1163
1164     /* FIXME: see message_cb() */
1165     if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
1166                                                           (const char *) &req[1],
1167                                                           NULL, NULL))
1168     {
1169       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1170                   "%p Dropping request with invalid parts "
1171                   "received from multicast.\n", ch);
1172       GNUNET_break_op (0);
1173       break;
1174     }
1175
1176     struct GNUNET_PSYC_MessageHeader *pmsg;
1177     uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1178     pmsg = GNUNET_malloc (psize);
1179     pmsg->header.size = htons (psize);
1180     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1181     pmsg->message_id = req->request_id;
1182     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1183
1184     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1185     msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
1186     GNUNET_free (pmsg);
1187     break;
1188   }
1189   default:
1190     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191                 "%p Dropping unknown request of type %u and size %u.\n",
1192                 ch, type, size);
1193     GNUNET_break_op (0);
1194   }
1195 }
1196
1197
1198 /**
1199  * Response from PSYCstore with the current counter values for a channel master.
1200  */
1201 static void
1202 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1203                     uint64_t max_message_id, uint64_t max_group_generation,
1204                     uint64_t max_state_message_id)
1205 {
1206   struct Master *mst = cls;
1207   struct Channel *ch = &mst->ch;
1208   ch->store_op = NULL;
1209
1210   struct CountersResult res;
1211   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1212   res.header.size = htons (sizeof (res));
1213   res.result_code = htonl (result);
1214   res.max_message_id = GNUNET_htonll (max_message_id);
1215
1216   if (GNUNET_OK == result || GNUNET_NO == result)
1217   {
1218     mst->max_message_id = max_message_id;
1219     ch->max_message_id = max_message_id;
1220     ch->max_state_message_id = max_state_message_id;
1221     mst->max_group_generation = max_group_generation;
1222     mst->origin
1223       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
1224                                        max_fragment_id,
1225                                        join_cb, membership_test_cb,
1226                                        replay_fragment_cb, replay_message_cb,
1227                                        request_cb, message_cb, ch);
1228     ch->ready = GNUNET_YES;
1229   }
1230   else
1231   {
1232     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1233                 "%p GNUNET_PSYCSTORE_counters_get() "
1234                 "returned %d for channel %s.\n",
1235                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1236   }
1237
1238   msg_to_clients (ch, &res.header);
1239 }
1240
1241
1242 /**
1243  * Response from PSYCstore with the current counter values for a channel slave.
1244  */
1245 void
1246 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1247                    uint64_t max_message_id, uint64_t max_group_generation,
1248                    uint64_t max_state_message_id)
1249 {
1250   struct Slave *slv = cls;
1251   struct Channel *ch = &slv->ch;
1252   ch->store_op = NULL;
1253
1254   struct CountersResult res;
1255   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1256   res.header.size = htons (sizeof (res));
1257   res.result_code = htonl (result);
1258   res.max_message_id = GNUNET_htonll (max_message_id);
1259
1260   if (GNUNET_OK == result || GNUNET_NO == result)
1261   {
1262     ch->max_message_id = max_message_id;
1263     ch->max_state_message_id = max_state_message_id;
1264     slv->member
1265       = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1266                                       &slv->origin,
1267                                       slv->relay_count, slv->relays,
1268                                       slv->join_req, join_cb,
1269                                       membership_test_cb,
1270                                       replay_fragment_cb, replay_message_cb,
1271                                       message_cb, ch);
1272     ch->ready = GNUNET_YES;
1273   }
1274   else
1275   {
1276     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277                 "%p GNUNET_PSYCSTORE_counters_get() "
1278                 "returned %d for channel %s.\n",
1279                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1280   }
1281
1282   msg_to_clients (ch, &res.header);
1283 }
1284
1285
1286 static void
1287 channel_init (struct Channel *ch)
1288 {
1289   ch->recv_msgs
1290     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1291   ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1292 }
1293
1294
1295 /**
1296  * Handle a connecting client starting a channel master.
1297  */
1298 static void
1299 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1300                      const struct GNUNET_MessageHeader *msg)
1301 {
1302   const struct MasterStartRequest *req
1303     = (const struct MasterStartRequest *) msg;
1304
1305   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1306   struct GNUNET_HashCode pub_key_hash;
1307
1308   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1309   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1310
1311   struct Master *
1312     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1313   struct Channel *ch;
1314
1315   if (NULL == mst)
1316   {
1317     mst = GNUNET_new (struct Master);
1318     mst->policy = ntohl (req->policy);
1319     mst->priv_key = req->channel_key;
1320     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1321
1322     ch = &mst->ch;
1323     ch->is_master = GNUNET_YES;
1324     ch->pub_key = pub_key;
1325     ch->pub_key_hash = pub_key_hash;
1326     channel_init (ch);
1327
1328     GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1329                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1330     ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1331                                                   master_counters_cb, mst);
1332   }
1333   else
1334   {
1335     ch = &mst->ch;
1336
1337     struct CountersResult res;
1338     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1339     res.header.size = htons (sizeof (res));
1340     res.result_code = htonl (GNUNET_OK);
1341     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1342
1343     GNUNET_SERVER_notification_context_add (nc, client);
1344     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1345                                                 GNUNET_NO);
1346   }
1347
1348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349               "%p Client connected as master to channel %s.\n",
1350               mst, GNUNET_h2s (&ch->pub_key_hash));
1351
1352   struct ClientList *cl = GNUNET_new (struct ClientList);
1353   cl->client = client;
1354   GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1355
1356   GNUNET_SERVER_client_set_user_context (client, ch);
1357   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1358 }
1359
1360
1361 /**
1362  * Handle a connecting client joining as a channel slave.
1363  */
1364 static void
1365 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1366                    const struct GNUNET_MessageHeader *msg)
1367 {
1368   const struct SlaveJoinRequest *req
1369     = (const struct SlaveJoinRequest *) msg;
1370
1371   struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1372   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1373
1374   GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1375   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1376   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1377
1378   struct GNUNET_CONTAINER_MultiHashMap *
1379     ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1380   struct Slave *slv = NULL;
1381   struct Channel *ch;
1382
1383   if (NULL != ch_slv)
1384   {
1385     slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
1386   }
1387   if (NULL == slv)
1388   {
1389     slv = GNUNET_new (struct Slave);
1390     slv->priv_key = req->slave_key;
1391     slv->origin = req->origin;
1392     slv->relay_count = ntohl (req->relay_count);
1393     if (0 < slv->relay_count)
1394     {
1395       const struct GNUNET_PeerIdentity *relays
1396         = (const struct GNUNET_PeerIdentity *) &req[1];
1397       slv->relays
1398         = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1399       uint32_t i;
1400       for (i = 0; i < slv->relay_count; i++)
1401         memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1402     }
1403
1404     ch = &slv->ch;
1405     ch->is_master = GNUNET_NO;
1406     ch->pub_key = req->channel_key;
1407     ch->pub_key_hash = pub_key_hash;
1408     channel_init (ch);
1409
1410     if (NULL == ch_slv)
1411     {
1412       ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1413       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
1414                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1415     }
1416     GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
1417                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1418     GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1419                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1420     ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1421                                                   slave_counters_cb, slv);
1422   }
1423   else
1424   {
1425     ch = &slv->ch;
1426
1427     struct CountersResult res;
1428     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1429     res.header.size = htons (sizeof (res));
1430     res.result_code = htonl (GNUNET_OK);
1431     res.max_message_id = GNUNET_htonll (ch->max_message_id);
1432
1433     GNUNET_SERVER_notification_context_add (nc, client);
1434     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1435                                                 GNUNET_NO);
1436   }
1437
1438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439               "%p Client connected as slave to channel %s.\n",
1440               slv, GNUNET_h2s (&ch->pub_key_hash));
1441
1442   struct ClientList *cl = GNUNET_new (struct ClientList);
1443   cl->client = client;
1444   GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1445
1446   GNUNET_SERVER_client_set_user_context (client, &slv->ch);
1447   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1448 }
1449
1450
1451 struct JoinDecisionClosure
1452 {
1453   uint8_t is_admitted;
1454   struct GNUNET_MessageHeader *msg;
1455 };
1456
1457
1458 /**
1459  * Iterator callback for responding to join requests of a slave.
1460  */
1461 static int
1462 join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1463                   void *jh)
1464 {
1465   struct JoinDecisionClosure *jcls = cls;
1466   // FIXME: add relays
1467   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1468   return GNUNET_YES;
1469 }
1470
1471
1472 /**
1473  * Join decision from client.
1474  */
1475 static void
1476 handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1477                       const struct GNUNET_MessageHeader *msg)
1478 {
1479   struct Channel *
1480     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1481   GNUNET_assert (GNUNET_YES == ch->is_master);
1482   struct Master *mst = (struct Master *) ch;
1483
1484   struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1485   struct JoinDecisionClosure jcls;
1486   jcls.is_admitted = dcsn->is_admitted;
1487   jcls.msg
1488     = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1489        <= ntohs (msg->size))
1490     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1491     : NULL;
1492
1493   struct GNUNET_HashCode slave_key_hash;
1494   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1495                       &slave_key_hash);
1496   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1497                                               &join_decision_cb, &jcls);
1498   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1499 }
1500
1501
1502 /**
1503  * Send acknowledgement to a client.
1504  *
1505  * Sent after a message fragment has been passed on to multicast.
1506  *
1507  * @param ch The channel struct for the client.
1508  */
1509 static void
1510 send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1511 {
1512   struct GNUNET_MessageHeader res;
1513   res.size = htons (sizeof (res));
1514   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1515
1516   /* FIXME */
1517   GNUNET_SERVER_notification_context_add (nc, client);
1518   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1519 }
1520
1521
1522 /**
1523  * Callback for the transmit functions of multicast.
1524  */
1525 static int
1526 transmit_notify (void *cls, size_t *data_size, void *data)
1527 {
1528   struct Channel *ch = cls;
1529   struct TransmitMessage *tmit_msg = ch->tmit_head;
1530
1531   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1532   {
1533     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534                 "%p transmit_notify: nothing to send.\n", ch);
1535     *data_size = 0;
1536     return GNUNET_NO;
1537   }
1538
1539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
1541
1542   *data_size = tmit_msg->size;
1543   memcpy (data, &tmit_msg[1], *data_size);
1544
1545   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1546   if (NULL != tmit_msg->client)
1547     send_message_ack (ch, tmit_msg->client);
1548
1549   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1550   GNUNET_free (tmit_msg);
1551
1552   if (0 == ch->tmit_task)
1553   {
1554     if (NULL != ch->tmit_head)
1555     {
1556       transmit_message (ch);
1557     }
1558     else if (ch->disconnected)
1559     {
1560       /* FIXME: handle partial message (when still in_transmit) */
1561       cleanup_channel (ch);
1562     }
1563   }
1564
1565   return ret;
1566 }
1567
1568
1569 /**
1570  * Callback for the transmit functions of multicast.
1571  */
1572 static int
1573 master_transmit_notify (void *cls, size_t *data_size, void *data)
1574 {
1575   int ret = transmit_notify (cls, data_size, data);
1576
1577   if (GNUNET_YES == ret)
1578   {
1579     struct Master *mst = cls;
1580     mst->tmit_handle = NULL;
1581   }
1582   return ret;
1583 }
1584
1585
1586 /**
1587  * Callback for the transmit functions of multicast.
1588  */
1589 static int
1590 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1591 {
1592   int ret = transmit_notify (cls, data_size, data);
1593
1594   if (GNUNET_YES == ret)
1595   {
1596     struct Slave *slv = cls;
1597     slv->tmit_handle = NULL;
1598   }
1599   return ret;
1600 }
1601
1602
1603 /**
1604  * Transmit a message from a channel master to the multicast group.
1605  */
1606 static void
1607 master_transmit_message (struct Master *mst)
1608 {
1609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1610   mst->ch.tmit_task = 0;
1611   if (NULL == mst->tmit_handle)
1612   {
1613     mst->tmit_handle
1614       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1615                                         mst->max_group_generation,
1616                                         master_transmit_notify, mst);
1617   }
1618   else
1619   {
1620     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1621   }
1622 }
1623
1624
1625 /**
1626  * Transmit a message from a channel slave to the multicast group.
1627  */
1628 static void
1629 slave_transmit_message (struct Slave *slv)
1630 {
1631   slv->ch.tmit_task = 0;
1632   if (NULL == slv->tmit_handle)
1633   {
1634     slv->tmit_handle
1635       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1636                                            slave_transmit_notify, slv);
1637   }
1638   else
1639   {
1640     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1641   }
1642 }
1643
1644
1645 static inline void
1646 transmit_message (struct Channel *ch)
1647 {
1648   ch->is_master
1649     ? master_transmit_message ((struct Master *) ch)
1650     : slave_transmit_message ((struct Slave *) ch);
1651 }
1652
1653
1654 /**
1655  * Queue a message from a channel master for sending to the multicast group.
1656  */
1657 static void
1658 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1659                      uint16_t first_ptype, uint16_t last_ptype)
1660 {
1661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1662
1663   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1664   {
1665     tmit_msg->id = ++mst->max_message_id;
1666     struct GNUNET_PSYC_MessageMethod *pmeth
1667       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1668
1669     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1670     {
1671       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1672     }
1673     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1674     {
1675       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1676                                           - mst->max_state_message_id);
1677     }
1678     else
1679     {
1680       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1681     }
1682   }
1683 }
1684
1685
1686 /**
1687  * Queue a message from a channel slave for sending to the multicast group.
1688  */
1689 static void
1690 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1691                      uint16_t first_ptype, uint16_t last_ptype)
1692 {
1693   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1694   {
1695     struct GNUNET_PSYC_MessageMethod *pmeth
1696       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1697     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1698     tmit_msg->id = ++slv->max_request_id;
1699   }
1700 }
1701
1702
1703 static void
1704 queue_message (struct Channel *ch,
1705                struct GNUNET_SERVER_Client *client,
1706                const struct GNUNET_MessageHeader *msg,
1707                uint16_t first_ptype, uint16_t last_ptype)
1708 {
1709   uint16_t size = ntohs (msg->size) - sizeof (*msg);
1710   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1711   memcpy (&tmit_msg[1], &msg[1], size);
1712   tmit_msg->client = client;
1713   tmit_msg->size = size;
1714   tmit_msg->state = ch->tmit_state;
1715
1716   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1717
1718   ch->is_master
1719     ? master_queue_message ((struct Master *) ch, tmit_msg,
1720                             first_ptype, last_ptype)
1721     : slave_queue_message ((struct Slave *) ch, tmit_msg,
1722                            first_ptype, last_ptype);
1723 }
1724
1725
1726 static void
1727 transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1728 {
1729   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1730
1731   struct GNUNET_MessageHeader msg;
1732   msg.size = ntohs (sizeof (msg));
1733   msg.type = ntohs (type);
1734
1735   queue_message (ch, client, &msg, type, type);
1736   transmit_message (ch);
1737
1738   /* FIXME: cleanup */
1739 }
1740
1741
1742 /**
1743  * Incoming message from a client.
1744  */
1745 static void
1746 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1747                      const struct GNUNET_MessageHeader *msg)
1748 {
1749   struct Channel *
1750     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1751   GNUNET_assert (NULL != ch);
1752
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "%p Received message from client.\n", ch);
1755   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1756
1757   if (GNUNET_YES != ch->ready)
1758   {
1759     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1760                 "%p Dropping message from client, channel is not ready yet.\n",
1761                 ch);
1762     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1763     return;
1764   }
1765
1766   uint16_t size = ntohs (msg->size);
1767   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1768   {
1769     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1770     GNUNET_break (0);
1771     transmit_error (ch, client);
1772     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1773     return;
1774   }
1775
1776   uint16_t first_ptype = 0, last_ptype = 0;
1777   if (GNUNET_SYSERR
1778       == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1779                                           (const char *) &msg[1],
1780                                           &first_ptype, &last_ptype))
1781   {
1782     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1783                 "%p Received invalid message part from client.\n", ch);
1784     GNUNET_break (0);
1785     transmit_error (ch, client);
1786     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1787     return;
1788   }
1789
1790   queue_message (ch, client, msg, first_ptype, last_ptype);
1791   transmit_message (ch);
1792
1793   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1794 };
1795
1796
1797 /**
1798  * Client requests to add a slave to the membership database.
1799  */
1800 static void
1801 handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1802                   const struct GNUNET_MessageHeader *msg)
1803 {
1804
1805 }
1806
1807
1808 /**
1809  * Client requests to remove a slave from the membership database.
1810  */
1811 static void
1812 handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1813                      const struct GNUNET_MessageHeader *msg)
1814 {
1815
1816 }
1817
1818
1819 /**
1820  * Client requests channel history from PSYCstore.
1821  */
1822 static void
1823 handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1824                       const struct GNUNET_MessageHeader *msg)
1825 {
1826
1827 }
1828
1829
1830 /**
1831  * Client requests best matching state variable from PSYCstore.
1832  */
1833 static void
1834 handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1835                   const struct GNUNET_MessageHeader *msg)
1836 {
1837
1838 }
1839
1840
1841 /**
1842  * Client requests state variables with a given prefix from PSYCstore.
1843  */
1844 static void
1845 handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1846                          const struct GNUNET_MessageHeader *msg)
1847 {
1848
1849 }
1850
1851
1852 /**
1853  * Initialize the PSYC service.
1854  *
1855  * @param cls Closure.
1856  * @param server The initialized server.
1857  * @param c Configuration to use.
1858  */
1859 static void
1860 run (void *cls, struct GNUNET_SERVER_Handle *server,
1861      const struct GNUNET_CONFIGURATION_Handle *c)
1862 {
1863   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1864     { &handle_master_start, NULL,
1865       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1866
1867     { &handle_slave_join, NULL,
1868       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1869
1870     { &handle_join_decision, NULL,
1871       GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1872
1873     { &handle_psyc_message, NULL,
1874       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1875
1876     { &handle_slave_add, NULL,
1877       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1878
1879     { &handle_slave_remove, NULL,
1880       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1881
1882     { &handle_story_request, NULL,
1883       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1884
1885     { &handle_state_get, NULL,
1886       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1887
1888     { &handle_state_get_prefix, NULL,
1889       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1890   };
1891
1892   cfg = c;
1893   store = GNUNET_PSYCSTORE_connect (cfg);
1894   stats = GNUNET_STATISTICS_create ("psyc", cfg);
1895   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1896   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1897   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1898   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1899   nc = GNUNET_SERVER_notification_context_create (server, 1);
1900   GNUNET_SERVER_add_handlers (server, handlers);
1901   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1902   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1903                                 &shutdown_task, NULL);
1904 }
1905
1906
1907 /**
1908  * The main function for the service.
1909  *
1910  * @param argc number of arguments from the command line
1911  * @param argv command line arguments
1912  * @return 0 ok, 1 on error
1913  */
1914 int
1915 main (int argc, char *const *argv)
1916 {
1917   return (GNUNET_OK ==
1918           GNUNET_SERVICE_run (argc, argv, "psyc",
1919                               GNUNET_SERVICE_OPTION_NONE,
1920                               &run, NULL)) ? 0 : 1;
1921 }
1922
1923 /* end of gnunet-service-psycstore.c */