multicast/psyc/social: message acks & scheduling
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412 };
413
414
415 static void
416 transmit_message (struct Channel *chn);
417
418 static uint64_t
419 message_queue_run (struct Channel *chn);
420
421 static uint64_t
422 message_queue_drop (struct Channel *chn);
423
424
425 static void
426 schedule_transmit_message (void *cls,
427                            const struct GNUNET_SCHEDULER_TaskContext *tc)
428 {
429   struct Channel *chn = cls;
430   transmit_message (chn);
431 }
432
433
434 /**
435  * Task run during shutdown.
436  *
437  * @param cls unused
438  * @param tc unused
439  */
440 static void
441 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
442 {
443   if (NULL != nc)
444   {
445     GNUNET_SERVER_notification_context_destroy (nc);
446     nc = NULL;
447   }
448   if (NULL != stats)
449   {
450     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
451     stats = NULL;
452   }
453 }
454
455
456 static struct Operation *
457 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
458         uint64_t op_id, uint32_t flags)
459 {
460   struct Operation *op = GNUNET_malloc (sizeof (*op));
461   op->client = client;
462   op->chn = chn;
463   op->op_id = op_id;
464   op->flags = flags;
465   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
466   return op;
467 }
468
469
470 static void
471 op_remove (struct Operation *op)
472 {
473   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
474   GNUNET_free (op);
475 }
476
477
478 /**
479  * Clean up master data structures after a client disconnected.
480  */
481 static void
482 cleanup_master (struct Master *mst)
483 {
484   struct Channel *chn = &mst->chn;
485
486   if (NULL != mst->origin)
487     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
488   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
489   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
490 }
491
492
493 /**
494  * Clean up slave data structures after a client disconnected.
495  */
496 static void
497 cleanup_slave (struct Slave *slv)
498 {
499   struct Channel *chn = &slv->chn;
500   struct GNUNET_CONTAINER_MultiHashMap *
501     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
502                                                 &chn->pub_key_hash);
503   GNUNET_assert (NULL != chn_slv);
504   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
505
506   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
507   {
508     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
509                                           chn_slv);
510     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
511   }
512   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
513
514   if (NULL != slv->join_msg)
515   {
516     GNUNET_free (slv->join_msg);
517     slv->join_msg = NULL;
518   }
519   if (NULL != slv->relays)
520   {
521     GNUNET_free (slv->relays);
522     slv->relays = NULL;
523   }
524   if (NULL != slv->member)
525   {
526     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
527     slv->member = NULL;
528   }
529   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
530 }
531
532
533 /**
534  * Clean up channel data structures after a client disconnected.
535  */
536 static void
537 cleanup_channel (struct Channel *chn)
538 {
539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540               "%p Cleaning up channel %s. master? %u\n",
541               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
542   message_queue_drop (chn);
543   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
544   chn->recv_frags = NULL;
545
546   if (NULL != chn->store_op)
547   {
548     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
549     chn->store_op = NULL;
550   }
551
552   (GNUNET_YES == chn->is_master)
553     ? cleanup_master ((struct Master *) chn)
554     : cleanup_slave ((struct Slave *) chn);
555   GNUNET_free (chn);
556 }
557
558
559 /**
560  * Called whenever a client is disconnected.
561  * Frees our resources associated with that client.
562  *
563  * @param cls Closure.
564  * @param client Identification of the client.
565  */
566 static void
567 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
568 {
569   if (NULL == client)
570     return;
571
572   struct Channel *
573     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
574
575   if (NULL == chn)
576   {
577     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578                 "%p User context is NULL in client_disconnect()\n", chn);
579     GNUNET_break (0);
580     return;
581   }
582
583   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584               "%p Client (%s) disconnected from channel %s\n",
585               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
586               GNUNET_h2s (&chn->pub_key_hash));
587
588   struct Client *cli = chn->clients_head;
589   while (NULL != cli)
590   {
591     if (cli->client == client)
592     {
593       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
594       GNUNET_free (cli);
595       break;
596     }
597     cli = cli->next;
598   }
599
600   struct Operation *op = chn->op_head;
601   while (NULL != op)
602   {
603     if (op->client == client)
604     {
605       op->client = NULL;
606       break;
607     }
608     op = op->next;
609   }
610
611   if (NULL == chn->clients_head)
612   { /* Last client disconnected. */
613     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614                 "%p Last client (%s) disconnected from channel %s\n",
615                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
616                 GNUNET_h2s (&chn->pub_key_hash));
617     chn->is_disconnected = GNUNET_YES;
618     if (NULL != chn->tmit_head)
619     { /* Send pending messages to multicast before cleanup. */
620       transmit_message (chn);
621     }
622     else
623     {
624       cleanup_channel (chn);
625     }
626   }
627 }
628
629
630 /**
631  * Send message to all clients connected to the channel.
632  */
633 static void
634 client_send_msg (const struct Channel *chn,
635                  const struct GNUNET_MessageHeader *msg)
636 {
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638               "%p Sending message to clients.\n", chn);
639
640   struct Client *cli = chn->clients_head;
641   while (NULL != cli)
642   {
643     GNUNET_SERVER_notification_context_add (nc, cli->client);
644     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
645     cli = cli->next;
646   }
647 }
648
649
650 /**
651  * Send a result code back to the client.
652  *
653  * @param client
654  *        Client that should receive the result code.
655  * @param result_code
656  *        Code to transmit.
657  * @param op_id
658  *        Operation ID in network byte order.
659  * @param data
660  *        Data payload or NULL.
661  * @param data_size
662  *        Size of @a data.
663  */
664 static void
665 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
666                     int64_t result_code, const void *data, uint16_t data_size)
667 {
668   struct GNUNET_OperationResultMessage *res;
669
670   res = GNUNET_malloc (sizeof (*res) + data_size);
671   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
672   res->header.size = htons (sizeof (*res) + data_size);
673   res->result_code = GNUNET_htonll (result_code);
674   res->op_id = op_id;
675   if (0 < data_size)
676     memcpy (&res[1], data, data_size);
677
678   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679               "%p Sending result to client for operation #%" PRIu64 ": "
680               "%" PRId64 " (size: %u)\n",
681               client, GNUNET_ntohll (op_id), result_code, data_size);
682
683   GNUNET_SERVER_notification_context_add (nc, client);
684   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
685                                               GNUNET_NO);
686   GNUNET_free (res);
687 }
688
689
690 /**
691  * Closure for join_mem_test_cb()
692  */
693 struct JoinMemTestClosure
694 {
695   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
696   struct Channel *chn;
697   struct GNUNET_MULTICAST_JoinHandle *jh;
698   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
699 };
700
701
702 /**
703  * Membership test result callback used for join requests.
704  */
705 static void
706 join_mem_test_cb (void *cls, int64_t result,
707                   const char *err_msg, uint16_t err_msg_size)
708 {
709   struct JoinMemTestClosure *jcls = cls;
710
711   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
712   { /* Pass on join request to client if this is a master channel */
713     struct Master *mst = (struct Master *) jcls->chn;
714     struct GNUNET_HashCode slave_key_hash;
715     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
716                         &slave_key_hash);
717     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
718                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
719     client_send_msg (jcls->chn, &jcls->join_msg->header);
720   }
721   else
722   {
723     if (GNUNET_SYSERR == result)
724     {
725       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
726                   "Could not perform membership test (%.*s)\n",
727                   err_msg_size, err_msg);
728     }
729     // FIXME: add relays
730     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
731   }
732   GNUNET_free (jcls->join_msg);
733   GNUNET_free (jcls);
734 }
735
736
737 /**
738  * Incoming join request from multicast.
739  */
740 static void
741 mcast_recv_join_request (void *cls,
742                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
743                          const struct GNUNET_MessageHeader *join_msg,
744                          struct GNUNET_MULTICAST_JoinHandle *jh)
745 {
746   struct Channel *chn = cls;
747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
748
749   uint16_t join_msg_size = 0;
750   if (NULL != join_msg)
751   {
752     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
753     {
754       join_msg_size = ntohs (join_msg->size);
755     }
756     else
757     {
758       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
759                   "%p Got join message with invalid type %u.\n",
760                   chn, ntohs (join_msg->type));
761     }
762   }
763
764   struct GNUNET_PSYC_JoinRequestMessage *
765     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
766   req->header.size = htons (sizeof (*req) + join_msg_size);
767   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
768   req->slave_key = *slave_key;
769   if (0 < join_msg_size)
770     memcpy (&req[1], join_msg, join_msg_size);
771
772   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
773   jcls->slave_key = *slave_key;
774   jcls->chn = chn;
775   jcls->jh = jh;
776   jcls->join_msg = req;
777
778   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
779                                     chn->max_message_id, 0,
780                                     &join_mem_test_cb, jcls);
781 }
782
783
784 /**
785  * Join decision received from multicast.
786  */
787 static void
788 mcast_recv_join_decision (void *cls, int is_admitted,
789                           const struct GNUNET_PeerIdentity *peer,
790                           uint16_t relay_count,
791                           const struct GNUNET_PeerIdentity *relays,
792                           const struct GNUNET_MessageHeader *join_resp)
793 {
794   struct Slave *slv = cls;
795   struct Channel *chn = &slv->chn;
796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797               "%p Got join decision: %d\n", slv, is_admitted);
798   if (GNUNET_YES == chn->is_ready)
799   {
800     /* Already admitted */
801     return;
802   }
803
804   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
805   struct GNUNET_PSYC_JoinDecisionMessage *
806     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
807   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
808   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
809   dcsn->is_admitted = htonl (is_admitted);
810   if (0 < join_resp_size)
811     memcpy (&dcsn[1], join_resp, join_resp_size);
812
813   client_send_msg (chn, &dcsn->header);
814
815   if (GNUNET_YES == is_admitted)
816   {
817     chn->is_ready = GNUNET_YES;
818   }
819 }
820
821
822 static int
823 store_recv_fragment_replay (void *cls,
824                             struct GNUNET_MULTICAST_MessageHeader *msg,
825                             enum GNUNET_PSYCSTORE_MessageFlags flags)
826 {
827   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
828
829   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
830   return GNUNET_YES;
831 }
832
833
834 /**
835  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
836  */
837 static void
838 store_recv_fragment_replay_result (void *cls, int64_t result,
839                                    const char *err_msg, uint16_t err_msg_size)
840 {
841   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
844               rh, result, err_msg_size, err_msg);
845
846   switch (result)
847   {
848   case GNUNET_YES:
849     break;
850
851   case GNUNET_NO:
852     GNUNET_MULTICAST_replay_response (rh, NULL,
853                                       GNUNET_MULTICAST_REC_NOT_FOUND);
854     break;
855
856   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
857     GNUNET_MULTICAST_replay_response (rh, NULL,
858                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
859     break;
860
861   case GNUNET_SYSERR:
862     GNUNET_MULTICAST_replay_response (rh, NULL,
863                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
864     break;
865   }
866   GNUNET_MULTICAST_replay_response_end (rh);
867 }
868
869
870 /**
871  * Incoming fragment replay request from multicast.
872  */
873 static void
874 mcast_recv_replay_fragment (void *cls,
875                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
876                             uint64_t fragment_id, uint64_t flags,
877                             struct GNUNET_MULTICAST_ReplayHandle *rh)
878
879 {
880   struct Channel *chn = cls;
881   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
882                                  fragment_id, fragment_id,
883                                  &store_recv_fragment_replay,
884                                  &store_recv_fragment_replay_result, rh);
885 }
886
887
888 /**
889  * Incoming message replay request from multicast.
890  */
891 static void
892 mcast_recv_replay_message (void *cls,
893                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
894                            uint64_t message_id,
895                            uint64_t fragment_offset,
896                            uint64_t flags,
897                            struct GNUNET_MULTICAST_ReplayHandle *rh)
898 {
899   struct Channel *chn = cls;
900   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
901                                 message_id, message_id, 1, NULL,
902                                 &store_recv_fragment_replay,
903                                 &store_recv_fragment_replay_result, rh);
904 }
905
906
907 /**
908  * Convert an uint64_t in network byte order to a HashCode
909  * that can be used as key in a MultiHashMap
910  */
911 static inline void
912 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
913 {
914   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
915   /* TODO: use built-in byte swap functions if available */
916
917   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
918   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
919
920   *key = (struct GNUNET_HashCode) {};
921   *((uint64_t *) key)
922     = (n << 32) | (n >> 32);
923 }
924
925
926 /**
927  * Convert an uint64_t in host byte order to a HashCode
928  * that can be used as key in a MultiHashMap
929  */
930 static inline void
931 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
932 {
933 #if __BYTE_ORDER == __BIG_ENDIAN
934   hash_key_from_nll (key, n);
935 #elif __BYTE_ORDER == __LITTLE_ENDIAN
936   *key = (struct GNUNET_HashCode) {};
937   *((uint64_t *) key) = n;
938 #else
939   #error byteorder undefined
940 #endif
941 }
942
943
944 /**
945  * Initialize PSYC message header.
946  */
947 static inline void
948 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
949                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
950 {
951   uint16_t size = ntohs (mmsg->header.size);
952   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
953
954   pmsg->header.size = htons (psize);
955   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
956   pmsg->message_id = mmsg->message_id;
957   pmsg->fragment_offset = mmsg->fragment_offset;
958   pmsg->flags = htonl (flags);
959
960   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
961 }
962
963
964 /**
965  * Create a new PSYC message from a multicast message for sending it to clients.
966  */
967 static inline struct GNUNET_PSYC_MessageHeader *
968 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
969 {
970   struct GNUNET_PSYC_MessageHeader *pmsg;
971   uint16_t size = ntohs (mmsg->header.size);
972   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
973
974   pmsg = GNUNET_malloc (psize);
975   psyc_msg_init (pmsg, mmsg, flags);
976   return pmsg;
977 }
978
979
980 /**
981  * Send multicast message to all clients connected to the channel.
982  */
983 static void
984 client_send_mcast_msg (struct Channel *chn,
985                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
986                        uint32_t flags)
987 {
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989               "%p Sending multicast message to client. "
990               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
991               chn, GNUNET_ntohll (mmsg->fragment_id),
992               GNUNET_ntohll (mmsg->message_id));
993
994   struct GNUNET_PSYC_MessageHeader *
995     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
996   client_send_msg (chn, &pmsg->header);
997   GNUNET_free (pmsg);
998 }
999
1000
1001 /**
1002  * Send multicast request to all clients connected to the channel.
1003  */
1004 static void
1005 client_send_mcast_req (struct Master *mst,
1006                        const struct GNUNET_MULTICAST_RequestHeader *req)
1007 {
1008   struct Channel *chn = &mst->chn;
1009
1010   struct GNUNET_PSYC_MessageHeader *pmsg;
1011   uint16_t size = ntohs (req->header.size);
1012   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1013
1014   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1015               "%p Sending multicast request to client. "
1016               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1017               chn, GNUNET_ntohll (req->fragment_id),
1018               GNUNET_ntohll (req->request_id));
1019
1020   pmsg = GNUNET_malloc (psize);
1021   pmsg->header.size = htons (psize);
1022   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1023   pmsg->message_id = req->request_id;
1024   pmsg->fragment_offset = req->fragment_offset;
1025   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1026   pmsg->slave_key = req->member_key;
1027
1028   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1029   client_send_msg (chn, &pmsg->header);
1030   GNUNET_free (pmsg);
1031 }
1032
1033
1034 /**
1035  * Insert a multicast message fragment into the queue belonging to the message.
1036  *
1037  * @param chn          Channel.
1038  * @param mmsg         Multicast message fragment.
1039  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1040  * @param first_ptype  First PSYC message part type in @a mmsg.
1041  * @param last_ptype   Last PSYC message part type in @a mmsg.
1042  */
1043 static void
1044 fragment_queue_insert (struct Channel *chn,
1045                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1046                        uint16_t first_ptype, uint16_t last_ptype)
1047 {
1048   const uint16_t size = ntohs (mmsg->header.size);
1049   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1050   struct GNUNET_CONTAINER_MultiHashMap
1051     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1052                                                     &chn->pub_key_hash);
1053
1054   struct GNUNET_HashCode msg_id_hash;
1055   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1056
1057   struct FragmentQueue
1058     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1059
1060   if (NULL == fragq)
1061   {
1062     fragq = GNUNET_new (struct FragmentQueue);
1063     fragq->state = MSG_FRAG_STATE_HEADER;
1064     fragq->fragments
1065       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1066
1067     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1068                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1069
1070     if (NULL == chan_msgs)
1071     {
1072       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1073       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1074                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1075     }
1076   }
1077
1078   struct GNUNET_HashCode frag_id_hash;
1079   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1080   struct RecvCacheEntry
1081     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1082   if (NULL == cache_entry)
1083   {
1084     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085                 "%p Adding message fragment to cache. "
1086                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1087                 chn, GNUNET_ntohll (mmsg->message_id),
1088                 GNUNET_ntohll (mmsg->fragment_id));
1089     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090                 "%p header_size: %" PRIu64 " + %u\n",
1091                 chn, fragq->header_size, size);
1092     cache_entry = GNUNET_new (struct RecvCacheEntry);
1093     cache_entry->ref_count = 1;
1094     cache_entry->mmsg = GNUNET_malloc (size);
1095     memcpy (cache_entry->mmsg, mmsg, size);
1096     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1097                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1098   }
1099   else
1100   {
1101     cache_entry->ref_count++;
1102     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                 "%p Message fragment is already in cache. "
1104                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1105                 ", ref_count: %u\n",
1106                 chn, GNUNET_ntohll (mmsg->message_id),
1107                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1108   }
1109
1110   if (MSG_FRAG_STATE_HEADER == fragq->state)
1111   {
1112     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1113     {
1114       struct GNUNET_PSYC_MessageMethod *
1115         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1116       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1117       fragq->flags = ntohl (pmeth->flags);
1118     }
1119
1120     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1121     {
1122       fragq->header_size += size;
1123     }
1124     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1125              || frag_offset == fragq->header_size)
1126     { /* header is now complete */
1127       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1128                   "%p Header of message %" PRIu64 " is complete.\n",
1129                   chn, GNUNET_ntohll (mmsg->message_id));
1130
1131       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1132                   "%p Adding message %" PRIu64 " to queue.\n",
1133                   chn, GNUNET_ntohll (mmsg->message_id));
1134       fragq->state = MSG_FRAG_STATE_DATA;
1135     }
1136     else
1137     {
1138       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1139                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1140                   "%" PRIu64 " != %" PRIu64 "\n",
1141                   chn, GNUNET_ntohll (mmsg->message_id),
1142                   frag_offset, fragq->header_size);
1143     }
1144   }
1145
1146   switch (last_ptype)
1147   {
1148   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1149     if (frag_offset == fragq->size)
1150       fragq->state = MSG_FRAG_STATE_END;
1151     else
1152       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1153                   "%p Message %" PRIu64 " is NOT complete yet: "
1154                   "%" PRIu64 " != %" PRIu64 "\n",
1155                   chn, GNUNET_ntohll (mmsg->message_id),
1156                   frag_offset, fragq->size);
1157     break;
1158
1159   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1160     /* Drop message without delivering to client if it's a single fragment */
1161     fragq->state =
1162       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1163       ? MSG_FRAG_STATE_DROP
1164       : MSG_FRAG_STATE_CANCEL;
1165   }
1166
1167   switch (fragq->state)
1168   {
1169   case MSG_FRAG_STATE_DATA:
1170   case MSG_FRAG_STATE_END:
1171   case MSG_FRAG_STATE_CANCEL:
1172     if (GNUNET_NO == fragq->is_queued)
1173     {
1174       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1175                                     GNUNET_ntohll (mmsg->message_id));
1176       fragq->is_queued = GNUNET_YES;
1177     }
1178   }
1179
1180   fragq->size += size;
1181   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1182                                 GNUNET_ntohll (mmsg->fragment_id));
1183 }
1184
1185
1186 /**
1187  * Run fragment queue of a message.
1188  *
1189  * Send fragments of a message in order to client, after all modifiers arrived
1190  * from multicast.
1191  *
1192  * @param chn      Channel.
1193  * @param msg_id  ID of the message @a fragq belongs to.
1194  * @param fragq   Fragment queue of the message.
1195  * @param drop    Drop message without delivering to client?
1196  *                #GNUNET_YES or #GNUNET_NO.
1197  */
1198 static void
1199 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1200                     struct FragmentQueue *fragq, uint8_t drop)
1201 {
1202   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1203               "%p Running message fragment queue for message %" PRIu64
1204               " (state: %u).\n",
1205               chn, msg_id, fragq->state);
1206
1207   struct GNUNET_CONTAINER_MultiHashMap
1208     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1209                                                     &chn->pub_key_hash);
1210   GNUNET_assert (NULL != chan_msgs);
1211   uint64_t frag_id;
1212
1213   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1214                                                     &frag_id))
1215   {
1216     struct GNUNET_HashCode frag_id_hash;
1217     hash_key_from_hll (&frag_id_hash, frag_id);
1218     struct RecvCacheEntry *cache_entry
1219       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1220     if (cache_entry != NULL)
1221     {
1222       if (GNUNET_NO == drop)
1223       {
1224         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1225       }
1226       if (cache_entry->ref_count <= 1)
1227       {
1228         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1229                                               cache_entry);
1230         GNUNET_free (cache_entry->mmsg);
1231         GNUNET_free (cache_entry);
1232       }
1233       else
1234       {
1235         cache_entry->ref_count--;
1236       }
1237     }
1238 #if CACHE_AGING_IMPLEMENTED
1239     else if (GNUNET_NO == drop)
1240     {
1241       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1242     }
1243 #endif
1244
1245     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1246   }
1247
1248   if (MSG_FRAG_STATE_END <= fragq->state)
1249   {
1250     struct GNUNET_HashCode msg_id_hash;
1251     hash_key_from_hll (&msg_id_hash, msg_id);
1252
1253     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1254     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1255     GNUNET_free (fragq);
1256   }
1257   else
1258   {
1259     fragq->is_queued = GNUNET_NO;
1260   }
1261 }
1262
1263
1264 struct StateModifyClosure
1265 {
1266   struct Channel *chn;
1267   uint64_t msg_id;
1268   struct GNUNET_HashCode msg_id_hash;
1269 };
1270
1271
1272 void
1273 store_recv_state_modify_result (void *cls, int64_t result,
1274                                 const char *err_msg, uint16_t err_msg_size)
1275 {
1276   struct StateModifyClosure *mcls = cls;
1277   struct Channel *chn = mcls->chn;
1278   uint64_t msg_id = mcls->msg_id;
1279
1280   struct FragmentQueue *
1281     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1282
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1285               chn, result, err_msg_size, err_msg);
1286
1287   switch (result)
1288   {
1289   case GNUNET_OK:
1290   case GNUNET_NO:
1291     if (NULL != fragq)
1292       fragq->state_is_modified = GNUNET_YES;
1293     if (chn->max_state_message_id < msg_id)
1294       chn->max_state_message_id = msg_id;
1295     if (chn->max_message_id < msg_id)
1296       chn->max_message_id = msg_id;
1297
1298     if (NULL != fragq)
1299       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1300     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1301     message_queue_run (chn);
1302     break;
1303
1304   default:
1305     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1306                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1307                 chn, result, err_msg_size, err_msg);
1308     /** @todo FIXME: handle state_modify error */
1309   }
1310 }
1311
1312
1313 /**
1314  * Run message queue.
1315  *
1316  * Send messages in queue to client in order after a message has arrived from
1317  * multicast, according to the following:
1318  * - A message is only sent if all of its modifiers arrived.
1319  * - A stateful message is only sent if the previous stateful message
1320  *   has already been delivered to the client.
1321  *
1322  * @param chn  Channel.
1323  *
1324  * @return Number of messages removed from queue and sent to client.
1325  */
1326 static uint64_t
1327 message_queue_run (struct Channel *chn)
1328 {
1329   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1330               "%p Running message queue.\n", chn);
1331   uint64_t n = 0;
1332   uint64_t msg_id;
1333
1334   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1335                                                     &msg_id))
1336   {
1337     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1338                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1339     struct GNUNET_HashCode msg_id_hash;
1340     hash_key_from_hll (&msg_id_hash, msg_id);
1341
1342     struct FragmentQueue *
1343       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1344
1345     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1346     {
1347       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1348                   "%p No fragq (%p) or header not complete.\n",
1349                   chn, fragq);
1350       break;
1351     }
1352
1353     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354                 "%p Fragment queue entry:  state: %u, state delta: "
1355                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1356                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1357
1358     if (MSG_FRAG_STATE_DATA <= fragq->state)
1359     {
1360       /* Check if there's a missing message before the current one */
1361       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1362       {
1363         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1364
1365         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1366             && (chn->max_message_id != msg_id - 1
1367                 && chn->max_message_id != msg_id))
1368         {
1369           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1370                       "%p Out of order message. "
1371                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1372                       chn, chn->max_message_id, msg_id);
1373           break;
1374           // FIXME: keep track of messages processed in this queue run,
1375           //        and only stop after reaching the end
1376         }
1377       }
1378       else
1379       {
1380         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1381         if (GNUNET_YES != fragq->state_is_modified)
1382         {
1383           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1384           {
1385             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1386                         "%p Out of order stateful message. "
1387                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1388                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1389             break;
1390             // FIXME: keep track of messages processed in this queue run,
1391             //        and only stop after reaching the end
1392           }
1393
1394           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1395           mcls->chn = chn;
1396           mcls->msg_id = msg_id;
1397           mcls->msg_id_hash = msg_id_hash;
1398
1399           /* Apply modifiers to state in PSYCstore */
1400           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1401                                          fragq->state_delta,
1402                                          store_recv_state_modify_result, mcls);
1403           break; // continue after asynchronous state modify result
1404         }
1405       }
1406       chn->max_message_id = msg_id;
1407     }
1408     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1409     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1410     n++;
1411   }
1412
1413   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1414               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1415   return n;
1416 }
1417
1418
1419 /**
1420  * Drop message queue of a channel.
1421  *
1422  * Remove all messages in queue without sending it to clients.
1423  *
1424  * @param chn  Channel.
1425  *
1426  * @return Number of messages removed from queue.
1427  */
1428 static uint64_t
1429 message_queue_drop (struct Channel *chn)
1430 {
1431   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1432               "%p Dropping message queue.\n", chn);
1433   uint64_t n = 0;
1434   uint64_t msg_id;
1435   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1436                                                     &msg_id))
1437   {
1438     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1439                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1440     struct GNUNET_HashCode msg_id_hash;
1441     hash_key_from_hll (&msg_id_hash, msg_id);
1442
1443     struct FragmentQueue *
1444       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1445     GNUNET_assert (NULL != fragq);
1446     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1447     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1448     n++;
1449   }
1450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1451               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1452   return n;
1453 }
1454
1455
1456 /**
1457  * Received result of GNUNET_PSYCSTORE_fragment_store().
1458  */
1459 static void
1460 store_recv_fragment_store_result (void *cls, int64_t result,
1461                                   const char *err_msg, uint16_t err_msg_size)
1462 {
1463   struct Channel *chn = cls;
1464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1465               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1466               chn, result, err_msg_size, err_msg);
1467 }
1468
1469
1470 /**
1471  * Handle incoming message fragment from multicast.
1472  *
1473  * Store it using PSYCstore and send it to the clients of the channel in order.
1474  */
1475 static void
1476 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1477 {
1478   struct Channel *chn = cls;
1479   uint16_t size = ntohs (mmsg->header.size);
1480
1481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482               "%p Received multicast message of size %u. "
1483               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1484               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1485               chn, size,
1486               GNUNET_ntohll (mmsg->fragment_id),
1487               GNUNET_ntohll (mmsg->message_id),
1488               GNUNET_ntohll (mmsg->fragment_offset),
1489               GNUNET_ntohll (mmsg->flags));
1490
1491   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1492                                    &store_recv_fragment_store_result, chn);
1493
1494   uint16_t first_ptype = 0, last_ptype = 0;
1495   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1496                                                (const char *) &mmsg[1],
1497                                                &first_ptype, &last_ptype);
1498   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1499               "%p Message check result %d, first part type %u, last part type %u\n",
1500               chn, check, first_ptype, last_ptype);
1501   if (GNUNET_SYSERR == check)
1502   {
1503     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1504                 "%p Dropping incoming multicast message with invalid parts.\n",
1505                 chn);
1506     GNUNET_break_op (0);
1507     return;
1508   }
1509
1510   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1511   message_queue_run (chn);
1512 }
1513
1514
1515 /**
1516  * Incoming request fragment from multicast for a master.
1517  *
1518  * @param cls   Master.
1519  * @param req   The request.
1520  */
1521 static void
1522 mcast_recv_request (void *cls,
1523                     const struct GNUNET_MULTICAST_RequestHeader *req)
1524 {
1525   struct Master *mst = cls;
1526   uint16_t size = ntohs (req->header.size);
1527
1528   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_key);
1529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530               "%p Received multicast request of size %u from %s.\n",
1531               mst, size, str);
1532   GNUNET_free (str);
1533
1534   uint16_t first_ptype = 0, last_ptype = 0;
1535   if (GNUNET_SYSERR
1536       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1537                                           (const char *) &req[1],
1538                                           &first_ptype, &last_ptype))
1539   {
1540     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1541                 "%p Dropping incoming multicast request with invalid parts.\n",
1542                 mst);
1543     GNUNET_break_op (0);
1544     return;
1545   }
1546
1547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548               "Message parts: first: type %u, last: type %u\n",
1549               first_ptype, last_ptype);
1550
1551   /* FIXME: in-order delivery */
1552   client_send_mcast_req (mst, req);
1553 }
1554
1555
1556 /**
1557  * Response from PSYCstore with the current counter values for a channel master.
1558  */
1559 static void
1560 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1561                             uint64_t max_message_id, uint64_t max_group_generation,
1562                             uint64_t max_state_message_id)
1563 {
1564   struct Master *mst = cls;
1565   struct Channel *chn = &mst->chn;
1566   chn->store_op = NULL;
1567
1568   struct GNUNET_PSYC_CountersResultMessage res;
1569   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1570   res.header.size = htons (sizeof (res));
1571   res.result_code = htonl (result);
1572   res.max_message_id = GNUNET_htonll (max_message_id);
1573
1574   if (GNUNET_OK == result || GNUNET_NO == result)
1575   {
1576     mst->max_message_id = max_message_id;
1577     chn->max_message_id = max_message_id;
1578     chn->max_state_message_id = max_state_message_id;
1579     mst->max_group_generation = max_group_generation;
1580     mst->origin
1581       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1582                                        mcast_recv_join_request,
1583                                        mcast_recv_replay_fragment,
1584                                        mcast_recv_replay_message,
1585                                        mcast_recv_request,
1586                                        mcast_recv_message, chn);
1587     chn->is_ready = GNUNET_YES;
1588   }
1589   else
1590   {
1591     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1592                 "%p GNUNET_PSYCSTORE_counters_get() "
1593                 "returned %d for channel %s.\n",
1594                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1595   }
1596
1597   client_send_msg (chn, &res.header);
1598 }
1599
1600
1601 /**
1602  * Response from PSYCstore with the current counter values for a channel slave.
1603  */
1604 void
1605 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1606                            uint64_t max_message_id, uint64_t max_group_generation,
1607                            uint64_t max_state_message_id)
1608 {
1609   struct Slave *slv = cls;
1610   struct Channel *chn = &slv->chn;
1611   chn->store_op = NULL;
1612
1613   struct GNUNET_PSYC_CountersResultMessage res;
1614   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1615   res.header.size = htons (sizeof (res));
1616   res.result_code = htonl (result);
1617   res.max_message_id = GNUNET_htonll (max_message_id);
1618
1619   if (GNUNET_OK == result || GNUNET_NO == result)
1620   {
1621     chn->max_message_id = max_message_id;
1622     chn->max_state_message_id = max_state_message_id;
1623     slv->member
1624       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1625                                       &slv->origin,
1626                                       slv->relay_count, slv->relays,
1627                                       &slv->join_msg->header,
1628                                       mcast_recv_join_request,
1629                                       mcast_recv_join_decision,
1630                                       mcast_recv_replay_fragment,
1631                                       mcast_recv_replay_message,
1632                                       mcast_recv_message, chn);
1633     if (NULL != slv->join_msg)
1634     {
1635       GNUNET_free (slv->join_msg);
1636       slv->join_msg = NULL;
1637     }
1638   }
1639   else
1640   {
1641     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1642                 "%p GNUNET_PSYCSTORE_counters_get() "
1643                 "returned %d for channel %s.\n",
1644                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1645   }
1646
1647   client_send_msg (chn, &res.header);
1648 }
1649
1650
1651 static void
1652 channel_init (struct Channel *chn)
1653 {
1654   chn->recv_msgs
1655     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1656   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1657 }
1658
1659
1660 /**
1661  * Handle a connecting client starting a channel master.
1662  */
1663 static void
1664 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1665                           const struct GNUNET_MessageHeader *msg)
1666 {
1667   const struct MasterStartRequest *req
1668     = (const struct MasterStartRequest *) msg;
1669
1670   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1671   struct GNUNET_HashCode pub_key_hash;
1672
1673   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1674   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1675
1676   struct Master *
1677     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1678   struct Channel *chn;
1679
1680   if (NULL == mst)
1681   {
1682     mst = GNUNET_new (struct Master);
1683     mst->policy = ntohl (req->policy);
1684     mst->priv_key = req->channel_key;
1685     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1686
1687     chn = &mst->chn;
1688     chn->is_master = GNUNET_YES;
1689     chn->pub_key = pub_key;
1690     chn->pub_key_hash = pub_key_hash;
1691     channel_init (chn);
1692
1693     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1694                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1695     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1696                                                    store_recv_master_counters, mst);
1697   }
1698   else
1699   {
1700     chn = &mst->chn;
1701
1702     struct GNUNET_PSYC_CountersResultMessage res;
1703     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1704     res.header.size = htons (sizeof (res));
1705     res.result_code = htonl (GNUNET_OK);
1706     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1707
1708     GNUNET_SERVER_notification_context_add (nc, client);
1709     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1710                                                 GNUNET_NO);
1711   }
1712
1713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714               "%p Client connected as master to channel %s.\n",
1715               mst, GNUNET_h2s (&chn->pub_key_hash));
1716
1717   struct Client *cli = GNUNET_new (struct Client);
1718   cli->client = client;
1719   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1720
1721   GNUNET_SERVER_client_set_user_context (client, chn);
1722   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1723 }
1724
1725
1726 /**
1727  * Handle a connecting client joining as a channel slave.
1728  */
1729 static void
1730 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1731                         const struct GNUNET_MessageHeader *msg)
1732 {
1733   const struct SlaveJoinRequest *req
1734     = (const struct SlaveJoinRequest *) msg;
1735   uint16_t req_size = ntohs (req->header.size);
1736
1737   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1738   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1739
1740   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1741   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1742   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1743
1744   struct GNUNET_CONTAINER_MultiHashMap *
1745     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1746   struct Slave *slv = NULL;
1747   struct Channel *chn;
1748
1749   if (NULL != chn_slv)
1750   {
1751     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1752   }
1753   if (NULL == slv)
1754   {
1755     slv = GNUNET_new (struct Slave);
1756     slv->priv_key = req->slave_key;
1757     slv->pub_key = slv_pub_key;
1758     slv->pub_key_hash = slv_pub_key_hash;
1759     slv->origin = req->origin;
1760     slv->relay_count = ntohl (req->relay_count);
1761
1762     const struct GNUNET_PeerIdentity *
1763       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1764     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1765     uint16_t join_msg_size = 0;
1766
1767     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1768         <= req_size)
1769     {
1770       struct GNUNET_PSYC_Message *
1771         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1772       join_msg_size = ntohs (join_msg->header.size);
1773       slv->join_msg = GNUNET_malloc (join_msg_size);
1774       memcpy (slv->join_msg, join_msg, join_msg_size);
1775     }
1776     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1777     {
1778       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1779                   "%u + %u + %u != %u\n",
1780                   sizeof (*req), relay_size, join_msg_size, req_size);
1781       GNUNET_break (0);
1782       GNUNET_SERVER_client_disconnect (client);
1783       GNUNET_free (slv);
1784       return;
1785     }
1786     if (0 < slv->relay_count)
1787     {
1788       slv->relays = GNUNET_malloc (relay_size);
1789       memcpy (slv->relays, &req[1], relay_size);
1790     }
1791
1792     chn = &slv->chn;
1793     chn->is_master = GNUNET_NO;
1794     chn->pub_key = req->channel_key;
1795     chn->pub_key_hash = pub_key_hash;
1796     channel_init (chn);
1797
1798     if (NULL == chn_slv)
1799     {
1800       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1801       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1802                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1803     }
1804     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1805                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1806     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1807                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1808     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1809                                                   &store_recv_slave_counters, slv);
1810   }
1811   else
1812   {
1813     chn = &slv->chn;
1814
1815     struct GNUNET_PSYC_CountersResultMessage res;
1816     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1817     res.header.size = htons (sizeof (res));
1818     res.result_code = htonl (GNUNET_OK);
1819     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1820
1821     GNUNET_SERVER_notification_context_add (nc, client);
1822     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1823                                                 GNUNET_NO);
1824
1825     if (NULL == slv->member)
1826     {
1827       slv->member
1828         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1829                                         &slv->origin,
1830                                         slv->relay_count, slv->relays,
1831                                         &slv->join_msg->header,
1832                                         &mcast_recv_join_request,
1833                                         &mcast_recv_join_decision,
1834                                         &mcast_recv_replay_fragment,
1835                                         &mcast_recv_replay_message,
1836                                         &mcast_recv_message, chn);
1837       if (NULL != slv->join_msg)
1838       {
1839         GNUNET_free (slv->join_msg);
1840         slv->join_msg = NULL;
1841       }
1842     }
1843     else if (NULL != slv->join_dcsn)
1844     {
1845       GNUNET_SERVER_notification_context_add (nc, client);
1846       GNUNET_SERVER_notification_context_unicast (nc, client,
1847                                                   &slv->join_dcsn->header,
1848                                                   GNUNET_NO);
1849     }
1850   }
1851
1852   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1853               "%p Client connected as slave to channel %s.\n",
1854               slv, GNUNET_h2s (&chn->pub_key_hash));
1855
1856   struct Client *cli = GNUNET_new (struct Client);
1857   cli->client = client;
1858   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1859
1860   GNUNET_SERVER_client_set_user_context (client, chn);
1861   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1862 }
1863
1864
1865 struct JoinDecisionClosure
1866 {
1867   int32_t is_admitted;
1868   struct GNUNET_MessageHeader *msg;
1869 };
1870
1871
1872 /**
1873  * Iterator callback for sending join decisions to multicast.
1874  */
1875 static int
1876 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1877                           void *value)
1878 {
1879   struct JoinDecisionClosure *jcls = cls;
1880   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1881   // FIXME: add relays
1882   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1883   return GNUNET_YES;
1884 }
1885
1886
1887 /**
1888  * Join decision from client.
1889  */
1890 static void
1891 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1892                            const struct GNUNET_MessageHeader *msg)
1893 {
1894   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1895     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1896   struct Channel *chn;
1897   struct Master *mst;
1898   struct JoinDecisionClosure jcls;
1899
1900   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1901   if (NULL == chn)
1902   {
1903     GNUNET_break (0);
1904     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1905     return;
1906   }
1907   GNUNET_assert (GNUNET_YES == chn->is_master);
1908   mst = (struct Master *) chn;
1909   jcls.is_admitted = ntohl (dcsn->is_admitted);
1910   jcls.msg
1911     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1912     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1913     : NULL;
1914
1915   struct GNUNET_HashCode slave_key_hash;
1916   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1917                       &slave_key_hash);
1918
1919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1920               "%p Got join decision (%d) from client for channel %s..\n",
1921               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1922   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923               "%p ..and slave %s.\n",
1924               mst, GNUNET_h2s (&slave_key_hash));
1925
1926   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1927                                               &mcast_send_join_decision, &jcls);
1928   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1929   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1930 }
1931
1932
1933 /**
1934  * Send acknowledgement to a client.
1935  *
1936  * Sent after a message fragment has been passed on to multicast.
1937  *
1938  * @param chn The channel struct for the client.
1939  */
1940 static void
1941 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1942 {
1943   struct GNUNET_MessageHeader res;
1944   res.size = htons (sizeof (res));
1945   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1946
1947   /* FIXME */
1948   GNUNET_SERVER_notification_context_add (nc, client);
1949   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1950 }
1951
1952
1953 /**
1954  * Callback for the transmit functions of multicast.
1955  */
1956 static int
1957 transmit_notify (void *cls, size_t *data_size, void *data)
1958 {
1959   struct Channel *chn = cls;
1960   struct TransmitMessage *tmit_msg = chn->tmit_head;
1961
1962   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1963   {
1964     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1965                 "%p transmit_notify: nothing to send.\n", chn);
1966     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1967       GNUNET_break (0);
1968     *data_size = 0;
1969     return GNUNET_NO;
1970   }
1971
1972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1974
1975   *data_size = tmit_msg->size;
1976   memcpy (data, &tmit_msg[1], *data_size);
1977
1978   int ret
1979     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1980     ? GNUNET_NO
1981     : GNUNET_YES;
1982
1983   /* FIXME: handle disconnecting clients */
1984   if (NULL != tmit_msg->client)
1985     send_message_ack (chn, tmit_msg->client);
1986
1987   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1988   GNUNET_free (tmit_msg);
1989
1990   if (NULL != chn->tmit_head)
1991   {
1992     GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
1993   }
1994   else if (GNUNET_YES == chn->is_disconnected
1995            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1996   {
1997     /* FIXME: handle partial message (when still in_transmit) */
1998     return GNUNET_SYSERR;
1999   }
2000   return ret;
2001 }
2002
2003
2004 /**
2005  * Callback for the transmit functions of multicast.
2006  */
2007 static int
2008 master_transmit_notify (void *cls, size_t *data_size, void *data)
2009 {
2010   int ret = transmit_notify (cls, data_size, data);
2011
2012   if (GNUNET_YES == ret)
2013   {
2014     struct Master *mst = cls;
2015     mst->tmit_handle = NULL;
2016   }
2017   return ret;
2018 }
2019
2020
2021 /**
2022  * Callback for the transmit functions of multicast.
2023  */
2024 static int
2025 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2026 {
2027   int ret = transmit_notify (cls, data_size, data);
2028
2029   if (GNUNET_YES == ret)
2030   {
2031     struct Slave *slv = cls;
2032     slv->tmit_handle = NULL;
2033   }
2034   return ret;
2035 }
2036
2037
2038 /**
2039  * Transmit a message from a channel master to the multicast group.
2040  */
2041 static void
2042 master_transmit_message (struct Master *mst)
2043 {
2044   if (NULL == mst->chn.tmit_head)
2045     return;
2046   if (NULL == mst->tmit_handle)
2047   {
2048     mst->tmit_handle
2049       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
2050                                         mst->max_group_generation,
2051                                         master_transmit_notify, mst);
2052   }
2053   else
2054   {
2055     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2056   }
2057 }
2058
2059
2060 /**
2061  * Transmit a message from a channel slave to the multicast group.
2062  */
2063 static void
2064 slave_transmit_message (struct Slave *slv)
2065 {
2066   if (NULL == slv->chn.tmit_head)
2067     return;
2068   if (NULL == slv->tmit_handle)
2069   {
2070     slv->tmit_handle
2071       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2072                                            slave_transmit_notify, slv);
2073   }
2074   else
2075   {
2076     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2077   }
2078 }
2079
2080
2081 static void
2082 transmit_message (struct Channel *chn)
2083 {
2084   chn->is_master
2085     ? master_transmit_message ((struct Master *) chn)
2086     : slave_transmit_message ((struct Slave *) chn);
2087 }
2088
2089
2090 /**
2091  * Queue a message from a channel master for sending to the multicast group.
2092  */
2093 static void
2094 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2095 {
2096   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2097
2098   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2099   {
2100     tmit_msg->id = ++mst->max_message_id;
2101     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2102                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2103                 mst, tmit_msg->id);
2104     struct GNUNET_PSYC_MessageMethod *pmeth
2105       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2106
2107     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2108     {
2109       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2110     }
2111     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2112     {
2113       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2114                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2115                   mst, tmit_msg->id - mst->max_state_message_id);
2116       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2117                                           - mst->max_state_message_id);
2118       mst->max_state_message_id = tmit_msg->id;
2119     }
2120     else
2121     {
2122         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2123                     "%p master_queue_message: state not modified\n", mst);
2124       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2125     }
2126
2127     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2128     {
2129       /// @todo add state_hash to PSYC header
2130     }
2131   }
2132 }
2133
2134
2135 /**
2136  * Queue a message from a channel slave for sending to the multicast group.
2137  */
2138 static void
2139 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2140 {
2141   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2142   {
2143     struct GNUNET_PSYC_MessageMethod *pmeth
2144       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2145     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2146     tmit_msg->id = ++slv->max_request_id;
2147   }
2148 }
2149
2150
2151 /**
2152  * Queue PSYC message parts for sending to multicast.
2153  *
2154  * @param chn           Channel to send to.
2155  * @param client       Client the message originates from.
2156  * @param data_size    Size of @a data.
2157  * @param data         Concatenated message parts.
2158  * @param first_ptype  First message part type in @a data.
2159  * @param last_ptype   Last message part type in @a data.
2160  */
2161 static struct TransmitMessage *
2162 queue_message (struct Channel *chn,
2163                struct GNUNET_SERVER_Client *client,
2164                size_t data_size,
2165                const void *data,
2166                uint16_t first_ptype, uint16_t last_ptype)
2167 {
2168   struct TransmitMessage *
2169     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2170   memcpy (&tmit_msg[1], data, data_size);
2171   tmit_msg->client = client;
2172   tmit_msg->size = data_size;
2173   tmit_msg->first_ptype = first_ptype;
2174   tmit_msg->last_ptype = last_ptype;
2175
2176   /* FIXME: separate queue per message ID */
2177
2178   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2179
2180   chn->is_master
2181     ? master_queue_message ((struct Master *) chn, tmit_msg)
2182     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2183   return tmit_msg;
2184 }
2185
2186
2187 /**
2188  * Cancel transmission of current message.
2189  *
2190  * @param chn     Channel to send to.
2191  * @param client  Client the message originates from.
2192  */
2193 static void
2194 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2195 {
2196   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2197
2198   struct GNUNET_MessageHeader msg;
2199   msg.size = htons (sizeof (msg));
2200   msg.type = htons (type);
2201
2202   queue_message (chn, client, sizeof (msg), &msg, type, type);
2203   transmit_message (chn);
2204
2205   /* FIXME: cleanup */
2206 }
2207
2208
2209 /**
2210  * Incoming message from a master or slave client.
2211  */
2212 static void
2213 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2214                           const struct GNUNET_MessageHeader *msg)
2215 {
2216   struct Channel *
2217     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2218   GNUNET_assert (NULL != chn);
2219
2220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2221               "%p Received message from client.\n", chn);
2222   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2223
2224   if (GNUNET_YES != chn->is_ready)
2225   {
2226     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2227                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2228     GNUNET_break (0);
2229     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2230     return;
2231   }
2232
2233   uint16_t size = ntohs (msg->size);
2234   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2235   {
2236     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2237                 "%p Message payload too large: %u < %u.\n",
2238                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2239     GNUNET_break (0);
2240     transmit_cancel (chn, client);
2241     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2242     return;
2243   }
2244
2245   uint16_t first_ptype = 0, last_ptype = 0;
2246   if (GNUNET_SYSERR
2247       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2248                                           (const char *) &msg[1],
2249                                           &first_ptype, &last_ptype))
2250   {
2251     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2252                 "%p Received invalid message part from client.\n", chn);
2253     GNUNET_break (0);
2254     transmit_cancel (chn, client);
2255     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2256     return;
2257   }
2258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2259               "%p Received message with first part type %u and last part type %u.\n",
2260               chn, first_ptype, last_ptype);
2261
2262   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2263                  first_ptype, last_ptype);
2264   transmit_message (chn);
2265   /* FIXME: send a few ACKs even before transmit_notify is called */
2266
2267   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2268 };
2269
2270
2271 /**
2272  * Received result of GNUNET_PSYCSTORE_membership_store()
2273  */
2274 static void
2275 store_recv_membership_store_result (void *cls, int64_t result,
2276                                     const char *err_msg, uint16_t err_msg_size)
2277 {
2278   struct Operation *op = cls;
2279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2280               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2281               op->chn, result, err_msg_size, err_msg);
2282
2283   if (NULL != op->client)
2284     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2285   op_remove (op);
2286 }
2287
2288
2289 /**
2290  * Client requests to add/remove a slave in the membership database.
2291  */
2292 static void
2293 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2294                               const struct GNUNET_MessageHeader *msg)
2295 {
2296   struct Channel *
2297     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2298   GNUNET_assert (NULL != chn);
2299
2300   const struct ChannelMembershipStoreRequest *
2301     req = (const struct ChannelMembershipStoreRequest *) msg;
2302
2303   struct Operation *op = op_add (chn, client, req->op_id, 0);
2304
2305   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2306   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308               "%p Received membership store request from client.\n", chn);
2309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2310               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2311               chn, req->did_join, announced_at, effective_since);
2312
2313   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2314                                      req->did_join, announced_at, effective_since,
2315                                      0, /* FIXME: group_generation */
2316                                      &store_recv_membership_store_result, op);
2317   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2318 }
2319
2320
2321 /**
2322  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2323  * in response to a history request from a client.
2324  */
2325 static int
2326 store_recv_fragment_history (void *cls,
2327                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2328                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2329 {
2330   struct Operation *op = cls;
2331   if (NULL == op->client)
2332   { /* Requesting client already disconnected. */
2333     return GNUNET_NO;
2334   }
2335   struct Channel *chn = op->chn;
2336
2337   struct GNUNET_PSYC_MessageHeader *pmsg;
2338   uint16_t msize = ntohs (mmsg->header.size);
2339   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2340
2341   struct GNUNET_OperationResultMessage *
2342     res = GNUNET_malloc (sizeof (*res) + psize);
2343   res->header.size = htons (sizeof (*res) + psize);
2344   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2345   res->op_id = op->op_id;
2346   res->result_code = GNUNET_htonll (GNUNET_OK);
2347
2348   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2349   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2350   memcpy (&res[1], pmsg, psize);
2351
2352   /** @todo FIXME: send only to requesting client */
2353   client_send_msg (chn, &res->header);
2354   return GNUNET_YES;
2355 }
2356
2357
2358 /**
2359  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2360  * in response to a history request from a client.
2361  */
2362 static void
2363 store_recv_fragment_history_result (void *cls, int64_t result,
2364                                     const char *err_msg, uint16_t err_msg_size)
2365 {
2366   struct Operation *op = cls;
2367   if (NULL == op->client)
2368   { /* Requesting client already disconnected. */
2369     return;
2370   }
2371
2372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2373               "%p History replay #%" PRIu64 ": "
2374               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2375               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2376
2377   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2378   {
2379     /** @todo Multicast replay request for messages not found locally. */
2380   }
2381
2382   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2383   op_remove (op);
2384 }
2385
2386
2387 /**
2388  * Client requests channel history.
2389  */
2390 static void
2391 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2392                             const struct GNUNET_MessageHeader *msg)
2393 {
2394   struct Channel *
2395     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2396   GNUNET_assert (NULL != chn);
2397
2398   const struct GNUNET_PSYC_HistoryRequestMessage *
2399     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2400   uint16_t size = ntohs (msg->size);
2401   const char *method_prefix = (const char *) &req[1];
2402
2403   if (size < sizeof (*req) + 1
2404       || '\0' != method_prefix[size - sizeof (*req) - 1])
2405   {
2406     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2407                 "%p History replay #%" PRIu64 ": "
2408                 "invalid method prefix. size: %u < %u?\n",
2409                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2410     GNUNET_break (0);
2411     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2412     return;
2413   }
2414
2415   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2416
2417   if (0 == req->message_limit)
2418     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2419                                   GNUNET_ntohll (req->start_message_id),
2420                                   GNUNET_ntohll (req->end_message_id),
2421                                   0, method_prefix,
2422                                   &store_recv_fragment_history,
2423                                   &store_recv_fragment_history_result, op);
2424   else
2425     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2426                                          GNUNET_ntohll (req->message_limit),
2427                                          method_prefix,
2428                                          &store_recv_fragment_history,
2429                                          &store_recv_fragment_history_result,
2430                                          op);
2431
2432   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2433 }
2434
2435
2436 /**
2437  * Received state var from PSYCstore, send it to client.
2438  */
2439 static int
2440 store_recv_state_var (void *cls, const char *name,
2441                       const void *value, uint32_t value_size)
2442 {
2443   struct Operation *op = cls;
2444   struct GNUNET_OperationResultMessage *res;
2445
2446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2447               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2448               op->chn, GNUNET_ntohll (op->op_id), name);
2449
2450   if (NULL != name) /* First part */
2451   {
2452     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2453     struct GNUNET_PSYC_MessageModifier *mod;
2454     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2455     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2456     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2457     res->op_id = op->op_id;
2458
2459     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2460     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2461     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2462     mod->name_size = htons (name_size);
2463     mod->value_size = htonl (value_size);
2464     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2465     memcpy (&mod[1], name, name_size);
2466     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2467   }
2468   else /* Continuation */
2469   {
2470     struct GNUNET_MessageHeader *mod;
2471     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2472     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2473     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2474     res->op_id = op->op_id;
2475
2476     mod = (struct GNUNET_MessageHeader *) &res[1];
2477     mod->size = htons (sizeof (*mod) + value_size);
2478     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2479     memcpy (&mod[1], value, value_size);
2480   }
2481
2482   // FIXME: client might have been disconnected
2483   GNUNET_SERVER_notification_context_add (nc, op->client);
2484   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2485                                               GNUNET_NO);
2486   return GNUNET_YES;
2487 }
2488
2489
2490 /**
2491  * Received result of GNUNET_PSYCSTORE_state_get()
2492  * or GNUNET_PSYCSTORE_state_get_prefix()
2493  */
2494 static void
2495 store_recv_state_result (void *cls, int64_t result,
2496                          const char *err_msg, uint16_t err_msg_size)
2497 {
2498   struct Operation *op = cls;
2499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2500               "%p state_get #%" PRIu64 ": "
2501               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2502               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2503
2504   // FIXME: client might have been disconnected
2505   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2506   op_remove (op);
2507 }
2508
2509
2510 /**
2511  * Client requests best matching state variable from PSYCstore.
2512  */
2513 static void
2514 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2515                        const struct GNUNET_MessageHeader *msg)
2516 {
2517   struct Channel *
2518     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2519   GNUNET_assert (NULL != chn);
2520
2521   const struct StateRequest *
2522     req = (const struct StateRequest *) msg;
2523
2524   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2525   const char *name = (const char *) &req[1];
2526   if (0 == name_size || '\0' != name[name_size - 1])
2527   {
2528     GNUNET_break (0);
2529     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2530     return;
2531   }
2532
2533   struct Operation *op = op_add (chn, client, req->op_id, 0);
2534   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2535                               &store_recv_state_var,
2536                               &store_recv_state_result, op);
2537   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2538 }
2539
2540
2541 /**
2542  * Client requests state variables with a given prefix from PSYCstore.
2543  */
2544 static void
2545 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2546                               const struct GNUNET_MessageHeader *msg)
2547 {
2548   struct Channel *
2549     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2550   GNUNET_assert (NULL != chn);
2551
2552   const struct StateRequest *
2553     req = (const struct StateRequest *) msg;
2554
2555   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2556   const char *name = (const char *) &req[1];
2557   if (0 == name_size || '\0' != name[name_size - 1])
2558   {
2559     GNUNET_break (0);
2560     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2561     return;
2562   }
2563
2564   struct Operation *op = op_add (chn, client, req->op_id, 0);
2565   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2566                                      &store_recv_state_var,
2567                                      &store_recv_state_result, op);
2568   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2569 }
2570
2571
2572 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2573   { &client_recv_master_start, NULL,
2574     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2575
2576   { &client_recv_slave_join, NULL,
2577     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2578
2579   { &client_recv_join_decision, NULL,
2580     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2581
2582   { &client_recv_psyc_message, NULL,
2583     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2584
2585   { &client_recv_membership_store, NULL,
2586     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2587
2588   { &client_recv_history_replay, NULL,
2589     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2590
2591   { &client_recv_state_get, NULL,
2592     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2593
2594   { &client_recv_state_get_prefix, NULL,
2595     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2596
2597   { NULL, NULL, 0, 0 }
2598 };
2599
2600
2601 /**
2602  * Initialize the PSYC service.
2603  *
2604  * @param cls Closure.
2605  * @param server The initialized server.
2606  * @param c Configuration to use.
2607  */
2608 static void
2609 run (void *cls, struct GNUNET_SERVER_Handle *server,
2610      const struct GNUNET_CONFIGURATION_Handle *c)
2611 {
2612   cfg = c;
2613   store = GNUNET_PSYCSTORE_connect (cfg);
2614   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2615   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2616   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2617   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2618   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2619   nc = GNUNET_SERVER_notification_context_create (server, 1);
2620   GNUNET_SERVER_add_handlers (server, server_handlers);
2621   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2622   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2623                                 &shutdown_task, NULL);
2624 }
2625
2626
2627 /**
2628  * The main function for the service.
2629  *
2630  * @param argc number of arguments from the command line
2631  * @param argv command line arguments
2632  * @return 0 ok, 1 on error
2633  */
2634 int
2635 main (int argc, char *const *argv)
2636 {
2637   return (GNUNET_OK ==
2638           GNUNET_SERVICE_run (argc, argv, "psyc",
2639                               GNUNET_SERVICE_OPTION_NONE,
2640                               &run, NULL)) ? 0 : 1;
2641 }
2642
2643 /* end of gnunet-service-psyc.c */