2b436aa214008e1ac7dbb0a18f5b262c75c0efef
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls)
432 {
433   struct Channel *chn = cls;
434
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  */
444 static void
445 shutdown_task (void *cls)
446 {
447   if (NULL != nc)
448   {
449     GNUNET_SERVER_notification_context_destroy (nc);
450     nc = NULL;
451   }
452   if (NULL != stats)
453   {
454     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
455     stats = NULL;
456   }
457 }
458
459
460 static struct Operation *
461 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
462         uint64_t op_id, uint32_t flags)
463 {
464   struct Operation *op = GNUNET_malloc (sizeof (*op));
465   op->client = client;
466   op->chn = chn;
467   op->op_id = op_id;
468   op->flags = flags;
469   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
470   return op;
471 }
472
473
474 static void
475 op_remove (struct Operation *op)
476 {
477   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
478   GNUNET_free (op);
479 }
480
481
482 /**
483  * Clean up master data structures after a client disconnected.
484  */
485 static void
486 cleanup_master (struct Master *mst)
487 {
488   struct Channel *chn = &mst->chn;
489
490   if (NULL != mst->origin)
491     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
492   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
493   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
494 }
495
496
497 /**
498  * Clean up slave data structures after a client disconnected.
499  */
500 static void
501 cleanup_slave (struct Slave *slv)
502 {
503   struct Channel *chn = &slv->chn;
504   struct GNUNET_CONTAINER_MultiHashMap *
505     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
506                                                 &chn->pub_key_hash);
507   GNUNET_assert (NULL != chn_slv);
508   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
509
510   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
511   {
512     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
513                                           chn_slv);
514     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
515   }
516   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
517
518   if (NULL != slv->join_msg)
519   {
520     GNUNET_free (slv->join_msg);
521     slv->join_msg = NULL;
522   }
523   if (NULL != slv->relays)
524   {
525     GNUNET_free (slv->relays);
526     slv->relays = NULL;
527   }
528   if (NULL != slv->member)
529   {
530     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
531     slv->member = NULL;
532   }
533   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
534 }
535
536
537 /**
538  * Clean up channel data structures after a client disconnected.
539  */
540 static void
541 cleanup_channel (struct Channel *chn)
542 {
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "%p Cleaning up channel %s. master? %u\n",
545               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
546   message_queue_drop (chn);
547   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
548   chn->recv_frags = NULL;
549
550   if (NULL != chn->store_op)
551   {
552     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
553     chn->store_op = NULL;
554   }
555
556   (GNUNET_YES == chn->is_master)
557     ? cleanup_master ((struct Master *) chn)
558     : cleanup_slave ((struct Slave *) chn);
559   GNUNET_free (chn);
560 }
561
562
563 /**
564  * Called whenever a client is disconnected.
565  * Frees our resources associated with that client.
566  *
567  * @param cls Closure.
568  * @param client Identification of the client.
569  */
570 static void
571 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
572 {
573   if (NULL == client)
574     return;
575
576   struct Channel *
577     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
578
579   if (NULL == chn)
580   {
581     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582                 "%p User context is NULL in client_disconnect()\n", chn);
583     GNUNET_break (0);
584     return;
585   }
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "%p Client (%s) disconnected from channel %s\n",
589               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
590               GNUNET_h2s (&chn->pub_key_hash));
591
592   struct Client *cli = chn->clients_head;
593   while (NULL != cli)
594   {
595     if (cli->client == client)
596     {
597       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
598       GNUNET_free (cli);
599       break;
600     }
601     cli = cli->next;
602   }
603
604   struct Operation *op = chn->op_head;
605   while (NULL != op)
606   {
607     if (op->client == client)
608     {
609       op->client = NULL;
610       break;
611     }
612     op = op->next;
613   }
614
615   if (NULL == chn->clients_head)
616   { /* Last client disconnected. */
617     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                 "%p Last client (%s) disconnected from channel %s\n",
619                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
620                 GNUNET_h2s (&chn->pub_key_hash));
621     chn->is_disconnected = GNUNET_YES;
622     if (NULL != chn->tmit_head)
623     { /* Send pending messages to multicast before cleanup. */
624       transmit_message (chn);
625     }
626     else
627     {
628       cleanup_channel (chn);
629     }
630   }
631 }
632
633
634 /**
635  * Send message to all clients connected to the channel.
636  */
637 static void
638 client_send_msg (const struct Channel *chn,
639                  const struct GNUNET_MessageHeader *msg)
640 {
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "%p Sending message to clients.\n", chn);
643
644   struct Client *cli = chn->clients_head;
645   while (NULL != cli)
646   {
647     GNUNET_SERVER_notification_context_add (nc, cli->client);
648     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
649     cli = cli->next;
650   }
651 }
652
653
654 /**
655  * Send a result code back to the client.
656  *
657  * @param client
658  *        Client that should receive the result code.
659  * @param result_code
660  *        Code to transmit.
661  * @param op_id
662  *        Operation ID in network byte order.
663  * @param data
664  *        Data payload or NULL.
665  * @param data_size
666  *        Size of @a data.
667  */
668 static void
669 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
670                     int64_t result_code, const void *data, uint16_t data_size)
671 {
672   struct GNUNET_OperationResultMessage *res;
673
674   res = GNUNET_malloc (sizeof (*res) + data_size);
675   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
676   res->header.size = htons (sizeof (*res) + data_size);
677   res->result_code = GNUNET_htonll (result_code);
678   res->op_id = op_id;
679   if (0 < data_size)
680     memcpy (&res[1], data, data_size);
681
682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683               "%p Sending result to client for operation #%" PRIu64 ": "
684               "%" PRId64 " (size: %u)\n",
685               client, GNUNET_ntohll (op_id), result_code, data_size);
686
687   GNUNET_SERVER_notification_context_add (nc, client);
688   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
689                                               GNUNET_NO);
690   GNUNET_free (res);
691 }
692
693
694 /**
695  * Closure for join_mem_test_cb()
696  */
697 struct JoinMemTestClosure
698 {
699   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
700   struct Channel *chn;
701   struct GNUNET_MULTICAST_JoinHandle *jh;
702   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
703 };
704
705
706 /**
707  * Membership test result callback used for join requests.
708  */
709 static void
710 join_mem_test_cb (void *cls, int64_t result,
711                   const char *err_msg, uint16_t err_msg_size)
712 {
713   struct JoinMemTestClosure *jcls = cls;
714
715   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
716   { /* Pass on join request to client if this is a master channel */
717     struct Master *mst = (struct Master *) jcls->chn;
718     struct GNUNET_HashCode slave_pub_hash;
719     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
720                         &slave_pub_hash);
721     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
722                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
723     client_send_msg (jcls->chn, &jcls->join_msg->header);
724   }
725   else
726   {
727     if (GNUNET_SYSERR == result)
728     {
729       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
730                   "Could not perform membership test (%.*s)\n",
731                   err_msg_size, err_msg);
732     }
733     // FIXME: add relays
734     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
735   }
736   GNUNET_free (jcls->join_msg);
737   GNUNET_free (jcls);
738 }
739
740
741 /**
742  * Incoming join request from multicast.
743  */
744 static void
745 mcast_recv_join_request (void *cls,
746                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
747                          const struct GNUNET_MessageHeader *join_msg,
748                          struct GNUNET_MULTICAST_JoinHandle *jh)
749 {
750   struct Channel *chn = cls;
751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
752
753   uint16_t join_msg_size = 0;
754   if (NULL != join_msg)
755   {
756     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
757     {
758       join_msg_size = ntohs (join_msg->size);
759     }
760     else
761     {
762       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
763                   "%p Got join message with invalid type %u.\n",
764                   chn, ntohs (join_msg->type));
765     }
766   }
767
768   struct GNUNET_PSYC_JoinRequestMessage *
769     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
770   req->header.size = htons (sizeof (*req) + join_msg_size);
771   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
772   req->slave_pub_key = *slave_pub_key;
773   if (0 < join_msg_size)
774     memcpy (&req[1], join_msg, join_msg_size);
775
776   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
777   jcls->slave_pub_key = *slave_pub_key;
778   jcls->chn = chn;
779   jcls->jh = jh;
780   jcls->join_msg = req;
781
782   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
783                                     chn->max_message_id, 0,
784                                     &join_mem_test_cb, jcls);
785 }
786
787
788 /**
789  * Join decision received from multicast.
790  */
791 static void
792 mcast_recv_join_decision (void *cls, int is_admitted,
793                           const struct GNUNET_PeerIdentity *peer,
794                           uint16_t relay_count,
795                           const struct GNUNET_PeerIdentity *relays,
796                           const struct GNUNET_MessageHeader *join_resp)
797 {
798   struct Slave *slv = cls;
799   struct Channel *chn = &slv->chn;
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "%p Got join decision: %d\n", slv, is_admitted);
802   if (GNUNET_YES == chn->is_ready)
803   {
804     /* Already admitted */
805     return;
806   }
807
808   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
809   struct GNUNET_PSYC_JoinDecisionMessage *
810     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
811   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
812   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
813   dcsn->is_admitted = htonl (is_admitted);
814   if (0 < join_resp_size)
815     memcpy (&dcsn[1], join_resp, join_resp_size);
816
817   client_send_msg (chn, &dcsn->header);
818
819   if (GNUNET_YES == is_admitted
820       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
821   {
822     chn->is_ready = GNUNET_YES;
823   }
824 }
825
826
827 static int
828 store_recv_fragment_replay (void *cls,
829                             struct GNUNET_MULTICAST_MessageHeader *msg,
830                             enum GNUNET_PSYCSTORE_MessageFlags flags)
831 {
832   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
833
834   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
835   return GNUNET_YES;
836 }
837
838
839 /**
840  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
841  */
842 static void
843 store_recv_fragment_replay_result (void *cls, int64_t result,
844                                    const char *err_msg, uint16_t err_msg_size)
845 {
846   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
849               rh, result, err_msg_size, err_msg);
850
851   switch (result)
852   {
853   case GNUNET_YES:
854     break;
855
856   case GNUNET_NO:
857     GNUNET_MULTICAST_replay_response (rh, NULL,
858                                       GNUNET_MULTICAST_REC_NOT_FOUND);
859     break;
860
861   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
862     GNUNET_MULTICAST_replay_response (rh, NULL,
863                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
864     break;
865
866   case GNUNET_SYSERR:
867     GNUNET_MULTICAST_replay_response (rh, NULL,
868                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
869     break;
870   }
871   GNUNET_MULTICAST_replay_response_end (rh);
872 }
873
874
875 /**
876  * Incoming fragment replay request from multicast.
877  */
878 static void
879 mcast_recv_replay_fragment (void *cls,
880                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
881                             uint64_t fragment_id, uint64_t flags,
882                             struct GNUNET_MULTICAST_ReplayHandle *rh)
883
884 {
885   struct Channel *chn = cls;
886   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
887                                  fragment_id, fragment_id,
888                                  &store_recv_fragment_replay,
889                                  &store_recv_fragment_replay_result, rh);
890 }
891
892
893 /**
894  * Incoming message replay request from multicast.
895  */
896 static void
897 mcast_recv_replay_message (void *cls,
898                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
899                            uint64_t message_id,
900                            uint64_t fragment_offset,
901                            uint64_t flags,
902                            struct GNUNET_MULTICAST_ReplayHandle *rh)
903 {
904   struct Channel *chn = cls;
905   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
906                                 message_id, message_id, 1, NULL,
907                                 &store_recv_fragment_replay,
908                                 &store_recv_fragment_replay_result, rh);
909 }
910
911
912 /**
913  * Convert an uint64_t in network byte order to a HashCode
914  * that can be used as key in a MultiHashMap
915  */
916 static inline void
917 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
918 {
919   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
920   /* TODO: use built-in byte swap functions if available */
921
922   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
923   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
924
925   *key = (struct GNUNET_HashCode) {};
926   *((uint64_t *) key)
927     = (n << 32) | (n >> 32);
928 }
929
930
931 /**
932  * Convert an uint64_t in host byte order to a HashCode
933  * that can be used as key in a MultiHashMap
934  */
935 static inline void
936 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
937 {
938 #if __BYTE_ORDER == __BIG_ENDIAN
939   hash_key_from_nll (key, n);
940 #elif __BYTE_ORDER == __LITTLE_ENDIAN
941   *key = (struct GNUNET_HashCode) {};
942   *((uint64_t *) key) = n;
943 #else
944   #error byteorder undefined
945 #endif
946 }
947
948
949 /**
950  * Initialize PSYC message header.
951  */
952 static inline void
953 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
954                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
955 {
956   uint16_t size = ntohs (mmsg->header.size);
957   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
958
959   pmsg->header.size = htons (psize);
960   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
961   pmsg->message_id = mmsg->message_id;
962   pmsg->fragment_offset = mmsg->fragment_offset;
963   pmsg->flags = htonl (flags);
964
965   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
966 }
967
968
969 /**
970  * Create a new PSYC message from a multicast message for sending it to clients.
971  */
972 static inline struct GNUNET_PSYC_MessageHeader *
973 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
974 {
975   struct GNUNET_PSYC_MessageHeader *pmsg;
976   uint16_t size = ntohs (mmsg->header.size);
977   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
978
979   pmsg = GNUNET_malloc (psize);
980   psyc_msg_init (pmsg, mmsg, flags);
981   return pmsg;
982 }
983
984
985 /**
986  * Send multicast message to all clients connected to the channel.
987  */
988 static void
989 client_send_mcast_msg (struct Channel *chn,
990                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
991                        uint32_t flags)
992 {
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994               "%p Sending multicast message to client. "
995               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
996               chn, GNUNET_ntohll (mmsg->fragment_id),
997               GNUNET_ntohll (mmsg->message_id));
998
999   struct GNUNET_PSYC_MessageHeader *
1000     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1001   client_send_msg (chn, &pmsg->header);
1002   GNUNET_free (pmsg);
1003 }
1004
1005
1006 /**
1007  * Send multicast request to all clients connected to the channel.
1008  */
1009 static void
1010 client_send_mcast_req (struct Master *mst,
1011                        const struct GNUNET_MULTICAST_RequestHeader *req)
1012 {
1013   struct Channel *chn = &mst->chn;
1014
1015   struct GNUNET_PSYC_MessageHeader *pmsg;
1016   uint16_t size = ntohs (req->header.size);
1017   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "%p Sending multicast request to client. "
1021               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1022               chn, GNUNET_ntohll (req->fragment_id),
1023               GNUNET_ntohll (req->request_id));
1024
1025   pmsg = GNUNET_malloc (psize);
1026   pmsg->header.size = htons (psize);
1027   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1028   pmsg->message_id = req->request_id;
1029   pmsg->fragment_offset = req->fragment_offset;
1030   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1031   pmsg->slave_pub_key = req->member_pub_key;
1032   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1033
1034   client_send_msg (chn, &pmsg->header);
1035
1036   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1037
1038   GNUNET_free (pmsg);
1039 }
1040
1041
1042 /**
1043  * Insert a multicast message fragment into the queue belonging to the message.
1044  *
1045  * @param chn          Channel.
1046  * @param mmsg         Multicast message fragment.
1047  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1048  * @param first_ptype  First PSYC message part type in @a mmsg.
1049  * @param last_ptype   Last PSYC message part type in @a mmsg.
1050  */
1051 static void
1052 fragment_queue_insert (struct Channel *chn,
1053                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1054                        uint16_t first_ptype, uint16_t last_ptype)
1055 {
1056   const uint16_t size = ntohs (mmsg->header.size);
1057   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1058   struct GNUNET_CONTAINER_MultiHashMap
1059     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1060                                                     &chn->pub_key_hash);
1061
1062   struct GNUNET_HashCode msg_id_hash;
1063   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1064
1065   struct FragmentQueue
1066     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1067
1068   if (NULL == fragq)
1069   {
1070     fragq = GNUNET_new (struct FragmentQueue);
1071     fragq->state = MSG_FRAG_STATE_HEADER;
1072     fragq->fragments
1073       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1074
1075     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1076                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1077
1078     if (NULL == chan_msgs)
1079     {
1080       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1081       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1082                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1083     }
1084   }
1085
1086   struct GNUNET_HashCode frag_id_hash;
1087   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1088   struct RecvCacheEntry
1089     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1090   if (NULL == cache_entry)
1091   {
1092     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093                 "%p Adding message fragment to cache. "
1094                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1095                 chn, GNUNET_ntohll (mmsg->message_id),
1096                 GNUNET_ntohll (mmsg->fragment_id));
1097     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                 "%p header_size: %" PRIu64 " + %u\n",
1099                 chn, fragq->header_size, size);
1100     cache_entry = GNUNET_new (struct RecvCacheEntry);
1101     cache_entry->ref_count = 1;
1102     cache_entry->mmsg = GNUNET_malloc (size);
1103     memcpy (cache_entry->mmsg, mmsg, size);
1104     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1105                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1106   }
1107   else
1108   {
1109     cache_entry->ref_count++;
1110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111                 "%p Message fragment is already in cache. "
1112                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1113                 ", ref_count: %u\n",
1114                 chn, GNUNET_ntohll (mmsg->message_id),
1115                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1116   }
1117
1118   if (MSG_FRAG_STATE_HEADER == fragq->state)
1119   {
1120     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1121     {
1122       struct GNUNET_PSYC_MessageMethod *
1123         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1124       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1125       fragq->flags = ntohl (pmeth->flags);
1126     }
1127
1128     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1129     {
1130       fragq->header_size += size;
1131     }
1132     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1133              || frag_offset == fragq->header_size)
1134     { /* header is now complete */
1135       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1136                   "%p Header of message %" PRIu64 " is complete.\n",
1137                   chn, GNUNET_ntohll (mmsg->message_id));
1138
1139       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1140                   "%p Adding message %" PRIu64 " to queue.\n",
1141                   chn, GNUNET_ntohll (mmsg->message_id));
1142       fragq->state = MSG_FRAG_STATE_DATA;
1143     }
1144     else
1145     {
1146       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1147                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1148                   "%" PRIu64 " != %" PRIu64 "\n",
1149                   chn, GNUNET_ntohll (mmsg->message_id),
1150                   frag_offset, fragq->header_size);
1151     }
1152   }
1153
1154   switch (last_ptype)
1155   {
1156   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1157     if (frag_offset == fragq->size)
1158       fragq->state = MSG_FRAG_STATE_END;
1159     else
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "%p Message %" PRIu64 " is NOT complete yet: "
1162                   "%" PRIu64 " != %" PRIu64 "\n",
1163                   chn, GNUNET_ntohll (mmsg->message_id),
1164                   frag_offset, fragq->size);
1165     break;
1166
1167   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1168     /* Drop message without delivering to client if it's a single fragment */
1169     fragq->state =
1170       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1171       ? MSG_FRAG_STATE_DROP
1172       : MSG_FRAG_STATE_CANCEL;
1173   }
1174
1175   switch (fragq->state)
1176   {
1177   case MSG_FRAG_STATE_DATA:
1178   case MSG_FRAG_STATE_END:
1179   case MSG_FRAG_STATE_CANCEL:
1180     if (GNUNET_NO == fragq->is_queued)
1181     {
1182       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1183                                     GNUNET_ntohll (mmsg->message_id));
1184       fragq->is_queued = GNUNET_YES;
1185     }
1186   }
1187
1188   fragq->size += size;
1189   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1190                                 GNUNET_ntohll (mmsg->fragment_id));
1191 }
1192
1193
1194 /**
1195  * Run fragment queue of a message.
1196  *
1197  * Send fragments of a message in order to client, after all modifiers arrived
1198  * from multicast.
1199  *
1200  * @param chn
1201  *        Channel.
1202  * @param msg_id
1203  *        ID of the message @a fragq belongs to.
1204  * @param fragq
1205  *        Fragment queue of the message.
1206  * @param drop
1207  *        Drop message without delivering to client?
1208  *        #GNUNET_YES or #GNUNET_NO.
1209  */
1210 static void
1211 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1212                     struct FragmentQueue *fragq, uint8_t drop)
1213 {
1214   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1215               "%p Running message fragment queue for message %" PRIu64
1216               " (state: %u).\n",
1217               chn, msg_id, fragq->state);
1218
1219   struct GNUNET_CONTAINER_MultiHashMap
1220     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1221                                                     &chn->pub_key_hash);
1222   GNUNET_assert (NULL != chan_msgs);
1223   uint64_t frag_id;
1224
1225   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1226                                                     &frag_id))
1227   {
1228     struct GNUNET_HashCode frag_id_hash;
1229     hash_key_from_hll (&frag_id_hash, frag_id);
1230     struct RecvCacheEntry *cache_entry
1231       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1232     if (cache_entry != NULL)
1233     {
1234       if (GNUNET_NO == drop)
1235       {
1236         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1237       }
1238       if (cache_entry->ref_count <= 1)
1239       {
1240         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1241                                               cache_entry);
1242         GNUNET_free (cache_entry->mmsg);
1243         GNUNET_free (cache_entry);
1244       }
1245       else
1246       {
1247         cache_entry->ref_count--;
1248       }
1249     }
1250 #if CACHE_AGING_IMPLEMENTED
1251     else if (GNUNET_NO == drop)
1252     {
1253       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1254     }
1255 #endif
1256
1257     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1258   }
1259
1260   if (MSG_FRAG_STATE_END <= fragq->state)
1261   {
1262     struct GNUNET_HashCode msg_id_hash;
1263     hash_key_from_hll (&msg_id_hash, msg_id);
1264
1265     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1266     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1267     GNUNET_free (fragq);
1268   }
1269   else
1270   {
1271     fragq->is_queued = GNUNET_NO;
1272   }
1273 }
1274
1275
1276 struct StateModifyClosure
1277 {
1278   struct Channel *chn;
1279   uint64_t msg_id;
1280   struct GNUNET_HashCode msg_id_hash;
1281 };
1282
1283
1284 void
1285 store_recv_state_modify_result (void *cls, int64_t result,
1286                                 const char *err_msg, uint16_t err_msg_size)
1287 {
1288   struct StateModifyClosure *mcls = cls;
1289   struct Channel *chn = mcls->chn;
1290   uint64_t msg_id = mcls->msg_id;
1291
1292   struct FragmentQueue *
1293     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1294
1295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1297               chn, result, err_msg_size, err_msg);
1298
1299   switch (result)
1300   {
1301   case GNUNET_OK:
1302   case GNUNET_NO:
1303     if (NULL != fragq)
1304       fragq->state_is_modified = GNUNET_YES;
1305     if (chn->max_state_message_id < msg_id)
1306       chn->max_state_message_id = msg_id;
1307     if (chn->max_message_id < msg_id)
1308       chn->max_message_id = msg_id;
1309
1310     if (NULL != fragq)
1311       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1312     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1313     message_queue_run (chn);
1314     break;
1315
1316   default:
1317     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1318                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1319                 chn, result, err_msg_size, err_msg);
1320     /** @todo FIXME: handle state_modify error */
1321   }
1322 }
1323
1324
1325 /**
1326  * Run message queue.
1327  *
1328  * Send messages in queue to client in order after a message has arrived from
1329  * multicast, according to the following:
1330  * - A message is only sent if all of its modifiers arrived.
1331  * - A stateful message is only sent if the previous stateful message
1332  *   has already been delivered to the client.
1333  *
1334  * @param chn  Channel.
1335  *
1336  * @return Number of messages removed from queue and sent to client.
1337  */
1338 static uint64_t
1339 message_queue_run (struct Channel *chn)
1340 {
1341   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1342               "%p Running message queue.\n", chn);
1343   uint64_t n = 0;
1344   uint64_t msg_id;
1345
1346   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1347                                                     &msg_id))
1348   {
1349     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1350                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1351     struct GNUNET_HashCode msg_id_hash;
1352     hash_key_from_hll (&msg_id_hash, msg_id);
1353
1354     struct FragmentQueue *
1355       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1356
1357     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1358     {
1359       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1360                   "%p No fragq (%p) or header not complete.\n",
1361                   chn, fragq);
1362       break;
1363     }
1364
1365     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                 "%p Fragment queue entry:  state: %u, state delta: "
1367                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1368                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1369
1370     if (MSG_FRAG_STATE_DATA <= fragq->state)
1371     {
1372       /* Check if there's a missing message before the current one */
1373       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1374       {
1375         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1376
1377         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1378             && (chn->max_message_id != msg_id - 1
1379                 && chn->max_message_id != msg_id))
1380         {
1381           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1382                       "%p Out of order message. "
1383                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1384                       chn, chn->max_message_id, msg_id);
1385           break;
1386           // FIXME: keep track of messages processed in this queue run,
1387           //        and only stop after reaching the end
1388         }
1389       }
1390       else
1391       {
1392         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1393         if (GNUNET_YES != fragq->state_is_modified)
1394         {
1395           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1396           {
1397             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398                         "%p Out of order stateful message. "
1399                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1400                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1401             break;
1402             // FIXME: keep track of messages processed in this queue run,
1403             //        and only stop after reaching the end
1404           }
1405
1406           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1407           mcls->chn = chn;
1408           mcls->msg_id = msg_id;
1409           mcls->msg_id_hash = msg_id_hash;
1410
1411           /* Apply modifiers to state in PSYCstore */
1412           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1413                                          fragq->state_delta,
1414                                          store_recv_state_modify_result, mcls);
1415           break; // continue after asynchronous state modify result
1416         }
1417       }
1418       chn->max_message_id = msg_id;
1419     }
1420     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1421     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1422     n++;
1423   }
1424
1425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1427   return n;
1428 }
1429
1430
1431 /**
1432  * Drop message queue of a channel.
1433  *
1434  * Remove all messages in queue without sending it to clients.
1435  *
1436  * @param chn  Channel.
1437  *
1438  * @return Number of messages removed from queue.
1439  */
1440 static uint64_t
1441 message_queue_drop (struct Channel *chn)
1442 {
1443   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1444               "%p Dropping message queue.\n", chn);
1445   uint64_t n = 0;
1446   uint64_t msg_id;
1447   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1448                                                     &msg_id))
1449   {
1450     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1451                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1452     struct GNUNET_HashCode msg_id_hash;
1453     hash_key_from_hll (&msg_id_hash, msg_id);
1454
1455     struct FragmentQueue *
1456       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1457     GNUNET_assert (NULL != fragq);
1458     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1459     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1460     n++;
1461   }
1462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1463               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1464   return n;
1465 }
1466
1467
1468 /**
1469  * Received result of GNUNET_PSYCSTORE_fragment_store().
1470  */
1471 static void
1472 store_recv_fragment_store_result (void *cls, int64_t result,
1473                                   const char *err_msg, uint16_t err_msg_size)
1474 {
1475   struct Channel *chn = cls;
1476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1478               chn, result, err_msg_size, err_msg);
1479 }
1480
1481
1482 /**
1483  * Handle incoming message fragment from multicast.
1484  *
1485  * Store it using PSYCstore and send it to the clients of the channel in order.
1486  */
1487 static void
1488 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1489 {
1490   struct Channel *chn = cls;
1491   uint16_t size = ntohs (mmsg->header.size);
1492
1493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1494               "%p Received multicast message of size %u. "
1495               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1496               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1497               chn, size,
1498               GNUNET_ntohll (mmsg->fragment_id),
1499               GNUNET_ntohll (mmsg->message_id),
1500               GNUNET_ntohll (mmsg->fragment_offset),
1501               GNUNET_ntohll (mmsg->flags));
1502
1503   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1504                                    &store_recv_fragment_store_result, chn);
1505
1506   uint16_t first_ptype = 0, last_ptype = 0;
1507   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1508                                                (const char *) &mmsg[1],
1509                                                &first_ptype, &last_ptype);
1510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511               "%p Message check result %d, first part type %u, last part type %u\n",
1512               chn, check, first_ptype, last_ptype);
1513   if (GNUNET_SYSERR == check)
1514   {
1515     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516                 "%p Dropping incoming multicast message with invalid parts.\n",
1517                 chn);
1518     GNUNET_break_op (0);
1519     return;
1520   }
1521
1522   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1523   message_queue_run (chn);
1524 }
1525
1526
1527 /**
1528  * Incoming request fragment from multicast for a master.
1529  *
1530  * @param cls   Master.
1531  * @param req   The request.
1532  */
1533 static void
1534 mcast_recv_request (void *cls,
1535                     const struct GNUNET_MULTICAST_RequestHeader *req)
1536 {
1537   struct Master *mst = cls;
1538   uint16_t size = ntohs (req->header.size);
1539
1540   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542               "%p Received multicast request of size %u from %s.\n",
1543               mst, size, str);
1544   GNUNET_free (str);
1545
1546   uint16_t first_ptype = 0, last_ptype = 0;
1547   if (GNUNET_SYSERR
1548       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1549                                           (const char *) &req[1],
1550                                           &first_ptype, &last_ptype))
1551   {
1552     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1553                 "%p Dropping incoming multicast request with invalid parts.\n",
1554                 mst);
1555     GNUNET_break_op (0);
1556     return;
1557   }
1558
1559   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560               "Message parts: first: type %u, last: type %u\n",
1561               first_ptype, last_ptype);
1562
1563   /* FIXME: in-order delivery */
1564   client_send_mcast_req (mst, req);
1565 }
1566
1567
1568 /**
1569  * Response from PSYCstore with the current counter values for a channel master.
1570  */
1571 static void
1572 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1573                             uint64_t max_message_id, uint64_t max_group_generation,
1574                             uint64_t max_state_message_id)
1575 {
1576   struct Master *mst = cls;
1577   struct Channel *chn = &mst->chn;
1578   chn->store_op = NULL;
1579
1580   struct GNUNET_PSYC_CountersResultMessage res;
1581   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1582   res.header.size = htons (sizeof (res));
1583   res.result_code = htonl (result);
1584   res.max_message_id = GNUNET_htonll (max_message_id);
1585
1586   if (GNUNET_OK == result || GNUNET_NO == result)
1587   {
1588     mst->max_message_id = max_message_id;
1589     chn->max_message_id = max_message_id;
1590     chn->max_state_message_id = max_state_message_id;
1591     mst->max_group_generation = max_group_generation;
1592     mst->origin
1593       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1594                                        mcast_recv_join_request,
1595                                        mcast_recv_replay_fragment,
1596                                        mcast_recv_replay_message,
1597                                        mcast_recv_request,
1598                                        mcast_recv_message, chn);
1599     chn->is_ready = GNUNET_YES;
1600   }
1601   else
1602   {
1603     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1604                 "%p GNUNET_PSYCSTORE_counters_get() "
1605                 "returned %d for channel %s.\n",
1606                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1607   }
1608
1609   client_send_msg (chn, &res.header);
1610 }
1611
1612
1613 /**
1614  * Response from PSYCstore with the current counter values for a channel slave.
1615  */
1616 void
1617 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1618                            uint64_t max_message_id, uint64_t max_group_generation,
1619                            uint64_t max_state_message_id)
1620 {
1621   struct Slave *slv = cls;
1622   struct Channel *chn = &slv->chn;
1623   chn->store_op = NULL;
1624
1625   struct GNUNET_PSYC_CountersResultMessage res;
1626   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1627   res.header.size = htons (sizeof (res));
1628   res.result_code = htonl (result);
1629   res.max_message_id = GNUNET_htonll (max_message_id);
1630
1631   if (GNUNET_OK == result || GNUNET_NO == result)
1632   {
1633     chn->max_message_id = max_message_id;
1634     chn->max_state_message_id = max_state_message_id;
1635     slv->member
1636       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1637                                       &slv->origin,
1638                                       slv->relay_count, slv->relays,
1639                                       &slv->join_msg->header,
1640                                       mcast_recv_join_request,
1641                                       mcast_recv_join_decision,
1642                                       mcast_recv_replay_fragment,
1643                                       mcast_recv_replay_message,
1644                                       mcast_recv_message, chn);
1645     if (NULL != slv->join_msg)
1646     {
1647       GNUNET_free (slv->join_msg);
1648       slv->join_msg = NULL;
1649     }
1650   }
1651   else
1652   {
1653     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1654                 "%p GNUNET_PSYCSTORE_counters_get() "
1655                 "returned %d for channel %s.\n",
1656                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1657   }
1658
1659   client_send_msg (chn, &res.header);
1660 }
1661
1662
1663 static void
1664 channel_init (struct Channel *chn)
1665 {
1666   chn->recv_msgs
1667     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1668   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1669 }
1670
1671
1672 /**
1673  * Handle a connecting client starting a channel master.
1674  */
1675 static void
1676 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1677                           const struct GNUNET_MessageHeader *msg)
1678 {
1679   const struct MasterStartRequest *req
1680     = (const struct MasterStartRequest *) msg;
1681
1682   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1683   struct GNUNET_HashCode pub_key_hash;
1684
1685   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1686   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1687
1688   struct Master *
1689     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1690   struct Channel *chn;
1691
1692   if (NULL == mst)
1693   {
1694     mst = GNUNET_new (struct Master);
1695     mst->policy = ntohl (req->policy);
1696     mst->priv_key = req->channel_key;
1697     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1698
1699     chn = &mst->chn;
1700     chn->is_master = GNUNET_YES;
1701     chn->pub_key = pub_key;
1702     chn->pub_key_hash = pub_key_hash;
1703     channel_init (chn);
1704
1705     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1706                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1707     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1708                                                    store_recv_master_counters, mst);
1709   }
1710   else
1711   {
1712     chn = &mst->chn;
1713
1714     struct GNUNET_PSYC_CountersResultMessage res;
1715     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1716     res.header.size = htons (sizeof (res));
1717     res.result_code = htonl (GNUNET_OK);
1718     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1719
1720     GNUNET_SERVER_notification_context_add (nc, client);
1721     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1722                                                 GNUNET_NO);
1723   }
1724
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "%p Client connected as master to channel %s.\n",
1727               mst, GNUNET_h2s (&chn->pub_key_hash));
1728
1729   struct Client *cli = GNUNET_new (struct Client);
1730   cli->client = client;
1731   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1732
1733   GNUNET_SERVER_client_set_user_context (client, chn);
1734   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1735 }
1736
1737
1738 /**
1739  * Handle a connecting client joining as a channel slave.
1740  */
1741 static void
1742 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1743                         const struct GNUNET_MessageHeader *msg)
1744 {
1745   const struct SlaveJoinRequest *req
1746     = (const struct SlaveJoinRequest *) msg;
1747   uint16_t req_size = ntohs (req->header.size);
1748
1749   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1750   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1751
1752   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1753   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1754   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1755
1756   struct GNUNET_CONTAINER_MultiHashMap *
1757     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1758   struct Slave *slv = NULL;
1759   struct Channel *chn;
1760
1761   if (NULL != chn_slv)
1762   {
1763     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1764   }
1765   if (NULL == slv)
1766   {
1767     slv = GNUNET_new (struct Slave);
1768     slv->priv_key = req->slave_key;
1769     slv->pub_key = slv_pub_key;
1770     slv->pub_key_hash = slv_pub_hash;
1771     slv->origin = req->origin;
1772     slv->relay_count = ntohl (req->relay_count);
1773     slv->join_flags = ntohl (req->flags);
1774
1775     const struct GNUNET_PeerIdentity *
1776       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1777     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1778     uint16_t join_msg_size = 0;
1779
1780     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1781         <= req_size)
1782     {
1783       struct GNUNET_PSYC_Message *
1784         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1785       join_msg_size = ntohs (join_msg->header.size);
1786       slv->join_msg = GNUNET_malloc (join_msg_size);
1787       memcpy (slv->join_msg, join_msg, join_msg_size);
1788     }
1789     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1790     {
1791       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1792                   "%u + %u + %u != %u\n",
1793                   sizeof (*req), relay_size, join_msg_size, req_size);
1794       GNUNET_break (0);
1795       GNUNET_SERVER_client_disconnect (client);
1796       GNUNET_free (slv);
1797       return;
1798     }
1799     if (0 < slv->relay_count)
1800     {
1801       slv->relays = GNUNET_malloc (relay_size);
1802       memcpy (slv->relays, &req[1], relay_size);
1803     }
1804
1805     chn = &slv->chn;
1806     chn->is_master = GNUNET_NO;
1807     chn->pub_key = req->channel_pub_key;
1808     chn->pub_key_hash = pub_key_hash;
1809     channel_init (chn);
1810
1811     if (NULL == chn_slv)
1812     {
1813       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1814       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1815                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1816     }
1817     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1818                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1819     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1820                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1821     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1822                                                   &store_recv_slave_counters, slv);
1823   }
1824   else
1825   {
1826     chn = &slv->chn;
1827
1828     struct GNUNET_PSYC_CountersResultMessage res;
1829     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1830     res.header.size = htons (sizeof (res));
1831     res.result_code = htonl (GNUNET_OK);
1832     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1833
1834     GNUNET_SERVER_notification_context_add (nc, client);
1835     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1836                                                 GNUNET_NO);
1837
1838     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1839     {
1840       mcast_recv_join_decision (slv, GNUNET_YES,
1841                                 NULL, 0, NULL, NULL);
1842     }
1843     else if (NULL == slv->member)
1844     {
1845       slv->member
1846         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1847                                         &slv->origin,
1848                                         slv->relay_count, slv->relays,
1849                                         &slv->join_msg->header,
1850                                         &mcast_recv_join_request,
1851                                         &mcast_recv_join_decision,
1852                                         &mcast_recv_replay_fragment,
1853                                         &mcast_recv_replay_message,
1854                                         &mcast_recv_message, chn);
1855       if (NULL != slv->join_msg)
1856       {
1857         GNUNET_free (slv->join_msg);
1858         slv->join_msg = NULL;
1859       }
1860     }
1861     else if (NULL != slv->join_dcsn)
1862     {
1863       GNUNET_SERVER_notification_context_add (nc, client);
1864       GNUNET_SERVER_notification_context_unicast (nc, client,
1865                                                   &slv->join_dcsn->header,
1866                                                   GNUNET_NO);
1867     }
1868   }
1869
1870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871               "%p Client connected as slave to channel %s.\n",
1872               slv, GNUNET_h2s (&chn->pub_key_hash));
1873
1874   struct Client *cli = GNUNET_new (struct Client);
1875   cli->client = client;
1876   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1877
1878   GNUNET_SERVER_client_set_user_context (client, chn);
1879   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1880 }
1881
1882
1883 struct JoinDecisionClosure
1884 {
1885   int32_t is_admitted;
1886   struct GNUNET_MessageHeader *msg;
1887 };
1888
1889
1890 /**
1891  * Iterator callback for sending join decisions to multicast.
1892  */
1893 static int
1894 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1895                           void *value)
1896 {
1897   struct JoinDecisionClosure *jcls = cls;
1898   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1899   // FIXME: add relays
1900   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1901   return GNUNET_YES;
1902 }
1903
1904
1905 /**
1906  * Join decision from client.
1907  */
1908 static void
1909 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1910                            const struct GNUNET_MessageHeader *msg)
1911 {
1912   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1913     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1914   struct Channel *chn;
1915   struct Master *mst;
1916   struct JoinDecisionClosure jcls;
1917
1918   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1919   if (NULL == chn)
1920   {
1921     GNUNET_break (0);
1922     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1923     return;
1924   }
1925   GNUNET_assert (GNUNET_YES == chn->is_master);
1926   mst = (struct Master *) chn;
1927   jcls.is_admitted = ntohl (dcsn->is_admitted);
1928   jcls.msg
1929     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1930     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1931     : NULL;
1932
1933   struct GNUNET_HashCode slave_pub_hash;
1934   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1935                       &slave_pub_hash);
1936
1937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1938               "%p Got join decision (%d) from client for channel %s..\n",
1939               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1941               "%p ..and slave %s.\n",
1942               mst, GNUNET_h2s (&slave_pub_hash));
1943
1944   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1945                                               &mcast_send_join_decision, &jcls);
1946   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1947   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1948 }
1949
1950
1951 /**
1952  * Send acknowledgement to a client.
1953  *
1954  * Sent after a message fragment has been passed on to multicast.
1955  *
1956  * @param chn The channel struct for the client.
1957  */
1958 static void
1959 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1960 {
1961   struct GNUNET_MessageHeader res;
1962   res.size = htons (sizeof (res));
1963   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1964
1965   /* FIXME */
1966   GNUNET_SERVER_notification_context_add (nc, client);
1967   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1968 }
1969
1970
1971 /**
1972  * Callback for the transmit functions of multicast.
1973  */
1974 static int
1975 transmit_notify (void *cls, size_t *data_size, void *data)
1976 {
1977   struct Channel *chn = cls;
1978   struct TransmitMessage *tmit_msg = chn->tmit_head;
1979
1980   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1981   {
1982     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983                 "%p transmit_notify: nothing to send.\n", chn);
1984     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1985       GNUNET_break (0);
1986     *data_size = 0;
1987     return GNUNET_NO;
1988   }
1989
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1992
1993   *data_size = tmit_msg->size;
1994   memcpy (data, &tmit_msg[1], *data_size);
1995
1996   int ret
1997     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1998     ? GNUNET_NO
1999     : GNUNET_YES;
2000
2001   /* FIXME: handle disconnecting clients */
2002   if (NULL != tmit_msg->client)
2003     send_message_ack (chn, tmit_msg->client);
2004
2005   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2006   GNUNET_free (tmit_msg);
2007
2008   if (NULL != chn->tmit_head)
2009   {
2010     GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
2011   }
2012   else if (GNUNET_YES == chn->is_disconnected
2013            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2014   {
2015     /* FIXME: handle partial message (when still in_transmit) */
2016     return GNUNET_SYSERR;
2017   }
2018   return ret;
2019 }
2020
2021
2022 /**
2023  * Callback for the transmit functions of multicast.
2024  */
2025 static int
2026 master_transmit_notify (void *cls, size_t *data_size, void *data)
2027 {
2028   int ret = transmit_notify (cls, data_size, data);
2029
2030   if (GNUNET_YES == ret)
2031   {
2032     struct Master *mst = cls;
2033     mst->tmit_handle = NULL;
2034   }
2035   return ret;
2036 }
2037
2038
2039 /**
2040  * Callback for the transmit functions of multicast.
2041  */
2042 static int
2043 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2044 {
2045   int ret = transmit_notify (cls, data_size, data);
2046
2047   if (GNUNET_YES == ret)
2048   {
2049     struct Slave *slv = cls;
2050     slv->tmit_handle = NULL;
2051   }
2052   return ret;
2053 }
2054
2055
2056 /**
2057  * Transmit a message from a channel master to the multicast group.
2058  */
2059 static void
2060 master_transmit_message (struct Master *mst)
2061 {
2062   struct Channel *chn = &mst->chn;
2063   struct TransmitMessage *tmit_msg = chn->tmit_head;
2064   if (NULL == tmit_msg)
2065     return;
2066   if (NULL == mst->tmit_handle)
2067   {
2068     mst->tmit_handle
2069       = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2070                                         mst->max_group_generation,
2071                                         master_transmit_notify, mst);
2072   }
2073   else
2074   {
2075     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2076   }
2077 }
2078
2079
2080 /**
2081  * Transmit a message from a channel slave to the multicast group.
2082  */
2083 static void
2084 slave_transmit_message (struct Slave *slv)
2085 {
2086   if (NULL == slv->chn.tmit_head)
2087     return;
2088   if (NULL == slv->tmit_handle)
2089   {
2090     slv->tmit_handle
2091       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2092                                            slave_transmit_notify, slv);
2093   }
2094   else
2095   {
2096     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2097   }
2098 }
2099
2100
2101 static void
2102 transmit_message (struct Channel *chn)
2103 {
2104   chn->is_master
2105     ? master_transmit_message ((struct Master *) chn)
2106     : slave_transmit_message ((struct Slave *) chn);
2107 }
2108
2109
2110 /**
2111  * Queue a message from a channel master for sending to the multicast group.
2112  */
2113 static void
2114 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2115 {
2116   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2117
2118   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2119   {
2120     tmit_msg->id = ++mst->max_message_id;
2121     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2122                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2123                 mst, tmit_msg->id);
2124     struct GNUNET_PSYC_MessageMethod *pmeth
2125       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2126
2127     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2128     {
2129       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2130     }
2131     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2132     {
2133       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2134                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2135                   mst, tmit_msg->id - mst->max_state_message_id);
2136       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2137                                           - mst->max_state_message_id);
2138       mst->max_state_message_id = tmit_msg->id;
2139     }
2140     else
2141     {
2142         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2143                     "%p master_queue_message: state not modified\n", mst);
2144       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2145     }
2146
2147     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2148     {
2149       /// @todo add state_hash to PSYC header
2150     }
2151   }
2152 }
2153
2154
2155 /**
2156  * Queue a message from a channel slave for sending to the multicast group.
2157  */
2158 static void
2159 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2160 {
2161   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2162   {
2163     struct GNUNET_PSYC_MessageMethod *pmeth
2164       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2165     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2166     tmit_msg->id = ++slv->max_request_id;
2167   }
2168 }
2169
2170
2171 /**
2172  * Queue PSYC message parts for sending to multicast.
2173  *
2174  * @param chn
2175  *        Channel to send to.
2176  * @param client
2177  *        Client the message originates from.
2178  * @param data_size
2179  *        Size of @a data.
2180  * @param data
2181  *        Concatenated message parts.
2182  * @param first_ptype
2183  *        First message part type in @a data.
2184  * @param last_ptype
2185  *        Last message part type in @a data.
2186  */
2187 static struct TransmitMessage *
2188 queue_message (struct Channel *chn,
2189                struct GNUNET_SERVER_Client *client,
2190                size_t data_size,
2191                const void *data,
2192                uint16_t first_ptype, uint16_t last_ptype)
2193 {
2194   struct TransmitMessage *
2195     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2196   memcpy (&tmit_msg[1], data, data_size);
2197   tmit_msg->client = client;
2198   tmit_msg->size = data_size;
2199   tmit_msg->first_ptype = first_ptype;
2200   tmit_msg->last_ptype = last_ptype;
2201
2202   /* FIXME: separate queue per message ID */
2203
2204   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2205
2206   chn->is_master
2207     ? master_queue_message ((struct Master *) chn, tmit_msg)
2208     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2209   return tmit_msg;
2210 }
2211
2212
2213 /**
2214  * Cancel transmission of current message.
2215  *
2216  * @param chn     Channel to send to.
2217  * @param client  Client the message originates from.
2218  */
2219 static void
2220 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2221 {
2222   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2223
2224   struct GNUNET_MessageHeader msg;
2225   msg.size = htons (sizeof (msg));
2226   msg.type = htons (type);
2227
2228   queue_message (chn, client, sizeof (msg), &msg, type, type);
2229   transmit_message (chn);
2230
2231   /* FIXME: cleanup */
2232 }
2233
2234
2235 /**
2236  * Incoming message from a master or slave client.
2237  */
2238 static void
2239 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2240                           const struct GNUNET_MessageHeader *msg)
2241 {
2242   struct Channel *
2243     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2244   GNUNET_assert (NULL != chn);
2245
2246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2247               "%p Received message from client.\n", chn);
2248   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2249
2250   if (GNUNET_YES != chn->is_ready)
2251   {
2252     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2253                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2254     GNUNET_break (0);
2255     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2256     return;
2257   }
2258
2259   uint16_t size = ntohs (msg->size);
2260   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2261   {
2262     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2263                 "%p Message payload too large: %u < %u.\n",
2264                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2265     GNUNET_break (0);
2266     transmit_cancel (chn, client);
2267     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2268     return;
2269   }
2270
2271   uint16_t first_ptype = 0, last_ptype = 0;
2272   if (GNUNET_SYSERR
2273       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2274                                           (const char *) &msg[1],
2275                                           &first_ptype, &last_ptype))
2276   {
2277     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2278                 "%p Received invalid message part from client.\n", chn);
2279     GNUNET_break (0);
2280     transmit_cancel (chn, client);
2281     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2282     return;
2283   }
2284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2285               "%p Received message with first part type %u and last part type %u.\n",
2286               chn, first_ptype, last_ptype);
2287
2288   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2289                  first_ptype, last_ptype);
2290   transmit_message (chn);
2291   /* FIXME: send a few ACKs even before transmit_notify is called */
2292
2293   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2294 };
2295
2296
2297 /**
2298  * Received result of GNUNET_PSYCSTORE_membership_store()
2299  */
2300 static void
2301 store_recv_membership_store_result (void *cls, int64_t result,
2302                                     const char *err_msg, uint16_t err_msg_size)
2303 {
2304   struct Operation *op = cls;
2305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2307               op->chn, result, err_msg_size, err_msg);
2308
2309   if (NULL != op->client)
2310     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2311   op_remove (op);
2312 }
2313
2314
2315 /**
2316  * Client requests to add/remove a slave in the membership database.
2317  */
2318 static void
2319 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2320                               const struct GNUNET_MessageHeader *msg)
2321 {
2322   struct Channel *
2323     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2324   GNUNET_assert (NULL != chn);
2325
2326   const struct ChannelMembershipStoreRequest *
2327     req = (const struct ChannelMembershipStoreRequest *) msg;
2328
2329   struct Operation *op = op_add (chn, client, req->op_id, 0);
2330
2331   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2332   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2334               "%p Received membership store request from client.\n", chn);
2335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2336               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2337               chn, req->did_join, announced_at, effective_since);
2338
2339   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2340                                      req->did_join, announced_at, effective_since,
2341                                      0, /* FIXME: group_generation */
2342                                      &store_recv_membership_store_result, op);
2343   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2344 }
2345
2346
2347 /**
2348  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2349  * in response to a history request from a client.
2350  */
2351 static int
2352 store_recv_fragment_history (void *cls,
2353                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2354                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2355 {
2356   struct Operation *op = cls;
2357   if (NULL == op->client)
2358   { /* Requesting client already disconnected. */
2359     return GNUNET_NO;
2360   }
2361   struct Channel *chn = op->chn;
2362
2363   struct GNUNET_PSYC_MessageHeader *pmsg;
2364   uint16_t msize = ntohs (mmsg->header.size);
2365   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2366
2367   struct GNUNET_OperationResultMessage *
2368     res = GNUNET_malloc (sizeof (*res) + psize);
2369   res->header.size = htons (sizeof (*res) + psize);
2370   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2371   res->op_id = op->op_id;
2372   res->result_code = GNUNET_htonll (GNUNET_OK);
2373
2374   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2375   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2376   memcpy (&res[1], pmsg, psize);
2377
2378   /** @todo FIXME: send only to requesting client */
2379   client_send_msg (chn, &res->header);
2380   return GNUNET_YES;
2381 }
2382
2383
2384 /**
2385  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2386  * in response to a history request from a client.
2387  */
2388 static void
2389 store_recv_fragment_history_result (void *cls, int64_t result,
2390                                     const char *err_msg, uint16_t err_msg_size)
2391 {
2392   struct Operation *op = cls;
2393   if (NULL == op->client)
2394   { /* Requesting client already disconnected. */
2395     return;
2396   }
2397
2398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2399               "%p History replay #%" PRIu64 ": "
2400               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2401               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2402
2403   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2404   {
2405     /** @todo Multicast replay request for messages not found locally. */
2406   }
2407
2408   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2409   op_remove (op);
2410 }
2411
2412
2413 /**
2414  * Client requests channel history.
2415  */
2416 static void
2417 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2418                             const struct GNUNET_MessageHeader *msg)
2419 {
2420   struct Channel *
2421     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2422   GNUNET_assert (NULL != chn);
2423
2424   const struct GNUNET_PSYC_HistoryRequestMessage *
2425     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2426   uint16_t size = ntohs (msg->size);
2427   const char *method_prefix = (const char *) &req[1];
2428
2429   if (size < sizeof (*req) + 1
2430       || '\0' != method_prefix[size - sizeof (*req) - 1])
2431   {
2432     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2433                 "%p History replay #%" PRIu64 ": "
2434                 "invalid method prefix. size: %u < %u?\n",
2435                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2436     GNUNET_break (0);
2437     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2438     return;
2439   }
2440
2441   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2442
2443   if (0 == req->message_limit)
2444     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2445                                   GNUNET_ntohll (req->start_message_id),
2446                                   GNUNET_ntohll (req->end_message_id),
2447                                   0, method_prefix,
2448                                   &store_recv_fragment_history,
2449                                   &store_recv_fragment_history_result, op);
2450   else
2451     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2452                                          GNUNET_ntohll (req->message_limit),
2453                                          method_prefix,
2454                                          &store_recv_fragment_history,
2455                                          &store_recv_fragment_history_result,
2456                                          op);
2457
2458   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2459 }
2460
2461
2462 /**
2463  * Received state var from PSYCstore, send it to client.
2464  */
2465 static int
2466 store_recv_state_var (void *cls, const char *name,
2467                       const void *value, uint32_t value_size)
2468 {
2469   struct Operation *op = cls;
2470   struct GNUNET_OperationResultMessage *res;
2471
2472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2473               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2474               op->chn, GNUNET_ntohll (op->op_id), name);
2475
2476   if (NULL != name) /* First part */
2477   {
2478     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2479     struct GNUNET_PSYC_MessageModifier *mod;
2480     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2481     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2482     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2483     res->op_id = op->op_id;
2484
2485     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2486     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2487     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2488     mod->name_size = htons (name_size);
2489     mod->value_size = htonl (value_size);
2490     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2491     memcpy (&mod[1], name, name_size);
2492     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2493   }
2494   else /* Continuation */
2495   {
2496     struct GNUNET_MessageHeader *mod;
2497     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2498     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2499     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2500     res->op_id = op->op_id;
2501
2502     mod = (struct GNUNET_MessageHeader *) &res[1];
2503     mod->size = htons (sizeof (*mod) + value_size);
2504     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2505     memcpy (&mod[1], value, value_size);
2506   }
2507
2508   // FIXME: client might have been disconnected
2509   GNUNET_SERVER_notification_context_add (nc, op->client);
2510   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2511                                               GNUNET_NO);
2512   return GNUNET_YES;
2513 }
2514
2515
2516 /**
2517  * Received result of GNUNET_PSYCSTORE_state_get()
2518  * or GNUNET_PSYCSTORE_state_get_prefix()
2519  */
2520 static void
2521 store_recv_state_result (void *cls, int64_t result,
2522                          const char *err_msg, uint16_t err_msg_size)
2523 {
2524   struct Operation *op = cls;
2525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526               "%p state_get #%" PRIu64 ": "
2527               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2528               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2529
2530   // FIXME: client might have been disconnected
2531   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2532   op_remove (op);
2533 }
2534
2535
2536 /**
2537  * Client requests best matching state variable from PSYCstore.
2538  */
2539 static void
2540 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2541                        const struct GNUNET_MessageHeader *msg)
2542 {
2543   struct Channel *
2544     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2545   GNUNET_assert (NULL != chn);
2546
2547   const struct StateRequest *
2548     req = (const struct StateRequest *) msg;
2549
2550   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2551   const char *name = (const char *) &req[1];
2552   if (0 == name_size || '\0' != name[name_size - 1])
2553   {
2554     GNUNET_break (0);
2555     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2556     return;
2557   }
2558
2559   struct Operation *op = op_add (chn, client, req->op_id, 0);
2560   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2561                               &store_recv_state_var,
2562                               &store_recv_state_result, op);
2563   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2564 }
2565
2566
2567 /**
2568  * Client requests state variables with a given prefix from PSYCstore.
2569  */
2570 static void
2571 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2572                               const struct GNUNET_MessageHeader *msg)
2573 {
2574   struct Channel *
2575     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2576   GNUNET_assert (NULL != chn);
2577
2578   const struct StateRequest *
2579     req = (const struct StateRequest *) msg;
2580
2581   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2582   const char *name = (const char *) &req[1];
2583   if (0 == name_size || '\0' != name[name_size - 1])
2584   {
2585     GNUNET_break (0);
2586     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2587     return;
2588   }
2589
2590   struct Operation *op = op_add (chn, client, req->op_id, 0);
2591   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2592                                      &store_recv_state_var,
2593                                      &store_recv_state_result, op);
2594   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2595 }
2596
2597
2598 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2599   { &client_recv_master_start, NULL,
2600     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2601
2602   { &client_recv_slave_join, NULL,
2603     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2604
2605   { &client_recv_join_decision, NULL,
2606     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2607
2608   { &client_recv_psyc_message, NULL,
2609     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2610
2611   { &client_recv_membership_store, NULL,
2612     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2613
2614   { &client_recv_history_replay, NULL,
2615     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2616
2617   { &client_recv_state_get, NULL,
2618     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2619
2620   { &client_recv_state_get_prefix, NULL,
2621     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2622
2623   { NULL, NULL, 0, 0 }
2624 };
2625
2626
2627 /**
2628  * Initialize the PSYC service.
2629  *
2630  * @param cls Closure.
2631  * @param server The initialized server.
2632  * @param c Configuration to use.
2633  */
2634 static void
2635 run (void *cls, struct GNUNET_SERVER_Handle *server,
2636      const struct GNUNET_CONFIGURATION_Handle *c)
2637 {
2638   cfg = c;
2639   store = GNUNET_PSYCSTORE_connect (cfg);
2640   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2641   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2642   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2643   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2644   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2645   nc = GNUNET_SERVER_notification_context_create (server, 1);
2646   GNUNET_SERVER_add_handlers (server, server_handlers);
2647   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2648   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2649                                 &shutdown_task, NULL);
2650 }
2651
2652
2653 /**
2654  * The main function for the service.
2655  *
2656  * @param argc number of arguments from the command line
2657  * @param argv command line arguments
2658  * @return 0 ok, 1 on error
2659  */
2660 int
2661 main (int argc, char *const *argv)
2662 {
2663   return (GNUNET_OK ==
2664           GNUNET_SERVICE_run (argc, argv, "psyc",
2665                               GNUNET_SERVICE_OPTION_NONE,
2666                               &run, NULL)) ? 0 : 1;
2667 }
2668
2669 /* end of gnunet-service-psyc.c */