multicast: removed membership test callback as clients check that already during...
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /**
111    * @see enum MessageState
112    */
113   uint8_t state;
114
115   /**
116    * Whether a message ACK has already been sent to the client.
117    * #GNUNET_YES or #GNUNET_NO
118    */
119   uint8_t ack_sent;
120
121   /* Followed by message */
122 };
123
124
125 /**
126  * Cache for received message fragments.
127  * Message fragments are only sent to clients after all modifiers arrived.
128  *
129  * chan_key -> MultiHashMap chan_msgs
130  */
131 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
132
133
134 /**
135  * Entry in the chan_msgs hashmap of @a recv_cache:
136  * fragment_id -> RecvCacheEntry
137  */
138 struct RecvCacheEntry
139 {
140   struct GNUNET_MULTICAST_MessageHeader *mmsg;
141   uint16_t ref_count;
142 };
143
144
145 /**
146  * Entry in the @a recv_frags hash map of a @a Channel.
147  * message_id -> FragmentQueue
148  */
149 struct FragmentQueue
150 {
151   /**
152    * Fragment IDs stored in @a recv_cache.
153    */
154   struct GNUNET_CONTAINER_Heap *fragments;
155
156   /**
157    * Total size of received fragments.
158    */
159   uint64_t size;
160
161   /**
162    * Total size of received header fragments (METHOD & MODIFIERs)
163    */
164   uint64_t header_size;
165
166   /**
167    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
168    */
169   uint64_t state_delta;
170
171   /**
172    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
173    */
174   uint32_t flags;
175
176   /**
177    * Receive state of message.
178    *
179    * @see MessageFragmentState
180    */
181   uint8_t state;
182
183   /**
184    * Whether the state is already modified in PSYCstore.
185    */
186   uint8_t state_is_modified;
187
188   /**
189    * Is the message queued for delivery to the client?
190    * i.e. added to the recv_msgs queue
191    */
192   uint8_t is_queued;
193 };
194
195
196 /**
197  * List of connected clients.
198  */
199 struct Client
200 {
201   struct Client *prev;
202   struct Client *next;
203
204   struct GNUNET_SERVER_Client *client;
205 };
206
207
208 struct Operation
209 {
210   struct Operation *prev;
211   struct Operation *next;
212
213   struct GNUNET_SERVER_Client *client;
214   struct Channel *chn;
215   uint64_t op_id;
216   uint32_t flags;
217 };
218
219
220 /**
221  * Common part of the client context for both a channel master and slave.
222  */
223 struct Channel
224 {
225   struct Client *clients_head;
226   struct Client *clients_tail;
227
228   struct Operation *op_head;
229   struct Operation *op_tail;
230
231   struct TransmitMessage *tmit_head;
232   struct TransmitMessage *tmit_tail;
233
234   /**
235    * Current PSYCstore operation.
236    */
237   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
238
239   /**
240    * Received fragments not yet sent to the client.
241    * message_id -> FragmentQueue
242    */
243   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
244
245   /**
246    * Received message IDs not yet sent to the client.
247    */
248   struct GNUNET_CONTAINER_Heap *recv_msgs;
249
250   /**
251    * Public key of the channel.
252    */
253   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
254
255   /**
256    * Hash of @a pub_key.
257    */
258   struct GNUNET_HashCode pub_key_hash;
259
260   /**
261    * Last message ID sent to the client.
262    * 0 if there is no such message.
263    */
264   uint64_t max_message_id;
265
266   /**
267    * ID of the last stateful message, where the state operations has been
268    * processed and saved to PSYCstore and which has been sent to the client.
269    * 0 if there is no such message.
270    */
271   uint64_t max_state_message_id;
272
273   /**
274    * Expected value size for the modifier being received from the PSYC service.
275    */
276   uint32_t tmit_mod_value_size_expected;
277
278   /**
279    * Actual value size for the modifier being received from the PSYC service.
280    */
281   uint32_t tmit_mod_value_size;
282
283   /**
284    * @see enum MessageState
285    */
286   uint8_t tmit_state;
287
288   /**
289    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
290    */
291   uint8_t is_master;
292
293   /**
294    * Is this channel ready to receive messages from client?
295    * #GNUNET_YES or #GNUNET_NO
296    */
297   uint8_t is_ready;
298
299   /**
300    * Is the client disconnected?
301    * #GNUNET_YES or #GNUNET_NO
302    */
303   uint8_t is_disconnected;
304 };
305
306
307 /**
308  * Client context for a channel master.
309  */
310 struct Master
311 {
312   /**
313    * Channel struct common for Master and Slave
314    */
315   struct Channel chn;
316
317   /**
318    * Private key of the channel.
319    */
320   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
321
322   /**
323    * Handle for the multicast origin.
324    */
325   struct GNUNET_MULTICAST_Origin *origin;
326
327   /**
328    * Transmit handle for multicast.
329    */
330   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
331
332   /**
333    * Incoming join requests from multicast.
334    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
335    */
336   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
337
338   /**
339    * Last message ID transmitted to this channel.
340    *
341    * Incremented before sending a message, thus the message_id in messages sent
342    * starts from 1.
343    */
344   uint64_t max_message_id;
345
346   /**
347    * ID of the last message with state operations transmitted to the channel.
348    * 0 if there is no such message.
349    */
350   uint64_t max_state_message_id;
351
352   /**
353    * Maximum group generation transmitted to the channel.
354    */
355   uint64_t max_group_generation;
356
357   /**
358    * @see enum GNUNET_PSYC_Policy
359    */
360   enum GNUNET_PSYC_Policy policy;
361 };
362
363
364 /**
365  * Client context for a channel slave.
366  */
367 struct Slave
368 {
369   /**
370    * Channel struct common for Master and Slave
371    */
372   struct Channel chn;
373
374   /**
375    * Private key of the slave.
376    */
377   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
378
379   /**
380    * Public key of the slave.
381    */
382   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
383
384   /**
385    * Hash of @a pub_key.
386    */
387   struct GNUNET_HashCode pub_key_hash;
388
389   /**
390    * Handle for the multicast member.
391    */
392   struct GNUNET_MULTICAST_Member *member;
393
394   /**
395    * Transmit handle for multicast.
396    */
397   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
398
399   /**
400    * Peer identity of the origin.
401    */
402   struct GNUNET_PeerIdentity origin;
403
404   /**
405    * Number of items in @a relays.
406    */
407   uint32_t relay_count;
408
409   /**
410    * Relays that multicast can use to connect.
411    */
412   struct GNUNET_PeerIdentity *relays;
413
414   /**
415    * Join request to be transmitted to the master on join.
416    */
417   struct GNUNET_PSYC_Message *join_msg;
418
419   /**
420    * Join decision received from multicast.
421    */
422   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
423
424   /**
425    * Maximum request ID for this channel.
426    */
427   uint64_t max_request_id;
428 };
429
430
431 static void
432 transmit_message (struct Channel *chn);
433
434 static uint64_t
435 message_queue_run (struct Channel *chn);
436
437 static uint64_t
438 message_queue_drop (struct Channel *chn);
439
440
441 /**
442  * Task run during shutdown.
443  *
444  * @param cls unused
445  * @param tc unused
446  */
447 static void
448 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
449 {
450   if (NULL != nc)
451   {
452     GNUNET_SERVER_notification_context_destroy (nc);
453     nc = NULL;
454   }
455   if (NULL != stats)
456   {
457     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
458     stats = NULL;
459   }
460 }
461
462
463 static struct Operation *
464 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
465         uint64_t op_id, uint32_t flags)
466 {
467   struct Operation *op = GNUNET_malloc (sizeof (*op));
468   op->client = client;
469   op->chn = chn;
470   op->op_id = op_id;
471   op->flags = flags;
472   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
473   return op;
474 }
475
476
477 static void
478 op_remove (struct Operation *op)
479 {
480   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
481   GNUNET_free (op);
482 }
483
484
485 /**
486  * Clean up master data structures after a client disconnected.
487  */
488 static void
489 cleanup_master (struct Master *mst)
490 {
491   struct Channel *chn = &mst->chn;
492
493   if (NULL != mst->origin)
494     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
495   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
496   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
497 }
498
499
500 /**
501  * Clean up slave data structures after a client disconnected.
502  */
503 static void
504 cleanup_slave (struct Slave *slv)
505 {
506   struct Channel *chn = &slv->chn;
507   struct GNUNET_CONTAINER_MultiHashMap *
508     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
509                                                 &chn->pub_key_hash);
510   GNUNET_assert (NULL != chn_slv);
511   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
512
513   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
514   {
515     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
516                                           chn_slv);
517     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
518   }
519   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
520
521   if (NULL != slv->join_msg)
522   {
523     GNUNET_free (slv->join_msg);
524     slv->join_msg = NULL;
525   }
526   if (NULL != slv->relays)
527   {
528     GNUNET_free (slv->relays);
529     slv->relays = NULL;
530   }
531   if (NULL != slv->member)
532   {
533     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
534     slv->member = NULL;
535   }
536   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
537 }
538
539
540 /**
541  * Clean up channel data structures after a client disconnected.
542  */
543 static void
544 cleanup_channel (struct Channel *chn)
545 {
546   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547               "%p Cleaning up channel %s. master? %u\n",
548               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
549   message_queue_drop (chn);
550   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
551   chn->recv_frags = NULL;
552
553   if (NULL != chn->store_op)
554   {
555     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
556     chn->store_op = NULL;
557   }
558
559   (GNUNET_YES == chn->is_master)
560     ? cleanup_master ((struct Master *) chn)
561     : cleanup_slave ((struct Slave *) chn);
562   GNUNET_free (chn);
563 }
564
565
566 /**
567  * Called whenever a client is disconnected.
568  * Frees our resources associated with that client.
569  *
570  * @param cls Closure.
571  * @param client Identification of the client.
572  */
573 static void
574 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
575 {
576   if (NULL == client)
577     return;
578
579   struct Channel *
580     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
581
582   if (NULL == chn)
583   {
584     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585                 "%p User context is NULL in client_disconnect()\n", chn);
586     GNUNET_break (0);
587     return;
588   }
589
590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591               "%p Client (%s) disconnected from channel %s\n",
592               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
593               GNUNET_h2s (&chn->pub_key_hash));
594
595   struct Client *cli = chn->clients_head;
596   while (NULL != cli)
597   {
598     if (cli->client == client)
599     {
600       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
601       GNUNET_free (cli);
602       break;
603     }
604     cli = cli->next;
605   }
606
607   struct Operation *op = chn->op_head;
608   while (NULL != op)
609   {
610     if (op->client == client)
611     {
612       op->client = NULL;
613       break;
614     }
615     op = op->next;
616   }
617
618   if (NULL == chn->clients_head)
619   { /* Last client disconnected. */
620     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621                 "%p Last client (%s) disconnected from channel %s\n",
622                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
623                 GNUNET_h2s (&chn->pub_key_hash));
624     chn->is_disconnected = GNUNET_YES;
625     if (NULL != chn->tmit_head)
626     { /* Send pending messages to multicast before cleanup. */
627       transmit_message (chn);
628     }
629     else
630     {
631       cleanup_channel (chn);
632     }
633   }
634 }
635
636
637 /**
638  * Send message to all clients connected to the channel.
639  */
640 static void
641 client_send_msg (const struct Channel *chn,
642                  const struct GNUNET_MessageHeader *msg)
643 {
644   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645               "%p Sending message to clients.\n", chn);
646
647   struct Client *cli = chn->clients_head;
648   while (NULL != cli)
649   {
650     GNUNET_SERVER_notification_context_add (nc, cli->client);
651     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
652     cli = cli->next;
653   }
654 }
655
656
657 /**
658  * Send a result code back to the client.
659  *
660  * @param client
661  *        Client that should receive the result code.
662  * @param result_code
663  *        Code to transmit.
664  * @param op_id
665  *        Operation ID in network byte order.
666  * @param data
667  *        Data payload or NULL.
668  * @param data_size
669  *        Size of @a data.
670  */
671 static void
672 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
673                     int64_t result_code, const void *data, uint16_t data_size)
674 {
675   struct GNUNET_OperationResultMessage *res;
676
677   res = GNUNET_malloc (sizeof (*res) + data_size);
678   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
679   res->header.size = htons (sizeof (*res) + data_size);
680   res->result_code = GNUNET_htonll (result_code);
681   res->op_id = op_id;
682   if (0 < data_size)
683     memcpy (&res[1], data, data_size);
684
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "%p Sending result to client for operation #%" PRIu64 ": "
687               "%" PRId64 " (size: %u)\n",
688               client, GNUNET_ntohll (op_id), result_code, data_size);
689
690   GNUNET_SERVER_notification_context_add (nc, client);
691   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
692                                               GNUNET_NO);
693   GNUNET_free (res);
694 }
695
696
697 /**
698  * Closure for join_mem_test_cb()
699  */
700 struct JoinMemTestClosure
701 {
702   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
703   struct Channel *chn;
704   struct GNUNET_MULTICAST_JoinHandle *jh;
705   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
706 };
707
708
709 /**
710  * Membership test result callback used for join requests.
711  */
712 static void
713 join_mem_test_cb (void *cls, int64_t result,
714                   const char *err_msg, uint16_t err_msg_size)
715 {
716   struct JoinMemTestClosure *jcls = cls;
717
718   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
719   { /* Pass on join request to client if this is a master channel */
720     struct Master *mst = (struct Master *) jcls->chn;
721     struct GNUNET_HashCode slave_key_hash;
722     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
723                         &slave_key_hash);
724     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
725                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
726     client_send_msg (jcls->chn, &jcls->join_msg->header);
727   }
728   else
729   {
730     if (GNUNET_SYSERR == result)
731     {
732       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
733                   "Could not perform membership test (%.*s)\n",
734                   err_msg_size, err_msg);
735     }
736     // FIXME: add relays
737     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
738   }
739   GNUNET_free (jcls->join_msg);
740   GNUNET_free (jcls);
741 }
742
743
744 /**
745  * Incoming join request from multicast.
746  */
747 static void
748 mcast_recv_join_request (void *cls,
749                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
750                          const struct GNUNET_MessageHeader *join_msg,
751                          struct GNUNET_MULTICAST_JoinHandle *jh)
752 {
753   struct Channel *chn = cls;
754   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
755
756   uint16_t join_msg_size = 0;
757   if (NULL != join_msg)
758   {
759     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
760     {
761       join_msg_size = ntohs (join_msg->size);
762     }
763     else
764     {
765       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
766                   "%p Got join message with invalid type %u.\n",
767                   chn, ntohs (join_msg->type));
768     }
769   }
770
771   struct GNUNET_PSYC_JoinRequestMessage *
772     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
773   req->header.size = htons (sizeof (*req) + join_msg_size);
774   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
775   req->slave_key = *slave_key;
776   if (0 < join_msg_size)
777     memcpy (&req[1], join_msg, join_msg_size);
778
779   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
780   jcls->slave_key = *slave_key;
781   jcls->chn = chn;
782   jcls->jh = jh;
783   jcls->join_msg = req;
784
785   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
786                                     chn->max_message_id, 0,
787                                     &join_mem_test_cb, jcls);
788 }
789
790
791 /**
792  * Join decision received from multicast.
793  */
794 static void
795 mcast_recv_join_decision (void *cls, int is_admitted,
796                           const struct GNUNET_PeerIdentity *peer,
797                           uint16_t relay_count,
798                           const struct GNUNET_PeerIdentity *relays,
799                           const struct GNUNET_MessageHeader *join_resp)
800 {
801   struct Slave *slv = cls;
802   struct Channel *chn = &slv->chn;
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804               "%p Got join decision: %d\n", slv, is_admitted);
805   if (GNUNET_YES == chn->is_ready)
806   {
807     /* Already admitted */
808     return;
809   }
810
811   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
812   struct GNUNET_PSYC_JoinDecisionMessage *
813     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
814   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
815   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
816   dcsn->is_admitted = htonl (is_admitted);
817   if (0 < join_resp_size)
818     memcpy (&dcsn[1], join_resp, join_resp_size);
819
820   client_send_msg (chn, &dcsn->header);
821
822   if (GNUNET_YES == is_admitted)
823   {
824     chn->is_ready = GNUNET_YES;
825   }
826 }
827
828
829 static int
830 store_recv_fragment_replay (void *cls,
831                             struct GNUNET_MULTICAST_MessageHeader *msg,
832                             enum GNUNET_PSYCSTORE_MessageFlags flags)
833 {
834   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
835
836   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
837   return GNUNET_YES;
838 }
839
840
841 /**
842  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
843  */
844 static void
845 store_recv_fragment_replay_result (void *cls, int64_t result,
846                                    const char *err_msg, uint16_t err_msg_size)
847 {
848   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
851               rh, result, err_msg_size, err_msg);
852
853   switch (result)
854   {
855   case GNUNET_YES:
856     break;
857
858   case GNUNET_NO:
859     GNUNET_MULTICAST_replay_response (rh, NULL,
860                                       GNUNET_MULTICAST_REC_NOT_FOUND);
861     break;
862
863   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
864     GNUNET_MULTICAST_replay_response (rh, NULL,
865                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
866     break;
867
868   case GNUNET_SYSERR:
869     GNUNET_MULTICAST_replay_response (rh, NULL,
870                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
871     break;
872   }
873   GNUNET_MULTICAST_replay_response_end (rh);
874 }
875
876
877 /**
878  * Incoming fragment replay request from multicast.
879  */
880 static void
881 mcast_recv_replay_fragment (void *cls,
882                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
883                             uint64_t fragment_id, uint64_t flags,
884                             struct GNUNET_MULTICAST_ReplayHandle *rh)
885
886 {
887   struct Channel *chn = cls;
888   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
889                                  fragment_id, fragment_id,
890                                  &store_recv_fragment_replay,
891                                  &store_recv_fragment_replay_result, rh);
892 }
893
894
895 /**
896  * Incoming message replay request from multicast.
897  */
898 static void
899 mcast_recv_replay_message (void *cls,
900                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
901                            uint64_t message_id,
902                            uint64_t fragment_offset,
903                            uint64_t flags,
904                            struct GNUNET_MULTICAST_ReplayHandle *rh)
905 {
906   struct Channel *chn = cls;
907   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
908                                 message_id, message_id, 1, NULL,
909                                 &store_recv_fragment_replay,
910                                 &store_recv_fragment_replay_result, rh);
911 }
912
913
914 /**
915  * Convert an uint64_t in network byte order to a HashCode
916  * that can be used as key in a MultiHashMap
917  */
918 static inline void
919 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
920 {
921   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
922   /* TODO: use built-in byte swap functions if available */
923
924   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
925   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
926
927   *key = (struct GNUNET_HashCode) {};
928   *((uint64_t *) key)
929     = (n << 32) | (n >> 32);
930 }
931
932
933 /**
934  * Convert an uint64_t in host byte order to a HashCode
935  * that can be used as key in a MultiHashMap
936  */
937 static inline void
938 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
939 {
940 #if __BYTE_ORDER == __BIG_ENDIAN
941   hash_key_from_nll (key, n);
942 #elif __BYTE_ORDER == __LITTLE_ENDIAN
943   *key = (struct GNUNET_HashCode) {};
944   *((uint64_t *) key) = n;
945 #else
946   #error byteorder undefined
947 #endif
948 }
949
950
951 /**
952  * Initialize PSYC message header.
953  */
954 static inline void
955 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
956                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
957 {
958   uint16_t size = ntohs (mmsg->header.size);
959   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
960
961   pmsg->header.size = htons (psize);
962   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
963   pmsg->message_id = mmsg->message_id;
964   pmsg->fragment_offset = mmsg->fragment_offset;
965   pmsg->flags = htonl (flags);
966
967   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
968 }
969
970
971 /**
972  * Create a new PSYC message from a multicast message for sending it to clients.
973  */
974 static inline struct GNUNET_PSYC_MessageHeader *
975 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
976 {
977   struct GNUNET_PSYC_MessageHeader *pmsg;
978   uint16_t size = ntohs (mmsg->header.size);
979   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
980
981   pmsg = GNUNET_malloc (psize);
982   psyc_msg_init (pmsg, mmsg, flags);
983   return pmsg;
984 }
985
986
987 /**
988  * Send multicast message to all clients connected to the channel.
989  */
990 static void
991 client_send_mcast_msg (struct Channel *chn,
992                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
993                        uint32_t flags)
994 {
995   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996               "%p Sending multicast message to client. "
997               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
998               chn, GNUNET_ntohll (mmsg->fragment_id),
999               GNUNET_ntohll (mmsg->message_id));
1000
1001   struct GNUNET_PSYC_MessageHeader *
1002     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1003   client_send_msg (chn, &pmsg->header);
1004   GNUNET_free (pmsg);
1005 }
1006
1007
1008 /**
1009  * Send multicast request to all clients connected to the channel.
1010  */
1011 static void
1012 client_send_mcast_req (struct Master *mst,
1013                        const struct GNUNET_MULTICAST_RequestHeader *req)
1014 {
1015   struct Channel *chn = &mst->chn;
1016
1017   struct GNUNET_PSYC_MessageHeader *pmsg;
1018   uint16_t size = ntohs (req->header.size);
1019   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1020
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "%p Sending multicast request to client. "
1023               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1024               chn, GNUNET_ntohll (req->fragment_id),
1025               GNUNET_ntohll (req->request_id));
1026
1027   pmsg = GNUNET_malloc (psize);
1028   pmsg->header.size = htons (psize);
1029   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1030   pmsg->message_id = req->request_id;
1031   pmsg->fragment_offset = req->fragment_offset;
1032   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1033   pmsg->slave_key = req->member_key;
1034
1035   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1036   client_send_msg (chn, &pmsg->header);
1037   GNUNET_free (pmsg);
1038 }
1039
1040
1041 /**
1042  * Insert a multicast message fragment into the queue belonging to the message.
1043  *
1044  * @param chn          Channel.
1045  * @param mmsg         Multicast message fragment.
1046  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1047  * @param first_ptype  First PSYC message part type in @a mmsg.
1048  * @param last_ptype   Last PSYC message part type in @a mmsg.
1049  */
1050 static void
1051 fragment_queue_insert (struct Channel *chn,
1052                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1053                        uint16_t first_ptype, uint16_t last_ptype)
1054 {
1055   const uint16_t size = ntohs (mmsg->header.size);
1056   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1057   struct GNUNET_CONTAINER_MultiHashMap
1058     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1059                                                     &chn->pub_key_hash);
1060
1061   struct GNUNET_HashCode msg_id_hash;
1062   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1063
1064   struct FragmentQueue
1065     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1066
1067   if (NULL == fragq)
1068   {
1069     fragq = GNUNET_new (struct FragmentQueue);
1070     fragq->state = MSG_FRAG_STATE_HEADER;
1071     fragq->fragments
1072       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1073
1074     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1075                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1076
1077     if (NULL == chan_msgs)
1078     {
1079       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1080       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1081                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1082     }
1083   }
1084
1085   struct GNUNET_HashCode frag_id_hash;
1086   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1087   struct RecvCacheEntry
1088     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1089   if (NULL == cache_entry)
1090   {
1091     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092                 "%p Adding message fragment to cache. "
1093                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1094                 chn, GNUNET_ntohll (mmsg->message_id),
1095                 GNUNET_ntohll (mmsg->fragment_id));
1096     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097                 "%p header_size: %" PRIu64 " + %u\n",
1098                 chn, fragq->header_size, size);
1099     cache_entry = GNUNET_new (struct RecvCacheEntry);
1100     cache_entry->ref_count = 1;
1101     cache_entry->mmsg = GNUNET_malloc (size);
1102     memcpy (cache_entry->mmsg, mmsg, size);
1103     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1104                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1105   }
1106   else
1107   {
1108     cache_entry->ref_count++;
1109     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110                 "%p Message fragment is already in cache. "
1111                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1112                 ", ref_count: %u\n",
1113                 chn, GNUNET_ntohll (mmsg->message_id),
1114                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1115   }
1116
1117   if (MSG_FRAG_STATE_HEADER == fragq->state)
1118   {
1119     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1120     {
1121       struct GNUNET_PSYC_MessageMethod *
1122         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1123       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1124       fragq->flags = ntohl (pmeth->flags);
1125     }
1126
1127     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1128     {
1129       fragq->header_size += size;
1130     }
1131     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1132              || frag_offset == fragq->header_size)
1133     { /* header is now complete */
1134       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1135                   "%p Header of message %" PRIu64 " is complete.\n",
1136                   chn, GNUNET_ntohll (mmsg->message_id));
1137
1138       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1139                   "%p Adding message %" PRIu64 " to queue.\n",
1140                   chn, GNUNET_ntohll (mmsg->message_id));
1141       fragq->state = MSG_FRAG_STATE_DATA;
1142     }
1143     else
1144     {
1145       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1146                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1147                   "%" PRIu64 " != %" PRIu64 "\n",
1148                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1149                   fragq->header_size);
1150     }
1151   }
1152
1153   switch (last_ptype)
1154   {
1155   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1156     if (frag_offset == fragq->size)
1157       fragq->state = MSG_FRAG_STATE_END;
1158     else
1159       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1160                   "%p Message %" PRIu64 " is NOT complete yet: "
1161                   "%" PRIu64 " != %" PRIu64 "\n",
1162                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1163                   fragq->size);
1164     break;
1165
1166   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1167     /* Drop message without delivering to client if it's a single fragment */
1168     fragq->state =
1169       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1170       ? MSG_FRAG_STATE_DROP
1171       : MSG_FRAG_STATE_CANCEL;
1172   }
1173
1174   switch (fragq->state)
1175   {
1176   case MSG_FRAG_STATE_DATA:
1177   case MSG_FRAG_STATE_END:
1178   case MSG_FRAG_STATE_CANCEL:
1179     if (GNUNET_NO == fragq->is_queued)
1180     {
1181       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1182                                     GNUNET_ntohll (mmsg->message_id));
1183       fragq->is_queued = GNUNET_YES;
1184     }
1185   }
1186
1187   fragq->size += size;
1188   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1189                                 GNUNET_ntohll (mmsg->fragment_id));
1190 }
1191
1192
1193 /**
1194  * Run fragment queue of a message.
1195  *
1196  * Send fragments of a message in order to client, after all modifiers arrived
1197  * from multicast.
1198  *
1199  * @param chn      Channel.
1200  * @param msg_id  ID of the message @a fragq belongs to.
1201  * @param fragq   Fragment queue of the message.
1202  * @param drop    Drop message without delivering to client?
1203  *                #GNUNET_YES or #GNUNET_NO.
1204  */
1205 static void
1206 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1207                     struct FragmentQueue *fragq, uint8_t drop)
1208 {
1209   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1210               "%p Running message fragment queue for message %" PRIu64
1211               " (state: %u).\n",
1212               chn, msg_id, fragq->state);
1213
1214   struct GNUNET_CONTAINER_MultiHashMap
1215     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1216                                                     &chn->pub_key_hash);
1217   GNUNET_assert (NULL != chan_msgs);
1218   uint64_t frag_id;
1219
1220   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1221                                                     &frag_id))
1222   {
1223     struct GNUNET_HashCode frag_id_hash;
1224     hash_key_from_hll (&frag_id_hash, frag_id);
1225     struct RecvCacheEntry *cache_entry
1226       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1227     if (cache_entry != NULL)
1228     {
1229       if (GNUNET_NO == drop)
1230       {
1231         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1232       }
1233       if (cache_entry->ref_count <= 1)
1234       {
1235         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1236                                               cache_entry);
1237         GNUNET_free (cache_entry->mmsg);
1238         GNUNET_free (cache_entry);
1239       }
1240       else
1241       {
1242         cache_entry->ref_count--;
1243       }
1244     }
1245 #if CACHE_AGING_IMPLEMENTED
1246     else if (GNUNET_NO == drop)
1247     {
1248       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1249     }
1250 #endif
1251
1252     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1253   }
1254
1255   if (MSG_FRAG_STATE_END <= fragq->state)
1256   {
1257     struct GNUNET_HashCode msg_id_hash;
1258     hash_key_from_hll (&msg_id_hash, msg_id);
1259
1260     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1261     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1262     GNUNET_free (fragq);
1263   }
1264   else
1265   {
1266     fragq->is_queued = GNUNET_NO;
1267   }
1268 }
1269
1270
1271 struct StateModifyClosure
1272 {
1273   struct Channel *chn;
1274   uint64_t msg_id;
1275   struct GNUNET_HashCode msg_id_hash;
1276 };
1277
1278
1279 void
1280 store_recv_state_modify_result (void *cls, int64_t result,
1281                                 const char *err_msg, uint16_t err_msg_size)
1282 {
1283   struct StateModifyClosure *mcls = cls;
1284   struct Channel *chn = mcls->chn;
1285   uint64_t msg_id = mcls->msg_id;
1286
1287   struct FragmentQueue *
1288     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1289
1290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1292               chn, result, err_msg_size, err_msg);
1293
1294   switch (result)
1295   {
1296   case GNUNET_OK:
1297   case GNUNET_NO:
1298     if (NULL != fragq)
1299       fragq->state_is_modified = GNUNET_YES;
1300     if (chn->max_state_message_id < msg_id)
1301       chn->max_state_message_id = msg_id;
1302     if (chn->max_message_id < msg_id)
1303       chn->max_message_id = msg_id;
1304
1305     if (NULL != fragq)
1306       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1307     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1308     message_queue_run (chn);
1309     break;
1310
1311   default:
1312     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1313                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1314                 chn, result, err_msg_size, err_msg);
1315     /** @todo FIXME: handle state_modify error */
1316   }
1317 }
1318
1319
1320 /**
1321  * Run message queue.
1322  *
1323  * Send messages in queue to client in order after a message has arrived from
1324  * multicast, according to the following:
1325  * - A message is only sent if all of its modifiers arrived.
1326  * - A stateful message is only sent if the previous stateful message
1327  *   has already been delivered to the client.
1328  *
1329  * @param chn  Channel.
1330  *
1331  * @return Number of messages removed from queue and sent to client.
1332  */
1333 static uint64_t
1334 message_queue_run (struct Channel *chn)
1335 {
1336   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1337               "%p Running message queue.\n", chn);
1338   uint64_t n = 0;
1339   uint64_t msg_id;
1340
1341   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1342                                                     &msg_id))
1343   {
1344     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1345                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1346     struct GNUNET_HashCode msg_id_hash;
1347     hash_key_from_hll (&msg_id_hash, msg_id);
1348
1349     struct FragmentQueue *
1350       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1351
1352     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1353     {
1354       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1355                   "%p No fragq (%p) or header not complete.\n",
1356                   chn, fragq);
1357       break;
1358     }
1359
1360     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361                 "%p Fragment queue entry:  state: %u, state delta: "
1362                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1363                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1364
1365     if (MSG_FRAG_STATE_DATA <= fragq->state)
1366     {
1367       /* Check if there's a missing message before the current one */
1368       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1369       {
1370         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1371
1372         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1373             && (chn->max_message_id != msg_id - 1
1374                 && chn->max_message_id != msg_id))
1375         {
1376           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1377                       "%p Out of order message. "
1378                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1379                       chn, chn->max_message_id, msg_id);
1380           break;
1381           // FIXME: keep track of messages processed in this queue run,
1382           //        and only stop after reaching the end
1383         }
1384       }
1385       else
1386       {
1387         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1388         if (GNUNET_YES != fragq->state_is_modified)
1389         {
1390           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1391           {
1392             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1393                         "%p Out of order stateful message. "
1394                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1395                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1396             break;
1397             // FIXME: keep track of messages processed in this queue run,
1398             //        and only stop after reaching the end
1399           }
1400
1401           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1402           mcls->chn = chn;
1403           mcls->msg_id = msg_id;
1404           mcls->msg_id_hash = msg_id_hash;
1405
1406           /* Apply modifiers to state in PSYCstore */
1407           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1408                                          fragq->state_delta,
1409                                          store_recv_state_modify_result, mcls);
1410           break; // continue after asynchronous state modify result
1411         }
1412       }
1413       chn->max_message_id = msg_id;
1414     }
1415     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1416     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1417     n++;
1418   }
1419
1420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1421               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1422   return n;
1423 }
1424
1425
1426 /**
1427  * Drop message queue of a channel.
1428  *
1429  * Remove all messages in queue without sending it to clients.
1430  *
1431  * @param chn  Channel.
1432  *
1433  * @return Number of messages removed from queue.
1434  */
1435 static uint64_t
1436 message_queue_drop (struct Channel *chn)
1437 {
1438   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1439               "%p Dropping message queue.\n", chn);
1440   uint64_t n = 0;
1441   uint64_t msg_id;
1442   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1443                                                     &msg_id))
1444   {
1445     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1446                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1447     struct GNUNET_HashCode msg_id_hash;
1448     hash_key_from_hll (&msg_id_hash, msg_id);
1449
1450     struct FragmentQueue *
1451       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1452     GNUNET_assert (NULL != fragq);
1453     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1454     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1455     n++;
1456   }
1457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1458               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1459   return n;
1460 }
1461
1462
1463 /**
1464  * Received result of GNUNET_PSYCSTORE_fragment_store().
1465  */
1466 static void
1467 store_recv_fragment_store_result (void *cls, int64_t result,
1468                                   const char *err_msg, uint16_t err_msg_size)
1469 {
1470   struct Channel *chn = cls;
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1473               chn, result, err_msg_size, err_msg);
1474 }
1475
1476
1477 /**
1478  * Handle incoming message fragment from multicast.
1479  *
1480  * Store it using PSYCstore and send it to the clients of the channel in order.
1481  */
1482 static void
1483 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1484 {
1485   struct Channel *chn = cls;
1486   uint16_t size = ntohs (mmsg->header.size);
1487
1488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1489               "%p Received multicast message of size %u.\n",
1490               chn, size);
1491
1492   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1493                                    &store_recv_fragment_store_result, chn);
1494
1495   uint16_t first_ptype = 0, last_ptype = 0;
1496   if (GNUNET_SYSERR
1497       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1498                                           (const char *) &mmsg[1],
1499                                           &first_ptype, &last_ptype))
1500   {
1501     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1502                 "%p Dropping incoming multicast message with invalid parts.\n",
1503                 chn);
1504     GNUNET_break_op (0);
1505     return;
1506   }
1507
1508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1509               "Message parts: first: type %u, last: type %u\n",
1510               first_ptype, last_ptype);
1511
1512   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1513   message_queue_run (chn);
1514 }
1515
1516
1517 /**
1518  * Incoming request fragment from multicast for a master.
1519  *
1520  * @param cls   Master.
1521  * @param req   The request.
1522  */
1523 static void
1524 mcast_recv_request (void *cls,
1525                     const struct GNUNET_MULTICAST_RequestHeader *req)
1526 {
1527   struct Master *mst = cls;
1528   uint16_t size = ntohs (req->header.size);
1529
1530   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_key);
1531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1532               "%p Received multicast request of size %u from %s.\n",
1533               mst, size, str);
1534   GNUNET_free (str);
1535
1536   uint16_t first_ptype = 0, last_ptype = 0;
1537   if (GNUNET_SYSERR
1538       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1539                                           (const char *) &req[1],
1540                                           &first_ptype, &last_ptype))
1541   {
1542     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1543                 "%p Dropping incoming multicast request with invalid parts.\n",
1544                 mst);
1545     GNUNET_break_op (0);
1546     return;
1547   }
1548
1549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1550               "Message parts: first: type %u, last: type %u\n",
1551               first_ptype, last_ptype);
1552
1553   /* FIXME: in-order delivery */
1554   client_send_mcast_req (mst, req);
1555 }
1556
1557
1558 /**
1559  * Response from PSYCstore with the current counter values for a channel master.
1560  */
1561 static void
1562 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1563                             uint64_t max_message_id, uint64_t max_group_generation,
1564                             uint64_t max_state_message_id)
1565 {
1566   struct Master *mst = cls;
1567   struct Channel *chn = &mst->chn;
1568   chn->store_op = NULL;
1569
1570   struct GNUNET_PSYC_CountersResultMessage res;
1571   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1572   res.header.size = htons (sizeof (res));
1573   res.result_code = htonl (result);
1574   res.max_message_id = GNUNET_htonll (max_message_id);
1575
1576   if (GNUNET_OK == result || GNUNET_NO == result)
1577   {
1578     mst->max_message_id = max_message_id;
1579     chn->max_message_id = max_message_id;
1580     chn->max_state_message_id = max_state_message_id;
1581     mst->max_group_generation = max_group_generation;
1582     mst->origin
1583       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1584                                        mcast_recv_join_request,
1585                                        mcast_recv_replay_fragment,
1586                                        mcast_recv_replay_message,
1587                                        mcast_recv_request,
1588                                        mcast_recv_message, chn);
1589     chn->is_ready = GNUNET_YES;
1590   }
1591   else
1592   {
1593     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1594                 "%p GNUNET_PSYCSTORE_counters_get() "
1595                 "returned %d for channel %s.\n",
1596                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1597   }
1598
1599   client_send_msg (chn, &res.header);
1600 }
1601
1602
1603 /**
1604  * Response from PSYCstore with the current counter values for a channel slave.
1605  */
1606 void
1607 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1608                            uint64_t max_message_id, uint64_t max_group_generation,
1609                            uint64_t max_state_message_id)
1610 {
1611   struct Slave *slv = cls;
1612   struct Channel *chn = &slv->chn;
1613   chn->store_op = NULL;
1614
1615   struct GNUNET_PSYC_CountersResultMessage res;
1616   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1617   res.header.size = htons (sizeof (res));
1618   res.result_code = htonl (result);
1619   res.max_message_id = GNUNET_htonll (max_message_id);
1620
1621   if (GNUNET_OK == result || GNUNET_NO == result)
1622   {
1623     chn->max_message_id = max_message_id;
1624     chn->max_state_message_id = max_state_message_id;
1625     slv->member
1626       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1627                                       &slv->origin,
1628                                       slv->relay_count, slv->relays,
1629                                       &slv->join_msg->header,
1630                                       mcast_recv_join_request,
1631                                       mcast_recv_join_decision,
1632                                       mcast_recv_replay_fragment,
1633                                       mcast_recv_replay_message,
1634                                       mcast_recv_message, chn);
1635     if (NULL != slv->join_msg)
1636     {
1637       GNUNET_free (slv->join_msg);
1638       slv->join_msg = NULL;
1639     }
1640   }
1641   else
1642   {
1643     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1644                 "%p GNUNET_PSYCSTORE_counters_get() "
1645                 "returned %d for channel %s.\n",
1646                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1647   }
1648
1649   client_send_msg (chn, &res.header);
1650 }
1651
1652
1653 static void
1654 channel_init (struct Channel *chn)
1655 {
1656   chn->recv_msgs
1657     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1658   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1659 }
1660
1661
1662 /**
1663  * Handle a connecting client starting a channel master.
1664  */
1665 static void
1666 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1667                           const struct GNUNET_MessageHeader *msg)
1668 {
1669   const struct MasterStartRequest *req
1670     = (const struct MasterStartRequest *) msg;
1671
1672   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1673   struct GNUNET_HashCode pub_key_hash;
1674
1675   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1676   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1677
1678   struct Master *
1679     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1680   struct Channel *chn;
1681
1682   if (NULL == mst)
1683   {
1684     mst = GNUNET_new (struct Master);
1685     mst->policy = ntohl (req->policy);
1686     mst->priv_key = req->channel_key;
1687     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1688
1689     chn = &mst->chn;
1690     chn->is_master = GNUNET_YES;
1691     chn->pub_key = pub_key;
1692     chn->pub_key_hash = pub_key_hash;
1693     channel_init (chn);
1694
1695     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1696                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1697     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1698                                                    store_recv_master_counters, mst);
1699   }
1700   else
1701   {
1702     chn = &mst->chn;
1703
1704     struct GNUNET_PSYC_CountersResultMessage res;
1705     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1706     res.header.size = htons (sizeof (res));
1707     res.result_code = htonl (GNUNET_OK);
1708     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1709
1710     GNUNET_SERVER_notification_context_add (nc, client);
1711     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1712                                                 GNUNET_NO);
1713   }
1714
1715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716               "%p Client connected as master to channel %s.\n",
1717               mst, GNUNET_h2s (&chn->pub_key_hash));
1718
1719   struct Client *cli = GNUNET_new (struct Client);
1720   cli->client = client;
1721   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1722
1723   GNUNET_SERVER_client_set_user_context (client, chn);
1724   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1725 }
1726
1727
1728 /**
1729  * Handle a connecting client joining as a channel slave.
1730  */
1731 static void
1732 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1733                         const struct GNUNET_MessageHeader *msg)
1734 {
1735   const struct SlaveJoinRequest *req
1736     = (const struct SlaveJoinRequest *) msg;
1737   uint16_t req_size = ntohs (req->header.size);
1738
1739   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1740   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1741
1742   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1743   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1744   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1745
1746   struct GNUNET_CONTAINER_MultiHashMap *
1747     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1748   struct Slave *slv = NULL;
1749   struct Channel *chn;
1750
1751   if (NULL != chn_slv)
1752   {
1753     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1754   }
1755   if (NULL == slv)
1756   {
1757     slv = GNUNET_new (struct Slave);
1758     slv->priv_key = req->slave_key;
1759     slv->pub_key = slv_pub_key;
1760     slv->pub_key_hash = slv_pub_key_hash;
1761     slv->origin = req->origin;
1762     slv->relay_count = ntohl (req->relay_count);
1763
1764     const struct GNUNET_PeerIdentity *
1765       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1766     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1767     uint16_t join_msg_size = 0;
1768
1769     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1770         <= req_size)
1771     {
1772       struct GNUNET_PSYC_Message *
1773         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1774       join_msg_size = ntohs (join_msg->header.size);
1775       slv->join_msg = GNUNET_malloc (join_msg_size);
1776       memcpy (slv->join_msg, join_msg, join_msg_size);
1777     }
1778     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1779     {
1780       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1781                   "%u + %u + %u != %u\n",
1782                   sizeof (*req), relay_size, join_msg_size, req_size);
1783       GNUNET_break (0);
1784       GNUNET_SERVER_client_disconnect (client);
1785       GNUNET_free (slv);
1786       return;
1787     }
1788     if (0 < slv->relay_count)
1789     {
1790       slv->relays = GNUNET_malloc (relay_size);
1791       memcpy (slv->relays, &req[1], relay_size);
1792     }
1793
1794     chn = &slv->chn;
1795     chn->is_master = GNUNET_NO;
1796     chn->pub_key = req->channel_key;
1797     chn->pub_key_hash = pub_key_hash;
1798     channel_init (chn);
1799
1800     if (NULL == chn_slv)
1801     {
1802       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1803       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1804                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1805     }
1806     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1807                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1808     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1809                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1810     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1811                                                   &store_recv_slave_counters, slv);
1812   }
1813   else
1814   {
1815     chn = &slv->chn;
1816
1817     struct GNUNET_PSYC_CountersResultMessage res;
1818     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1819     res.header.size = htons (sizeof (res));
1820     res.result_code = htonl (GNUNET_OK);
1821     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1822
1823     GNUNET_SERVER_notification_context_add (nc, client);
1824     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1825                                                 GNUNET_NO);
1826
1827     if (NULL == slv->member)
1828     {
1829       slv->member
1830         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1831                                         &slv->origin,
1832                                         slv->relay_count, slv->relays,
1833                                         &slv->join_msg->header,
1834                                         &mcast_recv_join_request,
1835                                         &mcast_recv_join_decision,
1836                                         &mcast_recv_replay_fragment,
1837                                         &mcast_recv_replay_message,
1838                                         &mcast_recv_message, chn);
1839       if (NULL != slv->join_msg)
1840       {
1841         GNUNET_free (slv->join_msg);
1842         slv->join_msg = NULL;
1843       }
1844     }
1845     else if (NULL != slv->join_dcsn)
1846     {
1847       GNUNET_SERVER_notification_context_add (nc, client);
1848       GNUNET_SERVER_notification_context_unicast (nc, client,
1849                                                   &slv->join_dcsn->header,
1850                                                   GNUNET_NO);
1851     }
1852   }
1853
1854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1855               "%p Client connected as slave to channel %s.\n",
1856               slv, GNUNET_h2s (&chn->pub_key_hash));
1857
1858   struct Client *cli = GNUNET_new (struct Client);
1859   cli->client = client;
1860   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1861
1862   GNUNET_SERVER_client_set_user_context (client, chn);
1863   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1864 }
1865
1866
1867 struct JoinDecisionClosure
1868 {
1869   int32_t is_admitted;
1870   struct GNUNET_MessageHeader *msg;
1871 };
1872
1873
1874 /**
1875  * Iterator callback for sending join decisions to multicast.
1876  */
1877 static int
1878 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1879                           void *value)
1880 {
1881   struct JoinDecisionClosure *jcls = cls;
1882   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1883   // FIXME: add relays
1884   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1885   return GNUNET_YES;
1886 }
1887
1888
1889 /**
1890  * Join decision from client.
1891  */
1892 static void
1893 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1894                            const struct GNUNET_MessageHeader *msg)
1895 {
1896   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1897     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1898   struct Channel *chn;
1899   struct Master *mst;
1900   struct JoinDecisionClosure jcls;
1901
1902   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1903   if (NULL == chn)
1904   {
1905     GNUNET_break (0);
1906     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1907     return;
1908   }
1909   GNUNET_assert (GNUNET_YES == chn->is_master);
1910   mst = (struct Master *) chn;
1911   jcls.is_admitted = ntohl (dcsn->is_admitted);
1912   jcls.msg
1913     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1914     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1915     : NULL;
1916
1917   struct GNUNET_HashCode slave_key_hash;
1918   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1919                       &slave_key_hash);
1920
1921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922               "%p Got join decision (%d) from client for channel %s..\n",
1923               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1924   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1925               "%p ..and slave %s.\n",
1926               mst, GNUNET_h2s (&slave_key_hash));
1927
1928   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1929                                               &mcast_send_join_decision, &jcls);
1930   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1931   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1932 }
1933
1934
1935 /**
1936  * Send acknowledgement to a client.
1937  *
1938  * Sent after a message fragment has been passed on to multicast.
1939  *
1940  * @param chn The channel struct for the client.
1941  */
1942 static void
1943 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1944 {
1945   struct GNUNET_MessageHeader res;
1946   res.size = htons (sizeof (res));
1947   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1948
1949   /* FIXME */
1950   GNUNET_SERVER_notification_context_add (nc, client);
1951   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1952 }
1953
1954
1955 /**
1956  * Callback for the transmit functions of multicast.
1957  */
1958 static int
1959 transmit_notify (void *cls, size_t *data_size, void *data)
1960 {
1961   struct Channel *chn = cls;
1962   struct TransmitMessage *tmit_msg = chn->tmit_head;
1963
1964   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1965   {
1966     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                 "%p transmit_notify: nothing to send.\n", chn);
1968     *data_size = 0;
1969     return GNUNET_NO;
1970   }
1971
1972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1974
1975   *data_size = tmit_msg->size;
1976   memcpy (data, &tmit_msg[1], *data_size);
1977
1978   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1979
1980   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1981     send_message_ack (chn, tmit_msg->client);
1982
1983   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1984   GNUNET_free (tmit_msg);
1985
1986   if (NULL != chn->tmit_head)
1987   {
1988     transmit_message (chn);
1989   }
1990   else if (GNUNET_YES == chn->is_disconnected
1991            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1992   {
1993     /* FIXME: handle partial message (when still in_transmit) */
1994     return GNUNET_SYSERR;
1995   }
1996   return ret;
1997 }
1998
1999
2000 /**
2001  * Callback for the transmit functions of multicast.
2002  */
2003 static int
2004 master_transmit_notify (void *cls, size_t *data_size, void *data)
2005 {
2006   int ret = transmit_notify (cls, data_size, data);
2007
2008   if (GNUNET_YES == ret)
2009   {
2010     struct Master *mst = cls;
2011     mst->tmit_handle = NULL;
2012   }
2013   return ret;
2014 }
2015
2016
2017 /**
2018  * Callback for the transmit functions of multicast.
2019  */
2020 static int
2021 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2022 {
2023   int ret = transmit_notify (cls, data_size, data);
2024
2025   if (GNUNET_YES == ret)
2026   {
2027     struct Slave *slv = cls;
2028     slv->tmit_handle = NULL;
2029   }
2030   return ret;
2031 }
2032
2033
2034 /**
2035  * Transmit a message from a channel master to the multicast group.
2036  */
2037 static void
2038 master_transmit_message (struct Master *mst)
2039 {
2040   if (NULL == mst->tmit_handle)
2041   {
2042     mst->tmit_handle
2043       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
2044                                         mst->max_group_generation,
2045                                         master_transmit_notify, mst);
2046   }
2047   else
2048   {
2049     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2050   }
2051 }
2052
2053
2054 /**
2055  * Transmit a message from a channel slave to the multicast group.
2056  */
2057 static void
2058 slave_transmit_message (struct Slave *slv)
2059 {
2060   if (NULL == slv->tmit_handle)
2061   {
2062     slv->tmit_handle
2063       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
2064                                            slave_transmit_notify, slv);
2065   }
2066   else
2067   {
2068     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2069   }
2070 }
2071
2072
2073 static void
2074 transmit_message (struct Channel *chn)
2075 {
2076   chn->is_master
2077     ? master_transmit_message ((struct Master *) chn)
2078     : slave_transmit_message ((struct Slave *) chn);
2079 }
2080
2081
2082 /**
2083  * Queue a message from a channel master for sending to the multicast group.
2084  */
2085 static void
2086 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2087 {
2088   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2089
2090   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2091   {
2092     tmit_msg->id = ++mst->max_message_id;
2093     struct GNUNET_PSYC_MessageMethod *pmeth
2094       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2095
2096     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2097     {
2098       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2099     }
2100     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2101     {
2102       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2103                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2104                   mst, tmit_msg->id - mst->max_state_message_id);
2105       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2106                                           - mst->max_state_message_id);
2107       mst->max_state_message_id = tmit_msg->id;
2108     }
2109     else
2110     {
2111         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2112                     "%p master_queue_message: state not modified\n", mst);
2113       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2114     }
2115
2116     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2117     {
2118       /// @todo add state_hash to PSYC header
2119     }
2120   }
2121 }
2122
2123
2124 /**
2125  * Queue a message from a channel slave for sending to the multicast group.
2126  */
2127 static void
2128 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2129 {
2130   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2131   {
2132     struct GNUNET_PSYC_MessageMethod *pmeth
2133       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2134     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2135     tmit_msg->id = ++slv->max_request_id;
2136   }
2137 }
2138
2139
2140 /**
2141  * Queue PSYC message parts for sending to multicast.
2142  *
2143  * @param chn           Channel to send to.
2144  * @param client       Client the message originates from.
2145  * @param data_size    Size of @a data.
2146  * @param data         Concatenated message parts.
2147  * @param first_ptype  First message part type in @a data.
2148  * @param last_ptype   Last message part type in @a data.
2149  */
2150 static struct TransmitMessage *
2151 queue_message (struct Channel *chn,
2152                struct GNUNET_SERVER_Client *client,
2153                size_t data_size,
2154                const void *data,
2155                uint16_t first_ptype, uint16_t last_ptype)
2156 {
2157   struct TransmitMessage *
2158     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2159   memcpy (&tmit_msg[1], data, data_size);
2160   tmit_msg->client = client;
2161   tmit_msg->size = data_size;
2162   tmit_msg->state = chn->tmit_state;
2163   tmit_msg->first_ptype = first_ptype;
2164   tmit_msg->last_ptype = last_ptype;
2165
2166   /* FIXME: separate queue per message ID */
2167
2168   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2169
2170   chn->is_master
2171     ? master_queue_message ((struct Master *) chn, tmit_msg)
2172     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2173   return tmit_msg;
2174 }
2175
2176
2177 /**
2178  * Cancel transmission of current message.
2179  *
2180  * @param chn     Channel to send to.
2181  * @param client  Client the message originates from.
2182  */
2183 static void
2184 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2185 {
2186   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2187
2188   struct GNUNET_MessageHeader msg;
2189   msg.size = htons (sizeof (msg));
2190   msg.type = htons (type);
2191
2192   queue_message (chn, client, sizeof (msg), &msg, type, type);
2193   transmit_message (chn);
2194
2195   /* FIXME: cleanup */
2196 }
2197
2198
2199 /**
2200  * Incoming message from a master or slave client.
2201  */
2202 static void
2203 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2204                           const struct GNUNET_MessageHeader *msg)
2205 {
2206   struct Channel *
2207     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2208   GNUNET_assert (NULL != chn);
2209
2210   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2211               "%p Received message from client.\n", chn);
2212   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2213
2214   if (GNUNET_YES != chn->is_ready)
2215   {
2216     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2217                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2218     GNUNET_break (0);
2219     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2220     return;
2221   }
2222
2223   uint16_t size = ntohs (msg->size);
2224   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2225   {
2226     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2227                 "%p Message payload too large: %u < %u.\n",
2228                 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2229     GNUNET_break (0);
2230     transmit_cancel (chn, client);
2231     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2232     return;
2233   }
2234
2235   uint16_t first_ptype = 0, last_ptype = 0;
2236   if (GNUNET_SYSERR
2237       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2238                                           (const char *) &msg[1],
2239                                           &first_ptype, &last_ptype))
2240   {
2241     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2242                 "%p Received invalid message part from client.\n", chn);
2243     GNUNET_break (0);
2244     transmit_cancel (chn, client);
2245     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2246     return;
2247   }
2248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2249               "%p Received message with first part type %u and last part type %u.\n",
2250               chn, first_ptype, last_ptype);
2251
2252   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2253                  first_ptype, last_ptype);
2254   transmit_message (chn);
2255   /* FIXME: send a few ACKs even before transmit_notify is called */
2256
2257   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2258 };
2259
2260
2261 /**
2262  * Received result of GNUNET_PSYCSTORE_membership_store()
2263  */
2264 static void
2265 store_recv_membership_store_result (void *cls, int64_t result,
2266                                     const char *err_msg, uint16_t err_msg_size)
2267 {
2268   struct Operation *op = cls;
2269   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2270               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2271               op->chn, result, err_msg_size, err_msg);
2272
2273   if (NULL != op->client)
2274     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2275   op_remove (op);
2276 }
2277
2278
2279 /**
2280  * Client requests to add/remove a slave in the membership database.
2281  */
2282 static void
2283 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2284                               const struct GNUNET_MessageHeader *msg)
2285 {
2286   struct Channel *
2287     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2288   GNUNET_assert (NULL != chn);
2289
2290   const struct ChannelMembershipStoreRequest *
2291     req = (const struct ChannelMembershipStoreRequest *) msg;
2292
2293   struct Operation *op = op_add (chn, client, req->op_id, 0);
2294
2295   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2296   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298               "%p Received membership store request from client.\n", chn);
2299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2300               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2301               chn, req->did_join, announced_at, effective_since);
2302
2303   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2304                                      req->did_join, announced_at, effective_since,
2305                                      0, /* FIXME: group_generation */
2306                                      &store_recv_membership_store_result, op);
2307   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2308 }
2309
2310
2311 /**
2312  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2313  * in response to a history request from a client.
2314  */
2315 static int
2316 store_recv_fragment_history (void *cls,
2317                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2318                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2319 {
2320   struct Operation *op = cls;
2321   if (NULL == op->client)
2322   { /* Requesting client already disconnected. */
2323     return GNUNET_NO;
2324   }
2325   struct Channel *chn = op->chn;
2326
2327   struct GNUNET_PSYC_MessageHeader *pmsg;
2328   uint16_t msize = ntohs (mmsg->header.size);
2329   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2330
2331   struct GNUNET_OperationResultMessage *
2332     res = GNUNET_malloc (sizeof (*res) + psize);
2333   res->header.size = htons (sizeof (*res) + psize);
2334   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2335   res->op_id = op->op_id;
2336   res->result_code = GNUNET_htonll (GNUNET_OK);
2337
2338   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2339   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2340   memcpy (&res[1], pmsg, psize);
2341
2342   /** @todo FIXME: send only to requesting client */
2343   client_send_msg (chn, &res->header);
2344   return GNUNET_YES;
2345 }
2346
2347
2348 /**
2349  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2350  * in response to a history request from a client.
2351  */
2352 static void
2353 store_recv_fragment_history_result (void *cls, int64_t result,
2354                                     const char *err_msg, uint16_t err_msg_size)
2355 {
2356   struct Operation *op = cls;
2357   if (NULL == op->client)
2358   { /* Requesting client already disconnected. */
2359     return;
2360   }
2361
2362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363               "%p History replay #%" PRIu64 ": "
2364               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2365               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2366
2367   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2368   {
2369     /** @todo Multicast replay request for messages not found locally. */
2370   }
2371
2372   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2373   op_remove (op);
2374 }
2375
2376
2377 /**
2378  * Client requests channel history.
2379  */
2380 static void
2381 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2382                             const struct GNUNET_MessageHeader *msg)
2383 {
2384   struct Channel *
2385     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2386   GNUNET_assert (NULL != chn);
2387
2388   const struct GNUNET_PSYC_HistoryRequestMessage *
2389     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2390   uint16_t size = ntohs (msg->size);
2391   const char *method_prefix = (const char *) &req[1];
2392
2393   if (size < sizeof (*req) + 1
2394       || '\0' != method_prefix[size - sizeof (*req) - 1])
2395   {
2396     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2397                 "%p History replay #%" PRIu64 ": "
2398                 "invalid method prefix. size: %u < %u?\n",
2399                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2400     GNUNET_break (0);
2401     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2402     return;
2403   }
2404
2405   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2406
2407   if (0 == req->message_limit)
2408     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2409                                   GNUNET_ntohll (req->start_message_id),
2410                                   GNUNET_ntohll (req->end_message_id),
2411                                   0, method_prefix,
2412                                   &store_recv_fragment_history,
2413                                   &store_recv_fragment_history_result, op);
2414   else
2415     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2416                                          GNUNET_ntohll (req->message_limit),
2417                                          method_prefix,
2418                                          &store_recv_fragment_history,
2419                                          &store_recv_fragment_history_result,
2420                                          op);
2421
2422   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2423 }
2424
2425
2426 /**
2427  * Received state var from PSYCstore, send it to client.
2428  */
2429 static int
2430 store_recv_state_var (void *cls, const char *name,
2431                       const void *value, uint32_t value_size)
2432 {
2433   struct Operation *op = cls;
2434   struct GNUNET_OperationResultMessage *res;
2435
2436   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2437               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2438               op->chn, GNUNET_ntohll (op->op_id), name);
2439
2440   if (NULL != name) /* First part */
2441   {
2442     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2443     struct GNUNET_PSYC_MessageModifier *mod;
2444     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2445     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2446     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2447     res->op_id = op->op_id;
2448
2449     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2450     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2451     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2452     mod->name_size = htons (name_size);
2453     mod->value_size = htonl (value_size);
2454     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2455     memcpy (&mod[1], name, name_size);
2456     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2457   }
2458   else /* Continuation */
2459   {
2460     struct GNUNET_MessageHeader *mod;
2461     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2462     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2463     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2464     res->op_id = op->op_id;
2465
2466     mod = (struct GNUNET_MessageHeader *) &res[1];
2467     mod->size = htons (sizeof (*mod) + value_size);
2468     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2469     memcpy (&mod[1], value, value_size);
2470   }
2471
2472   // FIXME: client might have been disconnected
2473   GNUNET_SERVER_notification_context_add (nc, op->client);
2474   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2475                                               GNUNET_NO);
2476   return GNUNET_YES;
2477 }
2478
2479
2480 /**
2481  * Received result of GNUNET_PSYCSTORE_state_get()
2482  * or GNUNET_PSYCSTORE_state_get_prefix()
2483  */
2484 static void
2485 store_recv_state_result (void *cls, int64_t result,
2486                          const char *err_msg, uint16_t err_msg_size)
2487 {
2488   struct Operation *op = cls;
2489   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2490               "%p state_get #%" PRIu64 ": "
2491               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2492               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2493
2494   // FIXME: client might have been disconnected
2495   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2496   op_remove (op);
2497 }
2498
2499
2500 /**
2501  * Client requests best matching state variable from PSYCstore.
2502  */
2503 static void
2504 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2505                        const struct GNUNET_MessageHeader *msg)
2506 {
2507   struct Channel *
2508     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2509   GNUNET_assert (NULL != chn);
2510
2511   const struct StateRequest *
2512     req = (const struct StateRequest *) msg;
2513
2514   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2515   const char *name = (const char *) &req[1];
2516   if (0 == name_size || '\0' != name[name_size - 1])
2517   {
2518     GNUNET_break (0);
2519     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2520     return;
2521   }
2522
2523   struct Operation *op = op_add (chn, client, req->op_id, 0);
2524   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2525                               &store_recv_state_var,
2526                               &store_recv_state_result, op);
2527   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2528 }
2529
2530
2531 /**
2532  * Client requests state variables with a given prefix from PSYCstore.
2533  */
2534 static void
2535 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2536                               const struct GNUNET_MessageHeader *msg)
2537 {
2538   struct Channel *
2539     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2540   GNUNET_assert (NULL != chn);
2541
2542   const struct StateRequest *
2543     req = (const struct StateRequest *) msg;
2544
2545   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2546   const char *name = (const char *) &req[1];
2547   if (0 == name_size || '\0' != name[name_size - 1])
2548   {
2549     GNUNET_break (0);
2550     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2551     return;
2552   }
2553
2554   struct Operation *op = op_add (chn, client, req->op_id, 0);
2555   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2556                                      &store_recv_state_var,
2557                                      &store_recv_state_result, op);
2558   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2559 }
2560
2561
2562 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2563   { &client_recv_master_start, NULL,
2564     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2565
2566   { &client_recv_slave_join, NULL,
2567     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2568
2569   { &client_recv_join_decision, NULL,
2570     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2571
2572   { &client_recv_psyc_message, NULL,
2573     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2574
2575   { &client_recv_membership_store, NULL,
2576     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2577
2578   { &client_recv_history_replay, NULL,
2579     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2580
2581   { &client_recv_state_get, NULL,
2582     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2583
2584   { &client_recv_state_get_prefix, NULL,
2585     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2586
2587   { NULL, NULL, 0, 0 }
2588 };
2589
2590
2591 /**
2592  * Initialize the PSYC service.
2593  *
2594  * @param cls Closure.
2595  * @param server The initialized server.
2596  * @param c Configuration to use.
2597  */
2598 static void
2599 run (void *cls, struct GNUNET_SERVER_Handle *server,
2600      const struct GNUNET_CONFIGURATION_Handle *c)
2601 {
2602   cfg = c;
2603   store = GNUNET_PSYCSTORE_connect (cfg);
2604   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2605   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2606   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2607   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2608   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2609   nc = GNUNET_SERVER_notification_context_create (server, 1);
2610   GNUNET_SERVER_add_handlers (server, server_handlers);
2611   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2612   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2613                                 &shutdown_task, NULL);
2614 }
2615
2616
2617 /**
2618  * The main function for the service.
2619  *
2620  * @param argc number of arguments from the command line
2621  * @param argv command line arguments
2622  * @return 0 ok, 1 on error
2623  */
2624 int
2625 main (int argc, char *const *argv)
2626 {
2627   return (GNUNET_OK ==
2628           GNUNET_SERVICE_run (argc, argv, "psyc",
2629                               GNUNET_SERVICE_OPTION_NONE,
2630                               &run, NULL)) ? 0 : 1;
2631 }
2632
2633 /* end of gnunet-service-psyc.c */