psyc/social: get state from psycstore
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1
2 /*
3  * This file is part of GNUnet
4  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5  *
6  * GNUnet is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published
8  * by the Free Software Foundation; either version 3, or (at your
9  * option) any later version.
10  *
11  * GNUnet is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with GNUnet; see the file COPYING.  If not, write to the
18  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * @file psyc/gnunet-service-psyc.c
24  * @brief PSYC service
25  * @author Gabor X Toth
26  */
27
28 #include <inttypes.h>
29
30 #include "platform.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_constants.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_multicast_service.h"
36 #include "gnunet_psycstore_service.h"
37 #include "gnunet_psyc_service.h"
38 #include "gnunet_psyc_util_lib.h"
39 #include "psyc.h"
40
41
42 /**
43  * Handle to our current configuration.
44  */
45 static const struct GNUNET_CONFIGURATION_Handle *cfg;
46
47 /**
48  * Handle to the statistics service.
49  */
50 static struct GNUNET_STATISTICS_Handle *stats;
51
52 /**
53  * Notification context, simplifies client broadcasts.
54  */
55 static struct GNUNET_SERVER_NotificationContext *nc;
56
57 /**
58  * Handle to the PSYCstore.
59  */
60 static struct GNUNET_PSYCSTORE_Handle *store;
61
62 /**
63  * All connected masters.
64  * Channel's pub_key_hash -> struct Master
65  */
66 static struct GNUNET_CONTAINER_MultiHashMap *masters;
67
68 /**
69  * All connected slaves.
70  * Channel's pub_key_hash -> struct Slave
71  */
72 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
73
74 /**
75  * Connected slaves per channel.
76  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
77  */
78 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
79
80
81 /**
82  * Message in the transmission queue.
83  */
84 struct TransmitMessage
85 {
86   struct TransmitMessage *prev;
87   struct TransmitMessage *next;
88
89   struct GNUNET_SERVER_Client *client;
90
91   /**
92    * ID assigned to the message.
93    */
94   uint64_t id;
95
96   /**
97    * Size of message.
98    */
99   uint16_t size;
100
101   /**
102    * @see enum MessageState
103    */
104   uint8_t state;
105
106   /**
107    * Whether a message ACK has already been sent to the client.
108    * #GNUNET_YES or #GNUNET_NO
109    */
110   uint8_t ack_sent;
111
112   /* Followed by message */
113 };
114
115
116 /**
117  * Cache for received message fragments.
118  * Message fragments are only sent to clients after all modifiers arrived.
119  *
120  * chan_key -> MultiHashMap chan_msgs
121  */
122 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
123
124
125 /**
126  * Entry in the chan_msgs hashmap of @a recv_cache:
127  * fragment_id -> RecvCacheEntry
128  */
129 struct RecvCacheEntry
130 {
131   struct GNUNET_MULTICAST_MessageHeader *mmsg;
132   uint16_t ref_count;
133 };
134
135
136 /**
137  * Entry in the @a recv_frags hash map of a @a Channel.
138  * message_id -> FragmentQueue
139  */
140 struct FragmentQueue
141 {
142   /**
143    * Fragment IDs stored in @a recv_cache.
144    */
145   struct GNUNET_CONTAINER_Heap *fragments;
146
147   /**
148    * Total size of received fragments.
149    */
150   uint64_t size;
151
152   /**
153    * Total size of received header fragments (METHOD & MODIFIERs)
154    */
155   uint64_t header_size;
156
157   /**
158    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
159    */
160   uint64_t state_delta;
161
162   /**
163    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
164    */
165   uint32_t flags;
166
167   /**
168    * Receive state of message.
169    *
170    * @see MessageFragmentState
171    */
172   uint8_t state;
173
174   /**
175    * Whether the state is already modified in PSYCstore.
176    */
177   uint8_t state_is_modified;
178
179   /**
180    * Is the message queued for delivery to the client?
181    * i.e. added to the recv_msgs queue
182    */
183   uint8_t is_queued;
184 };
185
186
187 /**
188  * List of connected clients.
189  */
190 struct Client
191 {
192   struct Client *prev;
193   struct Client *next;
194
195   struct GNUNET_SERVER_Client *client;
196 };
197
198
199 struct Operation
200 {
201   struct Operation *prev;
202   struct Operation *next;
203
204   struct GNUNET_SERVER_Client *client;
205   struct Channel *chn;
206   uint64_t op_id;
207   uint32_t flags;
208 };
209
210
211 /**
212  * Common part of the client context for both a channel master and slave.
213  */
214 struct Channel
215 {
216   struct Client *clients_head;
217   struct Client *clients_tail;
218
219   struct Operation *op_head;
220   struct Operation *op_tail;
221
222   struct TransmitMessage *tmit_head;
223   struct TransmitMessage *tmit_tail;
224
225   /**
226    * Current PSYCstore operation.
227    */
228   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
229
230   /**
231    * Received fragments not yet sent to the client.
232    * message_id -> FragmentQueue
233    */
234   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
235
236   /**
237    * Received message IDs not yet sent to the client.
238    */
239   struct GNUNET_CONTAINER_Heap *recv_msgs;
240
241   /**
242    * Public key of the channel.
243    */
244   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
245
246   /**
247    * Hash of @a pub_key.
248    */
249   struct GNUNET_HashCode pub_key_hash;
250
251   /**
252    * Last message ID sent to the client.
253    * 0 if there is no such message.
254    */
255   uint64_t max_message_id;
256
257   /**
258    * ID of the last stateful message, where the state operations has been
259    * processed and saved to PSYCstore and which has been sent to the client.
260    * 0 if there is no such message.
261    */
262   uint64_t max_state_message_id;
263
264   /**
265    * Expected value size for the modifier being received from the PSYC service.
266    */
267   uint32_t tmit_mod_value_size_expected;
268
269   /**
270    * Actual value size for the modifier being received from the PSYC service.
271    */
272   uint32_t tmit_mod_value_size;
273
274   /**
275    * @see enum MessageState
276    */
277   uint8_t tmit_state;
278
279   /**
280    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
281    */
282   uint8_t is_master;
283
284   /**
285    * Is this channel ready to receive messages from client?
286    * #GNUNET_YES or #GNUNET_NO
287    */
288   uint8_t is_ready;
289
290   /**
291    * Is the client disconnected?
292    * #GNUNET_YES or #GNUNET_NO
293    */
294   uint8_t is_disconnected;
295 };
296
297
298 /**
299  * Client context for a channel master.
300  */
301 struct Master
302 {
303   /**
304    * Channel struct common for Master and Slave
305    */
306   struct Channel chn;
307
308   /**
309    * Private key of the channel.
310    */
311   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
312
313   /**
314    * Handle for the multicast origin.
315    */
316   struct GNUNET_MULTICAST_Origin *origin;
317
318   /**
319    * Transmit handle for multicast.
320    */
321   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
322
323   /**
324    * Incoming join requests from multicast.
325    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
326    */
327   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
328
329   /**
330    * Last message ID transmitted to this channel.
331    *
332    * Incremented before sending a message, thus the message_id in messages sent
333    * starts from 1.
334    */
335   uint64_t max_message_id;
336
337   /**
338    * ID of the last message with state operations transmitted to the channel.
339    * 0 if there is no such message.
340    */
341   uint64_t max_state_message_id;
342
343   /**
344    * Maximum group generation transmitted to the channel.
345    */
346   uint64_t max_group_generation;
347
348   /**
349    * @see enum GNUNET_PSYC_Policy
350    */
351   enum GNUNET_PSYC_Policy policy;
352 };
353
354
355 /**
356  * Client context for a channel slave.
357  */
358 struct Slave
359 {
360   /**
361    * Channel struct common for Master and Slave
362    */
363   struct Channel chn;
364
365   /**
366    * Private key of the slave.
367    */
368   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
369
370   /**
371    * Public key of the slave.
372    */
373   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
374
375   /**
376    * Hash of @a pub_key.
377    */
378   struct GNUNET_HashCode pub_key_hash;
379
380   /**
381    * Handle for the multicast member.
382    */
383   struct GNUNET_MULTICAST_Member *member;
384
385   /**
386    * Transmit handle for multicast.
387    */
388   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
389
390   /**
391    * Peer identity of the origin.
392    */
393   struct GNUNET_PeerIdentity origin;
394
395   /**
396    * Number of items in @a relays.
397    */
398   uint32_t relay_count;
399
400   /**
401    * Relays that multicast can use to connect.
402    */
403   struct GNUNET_PeerIdentity *relays;
404
405   /**
406    * Join request to be transmitted to the master on join.
407    */
408   struct GNUNET_PSYC_Message *join_msg;
409
410   /**
411    * Join decision received from multicast.
412    */
413   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
414
415   /**
416    * Maximum request ID for this channel.
417    */
418   uint64_t max_request_id;
419 };
420
421
422 static void
423 transmit_message (struct Channel *chn);
424
425 static uint64_t
426 message_queue_run (struct Channel *chn);
427
428 static uint64_t
429 message_queue_drop (struct Channel *chn);
430
431
432 /**
433  * Task run during shutdown.
434  *
435  * @param cls unused
436  * @param tc unused
437  */
438 static void
439 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
440 {
441   if (NULL != nc)
442   {
443     GNUNET_SERVER_notification_context_destroy (nc);
444     nc = NULL;
445   }
446   if (NULL != stats)
447   {
448     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
449     stats = NULL;
450   }
451 }
452
453
454 static struct Operation *
455 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
456         uint64_t op_id, uint32_t flags)
457 {
458   struct Operation *op = GNUNET_malloc (sizeof (*op));
459   op->client = client;
460   op->chn = chn;
461   op->op_id = op_id;
462   op->flags = flags;
463   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
464   return op;
465 }
466
467
468 static void
469 op_remove (struct Operation *op)
470 {
471   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
472   GNUNET_free (op);
473 }
474
475
476 /**
477  * Clean up master data structures after a client disconnected.
478  */
479 static void
480 cleanup_master (struct Master *mst)
481 {
482   struct Channel *chn = &mst->chn;
483
484   if (NULL != mst->origin)
485     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
486   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
487   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
488 }
489
490
491 /**
492  * Clean up slave data structures after a client disconnected.
493  */
494 static void
495 cleanup_slave (struct Slave *slv)
496 {
497   struct Channel *chn = &slv->chn;
498   struct GNUNET_CONTAINER_MultiHashMap *
499     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
500                                                 &chn->pub_key_hash);
501   GNUNET_assert (NULL != chn_slv);
502   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
503
504   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
505   {
506     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
507                                           chn_slv);
508     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
509   }
510   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
511
512   if (NULL != slv->join_msg)
513   {
514     GNUNET_free (slv->join_msg);
515     slv->join_msg = NULL;
516   }
517   if (NULL != slv->relays)
518   {
519     GNUNET_free (slv->relays);
520     slv->relays = NULL;
521   }
522   if (NULL != slv->member)
523   {
524     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
525     slv->member = NULL;
526   }
527   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
528 }
529
530
531 /**
532  * Clean up channel data structures after a client disconnected.
533  */
534 static void
535 cleanup_channel (struct Channel *chn)
536 {
537   message_queue_drop (chn);
538   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
539
540   if (NULL != chn->store_op)
541   {
542     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
543     chn->store_op = NULL;
544   }
545
546   (GNUNET_YES == chn->is_master)
547     ? cleanup_master ((struct Master *) chn)
548     : cleanup_slave ((struct Slave *) chn);
549   GNUNET_free (chn);
550 }
551
552
553 /**
554  * Called whenever a client is disconnected.
555  * Frees our resources associated with that client.
556  *
557  * @param cls Closure.
558  * @param client Identification of the client.
559  */
560 static void
561 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
562 {
563   if (NULL == client)
564     return;
565
566   struct Channel *
567     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
568
569   if (NULL == chn)
570   {
571     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
572                 "%p User context is NULL in client_disconnect()\n", chn);
573     GNUNET_break (0);
574     return;
575   }
576
577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578               "%p Client (%s) disconnected from channel %s\n",
579               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
580               GNUNET_h2s (&chn->pub_key_hash));
581
582   struct Client *cli = chn->clients_head;
583   while (NULL != cli)
584   {
585     if (cli->client == client)
586     {
587       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
588       GNUNET_free (cli);
589       break;
590     }
591     cli = cli->next;
592   }
593
594   struct Operation *op = chn->op_head;
595   while (NULL != op)
596   {
597     if (op->client == client)
598     {
599       op->client = NULL;
600       break;
601     }
602     op = op->next;
603   }
604
605   if (NULL == chn->clients_head)
606   { /* Last client disconnected. */
607     if (NULL != chn->tmit_head)
608     { /* Send pending messages to multicast before cleanup. */
609       transmit_message (chn);
610     }
611     else
612     {
613       cleanup_channel (chn);
614     }
615   }
616 }
617
618
619 /**
620  * Send message to all clients connected to the channel.
621  */
622 static void
623 client_send_msg (const struct Channel *chn,
624                  const struct GNUNET_MessageHeader *msg)
625 {
626   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627               "%p Sending message to clients.\n", chn);
628
629   struct Client *cli = chn->clients_head;
630   while (NULL != cli)
631   {
632     GNUNET_SERVER_notification_context_add (nc, cli->client);
633     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
634     cli = cli->next;
635   }
636 }
637
638
639 /**
640  * Send a result code back to the client.
641  *
642  * @param client
643  *        Client that should receive the result code.
644  * @param result_code
645  *        Code to transmit.
646  * @param op_id
647  *        Operation ID in network byte order.
648  * @param data
649  *        Data payload or NULL.
650  * @param data_size
651  *        Size of @a data.
652  */
653 static void
654 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
655                     int64_t result_code, const void *data, uint16_t data_size)
656 {
657   struct GNUNET_OperationResultMessage *res;
658
659   res = GNUNET_malloc (sizeof (*res) + data_size);
660   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
661   res->header.size = htons (sizeof (*res) + data_size);
662   res->result_code = GNUNET_htonll (result_code);
663   res->op_id = op_id;
664   if (0 < data_size)
665     memcpy (&res[1], data, data_size);
666
667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
668               "%p Sending result to client for operation #%" PRIu64 ": "
669               "%" PRId64 " (size: %u)\n",
670               client, GNUNET_ntohll (op_id), result_code, data_size);
671
672   GNUNET_SERVER_notification_context_add (nc, client);
673   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
674                                               GNUNET_NO);
675   GNUNET_free (res);
676 }
677
678
679 /**
680  * Closure for join_mem_test_cb()
681  */
682 struct JoinMemTestClosure
683 {
684   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
685   struct Channel *chn;
686   struct GNUNET_MULTICAST_JoinHandle *jh;
687   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
688 };
689
690
691 /**
692  * Membership test result callback used for join requests.
693  */
694 static void
695 join_mem_test_cb (void *cls, int64_t result,
696                   const char *err_msg, uint16_t err_msg_size)
697 {
698   struct JoinMemTestClosure *jcls = cls;
699
700   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
701   { /* Pass on join request to client if this is a master channel */
702     struct Master *mst = (struct Master *) jcls->chn;
703     struct GNUNET_HashCode slave_key_hash;
704     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
705                         &slave_key_hash);
706     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
707                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
708     client_send_msg (jcls->chn, &jcls->join_msg->header);
709   }
710   else
711   {
712     if (GNUNET_SYSERR == result)
713     {
714       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
715                   "Could not perform membership test (%.*s)\n",
716                   err_msg_size, err_msg);
717     }
718     // FIXME: add relays
719     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
720   }
721   GNUNET_free (jcls->join_msg);
722   GNUNET_free (jcls);
723 }
724
725
726 /**
727  * Incoming join request from multicast.
728  */
729 static void
730 mcast_recv_join_request (void *cls,
731                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
732                          const struct GNUNET_MessageHeader *join_msg,
733                          struct GNUNET_MULTICAST_JoinHandle *jh)
734 {
735   struct Channel *chn = cls;
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
737
738   uint16_t join_msg_size = 0;
739   if (NULL != join_msg)
740   {
741     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
742     {
743       join_msg_size = ntohs (join_msg->size);
744     }
745     else
746     {
747       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
748                   "%p Got join message with invalid type %u.\n",
749                   chn, ntohs (join_msg->type));
750     }
751   }
752
753   struct GNUNET_PSYC_JoinRequestMessage *
754     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
755   req->header.size = htons (sizeof (*req) + join_msg_size);
756   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
757   req->slave_key = *slave_key;
758   if (0 < join_msg_size)
759     memcpy (&req[1], join_msg, join_msg_size);
760
761   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
762   jcls->slave_key = *slave_key;
763   jcls->chn = chn;
764   jcls->jh = jh;
765   jcls->join_msg = req;
766
767   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
768                                     chn->max_message_id, 0,
769                                     &join_mem_test_cb, jcls);
770 }
771
772
773 /**
774  * Join decision received from multicast.
775  */
776 static void
777 mcast_recv_join_decision (void *cls, int is_admitted,
778                           const struct GNUNET_PeerIdentity *peer,
779                           uint16_t relay_count,
780                           const struct GNUNET_PeerIdentity *relays,
781                           const struct GNUNET_MessageHeader *join_resp)
782 {
783   struct Slave *slv = cls;
784   struct Channel *chn = &slv->chn;
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786               "%p Got join decision: %d\n", slv, is_admitted);
787
788   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
789   struct GNUNET_PSYC_JoinDecisionMessage *
790     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
791   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
792   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
793   dcsn->is_admitted = htonl (is_admitted);
794   if (0 < join_resp_size)
795     memcpy (&dcsn[1], join_resp, join_resp_size);
796
797   client_send_msg (chn, &dcsn->header);
798
799   if (GNUNET_YES == is_admitted)
800   {
801     chn->is_ready = GNUNET_YES;
802   }
803   else
804   {
805     slv->member = NULL;
806   }
807 }
808
809
810 /**
811  * Received result of GNUNET_PSYCSTORE_membership_test()
812  */
813 static void
814 store_recv_membership_test_result (void *cls, int64_t result,
815                                    const char *err_msg, uint16_t err_msg_size)
816 {
817   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819               "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
820               mth, result, err_msg_size, err_msg);
821
822   GNUNET_MULTICAST_membership_test_result (mth, result);
823 }
824
825
826 /**
827  * Incoming membership test request from multicast.
828  */
829 static void
830 mcast_recv_membership_test (void *cls,
831                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
832                             uint64_t message_id, uint64_t group_generation,
833                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
834 {
835   struct Channel *chn = cls;
836   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
837               "%p Received membership test request from multicast.\n",
838               mth);
839   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
840                                     message_id, group_generation,
841                                     &store_recv_membership_test_result, mth);
842 }
843
844
845 static int
846 store_recv_fragment_replay (void *cls,
847                             struct GNUNET_MULTICAST_MessageHeader *msg,
848                             enum GNUNET_PSYCSTORE_MessageFlags flags)
849 {
850   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
851
852   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
853   return GNUNET_YES;
854 }
855
856
857 /**
858  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
859  */
860 static void
861 store_recv_fragment_replay_result (void *cls, int64_t result,
862                                    const char *err_msg, uint16_t err_msg_size)
863 {
864   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
867               rh, result, err_msg_size, err_msg);
868
869   switch (result)
870   {
871   case GNUNET_YES:
872     break;
873
874   case GNUNET_NO:
875     GNUNET_MULTICAST_replay_response (rh, NULL,
876                                       GNUNET_MULTICAST_REC_NOT_FOUND);
877     break;
878
879   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
880     GNUNET_MULTICAST_replay_response (rh, NULL,
881                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
882     break;
883
884   case GNUNET_SYSERR:
885     GNUNET_MULTICAST_replay_response (rh, NULL,
886                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
887     break;
888   }
889   GNUNET_MULTICAST_replay_response_end (rh);
890 }
891
892
893 /**
894  * Incoming fragment replay request from multicast.
895  */
896 static void
897 mcast_recv_replay_fragment (void *cls,
898                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
899                             uint64_t fragment_id, uint64_t flags,
900                             struct GNUNET_MULTICAST_ReplayHandle *rh)
901
902 {
903   struct Channel *chn = cls;
904   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
905                                  fragment_id, fragment_id,
906                                  &store_recv_fragment_replay,
907                                  &store_recv_fragment_replay_result, rh);
908 }
909
910
911 /**
912  * Incoming message replay request from multicast.
913  */
914 static void
915 mcast_recv_replay_message (void *cls,
916                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
917                            uint64_t message_id,
918                            uint64_t fragment_offset,
919                            uint64_t flags,
920                            struct GNUNET_MULTICAST_ReplayHandle *rh)
921 {
922   struct Channel *chn = cls;
923   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
924                                 message_id, message_id, NULL,
925                                 &store_recv_fragment_replay,
926                                 &store_recv_fragment_replay_result, rh);
927 }
928
929
930 /**
931  * Convert an uint64_t in network byte order to a HashCode
932  * that can be used as key in a MultiHashMap
933  */
934 static inline void
935 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
936 {
937   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
938   /* TODO: use built-in byte swap functions if available */
939
940   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
941   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
942
943   *key = (struct GNUNET_HashCode) {};
944   *((uint64_t *) key)
945     = (n << 32) | (n >> 32);
946 }
947
948
949 /**
950  * Convert an uint64_t in host byte order to a HashCode
951  * that can be used as key in a MultiHashMap
952  */
953 static inline void
954 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
955 {
956 #if __BYTE_ORDER == __BIG_ENDIAN
957   hash_key_from_nll (key, n);
958 #elif __BYTE_ORDER == __LITTLE_ENDIAN
959   *key = (struct GNUNET_HashCode) {};
960   *((uint64_t *) key) = n;
961 #else
962   #error byteorder undefined
963 #endif
964 }
965
966
967 /**
968  * Initialize PSYC message header.
969  */
970 static inline void
971 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
972                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
973 {
974   uint16_t size = ntohs (mmsg->header.size);
975   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
976
977   pmsg->header.size = htons (psize);
978   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
979   pmsg->message_id = mmsg->message_id;
980   pmsg->fragment_offset = mmsg->fragment_offset;
981   pmsg->flags = htonl (flags);
982
983   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
984 }
985
986
987 /**
988  * Create a new PSYC message from a multicast message for sending it to clients.
989  */
990 static inline struct GNUNET_PSYC_MessageHeader *
991 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
992 {
993   struct GNUNET_PSYC_MessageHeader *pmsg;
994   uint16_t size = ntohs (mmsg->header.size);
995   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
996
997   pmsg = GNUNET_malloc (psize);
998   psyc_msg_init (pmsg, mmsg, flags);
999   return pmsg;
1000 }
1001
1002
1003 /**
1004  * Send multicast message to all clients connected to the channel.
1005  */
1006 static void
1007 client_send_mcast_msg (struct Channel *chn,
1008                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1009                        uint32_t flags)
1010 {
1011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012               "%p Sending multicast message to client. "
1013               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1014               chn, GNUNET_ntohll (mmsg->fragment_id),
1015               GNUNET_ntohll (mmsg->message_id));
1016
1017   struct GNUNET_PSYC_MessageHeader *
1018     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1019   client_send_msg (chn, &pmsg->header);
1020   GNUNET_free (pmsg);
1021 }
1022
1023
1024 /**
1025  * Send multicast request to all clients connected to the channel.
1026  */
1027 static void
1028 client_send_mcast_req (struct Master *mst,
1029                        const struct GNUNET_MULTICAST_RequestHeader *req)
1030 {
1031   struct Channel *chn = &mst->chn;
1032
1033   struct GNUNET_PSYC_MessageHeader *pmsg;
1034   uint16_t size = ntohs (req->header.size);
1035   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1036
1037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038               "%p Sending multicast request to client. "
1039               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1040               chn, GNUNET_ntohll (req->fragment_id),
1041               GNUNET_ntohll (req->request_id));
1042
1043   pmsg = GNUNET_malloc (psize);
1044   pmsg->header.size = htons (psize);
1045   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1046   pmsg->message_id = req->request_id;
1047   pmsg->fragment_offset = req->fragment_offset;
1048   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1049
1050   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1051   client_send_msg (chn, &pmsg->header);
1052   GNUNET_free (pmsg);
1053 }
1054
1055
1056 /**
1057  * Insert a multicast message fragment into the queue belonging to the message.
1058  *
1059  * @param chn          Channel.
1060  * @param mmsg         Multicast message fragment.
1061  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1062  * @param first_ptype  First PSYC message part type in @a mmsg.
1063  * @param last_ptype   Last PSYC message part type in @a mmsg.
1064  */
1065 static void
1066 fragment_queue_insert (struct Channel *chn,
1067                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1068                        uint16_t first_ptype, uint16_t last_ptype)
1069 {
1070   const uint16_t size = ntohs (mmsg->header.size);
1071   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1072   struct GNUNET_CONTAINER_MultiHashMap
1073     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1074                                                     &chn->pub_key_hash);
1075
1076   struct GNUNET_HashCode msg_id_hash;
1077   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1078
1079   struct FragmentQueue
1080     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1081
1082   if (NULL == fragq)
1083   {
1084     fragq = GNUNET_new (struct FragmentQueue);
1085     fragq->state = MSG_FRAG_STATE_HEADER;
1086     fragq->fragments
1087       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1088
1089     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1090                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1091
1092     if (NULL == chan_msgs)
1093     {
1094       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1095       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1096                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1097     }
1098   }
1099
1100   struct GNUNET_HashCode frag_id_hash;
1101   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1102   struct RecvCacheEntry
1103     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1104   if (NULL == cache_entry)
1105   {
1106     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107                 "%p Adding message fragment to cache. "
1108                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1109                 chn, GNUNET_ntohll (mmsg->message_id),
1110                 GNUNET_ntohll (mmsg->fragment_id));
1111     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112                 "%p header_size: %" PRIu64 " + %u\n",
1113                 chn, fragq->header_size, size);
1114     cache_entry = GNUNET_new (struct RecvCacheEntry);
1115     cache_entry->ref_count = 1;
1116     cache_entry->mmsg = GNUNET_malloc (size);
1117     memcpy (cache_entry->mmsg, mmsg, size);
1118     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1119                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1120   }
1121   else
1122   {
1123     cache_entry->ref_count++;
1124     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125                 "%p Message fragment is already in cache. "
1126                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1127                 ", ref_count: %u\n",
1128                 chn, GNUNET_ntohll (mmsg->message_id),
1129                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1130   }
1131
1132   if (MSG_FRAG_STATE_HEADER == fragq->state)
1133   {
1134     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1135     {
1136       struct GNUNET_PSYC_MessageMethod *
1137         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1138       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1139       fragq->flags = ntohl (pmeth->flags);
1140     }
1141
1142     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1143     {
1144       fragq->header_size += size;
1145     }
1146     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1147              || frag_offset == fragq->header_size)
1148     { /* header is now complete */
1149       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1150                   "%p Header of message %" PRIu64 " is complete.\n",
1151                   chn, GNUNET_ntohll (mmsg->message_id));
1152
1153       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1154                   "%p Adding message %" PRIu64 " to queue.\n",
1155                   chn, GNUNET_ntohll (mmsg->message_id));
1156       fragq->state = MSG_FRAG_STATE_DATA;
1157     }
1158     else
1159     {
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1162                   "%" PRIu64 " != %" PRIu64 "\n",
1163                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1164                   fragq->header_size);
1165     }
1166   }
1167
1168   switch (last_ptype)
1169   {
1170   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1171     if (frag_offset == fragq->size)
1172       fragq->state = MSG_FRAG_STATE_END;
1173     else
1174       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1175                   "%p Message %" PRIu64 " is NOT complete yet: "
1176                   "%" PRIu64 " != %" PRIu64 "\n",
1177                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1178                   fragq->size);
1179     break;
1180
1181   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1182     /* Drop message without delivering to client if it's a single fragment */
1183     fragq->state =
1184       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1185       ? MSG_FRAG_STATE_DROP
1186       : MSG_FRAG_STATE_CANCEL;
1187   }
1188
1189   switch (fragq->state)
1190   {
1191   case MSG_FRAG_STATE_DATA:
1192   case MSG_FRAG_STATE_END:
1193   case MSG_FRAG_STATE_CANCEL:
1194     if (GNUNET_NO == fragq->is_queued)
1195     {
1196       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1197                                     GNUNET_ntohll (mmsg->message_id));
1198       fragq->is_queued = GNUNET_YES;
1199     }
1200   }
1201
1202   fragq->size += size;
1203   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1204                                 GNUNET_ntohll (mmsg->fragment_id));
1205 }
1206
1207
1208 /**
1209  * Run fragment queue of a message.
1210  *
1211  * Send fragments of a message in order to client, after all modifiers arrived
1212  * from multicast.
1213  *
1214  * @param chn      Channel.
1215  * @param msg_id  ID of the message @a fragq belongs to.
1216  * @param fragq   Fragment queue of the message.
1217  * @param drop    Drop message without delivering to client?
1218  *                #GNUNET_YES or #GNUNET_NO.
1219  */
1220 static void
1221 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1222                     struct FragmentQueue *fragq, uint8_t drop)
1223 {
1224   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1225               "%p Running message fragment queue for message %" PRIu64
1226               " (state: %u).\n",
1227               chn, msg_id, fragq->state);
1228
1229   struct GNUNET_CONTAINER_MultiHashMap
1230     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1231                                                     &chn->pub_key_hash);
1232   GNUNET_assert (NULL != chan_msgs); // FIXME
1233   uint64_t frag_id;
1234
1235   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1236                                                     &frag_id))
1237   {
1238     struct GNUNET_HashCode frag_id_hash;
1239     hash_key_from_hll (&frag_id_hash, frag_id);
1240     struct RecvCacheEntry *cache_entry
1241       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1242     if (cache_entry != NULL)
1243     {
1244       if (GNUNET_NO == drop)
1245       {
1246         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1247       }
1248       if (cache_entry->ref_count <= 1)
1249       {
1250         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1251                                               cache_entry);
1252         GNUNET_free (cache_entry->mmsg);
1253         GNUNET_free (cache_entry);
1254       }
1255       else
1256       {
1257         cache_entry->ref_count--;
1258       }
1259     }
1260 #if CACHE_AGING_IMPLEMENTED
1261     else if (GNUNET_NO == drop)
1262     {
1263       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1264     }
1265 #endif
1266
1267     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1268   }
1269
1270   if (MSG_FRAG_STATE_END <= fragq->state)
1271   {
1272     struct GNUNET_HashCode msg_id_hash;
1273     hash_key_from_hll (&msg_id_hash, msg_id);
1274
1275     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1276     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1277     GNUNET_free (fragq);
1278   }
1279   else
1280   {
1281     fragq->is_queued = GNUNET_NO;
1282   }
1283 }
1284
1285
1286 struct StateModifyClosure
1287 {
1288   struct Channel *chn;
1289   uint64_t msg_id;
1290   struct GNUNET_HashCode msg_id_hash;
1291 };
1292
1293
1294 void
1295 store_recv_state_modify_result (void *cls, int64_t result,
1296                                 const char *err_msg, uint16_t err_msg_size)
1297 {
1298   struct StateModifyClosure *mcls = cls;
1299   struct Channel *chn = mcls->chn;
1300   uint64_t msg_id = mcls->msg_id;
1301
1302   struct FragmentQueue *
1303     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1304
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1307               chn, result, err_msg_size, err_msg);
1308
1309   switch (result)
1310   {
1311   case GNUNET_OK:
1312   case GNUNET_NO:
1313     if (NULL != fragq)
1314       fragq->state_is_modified = GNUNET_YES;
1315     if (chn->max_state_message_id < msg_id)
1316       chn->max_state_message_id = msg_id;
1317     if (chn->max_message_id < msg_id)
1318       chn->max_message_id = msg_id;
1319
1320     if (NULL != fragq)
1321       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1322     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1323     message_queue_run (chn);
1324     break;
1325
1326   default:
1327     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1328                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1329                 chn, result, err_msg_size, err_msg);
1330     /** @todo FIXME: handle state_modify error */
1331   }
1332 }
1333
1334
1335 /**
1336  * Run message queue.
1337  *
1338  * Send messages in queue to client in order after a message has arrived from
1339  * multicast, according to the following:
1340  * - A message is only sent if all of its modifiers arrived.
1341  * - A stateful message is only sent if the previous stateful message
1342  *   has already been delivered to the client.
1343  *
1344  * @param chn  Channel.
1345  *
1346  * @return Number of messages removed from queue and sent to client.
1347  */
1348 static uint64_t
1349 message_queue_run (struct Channel *chn)
1350 {
1351   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1352               "%p Running message queue.\n", chn);
1353   uint64_t n = 0;
1354   uint64_t msg_id;
1355
1356   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1357                                                     &msg_id))
1358   {
1359     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1360                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1361     struct GNUNET_HashCode msg_id_hash;
1362     hash_key_from_hll (&msg_id_hash, msg_id);
1363
1364     struct FragmentQueue *
1365       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1366
1367     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1368     {
1369       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1370                   "%p No fragq (%p) or header not complete.\n",
1371                   chn, fragq);
1372       break;
1373     }
1374
1375     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376                 "%p Fragment queue entry:  state: %u, state delta: "
1377                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1378                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1379
1380     if (MSG_FRAG_STATE_DATA <= fragq->state)
1381     {
1382       /* Check if there's a missing message before the current one */
1383       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1384       {
1385         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n");
1386
1387         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1388             && (chn->max_message_id != msg_id - 1
1389                 && chn->max_message_id != msg_id))
1390         {
1391           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1392                       "%p Out of order message. "
1393                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1394                       chn, chn->max_message_id, msg_id);
1395           break;
1396           // FIXME: keep track of messages processed in this queue run,
1397           //        and only stop after reaching the end
1398         }
1399       }
1400       else
1401       {
1402         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n");
1403         if (GNUNET_YES != fragq->state_is_modified)
1404         {
1405           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1406           {
1407             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1408                         "%p Out of order stateful message. "
1409                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1410                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1411             break;
1412             // FIXME: keep track of messages processed in this queue run,
1413             //        and only stop after reaching the end
1414           }
1415
1416           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1417           mcls->chn = chn;
1418           mcls->msg_id = msg_id;
1419           mcls->msg_id_hash = msg_id_hash;
1420
1421           /* Apply modifiers to state in PSYCstore */
1422           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1423                                          fragq->state_delta,
1424                                          store_recv_state_modify_result, mcls);
1425           break; // continue after asynchronous state modify result
1426         }
1427       }
1428       chn->max_message_id = msg_id;
1429     }
1430     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1431     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1432     n++;
1433   }
1434
1435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1437   return n;
1438 }
1439
1440
1441 /**
1442  * Drop message queue of a channel.
1443  *
1444  * Remove all messages in queue without sending it to clients.
1445  *
1446  * @param chn  Channel.
1447  *
1448  * @return Number of messages removed from queue.
1449  */
1450 static uint64_t
1451 message_queue_drop (struct Channel *chn)
1452 {
1453   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454               "%p Dropping message queue.\n", chn);
1455   uint64_t n = 0;
1456   uint64_t msg_id;
1457   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1458                                                     &msg_id))
1459   {
1460     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1461                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1462     struct GNUNET_HashCode msg_id_hash;
1463     hash_key_from_hll (&msg_id_hash, msg_id);
1464
1465     struct FragmentQueue *
1466       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1467     GNUNET_assert (NULL != fragq);
1468     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1469     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1470     n++;
1471   }
1472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1474   return n;
1475 }
1476
1477
1478 /**
1479  * Received result of GNUNET_PSYCSTORE_fragment_store().
1480  */
1481 static void
1482 store_recv_fragment_store_result (void *cls, int64_t result,
1483                                   const char *err_msg, uint16_t err_msg_size)
1484 {
1485   struct Channel *chn = cls;
1486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1488               chn, result, err_msg_size, err_msg);
1489 }
1490
1491
1492 /**
1493  * Handle incoming message fragment from multicast.
1494  *
1495  * Store it using PSYCstore and send it to the clients of the channel in order.
1496  */
1497 static void
1498 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1499 {
1500   struct Channel *chn = cls;
1501   uint16_t size = ntohs (mmsg->header.size);
1502
1503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504               "%p Received multicast message of size %u.\n",
1505               chn, size);
1506
1507   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1508                                    &store_recv_fragment_store_result, chn);
1509
1510   uint16_t first_ptype = 0, last_ptype = 0;
1511   if (GNUNET_SYSERR
1512       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1513                                           (const char *) &mmsg[1],
1514                                           &first_ptype, &last_ptype))
1515   {
1516     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1517                 "%p Dropping incoming multicast message with invalid parts.\n",
1518                 chn);
1519     GNUNET_break_op (0);
1520     return;
1521   }
1522
1523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524               "Message parts: first: type %u, last: type %u\n",
1525               first_ptype, last_ptype);
1526
1527   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1528   message_queue_run (chn);
1529 }
1530
1531
1532 /**
1533  * Incoming request fragment from multicast for a master.
1534  *
1535  * @param cls   Master.
1536  * @param req   The request.
1537  */
1538 static void
1539 mcast_recv_request (void *cls,
1540                     const struct GNUNET_MULTICAST_RequestHeader *req)
1541 {
1542   struct Master *mst = cls;
1543   uint16_t size = ntohs (req->header.size);
1544
1545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546               "%p Received multicast request of size %u.\n",
1547               mst, size);
1548
1549   uint16_t first_ptype = 0, last_ptype = 0;
1550   if (GNUNET_SYSERR
1551       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1552                                           (const char *) &req[1],
1553                                           &first_ptype, &last_ptype))
1554   {
1555     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1556                 "%p Dropping incoming multicast request with invalid parts.\n",
1557                 mst);
1558     GNUNET_break_op (0);
1559     return;
1560   }
1561
1562   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1563               "Message parts: first: type %u, last: type %u\n",
1564               first_ptype, last_ptype);
1565
1566   /* FIXME: in-order delivery */
1567   client_send_mcast_req (mst, req);
1568 }
1569
1570
1571 /**
1572  * Response from PSYCstore with the current counter values for a channel master.
1573  */
1574 static void
1575 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1576                             uint64_t max_message_id, uint64_t max_group_generation,
1577                             uint64_t max_state_message_id)
1578 {
1579   struct Master *mst = cls;
1580   struct Channel *chn = &mst->chn;
1581   chn->store_op = NULL;
1582
1583   struct GNUNET_PSYC_CountersResultMessage res;
1584   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1585   res.header.size = htons (sizeof (res));
1586   res.result_code = htonl (result);
1587   res.max_message_id = GNUNET_htonll (max_message_id);
1588
1589   if (GNUNET_OK == result || GNUNET_NO == result)
1590   {
1591     mst->max_message_id = max_message_id;
1592     chn->max_message_id = max_message_id;
1593     chn->max_state_message_id = max_state_message_id;
1594     mst->max_group_generation = max_group_generation;
1595     mst->origin
1596       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1597                                        &mcast_recv_join_request,
1598                                        &mcast_recv_membership_test,
1599                                        &mcast_recv_replay_fragment,
1600                                        &mcast_recv_replay_message,
1601                                        &mcast_recv_request,
1602                                        &mcast_recv_message, chn);
1603     chn->is_ready = GNUNET_YES;
1604   }
1605   else
1606   {
1607     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1608                 "%p GNUNET_PSYCSTORE_counters_get() "
1609                 "returned %d for channel %s.\n",
1610                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1611   }
1612
1613   client_send_msg (chn, &res.header);
1614 }
1615
1616
1617 /**
1618  * Response from PSYCstore with the current counter values for a channel slave.
1619  */
1620 void
1621 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1622                            uint64_t max_message_id, uint64_t max_group_generation,
1623                            uint64_t max_state_message_id)
1624 {
1625   struct Slave *slv = cls;
1626   struct Channel *chn = &slv->chn;
1627   chn->store_op = NULL;
1628
1629   struct GNUNET_PSYC_CountersResultMessage res;
1630   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1631   res.header.size = htons (sizeof (res));
1632   res.result_code = htonl (result);
1633   res.max_message_id = GNUNET_htonll (max_message_id);
1634
1635   if (GNUNET_OK == result || GNUNET_NO == result)
1636   {
1637     chn->max_message_id = max_message_id;
1638     chn->max_state_message_id = max_state_message_id;
1639     slv->member
1640       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1641                                       &slv->origin,
1642                                       slv->relay_count, slv->relays,
1643                                       &slv->join_msg->header,
1644                                       &mcast_recv_join_request,
1645                                       &mcast_recv_join_decision,
1646                                       &mcast_recv_membership_test,
1647                                       &mcast_recv_replay_fragment,
1648                                       &mcast_recv_replay_message,
1649                                       &mcast_recv_message, chn);
1650     if (NULL != slv->join_msg)
1651     {
1652       GNUNET_free (slv->join_msg);
1653       slv->join_msg = NULL;
1654     }
1655   }
1656   else
1657   {
1658     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1659                 "%p GNUNET_PSYCSTORE_counters_get() "
1660                 "returned %d for channel %s.\n",
1661                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1662   }
1663
1664   client_send_msg (chn, &res.header);
1665 }
1666
1667
1668 static void
1669 channel_init (struct Channel *chn)
1670 {
1671   chn->recv_msgs
1672     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1673   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1674 }
1675
1676
1677 /**
1678  * Handle a connecting client starting a channel master.
1679  */
1680 static void
1681 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1682                           const struct GNUNET_MessageHeader *msg)
1683 {
1684   const struct MasterStartRequest *req
1685     = (const struct MasterStartRequest *) msg;
1686
1687   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1688   struct GNUNET_HashCode pub_key_hash;
1689
1690   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1691   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1692
1693   struct Master *
1694     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1695   struct Channel *chn;
1696
1697   if (NULL == mst)
1698   {
1699     mst = GNUNET_new (struct Master);
1700     mst->policy = ntohl (req->policy);
1701     mst->priv_key = req->channel_key;
1702     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1703
1704     chn = &mst->chn;
1705     chn->is_master = GNUNET_YES;
1706     chn->pub_key = pub_key;
1707     chn->pub_key_hash = pub_key_hash;
1708     channel_init (chn);
1709
1710     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1711                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1712     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1713                                                    store_recv_master_counters, mst);
1714   }
1715   else
1716   {
1717     chn = &mst->chn;
1718
1719     struct GNUNET_PSYC_CountersResultMessage res;
1720     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1721     res.header.size = htons (sizeof (res));
1722     res.result_code = htonl (GNUNET_OK);
1723     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1724
1725     GNUNET_SERVER_notification_context_add (nc, client);
1726     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1727                                                 GNUNET_NO);
1728   }
1729
1730   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1731               "%p Client connected as master to channel %s.\n",
1732               mst, GNUNET_h2s (&chn->pub_key_hash));
1733
1734   struct Client *cli = GNUNET_new (struct Client);
1735   cli->client = client;
1736   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1737
1738   GNUNET_SERVER_client_set_user_context (client, chn);
1739   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1740 }
1741
1742
1743 /**
1744  * Handle a connecting client joining as a channel slave.
1745  */
1746 static void
1747 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1748                         const struct GNUNET_MessageHeader *msg)
1749 {
1750   const struct SlaveJoinRequest *req
1751     = (const struct SlaveJoinRequest *) msg;
1752   uint16_t req_size = ntohs (req->header.size);
1753
1754   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1755   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1756
1757   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1758   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1759   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1760
1761   struct GNUNET_CONTAINER_MultiHashMap *
1762     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1763   struct Slave *slv = NULL;
1764   struct Channel *chn;
1765
1766   if (NULL != chn_slv)
1767   {
1768     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1769   }
1770   if (NULL == slv)
1771   {
1772     slv = GNUNET_new (struct Slave);
1773     slv->priv_key = req->slave_key;
1774     slv->pub_key = slv_pub_key;
1775     slv->pub_key_hash = slv_pub_key_hash;
1776     slv->origin = req->origin;
1777     slv->relay_count = ntohl (req->relay_count);
1778
1779     const struct GNUNET_PeerIdentity *
1780       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1781     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1782     uint16_t join_msg_size = 0;
1783
1784     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1785         <= req_size)
1786     {
1787       join_msg_size = ntohs (slv->join_msg->header.size);
1788       slv->join_msg = GNUNET_malloc (join_msg_size);
1789       memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1790     }
1791     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1792     {
1793       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1794                   "%u + %u + %u != %u\n",
1795                   sizeof (*req), relay_size, join_msg_size, req_size);
1796       GNUNET_break (0);
1797       GNUNET_SERVER_client_disconnect (client);
1798       GNUNET_free (slv);
1799       return;
1800     }
1801     if (0 < slv->relay_count)
1802     {
1803       slv->relays = GNUNET_malloc (relay_size);
1804       memcpy (slv->relays, &req[1], relay_size);
1805     }
1806
1807     chn = &slv->chn;
1808     chn->is_master = GNUNET_NO;
1809     chn->pub_key = req->channel_key;
1810     chn->pub_key_hash = pub_key_hash;
1811     channel_init (chn);
1812
1813     if (NULL == chn_slv)
1814     {
1815       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1816       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1817                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1818     }
1819     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1820                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1821     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1822                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1823     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1824                                                   &store_recv_slave_counters, slv);
1825   }
1826   else
1827   {
1828     chn = &slv->chn;
1829
1830     struct GNUNET_PSYC_CountersResultMessage res;
1831     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1832     res.header.size = htons (sizeof (res));
1833     res.result_code = htonl (GNUNET_OK);
1834     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1835
1836     GNUNET_SERVER_notification_context_add (nc, client);
1837     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1838                                                 GNUNET_NO);
1839
1840     if (NULL == slv->member)
1841     {
1842       slv->member
1843         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1844                                         &slv->origin,
1845                                         slv->relay_count, slv->relays,
1846                                         &slv->join_msg->header,
1847                                         &mcast_recv_join_request,
1848                                         &mcast_recv_join_decision,
1849                                         &mcast_recv_membership_test,
1850                                         &mcast_recv_replay_fragment,
1851                                         &mcast_recv_replay_message,
1852                                         &mcast_recv_message, chn);
1853       if (NULL != slv->join_msg)
1854       {
1855         GNUNET_free (slv->join_msg);
1856         slv->join_msg = NULL;
1857       }
1858     }
1859     else if (NULL != slv->join_dcsn)
1860     {
1861       GNUNET_SERVER_notification_context_add (nc, client);
1862       GNUNET_SERVER_notification_context_unicast (nc, client,
1863                                                   &slv->join_dcsn->header,
1864                                                   GNUNET_NO);
1865     }
1866   }
1867
1868   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1869               "%p Client connected as slave to channel %s.\n",
1870               slv, GNUNET_h2s (&chn->pub_key_hash));
1871
1872   struct Client *cli = GNUNET_new (struct Client);
1873   cli->client = client;
1874   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1875
1876   GNUNET_SERVER_client_set_user_context (client, chn);
1877   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1878 }
1879
1880
1881 struct JoinDecisionClosure
1882 {
1883   int32_t is_admitted;
1884   struct GNUNET_MessageHeader *msg;
1885 };
1886
1887
1888 /**
1889  * Iterator callback for sending join decisions to multicast.
1890  */
1891 static int
1892 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1893                           void *value)
1894 {
1895   struct JoinDecisionClosure *jcls = cls;
1896   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1897   // FIXME: add relays
1898   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1899   return GNUNET_YES;
1900 }
1901
1902
1903 /**
1904  * Join decision from client.
1905  */
1906 static void
1907 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1908                            const struct GNUNET_MessageHeader *msg)
1909 {
1910   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1911     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1912   struct Channel *chn;
1913   struct Master *mst;
1914   struct JoinDecisionClosure jcls;
1915
1916   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1917   if (NULL == chn)
1918   {
1919     GNUNET_break (0);
1920     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1921     return;
1922   }
1923   GNUNET_assert (GNUNET_YES == chn->is_master);
1924   mst = (struct Master *) chn;
1925   jcls.is_admitted = ntohl (dcsn->is_admitted);
1926   jcls.msg
1927     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1928     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1929     : NULL;
1930
1931   struct GNUNET_HashCode slave_key_hash;
1932   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1933                       &slave_key_hash);
1934
1935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1936               "%p Got join decision (%d) from client for channel %s..\n",
1937               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939               "%p ..and slave %s.\n",
1940               mst, GNUNET_h2s (&slave_key_hash));
1941
1942   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1943                                               &mcast_send_join_decision, &jcls);
1944   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1945   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1946 }
1947
1948
1949 /**
1950  * Send acknowledgement to a client.
1951  *
1952  * Sent after a message fragment has been passed on to multicast.
1953  *
1954  * @param chn The channel struct for the client.
1955  */
1956 static void
1957 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1958 {
1959   struct GNUNET_MessageHeader res;
1960   res.size = htons (sizeof (res));
1961   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1962
1963   /* FIXME */
1964   GNUNET_SERVER_notification_context_add (nc, client);
1965   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1966 }
1967
1968
1969 /**
1970  * Callback for the transmit functions of multicast.
1971  */
1972 static int
1973 transmit_notify (void *cls, size_t *data_size, void *data)
1974 {
1975   struct Channel *chn = cls;
1976   struct TransmitMessage *tmit_msg = chn->tmit_head;
1977
1978   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1979   {
1980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                 "%p transmit_notify: nothing to send.\n", chn);
1982     *data_size = 0;
1983     return GNUNET_NO;
1984   }
1985
1986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1988
1989   *data_size = tmit_msg->size;
1990   memcpy (data, &tmit_msg[1], *data_size);
1991
1992   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1993
1994   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1995     send_message_ack (chn, tmit_msg->client);
1996
1997   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1998   GNUNET_free (tmit_msg);
1999
2000   if (NULL != chn->tmit_head)
2001   {
2002     transmit_message (chn);
2003   }
2004   else if (GNUNET_YES == chn->is_disconnected)
2005   {
2006     /* FIXME: handle partial message (when still in_transmit) */
2007     cleanup_channel (chn);
2008   }
2009   return ret;
2010 }
2011
2012
2013 /**
2014  * Callback for the transmit functions of multicast.
2015  */
2016 static int
2017 master_transmit_notify (void *cls, size_t *data_size, void *data)
2018 {
2019   int ret = transmit_notify (cls, data_size, data);
2020
2021   if (GNUNET_YES == ret)
2022   {
2023     struct Master *mst = cls;
2024     mst->tmit_handle = NULL;
2025   }
2026   return ret;
2027 }
2028
2029
2030 /**
2031  * Callback for the transmit functions of multicast.
2032  */
2033 static int
2034 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2035 {
2036   int ret = transmit_notify (cls, data_size, data);
2037
2038   if (GNUNET_YES == ret)
2039   {
2040     struct Slave *slv = cls;
2041     slv->tmit_handle = NULL;
2042   }
2043   return ret;
2044 }
2045
2046
2047 /**
2048  * Transmit a message from a channel master to the multicast group.
2049  */
2050 static void
2051 master_transmit_message (struct Master *mst)
2052 {
2053   if (NULL == mst->tmit_handle)
2054   {
2055     mst->tmit_handle
2056       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2057                                         mst->max_group_generation,
2058                                         master_transmit_notify, mst);
2059   }
2060   else
2061   {
2062     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2063   }
2064 }
2065
2066
2067 /**
2068  * Transmit a message from a channel slave to the multicast group.
2069  */
2070 static void
2071 slave_transmit_message (struct Slave *slv)
2072 {
2073   if (NULL == slv->tmit_handle)
2074   {
2075     slv->tmit_handle
2076       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2077                                            slave_transmit_notify, slv);
2078   }
2079   else
2080   {
2081     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2082   }
2083 }
2084
2085
2086 static void
2087 transmit_message (struct Channel *chn)
2088 {
2089   chn->is_master
2090     ? master_transmit_message ((struct Master *) chn)
2091     : slave_transmit_message ((struct Slave *) chn);
2092 }
2093
2094
2095 /**
2096  * Queue a message from a channel master for sending to the multicast group.
2097  */
2098 static void
2099 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2100                      uint16_t first_ptype, uint16_t last_ptype)
2101 {
2102   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2103
2104   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2105   {
2106     tmit_msg->id = ++mst->max_message_id;
2107     struct GNUNET_PSYC_MessageMethod *pmeth
2108       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2109
2110     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2111     {
2112       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2113     }
2114     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2115     {
2116       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst);
2117       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2118                                           - mst->max_state_message_id);
2119     }
2120     else
2121     {
2122         GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst);
2123       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2124     }
2125
2126     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2127     {
2128       /// @todo add state_hash to PSYC header
2129     }
2130   }
2131 }
2132
2133
2134 /**
2135  * Queue a message from a channel slave for sending to the multicast group.
2136  */
2137 static void
2138 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2139                      uint16_t first_ptype, uint16_t last_ptype)
2140 {
2141   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2142   {
2143     struct GNUNET_PSYC_MessageMethod *pmeth
2144       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2145     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2146     tmit_msg->id = ++slv->max_request_id;
2147   }
2148 }
2149
2150
2151 /**
2152  * Queue PSYC message parts for sending to multicast.
2153  *
2154  * @param chn           Channel to send to.
2155  * @param client       Client the message originates from.
2156  * @param data_size    Size of @a data.
2157  * @param data         Concatenated message parts.
2158  * @param first_ptype  First message part type in @a data.
2159  * @param last_ptype   Last message part type in @a data.
2160  */
2161 static struct TransmitMessage *
2162 queue_message (struct Channel *chn,
2163                struct GNUNET_SERVER_Client *client,
2164                size_t data_size,
2165                const void *data,
2166                uint16_t first_ptype, uint16_t last_ptype)
2167 {
2168   struct TransmitMessage *
2169     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2170   memcpy (&tmit_msg[1], data, data_size);
2171   tmit_msg->client = client;
2172   tmit_msg->size = data_size;
2173   tmit_msg->state = chn->tmit_state;
2174
2175   /* FIXME: separate queue per message ID */
2176
2177   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2178
2179   chn->is_master
2180     ? master_queue_message ((struct Master *) chn, tmit_msg,
2181                             first_ptype, last_ptype)
2182     : slave_queue_message ((struct Slave *) chn, tmit_msg,
2183                            first_ptype, last_ptype);
2184   return tmit_msg;
2185 }
2186
2187
2188 /**
2189  * Cancel transmission of current message.
2190  *
2191  * @param chn     Channel to send to.
2192  * @param client  Client the message originates from.
2193  */
2194 static void
2195 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2196 {
2197   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2198
2199   struct GNUNET_MessageHeader msg;
2200   msg.size = htons (sizeof (msg));
2201   msg.type = htons (type);
2202
2203   queue_message (chn, client, sizeof (msg), &msg, type, type);
2204   transmit_message (chn);
2205
2206   /* FIXME: cleanup */
2207 }
2208
2209
2210 /**
2211  * Incoming message from a master or slave client.
2212  */
2213 static void
2214 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2215                           const struct GNUNET_MessageHeader *msg)
2216 {
2217   struct Channel *
2218     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2219   GNUNET_assert (NULL != chn);
2220
2221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2222               "%p Received message from client.\n", chn);
2223   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2224
2225   if (GNUNET_YES != chn->is_ready)
2226   {
2227     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2228                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2229     GNUNET_break (0);
2230     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2231     return;
2232   }
2233
2234   uint16_t size = ntohs (msg->size);
2235   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2236   {
2237     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2238     GNUNET_break (0);
2239     transmit_cancel (chn, client);
2240     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2241     return;
2242   }
2243
2244   uint16_t first_ptype = 0, last_ptype = 0;
2245   if (GNUNET_SYSERR
2246       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2247                                           (const char *) &msg[1],
2248                                           &first_ptype, &last_ptype))
2249   {
2250     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2251                 "%p Received invalid message part from client.\n", chn);
2252     GNUNET_break (0);
2253     transmit_cancel (chn, client);
2254     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2255     return;
2256   }
2257   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258               "%p Received message with first part type %u and last part type %u.\n",
2259               chn, first_ptype, last_ptype);
2260
2261   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2262                  first_ptype, last_ptype);
2263   transmit_message (chn);
2264   /* FIXME: send a few ACKs even before transmit_notify is called */
2265
2266   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2267 };
2268
2269
2270 /**
2271  * Received result of GNUNET_PSYCSTORE_membership_store()
2272  */
2273 static void
2274 store_recv_membership_store_result (void *cls, int64_t result,
2275                                     const char *err_msg, uint16_t err_msg_size)
2276 {
2277   struct Operation *op = cls;
2278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2279               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2280               op->chn, result, err_msg_size, err_msg);
2281
2282   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2283   op_remove (op);
2284 }
2285
2286
2287 /**
2288  * Client requests to add/remove a slave in the membership database.
2289  */
2290 static void
2291 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2292                               const struct GNUNET_MessageHeader *msg)
2293 {
2294   struct Channel *
2295     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2296   GNUNET_assert (NULL != chn);
2297
2298   const struct ChannelMembershipStoreRequest *
2299     req = (const struct ChannelMembershipStoreRequest *) msg;
2300
2301   struct Operation *op = op_add (chn, client, req->op_id, 0);
2302
2303   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2304   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306               "%p Received membership store request from client.\n", chn);
2307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2309               chn, req->did_join, announced_at, effective_since);
2310
2311   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2312                                      req->did_join, announced_at, effective_since,
2313                                      0, /* FIXME: group_generation */
2314                                      &store_recv_membership_store_result, op);
2315   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2316 }
2317
2318
2319 /**
2320  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2321  * in response to a history request from a client.
2322  */
2323 static int
2324 store_recv_fragment_history (void *cls,
2325                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2326                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2327 {
2328   struct Operation *op = cls;
2329   if (NULL == op->client)
2330   { /* Requesting client already disconnected. */
2331     return GNUNET_NO;
2332   }
2333   struct Channel *chn = op->chn;
2334
2335   struct GNUNET_PSYC_MessageHeader *pmsg;
2336   uint16_t msize = ntohs (mmsg->header.size);
2337   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2338
2339   struct GNUNET_OperationResultMessage *
2340     res = GNUNET_malloc (sizeof (*res) + psize);
2341   res->header.size = htons (sizeof (*res) + psize);
2342   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2343   res->op_id = op->op_id;
2344   res->result_code = GNUNET_htonll (GNUNET_OK);
2345
2346   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2347   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2348   memcpy (&res[1], pmsg, psize);
2349
2350   /** @todo FIXME: send only to requesting client */
2351   client_send_msg (chn, &res->header);
2352   return GNUNET_YES;
2353 }
2354
2355
2356 /**
2357  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2358  * in response to a history request from a client.
2359  */
2360 static void
2361 store_recv_fragment_history_result (void *cls, int64_t result,
2362                                     const char *err_msg, uint16_t err_msg_size)
2363 {
2364   struct Operation *op = cls;
2365   if (NULL == op->client)
2366   { /* Requesting client already disconnected. */
2367     return;
2368   }
2369
2370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371               "%p History replay #%" PRIu64 ": "
2372               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2373               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2374
2375   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2376   {
2377     /** @todo Multicast replay request for messages not found locally. */
2378   }
2379
2380   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2381   op_remove (op);
2382 }
2383
2384
2385 /**
2386  * Client requests channel history.
2387  */
2388 static void
2389 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2390                             const struct GNUNET_MessageHeader *msg)
2391 {
2392   struct Channel *
2393     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2394   GNUNET_assert (NULL != chn);
2395
2396   const struct GNUNET_PSYC_HistoryRequestMessage *
2397     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2398   uint16_t size = ntohs (msg->size);
2399   const char *method_prefix = (const char *) &req[1];
2400
2401   if (size < sizeof (*req) + 1
2402       || '\0' != method_prefix[size - sizeof (*req) - 1])
2403   {
2404     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2405                 "%p History replay #%" PRIu64 ": "
2406                 "invalid method prefix. size: %u < %u?\n",
2407                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2408     GNUNET_break (0);
2409     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2410     return;
2411   }
2412
2413   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2414
2415   if (0 == req->message_limit)
2416     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2417                                   GNUNET_ntohll (req->start_message_id),
2418                                   GNUNET_ntohll (req->end_message_id),
2419                                   method_prefix,
2420                                   &store_recv_fragment_history,
2421                                   &store_recv_fragment_history_result, op);
2422   else
2423     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2424                                          GNUNET_ntohll (req->message_limit),
2425                                          method_prefix,
2426                                          &store_recv_fragment_history,
2427                                          &store_recv_fragment_history_result,
2428                                          op);
2429
2430   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2431 }
2432
2433
2434 /**
2435  * Received state var from PSYCstore, send it to client.
2436  */
2437 static int
2438 store_recv_state_var (void *cls, const char *name,
2439                       const void *value, uint32_t value_size)
2440 {
2441   struct Operation *op = cls;
2442   struct GNUNET_OperationResultMessage *res;
2443
2444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2445               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2446               op->chn, GNUNET_ntohll (op->op_id), name);
2447
2448   if (NULL != name) /* First part */
2449   {
2450     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2451     struct GNUNET_PSYC_MessageModifier *mod;
2452     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2453     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2454     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2455     res->op_id = op->op_id;
2456
2457     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2458     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2459     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2460     mod->name_size = htons (name_size);
2461     mod->value_size = htonl (value_size);
2462     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2463     memcpy (&mod[1], name, name_size);
2464     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2465   }
2466   else /* Continuation */
2467   {
2468     struct GNUNET_MessageHeader *mod;
2469     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2470     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2471     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2472     res->op_id = op->op_id;
2473
2474     mod = (struct GNUNET_MessageHeader *) &res[1];
2475     mod->size = htons (sizeof (*mod) + value_size);
2476     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2477     memcpy (&mod[1], value, value_size);
2478   }
2479
2480   // FIXME: client might have been disconnected
2481   GNUNET_SERVER_notification_context_add (nc, op->client);
2482   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2483                                               GNUNET_NO);
2484   return GNUNET_YES;
2485 }
2486
2487
2488 /**
2489  * Received result of GNUNET_PSYCSTORE_state_get()
2490  * or GNUNET_PSYCSTORE_state_get_prefix()
2491  */
2492 static void
2493 store_recv_state_result (void *cls, int64_t result,
2494                          const char *err_msg, uint16_t err_msg_size)
2495 {
2496   struct Operation *op = cls;
2497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2498               "%p state_get #%" PRIu64 ": "
2499               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2500               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2501
2502   // FIXME: client might have been disconnected
2503   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2504   op_remove (op);
2505 }
2506
2507
2508 /**
2509  * Client requests best matching state variable from PSYCstore.
2510  */
2511 static void
2512 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2513                        const struct GNUNET_MessageHeader *msg)
2514 {
2515   struct Channel *
2516     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2517   GNUNET_assert (NULL != chn);
2518
2519   const struct StateRequest *
2520     req = (const struct StateRequest *) msg;
2521
2522   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2523   const char *name = (const char *) &req[1];
2524   if (0 == name_size || '\0' != name[name_size - 1])
2525   {
2526     GNUNET_break (0);
2527     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2528     return;
2529   }
2530
2531   struct Operation *op = op_add (chn, client, req->op_id, 0);
2532   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2533                               &store_recv_state_var,
2534                               &store_recv_state_result, op);
2535   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2536 }
2537
2538
2539 /**
2540  * Client requests state variables with a given prefix from PSYCstore.
2541  */
2542 static void
2543 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2544                               const struct GNUNET_MessageHeader *msg)
2545 {
2546   struct Channel *
2547     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2548   GNUNET_assert (NULL != chn);
2549
2550   const struct StateRequest *
2551     req = (const struct StateRequest *) msg;
2552
2553   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2554   const char *name = (const char *) &req[1];
2555   if (0 == name_size || '\0' != name[name_size - 1])
2556   {
2557     GNUNET_break (0);
2558     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2559     return;
2560   }
2561
2562   struct Operation *op = op_add (chn, client, req->op_id, 0);
2563   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2564                                      &store_recv_state_var,
2565                                      &store_recv_state_result, op);
2566   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2567 }
2568
2569
2570 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2571   { &client_recv_master_start, NULL,
2572     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2573
2574   { &client_recv_slave_join, NULL,
2575     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2576
2577   { &client_recv_join_decision, NULL,
2578     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2579
2580   { &client_recv_psyc_message, NULL,
2581     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2582
2583   { &client_recv_membership_store, NULL,
2584     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2585
2586   { &client_recv_history_replay, NULL,
2587     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2588
2589   { &client_recv_state_get, NULL,
2590     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2591
2592   { &client_recv_state_get_prefix, NULL,
2593     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2594
2595   { NULL, NULL, 0, 0 }
2596 };
2597
2598
2599 /**
2600  * Initialize the PSYC service.
2601  *
2602  * @param cls Closure.
2603  * @param server The initialized server.
2604  * @param c Configuration to use.
2605  */
2606 static void
2607 run (void *cls, struct GNUNET_SERVER_Handle *server,
2608      const struct GNUNET_CONFIGURATION_Handle *c)
2609 {
2610   cfg = c;
2611   store = GNUNET_PSYCSTORE_connect (cfg);
2612   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2613   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2614   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2615   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2616   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2617   nc = GNUNET_SERVER_notification_context_create (server, 1);
2618   GNUNET_SERVER_add_handlers (server, server_handlers);
2619   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2620   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2621                                 &shutdown_task, NULL);
2622 }
2623
2624
2625 /**
2626  * The main function for the service.
2627  *
2628  * @param argc number of arguments from the command line
2629  * @param argv command line arguments
2630  * @return 0 ok, 1 on error
2631  */
2632 int
2633 main (int argc, char *const *argv)
2634 {
2635   return (GNUNET_OK ==
2636           GNUNET_SERVICE_run (argc, argv, "psyc",
2637                               GNUNET_SERVICE_OPTION_NONE,
2638                               &run, NULL)) ? 0 : 1;
2639 }
2640
2641 /* end of gnunet-service-psyc.c */