-fix (C) notices
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls,
432                            const struct GNUNET_SCHEDULER_TaskContext *tc)
433 {
434   struct Channel *chn = cls;
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  * @param tc unused
444  */
445 static void
446 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
447 {
448   if (NULL != nc)
449   {
450     GNUNET_SERVER_notification_context_destroy (nc);
451     nc = NULL;
452   }
453   if (NULL != stats)
454   {
455     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
456     stats = NULL;
457   }
458 }
459
460
461 static struct Operation *
462 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
463         uint64_t op_id, uint32_t flags)
464 {
465   struct Operation *op = GNUNET_malloc (sizeof (*op));
466   op->client = client;
467   op->chn = chn;
468   op->op_id = op_id;
469   op->flags = flags;
470   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
471   return op;
472 }
473
474
475 static void
476 op_remove (struct Operation *op)
477 {
478   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
479   GNUNET_free (op);
480 }
481
482
483 /**
484  * Clean up master data structures after a client disconnected.
485  */
486 static void
487 cleanup_master (struct Master *mst)
488 {
489   struct Channel *chn = &mst->chn;
490
491   if (NULL != mst->origin)
492     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
493   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
494   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
495 }
496
497
498 /**
499  * Clean up slave data structures after a client disconnected.
500  */
501 static void
502 cleanup_slave (struct Slave *slv)
503 {
504   struct Channel *chn = &slv->chn;
505   struct GNUNET_CONTAINER_MultiHashMap *
506     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
507                                                 &chn->pub_key_hash);
508   GNUNET_assert (NULL != chn_slv);
509   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
510
511   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
512   {
513     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
514                                           chn_slv);
515     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
516   }
517   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
518
519   if (NULL != slv->join_msg)
520   {
521     GNUNET_free (slv->join_msg);
522     slv->join_msg = NULL;
523   }
524   if (NULL != slv->relays)
525   {
526     GNUNET_free (slv->relays);
527     slv->relays = NULL;
528   }
529   if (NULL != slv->member)
530   {
531     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
532     slv->member = NULL;
533   }
534   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
535 }
536
537
538 /**
539  * Clean up channel data structures after a client disconnected.
540  */
541 static void
542 cleanup_channel (struct Channel *chn)
543 {
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "%p Cleaning up channel %s. master? %u\n",
546               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
547   message_queue_drop (chn);
548   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
549   chn->recv_frags = NULL;
550
551   if (NULL != chn->store_op)
552   {
553     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
554     chn->store_op = NULL;
555   }
556
557   (GNUNET_YES == chn->is_master)
558     ? cleanup_master ((struct Master *) chn)
559     : cleanup_slave ((struct Slave *) chn);
560   GNUNET_free (chn);
561 }
562
563
564 /**
565  * Called whenever a client is disconnected.
566  * Frees our resources associated with that client.
567  *
568  * @param cls Closure.
569  * @param client Identification of the client.
570  */
571 static void
572 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
573 {
574   if (NULL == client)
575     return;
576
577   struct Channel *
578     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
579
580   if (NULL == chn)
581   {
582     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583                 "%p User context is NULL in client_disconnect()\n", chn);
584     GNUNET_break (0);
585     return;
586   }
587
588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589               "%p Client (%s) disconnected from channel %s\n",
590               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
591               GNUNET_h2s (&chn->pub_key_hash));
592
593   struct Client *cli = chn->clients_head;
594   while (NULL != cli)
595   {
596     if (cli->client == client)
597     {
598       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
599       GNUNET_free (cli);
600       break;
601     }
602     cli = cli->next;
603   }
604
605   struct Operation *op = chn->op_head;
606   while (NULL != op)
607   {
608     if (op->client == client)
609     {
610       op->client = NULL;
611       break;
612     }
613     op = op->next;
614   }
615
616   if (NULL == chn->clients_head)
617   { /* Last client disconnected. */
618     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619                 "%p Last client (%s) disconnected from channel %s\n",
620                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
621                 GNUNET_h2s (&chn->pub_key_hash));
622     chn->is_disconnected = GNUNET_YES;
623     if (NULL != chn->tmit_head)
624     { /* Send pending messages to multicast before cleanup. */
625       transmit_message (chn);
626     }
627     else
628     {
629       cleanup_channel (chn);
630     }
631   }
632 }
633
634
635 /**
636  * Send message to all clients connected to the channel.
637  */
638 static void
639 client_send_msg (const struct Channel *chn,
640                  const struct GNUNET_MessageHeader *msg)
641 {
642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643               "%p Sending message to clients.\n", chn);
644
645   struct Client *cli = chn->clients_head;
646   while (NULL != cli)
647   {
648     GNUNET_SERVER_notification_context_add (nc, cli->client);
649     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
650     cli = cli->next;
651   }
652 }
653
654
655 /**
656  * Send a result code back to the client.
657  *
658  * @param client
659  *        Client that should receive the result code.
660  * @param result_code
661  *        Code to transmit.
662  * @param op_id
663  *        Operation ID in network byte order.
664  * @param data
665  *        Data payload or NULL.
666  * @param data_size
667  *        Size of @a data.
668  */
669 static void
670 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
671                     int64_t result_code, const void *data, uint16_t data_size)
672 {
673   struct GNUNET_OperationResultMessage *res;
674
675   res = GNUNET_malloc (sizeof (*res) + data_size);
676   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
677   res->header.size = htons (sizeof (*res) + data_size);
678   res->result_code = GNUNET_htonll (result_code);
679   res->op_id = op_id;
680   if (0 < data_size)
681     memcpy (&res[1], data, data_size);
682
683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
684               "%p Sending result to client for operation #%" PRIu64 ": "
685               "%" PRId64 " (size: %u)\n",
686               client, GNUNET_ntohll (op_id), result_code, data_size);
687
688   GNUNET_SERVER_notification_context_add (nc, client);
689   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
690                                               GNUNET_NO);
691   GNUNET_free (res);
692 }
693
694
695 /**
696  * Closure for join_mem_test_cb()
697  */
698 struct JoinMemTestClosure
699 {
700   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
701   struct Channel *chn;
702   struct GNUNET_MULTICAST_JoinHandle *jh;
703   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
704 };
705
706
707 /**
708  * Membership test result callback used for join requests.
709  */
710 static void
711 join_mem_test_cb (void *cls, int64_t result,
712                   const char *err_msg, uint16_t err_msg_size)
713 {
714   struct JoinMemTestClosure *jcls = cls;
715
716   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
717   { /* Pass on join request to client if this is a master channel */
718     struct Master *mst = (struct Master *) jcls->chn;
719     struct GNUNET_HashCode slave_pub_hash;
720     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
721                         &slave_pub_hash);
722     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
723                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
724     client_send_msg (jcls->chn, &jcls->join_msg->header);
725   }
726   else
727   {
728     if (GNUNET_SYSERR == result)
729     {
730       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
731                   "Could not perform membership test (%.*s)\n",
732                   err_msg_size, err_msg);
733     }
734     // FIXME: add relays
735     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
736   }
737   GNUNET_free (jcls->join_msg);
738   GNUNET_free (jcls);
739 }
740
741
742 /**
743  * Incoming join request from multicast.
744  */
745 static void
746 mcast_recv_join_request (void *cls,
747                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
748                          const struct GNUNET_MessageHeader *join_msg,
749                          struct GNUNET_MULTICAST_JoinHandle *jh)
750 {
751   struct Channel *chn = cls;
752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
753
754   uint16_t join_msg_size = 0;
755   if (NULL != join_msg)
756   {
757     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
758     {
759       join_msg_size = ntohs (join_msg->size);
760     }
761     else
762     {
763       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
764                   "%p Got join message with invalid type %u.\n",
765                   chn, ntohs (join_msg->type));
766     }
767   }
768
769   struct GNUNET_PSYC_JoinRequestMessage *
770     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
771   req->header.size = htons (sizeof (*req) + join_msg_size);
772   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
773   req->slave_pub_key = *slave_pub_key;
774   if (0 < join_msg_size)
775     memcpy (&req[1], join_msg, join_msg_size);
776
777   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
778   jcls->slave_pub_key = *slave_pub_key;
779   jcls->chn = chn;
780   jcls->jh = jh;
781   jcls->join_msg = req;
782
783   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
784                                     chn->max_message_id, 0,
785                                     &join_mem_test_cb, jcls);
786 }
787
788
789 /**
790  * Join decision received from multicast.
791  */
792 static void
793 mcast_recv_join_decision (void *cls, int is_admitted,
794                           const struct GNUNET_PeerIdentity *peer,
795                           uint16_t relay_count,
796                           const struct GNUNET_PeerIdentity *relays,
797                           const struct GNUNET_MessageHeader *join_resp)
798 {
799   struct Slave *slv = cls;
800   struct Channel *chn = &slv->chn;
801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802               "%p Got join decision: %d\n", slv, is_admitted);
803   if (GNUNET_YES == chn->is_ready)
804   {
805     /* Already admitted */
806     return;
807   }
808
809   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
810   struct GNUNET_PSYC_JoinDecisionMessage *
811     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
812   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
813   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
814   dcsn->is_admitted = htonl (is_admitted);
815   if (0 < join_resp_size)
816     memcpy (&dcsn[1], join_resp, join_resp_size);
817
818   client_send_msg (chn, &dcsn->header);
819
820   if (GNUNET_YES == is_admitted
821       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
822   {
823     chn->is_ready = GNUNET_YES;
824   }
825 }
826
827
828 static int
829 store_recv_fragment_replay (void *cls,
830                             struct GNUNET_MULTICAST_MessageHeader *msg,
831                             enum GNUNET_PSYCSTORE_MessageFlags flags)
832 {
833   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
834
835   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
836   return GNUNET_YES;
837 }
838
839
840 /**
841  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
842  */
843 static void
844 store_recv_fragment_replay_result (void *cls, int64_t result,
845                                    const char *err_msg, uint16_t err_msg_size)
846 {
847   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
850               rh, result, err_msg_size, err_msg);
851
852   switch (result)
853   {
854   case GNUNET_YES:
855     break;
856
857   case GNUNET_NO:
858     GNUNET_MULTICAST_replay_response (rh, NULL,
859                                       GNUNET_MULTICAST_REC_NOT_FOUND);
860     break;
861
862   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
863     GNUNET_MULTICAST_replay_response (rh, NULL,
864                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
865     break;
866
867   case GNUNET_SYSERR:
868     GNUNET_MULTICAST_replay_response (rh, NULL,
869                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
870     break;
871   }
872   GNUNET_MULTICAST_replay_response_end (rh);
873 }
874
875
876 /**
877  * Incoming fragment replay request from multicast.
878  */
879 static void
880 mcast_recv_replay_fragment (void *cls,
881                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
882                             uint64_t fragment_id, uint64_t flags,
883                             struct GNUNET_MULTICAST_ReplayHandle *rh)
884
885 {
886   struct Channel *chn = cls;
887   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
888                                  fragment_id, fragment_id,
889                                  &store_recv_fragment_replay,
890                                  &store_recv_fragment_replay_result, rh);
891 }
892
893
894 /**
895  * Incoming message replay request from multicast.
896  */
897 static void
898 mcast_recv_replay_message (void *cls,
899                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
900                            uint64_t message_id,
901                            uint64_t fragment_offset,
902                            uint64_t flags,
903                            struct GNUNET_MULTICAST_ReplayHandle *rh)
904 {
905   struct Channel *chn = cls;
906   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
907                                 message_id, message_id, 1, NULL,
908                                 &store_recv_fragment_replay,
909                                 &store_recv_fragment_replay_result, rh);
910 }
911
912
913 /**
914  * Convert an uint64_t in network byte order to a HashCode
915  * that can be used as key in a MultiHashMap
916  */
917 static inline void
918 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
919 {
920   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
921   /* TODO: use built-in byte swap functions if available */
922
923   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
924   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
925
926   *key = (struct GNUNET_HashCode) {};
927   *((uint64_t *) key)
928     = (n << 32) | (n >> 32);
929 }
930
931
932 /**
933  * Convert an uint64_t in host byte order to a HashCode
934  * that can be used as key in a MultiHashMap
935  */
936 static inline void
937 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
938 {
939 #if __BYTE_ORDER == __BIG_ENDIAN
940   hash_key_from_nll (key, n);
941 #elif __BYTE_ORDER == __LITTLE_ENDIAN
942   *key = (struct GNUNET_HashCode) {};
943   *((uint64_t *) key) = n;
944 #else
945   #error byteorder undefined
946 #endif
947 }
948
949
950 /**
951  * Initialize PSYC message header.
952  */
953 static inline void
954 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
955                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
956 {
957   uint16_t size = ntohs (mmsg->header.size);
958   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
959
960   pmsg->header.size = htons (psize);
961   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
962   pmsg->message_id = mmsg->message_id;
963   pmsg->fragment_offset = mmsg->fragment_offset;
964   pmsg->flags = htonl (flags);
965
966   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
967 }
968
969
970 /**
971  * Create a new PSYC message from a multicast message for sending it to clients.
972  */
973 static inline struct GNUNET_PSYC_MessageHeader *
974 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
975 {
976   struct GNUNET_PSYC_MessageHeader *pmsg;
977   uint16_t size = ntohs (mmsg->header.size);
978   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
979
980   pmsg = GNUNET_malloc (psize);
981   psyc_msg_init (pmsg, mmsg, flags);
982   return pmsg;
983 }
984
985
986 /**
987  * Send multicast message to all clients connected to the channel.
988  */
989 static void
990 client_send_mcast_msg (struct Channel *chn,
991                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
992                        uint32_t flags)
993 {
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "%p Sending multicast message to client. "
996               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
997               chn, GNUNET_ntohll (mmsg->fragment_id),
998               GNUNET_ntohll (mmsg->message_id));
999
1000   struct GNUNET_PSYC_MessageHeader *
1001     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1002   client_send_msg (chn, &pmsg->header);
1003   GNUNET_free (pmsg);
1004 }
1005
1006
1007 /**
1008  * Send multicast request to all clients connected to the channel.
1009  */
1010 static void
1011 client_send_mcast_req (struct Master *mst,
1012                        const struct GNUNET_MULTICAST_RequestHeader *req)
1013 {
1014   struct Channel *chn = &mst->chn;
1015
1016   struct GNUNET_PSYC_MessageHeader *pmsg;
1017   uint16_t size = ntohs (req->header.size);
1018   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1019
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1021               "%p Sending multicast request to client. "
1022               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1023               chn, GNUNET_ntohll (req->fragment_id),
1024               GNUNET_ntohll (req->request_id));
1025
1026   pmsg = GNUNET_malloc (psize);
1027   pmsg->header.size = htons (psize);
1028   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1029   pmsg->message_id = req->request_id;
1030   pmsg->fragment_offset = req->fragment_offset;
1031   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1032   pmsg->slave_pub_key = req->member_pub_key;
1033
1034   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1035   client_send_msg (chn, &pmsg->header);
1036   GNUNET_free (pmsg);
1037 }
1038
1039
1040 /**
1041  * Insert a multicast message fragment into the queue belonging to the message.
1042  *
1043  * @param chn          Channel.
1044  * @param mmsg         Multicast message fragment.
1045  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1046  * @param first_ptype  First PSYC message part type in @a mmsg.
1047  * @param last_ptype   Last PSYC message part type in @a mmsg.
1048  */
1049 static void
1050 fragment_queue_insert (struct Channel *chn,
1051                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1052                        uint16_t first_ptype, uint16_t last_ptype)
1053 {
1054   const uint16_t size = ntohs (mmsg->header.size);
1055   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1056   struct GNUNET_CONTAINER_MultiHashMap
1057     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1058                                                     &chn->pub_key_hash);
1059
1060   struct GNUNET_HashCode msg_id_hash;
1061   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1062
1063   struct FragmentQueue
1064     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1065
1066   if (NULL == fragq)
1067   {
1068     fragq = GNUNET_new (struct FragmentQueue);
1069     fragq->state = MSG_FRAG_STATE_HEADER;
1070     fragq->fragments
1071       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1072
1073     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1074                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1075
1076     if (NULL == chan_msgs)
1077     {
1078       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1079       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1080                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1081     }
1082   }
1083
1084   struct GNUNET_HashCode frag_id_hash;
1085   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1086   struct RecvCacheEntry
1087     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1088   if (NULL == cache_entry)
1089   {
1090     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1091                 "%p Adding message fragment to cache. "
1092                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1093                 chn, GNUNET_ntohll (mmsg->message_id),
1094                 GNUNET_ntohll (mmsg->fragment_id));
1095     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1096                 "%p header_size: %" PRIu64 " + %u\n",
1097                 chn, fragq->header_size, size);
1098     cache_entry = GNUNET_new (struct RecvCacheEntry);
1099     cache_entry->ref_count = 1;
1100     cache_entry->mmsg = GNUNET_malloc (size);
1101     memcpy (cache_entry->mmsg, mmsg, size);
1102     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1103                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1104   }
1105   else
1106   {
1107     cache_entry->ref_count++;
1108     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109                 "%p Message fragment is already in cache. "
1110                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1111                 ", ref_count: %u\n",
1112                 chn, GNUNET_ntohll (mmsg->message_id),
1113                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1114   }
1115
1116   if (MSG_FRAG_STATE_HEADER == fragq->state)
1117   {
1118     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1119     {
1120       struct GNUNET_PSYC_MessageMethod *
1121         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1122       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1123       fragq->flags = ntohl (pmeth->flags);
1124     }
1125
1126     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1127     {
1128       fragq->header_size += size;
1129     }
1130     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1131              || frag_offset == fragq->header_size)
1132     { /* header is now complete */
1133       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1134                   "%p Header of message %" PRIu64 " is complete.\n",
1135                   chn, GNUNET_ntohll (mmsg->message_id));
1136
1137       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1138                   "%p Adding message %" PRIu64 " to queue.\n",
1139                   chn, GNUNET_ntohll (mmsg->message_id));
1140       fragq->state = MSG_FRAG_STATE_DATA;
1141     }
1142     else
1143     {
1144       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1145                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1146                   "%" PRIu64 " != %" PRIu64 "\n",
1147                   chn, GNUNET_ntohll (mmsg->message_id),
1148                   frag_offset, fragq->header_size);
1149     }
1150   }
1151
1152   switch (last_ptype)
1153   {
1154   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1155     if (frag_offset == fragq->size)
1156       fragq->state = MSG_FRAG_STATE_END;
1157     else
1158       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1159                   "%p Message %" PRIu64 " is NOT complete yet: "
1160                   "%" PRIu64 " != %" PRIu64 "\n",
1161                   chn, GNUNET_ntohll (mmsg->message_id),
1162                   frag_offset, fragq->size);
1163     break;
1164
1165   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1166     /* Drop message without delivering to client if it's a single fragment */
1167     fragq->state =
1168       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1169       ? MSG_FRAG_STATE_DROP
1170       : MSG_FRAG_STATE_CANCEL;
1171   }
1172
1173   switch (fragq->state)
1174   {
1175   case MSG_FRAG_STATE_DATA:
1176   case MSG_FRAG_STATE_END:
1177   case MSG_FRAG_STATE_CANCEL:
1178     if (GNUNET_NO == fragq->is_queued)
1179     {
1180       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1181                                     GNUNET_ntohll (mmsg->message_id));
1182       fragq->is_queued = GNUNET_YES;
1183     }
1184   }
1185
1186   fragq->size += size;
1187   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1188                                 GNUNET_ntohll (mmsg->fragment_id));
1189 }
1190
1191
1192 /**
1193  * Run fragment queue of a message.
1194  *
1195  * Send fragments of a message in order to client, after all modifiers arrived
1196  * from multicast.
1197  *
1198  * @param chn
1199  *        Channel.
1200  * @param msg_id
1201  *        ID of the message @a fragq belongs to.
1202  * @param fragq
1203  *        Fragment queue of the message.
1204  * @param drop
1205  *        Drop message without delivering to client?
1206  *        #GNUNET_YES or #GNUNET_NO.
1207  */
1208 static void
1209 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1210                     struct FragmentQueue *fragq, uint8_t drop)
1211 {
1212   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1213               "%p Running message fragment queue for message %" PRIu64
1214               " (state: %u).\n",
1215               chn, msg_id, fragq->state);
1216
1217   struct GNUNET_CONTAINER_MultiHashMap
1218     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1219                                                     &chn->pub_key_hash);
1220   GNUNET_assert (NULL != chan_msgs);
1221   uint64_t frag_id;
1222
1223   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1224                                                     &frag_id))
1225   {
1226     struct GNUNET_HashCode frag_id_hash;
1227     hash_key_from_hll (&frag_id_hash, frag_id);
1228     struct RecvCacheEntry *cache_entry
1229       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1230     if (cache_entry != NULL)
1231     {
1232       if (GNUNET_NO == drop)
1233       {
1234         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1235       }
1236       if (cache_entry->ref_count <= 1)
1237       {
1238         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1239                                               cache_entry);
1240         GNUNET_free (cache_entry->mmsg);
1241         GNUNET_free (cache_entry);
1242       }
1243       else
1244       {
1245         cache_entry->ref_count--;
1246       }
1247     }
1248 #if CACHE_AGING_IMPLEMENTED
1249     else if (GNUNET_NO == drop)
1250     {
1251       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1252     }
1253 #endif
1254
1255     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1256   }
1257
1258   if (MSG_FRAG_STATE_END <= fragq->state)
1259   {
1260     struct GNUNET_HashCode msg_id_hash;
1261     hash_key_from_hll (&msg_id_hash, msg_id);
1262
1263     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1264     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1265     GNUNET_free (fragq);
1266   }
1267   else
1268   {
1269     fragq->is_queued = GNUNET_NO;
1270   }
1271 }
1272
1273
1274 struct StateModifyClosure
1275 {
1276   struct Channel *chn;
1277   uint64_t msg_id;
1278   struct GNUNET_HashCode msg_id_hash;
1279 };
1280
1281
1282 void
1283 store_recv_state_modify_result (void *cls, int64_t result,
1284                                 const char *err_msg, uint16_t err_msg_size)
1285 {
1286   struct StateModifyClosure *mcls = cls;
1287   struct Channel *chn = mcls->chn;
1288   uint64_t msg_id = mcls->msg_id;
1289
1290   struct FragmentQueue *
1291     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1292
1293   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1295               chn, result, err_msg_size, err_msg);
1296
1297   switch (result)
1298   {
1299   case GNUNET_OK:
1300   case GNUNET_NO:
1301     if (NULL != fragq)
1302       fragq->state_is_modified = GNUNET_YES;
1303     if (chn->max_state_message_id < msg_id)
1304       chn->max_state_message_id = msg_id;
1305     if (chn->max_message_id < msg_id)
1306       chn->max_message_id = msg_id;
1307
1308     if (NULL != fragq)
1309       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1310     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1311     message_queue_run (chn);
1312     break;
1313
1314   default:
1315     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1316                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1317                 chn, result, err_msg_size, err_msg);
1318     /** @todo FIXME: handle state_modify error */
1319   }
1320 }
1321
1322
1323 /**
1324  * Run message queue.
1325  *
1326  * Send messages in queue to client in order after a message has arrived from
1327  * multicast, according to the following:
1328  * - A message is only sent if all of its modifiers arrived.
1329  * - A stateful message is only sent if the previous stateful message
1330  *   has already been delivered to the client.
1331  *
1332  * @param chn  Channel.
1333  *
1334  * @return Number of messages removed from queue and sent to client.
1335  */
1336 static uint64_t
1337 message_queue_run (struct Channel *chn)
1338 {
1339   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1340               "%p Running message queue.\n", chn);
1341   uint64_t n = 0;
1342   uint64_t msg_id;
1343
1344   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1345                                                     &msg_id))
1346   {
1347     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1348                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1349     struct GNUNET_HashCode msg_id_hash;
1350     hash_key_from_hll (&msg_id_hash, msg_id);
1351
1352     struct FragmentQueue *
1353       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1354
1355     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1356     {
1357       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1358                   "%p No fragq (%p) or header not complete.\n",
1359                   chn, fragq);
1360       break;
1361     }
1362
1363     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1364                 "%p Fragment queue entry:  state: %u, state delta: "
1365                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1366                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1367
1368     if (MSG_FRAG_STATE_DATA <= fragq->state)
1369     {
1370       /* Check if there's a missing message before the current one */
1371       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1372       {
1373         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1374
1375         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1376             && (chn->max_message_id != msg_id - 1
1377                 && chn->max_message_id != msg_id))
1378         {
1379           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1380                       "%p Out of order message. "
1381                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1382                       chn, chn->max_message_id, msg_id);
1383           break;
1384           // FIXME: keep track of messages processed in this queue run,
1385           //        and only stop after reaching the end
1386         }
1387       }
1388       else
1389       {
1390         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1391         if (GNUNET_YES != fragq->state_is_modified)
1392         {
1393           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1394           {
1395             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1396                         "%p Out of order stateful message. "
1397                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1398                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1399             break;
1400             // FIXME: keep track of messages processed in this queue run,
1401             //        and only stop after reaching the end
1402           }
1403
1404           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1405           mcls->chn = chn;
1406           mcls->msg_id = msg_id;
1407           mcls->msg_id_hash = msg_id_hash;
1408
1409           /* Apply modifiers to state in PSYCstore */
1410           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1411                                          fragq->state_delta,
1412                                          store_recv_state_modify_result, mcls);
1413           break; // continue after asynchronous state modify result
1414         }
1415       }
1416       chn->max_message_id = msg_id;
1417     }
1418     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1419     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1420     n++;
1421   }
1422
1423   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1424               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1425   return n;
1426 }
1427
1428
1429 /**
1430  * Drop message queue of a channel.
1431  *
1432  * Remove all messages in queue without sending it to clients.
1433  *
1434  * @param chn  Channel.
1435  *
1436  * @return Number of messages removed from queue.
1437  */
1438 static uint64_t
1439 message_queue_drop (struct Channel *chn)
1440 {
1441   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1442               "%p Dropping message queue.\n", chn);
1443   uint64_t n = 0;
1444   uint64_t msg_id;
1445   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1446                                                     &msg_id))
1447   {
1448     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1449                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1450     struct GNUNET_HashCode msg_id_hash;
1451     hash_key_from_hll (&msg_id_hash, msg_id);
1452
1453     struct FragmentQueue *
1454       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1455     GNUNET_assert (NULL != fragq);
1456     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1457     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1458     n++;
1459   }
1460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1462   return n;
1463 }
1464
1465
1466 /**
1467  * Received result of GNUNET_PSYCSTORE_fragment_store().
1468  */
1469 static void
1470 store_recv_fragment_store_result (void *cls, int64_t result,
1471                                   const char *err_msg, uint16_t err_msg_size)
1472 {
1473   struct Channel *chn = cls;
1474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1475               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1476               chn, result, err_msg_size, err_msg);
1477 }
1478
1479
1480 /**
1481  * Handle incoming message fragment from multicast.
1482  *
1483  * Store it using PSYCstore and send it to the clients of the channel in order.
1484  */
1485 static void
1486 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1487 {
1488   struct Channel *chn = cls;
1489   uint16_t size = ntohs (mmsg->header.size);
1490
1491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492               "%p Received multicast message of size %u. "
1493               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1494               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1495               chn, size,
1496               GNUNET_ntohll (mmsg->fragment_id),
1497               GNUNET_ntohll (mmsg->message_id),
1498               GNUNET_ntohll (mmsg->fragment_offset),
1499               GNUNET_ntohll (mmsg->flags));
1500
1501   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1502                                    &store_recv_fragment_store_result, chn);
1503
1504   uint16_t first_ptype = 0, last_ptype = 0;
1505   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1506                                                (const char *) &mmsg[1],
1507                                                &first_ptype, &last_ptype);
1508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509               "%p Message check result %d, first part type %u, last part type %u\n",
1510               chn, check, first_ptype, last_ptype);
1511   if (GNUNET_SYSERR == check)
1512   {
1513     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1514                 "%p Dropping incoming multicast message with invalid parts.\n",
1515                 chn);
1516     GNUNET_break_op (0);
1517     return;
1518   }
1519
1520   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1521   message_queue_run (chn);
1522 }
1523
1524
1525 /**
1526  * Incoming request fragment from multicast for a master.
1527  *
1528  * @param cls   Master.
1529  * @param req   The request.
1530  */
1531 static void
1532 mcast_recv_request (void *cls,
1533                     const struct GNUNET_MULTICAST_RequestHeader *req)
1534 {
1535   struct Master *mst = cls;
1536   uint16_t size = ntohs (req->header.size);
1537
1538   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540               "%p Received multicast request of size %u from %s.\n",
1541               mst, size, str);
1542   GNUNET_free (str);
1543
1544   uint16_t first_ptype = 0, last_ptype = 0;
1545   if (GNUNET_SYSERR
1546       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1547                                           (const char *) &req[1],
1548                                           &first_ptype, &last_ptype))
1549   {
1550     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1551                 "%p Dropping incoming multicast request with invalid parts.\n",
1552                 mst);
1553     GNUNET_break_op (0);
1554     return;
1555   }
1556
1557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558               "Message parts: first: type %u, last: type %u\n",
1559               first_ptype, last_ptype);
1560
1561   /* FIXME: in-order delivery */
1562   client_send_mcast_req (mst, req);
1563 }
1564
1565
1566 /**
1567  * Response from PSYCstore with the current counter values for a channel master.
1568  */
1569 static void
1570 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1571                             uint64_t max_message_id, uint64_t max_group_generation,
1572                             uint64_t max_state_message_id)
1573 {
1574   struct Master *mst = cls;
1575   struct Channel *chn = &mst->chn;
1576   chn->store_op = NULL;
1577
1578   struct GNUNET_PSYC_CountersResultMessage res;
1579   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1580   res.header.size = htons (sizeof (res));
1581   res.result_code = htonl (result);
1582   res.max_message_id = GNUNET_htonll (max_message_id);
1583
1584   if (GNUNET_OK == result || GNUNET_NO == result)
1585   {
1586     mst->max_message_id = max_message_id;
1587     chn->max_message_id = max_message_id;
1588     chn->max_state_message_id = max_state_message_id;
1589     mst->max_group_generation = max_group_generation;
1590     mst->origin
1591       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1592                                        mcast_recv_join_request,
1593                                        mcast_recv_replay_fragment,
1594                                        mcast_recv_replay_message,
1595                                        mcast_recv_request,
1596                                        mcast_recv_message, chn);
1597     chn->is_ready = GNUNET_YES;
1598   }
1599   else
1600   {
1601     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1602                 "%p GNUNET_PSYCSTORE_counters_get() "
1603                 "returned %d for channel %s.\n",
1604                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1605   }
1606
1607   client_send_msg (chn, &res.header);
1608 }
1609
1610
1611 /**
1612  * Response from PSYCstore with the current counter values for a channel slave.
1613  */
1614 void
1615 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1616                            uint64_t max_message_id, uint64_t max_group_generation,
1617                            uint64_t max_state_message_id)
1618 {
1619   struct Slave *slv = cls;
1620   struct Channel *chn = &slv->chn;
1621   chn->store_op = NULL;
1622
1623   struct GNUNET_PSYC_CountersResultMessage res;
1624   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1625   res.header.size = htons (sizeof (res));
1626   res.result_code = htonl (result);
1627   res.max_message_id = GNUNET_htonll (max_message_id);
1628
1629   if (GNUNET_OK == result || GNUNET_NO == result)
1630   {
1631     chn->max_message_id = max_message_id;
1632     chn->max_state_message_id = max_state_message_id;
1633     slv->member
1634       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1635                                       &slv->origin,
1636                                       slv->relay_count, slv->relays,
1637                                       &slv->join_msg->header,
1638                                       mcast_recv_join_request,
1639                                       mcast_recv_join_decision,
1640                                       mcast_recv_replay_fragment,
1641                                       mcast_recv_replay_message,
1642                                       mcast_recv_message, chn);
1643     if (NULL != slv->join_msg)
1644     {
1645       GNUNET_free (slv->join_msg);
1646       slv->join_msg = NULL;
1647     }
1648   }
1649   else
1650   {
1651     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1652                 "%p GNUNET_PSYCSTORE_counters_get() "
1653                 "returned %d for channel %s.\n",
1654                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1655   }
1656
1657   client_send_msg (chn, &res.header);
1658 }
1659
1660
1661 static void
1662 channel_init (struct Channel *chn)
1663 {
1664   chn->recv_msgs
1665     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1666   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1667 }
1668
1669
1670 /**
1671  * Handle a connecting client starting a channel master.
1672  */
1673 static void
1674 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1675                           const struct GNUNET_MessageHeader *msg)
1676 {
1677   const struct MasterStartRequest *req
1678     = (const struct MasterStartRequest *) msg;
1679
1680   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1681   struct GNUNET_HashCode pub_key_hash;
1682
1683   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1684   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1685
1686   struct Master *
1687     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1688   struct Channel *chn;
1689
1690   if (NULL == mst)
1691   {
1692     mst = GNUNET_new (struct Master);
1693     mst->policy = ntohl (req->policy);
1694     mst->priv_key = req->channel_key;
1695     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1696
1697     chn = &mst->chn;
1698     chn->is_master = GNUNET_YES;
1699     chn->pub_key = pub_key;
1700     chn->pub_key_hash = pub_key_hash;
1701     channel_init (chn);
1702
1703     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1704                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1705     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1706                                                    store_recv_master_counters, mst);
1707   }
1708   else
1709   {
1710     chn = &mst->chn;
1711
1712     struct GNUNET_PSYC_CountersResultMessage res;
1713     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1714     res.header.size = htons (sizeof (res));
1715     res.result_code = htonl (GNUNET_OK);
1716     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1717
1718     GNUNET_SERVER_notification_context_add (nc, client);
1719     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1720                                                 GNUNET_NO);
1721   }
1722
1723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1724               "%p Client connected as master to channel %s.\n",
1725               mst, GNUNET_h2s (&chn->pub_key_hash));
1726
1727   struct Client *cli = GNUNET_new (struct Client);
1728   cli->client = client;
1729   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1730
1731   GNUNET_SERVER_client_set_user_context (client, chn);
1732   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1733 }
1734
1735
1736 /**
1737  * Handle a connecting client joining as a channel slave.
1738  */
1739 static void
1740 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1741                         const struct GNUNET_MessageHeader *msg)
1742 {
1743   const struct SlaveJoinRequest *req
1744     = (const struct SlaveJoinRequest *) msg;
1745   uint16_t req_size = ntohs (req->header.size);
1746
1747   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1748   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1749
1750   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1751   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1752   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1753
1754   struct GNUNET_CONTAINER_MultiHashMap *
1755     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1756   struct Slave *slv = NULL;
1757   struct Channel *chn;
1758
1759   if (NULL != chn_slv)
1760   {
1761     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1762   }
1763   if (NULL == slv)
1764   {
1765     slv = GNUNET_new (struct Slave);
1766     slv->priv_key = req->slave_key;
1767     slv->pub_key = slv_pub_key;
1768     slv->pub_key_hash = slv_pub_hash;
1769     slv->origin = req->origin;
1770     slv->relay_count = ntohl (req->relay_count);
1771     slv->join_flags = ntohl (req->flags);
1772
1773     const struct GNUNET_PeerIdentity *
1774       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1775     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1776     uint16_t join_msg_size = 0;
1777
1778     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1779         <= req_size)
1780     {
1781       struct GNUNET_PSYC_Message *
1782         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1783       join_msg_size = ntohs (join_msg->header.size);
1784       slv->join_msg = GNUNET_malloc (join_msg_size);
1785       memcpy (slv->join_msg, join_msg, join_msg_size);
1786     }
1787     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1788     {
1789       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1790                   "%u + %u + %u != %u\n",
1791                   sizeof (*req), relay_size, join_msg_size, req_size);
1792       GNUNET_break (0);
1793       GNUNET_SERVER_client_disconnect (client);
1794       GNUNET_free (slv);
1795       return;
1796     }
1797     if (0 < slv->relay_count)
1798     {
1799       slv->relays = GNUNET_malloc (relay_size);
1800       memcpy (slv->relays, &req[1], relay_size);
1801     }
1802
1803     chn = &slv->chn;
1804     chn->is_master = GNUNET_NO;
1805     chn->pub_key = req->channel_pub_key;
1806     chn->pub_key_hash = pub_key_hash;
1807     channel_init (chn);
1808
1809     if (NULL == chn_slv)
1810     {
1811       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1812       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1813                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1814     }
1815     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1816                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1817     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1818                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1819     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1820                                                   &store_recv_slave_counters, slv);
1821   }
1822   else
1823   {
1824     chn = &slv->chn;
1825
1826     struct GNUNET_PSYC_CountersResultMessage res;
1827     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1828     res.header.size = htons (sizeof (res));
1829     res.result_code = htonl (GNUNET_OK);
1830     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1831
1832     GNUNET_SERVER_notification_context_add (nc, client);
1833     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1834                                                 GNUNET_NO);
1835
1836     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1837     {
1838       mcast_recv_join_decision (slv, GNUNET_YES,
1839                                 NULL, 0, NULL, NULL);
1840     }
1841     else if (NULL == slv->member)
1842     {
1843       slv->member
1844         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1845                                         &slv->origin,
1846                                         slv->relay_count, slv->relays,
1847                                         &slv->join_msg->header,
1848                                         &mcast_recv_join_request,
1849                                         &mcast_recv_join_decision,
1850                                         &mcast_recv_replay_fragment,
1851                                         &mcast_recv_replay_message,
1852                                         &mcast_recv_message, chn);
1853       if (NULL != slv->join_msg)
1854       {
1855         GNUNET_free (slv->join_msg);
1856         slv->join_msg = NULL;
1857       }
1858     }
1859     else if (NULL != slv->join_dcsn)
1860     {
1861       GNUNET_SERVER_notification_context_add (nc, client);
1862       GNUNET_SERVER_notification_context_unicast (nc, client,
1863                                                   &slv->join_dcsn->header,
1864                                                   GNUNET_NO);
1865     }
1866   }
1867
1868   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1869               "%p Client connected as slave to channel %s.\n",
1870               slv, GNUNET_h2s (&chn->pub_key_hash));
1871
1872   struct Client *cli = GNUNET_new (struct Client);
1873   cli->client = client;
1874   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1875
1876   GNUNET_SERVER_client_set_user_context (client, chn);
1877   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1878 }
1879
1880
1881 struct JoinDecisionClosure
1882 {
1883   int32_t is_admitted;
1884   struct GNUNET_MessageHeader *msg;
1885 };
1886
1887
1888 /**
1889  * Iterator callback for sending join decisions to multicast.
1890  */
1891 static int
1892 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1893                           void *value)
1894 {
1895   struct JoinDecisionClosure *jcls = cls;
1896   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1897   // FIXME: add relays
1898   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1899   return GNUNET_YES;
1900 }
1901
1902
1903 /**
1904  * Join decision from client.
1905  */
1906 static void
1907 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1908                            const struct GNUNET_MessageHeader *msg)
1909 {
1910   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1911     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1912   struct Channel *chn;
1913   struct Master *mst;
1914   struct JoinDecisionClosure jcls;
1915
1916   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1917   if (NULL == chn)
1918   {
1919     GNUNET_break (0);
1920     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1921     return;
1922   }
1923   GNUNET_assert (GNUNET_YES == chn->is_master);
1924   mst = (struct Master *) chn;
1925   jcls.is_admitted = ntohl (dcsn->is_admitted);
1926   jcls.msg
1927     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1928     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1929     : NULL;
1930
1931   struct GNUNET_HashCode slave_pub_hash;
1932   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1933                       &slave_pub_hash);
1934
1935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1936               "%p Got join decision (%d) from client for channel %s..\n",
1937               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939               "%p ..and slave %s.\n",
1940               mst, GNUNET_h2s (&slave_pub_hash));
1941
1942   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1943                                               &mcast_send_join_decision, &jcls);
1944   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1945   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1946 }
1947
1948
1949 /**
1950  * Send acknowledgement to a client.
1951  *
1952  * Sent after a message fragment has been passed on to multicast.
1953  *
1954  * @param chn The channel struct for the client.
1955  */
1956 static void
1957 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1958 {
1959   struct GNUNET_MessageHeader res;
1960   res.size = htons (sizeof (res));
1961   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1962
1963   /* FIXME */
1964   GNUNET_SERVER_notification_context_add (nc, client);
1965   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1966 }
1967
1968
1969 /**
1970  * Callback for the transmit functions of multicast.
1971  */
1972 static int
1973 transmit_notify (void *cls, size_t *data_size, void *data)
1974 {
1975   struct Channel *chn = cls;
1976   struct TransmitMessage *tmit_msg = chn->tmit_head;
1977
1978   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1979   {
1980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                 "%p transmit_notify: nothing to send.\n", chn);
1982     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1983       GNUNET_break (0);
1984     *data_size = 0;
1985     return GNUNET_NO;
1986   }
1987
1988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1989               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1990
1991   *data_size = tmit_msg->size;
1992   memcpy (data, &tmit_msg[1], *data_size);
1993
1994   int ret
1995     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1996     ? GNUNET_NO
1997     : GNUNET_YES;
1998
1999   /* FIXME: handle disconnecting clients */
2000   if (NULL != tmit_msg->client)
2001     send_message_ack (chn, tmit_msg->client);
2002
2003   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2004   GNUNET_free (tmit_msg);
2005
2006   if (NULL != chn->tmit_head)
2007   {
2008     GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
2009   }
2010   else if (GNUNET_YES == chn->is_disconnected
2011            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2012   {
2013     /* FIXME: handle partial message (when still in_transmit) */
2014     return GNUNET_SYSERR;
2015   }
2016   return ret;
2017 }
2018
2019
2020 /**
2021  * Callback for the transmit functions of multicast.
2022  */
2023 static int
2024 master_transmit_notify (void *cls, size_t *data_size, void *data)
2025 {
2026   int ret = transmit_notify (cls, data_size, data);
2027
2028   if (GNUNET_YES == ret)
2029   {
2030     struct Master *mst = cls;
2031     mst->tmit_handle = NULL;
2032   }
2033   return ret;
2034 }
2035
2036
2037 /**
2038  * Callback for the transmit functions of multicast.
2039  */
2040 static int
2041 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2042 {
2043   int ret = transmit_notify (cls, data_size, data);
2044
2045   if (GNUNET_YES == ret)
2046   {
2047     struct Slave *slv = cls;
2048     slv->tmit_handle = NULL;
2049   }
2050   return ret;
2051 }
2052
2053
2054 /**
2055  * Transmit a message from a channel master to the multicast group.
2056  */
2057 static void
2058 master_transmit_message (struct Master *mst)
2059 {
2060   if (NULL == mst->chn.tmit_head)
2061     return;
2062   if (NULL == mst->tmit_handle)
2063   {
2064     mst->tmit_handle
2065       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
2066                                         mst->max_group_generation,
2067                                         master_transmit_notify, mst);
2068   }
2069   else
2070   {
2071     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2072   }
2073 }
2074
2075
2076 /**
2077  * Transmit a message from a channel slave to the multicast group.
2078  */
2079 static void
2080 slave_transmit_message (struct Slave *slv)
2081 {
2082   if (NULL == slv->chn.tmit_head)
2083     return;
2084   if (NULL == slv->tmit_handle)
2085   {
2086     slv->tmit_handle
2087       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2088                                            slave_transmit_notify, slv);
2089   }
2090   else
2091   {
2092     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2093   }
2094 }
2095
2096
2097 static void
2098 transmit_message (struct Channel *chn)
2099 {
2100   chn->is_master
2101     ? master_transmit_message ((struct Master *) chn)
2102     : slave_transmit_message ((struct Slave *) chn);
2103 }
2104
2105
2106 /**
2107  * Queue a message from a channel master for sending to the multicast group.
2108  */
2109 static void
2110 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2111 {
2112   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2113
2114   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2115   {
2116     tmit_msg->id = ++mst->max_message_id;
2117     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2118                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2119                 mst, tmit_msg->id);
2120     struct GNUNET_PSYC_MessageMethod *pmeth
2121       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2122
2123     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2124     {
2125       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2126     }
2127     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2128     {
2129       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2130                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2131                   mst, tmit_msg->id - mst->max_state_message_id);
2132       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2133                                           - mst->max_state_message_id);
2134       mst->max_state_message_id = tmit_msg->id;
2135     }
2136     else
2137     {
2138         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2139                     "%p master_queue_message: state not modified\n", mst);
2140       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2141     }
2142
2143     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2144     {
2145       /// @todo add state_hash to PSYC header
2146     }
2147   }
2148 }
2149
2150
2151 /**
2152  * Queue a message from a channel slave for sending to the multicast group.
2153  */
2154 static void
2155 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2156 {
2157   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2158   {
2159     struct GNUNET_PSYC_MessageMethod *pmeth
2160       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2161     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2162     tmit_msg->id = ++slv->max_request_id;
2163   }
2164 }
2165
2166
2167 /**
2168  * Queue PSYC message parts for sending to multicast.
2169  *
2170  * @param chn           Channel to send to.
2171  * @param client       Client the message originates from.
2172  * @param data_size    Size of @a data.
2173  * @param data         Concatenated message parts.
2174  * @param first_ptype  First message part type in @a data.
2175  * @param last_ptype   Last message part type in @a data.
2176  */
2177 static struct TransmitMessage *
2178 queue_message (struct Channel *chn,
2179                struct GNUNET_SERVER_Client *client,
2180                size_t data_size,
2181                const void *data,
2182                uint16_t first_ptype, uint16_t last_ptype)
2183 {
2184   struct TransmitMessage *
2185     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2186   memcpy (&tmit_msg[1], data, data_size);
2187   tmit_msg->client = client;
2188   tmit_msg->size = data_size;
2189   tmit_msg->first_ptype = first_ptype;
2190   tmit_msg->last_ptype = last_ptype;
2191
2192   /* FIXME: separate queue per message ID */
2193
2194   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2195
2196   chn->is_master
2197     ? master_queue_message ((struct Master *) chn, tmit_msg)
2198     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2199   return tmit_msg;
2200 }
2201
2202
2203 /**
2204  * Cancel transmission of current message.
2205  *
2206  * @param chn     Channel to send to.
2207  * @param client  Client the message originates from.
2208  */
2209 static void
2210 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2211 {
2212   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2213
2214   struct GNUNET_MessageHeader msg;
2215   msg.size = htons (sizeof (msg));
2216   msg.type = htons (type);
2217
2218   queue_message (chn, client, sizeof (msg), &msg, type, type);
2219   transmit_message (chn);
2220
2221   /* FIXME: cleanup */
2222 }
2223
2224
2225 /**
2226  * Incoming message from a master or slave client.
2227  */
2228 static void
2229 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2230                           const struct GNUNET_MessageHeader *msg)
2231 {
2232   struct Channel *
2233     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2234   GNUNET_assert (NULL != chn);
2235
2236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237               "%p Received message from client.\n", chn);
2238   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2239
2240   if (GNUNET_YES != chn->is_ready)
2241   {
2242     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2243                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2244     GNUNET_break (0);
2245     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2246     return;
2247   }
2248
2249   uint16_t size = ntohs (msg->size);
2250   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2251   {
2252     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2253                 "%p Message payload too large: %u < %u.\n",
2254                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2255     GNUNET_break (0);
2256     transmit_cancel (chn, client);
2257     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2258     return;
2259   }
2260
2261   uint16_t first_ptype = 0, last_ptype = 0;
2262   if (GNUNET_SYSERR
2263       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2264                                           (const char *) &msg[1],
2265                                           &first_ptype, &last_ptype))
2266   {
2267     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2268                 "%p Received invalid message part from client.\n", chn);
2269     GNUNET_break (0);
2270     transmit_cancel (chn, client);
2271     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2272     return;
2273   }
2274   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2275               "%p Received message with first part type %u and last part type %u.\n",
2276               chn, first_ptype, last_ptype);
2277
2278   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2279                  first_ptype, last_ptype);
2280   transmit_message (chn);
2281   /* FIXME: send a few ACKs even before transmit_notify is called */
2282
2283   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2284 };
2285
2286
2287 /**
2288  * Received result of GNUNET_PSYCSTORE_membership_store()
2289  */
2290 static void
2291 store_recv_membership_store_result (void *cls, int64_t result,
2292                                     const char *err_msg, uint16_t err_msg_size)
2293 {
2294   struct Operation *op = cls;
2295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2296               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2297               op->chn, result, err_msg_size, err_msg);
2298
2299   if (NULL != op->client)
2300     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2301   op_remove (op);
2302 }
2303
2304
2305 /**
2306  * Client requests to add/remove a slave in the membership database.
2307  */
2308 static void
2309 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2310                               const struct GNUNET_MessageHeader *msg)
2311 {
2312   struct Channel *
2313     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2314   GNUNET_assert (NULL != chn);
2315
2316   const struct ChannelMembershipStoreRequest *
2317     req = (const struct ChannelMembershipStoreRequest *) msg;
2318
2319   struct Operation *op = op_add (chn, client, req->op_id, 0);
2320
2321   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2322   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2324               "%p Received membership store request from client.\n", chn);
2325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2327               chn, req->did_join, announced_at, effective_since);
2328
2329   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2330                                      req->did_join, announced_at, effective_since,
2331                                      0, /* FIXME: group_generation */
2332                                      &store_recv_membership_store_result, op);
2333   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2334 }
2335
2336
2337 /**
2338  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2339  * in response to a history request from a client.
2340  */
2341 static int
2342 store_recv_fragment_history (void *cls,
2343                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2344                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2345 {
2346   struct Operation *op = cls;
2347   if (NULL == op->client)
2348   { /* Requesting client already disconnected. */
2349     return GNUNET_NO;
2350   }
2351   struct Channel *chn = op->chn;
2352
2353   struct GNUNET_PSYC_MessageHeader *pmsg;
2354   uint16_t msize = ntohs (mmsg->header.size);
2355   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2356
2357   struct GNUNET_OperationResultMessage *
2358     res = GNUNET_malloc (sizeof (*res) + psize);
2359   res->header.size = htons (sizeof (*res) + psize);
2360   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2361   res->op_id = op->op_id;
2362   res->result_code = GNUNET_htonll (GNUNET_OK);
2363
2364   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2365   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2366   memcpy (&res[1], pmsg, psize);
2367
2368   /** @todo FIXME: send only to requesting client */
2369   client_send_msg (chn, &res->header);
2370   return GNUNET_YES;
2371 }
2372
2373
2374 /**
2375  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2376  * in response to a history request from a client.
2377  */
2378 static void
2379 store_recv_fragment_history_result (void *cls, int64_t result,
2380                                     const char *err_msg, uint16_t err_msg_size)
2381 {
2382   struct Operation *op = cls;
2383   if (NULL == op->client)
2384   { /* Requesting client already disconnected. */
2385     return;
2386   }
2387
2388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2389               "%p History replay #%" PRIu64 ": "
2390               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2391               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2392
2393   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2394   {
2395     /** @todo Multicast replay request for messages not found locally. */
2396   }
2397
2398   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2399   op_remove (op);
2400 }
2401
2402
2403 /**
2404  * Client requests channel history.
2405  */
2406 static void
2407 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2408                             const struct GNUNET_MessageHeader *msg)
2409 {
2410   struct Channel *
2411     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2412   GNUNET_assert (NULL != chn);
2413
2414   const struct GNUNET_PSYC_HistoryRequestMessage *
2415     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2416   uint16_t size = ntohs (msg->size);
2417   const char *method_prefix = (const char *) &req[1];
2418
2419   if (size < sizeof (*req) + 1
2420       || '\0' != method_prefix[size - sizeof (*req) - 1])
2421   {
2422     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2423                 "%p History replay #%" PRIu64 ": "
2424                 "invalid method prefix. size: %u < %u?\n",
2425                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2426     GNUNET_break (0);
2427     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2428     return;
2429   }
2430
2431   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2432
2433   if (0 == req->message_limit)
2434     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2435                                   GNUNET_ntohll (req->start_message_id),
2436                                   GNUNET_ntohll (req->end_message_id),
2437                                   0, method_prefix,
2438                                   &store_recv_fragment_history,
2439                                   &store_recv_fragment_history_result, op);
2440   else
2441     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2442                                          GNUNET_ntohll (req->message_limit),
2443                                          method_prefix,
2444                                          &store_recv_fragment_history,
2445                                          &store_recv_fragment_history_result,
2446                                          op);
2447
2448   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2449 }
2450
2451
2452 /**
2453  * Received state var from PSYCstore, send it to client.
2454  */
2455 static int
2456 store_recv_state_var (void *cls, const char *name,
2457                       const void *value, uint32_t value_size)
2458 {
2459   struct Operation *op = cls;
2460   struct GNUNET_OperationResultMessage *res;
2461
2462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2463               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2464               op->chn, GNUNET_ntohll (op->op_id), name);
2465
2466   if (NULL != name) /* First part */
2467   {
2468     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2469     struct GNUNET_PSYC_MessageModifier *mod;
2470     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2471     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2472     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2473     res->op_id = op->op_id;
2474
2475     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2476     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2477     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2478     mod->name_size = htons (name_size);
2479     mod->value_size = htonl (value_size);
2480     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2481     memcpy (&mod[1], name, name_size);
2482     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2483   }
2484   else /* Continuation */
2485   {
2486     struct GNUNET_MessageHeader *mod;
2487     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2488     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2489     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2490     res->op_id = op->op_id;
2491
2492     mod = (struct GNUNET_MessageHeader *) &res[1];
2493     mod->size = htons (sizeof (*mod) + value_size);
2494     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2495     memcpy (&mod[1], value, value_size);
2496   }
2497
2498   // FIXME: client might have been disconnected
2499   GNUNET_SERVER_notification_context_add (nc, op->client);
2500   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2501                                               GNUNET_NO);
2502   return GNUNET_YES;
2503 }
2504
2505
2506 /**
2507  * Received result of GNUNET_PSYCSTORE_state_get()
2508  * or GNUNET_PSYCSTORE_state_get_prefix()
2509  */
2510 static void
2511 store_recv_state_result (void *cls, int64_t result,
2512                          const char *err_msg, uint16_t err_msg_size)
2513 {
2514   struct Operation *op = cls;
2515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2516               "%p state_get #%" PRIu64 ": "
2517               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2518               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2519
2520   // FIXME: client might have been disconnected
2521   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2522   op_remove (op);
2523 }
2524
2525
2526 /**
2527  * Client requests best matching state variable from PSYCstore.
2528  */
2529 static void
2530 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2531                        const struct GNUNET_MessageHeader *msg)
2532 {
2533   struct Channel *
2534     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2535   GNUNET_assert (NULL != chn);
2536
2537   const struct StateRequest *
2538     req = (const struct StateRequest *) msg;
2539
2540   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2541   const char *name = (const char *) &req[1];
2542   if (0 == name_size || '\0' != name[name_size - 1])
2543   {
2544     GNUNET_break (0);
2545     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2546     return;
2547   }
2548
2549   struct Operation *op = op_add (chn, client, req->op_id, 0);
2550   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2551                               &store_recv_state_var,
2552                               &store_recv_state_result, op);
2553   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2554 }
2555
2556
2557 /**
2558  * Client requests state variables with a given prefix from PSYCstore.
2559  */
2560 static void
2561 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2562                               const struct GNUNET_MessageHeader *msg)
2563 {
2564   struct Channel *
2565     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2566   GNUNET_assert (NULL != chn);
2567
2568   const struct StateRequest *
2569     req = (const struct StateRequest *) msg;
2570
2571   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2572   const char *name = (const char *) &req[1];
2573   if (0 == name_size || '\0' != name[name_size - 1])
2574   {
2575     GNUNET_break (0);
2576     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2577     return;
2578   }
2579
2580   struct Operation *op = op_add (chn, client, req->op_id, 0);
2581   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2582                                      &store_recv_state_var,
2583                                      &store_recv_state_result, op);
2584   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2585 }
2586
2587
2588 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2589   { &client_recv_master_start, NULL,
2590     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2591
2592   { &client_recv_slave_join, NULL,
2593     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2594
2595   { &client_recv_join_decision, NULL,
2596     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2597
2598   { &client_recv_psyc_message, NULL,
2599     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2600
2601   { &client_recv_membership_store, NULL,
2602     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2603
2604   { &client_recv_history_replay, NULL,
2605     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2606
2607   { &client_recv_state_get, NULL,
2608     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2609
2610   { &client_recv_state_get_prefix, NULL,
2611     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2612
2613   { NULL, NULL, 0, 0 }
2614 };
2615
2616
2617 /**
2618  * Initialize the PSYC service.
2619  *
2620  * @param cls Closure.
2621  * @param server The initialized server.
2622  * @param c Configuration to use.
2623  */
2624 static void
2625 run (void *cls, struct GNUNET_SERVER_Handle *server,
2626      const struct GNUNET_CONFIGURATION_Handle *c)
2627 {
2628   cfg = c;
2629   store = GNUNET_PSYCSTORE_connect (cfg);
2630   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2631   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2632   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2633   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2634   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2635   nc = GNUNET_SERVER_notification_context_create (server, 1);
2636   GNUNET_SERVER_add_handlers (server, server_handlers);
2637   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2638   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2639                                 &shutdown_task, NULL);
2640 }
2641
2642
2643 /**
2644  * The main function for the service.
2645  *
2646  * @param argc number of arguments from the command line
2647  * @param argv command line arguments
2648  * @return 0 ok, 1 on error
2649  */
2650 int
2651 main (int argc, char *const *argv)
2652 {
2653   return (GNUNET_OK ==
2654           GNUNET_SERVICE_run (argc, argv, "psyc",
2655                               GNUNET_SERVICE_OPTION_NONE,
2656                               &run, NULL)) ? 0 : 1;
2657 }
2658
2659 /* end of gnunet-service-psyc.c */