-indent, doxygen
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of @a buf
97    */
98   uint16_t size;
99
100   /**
101    * @see enum MessageState
102    */
103   uint8_t state;
104
105   /* Followed by message */
106 };
107
108
109 /**
110  * Cache for received message fragments.
111  * Message fragments are only sent to clients after all modifiers arrived.
112  *
113  * chan_key -> MultiHashMap chan_msgs
114  */
115 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
116
117
118 /**
119  * Entry in the chan_msgs hashmap of @a recv_cache:
120  * fragment_id -> RecvCacheEntry
121  */
122 struct RecvCacheEntry
123 {
124   struct GNUNET_MULTICAST_MessageHeader *mmsg;
125   uint16_t ref_count;
126 };
127
128
129 /**
130  * Entry in the @a recv_frags hash map of a @a Channel.
131  * message_id -> FragmentQueue
132  */
133 struct FragmentQueue
134 {
135   /**
136    * Fragment IDs stored in @a recv_cache.
137    */
138   struct GNUNET_CONTAINER_Heap *fragments;
139
140   /**
141    * Total size of received fragments.
142    */
143   uint64_t size;
144
145   /**
146    * Total size of received header fragments (METHOD & MODIFIERs)
147    */
148   uint64_t header_size;
149
150   /**
151    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
152    */
153   uint64_t state_delta;
154
155   /**
156    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint32_t flags;
159
160   /**
161    * Receive state of message.
162    *
163    * @see MessageFragmentState
164    */
165   uint8_t state;
166
167   /**
168    * Is the message queued for delivery to the client?
169    * i.e. added to the recv_msgs queue
170    */
171   uint8_t queued;
172 };
173
174
175 /**
176  * List of connected clients.
177  */
178 struct ClientListItem
179 {
180   struct ClientListItem *prev;
181   struct ClientListItem *next;
182   struct GNUNET_SERVER_Client *client;
183 };
184
185
186 /**
187  * Common part of the client context for both a channel master and slave.
188  */
189 struct Channel
190 {
191   struct ClientListItem *clients_head;
192   struct ClientListItem *clients_tail;
193
194   struct TransmitMessage *tmit_head;
195   struct TransmitMessage *tmit_tail;
196
197   /**
198    * Current PSYCstore operation.
199    */
200   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
201
202   /**
203    * Received fragments not yet sent to the client.
204    * message_id -> FragmentQueue
205    */
206   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
207
208   /**
209    * Received message IDs not yet sent to the client.
210    */
211   struct GNUNET_CONTAINER_Heap *recv_msgs;
212
213   /**
214    * FIXME: needed?
215    */
216   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
217
218   /**
219    * Public key of the channel.
220    */
221   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
222
223   /**
224    * Hash of @a pub_key.
225    */
226   struct GNUNET_HashCode pub_key_hash;
227
228   /**
229    * Last message ID sent to the client.
230    * 0 if there is no such message.
231    */
232   uint64_t max_message_id;
233
234   /**
235    * ID of the last stateful message, where the state operations has been
236    * processed and saved to PSYCstore and which has been sent to the client.
237    * 0 if there is no such message.
238    */
239   uint64_t max_state_message_id;
240
241   /**
242    * Expected value size for the modifier being received from the PSYC service.
243    */
244   uint32_t tmit_mod_value_size_expected;
245
246   /**
247    * Actual value size for the modifier being received from the PSYC service.
248    */
249   uint32_t tmit_mod_value_size;
250
251   /**
252    * @see enum MessageState
253    */
254   uint8_t tmit_state;
255
256   /**
257    * FIXME: needed?
258    */
259   uint8_t in_transmit;
260
261   /**
262    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
263    */
264   uint8_t is_master;
265
266   /**
267    * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
268    */
269   uint8_t ready;
270
271   /**
272    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
273    */
274   uint8_t disconnected;
275 };
276
277
278 /**
279  * Client context for a channel master.
280  */
281 struct Master
282 {
283   /**
284    * Channel struct common for Master and Slave
285    */
286   struct Channel chn;
287
288   /**
289    * Private key of the channel.
290    */
291   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
292
293   /**
294    * Handle for the multicast origin.
295    */
296   struct GNUNET_MULTICAST_Origin *origin;
297
298   /**
299    * Transmit handle for multicast.
300    */
301   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
302
303   /**
304    * Incoming join requests from multicast.
305    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
306    */
307   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
308
309   /**
310    * Last message ID transmitted to this channel.
311    *
312    * Incremented before sending a message, thus the message_id in messages sent
313    * starts from 1.
314    */
315   uint64_t max_message_id;
316
317   /**
318    * ID of the last message with state operations transmitted to the channel.
319    * 0 if there is no such message.
320    */
321   uint64_t max_state_message_id;
322
323   /**
324    * Maximum group generation transmitted to the channel.
325    */
326   uint64_t max_group_generation;
327
328   /**
329    * @see enum GNUNET_PSYC_Policy
330    */
331   enum GNUNET_PSYC_Policy policy;
332 };
333
334
335 /**
336  * Client context for a channel slave.
337  */
338 struct Slave
339 {
340   /**
341    * Channel struct common for Master and Slave
342    */
343   struct Channel chn;
344
345   /**
346    * Private key of the slave.
347    */
348   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
349
350   /**
351    * Public key of the slave.
352    */
353   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
354
355   /**
356    * Hash of @a pub_key.
357    */
358   struct GNUNET_HashCode pub_key_hash;
359
360   /**
361    * Handle for the multicast member.
362    */
363   struct GNUNET_MULTICAST_Member *member;
364
365   /**
366    * Transmit handle for multicast.
367    */
368   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
369
370   /**
371    * Peer identity of the origin.
372    */
373   struct GNUNET_PeerIdentity origin;
374
375   /**
376    * Number of items in @a relays.
377    */
378   uint32_t relay_count;
379
380   /**
381    * Relays that multicast can use to connect.
382    */
383   struct GNUNET_PeerIdentity *relays;
384
385   /**
386    * Join request to be transmitted to the master on join.
387    */
388   struct GNUNET_MessageHeader *join_req;
389
390   /**
391    * Join decision received from multicast.
392    */
393   struct SlaveJoinDecision *join_dcsn;
394
395   /**
396    * Maximum request ID for this channel.
397    */
398   uint64_t max_request_id;
399 };
400
401
402 static inline void
403 transmit_message (struct Channel *chn);
404
405
406 static uint64_t
407 message_queue_drop (struct Channel *chn);
408
409
410 /**
411  * Task run during shutdown.
412  *
413  * @param cls unused
414  * @param tc unused
415  */
416 static void
417 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
418 {
419   if (NULL != nc)
420   {
421     GNUNET_SERVER_notification_context_destroy (nc);
422     nc = NULL;
423   }
424   if (NULL != stats)
425   {
426     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
427     stats = NULL;
428   }
429 }
430
431
432 /**
433  * Clean up master data structures after a client disconnected.
434  */
435 static void
436 cleanup_master (struct Master *mst)
437 {
438   struct Channel *chn = &mst->chn;
439
440   if (NULL != mst->origin)
441     GNUNET_MULTICAST_origin_stop (mst->origin);
442   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
443   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
444 }
445
446
447 /**
448  * Clean up slave data structures after a client disconnected.
449  */
450 static void
451 cleanup_slave (struct Slave *slv)
452 {
453   struct Channel *chn = &slv->chn;
454   struct GNUNET_CONTAINER_MultiHashMap *
455     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
456                                                 &chn->pub_key_hash);
457   GNUNET_assert (NULL != chn_slv);
458   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
459
460   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
461   {
462     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
463                                           chn_slv);
464     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
465   }
466   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
467
468   if (NULL != slv->join_req)
469     GNUNET_free (slv->join_req);
470   if (NULL != slv->relays)
471     GNUNET_free (slv->relays);
472   if (NULL != slv->member)
473     GNUNET_MULTICAST_member_part (slv->member);
474   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
475 }
476
477
478 /**
479  * Clean up channel data structures after a client disconnected.
480  */
481 static void
482 cleanup_channel (struct Channel *chn)
483 {
484   message_queue_drop (chn);
485   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
486
487   if (NULL != chn->store_op)
488     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
489
490   (GNUNET_YES == chn->is_master)
491     ? cleanup_master ((struct Master *) chn)
492     : cleanup_slave ((struct Slave *) chn);
493   GNUNET_free (chn);
494 }
495
496
497 /**
498  * Called whenever a client is disconnected.
499  * Frees our resources associated with that client.
500  *
501  * @param cls Closure.
502  * @param client Identification of the client.
503  */
504 static void
505 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
506 {
507   if (NULL == client)
508     return;
509
510   struct Channel *
511     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
512
513   if (NULL == chn)
514   {
515     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
516                 "%p User context is NULL in client_disconnect()\n", chn);
517     return;
518   }
519
520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521               "%p Client (%s) disconnected from channel %s\n",
522               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
523               GNUNET_h2s (&chn->pub_key_hash));
524
525   struct ClientListItem *cli = chn->clients_head;
526   while (NULL != cli)
527   {
528     if (cli->client == client)
529     {
530       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
531       GNUNET_free (cli);
532       break;
533     }
534     cli = cli->next;
535   }
536
537   if (NULL == chn->clients_head)
538   { /* Last client disconnected. */
539     if (NULL != chn->tmit_head)
540     { /* Send pending messages to multicast before cleanup. */
541       transmit_message (chn);
542     }
543     else
544     {
545       cleanup_channel (chn);
546     }
547   }
548 }
549
550
551 /**
552  * Send message to all clients connected to the channel.
553  */
554 static void
555 client_send_msg (const struct Channel *chn,
556                  const struct GNUNET_MessageHeader *msg)
557 {
558   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
559               "%p Sending message to clients.\n", chn);
560
561   struct ClientListItem *cli = chn->clients_head;
562   while (NULL != cli)
563   {
564     GNUNET_SERVER_notification_context_add (nc, cli->client);
565     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
566     cli = cli->next;
567   }
568 }
569
570
571 /**
572  * Closure for join_mem_test_cb()
573  */
574 struct JoinMemTestClosure
575 {
576   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
577   struct Channel *chn;
578   struct GNUNET_MULTICAST_JoinHandle *jh;
579   struct MasterJoinRequest *master_join_req;
580 };
581
582
583 /**
584  * Membership test result callback used for join requests.
585  */
586 static void
587 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
588 {
589   struct JoinMemTestClosure *jcls = cls;
590
591   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
592   { /* Pass on join request to client if this is a master channel */
593     struct Master *mst = (struct Master *) jcls->chn;
594     struct GNUNET_HashCode slave_key_hash;
595     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
596                         &slave_key_hash);
597     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
598                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
599     client_send_msg (jcls->chn, &jcls->master_join_req->header);
600   }
601   else
602   {
603     // FIXME: add relays
604     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
605   }
606   GNUNET_free (jcls->master_join_req);
607   GNUNET_free (jcls);
608 }
609
610
611 /**
612  * Incoming join request from multicast.
613  */
614 static void
615 mcast_recv_join_request (void *cls,
616                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
617                          const struct GNUNET_MessageHeader *join_msg,
618                          struct GNUNET_MULTICAST_JoinHandle *jh)
619 {
620   struct Channel *chn = cls;
621   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
622
623   uint16_t join_msg_size = 0;
624   if (NULL != join_msg)
625   {
626     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
627     {
628       join_msg_size = ntohs (join_msg->size);
629     }
630     else
631     {
632       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
633                   "%p Got join message with invalid type %u.\n",
634                   chn, ntohs (join_msg->type));
635     }
636   }
637
638   struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size);
639   req->header.size = htons (sizeof (*req) + join_msg_size);
640   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
641   req->slave_key = *slave_key;
642   if (0 < join_msg_size)
643     memcpy (&req[1], join_msg, join_msg_size);
644
645   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
646   jcls->slave_key = *slave_key;
647   jcls->chn = chn;
648   jcls->jh = jh;
649   jcls->master_join_req = req;
650
651   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
652                                     chn->max_message_id, 0,
653                                     &join_mem_test_cb, jcls);
654 }
655
656
657 /**
658  * Join decision received from multicast.
659  */
660 static void
661 mcast_recv_join_decision (void *cls, int is_admitted,
662                         const struct GNUNET_PeerIdentity *peer,
663                         uint16_t relay_count,
664                         const struct GNUNET_PeerIdentity *relays,
665                         const struct GNUNET_MessageHeader *join_resp)
666 {
667   struct Slave *slv = cls;
668   struct Channel *chn = &slv->chn;
669   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670               "%p Got join decision: %d\n", slv, is_admitted);
671
672   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
673   struct SlaveJoinDecision *
674     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
675   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
676   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
677   dcsn->is_admitted = htonl (is_admitted);
678   if (0 < join_resp_size)
679     memcpy (&dcsn[1], join_resp, join_resp_size);
680
681   client_send_msg (chn, &dcsn->header);
682
683   if (GNUNET_YES == is_admitted)
684   {
685     chn->ready = GNUNET_YES;
686   }
687   else
688   {
689     slv->member = NULL;
690   }
691 }
692
693
694 static void
695 mcast_recv_membership_test (void *cls,
696                             const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
697                             uint64_t message_id, uint64_t group_generation,
698                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
699 {
700
701 }
702
703
704 static void
705 mcast_recv_replay_fragment (void *cls,
706                             const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
707                             uint64_t fragment_id, uint64_t flags,
708                             struct GNUNET_MULTICAST_ReplayHandle *rh)
709
710 {
711
712 }
713
714
715 static void
716 mcast_recv_replay_message (void *cls,
717                            const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
718                            uint64_t message_id,
719                            uint64_t fragment_offset,
720                            uint64_t flags,
721                            struct GNUNET_MULTICAST_ReplayHandle *rh)
722 {
723
724 }
725
726
727 /**
728  * Convert an uint64_t in network byte order to a HashCode
729  * that can be used as key in a MultiHashMap
730  */
731 static inline void
732 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
733 {
734   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
735   /* TODO: use built-in byte swap functions if available */
736
737   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
738   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
739
740   *key = (struct GNUNET_HashCode) {{ 0 }};
741   *((uint64_t *) key)
742     = (n << 32) | (n >> 32);
743 }
744
745
746 /**
747  * Convert an uint64_t in host byte order to a HashCode
748  * that can be used as key in a MultiHashMap
749  */
750 static inline void
751 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
752 {
753 #if __BYTE_ORDER == __BIG_ENDIAN
754   hash_key_from_nll (key, n);
755 #elif __BYTE_ORDER == __LITTLE_ENDIAN
756   *key = (struct GNUNET_HashCode) {{ 0 }};
757   *((uint64_t *) key) = n;
758 #else
759   #error byteorder undefined
760 #endif
761 }
762
763
764 /**
765  * Send multicast message to all clients connected to the channel.
766  */
767 static void
768 client_send_mcast_msg (struct Channel *chn,
769                        const struct GNUNET_MULTICAST_MessageHeader *mmsg)
770 {
771   struct GNUNET_PSYC_MessageHeader *pmsg;
772   uint16_t size = ntohs (mmsg->header.size);
773   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
774
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "%p Sending multicast message to client. "
777               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
778               chn, GNUNET_ntohll (mmsg->fragment_id),
779               GNUNET_ntohll (mmsg->message_id));
780
781   pmsg = GNUNET_malloc (psize);
782   pmsg->header.size = htons (psize);
783   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
784   pmsg->message_id = mmsg->message_id;
785
786   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
787   client_send_msg (chn, &pmsg->header);
788   GNUNET_free (pmsg);
789 }
790
791
792 /**
793  * Send multicast request to all clients connected to the channel.
794  */
795 static void
796 client_send_mcast_req (struct Master *mst,
797                        const struct GNUNET_MULTICAST_RequestHeader *req)
798 {
799   struct Channel *chn = &mst->chn;
800
801   struct GNUNET_PSYC_MessageHeader *pmsg;
802   uint16_t size = ntohs (req->header.size);
803   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
804
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "%p Sending multicast request to client. "
807               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
808               chn, GNUNET_ntohll (req->fragment_id),
809               GNUNET_ntohll (req->request_id));
810
811   pmsg = GNUNET_malloc (psize);
812   pmsg->header.size = htons (psize);
813   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
814   pmsg->message_id = req->request_id;
815   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
816
817   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
818   client_send_msg (chn, &pmsg->header);
819   GNUNET_free (pmsg);
820 }
821
822
823 /**
824  * Insert a multicast message fragment into the queue belonging to the message.
825  *
826  * @param chn           Channel.
827  * @param mmsg         Multicast message fragment.
828  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
829  * @param first_ptype  First PSYC message part type in @a mmsg.
830  * @param last_ptype   Last PSYC message part type in @a mmsg.
831  */
832 static void
833 fragment_queue_insert (struct Channel *chn,
834                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
835                        uint16_t first_ptype, uint16_t last_ptype)
836 {
837   const uint16_t size = ntohs (mmsg->header.size);
838   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
839   struct GNUNET_CONTAINER_MultiHashMap
840     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
841                                                     &chn->pub_key_hash);
842
843   struct GNUNET_HashCode msg_id_hash;
844   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
845
846   struct FragmentQueue
847     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
848
849   if (NULL == fragq)
850   {
851     fragq = GNUNET_new (struct FragmentQueue);
852     fragq->state = MSG_FRAG_STATE_HEADER;
853     fragq->fragments
854       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
855
856     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
857                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
858
859     if (NULL == chan_msgs)
860     {
861       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
862       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
863                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
864     }
865   }
866
867   struct GNUNET_HashCode frag_id_hash;
868   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
869   struct RecvCacheEntry
870     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
871   if (NULL == cache_entry)
872   {
873     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874                 "%p Adding message fragment to cache. "
875                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
876                 "header_size: %" PRIu64 " + %u).\n",
877                 chn, GNUNET_ntohll (mmsg->message_id),
878                 GNUNET_ntohll (mmsg->fragment_id),
879                 fragq->header_size, size);
880     cache_entry = GNUNET_new (struct RecvCacheEntry);
881     cache_entry->ref_count = 1;
882     cache_entry->mmsg = GNUNET_malloc (size);
883     memcpy (cache_entry->mmsg, mmsg, size);
884     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
885                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
886   }
887   else
888   {
889     cache_entry->ref_count++;
890     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891                 "%p Message fragment is already in cache. "
892                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
893                 ", ref_count: %u\n",
894                 chn, GNUNET_ntohll (mmsg->message_id),
895                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
896   }
897
898   if (MSG_FRAG_STATE_HEADER == fragq->state)
899   {
900     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
901     {
902       struct GNUNET_PSYC_MessageMethod *
903         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
904       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
905       fragq->flags = ntohl (pmeth->flags);
906     }
907
908     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
909     {
910       fragq->header_size += size;
911     }
912     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
913              || frag_offset == fragq->header_size)
914     { /* header is now complete */
915       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
916                   "%p Header of message %" PRIu64 " is complete.\n",
917                   chn, GNUNET_ntohll (mmsg->message_id));
918
919       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
920                   "%p Adding message %" PRIu64 " to queue.\n",
921                   chn, GNUNET_ntohll (mmsg->message_id));
922       fragq->state = MSG_FRAG_STATE_DATA;
923     }
924     else
925     {
926       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
927                   "%p Header of message %" PRIu64 " is NOT complete yet: "
928                   "%" PRIu64 " != %" PRIu64 "\n",
929                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
930                   fragq->header_size);
931     }
932   }
933
934   switch (last_ptype)
935   {
936   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
937     if (frag_offset == fragq->size)
938       fragq->state = MSG_FRAG_STATE_END;
939     else
940       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
941                   "%p Message %" PRIu64 " is NOT complete yet: "
942                   "%" PRIu64 " != %" PRIu64 "\n",
943                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
944                   fragq->size);
945     break;
946
947   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
948     /* Drop message without delivering to client if it's a single fragment */
949     fragq->state =
950       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
951       ? MSG_FRAG_STATE_DROP
952       : MSG_FRAG_STATE_CANCEL;
953   }
954
955   switch (fragq->state)
956   {
957   case MSG_FRAG_STATE_DATA:
958   case MSG_FRAG_STATE_END:
959   case MSG_FRAG_STATE_CANCEL:
960     if (GNUNET_NO == fragq->queued)
961     {
962       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
963                                     GNUNET_ntohll (mmsg->message_id));
964       fragq->queued = GNUNET_YES;
965     }
966   }
967
968   fragq->size += size;
969   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
970                                 GNUNET_ntohll (mmsg->fragment_id));
971 }
972
973
974 /**
975  * Run fragment queue of a message.
976  *
977  * Send fragments of a message in order to client, after all modifiers arrived
978  * from multicast.
979  *
980  * @param chn      Channel.
981  * @param msg_id  ID of the message @a fragq belongs to.
982  * @param fragq   Fragment queue of the message.
983  * @param drop    Drop message without delivering to client?
984  *                #GNUNET_YES or #GNUNET_NO.
985  */
986 static void
987 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
988                     struct FragmentQueue *fragq, uint8_t drop)
989 {
990   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
991               "%p Running message fragment queue for message %" PRIu64
992               " (state: %u).\n",
993               chn, msg_id, fragq->state);
994
995   struct GNUNET_CONTAINER_MultiHashMap
996     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
997                                                     &chn->pub_key_hash);
998   GNUNET_assert (NULL != chan_msgs);
999   uint64_t frag_id;
1000
1001   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1002                                                     &frag_id))
1003   {
1004     struct GNUNET_HashCode frag_id_hash;
1005     hash_key_from_hll (&frag_id_hash, frag_id);
1006     struct RecvCacheEntry *cache_entry
1007       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1008     if (cache_entry != NULL)
1009     {
1010       if (GNUNET_NO == drop)
1011       {
1012         client_send_mcast_msg (chn, cache_entry->mmsg);
1013       }
1014       if (cache_entry->ref_count <= 1)
1015       {
1016         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1017                                               cache_entry);
1018         GNUNET_free (cache_entry->mmsg);
1019         GNUNET_free (cache_entry);
1020       }
1021       else
1022       {
1023         cache_entry->ref_count--;
1024       }
1025     }
1026 #if CACHE_AGING_IMPLEMENTED
1027     else if (GNUNET_NO == drop)
1028     {
1029       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1030     }
1031 #endif
1032
1033     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1034   }
1035
1036   if (MSG_FRAG_STATE_END <= fragq->state)
1037   {
1038     struct GNUNET_HashCode msg_id_hash;
1039     hash_key_from_nll (&msg_id_hash, msg_id);
1040
1041     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1042     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1043     GNUNET_free (fragq);
1044   }
1045   else
1046   {
1047     fragq->queued = GNUNET_NO;
1048   }
1049 }
1050
1051
1052 /**
1053  * Run message queue.
1054  *
1055  * Send messages in queue to client in order after a message has arrived from
1056  * multicast, according to the following:
1057  * - A message is only sent if all of its modifiers arrived.
1058  * - A stateful message is only sent if the previous stateful message
1059  *   has already been delivered to the client.
1060  *
1061  * @param chn  Channel.
1062  *
1063  * @return Number of messages removed from queue and sent to client.
1064  */
1065 static uint64_t
1066 message_queue_run (struct Channel *chn)
1067 {
1068   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1069               "%p Running message queue.\n", chn);
1070   uint64_t n = 0;
1071   uint64_t msg_id;
1072   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1073                                                     &msg_id))
1074   {
1075     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1076                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1077     struct GNUNET_HashCode msg_id_hash;
1078     hash_key_from_hll (&msg_id_hash, msg_id);
1079
1080     struct FragmentQueue *
1081       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1082
1083     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1084     {
1085       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1086                   "%p No fragq (%p) or header not complete.\n",
1087                   chn, fragq);
1088       break;
1089     }
1090
1091     if (MSG_FRAG_STATE_HEADER == fragq->state)
1092     {
1093       /* Check if there's a missing message before the current one */
1094       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1095       {
1096         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1097             && msg_id - 1 != chn->max_message_id)
1098         {
1099           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1100                       "%p Out of order message. "
1101                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1102                       chn, msg_id, chn->max_message_id);
1103           break;
1104         }
1105       }
1106       else
1107       {
1108         if (msg_id - fragq->state_delta != chn->max_state_message_id)
1109         {
1110           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1111                       "%p Out of order stateful message. "
1112                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1113                       chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1114           break;
1115         }
1116 #if TODO
1117         /* FIXME: apply modifiers to state in PSYCstore */
1118         GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1119                                        store_recv_state_modify_result, cls);
1120 #endif
1121         chn->max_state_message_id = msg_id;
1122       }
1123       chn->max_message_id = msg_id;
1124     }
1125     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1126     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1127     n++;
1128   }
1129   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1131   return n;
1132 }
1133
1134
1135 /**
1136  * Drop message queue of a channel.
1137  *
1138  * Remove all messages in queue without sending it to clients.
1139  *
1140  * @param chn  Channel.
1141  *
1142  * @return Number of messages removed from queue.
1143  */
1144 static uint64_t
1145 message_queue_drop (struct Channel *chn)
1146 {
1147   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1148               "%p Dropping message queue.\n", chn);
1149   uint64_t n = 0;
1150   uint64_t msg_id;
1151   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1152                                                     &msg_id))
1153   {
1154     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1155                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1156     struct GNUNET_HashCode msg_id_hash;
1157     hash_key_from_hll (&msg_id_hash, msg_id);
1158
1159     struct FragmentQueue *
1160       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1161
1162     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1163     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1164     n++;
1165   }
1166   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1168   return n;
1169 }
1170
1171
1172 /**
1173  * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
1174  */
1175 static void
1176 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1177 {
1178   struct Channel *chn = cls;
1179   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1181               chn, result, err_msg);
1182 }
1183
1184
1185 /**
1186  * Handle incoming message fragment from multicast.
1187  *
1188  * Store it using PSYCstore and send it to the clients of the channel in order.
1189  */
1190 static void
1191 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1192 {
1193   struct Channel *chn = cls;
1194   uint16_t size = ntohs (mmsg->header.size);
1195
1196   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197               "%p Received multicast message of size %u.\n",
1198               chn, size);
1199
1200   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1201                                    &store_recv_fragment_store_result, chn);
1202
1203   uint16_t first_ptype = 0, last_ptype = 0;
1204   if (GNUNET_SYSERR
1205       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1206                                           (const char *) &mmsg[1],
1207                                           &first_ptype, &last_ptype))
1208   {
1209     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1210                 "%p Dropping incoming multicast message with invalid parts.\n",
1211                 chn);
1212     GNUNET_break_op (0);
1213     return;
1214   }
1215
1216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217               "Message parts: first: type %u, last: type %u\n",
1218               first_ptype, last_ptype);
1219
1220   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1221   message_queue_run (chn);
1222 }
1223
1224
1225 /**
1226  * Incoming request fragment from multicast for a master.
1227  *
1228  * @param cls           Master.
1229  * @param slave_key     Sending slave's public key.
1230  * @param msg           The message.
1231  * @param flags         Request flags.
1232  */
1233 static void
1234 mcast_recv_request (void *cls,
1235                     const struct GNUNET_MULTICAST_RequestHeader *req)
1236 {
1237   struct Master *mst = cls;
1238   uint16_t size = ntohs (req->header.size);
1239
1240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241               "%p Received multicast request of size %u.\n",
1242               mst, size);
1243
1244   uint16_t first_ptype = 0, last_ptype = 0;
1245   if (GNUNET_SYSERR
1246       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1247                                           (const char *) &req[1],
1248                                           &first_ptype, &last_ptype))
1249   {
1250     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1251                 "%p Dropping incoming multicast request with invalid parts.\n",
1252                 mst);
1253     GNUNET_break_op (0);
1254     return;
1255   }
1256
1257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258               "Message parts: first: type %u, last: type %u\n",
1259               first_ptype, last_ptype);
1260
1261   /* FIXME: in-order delivery */
1262   client_send_mcast_req (mst, req);
1263 }
1264
1265
1266 /**
1267  * Response from PSYCstore with the current counter values for a channel master.
1268  */
1269 static void
1270 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1271                             uint64_t max_message_id, uint64_t max_group_generation,
1272                             uint64_t max_state_message_id)
1273 {
1274   struct Master *mst = cls;
1275   struct Channel *chn = &mst->chn;
1276   chn->store_op = NULL;
1277
1278   struct CountersResult res;
1279   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1280   res.header.size = htons (sizeof (res));
1281   res.result_code = htonl (result);
1282   res.max_message_id = GNUNET_htonll (max_message_id);
1283
1284   if (GNUNET_OK == result || GNUNET_NO == result)
1285   {
1286     mst->max_message_id = max_message_id;
1287     chn->max_message_id = max_message_id;
1288     chn->max_state_message_id = max_state_message_id;
1289     mst->max_group_generation = max_group_generation;
1290     mst->origin
1291       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1292                                        &mcast_recv_join_request,
1293                                        &mcast_recv_membership_test,
1294                                        &mcast_recv_replay_fragment,
1295                                        &mcast_recv_replay_message,
1296                                        &mcast_recv_request,
1297                                        &mcast_recv_message, chn);
1298     chn->ready = GNUNET_YES;
1299   }
1300   else
1301   {
1302     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1303                 "%p GNUNET_PSYCSTORE_counters_get() "
1304                 "returned %d for channel %s.\n",
1305                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1306   }
1307
1308   client_send_msg (chn, &res.header);
1309 }
1310
1311
1312 /**
1313  * Response from PSYCstore with the current counter values for a channel slave.
1314  */
1315 void
1316 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1317                            uint64_t max_message_id, uint64_t max_group_generation,
1318                            uint64_t max_state_message_id)
1319 {
1320   struct Slave *slv = cls;
1321   struct Channel *chn = &slv->chn;
1322   chn->store_op = NULL;
1323
1324   struct CountersResult res;
1325   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1326   res.header.size = htons (sizeof (res));
1327   res.result_code = htonl (result);
1328   res.max_message_id = GNUNET_htonll (max_message_id);
1329
1330   if (GNUNET_OK == result || GNUNET_NO == result)
1331   {
1332     chn->max_message_id = max_message_id;
1333     chn->max_state_message_id = max_state_message_id;
1334     slv->member
1335       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1336                                       &slv->origin,
1337                                       slv->relay_count, slv->relays,
1338                                       slv->join_req,
1339                                       &mcast_recv_join_request,
1340                                       &mcast_recv_join_decision,
1341                                       &mcast_recv_membership_test,
1342                                       &mcast_recv_replay_fragment,
1343                                       &mcast_recv_replay_message,
1344                                       &mcast_recv_message, chn);
1345   }
1346   else
1347   {
1348     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1349                 "%p GNUNET_PSYCSTORE_counters_get() "
1350                 "returned %d for channel %s.\n",
1351                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1352   }
1353
1354   client_send_msg (chn, &res.header);
1355 }
1356
1357
1358 static void
1359 channel_init (struct Channel *chn)
1360 {
1361   chn->recv_msgs
1362     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1363   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1364 }
1365
1366
1367 /**
1368  * Handle a connecting client starting a channel master.
1369  */
1370 static void
1371 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1372                           const struct GNUNET_MessageHeader *msg)
1373 {
1374   const struct MasterStartRequest *req
1375     = (const struct MasterStartRequest *) msg;
1376
1377   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1378   struct GNUNET_HashCode pub_key_hash;
1379
1380   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1381   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1382
1383   struct Master *
1384     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1385   struct Channel *chn;
1386
1387   if (NULL == mst)
1388   {
1389     mst = GNUNET_new (struct Master);
1390     mst->policy = ntohl (req->policy);
1391     mst->priv_key = req->channel_key;
1392     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1393
1394     chn = &mst->chn;
1395     chn->is_master = GNUNET_YES;
1396     chn->pub_key = pub_key;
1397     chn->pub_key_hash = pub_key_hash;
1398     channel_init (chn);
1399
1400     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1401                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1402     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1403                                                   store_recv_master_counters, mst);
1404   }
1405   else
1406   {
1407     chn = &mst->chn;
1408
1409     struct CountersResult res;
1410     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1411     res.header.size = htons (sizeof (res));
1412     res.result_code = htonl (GNUNET_OK);
1413     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1414
1415     GNUNET_SERVER_notification_context_add (nc, client);
1416     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1417                                                 GNUNET_NO);
1418   }
1419
1420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421               "%p Client connected as master to channel %s.\n",
1422               mst, GNUNET_h2s (&chn->pub_key_hash));
1423
1424   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1425   cli->client = client;
1426   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1427
1428   GNUNET_SERVER_client_set_user_context (client, chn);
1429   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1430 }
1431
1432
1433 /**
1434  * Handle a connecting client joining as a channel slave.
1435  */
1436 static void
1437 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1438                         const struct GNUNET_MessageHeader *msg)
1439 {
1440   const struct SlaveJoinRequest *req
1441     = (const struct SlaveJoinRequest *) msg;
1442
1443   struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1444   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1445
1446   GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1447   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1448   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1449
1450   struct GNUNET_CONTAINER_MultiHashMap *
1451     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1452   struct Slave *slv = NULL;
1453   struct Channel *chn;
1454
1455   if (NULL != chn_slv)
1456   {
1457     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1458   }
1459   if (NULL == slv)
1460   {
1461     slv = GNUNET_new (struct Slave);
1462     slv->priv_key = req->slave_key;
1463     slv->pub_key = slv_pub_key;
1464     slv->pub_key_hash = slv_pub_key_hash;
1465     slv->origin = req->origin;
1466     slv->relay_count = ntohl (req->relay_count);
1467     if (0 < slv->relay_count)
1468     {
1469       const struct GNUNET_PeerIdentity *relays
1470         = (const struct GNUNET_PeerIdentity *) &req[1];
1471       slv->relays
1472         = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1473       uint32_t i;
1474       for (i = 0; i < slv->relay_count; i++)
1475         memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1476     }
1477
1478     chn = &slv->chn;
1479     chn->is_master = GNUNET_NO;
1480     chn->pub_key = req->channel_key;
1481     chn->pub_key_hash = pub_key_hash;
1482     channel_init (chn);
1483
1484     if (NULL == chn_slv)
1485     {
1486       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1487       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1488                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1489     }
1490     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1491                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1492     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1493                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1494     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1495                                                   &store_recv_slave_counters, slv);
1496   }
1497   else
1498   {
1499     chn = &slv->chn;
1500
1501     struct CountersResult res;
1502     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1503     res.header.size = htons (sizeof (res));
1504     res.result_code = htonl (GNUNET_OK);
1505     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1506
1507     GNUNET_SERVER_notification_context_add (nc, client);
1508     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1509                                                 GNUNET_NO);
1510
1511     if (NULL == slv->member)
1512     {
1513       slv->member
1514         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1515                                         &slv->origin,
1516                                         slv->relay_count, slv->relays,
1517                                         slv->join_req,
1518                                         &mcast_recv_join_request,
1519                                         &mcast_recv_join_decision,
1520                                         &mcast_recv_membership_test,
1521                                         &mcast_recv_replay_fragment,
1522                                         &mcast_recv_replay_message,
1523                                         &mcast_recv_message, chn);
1524
1525     }
1526     else if (NULL != slv->join_dcsn)
1527     {
1528       GNUNET_SERVER_notification_context_add (nc, client);
1529       GNUNET_SERVER_notification_context_unicast (nc, client,
1530                                                   &slv->join_dcsn->header,
1531                                                   GNUNET_NO);
1532     }
1533   }
1534
1535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536               "%p Client connected as slave to channel %s.\n",
1537               slv, GNUNET_h2s (&chn->pub_key_hash));
1538
1539   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1540   cli->client = client;
1541   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1542
1543   GNUNET_SERVER_client_set_user_context (client, &slv->chn);
1544   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1545 }
1546
1547
1548 struct JoinDecisionClosure
1549 {
1550   int32_t is_admitted;
1551   struct GNUNET_MessageHeader *msg;
1552 };
1553
1554
1555 /**
1556  * Iterator callback for responding to join requests of a slave.
1557  */
1558 static int
1559 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1560                           void *jh)
1561 {
1562   struct JoinDecisionClosure *jcls = cls;
1563   // FIXME: add relays
1564   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1565   return GNUNET_YES;
1566 }
1567
1568
1569 /**
1570  * Join decision from client.
1571  */
1572 static void
1573 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1574                            const struct GNUNET_MessageHeader *msg)
1575 {
1576   struct Channel *
1577     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1578   GNUNET_assert (GNUNET_YES == chn->is_master);
1579   struct Master *mst = (struct Master *) chn;
1580
1581   struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1582   struct JoinDecisionClosure jcls;
1583   jcls.is_admitted = ntohl (dcsn->is_admitted);
1584   jcls.msg
1585     = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
1586        <= ntohs (msg->size))
1587     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1588     : NULL;
1589
1590   struct GNUNET_HashCode slave_key_hash;
1591   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1592                       &slave_key_hash);
1593
1594   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1595               "%p Got join decision (%d) from client for channel %s..\n",
1596               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1598               "%p ..and slave %s.\n",
1599               mst, GNUNET_h2s (&slave_key_hash));
1600
1601   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1602                                               &mcast_send_join_decision, &jcls);
1603   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1604   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1605 }
1606
1607
1608 /**
1609  * Send acknowledgement to a client.
1610  *
1611  * Sent after a message fragment has been passed on to multicast.
1612  *
1613  * @param chn The channel struct for the client.
1614  */
1615 static void
1616 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1617 {
1618   struct GNUNET_MessageHeader res;
1619   res.size = htons (sizeof (res));
1620   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1621
1622   /* FIXME */
1623   GNUNET_SERVER_notification_context_add (nc, client);
1624   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1625 }
1626
1627
1628 /**
1629  * Callback for the transmit functions of multicast.
1630  */
1631 static int
1632 transmit_notify (void *cls, size_t *data_size, void *data)
1633 {
1634   struct Channel *chn = cls;
1635   struct TransmitMessage *tmit_msg = chn->tmit_head;
1636
1637   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1638   {
1639     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1640                 "%p transmit_notify: nothing to send.\n", chn);
1641     *data_size = 0;
1642     return GNUNET_NO;
1643   }
1644
1645   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1646               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1647
1648   *data_size = tmit_msg->size;
1649   memcpy (data, &tmit_msg[1], *data_size);
1650
1651   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1652   if (NULL != tmit_msg->client)
1653     send_message_ack (chn, tmit_msg->client);
1654
1655   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1656   GNUNET_free (tmit_msg);
1657
1658   if (0 == chn->tmit_task)
1659   {
1660     if (NULL != chn->tmit_head)
1661     {
1662       transmit_message (chn);
1663     }
1664     else if (chn->disconnected)
1665     {
1666       /* FIXME: handle partial message (when still in_transmit) */
1667       cleanup_channel (chn);
1668     }
1669   }
1670
1671   return ret;
1672 }
1673
1674
1675 /**
1676  * Callback for the transmit functions of multicast.
1677  */
1678 static int
1679 master_transmit_notify (void *cls, size_t *data_size, void *data)
1680 {
1681   int ret = transmit_notify (cls, data_size, data);
1682
1683   if (GNUNET_YES == ret)
1684   {
1685     struct Master *mst = cls;
1686     mst->tmit_handle = NULL;
1687   }
1688   return ret;
1689 }
1690
1691
1692 /**
1693  * Callback for the transmit functions of multicast.
1694  */
1695 static int
1696 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1697 {
1698   int ret = transmit_notify (cls, data_size, data);
1699
1700   if (GNUNET_YES == ret)
1701   {
1702     struct Slave *slv = cls;
1703     slv->tmit_handle = NULL;
1704   }
1705   return ret;
1706 }
1707
1708
1709 /**
1710  * Transmit a message from a channel master to the multicast group.
1711  */
1712 static void
1713 master_transmit_message (struct Master *mst)
1714 {
1715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1716   mst->chn.tmit_task = 0;
1717   if (NULL == mst->tmit_handle)
1718   {
1719     mst->tmit_handle
1720       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1721                                         mst->max_group_generation,
1722                                         master_transmit_notify, mst);
1723   }
1724   else
1725   {
1726     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1727   }
1728 }
1729
1730
1731 /**
1732  * Transmit a message from a channel slave to the multicast group.
1733  */
1734 static void
1735 slave_transmit_message (struct Slave *slv)
1736 {
1737   slv->chn.tmit_task = 0;
1738   if (NULL == slv->tmit_handle)
1739   {
1740     slv->tmit_handle
1741       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1742                                            slave_transmit_notify, slv);
1743   }
1744   else
1745   {
1746     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1747   }
1748 }
1749
1750
1751 static inline void
1752 transmit_message (struct Channel *chn)
1753 {
1754   chn->is_master
1755     ? master_transmit_message ((struct Master *) chn)
1756     : slave_transmit_message ((struct Slave *) chn);
1757 }
1758
1759
1760 /**
1761  * Queue a message from a channel master for sending to the multicast group.
1762  */
1763 static void
1764 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1765                      uint16_t first_ptype, uint16_t last_ptype)
1766 {
1767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1768
1769   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1770   {
1771     tmit_msg->id = ++mst->max_message_id;
1772     struct GNUNET_PSYC_MessageMethod *pmeth
1773       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1774
1775     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1776     {
1777       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1778     }
1779     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1780     {
1781       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1782                                           - mst->max_state_message_id);
1783     }
1784     else
1785     {
1786       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1787     }
1788   }
1789 }
1790
1791
1792 /**
1793  * Queue a message from a channel slave for sending to the multicast group.
1794  */
1795 static void
1796 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1797                      uint16_t first_ptype, uint16_t last_ptype)
1798 {
1799   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1800   {
1801     struct GNUNET_PSYC_MessageMethod *pmeth
1802       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1803     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1804     tmit_msg->id = ++slv->max_request_id;
1805   }
1806 }
1807
1808
1809 /**
1810  * Queue PSYC message parts for sending to multicast.
1811  *
1812  * @param chn           Channel to send to.
1813  * @param client       Client the message originates from.
1814  * @param data_size    Size of @a data.
1815  * @param data         Concatenated message parts.
1816  * @param first_ptype  First message part type in @a data.
1817  * @param last_ptype   Last message part type in @a data.
1818  */
1819 static void
1820 queue_message (struct Channel *chn,
1821                struct GNUNET_SERVER_Client *client,
1822                size_t data_size,
1823                const void *data,
1824                uint16_t first_ptype, uint16_t last_ptype)
1825 {
1826   struct TransmitMessage *
1827     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1828   memcpy (&tmit_msg[1], data, data_size);
1829   tmit_msg->client = client;
1830   tmit_msg->size = data_size;
1831   tmit_msg->state = chn->tmit_state;
1832
1833   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
1834
1835   chn->is_master
1836     ? master_queue_message ((struct Master *) chn, tmit_msg,
1837                             first_ptype, last_ptype)
1838     : slave_queue_message ((struct Slave *) chn, tmit_msg,
1839                            first_ptype, last_ptype);
1840 }
1841
1842
1843 /**
1844  * Cancel transmission of current message.
1845  *
1846  * @param chn     Channel to send to.
1847  * @param client  Client the message originates from.
1848  */
1849 static void
1850 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1851 {
1852   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1853
1854   struct GNUNET_MessageHeader msg;
1855   msg.size = htons (sizeof (msg));
1856   msg.type = htons (type);
1857
1858   queue_message (chn, client, sizeof (msg), &msg, type, type);
1859   transmit_message (chn);
1860
1861   /* FIXME: cleanup */
1862 }
1863
1864
1865 /**
1866  * Incoming message from a master or slave client.
1867  */
1868 static void
1869 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1870                           const struct GNUNET_MessageHeader *msg)
1871 {
1872   struct Channel *
1873     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1874   GNUNET_assert (NULL != chn);
1875
1876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1877               "%p Received message from client.\n", chn);
1878   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1879
1880   if (GNUNET_YES != chn->ready)
1881   {
1882     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1883                 "%p Channel is not ready, dropping message from client.\n", chn);
1884     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1885     return;
1886   }
1887
1888   uint16_t size = ntohs (msg->size);
1889   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1890   {
1891     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
1892     GNUNET_break (0);
1893     transmit_cancel (chn, client);
1894     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1895     return;
1896   }
1897
1898   uint16_t first_ptype = 0, last_ptype = 0;
1899   if (GNUNET_SYSERR
1900       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
1901                                           (const char *) &msg[1],
1902                                           &first_ptype, &last_ptype))
1903   {
1904     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1905                 "%p Received invalid message part from client.\n", chn);
1906     GNUNET_break (0);
1907     transmit_cancel (chn, client);
1908     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1909     return;
1910   }
1911
1912   queue_message (chn, client, size - sizeof (*msg), &msg[1],
1913                  first_ptype, last_ptype);
1914   transmit_message (chn);
1915
1916   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1917 };
1918
1919
1920 /**
1921  * Client requests to add a slave to the membership database.
1922  */
1923 static void
1924 client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1925                        const struct GNUNET_MessageHeader *msg)
1926 {
1927
1928 }
1929
1930
1931 /**
1932  * Client requests to remove a slave from the membership database.
1933  */
1934 static void
1935 client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1936                           const struct GNUNET_MessageHeader *msg)
1937 {
1938
1939 }
1940
1941
1942 /**
1943  * Client requests channel history from PSYCstore.
1944  */
1945 static void
1946 client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1947                            const struct GNUNET_MessageHeader *msg)
1948 {
1949
1950 }
1951
1952
1953 /**
1954  * Client requests best matching state variable from PSYCstore.
1955  */
1956 static void
1957 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1958                        const struct GNUNET_MessageHeader *msg)
1959 {
1960
1961 }
1962
1963
1964 /**
1965  * Client requests state variables with a given prefix from PSYCstore.
1966  */
1967 static void
1968 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1969                               const struct GNUNET_MessageHeader *msg)
1970 {
1971
1972 }
1973
1974
1975 /**
1976  * Initialize the PSYC service.
1977  *
1978  * @param cls Closure.
1979  * @param server The initialized server.
1980  * @param c Configuration to use.
1981  */
1982 static void
1983 run (void *cls, struct GNUNET_SERVER_Handle *server,
1984      const struct GNUNET_CONFIGURATION_Handle *c)
1985 {
1986   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1987     { &client_recv_master_start, NULL,
1988       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1989
1990     { &client_recv_slave_join, NULL,
1991       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1992
1993     { &client_recv_join_decision, NULL,
1994       GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1995
1996     { &client_recv_psyc_message, NULL,
1997       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1998
1999     { &client_recv_slave_add, NULL,
2000       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
2001
2002     { &client_recv_slave_remove, NULL,
2003       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
2004
2005     { &client_recv_story_request, NULL,
2006       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2007
2008     { &client_recv_state_get, NULL,
2009       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2010
2011     { &client_recv_state_get_prefix, NULL,
2012       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2013
2014     { NULL, NULL, 0, 0 }
2015   };
2016
2017   cfg = c;
2018   store = GNUNET_PSYCSTORE_connect (cfg);
2019   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2020   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2021   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2022   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2023   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2024   nc = GNUNET_SERVER_notification_context_create (server, 1);
2025   GNUNET_SERVER_add_handlers (server, handlers);
2026   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2027   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2028                                 &shutdown_task, NULL);
2029 }
2030
2031
2032 /**
2033  * The main function for the service.
2034  *
2035  * @param argc number of arguments from the command line
2036  * @param argv command line arguments
2037  * @return 0 ok, 1 on error
2038  */
2039 int
2040 main (int argc, char *const *argv)
2041 {
2042   return (GNUNET_OK ==
2043           GNUNET_SERVICE_run (argc, argv, "psyc",
2044                               GNUNET_SERVICE_OPTION_NONE,
2045                               &run, NULL)) ? 0 : 1;
2046 }
2047
2048 /* end of gnunet-service-psyc.c */