psycstore
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Service handle.
48  */
49 static struct GNUNET_SERVICE_Handle *service;
50
51 /**
52  * Handle to the statistics service.
53  */
54 static struct GNUNET_STATISTICS_Handle *stats;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVICE_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct ClientList
189 {
190   struct ClientList *prev;
191   struct ClientList *next;
192
193   struct GNUNET_SERVICE_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVICE_Client *client;
203   struct Channel *channel;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct ClientList *clients_head;
215   struct ClientList *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this channel ready to receive messages from client?
274    * #GNUNET_YES or #GNUNET_NO
275    */
276   uint8_t is_ready;
277
278   /**
279    * Is the client disconnected?
280    * #GNUNET_YES or #GNUNET_NO
281    */
282   uint8_t is_disconnected;
283
284   /**
285    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286    */
287   uint8_t is_master;
288
289   union {
290     struct Master *master;
291     struct Slave *slave;
292   };
293 };
294
295
296 /**
297  * Client context for a channel master.
298  */
299 struct Master
300 {
301   /**
302    * Channel struct common for Master and Slave
303    */
304   struct Channel channel;
305
306   /**
307    * Private key of the channel.
308    */
309   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310
311   /**
312    * Handle for the multicast origin.
313    */
314   struct GNUNET_MULTICAST_Origin *origin;
315
316   /**
317    * Transmit handle for multicast.
318    */
319   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320
321   /**
322    * Incoming join requests from multicast.
323    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
324    */
325   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326
327   /**
328    * Last message ID transmitted to this channel.
329    *
330    * Incremented before sending a message, thus the message_id in messages sent
331    * starts from 1.
332    */
333   uint64_t max_message_id;
334
335   /**
336    * ID of the last message with state operations transmitted to the channel.
337    * 0 if there is no such message.
338    */
339   uint64_t max_state_message_id;
340
341   /**
342    * Maximum group generation transmitted to the channel.
343    */
344   uint64_t max_group_generation;
345
346   /**
347    * @see enum GNUNET_PSYC_Policy
348    */
349   enum GNUNET_PSYC_Policy policy;
350 };
351
352
353 /**
354  * Client context for a channel slave.
355  */
356 struct Slave
357 {
358   /**
359    * Channel struct common for Master and Slave
360    */
361   struct Channel channel;
362
363   /**
364    * Private key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367
368   /**
369    * Public key of the slave.
370    */
371   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372
373   /**
374    * Hash of @a pub_key.
375    */
376   struct GNUNET_HashCode pub_key_hash;
377
378   /**
379    * Handle for the multicast member.
380    */
381   struct GNUNET_MULTICAST_Member *member;
382
383   /**
384    * Transmit handle for multicast.
385    */
386   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387
388   /**
389    * Peer identity of the origin.
390    */
391   struct GNUNET_PeerIdentity origin;
392
393   /**
394    * Number of items in @a relays.
395    */
396   uint32_t relay_count;
397
398   /**
399    * Relays that multicast can use to connect.
400    */
401   struct GNUNET_PeerIdentity *relays;
402
403   /**
404    * Join request to be transmitted to the master on join.
405    */
406   struct GNUNET_PSYC_Message *join_msg;
407
408   /**
409    * Join decision received from multicast.
410    */
411   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412
413   /**
414    * Maximum request ID for this channel.
415    */
416   uint64_t max_request_id;
417
418   /**
419    * Join flags.
420    */
421   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
422 };
423
424
425 /**
426  * Client context.
427  */
428 struct Client {
429   struct GNUNET_SERVICE_Client *client;
430   struct Channel *channel;
431 };
432
433
434 struct ReplayRequestKey
435 {
436   uint64_t fragment_id;
437   uint64_t message_id;
438   uint64_t fragment_offset;
439   uint64_t flags;
440 };
441
442
443 static void
444 transmit_message (struct Channel *chn);
445
446 static uint64_t
447 message_queue_run (struct Channel *chn);
448
449 static uint64_t
450 message_queue_drop (struct Channel *chn);
451
452
453 static void
454 schedule_transmit_message (void *cls)
455 {
456   struct Channel *chn = cls;
457
458   transmit_message (chn);
459 }
460
461
462 /**
463  * Task run during shutdown.
464  *
465  * @param cls unused
466  */
467 static void
468 shutdown_task (void *cls)
469 {
470   if (NULL != stats)
471   {
472     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
473     stats = NULL;
474   }
475 }
476
477
478 static struct Operation *
479 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
480         uint64_t op_id, uint32_t flags)
481 {
482   struct Operation *op = GNUNET_malloc (sizeof (*op));
483   op->client = client;
484   op->channel = chn;
485   op->op_id = op_id;
486   op->flags = flags;
487   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
488   return op;
489 }
490
491
492 static void
493 op_remove (struct Operation *op)
494 {
495   GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
496   GNUNET_free (op);
497 }
498
499
500 /**
501  * Clean up master data structures after a client disconnected.
502  */
503 static void
504 cleanup_master (struct Master *mst)
505 {
506   struct Channel *chn = &mst->channel;
507
508   if (NULL != mst->origin)
509     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
510   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
511   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
512 }
513
514
515 /**
516  * Clean up slave data structures after a client disconnected.
517  */
518 static void
519 cleanup_slave (struct Slave *slv)
520 {
521   struct Channel *chn = &slv->channel;
522   struct GNUNET_CONTAINER_MultiHashMap *
523     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
524                                                 &chn->pub_key_hash);
525   GNUNET_assert (NULL != chn_slv);
526   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
527
528   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
529   {
530     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
531                                           chn_slv);
532     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
533   }
534   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
535
536   if (NULL != slv->join_msg)
537   {
538     GNUNET_free (slv->join_msg);
539     slv->join_msg = NULL;
540   }
541   if (NULL != slv->relays)
542   {
543     GNUNET_free (slv->relays);
544     slv->relays = NULL;
545   }
546   if (NULL != slv->member)
547   {
548     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
549     slv->member = NULL;
550   }
551   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
552 }
553
554
555 /**
556  * Clean up channel data structures after a client disconnected.
557  */
558 static void
559 cleanup_channel (struct Channel *chn)
560 {
561   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562               "%p Cleaning up channel %s. master? %u\n",
563               chn,
564               GNUNET_h2s (&chn->pub_key_hash),
565               chn->is_master);
566   message_queue_drop (chn);
567   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
568   chn->recv_frags = NULL;
569
570   if (NULL != chn->store_op)
571   {
572     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
573     chn->store_op = NULL;
574   }
575
576   (GNUNET_YES == chn->is_master)
577     ? cleanup_master (chn->master)
578     : cleanup_slave (chn->slave);
579   GNUNET_free (chn);
580 }
581
582
583 /**
584  * Called whenever a client is disconnected.
585  * Frees our resources associated with that client.
586  *
587  * @param cls closure
588  * @param client identification of the client
589  * @param app_ctx must match @a client
590  */
591 static void
592 client_notify_disconnect (void *cls,
593                           struct GNUNET_SERVICE_Client *client,
594                           void *app_ctx)
595 {
596   struct Client *c = app_ctx;
597   struct Channel *chn = c->channel;
598   GNUNET_free (c);
599
600   if (NULL == chn)
601   {
602     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603                 "%p User context is NULL in client_disconnect()\n",
604                 chn);
605     GNUNET_break (0);
606     return;
607   }
608
609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610               "%p Client (%s) disconnected from channel %s\n",
611               chn,
612               (GNUNET_YES == chn->is_master) ? "master" : "slave",
613               GNUNET_h2s (&chn->pub_key_hash));
614
615   struct ClientList *cli = chn->clients_head;
616   while (NULL != cli)
617   {
618     if (cli->client == client)
619     {
620       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
621       GNUNET_free (cli);
622       break;
623     }
624     cli = cli->next;
625   }
626
627   struct Operation *op = chn->op_head;
628   while (NULL != op)
629   {
630     if (op->client == client)
631     {
632       op->client = NULL;
633       break;
634     }
635     op = op->next;
636   }
637
638   if (NULL == chn->clients_head)
639   { /* Last client disconnected. */
640     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
641                 "%p Last client (%s) disconnected from channel %s\n",
642                 chn,
643                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
644                 GNUNET_h2s (&chn->pub_key_hash));
645     chn->is_disconnected = GNUNET_YES;
646     if (NULL != chn->tmit_head)
647     { /* Send pending messages to multicast before cleanup. */
648       transmit_message (chn);
649     }
650     else
651     {
652       cleanup_channel (chn);
653     }
654   }
655 }
656
657
658 /**
659  * A new client connected.
660  *
661  * @param cls NULL
662  * @param client client to add
663  * @param mq message queue for @a client
664  * @return @a client
665  */
666 static void *
667 client_notify_connect (void *cls,
668                        struct GNUNET_SERVICE_Client *client,
669                        struct GNUNET_MQ_Handle *mq)
670 {
671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
672
673   struct Client *c = GNUNET_malloc (sizeof (*c));
674   c->client = client;
675
676   return c;
677 }
678
679
680 /**
681  * Send message to all clients connected to the channel.
682  */
683 static void
684 client_send_msg (const struct Channel *chn,
685                  const struct GNUNET_MessageHeader *msg)
686 {
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "%p Sending message to clients.\n",
689               chn);
690
691   struct ClientList *cli = chn->clients_head;
692   while (NULL != cli)
693   {
694     struct GNUNET_MQ_Envelope *
695       env = GNUNET_MQ_msg_copy (msg);
696
697     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
698                     env);
699
700     cli = cli->next;
701   }
702 }
703
704
705 /**
706  * Send a result code back to the client.
707  *
708  * @param client
709  *        Client that should receive the result code.
710  * @param result_code
711  *        Code to transmit.
712  * @param op_id
713  *        Operation ID in network byte order.
714  * @param data
715  *        Data payload or NULL.
716  * @param data_size
717  *        Size of @a data.
718  */
719 static void
720 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
721                     int64_t result_code, const void *data, uint16_t data_size)
722 {
723   struct GNUNET_OperationResultMessage *res;
724   struct GNUNET_MQ_Envelope *
725     env = GNUNET_MQ_msg_extra (res,
726                                data_size,
727                                GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
728   res->result_code = GNUNET_htonll (result_code);
729   res->op_id = op_id;
730   if (0 < data_size)
731     GNUNET_memcpy (&res[1], data, data_size);
732
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
735               client,
736               GNUNET_ntohll (op_id),
737               result_code,
738               data_size);
739
740   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
741 }
742
743
744 /**
745  * Closure for join_mem_test_cb()
746  */
747 struct JoinMemTestClosure
748 {
749   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
750   struct Channel *channel;
751   struct GNUNET_MULTICAST_JoinHandle *join_handle;
752   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
753 };
754
755
756 /**
757  * Membership test result callback used for join requests.
758  */
759 static void
760 join_mem_test_cb (void *cls, int64_t result,
761                   const char *err_msg, uint16_t err_msg_size)
762 {
763   struct JoinMemTestClosure *jcls = cls;
764
765   if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
766   { /* Pass on join request to client if this is a master channel */
767     struct Master *mst = jcls->channel->master;
768     struct GNUNET_HashCode slave_pub_hash;
769     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
770                         &slave_pub_hash);
771     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
772                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
773     client_send_msg (jcls->channel, &jcls->join_msg->header);
774   }
775   else
776   {
777     if (GNUNET_SYSERR == result)
778     {
779       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
780                   "Could not perform membership test (%.*s)\n",
781                   err_msg_size, err_msg);
782     }
783     // FIXME: add relays
784     GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
785   }
786   GNUNET_free (jcls->join_msg);
787   GNUNET_free (jcls);
788 }
789
790
791 /**
792  * Incoming join request from multicast.
793  */
794 static void
795 mcast_recv_join_request (void *cls,
796                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
797                          const struct GNUNET_MessageHeader *join_msg,
798                          struct GNUNET_MULTICAST_JoinHandle *jh)
799 {
800   struct Channel *chn = cls;
801   uint16_t join_msg_size = 0;
802
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804               "%p Got join request.\n",
805               chn);
806   if (NULL != join_msg)
807   {
808     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
809     {
810       join_msg_size = ntohs (join_msg->size);
811     }
812     else
813     {
814       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
815                   "%p Got join message with invalid type %u.\n",
816                   chn,
817                   ntohs (join_msg->type));
818     }
819   }
820
821   struct GNUNET_PSYC_JoinRequestMessage *
822     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
823   req->header.size = htons (sizeof (*req) + join_msg_size);
824   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
825   req->slave_pub_key = *slave_pub_key;
826   if (0 < join_msg_size)
827     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
828
829   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
830   jcls->slave_pub_key = *slave_pub_key;
831   jcls->channel = chn;
832   jcls->join_handle = jh;
833   jcls->join_msg = req;
834
835   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
836                                     chn->max_message_id, 0,
837                                     &join_mem_test_cb, jcls);
838 }
839
840
841 /**
842  * Join decision received from multicast.
843  */
844 static void
845 mcast_recv_join_decision (void *cls, int is_admitted,
846                           const struct GNUNET_PeerIdentity *peer,
847                           uint16_t relay_count,
848                           const struct GNUNET_PeerIdentity *relays,
849                           const struct GNUNET_MessageHeader *join_resp)
850 {
851   struct Slave *slv = cls;
852   struct Channel *chn = &slv->channel;
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "%p Got join decision: %d\n",
855               slv,
856               is_admitted);
857   if (GNUNET_YES == chn->is_ready)
858   {
859     /* Already admitted */
860     return;
861   }
862
863   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
864   struct GNUNET_PSYC_JoinDecisionMessage *
865     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
866   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
867   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
868   dcsn->is_admitted = htonl (is_admitted);
869   if (0 < join_resp_size)
870     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
871
872   client_send_msg (chn, &dcsn->header);
873
874   if (GNUNET_YES == is_admitted
875       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
876   {
877     chn->is_ready = GNUNET_YES;
878   }
879 }
880
881
882 static int
883 store_recv_fragment_replay (void *cls,
884                             struct GNUNET_MULTICAST_MessageHeader *msg,
885                             enum GNUNET_PSYCSTORE_MessageFlags flags)
886 {
887   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
888
889   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
890   return GNUNET_YES;
891 }
892
893
894 /**
895  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
896  */
897 static void
898 store_recv_fragment_replay_result (void *cls,
899                                    int64_t result,
900                                    const char *err_msg,
901                                    uint16_t err_msg_size)
902 {
903   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
904
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
907               rh,
908               result,
909               err_msg_size,
910               err_msg);
911   switch (result)
912   {
913   case GNUNET_YES:
914     break;
915
916   case GNUNET_NO:
917     GNUNET_MULTICAST_replay_response (rh, NULL,
918                                       GNUNET_MULTICAST_REC_NOT_FOUND);
919     return;
920
921   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
922     GNUNET_MULTICAST_replay_response (rh, NULL,
923                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
924     return;
925
926   case GNUNET_SYSERR:
927     GNUNET_MULTICAST_replay_response (rh, NULL,
928                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
929     return;
930   }
931   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
932    * an error code, so it must be ensured no further processing
933    * is attempted on 'rh'. Maybe this should be refactored as
934    * it doesn't look very intuitive.    --lynX
935    */
936   GNUNET_MULTICAST_replay_response_end (rh);
937 }
938
939
940 /**
941  * Incoming fragment replay request from multicast.
942  */
943 static void
944 mcast_recv_replay_fragment (void *cls,
945                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
946                             uint64_t fragment_id, uint64_t flags,
947                             struct GNUNET_MULTICAST_ReplayHandle *rh)
948
949 {
950   struct Channel *chn = cls;
951   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
952                                  fragment_id, fragment_id,
953                                  &store_recv_fragment_replay,
954                                  &store_recv_fragment_replay_result, rh);
955 }
956
957
958 /**
959  * Incoming message replay request from multicast.
960  */
961 static void
962 mcast_recv_replay_message (void *cls,
963                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
964                            uint64_t message_id,
965                            uint64_t fragment_offset,
966                            uint64_t flags,
967                            struct GNUNET_MULTICAST_ReplayHandle *rh)
968 {
969   struct Channel *chn = cls;
970   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
971                                 message_id, message_id, 1, NULL,
972                                 &store_recv_fragment_replay,
973                                 &store_recv_fragment_replay_result, rh);
974 }
975
976
977 /**
978  * Convert an uint64_t in network byte order to a HashCode
979  * that can be used as key in a MultiHashMap
980  */
981 static inline void
982 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
983 {
984   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
985   /* TODO: use built-in byte swap functions if available */
986
987   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
988   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
989
990   *key = (struct GNUNET_HashCode) {};
991   *((uint64_t *) key)
992     = (n << 32) | (n >> 32);
993 }
994
995
996 /**
997  * Convert an uint64_t in host byte order to a HashCode
998  * that can be used as key in a MultiHashMap
999  */
1000 static inline void
1001 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
1002 {
1003 #if __BYTE_ORDER == __BIG_ENDIAN
1004   hash_key_from_nll (key, n);
1005 #elif __BYTE_ORDER == __LITTLE_ENDIAN
1006   *key = (struct GNUNET_HashCode) {};
1007   *((uint64_t *) key) = n;
1008 #else
1009   #error byteorder undefined
1010 #endif
1011 }
1012
1013
1014 /**
1015  * Initialize PSYC message header.
1016  */
1017 static inline void
1018 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1019                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1020 {
1021   uint16_t size = ntohs (mmsg->header.size);
1022   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1023
1024   pmsg->header.size = htons (psize);
1025   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1026   pmsg->message_id = mmsg->message_id;
1027   pmsg->fragment_offset = mmsg->fragment_offset;
1028   pmsg->flags = htonl (flags);
1029
1030   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1031 }
1032
1033
1034 /**
1035  * Create a new PSYC message from a multicast message for sending it to clients.
1036  */
1037 static inline struct GNUNET_PSYC_MessageHeader *
1038 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1039 {
1040   struct GNUNET_PSYC_MessageHeader *pmsg;
1041   uint16_t size = ntohs (mmsg->header.size);
1042   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1043
1044   pmsg = GNUNET_malloc (psize);
1045   psyc_msg_init (pmsg, mmsg, flags);
1046   return pmsg;
1047 }
1048
1049
1050 /**
1051  * Send multicast message to all clients connected to the channel.
1052  */
1053 static void
1054 client_send_mcast_msg (struct Channel *chn,
1055                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1056                        uint32_t flags)
1057 {
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1060               chn,
1061               GNUNET_ntohll (mmsg->fragment_id),
1062               GNUNET_ntohll (mmsg->message_id));
1063
1064   struct GNUNET_PSYC_MessageHeader *
1065     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1066   client_send_msg (chn, &pmsg->header);
1067   GNUNET_free (pmsg);
1068 }
1069
1070
1071 /**
1072  * Send multicast request to all clients connected to the channel.
1073  */
1074 static void
1075 client_send_mcast_req (struct Master *mst,
1076                        const struct GNUNET_MULTICAST_RequestHeader *req)
1077 {
1078   struct Channel *chn = &mst->channel;
1079
1080   struct GNUNET_PSYC_MessageHeader *pmsg;
1081   uint16_t size = ntohs (req->header.size);
1082   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1083
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1086               chn,
1087               GNUNET_ntohll (req->fragment_id),
1088               GNUNET_ntohll (req->request_id));
1089
1090   pmsg = GNUNET_malloc (psize);
1091   pmsg->header.size = htons (psize);
1092   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1093   pmsg->message_id = req->request_id;
1094   pmsg->fragment_offset = req->fragment_offset;
1095   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1096   pmsg->slave_pub_key = req->member_pub_key;
1097   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1098
1099   client_send_msg (chn, &pmsg->header);
1100
1101   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1102
1103   GNUNET_free (pmsg);
1104 }
1105
1106
1107 /**
1108  * Insert a multicast message fragment into the queue belonging to the message.
1109  *
1110  * @param chn          Channel.
1111  * @param mmsg         Multicast message fragment.
1112  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1113  * @param first_ptype  First PSYC message part type in @a mmsg.
1114  * @param last_ptype   Last PSYC message part type in @a mmsg.
1115  */
1116 static void
1117 fragment_queue_insert (struct Channel *chn,
1118                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1119                        uint16_t first_ptype, uint16_t last_ptype)
1120 {
1121   const uint16_t size = ntohs (mmsg->header.size);
1122   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1123   struct GNUNET_CONTAINER_MultiHashMap
1124     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1125                                                     &chn->pub_key_hash);
1126
1127   struct GNUNET_HashCode msg_id_hash;
1128   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1129
1130   struct FragmentQueue
1131     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1132
1133   if (NULL == fragq)
1134   {
1135     fragq = GNUNET_malloc (sizeof (*fragq));
1136     fragq->state = MSG_FRAG_STATE_HEADER;
1137     fragq->fragments
1138       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1139
1140     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1141                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1142
1143     if (NULL == chan_msgs)
1144     {
1145       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1146       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1147                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1148     }
1149   }
1150
1151   struct GNUNET_HashCode frag_id_hash;
1152   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1153   struct RecvCacheEntry
1154     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1155   if (NULL == cache_entry)
1156   {
1157     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1159                 chn,
1160                 GNUNET_ntohll (mmsg->message_id),
1161                 GNUNET_ntohll (mmsg->fragment_id));
1162     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163                 "%p header_size: %" PRIu64 " + %u\n",
1164                 chn,
1165                 fragq->header_size,
1166                 size);
1167     cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1168     cache_entry->ref_count = 1;
1169     cache_entry->mmsg = GNUNET_malloc (size);
1170     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1171     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1172                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1173   }
1174   else
1175   {
1176     cache_entry->ref_count++;
1177     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1179                 chn,
1180                 GNUNET_ntohll (mmsg->message_id),
1181                 GNUNET_ntohll (mmsg->fragment_id),
1182                 cache_entry->ref_count);
1183   }
1184
1185   if (MSG_FRAG_STATE_HEADER == fragq->state)
1186   {
1187     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1188     {
1189       struct GNUNET_PSYC_MessageMethod *
1190         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1191       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1192       fragq->flags = ntohl (pmeth->flags);
1193     }
1194
1195     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1196     {
1197       fragq->header_size += size;
1198     }
1199     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1200              || frag_offset == fragq->header_size)
1201     { /* header is now complete */
1202       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1203                   "%p Header of message %" PRIu64 " is complete.\n",
1204                   chn,
1205                   GNUNET_ntohll (mmsg->message_id));
1206
1207       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1208                   "%p Adding message %" PRIu64 " to queue.\n",
1209                   chn,
1210                   GNUNET_ntohll (mmsg->message_id));
1211       fragq->state = MSG_FRAG_STATE_DATA;
1212     }
1213     else
1214     {
1215       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1217                   chn,
1218                   GNUNET_ntohll (mmsg->message_id),
1219                   frag_offset,
1220                   fragq->header_size);
1221     }
1222   }
1223
1224   switch (last_ptype)
1225   {
1226   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1227     if (frag_offset == fragq->size)
1228       fragq->state = MSG_FRAG_STATE_END;
1229     else
1230       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1231                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1232                   chn,
1233                   GNUNET_ntohll (mmsg->message_id),
1234                   frag_offset,
1235                   fragq->size);
1236     break;
1237
1238   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1239     /* Drop message without delivering to client if it's a single fragment */
1240     fragq->state =
1241       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1242       ? MSG_FRAG_STATE_DROP
1243       : MSG_FRAG_STATE_CANCEL;
1244   }
1245
1246   switch (fragq->state)
1247   {
1248   case MSG_FRAG_STATE_DATA:
1249   case MSG_FRAG_STATE_END:
1250   case MSG_FRAG_STATE_CANCEL:
1251     if (GNUNET_NO == fragq->is_queued)
1252     {
1253       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1254                                     GNUNET_ntohll (mmsg->message_id));
1255       fragq->is_queued = GNUNET_YES;
1256     }
1257   }
1258
1259   fragq->size += size;
1260   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1261                                 GNUNET_ntohll (mmsg->fragment_id));
1262 }
1263
1264
1265 /**
1266  * Run fragment queue of a message.
1267  *
1268  * Send fragments of a message in order to client, after all modifiers arrived
1269  * from multicast.
1270  *
1271  * @param chn
1272  *        Channel.
1273  * @param msg_id
1274  *        ID of the message @a fragq belongs to.
1275  * @param fragq
1276  *        Fragment queue of the message.
1277  * @param drop
1278  *        Drop message without delivering to client?
1279  *        #GNUNET_YES or #GNUNET_NO.
1280  */
1281 static void
1282 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1283                     struct FragmentQueue *fragq, uint8_t drop)
1284 {
1285   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1286               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1287               chn,
1288               msg_id,
1289               fragq->state);
1290
1291   struct GNUNET_CONTAINER_MultiHashMap
1292     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1293                                                     &chn->pub_key_hash);
1294   GNUNET_assert (NULL != chan_msgs);
1295   uint64_t frag_id;
1296
1297   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1298                                                     &frag_id))
1299   {
1300     struct GNUNET_HashCode frag_id_hash;
1301     hash_key_from_hll (&frag_id_hash, frag_id);
1302     struct RecvCacheEntry *cache_entry
1303       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1304     if (cache_entry != NULL)
1305     {
1306       if (GNUNET_NO == drop)
1307       {
1308         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1309       }
1310       if (cache_entry->ref_count <= 1)
1311       {
1312         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1313                                               cache_entry);
1314         GNUNET_free (cache_entry->mmsg);
1315         GNUNET_free (cache_entry);
1316       }
1317       else
1318       {
1319         cache_entry->ref_count--;
1320       }
1321     }
1322 #if CACHE_AGING_IMPLEMENTED
1323     else if (GNUNET_NO == drop)
1324     {
1325       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1326     }
1327 #endif
1328
1329     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1330   }
1331
1332   if (MSG_FRAG_STATE_END <= fragq->state)
1333   {
1334     struct GNUNET_HashCode msg_id_hash;
1335     hash_key_from_hll (&msg_id_hash, msg_id);
1336
1337     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1338     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1339     GNUNET_free (fragq);
1340   }
1341   else
1342   {
1343     fragq->is_queued = GNUNET_NO;
1344   }
1345 }
1346
1347
1348 struct StateModifyClosure
1349 {
1350   struct Channel *channel;
1351   uint64_t msg_id;
1352   struct GNUNET_HashCode msg_id_hash;
1353 };
1354
1355
1356 void
1357 store_recv_state_modify_result (void *cls, int64_t result,
1358                                 const char *err_msg, uint16_t err_msg_size)
1359 {
1360   struct StateModifyClosure *mcls = cls;
1361   struct Channel *chn = mcls->channel;
1362   uint64_t msg_id = mcls->msg_id;
1363
1364   struct FragmentQueue *
1365     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1366
1367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1369               chn, result, err_msg_size, err_msg);
1370
1371   switch (result)
1372   {
1373   case GNUNET_OK:
1374   case GNUNET_NO:
1375     if (NULL != fragq)
1376       fragq->state_is_modified = GNUNET_YES;
1377     if (chn->max_state_message_id < msg_id)
1378       chn->max_state_message_id = msg_id;
1379     if (chn->max_message_id < msg_id)
1380       chn->max_message_id = msg_id;
1381
1382     if (NULL != fragq)
1383       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1384     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1385     message_queue_run (chn);
1386     break;
1387
1388   default:
1389     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1390                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1391                 chn, result, err_msg_size, err_msg);
1392     /** @todo FIXME: handle state_modify error */
1393   }
1394 }
1395
1396
1397 /**
1398  * Run message queue.
1399  *
1400  * Send messages in queue to client in order after a message has arrived from
1401  * multicast, according to the following:
1402  * - A message is only sent if all of its modifiers arrived.
1403  * - A stateful message is only sent if the previous stateful message
1404  *   has already been delivered to the client.
1405  *
1406  * @param chn  Channel.
1407  *
1408  * @return Number of messages removed from queue and sent to client.
1409  */
1410 static uint64_t
1411 message_queue_run (struct Channel *chn)
1412 {
1413   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1414               "%p Running message queue.\n", chn);
1415   uint64_t n = 0;
1416   uint64_t msg_id;
1417
1418   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1419                                                     &msg_id))
1420   {
1421     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1422                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1423     struct GNUNET_HashCode msg_id_hash;
1424     hash_key_from_hll (&msg_id_hash, msg_id);
1425
1426     struct FragmentQueue *
1427       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1428
1429     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1430     {
1431       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1432                   "%p No fragq (%p) or header not complete.\n",
1433                   chn, fragq);
1434       break;
1435     }
1436
1437     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438                 "%p Fragment queue entry:  state: %u, state delta: "
1439                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1440                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1441
1442     if (MSG_FRAG_STATE_DATA <= fragq->state)
1443     {
1444       /* Check if there's a missing message before the current one */
1445       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1446       {
1447         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1448
1449         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1450             && (chn->max_message_id != msg_id - 1
1451                 && chn->max_message_id != msg_id))
1452         {
1453           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454                       "%p Out of order message. "
1455                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1456                       chn, chn->max_message_id, msg_id);
1457           break;
1458           // FIXME: keep track of messages processed in this queue run,
1459           //        and only stop after reaching the end
1460         }
1461       }
1462       else
1463       {
1464         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1465         if (GNUNET_YES != fragq->state_is_modified)
1466         {
1467           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1468           {
1469             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1470                         "%p Out of order stateful message. "
1471                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1472                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1473             break;
1474             // FIXME: keep track of messages processed in this queue run,
1475             //        and only stop after reaching the end
1476           }
1477
1478           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1479           mcls->channel = chn;
1480           mcls->msg_id = msg_id;
1481           mcls->msg_id_hash = msg_id_hash;
1482
1483           /* Apply modifiers to state in PSYCstore */
1484           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1485                                          fragq->state_delta,
1486                                          store_recv_state_modify_result, mcls);
1487           break; // continue after asynchronous state modify result
1488         }
1489       }
1490       chn->max_message_id = msg_id;
1491     }
1492     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1493     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1494     n++;
1495   }
1496
1497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1498               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1499   return n;
1500 }
1501
1502
1503 /**
1504  * Drop message queue of a channel.
1505  *
1506  * Remove all messages in queue without sending it to clients.
1507  *
1508  * @param chn  Channel.
1509  *
1510  * @return Number of messages removed from queue.
1511  */
1512 static uint64_t
1513 message_queue_drop (struct Channel *chn)
1514 {
1515   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516               "%p Dropping message queue.\n", chn);
1517   uint64_t n = 0;
1518   uint64_t msg_id;
1519   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1520                                                     &msg_id))
1521   {
1522     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1523                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1524     struct GNUNET_HashCode msg_id_hash;
1525     hash_key_from_hll (&msg_id_hash, msg_id);
1526
1527     struct FragmentQueue *
1528       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1529     GNUNET_assert (NULL != fragq);
1530     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1531     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1532     n++;
1533   }
1534   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1535               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1536   return n;
1537 }
1538
1539
1540 /**
1541  * Received result of GNUNET_PSYCSTORE_fragment_store().
1542  */
1543 static void
1544 store_recv_fragment_store_result (void *cls, int64_t result,
1545                                   const char *err_msg, uint16_t err_msg_size)
1546 {
1547   struct Channel *chn = cls;
1548   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1549               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1550               chn, result, err_msg_size, err_msg);
1551 }
1552
1553
1554 /**
1555  * Handle incoming message fragment from multicast.
1556  *
1557  * Store it using PSYCstore and send it to the clients of the channel in order.
1558  */
1559 static void
1560 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1561 {
1562   struct Channel *chn = cls;
1563   uint16_t size = ntohs (mmsg->header.size);
1564
1565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566               "%p Received multicast message of size %u. "
1567               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1568               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1569               chn, size,
1570               GNUNET_ntohll (mmsg->fragment_id),
1571               GNUNET_ntohll (mmsg->message_id),
1572               GNUNET_ntohll (mmsg->fragment_offset),
1573               GNUNET_ntohll (mmsg->flags));
1574
1575   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1576                                    &store_recv_fragment_store_result, chn);
1577
1578   uint16_t first_ptype = 0, last_ptype = 0;
1579   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1580                                                (const char *) &mmsg[1],
1581                                                &first_ptype, &last_ptype);
1582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583               "%p Message check result %d, first part type %u, last part type %u\n",
1584               chn, check, first_ptype, last_ptype);
1585   if (GNUNET_SYSERR == check)
1586   {
1587     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1588                 "%p Dropping incoming multicast message with invalid parts.\n",
1589                 chn);
1590     GNUNET_break_op (0);
1591     return;
1592   }
1593
1594   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1595   message_queue_run (chn);
1596 }
1597
1598
1599 /**
1600  * Incoming request fragment from multicast for a master.
1601  *
1602  * @param cls   Master.
1603  * @param req   The request.
1604  */
1605 static void
1606 mcast_recv_request (void *cls,
1607                     const struct GNUNET_MULTICAST_RequestHeader *req)
1608 {
1609   struct Master *mst = cls;
1610   uint16_t size = ntohs (req->header.size);
1611
1612   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614               "%p Received multicast request of size %u from %s.\n",
1615               mst, size, str);
1616   GNUNET_free (str);
1617
1618   uint16_t first_ptype = 0, last_ptype = 0;
1619   if (GNUNET_SYSERR
1620       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1621                                           (const char *) &req[1],
1622                                           &first_ptype, &last_ptype))
1623   {
1624     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1625                 "%p Dropping incoming multicast request with invalid parts.\n",
1626                 mst);
1627     GNUNET_break_op (0);
1628     return;
1629   }
1630
1631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1632               "Message parts: first: type %u, last: type %u\n",
1633               first_ptype, last_ptype);
1634
1635   /* FIXME: in-order delivery */
1636   client_send_mcast_req (mst, req);
1637 }
1638
1639
1640 /**
1641  * Response from PSYCstore with the current counter values for a channel master.
1642  */
1643 static void
1644 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1645                             uint64_t max_message_id, uint64_t max_group_generation,
1646                             uint64_t max_state_message_id)
1647 {
1648   struct Master *mst = cls;
1649   struct Channel *chn = &mst->channel;
1650   chn->store_op = NULL;
1651
1652   struct GNUNET_PSYC_CountersResultMessage res;
1653   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1654   res.header.size = htons (sizeof (res));
1655   res.result_code = htonl (result);
1656   res.max_message_id = GNUNET_htonll (max_message_id);
1657
1658   if (GNUNET_OK == result || GNUNET_NO == result)
1659   {
1660     mst->max_message_id = max_message_id;
1661     chn->max_message_id = max_message_id;
1662     chn->max_state_message_id = max_state_message_id;
1663     mst->max_group_generation = max_group_generation;
1664     mst->origin
1665       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1666                                        mcast_recv_join_request,
1667                                        mcast_recv_replay_fragment,
1668                                        mcast_recv_replay_message,
1669                                        mcast_recv_request,
1670                                        mcast_recv_message, chn);
1671     chn->is_ready = GNUNET_YES;
1672   }
1673   else
1674   {
1675     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1676                 "%p GNUNET_PSYCSTORE_counters_get() "
1677                 "returned %d for channel %s.\n",
1678                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1679   }
1680
1681   client_send_msg (chn, &res.header);
1682 }
1683
1684
1685 /**
1686  * Response from PSYCstore with the current counter values for a channel slave.
1687  */
1688 void
1689 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1690                            uint64_t max_message_id, uint64_t max_group_generation,
1691                            uint64_t max_state_message_id)
1692 {
1693   struct Slave *slv = cls;
1694   struct Channel *chn = &slv->channel;
1695   chn->store_op = NULL;
1696
1697   struct GNUNET_PSYC_CountersResultMessage res;
1698   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1699   res.header.size = htons (sizeof (res));
1700   res.result_code = htonl (result);
1701   res.max_message_id = GNUNET_htonll (max_message_id);
1702
1703   if (GNUNET_OK == result || GNUNET_NO == result)
1704   {
1705     chn->max_message_id = max_message_id;
1706     chn->max_state_message_id = max_state_message_id;
1707     slv->member
1708       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1709                                       &slv->origin,
1710                                       slv->relay_count, slv->relays,
1711                                       &slv->join_msg->header,
1712                                       mcast_recv_join_request,
1713                                       mcast_recv_join_decision,
1714                                       mcast_recv_replay_fragment,
1715                                       mcast_recv_replay_message,
1716                                       mcast_recv_message, chn);
1717     if (NULL != slv->join_msg)
1718     {
1719       GNUNET_free (slv->join_msg);
1720       slv->join_msg = NULL;
1721     }
1722   }
1723   else
1724   {
1725     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1726                 "%p GNUNET_PSYCSTORE_counters_get() "
1727                 "returned %d for channel %s.\n",
1728                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1729   }
1730
1731   client_send_msg (chn, &res.header);
1732 }
1733
1734
1735 static void
1736 channel_init (struct Channel *chn)
1737 {
1738   chn->recv_msgs
1739     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1740   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1741 }
1742
1743
1744 /**
1745  * Handle a connecting client starting a channel master.
1746  */
1747 static void
1748 handle_client_master_start (void *cls,
1749                             const struct MasterStartRequest *req)
1750 {
1751   struct Client *c = cls;
1752   struct GNUNET_SERVICE_Client *client = c->client;
1753
1754   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1755   struct GNUNET_HashCode pub_key_hash;
1756
1757   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1758   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1759
1760   struct Master *
1761     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1762   struct Channel *chn;
1763
1764   if (NULL == mst)
1765   {
1766     mst = GNUNET_malloc (sizeof (*mst));
1767     mst->policy = ntohl (req->policy);
1768     mst->priv_key = req->channel_key;
1769     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1770
1771     chn = c->channel = &mst->channel;
1772     chn->master = mst;
1773     chn->is_master = GNUNET_YES;
1774     chn->pub_key = pub_key;
1775     chn->pub_key_hash = pub_key_hash;
1776     channel_init (chn);
1777
1778     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1779                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1780     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1781                                                    store_recv_master_counters, mst);
1782   }
1783   else
1784   {
1785     chn = &mst->channel;
1786
1787     struct GNUNET_PSYC_CountersResultMessage *res;
1788     struct GNUNET_MQ_Envelope *
1789       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1790     res->result_code = htonl (GNUNET_OK);
1791     res->max_message_id = GNUNET_htonll (mst->max_message_id);
1792
1793     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1794   }
1795
1796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797               "%p Client connected as master to channel %s.\n",
1798               mst, GNUNET_h2s (&chn->pub_key_hash));
1799
1800   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1801   cli->client = client;
1802   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1803
1804   GNUNET_SERVICE_client_continue (client);
1805 }
1806
1807
1808 static int
1809 check_client_slave_join (void *cls,
1810                          const struct SlaveJoinRequest *req)
1811 {
1812   return GNUNET_OK;
1813 }
1814
1815
1816 /**
1817  * Handle a connecting client joining as a channel slave.
1818  */
1819 static void
1820 handle_client_slave_join (void *cls,
1821                           const struct SlaveJoinRequest *req)
1822 {
1823   struct Client *c = cls;
1824   struct GNUNET_SERVICE_Client *client = c->client;
1825
1826   uint16_t req_size = ntohs (req->header.size);
1827
1828   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1829   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1830
1831   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1832   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1833   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1834
1835   struct GNUNET_CONTAINER_MultiHashMap *
1836     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1837   struct Slave *slv = NULL;
1838   struct Channel *chn;
1839
1840   if (NULL != chn_slv)
1841   {
1842     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1843   }
1844   if (NULL == slv)
1845   {
1846     slv = GNUNET_malloc (sizeof (*slv));
1847     slv->priv_key = req->slave_key;
1848     slv->pub_key = slv_pub_key;
1849     slv->pub_key_hash = slv_pub_hash;
1850     slv->origin = req->origin;
1851     slv->relay_count = ntohl (req->relay_count);
1852     slv->join_flags = ntohl (req->flags);
1853
1854     const struct GNUNET_PeerIdentity *
1855       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1856     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1857     uint16_t join_msg_size = 0;
1858
1859     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1860         <= req_size)
1861     {
1862       struct GNUNET_PSYC_Message *
1863         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1864       join_msg_size = ntohs (join_msg->header.size);
1865       slv->join_msg = GNUNET_malloc (join_msg_size);
1866       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1867     }
1868     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1869     {
1870       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1871                   "%u + %u + %u != %u\n",
1872                   (unsigned int) sizeof (*req),
1873                   relay_size,
1874                   join_msg_size,
1875                   req_size);
1876       GNUNET_break (0);
1877       GNUNET_SERVICE_client_drop (client);
1878       GNUNET_free (slv);
1879       return;
1880     }
1881     if (0 < slv->relay_count)
1882     {
1883       slv->relays = GNUNET_malloc (relay_size);
1884       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1885     }
1886
1887     chn = c->channel = &slv->channel;
1888     chn->slave = slv;
1889     chn->is_master = GNUNET_NO;
1890     chn->pub_key = req->channel_pub_key;
1891     chn->pub_key_hash = pub_key_hash;
1892     channel_init (chn);
1893
1894     if (NULL == chn_slv)
1895     {
1896       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1897       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1898                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1899     }
1900     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1901                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1902     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1903                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1904     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1905                                                   &store_recv_slave_counters, slv);
1906   }
1907   else
1908   {
1909     chn = &slv->channel;
1910
1911     struct GNUNET_PSYC_CountersResultMessage *res;
1912
1913     struct GNUNET_MQ_Envelope *
1914       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1915     res->result_code = htonl (GNUNET_OK);
1916     res->max_message_id = GNUNET_htonll (chn->max_message_id);
1917
1918     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1919
1920     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1921     {
1922       mcast_recv_join_decision (slv, GNUNET_YES,
1923                                 NULL, 0, NULL, NULL);
1924     }
1925     else if (NULL == slv->member)
1926     {
1927       slv->member
1928         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1929                                         &slv->origin,
1930                                         slv->relay_count, slv->relays,
1931                                         &slv->join_msg->header,
1932                                         &mcast_recv_join_request,
1933                                         &mcast_recv_join_decision,
1934                                         &mcast_recv_replay_fragment,
1935                                         &mcast_recv_replay_message,
1936                                         &mcast_recv_message, chn);
1937       if (NULL != slv->join_msg)
1938       {
1939         GNUNET_free (slv->join_msg);
1940         slv->join_msg = NULL;
1941       }
1942     }
1943     else if (NULL != slv->join_dcsn)
1944     {
1945       struct GNUNET_MQ_Envelope *
1946         env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1947       GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1948     }
1949   }
1950
1951   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952               "%p Client connected as slave to channel %s.\n",
1953               slv, GNUNET_h2s (&chn->pub_key_hash));
1954
1955   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1956   cli->client = client;
1957   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1958
1959   GNUNET_SERVICE_client_continue (client);
1960 }
1961
1962
1963 struct JoinDecisionClosure
1964 {
1965   int32_t is_admitted;
1966   struct GNUNET_MessageHeader *msg;
1967 };
1968
1969
1970 /**
1971  * Iterator callback for sending join decisions to multicast.
1972  */
1973 static int
1974 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1975                           void *value)
1976 {
1977   struct JoinDecisionClosure *jcls = cls;
1978   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1979   // FIXME: add relays
1980   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1981   return GNUNET_YES;
1982 }
1983
1984
1985 static int
1986 check_client_join_decision (void *cls,
1987                             const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1988 {
1989   return GNUNET_OK;
1990 }
1991
1992
1993 /**
1994  * Join decision from client.
1995  */
1996 static void
1997 handle_client_join_decision (void *cls,
1998                              const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1999 {
2000   struct Client *c = cls;
2001   struct GNUNET_SERVICE_Client *client = c->client;
2002   struct Channel *chn = c->channel;
2003   if (NULL == chn)
2004   {
2005     GNUNET_break (0);
2006     GNUNET_SERVICE_client_drop (client);
2007     return;
2008   }
2009   GNUNET_assert (GNUNET_YES == chn->is_master);
2010   struct Master *mst = chn->master;
2011
2012   struct JoinDecisionClosure jcls;
2013   jcls.is_admitted = ntohl (dcsn->is_admitted);
2014   jcls.msg
2015     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2016     ? (struct GNUNET_MessageHeader *) &dcsn[1]
2017     : NULL;
2018
2019   struct GNUNET_HashCode slave_pub_hash;
2020   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2021                       &slave_pub_hash);
2022
2023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024               "%p Got join decision (%d) from client for channel %s..\n",
2025               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027               "%p ..and slave %s.\n",
2028               mst, GNUNET_h2s (&slave_pub_hash));
2029
2030   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2031                                               &mcast_send_join_decision, &jcls);
2032   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2033   GNUNET_SERVICE_client_continue (client);
2034 }
2035
2036
2037 /**
2038  * Send acknowledgement to a client.
2039  *
2040  * Sent after a message fragment has been passed on to multicast.
2041  *
2042  * @param chn The channel struct for the client.
2043  */
2044 static void
2045 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2046 {
2047   struct GNUNET_MessageHeader *res;
2048   struct GNUNET_MQ_Envelope *
2049       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2050
2051   /* FIXME? */
2052   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2053 }
2054
2055
2056 /**
2057  * Callback for the transmit functions of multicast.
2058  */
2059 static int
2060 transmit_notify (void *cls, size_t *data_size, void *data)
2061 {
2062   struct Channel *chn = cls;
2063   struct TransmitMessage *tmit_msg = chn->tmit_head;
2064
2065   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2066   {
2067     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068                 "%p transmit_notify: nothing to send.\n", chn);
2069     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2070       GNUNET_break (0);
2071     *data_size = 0;
2072     return GNUNET_NO;
2073   }
2074
2075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2076               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2077
2078   *data_size = tmit_msg->size;
2079   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2080
2081   int ret
2082     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2083     ? GNUNET_NO
2084     : GNUNET_YES;
2085
2086   /* FIXME: handle disconnecting clients */
2087   if (NULL != tmit_msg->client)
2088     send_message_ack (chn, tmit_msg->client);
2089
2090   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2091
2092   if (NULL != chn->tmit_head)
2093   {
2094     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2095   }
2096   else if (GNUNET_YES == chn->is_disconnected
2097            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2098   {
2099     /* FIXME: handle partial message (when still in_transmit) */
2100     GNUNET_free (tmit_msg);
2101     return GNUNET_SYSERR;
2102   }
2103   GNUNET_free (tmit_msg);
2104   return ret;
2105 }
2106
2107
2108 /**
2109  * Callback for the transmit functions of multicast.
2110  */
2111 static int
2112 master_transmit_notify (void *cls, size_t *data_size, void *data)
2113 {
2114   int ret = transmit_notify (cls, data_size, data);
2115
2116   if (GNUNET_YES == ret)
2117   {
2118     struct Master *mst = cls;
2119     mst->tmit_handle = NULL;
2120   }
2121   return ret;
2122 }
2123
2124
2125 /**
2126  * Callback for the transmit functions of multicast.
2127  */
2128 static int
2129 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2130 {
2131   int ret = transmit_notify (cls, data_size, data);
2132
2133   if (GNUNET_YES == ret)
2134   {
2135     struct Slave *slv = cls;
2136     slv->tmit_handle = NULL;
2137   }
2138   return ret;
2139 }
2140
2141
2142 /**
2143  * Transmit a message from a channel master to the multicast group.
2144  */
2145 static void
2146 master_transmit_message (struct Master *mst)
2147 {
2148   struct Channel *chn = &mst->channel;
2149   struct TransmitMessage *tmit_msg = chn->tmit_head;
2150   if (NULL == tmit_msg)
2151     return;
2152   if (NULL == mst->tmit_handle)
2153   {
2154     mst->tmit_handle = (void *) &mst->tmit_handle;
2155     struct GNUNET_MULTICAST_OriginTransmitHandle *
2156       tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2157                                                     mst->max_group_generation,
2158                                                     master_transmit_notify, mst);
2159     if (NULL != mst->tmit_handle)
2160       mst->tmit_handle = tmit_handle;
2161   }
2162   else
2163   {
2164     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2165   }
2166 }
2167
2168
2169 /**
2170  * Transmit a message from a channel slave to the multicast group.
2171  */
2172 static void
2173 slave_transmit_message (struct Slave *slv)
2174 {
2175   if (NULL == slv->channel.tmit_head)
2176     return;
2177   if (NULL == slv->tmit_handle)
2178   {
2179     slv->tmit_handle = (void *) &slv->tmit_handle;
2180     struct GNUNET_MULTICAST_MemberTransmitHandle *
2181       tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
2182                                                        slave_transmit_notify, slv);
2183     if (NULL != slv->tmit_handle)
2184       slv->tmit_handle = tmit_handle;
2185   }
2186   else
2187   {
2188     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2189   }
2190 }
2191
2192
2193 static void
2194 transmit_message (struct Channel *chn)
2195 {
2196   chn->is_master
2197     ? master_transmit_message (chn->master)
2198     : slave_transmit_message (chn->slave);
2199 }
2200
2201
2202 /**
2203  * Queue a message from a channel master for sending to the multicast group.
2204  */
2205 static void
2206 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2207 {
2208   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2209
2210   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2211   {
2212     tmit_msg->id = ++mst->max_message_id;
2213     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2214                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2215                 mst, tmit_msg->id);
2216     struct GNUNET_PSYC_MessageMethod *pmeth
2217       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2218
2219     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2220     {
2221       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2222     }
2223     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2224     {
2225       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2226                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2227                   mst, tmit_msg->id - mst->max_state_message_id);
2228       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2229                                           - mst->max_state_message_id);
2230       mst->max_state_message_id = tmit_msg->id;
2231     }
2232     else
2233     {
2234         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2235                     "%p master_queue_message: state not modified\n", mst);
2236       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2237     }
2238
2239     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2240     {
2241       /// @todo add state_hash to PSYC header
2242     }
2243   }
2244 }
2245
2246
2247 /**
2248  * Queue a message from a channel slave for sending to the multicast group.
2249  */
2250 static void
2251 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2252 {
2253   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2254   {
2255     struct GNUNET_PSYC_MessageMethod *pmeth
2256       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2257     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2258     tmit_msg->id = ++slv->max_request_id;
2259   }
2260 }
2261
2262
2263 /**
2264  * Queue PSYC message parts for sending to multicast.
2265  *
2266  * @param chn
2267  *        Channel to send to.
2268  * @param client
2269  *        Client the message originates from.
2270  * @param data_size
2271  *        Size of @a data.
2272  * @param data
2273  *        Concatenated message parts.
2274  * @param first_ptype
2275  *        First message part type in @a data.
2276  * @param last_ptype
2277  *        Last message part type in @a data.
2278  */
2279 static struct TransmitMessage *
2280 queue_message (struct Channel *chn,
2281                struct GNUNET_SERVICE_Client *client,
2282                size_t data_size,
2283                const void *data,
2284                uint16_t first_ptype, uint16_t last_ptype)
2285 {
2286   struct TransmitMessage *
2287     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2288   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2289   tmit_msg->client = client;
2290   tmit_msg->size = data_size;
2291   tmit_msg->first_ptype = first_ptype;
2292   tmit_msg->last_ptype = last_ptype;
2293
2294   /* FIXME: separate queue per message ID */
2295
2296   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2297
2298   chn->is_master
2299     ? master_queue_message (chn->master, tmit_msg)
2300     : slave_queue_message (chn->slave, tmit_msg);
2301   return tmit_msg;
2302 }
2303
2304
2305 /**
2306  * Cancel transmission of current message.
2307  *
2308  * @param chn     Channel to send to.
2309  * @param client  Client the message originates from.
2310  */
2311 static void
2312 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2313 {
2314   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2315
2316   struct GNUNET_MessageHeader msg;
2317   msg.size = htons (sizeof (msg));
2318   msg.type = htons (type);
2319
2320   queue_message (chn, client, sizeof (msg), &msg, type, type);
2321   transmit_message (chn);
2322
2323   /* FIXME: cleanup */
2324 }
2325
2326
2327 static int
2328 check_client_psyc_message (void *cls,
2329                            const struct GNUNET_MessageHeader *msg)
2330 {
2331   return GNUNET_OK;
2332 }
2333
2334
2335 /**
2336  * Incoming message from a master or slave client.
2337  */
2338 static void
2339 handle_client_psyc_message (void *cls,
2340                             const struct GNUNET_MessageHeader *msg)
2341 {
2342   struct Client *c = cls;
2343   struct GNUNET_SERVICE_Client *client = c->client;
2344   struct Channel *chn = c->channel;
2345   if (NULL == chn)
2346   {
2347     GNUNET_break (0);
2348     GNUNET_SERVICE_client_drop (client);
2349     return;
2350   }
2351
2352   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2353               "%p Received message from client.\n", chn);
2354   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2355
2356   if (GNUNET_YES != chn->is_ready)
2357   {
2358     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2359                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2360     GNUNET_break (0);
2361     GNUNET_SERVICE_client_drop (client);
2362     return;
2363   }
2364
2365   uint16_t size = ntohs (msg->size);
2366   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2367   {
2368     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2369                 "%p Message payload too large: %u < %u.\n",
2370                 chn,
2371                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2372                 (unsigned int) (size - sizeof (*msg)));
2373     GNUNET_break (0);
2374     transmit_cancel (chn, client);
2375     GNUNET_SERVICE_client_drop (client);
2376     return;
2377   }
2378
2379   uint16_t first_ptype = 0, last_ptype = 0;
2380   if (GNUNET_SYSERR
2381       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2382                                           (const char *) &msg[1],
2383                                           &first_ptype, &last_ptype))
2384   {
2385     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2386                 "%p Received invalid message part from client.\n", chn);
2387     GNUNET_break (0);
2388     transmit_cancel (chn, client);
2389     GNUNET_SERVICE_client_drop (client);
2390     return;
2391   }
2392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2393               "%p Received message with first part type %u and last part type %u.\n",
2394               chn, first_ptype, last_ptype);
2395
2396   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2397                  first_ptype, last_ptype);
2398   transmit_message (chn);
2399   /* FIXME: send a few ACKs even before transmit_notify is called */
2400
2401   GNUNET_SERVICE_client_continue (client);
2402 };
2403
2404
2405 /**
2406  * Received result of GNUNET_PSYCSTORE_membership_store()
2407  */
2408 static void
2409 store_recv_membership_store_result (void *cls,
2410                                     int64_t result,
2411                                     const char *err_msg,
2412                                     uint16_t err_msg_size)
2413 {
2414   struct Operation *op = cls;
2415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2416               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2417               op->channel,
2418               result,
2419               (int) err_msg_size,
2420               err_msg);
2421
2422   if (NULL != op->client)
2423     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2424   op_remove (op);
2425 }
2426
2427
2428 /**
2429  * Client requests to add/remove a slave in the membership database.
2430  */
2431 static void
2432 handle_client_membership_store (void *cls,
2433                                 const struct ChannelMembershipStoreRequest *req)
2434 {
2435   struct Client *c = cls;
2436   struct GNUNET_SERVICE_Client *client = c->client;
2437   struct Channel *chn = c->channel;
2438   if (NULL == chn)
2439   {
2440     GNUNET_break (0);
2441     GNUNET_SERVICE_client_drop (client);
2442     return;
2443   }
2444
2445   struct Operation *op = op_add (chn, client, req->op_id, 0);
2446
2447   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2448   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2450               "%p Received membership store request from client.\n", chn);
2451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2453               chn, req->did_join, announced_at, effective_since);
2454
2455   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2456                                      req->did_join, announced_at, effective_since,
2457                                      0, /* FIXME: group_generation */
2458                                      &store_recv_membership_store_result, op);
2459   GNUNET_SERVICE_client_continue (client);
2460 }
2461
2462
2463 /**
2464  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2465  * in response to a history request from a client.
2466  */
2467 static int
2468 store_recv_fragment_history (void *cls,
2469                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2470                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2471 {
2472   struct Operation *op = cls;
2473   if (NULL == op->client)
2474   { /* Requesting client already disconnected. */
2475     return GNUNET_NO;
2476   }
2477   struct Channel *chn = op->channel;
2478
2479   struct GNUNET_PSYC_MessageHeader *pmsg;
2480   uint16_t msize = ntohs (mmsg->header.size);
2481   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2482
2483   struct GNUNET_OperationResultMessage *
2484     res = GNUNET_malloc (sizeof (*res) + psize);
2485   res->header.size = htons (sizeof (*res) + psize);
2486   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2487   res->op_id = op->op_id;
2488   res->result_code = GNUNET_htonll (GNUNET_OK);
2489
2490   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2491   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2492   GNUNET_memcpy (&res[1], pmsg, psize);
2493
2494   /** @todo FIXME: send only to requesting client */
2495   client_send_msg (chn, &res->header);
2496
2497   GNUNET_free (res);
2498   return GNUNET_YES;
2499 }
2500
2501
2502 /**
2503  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2504  * in response to a history request from a client.
2505  */
2506 static void
2507 store_recv_fragment_history_result (void *cls, int64_t result,
2508                                     const char *err_msg, uint16_t err_msg_size)
2509 {
2510   struct Operation *op = cls;
2511   if (NULL == op->client)
2512   { /* Requesting client already disconnected. */
2513     return;
2514   }
2515
2516   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517               "%p History replay #%" PRIu64 ": "
2518               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2519               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2520
2521   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2522   {
2523     /** @todo Multicast replay request for messages not found locally. */
2524   }
2525
2526   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2527   op_remove (op);
2528 }
2529
2530
2531 static int
2532 check_client_history_replay (void *cls,
2533                              const struct GNUNET_PSYC_HistoryRequestMessage *req)
2534 {
2535   return GNUNET_OK;
2536 }
2537
2538
2539 /**
2540  * Client requests channel history.
2541  */
2542 static void
2543 handle_client_history_replay (void *cls,
2544                               const struct GNUNET_PSYC_HistoryRequestMessage *req)
2545 {
2546   struct Client *c = cls;
2547   struct GNUNET_SERVICE_Client *client = c->client;
2548   struct Channel *chn = c->channel;
2549   if (NULL == chn)
2550   {
2551     GNUNET_break (0);
2552     GNUNET_SERVICE_client_drop (client);
2553     return;
2554   }
2555
2556   uint16_t size = ntohs (req->header.size);
2557   const char *method_prefix = (const char *) &req[1];
2558
2559   if (size < sizeof (*req) + 1
2560       || '\0' != method_prefix[size - sizeof (*req) - 1])
2561   {
2562     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2563                 "%p History replay #%" PRIu64 ": "
2564                 "invalid method prefix. size: %u < %u?\n",
2565                 chn,
2566                 GNUNET_ntohll (req->op_id),
2567                 size,
2568                 (unsigned int) sizeof (*req) + 1);
2569     GNUNET_break (0);
2570     GNUNET_SERVICE_client_drop (client);
2571     return;
2572   }
2573
2574   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2575
2576   if (0 == req->message_limit)
2577   {
2578     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2579                                   GNUNET_ntohll (req->start_message_id),
2580                                   GNUNET_ntohll (req->end_message_id),
2581                                   0, method_prefix,
2582                                   &store_recv_fragment_history,
2583                                   &store_recv_fragment_history_result, op);
2584   }
2585   else
2586   {
2587     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2588                                          GNUNET_ntohll (req->message_limit),
2589                                          method_prefix,
2590                                          &store_recv_fragment_history,
2591                                          &store_recv_fragment_history_result,
2592                                          op);
2593   }
2594   GNUNET_SERVICE_client_continue (client);
2595 }
2596
2597
2598 /**
2599  * Received state var from PSYCstore, send it to client.
2600  */
2601 static int
2602 store_recv_state_var (void *cls, const char *name,
2603                       const void *value, uint32_t value_size)
2604 {
2605   struct Operation *op = cls;
2606   struct GNUNET_OperationResultMessage *res;
2607   struct GNUNET_MQ_Envelope *env;
2608
2609   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2611               op->channel, GNUNET_ntohll (op->op_id), name);
2612
2613   if (NULL != name) /* First part */
2614   {
2615     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2616     struct GNUNET_PSYC_MessageModifier *mod;
2617     env = GNUNET_MQ_msg_extra (res,
2618                                sizeof (*mod) + name_size + value_size,
2619                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2620     res->op_id = op->op_id;
2621
2622     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2623     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2624     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2625     mod->name_size = htons (name_size);
2626     mod->value_size = htonl (value_size);
2627     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2628     GNUNET_memcpy (&mod[1], name, name_size);
2629     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2630   }
2631   else /* Continuation */
2632   {
2633     struct GNUNET_MessageHeader *mod;
2634     env = GNUNET_MQ_msg_extra (res,
2635                                sizeof (*mod) + value_size,
2636                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2637     res->op_id = op->op_id;
2638
2639     mod = (struct GNUNET_MessageHeader *) &res[1];
2640     mod->size = htons (sizeof (*mod) + value_size);
2641     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2642     GNUNET_memcpy (&mod[1], value, value_size);
2643   }
2644
2645   // FIXME: client might have been disconnected
2646   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2647   return GNUNET_YES;
2648 }
2649
2650
2651 /**
2652  * Received result of GNUNET_PSYCSTORE_state_get()
2653  * or GNUNET_PSYCSTORE_state_get_prefix()
2654  */
2655 static void
2656 store_recv_state_result (void *cls, int64_t result,
2657                          const char *err_msg, uint16_t err_msg_size)
2658 {
2659   struct Operation *op = cls;
2660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2661               "%p state_get #%" PRIu64 ": "
2662               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2663               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2664
2665   // FIXME: client might have been disconnected
2666   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2667   op_remove (op);
2668 }
2669
2670
2671 static int
2672 check_client_state_get (void *cls,
2673                          const struct StateRequest *req)
2674 {
2675   struct Client *c = cls;
2676   struct Channel *chn = c->channel;
2677   if (NULL == chn)
2678   {
2679     GNUNET_break (0);
2680     return GNUNET_SYSERR;
2681   }
2682
2683   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2684   const char *name = (const char *) &req[1];
2685   if (0 == name_size || '\0' != name[name_size - 1])
2686   {
2687     GNUNET_break (0);
2688     return GNUNET_SYSERR;
2689   }
2690
2691   return GNUNET_OK;
2692 }
2693
2694
2695 /**
2696  * Client requests best matching state variable from PSYCstore.
2697  */
2698 static void
2699 handle_client_state_get (void *cls,
2700                          const struct StateRequest *req)
2701 {
2702   struct Client *c = cls;
2703   struct GNUNET_SERVICE_Client *client = c->client;
2704   struct Channel *chn = c->channel;
2705
2706   const char *name = (const char *) &req[1];
2707   struct Operation *op = op_add (chn, client, req->op_id, 0);
2708   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2709                               &store_recv_state_var,
2710                               &store_recv_state_result, op);
2711   GNUNET_SERVICE_client_continue (client);
2712 }
2713
2714
2715 static int
2716 check_client_state_get_prefix (void *cls,
2717                                const struct StateRequest *req)
2718 {
2719   struct Client *c = cls;
2720   struct Channel *chn = c->channel;
2721   if (NULL == chn)
2722   {
2723     GNUNET_break (0);
2724     return GNUNET_SYSERR;
2725   }
2726
2727   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2728   const char *name = (const char *) &req[1];
2729   if (0 == name_size || '\0' != name[name_size - 1])
2730   {
2731     GNUNET_break (0);
2732     return GNUNET_SYSERR;
2733   }
2734
2735   return GNUNET_OK;
2736 }
2737
2738
2739 /**
2740  * Client requests state variables with a given prefix from PSYCstore.
2741  */
2742 static void
2743 handle_client_state_get_prefix (void *cls,
2744                                 const struct StateRequest *req)
2745 {
2746   struct Client *c = cls;
2747   struct GNUNET_SERVICE_Client *client = c->client;
2748   struct Channel *chn = c->channel;
2749
2750   const char *name = (const char *) &req[1];
2751   struct Operation *op = op_add (chn, client, req->op_id, 0);
2752   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2753                                      &store_recv_state_var,
2754                                      &store_recv_state_result, op);
2755   GNUNET_SERVICE_client_continue (client);
2756 }
2757
2758
2759 /**
2760  * Initialize the PSYC service.
2761  *
2762  * @param cls Closure.
2763  * @param server The initialized server.
2764  * @param c Configuration to use.
2765  */
2766 static void
2767 run (void *cls,
2768      const struct GNUNET_CONFIGURATION_Handle *c,
2769      struct GNUNET_SERVICE_Handle *svc)
2770 {
2771   cfg = c;
2772   service = svc;
2773   store = GNUNET_PSYCSTORE_connect (cfg);
2774   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2775   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2776   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2777   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2778   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2779   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2780 }
2781
2782
2783 /**
2784  * Define "main" method using service macro.
2785  */
2786 GNUNET_SERVICE_MAIN
2787 ("psyc",
2788  GNUNET_SERVICE_OPTION_NONE,
2789  run,
2790  client_notify_connect,
2791  client_notify_disconnect,
2792  NULL,
2793  GNUNET_MQ_hd_fixed_size (client_master_start,
2794                           GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2795                           struct MasterStartRequest,
2796                           NULL),
2797  GNUNET_MQ_hd_var_size (client_slave_join,
2798                         GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2799                         struct SlaveJoinRequest,
2800                         NULL),
2801  GNUNET_MQ_hd_var_size (client_join_decision,
2802                         GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2803                         struct GNUNET_PSYC_JoinDecisionMessage,
2804                         NULL),
2805  GNUNET_MQ_hd_var_size (client_psyc_message,
2806                         GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2807                         struct GNUNET_MessageHeader,
2808                         NULL),
2809  GNUNET_MQ_hd_fixed_size (client_membership_store,
2810                           GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2811                           struct ChannelMembershipStoreRequest,
2812                           NULL),
2813  GNUNET_MQ_hd_var_size (client_history_replay,
2814                         GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2815                         struct GNUNET_PSYC_HistoryRequestMessage,
2816                         NULL),
2817  GNUNET_MQ_hd_var_size (client_state_get,
2818                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2819                         struct StateRequest,
2820                         NULL),
2821  GNUNET_MQ_hd_var_size (client_state_get_prefix,
2822                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2823                         struct StateRequest,
2824                         NULL));
2825
2826 /* end of gnunet-service-psyc.c */