reduce loop counters to more practical levels
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Service handle.
48  */
49 static struct GNUNET_SERVICE_Handle *service;
50
51 /**
52  * Handle to the statistics service.
53  */
54 static struct GNUNET_STATISTICS_Handle *stats;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVICE_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct ClientList
189 {
190   struct ClientList *prev;
191   struct ClientList *next;
192
193   struct GNUNET_SERVICE_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVICE_Client *client;
203   struct Channel *channel;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct ClientList *clients_head;
215   struct ClientList *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this channel ready to receive messages from client?
274    * #GNUNET_YES or #GNUNET_NO
275    */
276   uint8_t is_ready;
277
278   /**
279    * Is the client disconnected?
280    * #GNUNET_YES or #GNUNET_NO
281    */
282   uint8_t is_disconnecting;
283
284   /**
285    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286    */
287   uint8_t is_master;
288
289   union {
290     struct Master *master;
291     struct Slave *slave;
292   };
293 };
294
295
296 /**
297  * Client context for a channel master.
298  */
299 struct Master
300 {
301   /**
302    * Channel struct common for Master and Slave
303    */
304   struct Channel channel;
305
306   /**
307    * Private key of the channel.
308    */
309   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
310
311   /**
312    * Handle for the multicast origin.
313    */
314   struct GNUNET_MULTICAST_Origin *origin;
315
316   /**
317    * Transmit handle for multicast.
318    */
319   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
320
321   /**
322    * Incoming join requests from multicast.
323    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
324    */
325   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
326
327   /**
328    * Last message ID transmitted to this channel.
329    *
330    * Incremented before sending a message, thus the message_id in messages sent
331    * starts from 1.
332    */
333   uint64_t max_message_id;
334
335   /**
336    * ID of the last message with state operations transmitted to the channel.
337    * 0 if there is no such message.
338    */
339   uint64_t max_state_message_id;
340
341   /**
342    * Maximum group generation transmitted to the channel.
343    */
344   uint64_t max_group_generation;
345
346   /**
347    * @see enum GNUNET_PSYC_Policy
348    */
349   enum GNUNET_PSYC_Policy policy;
350 };
351
352
353 /**
354  * Client context for a channel slave.
355  */
356 struct Slave
357 {
358   /**
359    * Channel struct common for Master and Slave
360    */
361   struct Channel channel;
362
363   /**
364    * Private key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
367
368   /**
369    * Public key of the slave.
370    */
371   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
372
373   /**
374    * Hash of @a pub_key.
375    */
376   struct GNUNET_HashCode pub_key_hash;
377
378   /**
379    * Handle for the multicast member.
380    */
381   struct GNUNET_MULTICAST_Member *member;
382
383   /**
384    * Transmit handle for multicast.
385    */
386   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
387
388   /**
389    * Peer identity of the origin.
390    */
391   struct GNUNET_PeerIdentity origin;
392
393   /**
394    * Number of items in @a relays.
395    */
396   uint32_t relay_count;
397
398   /**
399    * Relays that multicast can use to connect.
400    */
401   struct GNUNET_PeerIdentity *relays;
402
403   /**
404    * Join request to be transmitted to the master on join.
405    */
406   struct GNUNET_PSYC_Message *join_msg;
407
408   /**
409    * Join decision received from multicast.
410    */
411   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
412
413   /**
414    * Maximum request ID for this channel.
415    */
416   uint64_t max_request_id;
417
418   /**
419    * Join flags.
420    */
421   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
422 };
423
424
425 /**
426  * Client context.
427  */
428 struct Client {
429   struct GNUNET_SERVICE_Client *client;
430   struct Channel *channel;
431 };
432
433
434 struct ReplayRequestKey
435 {
436   uint64_t fragment_id;
437   uint64_t message_id;
438   uint64_t fragment_offset;
439   uint64_t flags;
440 };
441
442
443 static void
444 transmit_message (struct Channel *chn);
445
446 static uint64_t
447 message_queue_run (struct Channel *chn);
448
449 static uint64_t
450 message_queue_drop (struct Channel *chn);
451
452
453 static void
454 schedule_transmit_message (void *cls)
455 {
456   struct Channel *chn = cls;
457
458   transmit_message (chn);
459 }
460
461
462 /**
463  * Task run during shutdown.
464  *
465  * @param cls unused
466  */
467 static void
468 shutdown_task (void *cls)
469 {
470   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471               "shutting down...\n");
472   GNUNET_PSYCSTORE_disconnect (store);
473   if (NULL != stats)
474   {
475     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
476     stats = NULL;
477   }
478 }
479
480
481 static struct Operation *
482 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
483         uint64_t op_id, uint32_t flags)
484 {
485   struct Operation *op = GNUNET_malloc (sizeof (*op));
486   op->client = client;
487   op->channel = chn;
488   op->op_id = op_id;
489   op->flags = flags;
490   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
491   return op;
492 }
493
494
495 static void
496 op_remove (struct Operation *op)
497 {
498   GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
499   GNUNET_free (op);
500 }
501
502
503 /**
504  * Clean up master data structures after a client disconnected.
505  */
506 static void
507 cleanup_master (struct Master *mst)
508 {
509   struct Channel *chn = &mst->channel;
510
511   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
512   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
513 }
514
515
516 /**
517  * Clean up slave data structures after a client disconnected.
518  */
519 static void
520 cleanup_slave (struct Slave *slv)
521 {
522   struct Channel *chn = &slv->channel;
523   struct GNUNET_CONTAINER_MultiHashMap *
524     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
525                                                 &chn->pub_key_hash);
526   GNUNET_assert (NULL != chn_slv);
527   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
528
529   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
530   {
531     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
532                                           chn_slv);
533     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
534   }
535   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
536
537   if (NULL != slv->join_msg)
538   {
539     GNUNET_free (slv->join_msg);
540     slv->join_msg = NULL;
541   }
542   if (NULL != slv->relays)
543   {
544     GNUNET_free (slv->relays);
545     slv->relays = NULL;
546   }
547   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
548 }
549
550
551 /**
552  * Clean up channel data structures after a client disconnected.
553  */
554 static void
555 cleanup_channel (struct Channel *chn)
556 {
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               "%p Cleaning up channel %s. master? %u\n",
559               chn,
560               GNUNET_h2s (&chn->pub_key_hash),
561               chn->is_master);
562   message_queue_drop (chn);
563   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
564   chn->recv_frags = NULL;
565
566   if (NULL != chn->store_op)
567   {
568     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
569     chn->store_op = NULL;
570   }
571
572   (GNUNET_YES == chn->is_master)
573     ? cleanup_master (chn->master)
574     : cleanup_slave (chn->slave);
575   GNUNET_free (chn);
576 }
577
578
579 /**
580  * Called whenever a client is disconnected.
581  * Frees our resources associated with that client.
582  *
583  * @param cls closure
584  * @param client identification of the client
585  * @param app_ctx must match @a client
586  */
587 static void
588 client_notify_disconnect (void *cls,
589                           struct GNUNET_SERVICE_Client *client,
590                           void *app_ctx)
591 {
592   struct Client *c = app_ctx;
593   struct Channel *chn = c->channel;
594   GNUNET_free (c);
595
596   if (NULL == chn)
597   {
598     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599                 "%p User context is NULL in client_notify_disconnect ()\n",
600                 chn);
601     GNUNET_break (0);
602     return;
603   }
604
605   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606               "%p Client %p (%s) disconnected from channel %s\n",
607               chn,
608               client,
609               (GNUNET_YES == chn->is_master) ? "master" : "slave",
610               GNUNET_h2s (&chn->pub_key_hash));
611
612   struct ClientList *cli = chn->clients_head;
613   while (NULL != cli)
614   {
615     if (cli->client == client)
616     {
617       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
618       GNUNET_free (cli);
619       break;
620     }
621     cli = cli->next;
622   }
623
624   struct Operation *op = chn->op_head;
625   while (NULL != op)
626   {
627     if (op->client == client)
628     {
629       op->client = NULL;
630       break;
631     }
632     op = op->next;
633   }
634
635   if (NULL == chn->clients_head)
636   { /* Last client disconnected. */
637     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                 "%p Last client (%s) disconnected from channel %s\n",
639                 chn,
640                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
641                 GNUNET_h2s (&chn->pub_key_hash));
642     chn->is_disconnecting = GNUNET_YES;
643     cleanup_channel (chn);
644   }
645 }
646
647
648 /**
649  * A new client connected.
650  *
651  * @param cls NULL
652  * @param client client to add
653  * @param mq message queue for @a client
654  * @return @a client
655  */
656 static void *
657 client_notify_connect (void *cls,
658                        struct GNUNET_SERVICE_Client *client,
659                        struct GNUNET_MQ_Handle *mq)
660 {
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
662
663   struct Client *c = GNUNET_malloc (sizeof (*c));
664   c->client = client;
665
666   return c;
667 }
668
669
670 /**
671  * Send message to all clients connected to the channel.
672  */
673 static void
674 client_send_msg (const struct Channel *chn,
675                  const struct GNUNET_MessageHeader *msg)
676 {
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678               "Sending message to clients of channel %p.\n",
679               chn);
680
681   struct ClientList *cli = chn->clients_head;
682   while (NULL != cli)
683   {
684     struct GNUNET_MQ_Envelope *
685       env = GNUNET_MQ_msg_copy (msg);
686
687     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
688                     env);
689     cli = cli->next;
690   }
691 }
692
693
694 /**
695  * Send a result code back to the client.
696  *
697  * @param client
698  *        Client that should receive the result code.
699  * @param result_code
700  *        Code to transmit.
701  * @param op_id
702  *        Operation ID in network byte order.
703  * @param data
704  *        Data payload or NULL.
705  * @param data_size
706  *        Size of @a data.
707  */
708 static void
709 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
710                     int64_t result_code, const void *data, uint16_t data_size)
711 {
712   struct GNUNET_OperationResultMessage *res;
713   struct GNUNET_MQ_Envelope *
714     env = GNUNET_MQ_msg_extra (res,
715                                data_size,
716                                GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
717   res->result_code = GNUNET_htonll (result_code);
718   res->op_id = op_id;
719   if (0 < data_size)
720     GNUNET_memcpy (&res[1], data, data_size);
721
722   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723               "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
724               client,
725               GNUNET_ntohll (op_id),
726               result_code,
727               data_size);
728
729   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
730 }
731
732
733 /**
734  * Closure for join_mem_test_cb()
735  */
736 struct JoinMemTestClosure
737 {
738   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
739   struct Channel *channel;
740   struct GNUNET_MULTICAST_JoinHandle *join_handle;
741   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
742 };
743
744
745 /**
746  * Membership test result callback used for join requests.
747  */
748 static void
749 join_mem_test_cb (void *cls, int64_t result,
750                   const char *err_msg, uint16_t err_msg_size)
751 {
752   struct JoinMemTestClosure *jcls = cls;
753
754   if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
755   { /* Pass on join request to client if this is a master channel */
756     struct Master *mst = jcls->channel->master;
757     struct GNUNET_HashCode slave_pub_hash;
758     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
759                         &slave_pub_hash);
760     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
761                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
762     client_send_msg (jcls->channel, &jcls->join_msg->header);
763   }
764   else
765   {
766     if (GNUNET_SYSERR == result)
767     {
768       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
769                   "Could not perform membership test (%.*s)\n",
770                   err_msg_size, err_msg);
771     }
772     // FIXME: add relays
773     GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
774   }
775   GNUNET_free (jcls->join_msg);
776   GNUNET_free (jcls);
777 }
778
779
780 /**
781  * Incoming join request from multicast.
782  */
783 static void
784 mcast_recv_join_request (void *cls,
785                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
786                          const struct GNUNET_MessageHeader *join_msg,
787                          struct GNUNET_MULTICAST_JoinHandle *jh)
788 {
789   struct Channel *chn = cls;
790   uint16_t join_msg_size = 0;
791
792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793               "%p Got join request.\n",
794               chn);
795   if (NULL != join_msg)
796   {
797     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
798     {
799       join_msg_size = ntohs (join_msg->size);
800     }
801     else
802     {
803       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
804                   "%p Got join message with invalid type %u.\n",
805                   chn,
806                   ntohs (join_msg->type));
807     }
808   }
809
810   struct GNUNET_PSYC_JoinRequestMessage *
811     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
812   req->header.size = htons (sizeof (*req) + join_msg_size);
813   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
814   req->slave_pub_key = *slave_pub_key;
815   if (0 < join_msg_size)
816     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
817
818   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
819   jcls->slave_pub_key = *slave_pub_key;
820   jcls->channel = chn;
821   jcls->join_handle = jh;
822   jcls->join_msg = req;
823
824   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
825                                     chn->max_message_id, 0,
826                                     &join_mem_test_cb, jcls);
827 }
828
829
830 /**
831  * Join decision received from multicast.
832  */
833 static void
834 mcast_recv_join_decision (void *cls, int is_admitted,
835                           const struct GNUNET_PeerIdentity *peer,
836                           uint16_t relay_count,
837                           const struct GNUNET_PeerIdentity *relays,
838                           const struct GNUNET_MessageHeader *join_resp)
839 {
840   struct Slave *slv = cls;
841   struct Channel *chn = &slv->channel;
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843               "%p Got join decision: %d\n",
844               slv,
845               is_admitted);
846   if (GNUNET_YES == chn->is_ready)
847   {
848     /* Already admitted */
849     return;
850   }
851
852   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
853   struct GNUNET_PSYC_JoinDecisionMessage *
854     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
855   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
856   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
857   dcsn->is_admitted = htonl (is_admitted);
858   if (0 < join_resp_size)
859     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
860
861   client_send_msg (chn, &dcsn->header);
862
863   if (GNUNET_YES == is_admitted
864       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
865   {
866     chn->is_ready = GNUNET_YES;
867   }
868 }
869
870
871 static int
872 store_recv_fragment_replay (void *cls,
873                             struct GNUNET_MULTICAST_MessageHeader *msg,
874                             enum GNUNET_PSYCSTORE_MessageFlags flags)
875 {
876   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
877
878   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
879   return GNUNET_YES;
880 }
881
882
883 /**
884  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
885  */
886 static void
887 store_recv_fragment_replay_result (void *cls,
888                                    int64_t result,
889                                    const char *err_msg,
890                                    uint16_t err_msg_size)
891 {
892   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
893
894   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
896               rh,
897               result,
898               err_msg_size,
899               err_msg);
900   switch (result)
901   {
902   case GNUNET_YES:
903     break;
904
905   case GNUNET_NO:
906     GNUNET_MULTICAST_replay_response (rh, NULL,
907                                       GNUNET_MULTICAST_REC_NOT_FOUND);
908     return;
909
910   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
911     GNUNET_MULTICAST_replay_response (rh, NULL,
912                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
913     return;
914
915   case GNUNET_SYSERR:
916     GNUNET_MULTICAST_replay_response (rh, NULL,
917                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
918     return;
919   }
920   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
921    * an error code, so it must be ensured no further processing
922    * is attempted on 'rh'. Maybe this should be refactored as
923    * it doesn't look very intuitive.    --lynX
924    */
925   GNUNET_MULTICAST_replay_response_end (rh);
926 }
927
928
929 /**
930  * Incoming fragment replay request from multicast.
931  */
932 static void
933 mcast_recv_replay_fragment (void *cls,
934                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
935                             uint64_t fragment_id, uint64_t flags,
936                             struct GNUNET_MULTICAST_ReplayHandle *rh)
937
938 {
939   struct Channel *chn = cls;
940   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
941                                  fragment_id, fragment_id,
942                                  &store_recv_fragment_replay,
943                                  &store_recv_fragment_replay_result, rh);
944 }
945
946
947 /**
948  * Incoming message replay request from multicast.
949  */
950 static void
951 mcast_recv_replay_message (void *cls,
952                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
953                            uint64_t message_id,
954                            uint64_t fragment_offset,
955                            uint64_t flags,
956                            struct GNUNET_MULTICAST_ReplayHandle *rh)
957 {
958   struct Channel *chn = cls;
959   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
960                                 message_id, message_id, 1, NULL,
961                                 &store_recv_fragment_replay,
962                                 &store_recv_fragment_replay_result, rh);
963 }
964
965
966 /**
967  * Convert an uint64_t in network byte order to a HashCode
968  * that can be used as key in a MultiHashMap
969  */
970 static inline void
971 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
972 {
973   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
974   /* TODO: use built-in byte swap functions if available */
975
976   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
977   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
978
979   *key = (struct GNUNET_HashCode) {};
980   *((uint64_t *) key)
981     = (n << 32) | (n >> 32);
982 }
983
984
985 /**
986  * Convert an uint64_t in host byte order to a HashCode
987  * that can be used as key in a MultiHashMap
988  */
989 static inline void
990 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
991 {
992 #if __BYTE_ORDER == __BIG_ENDIAN
993   hash_key_from_nll (key, n);
994 #elif __BYTE_ORDER == __LITTLE_ENDIAN
995   *key = (struct GNUNET_HashCode) {};
996   *((uint64_t *) key) = n;
997 #else
998   #error byteorder undefined
999 #endif
1000 }
1001
1002
1003 /**
1004  * Initialize PSYC message header.
1005  */
1006 static inline void
1007 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1008                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1009 {
1010   uint16_t size = ntohs (mmsg->header.size);
1011   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1012
1013   pmsg->header.size = htons (psize);
1014   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1015   pmsg->message_id = mmsg->message_id;
1016   pmsg->fragment_offset = mmsg->fragment_offset;
1017   pmsg->flags = htonl (flags);
1018
1019   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1020 }
1021
1022
1023 /**
1024  * Create a new PSYC message from a multicast message for sending it to clients.
1025  */
1026 static inline struct GNUNET_PSYC_MessageHeader *
1027 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1028 {
1029   struct GNUNET_PSYC_MessageHeader *pmsg;
1030   uint16_t size = ntohs (mmsg->header.size);
1031   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1032
1033   pmsg = GNUNET_malloc (psize);
1034   psyc_msg_init (pmsg, mmsg, flags);
1035   return pmsg;
1036 }
1037
1038
1039 /**
1040  * Send multicast message to all clients connected to the channel.
1041  */
1042 static void
1043 client_send_mcast_msg (struct Channel *chn,
1044                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1045                        uint32_t flags)
1046 {
1047   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1049               chn,
1050               GNUNET_ntohll (mmsg->fragment_id),
1051               GNUNET_ntohll (mmsg->message_id));
1052
1053   struct GNUNET_PSYC_MessageHeader *
1054     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1055   client_send_msg (chn, &pmsg->header);
1056   GNUNET_free (pmsg);
1057 }
1058
1059
1060 /**
1061  * Send multicast request to all clients connected to the channel.
1062  */
1063 static void
1064 client_send_mcast_req (struct Master *mst,
1065                        const struct GNUNET_MULTICAST_RequestHeader *req)
1066 {
1067   struct Channel *chn = &mst->channel;
1068
1069   struct GNUNET_PSYC_MessageHeader *pmsg;
1070   uint16_t size = ntohs (req->header.size);
1071   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1072
1073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1074               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1075               chn,
1076               GNUNET_ntohll (req->fragment_id),
1077               GNUNET_ntohll (req->request_id));
1078
1079   pmsg = GNUNET_malloc (psize);
1080   pmsg->header.size = htons (psize);
1081   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1082   pmsg->message_id = req->request_id;
1083   pmsg->fragment_offset = req->fragment_offset;
1084   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1085   pmsg->slave_pub_key = req->member_pub_key;
1086   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1087
1088   client_send_msg (chn, &pmsg->header);
1089
1090   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1091
1092   GNUNET_free (pmsg);
1093 }
1094
1095
1096 /**
1097  * Insert a multicast message fragment into the queue belonging to the message.
1098  *
1099  * @param chn          Channel.
1100  * @param mmsg         Multicast message fragment.
1101  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1102  * @param first_ptype  First PSYC message part type in @a mmsg.
1103  * @param last_ptype   Last PSYC message part type in @a mmsg.
1104  */
1105 static void
1106 fragment_queue_insert (struct Channel *chn,
1107                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1108                        uint16_t first_ptype, uint16_t last_ptype)
1109 {
1110   const uint16_t size = ntohs (mmsg->header.size);
1111   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1112   struct GNUNET_CONTAINER_MultiHashMap
1113     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1114                                                     &chn->pub_key_hash);
1115
1116   struct GNUNET_HashCode msg_id_hash;
1117   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1118
1119   struct FragmentQueue
1120     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1121
1122   if (NULL == fragq)
1123   {
1124     fragq = GNUNET_malloc (sizeof (*fragq));
1125     fragq->state = MSG_FRAG_STATE_HEADER;
1126     fragq->fragments
1127       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1128
1129     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1130                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1131
1132     if (NULL == chan_msgs)
1133     {
1134       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1135       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1136                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1137     }
1138   }
1139
1140   struct GNUNET_HashCode frag_id_hash;
1141   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1142   struct RecvCacheEntry
1143     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1144   if (NULL == cache_entry)
1145   {
1146     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1148                 chn,
1149                 GNUNET_ntohll (mmsg->message_id),
1150                 GNUNET_ntohll (mmsg->fragment_id));
1151     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152                 "%p header_size: %" PRIu64 " + %u\n",
1153                 chn,
1154                 fragq->header_size,
1155                 size);
1156     cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1157     cache_entry->ref_count = 1;
1158     cache_entry->mmsg = GNUNET_malloc (size);
1159     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1160     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1161                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1162   }
1163   else
1164   {
1165     cache_entry->ref_count++;
1166     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1168                 chn,
1169                 GNUNET_ntohll (mmsg->message_id),
1170                 GNUNET_ntohll (mmsg->fragment_id),
1171                 cache_entry->ref_count);
1172   }
1173
1174   if (MSG_FRAG_STATE_HEADER == fragq->state)
1175   {
1176     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1177     {
1178       struct GNUNET_PSYC_MessageMethod *
1179         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1180       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1181       fragq->flags = ntohl (pmeth->flags);
1182     }
1183
1184     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1185     {
1186       fragq->header_size += size;
1187     }
1188     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1189              || frag_offset == fragq->header_size)
1190     { /* header is now complete */
1191       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192                   "%p Header of message %" PRIu64 " is complete.\n",
1193                   chn,
1194                   GNUNET_ntohll (mmsg->message_id));
1195
1196       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1197                   "%p Adding message %" PRIu64 " to queue.\n",
1198                   chn,
1199                   GNUNET_ntohll (mmsg->message_id));
1200       fragq->state = MSG_FRAG_STATE_DATA;
1201     }
1202     else
1203     {
1204       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1206                   chn,
1207                   GNUNET_ntohll (mmsg->message_id),
1208                   frag_offset,
1209                   fragq->header_size);
1210     }
1211   }
1212
1213   switch (last_ptype)
1214   {
1215   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1216     if (frag_offset == fragq->size)
1217       fragq->state = MSG_FRAG_STATE_END;
1218     else
1219       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1221                   chn,
1222                   GNUNET_ntohll (mmsg->message_id),
1223                   frag_offset,
1224                   fragq->size);
1225     break;
1226
1227   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1228     /* Drop message without delivering to client if it's a single fragment */
1229     fragq->state =
1230       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1231       ? MSG_FRAG_STATE_DROP
1232       : MSG_FRAG_STATE_CANCEL;
1233   }
1234
1235   switch (fragq->state)
1236   {
1237   case MSG_FRAG_STATE_DATA:
1238   case MSG_FRAG_STATE_END:
1239   case MSG_FRAG_STATE_CANCEL:
1240     if (GNUNET_NO == fragq->is_queued)
1241     {
1242       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1243                                     GNUNET_ntohll (mmsg->message_id));
1244       fragq->is_queued = GNUNET_YES;
1245     }
1246   }
1247
1248   fragq->size += size;
1249   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1250                                 GNUNET_ntohll (mmsg->fragment_id));
1251 }
1252
1253
1254 /**
1255  * Run fragment queue of a message.
1256  *
1257  * Send fragments of a message in order to client, after all modifiers arrived
1258  * from multicast.
1259  *
1260  * @param chn
1261  *        Channel.
1262  * @param msg_id
1263  *        ID of the message @a fragq belongs to.
1264  * @param fragq
1265  *        Fragment queue of the message.
1266  * @param drop
1267  *        Drop message without delivering to client?
1268  *        #GNUNET_YES or #GNUNET_NO.
1269  */
1270 static void
1271 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1272                     struct FragmentQueue *fragq, uint8_t drop)
1273 {
1274   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1275               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1276               chn,
1277               msg_id,
1278               fragq->state);
1279
1280   struct GNUNET_CONTAINER_MultiHashMap
1281     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1282                                                     &chn->pub_key_hash);
1283   GNUNET_assert (NULL != chan_msgs);
1284   uint64_t frag_id;
1285
1286   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1287                                                     &frag_id))
1288   {
1289     struct GNUNET_HashCode frag_id_hash;
1290     hash_key_from_hll (&frag_id_hash, frag_id);
1291     struct RecvCacheEntry *cache_entry
1292       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1293     if (cache_entry != NULL)
1294     {
1295       if (GNUNET_NO == drop)
1296       {
1297         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1298       }
1299       if (cache_entry->ref_count <= 1)
1300       {
1301         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1302                                               cache_entry);
1303         GNUNET_free (cache_entry->mmsg);
1304         GNUNET_free (cache_entry);
1305       }
1306       else
1307       {
1308         cache_entry->ref_count--;
1309       }
1310     }
1311 #if CACHE_AGING_IMPLEMENTED
1312     else if (GNUNET_NO == drop)
1313     {
1314       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1315     }
1316 #endif
1317
1318     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1319   }
1320
1321   if (MSG_FRAG_STATE_END <= fragq->state)
1322   {
1323     struct GNUNET_HashCode msg_id_hash;
1324     hash_key_from_hll (&msg_id_hash, msg_id);
1325
1326     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1327     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1328     GNUNET_free (fragq);
1329   }
1330   else
1331   {
1332     fragq->is_queued = GNUNET_NO;
1333   }
1334 }
1335
1336
1337 struct StateModifyClosure
1338 {
1339   struct Channel *channel;
1340   uint64_t msg_id;
1341   struct GNUNET_HashCode msg_id_hash;
1342 };
1343
1344
1345 void
1346 store_recv_state_modify_result (void *cls, int64_t result,
1347                                 const char *err_msg, uint16_t err_msg_size)
1348 {
1349   struct StateModifyClosure *mcls = cls;
1350   struct Channel *chn = mcls->channel;
1351   uint64_t msg_id = mcls->msg_id;
1352
1353   struct FragmentQueue *
1354     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1355
1356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1358               chn, result, err_msg_size, err_msg);
1359
1360   switch (result)
1361   {
1362   case GNUNET_OK:
1363   case GNUNET_NO:
1364     if (NULL != fragq)
1365       fragq->state_is_modified = GNUNET_YES;
1366     if (chn->max_state_message_id < msg_id)
1367       chn->max_state_message_id = msg_id;
1368     if (chn->max_message_id < msg_id)
1369       chn->max_message_id = msg_id;
1370
1371     if (NULL != fragq)
1372       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1373     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1374     message_queue_run (chn);
1375     break;
1376
1377   default:
1378     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1379                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1380                 chn, result, err_msg_size, err_msg);
1381     /** @todo FIXME: handle state_modify error */
1382   }
1383 }
1384
1385
1386 /**
1387  * Run message queue.
1388  *
1389  * Send messages in queue to client in order after a message has arrived from
1390  * multicast, according to the following:
1391  * - A message is only sent if all of its modifiers arrived.
1392  * - A stateful message is only sent if the previous stateful message
1393  *   has already been delivered to the client.
1394  *
1395  * @param chn  Channel.
1396  *
1397  * @return Number of messages removed from queue and sent to client.
1398  */
1399 static uint64_t
1400 message_queue_run (struct Channel *chn)
1401 {
1402   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403               "%p Running message queue.\n", chn);
1404   uint64_t n = 0;
1405   uint64_t msg_id;
1406
1407   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1408                                                     &msg_id))
1409   {
1410     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1411                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1412     struct GNUNET_HashCode msg_id_hash;
1413     hash_key_from_hll (&msg_id_hash, msg_id);
1414
1415     struct FragmentQueue *
1416       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1417
1418     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1419     {
1420       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421                   "%p No fragq (%p) or header not complete.\n",
1422                   chn, fragq);
1423       break;
1424     }
1425
1426     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427                 "%p Fragment queue entry:  state: %u, state delta: "
1428                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1429                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1430
1431     if (MSG_FRAG_STATE_DATA <= fragq->state)
1432     {
1433       /* Check if there's a missing message before the current one */
1434       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1435       {
1436         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1437
1438         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1439             && (chn->max_message_id != msg_id - 1
1440                 && chn->max_message_id != msg_id))
1441         {
1442           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443                       "%p Out of order message. "
1444                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1445                       chn, chn->max_message_id, msg_id);
1446           break;
1447           // FIXME: keep track of messages processed in this queue run,
1448           //        and only stop after reaching the end
1449         }
1450       }
1451       else
1452       {
1453         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1454         if (GNUNET_YES != fragq->state_is_modified)
1455         {
1456           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1457           {
1458             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1459                         "%p Out of order stateful message. "
1460                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1461                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1462             break;
1463             // FIXME: keep track of messages processed in this queue run,
1464             //        and only stop after reaching the end
1465           }
1466
1467           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1468           mcls->channel = chn;
1469           mcls->msg_id = msg_id;
1470           mcls->msg_id_hash = msg_id_hash;
1471
1472           /* Apply modifiers to state in PSYCstore */
1473           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1474                                          fragq->state_delta,
1475                                          store_recv_state_modify_result, mcls);
1476           break; // continue after asynchronous state modify result
1477         }
1478       }
1479       chn->max_message_id = msg_id;
1480     }
1481     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1482     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1483     n++;
1484   }
1485
1486   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1488   return n;
1489 }
1490
1491
1492 /**
1493  * Drop message queue of a channel.
1494  *
1495  * Remove all messages in queue without sending it to clients.
1496  *
1497  * @param chn  Channel.
1498  *
1499  * @return Number of messages removed from queue.
1500  */
1501 static uint64_t
1502 message_queue_drop (struct Channel *chn)
1503 {
1504   uint64_t n = 0;
1505   uint64_t msg_id;
1506   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1507                                                     &msg_id))
1508   {
1509     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1511     struct GNUNET_HashCode msg_id_hash;
1512     hash_key_from_hll (&msg_id_hash, msg_id);
1513
1514     struct FragmentQueue *
1515       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1516     GNUNET_assert (NULL != fragq);
1517     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1518     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1519     n++;
1520   }
1521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1523   return n;
1524 }
1525
1526
1527 /**
1528  * Received result of GNUNET_PSYCSTORE_fragment_store().
1529  */
1530 static void
1531 store_recv_fragment_store_result (void *cls, int64_t result,
1532                                   const char *err_msg, uint16_t err_msg_size)
1533 {
1534   struct Channel *chn = cls;
1535   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1537               chn, result, err_msg_size, err_msg);
1538 }
1539
1540
1541 /**
1542  * Handle incoming message fragment from multicast.
1543  *
1544  * Store it using PSYCstore and send it to the clients of the channel in order.
1545  */
1546 static void
1547 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1548 {
1549   struct Channel *chn = cls;
1550   uint16_t size = ntohs (mmsg->header.size);
1551
1552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553               "%p Received multicast message of size %u. "
1554               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1555               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1556               chn, size,
1557               GNUNET_ntohll (mmsg->fragment_id),
1558               GNUNET_ntohll (mmsg->message_id),
1559               GNUNET_ntohll (mmsg->fragment_offset),
1560               GNUNET_ntohll (mmsg->flags));
1561
1562   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1563                                    &store_recv_fragment_store_result, chn);
1564
1565   uint16_t first_ptype = 0, last_ptype = 0;
1566   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1567                                                (const char *) &mmsg[1],
1568                                                &first_ptype, &last_ptype);
1569   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570               "%p Message check result %d, first part type %u, last part type %u\n",
1571               chn, check, first_ptype, last_ptype);
1572   if (GNUNET_SYSERR == check)
1573   {
1574     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1575                 "%p Dropping incoming multicast message with invalid parts.\n",
1576                 chn);
1577     GNUNET_break_op (0);
1578     return;
1579   }
1580
1581   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1582   message_queue_run (chn);
1583 }
1584
1585
1586 /**
1587  * Incoming request fragment from multicast for a master.
1588  *
1589  * @param cls   Master.
1590  * @param req   The request.
1591  */
1592 static void
1593 mcast_recv_request (void *cls,
1594                     const struct GNUNET_MULTICAST_RequestHeader *req)
1595 {
1596   struct Master *mst = cls;
1597   uint16_t size = ntohs (req->header.size);
1598
1599   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601               "%p Received multicast request of size %u from %s.\n",
1602               mst, size, str);
1603   GNUNET_free (str);
1604
1605   uint16_t first_ptype = 0, last_ptype = 0;
1606   if (GNUNET_SYSERR
1607       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1608                                           (const char *) &req[1],
1609                                           &first_ptype, &last_ptype))
1610   {
1611     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1612                 "%p Dropping incoming multicast request with invalid parts.\n",
1613                 mst);
1614     GNUNET_break_op (0);
1615     return;
1616   }
1617
1618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619               "Message parts: first: type %u, last: type %u\n",
1620               first_ptype, last_ptype);
1621
1622   /* FIXME: in-order delivery */
1623   client_send_mcast_req (mst, req);
1624 }
1625
1626
1627 /**
1628  * Response from PSYCstore with the current counter values for a channel master.
1629  */
1630 static void
1631 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1632                             uint64_t max_message_id, uint64_t max_group_generation,
1633                             uint64_t max_state_message_id)
1634 {
1635   struct Master *mst = cls;
1636   struct Channel *chn = &mst->channel;
1637   chn->store_op = NULL;
1638
1639   struct GNUNET_PSYC_CountersResultMessage res;
1640   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1641   res.header.size = htons (sizeof (res));
1642   res.result_code = htonl (result);
1643   res.max_message_id = GNUNET_htonll (max_message_id);
1644
1645   if (GNUNET_OK == result || GNUNET_NO == result)
1646   {
1647     mst->max_message_id = max_message_id;
1648     chn->max_message_id = max_message_id;
1649     chn->max_state_message_id = max_state_message_id;
1650     mst->max_group_generation = max_group_generation;
1651     mst->origin
1652       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1653                                        mcast_recv_join_request,
1654                                        mcast_recv_replay_fragment,
1655                                        mcast_recv_replay_message,
1656                                        mcast_recv_request,
1657                                        mcast_recv_message, chn);
1658     chn->is_ready = GNUNET_YES;
1659   }
1660   else
1661   {
1662     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1663                 "%p GNUNET_PSYCSTORE_counters_get() "
1664                 "returned %d for channel %s.\n",
1665                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1666   }
1667
1668   client_send_msg (chn, &res.header);
1669 }
1670
1671
1672 /**
1673  * Response from PSYCstore with the current counter values for a channel slave.
1674  */
1675 void
1676 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1677                            uint64_t max_message_id, uint64_t max_group_generation,
1678                            uint64_t max_state_message_id)
1679 {
1680   struct Slave *slv = cls;
1681   struct Channel *chn = &slv->channel;
1682   chn->store_op = NULL;
1683
1684   struct GNUNET_PSYC_CountersResultMessage res;
1685   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1686   res.header.size = htons (sizeof (res));
1687   res.result_code = htonl (result);
1688   res.max_message_id = GNUNET_htonll (max_message_id);
1689
1690   if (GNUNET_YES == result || GNUNET_NO == result)
1691   {
1692     chn->max_message_id = max_message_id;
1693     chn->max_state_message_id = max_state_message_id;
1694     slv->member
1695       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1696                                       &slv->origin,
1697                                       slv->relay_count, slv->relays,
1698                                       &slv->join_msg->header,
1699                                       mcast_recv_join_request,
1700                                       mcast_recv_join_decision,
1701                                       mcast_recv_replay_fragment,
1702                                       mcast_recv_replay_message,
1703                                       mcast_recv_message, chn);
1704     if (NULL != slv->join_msg)
1705     {
1706       GNUNET_free (slv->join_msg);
1707       slv->join_msg = NULL;
1708     }
1709   }
1710   else
1711   {
1712     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713                 "%p GNUNET_PSYCSTORE_counters_get() "
1714                 "returned %d for channel %s.\n",
1715                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1716   }
1717
1718   client_send_msg (chn, &res.header);
1719 }
1720
1721
1722 static void
1723 channel_init (struct Channel *chn)
1724 {
1725   chn->recv_msgs
1726     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1727   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1728 }
1729
1730
1731 /**
1732  * Handle a connecting client starting a channel master.
1733  */
1734 static void
1735 handle_client_master_start (void *cls,
1736                             const struct MasterStartRequest *req)
1737 {
1738   struct Client *c = cls;
1739   struct GNUNET_SERVICE_Client *client = c->client;
1740
1741   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1742   struct GNUNET_HashCode pub_key_hash;
1743
1744   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1745   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1746
1747   struct Master *
1748     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1749   struct Channel *chn;
1750
1751   if (NULL == mst)
1752   {
1753     mst = GNUNET_malloc (sizeof (*mst));
1754     mst->policy = ntohl (req->policy);
1755     mst->priv_key = req->channel_key;
1756     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1757
1758     chn = c->channel = &mst->channel;
1759     chn->master = mst;
1760     chn->is_master = GNUNET_YES;
1761     chn->pub_key = pub_key;
1762     chn->pub_key_hash = pub_key_hash;
1763     channel_init (chn);
1764
1765     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1766                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1767     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1768                                                    store_recv_master_counters, mst);
1769   }
1770   else
1771   {
1772     chn = &mst->channel;
1773
1774     struct GNUNET_PSYC_CountersResultMessage *res;
1775     struct GNUNET_MQ_Envelope *
1776       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1777     res->result_code = htonl (GNUNET_OK);
1778     res->max_message_id = GNUNET_htonll (mst->max_message_id);
1779
1780     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1781   }
1782
1783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784               "%p Client connected as master to channel %s.\n",
1785               mst, GNUNET_h2s (&chn->pub_key_hash));
1786
1787   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1788   cli->client = client;
1789   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1790
1791   GNUNET_SERVICE_client_continue (client);
1792 }
1793
1794
1795 static int
1796 check_client_slave_join (void *cls,
1797                          const struct SlaveJoinRequest *req)
1798 {
1799   return GNUNET_OK;
1800 }
1801
1802
1803 /**
1804  * Handle a connecting client joining as a channel slave.
1805  */
1806 static void
1807 handle_client_slave_join (void *cls,
1808                           const struct SlaveJoinRequest *req)
1809 {
1810   struct Client *c = cls;
1811   struct GNUNET_SERVICE_Client *client = c->client;
1812
1813   uint16_t req_size = ntohs (req->header.size);
1814
1815   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1816   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1817
1818   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819               "got join request from client %p\n",
1820               client);
1821   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1822   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1823   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1824
1825   struct GNUNET_CONTAINER_MultiHashMap *
1826     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1827   struct Slave *slv = NULL;
1828   struct Channel *chn;
1829
1830   if (NULL != chn_slv)
1831   {
1832     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1833   }
1834   if (NULL == slv)
1835   {
1836     slv = GNUNET_malloc (sizeof (*slv));
1837     slv->priv_key = req->slave_key;
1838     slv->pub_key = slv_pub_key;
1839     slv->pub_key_hash = slv_pub_hash;
1840     slv->origin = req->origin;
1841     slv->relay_count = ntohl (req->relay_count);
1842     slv->join_flags = ntohl (req->flags);
1843
1844     const struct GNUNET_PeerIdentity *
1845       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1846     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1847     uint16_t join_msg_size = 0;
1848
1849     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1850         <= req_size)
1851     {
1852       struct GNUNET_PSYC_Message *
1853         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1854       join_msg_size = ntohs (join_msg->header.size);
1855       slv->join_msg = GNUNET_malloc (join_msg_size);
1856       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1857     }
1858     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1859     {
1860       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1861                   "%u + %u + %u != %u\n",
1862                   (unsigned int) sizeof (*req),
1863                   relay_size,
1864                   join_msg_size,
1865                   req_size);
1866       GNUNET_break (0);
1867       GNUNET_SERVICE_client_drop (client);
1868       GNUNET_free (slv);
1869       return;
1870     }
1871     if (0 < slv->relay_count)
1872     {
1873       slv->relays = GNUNET_malloc (relay_size);
1874       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1875     }
1876
1877     chn = c->channel = &slv->channel;
1878     chn->slave = slv;
1879     chn->is_master = GNUNET_NO;
1880     chn->pub_key = req->channel_pub_key;
1881     chn->pub_key_hash = pub_key_hash;
1882     channel_init (chn);
1883
1884     if (NULL == chn_slv)
1885     {
1886       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1887       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1888                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1889     }
1890     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1891                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1892     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1893                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1894     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1895                                                    &store_recv_slave_counters, slv);
1896   }
1897   else
1898   {
1899     chn = &slv->channel;
1900
1901     struct GNUNET_PSYC_CountersResultMessage *res;
1902
1903     struct GNUNET_MQ_Envelope *
1904       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1905     res->result_code = htonl (GNUNET_OK);
1906     res->max_message_id = GNUNET_htonll (chn->max_message_id);
1907
1908     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1909
1910     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1911     {
1912       mcast_recv_join_decision (slv, GNUNET_YES,
1913                                 NULL, 0, NULL, NULL);
1914     }
1915     else if (NULL == slv->member)
1916     {
1917       slv->member
1918         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1919                                         &slv->origin,
1920                                         slv->relay_count, slv->relays,
1921                                         &slv->join_msg->header,
1922                                         &mcast_recv_join_request,
1923                                         &mcast_recv_join_decision,
1924                                         &mcast_recv_replay_fragment,
1925                                         &mcast_recv_replay_message,
1926                                         &mcast_recv_message, chn);
1927       if (NULL != slv->join_msg)
1928       {
1929         GNUNET_free (slv->join_msg);
1930         slv->join_msg = NULL;
1931       }
1932     }
1933     else if (NULL != slv->join_dcsn)
1934     {
1935       struct GNUNET_MQ_Envelope *
1936         env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1937       GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1938     }
1939   }
1940
1941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942               "Client %p connected as slave to channel %s.\n",
1943               client,
1944               GNUNET_h2s (&chn->pub_key_hash));
1945
1946   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1947   cli->client = client;
1948   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1949
1950   GNUNET_SERVICE_client_continue (client);
1951 }
1952
1953
1954 struct JoinDecisionClosure
1955 {
1956   int32_t is_admitted;
1957   struct GNUNET_MessageHeader *msg;
1958 };
1959
1960
1961 /**
1962  * Iterator callback for sending join decisions to multicast.
1963  */
1964 static int
1965 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1966                           void *value)
1967 {
1968   struct JoinDecisionClosure *jcls = cls;
1969   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1970   // FIXME: add relays
1971   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1972   return GNUNET_YES;
1973 }
1974
1975
1976 static int
1977 check_client_join_decision (void *cls,
1978                             const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1979 {
1980   return GNUNET_OK;
1981 }
1982
1983
1984 /**
1985  * Join decision from client.
1986  */
1987 static void
1988 handle_client_join_decision (void *cls,
1989                              const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1990 {
1991   struct Client *c = cls;
1992   struct GNUNET_SERVICE_Client *client = c->client;
1993   struct Channel *chn = c->channel;
1994   if (NULL == chn)
1995   {
1996     GNUNET_break (0);
1997     GNUNET_SERVICE_client_drop (client);
1998     return;
1999   }
2000   GNUNET_assert (GNUNET_YES == chn->is_master);
2001   struct Master *mst = chn->master;
2002
2003   struct JoinDecisionClosure jcls;
2004   jcls.is_admitted = ntohl (dcsn->is_admitted);
2005   jcls.msg
2006     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2007     ? (struct GNUNET_MessageHeader *) &dcsn[1]
2008     : NULL;
2009
2010   struct GNUNET_HashCode slave_pub_hash;
2011   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2012                       &slave_pub_hash);
2013
2014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2015               "%p Got join decision (%d) from client for channel %s..\n",
2016               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018               "%p ..and slave %s.\n",
2019               mst, GNUNET_h2s (&slave_pub_hash));
2020
2021   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2022                                               &mcast_send_join_decision, &jcls);
2023   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2024   GNUNET_SERVICE_client_continue (client);
2025 }
2026
2027
2028 static void
2029 channel_part_cb (void *cls)
2030 {
2031   struct GNUNET_SERVICE_Client *client = cls;
2032   struct GNUNET_MQ_Envelope *env;
2033
2034   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2035   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2036                   env);
2037 }
2038
2039
2040 static void
2041 handle_client_part_request (void *cls,
2042                             const struct GNUNET_MessageHeader *msg)
2043 {
2044   struct Client *c = cls;
2045
2046   c->channel->is_disconnecting = GNUNET_YES;
2047   if (GNUNET_YES == c->channel->is_master)
2048   {
2049     struct Master *mst = (struct Master *) c->channel;
2050    
2051     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052                 "Got part request from master %p\n",
2053                 mst);
2054     GNUNET_assert (NULL != mst->origin);
2055     GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2056   }
2057   else
2058   {
2059     struct Slave *slv = (struct Slave *) c->channel;
2060
2061     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062                 "Got part request from slave %p\n",
2063                 slv);
2064     GNUNET_assert (NULL != slv->member);
2065     GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2066   }
2067   GNUNET_SERVICE_client_continue (c->client);
2068 }
2069
2070
2071 /**
2072  * Send acknowledgement to a client.
2073  *
2074  * Sent after a message fragment has been passed on to multicast.
2075  *
2076  * @param chn The channel struct for the client.
2077  */
2078 static void
2079 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2080 {
2081   struct GNUNET_MessageHeader *res;
2082   struct GNUNET_MQ_Envelope *
2083       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2084
2085   /* FIXME? */
2086   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2087 }
2088
2089
2090 /**
2091  * Callback for the transmit functions of multicast.
2092  */
2093 static int
2094 transmit_notify (void *cls, size_t *data_size, void *data)
2095 {
2096   struct Channel *chn = cls;
2097   struct TransmitMessage *tmit_msg = chn->tmit_head;
2098
2099   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2100   {
2101     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2102                 "%p transmit_notify: nothing to send.\n", chn);
2103     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2104       GNUNET_break (0);
2105     *data_size = 0;
2106     return GNUNET_NO;
2107   }
2108
2109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2111
2112   *data_size = tmit_msg->size;
2113   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2114
2115   int ret
2116     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2117     ? GNUNET_NO
2118     : GNUNET_YES;
2119
2120   /* FIXME: handle disconnecting clients */
2121   if (NULL != tmit_msg->client)
2122     send_message_ack (chn, tmit_msg->client);
2123
2124   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2125
2126   if (NULL != chn->tmit_head)
2127   {
2128     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2129   }
2130   else if (GNUNET_YES == chn->is_disconnecting
2131            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2132   {
2133     /* FIXME: handle partial message (when still in_transmit) */
2134     GNUNET_free (tmit_msg);
2135     return GNUNET_SYSERR;
2136   }
2137   GNUNET_free (tmit_msg);
2138   return ret;
2139 }
2140
2141
2142 /**
2143  * Callback for the transmit functions of multicast.
2144  */
2145 static int
2146 master_transmit_notify (void *cls, size_t *data_size, void *data)
2147 {
2148   int ret = transmit_notify (cls, data_size, data);
2149
2150   if (GNUNET_YES == ret)
2151   {
2152     struct Master *mst = cls;
2153     mst->tmit_handle = NULL;
2154   }
2155   return ret;
2156 }
2157
2158
2159 /**
2160  * Callback for the transmit functions of multicast.
2161  */
2162 static int
2163 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2164 {
2165   int ret = transmit_notify (cls, data_size, data);
2166
2167   if (GNUNET_YES == ret)
2168   {
2169     struct Slave *slv = cls;
2170     slv->tmit_handle = NULL;
2171   }
2172   return ret;
2173 }
2174
2175
2176 /**
2177  * Transmit a message from a channel master to the multicast group.
2178  */
2179 static void
2180 master_transmit_message (struct Master *mst)
2181 {
2182   struct Channel *chn = &mst->channel;
2183   struct TransmitMessage *tmit_msg = chn->tmit_head;
2184   if (NULL == tmit_msg)
2185     return;
2186   if (NULL == mst->tmit_handle)
2187   {
2188     mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2189                                                        tmit_msg->id,
2190                                                        mst->max_group_generation,
2191                                                        &master_transmit_notify,
2192                                                        mst);
2193   }
2194   else
2195   {
2196     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2197   }
2198 }
2199
2200
2201 /**
2202  * Transmit a message from a channel slave to the multicast group.
2203  */
2204 static void
2205 slave_transmit_message (struct Slave *slv)
2206 {
2207   if (NULL == slv->channel.tmit_head)
2208     return;
2209   if (NULL == slv->tmit_handle)
2210   {
2211     slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2212                                                           slv->channel.tmit_head->id,
2213                                                           &slave_transmit_notify,
2214                                                           slv);
2215   }
2216   else
2217   {
2218     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2219   }
2220 }
2221
2222
2223 static void
2224 transmit_message (struct Channel *chn)
2225 {
2226   chn->is_master
2227     ? master_transmit_message (chn->master)
2228     : slave_transmit_message (chn->slave);
2229 }
2230
2231
2232 /**
2233  * Queue a message from a channel master for sending to the multicast group.
2234  */
2235 static void
2236 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2237 {
2238   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2239   {
2240     tmit_msg->id = ++mst->max_message_id;
2241     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2243                 mst, tmit_msg->id);
2244     struct GNUNET_PSYC_MessageMethod *pmeth
2245       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2246
2247     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2248     {
2249       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2250     }
2251     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2252     {
2253       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2255                   mst, tmit_msg->id - mst->max_state_message_id);
2256       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2257                                           - mst->max_state_message_id);
2258       mst->max_state_message_id = tmit_msg->id;
2259     }
2260     else
2261     {
2262         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2263                     "%p master_queue_message: state not modified\n", mst);
2264       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2265     }
2266
2267     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2268     {
2269       /// @todo add state_hash to PSYC header
2270     }
2271   }
2272 }
2273
2274
2275 /**
2276  * Queue a message from a channel slave for sending to the multicast group.
2277  */
2278 static void
2279 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2280 {
2281   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2282   {
2283     struct GNUNET_PSYC_MessageMethod *pmeth
2284       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2285     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2286     tmit_msg->id = ++slv->max_request_id;
2287   }
2288 }
2289
2290
2291 /**
2292  * Queue PSYC message parts for sending to multicast.
2293  *
2294  * @param chn
2295  *        Channel to send to.
2296  * @param client
2297  *        Client the message originates from.
2298  * @param data_size
2299  *        Size of @a data.
2300  * @param data
2301  *        Concatenated message parts.
2302  * @param first_ptype
2303  *        First message part type in @a data.
2304  * @param last_ptype
2305  *        Last message part type in @a data.
2306  */
2307 static struct TransmitMessage *
2308 queue_message (struct Channel *chn,
2309                struct GNUNET_SERVICE_Client *client,
2310                size_t data_size,
2311                const void *data,
2312                uint16_t first_ptype, uint16_t last_ptype)
2313 {
2314   struct TransmitMessage *
2315     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2316   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2317   tmit_msg->client = client;
2318   tmit_msg->size = data_size;
2319   tmit_msg->first_ptype = first_ptype;
2320   tmit_msg->last_ptype = last_ptype;
2321
2322   /* FIXME: separate queue per message ID */
2323
2324   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2325
2326   chn->is_master
2327     ? master_queue_message (chn->master, tmit_msg)
2328     : slave_queue_message (chn->slave, tmit_msg);
2329   return tmit_msg;
2330 }
2331
2332
2333 /**
2334  * Cancel transmission of current message.
2335  *
2336  * @param chn     Channel to send to.
2337  * @param client  Client the message originates from.
2338  */
2339 static void
2340 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2341 {
2342   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2343
2344   struct GNUNET_MessageHeader msg;
2345   msg.size = htons (sizeof (msg));
2346   msg.type = htons (type);
2347
2348   queue_message (chn, client, sizeof (msg), &msg, type, type);
2349   transmit_message (chn);
2350
2351   /* FIXME: cleanup */
2352 }
2353
2354
2355 static int
2356 check_client_psyc_message (void *cls,
2357                            const struct GNUNET_MessageHeader *msg)
2358 {
2359   return GNUNET_OK;
2360 }
2361
2362
2363 /**
2364  * Incoming message from a master or slave client.
2365  */
2366 static void
2367 handle_client_psyc_message (void *cls,
2368                             const struct GNUNET_MessageHeader *msg)
2369 {
2370   struct Client *c = cls;
2371   struct GNUNET_SERVICE_Client *client = c->client;
2372   struct Channel *chn = c->channel;
2373   if (NULL == chn)
2374   {
2375     GNUNET_break (0);
2376     GNUNET_SERVICE_client_drop (client);
2377     return;
2378   }
2379
2380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381               "%p Received message from client.\n", chn);
2382   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2383
2384   if (GNUNET_YES != chn->is_ready)
2385   {
2386     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2387                 "%p Channel is not ready yet, disconnecting client %p.\n",
2388                 chn,
2389                 client);
2390     GNUNET_break (0);
2391     GNUNET_SERVICE_client_drop (client);
2392     return;
2393   }
2394
2395   uint16_t size = ntohs (msg->size);
2396   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2397   {
2398     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2399                 "%p Message payload too large: %u < %u.\n",
2400                 chn,
2401                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2402                 (unsigned int) (size - sizeof (*msg)));
2403     GNUNET_break (0);
2404     transmit_cancel (chn, client);
2405     GNUNET_SERVICE_client_drop (client);
2406     return;
2407   }
2408
2409   uint16_t first_ptype = 0, last_ptype = 0;
2410   if (GNUNET_SYSERR
2411       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2412                                           (const char *) &msg[1],
2413                                           &first_ptype, &last_ptype))
2414   {
2415     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2416                 "%p Received invalid message part from client.\n", chn);
2417     GNUNET_break (0);
2418     transmit_cancel (chn, client);
2419     GNUNET_SERVICE_client_drop (client);
2420     return;
2421   }
2422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2423               "%p Received message with first part type %u and last part type %u.\n",
2424               chn, first_ptype, last_ptype);
2425
2426   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2427                  first_ptype, last_ptype);
2428   transmit_message (chn);
2429   /* FIXME: send a few ACKs even before transmit_notify is called */
2430
2431   GNUNET_SERVICE_client_continue (client);
2432 };
2433
2434
2435 /**
2436  * Received result of GNUNET_PSYCSTORE_membership_store()
2437  */
2438 static void
2439 store_recv_membership_store_result (void *cls,
2440                                     int64_t result,
2441                                     const char *err_msg,
2442                                     uint16_t err_msg_size)
2443 {
2444   struct Operation *op = cls;
2445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2447               op->channel,
2448               result,
2449               (int) err_msg_size,
2450               err_msg);
2451
2452   if (NULL != op->client)
2453     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2454   op_remove (op);
2455 }
2456
2457
2458 /**
2459  * Client requests to add/remove a slave in the membership database.
2460  */
2461 static void
2462 handle_client_membership_store (void *cls,
2463                                 const struct ChannelMembershipStoreRequest *req)
2464 {
2465   struct Client *c = cls;
2466   struct GNUNET_SERVICE_Client *client = c->client;
2467   struct Channel *chn = c->channel;
2468   if (NULL == chn)
2469   {
2470     GNUNET_break (0);
2471     GNUNET_SERVICE_client_drop (client);
2472     return;
2473   }
2474
2475   struct Operation *op = op_add (chn, client, req->op_id, 0);
2476
2477   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2478   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480               "%p Received membership store request from client.\n", chn);
2481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2483               chn, req->did_join, announced_at, effective_since);
2484
2485   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2486                                      req->did_join, announced_at, effective_since,
2487                                      0, /* FIXME: group_generation */
2488                                      &store_recv_membership_store_result, op);
2489   GNUNET_SERVICE_client_continue (client);
2490 }
2491
2492
2493 /**
2494  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2495  * in response to a history request from a client.
2496  */
2497 static int
2498 store_recv_fragment_history (void *cls,
2499                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2500                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2501 {
2502   struct Operation *op = cls;
2503   if (NULL == op->client)
2504   { /* Requesting client already disconnected. */
2505     return GNUNET_NO;
2506   }
2507   struct Channel *chn = op->channel;
2508
2509   struct GNUNET_PSYC_MessageHeader *pmsg;
2510   uint16_t msize = ntohs (mmsg->header.size);
2511   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2512
2513   struct GNUNET_OperationResultMessage *
2514     res = GNUNET_malloc (sizeof (*res) + psize);
2515   res->header.size = htons (sizeof (*res) + psize);
2516   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2517   res->op_id = op->op_id;
2518   res->result_code = GNUNET_htonll (GNUNET_OK);
2519
2520   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2521   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2522   GNUNET_memcpy (&res[1], pmsg, psize);
2523
2524   /** @todo FIXME: send only to requesting client */
2525   client_send_msg (chn, &res->header);
2526
2527   GNUNET_free (res);
2528   return GNUNET_YES;
2529 }
2530
2531
2532 /**
2533  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2534  * in response to a history request from a client.
2535  */
2536 static void
2537 store_recv_fragment_history_result (void *cls, int64_t result,
2538                                     const char *err_msg, uint16_t err_msg_size)
2539 {
2540   struct Operation *op = cls;
2541   if (NULL == op->client)
2542   { /* Requesting client already disconnected. */
2543     return;
2544   }
2545
2546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547               "%p History replay #%" PRIu64 ": "
2548               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2549               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2550
2551   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2552   {
2553     /** @todo Multicast replay request for messages not found locally. */
2554   }
2555
2556   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2557   op_remove (op);
2558 }
2559
2560
2561 static int
2562 check_client_history_replay (void *cls,
2563                              const struct GNUNET_PSYC_HistoryRequestMessage *req)
2564 {
2565   return GNUNET_OK;
2566 }
2567
2568
2569 /**
2570  * Client requests channel history.
2571  */
2572 static void
2573 handle_client_history_replay (void *cls,
2574                               const struct GNUNET_PSYC_HistoryRequestMessage *req)
2575 {
2576   struct Client *c = cls;
2577   struct GNUNET_SERVICE_Client *client = c->client;
2578   struct Channel *chn = c->channel;
2579   if (NULL == chn)
2580   {
2581     GNUNET_break (0);
2582     GNUNET_SERVICE_client_drop (client);
2583     return;
2584   }
2585
2586   uint16_t size = ntohs (req->header.size);
2587   const char *method_prefix = (const char *) &req[1];
2588
2589   if (size < sizeof (*req) + 1
2590       || '\0' != method_prefix[size - sizeof (*req) - 1])
2591   {
2592     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2593                 "%p History replay #%" PRIu64 ": "
2594                 "invalid method prefix. size: %u < %u?\n",
2595                 chn,
2596                 GNUNET_ntohll (req->op_id),
2597                 size,
2598                 (unsigned int) sizeof (*req) + 1);
2599     GNUNET_break (0);
2600     GNUNET_SERVICE_client_drop (client);
2601     return;
2602   }
2603
2604   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2605
2606   if (0 == req->message_limit)
2607   {
2608     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2609                                   GNUNET_ntohll (req->start_message_id),
2610                                   GNUNET_ntohll (req->end_message_id),
2611                                   0, method_prefix,
2612                                   &store_recv_fragment_history,
2613                                   &store_recv_fragment_history_result, op);
2614   }
2615   else
2616   {
2617     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2618                                          GNUNET_ntohll (req->message_limit),
2619                                          method_prefix,
2620                                          &store_recv_fragment_history,
2621                                          &store_recv_fragment_history_result,
2622                                          op);
2623   }
2624   GNUNET_SERVICE_client_continue (client);
2625 }
2626
2627
2628 /**
2629  * Received state var from PSYCstore, send it to client.
2630  */
2631 static int
2632 store_recv_state_var (void *cls, const char *name,
2633                       const void *value, uint32_t value_size)
2634 {
2635   struct Operation *op = cls;
2636   struct GNUNET_OperationResultMessage *res;
2637   struct GNUNET_MQ_Envelope *env;
2638
2639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2640               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2641               op->channel, GNUNET_ntohll (op->op_id), name);
2642
2643   if (NULL != name) /* First part */
2644   {
2645     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2646     struct GNUNET_PSYC_MessageModifier *mod;
2647     env = GNUNET_MQ_msg_extra (res,
2648                                sizeof (*mod) + name_size + value_size,
2649                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2650     res->op_id = op->op_id;
2651
2652     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2653     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2654     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2655     mod->name_size = htons (name_size);
2656     mod->value_size = htonl (value_size);
2657     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2658     GNUNET_memcpy (&mod[1], name, name_size);
2659     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2660   }
2661   else /* Continuation */
2662   {
2663     struct GNUNET_MessageHeader *mod;
2664     env = GNUNET_MQ_msg_extra (res,
2665                                sizeof (*mod) + value_size,
2666                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2667     res->op_id = op->op_id;
2668
2669     mod = (struct GNUNET_MessageHeader *) &res[1];
2670     mod->size = htons (sizeof (*mod) + value_size);
2671     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2672     GNUNET_memcpy (&mod[1], value, value_size);
2673   }
2674
2675   // FIXME: client might have been disconnected
2676   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2677   return GNUNET_YES;
2678 }
2679
2680
2681 /**
2682  * Received result of GNUNET_PSYCSTORE_state_get()
2683  * or GNUNET_PSYCSTORE_state_get_prefix()
2684  */
2685 static void
2686 store_recv_state_result (void *cls, int64_t result,
2687                          const char *err_msg, uint16_t err_msg_size)
2688 {
2689   struct Operation *op = cls;
2690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2691               "%p state_get #%" PRIu64 ": "
2692               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2693               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2694
2695   // FIXME: client might have been disconnected
2696   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2697   op_remove (op);
2698 }
2699
2700
2701 static int
2702 check_client_state_get (void *cls,
2703                          const struct StateRequest *req)
2704 {
2705   struct Client *c = cls;
2706   struct Channel *chn = c->channel;
2707   if (NULL == chn)
2708   {
2709     GNUNET_break (0);
2710     return GNUNET_SYSERR;
2711   }
2712
2713   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2714   const char *name = (const char *) &req[1];
2715   if (0 == name_size || '\0' != name[name_size - 1])
2716   {
2717     GNUNET_break (0);
2718     return GNUNET_SYSERR;
2719   }
2720
2721   return GNUNET_OK;
2722 }
2723
2724
2725 /**
2726  * Client requests best matching state variable from PSYCstore.
2727  */
2728 static void
2729 handle_client_state_get (void *cls,
2730                          const struct StateRequest *req)
2731 {
2732   struct Client *c = cls;
2733   struct GNUNET_SERVICE_Client *client = c->client;
2734   struct Channel *chn = c->channel;
2735
2736   const char *name = (const char *) &req[1];
2737   struct Operation *op = op_add (chn, client, req->op_id, 0);
2738   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2739                               &store_recv_state_var,
2740                               &store_recv_state_result, op);
2741   GNUNET_SERVICE_client_continue (client);
2742 }
2743
2744
2745 static int
2746 check_client_state_get_prefix (void *cls,
2747                                const struct StateRequest *req)
2748 {
2749   struct Client *c = cls;
2750   struct Channel *chn = c->channel;
2751   if (NULL == chn)
2752   {
2753     GNUNET_break (0);
2754     return GNUNET_SYSERR;
2755   }
2756
2757   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2758   const char *name = (const char *) &req[1];
2759   if (0 == name_size || '\0' != name[name_size - 1])
2760   {
2761     GNUNET_break (0);
2762     return GNUNET_SYSERR;
2763   }
2764
2765   return GNUNET_OK;
2766 }
2767
2768
2769 /**
2770  * Client requests state variables with a given prefix from PSYCstore.
2771  */
2772 static void
2773 handle_client_state_get_prefix (void *cls,
2774                                 const struct StateRequest *req)
2775 {
2776   struct Client *c = cls;
2777   struct GNUNET_SERVICE_Client *client = c->client;
2778   struct Channel *chn = c->channel;
2779
2780   const char *name = (const char *) &req[1];
2781   struct Operation *op = op_add (chn, client, req->op_id, 0);
2782   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2783                                      &store_recv_state_var,
2784                                      &store_recv_state_result, op);
2785   GNUNET_SERVICE_client_continue (client);
2786 }
2787
2788
2789 /**
2790  * Initialize the PSYC service.
2791  *
2792  * @param cls Closure.
2793  * @param server The initialized server.
2794  * @param c Configuration to use.
2795  */
2796 static void
2797 run (void *cls,
2798      const struct GNUNET_CONFIGURATION_Handle *c,
2799      struct GNUNET_SERVICE_Handle *svc)
2800 {
2801   cfg = c;
2802   service = svc;
2803   store = GNUNET_PSYCSTORE_connect (cfg);
2804   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2805   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2806   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2807   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2808   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2809   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2810 }
2811
2812
2813 /**
2814  * Define "main" method using service macro.
2815  */
2816 GNUNET_SERVICE_MAIN
2817 ("psyc",
2818  GNUNET_SERVICE_OPTION_NONE,
2819  &run,
2820  &client_notify_connect,
2821  &client_notify_disconnect,
2822  NULL,
2823  GNUNET_MQ_hd_fixed_size (client_master_start,
2824                           GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2825                           struct MasterStartRequest,
2826                           NULL),
2827  GNUNET_MQ_hd_var_size (client_slave_join,
2828                         GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2829                         struct SlaveJoinRequest,
2830                         NULL),
2831  GNUNET_MQ_hd_var_size (client_join_decision,
2832                         GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2833                         struct GNUNET_PSYC_JoinDecisionMessage,
2834                         NULL),
2835  GNUNET_MQ_hd_fixed_size (client_part_request,
2836                           GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2837                           struct GNUNET_MessageHeader,
2838                           NULL),
2839  GNUNET_MQ_hd_var_size (client_psyc_message,
2840                         GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2841                         struct GNUNET_MessageHeader,
2842                         NULL),
2843  GNUNET_MQ_hd_fixed_size (client_membership_store,
2844                           GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2845                           struct ChannelMembershipStoreRequest,
2846                           NULL),
2847  GNUNET_MQ_hd_var_size (client_history_replay,
2848                         GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2849                         struct GNUNET_PSYC_HistoryRequestMessage,
2850                         NULL),
2851  GNUNET_MQ_hd_var_size (client_state_get,
2852                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2853                         struct StateRequest,
2854                         NULL),
2855  GNUNET_MQ_hd_var_size (client_state_get_prefix,
2856                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2857                         struct StateRequest,
2858                         NULL));
2859
2860 /* end of gnunet-service-psyc.c */