-rps service: prevent division by zero
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls)
432 {
433   struct Channel *chn = cls;
434
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  */
444 static void
445 shutdown_task (void *cls)
446 {
447   if (NULL != nc)
448   {
449     GNUNET_SERVER_notification_context_destroy (nc);
450     nc = NULL;
451   }
452   if (NULL != stats)
453   {
454     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
455     stats = NULL;
456   }
457 }
458
459
460 static struct Operation *
461 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
462         uint64_t op_id, uint32_t flags)
463 {
464   struct Operation *op = GNUNET_malloc (sizeof (*op));
465   op->client = client;
466   op->chn = chn;
467   op->op_id = op_id;
468   op->flags = flags;
469   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
470   return op;
471 }
472
473
474 static void
475 op_remove (struct Operation *op)
476 {
477   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
478   GNUNET_free (op);
479 }
480
481
482 /**
483  * Clean up master data structures after a client disconnected.
484  */
485 static void
486 cleanup_master (struct Master *mst)
487 {
488   struct Channel *chn = &mst->chn;
489
490   if (NULL != mst->origin)
491     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
492   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
493   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
494 }
495
496
497 /**
498  * Clean up slave data structures after a client disconnected.
499  */
500 static void
501 cleanup_slave (struct Slave *slv)
502 {
503   struct Channel *chn = &slv->chn;
504   struct GNUNET_CONTAINER_MultiHashMap *
505     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
506                                                 &chn->pub_key_hash);
507   GNUNET_assert (NULL != chn_slv);
508   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
509
510   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
511   {
512     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
513                                           chn_slv);
514     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
515   }
516   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
517
518   if (NULL != slv->join_msg)
519   {
520     GNUNET_free (slv->join_msg);
521     slv->join_msg = NULL;
522   }
523   if (NULL != slv->relays)
524   {
525     GNUNET_free (slv->relays);
526     slv->relays = NULL;
527   }
528   if (NULL != slv->member)
529   {
530     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
531     slv->member = NULL;
532   }
533   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
534 }
535
536
537 /**
538  * Clean up channel data structures after a client disconnected.
539  */
540 static void
541 cleanup_channel (struct Channel *chn)
542 {
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "%p Cleaning up channel %s. master? %u\n",
545               chn,
546               GNUNET_h2s (&chn->pub_key_hash),
547               chn->is_master);
548   message_queue_drop (chn);
549   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
550   chn->recv_frags = NULL;
551
552   if (NULL != chn->store_op)
553   {
554     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
555     chn->store_op = NULL;
556   }
557
558   (GNUNET_YES == chn->is_master)
559     ? cleanup_master ((struct Master *) chn)
560     : cleanup_slave ((struct Slave *) chn);
561   GNUNET_free (chn);
562 }
563
564
565 /**
566  * Called whenever a client is disconnected.
567  * Frees our resources associated with that client.
568  *
569  * @param cls Closure.
570  * @param client Identification of the client.
571  */
572 static void
573 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
574 {
575   if (NULL == client)
576     return;
577
578   struct Channel *
579     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
580
581   if (NULL == chn)
582   {
583     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584                 "%p User context is NULL in client_disconnect()\n",
585                 chn);
586     GNUNET_break (0);
587     return;
588   }
589
590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591               "%p Client (%s) disconnected from channel %s\n",
592               chn,
593               (GNUNET_YES == chn->is_master) ? "master" : "slave",
594               GNUNET_h2s (&chn->pub_key_hash));
595
596   struct Client *cli = chn->clients_head;
597   while (NULL != cli)
598   {
599     if (cli->client == client)
600     {
601       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
602       GNUNET_free (cli);
603       break;
604     }
605     cli = cli->next;
606   }
607
608   struct Operation *op = chn->op_head;
609   while (NULL != op)
610   {
611     if (op->client == client)
612     {
613       op->client = NULL;
614       break;
615     }
616     op = op->next;
617   }
618
619   if (NULL == chn->clients_head)
620   { /* Last client disconnected. */
621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                 "%p Last client (%s) disconnected from channel %s\n",
623                 chn,
624                 (GNUNET_YES == chn->is_master) ? "master" : "slave",
625                 GNUNET_h2s (&chn->pub_key_hash));
626     chn->is_disconnected = GNUNET_YES;
627     if (NULL != chn->tmit_head)
628     { /* Send pending messages to multicast before cleanup. */
629       transmit_message (chn);
630     }
631     else
632     {
633       cleanup_channel (chn);
634     }
635   }
636 }
637
638
639 /**
640  * Send message to all clients connected to the channel.
641  */
642 static void
643 client_send_msg (const struct Channel *chn,
644                  const struct GNUNET_MessageHeader *msg)
645 {
646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
647               "%p Sending message to clients.\n",
648               chn);
649
650   struct Client *cli = chn->clients_head;
651   while (NULL != cli)
652   {
653     GNUNET_SERVER_notification_context_add (nc, cli->client);
654     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
655     cli = cli->next;
656   }
657 }
658
659
660 /**
661  * Send a result code back to the client.
662  *
663  * @param client
664  *        Client that should receive the result code.
665  * @param result_code
666  *        Code to transmit.
667  * @param op_id
668  *        Operation ID in network byte order.
669  * @param data
670  *        Data payload or NULL.
671  * @param data_size
672  *        Size of @a data.
673  */
674 static void
675 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
676                     int64_t result_code, const void *data, uint16_t data_size)
677 {
678   struct GNUNET_OperationResultMessage *res;
679
680   res = GNUNET_malloc (sizeof (*res) + data_size);
681   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
682   res->header.size = htons (sizeof (*res) + data_size);
683   res->result_code = GNUNET_htonll (result_code);
684   res->op_id = op_id;
685   if (0 < data_size)
686     GNUNET_memcpy (&res[1], data, data_size);
687
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n",
690               client,
691               GNUNET_ntohll (op_id),
692               result_code,
693               data_size);
694
695   GNUNET_SERVER_notification_context_add (nc, client);
696   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
697                                               GNUNET_NO);
698   GNUNET_free (res);
699 }
700
701
702 /**
703  * Closure for join_mem_test_cb()
704  */
705 struct JoinMemTestClosure
706 {
707   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
708   struct Channel *chn;
709   struct GNUNET_MULTICAST_JoinHandle *jh;
710   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
711 };
712
713
714 /**
715  * Membership test result callback used for join requests.
716  */
717 static void
718 join_mem_test_cb (void *cls, int64_t result,
719                   const char *err_msg, uint16_t err_msg_size)
720 {
721   struct JoinMemTestClosure *jcls = cls;
722
723   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
724   { /* Pass on join request to client if this is a master channel */
725     struct Master *mst = (struct Master *) jcls->chn;
726     struct GNUNET_HashCode slave_pub_hash;
727     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
728                         &slave_pub_hash);
729     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
730                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
731     client_send_msg (jcls->chn, &jcls->join_msg->header);
732   }
733   else
734   {
735     if (GNUNET_SYSERR == result)
736     {
737       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
738                   "Could not perform membership test (%.*s)\n",
739                   err_msg_size, err_msg);
740     }
741     // FIXME: add relays
742     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
743   }
744   GNUNET_free (jcls->join_msg);
745   GNUNET_free (jcls);
746 }
747
748
749 /**
750  * Incoming join request from multicast.
751  */
752 static void
753 mcast_recv_join_request (void *cls,
754                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
755                          const struct GNUNET_MessageHeader *join_msg,
756                          struct GNUNET_MULTICAST_JoinHandle *jh)
757 {
758   struct Channel *chn = cls;
759   uint16_t join_msg_size = 0;
760
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "%p Got join request.\n",
763               chn);
764   if (NULL != join_msg)
765   {
766     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
767     {
768       join_msg_size = ntohs (join_msg->size);
769     }
770     else
771     {
772       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
773                   "%p Got join message with invalid type %u.\n",
774                   chn,
775                   ntohs (join_msg->type));
776     }
777   }
778
779   struct GNUNET_PSYC_JoinRequestMessage *
780     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
781   req->header.size = htons (sizeof (*req) + join_msg_size);
782   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
783   req->slave_pub_key = *slave_pub_key;
784   if (0 < join_msg_size)
785     GNUNET_memcpy (&req[1], join_msg, join_msg_size);
786
787   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
788   jcls->slave_pub_key = *slave_pub_key;
789   jcls->chn = chn;
790   jcls->jh = jh;
791   jcls->join_msg = req;
792
793   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
794                                     chn->max_message_id, 0,
795                                     &join_mem_test_cb, jcls);
796 }
797
798
799 /**
800  * Join decision received from multicast.
801  */
802 static void
803 mcast_recv_join_decision (void *cls, int is_admitted,
804                           const struct GNUNET_PeerIdentity *peer,
805                           uint16_t relay_count,
806                           const struct GNUNET_PeerIdentity *relays,
807                           const struct GNUNET_MessageHeader *join_resp)
808 {
809   struct Slave *slv = cls;
810   struct Channel *chn = &slv->chn;
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "%p Got join decision: %d\n",
813               slv,
814               is_admitted);
815   if (GNUNET_YES == chn->is_ready)
816   {
817     /* Already admitted */
818     return;
819   }
820
821   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
822   struct GNUNET_PSYC_JoinDecisionMessage *
823     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
824   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
825   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
826   dcsn->is_admitted = htonl (is_admitted);
827   if (0 < join_resp_size)
828     GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
829
830   client_send_msg (chn, &dcsn->header);
831
832   if (GNUNET_YES == is_admitted
833       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
834   {
835     chn->is_ready = GNUNET_YES;
836   }
837 }
838
839
840 static int
841 store_recv_fragment_replay (void *cls,
842                             struct GNUNET_MULTICAST_MessageHeader *msg,
843                             enum GNUNET_PSYCSTORE_MessageFlags flags)
844 {
845   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
846
847   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
848   return GNUNET_YES;
849 }
850
851
852 /**
853  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
854  */
855 static void
856 store_recv_fragment_replay_result (void *cls,
857                                    int64_t result,
858                                    const char *err_msg,
859                                    uint16_t err_msg_size)
860 {
861   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
862
863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
864               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
865               rh,
866               result,
867               err_msg_size,
868               err_msg);
869   switch (result)
870   {
871   case GNUNET_YES:
872     break;
873
874   case GNUNET_NO:
875     GNUNET_MULTICAST_replay_response (rh, NULL,
876                                       GNUNET_MULTICAST_REC_NOT_FOUND);
877     return;
878
879   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
880     GNUNET_MULTICAST_replay_response (rh, NULL,
881                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
882     return;
883
884   case GNUNET_SYSERR:
885     GNUNET_MULTICAST_replay_response (rh, NULL,
886                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
887     return;
888   }
889   /* GNUNET_MULTICAST_replay_response frees 'rh' when passed
890    * an error code, so it must be ensured no further processing
891    * is attempted on 'rh'. Maybe this should be refactored as
892    * it doesn't look very intuitive.    --lynX
893    */
894   GNUNET_MULTICAST_replay_response_end (rh);
895 }
896
897
898 /**
899  * Incoming fragment replay request from multicast.
900  */
901 static void
902 mcast_recv_replay_fragment (void *cls,
903                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
904                             uint64_t fragment_id, uint64_t flags,
905                             struct GNUNET_MULTICAST_ReplayHandle *rh)
906
907 {
908   struct Channel *chn = cls;
909   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
910                                  fragment_id, fragment_id,
911                                  &store_recv_fragment_replay,
912                                  &store_recv_fragment_replay_result, rh);
913 }
914
915
916 /**
917  * Incoming message replay request from multicast.
918  */
919 static void
920 mcast_recv_replay_message (void *cls,
921                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
922                            uint64_t message_id,
923                            uint64_t fragment_offset,
924                            uint64_t flags,
925                            struct GNUNET_MULTICAST_ReplayHandle *rh)
926 {
927   struct Channel *chn = cls;
928   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
929                                 message_id, message_id, 1, NULL,
930                                 &store_recv_fragment_replay,
931                                 &store_recv_fragment_replay_result, rh);
932 }
933
934
935 /**
936  * Convert an uint64_t in network byte order to a HashCode
937  * that can be used as key in a MultiHashMap
938  */
939 static inline void
940 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
941 {
942   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
943   /* TODO: use built-in byte swap functions if available */
944
945   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
946   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
947
948   *key = (struct GNUNET_HashCode) {};
949   *((uint64_t *) key)
950     = (n << 32) | (n >> 32);
951 }
952
953
954 /**
955  * Convert an uint64_t in host byte order to a HashCode
956  * that can be used as key in a MultiHashMap
957  */
958 static inline void
959 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
960 {
961 #if __BYTE_ORDER == __BIG_ENDIAN
962   hash_key_from_nll (key, n);
963 #elif __BYTE_ORDER == __LITTLE_ENDIAN
964   *key = (struct GNUNET_HashCode) {};
965   *((uint64_t *) key) = n;
966 #else
967   #error byteorder undefined
968 #endif
969 }
970
971
972 /**
973  * Initialize PSYC message header.
974  */
975 static inline void
976 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
977                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
978 {
979   uint16_t size = ntohs (mmsg->header.size);
980   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
981
982   pmsg->header.size = htons (psize);
983   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
984   pmsg->message_id = mmsg->message_id;
985   pmsg->fragment_offset = mmsg->fragment_offset;
986   pmsg->flags = htonl (flags);
987
988   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
989 }
990
991
992 /**
993  * Create a new PSYC message from a multicast message for sending it to clients.
994  */
995 static inline struct GNUNET_PSYC_MessageHeader *
996 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
997 {
998   struct GNUNET_PSYC_MessageHeader *pmsg;
999   uint16_t size = ntohs (mmsg->header.size);
1000   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1001
1002   pmsg = GNUNET_malloc (psize);
1003   psyc_msg_init (pmsg, mmsg, flags);
1004   return pmsg;
1005 }
1006
1007
1008 /**
1009  * Send multicast message to all clients connected to the channel.
1010  */
1011 static void
1012 client_send_mcast_msg (struct Channel *chn,
1013                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1014                        uint32_t flags)
1015 {
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               "%p Sending multicast message to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1018               chn,
1019               GNUNET_ntohll (mmsg->fragment_id),
1020               GNUNET_ntohll (mmsg->message_id));
1021
1022   struct GNUNET_PSYC_MessageHeader *
1023     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1024   client_send_msg (chn, &pmsg->header);
1025   GNUNET_free (pmsg);
1026 }
1027
1028
1029 /**
1030  * Send multicast request to all clients connected to the channel.
1031  */
1032 static void
1033 client_send_mcast_req (struct Master *mst,
1034                        const struct GNUNET_MULTICAST_RequestHeader *req)
1035 {
1036   struct Channel *chn = &mst->chn;
1037
1038   struct GNUNET_PSYC_MessageHeader *pmsg;
1039   uint16_t size = ntohs (req->header.size);
1040   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1041
1042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043               "%p Sending multicast request to client. fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1044               chn,
1045               GNUNET_ntohll (req->fragment_id),
1046               GNUNET_ntohll (req->request_id));
1047
1048   pmsg = GNUNET_malloc (psize);
1049   pmsg->header.size = htons (psize);
1050   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1051   pmsg->message_id = req->request_id;
1052   pmsg->fragment_offset = req->fragment_offset;
1053   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054   pmsg->slave_pub_key = req->member_pub_key;
1055   GNUNET_memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1056
1057   client_send_msg (chn, &pmsg->header);
1058
1059   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1060
1061   GNUNET_free (pmsg);
1062 }
1063
1064
1065 /**
1066  * Insert a multicast message fragment into the queue belonging to the message.
1067  *
1068  * @param chn          Channel.
1069  * @param mmsg         Multicast message fragment.
1070  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1071  * @param first_ptype  First PSYC message part type in @a mmsg.
1072  * @param last_ptype   Last PSYC message part type in @a mmsg.
1073  */
1074 static void
1075 fragment_queue_insert (struct Channel *chn,
1076                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1077                        uint16_t first_ptype, uint16_t last_ptype)
1078 {
1079   const uint16_t size = ntohs (mmsg->header.size);
1080   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1081   struct GNUNET_CONTAINER_MultiHashMap
1082     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1083                                                     &chn->pub_key_hash);
1084
1085   struct GNUNET_HashCode msg_id_hash;
1086   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1087
1088   struct FragmentQueue
1089     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1090
1091   if (NULL == fragq)
1092   {
1093     fragq = GNUNET_new (struct FragmentQueue);
1094     fragq->state = MSG_FRAG_STATE_HEADER;
1095     fragq->fragments
1096       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1097
1098     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1099                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1100
1101     if (NULL == chan_msgs)
1102     {
1103       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1104       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1105                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1106     }
1107   }
1108
1109   struct GNUNET_HashCode frag_id_hash;
1110   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1111   struct RecvCacheEntry
1112     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1113   if (NULL == cache_entry)
1114   {
1115     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116                 "%p Adding message fragment to cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1117                 chn,
1118                 GNUNET_ntohll (mmsg->message_id),
1119                 GNUNET_ntohll (mmsg->fragment_id));
1120     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                 "%p header_size: %" PRIu64 " + %u\n",
1122                 chn,
1123                 fragq->header_size,
1124                 size);
1125     cache_entry = GNUNET_new (struct RecvCacheEntry);
1126     cache_entry->ref_count = 1;
1127     cache_entry->mmsg = GNUNET_malloc (size);
1128     GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
1129     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1130                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1131   }
1132   else
1133   {
1134     cache_entry->ref_count++;
1135     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136                 "%p Message fragment is already in cache. message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n",
1137                 chn,
1138                 GNUNET_ntohll (mmsg->message_id),
1139                 GNUNET_ntohll (mmsg->fragment_id),
1140                 cache_entry->ref_count);
1141   }
1142
1143   if (MSG_FRAG_STATE_HEADER == fragq->state)
1144   {
1145     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1146     {
1147       struct GNUNET_PSYC_MessageMethod *
1148         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1149       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1150       fragq->flags = ntohl (pmeth->flags);
1151     }
1152
1153     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1154     {
1155       fragq->header_size += size;
1156     }
1157     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1158              || frag_offset == fragq->header_size)
1159     { /* header is now complete */
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "%p Header of message %" PRIu64 " is complete.\n",
1162                   chn,
1163                   GNUNET_ntohll (mmsg->message_id));
1164
1165       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1166                   "%p Adding message %" PRIu64 " to queue.\n",
1167                   chn,
1168                   GNUNET_ntohll (mmsg->message_id));
1169       fragq->state = MSG_FRAG_STATE_DATA;
1170     }
1171     else
1172     {
1173       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1174                   "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1175                   chn,
1176                   GNUNET_ntohll (mmsg->message_id),
1177                   frag_offset,
1178                   fragq->header_size);
1179     }
1180   }
1181
1182   switch (last_ptype)
1183   {
1184   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1185     if (frag_offset == fragq->size)
1186       fragq->state = MSG_FRAG_STATE_END;
1187     else
1188       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1189                   "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1190                   chn,
1191                   GNUNET_ntohll (mmsg->message_id),
1192                   frag_offset,
1193                   fragq->size);
1194     break;
1195
1196   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1197     /* Drop message without delivering to client if it's a single fragment */
1198     fragq->state =
1199       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1200       ? MSG_FRAG_STATE_DROP
1201       : MSG_FRAG_STATE_CANCEL;
1202   }
1203
1204   switch (fragq->state)
1205   {
1206   case MSG_FRAG_STATE_DATA:
1207   case MSG_FRAG_STATE_END:
1208   case MSG_FRAG_STATE_CANCEL:
1209     if (GNUNET_NO == fragq->is_queued)
1210     {
1211       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1212                                     GNUNET_ntohll (mmsg->message_id));
1213       fragq->is_queued = GNUNET_YES;
1214     }
1215   }
1216
1217   fragq->size += size;
1218   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1219                                 GNUNET_ntohll (mmsg->fragment_id));
1220 }
1221
1222
1223 /**
1224  * Run fragment queue of a message.
1225  *
1226  * Send fragments of a message in order to client, after all modifiers arrived
1227  * from multicast.
1228  *
1229  * @param chn
1230  *        Channel.
1231  * @param msg_id
1232  *        ID of the message @a fragq belongs to.
1233  * @param fragq
1234  *        Fragment queue of the message.
1235  * @param drop
1236  *        Drop message without delivering to client?
1237  *        #GNUNET_YES or #GNUNET_NO.
1238  */
1239 static void
1240 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1241                     struct FragmentQueue *fragq, uint8_t drop)
1242 {
1243   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1244               "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1245               chn,
1246               msg_id,
1247               fragq->state);
1248
1249   struct GNUNET_CONTAINER_MultiHashMap
1250     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1251                                                     &chn->pub_key_hash);
1252   GNUNET_assert (NULL != chan_msgs);
1253   uint64_t frag_id;
1254
1255   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1256                                                     &frag_id))
1257   {
1258     struct GNUNET_HashCode frag_id_hash;
1259     hash_key_from_hll (&frag_id_hash, frag_id);
1260     struct RecvCacheEntry *cache_entry
1261       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1262     if (cache_entry != NULL)
1263     {
1264       if (GNUNET_NO == drop)
1265       {
1266         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1267       }
1268       if (cache_entry->ref_count <= 1)
1269       {
1270         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1271                                               cache_entry);
1272         GNUNET_free (cache_entry->mmsg);
1273         GNUNET_free (cache_entry);
1274       }
1275       else
1276       {
1277         cache_entry->ref_count--;
1278       }
1279     }
1280 #if CACHE_AGING_IMPLEMENTED
1281     else if (GNUNET_NO == drop)
1282     {
1283       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1284     }
1285 #endif
1286
1287     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1288   }
1289
1290   if (MSG_FRAG_STATE_END <= fragq->state)
1291   {
1292     struct GNUNET_HashCode msg_id_hash;
1293     hash_key_from_hll (&msg_id_hash, msg_id);
1294
1295     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1296     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1297     GNUNET_free (fragq);
1298   }
1299   else
1300   {
1301     fragq->is_queued = GNUNET_NO;
1302   }
1303 }
1304
1305
1306 struct StateModifyClosure
1307 {
1308   struct Channel *chn;
1309   uint64_t msg_id;
1310   struct GNUNET_HashCode msg_id_hash;
1311 };
1312
1313
1314 void
1315 store_recv_state_modify_result (void *cls, int64_t result,
1316                                 const char *err_msg, uint16_t err_msg_size)
1317 {
1318   struct StateModifyClosure *mcls = cls;
1319   struct Channel *chn = mcls->chn;
1320   uint64_t msg_id = mcls->msg_id;
1321
1322   struct FragmentQueue *
1323     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1324
1325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1327               chn, result, err_msg_size, err_msg);
1328
1329   switch (result)
1330   {
1331   case GNUNET_OK:
1332   case GNUNET_NO:
1333     if (NULL != fragq)
1334       fragq->state_is_modified = GNUNET_YES;
1335     if (chn->max_state_message_id < msg_id)
1336       chn->max_state_message_id = msg_id;
1337     if (chn->max_message_id < msg_id)
1338       chn->max_message_id = msg_id;
1339
1340     if (NULL != fragq)
1341       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1342     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1343     message_queue_run (chn);
1344     break;
1345
1346   default:
1347     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1348                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1349                 chn, result, err_msg_size, err_msg);
1350     /** @todo FIXME: handle state_modify error */
1351   }
1352 }
1353
1354
1355 /**
1356  * Run message queue.
1357  *
1358  * Send messages in queue to client in order after a message has arrived from
1359  * multicast, according to the following:
1360  * - A message is only sent if all of its modifiers arrived.
1361  * - A stateful message is only sent if the previous stateful message
1362  *   has already been delivered to the client.
1363  *
1364  * @param chn  Channel.
1365  *
1366  * @return Number of messages removed from queue and sent to client.
1367  */
1368 static uint64_t
1369 message_queue_run (struct Channel *chn)
1370 {
1371   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1372               "%p Running message queue.\n", chn);
1373   uint64_t n = 0;
1374   uint64_t msg_id;
1375
1376   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1377                                                     &msg_id))
1378   {
1379     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1380                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1381     struct GNUNET_HashCode msg_id_hash;
1382     hash_key_from_hll (&msg_id_hash, msg_id);
1383
1384     struct FragmentQueue *
1385       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1386
1387     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1388     {
1389       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1390                   "%p No fragq (%p) or header not complete.\n",
1391                   chn, fragq);
1392       break;
1393     }
1394
1395     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396                 "%p Fragment queue entry:  state: %u, state delta: "
1397                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1398                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1399
1400     if (MSG_FRAG_STATE_DATA <= fragq->state)
1401     {
1402       /* Check if there's a missing message before the current one */
1403       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1404       {
1405         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1406
1407         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1408             && (chn->max_message_id != msg_id - 1
1409                 && chn->max_message_id != msg_id))
1410         {
1411           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1412                       "%p Out of order message. "
1413                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1414                       chn, chn->max_message_id, msg_id);
1415           break;
1416           // FIXME: keep track of messages processed in this queue run,
1417           //        and only stop after reaching the end
1418         }
1419       }
1420       else
1421       {
1422         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1423         if (GNUNET_YES != fragq->state_is_modified)
1424         {
1425           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1426           {
1427             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1428                         "%p Out of order stateful message. "
1429                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1430                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1431             break;
1432             // FIXME: keep track of messages processed in this queue run,
1433             //        and only stop after reaching the end
1434           }
1435
1436           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1437           mcls->chn = chn;
1438           mcls->msg_id = msg_id;
1439           mcls->msg_id_hash = msg_id_hash;
1440
1441           /* Apply modifiers to state in PSYCstore */
1442           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1443                                          fragq->state_delta,
1444                                          store_recv_state_modify_result, mcls);
1445           break; // continue after asynchronous state modify result
1446         }
1447       }
1448       chn->max_message_id = msg_id;
1449     }
1450     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1451     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1452     n++;
1453   }
1454
1455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1457   return n;
1458 }
1459
1460
1461 /**
1462  * Drop message queue of a channel.
1463  *
1464  * Remove all messages in queue without sending it to clients.
1465  *
1466  * @param chn  Channel.
1467  *
1468  * @return Number of messages removed from queue.
1469  */
1470 static uint64_t
1471 message_queue_drop (struct Channel *chn)
1472 {
1473   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1474               "%p Dropping message queue.\n", chn);
1475   uint64_t n = 0;
1476   uint64_t msg_id;
1477   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1478                                                     &msg_id))
1479   {
1480     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1481                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1482     struct GNUNET_HashCode msg_id_hash;
1483     hash_key_from_hll (&msg_id_hash, msg_id);
1484
1485     struct FragmentQueue *
1486       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1487     GNUNET_assert (NULL != fragq);
1488     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1489     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1490     n++;
1491   }
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1494   return n;
1495 }
1496
1497
1498 /**
1499  * Received result of GNUNET_PSYCSTORE_fragment_store().
1500  */
1501 static void
1502 store_recv_fragment_store_result (void *cls, int64_t result,
1503                                   const char *err_msg, uint16_t err_msg_size)
1504 {
1505   struct Channel *chn = cls;
1506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1508               chn, result, err_msg_size, err_msg);
1509 }
1510
1511
1512 /**
1513  * Handle incoming message fragment from multicast.
1514  *
1515  * Store it using PSYCstore and send it to the clients of the channel in order.
1516  */
1517 static void
1518 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1519 {
1520   struct Channel *chn = cls;
1521   uint16_t size = ntohs (mmsg->header.size);
1522
1523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524               "%p Received multicast message of size %u. "
1525               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1526               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1527               chn, size,
1528               GNUNET_ntohll (mmsg->fragment_id),
1529               GNUNET_ntohll (mmsg->message_id),
1530               GNUNET_ntohll (mmsg->fragment_offset),
1531               GNUNET_ntohll (mmsg->flags));
1532
1533   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1534                                    &store_recv_fragment_store_result, chn);
1535
1536   uint16_t first_ptype = 0, last_ptype = 0;
1537   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1538                                                (const char *) &mmsg[1],
1539                                                &first_ptype, &last_ptype);
1540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1541               "%p Message check result %d, first part type %u, last part type %u\n",
1542               chn, check, first_ptype, last_ptype);
1543   if (GNUNET_SYSERR == check)
1544   {
1545     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1546                 "%p Dropping incoming multicast message with invalid parts.\n",
1547                 chn);
1548     GNUNET_break_op (0);
1549     return;
1550   }
1551
1552   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1553   message_queue_run (chn);
1554 }
1555
1556
1557 /**
1558  * Incoming request fragment from multicast for a master.
1559  *
1560  * @param cls   Master.
1561  * @param req   The request.
1562  */
1563 static void
1564 mcast_recv_request (void *cls,
1565                     const struct GNUNET_MULTICAST_RequestHeader *req)
1566 {
1567   struct Master *mst = cls;
1568   uint16_t size = ntohs (req->header.size);
1569
1570   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572               "%p Received multicast request of size %u from %s.\n",
1573               mst, size, str);
1574   GNUNET_free (str);
1575
1576   uint16_t first_ptype = 0, last_ptype = 0;
1577   if (GNUNET_SYSERR
1578       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1579                                           (const char *) &req[1],
1580                                           &first_ptype, &last_ptype))
1581   {
1582     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1583                 "%p Dropping incoming multicast request with invalid parts.\n",
1584                 mst);
1585     GNUNET_break_op (0);
1586     return;
1587   }
1588
1589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1590               "Message parts: first: type %u, last: type %u\n",
1591               first_ptype, last_ptype);
1592
1593   /* FIXME: in-order delivery */
1594   client_send_mcast_req (mst, req);
1595 }
1596
1597
1598 /**
1599  * Response from PSYCstore with the current counter values for a channel master.
1600  */
1601 static void
1602 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1603                             uint64_t max_message_id, uint64_t max_group_generation,
1604                             uint64_t max_state_message_id)
1605 {
1606   struct Master *mst = cls;
1607   struct Channel *chn = &mst->chn;
1608   chn->store_op = NULL;
1609
1610   struct GNUNET_PSYC_CountersResultMessage res;
1611   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1612   res.header.size = htons (sizeof (res));
1613   res.result_code = htonl (result);
1614   res.max_message_id = GNUNET_htonll (max_message_id);
1615
1616   if (GNUNET_OK == result || GNUNET_NO == result)
1617   {
1618     mst->max_message_id = max_message_id;
1619     chn->max_message_id = max_message_id;
1620     chn->max_state_message_id = max_state_message_id;
1621     mst->max_group_generation = max_group_generation;
1622     mst->origin
1623       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1624                                        mcast_recv_join_request,
1625                                        mcast_recv_replay_fragment,
1626                                        mcast_recv_replay_message,
1627                                        mcast_recv_request,
1628                                        mcast_recv_message, chn);
1629     chn->is_ready = GNUNET_YES;
1630   }
1631   else
1632   {
1633     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1634                 "%p GNUNET_PSYCSTORE_counters_get() "
1635                 "returned %d for channel %s.\n",
1636                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1637   }
1638
1639   client_send_msg (chn, &res.header);
1640 }
1641
1642
1643 /**
1644  * Response from PSYCstore with the current counter values for a channel slave.
1645  */
1646 void
1647 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1648                            uint64_t max_message_id, uint64_t max_group_generation,
1649                            uint64_t max_state_message_id)
1650 {
1651   struct Slave *slv = cls;
1652   struct Channel *chn = &slv->chn;
1653   chn->store_op = NULL;
1654
1655   struct GNUNET_PSYC_CountersResultMessage res;
1656   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1657   res.header.size = htons (sizeof (res));
1658   res.result_code = htonl (result);
1659   res.max_message_id = GNUNET_htonll (max_message_id);
1660
1661   if (GNUNET_OK == result || GNUNET_NO == result)
1662   {
1663     chn->max_message_id = max_message_id;
1664     chn->max_state_message_id = max_state_message_id;
1665     slv->member
1666       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1667                                       &slv->origin,
1668                                       slv->relay_count, slv->relays,
1669                                       &slv->join_msg->header,
1670                                       mcast_recv_join_request,
1671                                       mcast_recv_join_decision,
1672                                       mcast_recv_replay_fragment,
1673                                       mcast_recv_replay_message,
1674                                       mcast_recv_message, chn);
1675     if (NULL != slv->join_msg)
1676     {
1677       GNUNET_free (slv->join_msg);
1678       slv->join_msg = NULL;
1679     }
1680   }
1681   else
1682   {
1683     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1684                 "%p GNUNET_PSYCSTORE_counters_get() "
1685                 "returned %d for channel %s.\n",
1686                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1687   }
1688
1689   client_send_msg (chn, &res.header);
1690 }
1691
1692
1693 static void
1694 channel_init (struct Channel *chn)
1695 {
1696   chn->recv_msgs
1697     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1698   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1699 }
1700
1701
1702 /**
1703  * Handle a connecting client starting a channel master.
1704  */
1705 static void
1706 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1707                           const struct GNUNET_MessageHeader *msg)
1708 {
1709   const struct MasterStartRequest *req
1710     = (const struct MasterStartRequest *) msg;
1711
1712   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1713   struct GNUNET_HashCode pub_key_hash;
1714
1715   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1716   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1717
1718   struct Master *
1719     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1720   struct Channel *chn;
1721
1722   if (NULL == mst)
1723   {
1724     mst = GNUNET_new (struct Master);
1725     mst->policy = ntohl (req->policy);
1726     mst->priv_key = req->channel_key;
1727     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1728
1729     chn = &mst->chn;
1730     chn->is_master = GNUNET_YES;
1731     chn->pub_key = pub_key;
1732     chn->pub_key_hash = pub_key_hash;
1733     channel_init (chn);
1734
1735     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1736                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1737     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1738                                                    store_recv_master_counters, mst);
1739   }
1740   else
1741   {
1742     chn = &mst->chn;
1743
1744     struct GNUNET_PSYC_CountersResultMessage res;
1745     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1746     res.header.size = htons (sizeof (res));
1747     res.result_code = htonl (GNUNET_OK);
1748     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1749
1750     GNUNET_SERVER_notification_context_add (nc, client);
1751     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1752                                                 GNUNET_NO);
1753   }
1754
1755   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1756               "%p Client connected as master to channel %s.\n",
1757               mst, GNUNET_h2s (&chn->pub_key_hash));
1758
1759   struct Client *cli = GNUNET_new (struct Client);
1760   cli->client = client;
1761   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1762
1763   GNUNET_SERVER_client_set_user_context (client, chn);
1764   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1765 }
1766
1767
1768 /**
1769  * Handle a connecting client joining as a channel slave.
1770  */
1771 static void
1772 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1773                         const struct GNUNET_MessageHeader *msg)
1774 {
1775   const struct SlaveJoinRequest *req
1776     = (const struct SlaveJoinRequest *) msg;
1777   uint16_t req_size = ntohs (req->header.size);
1778
1779   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1780   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1781
1782   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1783   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1784   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1785
1786   struct GNUNET_CONTAINER_MultiHashMap *
1787     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1788   struct Slave *slv = NULL;
1789   struct Channel *chn;
1790
1791   if (NULL != chn_slv)
1792   {
1793     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1794   }
1795   if (NULL == slv)
1796   {
1797     slv = GNUNET_new (struct Slave);
1798     slv->priv_key = req->slave_key;
1799     slv->pub_key = slv_pub_key;
1800     slv->pub_key_hash = slv_pub_hash;
1801     slv->origin = req->origin;
1802     slv->relay_count = ntohl (req->relay_count);
1803     slv->join_flags = ntohl (req->flags);
1804
1805     const struct GNUNET_PeerIdentity *
1806       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1807     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1808     uint16_t join_msg_size = 0;
1809
1810     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1811         <= req_size)
1812     {
1813       struct GNUNET_PSYC_Message *
1814         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1815       join_msg_size = ntohs (join_msg->header.size);
1816       slv->join_msg = GNUNET_malloc (join_msg_size);
1817       GNUNET_memcpy (slv->join_msg, join_msg, join_msg_size);
1818     }
1819     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1820     {
1821       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1822                   "%u + %u + %u != %u\n",
1823                   (unsigned int) sizeof (*req),
1824                   relay_size,
1825                   join_msg_size,
1826                   req_size);
1827       GNUNET_break (0);
1828       GNUNET_SERVER_client_disconnect (client);
1829       GNUNET_free (slv);
1830       return;
1831     }
1832     if (0 < slv->relay_count)
1833     {
1834       slv->relays = GNUNET_malloc (relay_size);
1835       GNUNET_memcpy (slv->relays, &req[1], relay_size);
1836     }
1837
1838     chn = &slv->chn;
1839     chn->is_master = GNUNET_NO;
1840     chn->pub_key = req->channel_pub_key;
1841     chn->pub_key_hash = pub_key_hash;
1842     channel_init (chn);
1843
1844     if (NULL == chn_slv)
1845     {
1846       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1847       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1848                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1849     }
1850     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1851                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1852     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1853                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1854     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1855                                                   &store_recv_slave_counters, slv);
1856   }
1857   else
1858   {
1859     chn = &slv->chn;
1860
1861     struct GNUNET_PSYC_CountersResultMessage res;
1862     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1863     res.header.size = htons (sizeof (res));
1864     res.result_code = htonl (GNUNET_OK);
1865     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1866
1867     GNUNET_SERVER_notification_context_add (nc, client);
1868     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1869                                                 GNUNET_NO);
1870
1871     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1872     {
1873       mcast_recv_join_decision (slv, GNUNET_YES,
1874                                 NULL, 0, NULL, NULL);
1875     }
1876     else if (NULL == slv->member)
1877     {
1878       slv->member
1879         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1880                                         &slv->origin,
1881                                         slv->relay_count, slv->relays,
1882                                         &slv->join_msg->header,
1883                                         &mcast_recv_join_request,
1884                                         &mcast_recv_join_decision,
1885                                         &mcast_recv_replay_fragment,
1886                                         &mcast_recv_replay_message,
1887                                         &mcast_recv_message, chn);
1888       if (NULL != slv->join_msg)
1889       {
1890         GNUNET_free (slv->join_msg);
1891         slv->join_msg = NULL;
1892       }
1893     }
1894     else if (NULL != slv->join_dcsn)
1895     {
1896       GNUNET_SERVER_notification_context_add (nc, client);
1897       GNUNET_SERVER_notification_context_unicast (nc, client,
1898                                                   &slv->join_dcsn->header,
1899                                                   GNUNET_NO);
1900     }
1901   }
1902
1903   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904               "%p Client connected as slave to channel %s.\n",
1905               slv, GNUNET_h2s (&chn->pub_key_hash));
1906
1907   struct Client *cli = GNUNET_new (struct Client);
1908   cli->client = client;
1909   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1910
1911   GNUNET_SERVER_client_set_user_context (client, chn);
1912   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1913 }
1914
1915
1916 struct JoinDecisionClosure
1917 {
1918   int32_t is_admitted;
1919   struct GNUNET_MessageHeader *msg;
1920 };
1921
1922
1923 /**
1924  * Iterator callback for sending join decisions to multicast.
1925  */
1926 static int
1927 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1928                           void *value)
1929 {
1930   struct JoinDecisionClosure *jcls = cls;
1931   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1932   // FIXME: add relays
1933   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1934   return GNUNET_YES;
1935 }
1936
1937
1938 /**
1939  * Join decision from client.
1940  */
1941 static void
1942 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1943                            const struct GNUNET_MessageHeader *msg)
1944 {
1945   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1946     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1947   struct Channel *chn;
1948   struct Master *mst;
1949   struct JoinDecisionClosure jcls;
1950
1951   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1952   if (NULL == chn)
1953   {
1954     GNUNET_break (0);
1955     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1956     return;
1957   }
1958   GNUNET_assert (GNUNET_YES == chn->is_master);
1959   mst = (struct Master *) chn;
1960   jcls.is_admitted = ntohl (dcsn->is_admitted);
1961   jcls.msg
1962     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1963     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1964     : NULL;
1965
1966   struct GNUNET_HashCode slave_pub_hash;
1967   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1968                       &slave_pub_hash);
1969
1970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1971               "%p Got join decision (%d) from client for channel %s..\n",
1972               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1974               "%p ..and slave %s.\n",
1975               mst, GNUNET_h2s (&slave_pub_hash));
1976
1977   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1978                                               &mcast_send_join_decision, &jcls);
1979   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1980   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1981 }
1982
1983
1984 /**
1985  * Send acknowledgement to a client.
1986  *
1987  * Sent after a message fragment has been passed on to multicast.
1988  *
1989  * @param chn The channel struct for the client.
1990  */
1991 static void
1992 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1993 {
1994   struct GNUNET_MessageHeader res;
1995   res.size = htons (sizeof (res));
1996   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1997
1998   /* FIXME */
1999   GNUNET_SERVER_notification_context_add (nc, client);
2000   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
2001 }
2002
2003
2004 /**
2005  * Callback for the transmit functions of multicast.
2006  */
2007 static int
2008 transmit_notify (void *cls, size_t *data_size, void *data)
2009 {
2010   struct Channel *chn = cls;
2011   struct TransmitMessage *tmit_msg = chn->tmit_head;
2012
2013   if (NULL == tmit_msg || *data_size < tmit_msg->size)
2014   {
2015     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016                 "%p transmit_notify: nothing to send.\n", chn);
2017     if (NULL != tmit_msg && *data_size < tmit_msg->size)
2018       GNUNET_break (0);
2019     *data_size = 0;
2020     return GNUNET_NO;
2021   }
2022
2023   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
2025
2026   *data_size = tmit_msg->size;
2027   GNUNET_memcpy (data, &tmit_msg[1], *data_size);
2028
2029   int ret
2030     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2031     ? GNUNET_NO
2032     : GNUNET_YES;
2033
2034   /* FIXME: handle disconnecting clients */
2035   if (NULL != tmit_msg->client)
2036     send_message_ack (chn, tmit_msg->client);
2037
2038   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2039
2040   if (NULL != chn->tmit_head)
2041   {
2042     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2043   }
2044   else if (GNUNET_YES == chn->is_disconnected
2045            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2046   {
2047     /* FIXME: handle partial message (when still in_transmit) */
2048     GNUNET_free (tmit_msg);
2049     return GNUNET_SYSERR;
2050   }
2051   GNUNET_free (tmit_msg);
2052   return ret;
2053 }
2054
2055
2056 /**
2057  * Callback for the transmit functions of multicast.
2058  */
2059 static int
2060 master_transmit_notify (void *cls, size_t *data_size, void *data)
2061 {
2062   int ret = transmit_notify (cls, data_size, data);
2063
2064   if (GNUNET_YES == ret)
2065   {
2066     struct Master *mst = cls;
2067     mst->tmit_handle = NULL;
2068   }
2069   return ret;
2070 }
2071
2072
2073 /**
2074  * Callback for the transmit functions of multicast.
2075  */
2076 static int
2077 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2078 {
2079   int ret = transmit_notify (cls, data_size, data);
2080
2081   if (GNUNET_YES == ret)
2082   {
2083     struct Slave *slv = cls;
2084     slv->tmit_handle = NULL;
2085   }
2086   return ret;
2087 }
2088
2089
2090 /**
2091  * Transmit a message from a channel master to the multicast group.
2092  */
2093 static void
2094 master_transmit_message (struct Master *mst)
2095 {
2096   struct Channel *chn = &mst->chn;
2097   struct TransmitMessage *tmit_msg = chn->tmit_head;
2098   if (NULL == tmit_msg)
2099     return;
2100   if (NULL == mst->tmit_handle)
2101   {
2102     mst->tmit_handle = (void *) &mst->tmit_handle;
2103     struct GNUNET_MULTICAST_OriginTransmitHandle *
2104       tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2105                                                     mst->max_group_generation,
2106                                                     master_transmit_notify, mst);
2107     if (NULL != mst->tmit_handle)
2108       mst->tmit_handle = tmit_handle;
2109   }
2110   else
2111   {
2112     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2113   }
2114 }
2115
2116
2117 /**
2118  * Transmit a message from a channel slave to the multicast group.
2119  */
2120 static void
2121 slave_transmit_message (struct Slave *slv)
2122 {
2123   if (NULL == slv->chn.tmit_head)
2124     return;
2125   if (NULL == slv->tmit_handle)
2126   {
2127     slv->tmit_handle = (void *) &slv->tmit_handle;
2128     struct GNUNET_MULTICAST_MemberTransmitHandle *
2129       tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2130                                                        slave_transmit_notify, slv);
2131     if (NULL != slv->tmit_handle)
2132       slv->tmit_handle = tmit_handle;
2133   }
2134   else
2135   {
2136     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2137   }
2138 }
2139
2140
2141 static void
2142 transmit_message (struct Channel *chn)
2143 {
2144   chn->is_master
2145     ? master_transmit_message ((struct Master *) chn)
2146     : slave_transmit_message ((struct Slave *) chn);
2147 }
2148
2149
2150 /**
2151  * Queue a message from a channel master for sending to the multicast group.
2152  */
2153 static void
2154 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2155 {
2156   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2157
2158   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2159   {
2160     tmit_msg->id = ++mst->max_message_id;
2161     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2162                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2163                 mst, tmit_msg->id);
2164     struct GNUNET_PSYC_MessageMethod *pmeth
2165       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2166
2167     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2168     {
2169       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2170     }
2171     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2172     {
2173       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2174                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2175                   mst, tmit_msg->id - mst->max_state_message_id);
2176       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2177                                           - mst->max_state_message_id);
2178       mst->max_state_message_id = tmit_msg->id;
2179     }
2180     else
2181     {
2182         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2183                     "%p master_queue_message: state not modified\n", mst);
2184       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2185     }
2186
2187     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2188     {
2189       /// @todo add state_hash to PSYC header
2190     }
2191   }
2192 }
2193
2194
2195 /**
2196  * Queue a message from a channel slave for sending to the multicast group.
2197  */
2198 static void
2199 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2200 {
2201   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2202   {
2203     struct GNUNET_PSYC_MessageMethod *pmeth
2204       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2205     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2206     tmit_msg->id = ++slv->max_request_id;
2207   }
2208 }
2209
2210
2211 /**
2212  * Queue PSYC message parts for sending to multicast.
2213  *
2214  * @param chn
2215  *        Channel to send to.
2216  * @param client
2217  *        Client the message originates from.
2218  * @param data_size
2219  *        Size of @a data.
2220  * @param data
2221  *        Concatenated message parts.
2222  * @param first_ptype
2223  *        First message part type in @a data.
2224  * @param last_ptype
2225  *        Last message part type in @a data.
2226  */
2227 static struct TransmitMessage *
2228 queue_message (struct Channel *chn,
2229                struct GNUNET_SERVER_Client *client,
2230                size_t data_size,
2231                const void *data,
2232                uint16_t first_ptype, uint16_t last_ptype)
2233 {
2234   struct TransmitMessage *
2235     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2236   GNUNET_memcpy (&tmit_msg[1], data, data_size);
2237   tmit_msg->client = client;
2238   tmit_msg->size = data_size;
2239   tmit_msg->first_ptype = first_ptype;
2240   tmit_msg->last_ptype = last_ptype;
2241
2242   /* FIXME: separate queue per message ID */
2243
2244   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2245
2246   chn->is_master
2247     ? master_queue_message ((struct Master *) chn, tmit_msg)
2248     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2249   return tmit_msg;
2250 }
2251
2252
2253 /**
2254  * Cancel transmission of current message.
2255  *
2256  * @param chn     Channel to send to.
2257  * @param client  Client the message originates from.
2258  */
2259 static void
2260 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2261 {
2262   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2263
2264   struct GNUNET_MessageHeader msg;
2265   msg.size = htons (sizeof (msg));
2266   msg.type = htons (type);
2267
2268   queue_message (chn, client, sizeof (msg), &msg, type, type);
2269   transmit_message (chn);
2270
2271   /* FIXME: cleanup */
2272 }
2273
2274
2275 /**
2276  * Incoming message from a master or slave client.
2277  */
2278 static void
2279 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2280                           const struct GNUNET_MessageHeader *msg)
2281 {
2282   struct Channel *
2283     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2284   GNUNET_assert (NULL != chn);
2285
2286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287               "%p Received message from client.\n", chn);
2288   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2289
2290   if (GNUNET_YES != chn->is_ready)
2291   {
2292     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2293                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2294     GNUNET_break (0);
2295     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2296     return;
2297   }
2298
2299   uint16_t size = ntohs (msg->size);
2300   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2301   {
2302     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2303                 "%p Message payload too large: %u < %u.\n",
2304                 chn,
2305                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2306                 (unsigned int) (size - sizeof (*msg)));
2307     GNUNET_break (0);
2308     transmit_cancel (chn, client);
2309     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2310     return;
2311   }
2312
2313   uint16_t first_ptype = 0, last_ptype = 0;
2314   if (GNUNET_SYSERR
2315       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2316                                           (const char *) &msg[1],
2317                                           &first_ptype, &last_ptype))
2318   {
2319     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2320                 "%p Received invalid message part from client.\n", chn);
2321     GNUNET_break (0);
2322     transmit_cancel (chn, client);
2323     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2324     return;
2325   }
2326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327               "%p Received message with first part type %u and last part type %u.\n",
2328               chn, first_ptype, last_ptype);
2329
2330   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2331                  first_ptype, last_ptype);
2332   transmit_message (chn);
2333   /* FIXME: send a few ACKs even before transmit_notify is called */
2334
2335   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2336 };
2337
2338
2339 /**
2340  * Received result of GNUNET_PSYCSTORE_membership_store()
2341  */
2342 static void
2343 store_recv_membership_store_result (void *cls,
2344                                     int64_t result,
2345                                     const char *err_msg,
2346                                     uint16_t err_msg_size)
2347 {
2348   struct Operation *op = cls;
2349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2351               op->chn,
2352               result,
2353               (int) err_msg_size,
2354               err_msg);
2355
2356   if (NULL != op->client)
2357     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2358   op_remove (op);
2359 }
2360
2361
2362 /**
2363  * Client requests to add/remove a slave in the membership database.
2364  */
2365 static void
2366 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2367                               const struct GNUNET_MessageHeader *msg)
2368 {
2369   struct Channel *
2370     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2371   GNUNET_assert (NULL != chn);
2372
2373   const struct ChannelMembershipStoreRequest *
2374     req = (const struct ChannelMembershipStoreRequest *) msg;
2375
2376   struct Operation *op = op_add (chn, client, req->op_id, 0);
2377
2378   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2379   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2381               "%p Received membership store request from client.\n", chn);
2382   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2383               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2384               chn, req->did_join, announced_at, effective_since);
2385
2386   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2387                                      req->did_join, announced_at, effective_since,
2388                                      0, /* FIXME: group_generation */
2389                                      &store_recv_membership_store_result, op);
2390   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2391 }
2392
2393
2394 /**
2395  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2396  * in response to a history request from a client.
2397  */
2398 static int
2399 store_recv_fragment_history (void *cls,
2400                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2401                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2402 {
2403   struct Operation *op = cls;
2404   if (NULL == op->client)
2405   { /* Requesting client already disconnected. */
2406     return GNUNET_NO;
2407   }
2408   struct Channel *chn = op->chn;
2409
2410   struct GNUNET_PSYC_MessageHeader *pmsg;
2411   uint16_t msize = ntohs (mmsg->header.size);
2412   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2413
2414   struct GNUNET_OperationResultMessage *
2415     res = GNUNET_malloc (sizeof (*res) + psize);
2416   res->header.size = htons (sizeof (*res) + psize);
2417   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2418   res->op_id = op->op_id;
2419   res->result_code = GNUNET_htonll (GNUNET_OK);
2420
2421   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2422   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2423   GNUNET_memcpy (&res[1], pmsg, psize);
2424
2425   /** @todo FIXME: send only to requesting client */
2426   client_send_msg (chn, &res->header);
2427
2428   GNUNET_free (res);
2429   return GNUNET_YES;
2430 }
2431
2432
2433 /**
2434  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2435  * in response to a history request from a client.
2436  */
2437 static void
2438 store_recv_fragment_history_result (void *cls, int64_t result,
2439                                     const char *err_msg, uint16_t err_msg_size)
2440 {
2441   struct Operation *op = cls;
2442   if (NULL == op->client)
2443   { /* Requesting client already disconnected. */
2444     return;
2445   }
2446
2447   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2448               "%p History replay #%" PRIu64 ": "
2449               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2450               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2451
2452   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2453   {
2454     /** @todo Multicast replay request for messages not found locally. */
2455   }
2456
2457   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2458   op_remove (op);
2459 }
2460
2461
2462 /**
2463  * Client requests channel history.
2464  */
2465 static void
2466 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2467                             const struct GNUNET_MessageHeader *msg)
2468 {
2469   struct Channel *
2470     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2471   GNUNET_assert (NULL != chn);
2472
2473   const struct GNUNET_PSYC_HistoryRequestMessage *
2474     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2475   uint16_t size = ntohs (msg->size);
2476   const char *method_prefix = (const char *) &req[1];
2477
2478   if (size < sizeof (*req) + 1
2479       || '\0' != method_prefix[size - sizeof (*req) - 1])
2480   {
2481     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2482                 "%p History replay #%" PRIu64 ": "
2483                 "invalid method prefix. size: %u < %u?\n",
2484                 chn,
2485                 GNUNET_ntohll (req->op_id),
2486                 size,
2487                 (unsigned int) sizeof (*req) + 1);
2488     GNUNET_break (0);
2489     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2490     return;
2491   }
2492
2493   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2494
2495   if (0 == req->message_limit)
2496     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2497                                   GNUNET_ntohll (req->start_message_id),
2498                                   GNUNET_ntohll (req->end_message_id),
2499                                   0, method_prefix,
2500                                   &store_recv_fragment_history,
2501                                   &store_recv_fragment_history_result, op);
2502   else
2503     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2504                                          GNUNET_ntohll (req->message_limit),
2505                                          method_prefix,
2506                                          &store_recv_fragment_history,
2507                                          &store_recv_fragment_history_result,
2508                                          op);
2509
2510   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2511 }
2512
2513
2514 /**
2515  * Received state var from PSYCstore, send it to client.
2516  */
2517 static int
2518 store_recv_state_var (void *cls, const char *name,
2519                       const void *value, uint32_t value_size)
2520 {
2521   struct Operation *op = cls;
2522   struct GNUNET_OperationResultMessage *res;
2523
2524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2526               op->chn, GNUNET_ntohll (op->op_id), name);
2527
2528   if (NULL != name) /* First part */
2529   {
2530     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2531     struct GNUNET_PSYC_MessageModifier *mod;
2532     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2533     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2534     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2535     res->op_id = op->op_id;
2536
2537     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2538     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2539     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2540     mod->name_size = htons (name_size);
2541     mod->value_size = htonl (value_size);
2542     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2543     GNUNET_memcpy (&mod[1], name, name_size);
2544     GNUNET_memcpy (((char *) &mod[1]) + name_size, value, value_size);
2545   }
2546   else /* Continuation */
2547   {
2548     struct GNUNET_MessageHeader *mod;
2549     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2550     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2551     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2552     res->op_id = op->op_id;
2553
2554     mod = (struct GNUNET_MessageHeader *) &res[1];
2555     mod->size = htons (sizeof (*mod) + value_size);
2556     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2557     GNUNET_memcpy (&mod[1], value, value_size);
2558   }
2559
2560   // FIXME: client might have been disconnected
2561   GNUNET_SERVER_notification_context_add (nc, op->client);
2562   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2563                                               GNUNET_NO);
2564   GNUNET_free (res);
2565   return GNUNET_YES;
2566 }
2567
2568
2569 /**
2570  * Received result of GNUNET_PSYCSTORE_state_get()
2571  * or GNUNET_PSYCSTORE_state_get_prefix()
2572  */
2573 static void
2574 store_recv_state_result (void *cls, int64_t result,
2575                          const char *err_msg, uint16_t err_msg_size)
2576 {
2577   struct Operation *op = cls;
2578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2579               "%p state_get #%" PRIu64 ": "
2580               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2581               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2582
2583   // FIXME: client might have been disconnected
2584   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2585   op_remove (op);
2586 }
2587
2588
2589 /**
2590  * Client requests best matching state variable from PSYCstore.
2591  */
2592 static void
2593 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2594                        const struct GNUNET_MessageHeader *msg)
2595 {
2596   struct Channel *
2597     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2598   GNUNET_assert (NULL != chn);
2599
2600   const struct StateRequest *
2601     req = (const struct StateRequest *) msg;
2602
2603   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2604   const char *name = (const char *) &req[1];
2605   if (0 == name_size || '\0' != name[name_size - 1])
2606   {
2607     GNUNET_break (0);
2608     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2609     return;
2610   }
2611
2612   struct Operation *op = op_add (chn, client, req->op_id, 0);
2613   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2614                               &store_recv_state_var,
2615                               &store_recv_state_result, op);
2616   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2617 }
2618
2619
2620 /**
2621  * Client requests state variables with a given prefix from PSYCstore.
2622  */
2623 static void
2624 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2625                               const struct GNUNET_MessageHeader *msg)
2626 {
2627   struct Channel *
2628     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2629   GNUNET_assert (NULL != chn);
2630
2631   const struct StateRequest *
2632     req = (const struct StateRequest *) msg;
2633
2634   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2635   const char *name = (const char *) &req[1];
2636   if (0 == name_size || '\0' != name[name_size - 1])
2637   {
2638     GNUNET_break (0);
2639     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2640     return;
2641   }
2642
2643   struct Operation *op = op_add (chn, client, req->op_id, 0);
2644   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2645                                      &store_recv_state_var,
2646                                      &store_recv_state_result, op);
2647   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2648 }
2649
2650
2651 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2652   { &client_recv_master_start, NULL,
2653     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2654
2655   { &client_recv_slave_join, NULL,
2656     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2657
2658   { &client_recv_join_decision, NULL,
2659     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2660
2661   { &client_recv_psyc_message, NULL,
2662     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2663
2664   { &client_recv_membership_store, NULL,
2665     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2666
2667   { &client_recv_history_replay, NULL,
2668     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2669
2670   { &client_recv_state_get, NULL,
2671     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2672
2673   { &client_recv_state_get_prefix, NULL,
2674     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2675
2676   { NULL, NULL, 0, 0 }
2677 };
2678
2679
2680 /**
2681  * Initialize the PSYC service.
2682  *
2683  * @param cls Closure.
2684  * @param server The initialized server.
2685  * @param c Configuration to use.
2686  */
2687 static void
2688 run (void *cls, struct GNUNET_SERVER_Handle *server,
2689      const struct GNUNET_CONFIGURATION_Handle *c)
2690 {
2691   cfg = c;
2692   store = GNUNET_PSYCSTORE_connect (cfg);
2693   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2694   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2695   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2696   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2697   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2698   nc = GNUNET_SERVER_notification_context_create (server, 1);
2699   GNUNET_SERVER_add_handlers (server, server_handlers);
2700   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2701   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2702 }
2703
2704
2705 /**
2706  * The main function for the service.
2707  *
2708  * @param argc number of arguments from the command line
2709  * @param argv command line arguments
2710  * @return 0 ok, 1 on error
2711  */
2712 int
2713 main (int argc, char *const *argv)
2714 {
2715   return (GNUNET_OK ==
2716           GNUNET_SERVICE_run (argc, argv, "psyc",
2717                               GNUNET_SERVICE_OPTION_NONE,
2718                               &run, NULL)) ? 0 : 1;
2719 }
2720
2721 /* end of gnunet-service-psyc.c */