38ec10e4d840cc4c0014fb86320332725383380d
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls,
432                            const struct GNUNET_SCHEDULER_TaskContext *tc)
433 {
434   struct Channel *chn = cls;
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  * @param tc unused
444  */
445 static void
446 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
447 {
448   if (NULL != nc)
449   {
450     GNUNET_SERVER_notification_context_destroy (nc);
451     nc = NULL;
452   }
453   if (NULL != stats)
454   {
455     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
456     stats = NULL;
457   }
458 }
459
460
461 static struct Operation *
462 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
463         uint64_t op_id, uint32_t flags)
464 {
465   struct Operation *op = GNUNET_malloc (sizeof (*op));
466   op->client = client;
467   op->chn = chn;
468   op->op_id = op_id;
469   op->flags = flags;
470   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
471   return op;
472 }
473
474
475 static void
476 op_remove (struct Operation *op)
477 {
478   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
479   GNUNET_free (op);
480 }
481
482
483 /**
484  * Clean up master data structures after a client disconnected.
485  */
486 static void
487 cleanup_master (struct Master *mst)
488 {
489   struct Channel *chn = &mst->chn;
490
491   if (NULL != mst->origin)
492     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
493   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
494   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
495 }
496
497
498 /**
499  * Clean up slave data structures after a client disconnected.
500  */
501 static void
502 cleanup_slave (struct Slave *slv)
503 {
504   struct Channel *chn = &slv->chn;
505   struct GNUNET_CONTAINER_MultiHashMap *
506     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
507                                                 &chn->pub_key_hash);
508   GNUNET_assert (NULL != chn_slv);
509   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
510
511   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
512   {
513     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
514                                           chn_slv);
515     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
516   }
517   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
518
519   if (NULL != slv->join_msg)
520   {
521     GNUNET_free (slv->join_msg);
522     slv->join_msg = NULL;
523   }
524   if (NULL != slv->relays)
525   {
526     GNUNET_free (slv->relays);
527     slv->relays = NULL;
528   }
529   if (NULL != slv->member)
530   {
531     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
532     slv->member = NULL;
533   }
534   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
535 }
536
537
538 /**
539  * Clean up channel data structures after a client disconnected.
540  */
541 static void
542 cleanup_channel (struct Channel *chn)
543 {
544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545               "%p Cleaning up channel %s. master? %u\n",
546               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
547   message_queue_drop (chn);
548   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
549   chn->recv_frags = NULL;
550
551   if (NULL != chn->store_op)
552   {
553     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
554     chn->store_op = NULL;
555   }
556
557   (GNUNET_YES == chn->is_master)
558     ? cleanup_master ((struct Master *) chn)
559     : cleanup_slave ((struct Slave *) chn);
560   GNUNET_free (chn);
561 }
562
563
564 /**
565  * Called whenever a client is disconnected.
566  * Frees our resources associated with that client.
567  *
568  * @param cls Closure.
569  * @param client Identification of the client.
570  */
571 static void
572 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
573 {
574   if (NULL == client)
575     return;
576
577   struct Channel *
578     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
579
580   if (NULL == chn)
581   {
582     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583                 "%p User context is NULL in client_disconnect()\n", chn);
584     GNUNET_break (0);
585     return;
586   }
587
588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589               "%p Client (%s) disconnected from channel %s\n",
590               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
591               GNUNET_h2s (&chn->pub_key_hash));
592
593   struct Client *cli = chn->clients_head;
594   while (NULL != cli)
595   {
596     if (cli->client == client)
597     {
598       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
599       GNUNET_free (cli);
600       break;
601     }
602     cli = cli->next;
603   }
604
605   struct Operation *op = chn->op_head;
606   while (NULL != op)
607   {
608     if (op->client == client)
609     {
610       op->client = NULL;
611       break;
612     }
613     op = op->next;
614   }
615
616   if (NULL == chn->clients_head)
617   { /* Last client disconnected. */
618     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619                 "%p Last client (%s) disconnected from channel %s\n",
620                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
621                 GNUNET_h2s (&chn->pub_key_hash));
622     chn->is_disconnected = GNUNET_YES;
623     if (NULL != chn->tmit_head)
624     { /* Send pending messages to multicast before cleanup. */
625       transmit_message (chn);
626     }
627     else
628     {
629       cleanup_channel (chn);
630     }
631   }
632 }
633
634
635 /**
636  * Send message to all clients connected to the channel.
637  */
638 static void
639 client_send_msg (const struct Channel *chn,
640                  const struct GNUNET_MessageHeader *msg)
641 {
642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643               "%p Sending message to clients.\n", chn);
644
645   struct Client *cli = chn->clients_head;
646   while (NULL != cli)
647   {
648     GNUNET_SERVER_notification_context_add (nc, cli->client);
649     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
650     cli = cli->next;
651   }
652 }
653
654
655 /**
656  * Send a result code back to the client.
657  *
658  * @param client
659  *        Client that should receive the result code.
660  * @param result_code
661  *        Code to transmit.
662  * @param op_id
663  *        Operation ID in network byte order.
664  * @param data
665  *        Data payload or NULL.
666  * @param data_size
667  *        Size of @a data.
668  */
669 static void
670 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
671                     int64_t result_code, const void *data, uint16_t data_size)
672 {
673   struct GNUNET_OperationResultMessage *res;
674
675   res = GNUNET_malloc (sizeof (*res) + data_size);
676   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
677   res->header.size = htons (sizeof (*res) + data_size);
678   res->result_code = GNUNET_htonll (result_code);
679   res->op_id = op_id;
680   if (0 < data_size)
681     memcpy (&res[1], data, data_size);
682
683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
684               "%p Sending result to client for operation #%" PRIu64 ": "
685               "%" PRId64 " (size: %u)\n",
686               client, GNUNET_ntohll (op_id), result_code, data_size);
687
688   GNUNET_SERVER_notification_context_add (nc, client);
689   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
690                                               GNUNET_NO);
691   GNUNET_free (res);
692 }
693
694
695 /**
696  * Closure for join_mem_test_cb()
697  */
698 struct JoinMemTestClosure
699 {
700   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
701   struct Channel *chn;
702   struct GNUNET_MULTICAST_JoinHandle *jh;
703   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
704 };
705
706
707 /**
708  * Membership test result callback used for join requests.
709  */
710 static void
711 join_mem_test_cb (void *cls, int64_t result,
712                   const char *err_msg, uint16_t err_msg_size)
713 {
714   struct JoinMemTestClosure *jcls = cls;
715
716   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
717   { /* Pass on join request to client if this is a master channel */
718     struct Master *mst = (struct Master *) jcls->chn;
719     struct GNUNET_HashCode slave_pub_hash;
720     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
721                         &slave_pub_hash);
722     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
723                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
724     client_send_msg (jcls->chn, &jcls->join_msg->header);
725   }
726   else
727   {
728     if (GNUNET_SYSERR == result)
729     {
730       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
731                   "Could not perform membership test (%.*s)\n",
732                   err_msg_size, err_msg);
733     }
734     // FIXME: add relays
735     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
736   }
737   GNUNET_free (jcls->join_msg);
738   GNUNET_free (jcls);
739 }
740
741
742 /**
743  * Incoming join request from multicast.
744  */
745 static void
746 mcast_recv_join_request (void *cls,
747                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
748                          const struct GNUNET_MessageHeader *join_msg,
749                          struct GNUNET_MULTICAST_JoinHandle *jh)
750 {
751   struct Channel *chn = cls;
752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
753
754   uint16_t join_msg_size = 0;
755   if (NULL != join_msg)
756   {
757     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
758     {
759       join_msg_size = ntohs (join_msg->size);
760     }
761     else
762     {
763       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
764                   "%p Got join message with invalid type %u.\n",
765                   chn, ntohs (join_msg->type));
766     }
767   }
768
769   struct GNUNET_PSYC_JoinRequestMessage *
770     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
771   req->header.size = htons (sizeof (*req) + join_msg_size);
772   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
773   req->slave_pub_key = *slave_pub_key;
774   if (0 < join_msg_size)
775     memcpy (&req[1], join_msg, join_msg_size);
776
777   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
778   jcls->slave_pub_key = *slave_pub_key;
779   jcls->chn = chn;
780   jcls->jh = jh;
781   jcls->join_msg = req;
782
783   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
784                                     chn->max_message_id, 0,
785                                     &join_mem_test_cb, jcls);
786 }
787
788
789 /**
790  * Join decision received from multicast.
791  */
792 static void
793 mcast_recv_join_decision (void *cls, int is_admitted,
794                           const struct GNUNET_PeerIdentity *peer,
795                           uint16_t relay_count,
796                           const struct GNUNET_PeerIdentity *relays,
797                           const struct GNUNET_MessageHeader *join_resp)
798 {
799   struct Slave *slv = cls;
800   struct Channel *chn = &slv->chn;
801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
802               "%p Got join decision: %d\n", slv, is_admitted);
803   if (GNUNET_YES == chn->is_ready)
804   {
805     /* Already admitted */
806     return;
807   }
808
809   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
810   struct GNUNET_PSYC_JoinDecisionMessage *
811     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
812   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
813   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
814   dcsn->is_admitted = htonl (is_admitted);
815   if (0 < join_resp_size)
816     memcpy (&dcsn[1], join_resp, join_resp_size);
817
818   client_send_msg (chn, &dcsn->header);
819
820   if (GNUNET_YES == is_admitted
821       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
822   {
823     chn->is_ready = GNUNET_YES;
824   }
825 }
826
827
828 static int
829 store_recv_fragment_replay (void *cls,
830                             struct GNUNET_MULTICAST_MessageHeader *msg,
831                             enum GNUNET_PSYCSTORE_MessageFlags flags)
832 {
833   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
834
835   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
836   return GNUNET_YES;
837 }
838
839
840 /**
841  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
842  */
843 static void
844 store_recv_fragment_replay_result (void *cls, int64_t result,
845                                    const char *err_msg, uint16_t err_msg_size)
846 {
847   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
850               rh, result, err_msg_size, err_msg);
851
852   switch (result)
853   {
854   case GNUNET_YES:
855     break;
856
857   case GNUNET_NO:
858     GNUNET_MULTICAST_replay_response (rh, NULL,
859                                       GNUNET_MULTICAST_REC_NOT_FOUND);
860     break;
861
862   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
863     GNUNET_MULTICAST_replay_response (rh, NULL,
864                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
865     break;
866
867   case GNUNET_SYSERR:
868     GNUNET_MULTICAST_replay_response (rh, NULL,
869                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
870     break;
871   }
872   GNUNET_MULTICAST_replay_response_end (rh);
873 }
874
875
876 /**
877  * Incoming fragment replay request from multicast.
878  */
879 static void
880 mcast_recv_replay_fragment (void *cls,
881                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
882                             uint64_t fragment_id, uint64_t flags,
883                             struct GNUNET_MULTICAST_ReplayHandle *rh)
884
885 {
886   struct Channel *chn = cls;
887   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
888                                  fragment_id, fragment_id,
889                                  &store_recv_fragment_replay,
890                                  &store_recv_fragment_replay_result, rh);
891 }
892
893
894 /**
895  * Incoming message replay request from multicast.
896  */
897 static void
898 mcast_recv_replay_message (void *cls,
899                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
900                            uint64_t message_id,
901                            uint64_t fragment_offset,
902                            uint64_t flags,
903                            struct GNUNET_MULTICAST_ReplayHandle *rh)
904 {
905   struct Channel *chn = cls;
906   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
907                                 message_id, message_id, 1, NULL,
908                                 &store_recv_fragment_replay,
909                                 &store_recv_fragment_replay_result, rh);
910 }
911
912
913 /**
914  * Convert an uint64_t in network byte order to a HashCode
915  * that can be used as key in a MultiHashMap
916  */
917 static inline void
918 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
919 {
920   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
921   /* TODO: use built-in byte swap functions if available */
922
923   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
924   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
925
926   *key = (struct GNUNET_HashCode) {};
927   *((uint64_t *) key)
928     = (n << 32) | (n >> 32);
929 }
930
931
932 /**
933  * Convert an uint64_t in host byte order to a HashCode
934  * that can be used as key in a MultiHashMap
935  */
936 static inline void
937 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
938 {
939 #if __BYTE_ORDER == __BIG_ENDIAN
940   hash_key_from_nll (key, n);
941 #elif __BYTE_ORDER == __LITTLE_ENDIAN
942   *key = (struct GNUNET_HashCode) {};
943   *((uint64_t *) key) = n;
944 #else
945   #error byteorder undefined
946 #endif
947 }
948
949
950 /**
951  * Initialize PSYC message header.
952  */
953 static inline void
954 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
955                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
956 {
957   uint16_t size = ntohs (mmsg->header.size);
958   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
959
960   pmsg->header.size = htons (psize);
961   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
962   pmsg->message_id = mmsg->message_id;
963   pmsg->fragment_offset = mmsg->fragment_offset;
964   pmsg->flags = htonl (flags);
965
966   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
967 }
968
969
970 /**
971  * Create a new PSYC message from a multicast message for sending it to clients.
972  */
973 static inline struct GNUNET_PSYC_MessageHeader *
974 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
975 {
976   struct GNUNET_PSYC_MessageHeader *pmsg;
977   uint16_t size = ntohs (mmsg->header.size);
978   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
979
980   pmsg = GNUNET_malloc (psize);
981   psyc_msg_init (pmsg, mmsg, flags);
982   return pmsg;
983 }
984
985
986 /**
987  * Send multicast message to all clients connected to the channel.
988  */
989 static void
990 client_send_mcast_msg (struct Channel *chn,
991                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
992                        uint32_t flags)
993 {
994   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
995               "%p Sending multicast message to client. "
996               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
997               chn, GNUNET_ntohll (mmsg->fragment_id),
998               GNUNET_ntohll (mmsg->message_id));
999
1000   struct GNUNET_PSYC_MessageHeader *
1001     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1002   client_send_msg (chn, &pmsg->header);
1003   GNUNET_free (pmsg);
1004 }
1005
1006
1007 /**
1008  * Send multicast request to all clients connected to the channel.
1009  */
1010 static void
1011 client_send_mcast_req (struct Master *mst,
1012                        const struct GNUNET_MULTICAST_RequestHeader *req)
1013 {
1014   struct Channel *chn = &mst->chn;
1015
1016   struct GNUNET_PSYC_MessageHeader *pmsg;
1017   uint16_t size = ntohs (req->header.size);
1018   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1019
1020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1021               "%p Sending multicast request to client. "
1022               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1023               chn, GNUNET_ntohll (req->fragment_id),
1024               GNUNET_ntohll (req->request_id));
1025
1026   pmsg = GNUNET_malloc (psize);
1027   pmsg->header.size = htons (psize);
1028   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1029   pmsg->message_id = req->request_id;
1030   pmsg->fragment_offset = req->fragment_offset;
1031   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1032   pmsg->slave_pub_key = req->member_pub_key;
1033   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1034
1035   client_send_msg (chn, &pmsg->header);
1036
1037   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1038
1039   GNUNET_free (pmsg);
1040 }
1041
1042
1043 /**
1044  * Insert a multicast message fragment into the queue belonging to the message.
1045  *
1046  * @param chn          Channel.
1047  * @param mmsg         Multicast message fragment.
1048  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1049  * @param first_ptype  First PSYC message part type in @a mmsg.
1050  * @param last_ptype   Last PSYC message part type in @a mmsg.
1051  */
1052 static void
1053 fragment_queue_insert (struct Channel *chn,
1054                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1055                        uint16_t first_ptype, uint16_t last_ptype)
1056 {
1057   const uint16_t size = ntohs (mmsg->header.size);
1058   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1059   struct GNUNET_CONTAINER_MultiHashMap
1060     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1061                                                     &chn->pub_key_hash);
1062
1063   struct GNUNET_HashCode msg_id_hash;
1064   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1065
1066   struct FragmentQueue
1067     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1068
1069   if (NULL == fragq)
1070   {
1071     fragq = GNUNET_new (struct FragmentQueue);
1072     fragq->state = MSG_FRAG_STATE_HEADER;
1073     fragq->fragments
1074       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1075
1076     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1077                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1078
1079     if (NULL == chan_msgs)
1080     {
1081       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1082       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1083                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1084     }
1085   }
1086
1087   struct GNUNET_HashCode frag_id_hash;
1088   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1089   struct RecvCacheEntry
1090     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1091   if (NULL == cache_entry)
1092   {
1093     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094                 "%p Adding message fragment to cache. "
1095                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1096                 chn, GNUNET_ntohll (mmsg->message_id),
1097                 GNUNET_ntohll (mmsg->fragment_id));
1098     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099                 "%p header_size: %" PRIu64 " + %u\n",
1100                 chn, fragq->header_size, size);
1101     cache_entry = GNUNET_new (struct RecvCacheEntry);
1102     cache_entry->ref_count = 1;
1103     cache_entry->mmsg = GNUNET_malloc (size);
1104     memcpy (cache_entry->mmsg, mmsg, size);
1105     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1106                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1107   }
1108   else
1109   {
1110     cache_entry->ref_count++;
1111     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112                 "%p Message fragment is already in cache. "
1113                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1114                 ", ref_count: %u\n",
1115                 chn, GNUNET_ntohll (mmsg->message_id),
1116                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1117   }
1118
1119   if (MSG_FRAG_STATE_HEADER == fragq->state)
1120   {
1121     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1122     {
1123       struct GNUNET_PSYC_MessageMethod *
1124         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1125       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1126       fragq->flags = ntohl (pmeth->flags);
1127     }
1128
1129     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1130     {
1131       fragq->header_size += size;
1132     }
1133     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1134              || frag_offset == fragq->header_size)
1135     { /* header is now complete */
1136       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1137                   "%p Header of message %" PRIu64 " is complete.\n",
1138                   chn, GNUNET_ntohll (mmsg->message_id));
1139
1140       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1141                   "%p Adding message %" PRIu64 " to queue.\n",
1142                   chn, GNUNET_ntohll (mmsg->message_id));
1143       fragq->state = MSG_FRAG_STATE_DATA;
1144     }
1145     else
1146     {
1147       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1148                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1149                   "%" PRIu64 " != %" PRIu64 "\n",
1150                   chn, GNUNET_ntohll (mmsg->message_id),
1151                   frag_offset, fragq->header_size);
1152     }
1153   }
1154
1155   switch (last_ptype)
1156   {
1157   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1158     if (frag_offset == fragq->size)
1159       fragq->state = MSG_FRAG_STATE_END;
1160     else
1161       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1162                   "%p Message %" PRIu64 " is NOT complete yet: "
1163                   "%" PRIu64 " != %" PRIu64 "\n",
1164                   chn, GNUNET_ntohll (mmsg->message_id),
1165                   frag_offset, fragq->size);
1166     break;
1167
1168   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1169     /* Drop message without delivering to client if it's a single fragment */
1170     fragq->state =
1171       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1172       ? MSG_FRAG_STATE_DROP
1173       : MSG_FRAG_STATE_CANCEL;
1174   }
1175
1176   switch (fragq->state)
1177   {
1178   case MSG_FRAG_STATE_DATA:
1179   case MSG_FRAG_STATE_END:
1180   case MSG_FRAG_STATE_CANCEL:
1181     if (GNUNET_NO == fragq->is_queued)
1182     {
1183       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1184                                     GNUNET_ntohll (mmsg->message_id));
1185       fragq->is_queued = GNUNET_YES;
1186     }
1187   }
1188
1189   fragq->size += size;
1190   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1191                                 GNUNET_ntohll (mmsg->fragment_id));
1192 }
1193
1194
1195 /**
1196  * Run fragment queue of a message.
1197  *
1198  * Send fragments of a message in order to client, after all modifiers arrived
1199  * from multicast.
1200  *
1201  * @param chn
1202  *        Channel.
1203  * @param msg_id
1204  *        ID of the message @a fragq belongs to.
1205  * @param fragq
1206  *        Fragment queue of the message.
1207  * @param drop
1208  *        Drop message without delivering to client?
1209  *        #GNUNET_YES or #GNUNET_NO.
1210  */
1211 static void
1212 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1213                     struct FragmentQueue *fragq, uint8_t drop)
1214 {
1215   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216               "%p Running message fragment queue for message %" PRIu64
1217               " (state: %u).\n",
1218               chn, msg_id, fragq->state);
1219
1220   struct GNUNET_CONTAINER_MultiHashMap
1221     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1222                                                     &chn->pub_key_hash);
1223   GNUNET_assert (NULL != chan_msgs);
1224   uint64_t frag_id;
1225
1226   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1227                                                     &frag_id))
1228   {
1229     struct GNUNET_HashCode frag_id_hash;
1230     hash_key_from_hll (&frag_id_hash, frag_id);
1231     struct RecvCacheEntry *cache_entry
1232       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1233     if (cache_entry != NULL)
1234     {
1235       if (GNUNET_NO == drop)
1236       {
1237         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1238       }
1239       if (cache_entry->ref_count <= 1)
1240       {
1241         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1242                                               cache_entry);
1243         GNUNET_free (cache_entry->mmsg);
1244         GNUNET_free (cache_entry);
1245       }
1246       else
1247       {
1248         cache_entry->ref_count--;
1249       }
1250     }
1251 #if CACHE_AGING_IMPLEMENTED
1252     else if (GNUNET_NO == drop)
1253     {
1254       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1255     }
1256 #endif
1257
1258     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1259   }
1260
1261   if (MSG_FRAG_STATE_END <= fragq->state)
1262   {
1263     struct GNUNET_HashCode msg_id_hash;
1264     hash_key_from_hll (&msg_id_hash, msg_id);
1265
1266     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1267     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1268     GNUNET_free (fragq);
1269   }
1270   else
1271   {
1272     fragq->is_queued = GNUNET_NO;
1273   }
1274 }
1275
1276
1277 struct StateModifyClosure
1278 {
1279   struct Channel *chn;
1280   uint64_t msg_id;
1281   struct GNUNET_HashCode msg_id_hash;
1282 };
1283
1284
1285 void
1286 store_recv_state_modify_result (void *cls, int64_t result,
1287                                 const char *err_msg, uint16_t err_msg_size)
1288 {
1289   struct StateModifyClosure *mcls = cls;
1290   struct Channel *chn = mcls->chn;
1291   uint64_t msg_id = mcls->msg_id;
1292
1293   struct FragmentQueue *
1294     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1295
1296   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1298               chn, result, err_msg_size, err_msg);
1299
1300   switch (result)
1301   {
1302   case GNUNET_OK:
1303   case GNUNET_NO:
1304     if (NULL != fragq)
1305       fragq->state_is_modified = GNUNET_YES;
1306     if (chn->max_state_message_id < msg_id)
1307       chn->max_state_message_id = msg_id;
1308     if (chn->max_message_id < msg_id)
1309       chn->max_message_id = msg_id;
1310
1311     if (NULL != fragq)
1312       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1313     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1314     message_queue_run (chn);
1315     break;
1316
1317   default:
1318     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1319                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1320                 chn, result, err_msg_size, err_msg);
1321     /** @todo FIXME: handle state_modify error */
1322   }
1323 }
1324
1325
1326 /**
1327  * Run message queue.
1328  *
1329  * Send messages in queue to client in order after a message has arrived from
1330  * multicast, according to the following:
1331  * - A message is only sent if all of its modifiers arrived.
1332  * - A stateful message is only sent if the previous stateful message
1333  *   has already been delivered to the client.
1334  *
1335  * @param chn  Channel.
1336  *
1337  * @return Number of messages removed from queue and sent to client.
1338  */
1339 static uint64_t
1340 message_queue_run (struct Channel *chn)
1341 {
1342   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1343               "%p Running message queue.\n", chn);
1344   uint64_t n = 0;
1345   uint64_t msg_id;
1346
1347   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1348                                                     &msg_id))
1349   {
1350     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1351                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1352     struct GNUNET_HashCode msg_id_hash;
1353     hash_key_from_hll (&msg_id_hash, msg_id);
1354
1355     struct FragmentQueue *
1356       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1357
1358     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1359     {
1360       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1361                   "%p No fragq (%p) or header not complete.\n",
1362                   chn, fragq);
1363       break;
1364     }
1365
1366     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1367                 "%p Fragment queue entry:  state: %u, state delta: "
1368                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1369                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1370
1371     if (MSG_FRAG_STATE_DATA <= fragq->state)
1372     {
1373       /* Check if there's a missing message before the current one */
1374       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1375       {
1376         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1377
1378         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1379             && (chn->max_message_id != msg_id - 1
1380                 && chn->max_message_id != msg_id))
1381         {
1382           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1383                       "%p Out of order message. "
1384                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1385                       chn, chn->max_message_id, msg_id);
1386           break;
1387           // FIXME: keep track of messages processed in this queue run,
1388           //        and only stop after reaching the end
1389         }
1390       }
1391       else
1392       {
1393         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1394         if (GNUNET_YES != fragq->state_is_modified)
1395         {
1396           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1397           {
1398             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1399                         "%p Out of order stateful message. "
1400                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1401                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1402             break;
1403             // FIXME: keep track of messages processed in this queue run,
1404             //        and only stop after reaching the end
1405           }
1406
1407           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1408           mcls->chn = chn;
1409           mcls->msg_id = msg_id;
1410           mcls->msg_id_hash = msg_id_hash;
1411
1412           /* Apply modifiers to state in PSYCstore */
1413           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1414                                          fragq->state_delta,
1415                                          store_recv_state_modify_result, mcls);
1416           break; // continue after asynchronous state modify result
1417         }
1418       }
1419       chn->max_message_id = msg_id;
1420     }
1421     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1422     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1423     n++;
1424   }
1425
1426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1428   return n;
1429 }
1430
1431
1432 /**
1433  * Drop message queue of a channel.
1434  *
1435  * Remove all messages in queue without sending it to clients.
1436  *
1437  * @param chn  Channel.
1438  *
1439  * @return Number of messages removed from queue.
1440  */
1441 static uint64_t
1442 message_queue_drop (struct Channel *chn)
1443 {
1444   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1445               "%p Dropping message queue.\n", chn);
1446   uint64_t n = 0;
1447   uint64_t msg_id;
1448   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1449                                                     &msg_id))
1450   {
1451     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1452                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1453     struct GNUNET_HashCode msg_id_hash;
1454     hash_key_from_hll (&msg_id_hash, msg_id);
1455
1456     struct FragmentQueue *
1457       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1458     GNUNET_assert (NULL != fragq);
1459     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1460     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1461     n++;
1462   }
1463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1464               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1465   return n;
1466 }
1467
1468
1469 /**
1470  * Received result of GNUNET_PSYCSTORE_fragment_store().
1471  */
1472 static void
1473 store_recv_fragment_store_result (void *cls, int64_t result,
1474                                   const char *err_msg, uint16_t err_msg_size)
1475 {
1476   struct Channel *chn = cls;
1477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1478               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1479               chn, result, err_msg_size, err_msg);
1480 }
1481
1482
1483 /**
1484  * Handle incoming message fragment from multicast.
1485  *
1486  * Store it using PSYCstore and send it to the clients of the channel in order.
1487  */
1488 static void
1489 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1490 {
1491   struct Channel *chn = cls;
1492   uint16_t size = ntohs (mmsg->header.size);
1493
1494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1495               "%p Received multicast message of size %u. "
1496               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1497               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1498               chn, size,
1499               GNUNET_ntohll (mmsg->fragment_id),
1500               GNUNET_ntohll (mmsg->message_id),
1501               GNUNET_ntohll (mmsg->fragment_offset),
1502               GNUNET_ntohll (mmsg->flags));
1503
1504   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1505                                    &store_recv_fragment_store_result, chn);
1506
1507   uint16_t first_ptype = 0, last_ptype = 0;
1508   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1509                                                (const char *) &mmsg[1],
1510                                                &first_ptype, &last_ptype);
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "%p Message check result %d, first part type %u, last part type %u\n",
1513               chn, check, first_ptype, last_ptype);
1514   if (GNUNET_SYSERR == check)
1515   {
1516     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1517                 "%p Dropping incoming multicast message with invalid parts.\n",
1518                 chn);
1519     GNUNET_break_op (0);
1520     return;
1521   }
1522
1523   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1524   message_queue_run (chn);
1525 }
1526
1527
1528 /**
1529  * Incoming request fragment from multicast for a master.
1530  *
1531  * @param cls   Master.
1532  * @param req   The request.
1533  */
1534 static void
1535 mcast_recv_request (void *cls,
1536                     const struct GNUNET_MULTICAST_RequestHeader *req)
1537 {
1538   struct Master *mst = cls;
1539   uint16_t size = ntohs (req->header.size);
1540
1541   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543               "%p Received multicast request of size %u from %s.\n",
1544               mst, size, str);
1545   GNUNET_free (str);
1546
1547   uint16_t first_ptype = 0, last_ptype = 0;
1548   if (GNUNET_SYSERR
1549       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1550                                           (const char *) &req[1],
1551                                           &first_ptype, &last_ptype))
1552   {
1553     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1554                 "%p Dropping incoming multicast request with invalid parts.\n",
1555                 mst);
1556     GNUNET_break_op (0);
1557     return;
1558   }
1559
1560   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561               "Message parts: first: type %u, last: type %u\n",
1562               first_ptype, last_ptype);
1563
1564   /* FIXME: in-order delivery */
1565   client_send_mcast_req (mst, req);
1566 }
1567
1568
1569 /**
1570  * Response from PSYCstore with the current counter values for a channel master.
1571  */
1572 static void
1573 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1574                             uint64_t max_message_id, uint64_t max_group_generation,
1575                             uint64_t max_state_message_id)
1576 {
1577   struct Master *mst = cls;
1578   struct Channel *chn = &mst->chn;
1579   chn->store_op = NULL;
1580
1581   struct GNUNET_PSYC_CountersResultMessage res;
1582   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1583   res.header.size = htons (sizeof (res));
1584   res.result_code = htonl (result);
1585   res.max_message_id = GNUNET_htonll (max_message_id);
1586
1587   if (GNUNET_OK == result || GNUNET_NO == result)
1588   {
1589     mst->max_message_id = max_message_id;
1590     chn->max_message_id = max_message_id;
1591     chn->max_state_message_id = max_state_message_id;
1592     mst->max_group_generation = max_group_generation;
1593     mst->origin
1594       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1595                                        mcast_recv_join_request,
1596                                        mcast_recv_replay_fragment,
1597                                        mcast_recv_replay_message,
1598                                        mcast_recv_request,
1599                                        mcast_recv_message, chn);
1600     chn->is_ready = GNUNET_YES;
1601   }
1602   else
1603   {
1604     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1605                 "%p GNUNET_PSYCSTORE_counters_get() "
1606                 "returned %d for channel %s.\n",
1607                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1608   }
1609
1610   client_send_msg (chn, &res.header);
1611 }
1612
1613
1614 /**
1615  * Response from PSYCstore with the current counter values for a channel slave.
1616  */
1617 void
1618 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1619                            uint64_t max_message_id, uint64_t max_group_generation,
1620                            uint64_t max_state_message_id)
1621 {
1622   struct Slave *slv = cls;
1623   struct Channel *chn = &slv->chn;
1624   chn->store_op = NULL;
1625
1626   struct GNUNET_PSYC_CountersResultMessage res;
1627   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1628   res.header.size = htons (sizeof (res));
1629   res.result_code = htonl (result);
1630   res.max_message_id = GNUNET_htonll (max_message_id);
1631
1632   if (GNUNET_OK == result || GNUNET_NO == result)
1633   {
1634     chn->max_message_id = max_message_id;
1635     chn->max_state_message_id = max_state_message_id;
1636     slv->member
1637       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1638                                       &slv->origin,
1639                                       slv->relay_count, slv->relays,
1640                                       &slv->join_msg->header,
1641                                       mcast_recv_join_request,
1642                                       mcast_recv_join_decision,
1643                                       mcast_recv_replay_fragment,
1644                                       mcast_recv_replay_message,
1645                                       mcast_recv_message, chn);
1646     if (NULL != slv->join_msg)
1647     {
1648       GNUNET_free (slv->join_msg);
1649       slv->join_msg = NULL;
1650     }
1651   }
1652   else
1653   {
1654     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1655                 "%p GNUNET_PSYCSTORE_counters_get() "
1656                 "returned %d for channel %s.\n",
1657                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1658   }
1659
1660   client_send_msg (chn, &res.header);
1661 }
1662
1663
1664 static void
1665 channel_init (struct Channel *chn)
1666 {
1667   chn->recv_msgs
1668     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1669   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1670 }
1671
1672
1673 /**
1674  * Handle a connecting client starting a channel master.
1675  */
1676 static void
1677 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1678                           const struct GNUNET_MessageHeader *msg)
1679 {
1680   const struct MasterStartRequest *req
1681     = (const struct MasterStartRequest *) msg;
1682
1683   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1684   struct GNUNET_HashCode pub_key_hash;
1685
1686   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1687   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1688
1689   struct Master *
1690     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1691   struct Channel *chn;
1692
1693   if (NULL == mst)
1694   {
1695     mst = GNUNET_new (struct Master);
1696     mst->policy = ntohl (req->policy);
1697     mst->priv_key = req->channel_key;
1698     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1699
1700     chn = &mst->chn;
1701     chn->is_master = GNUNET_YES;
1702     chn->pub_key = pub_key;
1703     chn->pub_key_hash = pub_key_hash;
1704     channel_init (chn);
1705
1706     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1707                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1708     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1709                                                    store_recv_master_counters, mst);
1710   }
1711   else
1712   {
1713     chn = &mst->chn;
1714
1715     struct GNUNET_PSYC_CountersResultMessage res;
1716     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1717     res.header.size = htons (sizeof (res));
1718     res.result_code = htonl (GNUNET_OK);
1719     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1720
1721     GNUNET_SERVER_notification_context_add (nc, client);
1722     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1723                                                 GNUNET_NO);
1724   }
1725
1726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1727               "%p Client connected as master to channel %s.\n",
1728               mst, GNUNET_h2s (&chn->pub_key_hash));
1729
1730   struct Client *cli = GNUNET_new (struct Client);
1731   cli->client = client;
1732   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1733
1734   GNUNET_SERVER_client_set_user_context (client, chn);
1735   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1736 }
1737
1738
1739 /**
1740  * Handle a connecting client joining as a channel slave.
1741  */
1742 static void
1743 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1744                         const struct GNUNET_MessageHeader *msg)
1745 {
1746   const struct SlaveJoinRequest *req
1747     = (const struct SlaveJoinRequest *) msg;
1748   uint16_t req_size = ntohs (req->header.size);
1749
1750   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1751   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1752
1753   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1754   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1755   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1756
1757   struct GNUNET_CONTAINER_MultiHashMap *
1758     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1759   struct Slave *slv = NULL;
1760   struct Channel *chn;
1761
1762   if (NULL != chn_slv)
1763   {
1764     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1765   }
1766   if (NULL == slv)
1767   {
1768     slv = GNUNET_new (struct Slave);
1769     slv->priv_key = req->slave_key;
1770     slv->pub_key = slv_pub_key;
1771     slv->pub_key_hash = slv_pub_hash;
1772     slv->origin = req->origin;
1773     slv->relay_count = ntohl (req->relay_count);
1774     slv->join_flags = ntohl (req->flags);
1775
1776     const struct GNUNET_PeerIdentity *
1777       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1778     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1779     uint16_t join_msg_size = 0;
1780
1781     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1782         <= req_size)
1783     {
1784       struct GNUNET_PSYC_Message *
1785         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1786       join_msg_size = ntohs (join_msg->header.size);
1787       slv->join_msg = GNUNET_malloc (join_msg_size);
1788       memcpy (slv->join_msg, join_msg, join_msg_size);
1789     }
1790     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1791     {
1792       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1793                   "%u + %u + %u != %u\n",
1794                   sizeof (*req), relay_size, join_msg_size, req_size);
1795       GNUNET_break (0);
1796       GNUNET_SERVER_client_disconnect (client);
1797       GNUNET_free (slv);
1798       return;
1799     }
1800     if (0 < slv->relay_count)
1801     {
1802       slv->relays = GNUNET_malloc (relay_size);
1803       memcpy (slv->relays, &req[1], relay_size);
1804     }
1805
1806     chn = &slv->chn;
1807     chn->is_master = GNUNET_NO;
1808     chn->pub_key = req->channel_pub_key;
1809     chn->pub_key_hash = pub_key_hash;
1810     channel_init (chn);
1811
1812     if (NULL == chn_slv)
1813     {
1814       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1815       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1816                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1817     }
1818     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1819                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1820     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1821                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1822     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1823                                                   &store_recv_slave_counters, slv);
1824   }
1825   else
1826   {
1827     chn = &slv->chn;
1828
1829     struct GNUNET_PSYC_CountersResultMessage res;
1830     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1831     res.header.size = htons (sizeof (res));
1832     res.result_code = htonl (GNUNET_OK);
1833     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1834
1835     GNUNET_SERVER_notification_context_add (nc, client);
1836     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1837                                                 GNUNET_NO);
1838
1839     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1840     {
1841       mcast_recv_join_decision (slv, GNUNET_YES,
1842                                 NULL, 0, NULL, NULL);
1843     }
1844     else if (NULL == slv->member)
1845     {
1846       slv->member
1847         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1848                                         &slv->origin,
1849                                         slv->relay_count, slv->relays,
1850                                         &slv->join_msg->header,
1851                                         &mcast_recv_join_request,
1852                                         &mcast_recv_join_decision,
1853                                         &mcast_recv_replay_fragment,
1854                                         &mcast_recv_replay_message,
1855                                         &mcast_recv_message, chn);
1856       if (NULL != slv->join_msg)
1857       {
1858         GNUNET_free (slv->join_msg);
1859         slv->join_msg = NULL;
1860       }
1861     }
1862     else if (NULL != slv->join_dcsn)
1863     {
1864       GNUNET_SERVER_notification_context_add (nc, client);
1865       GNUNET_SERVER_notification_context_unicast (nc, client,
1866                                                   &slv->join_dcsn->header,
1867                                                   GNUNET_NO);
1868     }
1869   }
1870
1871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1872               "%p Client connected as slave to channel %s.\n",
1873               slv, GNUNET_h2s (&chn->pub_key_hash));
1874
1875   struct Client *cli = GNUNET_new (struct Client);
1876   cli->client = client;
1877   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1878
1879   GNUNET_SERVER_client_set_user_context (client, chn);
1880   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1881 }
1882
1883
1884 struct JoinDecisionClosure
1885 {
1886   int32_t is_admitted;
1887   struct GNUNET_MessageHeader *msg;
1888 };
1889
1890
1891 /**
1892  * Iterator callback for sending join decisions to multicast.
1893  */
1894 static int
1895 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1896                           void *value)
1897 {
1898   struct JoinDecisionClosure *jcls = cls;
1899   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1900   // FIXME: add relays
1901   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1902   return GNUNET_YES;
1903 }
1904
1905
1906 /**
1907  * Join decision from client.
1908  */
1909 static void
1910 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1911                            const struct GNUNET_MessageHeader *msg)
1912 {
1913   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1914     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1915   struct Channel *chn;
1916   struct Master *mst;
1917   struct JoinDecisionClosure jcls;
1918
1919   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1920   if (NULL == chn)
1921   {
1922     GNUNET_break (0);
1923     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1924     return;
1925   }
1926   GNUNET_assert (GNUNET_YES == chn->is_master);
1927   mst = (struct Master *) chn;
1928   jcls.is_admitted = ntohl (dcsn->is_admitted);
1929   jcls.msg
1930     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1931     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1932     : NULL;
1933
1934   struct GNUNET_HashCode slave_pub_hash;
1935   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1936                       &slave_pub_hash);
1937
1938   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939               "%p Got join decision (%d) from client for channel %s..\n",
1940               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1942               "%p ..and slave %s.\n",
1943               mst, GNUNET_h2s (&slave_pub_hash));
1944
1945   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1946                                               &mcast_send_join_decision, &jcls);
1947   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1948   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1949 }
1950
1951
1952 /**
1953  * Send acknowledgement to a client.
1954  *
1955  * Sent after a message fragment has been passed on to multicast.
1956  *
1957  * @param chn The channel struct for the client.
1958  */
1959 static void
1960 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1961 {
1962   struct GNUNET_MessageHeader res;
1963   res.size = htons (sizeof (res));
1964   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1965
1966   /* FIXME */
1967   GNUNET_SERVER_notification_context_add (nc, client);
1968   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1969 }
1970
1971
1972 /**
1973  * Callback for the transmit functions of multicast.
1974  */
1975 static int
1976 transmit_notify (void *cls, size_t *data_size, void *data)
1977 {
1978   struct Channel *chn = cls;
1979   struct TransmitMessage *tmit_msg = chn->tmit_head;
1980
1981   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1982   {
1983     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                 "%p transmit_notify: nothing to send.\n", chn);
1985     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1986       GNUNET_break (0);
1987     *data_size = 0;
1988     return GNUNET_NO;
1989   }
1990
1991   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1993
1994   *data_size = tmit_msg->size;
1995   memcpy (data, &tmit_msg[1], *data_size);
1996
1997   int ret
1998     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1999     ? GNUNET_NO
2000     : GNUNET_YES;
2001
2002   /* FIXME: handle disconnecting clients */
2003   if (NULL != tmit_msg->client)
2004     send_message_ack (chn, tmit_msg->client);
2005
2006   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2007   GNUNET_free (tmit_msg);
2008
2009   if (NULL != chn->tmit_head)
2010   {
2011     GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
2012   }
2013   else if (GNUNET_YES == chn->is_disconnected
2014            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2015   {
2016     /* FIXME: handle partial message (when still in_transmit) */
2017     return GNUNET_SYSERR;
2018   }
2019   return ret;
2020 }
2021
2022
2023 /**
2024  * Callback for the transmit functions of multicast.
2025  */
2026 static int
2027 master_transmit_notify (void *cls, size_t *data_size, void *data)
2028 {
2029   int ret = transmit_notify (cls, data_size, data);
2030
2031   if (GNUNET_YES == ret)
2032   {
2033     struct Master *mst = cls;
2034     mst->tmit_handle = NULL;
2035   }
2036   return ret;
2037 }
2038
2039
2040 /**
2041  * Callback for the transmit functions of multicast.
2042  */
2043 static int
2044 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2045 {
2046   int ret = transmit_notify (cls, data_size, data);
2047
2048   if (GNUNET_YES == ret)
2049   {
2050     struct Slave *slv = cls;
2051     slv->tmit_handle = NULL;
2052   }
2053   return ret;
2054 }
2055
2056
2057 /**
2058  * Transmit a message from a channel master to the multicast group.
2059  */
2060 static void
2061 master_transmit_message (struct Master *mst)
2062 {
2063   struct Channel *chn = &mst->chn;
2064   struct TransmitMessage *tmit_msg = chn->tmit_head;
2065   if (NULL == tmit_msg)
2066     return;
2067   if (NULL == mst->tmit_handle)
2068   {
2069     mst->tmit_handle
2070       = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2071                                         mst->max_group_generation,
2072                                         master_transmit_notify, mst);
2073   }
2074   else
2075   {
2076     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2077   }
2078 }
2079
2080
2081 /**
2082  * Transmit a message from a channel slave to the multicast group.
2083  */
2084 static void
2085 slave_transmit_message (struct Slave *slv)
2086 {
2087   if (NULL == slv->chn.tmit_head)
2088     return;
2089   if (NULL == slv->tmit_handle)
2090   {
2091     slv->tmit_handle
2092       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2093                                            slave_transmit_notify, slv);
2094   }
2095   else
2096   {
2097     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2098   }
2099 }
2100
2101
2102 static void
2103 transmit_message (struct Channel *chn)
2104 {
2105   chn->is_master
2106     ? master_transmit_message ((struct Master *) chn)
2107     : slave_transmit_message ((struct Slave *) chn);
2108 }
2109
2110
2111 /**
2112  * Queue a message from a channel master for sending to the multicast group.
2113  */
2114 static void
2115 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2116 {
2117   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2118
2119   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2120   {
2121     tmit_msg->id = ++mst->max_message_id;
2122     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2123                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2124                 mst, tmit_msg->id);
2125     struct GNUNET_PSYC_MessageMethod *pmeth
2126       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2127
2128     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2129     {
2130       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2131     }
2132     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2133     {
2134       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2135                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2136                   mst, tmit_msg->id - mst->max_state_message_id);
2137       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2138                                           - mst->max_state_message_id);
2139       mst->max_state_message_id = tmit_msg->id;
2140     }
2141     else
2142     {
2143         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2144                     "%p master_queue_message: state not modified\n", mst);
2145       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2146     }
2147
2148     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2149     {
2150       /// @todo add state_hash to PSYC header
2151     }
2152   }
2153 }
2154
2155
2156 /**
2157  * Queue a message from a channel slave for sending to the multicast group.
2158  */
2159 static void
2160 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2161 {
2162   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2163   {
2164     struct GNUNET_PSYC_MessageMethod *pmeth
2165       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2166     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2167     tmit_msg->id = ++slv->max_request_id;
2168   }
2169 }
2170
2171
2172 /**
2173  * Queue PSYC message parts for sending to multicast.
2174  *
2175  * @param chn
2176  *        Channel to send to.
2177  * @param client
2178  *        Client the message originates from.
2179  * @param data_size
2180  *        Size of @a data.
2181  * @param data
2182  *        Concatenated message parts.
2183  * @param first_ptype
2184  *        First message part type in @a data.
2185  * @param last_ptype
2186  *        Last message part type in @a data.
2187  */
2188 static struct TransmitMessage *
2189 queue_message (struct Channel *chn,
2190                struct GNUNET_SERVER_Client *client,
2191                size_t data_size,
2192                const void *data,
2193                uint16_t first_ptype, uint16_t last_ptype)
2194 {
2195   struct TransmitMessage *
2196     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2197   memcpy (&tmit_msg[1], data, data_size);
2198   tmit_msg->client = client;
2199   tmit_msg->size = data_size;
2200   tmit_msg->first_ptype = first_ptype;
2201   tmit_msg->last_ptype = last_ptype;
2202
2203   /* FIXME: separate queue per message ID */
2204
2205   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2206
2207   chn->is_master
2208     ? master_queue_message ((struct Master *) chn, tmit_msg)
2209     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2210   return tmit_msg;
2211 }
2212
2213
2214 /**
2215  * Cancel transmission of current message.
2216  *
2217  * @param chn     Channel to send to.
2218  * @param client  Client the message originates from.
2219  */
2220 static void
2221 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2222 {
2223   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2224
2225   struct GNUNET_MessageHeader msg;
2226   msg.size = htons (sizeof (msg));
2227   msg.type = htons (type);
2228
2229   queue_message (chn, client, sizeof (msg), &msg, type, type);
2230   transmit_message (chn);
2231
2232   /* FIXME: cleanup */
2233 }
2234
2235
2236 /**
2237  * Incoming message from a master or slave client.
2238  */
2239 static void
2240 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2241                           const struct GNUNET_MessageHeader *msg)
2242 {
2243   struct Channel *
2244     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2245   GNUNET_assert (NULL != chn);
2246
2247   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2248               "%p Received message from client.\n", chn);
2249   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2250
2251   if (GNUNET_YES != chn->is_ready)
2252   {
2253     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2254                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2255     GNUNET_break (0);
2256     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2257     return;
2258   }
2259
2260   uint16_t size = ntohs (msg->size);
2261   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2262   {
2263     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2264                 "%p Message payload too large: %u < %u.\n",
2265                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2266     GNUNET_break (0);
2267     transmit_cancel (chn, client);
2268     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2269     return;
2270   }
2271
2272   uint16_t first_ptype = 0, last_ptype = 0;
2273   if (GNUNET_SYSERR
2274       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2275                                           (const char *) &msg[1],
2276                                           &first_ptype, &last_ptype))
2277   {
2278     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2279                 "%p Received invalid message part from client.\n", chn);
2280     GNUNET_break (0);
2281     transmit_cancel (chn, client);
2282     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2283     return;
2284   }
2285   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2286               "%p Received message with first part type %u and last part type %u.\n",
2287               chn, first_ptype, last_ptype);
2288
2289   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2290                  first_ptype, last_ptype);
2291   transmit_message (chn);
2292   /* FIXME: send a few ACKs even before transmit_notify is called */
2293
2294   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2295 };
2296
2297
2298 /**
2299  * Received result of GNUNET_PSYCSTORE_membership_store()
2300  */
2301 static void
2302 store_recv_membership_store_result (void *cls, int64_t result,
2303                                     const char *err_msg, uint16_t err_msg_size)
2304 {
2305   struct Operation *op = cls;
2306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2307               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2308               op->chn, result, err_msg_size, err_msg);
2309
2310   if (NULL != op->client)
2311     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2312   op_remove (op);
2313 }
2314
2315
2316 /**
2317  * Client requests to add/remove a slave in the membership database.
2318  */
2319 static void
2320 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2321                               const struct GNUNET_MessageHeader *msg)
2322 {
2323   struct Channel *
2324     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2325   GNUNET_assert (NULL != chn);
2326
2327   const struct ChannelMembershipStoreRequest *
2328     req = (const struct ChannelMembershipStoreRequest *) msg;
2329
2330   struct Operation *op = op_add (chn, client, req->op_id, 0);
2331
2332   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2333   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2335               "%p Received membership store request from client.\n", chn);
2336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2337               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2338               chn, req->did_join, announced_at, effective_since);
2339
2340   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2341                                      req->did_join, announced_at, effective_since,
2342                                      0, /* FIXME: group_generation */
2343                                      &store_recv_membership_store_result, op);
2344   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2345 }
2346
2347
2348 /**
2349  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2350  * in response to a history request from a client.
2351  */
2352 static int
2353 store_recv_fragment_history (void *cls,
2354                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2355                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2356 {
2357   struct Operation *op = cls;
2358   if (NULL == op->client)
2359   { /* Requesting client already disconnected. */
2360     return GNUNET_NO;
2361   }
2362   struct Channel *chn = op->chn;
2363
2364   struct GNUNET_PSYC_MessageHeader *pmsg;
2365   uint16_t msize = ntohs (mmsg->header.size);
2366   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2367
2368   struct GNUNET_OperationResultMessage *
2369     res = GNUNET_malloc (sizeof (*res) + psize);
2370   res->header.size = htons (sizeof (*res) + psize);
2371   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2372   res->op_id = op->op_id;
2373   res->result_code = GNUNET_htonll (GNUNET_OK);
2374
2375   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2376   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2377   memcpy (&res[1], pmsg, psize);
2378
2379   /** @todo FIXME: send only to requesting client */
2380   client_send_msg (chn, &res->header);
2381   return GNUNET_YES;
2382 }
2383
2384
2385 /**
2386  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2387  * in response to a history request from a client.
2388  */
2389 static void
2390 store_recv_fragment_history_result (void *cls, int64_t result,
2391                                     const char *err_msg, uint16_t err_msg_size)
2392 {
2393   struct Operation *op = cls;
2394   if (NULL == op->client)
2395   { /* Requesting client already disconnected. */
2396     return;
2397   }
2398
2399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2400               "%p History replay #%" PRIu64 ": "
2401               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2402               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2403
2404   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2405   {
2406     /** @todo Multicast replay request for messages not found locally. */
2407   }
2408
2409   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2410   op_remove (op);
2411 }
2412
2413
2414 /**
2415  * Client requests channel history.
2416  */
2417 static void
2418 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2419                             const struct GNUNET_MessageHeader *msg)
2420 {
2421   struct Channel *
2422     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2423   GNUNET_assert (NULL != chn);
2424
2425   const struct GNUNET_PSYC_HistoryRequestMessage *
2426     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2427   uint16_t size = ntohs (msg->size);
2428   const char *method_prefix = (const char *) &req[1];
2429
2430   if (size < sizeof (*req) + 1
2431       || '\0' != method_prefix[size - sizeof (*req) - 1])
2432   {
2433     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2434                 "%p History replay #%" PRIu64 ": "
2435                 "invalid method prefix. size: %u < %u?\n",
2436                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2437     GNUNET_break (0);
2438     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2439     return;
2440   }
2441
2442   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2443
2444   if (0 == req->message_limit)
2445     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2446                                   GNUNET_ntohll (req->start_message_id),
2447                                   GNUNET_ntohll (req->end_message_id),
2448                                   0, method_prefix,
2449                                   &store_recv_fragment_history,
2450                                   &store_recv_fragment_history_result, op);
2451   else
2452     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2453                                          GNUNET_ntohll (req->message_limit),
2454                                          method_prefix,
2455                                          &store_recv_fragment_history,
2456                                          &store_recv_fragment_history_result,
2457                                          op);
2458
2459   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2460 }
2461
2462
2463 /**
2464  * Received state var from PSYCstore, send it to client.
2465  */
2466 static int
2467 store_recv_state_var (void *cls, const char *name,
2468                       const void *value, uint32_t value_size)
2469 {
2470   struct Operation *op = cls;
2471   struct GNUNET_OperationResultMessage *res;
2472
2473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2474               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2475               op->chn, GNUNET_ntohll (op->op_id), name);
2476
2477   if (NULL != name) /* First part */
2478   {
2479     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2480     struct GNUNET_PSYC_MessageModifier *mod;
2481     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2482     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2483     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2484     res->op_id = op->op_id;
2485
2486     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2487     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2488     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2489     mod->name_size = htons (name_size);
2490     mod->value_size = htonl (value_size);
2491     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2492     memcpy (&mod[1], name, name_size);
2493     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2494   }
2495   else /* Continuation */
2496   {
2497     struct GNUNET_MessageHeader *mod;
2498     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2499     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2500     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2501     res->op_id = op->op_id;
2502
2503     mod = (struct GNUNET_MessageHeader *) &res[1];
2504     mod->size = htons (sizeof (*mod) + value_size);
2505     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2506     memcpy (&mod[1], value, value_size);
2507   }
2508
2509   // FIXME: client might have been disconnected
2510   GNUNET_SERVER_notification_context_add (nc, op->client);
2511   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2512                                               GNUNET_NO);
2513   return GNUNET_YES;
2514 }
2515
2516
2517 /**
2518  * Received result of GNUNET_PSYCSTORE_state_get()
2519  * or GNUNET_PSYCSTORE_state_get_prefix()
2520  */
2521 static void
2522 store_recv_state_result (void *cls, int64_t result,
2523                          const char *err_msg, uint16_t err_msg_size)
2524 {
2525   struct Operation *op = cls;
2526   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527               "%p state_get #%" PRIu64 ": "
2528               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2529               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2530
2531   // FIXME: client might have been disconnected
2532   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2533   op_remove (op);
2534 }
2535
2536
2537 /**
2538  * Client requests best matching state variable from PSYCstore.
2539  */
2540 static void
2541 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2542                        const struct GNUNET_MessageHeader *msg)
2543 {
2544   struct Channel *
2545     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2546   GNUNET_assert (NULL != chn);
2547
2548   const struct StateRequest *
2549     req = (const struct StateRequest *) msg;
2550
2551   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2552   const char *name = (const char *) &req[1];
2553   if (0 == name_size || '\0' != name[name_size - 1])
2554   {
2555     GNUNET_break (0);
2556     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2557     return;
2558   }
2559
2560   struct Operation *op = op_add (chn, client, req->op_id, 0);
2561   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2562                               &store_recv_state_var,
2563                               &store_recv_state_result, op);
2564   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2565 }
2566
2567
2568 /**
2569  * Client requests state variables with a given prefix from PSYCstore.
2570  */
2571 static void
2572 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2573                               const struct GNUNET_MessageHeader *msg)
2574 {
2575   struct Channel *
2576     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2577   GNUNET_assert (NULL != chn);
2578
2579   const struct StateRequest *
2580     req = (const struct StateRequest *) msg;
2581
2582   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2583   const char *name = (const char *) &req[1];
2584   if (0 == name_size || '\0' != name[name_size - 1])
2585   {
2586     GNUNET_break (0);
2587     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2588     return;
2589   }
2590
2591   struct Operation *op = op_add (chn, client, req->op_id, 0);
2592   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2593                                      &store_recv_state_var,
2594                                      &store_recv_state_result, op);
2595   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2596 }
2597
2598
2599 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2600   { &client_recv_master_start, NULL,
2601     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2602
2603   { &client_recv_slave_join, NULL,
2604     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2605
2606   { &client_recv_join_decision, NULL,
2607     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2608
2609   { &client_recv_psyc_message, NULL,
2610     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2611
2612   { &client_recv_membership_store, NULL,
2613     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2614
2615   { &client_recv_history_replay, NULL,
2616     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2617
2618   { &client_recv_state_get, NULL,
2619     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2620
2621   { &client_recv_state_get_prefix, NULL,
2622     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2623
2624   { NULL, NULL, 0, 0 }
2625 };
2626
2627
2628 /**
2629  * Initialize the PSYC service.
2630  *
2631  * @param cls Closure.
2632  * @param server The initialized server.
2633  * @param c Configuration to use.
2634  */
2635 static void
2636 run (void *cls, struct GNUNET_SERVER_Handle *server,
2637      const struct GNUNET_CONFIGURATION_Handle *c)
2638 {
2639   cfg = c;
2640   store = GNUNET_PSYCSTORE_connect (cfg);
2641   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2642   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2643   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2644   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2645   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2646   nc = GNUNET_SERVER_notification_context_create (server, 1);
2647   GNUNET_SERVER_add_handlers (server, server_handlers);
2648   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2649   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2650                                 &shutdown_task, NULL);
2651 }
2652
2653
2654 /**
2655  * The main function for the service.
2656  *
2657  * @param argc number of arguments from the command line
2658  * @param argv command line arguments
2659  * @return 0 ok, 1 on error
2660  */
2661 int
2662 main (int argc, char *const *argv)
2663 {
2664   return (GNUNET_OK ==
2665           GNUNET_SERVICE_run (argc, argv, "psyc",
2666                               GNUNET_SERVICE_OPTION_NONE,
2667                               &run, NULL)) ? 0 : 1;
2668 }
2669
2670 /* end of gnunet-service-psyc.c */