psycstore: add fragment_limit arg for message_get
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * @see enum MessageState
102    */
103   uint8_t state;
104
105   /**
106    * Whether a message ACK has already been sent to the client.
107    * #GNUNET_YES or #GNUNET_NO
108    */
109   uint8_t ack_sent;
110
111   /* Followed by message */
112 };
113
114
115 /**
116  * Cache for received message fragments.
117  * Message fragments are only sent to clients after all modifiers arrived.
118  *
119  * chan_key -> MultiHashMap chan_msgs
120  */
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
122
123
124 /**
125  * Entry in the chan_msgs hashmap of @a recv_cache:
126  * fragment_id -> RecvCacheEntry
127  */
128 struct RecvCacheEntry
129 {
130   struct GNUNET_MULTICAST_MessageHeader *mmsg;
131   uint16_t ref_count;
132 };
133
134
135 /**
136  * Entry in the @a recv_frags hash map of a @a Channel.
137  * message_id -> FragmentQueue
138  */
139 struct FragmentQueue
140 {
141   /**
142    * Fragment IDs stored in @a recv_cache.
143    */
144   struct GNUNET_CONTAINER_Heap *fragments;
145
146   /**
147    * Total size of received fragments.
148    */
149   uint64_t size;
150
151   /**
152    * Total size of received header fragments (METHOD & MODIFIERs)
153    */
154   uint64_t header_size;
155
156   /**
157    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158    */
159   uint64_t state_delta;
160
161   /**
162    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
163    */
164   uint32_t flags;
165
166   /**
167    * Receive state of message.
168    *
169    * @see MessageFragmentState
170    */
171   uint8_t state;
172
173   /**
174    * Whether the state is already modified in PSYCstore.
175    */
176   uint8_t state_is_modified;
177
178   /**
179    * Is the message queued for delivery to the client?
180    * i.e. added to the recv_msgs queue
181    */
182   uint8_t is_queued;
183 };
184
185
186 /**
187  * List of connected clients.
188  */
189 struct Client
190 {
191   struct Client *prev;
192   struct Client *next;
193
194   struct GNUNET_SERVER_Client *client;
195 };
196
197
198 struct Operation
199 {
200   struct Operation *prev;
201   struct Operation *next;
202
203   struct GNUNET_SERVER_Client *client;
204   struct Channel *chn;
205   uint64_t op_id;
206   uint32_t flags;
207 };
208
209
210 /**
211  * Common part of the client context for both a channel master and slave.
212  */
213 struct Channel
214 {
215   struct Client *clients_head;
216   struct Client *clients_tail;
217
218   struct Operation *op_head;
219   struct Operation *op_tail;
220
221   struct TransmitMessage *tmit_head;
222   struct TransmitMessage *tmit_tail;
223
224   /**
225    * Current PSYCstore operation.
226    */
227   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
228
229   /**
230    * Received fragments not yet sent to the client.
231    * message_id -> FragmentQueue
232    */
233   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
234
235   /**
236    * Received message IDs not yet sent to the client.
237    */
238   struct GNUNET_CONTAINER_Heap *recv_msgs;
239
240   /**
241    * Public key of the channel.
242    */
243   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
244
245   /**
246    * Hash of @a pub_key.
247    */
248   struct GNUNET_HashCode pub_key_hash;
249
250   /**
251    * Last message ID sent to the client.
252    * 0 if there is no such message.
253    */
254   uint64_t max_message_id;
255
256   /**
257    * ID of the last stateful message, where the state operations has been
258    * processed and saved to PSYCstore and which has been sent to the client.
259    * 0 if there is no such message.
260    */
261   uint64_t max_state_message_id;
262
263   /**
264    * Expected value size for the modifier being received from the PSYC service.
265    */
266   uint32_t tmit_mod_value_size_expected;
267
268   /**
269    * Actual value size for the modifier being received from the PSYC service.
270    */
271   uint32_t tmit_mod_value_size;
272
273   /**
274    * @see enum MessageState
275    */
276   uint8_t tmit_state;
277
278   /**
279    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
280    */
281   uint8_t is_master;
282
283   /**
284    * Is this channel ready to receive messages from client?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_ready;
288
289   /**
290    * Is the client disconnected?
291    * #GNUNET_YES or #GNUNET_NO
292    */
293   uint8_t is_disconnected;
294 };
295
296
297 /**
298  * Client context for a channel master.
299  */
300 struct Master
301 {
302   /**
303    * Channel struct common for Master and Slave
304    */
305   struct Channel chn;
306
307   /**
308    * Private key of the channel.
309    */
310   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
311
312   /**
313    * Handle for the multicast origin.
314    */
315   struct GNUNET_MULTICAST_Origin *origin;
316
317   /**
318    * Transmit handle for multicast.
319    */
320   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
321
322   /**
323    * Incoming join requests from multicast.
324    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
325    */
326   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
327
328   /**
329    * Last message ID transmitted to this channel.
330    *
331    * Incremented before sending a message, thus the message_id in messages sent
332    * starts from 1.
333    */
334   uint64_t max_message_id;
335
336   /**
337    * ID of the last message with state operations transmitted to the channel.
338    * 0 if there is no such message.
339    */
340   uint64_t max_state_message_id;
341
342   /**
343    * Maximum group generation transmitted to the channel.
344    */
345   uint64_t max_group_generation;
346
347   /**
348    * @see enum GNUNET_PSYC_Policy
349    */
350   enum GNUNET_PSYC_Policy policy;
351 };
352
353
354 /**
355  * Client context for a channel slave.
356  */
357 struct Slave
358 {
359   /**
360    * Channel struct common for Master and Slave
361    */
362   struct Channel chn;
363
364   /**
365    * Private key of the slave.
366    */
367   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
368
369   /**
370    * Public key of the slave.
371    */
372   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
373
374   /**
375    * Hash of @a pub_key.
376    */
377   struct GNUNET_HashCode pub_key_hash;
378
379   /**
380    * Handle for the multicast member.
381    */
382   struct GNUNET_MULTICAST_Member *member;
383
384   /**
385    * Transmit handle for multicast.
386    */
387   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
388
389   /**
390    * Peer identity of the origin.
391    */
392   struct GNUNET_PeerIdentity origin;
393
394   /**
395    * Number of items in @a relays.
396    */
397   uint32_t relay_count;
398
399   /**
400    * Relays that multicast can use to connect.
401    */
402   struct GNUNET_PeerIdentity *relays;
403
404   /**
405    * Join request to be transmitted to the master on join.
406    */
407   struct GNUNET_PSYC_Message *join_msg;
408
409   /**
410    * Join decision received from multicast.
411    */
412   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
413
414   /**
415    * Maximum request ID for this channel.
416    */
417   uint64_t max_request_id;
418 };
419
420
421 static void
422 transmit_message (struct Channel *chn);
423
424 static uint64_t
425 message_queue_run (struct Channel *chn);
426
427 static uint64_t
428 message_queue_drop (struct Channel *chn);
429
430
431 /**
432  * Task run during shutdown.
433  *
434  * @param cls unused
435  * @param tc unused
436  */
437 static void
438 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
439 {
440   if (NULL != nc)
441   {
442     GNUNET_SERVER_notification_context_destroy (nc);
443     nc = NULL;
444   }
445   if (NULL != stats)
446   {
447     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
448     stats = NULL;
449   }
450 }
451
452
453 static struct Operation *
454 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
455         uint64_t op_id, uint32_t flags)
456 {
457   struct Operation *op = GNUNET_malloc (sizeof (*op));
458   op->client = client;
459   op->chn = chn;
460   op->op_id = op_id;
461   op->flags = flags;
462   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
463   return op;
464 }
465
466
467 static void
468 op_remove (struct Operation *op)
469 {
470   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
471   GNUNET_free (op);
472 }
473
474
475 /**
476  * Clean up master data structures after a client disconnected.
477  */
478 static void
479 cleanup_master (struct Master *mst)
480 {
481   struct Channel *chn = &mst->chn;
482
483   if (NULL != mst->origin)
484     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
485   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
486   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
487 }
488
489
490 /**
491  * Clean up slave data structures after a client disconnected.
492  */
493 static void
494 cleanup_slave (struct Slave *slv)
495 {
496   struct Channel *chn = &slv->chn;
497   struct GNUNET_CONTAINER_MultiHashMap *
498     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
499                                                 &chn->pub_key_hash);
500   GNUNET_assert (NULL != chn_slv);
501   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
502
503   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
504   {
505     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
506                                           chn_slv);
507     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
508   }
509   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
510
511   if (NULL != slv->join_msg)
512   {
513     GNUNET_free (slv->join_msg);
514     slv->join_msg = NULL;
515   }
516   if (NULL != slv->relays)
517   {
518     GNUNET_free (slv->relays);
519     slv->relays = NULL;
520   }
521   if (NULL != slv->member)
522   {
523     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
524     slv->member = NULL;
525   }
526   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
527 }
528
529
530 /**
531  * Clean up channel data structures after a client disconnected.
532  */
533 static void
534 cleanup_channel (struct Channel *chn)
535 {
536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537               "%p Cleaning up channel %s. master? %u\n",
538               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
539   message_queue_drop (chn);
540   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
541   chn->recv_frags = NULL;
542
543   if (NULL != chn->store_op)
544   {
545     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
546     chn->store_op = NULL;
547   }
548
549   (GNUNET_YES == chn->is_master)
550     ? cleanup_master ((struct Master *) chn)
551     : cleanup_slave ((struct Slave *) chn);
552   GNUNET_free (chn);
553 }
554
555
556 /**
557  * Called whenever a client is disconnected.
558  * Frees our resources associated with that client.
559  *
560  * @param cls Closure.
561  * @param client Identification of the client.
562  */
563 static void
564 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
565 {
566   if (NULL == client)
567     return;
568
569   struct Channel *
570     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
571
572   if (NULL == chn)
573   {
574     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575                 "%p User context is NULL in client_disconnect()\n", chn);
576     GNUNET_break (0);
577     return;
578   }
579
580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581               "%p Client (%s) disconnected from channel %s\n",
582               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
583               GNUNET_h2s (&chn->pub_key_hash));
584
585   chn->is_disconnected = GNUNET_YES;
586
587   struct Client *cli = chn->clients_head;
588   while (NULL != cli)
589   {
590     if (cli->client == client)
591     {
592       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
593       GNUNET_free (cli);
594       break;
595     }
596     cli = cli->next;
597   }
598
599   struct Operation *op = chn->op_head;
600   while (NULL != op)
601   {
602     if (op->client == client)
603     {
604       op->client = NULL;
605       break;
606     }
607     op = op->next;
608   }
609
610   if (NULL == chn->clients_head)
611   { /* Last client disconnected. */
612     if (NULL != chn->tmit_head)
613     { /* Send pending messages to multicast before cleanup. */
614       transmit_message (chn);
615     }
616     else
617     {
618       cleanup_channel (chn);
619     }
620   }
621 }
622
623
624 /**
625  * Send message to all clients connected to the channel.
626  */
627 static void
628 client_send_msg (const struct Channel *chn,
629                  const struct GNUNET_MessageHeader *msg)
630 {
631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632               "%p Sending message to clients.\n", chn);
633
634   struct Client *cli = chn->clients_head;
635   while (NULL != cli)
636   {
637     GNUNET_SERVER_notification_context_add (nc, cli->client);
638     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
639     cli = cli->next;
640   }
641 }
642
643
644 /**
645  * Send a result code back to the client.
646  *
647  * @param client
648  *        Client that should receive the result code.
649  * @param result_code
650  *        Code to transmit.
651  * @param op_id
652  *        Operation ID in network byte order.
653  * @param data
654  *        Data payload or NULL.
655  * @param data_size
656  *        Size of @a data.
657  */
658 static void
659 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
660                     int64_t result_code, const void *data, uint16_t data_size)
661 {
662   struct GNUNET_OperationResultMessage *res;
663
664   res = GNUNET_malloc (sizeof (*res) + data_size);
665   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
666   res->header.size = htons (sizeof (*res) + data_size);
667   res->result_code = GNUNET_htonll (result_code);
668   res->op_id = op_id;
669   if (0 < data_size)
670     memcpy (&res[1], data, data_size);
671
672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673               "%p Sending result to client for operation #%" PRIu64 ": "
674               "%" PRId64 " (size: %u)\n",
675               client, GNUNET_ntohll (op_id), result_code, data_size);
676
677   GNUNET_SERVER_notification_context_add (nc, client);
678   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
679                                               GNUNET_NO);
680   GNUNET_free (res);
681 }
682
683
684 /**
685  * Closure for join_mem_test_cb()
686  */
687 struct JoinMemTestClosure
688 {
689   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
690   struct Channel *chn;
691   struct GNUNET_MULTICAST_JoinHandle *jh;
692   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
693 };
694
695
696 /**
697  * Membership test result callback used for join requests.
698  */
699 static void
700 join_mem_test_cb (void *cls, int64_t result,
701                   const char *err_msg, uint16_t err_msg_size)
702 {
703   struct JoinMemTestClosure *jcls = cls;
704
705   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
706   { /* Pass on join request to client if this is a master channel */
707     struct Master *mst = (struct Master *) jcls->chn;
708     struct GNUNET_HashCode slave_key_hash;
709     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
710                         &slave_key_hash);
711     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
712                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
713     client_send_msg (jcls->chn, &jcls->join_msg->header);
714   }
715   else
716   {
717     if (GNUNET_SYSERR == result)
718     {
719       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
720                   "Could not perform membership test (%.*s)\n",
721                   err_msg_size, err_msg);
722     }
723     // FIXME: add relays
724     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
725   }
726   GNUNET_free (jcls->join_msg);
727   GNUNET_free (jcls);
728 }
729
730
731 /**
732  * Incoming join request from multicast.
733  */
734 static void
735 mcast_recv_join_request (void *cls,
736                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
737                          const struct GNUNET_MessageHeader *join_msg,
738                          struct GNUNET_MULTICAST_JoinHandle *jh)
739 {
740   struct Channel *chn = cls;
741   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
742
743   uint16_t join_msg_size = 0;
744   if (NULL != join_msg)
745   {
746     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
747     {
748       join_msg_size = ntohs (join_msg->size);
749     }
750     else
751     {
752       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
753                   "%p Got join message with invalid type %u.\n",
754                   chn, ntohs (join_msg->type));
755     }
756   }
757
758   struct GNUNET_PSYC_JoinRequestMessage *
759     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
760   req->header.size = htons (sizeof (*req) + join_msg_size);
761   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
762   req->slave_key = *slave_key;
763   if (0 < join_msg_size)
764     memcpy (&req[1], join_msg, join_msg_size);
765
766   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
767   jcls->slave_key = *slave_key;
768   jcls->chn = chn;
769   jcls->jh = jh;
770   jcls->join_msg = req;
771
772   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
773                                     chn->max_message_id, 0,
774                                     &join_mem_test_cb, jcls);
775 }
776
777
778 /**
779  * Join decision received from multicast.
780  */
781 static void
782 mcast_recv_join_decision (void *cls, int is_admitted,
783                           const struct GNUNET_PeerIdentity *peer,
784                           uint16_t relay_count,
785                           const struct GNUNET_PeerIdentity *relays,
786                           const struct GNUNET_MessageHeader *join_resp)
787 {
788   struct Slave *slv = cls;
789   struct Channel *chn = &slv->chn;
790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791               "%p Got join decision: %d\n", slv, is_admitted);
792
793   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
794   struct GNUNET_PSYC_JoinDecisionMessage *
795     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
796   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
797   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
798   dcsn->is_admitted = htonl (is_admitted);
799   if (0 < join_resp_size)
800     memcpy (&dcsn[1], join_resp, join_resp_size);
801
802   client_send_msg (chn, &dcsn->header);
803
804   if (GNUNET_YES == is_admitted)
805   {
806     chn->is_ready = GNUNET_YES;
807   }
808   else
809   {
810     slv->member = NULL;
811   }
812 }
813
814
815 /**
816  * Received result of GNUNET_PSYCSTORE_membership_test()
817  */
818 static void
819 store_recv_membership_test_result (void *cls, int64_t result,
820                                    const char *err_msg, uint16_t err_msg_size)
821 {
822   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824               "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
825               mth, result, err_msg_size, err_msg);
826
827   GNUNET_MULTICAST_membership_test_result (mth, result);
828 }
829
830
831 /**
832  * Incoming membership test request from multicast.
833  */
834 static void
835 mcast_recv_membership_test (void *cls,
836                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
837                             uint64_t message_id, uint64_t group_generation,
838                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
839 {
840   struct Channel *chn = cls;
841   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842               "%p Received membership test request from multicast.\n",
843               mth);
844   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
845                                     message_id, group_generation,
846                                     &store_recv_membership_test_result, mth);
847 }
848
849
850 static int
851 store_recv_fragment_replay (void *cls,
852                             struct GNUNET_MULTICAST_MessageHeader *msg,
853                             enum GNUNET_PSYCSTORE_MessageFlags flags)
854 {
855   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
856
857   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
858   return GNUNET_YES;
859 }
860
861
862 /**
863  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
864  */
865 static void
866 store_recv_fragment_replay_result (void *cls, int64_t result,
867                                    const char *err_msg, uint16_t err_msg_size)
868 {
869   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
872               rh, result, err_msg_size, err_msg);
873
874   switch (result)
875   {
876   case GNUNET_YES:
877     break;
878
879   case GNUNET_NO:
880     GNUNET_MULTICAST_replay_response (rh, NULL,
881                                       GNUNET_MULTICAST_REC_NOT_FOUND);
882     break;
883
884   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
885     GNUNET_MULTICAST_replay_response (rh, NULL,
886                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
887     break;
888
889   case GNUNET_SYSERR:
890     GNUNET_MULTICAST_replay_response (rh, NULL,
891                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
892     break;
893   }
894   GNUNET_MULTICAST_replay_response_end (rh);
895 }
896
897
898 /**
899  * Incoming fragment replay request from multicast.
900  */
901 static void
902 mcast_recv_replay_fragment (void *cls,
903                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
904                             uint64_t fragment_id, uint64_t flags,
905                             struct GNUNET_MULTICAST_ReplayHandle *rh)
906
907 {
908   struct Channel *chn = cls;
909   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
910                                  fragment_id, fragment_id,
911                                  &store_recv_fragment_replay,
912                                  &store_recv_fragment_replay_result, rh);
913 }
914
915
916 /**
917  * Incoming message replay request from multicast.
918  */
919 static void
920 mcast_recv_replay_message (void *cls,
921                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
922                            uint64_t message_id,
923                            uint64_t fragment_offset,
924                            uint64_t flags,
925                            struct GNUNET_MULTICAST_ReplayHandle *rh)
926 {
927   struct Channel *chn = cls;
928   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
929                                 message_id, message_id, 1, NULL,
930                                 &store_recv_fragment_replay,
931                                 &store_recv_fragment_replay_result, rh);
932 }
933
934
935 /**
936  * Convert an uint64_t in network byte order to a HashCode
937  * that can be used as key in a MultiHashMap
938  */
939 static inline void
940 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
941 {
942   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
943   /* TODO: use built-in byte swap functions if available */
944
945   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
946   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
947
948   *key = (struct GNUNET_HashCode) {};
949   *((uint64_t *) key)
950     = (n << 32) | (n >> 32);
951 }
952
953
954 /**
955  * Convert an uint64_t in host byte order to a HashCode
956  * that can be used as key in a MultiHashMap
957  */
958 static inline void
959 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
960 {
961 #if __BYTE_ORDER == __BIG_ENDIAN
962   hash_key_from_nll (key, n);
963 #elif __BYTE_ORDER == __LITTLE_ENDIAN
964   *key = (struct GNUNET_HashCode) {};
965   *((uint64_t *) key) = n;
966 #else
967   #error byteorder undefined
968 #endif
969 }
970
971
972 /**
973  * Initialize PSYC message header.
974  */
975 static inline void
976 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
977                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
978 {
979   uint16_t size = ntohs (mmsg->header.size);
980   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
981
982   pmsg->header.size = htons (psize);
983   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
984   pmsg->message_id = mmsg->message_id;
985   pmsg->fragment_offset = mmsg->fragment_offset;
986   pmsg->flags = htonl (flags);
987
988   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
989 }
990
991
992 /**
993  * Create a new PSYC message from a multicast message for sending it to clients.
994  */
995 static inline struct GNUNET_PSYC_MessageHeader *
996 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
997 {
998   struct GNUNET_PSYC_MessageHeader *pmsg;
999   uint16_t size = ntohs (mmsg->header.size);
1000   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1001
1002   pmsg = GNUNET_malloc (psize);
1003   psyc_msg_init (pmsg, mmsg, flags);
1004   return pmsg;
1005 }
1006
1007
1008 /**
1009  * Send multicast message to all clients connected to the channel.
1010  */
1011 static void
1012 client_send_mcast_msg (struct Channel *chn,
1013                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1014                        uint32_t flags)
1015 {
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               "%p Sending multicast message to client. "
1018               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1019               chn, GNUNET_ntohll (mmsg->fragment_id),
1020               GNUNET_ntohll (mmsg->message_id));
1021
1022   struct GNUNET_PSYC_MessageHeader *
1023     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1024   client_send_msg (chn, &pmsg->header);
1025   GNUNET_free (pmsg);
1026 }
1027
1028
1029 /**
1030  * Send multicast request to all clients connected to the channel.
1031  */
1032 static void
1033 client_send_mcast_req (struct Master *mst,
1034                        const struct GNUNET_MULTICAST_RequestHeader *req)
1035 {
1036   struct Channel *chn = &mst->chn;
1037
1038   struct GNUNET_PSYC_MessageHeader *pmsg;
1039   uint16_t size = ntohs (req->header.size);
1040   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1041
1042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043               "%p Sending multicast request to client. "
1044               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1045               chn, GNUNET_ntohll (req->fragment_id),
1046               GNUNET_ntohll (req->request_id));
1047
1048   pmsg = GNUNET_malloc (psize);
1049   pmsg->header.size = htons (psize);
1050   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1051   pmsg->message_id = req->request_id;
1052   pmsg->fragment_offset = req->fragment_offset;
1053   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054   pmsg->slave_key = req->member_key;
1055
1056   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1057   client_send_msg (chn, &pmsg->header);
1058   GNUNET_free (pmsg);
1059 }
1060
1061
1062 /**
1063  * Insert a multicast message fragment into the queue belonging to the message.
1064  *
1065  * @param chn          Channel.
1066  * @param mmsg         Multicast message fragment.
1067  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1068  * @param first_ptype  First PSYC message part type in @a mmsg.
1069  * @param last_ptype   Last PSYC message part type in @a mmsg.
1070  */
1071 static void
1072 fragment_queue_insert (struct Channel *chn,
1073                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1074                        uint16_t first_ptype, uint16_t last_ptype)
1075 {
1076   const uint16_t size = ntohs (mmsg->header.size);
1077   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1078   struct GNUNET_CONTAINER_MultiHashMap
1079     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1080                                                     &chn->pub_key_hash);
1081
1082   struct GNUNET_HashCode msg_id_hash;
1083   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1084
1085   struct FragmentQueue
1086     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1087
1088   if (NULL == fragq)
1089   {
1090     fragq = GNUNET_new (struct FragmentQueue);
1091     fragq->state = MSG_FRAG_STATE_HEADER;
1092     fragq->fragments
1093       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1094
1095     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1096                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1097
1098     if (NULL == chan_msgs)
1099     {
1100       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1101       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1102                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1103     }
1104   }
1105
1106   struct GNUNET_HashCode frag_id_hash;
1107   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1108   struct RecvCacheEntry
1109     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1110   if (NULL == cache_entry)
1111   {
1112     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                 "%p Adding message fragment to cache. "
1114                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1115                 chn, GNUNET_ntohll (mmsg->message_id),
1116                 GNUNET_ntohll (mmsg->fragment_id));
1117     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118                 "%p header_size: %" PRIu64 " + %u\n",
1119                 chn, fragq->header_size, size);
1120     cache_entry = GNUNET_new (struct RecvCacheEntry);
1121     cache_entry->ref_count = 1;
1122     cache_entry->mmsg = GNUNET_malloc (size);
1123     memcpy (cache_entry->mmsg, mmsg, size);
1124     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1125                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1126   }
1127   else
1128   {
1129     cache_entry->ref_count++;
1130     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131                 "%p Message fragment is already in cache. "
1132                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1133                 ", ref_count: %u\n",
1134                 chn, GNUNET_ntohll (mmsg->message_id),
1135                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1136   }
1137
1138   if (MSG_FRAG_STATE_HEADER == fragq->state)
1139   {
1140     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1141     {
1142       struct GNUNET_PSYC_MessageMethod *
1143         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1144       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1145       fragq->flags = ntohl (pmeth->flags);
1146     }
1147
1148     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1149     {
1150       fragq->header_size += size;
1151     }
1152     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1153              || frag_offset == fragq->header_size)
1154     { /* header is now complete */
1155       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1156                   "%p Header of message %" PRIu64 " is complete.\n",
1157                   chn, GNUNET_ntohll (mmsg->message_id));
1158
1159       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160                   "%p Adding message %" PRIu64 " to queue.\n",
1161                   chn, GNUNET_ntohll (mmsg->message_id));
1162       fragq->state = MSG_FRAG_STATE_DATA;
1163     }
1164     else
1165     {
1166       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1167                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1168                   "%" PRIu64 " != %" PRIu64 "\n",
1169                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1170                   fragq->header_size);
1171     }
1172   }
1173
1174   switch (last_ptype)
1175   {
1176   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1177     if (frag_offset == fragq->size)
1178       fragq->state = MSG_FRAG_STATE_END;
1179     else
1180       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1181                   "%p Message %" PRIu64 " is NOT complete yet: "
1182                   "%" PRIu64 " != %" PRIu64 "\n",
1183                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1184                   fragq->size);
1185     break;
1186
1187   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1188     /* Drop message without delivering to client if it's a single fragment */
1189     fragq->state =
1190       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1191       ? MSG_FRAG_STATE_DROP
1192       : MSG_FRAG_STATE_CANCEL;
1193   }
1194
1195   switch (fragq->state)
1196   {
1197   case MSG_FRAG_STATE_DATA:
1198   case MSG_FRAG_STATE_END:
1199   case MSG_FRAG_STATE_CANCEL:
1200     if (GNUNET_NO == fragq->is_queued)
1201     {
1202       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1203                                     GNUNET_ntohll (mmsg->message_id));
1204       fragq->is_queued = GNUNET_YES;
1205     }
1206   }
1207
1208   fragq->size += size;
1209   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1210                                 GNUNET_ntohll (mmsg->fragment_id));
1211 }
1212
1213
1214 /**
1215  * Run fragment queue of a message.
1216  *
1217  * Send fragments of a message in order to client, after all modifiers arrived
1218  * from multicast.
1219  *
1220  * @param chn      Channel.
1221  * @param msg_id  ID of the message @a fragq belongs to.
1222  * @param fragq   Fragment queue of the message.
1223  * @param drop    Drop message without delivering to client?
1224  *                #GNUNET_YES or #GNUNET_NO.
1225  */
1226 static void
1227 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1228                     struct FragmentQueue *fragq, uint8_t drop)
1229 {
1230   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1231               "%p Running message fragment queue for message %" PRIu64
1232               " (state: %u).\n",
1233               chn, msg_id, fragq->state);
1234
1235   struct GNUNET_CONTAINER_MultiHashMap
1236     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1237                                                     &chn->pub_key_hash);
1238   GNUNET_assert (NULL != chan_msgs);
1239   uint64_t frag_id;
1240
1241   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1242                                                     &frag_id))
1243   {
1244     struct GNUNET_HashCode frag_id_hash;
1245     hash_key_from_hll (&frag_id_hash, frag_id);
1246     struct RecvCacheEntry *cache_entry
1247       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1248     if (cache_entry != NULL)
1249     {
1250       if (GNUNET_NO == drop)
1251       {
1252         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1253       }
1254       if (cache_entry->ref_count <= 1)
1255       {
1256         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1257                                               cache_entry);
1258         GNUNET_free (cache_entry->mmsg);
1259         GNUNET_free (cache_entry);
1260       }
1261       else
1262       {
1263         cache_entry->ref_count--;
1264       }
1265     }
1266 #if CACHE_AGING_IMPLEMENTED
1267     else if (GNUNET_NO == drop)
1268     {
1269       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1270     }
1271 #endif
1272
1273     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1274   }
1275
1276   if (MSG_FRAG_STATE_END <= fragq->state)
1277   {
1278     struct GNUNET_HashCode msg_id_hash;
1279     hash_key_from_hll (&msg_id_hash, msg_id);
1280
1281     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1282     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1283     GNUNET_free (fragq);
1284   }
1285   else
1286   {
1287     fragq->is_queued = GNUNET_NO;
1288   }
1289 }
1290
1291
1292 struct StateModifyClosure
1293 {
1294   struct Channel *chn;
1295   uint64_t msg_id;
1296   struct GNUNET_HashCode msg_id_hash;
1297 };
1298
1299
1300 void
1301 store_recv_state_modify_result (void *cls, int64_t result,
1302                                 const char *err_msg, uint16_t err_msg_size)
1303 {
1304   struct StateModifyClosure *mcls = cls;
1305   struct Channel *chn = mcls->chn;
1306   uint64_t msg_id = mcls->msg_id;
1307
1308   struct FragmentQueue *
1309     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1310
1311   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1313               chn, result, err_msg_size, err_msg);
1314
1315   switch (result)
1316   {
1317   case GNUNET_OK:
1318   case GNUNET_NO:
1319     if (NULL != fragq)
1320       fragq->state_is_modified = GNUNET_YES;
1321     if (chn->max_state_message_id < msg_id)
1322       chn->max_state_message_id = msg_id;
1323     if (chn->max_message_id < msg_id)
1324       chn->max_message_id = msg_id;
1325
1326     if (NULL != fragq)
1327       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1328     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1329     message_queue_run (chn);
1330     break;
1331
1332   default:
1333     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1334                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1335                 chn, result, err_msg_size, err_msg);
1336     /** @todo FIXME: handle state_modify error */
1337   }
1338 }
1339
1340
1341 /**
1342  * Run message queue.
1343  *
1344  * Send messages in queue to client in order after a message has arrived from
1345  * multicast, according to the following:
1346  * - A message is only sent if all of its modifiers arrived.
1347  * - A stateful message is only sent if the previous stateful message
1348  *   has already been delivered to the client.
1349  *
1350  * @param chn  Channel.
1351  *
1352  * @return Number of messages removed from queue and sent to client.
1353  */
1354 static uint64_t
1355 message_queue_run (struct Channel *chn)
1356 {
1357   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1358               "%p Running message queue.\n", chn);
1359   uint64_t n = 0;
1360   uint64_t msg_id;
1361
1362   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1363                                                     &msg_id))
1364   {
1365     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1366                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1367     struct GNUNET_HashCode msg_id_hash;
1368     hash_key_from_hll (&msg_id_hash, msg_id);
1369
1370     struct FragmentQueue *
1371       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1372
1373     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1374     {
1375       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1376                   "%p No fragq (%p) or header not complete.\n",
1377                   chn, fragq);
1378       break;
1379     }
1380
1381     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382                 "%p Fragment queue entry:  state: %u, state delta: "
1383                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1384                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1385
1386     if (MSG_FRAG_STATE_DATA <= fragq->state)
1387     {
1388       /* Check if there's a missing message before the current one */
1389       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1390       {
1391         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1392
1393         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1394             && (chn->max_message_id != msg_id - 1
1395                 && chn->max_message_id != msg_id))
1396         {
1397           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398                       "%p Out of order message. "
1399                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1400                       chn, chn->max_message_id, msg_id);
1401           break;
1402           // FIXME: keep track of messages processed in this queue run,
1403           //        and only stop after reaching the end
1404         }
1405       }
1406       else
1407       {
1408         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1409         if (GNUNET_YES != fragq->state_is_modified)
1410         {
1411           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1412           {
1413             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1414                         "%p Out of order stateful message. "
1415                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1416                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1417             break;
1418             // FIXME: keep track of messages processed in this queue run,
1419             //        and only stop after reaching the end
1420           }
1421
1422           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1423           mcls->chn = chn;
1424           mcls->msg_id = msg_id;
1425           mcls->msg_id_hash = msg_id_hash;
1426
1427           /* Apply modifiers to state in PSYCstore */
1428           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1429                                          fragq->state_delta,
1430                                          store_recv_state_modify_result, mcls);
1431           break; // continue after asynchronous state modify result
1432         }
1433       }
1434       chn->max_message_id = msg_id;
1435     }
1436     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1437     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1438     n++;
1439   }
1440
1441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1442               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1443   return n;
1444 }
1445
1446
1447 /**
1448  * Drop message queue of a channel.
1449  *
1450  * Remove all messages in queue without sending it to clients.
1451  *
1452  * @param chn  Channel.
1453  *
1454  * @return Number of messages removed from queue.
1455  */
1456 static uint64_t
1457 message_queue_drop (struct Channel *chn)
1458 {
1459   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1460               "%p Dropping message queue.\n", chn);
1461   uint64_t n = 0;
1462   uint64_t msg_id;
1463   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1464                                                     &msg_id))
1465   {
1466     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1467                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1468     struct GNUNET_HashCode msg_id_hash;
1469     hash_key_from_hll (&msg_id_hash, msg_id);
1470
1471     struct FragmentQueue *
1472       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1473     GNUNET_assert (NULL != fragq);
1474     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1475     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1476     n++;
1477   }
1478   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1479               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1480   return n;
1481 }
1482
1483
1484 /**
1485  * Received result of GNUNET_PSYCSTORE_fragment_store().
1486  */
1487 static void
1488 store_recv_fragment_store_result (void *cls, int64_t result,
1489                                   const char *err_msg, uint16_t err_msg_size)
1490 {
1491   struct Channel *chn = cls;
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1494               chn, result, err_msg_size, err_msg);
1495 }
1496
1497
1498 /**
1499  * Handle incoming message fragment from multicast.
1500  *
1501  * Store it using PSYCstore and send it to the clients of the channel in order.
1502  */
1503 static void
1504 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1505 {
1506   struct Channel *chn = cls;
1507   uint16_t size = ntohs (mmsg->header.size);
1508
1509   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1510               "%p Received multicast message of size %u.\n",
1511               chn, size);
1512
1513   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1514                                    &store_recv_fragment_store_result, chn);
1515
1516   uint16_t first_ptype = 0, last_ptype = 0;
1517   if (GNUNET_SYSERR
1518       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1519                                           (const char *) &mmsg[1],
1520                                           &first_ptype, &last_ptype))
1521   {
1522     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1523                 "%p Dropping incoming multicast message with invalid parts.\n",
1524                 chn);
1525     GNUNET_break_op (0);
1526     return;
1527   }
1528
1529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530               "Message parts: first: type %u, last: type %u\n",
1531               first_ptype, last_ptype);
1532
1533   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1534   message_queue_run (chn);
1535 }
1536
1537
1538 /**
1539  * Incoming request fragment from multicast for a master.
1540  *
1541  * @param cls   Master.
1542  * @param req   The request.
1543  */
1544 static void
1545 mcast_recv_request (void *cls,
1546                     const struct GNUNET_MULTICAST_RequestHeader *req)
1547 {
1548   struct Master *mst = cls;
1549   uint16_t size = ntohs (req->header.size);
1550
1551   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_key);
1552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553               "%p Received multicast request of size %u from %s.\n",
1554               mst, size, str);
1555   GNUNET_free (str);
1556
1557   uint16_t first_ptype = 0, last_ptype = 0;
1558   if (GNUNET_SYSERR
1559       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1560                                           (const char *) &req[1],
1561                                           &first_ptype, &last_ptype))
1562   {
1563     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1564                 "%p Dropping incoming multicast request with invalid parts.\n",
1565                 mst);
1566     GNUNET_break_op (0);
1567     return;
1568   }
1569
1570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1571               "Message parts: first: type %u, last: type %u\n",
1572               first_ptype, last_ptype);
1573
1574   /* FIXME: in-order delivery */
1575   client_send_mcast_req (mst, req);
1576 }
1577
1578
1579 /**
1580  * Response from PSYCstore with the current counter values for a channel master.
1581  */
1582 static void
1583 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1584                             uint64_t max_message_id, uint64_t max_group_generation,
1585                             uint64_t max_state_message_id)
1586 {
1587   struct Master *mst = cls;
1588   struct Channel *chn = &mst->chn;
1589   chn->store_op = NULL;
1590
1591   struct GNUNET_PSYC_CountersResultMessage res;
1592   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1593   res.header.size = htons (sizeof (res));
1594   res.result_code = htonl (result);
1595   res.max_message_id = GNUNET_htonll (max_message_id);
1596
1597   if (GNUNET_OK == result || GNUNET_NO == result)
1598   {
1599     mst->max_message_id = max_message_id;
1600     chn->max_message_id = max_message_id;
1601     chn->max_state_message_id = max_state_message_id;
1602     mst->max_group_generation = max_group_generation;
1603     mst->origin
1604       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1605                                        mcast_recv_join_request,
1606                                        mcast_recv_membership_test,
1607                                        mcast_recv_replay_fragment,
1608                                        mcast_recv_replay_message,
1609                                        mcast_recv_request,
1610                                        mcast_recv_message, chn);
1611     chn->is_ready = GNUNET_YES;
1612   }
1613   else
1614   {
1615     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1616                 "%p GNUNET_PSYCSTORE_counters_get() "
1617                 "returned %d for channel %s.\n",
1618                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1619   }
1620
1621   client_send_msg (chn, &res.header);
1622 }
1623
1624
1625 /**
1626  * Response from PSYCstore with the current counter values for a channel slave.
1627  */
1628 void
1629 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1630                            uint64_t max_message_id, uint64_t max_group_generation,
1631                            uint64_t max_state_message_id)
1632 {
1633   struct Slave *slv = cls;
1634   struct Channel *chn = &slv->chn;
1635   chn->store_op = NULL;
1636
1637   struct GNUNET_PSYC_CountersResultMessage res;
1638   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1639   res.header.size = htons (sizeof (res));
1640   res.result_code = htonl (result);
1641   res.max_message_id = GNUNET_htonll (max_message_id);
1642
1643   if (GNUNET_OK == result || GNUNET_NO == result)
1644   {
1645     chn->max_message_id = max_message_id;
1646     chn->max_state_message_id = max_state_message_id;
1647     slv->member
1648       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1649                                       &slv->origin,
1650                                       slv->relay_count, slv->relays,
1651                                       &slv->join_msg->header,
1652                                       mcast_recv_join_request,
1653                                       mcast_recv_join_decision,
1654                                       mcast_recv_membership_test,
1655                                       mcast_recv_replay_fragment,
1656                                       mcast_recv_replay_message,
1657                                       mcast_recv_message, chn);
1658     if (NULL != slv->join_msg)
1659     {
1660       GNUNET_free (slv->join_msg);
1661       slv->join_msg = NULL;
1662     }
1663   }
1664   else
1665   {
1666     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1667                 "%p GNUNET_PSYCSTORE_counters_get() "
1668                 "returned %d for channel %s.\n",
1669                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1670   }
1671
1672   client_send_msg (chn, &res.header);
1673 }
1674
1675
1676 static void
1677 channel_init (struct Channel *chn)
1678 {
1679   chn->recv_msgs
1680     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1681   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1682 }
1683
1684
1685 /**
1686  * Handle a connecting client starting a channel master.
1687  */
1688 static void
1689 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1690                           const struct GNUNET_MessageHeader *msg)
1691 {
1692   const struct MasterStartRequest *req
1693     = (const struct MasterStartRequest *) msg;
1694
1695   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1696   struct GNUNET_HashCode pub_key_hash;
1697
1698   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1699   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1700
1701   struct Master *
1702     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1703   struct Channel *chn;
1704
1705   if (NULL == mst)
1706   {
1707     mst = GNUNET_new (struct Master);
1708     mst->policy = ntohl (req->policy);
1709     mst->priv_key = req->channel_key;
1710     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1711
1712     chn = &mst->chn;
1713     chn->is_master = GNUNET_YES;
1714     chn->pub_key = pub_key;
1715     chn->pub_key_hash = pub_key_hash;
1716     channel_init (chn);
1717
1718     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1719                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1720     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1721                                                    store_recv_master_counters, mst);
1722   }
1723   else
1724   {
1725     chn = &mst->chn;
1726
1727     struct GNUNET_PSYC_CountersResultMessage res;
1728     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1729     res.header.size = htons (sizeof (res));
1730     res.result_code = htonl (GNUNET_OK);
1731     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1732
1733     GNUNET_SERVER_notification_context_add (nc, client);
1734     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1735                                                 GNUNET_NO);
1736   }
1737
1738   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739               "%p Client connected as master to channel %s.\n",
1740               mst, GNUNET_h2s (&chn->pub_key_hash));
1741
1742   struct Client *cli = GNUNET_new (struct Client);
1743   cli->client = client;
1744   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1745
1746   GNUNET_SERVER_client_set_user_context (client, chn);
1747   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1748 }
1749
1750
1751 /**
1752  * Handle a connecting client joining as a channel slave.
1753  */
1754 static void
1755 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1756                         const struct GNUNET_MessageHeader *msg)
1757 {
1758   const struct SlaveJoinRequest *req
1759     = (const struct SlaveJoinRequest *) msg;
1760   uint16_t req_size = ntohs (req->header.size);
1761
1762   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1763   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1764
1765   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1766   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1767   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1768
1769   struct GNUNET_CONTAINER_MultiHashMap *
1770     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1771   struct Slave *slv = NULL;
1772   struct Channel *chn;
1773
1774   if (NULL != chn_slv)
1775   {
1776     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1777   }
1778   if (NULL == slv)
1779   {
1780     slv = GNUNET_new (struct Slave);
1781     slv->priv_key = req->slave_key;
1782     slv->pub_key = slv_pub_key;
1783     slv->pub_key_hash = slv_pub_key_hash;
1784     slv->origin = req->origin;
1785     slv->relay_count = ntohl (req->relay_count);
1786
1787     const struct GNUNET_PeerIdentity *
1788       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1789     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1790     uint16_t join_msg_size = 0;
1791
1792     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1793         <= req_size)
1794     {
1795       struct GNUNET_PSYC_Message *
1796         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1797       join_msg_size = ntohs (join_msg->header.size);
1798       slv->join_msg = GNUNET_malloc (join_msg_size);
1799       memcpy (slv->join_msg, join_msg, join_msg_size);
1800     }
1801     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1802     {
1803       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1804                   "%u + %u + %u != %u\n",
1805                   sizeof (*req), relay_size, join_msg_size, req_size);
1806       GNUNET_break (0);
1807       GNUNET_SERVER_client_disconnect (client);
1808       GNUNET_free (slv);
1809       return;
1810     }
1811     if (0 < slv->relay_count)
1812     {
1813       slv->relays = GNUNET_malloc (relay_size);
1814       memcpy (slv->relays, &req[1], relay_size);
1815     }
1816
1817     chn = &slv->chn;
1818     chn->is_master = GNUNET_NO;
1819     chn->pub_key = req->channel_key;
1820     chn->pub_key_hash = pub_key_hash;
1821     channel_init (chn);
1822
1823     if (NULL == chn_slv)
1824     {
1825       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1826       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1827                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1828     }
1829     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1830                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1831     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1832                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1833     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1834                                                   &store_recv_slave_counters, slv);
1835   }
1836   else
1837   {
1838     chn = &slv->chn;
1839
1840     struct GNUNET_PSYC_CountersResultMessage res;
1841     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1842     res.header.size = htons (sizeof (res));
1843     res.result_code = htonl (GNUNET_OK);
1844     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1845
1846     GNUNET_SERVER_notification_context_add (nc, client);
1847     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1848                                                 GNUNET_NO);
1849
1850     if (NULL == slv->member)
1851     {
1852       slv->member
1853         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1854                                         &slv->origin,
1855                                         slv->relay_count, slv->relays,
1856                                         &slv->join_msg->header,
1857                                         &mcast_recv_join_request,
1858                                         &mcast_recv_join_decision,
1859                                         &mcast_recv_membership_test,
1860                                         &mcast_recv_replay_fragment,
1861                                         &mcast_recv_replay_message,
1862                                         &mcast_recv_message, chn);
1863       if (NULL != slv->join_msg)
1864       {
1865         GNUNET_free (slv->join_msg);
1866         slv->join_msg = NULL;
1867       }
1868     }
1869     else if (NULL != slv->join_dcsn)
1870     {
1871       GNUNET_SERVER_notification_context_add (nc, client);
1872       GNUNET_SERVER_notification_context_unicast (nc, client,
1873                                                   &slv->join_dcsn->header,
1874                                                   GNUNET_NO);
1875     }
1876   }
1877
1878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1879               "%p Client connected as slave to channel %s.\n",
1880               slv, GNUNET_h2s (&chn->pub_key_hash));
1881
1882   struct Client *cli = GNUNET_new (struct Client);
1883   cli->client = client;
1884   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1885
1886   GNUNET_SERVER_client_set_user_context (client, chn);
1887   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1888 }
1889
1890
1891 struct JoinDecisionClosure
1892 {
1893   int32_t is_admitted;
1894   struct GNUNET_MessageHeader *msg;
1895 };
1896
1897
1898 /**
1899  * Iterator callback for sending join decisions to multicast.
1900  */
1901 static int
1902 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1903                           void *value)
1904 {
1905   struct JoinDecisionClosure *jcls = cls;
1906   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1907   // FIXME: add relays
1908   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1909   return GNUNET_YES;
1910 }
1911
1912
1913 /**
1914  * Join decision from client.
1915  */
1916 static void
1917 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1918                            const struct GNUNET_MessageHeader *msg)
1919 {
1920   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1921     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1922   struct Channel *chn;
1923   struct Master *mst;
1924   struct JoinDecisionClosure jcls;
1925
1926   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1927   if (NULL == chn)
1928   {
1929     GNUNET_break (0);
1930     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1931     return;
1932   }
1933   GNUNET_assert (GNUNET_YES == chn->is_master);
1934   mst = (struct Master *) chn;
1935   jcls.is_admitted = ntohl (dcsn->is_admitted);
1936   jcls.msg
1937     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1938     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1939     : NULL;
1940
1941   struct GNUNET_HashCode slave_key_hash;
1942   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1943                       &slave_key_hash);
1944
1945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1946               "%p Got join decision (%d) from client for channel %s..\n",
1947               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1949               "%p ..and slave %s.\n",
1950               mst, GNUNET_h2s (&slave_key_hash));
1951
1952   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1953                                               &mcast_send_join_decision, &jcls);
1954   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1955   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1956 }
1957
1958
1959 /**
1960  * Send acknowledgement to a client.
1961  *
1962  * Sent after a message fragment has been passed on to multicast.
1963  *
1964  * @param chn The channel struct for the client.
1965  */
1966 static void
1967 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1968 {
1969   struct GNUNET_MessageHeader res;
1970   res.size = htons (sizeof (res));
1971   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1972
1973   /* FIXME */
1974   GNUNET_SERVER_notification_context_add (nc, client);
1975   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1976 }
1977
1978
1979 /**
1980  * Callback for the transmit functions of multicast.
1981  */
1982 static int
1983 transmit_notify (void *cls, size_t *data_size, void *data)
1984 {
1985   struct Channel *chn = cls;
1986   struct TransmitMessage *tmit_msg = chn->tmit_head;
1987
1988   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1989   {
1990     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991                 "%p transmit_notify: nothing to send.\n", chn);
1992     *data_size = 0;
1993     return GNUNET_NO;
1994   }
1995
1996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1998
1999   *data_size = tmit_msg->size;
2000   memcpy (data, &tmit_msg[1], *data_size);
2001
2002   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
2003
2004   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
2005     send_message_ack (chn, tmit_msg->client);
2006
2007   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2008   GNUNET_free (tmit_msg);
2009
2010   if (NULL != chn->tmit_head)
2011   {
2012     transmit_message (chn);
2013   }
2014   else if (GNUNET_YES == chn->is_disconnected)
2015   {
2016     /* FIXME: handle partial message (when still in_transmit) */
2017     return GNUNET_SYSERR;
2018   }
2019   return ret;
2020 }
2021
2022
2023 /**
2024  * Callback for the transmit functions of multicast.
2025  */
2026 static int
2027 master_transmit_notify (void *cls, size_t *data_size, void *data)
2028 {
2029   int ret = transmit_notify (cls, data_size, data);
2030
2031   if (GNUNET_YES == ret)
2032   {
2033     struct Master *mst = cls;
2034     mst->tmit_handle = NULL;
2035   }
2036   return ret;
2037 }
2038
2039
2040 /**
2041  * Callback for the transmit functions of multicast.
2042  */
2043 static int
2044 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2045 {
2046   int ret = transmit_notify (cls, data_size, data);
2047
2048   if (GNUNET_YES == ret)
2049   {
2050     struct Slave *slv = cls;
2051     slv->tmit_handle = NULL;
2052   }
2053   return ret;
2054 }
2055
2056
2057 /**
2058  * Transmit a message from a channel master to the multicast group.
2059  */
2060 static void
2061 master_transmit_message (struct Master *mst)
2062 {
2063   if (NULL == mst->tmit_handle)
2064   {
2065     mst->tmit_handle
2066       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2067                                         mst->max_group_generation,
2068                                         master_transmit_notify, mst);
2069   }
2070   else
2071   {
2072     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2073   }
2074 }
2075
2076
2077 /**
2078  * Transmit a message from a channel slave to the multicast group.
2079  */
2080 static void
2081 slave_transmit_message (struct Slave *slv)
2082 {
2083   if (NULL == slv->tmit_handle)
2084   {
2085     slv->tmit_handle
2086       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2087                                            slave_transmit_notify, slv);
2088   }
2089   else
2090   {
2091     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2092   }
2093 }
2094
2095
2096 static void
2097 transmit_message (struct Channel *chn)
2098 {
2099   chn->is_master
2100     ? master_transmit_message ((struct Master *) chn)
2101     : slave_transmit_message ((struct Slave *) chn);
2102 }
2103
2104
2105 /**
2106  * Queue a message from a channel master for sending to the multicast group.
2107  */
2108 static void
2109 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2110                      uint16_t first_ptype, uint16_t last_ptype)
2111 {
2112   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2113
2114   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2115   {
2116     tmit_msg->id = ++mst->max_message_id;
2117     struct GNUNET_PSYC_MessageMethod *pmeth
2118       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2119
2120     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2121     {
2122       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2123     }
2124     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2125     {
2126       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2127                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2128                   mst, tmit_msg->id - mst->max_state_message_id);
2129       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2130                                           - mst->max_state_message_id);
2131       mst->max_state_message_id = tmit_msg->id;
2132     }
2133     else
2134     {
2135         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2136                     "%p master_queue_message: state not modified\n", mst);
2137       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2138     }
2139
2140     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2141     {
2142       /// @todo add state_hash to PSYC header
2143     }
2144   }
2145 }
2146
2147
2148 /**
2149  * Queue a message from a channel slave for sending to the multicast group.
2150  */
2151 static void
2152 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2153                      uint16_t first_ptype, uint16_t last_ptype)
2154 {
2155   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2156   {
2157     struct GNUNET_PSYC_MessageMethod *pmeth
2158       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2159     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2160     tmit_msg->id = ++slv->max_request_id;
2161   }
2162 }
2163
2164
2165 /**
2166  * Queue PSYC message parts for sending to multicast.
2167  *
2168  * @param chn           Channel to send to.
2169  * @param client       Client the message originates from.
2170  * @param data_size    Size of @a data.
2171  * @param data         Concatenated message parts.
2172  * @param first_ptype  First message part type in @a data.
2173  * @param last_ptype   Last message part type in @a data.
2174  */
2175 static struct TransmitMessage *
2176 queue_message (struct Channel *chn,
2177                struct GNUNET_SERVER_Client *client,
2178                size_t data_size,
2179                const void *data,
2180                uint16_t first_ptype, uint16_t last_ptype)
2181 {
2182   struct TransmitMessage *
2183     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2184   memcpy (&tmit_msg[1], data, data_size);
2185   tmit_msg->client = client;
2186   tmit_msg->size = data_size;
2187   tmit_msg->state = chn->tmit_state;
2188
2189   /* FIXME: separate queue per message ID */
2190
2191   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2192
2193   chn->is_master
2194     ? master_queue_message ((struct Master *) chn, tmit_msg,
2195                             first_ptype, last_ptype)
2196     : slave_queue_message ((struct Slave *) chn, tmit_msg,
2197                            first_ptype, last_ptype);
2198   return tmit_msg;
2199 }
2200
2201
2202 /**
2203  * Cancel transmission of current message.
2204  *
2205  * @param chn     Channel to send to.
2206  * @param client  Client the message originates from.
2207  */
2208 static void
2209 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2210 {
2211   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2212
2213   struct GNUNET_MessageHeader msg;
2214   msg.size = htons (sizeof (msg));
2215   msg.type = htons (type);
2216
2217   queue_message (chn, client, sizeof (msg), &msg, type, type);
2218   transmit_message (chn);
2219
2220   /* FIXME: cleanup */
2221 }
2222
2223
2224 /**
2225  * Incoming message from a master or slave client.
2226  */
2227 static void
2228 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2229                           const struct GNUNET_MessageHeader *msg)
2230 {
2231   struct Channel *
2232     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2233   GNUNET_assert (NULL != chn);
2234
2235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236               "%p Received message from client.\n", chn);
2237   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2238
2239   if (GNUNET_YES != chn->is_ready)
2240   {
2241     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2242                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2243     GNUNET_break (0);
2244     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2245     return;
2246   }
2247
2248   uint16_t size = ntohs (msg->size);
2249   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2250   {
2251     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2252                 "%p Message payload too large: %u < %u.\n",
2253                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2254     GNUNET_break (0);
2255     transmit_cancel (chn, client);
2256     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2257     return;
2258   }
2259
2260   uint16_t first_ptype = 0, last_ptype = 0;
2261   if (GNUNET_SYSERR
2262       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2263                                           (const char *) &msg[1],
2264                                           &first_ptype, &last_ptype))
2265   {
2266     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2267                 "%p Received invalid message part from client.\n", chn);
2268     GNUNET_break (0);
2269     transmit_cancel (chn, client);
2270     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2271     return;
2272   }
2273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2274               "%p Received message with first part type %u and last part type %u.\n",
2275               chn, first_ptype, last_ptype);
2276
2277   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2278                  first_ptype, last_ptype);
2279   transmit_message (chn);
2280   /* FIXME: send a few ACKs even before transmit_notify is called */
2281
2282   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2283 };
2284
2285
2286 /**
2287  * Received result of GNUNET_PSYCSTORE_membership_store()
2288  */
2289 static void
2290 store_recv_membership_store_result (void *cls, int64_t result,
2291                                     const char *err_msg, uint16_t err_msg_size)
2292 {
2293   struct Operation *op = cls;
2294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2296               op->chn, result, err_msg_size, err_msg);
2297
2298   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2299   op_remove (op);
2300 }
2301
2302
2303 /**
2304  * Client requests to add/remove a slave in the membership database.
2305  */
2306 static void
2307 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2308                               const struct GNUNET_MessageHeader *msg)
2309 {
2310   struct Channel *
2311     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2312   GNUNET_assert (NULL != chn);
2313
2314   const struct ChannelMembershipStoreRequest *
2315     req = (const struct ChannelMembershipStoreRequest *) msg;
2316
2317   struct Operation *op = op_add (chn, client, req->op_id, 0);
2318
2319   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2320   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2322               "%p Received membership store request from client.\n", chn);
2323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2324               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2325               chn, req->did_join, announced_at, effective_since);
2326
2327   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2328                                      req->did_join, announced_at, effective_since,
2329                                      0, /* FIXME: group_generation */
2330                                      &store_recv_membership_store_result, op);
2331   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2332 }
2333
2334
2335 /**
2336  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2337  * in response to a history request from a client.
2338  */
2339 static int
2340 store_recv_fragment_history (void *cls,
2341                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2342                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2343 {
2344   struct Operation *op = cls;
2345   if (NULL == op->client)
2346   { /* Requesting client already disconnected. */
2347     return GNUNET_NO;
2348   }
2349   struct Channel *chn = op->chn;
2350
2351   struct GNUNET_PSYC_MessageHeader *pmsg;
2352   uint16_t msize = ntohs (mmsg->header.size);
2353   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2354
2355   struct GNUNET_OperationResultMessage *
2356     res = GNUNET_malloc (sizeof (*res) + psize);
2357   res->header.size = htons (sizeof (*res) + psize);
2358   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2359   res->op_id = op->op_id;
2360   res->result_code = GNUNET_htonll (GNUNET_OK);
2361
2362   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2363   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2364   memcpy (&res[1], pmsg, psize);
2365
2366   /** @todo FIXME: send only to requesting client */
2367   client_send_msg (chn, &res->header);
2368   return GNUNET_YES;
2369 }
2370
2371
2372 /**
2373  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2374  * in response to a history request from a client.
2375  */
2376 static void
2377 store_recv_fragment_history_result (void *cls, int64_t result,
2378                                     const char *err_msg, uint16_t err_msg_size)
2379 {
2380   struct Operation *op = cls;
2381   if (NULL == op->client)
2382   { /* Requesting client already disconnected. */
2383     return;
2384   }
2385
2386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2387               "%p History replay #%" PRIu64 ": "
2388               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2389               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2390
2391   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2392   {
2393     /** @todo Multicast replay request for messages not found locally. */
2394   }
2395
2396   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2397   op_remove (op);
2398 }
2399
2400
2401 /**
2402  * Client requests channel history.
2403  */
2404 static void
2405 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2406                             const struct GNUNET_MessageHeader *msg)
2407 {
2408   struct Channel *
2409     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2410   GNUNET_assert (NULL != chn);
2411
2412   const struct GNUNET_PSYC_HistoryRequestMessage *
2413     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2414   uint16_t size = ntohs (msg->size);
2415   const char *method_prefix = (const char *) &req[1];
2416
2417   if (size < sizeof (*req) + 1
2418       || '\0' != method_prefix[size - sizeof (*req) - 1])
2419   {
2420     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2421                 "%p History replay #%" PRIu64 ": "
2422                 "invalid method prefix. size: %u < %u?\n",
2423                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2424     GNUNET_break (0);
2425     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2426     return;
2427   }
2428
2429   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2430
2431   if (0 == req->message_limit)
2432     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2433                                   GNUNET_ntohll (req->start_message_id),
2434                                   GNUNET_ntohll (req->end_message_id),
2435                                   0, method_prefix,
2436                                   &store_recv_fragment_history,
2437                                   &store_recv_fragment_history_result, op);
2438   else
2439     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2440                                          GNUNET_ntohll (req->message_limit),
2441                                          method_prefix,
2442                                          &store_recv_fragment_history,
2443                                          &store_recv_fragment_history_result,
2444                                          op);
2445
2446   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2447 }
2448
2449
2450 /**
2451  * Received state var from PSYCstore, send it to client.
2452  */
2453 static int
2454 store_recv_state_var (void *cls, const char *name,
2455                       const void *value, uint32_t value_size)
2456 {
2457   struct Operation *op = cls;
2458   struct GNUNET_OperationResultMessage *res;
2459
2460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2461               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2462               op->chn, GNUNET_ntohll (op->op_id), name);
2463
2464   if (NULL != name) /* First part */
2465   {
2466     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2467     struct GNUNET_PSYC_MessageModifier *mod;
2468     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2469     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2470     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2471     res->op_id = op->op_id;
2472
2473     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2474     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2475     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2476     mod->name_size = htons (name_size);
2477     mod->value_size = htonl (value_size);
2478     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2479     memcpy (&mod[1], name, name_size);
2480     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2481   }
2482   else /* Continuation */
2483   {
2484     struct GNUNET_MessageHeader *mod;
2485     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2486     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2487     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2488     res->op_id = op->op_id;
2489
2490     mod = (struct GNUNET_MessageHeader *) &res[1];
2491     mod->size = htons (sizeof (*mod) + value_size);
2492     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2493     memcpy (&mod[1], value, value_size);
2494   }
2495
2496   // FIXME: client might have been disconnected
2497   GNUNET_SERVER_notification_context_add (nc, op->client);
2498   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2499                                               GNUNET_NO);
2500   return GNUNET_YES;
2501 }
2502
2503
2504 /**
2505  * Received result of GNUNET_PSYCSTORE_state_get()
2506  * or GNUNET_PSYCSTORE_state_get_prefix()
2507  */
2508 static void
2509 store_recv_state_result (void *cls, int64_t result,
2510                          const char *err_msg, uint16_t err_msg_size)
2511 {
2512   struct Operation *op = cls;
2513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2514               "%p state_get #%" PRIu64 ": "
2515               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2516               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2517
2518   // FIXME: client might have been disconnected
2519   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2520   op_remove (op);
2521 }
2522
2523
2524 /**
2525  * Client requests best matching state variable from PSYCstore.
2526  */
2527 static void
2528 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2529                        const struct GNUNET_MessageHeader *msg)
2530 {
2531   struct Channel *
2532     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2533   GNUNET_assert (NULL != chn);
2534
2535   const struct StateRequest *
2536     req = (const struct StateRequest *) msg;
2537
2538   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2539   const char *name = (const char *) &req[1];
2540   if (0 == name_size || '\0' != name[name_size - 1])
2541   {
2542     GNUNET_break (0);
2543     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2544     return;
2545   }
2546
2547   struct Operation *op = op_add (chn, client, req->op_id, 0);
2548   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2549                               &store_recv_state_var,
2550                               &store_recv_state_result, op);
2551   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2552 }
2553
2554
2555 /**
2556  * Client requests state variables with a given prefix from PSYCstore.
2557  */
2558 static void
2559 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2560                               const struct GNUNET_MessageHeader *msg)
2561 {
2562   struct Channel *
2563     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2564   GNUNET_assert (NULL != chn);
2565
2566   const struct StateRequest *
2567     req = (const struct StateRequest *) msg;
2568
2569   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2570   const char *name = (const char *) &req[1];
2571   if (0 == name_size || '\0' != name[name_size - 1])
2572   {
2573     GNUNET_break (0);
2574     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2575     return;
2576   }
2577
2578   struct Operation *op = op_add (chn, client, req->op_id, 0);
2579   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2580                                      &store_recv_state_var,
2581                                      &store_recv_state_result, op);
2582   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2583 }
2584
2585
2586 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2587   { &client_recv_master_start, NULL,
2588     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2589
2590   { &client_recv_slave_join, NULL,
2591     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2592
2593   { &client_recv_join_decision, NULL,
2594     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2595
2596   { &client_recv_psyc_message, NULL,
2597     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2598
2599   { &client_recv_membership_store, NULL,
2600     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2601
2602   { &client_recv_history_replay, NULL,
2603     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2604
2605   { &client_recv_state_get, NULL,
2606     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2607
2608   { &client_recv_state_get_prefix, NULL,
2609     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2610
2611   { NULL, NULL, 0, 0 }
2612 };
2613
2614
2615 /**
2616  * Initialize the PSYC service.
2617  *
2618  * @param cls Closure.
2619  * @param server The initialized server.
2620  * @param c Configuration to use.
2621  */
2622 static void
2623 run (void *cls, struct GNUNET_SERVER_Handle *server,
2624      const struct GNUNET_CONFIGURATION_Handle *c)
2625 {
2626   cfg = c;
2627   store = GNUNET_PSYCSTORE_connect (cfg);
2628   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2629   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2630   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2631   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2632   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2633   nc = GNUNET_SERVER_notification_context_create (server, 1);
2634   GNUNET_SERVER_add_handlers (server, server_handlers);
2635   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2636   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2637                                 &shutdown_task, NULL);
2638 }
2639
2640
2641 /**
2642  * The main function for the service.
2643  *
2644  * @param argc number of arguments from the command line
2645  * @param argv command line arguments
2646  * @return 0 ok, 1 on error
2647  */
2648 int
2649 main (int argc, char *const *argv)
2650 {
2651   return (GNUNET_OK ==
2652           GNUNET_SERVICE_run (argc, argv, "psyc",
2653                               GNUNET_SERVICE_OPTION_NONE,
2654                               &run, NULL)) ? 0 : 1;
2655 }
2656
2657 /* end of gnunet-service-psyc.c */