13f908b6cd341e3190863e950987becca9bf034f
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "psyc.h"
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * Handle to the PSYCstore.
57  */
58 static struct GNUNET_PSYCSTORE_Handle *store;
59
60 /**
61  * All connected masters.
62  * Channel's pub_key_hash -> struct Master
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *masters;
65
66 /**
67  * All connected slaves.
68  * Channel's pub_key_hash -> struct Slave
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
71
72 /**
73  * Connected slaves per channel.
74  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
75  */
76 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
77
78
79 /**
80  * Message in the transmission queue.
81  */
82 struct TransmitMessage
83 {
84   struct TransmitMessage *prev;
85   struct TransmitMessage *next;
86
87   struct GNUNET_SERVER_Client *client;
88
89   /**
90    * ID assigned to the message.
91    */
92   uint64_t id;
93
94   /**
95    * Size of @a buf
96    */
97   uint16_t size;
98
99   /**
100    * @see enum MessageState
101    */
102   uint8_t state;
103
104   /* Followed by message */
105 };
106
107
108 /**
109  * Cache for received message fragments.
110  * Message fragments are only sent to clients after all modifiers arrived.
111  *
112  * chan_key -> MultiHashMap chan_msgs
113  */
114 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
115
116
117 /**
118  * Entry in the chan_msgs hashmap of @a recv_cache:
119  * fragment_id -> RecvCacheEntry
120  */
121 struct RecvCacheEntry
122 {
123   struct GNUNET_MULTICAST_MessageHeader *mmsg;
124   uint16_t ref_count;
125 };
126
127
128 /**
129  * Entry in the @a recv_frags hash map of a @a Channel.
130  * message_id -> FragmentQueue
131  */
132 struct FragmentQueue
133 {
134   /**
135    * Fragment IDs stored in @a recv_cache.
136    */
137   struct GNUNET_CONTAINER_Heap *fragments;
138
139   /**
140    * Total size of received fragments.
141    */
142   uint64_t size;
143
144   /**
145    * Total size of received header fragments (METHOD & MODIFIERs)
146    */
147   uint64_t header_size;
148
149   /**
150    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
151    */
152   uint64_t state_delta;
153
154   /**
155    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
156    */
157   uint32_t flags;
158
159   /**
160    * Receive state of message.
161    *
162    * @see MessageFragmentState
163    */
164   uint8_t state;
165
166   /**
167    * Is the message queued for delivery to the client?
168    * i.e. added to the recv_msgs queue
169    */
170   uint8_t queued;
171 };
172
173
174 /**
175  * List of connected clients.
176  */
177 struct ClientList
178 {
179   struct ClientList *prev;
180   struct ClientList *next;
181   struct GNUNET_SERVER_Client *client;
182 };
183
184
185 /**
186  * Common part of the client context for both a channel master and slave.
187  */
188 struct Channel
189 {
190   struct ClientList *clients_head;
191   struct ClientList *clients_tail;
192
193   struct TransmitMessage *tmit_head;
194   struct TransmitMessage *tmit_tail;
195
196   /**
197    * Current PSYCstore operation.
198    */
199   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
200
201   /**
202    * Received fragments not yet sent to the client.
203    * message_id -> FragmentQueue
204    */
205   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
206
207   /**
208    * Received message IDs not yet sent to the client.
209    */
210   struct GNUNET_CONTAINER_Heap *recv_msgs;
211
212   /**
213    * FIXME: needed?
214    */
215   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
216
217   /**
218    * Public key of the channel.
219    */
220   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
221
222   /**
223    * Hash of @a pub_key.
224    */
225   struct GNUNET_HashCode pub_key_hash;
226
227   /**
228    * Last message ID sent to the client.
229    * 0 if there is no such message.
230    */
231   uint64_t max_message_id;
232
233   /**
234    * ID of the last stateful message, where the state operations has been
235    * processed and saved to PSYCstore and which has been sent to the client.
236    * 0 if there is no such message.
237    */
238   uint64_t max_state_message_id;
239
240   /**
241    * Expected value size for the modifier being received from the PSYC service.
242    */
243   uint32_t tmit_mod_value_size_expected;
244
245   /**
246    * Actual value size for the modifier being received from the PSYC service.
247    */
248   uint32_t tmit_mod_value_size;
249
250   /**
251    * @see enum MessageState
252    */
253   uint8_t tmit_state;
254
255   /**
256    * FIXME: needed?
257    */
258   uint8_t in_transmit;
259
260   /**
261    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
262    */
263   uint8_t is_master;
264
265   /**
266    * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
267    */
268   uint8_t ready;
269
270   /**
271    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
272    */
273   uint8_t disconnected;
274 };
275
276
277 /**
278  * Client context for a channel master.
279  */
280 struct Master
281 {
282   /**
283    * Channel struct common for Master and Slave
284    */
285   struct Channel ch;
286
287   /**
288    * Private key of the channel.
289    */
290   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
291
292   /**
293    * Handle for the multicast origin.
294    */
295   struct GNUNET_MULTICAST_Origin *origin;
296
297   /**
298    * Transmit handle for multicast.
299    */
300   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
301
302   /**
303    * Incoming join requests from multicast.
304    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
305    */
306   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
307
308   /**
309    * Last message ID transmitted to this channel.
310    *
311    * Incremented before sending a message, thus the message_id in messages sent
312    * starts from 1.
313    */
314   uint64_t max_message_id;
315
316   /**
317    * ID of the last message with state operations transmitted to the channel.
318    * 0 if there is no such message.
319    */
320   uint64_t max_state_message_id;
321
322   /**
323    * Maximum group generation transmitted to the channel.
324    */
325   uint64_t max_group_generation;
326
327   /**
328    * @see enum GNUNET_PSYC_Policy
329    */
330   enum GNUNET_PSYC_Policy policy;
331 };
332
333
334 /**
335  * Client context for a channel slave.
336  */
337 struct Slave
338 {
339   /**
340    * Channel struct common for Master and Slave
341    */
342   struct Channel ch;
343
344   /**
345    * Private key of the slave.
346    */
347   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
348
349   /**
350    * Public key of the slave.
351    */
352   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
353
354   /**
355    * Hash of @a pub_key.
356    */
357   struct GNUNET_HashCode pub_key_hash;
358
359   /**
360    * Handle for the multicast member.
361    */
362   struct GNUNET_MULTICAST_Member *member;
363
364   /**
365    * Transmit handle for multicast.
366    */
367   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
368
369   /**
370    * Peer identity of the origin.
371    */
372   struct GNUNET_PeerIdentity origin;
373
374   /**
375    * Number of items in @a relays.
376    */
377   uint32_t relay_count;
378
379   /**
380    * Relays that multicast can use to connect.
381    */
382   struct GNUNET_PeerIdentity *relays;
383
384   /**
385    * Join request to be transmitted to the master on join.
386    */
387   struct GNUNET_MessageHeader *join_req;
388
389   /**
390    * Join decision received from multicast.
391    */
392   struct SlaveJoinDecision *join_dcsn;
393
394   /**
395    * Maximum request ID for this channel.
396    */
397   uint64_t max_request_id;
398 };
399
400
401 static inline void
402 transmit_message (struct Channel *ch);
403
404
405 static uint64_t
406 message_queue_drop (struct Channel *ch);
407
408
409 /**
410  * Task run during shutdown.
411  *
412  * @param cls unused
413  * @param tc unused
414  */
415 static void
416 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   if (NULL != nc)
419   {
420     GNUNET_SERVER_notification_context_destroy (nc);
421     nc = NULL;
422   }
423   if (NULL != stats)
424   {
425     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
426     stats = NULL;
427   }
428 }
429
430
431 /**
432  * Clean up master data structures after a client disconnected.
433  */
434 static void
435 cleanup_master (struct Master *mst)
436 {
437   struct Channel *ch = &mst->ch;
438
439   if (NULL != mst->origin)
440     GNUNET_MULTICAST_origin_stop (mst->origin);
441   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
442   GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
443 }
444
445
446 /**
447  * Clean up slave data structures after a client disconnected.
448  */
449 static void
450 cleanup_slave (struct Slave *slv)
451 {
452   struct Channel *ch = &slv->ch;
453   struct GNUNET_CONTAINER_MultiHashMap *
454     ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
455                                                 &ch->pub_key_hash);
456   GNUNET_assert (NULL != ch_slv);
457   GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
458
459   if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
460   {
461     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
462                                           ch_slv);
463     GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
464   }
465   GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
466
467   if (NULL != slv->join_req)
468     GNUNET_free (slv->join_req);
469   if (NULL != slv->relays)
470     GNUNET_free (slv->relays);
471   if (NULL != slv->member)
472     GNUNET_MULTICAST_member_part (slv->member);
473   GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
474 }
475
476
477 /**
478  * Clean up channel data structures after a client disconnected.
479  */
480 static void
481 cleanup_channel (struct Channel *ch)
482 {
483   message_queue_drop (ch);
484   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
485
486   if (NULL != ch->store_op)
487     GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
488
489   (GNUNET_YES == ch->is_master)
490     ? cleanup_master ((struct Master *) ch)
491     : cleanup_slave ((struct Slave *) ch);
492   GNUNET_free (ch);
493 }
494
495
496 /**
497  * Called whenever a client is disconnected.
498  * Frees our resources associated with that client.
499  *
500  * @param cls Closure.
501  * @param client Identification of the client.
502  */
503 static void
504 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
505 {
506   if (NULL == client)
507     return;
508
509   struct Channel *
510     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512               "%p Client (%s) disconnected from channel %s\n",
513               ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
514               GNUNET_h2s (&ch->pub_key_hash));
515
516   if (NULL == ch)
517   {
518     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
519                 "%p User context is NULL in client_disconnect()\n", ch);
520     GNUNET_break (0);
521     return;
522   }
523
524   struct ClientList *cl = ch->clients_head;
525   while (NULL != cl)
526   {
527     if (cl->client == client)
528     {
529       GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
530       GNUNET_free (cl);
531       break;
532     }
533     cl = cl->next;
534   }
535
536   if (NULL == ch->clients_head)
537   { /* Last client disconnected. */
538     if (NULL != ch->tmit_head)
539     { /* Send pending messages to multicast before cleanup. */
540       transmit_message (ch);
541     }
542     else
543     {
544       cleanup_channel (ch);
545     }
546   }
547 }
548
549
550 /**
551  * Send message to all clients connected to the channel.
552  */
553 static void
554 msg_to_clients (const struct Channel *ch,
555                 const struct GNUNET_MessageHeader *msg)
556 {
557   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
558               "%p Sending message to clients.\n", ch);
559
560   struct ClientList *cl = ch->clients_head;
561   while (NULL != cl)
562   {
563     GNUNET_SERVER_notification_context_add (nc, cl->client);
564     GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
565     cl = cl->next;
566   }
567 }
568
569
570 /**
571  * Closure for join_mem_test_cb()
572  */
573 struct JoinMemTestClosure
574 {
575   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
576   struct Channel *ch;
577   struct GNUNET_MULTICAST_JoinHandle *jh;
578   struct MasterJoinRequest *master_join_req;
579 };
580
581
582 /**
583  * Membership test result callback used for join requests.
584  */
585 static void
586 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
587 {
588   struct JoinMemTestClosure *jcls = cls;
589
590   if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
591   { /* Pass on join request to client if this is a master channel */
592     struct Master *mst = (struct Master *) jcls->ch;
593     struct GNUNET_HashCode slave_key_hash;
594     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
595                         &slave_key_hash);
596     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
597                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
598     msg_to_clients (jcls->ch, &jcls->master_join_req->header);
599   }
600   else
601   {
602     // FIXME: add relays
603     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
604   }
605   GNUNET_free (jcls->master_join_req);
606   GNUNET_free (jcls);
607 }
608
609
610 /**
611  * Incoming join request from multicast.
612  */
613 static void
614 mcast_join_request_cb (void *cls,
615                        const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
616                        const struct GNUNET_MessageHeader *join_msg,
617                        struct GNUNET_MULTICAST_JoinHandle *jh)
618 {
619   struct Channel *ch = cls;
620   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
621
622   uint16_t join_msg_size = 0;
623   if (NULL != join_msg)
624   {
625     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
626     {
627       join_msg_size = ntohs (join_msg->size);
628     }
629     else
630     {
631       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
632                   "%p Got join message with invalid type %u.\n",
633                   ch, ntohs (join_msg->type));
634     }
635   }
636
637   struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
638   req->header.size = htons (sizeof (*req) + join_msg_size);
639   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
640   req->slave_key = *slave_key;
641   if (0 < join_msg_size)
642     memcpy (&req[1], join_msg, join_msg_size);
643
644   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
645   jcls->slave_key = *slave_key;
646   jcls->ch = ch;
647   jcls->jh = jh;
648   jcls->master_join_req = req;
649
650   GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
651                                     ch->max_message_id, 0,
652                                     &join_mem_test_cb, jcls);
653 }
654
655
656 /**
657  * Join decision received from multicast.
658  */
659 static void
660 mcast_join_decision_cb (void *cls, int is_admitted,
661                         const struct GNUNET_PeerIdentity *peer,
662                         uint16_t relay_count,
663                         const struct GNUNET_PeerIdentity *relays,
664                         const struct GNUNET_MessageHeader *join_resp)
665 {
666   struct Slave *slv = cls;
667   struct Channel *ch = &slv->ch;
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669               "%p Got join decision: %d\n", slv, is_admitted);
670
671   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
672   struct SlaveJoinDecision *
673     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
674   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
675   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
676   dcsn->is_admitted = htonl (is_admitted);
677   if (0 < join_resp_size)
678     memcpy (&dcsn[1], join_resp, join_resp_size);
679
680   msg_to_clients (ch, &dcsn->header);
681
682   if (GNUNET_YES == is_admitted)
683   {
684     ch->ready = GNUNET_YES;
685   }
686   else
687   {
688     slv->member = NULL;
689   }
690 }
691
692
693 static void
694 mcast_membership_test_cb (void *cls,
695                           const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
696                           uint64_t message_id, uint64_t group_generation,
697                           struct GNUNET_MULTICAST_MembershipTestHandle *mth)
698 {
699
700 }
701
702
703 static void
704 mcast_replay_fragment_cb (void *cls,
705                           const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
706                           uint64_t fragment_id, uint64_t flags,
707                           struct GNUNET_MULTICAST_ReplayHandle *rh)
708
709 {
710
711 }
712
713
714 static void
715 mcast_replay_message_cb (void *cls,
716                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
717                          uint64_t message_id,
718                          uint64_t fragment_offset,
719                          uint64_t flags,
720                          struct GNUNET_MULTICAST_ReplayHandle *rh)
721 {
722
723 }
724
725
726 static void
727 fragment_store_result (void *cls, int64_t result, const char *err_msg)
728 {
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "fragment_store() returned %l (%s)\n", result, err_msg);
731 }
732
733
734 /**
735  * Convert an uint64_t in network byte order to a HashCode
736  * that can be used as key in a MultiHashMap
737  */
738 static inline void
739 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
740 {
741   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
742   /* TODO: use built-in byte swap functions if available */
743
744   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
745   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
746
747   *key = (struct GNUNET_HashCode) {{ 0 }};
748   *((uint64_t *) key)
749     = (n << 32) | (n >> 32);
750 }
751
752
753 /**
754  * Convert an uint64_t in host byte order to a HashCode
755  * that can be used as key in a MultiHashMap
756  */
757 static inline void
758 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
759 {
760 #if __BYTE_ORDER == __BIG_ENDIAN
761   hash_key_from_nll (key, n);
762 #elif __BYTE_ORDER == __LITTLE_ENDIAN
763   *key = (struct GNUNET_HashCode) {{ 0 }};
764   *((uint64_t *) key) = n;
765 #else
766   #error byteorder undefined
767 #endif
768 }
769
770
771 /**
772  * Send multicast message to all clients connected to the channel.
773  */
774 static void
775 mmsg_to_clients (struct Channel *ch,
776                  const struct GNUNET_MULTICAST_MessageHeader *mmsg)
777 {
778   uint16_t size = ntohs (mmsg->header.size);
779   struct GNUNET_PSYC_MessageHeader *pmsg;
780   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
781
782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783               "%p Sending message to client. "
784               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
785               ch, GNUNET_ntohll (mmsg->fragment_id),
786               GNUNET_ntohll (mmsg->message_id));
787
788   pmsg = GNUNET_malloc (psize);
789   pmsg->header.size = htons (psize);
790   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
791   pmsg->message_id = mmsg->message_id;
792
793   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
794   msg_to_clients (ch, &pmsg->header);
795   GNUNET_free (pmsg);
796 }
797
798
799 /**
800  * Insert a multicast message fragment into the queue belonging to the message.
801  *
802  * @param ch           Channel.
803  * @param mmsg         Multicast message fragment.
804  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
805  * @param first_ptype  First PSYC message part type in @a mmsg.
806  * @param last_ptype   Last PSYC message part type in @a mmsg.
807  */
808 static void
809 fragment_queue_insert (struct Channel *ch,
810                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
811                        uint16_t first_ptype, uint16_t last_ptype)
812 {
813   const uint16_t size = ntohs (mmsg->header.size);
814   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
815   struct GNUNET_CONTAINER_MultiHashMap
816     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
817                                                     &ch->pub_key_hash);
818
819   struct GNUNET_HashCode msg_id_hash;
820   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
821
822   struct FragmentQueue
823     *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
824
825   if (NULL == fragq)
826   {
827     fragq = GNUNET_new (struct FragmentQueue);
828     fragq->state = MSG_FRAG_STATE_HEADER;
829     fragq->fragments
830       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
831
832     GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
833                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
834
835     if (NULL == chan_msgs)
836     {
837       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
838       GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
839                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
840     }
841   }
842
843   struct GNUNET_HashCode frag_id_hash;
844   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
845   struct RecvCacheEntry
846     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
847   if (NULL == cache_entry)
848   {
849     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                 "%p Adding message fragment to cache. "
851                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
852                 "header_size: %" PRIu64 " + %u).\n",
853                 ch, GNUNET_ntohll (mmsg->message_id),
854                 GNUNET_ntohll (mmsg->fragment_id),
855                 fragq->header_size, size);
856     cache_entry = GNUNET_new (struct RecvCacheEntry);
857     cache_entry->ref_count = 1;
858     cache_entry->mmsg = GNUNET_malloc (size);
859     memcpy (cache_entry->mmsg, mmsg, size);
860     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
861                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
862   }
863   else
864   {
865     cache_entry->ref_count++;
866     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867                 "%p Message fragment is already in cache. "
868                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
869                 ", ref_count: %u\n",
870                 ch, GNUNET_ntohll (mmsg->message_id),
871                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
872   }
873
874   if (MSG_FRAG_STATE_HEADER == fragq->state)
875   {
876     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
877     {
878       struct GNUNET_PSYC_MessageMethod *
879         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
880       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
881       fragq->flags = ntohl (pmeth->flags);
882     }
883
884     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
885     {
886       fragq->header_size += size;
887     }
888     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
889              || frag_offset == fragq->header_size)
890     { /* header is now complete */
891       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
892                   "%p Header of message %" PRIu64 " is complete.\n",
893                   ch, GNUNET_ntohll (mmsg->message_id));
894
895       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
896                   "%p Adding message %" PRIu64 " to queue.\n",
897                   ch, GNUNET_ntohll (mmsg->message_id));
898       fragq->state = MSG_FRAG_STATE_DATA;
899     }
900     else
901     {
902       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
903                   "%p Header of message %" PRIu64 " is NOT complete yet: "
904                   "%" PRIu64 " != %" PRIu64 "\n",
905                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
906                   fragq->header_size);
907     }
908   }
909
910   switch (last_ptype)
911   {
912   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
913     if (frag_offset == fragq->size)
914       fragq->state = MSG_FRAG_STATE_END;
915     else
916       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
917                   "%p Message %" PRIu64 " is NOT complete yet: "
918                   "%" PRIu64 " != %" PRIu64 "\n",
919                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
920                   fragq->size);
921     break;
922
923   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
924     /* Drop message without delivering to client if it's a single fragment */
925     fragq->state =
926       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
927       ? MSG_FRAG_STATE_DROP
928       : MSG_FRAG_STATE_CANCEL;
929   }
930
931   switch (fragq->state)
932   {
933   case MSG_FRAG_STATE_DATA:
934   case MSG_FRAG_STATE_END:
935   case MSG_FRAG_STATE_CANCEL:
936     if (GNUNET_NO == fragq->queued)
937     {
938       GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
939                                     GNUNET_ntohll (mmsg->message_id));
940       fragq->queued = GNUNET_YES;
941     }
942   }
943
944   fragq->size += size;
945   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
946                                 GNUNET_ntohll (mmsg->fragment_id));
947 }
948
949
950 /**
951  * Run fragment queue of a message.
952  *
953  * Send fragments of a message in order to client, after all modifiers arrived
954  * from multicast.
955  *
956  * @param ch      Channel.
957  * @param msg_id  ID of the message @a fragq belongs to.
958  * @param fragq   Fragment queue of the message.
959  * @param drop    Drop message without delivering to client?
960  *                #GNUNET_YES or #GNUNET_NO.
961  */
962 static void
963 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
964                     struct FragmentQueue *fragq, uint8_t drop)
965 {
966   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
967               "%p Running message fragment queue for message %" PRIu64
968               " (state: %u).\n",
969               ch, msg_id, fragq->state);
970
971   struct GNUNET_CONTAINER_MultiHashMap
972     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
973                                                     &ch->pub_key_hash);
974   GNUNET_assert (NULL != chan_msgs);
975   uint64_t frag_id;
976
977   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
978                                                     &frag_id))
979   {
980     struct GNUNET_HashCode frag_id_hash;
981     hash_key_from_hll (&frag_id_hash, frag_id);
982     struct RecvCacheEntry *cache_entry
983       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
984     if (cache_entry != NULL)
985     {
986       if (GNUNET_NO == drop)
987       {
988         mmsg_to_clients (ch, cache_entry->mmsg);
989       }
990       if (cache_entry->ref_count <= 1)
991       {
992         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
993                                               cache_entry);
994         GNUNET_free (cache_entry->mmsg);
995         GNUNET_free (cache_entry);
996       }
997       else
998       {
999         cache_entry->ref_count--;
1000       }
1001     }
1002 #if CACHE_AGING_IMPLEMENTED
1003     else if (GNUNET_NO == drop)
1004     {
1005       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1006     }
1007 #endif
1008
1009     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1010   }
1011
1012   if (MSG_FRAG_STATE_END <= fragq->state)
1013   {
1014     struct GNUNET_HashCode msg_id_hash;
1015     hash_key_from_nll (&msg_id_hash, msg_id);
1016
1017     GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
1018     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1019     GNUNET_free (fragq);
1020   }
1021   else
1022   {
1023     fragq->queued = GNUNET_NO;
1024   }
1025 }
1026
1027
1028 /**
1029  * Run message queue.
1030  *
1031  * Send messages in queue to client in order after a message has arrived from
1032  * multicast, according to the following:
1033  * - A message is only sent if all of its modifiers arrived.
1034  * - A stateful message is only sent if the previous stateful message
1035  *   has already been delivered to the client.
1036  *
1037  * @param ch  Channel.
1038  *
1039  * @return Number of messages removed from queue and sent to client.
1040  */
1041 static uint64_t
1042 message_queue_run (struct Channel *ch)
1043 {
1044   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1045               "%p Running message queue.\n", ch);
1046   uint64_t n = 0;
1047   uint64_t msg_id;
1048   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1049                                                     &msg_id))
1050   {
1051     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1052                 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
1053     struct GNUNET_HashCode msg_id_hash;
1054     hash_key_from_hll (&msg_id_hash, msg_id);
1055
1056     struct FragmentQueue *
1057       fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1058
1059     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1060     {
1061       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1062                   "%p No fragq (%p) or header not complete.\n",
1063                   ch, fragq);
1064       break;
1065     }
1066
1067     if (MSG_FRAG_STATE_HEADER == fragq->state)
1068     {
1069       /* Check if there's a missing message before the current one */
1070       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1071       {
1072         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1073             && msg_id - 1 != ch->max_message_id)
1074         {
1075           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1076                       "%p Out of order message. "
1077                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1078                       ch, msg_id, ch->max_message_id);
1079           break;
1080         }
1081       }
1082       else
1083       {
1084         if (msg_id - fragq->state_delta != ch->max_state_message_id)
1085         {
1086           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1087                       "%p Out of order stateful message. "
1088                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1089                       ch, msg_id, fragq->state_delta, ch->max_state_message_id);
1090           break;
1091         }
1092 #if TODO
1093         /* FIXME: apply modifiers to state in PSYCstore */
1094         GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
1095                                        state_modify_result_cb, cls);
1096 #endif
1097         ch->max_state_message_id = msg_id;
1098       }
1099       ch->max_message_id = msg_id;
1100     }
1101     fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1102     GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1103     n++;
1104   }
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106               "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1107   return n;
1108 }
1109
1110
1111 /**
1112  * Drop message queue of a channel.
1113  *
1114  * Remove all messages in queue without sending it to clients.
1115  *
1116  * @param ch  Channel.
1117  *
1118  * @return Number of messages removed from queue.
1119  */
1120 static uint64_t
1121 message_queue_drop (struct Channel *ch)
1122 {
1123   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1124               "%p Dropping message queue.\n", ch);
1125   uint64_t n = 0;
1126   uint64_t msg_id;
1127   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
1128                                                     &msg_id))
1129   {
1130     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1131                 "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
1132     struct GNUNET_HashCode msg_id_hash;
1133     hash_key_from_hll (&msg_id_hash, msg_id);
1134
1135     struct FragmentQueue *
1136       fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
1137
1138     fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
1139     GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
1140     n++;
1141   }
1142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143               "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
1144   return n;
1145 }
1146
1147
1148 /**
1149  * Handle incoming message from multicast.
1150  *
1151  * @param ch   Channel.
1152  * @param mmsg Multicast message.
1153  *
1154  * @return #GNUNET_OK or #GNUNET_SYSERR
1155  */
1156 static int
1157 client_multicast_message (struct Channel *ch,
1158                           const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1159 {
1160   GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
1161
1162   uint16_t size = ntohs (mmsg->header.size);
1163   uint16_t first_ptype = 0, last_ptype = 0;
1164
1165   if (GNUNET_SYSERR
1166       == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
1167                                           (const char *) &mmsg[1],
1168                                           &first_ptype, &last_ptype))
1169   {
1170     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171                 "%p Received message with invalid parts from multicast. "
1172                 "Dropping message.\n", ch);
1173     GNUNET_break_op (0);
1174     return GNUNET_SYSERR;
1175   }
1176
1177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178               "Message parts: first: type %u, last: type %u\n",
1179               first_ptype, last_ptype);
1180
1181   fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
1182   message_queue_run (ch);
1183
1184   return GNUNET_OK;
1185 }
1186
1187
1188 /**
1189  * Incoming message fragment from multicast.
1190  *
1191  * Store it using PSYCstore and send it to the client of the channel.
1192  */
1193 static void
1194 mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
1195 {
1196   struct Channel *ch = cls;
1197   uint16_t type = ntohs (msg->type);
1198   uint16_t size = ntohs (msg->size);
1199
1200   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201               "%p Received message of type %u and size %u from multicast.\n",
1202               ch, type, size);
1203
1204   switch (type)
1205   {
1206   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
1207   {
1208     client_multicast_message (ch, (const struct
1209                                    GNUNET_MULTICAST_MessageHeader *) msg);
1210     break;
1211   }
1212   default:
1213     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1214                 "%p Dropping unknown message of type %u and size %u.\n",
1215                 ch, type, size);
1216   }
1217 }
1218
1219
1220 /**
1221  * Incoming request fragment from multicast for a master.
1222  *
1223  * @param cls           Master.
1224  * @param slave_key     Sending slave's public key.
1225  * @param msg           The message.
1226  * @param flags         Request flags.
1227  */
1228 static void
1229 mcast_request_cb (void *cls,
1230                   const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1231                   const struct GNUNET_MessageHeader *msg,
1232                   enum GNUNET_MULTICAST_MessageFlags flags)
1233 {
1234   struct Master *mst = cls;
1235   struct Channel *ch = &mst->ch;
1236
1237   uint16_t type = ntohs (msg->type);
1238   uint16_t size = ntohs (msg->size);
1239
1240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241               "%p Received request of type %u and size %u from multicast.\n",
1242               ch, type, size);
1243
1244   switch (type)
1245   {
1246   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
1247   {
1248     const struct GNUNET_MULTICAST_RequestHeader *req
1249       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
1250
1251     /* FIXME: see message_cb() */
1252     if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
1253                                                           (const char *) &req[1],
1254                                                           NULL, NULL))
1255     {
1256       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1257                   "%p Dropping request with invalid parts "
1258                   "received from multicast.\n", ch);
1259       GNUNET_break_op (0);
1260       break;
1261     }
1262
1263     struct GNUNET_PSYC_MessageHeader *pmsg;
1264     uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1265     pmsg = GNUNET_malloc (psize);
1266     pmsg->header.size = htons (psize);
1267     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1268     pmsg->message_id = req->request_id;
1269     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1270
1271     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1272     msg_to_clients (ch, &pmsg->header);
1273     GNUNET_free (pmsg);
1274     break;
1275   }
1276   default:
1277     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278                 "%p Dropping unknown request of type %u and size %u.\n",
1279                 ch, type, size);
1280     GNUNET_break_op (0);
1281   }
1282 }
1283
1284
1285 /**
1286  * Response from PSYCstore with the current counter values for a channel master.
1287  */
1288 static void
1289 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1290                     uint64_t max_message_id, uint64_t max_group_generation,
1291                     uint64_t max_state_message_id)
1292 {
1293   struct Master *mst = cls;
1294   struct Channel *ch = &mst->ch;
1295   ch->store_op = NULL;
1296
1297   struct CountersResult res;
1298   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1299   res.header.size = htons (sizeof (res));
1300   res.result_code = htonl (result);
1301   res.max_message_id = GNUNET_htonll (max_message_id);
1302
1303   if (GNUNET_OK == result || GNUNET_NO == result)
1304   {
1305     mst->max_message_id = max_message_id;
1306     ch->max_message_id = max_message_id;
1307     ch->max_state_message_id = max_state_message_id;
1308     mst->max_group_generation = max_group_generation;
1309     mst->origin
1310       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1311                                        &mcast_join_request_cb,
1312                                        &mcast_membership_test_cb,
1313                                        &mcast_replay_fragment_cb,
1314                                        &mcast_replay_message_cb,
1315                                        &mcast_request_cb,
1316                                        &mcast_message_cb, ch);
1317     ch->ready = GNUNET_YES;
1318   }
1319   else
1320   {
1321     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1322                 "%p GNUNET_PSYCSTORE_counters_get() "
1323                 "returned %d for channel %s.\n",
1324                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1325   }
1326
1327   msg_to_clients (ch, &res.header);
1328 }
1329
1330
1331 /**
1332  * Response from PSYCstore with the current counter values for a channel slave.
1333  */
1334 void
1335 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1336                    uint64_t max_message_id, uint64_t max_group_generation,
1337                    uint64_t max_state_message_id)
1338 {
1339   struct Slave *slv = cls;
1340   struct Channel *ch = &slv->ch;
1341   ch->store_op = NULL;
1342
1343   struct CountersResult res;
1344   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1345   res.header.size = htons (sizeof (res));
1346   res.result_code = htonl (result);
1347   res.max_message_id = GNUNET_htonll (max_message_id);
1348
1349   if (GNUNET_OK == result || GNUNET_NO == result)
1350   {
1351     ch->max_message_id = max_message_id;
1352     ch->max_state_message_id = max_state_message_id;
1353     slv->member
1354       = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1355                                       &slv->origin,
1356                                       slv->relay_count, slv->relays,
1357                                       slv->join_req,
1358                                       &mcast_join_request_cb,
1359                                       &mcast_join_decision_cb,
1360                                       &mcast_membership_test_cb,
1361                                       &mcast_replay_fragment_cb,
1362                                       &mcast_replay_message_cb,
1363                                       &mcast_message_cb, ch);
1364   }
1365   else
1366   {
1367     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1368                 "%p GNUNET_PSYCSTORE_counters_get() "
1369                 "returned %d for channel %s.\n",
1370                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1371   }
1372
1373   msg_to_clients (ch, &res.header);
1374 }
1375
1376
1377 static void
1378 channel_init (struct Channel *ch)
1379 {
1380   ch->recv_msgs
1381     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1382   ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1383 }
1384
1385
1386 /**
1387  * Handle a connecting client starting a channel master.
1388  */
1389 static void
1390 client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1391                      const struct GNUNET_MessageHeader *msg)
1392 {
1393   const struct MasterStartRequest *req
1394     = (const struct MasterStartRequest *) msg;
1395
1396   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1397   struct GNUNET_HashCode pub_key_hash;
1398
1399   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1400   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1401
1402   struct Master *
1403     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1404   struct Channel *ch;
1405
1406   if (NULL == mst)
1407   {
1408     mst = GNUNET_new (struct Master);
1409     mst->policy = ntohl (req->policy);
1410     mst->priv_key = req->channel_key;
1411     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1412
1413     ch = &mst->ch;
1414     ch->is_master = GNUNET_YES;
1415     ch->pub_key = pub_key;
1416     ch->pub_key_hash = pub_key_hash;
1417     channel_init (ch);
1418
1419     GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1420                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1421     ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1422                                                   master_counters_cb, mst);
1423   }
1424   else
1425   {
1426     ch = &mst->ch;
1427
1428     struct CountersResult res;
1429     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1430     res.header.size = htons (sizeof (res));
1431     res.result_code = htonl (GNUNET_OK);
1432     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1433
1434     GNUNET_SERVER_notification_context_add (nc, client);
1435     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1436                                                 GNUNET_NO);
1437   }
1438
1439   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440               "%p Client connected as master to channel %s.\n",
1441               mst, GNUNET_h2s (&ch->pub_key_hash));
1442
1443   struct ClientList *cl = GNUNET_new (struct ClientList);
1444   cl->client = client;
1445   GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1446
1447   GNUNET_SERVER_client_set_user_context (client, ch);
1448   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1449 }
1450
1451
1452 /**
1453  * Handle a connecting client joining as a channel slave.
1454  */
1455 static void
1456 client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1457                    const struct GNUNET_MessageHeader *msg)
1458 {
1459   const struct SlaveJoinRequest *req
1460     = (const struct SlaveJoinRequest *) msg;
1461
1462   struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1463   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1464
1465   GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1466   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1467   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1468
1469   struct GNUNET_CONTAINER_MultiHashMap *
1470     ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1471   struct Slave *slv = NULL;
1472   struct Channel *ch;
1473
1474   if (NULL != ch_slv)
1475   {
1476     slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
1477   }
1478   if (NULL == slv)
1479   {
1480     slv = GNUNET_new (struct Slave);
1481     slv->priv_key = req->slave_key;
1482     slv->pub_key = slv_pub_key;
1483     slv->pub_key_hash = slv_pub_key_hash;
1484     slv->origin = req->origin;
1485     slv->relay_count = ntohl (req->relay_count);
1486     if (0 < slv->relay_count)
1487     {
1488       const struct GNUNET_PeerIdentity *relays
1489         = (const struct GNUNET_PeerIdentity *) &req[1];
1490       slv->relays
1491         = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1492       uint32_t i;
1493       for (i = 0; i < slv->relay_count; i++)
1494         memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1495     }
1496
1497     ch = &slv->ch;
1498     ch->is_master = GNUNET_NO;
1499     ch->pub_key = req->channel_key;
1500     ch->pub_key_hash = pub_key_hash;
1501     channel_init (ch);
1502
1503     if (NULL == ch_slv)
1504     {
1505       ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1506       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv,
1507                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1508     }
1509     GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
1510                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1511     GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1512                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1513     ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
1514                                                   slave_counters_cb, slv);
1515   }
1516   else
1517   {
1518     ch = &slv->ch;
1519
1520     struct CountersResult res;
1521     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1522     res.header.size = htons (sizeof (res));
1523     res.result_code = htonl (GNUNET_OK);
1524     res.max_message_id = GNUNET_htonll (ch->max_message_id);
1525
1526     GNUNET_SERVER_notification_context_add (nc, client);
1527     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1528                                                 GNUNET_NO);
1529
1530     if (NULL == slv->member)
1531     {
1532       slv->member
1533         = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1534                                         &slv->origin,
1535                                         slv->relay_count, slv->relays,
1536                                         slv->join_req,
1537                                         &mcast_join_request_cb,
1538                                         &mcast_join_decision_cb,
1539                                         &mcast_membership_test_cb,
1540                                         &mcast_replay_fragment_cb,
1541                                         &mcast_replay_message_cb,
1542                                         &mcast_message_cb, ch);
1543
1544     }
1545     else if (NULL != slv->join_dcsn)
1546     {
1547       GNUNET_SERVER_notification_context_add (nc, client);
1548       GNUNET_SERVER_notification_context_unicast (nc, client,
1549                                                   &slv->join_dcsn->header,
1550                                                   GNUNET_NO);
1551     }
1552   }
1553
1554   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1555               "%p Client connected as slave to channel %s.\n",
1556               slv, GNUNET_h2s (&ch->pub_key_hash));
1557
1558   struct ClientList *cl = GNUNET_new (struct ClientList);
1559   cl->client = client;
1560   GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1561
1562   GNUNET_SERVER_client_set_user_context (client, &slv->ch);
1563   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1564 }
1565
1566
1567 struct JoinDecisionClosure
1568 {
1569   int32_t is_admitted;
1570   struct GNUNET_MessageHeader *msg;
1571 };
1572
1573
1574 /**
1575  * Iterator callback for responding to join requests of a slave.
1576  */
1577 static int
1578 send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1579                        void *jh)
1580 {
1581   struct JoinDecisionClosure *jcls = cls;
1582   // FIXME: add relays
1583   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1584   return GNUNET_YES;
1585 }
1586
1587
1588 /**
1589  * Join decision from client.
1590  */
1591 static void
1592 client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1593                       const struct GNUNET_MessageHeader *msg)
1594 {
1595   struct Channel *
1596     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1597   GNUNET_assert (GNUNET_YES == ch->is_master);
1598   struct Master *mst = (struct Master *) ch;
1599
1600   struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1601   struct JoinDecisionClosure jcls;
1602   jcls.is_admitted = ntohl (dcsn->is_admitted);
1603   jcls.msg
1604     = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1605        <= ntohs (msg->size))
1606     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1607     : NULL;
1608
1609   struct GNUNET_HashCode slave_key_hash;
1610   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1611                       &slave_key_hash);
1612
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614               "%p Got join decision (%d) from client for channel %s..\n",
1615               mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
1616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617               "%p ..and slave %s.\n",
1618               mst, GNUNET_h2s (&slave_key_hash));
1619
1620   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1621                                               &send_join_decision_cb, &jcls);
1622   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1623   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1624 }
1625
1626
1627 /**
1628  * Send acknowledgement to a client.
1629  *
1630  * Sent after a message fragment has been passed on to multicast.
1631  *
1632  * @param ch The channel struct for the client.
1633  */
1634 static void
1635 send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1636 {
1637   struct GNUNET_MessageHeader res;
1638   res.size = htons (sizeof (res));
1639   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1640
1641   /* FIXME */
1642   GNUNET_SERVER_notification_context_add (nc, client);
1643   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1644 }
1645
1646
1647 /**
1648  * Callback for the transmit functions of multicast.
1649  */
1650 static int
1651 transmit_notify (void *cls, size_t *data_size, void *data)
1652 {
1653   struct Channel *ch = cls;
1654   struct TransmitMessage *tmit_msg = ch->tmit_head;
1655
1656   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1657   {
1658     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                 "%p transmit_notify: nothing to send.\n", ch);
1660     *data_size = 0;
1661     return GNUNET_NO;
1662   }
1663
1664   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
1666
1667   *data_size = tmit_msg->size;
1668   memcpy (data, &tmit_msg[1], *data_size);
1669
1670   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1671   if (NULL != tmit_msg->client)
1672     send_message_ack (ch, tmit_msg->client);
1673
1674   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1675   GNUNET_free (tmit_msg);
1676
1677   if (0 == ch->tmit_task)
1678   {
1679     if (NULL != ch->tmit_head)
1680     {
1681       transmit_message (ch);
1682     }
1683     else if (ch->disconnected)
1684     {
1685       /* FIXME: handle partial message (when still in_transmit) */
1686       cleanup_channel (ch);
1687     }
1688   }
1689
1690   return ret;
1691 }
1692
1693
1694 /**
1695  * Callback for the transmit functions of multicast.
1696  */
1697 static int
1698 master_transmit_notify (void *cls, size_t *data_size, void *data)
1699 {
1700   int ret = transmit_notify (cls, data_size, data);
1701
1702   if (GNUNET_YES == ret)
1703   {
1704     struct Master *mst = cls;
1705     mst->tmit_handle = NULL;
1706   }
1707   return ret;
1708 }
1709
1710
1711 /**
1712  * Callback for the transmit functions of multicast.
1713  */
1714 static int
1715 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1716 {
1717   int ret = transmit_notify (cls, data_size, data);
1718
1719   if (GNUNET_YES == ret)
1720   {
1721     struct Slave *slv = cls;
1722     slv->tmit_handle = NULL;
1723   }
1724   return ret;
1725 }
1726
1727
1728 /**
1729  * Transmit a message from a channel master to the multicast group.
1730  */
1731 static void
1732 master_transmit_message (struct Master *mst)
1733 {
1734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1735   mst->ch.tmit_task = 0;
1736   if (NULL == mst->tmit_handle)
1737   {
1738     mst->tmit_handle
1739       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1740                                         mst->max_group_generation,
1741                                         master_transmit_notify, mst);
1742   }
1743   else
1744   {
1745     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1746   }
1747 }
1748
1749
1750 /**
1751  * Transmit a message from a channel slave to the multicast group.
1752  */
1753 static void
1754 slave_transmit_message (struct Slave *slv)
1755 {
1756   slv->ch.tmit_task = 0;
1757   if (NULL == slv->tmit_handle)
1758   {
1759     slv->tmit_handle
1760       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1761                                            slave_transmit_notify, slv);
1762   }
1763   else
1764   {
1765     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1766   }
1767 }
1768
1769
1770 static inline void
1771 transmit_message (struct Channel *ch)
1772 {
1773   ch->is_master
1774     ? master_transmit_message ((struct Master *) ch)
1775     : slave_transmit_message ((struct Slave *) ch);
1776 }
1777
1778
1779 /**
1780  * Queue a message from a channel master for sending to the multicast group.
1781  */
1782 static void
1783 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1784                      uint16_t first_ptype, uint16_t last_ptype)
1785 {
1786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1787
1788   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1789   {
1790     tmit_msg->id = ++mst->max_message_id;
1791     struct GNUNET_PSYC_MessageMethod *pmeth
1792       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1793
1794     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1795     {
1796       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1797     }
1798     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1799     {
1800       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1801                                           - mst->max_state_message_id);
1802     }
1803     else
1804     {
1805       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1806     }
1807   }
1808 }
1809
1810
1811 /**
1812  * Queue a message from a channel slave for sending to the multicast group.
1813  */
1814 static void
1815 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1816                      uint16_t first_ptype, uint16_t last_ptype)
1817 {
1818   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1819   {
1820     struct GNUNET_PSYC_MessageMethod *pmeth
1821       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1822     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1823     tmit_msg->id = ++slv->max_request_id;
1824   }
1825 }
1826
1827
1828 /**
1829  * Queue PSYC message parts for sending to multicast.
1830  *
1831  * @param ch           Channel to send to.
1832  * @param client       Client the message originates from.
1833  * @param data_size    Size of @a data.
1834  * @param data         Concatenated message parts.
1835  * @param first_ptype  First message part type in @a data.
1836  * @param last_ptype   Last message part type in @a data.
1837  */
1838 static void
1839 queue_message (struct Channel *ch,
1840                struct GNUNET_SERVER_Client *client,
1841                size_t data_size,
1842                const void *data,
1843                uint16_t first_ptype, uint16_t last_ptype)
1844 {
1845   struct TransmitMessage *
1846     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1847   memcpy (&tmit_msg[1], data, data_size);
1848   tmit_msg->client = client;
1849   tmit_msg->size = data_size;
1850   tmit_msg->state = ch->tmit_state;
1851
1852   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1853
1854   ch->is_master
1855     ? master_queue_message ((struct Master *) ch, tmit_msg,
1856                             first_ptype, last_ptype)
1857     : slave_queue_message ((struct Slave *) ch, tmit_msg,
1858                            first_ptype, last_ptype);
1859 }
1860
1861
1862 /**
1863  * Cancel transmission of current message.
1864  *
1865  * @param ch      Channel to send to.
1866  * @param client  Client the message originates from.
1867  */
1868 static void
1869 transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1870 {
1871   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1872
1873   struct GNUNET_MessageHeader msg;
1874   msg.size = htons (sizeof (msg));
1875   msg.type = htons (type);
1876
1877   queue_message (ch, client, sizeof (msg), &msg, type, type);
1878   transmit_message (ch);
1879
1880   /* FIXME: cleanup */
1881 }
1882
1883
1884 /**
1885  * Incoming message from a master or slave client.
1886  */
1887 static void
1888 client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1889                      const struct GNUNET_MessageHeader *msg)
1890 {
1891   struct Channel *
1892     ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1893   GNUNET_assert (NULL != ch);
1894
1895   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1896               "%p Received message from client.\n", ch);
1897   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1898
1899   if (GNUNET_YES != ch->ready)
1900   {
1901     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1902                 "%p Channel is not ready, dropping message from client.\n", ch);
1903     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1904     return;
1905   }
1906
1907   uint16_t size = ntohs (msg->size);
1908   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1909   {
1910     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch);
1911     GNUNET_break (0);
1912     transmit_cancel (ch, client);
1913     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1914     return;
1915   }
1916
1917   uint16_t first_ptype = 0, last_ptype = 0;
1918   if (GNUNET_SYSERR
1919       == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1920                                           (const char *) &msg[1],
1921                                           &first_ptype, &last_ptype))
1922   {
1923     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1924                 "%p Received invalid message part from client.\n", ch);
1925     GNUNET_break (0);
1926     transmit_cancel (ch, client);
1927     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1928     return;
1929   }
1930
1931   queue_message (ch, client, size - sizeof (*msg), &msg[1],
1932                  first_ptype, last_ptype);
1933   transmit_message (ch);
1934
1935   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1936 };
1937
1938
1939 /**
1940  * Client requests to add a slave to the membership database.
1941  */
1942 static void
1943 client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1944                   const struct GNUNET_MessageHeader *msg)
1945 {
1946
1947 }
1948
1949
1950 /**
1951  * Client requests to remove a slave from the membership database.
1952  */
1953 static void
1954 client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1955                      const struct GNUNET_MessageHeader *msg)
1956 {
1957
1958 }
1959
1960
1961 /**
1962  * Client requests channel history from PSYCstore.
1963  */
1964 static void
1965 client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1966                       const struct GNUNET_MessageHeader *msg)
1967 {
1968
1969 }
1970
1971
1972 /**
1973  * Client requests best matching state variable from PSYCstore.
1974  */
1975 static void
1976 client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1977                   const struct GNUNET_MessageHeader *msg)
1978 {
1979
1980 }
1981
1982
1983 /**
1984  * Client requests state variables with a given prefix from PSYCstore.
1985  */
1986 static void
1987 client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1988                          const struct GNUNET_MessageHeader *msg)
1989 {
1990
1991 }
1992
1993
1994 /**
1995  * Initialize the PSYC service.
1996  *
1997  * @param cls Closure.
1998  * @param server The initialized server.
1999  * @param c Configuration to use.
2000  */
2001 static void
2002 run (void *cls, struct GNUNET_SERVER_Handle *server,
2003      const struct GNUNET_CONFIGURATION_Handle *c)
2004 {
2005   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2006     { &client_master_start, NULL,
2007       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2008
2009     { &client_slave_join, NULL,
2010       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2011
2012     { &client_join_decision, NULL,
2013       GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2014
2015     { &client_psyc_message, NULL,
2016       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2017
2018     { &client_slave_add, NULL,
2019       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
2020
2021     { &client_slave_remove, NULL,
2022       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
2023
2024     { &client_story_request, NULL,
2025       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2026
2027     { &client_state_get, NULL,
2028       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2029
2030     { &client_state_get_prefix, NULL,
2031       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
2032   };
2033
2034   cfg = c;
2035   store = GNUNET_PSYCSTORE_connect (cfg);
2036   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2037   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2038   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2039   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2040   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2041   nc = GNUNET_SERVER_notification_context_create (server, 1);
2042   GNUNET_SERVER_add_handlers (server, handlers);
2043   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2044   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2045                                 &shutdown_task, NULL);
2046 }
2047
2048
2049 /**
2050  * The main function for the service.
2051  *
2052  * @param argc number of arguments from the command line
2053  * @param argv command line arguments
2054  * @return 0 ok, 1 on error
2055  */
2056 int
2057 main (int argc, char *const *argv)
2058 {
2059   return (GNUNET_OK ==
2060           GNUNET_SERVICE_run (argc, argv, "psyc",
2061                               GNUNET_SERVICE_OPTION_NONE,
2062                               &run, NULL)) ? 0 : 1;
2063 }
2064
2065 /* end of gnunet-service-psyc.c */