paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 /**
20  * @file psyc/gnunet-service-psyc.c
21  * @brief PSYC service
22  * @author Gabor X Toth
23  */
24
25 #include <inttypes.h>
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "gnunet_psyc_util_lib.h"
36 #include "psyc.h"
37
38
39 /**
40  * Handle to our current configuration.
41  */
42 static const struct GNUNET_CONFIGURATION_Handle *cfg;
43
44 /**
45  * Service handle.
46  */
47 static struct GNUNET_SERVICE_Handle *service;
48
49 /**
50  * Handle to the statistics service.
51  */
52 static struct GNUNET_STATISTICS_Handle *stats;
53
54 /**
55  * Handle to the PSYCstore.
56  */
57 static struct GNUNET_PSYCSTORE_Handle *store;
58
59 /**
60  * All connected masters.
61  * Channel's pub_key_hash -> struct Master
62  */
63 static struct GNUNET_CONTAINER_MultiHashMap *masters;
64
65 /**
66  * All connected slaves.
67  * Channel's pub_key_hash -> struct Slave
68  */
69 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
70
71 /**
72  * Connected slaves per channel.
73  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
74  */
75 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
76
77
78 /**
79  * Message in the transmission queue.
80  */
81 struct TransmitMessage
82 {
83   struct TransmitMessage *prev;
84   struct TransmitMessage *next;
85
86   struct GNUNET_SERVICE_Client *client;
87
88   /**
89    * ID assigned to the message.
90    */
91   uint64_t id;
92
93   /**
94    * Size of message.
95    */
96   uint16_t size;
97
98   /**
99    * Type of first message part.
100    */
101   uint16_t first_ptype;
102
103   /**
104    * Type of last message part.
105    */
106   uint16_t last_ptype;
107
108   /* Followed by message */
109 };
110
111
112 /**
113  * Cache for received message fragments.
114  * Message fragments are only sent to clients after all modifiers arrived.
115  *
116  * chan_key -> MultiHashMap chan_msgs
117  */
118 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
119
120
121 /**
122  * Entry in the chan_msgs hashmap of @a recv_cache:
123  * fragment_id -> RecvCacheEntry
124  */
125 struct RecvCacheEntry
126 {
127   struct GNUNET_MULTICAST_MessageHeader *mmsg;
128   uint16_t ref_count;
129 };
130
131
132 /**
133  * Entry in the @a recv_frags hash map of a @a Channel.
134  * message_id -> FragmentQueue
135  */
136 struct FragmentQueue
137 {
138   /**
139    * Fragment IDs stored in @a recv_cache.
140    */
141   struct GNUNET_CONTAINER_Heap *fragments;
142
143   /**
144    * Total size of received fragments.
145    */
146   uint64_t size;
147
148   /**
149    * Total size of received header fragments (METHOD & MODIFIERs)
150    */
151   uint64_t header_size;
152
153   /**
154    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
155    */
156   uint64_t state_delta;
157
158   /**
159    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
160    */
161   uint32_t flags;
162
163   /**
164    * Receive state of message.
165    *
166    * @see MessageFragmentState
167    */
168   uint8_t state;
169
170   /**
171    * Whether the state is already modified in PSYCstore.
172    */
173   uint8_t state_is_modified;
174
175   /**
176    * Is the message queued for delivery to the client?
177    * i.e. added to the recv_msgs queue
178    */
179   uint8_t is_queued;
180 };
181
182
183 /**
184  * List of connected clients.
185  */
186 struct ClientList
187 {
188   struct ClientList *prev;
189   struct ClientList *next;
190
191   struct GNUNET_SERVICE_Client *client;
192 };
193
194
195 struct Operation
196 {
197   struct Operation *prev;
198   struct Operation *next;
199
200   struct GNUNET_SERVICE_Client *client;
201   struct Channel *channel;
202   uint64_t op_id;
203   uint32_t flags;
204 };
205
206
207 /**
208  * Common part of the client context for both a channel master and slave.
209  */
210 struct Channel
211 {
212   struct ClientList *clients_head;
213   struct ClientList *clients_tail;
214
215   struct Operation *op_head;
216   struct Operation *op_tail;
217
218   struct TransmitMessage *tmit_head;
219   struct TransmitMessage *tmit_tail;
220
221   /**
222    * Current PSYCstore operation.
223    */
224   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
225
226   /**
227    * Received fragments not yet sent to the client.
228    * message_id -> FragmentQueue
229    */
230   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
231
232   /**
233    * Received message IDs not yet sent to the client.
234    */
235   struct GNUNET_CONTAINER_Heap *recv_msgs;
236
237   /**
238    * Public key of the channel.
239    */
240   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
241
242   /**
243    * Hash of @a pub_key.
244    */
245   struct GNUNET_HashCode pub_key_hash;
246
247   /**
248    * Last message ID sent to the client.
249    * 0 if there is no such message.
250    */
251   uint64_t max_message_id;
252
253   /**
254    * ID of the last stateful message, where the state operations has been
255    * processed and saved to PSYCstore and which has been sent to the client.
256    * 0 if there is no such message.
257    */
258   uint64_t max_state_message_id;
259
260   /**
261    * Expected value size for the modifier being received from the PSYC service.
262    */
263   uint32_t tmit_mod_value_size_expected;
264
265   /**
266    * Actual value size for the modifier being received from the PSYC service.
267    */
268   uint32_t tmit_mod_value_size;
269
270   /**
271    * Is this channel ready to receive messages from client?
272    * #GNUNET_YES or #GNUNET_NO
273    */
274   uint8_t is_ready;
275
276   /**
277    * Is the client disconnected?
278    * #GNUNET_YES or #GNUNET_NO
279    */
280   uint8_t is_disconnecting;
281
282   /**
283    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
284    */
285   uint8_t is_master;
286
287   union {
288     struct Master *master;
289     struct Slave *slave;
290   };
291 };
292
293
294 /**
295  * Client context for a channel master.
296  */
297 struct Master
298 {
299   /**
300    * Channel struct common for Master and Slave
301    */
302   struct Channel channel;
303
304   /**
305    * Private key of the channel.
306    */
307   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
308
309   /**
310    * Handle for the multicast origin.
311    */
312   struct GNUNET_MULTICAST_Origin *origin;
313
314   /**
315    * Transmit handle for multicast.
316    */
317   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
318
319   /**
320    * Incoming join requests from multicast.
321    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
322    */
323   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
324
325   /**
326    * Last message ID transmitted to this channel.
327    *
328    * Incremented before sending a message, thus the message_id in messages sent
329    * starts from 1.
330    */
331   uint64_t max_message_id;
332
333   /**
334    * ID of the last message with state operations transmitted to the channel.
335    * 0 if there is no such message.
336    */
337   uint64_t max_state_message_id;
338
339   /**
340    * Maximum group generation transmitted to the channel.
341    */
342   uint64_t max_group_generation;
343
344   /**
345    * @see enum GNUNET_PSYC_Policy
346    */
347   enum GNUNET_PSYC_Policy policy;
348 };
349
350
351 /**
352  * Client context for a channel slave.
353  */
354 struct Slave
355 {
356   /**
357    * Channel struct common for Master and Slave
358    */
359   struct Channel channel;
360
361   /**
362    * Private key of the slave.
363    */
364   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
365
366   /**
367    * Public key of the slave.
368    */
369   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
370
371   /**
372    * Hash of @a pub_key.
373    */
374   struct GNUNET_HashCode pub_key_hash;
375
376   /**
377    * Handle for the multicast member.
378    */
379   struct GNUNET_MULTICAST_Member *member;
380
381   /**
382    * Transmit handle for multicast.
383    */
384   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
385
386   /**
387    * Peer identity of the origin.
388    */
389   struct GNUNET_PeerIdentity origin;
390
391   /**
392    * Number of items in @a relays.
393    */
394   uint32_t relay_count;
395
396   /**
397    * Relays that multicast can use to connect.
398    */
399   struct GNUNET_PeerIdentity *relays;
400
401   /**
402    * Join request to be transmitted to the master on join.
403    */
404   struct GNUNET_PSYC_Message *join_msg;
405
406   /**
407    * Join decision received from multicast.
408    */
409   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
410
411   /**
412    * Maximum request ID for this channel.
413    */
414   uint64_t max_request_id;
415
416   /**
417    * Join flags.
418    */
419   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
420 };
421
422
423 /**
424  * Client context.
425  */
426 struct Client {
427   struct GNUNET_SERVICE_Client *client;
428   struct Channel *channel;
429 };
430
431
432 struct ReplayRequestKey
433 {
434   uint64_t fragment_id;
435   uint64_t message_id;
436   uint64_t fragment_offset;
437   uint64_t flags;
438 };
439
440
441 static void
442 transmit_message (struct Channel *chn);
443
444 static uint64_t
445 message_queue_run (struct Channel *chn);
446
447 static uint64_t
448 message_queue_drop (struct Channel *chn);
449
450
451 static void
452 schedule_transmit_message (void *cls)
453 {
454   struct Channel *chn = cls;
455
456   transmit_message (chn);
457 }
458
459
460 /**
461  * Task run during shutdown.
462  *
463  * @param cls unused
464  */
465 static void
466 shutdown_task (void *cls)
467 {
468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
469               "shutting down...\n");
470   GNUNET_PSYCSTORE_disconnect (store);
471   if (NULL != stats)
472   {
473     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
474     stats = NULL;
475   }
476 }
477
478
479 static struct Operation *
480 op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
481         uint64_t op_id, uint32_t flags)
482 {
483   struct Operation *op = GNUNET_malloc (sizeof (*op));
484   op->client = client;
485   op->channel = chn;
486   op->op_id = op_id;
487   op->flags = flags;
488   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
489   return op;
490 }
491
492
493 static void
494 op_remove (struct Operation *op)
495 {
496   GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
497   GNUNET_free (op);
498 }
499
500
501 /**
502  * Clean up master data structures after a client disconnected.
503  */
504 static void
505 cleanup_master (struct Master *mst)
506 {
507   struct Channel *chn = &mst->channel;
508
509   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
510   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
511 }
512
513
514 /**
515  * Clean up slave data structures after a client disconnected.
516  */
517 static void
518 cleanup_slave (struct Slave *slv)
519 {
520   struct Channel *chn = &slv->channel;
521   struct GNUNET_CONTAINER_MultiHashMap *
522     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
523                                                 &chn->pub_key_hash);
524   GNUNET_assert (NULL != chn_slv);
525   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
526
527   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
528   {
529     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
530                                           chn_slv);
531     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
532   }
533   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
534
535   if (NULL != slv->join_msg)
536   {
537     GNUNET_free (slv->join_msg);
538     slv->join_msg = NULL;
539   }
540   if (NULL != slv->relays)
541   {
542     GNUNET_free (slv->relays);
543     slv->relays = NULL;
544   }
545   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
546 }
547
548
549 /**
550  * Clean up channel data structures after a client disconnected.
551  */
552 static void
553 cleanup_channel (struct Channel *chn)
554 {
555   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
556               "%p Cleaning up channel %s. master? %u\n",
557               chn,
558               GNUNET_h2s (&chn->pub_key_hash),
559               chn->is_master);
560   message_queue_drop (chn);
561   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
562   chn->recv_frags = NULL;
563
564   if (NULL != chn->store_op)
565   {
566     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
567     chn->store_op = NULL;
568   }
569
570   (GNUNET_YES == chn->is_master)
571     ? cleanup_master (chn->master)
572     : cleanup_slave (chn->slave);
573   GNUNET_free (chn);
574 }
575
576
577 /**
578  * Called whenever a client is disconnected.
579  * Frees our resources associated with that client.
580  *
581  * @param cls closure
582  * @param client identification of the client
583  * @param app_ctx must match @a client
584  */
585 static void
586 client_notify_disconnect (void *cls,
587                           struct GNUNET_SERVICE_Client *client,
588                           void *app_ctx)
589 {
590   struct Client *c = app_ctx;
591   struct Channel *chn = c->channel;
592   GNUNET_free (c);
593
594   if (NULL == chn)
595   {
596     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
597                 "%p User context is NULL in client_notify_disconnect ()\n",
598                 chn);
599     GNUNET_break (0);
600     return;
601   }
602
603   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604               "%p Client %p (%s) disconnected from channel %s\n",
605               chn,
606               client,
607               (GNUNET_YES == chn->is_master) ? "master" : "slave",
608               GNUNET_h2s (&chn->pub_key_hash));
609
610   struct ClientList *cli = chn->clients_head;
611   while (NULL != cli)
612   {
613     if (cli->client == client)
614     {
615       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
616       GNUNET_free (cli);
617       break;
618     }
619     cli = cli->next;
620   }
621
622   struct Operation *op = chn->op_head;
623   while (NULL != op)
624   {
625     if (op->client == client)
626     {
627       op->client = NULL;
628       break;
629     }
630     op = op->next;
631   }
632
633   if (NULL == chn->clients_head)
634   { /* Last client disconnected. */
635     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636                 "%p Last client (%s) disconnected from channel %s\n",
637                 chn,
638                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
639                 GNUNET_h2s (&chn->pub_key_hash));
640     chn->is_disconnecting = GNUNET_YES;
641     cleanup_channel (chn);
642   }
643 }
644
645
646 /**
647  * A new client connected.
648  *
649  * @param cls NULL
650  * @param client client to add
651  * @param mq message queue for @a client
652  * @return @a client
653  */
654 static void *
655 client_notify_connect (void *cls,
656                        struct GNUNET_SERVICE_Client *client,
657                        struct GNUNET_MQ_Handle *mq)
658 {
659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
660
661   struct Client *c = GNUNET_malloc (sizeof (*c));
662   c->client = client;
663
664   return c;
665 }
666
667
668 /**
669  * Send message to all clients connected to the channel.
670  */
671 static void
672 client_send_msg (const struct Channel *chn,
673                  const struct GNUNET_MessageHeader *msg)
674 {
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676               "Sending message to clients of channel %p.\n",
677               chn);
678
679   struct ClientList *cli = chn->clients_head;
680   while (NULL != cli)
681   {
682     struct GNUNET_MQ_Envelope *
683       env = GNUNET_MQ_msg_copy (msg);
684
685     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
686                     env);
687     cli = cli->next;
688   }
689 }
690
691
692 /**
693  * Send a result code back to the client.
694  *
695  * @param client
696  *        Client that should receive the result code.
697  * @param result_code
698  *        Code to transmit.
699  * @param op_id
700  *        Operation ID in network byte order.
701  * @param data
702  *        Data payload or NULL.
703  * @param data_size
704  *        Size of @a data.
705  */
706 static void
707 client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
708                     int64_t result_code, const void *data, uint16_t data_size)
709 {
710   struct GNUNET_OperationResultMessage *res;
711   struct GNUNET_MQ_Envelope *
712     env = GNUNET_MQ_msg_extra (res,
713                                data_size,
714                                GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
715   res->result_code = GNUNET_htonll (result_code);
716   res->op_id = op_id;
717   if (0 < data_size)
718     GNUNET_memcpy (&res[1], data, data_size);
719
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721               "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
722               client,
723               GNUNET_ntohll (op_id),
724               result_code,
725               data_size);
726
727   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
728 }
729
730
731 /**
732  * Closure for join_mem_test_cb()
733  */
734 struct JoinMemTestClosure
735 {
736   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
737   struct Channel *channel;
738   struct GNUNET_MULTICAST_JoinHandle *join_handle;
739   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
740 };
741
742
743 /**
744  * Membership test result callback used for join requests.
745  */
746 static void
747 join_mem_test_cb (void *cls, int64_t result,
748                   const char *err_msg, uint16_t err_msg_size)
749 {
750   struct JoinMemTestClosure *jcls = cls;
751
752   if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
753   { /* Pass on join request to client if this is a master channel */
754     struct Master *mst = jcls->channel->master;
755     struct GNUNET_HashCode slave_pub_hash;
756     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
757                         &slave_pub_hash);
758     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
759                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
760     client_send_msg (jcls->channel, &jcls->join_msg->header);
761   }
762   else
763   {
764     if (GNUNET_SYSERR == result)
765     {
766       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
767                   "Could not perform membership test (%.*s)\n",
768                   err_msg_size, err_msg);
769     }
770     // FIXME: add relays
771     GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
772   }
773   GNUNET_free (jcls->join_msg);
774   GNUNET_free (jcls);
775 }
776
777
778 /**
779  * Incoming join request from multicast.
780  */
781 static void
782 mcast_recv_join_request (void *cls,
783                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
784                          const struct GNUNET_MessageHeader *join_msg,
785                          struct GNUNET_MULTICAST_JoinHandle *jh)
786 {
787   struct Channel *chn = cls;
788   uint16_t join_msg_size = 0;
789
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "%p Got join request.\n",
792               chn);
793   if (NULL != join_msg)
794   {
795     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
796     {
797       join_msg_size = ntohs (join_msg->size);
798     }
799     else
800     {
801       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
802                   "%p Got join message with invalid type %u.\n",
803                   chn,
804                   ntohs (join_msg->type));
805     }
806   }
807
808   struct GNUNET_PSYC_JoinRequestMessage *
809     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
810   req->header.size = htons (sizeof (*req) + join_msg_size);
811   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
812   req->slave_pub_key = *slave_pub_key;
813   if (0 < join_msg_size)
814     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
815
816   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
817   jcls->slave_pub_key = *slave_pub_key;
818   jcls->channel = chn;
819   jcls->join_handle = jh;
820   jcls->join_msg = req;
821
822   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
823                                     chn->max_message_id, 0,
824                                     &join_mem_test_cb, jcls);
825 }
826
827
828 /**
829  * Join decision received from multicast.
830  */
831 static void
832 mcast_recv_join_decision (void *cls, int is_admitted,
833                           const struct GNUNET_PeerIdentity *peer,
834                           uint16_t relay_count,
835                           const struct GNUNET_PeerIdentity *relays,
836                           const struct GNUNET_MessageHeader *join_resp)
837 {
838   struct Slave *slv = cls;
839   struct Channel *chn = &slv->channel;
840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841               "%p Got join decision: %d\n",
842               slv,
843               is_admitted);
844   if (GNUNET_YES == chn->is_ready)
845   {
846     /* Already admitted */
847     return;
848   }
849
850   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
851   struct GNUNET_PSYC_JoinDecisionMessage *
852     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
853   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
854   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
855   dcsn->is_admitted = htonl (is_admitted);
856   if (0 < join_resp_size)
857     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
858
859   client_send_msg (chn, &dcsn->header);
860
861   if (GNUNET_YES == is_admitted
862       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
863   {
864     chn->is_ready = GNUNET_YES;
865   }
866 }
867
868
869 static int
870 store_recv_fragment_replay (void *cls,
871                             struct GNUNET_MULTICAST_MessageHeader *msg,
872                             enum GNUNET_PSYCSTORE_MessageFlags flags)
873 {
874   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
875
876   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
877   return GNUNET_YES;
878 }
879
880
881 /**
882  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
883  */
884 static void
885 store_recv_fragment_replay_result (void *cls,
886                                    int64_t result,
887                                    const char *err_msg,
888                                    uint16_t err_msg_size)
889 {
890   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
891
892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
893               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
894               rh,
895               result,
896               err_msg_size,
897               err_msg);
898   switch (result)
899   {
900   case GNUNET_YES:
901     break;
902
903   case GNUNET_NO:
904     GNUNET_MULTICAST_replay_response (rh, NULL,
905                                       GNUNET_MULTICAST_REC_NOT_FOUND);
906     return;
907
908   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
909     GNUNET_MULTICAST_replay_response (rh, NULL,
910                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
911     return;
912
913   case GNUNET_SYSERR:
914     GNUNET_MULTICAST_replay_response (rh, NULL,
915                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
916     return;
917   }
918   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
919    * an error code, so it must be ensured no further processing
920    * is attempted on 'rh'. Maybe this should be refactored as
921    * it doesn't look very intuitive.    --lynX
922    */
923   GNUNET_MULTICAST_replay_response_end (rh);
924 }
925
926
927 /**
928  * Incoming fragment replay request from multicast.
929  */
930 static void
931 mcast_recv_replay_fragment (void *cls,
932                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
933                             uint64_t fragment_id, uint64_t flags,
934                             struct GNUNET_MULTICAST_ReplayHandle *rh)
935
936 {
937   struct Channel *chn = cls;
938   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
939                                  fragment_id, fragment_id,
940                                  &store_recv_fragment_replay,
941                                  &store_recv_fragment_replay_result, rh);
942 }
943
944
945 /**
946  * Incoming message replay request from multicast.
947  */
948 static void
949 mcast_recv_replay_message (void *cls,
950                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
951                            uint64_t message_id,
952                            uint64_t fragment_offset,
953                            uint64_t flags,
954                            struct GNUNET_MULTICAST_ReplayHandle *rh)
955 {
956   struct Channel *chn = cls;
957   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
958                                 message_id, message_id, 1, NULL,
959                                 &store_recv_fragment_replay,
960                                 &store_recv_fragment_replay_result, rh);
961 }
962
963
964 /**
965  * Convert an uint64_t in network byte order to a HashCode
966  * that can be used as key in a MultiHashMap
967  */
968 static inline void
969 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
970 {
971   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
972   /* TODO: use built-in byte swap functions if available */
973
974   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
975   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
976
977   *key = (struct GNUNET_HashCode) {};
978   *((uint64_t *) key)
979     = (n << 32) | (n >> 32);
980 }
981
982
983 /**
984  * Convert an uint64_t in host byte order to a HashCode
985  * that can be used as key in a MultiHashMap
986  */
987 static inline void
988 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
989 {
990 #if __BYTE_ORDER == __BIG_ENDIAN
991   hash_key_from_nll (key, n);
992 #elif __BYTE_ORDER == __LITTLE_ENDIAN
993   *key = (struct GNUNET_HashCode) {};
994   *((uint64_t *) key) = n;
995 #else
996   #error byteorder undefined
997 #endif
998 }
999
1000
1001 /**
1002  * Initialize PSYC message header.
1003  */
1004 static inline void
1005 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1006                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1007 {
1008   uint16_t size = ntohs (mmsg->header.size);
1009   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1010
1011   pmsg->header.size = htons (psize);
1012   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1013   pmsg->message_id = mmsg->message_id;
1014   pmsg->fragment_offset = mmsg->fragment_offset;
1015   pmsg->flags = htonl (flags);
1016
1017   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1018 }
1019
1020
1021 /**
1022  * Create a new PSYC message from a multicast message for sending it to clients.
1023  */
1024 static inline struct GNUNET_PSYC_MessageHeader *
1025 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
1026 {
1027   struct GNUNET_PSYC_MessageHeader *pmsg;
1028   uint16_t size = ntohs (mmsg->header.size);
1029   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1030
1031   pmsg = GNUNET_malloc (psize);
1032   psyc_msg_init (pmsg, mmsg, flags);
1033   return pmsg;
1034 }
1035
1036
1037 /**
1038  * Send multicast message to all clients connected to the channel.
1039  */
1040 static void
1041 client_send_mcast_msg (struct Channel *chn,
1042                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1043                        uint32_t flags)
1044 {
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1047               chn,
1048               GNUNET_ntohll (mmsg->fragment_id),
1049               GNUNET_ntohll (mmsg->message_id));
1050
1051   struct GNUNET_PSYC_MessageHeader *
1052     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1053   client_send_msg (chn, &pmsg->header);
1054   GNUNET_free (pmsg);
1055 }
1056
1057
1058 /**
1059  * Send multicast request to all clients connected to the channel.
1060  */
1061 static void
1062 client_send_mcast_req (struct Master *mst,
1063                        const struct GNUNET_MULTICAST_RequestHeader *req)
1064 {
1065   struct Channel *chn = &mst->channel;
1066
1067   struct GNUNET_PSYC_MessageHeader *pmsg;
1068   uint16_t size = ntohs (req->header.size);
1069   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1070
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1073               chn,
1074               GNUNET_ntohll (req->fragment_id),
1075               GNUNET_ntohll (req->request_id));
1076
1077   pmsg = GNUNET_malloc (psize);
1078   pmsg->header.size = htons (psize);
1079   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1080   pmsg->message_id = req->request_id;
1081   pmsg->fragment_offset = req->fragment_offset;
1082   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1083   pmsg->slave_pub_key = req->member_pub_key;
1084   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1085
1086   client_send_msg (chn, &pmsg->header);
1087
1088   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1089
1090   GNUNET_free (pmsg);
1091 }
1092
1093
1094 /**
1095  * Insert a multicast message fragment into the queue belonging to the message.
1096  *
1097  * @param chn          Channel.
1098  * @param mmsg         Multicast message fragment.
1099  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1100  * @param first_ptype  First PSYC message part type in @a mmsg.
1101  * @param last_ptype   Last PSYC message part type in @a mmsg.
1102  */
1103 static void
1104 fragment_queue_insert (struct Channel *chn,
1105                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1106                        uint16_t first_ptype, uint16_t last_ptype)
1107 {
1108   const uint16_t size = ntohs (mmsg->header.size);
1109   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1110   struct GNUNET_CONTAINER_MultiHashMap
1111     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1112                                                     &chn->pub_key_hash);
1113
1114   struct GNUNET_HashCode msg_id_hash;
1115   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1116
1117   struct FragmentQueue
1118     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1119
1120   if (NULL == fragq)
1121   {
1122     fragq = GNUNET_malloc (sizeof (*fragq));
1123     fragq->state = MSG_FRAG_STATE_HEADER;
1124     fragq->fragments
1125       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1126
1127     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1128                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1129
1130     if (NULL == chan_msgs)
1131     {
1132       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1133       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1134                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1135     }
1136   }
1137
1138   struct GNUNET_HashCode frag_id_hash;
1139   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1140   struct RecvCacheEntry
1141     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1142   if (NULL == cache_entry)
1143   {
1144     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1145                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1146                 chn,
1147                 GNUNET_ntohll (mmsg->message_id),
1148                 GNUNET_ntohll (mmsg->fragment_id));
1149     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1150                 "%p header_size: %" PRIu64 " + %u\n",
1151                 chn,
1152                 fragq->header_size,
1153                 size);
1154     cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1155     cache_entry->ref_count = 1;
1156     cache_entry->mmsg = GNUNET_malloc (size);
1157     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1158     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1159                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1160   }
1161   else
1162   {
1163     cache_entry->ref_count++;
1164     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1166                 chn,
1167                 GNUNET_ntohll (mmsg->message_id),
1168                 GNUNET_ntohll (mmsg->fragment_id),
1169                 cache_entry->ref_count);
1170   }
1171
1172   if (MSG_FRAG_STATE_HEADER == fragq->state)
1173   {
1174     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1175     {
1176       struct GNUNET_PSYC_MessageMethod *
1177         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1178       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1179       fragq->flags = ntohl (pmeth->flags);
1180     }
1181
1182     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1183     {
1184       fragq->header_size += size;
1185     }
1186     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1187              || frag_offset == fragq->header_size)
1188     { /* header is now complete */
1189       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190                   "%p Header of message %" PRIu64 " is complete.\n",
1191                   chn,
1192                   GNUNET_ntohll (mmsg->message_id));
1193
1194       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195                   "%p Adding message %" PRIu64 " to queue.\n",
1196                   chn,
1197                   GNUNET_ntohll (mmsg->message_id));
1198       fragq->state = MSG_FRAG_STATE_DATA;
1199     }
1200     else
1201     {
1202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1204                   chn,
1205                   GNUNET_ntohll (mmsg->message_id),
1206                   frag_offset,
1207                   fragq->header_size);
1208     }
1209   }
1210
1211   switch (last_ptype)
1212   {
1213   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1214     if (frag_offset == fragq->size)
1215       fragq->state = MSG_FRAG_STATE_END;
1216     else
1217       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1219                   chn,
1220                   GNUNET_ntohll (mmsg->message_id),
1221                   frag_offset,
1222                   fragq->size);
1223     break;
1224
1225   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1226     /* Drop message without delivering to client if it's a single fragment */
1227     fragq->state =
1228       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1229       ? MSG_FRAG_STATE_DROP
1230       : MSG_FRAG_STATE_CANCEL;
1231   }
1232
1233   switch (fragq->state)
1234   {
1235   case MSG_FRAG_STATE_DATA:
1236   case MSG_FRAG_STATE_END:
1237   case MSG_FRAG_STATE_CANCEL:
1238     if (GNUNET_NO == fragq->is_queued)
1239     {
1240       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1241                                     GNUNET_ntohll (mmsg->message_id));
1242       fragq->is_queued = GNUNET_YES;
1243     }
1244   }
1245
1246   fragq->size += size;
1247   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1248                                 GNUNET_ntohll (mmsg->fragment_id));
1249 }
1250
1251
1252 /**
1253  * Run fragment queue of a message.
1254  *
1255  * Send fragments of a message in order to client, after all modifiers arrived
1256  * from multicast.
1257  *
1258  * @param chn
1259  *        Channel.
1260  * @param msg_id
1261  *        ID of the message @a fragq belongs to.
1262  * @param fragq
1263  *        Fragment queue of the message.
1264  * @param drop
1265  *        Drop message without delivering to client?
1266  *        #GNUNET_YES or #GNUNET_NO.
1267  */
1268 static void
1269 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1270                     struct FragmentQueue *fragq, uint8_t drop)
1271 {
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1274               chn,
1275               msg_id,
1276               fragq->state);
1277
1278   struct GNUNET_CONTAINER_MultiHashMap
1279     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1280                                                     &chn->pub_key_hash);
1281   GNUNET_assert (NULL != chan_msgs);
1282   uint64_t frag_id;
1283
1284   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1285                                                     &frag_id))
1286   {
1287     struct GNUNET_HashCode frag_id_hash;
1288     hash_key_from_hll (&frag_id_hash, frag_id);
1289     struct RecvCacheEntry *cache_entry
1290       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1291     if (cache_entry != NULL)
1292     {
1293       if (GNUNET_NO == drop)
1294       {
1295         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1296       }
1297       if (cache_entry->ref_count <= 1)
1298       {
1299         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1300                                               cache_entry);
1301         GNUNET_free (cache_entry->mmsg);
1302         GNUNET_free (cache_entry);
1303       }
1304       else
1305       {
1306         cache_entry->ref_count--;
1307       }
1308     }
1309 #if CACHE_AGING_IMPLEMENTED
1310     else if (GNUNET_NO == drop)
1311     {
1312       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1313     }
1314 #endif
1315
1316     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1317   }
1318
1319   if (MSG_FRAG_STATE_END <= fragq->state)
1320   {
1321     struct GNUNET_HashCode msg_id_hash;
1322     hash_key_from_hll (&msg_id_hash, msg_id);
1323
1324     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1325     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1326     GNUNET_free (fragq);
1327   }
1328   else
1329   {
1330     fragq->is_queued = GNUNET_NO;
1331   }
1332 }
1333
1334
1335 struct StateModifyClosure
1336 {
1337   struct Channel *channel;
1338   uint64_t msg_id;
1339   struct GNUNET_HashCode msg_id_hash;
1340 };
1341
1342
1343 void
1344 store_recv_state_modify_result (void *cls, int64_t result,
1345                                 const char *err_msg, uint16_t err_msg_size)
1346 {
1347   struct StateModifyClosure *mcls = cls;
1348   struct Channel *chn = mcls->channel;
1349   uint64_t msg_id = mcls->msg_id;
1350
1351   struct FragmentQueue *
1352     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1353
1354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1356               chn, result, err_msg_size, err_msg);
1357
1358   switch (result)
1359   {
1360   case GNUNET_OK:
1361   case GNUNET_NO:
1362     if (NULL != fragq)
1363       fragq->state_is_modified = GNUNET_YES;
1364     if (chn->max_state_message_id < msg_id)
1365       chn->max_state_message_id = msg_id;
1366     if (chn->max_message_id < msg_id)
1367       chn->max_message_id = msg_id;
1368
1369     if (NULL != fragq)
1370       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1371     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1372     message_queue_run (chn);
1373     break;
1374
1375   default:
1376     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1377                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1378                 chn, result, err_msg_size, err_msg);
1379     /** @todo FIXME: handle state_modify error */
1380   }
1381 }
1382
1383
1384 /**
1385  * Run message queue.
1386  *
1387  * Send messages in queue to client in order after a message has arrived from
1388  * multicast, according to the following:
1389  * - A message is only sent if all of its modifiers arrived.
1390  * - A stateful message is only sent if the previous stateful message
1391  *   has already been delivered to the client.
1392  *
1393  * @param chn  Channel.
1394  *
1395  * @return Number of messages removed from queue and sent to client.
1396  */
1397 static uint64_t
1398 message_queue_run (struct Channel *chn)
1399 {
1400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401               "%p Running message queue.\n", chn);
1402   uint64_t n = 0;
1403   uint64_t msg_id;
1404
1405   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1406                                                     &msg_id))
1407   {
1408     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1410     struct GNUNET_HashCode msg_id_hash;
1411     hash_key_from_hll (&msg_id_hash, msg_id);
1412
1413     struct FragmentQueue *
1414       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1415
1416     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1417     {
1418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419                   "%p No fragq (%p) or header not complete.\n",
1420                   chn, fragq);
1421       break;
1422     }
1423
1424     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425                 "%p Fragment queue entry:  state: %u, state delta: "
1426                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1427                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1428
1429     if (MSG_FRAG_STATE_DATA <= fragq->state)
1430     {
1431       /* Check if there's a missing message before the current one */
1432       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1433       {
1434         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1435
1436         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1437             && (chn->max_message_id != msg_id - 1
1438                 && chn->max_message_id != msg_id))
1439         {
1440           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441                       "%p Out of order message. "
1442                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1443                       chn, chn->max_message_id, msg_id);
1444           break;
1445           // FIXME: keep track of messages processed in this queue run,
1446           //        and only stop after reaching the end
1447         }
1448       }
1449       else
1450       {
1451         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1452         if (GNUNET_YES != fragq->state_is_modified)
1453         {
1454           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1455           {
1456             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1457                         "%p Out of order stateful message. "
1458                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1459                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1460             break;
1461             // FIXME: keep track of messages processed in this queue run,
1462             //        and only stop after reaching the end
1463           }
1464
1465           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1466           mcls->channel = chn;
1467           mcls->msg_id = msg_id;
1468           mcls->msg_id_hash = msg_id_hash;
1469
1470           /* Apply modifiers to state in PSYCstore */
1471           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1472                                          fragq->state_delta,
1473                                          store_recv_state_modify_result, mcls);
1474           break; // continue after asynchronous state modify result
1475         }
1476       }
1477       chn->max_message_id = msg_id;
1478     }
1479     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1480     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1481     n++;
1482   }
1483
1484   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1485               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1486   return n;
1487 }
1488
1489
1490 /**
1491  * Drop message queue of a channel.
1492  *
1493  * Remove all messages in queue without sending it to clients.
1494  *
1495  * @param chn  Channel.
1496  *
1497  * @return Number of messages removed from queue.
1498  */
1499 static uint64_t
1500 message_queue_drop (struct Channel *chn)
1501 {
1502   uint64_t n = 0;
1503   uint64_t msg_id;
1504   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1505                                                     &msg_id))
1506   {
1507     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1508                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1509     struct GNUNET_HashCode msg_id_hash;
1510     hash_key_from_hll (&msg_id_hash, msg_id);
1511
1512     struct FragmentQueue *
1513       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1514     GNUNET_assert (NULL != fragq);
1515     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1516     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1517     n++;
1518   }
1519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1521   return n;
1522 }
1523
1524
1525 /**
1526  * Received result of GNUNET_PSYCSTORE_fragment_store().
1527  */
1528 static void
1529 store_recv_fragment_store_result (void *cls, int64_t result,
1530                                   const char *err_msg, uint16_t err_msg_size)
1531 {
1532   struct Channel *chn = cls;
1533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1535               chn, result, err_msg_size, err_msg);
1536 }
1537
1538
1539 /**
1540  * Handle incoming message fragment from multicast.
1541  *
1542  * Store it using PSYCstore and send it to the clients of the channel in order.
1543  */
1544 static void
1545 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1546 {
1547   struct Channel *chn = cls;
1548   uint16_t size = ntohs (mmsg->header.size);
1549
1550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551               "%p Received multicast message of size %u. "
1552               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1553               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1554               chn, size,
1555               GNUNET_ntohll (mmsg->fragment_id),
1556               GNUNET_ntohll (mmsg->message_id),
1557               GNUNET_ntohll (mmsg->fragment_offset),
1558               GNUNET_ntohll (mmsg->flags));
1559
1560   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1561                                    &store_recv_fragment_store_result, chn);
1562
1563   uint16_t first_ptype = 0, last_ptype = 0;
1564   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1565                                                (const char *) &mmsg[1],
1566                                                &first_ptype, &last_ptype);
1567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1568               "%p Message check result %d, first part type %u, last part type %u\n",
1569               chn, check, first_ptype, last_ptype);
1570   if (GNUNET_SYSERR == check)
1571   {
1572     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1573                 "%p Dropping incoming multicast message with invalid parts.\n",
1574                 chn);
1575     GNUNET_break_op (0);
1576     return;
1577   }
1578
1579   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1580   message_queue_run (chn);
1581 }
1582
1583
1584 /**
1585  * Incoming request fragment from multicast for a master.
1586  *
1587  * @param cls   Master.
1588  * @param req   The request.
1589  */
1590 static void
1591 mcast_recv_request (void *cls,
1592                     const struct GNUNET_MULTICAST_RequestHeader *req)
1593 {
1594   struct Master *mst = cls;
1595   uint16_t size = ntohs (req->header.size);
1596
1597   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599               "%p Received multicast request of size %u from %s.\n",
1600               mst, size, str);
1601   GNUNET_free (str);
1602
1603   uint16_t first_ptype = 0, last_ptype = 0;
1604   if (GNUNET_SYSERR
1605       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1606                                           (const char *) &req[1],
1607                                           &first_ptype, &last_ptype))
1608   {
1609     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1610                 "%p Dropping incoming multicast request with invalid parts.\n",
1611                 mst);
1612     GNUNET_break_op (0);
1613     return;
1614   }
1615
1616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617               "Message parts: first: type %u, last: type %u\n",
1618               first_ptype, last_ptype);
1619
1620   /* FIXME: in-order delivery */
1621   client_send_mcast_req (mst, req);
1622 }
1623
1624
1625 /**
1626  * Response from PSYCstore with the current counter values for a channel master.
1627  */
1628 static void
1629 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1630                             uint64_t max_message_id, uint64_t max_group_generation,
1631                             uint64_t max_state_message_id)
1632 {
1633   struct Master *mst = cls;
1634   struct Channel *chn = &mst->channel;
1635   chn->store_op = NULL;
1636
1637   struct GNUNET_PSYC_CountersResultMessage res;
1638   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1639   res.header.size = htons (sizeof (res));
1640   res.result_code = htonl (result);
1641   res.max_message_id = GNUNET_htonll (max_message_id);
1642
1643   if (GNUNET_OK == result || GNUNET_NO == result)
1644   {
1645     mst->max_message_id = max_message_id;
1646     chn->max_message_id = max_message_id;
1647     chn->max_state_message_id = max_state_message_id;
1648     mst->max_group_generation = max_group_generation;
1649     mst->origin
1650       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1651                                        mcast_recv_join_request,
1652                                        mcast_recv_replay_fragment,
1653                                        mcast_recv_replay_message,
1654                                        mcast_recv_request,
1655                                        mcast_recv_message, chn);
1656     chn->is_ready = GNUNET_YES;
1657   }
1658   else
1659   {
1660     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1661                 "%p GNUNET_PSYCSTORE_counters_get() "
1662                 "returned %d for channel %s.\n",
1663                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1664   }
1665
1666   client_send_msg (chn, &res.header);
1667 }
1668
1669
1670 /**
1671  * Response from PSYCstore with the current counter values for a channel slave.
1672  */
1673 void
1674 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1675                            uint64_t max_message_id, uint64_t max_group_generation,
1676                            uint64_t max_state_message_id)
1677 {
1678   struct Slave *slv = cls;
1679   struct Channel *chn = &slv->channel;
1680   chn->store_op = NULL;
1681
1682   struct GNUNET_PSYC_CountersResultMessage res;
1683   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1684   res.header.size = htons (sizeof (res));
1685   res.result_code = htonl (result);
1686   res.max_message_id = GNUNET_htonll (max_message_id);
1687
1688   if (GNUNET_YES == result || GNUNET_NO == result)
1689   {
1690     chn->max_message_id = max_message_id;
1691     chn->max_state_message_id = max_state_message_id;
1692     slv->member
1693       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1694                                       &slv->origin,
1695                                       slv->relay_count, slv->relays,
1696                                       &slv->join_msg->header,
1697                                       mcast_recv_join_request,
1698                                       mcast_recv_join_decision,
1699                                       mcast_recv_replay_fragment,
1700                                       mcast_recv_replay_message,
1701                                       mcast_recv_message, chn);
1702     if (NULL != slv->join_msg)
1703     {
1704       GNUNET_free (slv->join_msg);
1705       slv->join_msg = NULL;
1706     }
1707   }
1708   else
1709   {
1710     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1711                 "%p GNUNET_PSYCSTORE_counters_get() "
1712                 "returned %d for channel %s.\n",
1713                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1714   }
1715
1716   client_send_msg (chn, &res.header);
1717 }
1718
1719
1720 static void
1721 channel_init (struct Channel *chn)
1722 {
1723   chn->recv_msgs
1724     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1725   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1726 }
1727
1728
1729 /**
1730  * Handle a connecting client starting a channel master.
1731  */
1732 static void
1733 handle_client_master_start (void *cls,
1734                             const struct MasterStartRequest *req)
1735 {
1736   struct Client *c = cls;
1737   struct GNUNET_SERVICE_Client *client = c->client;
1738
1739   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1740   struct GNUNET_HashCode pub_key_hash;
1741
1742   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1743   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1744
1745   struct Master *
1746     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1747   struct Channel *chn;
1748
1749   if (NULL == mst)
1750   {
1751     mst = GNUNET_malloc (sizeof (*mst));
1752     mst->policy = ntohl (req->policy);
1753     mst->priv_key = req->channel_key;
1754     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1755
1756     chn = c->channel = &mst->channel;
1757     chn->master = mst;
1758     chn->is_master = GNUNET_YES;
1759     chn->pub_key = pub_key;
1760     chn->pub_key_hash = pub_key_hash;
1761     channel_init (chn);
1762
1763     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1764                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1765     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1766                                                    store_recv_master_counters, mst);
1767   }
1768   else
1769   {
1770     chn = &mst->channel;
1771
1772     struct GNUNET_PSYC_CountersResultMessage *res;
1773     struct GNUNET_MQ_Envelope *
1774       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1775     res->result_code = htonl (GNUNET_OK);
1776     res->max_message_id = GNUNET_htonll (mst->max_message_id);
1777
1778     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1779   }
1780
1781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782               "%p Client connected as master to channel %s.\n",
1783               mst, GNUNET_h2s (&chn->pub_key_hash));
1784
1785   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1786   cli->client = client;
1787   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1788
1789   GNUNET_SERVICE_client_continue (client);
1790 }
1791
1792
1793 static int
1794 check_client_slave_join (void *cls,
1795                          const struct SlaveJoinRequest *req)
1796 {
1797   return GNUNET_OK;
1798 }
1799
1800
1801 /**
1802  * Handle a connecting client joining as a channel slave.
1803  */
1804 static void
1805 handle_client_slave_join (void *cls,
1806                           const struct SlaveJoinRequest *req)
1807 {
1808   struct Client *c = cls;
1809   struct GNUNET_SERVICE_Client *client = c->client;
1810
1811   uint16_t req_size = ntohs (req->header.size);
1812
1813   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1814   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1815
1816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817               "got join request from client %p\n",
1818               client);
1819   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1820   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1821   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1822
1823   struct GNUNET_CONTAINER_MultiHashMap *
1824     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1825   struct Slave *slv = NULL;
1826   struct Channel *chn;
1827
1828   if (NULL != chn_slv)
1829   {
1830     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1831   }
1832   if (NULL == slv)
1833   {
1834     slv = GNUNET_malloc (sizeof (*slv));
1835     slv->priv_key = req->slave_key;
1836     slv->pub_key = slv_pub_key;
1837     slv->pub_key_hash = slv_pub_hash;
1838     slv->origin = req->origin;
1839     slv->relay_count = ntohl (req->relay_count);
1840     slv->join_flags = ntohl (req->flags);
1841
1842     const struct GNUNET_PeerIdentity *
1843       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1844     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1845     uint16_t join_msg_size = 0;
1846
1847     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1848         <= req_size)
1849     {
1850       struct GNUNET_PSYC_Message *
1851         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1852       join_msg_size = ntohs (join_msg->header.size);
1853       slv->join_msg = GNUNET_malloc (join_msg_size);
1854       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1855     }
1856     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1857     {
1858       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1859                   "%u + %u + %u != %u\n",
1860                   (unsigned int) sizeof (*req),
1861                   relay_size,
1862                   join_msg_size,
1863                   req_size);
1864       GNUNET_break (0);
1865       GNUNET_SERVICE_client_drop (client);
1866       GNUNET_free (slv);
1867       return;
1868     }
1869     if (0 < slv->relay_count)
1870     {
1871       slv->relays = GNUNET_malloc (relay_size);
1872       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1873     }
1874
1875     chn = c->channel = &slv->channel;
1876     chn->slave = slv;
1877     chn->is_master = GNUNET_NO;
1878     chn->pub_key = req->channel_pub_key;
1879     chn->pub_key_hash = pub_key_hash;
1880     channel_init (chn);
1881
1882     if (NULL == chn_slv)
1883     {
1884       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1885       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1886                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1887     }
1888     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1889                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1890     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1891                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1892     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1893                                                    &store_recv_slave_counters, slv);
1894   }
1895   else
1896   {
1897     chn = &slv->channel;
1898
1899     struct GNUNET_PSYC_CountersResultMessage *res;
1900
1901     struct GNUNET_MQ_Envelope *
1902       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1903     res->result_code = htonl (GNUNET_OK);
1904     res->max_message_id = GNUNET_htonll (chn->max_message_id);
1905
1906     GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1907
1908     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1909     {
1910       mcast_recv_join_decision (slv, GNUNET_YES,
1911                                 NULL, 0, NULL, NULL);
1912     }
1913     else if (NULL == slv->member)
1914     {
1915       slv->member
1916         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1917                                         &slv->origin,
1918                                         slv->relay_count, slv->relays,
1919                                         &slv->join_msg->header,
1920                                         &mcast_recv_join_request,
1921                                         &mcast_recv_join_decision,
1922                                         &mcast_recv_replay_fragment,
1923                                         &mcast_recv_replay_message,
1924                                         &mcast_recv_message, chn);
1925       if (NULL != slv->join_msg)
1926       {
1927         GNUNET_free (slv->join_msg);
1928         slv->join_msg = NULL;
1929       }
1930     }
1931     else if (NULL != slv->join_dcsn)
1932     {
1933       struct GNUNET_MQ_Envelope *
1934         env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1935       GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1936     }
1937   }
1938
1939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1940               "Client %p connected as slave to channel %s.\n",
1941               client,
1942               GNUNET_h2s (&chn->pub_key_hash));
1943
1944   struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1945   cli->client = client;
1946   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1947
1948   GNUNET_SERVICE_client_continue (client);
1949 }
1950
1951
1952 struct JoinDecisionClosure
1953 {
1954   int32_t is_admitted;
1955   struct GNUNET_MessageHeader *msg;
1956 };
1957
1958
1959 /**
1960  * Iterator callback for sending join decisions to multicast.
1961  */
1962 static int
1963 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1964                           void *value)
1965 {
1966   struct JoinDecisionClosure *jcls = cls;
1967   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1968   // FIXME: add relays
1969   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1970   return GNUNET_YES;
1971 }
1972
1973
1974 static int
1975 check_client_join_decision (void *cls,
1976                             const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1977 {
1978   return GNUNET_OK;
1979 }
1980
1981
1982 /**
1983  * Join decision from client.
1984  */
1985 static void
1986 handle_client_join_decision (void *cls,
1987                              const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1988 {
1989   struct Client *c = cls;
1990   struct GNUNET_SERVICE_Client *client = c->client;
1991   struct Channel *chn = c->channel;
1992   if (NULL == chn)
1993   {
1994     GNUNET_break (0);
1995     GNUNET_SERVICE_client_drop (client);
1996     return;
1997   }
1998   GNUNET_assert (GNUNET_YES == chn->is_master);
1999   struct Master *mst = chn->master;
2000
2001   struct JoinDecisionClosure jcls;
2002   jcls.is_admitted = ntohl (dcsn->is_admitted);
2003   jcls.msg
2004     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
2005     ? (struct GNUNET_MessageHeader *) &dcsn[1]
2006     : NULL;
2007
2008   struct GNUNET_HashCode slave_pub_hash;
2009   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
2010                       &slave_pub_hash);
2011
2012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013               "%p Got join decision (%d) from client for channel %s..\n",
2014               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
2015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016               "%p ..and slave %s.\n",
2017               mst, GNUNET_h2s (&slave_pub_hash));
2018
2019   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
2020                                               &mcast_send_join_decision, &jcls);
2021   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
2022   GNUNET_SERVICE_client_continue (client);
2023 }
2024
2025
2026 static void
2027 channel_part_cb (void *cls)
2028 {
2029   struct GNUNET_SERVICE_Client *client = cls;
2030   struct GNUNET_MQ_Envelope *env;
2031
2032   env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2033   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2034                   env);
2035 }
2036
2037
2038 static void
2039 handle_client_part_request (void *cls,
2040                             const struct GNUNET_MessageHeader *msg)
2041 {
2042   struct Client *c = cls;
2043
2044   c->channel->is_disconnecting = GNUNET_YES;
2045   if (GNUNET_YES == c->channel->is_master)
2046   {
2047     struct Master *mst = (struct Master *) c->channel;
2048    
2049     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050                 "Got part request from master %p\n",
2051                 mst);
2052     GNUNET_assert (NULL != mst->origin);
2053     GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2054   }
2055   else
2056   {
2057     struct Slave *slv = (struct Slave *) c->channel;
2058
2059     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2060                 "Got part request from slave %p\n",
2061                 slv);
2062     GNUNET_assert (NULL != slv->member);
2063     GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2064   }
2065   GNUNET_SERVICE_client_continue (c->client);
2066 }
2067
2068
2069 /**
2070  * Send acknowledgement to a client.
2071  *
2072  * Sent after a message fragment has been passed on to multicast.
2073  *
2074  * @param chn The channel struct for the client.
2075  */
2076 static void
2077 send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2078 {
2079   struct GNUNET_MessageHeader *res;
2080   struct GNUNET_MQ_Envelope *
2081       env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
2082
2083   /* FIXME? */
2084   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2085 }
2086
2087
2088 /**
2089  * Callback for the transmit functions of multicast.
2090  */
2091 static int
2092 transmit_notify (void *cls, size_t *data_size, void *data)
2093 {
2094   struct Channel *chn = cls;
2095   struct TransmitMessage *tmit_msg = chn->tmit_head;
2096
2097   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2098   {
2099     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100                 "%p transmit_notify: nothing to send.\n", chn);
2101     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2102       GNUNET_break (0);
2103     *data_size = 0;
2104     return GNUNET_NO;
2105   }
2106
2107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2108               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2109
2110   *data_size = tmit_msg->size;
2111   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2112
2113   int ret
2114     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2115     ? GNUNET_NO
2116     : GNUNET_YES;
2117
2118   /* FIXME: handle disconnecting clients */
2119   if (NULL != tmit_msg->client)
2120     send_message_ack (chn, tmit_msg->client);
2121
2122   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2123
2124   if (NULL != chn->tmit_head)
2125   {
2126     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2127   }
2128   else if (GNUNET_YES == chn->is_disconnecting
2129            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2130   {
2131     /* FIXME: handle partial message (when still in_transmit) */
2132     GNUNET_free (tmit_msg);
2133     return GNUNET_SYSERR;
2134   }
2135   GNUNET_free (tmit_msg);
2136   return ret;
2137 }
2138
2139
2140 /**
2141  * Callback for the transmit functions of multicast.
2142  */
2143 static int
2144 master_transmit_notify (void *cls, size_t *data_size, void *data)
2145 {
2146   int ret = transmit_notify (cls, data_size, data);
2147
2148   if (GNUNET_YES == ret)
2149   {
2150     struct Master *mst = cls;
2151     mst->tmit_handle = NULL;
2152   }
2153   return ret;
2154 }
2155
2156
2157 /**
2158  * Callback for the transmit functions of multicast.
2159  */
2160 static int
2161 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2162 {
2163   int ret = transmit_notify (cls, data_size, data);
2164
2165   if (GNUNET_YES == ret)
2166   {
2167     struct Slave *slv = cls;
2168     slv->tmit_handle = NULL;
2169   }
2170   return ret;
2171 }
2172
2173
2174 /**
2175  * Transmit a message from a channel master to the multicast group.
2176  */
2177 static void
2178 master_transmit_message (struct Master *mst)
2179 {
2180   struct Channel *chn = &mst->channel;
2181   struct TransmitMessage *tmit_msg = chn->tmit_head;
2182   if (NULL == tmit_msg)
2183     return;
2184   if (NULL == mst->tmit_handle)
2185   {
2186     mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin,
2187                                                        tmit_msg->id,
2188                                                        mst->max_group_generation,
2189                                                        &master_transmit_notify,
2190                                                        mst);
2191   }
2192   else
2193   {
2194     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2195   }
2196 }
2197
2198
2199 /**
2200  * Transmit a message from a channel slave to the multicast group.
2201  */
2202 static void
2203 slave_transmit_message (struct Slave *slv)
2204 {
2205   if (NULL == slv->channel.tmit_head)
2206     return;
2207   if (NULL == slv->tmit_handle)
2208   {
2209     slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member,
2210                                                           slv->channel.tmit_head->id,
2211                                                           &slave_transmit_notify,
2212                                                           slv);
2213   }
2214   else
2215   {
2216     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2217   }
2218 }
2219
2220
2221 static void
2222 transmit_message (struct Channel *chn)
2223 {
2224   chn->is_master
2225     ? master_transmit_message (chn->master)
2226     : slave_transmit_message (chn->slave);
2227 }
2228
2229
2230 /**
2231  * Queue a message from a channel master for sending to the multicast group.
2232  */
2233 static void
2234 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2235 {
2236   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2237   {
2238     tmit_msg->id = ++mst->max_message_id;
2239     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2240                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2241                 mst, tmit_msg->id);
2242     struct GNUNET_PSYC_MessageMethod *pmeth
2243       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2244
2245     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2246     {
2247       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2248     }
2249     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2250     {
2251       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2252                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2253                   mst, tmit_msg->id - mst->max_state_message_id);
2254       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2255                                           - mst->max_state_message_id);
2256       mst->max_state_message_id = tmit_msg->id;
2257     }
2258     else
2259     {
2260         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2261                     "%p master_queue_message: state not modified\n", mst);
2262       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2263     }
2264
2265     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2266     {
2267       /// @todo add state_hash to PSYC header
2268     }
2269   }
2270 }
2271
2272
2273 /**
2274  * Queue a message from a channel slave for sending to the multicast group.
2275  */
2276 static void
2277 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2278 {
2279   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2280   {
2281     struct GNUNET_PSYC_MessageMethod *pmeth
2282       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2283     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2284     tmit_msg->id = ++slv->max_request_id;
2285   }
2286 }
2287
2288
2289 /**
2290  * Queue PSYC message parts for sending to multicast.
2291  *
2292  * @param chn
2293  *        Channel to send to.
2294  * @param client
2295  *        Client the message originates from.
2296  * @param data_size
2297  *        Size of @a data.
2298  * @param data
2299  *        Concatenated message parts.
2300  * @param first_ptype
2301  *        First message part type in @a data.
2302  * @param last_ptype
2303  *        Last message part type in @a data.
2304  */
2305 static struct TransmitMessage *
2306 queue_message (struct Channel *chn,
2307                struct GNUNET_SERVICE_Client *client,
2308                size_t data_size,
2309                const void *data,
2310                uint16_t first_ptype, uint16_t last_ptype)
2311 {
2312   struct TransmitMessage *
2313     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2314   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2315   tmit_msg->client = client;
2316   tmit_msg->size = data_size;
2317   tmit_msg->first_ptype = first_ptype;
2318   tmit_msg->last_ptype = last_ptype;
2319
2320   /* FIXME: separate queue per message ID */
2321
2322   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2323
2324   chn->is_master
2325     ? master_queue_message (chn->master, tmit_msg)
2326     : slave_queue_message (chn->slave, tmit_msg);
2327   return tmit_msg;
2328 }
2329
2330
2331 /**
2332  * Cancel transmission of current message.
2333  *
2334  * @param chn     Channel to send to.
2335  * @param client  Client the message originates from.
2336  */
2337 static void
2338 transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2339 {
2340   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2341
2342   struct GNUNET_MessageHeader msg;
2343   msg.size = htons (sizeof (msg));
2344   msg.type = htons (type);
2345
2346   queue_message (chn, client, sizeof (msg), &msg, type, type);
2347   transmit_message (chn);
2348
2349   /* FIXME: cleanup */
2350 }
2351
2352
2353 static int
2354 check_client_psyc_message (void *cls,
2355                            const struct GNUNET_MessageHeader *msg)
2356 {
2357   return GNUNET_OK;
2358 }
2359
2360
2361 /**
2362  * Incoming message from a master or slave client.
2363  */
2364 static void
2365 handle_client_psyc_message (void *cls,
2366                             const struct GNUNET_MessageHeader *msg)
2367 {
2368   struct Client *c = cls;
2369   struct GNUNET_SERVICE_Client *client = c->client;
2370   struct Channel *chn = c->channel;
2371   if (NULL == chn)
2372   {
2373     GNUNET_break (0);
2374     GNUNET_SERVICE_client_drop (client);
2375     return;
2376   }
2377
2378   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2379               "%p Received message from client.\n", chn);
2380   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2381
2382   if (GNUNET_YES != chn->is_ready)
2383   {
2384     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2385                 "%p Channel is not ready yet, disconnecting client %p.\n",
2386                 chn,
2387                 client);
2388     GNUNET_break (0);
2389     GNUNET_SERVICE_client_drop (client);
2390     return;
2391   }
2392
2393   uint16_t size = ntohs (msg->size);
2394   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2395   {
2396     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2397                 "%p Message payload too large: %u < %u.\n",
2398                 chn,
2399                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2400                 (unsigned int) (size - sizeof (*msg)));
2401     GNUNET_break (0);
2402     transmit_cancel (chn, client);
2403     GNUNET_SERVICE_client_drop (client);
2404     return;
2405   }
2406
2407   uint16_t first_ptype = 0, last_ptype = 0;
2408   if (GNUNET_SYSERR
2409       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2410                                           (const char *) &msg[1],
2411                                           &first_ptype, &last_ptype))
2412   {
2413     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2414                 "%p Received invalid message part from client.\n", chn);
2415     GNUNET_break (0);
2416     transmit_cancel (chn, client);
2417     GNUNET_SERVICE_client_drop (client);
2418     return;
2419   }
2420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2421               "%p Received message with first part type %u and last part type %u.\n",
2422               chn, first_ptype, last_ptype);
2423
2424   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2425                  first_ptype, last_ptype);
2426   transmit_message (chn);
2427   /* FIXME: send a few ACKs even before transmit_notify is called */
2428
2429   GNUNET_SERVICE_client_continue (client);
2430 };
2431
2432
2433 /**
2434  * Received result of GNUNET_PSYCSTORE_membership_store()
2435  */
2436 static void
2437 store_recv_membership_store_result (void *cls,
2438                                     int64_t result,
2439                                     const char *err_msg,
2440                                     uint16_t err_msg_size)
2441 {
2442   struct Operation *op = cls;
2443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2444               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2445               op->channel,
2446               result,
2447               (int) err_msg_size,
2448               err_msg);
2449
2450   if (NULL != op->client)
2451     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2452   op_remove (op);
2453 }
2454
2455
2456 /**
2457  * Client requests to add/remove a slave in the membership database.
2458  */
2459 static void
2460 handle_client_membership_store (void *cls,
2461                                 const struct ChannelMembershipStoreRequest *req)
2462 {
2463   struct Client *c = cls;
2464   struct GNUNET_SERVICE_Client *client = c->client;
2465   struct Channel *chn = c->channel;
2466   if (NULL == chn)
2467   {
2468     GNUNET_break (0);
2469     GNUNET_SERVICE_client_drop (client);
2470     return;
2471   }
2472
2473   struct Operation *op = op_add (chn, client, req->op_id, 0);
2474
2475   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2476   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2478               "%p Received membership store request from client.\n", chn);
2479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2481               chn, req->did_join, announced_at, effective_since);
2482
2483   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2484                                      req->did_join, announced_at, effective_since,
2485                                      0, /* FIXME: group_generation */
2486                                      &store_recv_membership_store_result, op);
2487   GNUNET_SERVICE_client_continue (client);
2488 }
2489
2490
2491 /**
2492  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2493  * in response to a history request from a client.
2494  */
2495 static int
2496 store_recv_fragment_history (void *cls,
2497                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2498                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2499 {
2500   struct Operation *op = cls;
2501   if (NULL == op->client)
2502   { /* Requesting client already disconnected. */
2503     return GNUNET_NO;
2504   }
2505   struct Channel *chn = op->channel;
2506
2507   struct GNUNET_PSYC_MessageHeader *pmsg;
2508   uint16_t msize = ntohs (mmsg->header.size);
2509   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2510
2511   struct GNUNET_OperationResultMessage *
2512     res = GNUNET_malloc (sizeof (*res) + psize);
2513   res->header.size = htons (sizeof (*res) + psize);
2514   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2515   res->op_id = op->op_id;
2516   res->result_code = GNUNET_htonll (GNUNET_OK);
2517
2518   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2519   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2520   GNUNET_memcpy (&res[1], pmsg, psize);
2521
2522   /** @todo FIXME: send only to requesting client */
2523   client_send_msg (chn, &res->header);
2524
2525   GNUNET_free (res);
2526   return GNUNET_YES;
2527 }
2528
2529
2530 /**
2531  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2532  * in response to a history request from a client.
2533  */
2534 static void
2535 store_recv_fragment_history_result (void *cls, int64_t result,
2536                                     const char *err_msg, uint16_t err_msg_size)
2537 {
2538   struct Operation *op = cls;
2539   if (NULL == op->client)
2540   { /* Requesting client already disconnected. */
2541     return;
2542   }
2543
2544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2545               "%p History replay #%" PRIu64 ": "
2546               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2547               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2548
2549   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2550   {
2551     /** @todo Multicast replay request for messages not found locally. */
2552   }
2553
2554   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2555   op_remove (op);
2556 }
2557
2558
2559 static int
2560 check_client_history_replay (void *cls,
2561                              const struct GNUNET_PSYC_HistoryRequestMessage *req)
2562 {
2563   return GNUNET_OK;
2564 }
2565
2566
2567 /**
2568  * Client requests channel history.
2569  */
2570 static void
2571 handle_client_history_replay (void *cls,
2572                               const struct GNUNET_PSYC_HistoryRequestMessage *req)
2573 {
2574   struct Client *c = cls;
2575   struct GNUNET_SERVICE_Client *client = c->client;
2576   struct Channel *chn = c->channel;
2577   if (NULL == chn)
2578   {
2579     GNUNET_break (0);
2580     GNUNET_SERVICE_client_drop (client);
2581     return;
2582   }
2583
2584   uint16_t size = ntohs (req->header.size);
2585   const char *method_prefix = (const char *) &req[1];
2586
2587   if (size < sizeof (*req) + 1
2588       || '\0' != method_prefix[size - sizeof (*req) - 1])
2589   {
2590     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2591                 "%p History replay #%" PRIu64 ": "
2592                 "invalid method prefix. size: %u < %u?\n",
2593                 chn,
2594                 GNUNET_ntohll (req->op_id),
2595                 size,
2596                 (unsigned int) sizeof (*req) + 1);
2597     GNUNET_break (0);
2598     GNUNET_SERVICE_client_drop (client);
2599     return;
2600   }
2601
2602   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2603
2604   if (0 == req->message_limit)
2605   {
2606     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2607                                   GNUNET_ntohll (req->start_message_id),
2608                                   GNUNET_ntohll (req->end_message_id),
2609                                   0, method_prefix,
2610                                   &store_recv_fragment_history,
2611                                   &store_recv_fragment_history_result, op);
2612   }
2613   else
2614   {
2615     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2616                                          GNUNET_ntohll (req->message_limit),
2617                                          method_prefix,
2618                                          &store_recv_fragment_history,
2619                                          &store_recv_fragment_history_result,
2620                                          op);
2621   }
2622   GNUNET_SERVICE_client_continue (client);
2623 }
2624
2625
2626 /**
2627  * Received state var from PSYCstore, send it to client.
2628  */
2629 static int
2630 store_recv_state_var (void *cls, const char *name,
2631                       const void *value, uint32_t value_size)
2632 {
2633   struct Operation *op = cls;
2634   struct GNUNET_OperationResultMessage *res;
2635   struct GNUNET_MQ_Envelope *env;
2636
2637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2638               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2639               op->channel, GNUNET_ntohll (op->op_id), name);
2640
2641   if (NULL != name) /* First part */
2642   {
2643     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2644     struct GNUNET_PSYC_MessageModifier *mod;
2645     env = GNUNET_MQ_msg_extra (res,
2646                                sizeof (*mod) + name_size + value_size,
2647                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2648     res->op_id = op->op_id;
2649
2650     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2651     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2652     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2653     mod->name_size = htons (name_size);
2654     mod->value_size = htonl (value_size);
2655     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2656     GNUNET_memcpy (&mod[1], name, name_size);
2657     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2658   }
2659   else /* Continuation */
2660   {
2661     struct GNUNET_MessageHeader *mod;
2662     env = GNUNET_MQ_msg_extra (res,
2663                                sizeof (*mod) + value_size,
2664                                GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2665     res->op_id = op->op_id;
2666
2667     mod = (struct GNUNET_MessageHeader *) &res[1];
2668     mod->size = htons (sizeof (*mod) + value_size);
2669     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2670     GNUNET_memcpy (&mod[1], value, value_size);
2671   }
2672
2673   // FIXME: client might have been disconnected
2674   GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2675   return GNUNET_YES;
2676 }
2677
2678
2679 /**
2680  * Received result of GNUNET_PSYCSTORE_state_get()
2681  * or GNUNET_PSYCSTORE_state_get_prefix()
2682  */
2683 static void
2684 store_recv_state_result (void *cls, int64_t result,
2685                          const char *err_msg, uint16_t err_msg_size)
2686 {
2687   struct Operation *op = cls;
2688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2689               "%p state_get #%" PRIu64 ": "
2690               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2691               op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2692
2693   // FIXME: client might have been disconnected
2694   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2695   op_remove (op);
2696 }
2697
2698
2699 static int
2700 check_client_state_get (void *cls,
2701                          const struct StateRequest *req)
2702 {
2703   struct Client *c = cls;
2704   struct Channel *chn = c->channel;
2705   if (NULL == chn)
2706   {
2707     GNUNET_break (0);
2708     return GNUNET_SYSERR;
2709   }
2710
2711   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2712   const char *name = (const char *) &req[1];
2713   if (0 == name_size || '\0' != name[name_size - 1])
2714   {
2715     GNUNET_break (0);
2716     return GNUNET_SYSERR;
2717   }
2718
2719   return GNUNET_OK;
2720 }
2721
2722
2723 /**
2724  * Client requests best matching state variable from PSYCstore.
2725  */
2726 static void
2727 handle_client_state_get (void *cls,
2728                          const struct StateRequest *req)
2729 {
2730   struct Client *c = cls;
2731   struct GNUNET_SERVICE_Client *client = c->client;
2732   struct Channel *chn = c->channel;
2733
2734   const char *name = (const char *) &req[1];
2735   struct Operation *op = op_add (chn, client, req->op_id, 0);
2736   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2737                               &store_recv_state_var,
2738                               &store_recv_state_result, op);
2739   GNUNET_SERVICE_client_continue (client);
2740 }
2741
2742
2743 static int
2744 check_client_state_get_prefix (void *cls,
2745                                const struct StateRequest *req)
2746 {
2747   struct Client *c = cls;
2748   struct Channel *chn = c->channel;
2749   if (NULL == chn)
2750   {
2751     GNUNET_break (0);
2752     return GNUNET_SYSERR;
2753   }
2754
2755   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2756   const char *name = (const char *) &req[1];
2757   if (0 == name_size || '\0' != name[name_size - 1])
2758   {
2759     GNUNET_break (0);
2760     return GNUNET_SYSERR;
2761   }
2762
2763   return GNUNET_OK;
2764 }
2765
2766
2767 /**
2768  * Client requests state variables with a given prefix from PSYCstore.
2769  */
2770 static void
2771 handle_client_state_get_prefix (void *cls,
2772                                 const struct StateRequest *req)
2773 {
2774   struct Client *c = cls;
2775   struct GNUNET_SERVICE_Client *client = c->client;
2776   struct Channel *chn = c->channel;
2777
2778   const char *name = (const char *) &req[1];
2779   struct Operation *op = op_add (chn, client, req->op_id, 0);
2780   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2781                                      &store_recv_state_var,
2782                                      &store_recv_state_result, op);
2783   GNUNET_SERVICE_client_continue (client);
2784 }
2785
2786
2787 /**
2788  * Initialize the PSYC service.
2789  *
2790  * @param cls Closure.
2791  * @param server The initialized server.
2792  * @param c Configuration to use.
2793  */
2794 static void
2795 run (void *cls,
2796      const struct GNUNET_CONFIGURATION_Handle *c,
2797      struct GNUNET_SERVICE_Handle *svc)
2798 {
2799   cfg = c;
2800   service = svc;
2801   store = GNUNET_PSYCSTORE_connect (cfg);
2802   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2803   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2804   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2805   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2806   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2807   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2808 }
2809
2810
2811 /**
2812  * Define "main" method using service macro.
2813  */
2814 GNUNET_SERVICE_MAIN
2815 ("psyc",
2816  GNUNET_SERVICE_OPTION_NONE,
2817  &run,
2818  &client_notify_connect,
2819  &client_notify_disconnect,
2820  NULL,
2821  GNUNET_MQ_hd_fixed_size (client_master_start,
2822                           GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2823                           struct MasterStartRequest,
2824                           NULL),
2825  GNUNET_MQ_hd_var_size (client_slave_join,
2826                         GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2827                         struct SlaveJoinRequest,
2828                         NULL),
2829  GNUNET_MQ_hd_var_size (client_join_decision,
2830                         GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2831                         struct GNUNET_PSYC_JoinDecisionMessage,
2832                         NULL),
2833  GNUNET_MQ_hd_fixed_size (client_part_request,
2834                           GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2835                           struct GNUNET_MessageHeader,
2836                           NULL),
2837  GNUNET_MQ_hd_var_size (client_psyc_message,
2838                         GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2839                         struct GNUNET_MessageHeader,
2840                         NULL),
2841  GNUNET_MQ_hd_fixed_size (client_membership_store,
2842                           GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2843                           struct ChannelMembershipStoreRequest,
2844                           NULL),
2845  GNUNET_MQ_hd_var_size (client_history_replay,
2846                         GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2847                         struct GNUNET_PSYC_HistoryRequestMessage,
2848                         NULL),
2849  GNUNET_MQ_hd_var_size (client_state_get,
2850                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2851                         struct StateRequest,
2852                         NULL),
2853  GNUNET_MQ_hd_var_size (client_state_get_prefix,
2854                         GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2855                         struct StateRequest,
2856                         NULL));
2857
2858 /* end of gnunet-service-psyc.c */