consensus: destroy set handles
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Service handle.
48  */
49 static struct GNUNET_SERVICE_Handle *service;
50
51 /**
52  * Handle to the statistics service.
53  */
54 static struct GNUNET_STATISTICS_Handle *stats;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVICE_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct ClientList
189 {
190   struct ClientList *prev;
191   struct ClientList *next;
192
193   struct GNUNET_SERVICE_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVICE_Client *client;
203   struct Channel *channel;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct ClientList *clients_head;
215   struct ClientList *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this channel ready to receive messages from client?
274    * #GNUNET_YES or #GNUNET_NO
275    */
276   uint8_t is_ready;
277
278   /**
279    * Is the client disconnected?
280    * #GNUNET_YES or #GNUNET_NO
281    */
282   uint8_t is_disconnected;
283
284   /**
285    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286    */
287   uint8_t is_master;
288
289   union {
290     struct Master *master;
291     struct Slave *slave;
292   };
293 };
294
295
296 /**
297  * Client context for a channel master.
298  */
299 struct Master
300 {
301   /**
302    * Channel struct common for Master and Slave
303    */
304   struct Channel channel;
305
306   /**
307    * Private key of the channel.
308    */
309   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310
311   /**
312    * Handle for the multicast origin.
313    */
314   struct GNUNET_MULTICAST_Origin *origin;
315
316   /**
317    * Transmit handle for multicast.
318    */
319   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320
321   /**
322    * Incoming join requests from multicast.
323    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
324    */
325   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326
327   /**
328    * Last message ID transmitted to this channel.
329    *
330    * Incremented before sending a message, thus the message_id in messages sent
331    * starts from 1.
332    */
333   uint64_t max_message_id;
334
335   /**
336    * ID of the last message with state operations transmitted to the channel.
337    * 0 if there is no such message.
338    */
339   uint64_t max_state_message_id;
340
341   /**
342    * Maximum group generation transmitted to the channel.
343    */
344   uint64_t max_group_generation;
345
346   /**
347    * @see enum GNUNET_PSYC_Policy
348    */
349   enum GNUNET_PSYC_Policy policy;
350 };
351
352
353 /**
354  * Client context for a channel slave.
355  */
356 struct Slave
357 {
358   /**
359    * Channel struct common for Master and Slave
360    */
361   struct Channel channel;
362
363   /**
364    * Private key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367
368   /**
369    * Public key of the slave.
370    */
371   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372
373   /**
374    * Hash of @a pub_key.
375    */
376   struct GNUNET_HashCode pub_key_hash;
377
378   /**
379    * Handle for the multicast member.
380    */
381   struct GNUNET_MULTICAST_Member *member;
382
383   /**
384    * Transmit handle for multicast.
385    */
386   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387
388   /**
389    * Peer identity of the origin.
390    */
391   struct GNUNET_PeerIdentity origin;
392
393   /**
394    * Number of items in @a relays.
395    */
396   uint32_t relay_count;
397
398   /**
399    * Relays that multicast can use to connect.
400    */
401   struct GNUNET_PeerIdentity *relays;
402
403   /**
404    * Join request to be transmitted to the master on join.
405    */
406   struct GNUNET_PSYC_Message *join_msg;
407
408   /**
409    * Join decision received from multicast.
410    */
411   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412
413   /**
414    * Maximum request ID for this channel.
415    */
416   uint64_t max_request_id;
417
418   /**
419    * Join flags.
420    */
421   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
422 };
423
424
425 /**
426  * Client context.
427  */
428 struct Client {
429   struct GNUNET_SERVICE_Client *client;
430   struct Channel *channel;
431 };
432
433
434 struct ReplayRequestKey
435 {
436   uint64_t fragment_id;
437   uint64_t message_id;
438   uint64_t fragment_offset;
439   uint64_t flags;
440 };
441
442
443 static void
444 transmit_message (struct Channel *chn);
445
446 static uint64_t
447 message_queue_run (struct Channel *chn);
448
449 static uint64_t
450 message_queue_drop (struct Channel *chn);
451
452
453 static void
454 schedule_transmit_message (void *cls)
455 {
456   struct Channel *chn = cls;
457
458   transmit_message (chn);
459 }
460
461
462 /**
463  * Task run during shutdown.
464  *
465  * @param cls unused
466  */
467 static void
468 shutdown_task (void *cls)
469 {
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "shutting down...\n");
472   GNUNET_PSYCSTORE_disconnect (store);
473   if (NULL != stats)
474   {
475     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
476     stats = NULL;
477   }
478 }
479
480
481 static struct Operation *
482 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483         uint64_t op_id, uint32_t flags)
484 {
485   struct Operation *op = GNUNET_malloc (sizeof (*op));
486   op->client = client;
487   op->channel = chn;
488   op->op_id = op_id;
489   op->flags = flags;
490   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
491   return op;
492 }
493
494
495 static void
496 op_remove (struct Operation *op)
497 {
498   GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
499   GNUNET_free (op);
500 }
501
502
503 /**
504  * Clean up master data structures after a client disconnected.
505  */
506 static void
507 cleanup_master (struct Master *mst)
508 {
509   struct Channel *chn = &mst->channel;
510
511   if (NULL != mst->origin)
512     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
513   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
514   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
515 }
516
517
518 /**
519  * Clean up slave data structures after a client disconnected.
520  */
521 static void
522 cleanup_slave (struct Slave *slv)
523 {
524   struct Channel *chn = &slv->channel;
525   struct GNUNET_CONTAINER_MultiHashMap *
526     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
527                                                 &chn->pub_key_hash);
528   GNUNET_assert (NULL != chn_slv);
529   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
530
531   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
532   {
533     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
534                                           chn_slv);
535     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
536   }
537   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
538
539   if (NULL != slv->join_msg)
540   {
541     GNUNET_free (slv->join_msg);
542     slv->join_msg = NULL;
543   }
544   if (NULL != slv->relays)
545   {
546     GNUNET_free (slv->relays);
547     slv->relays = NULL;
548   }
549   if (NULL != slv->member)
550   {
551     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
552     slv->member = NULL;
553   }
554   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
555 }
556
557
558 /**
559  * Clean up channel data structures after a client disconnected.
560  */
561 static void
562 cleanup_channel (struct Channel *chn)
563 {
564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565               "%p Cleaning up channel %s. master? %u\n",
566               chn,
567               GNUNET_h2s (&chn->pub_key_hash),
568               chn->is_master);
569   message_queue_drop (chn);
570   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
571   chn->recv_frags = NULL;
572
573   if (NULL != chn->store_op)
574   {
575     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
576     chn->store_op = NULL;
577   }
578
579   (GNUNET_YES == chn->is_master)
580     ? cleanup_master (chn->master)
581     : cleanup_slave (chn->slave);
582   GNUNET_free (chn);
583 }
584
585
586 /**
587  * Called whenever a client is disconnected.
588  * Frees our resources associated with that client.
589  *
590  * @param cls closure
591  * @param client identification of the client
592  * @param app_ctx must match @a client
593  */
594 static void
595 client_notify_disconnect (void *cls,
596                           struct GNUNET_SERVICE_Client *client,
597                           void *app_ctx)
598 {
599   struct Client *c = app_ctx;
600   struct Channel *chn = c->channel;
601   GNUNET_free (c);
602
603   if (NULL == chn)
604   {
605     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606                 "%p User context is NULL in client_disconnect()\n",
607                 chn);
608     GNUNET_break (0);
609     return;
610   }
611
612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613               "%p Client (%s) disconnected from channel %s\n",
614               chn,
615               (GNUNET_YES == chn->is_master) ? "master" : "slave",
616               GNUNET_h2s (&chn->pub_key_hash));
617
618   struct ClientList *cli = chn->clients_head;
619   while (NULL != cli)
620   {
621     if (cli->client == client)
622     {
623       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
624       GNUNET_free (cli);
625       break;
626     }
627     cli = cli->next;
628   }
629
630   struct Operation *op = chn->op_head;
631   while (NULL != op)
632   {
633     if (op->client == client)
634     {
635       op->client = NULL;
636       break;
637     }
638     op = op->next;
639   }
640
641   if (NULL == chn->clients_head)
642   { /* Last client disconnected. */
643     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644                 "%p Last client (%s) disconnected from channel %s\n",
645                 chn,
646                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
647                 GNUNET_h2s (&chn->pub_key_hash));
648     chn->is_disconnected = GNUNET_YES;
649     if (NULL != chn->tmit_head)
650     { /* Send pending messages to multicast before cleanup. */
651       transmit_message (chn);
652     }
653     else
654     {
655       cleanup_channel (chn);
656     }
657   }
658 }
659
660
661 /**
662  * A new client connected.
663  *
664  * @param cls NULL
665  * @param client client to add
666  * @param mq message queue for @a client
667  * @return @a client
668  */
669 static void *
670 client_notify_connect (void *cls,
671                        struct GNUNET_SERVICE_Client *client,
672                        struct GNUNET_MQ_Handle *mq)
673 {
674   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
675
676   struct Client *c = GNUNET_malloc (sizeof (*c));
677   c->client = client;
678
679   return c;
680 }
681
682
683 /**
684  * Send message to all clients connected to the channel.
685  */
686 static void
687 client_send_msg (const struct Channel *chn,
688                  const struct GNUNET_MessageHeader *msg)
689 {
690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691               "%p Sending message to clients.\n",
692               chn);
693
694   struct ClientList *cli = chn->clients_head;
695   while (NULL != cli)
696   {
697     struct GNUNET_MQ_Envelope *
698       env = GNUNET_MQ_msg_copy (msg);
699
700     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
701                     env);
702
703     cli = cli->next;
704   }
705 }
706
707
708 /**
709  * Send a result code back to the client.
710  *
711  * @param client
712  *        Client that should receive the result code.
713  * @param result_code
714  *        Code to transmit.
715  * @param op_id
716  *        Operation ID in network byte order.
717  * @param data
718  *        Data payload or NULL.
719  * @param data_size
720  *        Size of @a data.
721  */
722 static void
723 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
724                     int64_t result_code, const void *data, uint16_t data_size)
725 {
726   struct GNUNET_OperationResultMessage *res;
727   struct GNUNET_MQ_Envelope *
728     env = GNUNET_MQ_msg_extra (res,
729                                data_size,
730                                GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
731   res->result_code = GNUNET_htonll (result_code);
732   res->op_id = op_id;
733   if (0 < data_size)
734     GNUNET_memcpy (&res[1], data, data_size);
735
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737               "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
738               client,
739               GNUNET_ntohll (op_id),
740               result_code,
741               data_size);
742
743   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
744 }
745
746
747 /**
748  * Closure for join_mem_test_cb()
749  */
750 struct JoinMemTestClosure
751 {
752   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
753   struct Channel *channel;
754   struct GNUNET_MULTICAST_JoinHandle *join_handle;
755   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
756 };
757
758
759 /**
760  * Membership test result callback used for join requests.
761  */
762 static void
763 join_mem_test_cb (void *cls, int64_t result,
764                   const char *err_msg, uint16_t err_msg_size)
765 {
766   struct JoinMemTestClosure *jcls = cls;
767
768   if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
769   { /* Pass on join request to client if this is a master channel */
770     struct Master *mst = jcls->channel->master;
771     struct GNUNET_HashCode slave_pub_hash;
772     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
773                         &slave_pub_hash);
774     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
775                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
776     client_send_msg (jcls->channel, &jcls->join_msg->header);
777   }
778   else
779   {
780     if (GNUNET_SYSERR == result)
781     {
782       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
783                   "Could not perform membership test (%.*s)\n",
784                   err_msg_size, err_msg);
785     }
786     // FIXME: add relays
787     GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
788   }
789   GNUNET_free (jcls->join_msg);
790   GNUNET_free (jcls);
791 }
792
793
794 /**
795  * Incoming join request from multicast.
796  */
797 static void
798 mcast_recv_join_request (void *cls,
799                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
800                          const struct GNUNET_MessageHeader *join_msg,
801                          struct GNUNET_MULTICAST_JoinHandle *jh)
802 {
803   struct Channel *chn = cls;
804   uint16_t join_msg_size = 0;
805
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "%p Got join request.\n",
808               chn);
809   if (NULL != join_msg)
810   {
811     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
812     {
813       join_msg_size = ntohs (join_msg->size);
814     }
815     else
816     {
817       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
818                   "%p Got join message with invalid type %u.\n",
819                   chn,
820                   ntohs (join_msg->type));
821     }
822   }
823
824   struct GNUNET_PSYC_JoinRequestMessage *
825     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
826   req->header.size = htons (sizeof (*req) + join_msg_size);
827   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
828   req->slave_pub_key = *slave_pub_key;
829   if (0 < join_msg_size)
830     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
831
832   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
833   jcls->slave_pub_key = *slave_pub_key;
834   jcls->channel = chn;
835   jcls->join_handle = jh;
836   jcls->join_msg = req;
837
838   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
839                                     chn->max_message_id, 0,
840                                     &join_mem_test_cb, jcls);
841 }
842
843
844 /**
845  * Join decision received from multicast.
846  */
847 static void
848 mcast_recv_join_decision (void *cls, int is_admitted,
849                           const struct GNUNET_PeerIdentity *peer,
850                           uint16_t relay_count,
851                           const struct GNUNET_PeerIdentity *relays,
852                           const struct GNUNET_MessageHeader *join_resp)
853 {
854   struct Slave *slv = cls;
855   struct Channel *chn = &slv->channel;
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "%p Got join decision: %d\n",
858               slv,
859               is_admitted);
860   if (GNUNET_YES == chn->is_ready)
861   {
862     /* Already admitted */
863     return;
864   }
865
866   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
867   struct GNUNET_PSYC_JoinDecisionMessage *
868     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
869   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
870   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
871   dcsn->is_admitted = htonl (is_admitted);
872   if (0 < join_resp_size)
873     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
874
875   client_send_msg (chn, &dcsn->header);
876
877   if (GNUNET_YES == is_admitted
878       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
879   {
880     chn->is_ready = GNUNET_YES;
881   }
882 }
883
884
885 static int
886 store_recv_fragment_replay (void *cls,
887                             struct GNUNET_MULTICAST_MessageHeader *msg,
888                             enum GNUNET_PSYCSTORE_MessageFlags flags)
889 {
890   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
891
892   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
893   return GNUNET_YES;
894 }
895
896
897 /**
898  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
899  */
900 static void
901 store_recv_fragment_replay_result (void *cls,
902                                    int64_t result,
903                                    const char *err_msg,
904                                    uint16_t err_msg_size)
905 {
906   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
907
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
910               rh,
911               result,
912               err_msg_size,
913               err_msg);
914   switch (result)
915   {
916   case GNUNET_YES:
917     break;
918
919   case GNUNET_NO:
920     GNUNET_MULTICAST_replay_response (rh, NULL,
921                                       GNUNET_MULTICAST_REC_NOT_FOUND);
922     return;
923
924   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
925     GNUNET_MULTICAST_replay_response (rh, NULL,
926                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
927     return;
928
929   case GNUNET_SYSERR:
930     GNUNET_MULTICAST_replay_response (rh, NULL,
931                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
932     return;
933   }
934   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
935    * an error code, so it must be ensured no further processing
936    * is attempted on 'rh'. Maybe this should be refactored as
937    * it doesn't look very intuitive.    --lynX
938    */
939   GNUNET_MULTICAST_replay_response_end (rh);
940 }
941
942
943 /**
944  * Incoming fragment replay request from multicast.
945  */
946 static void
947 mcast_recv_replay_fragment (void *cls,
948                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
949                             uint64_t fragment_id, uint64_t flags,
950                             struct GNUNET_MULTICAST_ReplayHandle *rh)
951
952 {
953   struct Channel *chn = cls;
954   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
955                                  fragment_id, fragment_id,
956                                  &store_recv_fragment_replay,
957                                  &store_recv_fragment_replay_result, rh);
958 }
959
960
961 /**
962  * Incoming message replay request from multicast.
963  */
964 static void
965 mcast_recv_replay_message (void *cls,
966                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
967                            uint64_t message_id,
968                            uint64_t fragment_offset,
969                            uint64_t flags,
970                            struct GNUNET_MULTICAST_ReplayHandle *rh)
971 {
972   struct Channel *chn = cls;
973   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
974                                 message_id, message_id, 1, NULL,
975                                 &store_recv_fragment_replay,
976                                 &store_recv_fragment_replay_result, rh);
977 }
978
979
980 /**
981  * Convert an uint64_t in network byte order to a HashCode
982  * that can be used as key in a MultiHashMap
983  */
984 static inline void
985 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
986 {
987   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
988   /* TODO: use built-in byte swap functions if available */
989
990   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
991   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
992
993   *key = (struct GNUNET_HashCode) {};
994   *((uint64_t *) key)
995     = (n << 32) | (n >> 32);
996 }
997
998
999 /**
1000  * Convert an uint64_t in host byte order to a HashCode
1001  * that can be used as key in a MultiHashMap
1002  */
1003 static inline void
1004 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
1005 {
1006 #if __BYTE_ORDER == __BIG_ENDIAN
1007   hash_key_from_nll (key, n);
1008 #elif __BYTE_ORDER == __LITTLE_ENDIAN
1009   *key = (struct GNUNET_HashCode) {};
1010   *((uint64_t *) key) = n;
1011 #else
1012   #error byteorder undefined
1013 #endif
1014 }
1015
1016
1017 /**
1018  * Initialize PSYC message header.
1019  */
1020 static inline void
1021 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1022                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1023 {
1024   uint16_t size = ntohs (mmsg->header.size);
1025   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1026
1027   pmsg->header.size = htons (psize);
1028   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1029   pmsg->message_id = mmsg->message_id;
1030   pmsg->fragment_offset = mmsg->fragment_offset;
1031   pmsg->flags = htonl (flags);
1032
1033   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1034 }
1035
1036
1037 /**
1038  * Create a new PSYC message from a multicast message for sending it to clients.
1039  */
1040 static inline struct GNUNET_PSYC_MessageHeader *
1041 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1042 {
1043   struct GNUNET_PSYC_MessageHeader *pmsg;
1044   uint16_t size = ntohs (mmsg->header.size);
1045   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1046
1047   pmsg = GNUNET_malloc (psize);
1048   psyc_msg_init (pmsg, mmsg, flags);
1049   return pmsg;
1050 }
1051
1052
1053 /**
1054  * Send multicast message to all clients connected to the channel.
1055  */
1056 static void
1057 client_send_mcast_msg (struct Channel *chn,
1058                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1059                        uint32_t flags)
1060 {
1061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1063               chn,
1064               GNUNET_ntohll (mmsg->fragment_id),
1065               GNUNET_ntohll (mmsg->message_id));
1066
1067   struct GNUNET_PSYC_MessageHeader *
1068     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1069   client_send_msg (chn, &pmsg->header);
1070   GNUNET_free (pmsg);
1071 }
1072
1073
1074 /**
1075  * Send multicast request to all clients connected to the channel.
1076  */
1077 static void
1078 client_send_mcast_req (struct Master *mst,
1079                        const struct GNUNET_MULTICAST_RequestHeader *req)
1080 {
1081   struct Channel *chn = &mst->channel;
1082
1083   struct GNUNET_PSYC_MessageHeader *pmsg;
1084   uint16_t size = ntohs (req->header.size);
1085   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1086
1087   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1089               chn,
1090               GNUNET_ntohll (req->fragment_id),
1091               GNUNET_ntohll (req->request_id));
1092
1093   pmsg = GNUNET_malloc (psize);
1094   pmsg->header.size = htons (psize);
1095   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1096   pmsg->message_id = req->request_id;
1097   pmsg->fragment_offset = req->fragment_offset;
1098   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1099   pmsg->slave_pub_key = req->member_pub_key;
1100   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1101
1102   client_send_msg (chn, &pmsg->header);
1103
1104   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1105
1106   GNUNET_free (pmsg);
1107 }
1108
1109
1110 /**
1111  * Insert a multicast message fragment into the queue belonging to the message.
1112  *
1113  * @param chn          Channel.
1114  * @param mmsg         Multicast message fragment.
1115  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1116  * @param first_ptype  First PSYC message part type in @a mmsg.
1117  * @param last_ptype   Last PSYC message part type in @a mmsg.
1118  */
1119 static void
1120 fragment_queue_insert (struct Channel *chn,
1121                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1122                        uint16_t first_ptype, uint16_t last_ptype)
1123 {
1124   const uint16_t size = ntohs (mmsg->header.size);
1125   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1126   struct GNUNET_CONTAINER_MultiHashMap
1127     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1128                                                     &chn->pub_key_hash);
1129
1130   struct GNUNET_HashCode msg_id_hash;
1131   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1132
1133   struct FragmentQueue
1134     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1135
1136   if (NULL == fragq)
1137   {
1138     fragq = GNUNET_malloc (sizeof (*fragq));
1139     fragq->state = MSG_FRAG_STATE_HEADER;
1140     fragq->fragments
1141       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1142
1143     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1144                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1145
1146     if (NULL == chan_msgs)
1147     {
1148       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1149       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1150                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1151     }
1152   }
1153
1154   struct GNUNET_HashCode frag_id_hash;
1155   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1156   struct RecvCacheEntry
1157     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1158   if (NULL == cache_entry)
1159   {
1160     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1162                 chn,
1163                 GNUNET_ntohll (mmsg->message_id),
1164                 GNUNET_ntohll (mmsg->fragment_id));
1165     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166                 "%p header_size: %" PRIu64 " + %u\n",
1167                 chn,
1168                 fragq->header_size,
1169                 size);
1170     cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1171     cache_entry->ref_count = 1;
1172     cache_entry->mmsg = GNUNET_malloc (size);
1173     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1174     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1175                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1176   }
1177   else
1178   {
1179     cache_entry->ref_count++;
1180     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1181                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1182                 chn,
1183                 GNUNET_ntohll (mmsg->message_id),
1184                 GNUNET_ntohll (mmsg->fragment_id),
1185                 cache_entry->ref_count);
1186   }
1187
1188   if (MSG_FRAG_STATE_HEADER == fragq->state)
1189   {
1190     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1191     {
1192       struct GNUNET_PSYC_MessageMethod *
1193         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1194       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1195       fragq->flags = ntohl (pmeth->flags);
1196     }
1197
1198     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1199     {
1200       fragq->header_size += size;
1201     }
1202     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1203              || frag_offset == fragq->header_size)
1204     { /* header is now complete */
1205       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1206                   "%p Header of message %" PRIu64 " is complete.\n",
1207                   chn,
1208                   GNUNET_ntohll (mmsg->message_id));
1209
1210       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1211                   "%p Adding message %" PRIu64 " to queue.\n",
1212                   chn,
1213                   GNUNET_ntohll (mmsg->message_id));
1214       fragq->state = MSG_FRAG_STATE_DATA;
1215     }
1216     else
1217     {
1218       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1219                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1220                   chn,
1221                   GNUNET_ntohll (mmsg->message_id),
1222                   frag_offset,
1223                   fragq->header_size);
1224     }
1225   }
1226
1227   switch (last_ptype)
1228   {
1229   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1230     if (frag_offset == fragq->size)
1231       fragq->state = MSG_FRAG_STATE_END;
1232     else
1233       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1234                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1235                   chn,
1236                   GNUNET_ntohll (mmsg->message_id),
1237                   frag_offset,
1238                   fragq->size);
1239     break;
1240
1241   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1242     /* Drop message without delivering to client if it's a single fragment */
1243     fragq->state =
1244       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1245       ? MSG_FRAG_STATE_DROP
1246       : MSG_FRAG_STATE_CANCEL;
1247   }
1248
1249   switch (fragq->state)
1250   {
1251   case MSG_FRAG_STATE_DATA:
1252   case MSG_FRAG_STATE_END:
1253   case MSG_FRAG_STATE_CANCEL:
1254     if (GNUNET_NO == fragq->is_queued)
1255     {
1256       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1257                                     GNUNET_ntohll (mmsg->message_id));
1258       fragq->is_queued = GNUNET_YES;
1259     }
1260   }
1261
1262   fragq->size += size;
1263   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1264                                 GNUNET_ntohll (mmsg->fragment_id));
1265 }
1266
1267
1268 /**
1269  * Run fragment queue of a message.
1270  *
1271  * Send fragments of a message in order to client, after all modifiers arrived
1272  * from multicast.
1273  *
1274  * @param chn
1275  *        Channel.
1276  * @param msg_id
1277  *        ID of the message @a fragq belongs to.
1278  * @param fragq
1279  *        Fragment queue of the message.
1280  * @param drop
1281  *        Drop message without delivering to client?
1282  *        #GNUNET_YES or #GNUNET_NO.
1283  */
1284 static void
1285 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1286                     struct FragmentQueue *fragq, uint8_t drop)
1287 {
1288   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1289               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1290               chn,
1291               msg_id,
1292               fragq->state);
1293
1294   struct GNUNET_CONTAINER_MultiHashMap
1295     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1296                                                     &chn->pub_key_hash);
1297   GNUNET_assert (NULL != chan_msgs);
1298   uint64_t frag_id;
1299
1300   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1301                                                     &frag_id))
1302   {
1303     struct GNUNET_HashCode frag_id_hash;
1304     hash_key_from_hll (&frag_id_hash, frag_id);
1305     struct RecvCacheEntry *cache_entry
1306       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1307     if (cache_entry != NULL)
1308     {
1309       if (GNUNET_NO == drop)
1310       {
1311         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1312       }
1313       if (cache_entry->ref_count <= 1)
1314       {
1315         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1316                                               cache_entry);
1317         GNUNET_free (cache_entry->mmsg);
1318         GNUNET_free (cache_entry);
1319       }
1320       else
1321       {
1322         cache_entry->ref_count--;
1323       }
1324     }
1325 #if CACHE_AGING_IMPLEMENTED
1326     else if (GNUNET_NO == drop)
1327     {
1328       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1329     }
1330 #endif
1331
1332     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1333   }
1334
1335   if (MSG_FRAG_STATE_END <= fragq->state)
1336   {
1337     struct GNUNET_HashCode msg_id_hash;
1338     hash_key_from_hll (&msg_id_hash, msg_id);
1339
1340     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1341     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1342     GNUNET_free (fragq);
1343   }
1344   else
1345   {
1346     fragq->is_queued = GNUNET_NO;
1347   }
1348 }
1349
1350
1351 struct StateModifyClosure
1352 {
1353   struct Channel *channel;
1354   uint64_t msg_id;
1355   struct GNUNET_HashCode msg_id_hash;
1356 };
1357
1358
1359 void
1360 store_recv_state_modify_result (void *cls, int64_t result,
1361                                 const char *err_msg, uint16_t err_msg_size)
1362 {
1363   struct StateModifyClosure *mcls = cls;
1364   struct Channel *chn = mcls->channel;
1365   uint64_t msg_id = mcls->msg_id;
1366
1367   struct FragmentQueue *
1368     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1369
1370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1372               chn, result, err_msg_size, err_msg);
1373
1374   switch (result)
1375   {
1376   case GNUNET_OK:
1377   case GNUNET_NO:
1378     if (NULL != fragq)
1379       fragq->state_is_modified = GNUNET_YES;
1380     if (chn->max_state_message_id < msg_id)
1381       chn->max_state_message_id = msg_id;
1382     if (chn->max_message_id < msg_id)
1383       chn->max_message_id = msg_id;
1384
1385     if (NULL != fragq)
1386       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1387     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1388     message_queue_run (chn);
1389     break;
1390
1391   default:
1392     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1393                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1394                 chn, result, err_msg_size, err_msg);
1395     /** @todo FIXME: handle state_modify error */
1396   }
1397 }
1398
1399
1400 /**
1401  * Run message queue.
1402  *
1403  * Send messages in queue to client in order after a message has arrived from
1404  * multicast, according to the following:
1405  * - A message is only sent if all of its modifiers arrived.
1406  * - A stateful message is only sent if the previous stateful message
1407  *   has already been delivered to the client.
1408  *
1409  * @param chn  Channel.
1410  *
1411  * @return Number of messages removed from queue and sent to client.
1412  */
1413 static uint64_t
1414 message_queue_run (struct Channel *chn)
1415 {
1416   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1417               "%p Running message queue.\n", chn);
1418   uint64_t n = 0;
1419   uint64_t msg_id;
1420
1421   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1422                                                     &msg_id))
1423   {
1424     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1425                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1426     struct GNUNET_HashCode msg_id_hash;
1427     hash_key_from_hll (&msg_id_hash, msg_id);
1428
1429     struct FragmentQueue *
1430       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1431
1432     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1433     {
1434       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1435                   "%p No fragq (%p) or header not complete.\n",
1436                   chn, fragq);
1437       break;
1438     }
1439
1440     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441                 "%p Fragment queue entry:  state: %u, state delta: "
1442                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1443                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1444
1445     if (MSG_FRAG_STATE_DATA <= fragq->state)
1446     {
1447       /* Check if there's a missing message before the current one */
1448       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1449       {
1450         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1451
1452         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1453             && (chn->max_message_id != msg_id - 1
1454                 && chn->max_message_id != msg_id))
1455         {
1456           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1457                       "%p Out of order message. "
1458                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1459                       chn, chn->max_message_id, msg_id);
1460           break;
1461           // FIXME: keep track of messages processed in this queue run,
1462           //        and only stop after reaching the end
1463         }
1464       }
1465       else
1466       {
1467         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1468         if (GNUNET_YES != fragq->state_is_modified)
1469         {
1470           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1471           {
1472             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1473                         "%p Out of order stateful message. "
1474                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1475                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1476             break;
1477             // FIXME: keep track of messages processed in this queue run,
1478             //        and only stop after reaching the end
1479           }
1480
1481           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1482           mcls->channel = chn;
1483           mcls->msg_id = msg_id;
1484           mcls->msg_id_hash = msg_id_hash;
1485
1486           /* Apply modifiers to state in PSYCstore */
1487           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1488                                          fragq->state_delta,
1489                                          store_recv_state_modify_result, mcls);
1490           break; // continue after asynchronous state modify result
1491         }
1492       }
1493       chn->max_message_id = msg_id;
1494     }
1495     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1496     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1497     n++;
1498   }
1499
1500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1502   return n;
1503 }
1504
1505
1506 /**
1507  * Drop message queue of a channel.
1508  *
1509  * Remove all messages in queue without sending it to clients.
1510  *
1511  * @param chn  Channel.
1512  *
1513  * @return Number of messages removed from queue.
1514  */
1515 static uint64_t
1516 message_queue_drop (struct Channel *chn)
1517 {
1518   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1519               "%p Dropping message queue.\n", chn);
1520   uint64_t n = 0;
1521   uint64_t msg_id;
1522   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1523                                                     &msg_id))
1524   {
1525     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1526                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1527     struct GNUNET_HashCode msg_id_hash;
1528     hash_key_from_hll (&msg_id_hash, msg_id);
1529
1530     struct FragmentQueue *
1531       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1532     GNUNET_assert (NULL != fragq);
1533     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1534     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1535     n++;
1536   }
1537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1539   return n;
1540 }
1541
1542
1543 /**
1544  * Received result of GNUNET_PSYCSTORE_fragment_store().
1545  */
1546 static void
1547 store_recv_fragment_store_result (void *cls, int64_t result,
1548                                   const char *err_msg, uint16_t err_msg_size)
1549 {
1550   struct Channel *chn = cls;
1551   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1552               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1553               chn, result, err_msg_size, err_msg);
1554 }
1555
1556
1557 /**
1558  * Handle incoming message fragment from multicast.
1559  *
1560  * Store it using PSYCstore and send it to the clients of the channel in order.
1561  */
1562 static void
1563 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1564 {
1565   struct Channel *chn = cls;
1566   uint16_t size = ntohs (mmsg->header.size);
1567
1568   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1569               "%p Received multicast message of size %u. "
1570               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1571               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1572               chn, size,
1573               GNUNET_ntohll (mmsg->fragment_id),
1574               GNUNET_ntohll (mmsg->message_id),
1575               GNUNET_ntohll (mmsg->fragment_offset),
1576               GNUNET_ntohll (mmsg->flags));
1577
1578   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1579                                    &store_recv_fragment_store_result, chn);
1580
1581   uint16_t first_ptype = 0, last_ptype = 0;
1582   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1583                                                (const char *) &mmsg[1],
1584                                                &first_ptype, &last_ptype);
1585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586               "%p Message check result %d, first part type %u, last part type %u\n",
1587               chn, check, first_ptype, last_ptype);
1588   if (GNUNET_SYSERR == check)
1589   {
1590     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1591                 "%p Dropping incoming multicast message with invalid parts.\n",
1592                 chn);
1593     GNUNET_break_op (0);
1594     return;
1595   }
1596
1597   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1598   message_queue_run (chn);
1599 }
1600
1601
1602 /**
1603  * Incoming request fragment from multicast for a master.
1604  *
1605  * @param cls   Master.
1606  * @param req   The request.
1607  */
1608 static void
1609 mcast_recv_request (void *cls,
1610                     const struct GNUNET_MULTICAST_RequestHeader *req)
1611 {
1612   struct Master *mst = cls;
1613   uint16_t size = ntohs (req->header.size);
1614
1615   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617               "%p Received multicast request of size %u from %s.\n",
1618               mst, size, str);
1619   GNUNET_free (str);
1620
1621   uint16_t first_ptype = 0, last_ptype = 0;
1622   if (GNUNET_SYSERR
1623       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1624                                           (const char *) &req[1],
1625                                           &first_ptype, &last_ptype))
1626   {
1627     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1628                 "%p Dropping incoming multicast request with invalid parts.\n",
1629                 mst);
1630     GNUNET_break_op (0);
1631     return;
1632   }
1633
1634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635               "Message parts: first: type %u, last: type %u\n",
1636               first_ptype, last_ptype);
1637
1638   /* FIXME: in-order delivery */
1639   client_send_mcast_req (mst, req);
1640 }
1641
1642
1643 /**
1644  * Response from PSYCstore with the current counter values for a channel master.
1645  */
1646 static void
1647 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1648                             uint64_t max_message_id, uint64_t max_group_generation,
1649                             uint64_t max_state_message_id)
1650 {
1651   struct Master *mst = cls;
1652   struct Channel *chn = &mst->channel;
1653   chn->store_op = NULL;
1654
1655   struct GNUNET_PSYC_CountersResultMessage res;
1656   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1657   res.header.size = htons (sizeof (res));
1658   res.result_code = htonl (result);
1659   res.max_message_id = GNUNET_htonll (max_message_id);
1660
1661   if (GNUNET_OK == result || GNUNET_NO == result)
1662   {
1663     mst->max_message_id = max_message_id;
1664     chn->max_message_id = max_message_id;
1665     chn->max_state_message_id = max_state_message_id;
1666     mst->max_group_generation = max_group_generation;
1667     mst->origin
1668       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1669                                        mcast_recv_join_request,
1670                                        mcast_recv_replay_fragment,
1671                                        mcast_recv_replay_message,
1672                                        mcast_recv_request,
1673                                        mcast_recv_message, chn);
1674     chn->is_ready = GNUNET_YES;
1675   }
1676   else
1677   {
1678     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1679                 "%p GNUNET_PSYCSTORE_counters_get() "
1680                 "returned %d for channel %s.\n",
1681                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1682   }
1683
1684   client_send_msg (chn, &res.header);
1685 }
1686
1687
1688 /**
1689  * Response from PSYCstore with the current counter values for a channel slave.
1690  */
1691 void
1692 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1693                            uint64_t max_message_id, uint64_t max_group_generation,
1694                            uint64_t max_state_message_id)
1695 {
1696   struct Slave *slv = cls;
1697   struct Channel *chn = &slv->channel;
1698   chn->store_op = NULL;
1699
1700   struct GNUNET_PSYC_CountersResultMessage res;
1701   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1702   res.header.size = htons (sizeof (res));
1703   res.result_code = htonl (result);
1704   res.max_message_id = GNUNET_htonll (max_message_id);
1705
1706   if (GNUNET_OK == result || GNUNET_NO == result)
1707   {
1708     chn->max_message_id = max_message_id;
1709     chn->max_state_message_id = max_state_message_id;
1710     slv->member
1711       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1712                                       &slv->origin,
1713                                       slv->relay_count, slv->relays,
1714                                       &slv->join_msg->header,
1715                                       mcast_recv_join_request,
1716                                       mcast_recv_join_decision,
1717                                       mcast_recv_replay_fragment,
1718                                       mcast_recv_replay_message,
1719                                       mcast_recv_message, chn);
1720     if (NULL != slv->join_msg)
1721     {
1722       GNUNET_free (slv->join_msg);
1723       slv->join_msg = NULL;
1724     }
1725   }
1726   else
1727   {
1728     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1729                 "%p GNUNET_PSYCSTORE_counters_get() "
1730                 "returned %d for channel %s.\n",
1731                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1732   }
1733
1734   client_send_msg (chn, &res.header);
1735 }
1736
1737
1738 static void
1739 channel_init (struct Channel *chn)
1740 {
1741   chn->recv_msgs
1742     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1743   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1744 }
1745
1746
1747 /**
1748  * Handle a connecting client starting a channel master.
1749  */
1750 static void
1751 handle_client_master_start (void *cls,
1752                             const struct MasterStartRequest *req)
1753 {
1754   struct Client *c = cls;
1755   struct GNUNET_SERVICE_Client *client = c->client;
1756
1757   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1758   struct GNUNET_HashCode pub_key_hash;
1759
1760   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1761   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1762
1763   struct Master *
1764     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1765   struct Channel *chn;
1766
1767   if (NULL == mst)
1768   {
1769     mst = GNUNET_malloc (sizeof (*mst));
1770     mst->policy = ntohl (req->policy);
1771     mst->priv_key = req->channel_key;
1772     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1773
1774     chn = c->channel = &mst->channel;
1775     chn->master = mst;
1776     chn->is_master = GNUNET_YES;
1777     chn->pub_key = pub_key;
1778     chn->pub_key_hash = pub_key_hash;
1779     channel_init (chn);
1780
1781     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1782                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1783     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1784                                                    store_recv_master_counters, mst);
1785   }
1786   else
1787   {
1788     chn = &mst->channel;
1789
1790     struct GNUNET_PSYC_CountersResultMessage *res;
1791     struct GNUNET_MQ_Envelope *
1792       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1793     res->result_code = htonl (GNUNET_OK);
1794     res->max_message_id = GNUNET_htonll (mst->max_message_id);
1795
1796     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1797   }
1798
1799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800               "%p Client connected as master to channel %s.\n",
1801               mst, GNUNET_h2s (&chn->pub_key_hash));
1802
1803   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1804   cli->client = client;
1805   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1806
1807   GNUNET_SERVICE_client_continue (client);
1808 }
1809
1810
1811 static int
1812 check_client_slave_join (void *cls,
1813                          const struct SlaveJoinRequest *req)
1814 {
1815   return GNUNET_OK;
1816 }
1817
1818
1819 /**
1820  * Handle a connecting client joining as a channel slave.
1821  */
1822 static void
1823 handle_client_slave_join (void *cls,
1824                           const struct SlaveJoinRequest *req)
1825 {
1826   struct Client *c = cls;
1827   struct GNUNET_SERVICE_Client *client = c->client;
1828
1829   uint16_t req_size = ntohs (req->header.size);
1830
1831   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1832   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1833
1834   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1835   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1836   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1837
1838   struct GNUNET_CONTAINER_MultiHashMap *
1839     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1840   struct Slave *slv = NULL;
1841   struct Channel *chn;
1842
1843   if (NULL != chn_slv)
1844   {
1845     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1846   }
1847   if (NULL == slv)
1848   {
1849     slv = GNUNET_malloc (sizeof (*slv));
1850     slv->priv_key = req->slave_key;
1851     slv->pub_key = slv_pub_key;
1852     slv->pub_key_hash = slv_pub_hash;
1853     slv->origin = req->origin;
1854     slv->relay_count = ntohl (req->relay_count);
1855     slv->join_flags = ntohl (req->flags);
1856
1857     const struct GNUNET_PeerIdentity *
1858       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1859     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1860     uint16_t join_msg_size = 0;
1861
1862     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1863         <= req_size)
1864     {
1865       struct GNUNET_PSYC_Message *
1866         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1867       join_msg_size = ntohs (join_msg->header.size);
1868       slv->join_msg = GNUNET_malloc (join_msg_size);
1869       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1870     }
1871     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1872     {
1873       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1874                   "%u + %u + %u != %u\n",
1875                   (unsigned int) sizeof (*req),
1876                   relay_size,
1877                   join_msg_size,
1878                   req_size);
1879       GNUNET_break (0);
1880       GNUNET_SERVICE_client_drop (client);
1881       GNUNET_free (slv);
1882       return;
1883     }
1884     if (0 < slv->relay_count)
1885     {
1886       slv->relays = GNUNET_malloc (relay_size);
1887       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1888     }
1889
1890     chn = c->channel = &slv->channel;
1891     chn->slave = slv;
1892     chn->is_master = GNUNET_NO;
1893     chn->pub_key = req->channel_pub_key;
1894     chn->pub_key_hash = pub_key_hash;
1895     channel_init (chn);
1896
1897     if (NULL == chn_slv)
1898     {
1899       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1900       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1901                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1902     }
1903     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1904                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1905     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1906                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1907     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1908                                                   &store_recv_slave_counters, slv);
1909   }
1910   else
1911   {
1912     chn = &slv->channel;
1913
1914     struct GNUNET_PSYC_CountersResultMessage *res;
1915
1916     struct GNUNET_MQ_Envelope *
1917       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1918     res->result_code = htonl (GNUNET_OK);
1919     res->max_message_id = GNUNET_htonll (chn->max_message_id);
1920
1921     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1922
1923     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1924     {
1925       mcast_recv_join_decision (slv, GNUNET_YES,
1926                                 NULL, 0, NULL, NULL);
1927     }
1928     else if (NULL == slv->member)
1929     {
1930       slv->member
1931         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1932                                         &slv->origin,
1933                                         slv->relay_count, slv->relays,
1934                                         &slv->join_msg->header,
1935                                         &mcast_recv_join_request,
1936                                         &mcast_recv_join_decision,
1937                                         &mcast_recv_replay_fragment,
1938                                         &mcast_recv_replay_message,
1939                                         &mcast_recv_message, chn);
1940       if (NULL != slv->join_msg)
1941       {
1942         GNUNET_free (slv->join_msg);
1943         slv->join_msg = NULL;
1944       }
1945     }
1946     else if (NULL != slv->join_dcsn)
1947     {
1948       struct GNUNET_MQ_Envelope *
1949         env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1950       GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1951     }
1952   }
1953
1954   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955               "%p Client connected as slave to channel %s.\n",
1956               slv, GNUNET_h2s (&chn->pub_key_hash));
1957
1958   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1959   cli->client = client;
1960   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1961
1962   GNUNET_SERVICE_client_continue (client);
1963 }
1964
1965
1966 struct JoinDecisionClosure
1967 {
1968   int32_t is_admitted;
1969   struct GNUNET_MessageHeader *msg;
1970 };
1971
1972
1973 /**
1974  * Iterator callback for sending join decisions to multicast.
1975  */
1976 static int
1977 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1978                           void *value)
1979 {
1980   struct JoinDecisionClosure *jcls = cls;
1981   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1982   // FIXME: add relays
1983   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1984   return GNUNET_YES;
1985 }
1986
1987
1988 static int
1989 check_client_join_decision (void *cls,
1990                             const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1991 {
1992   return GNUNET_OK;
1993 }
1994
1995
1996 /**
1997  * Join decision from client.
1998  */
1999 static void
2000 handle_client_join_decision (void *cls,
2001                              const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
2002 {
2003   struct Client *c = cls;
2004   struct GNUNET_SERVICE_Client *client = c->client;
2005   struct Channel *chn = c->channel;
2006   if (NULL == chn)
2007   {
2008     GNUNET_break (0);
2009     GNUNET_SERVICE_client_drop (client);
2010     return;
2011   }
2012   GNUNET_assert (GNUNET_YES == chn->is_master);
2013   struct Master *mst = chn->master;
2014
2015   struct JoinDecisionClosure jcls;
2016   jcls.is_admitted = ntohl (dcsn->is_admitted);
2017   jcls.msg
2018     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2019     ? (struct GNUNET_MessageHeader *) &dcsn[1]
2020     : NULL;
2021
2022   struct GNUNET_HashCode slave_pub_hash;
2023   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2024                       &slave_pub_hash);
2025
2026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2027               "%p Got join decision (%d) from client for channel %s..\n",
2028               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2030               "%p ..and slave %s.\n",
2031               mst, GNUNET_h2s (&slave_pub_hash));
2032
2033   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2034                                               &mcast_send_join_decision, &jcls);
2035   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2036   GNUNET_SERVICE_client_continue (client);
2037 }
2038
2039
2040 /**
2041  * Send acknowledgement to a client.
2042  *
2043  * Sent after a message fragment has been passed on to multicast.
2044  *
2045  * @param chn The channel struct for the client.
2046  */
2047 static void
2048 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2049 {
2050   struct GNUNET_MessageHeader *res;
2051   struct GNUNET_MQ_Envelope *
2052       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2053
2054   /* FIXME? */
2055   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2056 }
2057
2058
2059 /**
2060  * Callback for the transmit functions of multicast.
2061  */
2062 static int
2063 transmit_notify (void *cls, size_t *data_size, void *data)
2064 {
2065   struct Channel *chn = cls;
2066   struct TransmitMessage *tmit_msg = chn->tmit_head;
2067
2068   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2069   {
2070     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071                 "%p transmit_notify: nothing to send.\n", chn);
2072     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2073       GNUNET_break (0);
2074     *data_size = 0;
2075     return GNUNET_NO;
2076   }
2077
2078   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2079               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2080
2081   *data_size = tmit_msg->size;
2082   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2083
2084   int ret
2085     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2086     ? GNUNET_NO
2087     : GNUNET_YES;
2088
2089   /* FIXME: handle disconnecting clients */
2090   if (NULL != tmit_msg->client)
2091     send_message_ack (chn, tmit_msg->client);
2092
2093   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2094
2095   if (NULL != chn->tmit_head)
2096   {
2097     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2098   }
2099   else if (GNUNET_YES == chn->is_disconnected
2100            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2101   {
2102     /* FIXME: handle partial message (when still in_transmit) */
2103     GNUNET_free (tmit_msg);
2104     return GNUNET_SYSERR;
2105   }
2106   GNUNET_free (tmit_msg);
2107   return ret;
2108 }
2109
2110
2111 /**
2112  * Callback for the transmit functions of multicast.
2113  */
2114 static int
2115 master_transmit_notify (void *cls, size_t *data_size, void *data)
2116 {
2117   int ret = transmit_notify (cls, data_size, data);
2118
2119   if (GNUNET_YES == ret)
2120   {
2121     struct Master *mst = cls;
2122     mst->tmit_handle = NULL;
2123   }
2124   return ret;
2125 }
2126
2127
2128 /**
2129  * Callback for the transmit functions of multicast.
2130  */
2131 static int
2132 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2133 {
2134   int ret = transmit_notify (cls, data_size, data);
2135
2136   if (GNUNET_YES == ret)
2137   {
2138     struct Slave *slv = cls;
2139     slv->tmit_handle = NULL;
2140   }
2141   return ret;
2142 }
2143
2144
2145 /**
2146  * Transmit a message from a channel master to the multicast group.
2147  */
2148 static void
2149 master_transmit_message (struct Master *mst)
2150 {
2151   struct Channel *chn = &mst->channel;
2152   struct TransmitMessage *tmit_msg = chn->tmit_head;
2153   if (NULL == tmit_msg)
2154     return;
2155   if (NULL == mst->tmit_handle)
2156   {
2157     mst->tmit_handle = (void *) &mst->tmit_handle;
2158     struct GNUNET_MULTICAST_OriginTransmitHandle *
2159       tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2160                                                     mst->max_group_generation,
2161                                                     master_transmit_notify, mst);
2162     if (NULL != mst->tmit_handle)
2163       mst->tmit_handle = tmit_handle;
2164   }
2165   else
2166   {
2167     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2168   }
2169 }
2170
2171
2172 /**
2173  * Transmit a message from a channel slave to the multicast group.
2174  */
2175 static void
2176 slave_transmit_message (struct Slave *slv)
2177 {
2178   if (NULL == slv->channel.tmit_head)
2179     return;
2180   if (NULL == slv->tmit_handle)
2181   {
2182     slv->tmit_handle = (void *) &slv->tmit_handle;
2183     struct GNUNET_MULTICAST_MemberTransmitHandle *
2184       tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
2185                                                        slave_transmit_notify, slv);
2186     if (NULL != slv->tmit_handle)
2187       slv->tmit_handle = tmit_handle;
2188   }
2189   else
2190   {
2191     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2192   }
2193 }
2194
2195
2196 static void
2197 transmit_message (struct Channel *chn)
2198 {
2199   chn->is_master
2200     ? master_transmit_message (chn->master)
2201     : slave_transmit_message (chn->slave);
2202 }
2203
2204
2205 /**
2206  * Queue a message from a channel master for sending to the multicast group.
2207  */
2208 static void
2209 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2210 {
2211   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2212
2213   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2214   {
2215     tmit_msg->id = ++mst->max_message_id;
2216     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2217                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2218                 mst, tmit_msg->id);
2219     struct GNUNET_PSYC_MessageMethod *pmeth
2220       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2221
2222     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2223     {
2224       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2225     }
2226     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2227     {
2228       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2229                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2230                   mst, tmit_msg->id - mst->max_state_message_id);
2231       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2232                                           - mst->max_state_message_id);
2233       mst->max_state_message_id = tmit_msg->id;
2234     }
2235     else
2236     {
2237         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2238                     "%p master_queue_message: state not modified\n", mst);
2239       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2240     }
2241
2242     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2243     {
2244       /// @todo add state_hash to PSYC header
2245     }
2246   }
2247 }
2248
2249
2250 /**
2251  * Queue a message from a channel slave for sending to the multicast group.
2252  */
2253 static void
2254 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2255 {
2256   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2257   {
2258     struct GNUNET_PSYC_MessageMethod *pmeth
2259       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2260     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2261     tmit_msg->id = ++slv->max_request_id;
2262   }
2263 }
2264
2265
2266 /**
2267  * Queue PSYC message parts for sending to multicast.
2268  *
2269  * @param chn
2270  *        Channel to send to.
2271  * @param client
2272  *        Client the message originates from.
2273  * @param data_size
2274  *        Size of @a data.
2275  * @param data
2276  *        Concatenated message parts.
2277  * @param first_ptype
2278  *        First message part type in @a data.
2279  * @param last_ptype
2280  *        Last message part type in @a data.
2281  */
2282 static struct TransmitMessage *
2283 queue_message (struct Channel *chn,
2284                struct GNUNET_SERVICE_Client *client,
2285                size_t data_size,
2286                const void *data,
2287                uint16_t first_ptype, uint16_t last_ptype)
2288 {
2289   struct TransmitMessage *
2290     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2291   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2292   tmit_msg->client = client;
2293   tmit_msg->size = data_size;
2294   tmit_msg->first_ptype = first_ptype;
2295   tmit_msg->last_ptype = last_ptype;
2296
2297   /* FIXME: separate queue per message ID */
2298
2299   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2300
2301   chn->is_master
2302     ? master_queue_message (chn->master, tmit_msg)
2303     : slave_queue_message (chn->slave, tmit_msg);
2304   return tmit_msg;
2305 }
2306
2307
2308 /**
2309  * Cancel transmission of current message.
2310  *
2311  * @param chn     Channel to send to.
2312  * @param client  Client the message originates from.
2313  */
2314 static void
2315 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2316 {
2317   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2318
2319   struct GNUNET_MessageHeader msg;
2320   msg.size = htons (sizeof (msg));
2321   msg.type = htons (type);
2322
2323   queue_message (chn, client, sizeof (msg), &msg, type, type);
2324   transmit_message (chn);
2325
2326   /* FIXME: cleanup */
2327 }
2328
2329
2330 static int
2331 check_client_psyc_message (void *cls,
2332                            const struct GNUNET_MessageHeader *msg)
2333 {
2334   return GNUNET_OK;
2335 }
2336
2337
2338 /**
2339  * Incoming message from a master or slave client.
2340  */
2341 static void
2342 handle_client_psyc_message (void *cls,
2343                             const struct GNUNET_MessageHeader *msg)
2344 {
2345   struct Client *c = cls;
2346   struct GNUNET_SERVICE_Client *client = c->client;
2347   struct Channel *chn = c->channel;
2348   if (NULL == chn)
2349   {
2350     GNUNET_break (0);
2351     GNUNET_SERVICE_client_drop (client);
2352     return;
2353   }
2354
2355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2356               "%p Received message from client.\n", chn);
2357   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2358
2359   if (GNUNET_YES != chn->is_ready)
2360   {
2361     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2362                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2363     GNUNET_break (0);
2364     GNUNET_SERVICE_client_drop (client);
2365     return;
2366   }
2367
2368   uint16_t size = ntohs (msg->size);
2369   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2370   {
2371     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2372                 "%p Message payload too large: %u < %u.\n",
2373                 chn,
2374                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2375                 (unsigned int) (size - sizeof (*msg)));
2376     GNUNET_break (0);
2377     transmit_cancel (chn, client);
2378     GNUNET_SERVICE_client_drop (client);
2379     return;
2380   }
2381
2382   uint16_t first_ptype = 0, last_ptype = 0;
2383   if (GNUNET_SYSERR
2384       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2385                                           (const char *) &msg[1],
2386                                           &first_ptype, &last_ptype))
2387   {
2388     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2389                 "%p Received invalid message part from client.\n", chn);
2390     GNUNET_break (0);
2391     transmit_cancel (chn, client);
2392     GNUNET_SERVICE_client_drop (client);
2393     return;
2394   }
2395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2396               "%p Received message with first part type %u and last part type %u.\n",
2397               chn, first_ptype, last_ptype);
2398
2399   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2400                  first_ptype, last_ptype);
2401   transmit_message (chn);
2402   /* FIXME: send a few ACKs even before transmit_notify is called */
2403
2404   GNUNET_SERVICE_client_continue (client);
2405 };
2406
2407
2408 /**
2409  * Received result of GNUNET_PSYCSTORE_membership_store()
2410  */
2411 static void
2412 store_recv_membership_store_result (void *cls,
2413                                     int64_t result,
2414                                     const char *err_msg,
2415                                     uint16_t err_msg_size)
2416 {
2417   struct Operation *op = cls;
2418   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2420               op->channel,
2421               result,
2422               (int) err_msg_size,
2423               err_msg);
2424
2425   if (NULL != op->client)
2426     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2427   op_remove (op);
2428 }
2429
2430
2431 /**
2432  * Client requests to add/remove a slave in the membership database.
2433  */
2434 static void
2435 handle_client_membership_store (void *cls,
2436                                 const struct ChannelMembershipStoreRequest *req)
2437 {
2438   struct Client *c = cls;
2439   struct GNUNET_SERVICE_Client *client = c->client;
2440   struct Channel *chn = c->channel;
2441   if (NULL == chn)
2442   {
2443     GNUNET_break (0);
2444     GNUNET_SERVICE_client_drop (client);
2445     return;
2446   }
2447
2448   struct Operation *op = op_add (chn, client, req->op_id, 0);
2449
2450   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2451   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2452   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2453               "%p Received membership store request from client.\n", chn);
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2456               chn, req->did_join, announced_at, effective_since);
2457
2458   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2459                                      req->did_join, announced_at, effective_since,
2460                                      0, /* FIXME: group_generation */
2461                                      &store_recv_membership_store_result, op);
2462   GNUNET_SERVICE_client_continue (client);
2463 }
2464
2465
2466 /**
2467  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2468  * in response to a history request from a client.
2469  */
2470 static int
2471 store_recv_fragment_history (void *cls,
2472                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2473                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2474 {
2475   struct Operation *op = cls;
2476   if (NULL == op->client)
2477   { /* Requesting client already disconnected. */
2478     return GNUNET_NO;
2479   }
2480   struct Channel *chn = op->channel;
2481
2482   struct GNUNET_PSYC_MessageHeader *pmsg;
2483   uint16_t msize = ntohs (mmsg->header.size);
2484   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2485
2486   struct GNUNET_OperationResultMessage *
2487     res = GNUNET_malloc (sizeof (*res) + psize);
2488   res->header.size = htons (sizeof (*res) + psize);
2489   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2490   res->op_id = op->op_id;
2491   res->result_code = GNUNET_htonll (GNUNET_OK);
2492
2493   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2494   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2495   GNUNET_memcpy (&res[1], pmsg, psize);
2496
2497   /** @todo FIXME: send only to requesting client */
2498   client_send_msg (chn, &res->header);
2499
2500   GNUNET_free (res);
2501   return GNUNET_YES;
2502 }
2503
2504
2505 /**
2506  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2507  * in response to a history request from a client.
2508  */
2509 static void
2510 store_recv_fragment_history_result (void *cls, int64_t result,
2511                                     const char *err_msg, uint16_t err_msg_size)
2512 {
2513   struct Operation *op = cls;
2514   if (NULL == op->client)
2515   { /* Requesting client already disconnected. */
2516     return;
2517   }
2518
2519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2520               "%p History replay #%" PRIu64 ": "
2521               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2522               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2523
2524   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2525   {
2526     /** @todo Multicast replay request for messages not found locally. */
2527   }
2528
2529   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2530   op_remove (op);
2531 }
2532
2533
2534 static int
2535 check_client_history_replay (void *cls,
2536                              const struct GNUNET_PSYC_HistoryRequestMessage *req)
2537 {
2538   return GNUNET_OK;
2539 }
2540
2541
2542 /**
2543  * Client requests channel history.
2544  */
2545 static void
2546 handle_client_history_replay (void *cls,
2547                               const struct GNUNET_PSYC_HistoryRequestMessage *req)
2548 {
2549   struct Client *c = cls;
2550   struct GNUNET_SERVICE_Client *client = c->client;
2551   struct Channel *chn = c->channel;
2552   if (NULL == chn)
2553   {
2554     GNUNET_break (0);
2555     GNUNET_SERVICE_client_drop (client);
2556     return;
2557   }
2558
2559   uint16_t size = ntohs (req->header.size);
2560   const char *method_prefix = (const char *) &req[1];
2561
2562   if (size < sizeof (*req) + 1
2563       || '\0' != method_prefix[size - sizeof (*req) - 1])
2564   {
2565     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2566                 "%p History replay #%" PRIu64 ": "
2567                 "invalid method prefix. size: %u < %u?\n",
2568                 chn,
2569                 GNUNET_ntohll (req->op_id),
2570                 size,
2571                 (unsigned int) sizeof (*req) + 1);
2572     GNUNET_break (0);
2573     GNUNET_SERVICE_client_drop (client);
2574     return;
2575   }
2576
2577   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2578
2579   if (0 == req->message_limit)
2580   {
2581     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2582                                   GNUNET_ntohll (req->start_message_id),
2583                                   GNUNET_ntohll (req->end_message_id),
2584                                   0, method_prefix,
2585                                   &store_recv_fragment_history,
2586                                   &store_recv_fragment_history_result, op);
2587   }
2588   else
2589   {
2590     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2591                                          GNUNET_ntohll (req->message_limit),
2592                                          method_prefix,
2593                                          &store_recv_fragment_history,
2594                                          &store_recv_fragment_history_result,
2595                                          op);
2596   }
2597   GNUNET_SERVICE_client_continue (client);
2598 }
2599
2600
2601 /**
2602  * Received state var from PSYCstore, send it to client.
2603  */
2604 static int
2605 store_recv_state_var (void *cls, const char *name,
2606                       const void *value, uint32_t value_size)
2607 {
2608   struct Operation *op = cls;
2609   struct GNUNET_OperationResultMessage *res;
2610   struct GNUNET_MQ_Envelope *env;
2611
2612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2613               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2614               op->channel, GNUNET_ntohll (op->op_id), name);
2615
2616   if (NULL != name) /* First part */
2617   {
2618     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2619     struct GNUNET_PSYC_MessageModifier *mod;
2620     env = GNUNET_MQ_msg_extra (res,
2621                                sizeof (*mod) + name_size + value_size,
2622                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2623     res->op_id = op->op_id;
2624
2625     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2626     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2627     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2628     mod->name_size = htons (name_size);
2629     mod->value_size = htonl (value_size);
2630     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2631     GNUNET_memcpy (&mod[1], name, name_size);
2632     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2633   }
2634   else /* Continuation */
2635   {
2636     struct GNUNET_MessageHeader *mod;
2637     env = GNUNET_MQ_msg_extra (res,
2638                                sizeof (*mod) + value_size,
2639                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2640     res->op_id = op->op_id;
2641
2642     mod = (struct GNUNET_MessageHeader *) &res[1];
2643     mod->size = htons (sizeof (*mod) + value_size);
2644     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2645     GNUNET_memcpy (&mod[1], value, value_size);
2646   }
2647
2648   // FIXME: client might have been disconnected
2649   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2650   return GNUNET_YES;
2651 }
2652
2653
2654 /**
2655  * Received result of GNUNET_PSYCSTORE_state_get()
2656  * or GNUNET_PSYCSTORE_state_get_prefix()
2657  */
2658 static void
2659 store_recv_state_result (void *cls, int64_t result,
2660                          const char *err_msg, uint16_t err_msg_size)
2661 {
2662   struct Operation *op = cls;
2663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2664               "%p state_get #%" PRIu64 ": "
2665               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2666               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2667
2668   // FIXME: client might have been disconnected
2669   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2670   op_remove (op);
2671 }
2672
2673
2674 static int
2675 check_client_state_get (void *cls,
2676                          const struct StateRequest *req)
2677 {
2678   struct Client *c = cls;
2679   struct Channel *chn = c->channel;
2680   if (NULL == chn)
2681   {
2682     GNUNET_break (0);
2683     return GNUNET_SYSERR;
2684   }
2685
2686   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2687   const char *name = (const char *) &req[1];
2688   if (0 == name_size || '\0' != name[name_size - 1])
2689   {
2690     GNUNET_break (0);
2691     return GNUNET_SYSERR;
2692   }
2693
2694   return GNUNET_OK;
2695 }
2696
2697
2698 /**
2699  * Client requests best matching state variable from PSYCstore.
2700  */
2701 static void
2702 handle_client_state_get (void *cls,
2703                          const struct StateRequest *req)
2704 {
2705   struct Client *c = cls;
2706   struct GNUNET_SERVICE_Client *client = c->client;
2707   struct Channel *chn = c->channel;
2708
2709   const char *name = (const char *) &req[1];
2710   struct Operation *op = op_add (chn, client, req->op_id, 0);
2711   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2712                               &store_recv_state_var,
2713                               &store_recv_state_result, op);
2714   GNUNET_SERVICE_client_continue (client);
2715 }
2716
2717
2718 static int
2719 check_client_state_get_prefix (void *cls,
2720                                const struct StateRequest *req)
2721 {
2722   struct Client *c = cls;
2723   struct Channel *chn = c->channel;
2724   if (NULL == chn)
2725   {
2726     GNUNET_break (0);
2727     return GNUNET_SYSERR;
2728   }
2729
2730   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2731   const char *name = (const char *) &req[1];
2732   if (0 == name_size || '\0' != name[name_size - 1])
2733   {
2734     GNUNET_break (0);
2735     return GNUNET_SYSERR;
2736   }
2737
2738   return GNUNET_OK;
2739 }
2740
2741
2742 /**
2743  * Client requests state variables with a given prefix from PSYCstore.
2744  */
2745 static void
2746 handle_client_state_get_prefix (void *cls,
2747                                 const struct StateRequest *req)
2748 {
2749   struct Client *c = cls;
2750   struct GNUNET_SERVICE_Client *client = c->client;
2751   struct Channel *chn = c->channel;
2752
2753   const char *name = (const char *) &req[1];
2754   struct Operation *op = op_add (chn, client, req->op_id, 0);
2755   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2756                                      &store_recv_state_var,
2757                                      &store_recv_state_result, op);
2758   GNUNET_SERVICE_client_continue (client);
2759 }
2760
2761
2762 /**
2763  * Initialize the PSYC service.
2764  *
2765  * @param cls Closure.
2766  * @param server The initialized server.
2767  * @param c Configuration to use.
2768  */
2769 static void
2770 run (void *cls,
2771      const struct GNUNET_CONFIGURATION_Handle *c,
2772      struct GNUNET_SERVICE_Handle *svc)
2773 {
2774   cfg = c;
2775   service = svc;
2776   store = GNUNET_PSYCSTORE_connect (cfg);
2777   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2778   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2779   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2780   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2781   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2782   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2783 }
2784
2785
2786 /**
2787  * Define "main" method using service macro.
2788  */
2789 GNUNET_SERVICE_MAIN
2790 ("psyc",
2791  GNUNET_SERVICE_OPTION_NONE,
2792  run,
2793  client_notify_connect,
2794  client_notify_disconnect,
2795  NULL,
2796  GNUNET_MQ_hd_fixed_size (client_master_start,
2797                           GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2798                           struct MasterStartRequest,
2799                           NULL),
2800  GNUNET_MQ_hd_var_size (client_slave_join,
2801                         GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2802                         struct SlaveJoinRequest,
2803                         NULL),
2804  GNUNET_MQ_hd_var_size (client_join_decision,
2805                         GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2806                         struct GNUNET_PSYC_JoinDecisionMessage,
2807                         NULL),
2808  GNUNET_MQ_hd_var_size (client_psyc_message,
2809                         GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2810                         struct GNUNET_MessageHeader,
2811                         NULL),
2812  GNUNET_MQ_hd_fixed_size (client_membership_store,
2813                           GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2814                           struct ChannelMembershipStoreRequest,
2815                           NULL),
2816  GNUNET_MQ_hd_var_size (client_history_replay,
2817                         GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2818                         struct GNUNET_PSYC_HistoryRequestMessage,
2819                         NULL),
2820  GNUNET_MQ_hd_var_size (client_state_get,
2821                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2822                         struct StateRequest,
2823                         NULL),
2824  GNUNET_MQ_hd_var_size (client_state_get_prefix,
2825                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2826                         struct StateRequest,
2827                         NULL));
2828
2829 /* end of gnunet-service-psyc.c */