glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  */
15
16 /**
17  * @file psyc/gnunet-service-psyc.c
18  * @brief PSYC service
19  * @author Gabor X Toth
20  */
21
22 #include <inttypes.h>
23
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_constants.h"
27 #include "gnunet_protocols.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_multicast_service.h"
30 #include "gnunet_psycstore_service.h"
31 #include "gnunet_psyc_service.h"
32 #include "gnunet_psyc_util_lib.h"
33 #include "psyc.h"
34
35
36 /**
37  * Handle to our current configuration.
38  */
39 static const struct GNUNET_CONFIGURATION_Handle *cfg;
40
41 /**
42  * Service handle.
43  */
44 static struct GNUNET_SERVICE_Handle *service;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Handle to the PSYCstore.
53  */
54 static struct GNUNET_PSYCSTORE_Handle *store;
55
56 /**
57  * All connected masters.
58  * Channel's pub_key_hash -> struct Master
59  */
60 static struct GNUNET_CONTAINER_MultiHashMap *masters;
61
62 /**
63  * All connected slaves.
64  * Channel's pub_key_hash -> struct Slave
65  */
66 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
67
68 /**
69  * Connected slaves per channel.
70  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
71  */
72 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
73
74
75 /**
76  * Message in the transmission queue.
77  */
78 struct TransmitMessage
79 {
80   struct TransmitMessage *prev;
81   struct TransmitMessage *next;
82
83   struct GNUNET_SERVICE_Client *client;
84
85   /**
86    * ID assigned to the message.
87    */
88   uint64_t id;
89
90   /**
91    * Size of message.
92    */
93   uint16_t size;
94
95   /**
96    * Type of first message part.
97    */
98   uint16_t first_ptype;
99
100   /**
101    * Type of last message part.
102    */
103   uint16_t last_ptype;
104
105   /* Followed by message */
106 };
107
108
109 /**
110  * Cache for received message fragments.
111  * Message fragments are only sent to clients after all modifiers arrived.
112  *
113  * chan_key -> MultiHashMap chan_msgs
114  */
115 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
116
117
118 /**
119  * Entry in the chan_msgs hashmap of @a recv_cache:
120  * fragment_id -> RecvCacheEntry
121  */
122 struct RecvCacheEntry
123 {
124   struct GNUNET_MULTICAST_MessageHeader *mmsg;
125   uint16_t ref_count;
126 };
127
128
129 /**
130  * Entry in the @a recv_frags hash map of a @a Channel.
131  * message_id -> FragmentQueue
132  */
133 struct FragmentQueue
134 {
135   /**
136    * Fragment IDs stored in @a recv_cache.
137    */
138   struct GNUNET_CONTAINER_Heap *fragments;
139
140   /**
141    * Total size of received fragments.
142    */
143   uint64_t size;
144
145   /**
146    * Total size of received header fragments (METHOD & MODIFIERs)
147    */
148   uint64_t header_size;
149
150   /**
151    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
152    */
153   uint64_t state_delta;
154
155   /**
156    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint32_t flags;
159
160   /**
161    * Receive state of message.
162    *
163    * @see MessageFragmentState
164    */
165   uint8_t state;
166
167   /**
168    * Whether the state is already modified in PSYCstore.
169    */
170   uint8_t state_is_modified;
171
172   /**
173    * Is the message queued for delivery to the client?
174    * i.e. added to the recv_msgs queue
175    */
176   uint8_t is_queued;
177 };
178
179
180 /**
181  * List of connected clients.
182  */
183 struct ClientList
184 {
185   struct ClientList *prev;
186   struct ClientList *next;
187
188   struct GNUNET_SERVICE_Client *client;
189 };
190
191
192 struct Operation
193 {
194   struct Operation *prev;
195   struct Operation *next;
196
197   struct GNUNET_SERVICE_Client *client;
198   struct Channel *channel;
199   uint64_t op_id;
200   uint32_t flags;
201 };
202
203
204 /**
205  * Common part of the client context for both a channel master and slave.
206  */
207 struct Channel
208 {
209   struct ClientList *clients_head;
210   struct ClientList *clients_tail;
211
212   struct Operation *op_head;
213   struct Operation *op_tail;
214
215   struct TransmitMessage *tmit_head;
216   struct TransmitMessage *tmit_tail;
217
218   /**
219    * Current PSYCstore operation.
220    */
221   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
222
223   /**
224    * Received fragments not yet sent to the client.
225    * message_id -> FragmentQueue
226    */
227   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
228
229   /**
230    * Received message IDs not yet sent to the client.
231    */
232   struct GNUNET_CONTAINER_Heap *recv_msgs;
233
234   /**
235    * Public key of the channel.
236    */
237   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
238
239   /**
240    * Hash of @a pub_key.
241    */
242   struct GNUNET_HashCode pub_key_hash;
243
244   /**
245    * Last message ID sent to the client.
246    * 0 if there is no such message.
247    */
248   uint64_t max_message_id;
249
250   /**
251    * ID of the last stateful message, where the state operations has been
252    * processed and saved to PSYCstore and which has been sent to the client.
253    * 0 if there is no such message.
254    */
255   uint64_t max_state_message_id;
256
257   /**
258    * Expected value size for the modifier being received from the PSYC service.
259    */
260   uint32_t tmit_mod_value_size_expected;
261
262   /**
263    * Actual value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size;
266
267   /**
268    * Is this channel ready to receive messages from client?
269    * #GNUNET_YES or #GNUNET_NO
270    */
271   uint8_t is_ready;
272
273   /**
274    * Is the client disconnected?
275    * #GNUNET_YES or #GNUNET_NO
276    */
277   uint8_t is_disconnecting;
278
279   /**
280    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
281    */
282   uint8_t is_master;
283
284   union {
285     struct Master *master;
286     struct Slave *slave;
287   };
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel channel;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel channel;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 /**
421  * Client context.
422  */
423 struct Client {
424   struct GNUNET_SERVICE_Client *client;
425   struct Channel *channel;
426 };
427
428
429 struct ReplayRequestKey
430 {
431   uint64_t fragment_id;
432   uint64_t message_id;
433   uint64_t fragment_offset;
434   uint64_t flags;
435 };
436
437
438 static void
439 transmit_message (struct Channel *chn);
440
441 static uint64_t
442 message_queue_run (struct Channel *chn);
443
444 static uint64_t
445 message_queue_drop (struct Channel *chn);
446
447
448 static void
449 schedule_transmit_message (void *cls)
450 {
451   struct Channel *chn = cls;
452
453   transmit_message (chn);
454 }
455
456
457 /**
458  * Task run during shutdown.
459  *
460  * @param cls unused
461  */
462 static void
463 shutdown_task (void *cls)
464 {
465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466               "shutting down...\n");
467   GNUNET_PSYCSTORE_disconnect (store);
468   if (NULL != stats)
469   {
470     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
471     stats = NULL;
472   }
473 }
474
475
476 static struct Operation *
477 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
478         uint64_t op_id, uint32_t flags)
479 {
480   struct Operation *op = GNUNET_malloc (sizeof (*op));
481   op->client = client;
482   op->channel = chn;
483   op->op_id = op_id;
484   op->flags = flags;
485   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
486   return op;
487 }
488
489
490 static void
491 op_remove (struct Operation *op)
492 {
493   GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
494   GNUNET_free (op);
495 }
496
497
498 /**
499  * Clean up master data structures after a client disconnected.
500  */
501 static void
502 cleanup_master (struct Master *mst)
503 {
504   struct Channel *chn = &mst->channel;
505
506   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
507   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
508 }
509
510
511 /**
512  * Clean up slave data structures after a client disconnected.
513  */
514 static void
515 cleanup_slave (struct Slave *slv)
516 {
517   struct Channel *chn = &slv->channel;
518   struct GNUNET_CONTAINER_MultiHashMap *
519     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
520                                                 &chn->pub_key_hash);
521   GNUNET_assert (NULL != chn_slv);
522   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
523
524   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
525   {
526     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
527                                           chn_slv);
528     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
529   }
530   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
531
532   if (NULL != slv->join_msg)
533   {
534     GNUNET_free (slv->join_msg);
535     slv->join_msg = NULL;
536   }
537   if (NULL != slv->relays)
538   {
539     GNUNET_free (slv->relays);
540     slv->relays = NULL;
541   }
542   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
543 }
544
545
546 /**
547  * Clean up channel data structures after a client disconnected.
548  */
549 static void
550 cleanup_channel (struct Channel *chn)
551 {
552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553               "%p Cleaning up channel %s. master? %u\n",
554               chn,
555               GNUNET_h2s (&chn->pub_key_hash),
556               chn->is_master);
557   message_queue_drop (chn);
558   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
559   chn->recv_frags = NULL;
560
561   if (NULL != chn->store_op)
562   {
563     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
564     chn->store_op = NULL;
565   }
566
567   (GNUNET_YES == chn->is_master)
568     ? cleanup_master (chn->master)
569     : cleanup_slave (chn->slave);
570   GNUNET_free (chn);
571 }
572
573
574 /**
575  * Called whenever a client is disconnected.
576  * Frees our resources associated with that client.
577  *
578  * @param cls closure
579  * @param client identification of the client
580  * @param app_ctx must match @a client
581  */
582 static void
583 client_notify_disconnect (void *cls,
584                           struct GNUNET_SERVICE_Client *client,
585                           void *app_ctx)
586 {
587   struct Client *c = app_ctx;
588   struct Channel *chn = c->channel;
589   GNUNET_free (c);
590
591   if (NULL == chn)
592   {
593     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                 "%p User context is NULL in client_notify_disconnect ()\n",
595                 chn);
596     GNUNET_break (0);
597     return;
598   }
599
600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
601               "%p Client %p (%s) disconnected from channel %s\n",
602               chn,
603               client,
604               (GNUNET_YES == chn->is_master) ? "master" : "slave",
605               GNUNET_h2s (&chn->pub_key_hash));
606
607   struct ClientList *cli = chn->clients_head;
608   while (NULL != cli)
609   {
610     if (cli->client == client)
611     {
612       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
613       GNUNET_free (cli);
614       break;
615     }
616     cli = cli->next;
617   }
618
619   struct Operation *op = chn->op_head;
620   while (NULL != op)
621   {
622     if (op->client == client)
623     {
624       op->client = NULL;
625       break;
626     }
627     op = op->next;
628   }
629
630   if (NULL == chn->clients_head)
631   { /* Last client disconnected. */
632     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633                 "%p Last client (%s) disconnected from channel %s\n",
634                 chn,
635                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
636                 GNUNET_h2s (&chn->pub_key_hash));
637     chn->is_disconnecting = GNUNET_YES;
638     cleanup_channel (chn);
639   }
640 }
641
642
643 /**
644  * A new client connected.
645  *
646  * @param cls NULL
647  * @param client client to add
648  * @param mq message queue for @a client
649  * @return @a client
650  */
651 static void *
652 client_notify_connect (void *cls,
653                        struct GNUNET_SERVICE_Client *client,
654                        struct GNUNET_MQ_Handle *mq)
655 {
656   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
657
658   struct Client *c = GNUNET_malloc (sizeof (*c));
659   c->client = client;
660
661   return c;
662 }
663
664
665 /**
666  * Send message to all clients connected to the channel.
667  */
668 static void
669 client_send_msg (const struct Channel *chn,
670                  const struct GNUNET_MessageHeader *msg)
671 {
672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673               "Sending message to clients of channel %p.\n",
674               chn);
675
676   struct ClientList *cli = chn->clients_head;
677   while (NULL != cli)
678   {
679     struct GNUNET_MQ_Envelope *
680       env = GNUNET_MQ_msg_copy (msg);
681
682     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
683                     env);
684     cli = cli->next;
685   }
686 }
687
688
689 /**
690  * Send a result code back to the client.
691  *
692  * @param client
693  *        Client that should receive the result code.
694  * @param result_code
695  *        Code to transmit.
696  * @param op_id
697  *        Operation ID in network byte order.
698  * @param data
699  *        Data payload or NULL.
700  * @param data_size
701  *        Size of @a data.
702  */
703 static void
704 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
705                     int64_t result_code, const void *data, uint16_t data_size)
706 {
707   struct GNUNET_OperationResultMessage *res;
708   struct GNUNET_MQ_Envelope *
709     env = GNUNET_MQ_msg_extra (res,
710                                data_size,
711                                GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
712   res->result_code = GNUNET_htonll (result_code);
713   res->op_id = op_id;
714   if (0 < data_size)
715     GNUNET_memcpy (&res[1], data, data_size);
716
717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718               "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
719               client,
720               GNUNET_ntohll (op_id),
721               result_code,
722               data_size);
723
724   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
725 }
726
727
728 /**
729  * Closure for join_mem_test_cb()
730  */
731 struct JoinMemTestClosure
732 {
733   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
734   struct Channel *channel;
735   struct GNUNET_MULTICAST_JoinHandle *join_handle;
736   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
737 };
738
739
740 /**
741  * Membership test result callback used for join requests.
742  */
743 static void
744 join_mem_test_cb (void *cls, int64_t result,
745                   const char *err_msg, uint16_t err_msg_size)
746 {
747   struct JoinMemTestClosure *jcls = cls;
748
749   if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
750   { /* Pass on join request to client if this is a master channel */
751     struct Master *mst = jcls->channel->master;
752     struct GNUNET_HashCode slave_pub_hash;
753     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
754                         &slave_pub_hash);
755     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
756                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
757     client_send_msg (jcls->channel, &jcls->join_msg->header);
758   }
759   else
760   {
761     if (GNUNET_SYSERR == result)
762     {
763       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
764                   "Could not perform membership test (%.*s)\n",
765                   err_msg_size, err_msg);
766     }
767     // FIXME: add relays
768     GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
769   }
770   GNUNET_free (jcls->join_msg);
771   GNUNET_free (jcls);
772 }
773
774
775 /**
776  * Incoming join request from multicast.
777  */
778 static void
779 mcast_recv_join_request (void *cls,
780                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
781                          const struct GNUNET_MessageHeader *join_msg,
782                          struct GNUNET_MULTICAST_JoinHandle *jh)
783 {
784   struct Channel *chn = cls;
785   uint16_t join_msg_size = 0;
786
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "%p Got join request.\n",
789               chn);
790   if (NULL != join_msg)
791   {
792     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
793     {
794       join_msg_size = ntohs (join_msg->size);
795     }
796     else
797     {
798       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
799                   "%p Got join message with invalid type %u.\n",
800                   chn,
801                   ntohs (join_msg->type));
802     }
803   }
804
805   struct GNUNET_PSYC_JoinRequestMessage *
806     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
807   req->header.size = htons (sizeof (*req) + join_msg_size);
808   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
809   req->slave_pub_key = *slave_pub_key;
810   if (0 < join_msg_size)
811     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
812
813   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
814   jcls->slave_pub_key = *slave_pub_key;
815   jcls->channel = chn;
816   jcls->join_handle = jh;
817   jcls->join_msg = req;
818
819   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
820                                     chn->max_message_id, 0,
821                                     &join_mem_test_cb, jcls);
822 }
823
824
825 /**
826  * Join decision received from multicast.
827  */
828 static void
829 mcast_recv_join_decision (void *cls, int is_admitted,
830                           const struct GNUNET_PeerIdentity *peer,
831                           uint16_t relay_count,
832                           const struct GNUNET_PeerIdentity *relays,
833                           const struct GNUNET_MessageHeader *join_resp)
834 {
835   struct Slave *slv = cls;
836   struct Channel *chn = &slv->channel;
837   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
838               "%p Got join decision: %d\n",
839               slv,
840               is_admitted);
841   if (GNUNET_YES == chn->is_ready)
842   {
843     /* Already admitted */
844     return;
845   }
846
847   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
848   struct GNUNET_PSYC_JoinDecisionMessage *
849     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
850   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
851   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
852   dcsn->is_admitted = htonl (is_admitted);
853   if (0 < join_resp_size)
854     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
855
856   client_send_msg (chn, &dcsn->header);
857
858   if (GNUNET_YES == is_admitted
859       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
860   {
861     chn->is_ready = GNUNET_YES;
862   }
863 }
864
865
866 static int
867 store_recv_fragment_replay (void *cls,
868                             struct GNUNET_MULTICAST_MessageHeader *msg,
869                             enum GNUNET_PSYCSTORE_MessageFlags flags)
870 {
871   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
872
873   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
874   return GNUNET_YES;
875 }
876
877
878 /**
879  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
880  */
881 static void
882 store_recv_fragment_replay_result (void *cls,
883                                    int64_t result,
884                                    const char *err_msg,
885                                    uint16_t err_msg_size)
886 {
887   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
888
889   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
891               rh,
892               result,
893               err_msg_size,
894               err_msg);
895   switch (result)
896   {
897   case GNUNET_YES:
898     break;
899
900   case GNUNET_NO:
901     GNUNET_MULTICAST_replay_response (rh, NULL,
902                                       GNUNET_MULTICAST_REC_NOT_FOUND);
903     return;
904
905   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
906     GNUNET_MULTICAST_replay_response (rh, NULL,
907                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
908     return;
909
910   case GNUNET_SYSERR:
911     GNUNET_MULTICAST_replay_response (rh, NULL,
912                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
913     return;
914   }
915   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
916    * an error code, so it must be ensured no further processing
917    * is attempted on 'rh'. Maybe this should be refactored as
918    * it doesn't look very intuitive.    --lynX
919    */
920   GNUNET_MULTICAST_replay_response_end (rh);
921 }
922
923
924 /**
925  * Incoming fragment replay request from multicast.
926  */
927 static void
928 mcast_recv_replay_fragment (void *cls,
929                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
930                             uint64_t fragment_id, uint64_t flags,
931                             struct GNUNET_MULTICAST_ReplayHandle *rh)
932
933 {
934   struct Channel *chn = cls;
935   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
936                                  fragment_id, fragment_id,
937                                  &store_recv_fragment_replay,
938                                  &store_recv_fragment_replay_result, rh);
939 }
940
941
942 /**
943  * Incoming message replay request from multicast.
944  */
945 static void
946 mcast_recv_replay_message (void *cls,
947                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
948                            uint64_t message_id,
949                            uint64_t fragment_offset,
950                            uint64_t flags,
951                            struct GNUNET_MULTICAST_ReplayHandle *rh)
952 {
953   struct Channel *chn = cls;
954   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
955                                 message_id, message_id, 1, NULL,
956                                 &store_recv_fragment_replay,
957                                 &store_recv_fragment_replay_result, rh);
958 }
959
960
961 /**
962  * Convert an uint64_t in network byte order to a HashCode
963  * that can be used as key in a MultiHashMap
964  */
965 static inline void
966 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
967 {
968   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
969   /* TODO: use built-in byte swap functions if available */
970
971   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
972   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
973
974   *key = (struct GNUNET_HashCode) {};
975   *((uint64_t *) key)
976     = (n << 32) | (n >> 32);
977 }
978
979
980 /**
981  * Convert an uint64_t in host byte order to a HashCode
982  * that can be used as key in a MultiHashMap
983  */
984 static inline void
985 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
986 {
987 #if __BYTE_ORDER == __BIG_ENDIAN
988   hash_key_from_nll (key, n);
989 #elif __BYTE_ORDER == __LITTLE_ENDIAN
990   *key = (struct GNUNET_HashCode) {};
991   *((uint64_t *) key) = n;
992 #else
993   #error byteorder undefined
994 #endif
995 }
996
997
998 /**
999  * Initialize PSYC message header.
1000  */
1001 static inline void
1002 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1003                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1004 {
1005   uint16_t size = ntohs (mmsg->header.size);
1006   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1007
1008   pmsg->header.size = htons (psize);
1009   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1010   pmsg->message_id = mmsg->message_id;
1011   pmsg->fragment_offset = mmsg->fragment_offset;
1012   pmsg->flags = htonl (flags);
1013
1014   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1015 }
1016
1017
1018 /**
1019  * Create a new PSYC message from a multicast message for sending it to clients.
1020  */
1021 static inline struct GNUNET_PSYC_MessageHeader *
1022 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1023 {
1024   struct GNUNET_PSYC_MessageHeader *pmsg;
1025   uint16_t size = ntohs (mmsg->header.size);
1026   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1027
1028   pmsg = GNUNET_malloc (psize);
1029   psyc_msg_init (pmsg, mmsg, flags);
1030   return pmsg;
1031 }
1032
1033
1034 /**
1035  * Send multicast message to all clients connected to the channel.
1036  */
1037 static void
1038 client_send_mcast_msg (struct Channel *chn,
1039                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1040                        uint32_t flags)
1041 {
1042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1044               chn,
1045               GNUNET_ntohll (mmsg->fragment_id),
1046               GNUNET_ntohll (mmsg->message_id));
1047
1048   struct GNUNET_PSYC_MessageHeader *
1049     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1050   client_send_msg (chn, &pmsg->header);
1051   GNUNET_free (pmsg);
1052 }
1053
1054
1055 /**
1056  * Send multicast request to all clients connected to the channel.
1057  */
1058 static void
1059 client_send_mcast_req (struct Master *mst,
1060                        const struct GNUNET_MULTICAST_RequestHeader *req)
1061 {
1062   struct Channel *chn = &mst->channel;
1063
1064   struct GNUNET_PSYC_MessageHeader *pmsg;
1065   uint16_t size = ntohs (req->header.size);
1066   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1067
1068   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1070               chn,
1071               GNUNET_ntohll (req->fragment_id),
1072               GNUNET_ntohll (req->request_id));
1073
1074   pmsg = GNUNET_malloc (psize);
1075   pmsg->header.size = htons (psize);
1076   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1077   pmsg->message_id = req->request_id;
1078   pmsg->fragment_offset = req->fragment_offset;
1079   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1080   pmsg->slave_pub_key = req->member_pub_key;
1081   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1082
1083   client_send_msg (chn, &pmsg->header);
1084
1085   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1086
1087   GNUNET_free (pmsg);
1088 }
1089
1090
1091 /**
1092  * Insert a multicast message fragment into the queue belonging to the message.
1093  *
1094  * @param chn          Channel.
1095  * @param mmsg         Multicast message fragment.
1096  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1097  * @param first_ptype  First PSYC message part type in @a mmsg.
1098  * @param last_ptype   Last PSYC message part type in @a mmsg.
1099  */
1100 static void
1101 fragment_queue_insert (struct Channel *chn,
1102                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1103                        uint16_t first_ptype, uint16_t last_ptype)
1104 {
1105   const uint16_t size = ntohs (mmsg->header.size);
1106   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1107   struct GNUNET_CONTAINER_MultiHashMap
1108     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1109                                                     &chn->pub_key_hash);
1110
1111   struct GNUNET_HashCode msg_id_hash;
1112   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1113
1114   struct FragmentQueue
1115     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1116
1117   if (NULL == fragq)
1118   {
1119     fragq = GNUNET_malloc (sizeof (*fragq));
1120     fragq->state = MSG_FRAG_STATE_HEADER;
1121     fragq->fragments
1122       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1123
1124     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1125                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1126
1127     if (NULL == chan_msgs)
1128     {
1129       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1130       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1131                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1132     }
1133   }
1134
1135   struct GNUNET_HashCode frag_id_hash;
1136   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1137   struct RecvCacheEntry
1138     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1139   if (NULL == cache_entry)
1140   {
1141     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1143                 chn,
1144                 GNUNET_ntohll (mmsg->message_id),
1145                 GNUNET_ntohll (mmsg->fragment_id));
1146     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147                 "%p header_size: %" PRIu64 " + %u\n",
1148                 chn,
1149                 fragq->header_size,
1150                 size);
1151     cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1152     cache_entry->ref_count = 1;
1153     cache_entry->mmsg = GNUNET_malloc (size);
1154     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1155     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1156                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1157   }
1158   else
1159   {
1160     cache_entry->ref_count++;
1161     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1163                 chn,
1164                 GNUNET_ntohll (mmsg->message_id),
1165                 GNUNET_ntohll (mmsg->fragment_id),
1166                 cache_entry->ref_count);
1167   }
1168
1169   if (MSG_FRAG_STATE_HEADER == fragq->state)
1170   {
1171     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1172     {
1173       struct GNUNET_PSYC_MessageMethod *
1174         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1175       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1176       fragq->flags = ntohl (pmeth->flags);
1177     }
1178
1179     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1180     {
1181       fragq->header_size += size;
1182     }
1183     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1184              || frag_offset == fragq->header_size)
1185     { /* header is now complete */
1186       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187                   "%p Header of message %" PRIu64 " is complete.\n",
1188                   chn,
1189                   GNUNET_ntohll (mmsg->message_id));
1190
1191       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1192                   "%p Adding message %" PRIu64 " to queue.\n",
1193                   chn,
1194                   GNUNET_ntohll (mmsg->message_id));
1195       fragq->state = MSG_FRAG_STATE_DATA;
1196     }
1197     else
1198     {
1199       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1201                   chn,
1202                   GNUNET_ntohll (mmsg->message_id),
1203                   frag_offset,
1204                   fragq->header_size);
1205     }
1206   }
1207
1208   switch (last_ptype)
1209   {
1210   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1211     if (frag_offset == fragq->size)
1212       fragq->state = MSG_FRAG_STATE_END;
1213     else
1214       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1216                   chn,
1217                   GNUNET_ntohll (mmsg->message_id),
1218                   frag_offset,
1219                   fragq->size);
1220     break;
1221
1222   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1223     /* Drop message without delivering to client if it's a single fragment */
1224     fragq->state =
1225       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1226       ? MSG_FRAG_STATE_DROP
1227       : MSG_FRAG_STATE_CANCEL;
1228   }
1229
1230   switch (fragq->state)
1231   {
1232   case MSG_FRAG_STATE_DATA:
1233   case MSG_FRAG_STATE_END:
1234   case MSG_FRAG_STATE_CANCEL:
1235     if (GNUNET_NO == fragq->is_queued)
1236     {
1237       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1238                                     GNUNET_ntohll (mmsg->message_id));
1239       fragq->is_queued = GNUNET_YES;
1240     }
1241   }
1242
1243   fragq->size += size;
1244   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1245                                 GNUNET_ntohll (mmsg->fragment_id));
1246 }
1247
1248
1249 /**
1250  * Run fragment queue of a message.
1251  *
1252  * Send fragments of a message in order to client, after all modifiers arrived
1253  * from multicast.
1254  *
1255  * @param chn
1256  *        Channel.
1257  * @param msg_id
1258  *        ID of the message @a fragq belongs to.
1259  * @param fragq
1260  *        Fragment queue of the message.
1261  * @param drop
1262  *        Drop message without delivering to client?
1263  *        #GNUNET_YES or #GNUNET_NO.
1264  */
1265 static void
1266 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1267                     struct FragmentQueue *fragq, uint8_t drop)
1268 {
1269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1271               chn,
1272               msg_id,
1273               fragq->state);
1274
1275   struct GNUNET_CONTAINER_MultiHashMap
1276     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1277                                                     &chn->pub_key_hash);
1278   GNUNET_assert (NULL != chan_msgs);
1279   uint64_t frag_id;
1280
1281   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1282                                                     &frag_id))
1283   {
1284     struct GNUNET_HashCode frag_id_hash;
1285     hash_key_from_hll (&frag_id_hash, frag_id);
1286     struct RecvCacheEntry *cache_entry
1287       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1288     if (cache_entry != NULL)
1289     {
1290       if (GNUNET_NO == drop)
1291       {
1292         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1293       }
1294       if (cache_entry->ref_count <= 1)
1295       {
1296         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1297                                               cache_entry);
1298         GNUNET_free (cache_entry->mmsg);
1299         GNUNET_free (cache_entry);
1300       }
1301       else
1302       {
1303         cache_entry->ref_count--;
1304       }
1305     }
1306 #if CACHE_AGING_IMPLEMENTED
1307     else if (GNUNET_NO == drop)
1308     {
1309       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1310     }
1311 #endif
1312
1313     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1314   }
1315
1316   if (MSG_FRAG_STATE_END <= fragq->state)
1317   {
1318     struct GNUNET_HashCode msg_id_hash;
1319     hash_key_from_hll (&msg_id_hash, msg_id);
1320
1321     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1322     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1323     GNUNET_free (fragq);
1324   }
1325   else
1326   {
1327     fragq->is_queued = GNUNET_NO;
1328   }
1329 }
1330
1331
1332 struct StateModifyClosure
1333 {
1334   struct Channel *channel;
1335   uint64_t msg_id;
1336   struct GNUNET_HashCode msg_id_hash;
1337 };
1338
1339
1340 void
1341 store_recv_state_modify_result (void *cls, int64_t result,
1342                                 const char *err_msg, uint16_t err_msg_size)
1343 {
1344   struct StateModifyClosure *mcls = cls;
1345   struct Channel *chn = mcls->channel;
1346   uint64_t msg_id = mcls->msg_id;
1347
1348   struct FragmentQueue *
1349     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1350
1351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1353               chn, result, err_msg_size, err_msg);
1354
1355   switch (result)
1356   {
1357   case GNUNET_OK:
1358   case GNUNET_NO:
1359     if (NULL != fragq)
1360       fragq->state_is_modified = GNUNET_YES;
1361     if (chn->max_state_message_id < msg_id)
1362       chn->max_state_message_id = msg_id;
1363     if (chn->max_message_id < msg_id)
1364       chn->max_message_id = msg_id;
1365
1366     if (NULL != fragq)
1367       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1368     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1369     message_queue_run (chn);
1370     break;
1371
1372   default:
1373     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1374                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1375                 chn, result, err_msg_size, err_msg);
1376     /** @todo FIXME: handle state_modify error */
1377   }
1378 }
1379
1380
1381 /**
1382  * Run message queue.
1383  *
1384  * Send messages in queue to client in order after a message has arrived from
1385  * multicast, according to the following:
1386  * - A message is only sent if all of its modifiers arrived.
1387  * - A stateful message is only sent if the previous stateful message
1388  *   has already been delivered to the client.
1389  *
1390  * @param chn  Channel.
1391  *
1392  * @return Number of messages removed from queue and sent to client.
1393  */
1394 static uint64_t
1395 message_queue_run (struct Channel *chn)
1396 {
1397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1398               "%p Running message queue.\n", chn);
1399   uint64_t n = 0;
1400   uint64_t msg_id;
1401
1402   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1403                                                     &msg_id))
1404   {
1405     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1407     struct GNUNET_HashCode msg_id_hash;
1408     hash_key_from_hll (&msg_id_hash, msg_id);
1409
1410     struct FragmentQueue *
1411       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1412
1413     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1414     {
1415       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416                   "%p No fragq (%p) or header not complete.\n",
1417                   chn, fragq);
1418       break;
1419     }
1420
1421     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1422                 "%p Fragment queue entry:  state: %u, state delta: "
1423                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1424                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1425
1426     if (MSG_FRAG_STATE_DATA <= fragq->state)
1427     {
1428       /* Check if there's a missing message before the current one */
1429       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1430       {
1431         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1432
1433         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1434             && (chn->max_message_id != msg_id - 1
1435                 && chn->max_message_id != msg_id))
1436         {
1437           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438                       "%p Out of order message. "
1439                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1440                       chn, chn->max_message_id, msg_id);
1441           break;
1442           // FIXME: keep track of messages processed in this queue run,
1443           //        and only stop after reaching the end
1444         }
1445       }
1446       else
1447       {
1448         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1449         if (GNUNET_YES != fragq->state_is_modified)
1450         {
1451           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1452           {
1453             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1454                         "%p Out of order stateful message. "
1455                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1456                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1457             break;
1458             // FIXME: keep track of messages processed in this queue run,
1459             //        and only stop after reaching the end
1460           }
1461
1462           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1463           mcls->channel = chn;
1464           mcls->msg_id = msg_id;
1465           mcls->msg_id_hash = msg_id_hash;
1466
1467           /* Apply modifiers to state in PSYCstore */
1468           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1469                                          fragq->state_delta,
1470                                          store_recv_state_modify_result, mcls);
1471           break; // continue after asynchronous state modify result
1472         }
1473       }
1474       chn->max_message_id = msg_id;
1475     }
1476     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1477     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1478     n++;
1479   }
1480
1481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1483   return n;
1484 }
1485
1486
1487 /**
1488  * Drop message queue of a channel.
1489  *
1490  * Remove all messages in queue without sending it to clients.
1491  *
1492  * @param chn  Channel.
1493  *
1494  * @return Number of messages removed from queue.
1495  */
1496 static uint64_t
1497 message_queue_drop (struct Channel *chn)
1498 {
1499   uint64_t n = 0;
1500   uint64_t msg_id;
1501   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1502                                                     &msg_id))
1503   {
1504     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1505                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1506     struct GNUNET_HashCode msg_id_hash;
1507     hash_key_from_hll (&msg_id_hash, msg_id);
1508
1509     struct FragmentQueue *
1510       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1511     GNUNET_assert (NULL != fragq);
1512     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1513     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1514     n++;
1515   }
1516   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1518   return n;
1519 }
1520
1521
1522 /**
1523  * Received result of GNUNET_PSYCSTORE_fragment_store().
1524  */
1525 static void
1526 store_recv_fragment_store_result (void *cls, int64_t result,
1527                                   const char *err_msg, uint16_t err_msg_size)
1528 {
1529   struct Channel *chn = cls;
1530   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1532               chn, result, err_msg_size, err_msg);
1533 }
1534
1535
1536 /**
1537  * Handle incoming message fragment from multicast.
1538  *
1539  * Store it using PSYCstore and send it to the clients of the channel in order.
1540  */
1541 static void
1542 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1543 {
1544   struct Channel *chn = cls;
1545   uint16_t size = ntohs (mmsg->header.size);
1546
1547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548               "%p Received multicast message of size %u. "
1549               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1550               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1551               chn, size,
1552               GNUNET_ntohll (mmsg->fragment_id),
1553               GNUNET_ntohll (mmsg->message_id),
1554               GNUNET_ntohll (mmsg->fragment_offset),
1555               GNUNET_ntohll (mmsg->flags));
1556
1557   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1558                                    &store_recv_fragment_store_result, chn);
1559
1560   uint16_t first_ptype = 0, last_ptype = 0;
1561   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1562                                                (const char *) &mmsg[1],
1563                                                &first_ptype, &last_ptype);
1564   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1565               "%p Message check result %d, first part type %u, last part type %u\n",
1566               chn, check, first_ptype, last_ptype);
1567   if (GNUNET_SYSERR == check)
1568   {
1569     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1570                 "%p Dropping incoming multicast message with invalid parts.\n",
1571                 chn);
1572     GNUNET_break_op (0);
1573     return;
1574   }
1575
1576   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1577   message_queue_run (chn);
1578 }
1579
1580
1581 /**
1582  * Incoming request fragment from multicast for a master.
1583  *
1584  * @param cls   Master.
1585  * @param req   The request.
1586  */
1587 static void
1588 mcast_recv_request (void *cls,
1589                     const struct GNUNET_MULTICAST_RequestHeader *req)
1590 {
1591   struct Master *mst = cls;
1592   uint16_t size = ntohs (req->header.size);
1593
1594   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1595   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1596               "%p Received multicast request of size %u from %s.\n",
1597               mst, size, str);
1598   GNUNET_free (str);
1599
1600   uint16_t first_ptype = 0, last_ptype = 0;
1601   if (GNUNET_SYSERR
1602       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1603                                           (const char *) &req[1],
1604                                           &first_ptype, &last_ptype))
1605   {
1606     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1607                 "%p Dropping incoming multicast request with invalid parts.\n",
1608                 mst);
1609     GNUNET_break_op (0);
1610     return;
1611   }
1612
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614               "Message parts: first: type %u, last: type %u\n",
1615               first_ptype, last_ptype);
1616
1617   /* FIXME: in-order delivery */
1618   client_send_mcast_req (mst, req);
1619 }
1620
1621
1622 /**
1623  * Response from PSYCstore with the current counter values for a channel master.
1624  */
1625 static void
1626 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1627                             uint64_t max_message_id, uint64_t max_group_generation,
1628                             uint64_t max_state_message_id)
1629 {
1630   struct Master *mst = cls;
1631   struct Channel *chn = &mst->channel;
1632   chn->store_op = NULL;
1633
1634   struct GNUNET_PSYC_CountersResultMessage res;
1635   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1636   res.header.size = htons (sizeof (res));
1637   res.result_code = htonl (result);
1638   res.max_message_id = GNUNET_htonll (max_message_id);
1639
1640   if (GNUNET_OK == result || GNUNET_NO == result)
1641   {
1642     mst->max_message_id = max_message_id;
1643     chn->max_message_id = max_message_id;
1644     chn->max_state_message_id = max_state_message_id;
1645     mst->max_group_generation = max_group_generation;
1646     mst->origin
1647       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1648                                        mcast_recv_join_request,
1649                                        mcast_recv_replay_fragment,
1650                                        mcast_recv_replay_message,
1651                                        mcast_recv_request,
1652                                        mcast_recv_message, chn);
1653     chn->is_ready = GNUNET_YES;
1654   }
1655   else
1656   {
1657     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1658                 "%p GNUNET_PSYCSTORE_counters_get() "
1659                 "returned %d for channel %s.\n",
1660                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1661   }
1662
1663   client_send_msg (chn, &res.header);
1664 }
1665
1666
1667 /**
1668  * Response from PSYCstore with the current counter values for a channel slave.
1669  */
1670 void
1671 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1672                            uint64_t max_message_id, uint64_t max_group_generation,
1673                            uint64_t max_state_message_id)
1674 {
1675   struct Slave *slv = cls;
1676   struct Channel *chn = &slv->channel;
1677   chn->store_op = NULL;
1678
1679   struct GNUNET_PSYC_CountersResultMessage res;
1680   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1681   res.header.size = htons (sizeof (res));
1682   res.result_code = htonl (result);
1683   res.max_message_id = GNUNET_htonll (max_message_id);
1684
1685   if (GNUNET_YES == result || GNUNET_NO == result)
1686   {
1687     chn->max_message_id = max_message_id;
1688     chn->max_state_message_id = max_state_message_id;
1689     slv->member
1690       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1691                                       &slv->origin,
1692                                       slv->relay_count, slv->relays,
1693                                       &slv->join_msg->header,
1694                                       mcast_recv_join_request,
1695                                       mcast_recv_join_decision,
1696                                       mcast_recv_replay_fragment,
1697                                       mcast_recv_replay_message,
1698                                       mcast_recv_message, chn);
1699     if (NULL != slv->join_msg)
1700     {
1701       GNUNET_free (slv->join_msg);
1702       slv->join_msg = NULL;
1703     }
1704   }
1705   else
1706   {
1707     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1708                 "%p GNUNET_PSYCSTORE_counters_get() "
1709                 "returned %d for channel %s.\n",
1710                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1711   }
1712
1713   client_send_msg (chn, &res.header);
1714 }
1715
1716
1717 static void
1718 channel_init (struct Channel *chn)
1719 {
1720   chn->recv_msgs
1721     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1722   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1723 }
1724
1725
1726 /**
1727  * Handle a connecting client starting a channel master.
1728  */
1729 static void
1730 handle_client_master_start (void *cls,
1731                             const struct MasterStartRequest *req)
1732 {
1733   struct Client *c = cls;
1734   struct GNUNET_SERVICE_Client *client = c->client;
1735
1736   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1737   struct GNUNET_HashCode pub_key_hash;
1738
1739   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1740   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1741
1742   struct Master *
1743     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1744   struct Channel *chn;
1745
1746   if (NULL == mst)
1747   {
1748     mst = GNUNET_malloc (sizeof (*mst));
1749     mst->policy = ntohl (req->policy);
1750     mst->priv_key = req->channel_key;
1751     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1752
1753     chn = c->channel = &mst->channel;
1754     chn->master = mst;
1755     chn->is_master = GNUNET_YES;
1756     chn->pub_key = pub_key;
1757     chn->pub_key_hash = pub_key_hash;
1758     channel_init (chn);
1759
1760     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1761                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1762     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1763                                                    store_recv_master_counters, mst);
1764   }
1765   else
1766   {
1767     chn = &mst->channel;
1768
1769     struct GNUNET_PSYC_CountersResultMessage *res;
1770     struct GNUNET_MQ_Envelope *
1771       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1772     res->result_code = htonl (GNUNET_OK);
1773     res->max_message_id = GNUNET_htonll (mst->max_message_id);
1774
1775     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1776   }
1777
1778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779               "%p Client connected as master to channel %s.\n",
1780               mst, GNUNET_h2s (&chn->pub_key_hash));
1781
1782   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1783   cli->client = client;
1784   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1785
1786   GNUNET_SERVICE_client_continue (client);
1787 }
1788
1789
1790 static int
1791 check_client_slave_join (void *cls,
1792                          const struct SlaveJoinRequest *req)
1793 {
1794   return GNUNET_OK;
1795 }
1796
1797
1798 /**
1799  * Handle a connecting client joining as a channel slave.
1800  */
1801 static void
1802 handle_client_slave_join (void *cls,
1803                           const struct SlaveJoinRequest *req)
1804 {
1805   struct Client *c = cls;
1806   struct GNUNET_SERVICE_Client *client = c->client;
1807
1808   uint16_t req_size = ntohs (req->header.size);
1809
1810   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1811   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1812
1813   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814               "got join request from client %p\n",
1815               client);
1816   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1817   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1818   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1819
1820   struct GNUNET_CONTAINER_MultiHashMap *
1821     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1822   struct Slave *slv = NULL;
1823   struct Channel *chn;
1824
1825   if (NULL != chn_slv)
1826   {
1827     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1828   }
1829   if (NULL == slv)
1830   {
1831     slv = GNUNET_malloc (sizeof (*slv));
1832     slv->priv_key = req->slave_key;
1833     slv->pub_key = slv_pub_key;
1834     slv->pub_key_hash = slv_pub_hash;
1835     slv->origin = req->origin;
1836     slv->relay_count = ntohl (req->relay_count);
1837     slv->join_flags = ntohl (req->flags);
1838
1839     const struct GNUNET_PeerIdentity *
1840       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1841     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1842     uint16_t join_msg_size = 0;
1843
1844     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1845         <= req_size)
1846     {
1847       struct GNUNET_PSYC_Message *
1848         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1849       join_msg_size = ntohs (join_msg->header.size);
1850       slv->join_msg = GNUNET_malloc (join_msg_size);
1851       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1852     }
1853     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1854     {
1855       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1856                   "%u + %u + %u != %u\n",
1857                   (unsigned int) sizeof (*req),
1858                   relay_size,
1859                   join_msg_size,
1860                   req_size);
1861       GNUNET_break (0);
1862       GNUNET_SERVICE_client_drop (client);
1863       GNUNET_free (slv);
1864       return;
1865     }
1866     if (0 < slv->relay_count)
1867     {
1868       slv->relays = GNUNET_malloc (relay_size);
1869       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1870     }
1871
1872     chn = c->channel = &slv->channel;
1873     chn->slave = slv;
1874     chn->is_master = GNUNET_NO;
1875     chn->pub_key = req->channel_pub_key;
1876     chn->pub_key_hash = pub_key_hash;
1877     channel_init (chn);
1878
1879     if (NULL == chn_slv)
1880     {
1881       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1882       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1883                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1884     }
1885     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1886                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1887     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1888                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1889     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1890                                                    &store_recv_slave_counters, slv);
1891   }
1892   else
1893   {
1894     chn = &slv->channel;
1895
1896     struct GNUNET_PSYC_CountersResultMessage *res;
1897
1898     struct GNUNET_MQ_Envelope *
1899       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1900     res->result_code = htonl (GNUNET_OK);
1901     res->max_message_id = GNUNET_htonll (chn->max_message_id);
1902
1903     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1904
1905     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1906     {
1907       mcast_recv_join_decision (slv, GNUNET_YES,
1908                                 NULL, 0, NULL, NULL);
1909     }
1910     else if (NULL == slv->member)
1911     {
1912       slv->member
1913         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1914                                         &slv->origin,
1915                                         slv->relay_count, slv->relays,
1916                                         &slv->join_msg->header,
1917                                         &mcast_recv_join_request,
1918                                         &mcast_recv_join_decision,
1919                                         &mcast_recv_replay_fragment,
1920                                         &mcast_recv_replay_message,
1921                                         &mcast_recv_message, chn);
1922       if (NULL != slv->join_msg)
1923       {
1924         GNUNET_free (slv->join_msg);
1925         slv->join_msg = NULL;
1926       }
1927     }
1928     else if (NULL != slv->join_dcsn)
1929     {
1930       struct GNUNET_MQ_Envelope *
1931         env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1932       GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1933     }
1934   }
1935
1936   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1937               "Client %p connected as slave to channel %s.\n",
1938               client,
1939               GNUNET_h2s (&chn->pub_key_hash));
1940
1941   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1942   cli->client = client;
1943   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1944
1945   GNUNET_SERVICE_client_continue (client);
1946 }
1947
1948
1949 struct JoinDecisionClosure
1950 {
1951   int32_t is_admitted;
1952   struct GNUNET_MessageHeader *msg;
1953 };
1954
1955
1956 /**
1957  * Iterator callback for sending join decisions to multicast.
1958  */
1959 static int
1960 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1961                           void *value)
1962 {
1963   struct JoinDecisionClosure *jcls = cls;
1964   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1965   // FIXME: add relays
1966   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1967   return GNUNET_YES;
1968 }
1969
1970
1971 static int
1972 check_client_join_decision (void *cls,
1973                             const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1974 {
1975   return GNUNET_OK;
1976 }
1977
1978
1979 /**
1980  * Join decision from client.
1981  */
1982 static void
1983 handle_client_join_decision (void *cls,
1984                              const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1985 {
1986   struct Client *c = cls;
1987   struct GNUNET_SERVICE_Client *client = c->client;
1988   struct Channel *chn = c->channel;
1989   if (NULL == chn)
1990   {
1991     GNUNET_break (0);
1992     GNUNET_SERVICE_client_drop (client);
1993     return;
1994   }
1995   GNUNET_assert (GNUNET_YES == chn->is_master);
1996   struct Master *mst = chn->master;
1997
1998   struct JoinDecisionClosure jcls;
1999   jcls.is_admitted = ntohl (dcsn->is_admitted);
2000   jcls.msg
2001     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2002     ? (struct GNUNET_MessageHeader *) &dcsn[1]
2003     : NULL;
2004
2005   struct GNUNET_HashCode slave_pub_hash;
2006   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2007                       &slave_pub_hash);
2008
2009   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2010               "%p Got join decision (%d) from client for channel %s..\n",
2011               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013               "%p ..and slave %s.\n",
2014               mst, GNUNET_h2s (&slave_pub_hash));
2015
2016   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2017                                               &mcast_send_join_decision, &jcls);
2018   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2019   GNUNET_SERVICE_client_continue (client);
2020 }
2021
2022
2023 static void
2024 channel_part_cb (void *cls)
2025 {
2026   struct GNUNET_SERVICE_Client *client = cls;
2027   struct GNUNET_MQ_Envelope *env;
2028
2029   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2030   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2031                   env);
2032 }
2033
2034
2035 static void
2036 handle_client_part_request (void *cls,
2037                             const struct GNUNET_MessageHeader *msg)
2038 {
2039   struct Client *c = cls;
2040
2041   c->channel->is_disconnecting = GNUNET_YES;
2042   if (GNUNET_YES == c->channel->is_master)
2043   {
2044     struct Master *mst = (struct Master *) c->channel;
2045    
2046     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047                 "Got part request from master %p\n",
2048                 mst);
2049     GNUNET_assert (NULL != mst->origin);
2050     GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2051   }
2052   else
2053   {
2054     struct Slave *slv = (struct Slave *) c->channel;
2055
2056     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2057                 "Got part request from slave %p\n",
2058                 slv);
2059     GNUNET_assert (NULL != slv->member);
2060     GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2061   }
2062   GNUNET_SERVICE_client_continue (c->client);
2063 }
2064
2065
2066 /**
2067  * Send acknowledgement to a client.
2068  *
2069  * Sent after a message fragment has been passed on to multicast.
2070  *
2071  * @param chn The channel struct for the client.
2072  */
2073 static void
2074 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2075 {
2076   struct GNUNET_MessageHeader *res;
2077   struct GNUNET_MQ_Envelope *
2078       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2079
2080   /* FIXME? */
2081   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2082 }
2083
2084
2085 /**
2086  * Callback for the transmit functions of multicast.
2087  */
2088 static int
2089 transmit_notify (void *cls, size_t *data_size, void *data)
2090 {
2091   struct Channel *chn = cls;
2092   struct TransmitMessage *tmit_msg = chn->tmit_head;
2093
2094   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2095   {
2096     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097                 "%p transmit_notify: nothing to send.\n", chn);
2098     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2099       GNUNET_break (0);
2100     *data_size = 0;
2101     return GNUNET_NO;
2102   }
2103
2104   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2106
2107   *data_size = tmit_msg->size;
2108   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2109
2110   int ret
2111     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2112     ? GNUNET_NO
2113     : GNUNET_YES;
2114
2115   /* FIXME: handle disconnecting clients */
2116   if (NULL != tmit_msg->client)
2117     send_message_ack (chn, tmit_msg->client);
2118
2119   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2120
2121   if (NULL != chn->tmit_head)
2122   {
2123     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2124   }
2125   else if (GNUNET_YES == chn->is_disconnecting
2126            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2127   {
2128     /* FIXME: handle partial message (when still in_transmit) */
2129     GNUNET_free (tmit_msg);
2130     return GNUNET_SYSERR;
2131   }
2132   GNUNET_free (tmit_msg);
2133   return ret;
2134 }
2135
2136
2137 /**
2138  * Callback for the transmit functions of multicast.
2139  */
2140 static int
2141 master_transmit_notify (void *cls, size_t *data_size, void *data)
2142 {
2143   int ret = transmit_notify (cls, data_size, data);
2144
2145   if (GNUNET_YES == ret)
2146   {
2147     struct Master *mst = cls;
2148     mst->tmit_handle = NULL;
2149   }
2150   return ret;
2151 }
2152
2153
2154 /**
2155  * Callback for the transmit functions of multicast.
2156  */
2157 static int
2158 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2159 {
2160   int ret = transmit_notify (cls, data_size, data);
2161
2162   if (GNUNET_YES == ret)
2163   {
2164     struct Slave *slv = cls;
2165     slv->tmit_handle = NULL;
2166   }
2167   return ret;
2168 }
2169
2170
2171 /**
2172  * Transmit a message from a channel master to the multicast group.
2173  */
2174 static void
2175 master_transmit_message (struct Master *mst)
2176 {
2177   struct Channel *chn = &mst->channel;
2178   struct TransmitMessage *tmit_msg = chn->tmit_head;
2179   if (NULL == tmit_msg)
2180     return;
2181   if (NULL == mst->tmit_handle)
2182   {
2183     mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2184                                                        tmit_msg->id,
2185                                                        mst->max_group_generation,
2186                                                        &master_transmit_notify,
2187                                                        mst);
2188   }
2189   else
2190   {
2191     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2192   }
2193 }
2194
2195
2196 /**
2197  * Transmit a message from a channel slave to the multicast group.
2198  */
2199 static void
2200 slave_transmit_message (struct Slave *slv)
2201 {
2202   if (NULL == slv->channel.tmit_head)
2203     return;
2204   if (NULL == slv->tmit_handle)
2205   {
2206     slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2207                                                           slv->channel.tmit_head->id,
2208                                                           &slave_transmit_notify,
2209                                                           slv);
2210   }
2211   else
2212   {
2213     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2214   }
2215 }
2216
2217
2218 static void
2219 transmit_message (struct Channel *chn)
2220 {
2221   chn->is_master
2222     ? master_transmit_message (chn->master)
2223     : slave_transmit_message (chn->slave);
2224 }
2225
2226
2227 /**
2228  * Queue a message from a channel master for sending to the multicast group.
2229  */
2230 static void
2231 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2232 {
2233   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2234   {
2235     tmit_msg->id = ++mst->max_message_id;
2236     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2238                 mst, tmit_msg->id);
2239     struct GNUNET_PSYC_MessageMethod *pmeth
2240       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2241
2242     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2243     {
2244       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2245     }
2246     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2247     {
2248       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2250                   mst, tmit_msg->id - mst->max_state_message_id);
2251       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2252                                           - mst->max_state_message_id);
2253       mst->max_state_message_id = tmit_msg->id;
2254     }
2255     else
2256     {
2257         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2258                     "%p master_queue_message: state not modified\n", mst);
2259       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2260     }
2261
2262     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2263     {
2264       /// @todo add state_hash to PSYC header
2265     }
2266   }
2267 }
2268
2269
2270 /**
2271  * Queue a message from a channel slave for sending to the multicast group.
2272  */
2273 static void
2274 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2275 {
2276   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2277   {
2278     struct GNUNET_PSYC_MessageMethod *pmeth
2279       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2280     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2281     tmit_msg->id = ++slv->max_request_id;
2282   }
2283 }
2284
2285
2286 /**
2287  * Queue PSYC message parts for sending to multicast.
2288  *
2289  * @param chn
2290  *        Channel to send to.
2291  * @param client
2292  *        Client the message originates from.
2293  * @param data_size
2294  *        Size of @a data.
2295  * @param data
2296  *        Concatenated message parts.
2297  * @param first_ptype
2298  *        First message part type in @a data.
2299  * @param last_ptype
2300  *        Last message part type in @a data.
2301  */
2302 static struct TransmitMessage *
2303 queue_message (struct Channel *chn,
2304                struct GNUNET_SERVICE_Client *client,
2305                size_t data_size,
2306                const void *data,
2307                uint16_t first_ptype, uint16_t last_ptype)
2308 {
2309   struct TransmitMessage *
2310     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2311   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2312   tmit_msg->client = client;
2313   tmit_msg->size = data_size;
2314   tmit_msg->first_ptype = first_ptype;
2315   tmit_msg->last_ptype = last_ptype;
2316
2317   /* FIXME: separate queue per message ID */
2318
2319   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2320
2321   chn->is_master
2322     ? master_queue_message (chn->master, tmit_msg)
2323     : slave_queue_message (chn->slave, tmit_msg);
2324   return tmit_msg;
2325 }
2326
2327
2328 /**
2329  * Cancel transmission of current message.
2330  *
2331  * @param chn     Channel to send to.
2332  * @param client  Client the message originates from.
2333  */
2334 static void
2335 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2336 {
2337   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2338
2339   struct GNUNET_MessageHeader msg;
2340   msg.size = htons (sizeof (msg));
2341   msg.type = htons (type);
2342
2343   queue_message (chn, client, sizeof (msg), &msg, type, type);
2344   transmit_message (chn);
2345
2346   /* FIXME: cleanup */
2347 }
2348
2349
2350 static int
2351 check_client_psyc_message (void *cls,
2352                            const struct GNUNET_MessageHeader *msg)
2353 {
2354   return GNUNET_OK;
2355 }
2356
2357
2358 /**
2359  * Incoming message from a master or slave client.
2360  */
2361 static void
2362 handle_client_psyc_message (void *cls,
2363                             const struct GNUNET_MessageHeader *msg)
2364 {
2365   struct Client *c = cls;
2366   struct GNUNET_SERVICE_Client *client = c->client;
2367   struct Channel *chn = c->channel;
2368   if (NULL == chn)
2369   {
2370     GNUNET_break (0);
2371     GNUNET_SERVICE_client_drop (client);
2372     return;
2373   }
2374
2375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2376               "%p Received message from client.\n", chn);
2377   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2378
2379   if (GNUNET_YES != chn->is_ready)
2380   {
2381     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2382                 "%p Channel is not ready yet, disconnecting client %p.\n",
2383                 chn,
2384                 client);
2385     GNUNET_break (0);
2386     GNUNET_SERVICE_client_drop (client);
2387     return;
2388   }
2389
2390   uint16_t size = ntohs (msg->size);
2391   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2392   {
2393     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2394                 "%p Message payload too large: %u < %u.\n",
2395                 chn,
2396                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2397                 (unsigned int) (size - sizeof (*msg)));
2398     GNUNET_break (0);
2399     transmit_cancel (chn, client);
2400     GNUNET_SERVICE_client_drop (client);
2401     return;
2402   }
2403
2404   uint16_t first_ptype = 0, last_ptype = 0;
2405   if (GNUNET_SYSERR
2406       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2407                                           (const char *) &msg[1],
2408                                           &first_ptype, &last_ptype))
2409   {
2410     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2411                 "%p Received invalid message part from client.\n", chn);
2412     GNUNET_break (0);
2413     transmit_cancel (chn, client);
2414     GNUNET_SERVICE_client_drop (client);
2415     return;
2416   }
2417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2418               "%p Received message with first part type %u and last part type %u.\n",
2419               chn, first_ptype, last_ptype);
2420
2421   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2422                  first_ptype, last_ptype);
2423   transmit_message (chn);
2424   /* FIXME: send a few ACKs even before transmit_notify is called */
2425
2426   GNUNET_SERVICE_client_continue (client);
2427 };
2428
2429
2430 /**
2431  * Received result of GNUNET_PSYCSTORE_membership_store()
2432  */
2433 static void
2434 store_recv_membership_store_result (void *cls,
2435                                     int64_t result,
2436                                     const char *err_msg,
2437                                     uint16_t err_msg_size)
2438 {
2439   struct Operation *op = cls;
2440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2441               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2442               op->channel,
2443               result,
2444               (int) err_msg_size,
2445               err_msg);
2446
2447   if (NULL != op->client)
2448     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2449   op_remove (op);
2450 }
2451
2452
2453 /**
2454  * Client requests to add/remove a slave in the membership database.
2455  */
2456 static void
2457 handle_client_membership_store (void *cls,
2458                                 const struct ChannelMembershipStoreRequest *req)
2459 {
2460   struct Client *c = cls;
2461   struct GNUNET_SERVICE_Client *client = c->client;
2462   struct Channel *chn = c->channel;
2463   if (NULL == chn)
2464   {
2465     GNUNET_break (0);
2466     GNUNET_SERVICE_client_drop (client);
2467     return;
2468   }
2469
2470   struct Operation *op = op_add (chn, client, req->op_id, 0);
2471
2472   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2473   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475               "%p Received membership store request from client.\n", chn);
2476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2478               chn, req->did_join, announced_at, effective_since);
2479
2480   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2481                                      req->did_join, announced_at, effective_since,
2482                                      0, /* FIXME: group_generation */
2483                                      &store_recv_membership_store_result, op);
2484   GNUNET_SERVICE_client_continue (client);
2485 }
2486
2487
2488 /**
2489  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2490  * in response to a history request from a client.
2491  */
2492 static int
2493 store_recv_fragment_history (void *cls,
2494                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2495                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2496 {
2497   struct Operation *op = cls;
2498   if (NULL == op->client)
2499   { /* Requesting client already disconnected. */
2500     return GNUNET_NO;
2501   }
2502   struct Channel *chn = op->channel;
2503
2504   struct GNUNET_PSYC_MessageHeader *pmsg;
2505   uint16_t msize = ntohs (mmsg->header.size);
2506   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2507
2508   struct GNUNET_OperationResultMessage *
2509     res = GNUNET_malloc (sizeof (*res) + psize);
2510   res->header.size = htons (sizeof (*res) + psize);
2511   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2512   res->op_id = op->op_id;
2513   res->result_code = GNUNET_htonll (GNUNET_OK);
2514
2515   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2516   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2517   GNUNET_memcpy (&res[1], pmsg, psize);
2518
2519   /** @todo FIXME: send only to requesting client */
2520   client_send_msg (chn, &res->header);
2521
2522   GNUNET_free (res);
2523   return GNUNET_YES;
2524 }
2525
2526
2527 /**
2528  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2529  * in response to a history request from a client.
2530  */
2531 static void
2532 store_recv_fragment_history_result (void *cls, int64_t result,
2533                                     const char *err_msg, uint16_t err_msg_size)
2534 {
2535   struct Operation *op = cls;
2536   if (NULL == op->client)
2537   { /* Requesting client already disconnected. */
2538     return;
2539   }
2540
2541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2542               "%p History replay #%" PRIu64 ": "
2543               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2544               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2545
2546   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2547   {
2548     /** @todo Multicast replay request for messages not found locally. */
2549   }
2550
2551   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2552   op_remove (op);
2553 }
2554
2555
2556 static int
2557 check_client_history_replay (void *cls,
2558                              const struct GNUNET_PSYC_HistoryRequestMessage *req)
2559 {
2560   return GNUNET_OK;
2561 }
2562
2563
2564 /**
2565  * Client requests channel history.
2566  */
2567 static void
2568 handle_client_history_replay (void *cls,
2569                               const struct GNUNET_PSYC_HistoryRequestMessage *req)
2570 {
2571   struct Client *c = cls;
2572   struct GNUNET_SERVICE_Client *client = c->client;
2573   struct Channel *chn = c->channel;
2574   if (NULL == chn)
2575   {
2576     GNUNET_break (0);
2577     GNUNET_SERVICE_client_drop (client);
2578     return;
2579   }
2580
2581   uint16_t size = ntohs (req->header.size);
2582   const char *method_prefix = (const char *) &req[1];
2583
2584   if (size < sizeof (*req) + 1
2585       || '\0' != method_prefix[size - sizeof (*req) - 1])
2586   {
2587     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2588                 "%p History replay #%" PRIu64 ": "
2589                 "invalid method prefix. size: %u < %u?\n",
2590                 chn,
2591                 GNUNET_ntohll (req->op_id),
2592                 size,
2593                 (unsigned int) sizeof (*req) + 1);
2594     GNUNET_break (0);
2595     GNUNET_SERVICE_client_drop (client);
2596     return;
2597   }
2598
2599   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2600
2601   if (0 == req->message_limit)
2602   {
2603     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2604                                   GNUNET_ntohll (req->start_message_id),
2605                                   GNUNET_ntohll (req->end_message_id),
2606                                   0, method_prefix,
2607                                   &store_recv_fragment_history,
2608                                   &store_recv_fragment_history_result, op);
2609   }
2610   else
2611   {
2612     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2613                                          GNUNET_ntohll (req->message_limit),
2614                                          method_prefix,
2615                                          &store_recv_fragment_history,
2616                                          &store_recv_fragment_history_result,
2617                                          op);
2618   }
2619   GNUNET_SERVICE_client_continue (client);
2620 }
2621
2622
2623 /**
2624  * Received state var from PSYCstore, send it to client.
2625  */
2626 static int
2627 store_recv_state_var (void *cls, const char *name,
2628                       const void *value, uint32_t value_size)
2629 {
2630   struct Operation *op = cls;
2631   struct GNUNET_OperationResultMessage *res;
2632   struct GNUNET_MQ_Envelope *env;
2633
2634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2635               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2636               op->channel, GNUNET_ntohll (op->op_id), name);
2637
2638   if (NULL != name) /* First part */
2639   {
2640     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2641     struct GNUNET_PSYC_MessageModifier *mod;
2642     env = GNUNET_MQ_msg_extra (res,
2643                                sizeof (*mod) + name_size + value_size,
2644                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2645     res->op_id = op->op_id;
2646
2647     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2648     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2649     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2650     mod->name_size = htons (name_size);
2651     mod->value_size = htonl (value_size);
2652     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2653     GNUNET_memcpy (&mod[1], name, name_size);
2654     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2655   }
2656   else /* Continuation */
2657   {
2658     struct GNUNET_MessageHeader *mod;
2659     env = GNUNET_MQ_msg_extra (res,
2660                                sizeof (*mod) + value_size,
2661                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2662     res->op_id = op->op_id;
2663
2664     mod = (struct GNUNET_MessageHeader *) &res[1];
2665     mod->size = htons (sizeof (*mod) + value_size);
2666     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2667     GNUNET_memcpy (&mod[1], value, value_size);
2668   }
2669
2670   // FIXME: client might have been disconnected
2671   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2672   return GNUNET_YES;
2673 }
2674
2675
2676 /**
2677  * Received result of GNUNET_PSYCSTORE_state_get()
2678  * or GNUNET_PSYCSTORE_state_get_prefix()
2679  */
2680 static void
2681 store_recv_state_result (void *cls, int64_t result,
2682                          const char *err_msg, uint16_t err_msg_size)
2683 {
2684   struct Operation *op = cls;
2685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2686               "%p state_get #%" PRIu64 ": "
2687               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2688               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2689
2690   // FIXME: client might have been disconnected
2691   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2692   op_remove (op);
2693 }
2694
2695
2696 static int
2697 check_client_state_get (void *cls,
2698                          const struct StateRequest *req)
2699 {
2700   struct Client *c = cls;
2701   struct Channel *chn = c->channel;
2702   if (NULL == chn)
2703   {
2704     GNUNET_break (0);
2705     return GNUNET_SYSERR;
2706   }
2707
2708   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2709   const char *name = (const char *) &req[1];
2710   if (0 == name_size || '\0' != name[name_size - 1])
2711   {
2712     GNUNET_break (0);
2713     return GNUNET_SYSERR;
2714   }
2715
2716   return GNUNET_OK;
2717 }
2718
2719
2720 /**
2721  * Client requests best matching state variable from PSYCstore.
2722  */
2723 static void
2724 handle_client_state_get (void *cls,
2725                          const struct StateRequest *req)
2726 {
2727   struct Client *c = cls;
2728   struct GNUNET_SERVICE_Client *client = c->client;
2729   struct Channel *chn = c->channel;
2730
2731   const char *name = (const char *) &req[1];
2732   struct Operation *op = op_add (chn, client, req->op_id, 0);
2733   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2734                               &store_recv_state_var,
2735                               &store_recv_state_result, op);
2736   GNUNET_SERVICE_client_continue (client);
2737 }
2738
2739
2740 static int
2741 check_client_state_get_prefix (void *cls,
2742                                const struct StateRequest *req)
2743 {
2744   struct Client *c = cls;
2745   struct Channel *chn = c->channel;
2746   if (NULL == chn)
2747   {
2748     GNUNET_break (0);
2749     return GNUNET_SYSERR;
2750   }
2751
2752   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2753   const char *name = (const char *) &req[1];
2754   if (0 == name_size || '\0' != name[name_size - 1])
2755   {
2756     GNUNET_break (0);
2757     return GNUNET_SYSERR;
2758   }
2759
2760   return GNUNET_OK;
2761 }
2762
2763
2764 /**
2765  * Client requests state variables with a given prefix from PSYCstore.
2766  */
2767 static void
2768 handle_client_state_get_prefix (void *cls,
2769                                 const struct StateRequest *req)
2770 {
2771   struct Client *c = cls;
2772   struct GNUNET_SERVICE_Client *client = c->client;
2773   struct Channel *chn = c->channel;
2774
2775   const char *name = (const char *) &req[1];
2776   struct Operation *op = op_add (chn, client, req->op_id, 0);
2777   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2778                                      &store_recv_state_var,
2779                                      &store_recv_state_result, op);
2780   GNUNET_SERVICE_client_continue (client);
2781 }
2782
2783
2784 /**
2785  * Initialize the PSYC service.
2786  *
2787  * @param cls Closure.
2788  * @param server The initialized server.
2789  * @param c Configuration to use.
2790  */
2791 static void
2792 run (void *cls,
2793      const struct GNUNET_CONFIGURATION_Handle *c,
2794      struct GNUNET_SERVICE_Handle *svc)
2795 {
2796   cfg = c;
2797   service = svc;
2798   store = GNUNET_PSYCSTORE_connect (cfg);
2799   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2800   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2801   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2802   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2803   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2804   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2805 }
2806
2807
2808 /**
2809  * Define "main" method using service macro.
2810  */
2811 GNUNET_SERVICE_MAIN
2812 ("psyc",
2813  GNUNET_SERVICE_OPTION_NONE,
2814  &run,
2815  &client_notify_connect,
2816  &client_notify_disconnect,
2817  NULL,
2818  GNUNET_MQ_hd_fixed_size (client_master_start,
2819                           GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2820                           struct MasterStartRequest,
2821                           NULL),
2822  GNUNET_MQ_hd_var_size (client_slave_join,
2823                         GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2824                         struct SlaveJoinRequest,
2825                         NULL),
2826  GNUNET_MQ_hd_var_size (client_join_decision,
2827                         GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2828                         struct GNUNET_PSYC_JoinDecisionMessage,
2829                         NULL),
2830  GNUNET_MQ_hd_fixed_size (client_part_request,
2831                           GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2832                           struct GNUNET_MessageHeader,
2833                           NULL),
2834  GNUNET_MQ_hd_var_size (client_psyc_message,
2835                         GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2836                         struct GNUNET_MessageHeader,
2837                         NULL),
2838  GNUNET_MQ_hd_fixed_size (client_membership_store,
2839                           GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2840                           struct ChannelMembershipStoreRequest,
2841                           NULL),
2842  GNUNET_MQ_hd_var_size (client_history_replay,
2843                         GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2844                         struct GNUNET_PSYC_HistoryRequestMessage,
2845                         NULL),
2846  GNUNET_MQ_hd_var_size (client_state_get,
2847                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2848                         struct StateRequest,
2849                         NULL),
2850  GNUNET_MQ_hd_var_size (client_state_get_prefix,
2851                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2852                         struct StateRequest,
2853                         NULL));
2854
2855 /* end of gnunet-service-psyc.c */