cmdline usability for gnunet-social
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls)
432 {
433   struct Channel *chn = cls;
434
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  */
444 static void
445 shutdown_task (void *cls)
446 {
447   if (NULL != nc)
448   {
449     GNUNET_SERVER_notification_context_destroy (nc);
450     nc = NULL;
451   }
452   if (NULL != stats)
453   {
454     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
455     stats = NULL;
456   }
457 }
458
459
460 static struct Operation *
461 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
462         uint64_t op_id, uint32_t flags)
463 {
464   struct Operation *op = GNUNET_malloc (sizeof (*op));
465   op->client = client;
466   op->chn = chn;
467   op->op_id = op_id;
468   op->flags = flags;
469   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
470   return op;
471 }
472
473
474 static void
475 op_remove (struct Operation *op)
476 {
477   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
478   GNUNET_free (op);
479 }
480
481
482 /**
483  * Clean up master data structures after a client disconnected.
484  */
485 static void
486 cleanup_master (struct Master *mst)
487 {
488   struct Channel *chn = &mst->chn;
489
490   if (NULL != mst->origin)
491     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
492   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
493   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
494 }
495
496
497 /**
498  * Clean up slave data structures after a client disconnected.
499  */
500 static void
501 cleanup_slave (struct Slave *slv)
502 {
503   struct Channel *chn = &slv->chn;
504   struct GNUNET_CONTAINER_MultiHashMap *
505     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
506                                                 &chn->pub_key_hash);
507   GNUNET_assert (NULL != chn_slv);
508   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
509
510   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
511   {
512     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
513                                           chn_slv);
514     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
515   }
516   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
517
518   if (NULL != slv->join_msg)
519   {
520     GNUNET_free (slv->join_msg);
521     slv->join_msg = NULL;
522   }
523   if (NULL != slv->relays)
524   {
525     GNUNET_free (slv->relays);
526     slv->relays = NULL;
527   }
528   if (NULL != slv->member)
529   {
530     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
531     slv->member = NULL;
532   }
533   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
534 }
535
536
537 /**
538  * Clean up channel data structures after a client disconnected.
539  */
540 static void
541 cleanup_channel (struct Channel *chn)
542 {
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "%p Cleaning up channel %s. master? %u\n",
545               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
546   message_queue_drop (chn);
547   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
548   chn->recv_frags = NULL;
549
550   if (NULL != chn->store_op)
551   {
552     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
553     chn->store_op = NULL;
554   }
555
556   (GNUNET_YES == chn->is_master)
557     ? cleanup_master ((struct Master *) chn)
558     : cleanup_slave ((struct Slave *) chn);
559   GNUNET_free (chn);
560 }
561
562
563 /**
564  * Called whenever a client is disconnected.
565  * Frees our resources associated with that client.
566  *
567  * @param cls Closure.
568  * @param client Identification of the client.
569  */
570 static void
571 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
572 {
573   if (NULL == client)
574     return;
575
576   struct Channel *
577     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
578
579   if (NULL == chn)
580   {
581     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582                 "%p User context is NULL in client_disconnect()\n", chn);
583     GNUNET_break (0);
584     return;
585   }
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "%p Client (%s) disconnected from channel %s\n",
589               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
590               GNUNET_h2s (&chn->pub_key_hash));
591
592   struct Client *cli = chn->clients_head;
593   while (NULL != cli)
594   {
595     if (cli->client == client)
596     {
597       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
598       GNUNET_free (cli);
599       break;
600     }
601     cli = cli->next;
602   }
603
604   struct Operation *op = chn->op_head;
605   while (NULL != op)
606   {
607     if (op->client == client)
608     {
609       op->client = NULL;
610       break;
611     }
612     op = op->next;
613   }
614
615   if (NULL == chn->clients_head)
616   { /* Last client disconnected. */
617     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                 "%p Last client (%s) disconnected from channel %s\n",
619                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
620                 GNUNET_h2s (&chn->pub_key_hash));
621     chn->is_disconnected = GNUNET_YES;
622     if (NULL != chn->tmit_head)
623     { /* Send pending messages to multicast before cleanup. */
624       transmit_message (chn);
625     }
626     else
627     {
628       cleanup_channel (chn);
629     }
630   }
631 }
632
633
634 /**
635  * Send message to all clients connected to the channel.
636  */
637 static void
638 client_send_msg (const struct Channel *chn,
639                  const struct GNUNET_MessageHeader *msg)
640 {
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "%p Sending message to clients.\n", chn);
643
644   struct Client *cli = chn->clients_head;
645   while (NULL != cli)
646   {
647     GNUNET_SERVER_notification_context_add (nc, cli->client);
648     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
649     cli = cli->next;
650   }
651 }
652
653
654 /**
655  * Send a result code back to the client.
656  *
657  * @param client
658  *        Client that should receive the result code.
659  * @param result_code
660  *        Code to transmit.
661  * @param op_id
662  *        Operation ID in network byte order.
663  * @param data
664  *        Data payload or NULL.
665  * @param data_size
666  *        Size of @a data.
667  */
668 static void
669 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
670                     int64_t result_code, const void *data, uint16_t data_size)
671 {
672   struct GNUNET_OperationResultMessage *res;
673
674   res = GNUNET_malloc (sizeof (*res) + data_size);
675   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
676   res->header.size = htons (sizeof (*res) + data_size);
677   res->result_code = GNUNET_htonll (result_code);
678   res->op_id = op_id;
679   if (0 < data_size)
680     memcpy (&res[1], data, data_size);
681
682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683               "%p Sending result to client for operation #%" PRIu64 ": "
684               "%" PRId64 " (size: %u)\n",
685               client, GNUNET_ntohll (op_id), result_code, data_size);
686
687   GNUNET_SERVER_notification_context_add (nc, client);
688   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
689                                               GNUNET_NO);
690   GNUNET_free (res);
691 }
692
693
694 /**
695  * Closure for join_mem_test_cb()
696  */
697 struct JoinMemTestClosure
698 {
699   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
700   struct Channel *chn;
701   struct GNUNET_MULTICAST_JoinHandle *jh;
702   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
703 };
704
705
706 /**
707  * Membership test result callback used for join requests.
708  */
709 static void
710 join_mem_test_cb (void *cls, int64_t result,
711                   const char *err_msg, uint16_t err_msg_size)
712 {
713   struct JoinMemTestClosure *jcls = cls;
714
715   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
716   { /* Pass on join request to client if this is a master channel */
717     struct Master *mst = (struct Master *) jcls->chn;
718     struct GNUNET_HashCode slave_pub_hash;
719     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
720                         &slave_pub_hash);
721     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
722                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
723     client_send_msg (jcls->chn, &jcls->join_msg->header);
724   }
725   else
726   {
727     if (GNUNET_SYSERR == result)
728     {
729       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
730                   "Could not perform membership test (%.*s)\n",
731                   err_msg_size, err_msg);
732     }
733     // FIXME: add relays
734     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
735   }
736   GNUNET_free (jcls->join_msg);
737   GNUNET_free (jcls);
738 }
739
740
741 /**
742  * Incoming join request from multicast.
743  */
744 static void
745 mcast_recv_join_request (void *cls,
746                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
747                          const struct GNUNET_MessageHeader *join_msg,
748                          struct GNUNET_MULTICAST_JoinHandle *jh)
749 {
750   struct Channel *chn = cls;
751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
752
753   uint16_t join_msg_size = 0;
754   if (NULL != join_msg)
755   {
756     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
757     {
758       join_msg_size = ntohs (join_msg->size);
759     }
760     else
761     {
762       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
763                   "%p Got join message with invalid type %u.\n",
764                   chn, ntohs (join_msg->type));
765     }
766   }
767
768   struct GNUNET_PSYC_JoinRequestMessage *
769     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
770   req->header.size = htons (sizeof (*req) + join_msg_size);
771   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
772   req->slave_pub_key = *slave_pub_key;
773   if (0 < join_msg_size)
774     memcpy (&req[1], join_msg, join_msg_size);
775
776   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
777   jcls->slave_pub_key = *slave_pub_key;
778   jcls->chn = chn;
779   jcls->jh = jh;
780   jcls->join_msg = req;
781
782   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
783                                     chn->max_message_id, 0,
784                                     &join_mem_test_cb, jcls);
785 }
786
787
788 /**
789  * Join decision received from multicast.
790  */
791 static void
792 mcast_recv_join_decision (void *cls, int is_admitted,
793                           const struct GNUNET_PeerIdentity *peer,
794                           uint16_t relay_count,
795                           const struct GNUNET_PeerIdentity *relays,
796                           const struct GNUNET_MessageHeader *join_resp)
797 {
798   struct Slave *slv = cls;
799   struct Channel *chn = &slv->chn;
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "%p Got join decision: %d\n", slv, is_admitted);
802   if (GNUNET_YES == chn->is_ready)
803   {
804     /* Already admitted */
805     return;
806   }
807
808   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
809   struct GNUNET_PSYC_JoinDecisionMessage *
810     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
811   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
812   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
813   dcsn->is_admitted = htonl (is_admitted);
814   if (0 < join_resp_size)
815     memcpy (&dcsn[1], join_resp, join_resp_size);
816
817   client_send_msg (chn, &dcsn->header);
818
819   if (GNUNET_YES == is_admitted
820       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
821   {
822     chn->is_ready = GNUNET_YES;
823   }
824 }
825
826
827 static int
828 store_recv_fragment_replay (void *cls,
829                             struct GNUNET_MULTICAST_MessageHeader *msg,
830                             enum GNUNET_PSYCSTORE_MessageFlags flags)
831 {
832   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
833
834   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
835   return GNUNET_YES;
836 }
837
838
839 /**
840  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
841  */
842 static void
843 store_recv_fragment_replay_result (void *cls, int64_t result,
844                                    const char *err_msg, uint16_t err_msg_size)
845 {
846   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
849               rh, result, err_msg_size, err_msg);
850
851   switch (result)
852   {
853   case GNUNET_YES:
854     break;
855
856   case GNUNET_NO:
857     GNUNET_MULTICAST_replay_response (rh, NULL,
858                                       GNUNET_MULTICAST_REC_NOT_FOUND);
859     break;
860
861   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
862     GNUNET_MULTICAST_replay_response (rh, NULL,
863                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
864     break;
865
866   case GNUNET_SYSERR:
867     GNUNET_MULTICAST_replay_response (rh, NULL,
868                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
869     break;
870   }
871   GNUNET_MULTICAST_replay_response_end (rh);
872 }
873
874
875 /**
876  * Incoming fragment replay request from multicast.
877  */
878 static void
879 mcast_recv_replay_fragment (void *cls,
880                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
881                             uint64_t fragment_id, uint64_t flags,
882                             struct GNUNET_MULTICAST_ReplayHandle *rh)
883
884 {
885   struct Channel *chn = cls;
886   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
887                                  fragment_id, fragment_id,
888                                  &store_recv_fragment_replay,
889                                  &store_recv_fragment_replay_result, rh);
890 }
891
892
893 /**
894  * Incoming message replay request from multicast.
895  */
896 static void
897 mcast_recv_replay_message (void *cls,
898                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
899                            uint64_t message_id,
900                            uint64_t fragment_offset,
901                            uint64_t flags,
902                            struct GNUNET_MULTICAST_ReplayHandle *rh)
903 {
904   struct Channel *chn = cls;
905   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
906                                 message_id, message_id, 1, NULL,
907                                 &store_recv_fragment_replay,
908                                 &store_recv_fragment_replay_result, rh);
909 }
910
911
912 /**
913  * Convert an uint64_t in network byte order to a HashCode
914  * that can be used as key in a MultiHashMap
915  */
916 static inline void
917 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
918 {
919   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
920   /* TODO: use built-in byte swap functions if available */
921
922   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
923   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
924
925   *key = (struct GNUNET_HashCode) {};
926   *((uint64_t *) key)
927     = (n << 32) | (n >> 32);
928 }
929
930
931 /**
932  * Convert an uint64_t in host byte order to a HashCode
933  * that can be used as key in a MultiHashMap
934  */
935 static inline void
936 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
937 {
938 #if __BYTE_ORDER == __BIG_ENDIAN
939   hash_key_from_nll (key, n);
940 #elif __BYTE_ORDER == __LITTLE_ENDIAN
941   *key = (struct GNUNET_HashCode) {};
942   *((uint64_t *) key) = n;
943 #else
944   #error byteorder undefined
945 #endif
946 }
947
948
949 /**
950  * Initialize PSYC message header.
951  */
952 static inline void
953 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
954                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
955 {
956   uint16_t size = ntohs (mmsg->header.size);
957   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
958
959   pmsg->header.size = htons (psize);
960   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
961   pmsg->message_id = mmsg->message_id;
962   pmsg->fragment_offset = mmsg->fragment_offset;
963   pmsg->flags = htonl (flags);
964
965   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
966 }
967
968
969 /**
970  * Create a new PSYC message from a multicast message for sending it to clients.
971  */
972 static inline struct GNUNET_PSYC_MessageHeader *
973 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
974 {
975   struct GNUNET_PSYC_MessageHeader *pmsg;
976   uint16_t size = ntohs (mmsg->header.size);
977   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
978
979   pmsg = GNUNET_malloc (psize);
980   psyc_msg_init (pmsg, mmsg, flags);
981   return pmsg;
982 }
983
984
985 /**
986  * Send multicast message to all clients connected to the channel.
987  */
988 static void
989 client_send_mcast_msg (struct Channel *chn,
990                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
991                        uint32_t flags)
992 {
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994               "%p Sending multicast message to client. "
995               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
996               chn, GNUNET_ntohll (mmsg->fragment_id),
997               GNUNET_ntohll (mmsg->message_id));
998
999   struct GNUNET_PSYC_MessageHeader *
1000     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1001   client_send_msg (chn, &pmsg->header);
1002   GNUNET_free (pmsg);
1003 }
1004
1005
1006 /**
1007  * Send multicast request to all clients connected to the channel.
1008  */
1009 static void
1010 client_send_mcast_req (struct Master *mst,
1011                        const struct GNUNET_MULTICAST_RequestHeader *req)
1012 {
1013   struct Channel *chn = &mst->chn;
1014
1015   struct GNUNET_PSYC_MessageHeader *pmsg;
1016   uint16_t size = ntohs (req->header.size);
1017   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "%p Sending multicast request to client. "
1021               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1022               chn, GNUNET_ntohll (req->fragment_id),
1023               GNUNET_ntohll (req->request_id));
1024
1025   pmsg = GNUNET_malloc (psize);
1026   pmsg->header.size = htons (psize);
1027   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1028   pmsg->message_id = req->request_id;
1029   pmsg->fragment_offset = req->fragment_offset;
1030   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1031   pmsg->slave_pub_key = req->member_pub_key;
1032   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1033
1034   client_send_msg (chn, &pmsg->header);
1035
1036   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1037
1038   GNUNET_free (pmsg);
1039 }
1040
1041
1042 /**
1043  * Insert a multicast message fragment into the queue belonging to the message.
1044  *
1045  * @param chn          Channel.
1046  * @param mmsg         Multicast message fragment.
1047  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1048  * @param first_ptype  First PSYC message part type in @a mmsg.
1049  * @param last_ptype   Last PSYC message part type in @a mmsg.
1050  */
1051 static void
1052 fragment_queue_insert (struct Channel *chn,
1053                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1054                        uint16_t first_ptype, uint16_t last_ptype)
1055 {
1056   const uint16_t size = ntohs (mmsg->header.size);
1057   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1058   struct GNUNET_CONTAINER_MultiHashMap
1059     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1060                                                     &chn->pub_key_hash);
1061
1062   struct GNUNET_HashCode msg_id_hash;
1063   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1064
1065   struct FragmentQueue
1066     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1067
1068   if (NULL == fragq)
1069   {
1070     fragq = GNUNET_new (struct FragmentQueue);
1071     fragq->state = MSG_FRAG_STATE_HEADER;
1072     fragq->fragments
1073       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1074
1075     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1076                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1077
1078     if (NULL == chan_msgs)
1079     {
1080       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1081       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1082                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1083     }
1084   }
1085
1086   struct GNUNET_HashCode frag_id_hash;
1087   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1088   struct RecvCacheEntry
1089     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1090   if (NULL == cache_entry)
1091   {
1092     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093                 "%p Adding message fragment to cache. "
1094                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1095                 chn, GNUNET_ntohll (mmsg->message_id),
1096                 GNUNET_ntohll (mmsg->fragment_id));
1097     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                 "%p header_size: %" PRIu64 " + %u\n",
1099                 chn, fragq->header_size, size);
1100     cache_entry = GNUNET_new (struct RecvCacheEntry);
1101     cache_entry->ref_count = 1;
1102     cache_entry->mmsg = GNUNET_malloc (size);
1103     memcpy (cache_entry->mmsg, mmsg, size);
1104     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1105                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1106   }
1107   else
1108   {
1109     cache_entry->ref_count++;
1110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111                 "%p Message fragment is already in cache. "
1112                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1113                 ", ref_count: %u\n",
1114                 chn, GNUNET_ntohll (mmsg->message_id),
1115                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1116   }
1117
1118   if (MSG_FRAG_STATE_HEADER == fragq->state)
1119   {
1120     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1121     {
1122       struct GNUNET_PSYC_MessageMethod *
1123         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1124       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1125       fragq->flags = ntohl (pmeth->flags);
1126     }
1127
1128     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1129     {
1130       fragq->header_size += size;
1131     }
1132     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1133              || frag_offset == fragq->header_size)
1134     { /* header is now complete */
1135       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1136                   "%p Header of message %" PRIu64 " is complete.\n",
1137                   chn, GNUNET_ntohll (mmsg->message_id));
1138
1139       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1140                   "%p Adding message %" PRIu64 " to queue.\n",
1141                   chn, GNUNET_ntohll (mmsg->message_id));
1142       fragq->state = MSG_FRAG_STATE_DATA;
1143     }
1144     else
1145     {
1146       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1147                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1148                   "%" PRIu64 " != %" PRIu64 "\n",
1149                   chn, GNUNET_ntohll (mmsg->message_id),
1150                   frag_offset, fragq->header_size);
1151     }
1152   }
1153
1154   switch (last_ptype)
1155   {
1156   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1157     if (frag_offset == fragq->size)
1158       fragq->state = MSG_FRAG_STATE_END;
1159     else
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "%p Message %" PRIu64 " is NOT complete yet: "
1162                   "%" PRIu64 " != %" PRIu64 "\n",
1163                   chn, GNUNET_ntohll (mmsg->message_id),
1164                   frag_offset, fragq->size);
1165     break;
1166
1167   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1168     /* Drop message without delivering to client if it's a single fragment */
1169     fragq->state =
1170       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1171       ? MSG_FRAG_STATE_DROP
1172       : MSG_FRAG_STATE_CANCEL;
1173   }
1174
1175   switch (fragq->state)
1176   {
1177   case MSG_FRAG_STATE_DATA:
1178   case MSG_FRAG_STATE_END:
1179   case MSG_FRAG_STATE_CANCEL:
1180     if (GNUNET_NO == fragq->is_queued)
1181     {
1182       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1183                                     GNUNET_ntohll (mmsg->message_id));
1184       fragq->is_queued = GNUNET_YES;
1185     }
1186   }
1187
1188   fragq->size += size;
1189   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1190                                 GNUNET_ntohll (mmsg->fragment_id));
1191 }
1192
1193
1194 /**
1195  * Run fragment queue of a message.
1196  *
1197  * Send fragments of a message in order to client, after all modifiers arrived
1198  * from multicast.
1199  *
1200  * @param chn
1201  *        Channel.
1202  * @param msg_id
1203  *        ID of the message @a fragq belongs to.
1204  * @param fragq
1205  *        Fragment queue of the message.
1206  * @param drop
1207  *        Drop message without delivering to client?
1208  *        #GNUNET_YES or #GNUNET_NO.
1209  */
1210 static void
1211 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1212                     struct FragmentQueue *fragq, uint8_t drop)
1213 {
1214   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1215               "%p Running message fragment queue for message %" PRIu64
1216               " (state: %u).\n",
1217               chn, msg_id, fragq->state);
1218
1219   struct GNUNET_CONTAINER_MultiHashMap
1220     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1221                                                     &chn->pub_key_hash);
1222   GNUNET_assert (NULL != chan_msgs);
1223   uint64_t frag_id;
1224
1225   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1226                                                     &frag_id))
1227   {
1228     struct GNUNET_HashCode frag_id_hash;
1229     hash_key_from_hll (&frag_id_hash, frag_id);
1230     struct RecvCacheEntry *cache_entry
1231       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1232     if (cache_entry != NULL)
1233     {
1234       if (GNUNET_NO == drop)
1235       {
1236         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1237       }
1238       if (cache_entry->ref_count <= 1)
1239       {
1240         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1241                                               cache_entry);
1242         GNUNET_free (cache_entry->mmsg);
1243         GNUNET_free (cache_entry);
1244       }
1245       else
1246       {
1247         cache_entry->ref_count--;
1248       }
1249     }
1250 #if CACHE_AGING_IMPLEMENTED
1251     else if (GNUNET_NO == drop)
1252     {
1253       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1254     }
1255 #endif
1256
1257     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1258   }
1259
1260   if (MSG_FRAG_STATE_END <= fragq->state)
1261   {
1262     struct GNUNET_HashCode msg_id_hash;
1263     hash_key_from_hll (&msg_id_hash, msg_id);
1264
1265     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1266     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1267     GNUNET_free (fragq);
1268   }
1269   else
1270   {
1271     fragq->is_queued = GNUNET_NO;
1272   }
1273 }
1274
1275
1276 struct StateModifyClosure
1277 {
1278   struct Channel *chn;
1279   uint64_t msg_id;
1280   struct GNUNET_HashCode msg_id_hash;
1281 };
1282
1283
1284 void
1285 store_recv_state_modify_result (void *cls, int64_t result,
1286                                 const char *err_msg, uint16_t err_msg_size)
1287 {
1288   struct StateModifyClosure *mcls = cls;
1289   struct Channel *chn = mcls->chn;
1290   uint64_t msg_id = mcls->msg_id;
1291
1292   struct FragmentQueue *
1293     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1294
1295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1297               chn, result, err_msg_size, err_msg);
1298
1299   switch (result)
1300   {
1301   case GNUNET_OK:
1302   case GNUNET_NO:
1303     if (NULL != fragq)
1304       fragq->state_is_modified = GNUNET_YES;
1305     if (chn->max_state_message_id < msg_id)
1306       chn->max_state_message_id = msg_id;
1307     if (chn->max_message_id < msg_id)
1308       chn->max_message_id = msg_id;
1309
1310     if (NULL != fragq)
1311       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1312     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1313     message_queue_run (chn);
1314     break;
1315
1316   default:
1317     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1318                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1319                 chn, result, err_msg_size, err_msg);
1320     /** @todo FIXME: handle state_modify error */
1321   }
1322 }
1323
1324
1325 /**
1326  * Run message queue.
1327  *
1328  * Send messages in queue to client in order after a message has arrived from
1329  * multicast, according to the following:
1330  * - A message is only sent if all of its modifiers arrived.
1331  * - A stateful message is only sent if the previous stateful message
1332  *   has already been delivered to the client.
1333  *
1334  * @param chn  Channel.
1335  *
1336  * @return Number of messages removed from queue and sent to client.
1337  */
1338 static uint64_t
1339 message_queue_run (struct Channel *chn)
1340 {
1341   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1342               "%p Running message queue.\n", chn);
1343   uint64_t n = 0;
1344   uint64_t msg_id;
1345
1346   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1347                                                     &msg_id))
1348   {
1349     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1350                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1351     struct GNUNET_HashCode msg_id_hash;
1352     hash_key_from_hll (&msg_id_hash, msg_id);
1353
1354     struct FragmentQueue *
1355       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1356
1357     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1358     {
1359       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1360                   "%p No fragq (%p) or header not complete.\n",
1361                   chn, fragq);
1362       break;
1363     }
1364
1365     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                 "%p Fragment queue entry:  state: %u, state delta: "
1367                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1368                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1369
1370     if (MSG_FRAG_STATE_DATA <= fragq->state)
1371     {
1372       /* Check if there's a missing message before the current one */
1373       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1374       {
1375         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1376
1377         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1378             && (chn->max_message_id != msg_id - 1
1379                 && chn->max_message_id != msg_id))
1380         {
1381           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1382                       "%p Out of order message. "
1383                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1384                       chn, chn->max_message_id, msg_id);
1385           break;
1386           // FIXME: keep track of messages processed in this queue run,
1387           //        and only stop after reaching the end
1388         }
1389       }
1390       else
1391       {
1392         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1393         if (GNUNET_YES != fragq->state_is_modified)
1394         {
1395           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1396           {
1397             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398                         "%p Out of order stateful message. "
1399                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1400                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1401             break;
1402             // FIXME: keep track of messages processed in this queue run,
1403             //        and only stop after reaching the end
1404           }
1405
1406           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1407           mcls->chn = chn;
1408           mcls->msg_id = msg_id;
1409           mcls->msg_id_hash = msg_id_hash;
1410
1411           /* Apply modifiers to state in PSYCstore */
1412           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1413                                          fragq->state_delta,
1414                                          store_recv_state_modify_result, mcls);
1415           break; // continue after asynchronous state modify result
1416         }
1417       }
1418       chn->max_message_id = msg_id;
1419     }
1420     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1421     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1422     n++;
1423   }
1424
1425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1427   return n;
1428 }
1429
1430
1431 /**
1432  * Drop message queue of a channel.
1433  *
1434  * Remove all messages in queue without sending it to clients.
1435  *
1436  * @param chn  Channel.
1437  *
1438  * @return Number of messages removed from queue.
1439  */
1440 static uint64_t
1441 message_queue_drop (struct Channel *chn)
1442 {
1443   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1444               "%p Dropping message queue.\n", chn);
1445   uint64_t n = 0;
1446   uint64_t msg_id;
1447   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1448                                                     &msg_id))
1449   {
1450     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1451                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1452     struct GNUNET_HashCode msg_id_hash;
1453     hash_key_from_hll (&msg_id_hash, msg_id);
1454
1455     struct FragmentQueue *
1456       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1457     GNUNET_assert (NULL != fragq);
1458     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1459     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1460     n++;
1461   }
1462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1463               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1464   return n;
1465 }
1466
1467
1468 /**
1469  * Received result of GNUNET_PSYCSTORE_fragment_store().
1470  */
1471 static void
1472 store_recv_fragment_store_result (void *cls, int64_t result,
1473                                   const char *err_msg, uint16_t err_msg_size)
1474 {
1475   struct Channel *chn = cls;
1476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1478               chn, result, err_msg_size, err_msg);
1479 }
1480
1481
1482 /**
1483  * Handle incoming message fragment from multicast.
1484  *
1485  * Store it using PSYCstore and send it to the clients of the channel in order.
1486  */
1487 static void
1488 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1489 {
1490   struct Channel *chn = cls;
1491   uint16_t size = ntohs (mmsg->header.size);
1492
1493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1494               "%p Received multicast message of size %u. "
1495               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1496               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1497               chn, size,
1498               GNUNET_ntohll (mmsg->fragment_id),
1499               GNUNET_ntohll (mmsg->message_id),
1500               GNUNET_ntohll (mmsg->fragment_offset),
1501               GNUNET_ntohll (mmsg->flags));
1502
1503   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1504                                    &store_recv_fragment_store_result, chn);
1505
1506   uint16_t first_ptype = 0, last_ptype = 0;
1507   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1508                                                (const char *) &mmsg[1],
1509                                                &first_ptype, &last_ptype);
1510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511               "%p Message check result %d, first part type %u, last part type %u\n",
1512               chn, check, first_ptype, last_ptype);
1513   if (GNUNET_SYSERR == check)
1514   {
1515     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516                 "%p Dropping incoming multicast message with invalid parts.\n",
1517                 chn);
1518     GNUNET_break_op (0);
1519     return;
1520   }
1521
1522   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1523   message_queue_run (chn);
1524 }
1525
1526
1527 /**
1528  * Incoming request fragment from multicast for a master.
1529  *
1530  * @param cls   Master.
1531  * @param req   The request.
1532  */
1533 static void
1534 mcast_recv_request (void *cls,
1535                     const struct GNUNET_MULTICAST_RequestHeader *req)
1536 {
1537   struct Master *mst = cls;
1538   uint16_t size = ntohs (req->header.size);
1539
1540   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542               "%p Received multicast request of size %u from %s.\n",
1543               mst, size, str);
1544   GNUNET_free (str);
1545
1546   uint16_t first_ptype = 0, last_ptype = 0;
1547   if (GNUNET_SYSERR
1548       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1549                                           (const char *) &req[1],
1550                                           &first_ptype, &last_ptype))
1551   {
1552     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1553                 "%p Dropping incoming multicast request with invalid parts.\n",
1554                 mst);
1555     GNUNET_break_op (0);
1556     return;
1557   }
1558
1559   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560               "Message parts: first: type %u, last: type %u\n",
1561               first_ptype, last_ptype);
1562
1563   /* FIXME: in-order delivery */
1564   client_send_mcast_req (mst, req);
1565 }
1566
1567
1568 /**
1569  * Response from PSYCstore with the current counter values for a channel master.
1570  */
1571 static void
1572 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1573                             uint64_t max_message_id, uint64_t max_group_generation,
1574                             uint64_t max_state_message_id)
1575 {
1576   struct Master *mst = cls;
1577   struct Channel *chn = &mst->chn;
1578   chn->store_op = NULL;
1579
1580   struct GNUNET_PSYC_CountersResultMessage res;
1581   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1582   res.header.size = htons (sizeof (res));
1583   res.result_code = htonl (result);
1584   res.max_message_id = GNUNET_htonll (max_message_id);
1585
1586   if (GNUNET_OK == result || GNUNET_NO == result)
1587   {
1588     mst->max_message_id = max_message_id;
1589     chn->max_message_id = max_message_id;
1590     chn->max_state_message_id = max_state_message_id;
1591     mst->max_group_generation = max_group_generation;
1592     mst->origin
1593       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1594                                        mcast_recv_join_request,
1595                                        mcast_recv_replay_fragment,
1596                                        mcast_recv_replay_message,
1597                                        mcast_recv_request,
1598                                        mcast_recv_message, chn);
1599     chn->is_ready = GNUNET_YES;
1600   }
1601   else
1602   {
1603     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1604                 "%p GNUNET_PSYCSTORE_counters_get() "
1605                 "returned %d for channel %s.\n",
1606                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1607   }
1608
1609   client_send_msg (chn, &res.header);
1610 }
1611
1612
1613 /**
1614  * Response from PSYCstore with the current counter values for a channel slave.
1615  */
1616 void
1617 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1618                            uint64_t max_message_id, uint64_t max_group_generation,
1619                            uint64_t max_state_message_id)
1620 {
1621   struct Slave *slv = cls;
1622   struct Channel *chn = &slv->chn;
1623   chn->store_op = NULL;
1624
1625   struct GNUNET_PSYC_CountersResultMessage res;
1626   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1627   res.header.size = htons (sizeof (res));
1628   res.result_code = htonl (result);
1629   res.max_message_id = GNUNET_htonll (max_message_id);
1630
1631   if (GNUNET_OK == result || GNUNET_NO == result)
1632   {
1633     chn->max_message_id = max_message_id;
1634     chn->max_state_message_id = max_state_message_id;
1635     slv->member
1636       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1637                                       &slv->origin,
1638                                       slv->relay_count, slv->relays,
1639                                       &slv->join_msg->header,
1640                                       mcast_recv_join_request,
1641                                       mcast_recv_join_decision,
1642                                       mcast_recv_replay_fragment,
1643                                       mcast_recv_replay_message,
1644                                       mcast_recv_message, chn);
1645     if (NULL != slv->join_msg)
1646     {
1647       GNUNET_free (slv->join_msg);
1648       slv->join_msg = NULL;
1649     }
1650   }
1651   else
1652   {
1653     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1654                 "%p GNUNET_PSYCSTORE_counters_get() "
1655                 "returned %d for channel %s.\n",
1656                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1657   }
1658
1659   client_send_msg (chn, &res.header);
1660 }
1661
1662
1663 static void
1664 channel_init (struct Channel *chn)
1665 {
1666   chn->recv_msgs
1667     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1668   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1669 }
1670
1671
1672 /**
1673  * Handle a connecting client starting a channel master.
1674  */
1675 static void
1676 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1677                           const struct GNUNET_MessageHeader *msg)
1678 {
1679   const struct MasterStartRequest *req
1680     = (const struct MasterStartRequest *) msg;
1681
1682   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1683   struct GNUNET_HashCode pub_key_hash;
1684
1685   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1686   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1687
1688   struct Master *
1689     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1690   struct Channel *chn;
1691
1692   if (NULL == mst)
1693   {
1694     mst = GNUNET_new (struct Master);
1695     mst->policy = ntohl (req->policy);
1696     mst->priv_key = req->channel_key;
1697     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1698
1699     chn = &mst->chn;
1700     chn->is_master = GNUNET_YES;
1701     chn->pub_key = pub_key;
1702     chn->pub_key_hash = pub_key_hash;
1703     channel_init (chn);
1704
1705     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1706                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1707     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1708                                                    store_recv_master_counters, mst);
1709   }
1710   else
1711   {
1712     chn = &mst->chn;
1713
1714     struct GNUNET_PSYC_CountersResultMessage res;
1715     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1716     res.header.size = htons (sizeof (res));
1717     res.result_code = htonl (GNUNET_OK);
1718     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1719
1720     GNUNET_SERVER_notification_context_add (nc, client);
1721     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1722                                                 GNUNET_NO);
1723   }
1724
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "%p Client connected as master to channel %s.\n",
1727               mst, GNUNET_h2s (&chn->pub_key_hash));
1728
1729   struct Client *cli = GNUNET_new (struct Client);
1730   cli->client = client;
1731   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1732
1733   GNUNET_SERVER_client_set_user_context (client, chn);
1734   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1735 }
1736
1737
1738 /**
1739  * Handle a connecting client joining as a channel slave.
1740  */
1741 static void
1742 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1743                         const struct GNUNET_MessageHeader *msg)
1744 {
1745   const struct SlaveJoinRequest *req
1746     = (const struct SlaveJoinRequest *) msg;
1747   uint16_t req_size = ntohs (req->header.size);
1748
1749   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1750   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1751
1752   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1753   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1754   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1755
1756   struct GNUNET_CONTAINER_MultiHashMap *
1757     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1758   struct Slave *slv = NULL;
1759   struct Channel *chn;
1760
1761   if (NULL != chn_slv)
1762   {
1763     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1764   }
1765   if (NULL == slv)
1766   {
1767     slv = GNUNET_new (struct Slave);
1768     slv->priv_key = req->slave_key;
1769     slv->pub_key = slv_pub_key;
1770     slv->pub_key_hash = slv_pub_hash;
1771     slv->origin = req->origin;
1772     slv->relay_count = ntohl (req->relay_count);
1773     slv->join_flags = ntohl (req->flags);
1774
1775     const struct GNUNET_PeerIdentity *
1776       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1777     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1778     uint16_t join_msg_size = 0;
1779
1780     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1781         <= req_size)
1782     {
1783       struct GNUNET_PSYC_Message *
1784         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1785       join_msg_size = ntohs (join_msg->header.size);
1786       slv->join_msg = GNUNET_malloc (join_msg_size);
1787       memcpy (slv->join_msg, join_msg, join_msg_size);
1788     }
1789     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1790     {
1791       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1792                   "%u + %u + %u != %u\n",
1793                   (unsigned int) sizeof (*req),
1794                   relay_size,
1795                   join_msg_size,
1796                   req_size);
1797       GNUNET_break (0);
1798       GNUNET_SERVER_client_disconnect (client);
1799       GNUNET_free (slv);
1800       return;
1801     }
1802     if (0 < slv->relay_count)
1803     {
1804       slv->relays = GNUNET_malloc (relay_size);
1805       memcpy (slv->relays, &req[1], relay_size);
1806     }
1807
1808     chn = &slv->chn;
1809     chn->is_master = GNUNET_NO;
1810     chn->pub_key = req->channel_pub_key;
1811     chn->pub_key_hash = pub_key_hash;
1812     channel_init (chn);
1813
1814     if (NULL == chn_slv)
1815     {
1816       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1817       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1818                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1819     }
1820     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1821                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1822     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1823                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1824     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1825                                                   &store_recv_slave_counters, slv);
1826   }
1827   else
1828   {
1829     chn = &slv->chn;
1830
1831     struct GNUNET_PSYC_CountersResultMessage res;
1832     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1833     res.header.size = htons (sizeof (res));
1834     res.result_code = htonl (GNUNET_OK);
1835     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1836
1837     GNUNET_SERVER_notification_context_add (nc, client);
1838     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1839                                                 GNUNET_NO);
1840
1841     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1842     {
1843       mcast_recv_join_decision (slv, GNUNET_YES,
1844                                 NULL, 0, NULL, NULL);
1845     }
1846     else if (NULL == slv->member)
1847     {
1848       slv->member
1849         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1850                                         &slv->origin,
1851                                         slv->relay_count, slv->relays,
1852                                         &slv->join_msg->header,
1853                                         &mcast_recv_join_request,
1854                                         &mcast_recv_join_decision,
1855                                         &mcast_recv_replay_fragment,
1856                                         &mcast_recv_replay_message,
1857                                         &mcast_recv_message, chn);
1858       if (NULL != slv->join_msg)
1859       {
1860         GNUNET_free (slv->join_msg);
1861         slv->join_msg = NULL;
1862       }
1863     }
1864     else if (NULL != slv->join_dcsn)
1865     {
1866       GNUNET_SERVER_notification_context_add (nc, client);
1867       GNUNET_SERVER_notification_context_unicast (nc, client,
1868                                                   &slv->join_dcsn->header,
1869                                                   GNUNET_NO);
1870     }
1871   }
1872
1873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874               "%p Client connected as slave to channel %s.\n",
1875               slv, GNUNET_h2s (&chn->pub_key_hash));
1876
1877   struct Client *cli = GNUNET_new (struct Client);
1878   cli->client = client;
1879   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1880
1881   GNUNET_SERVER_client_set_user_context (client, chn);
1882   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1883 }
1884
1885
1886 struct JoinDecisionClosure
1887 {
1888   int32_t is_admitted;
1889   struct GNUNET_MessageHeader *msg;
1890 };
1891
1892
1893 /**
1894  * Iterator callback for sending join decisions to multicast.
1895  */
1896 static int
1897 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1898                           void *value)
1899 {
1900   struct JoinDecisionClosure *jcls = cls;
1901   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1902   // FIXME: add relays
1903   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1904   return GNUNET_YES;
1905 }
1906
1907
1908 /**
1909  * Join decision from client.
1910  */
1911 static void
1912 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1913                            const struct GNUNET_MessageHeader *msg)
1914 {
1915   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1916     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1917   struct Channel *chn;
1918   struct Master *mst;
1919   struct JoinDecisionClosure jcls;
1920
1921   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1922   if (NULL == chn)
1923   {
1924     GNUNET_break (0);
1925     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1926     return;
1927   }
1928   GNUNET_assert (GNUNET_YES == chn->is_master);
1929   mst = (struct Master *) chn;
1930   jcls.is_admitted = ntohl (dcsn->is_admitted);
1931   jcls.msg
1932     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1933     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1934     : NULL;
1935
1936   struct GNUNET_HashCode slave_pub_hash;
1937   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1938                       &slave_pub_hash);
1939
1940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1941               "%p Got join decision (%d) from client for channel %s..\n",
1942               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1944               "%p ..and slave %s.\n",
1945               mst, GNUNET_h2s (&slave_pub_hash));
1946
1947   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1948                                               &mcast_send_join_decision, &jcls);
1949   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1950   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1951 }
1952
1953
1954 /**
1955  * Send acknowledgement to a client.
1956  *
1957  * Sent after a message fragment has been passed on to multicast.
1958  *
1959  * @param chn The channel struct for the client.
1960  */
1961 static void
1962 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1963 {
1964   struct GNUNET_MessageHeader res;
1965   res.size = htons (sizeof (res));
1966   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1967
1968   /* FIXME */
1969   GNUNET_SERVER_notification_context_add (nc, client);
1970   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1971 }
1972
1973
1974 /**
1975  * Callback for the transmit functions of multicast.
1976  */
1977 static int
1978 transmit_notify (void *cls, size_t *data_size, void *data)
1979 {
1980   struct Channel *chn = cls;
1981   struct TransmitMessage *tmit_msg = chn->tmit_head;
1982
1983   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1984   {
1985     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1986                 "%p transmit_notify: nothing to send.\n", chn);
1987     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1988       GNUNET_break (0);
1989     *data_size = 0;
1990     return GNUNET_NO;
1991   }
1992
1993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1995
1996   *data_size = tmit_msg->size;
1997   memcpy (data, &tmit_msg[1], *data_size);
1998
1999   int ret
2000     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2001     ? GNUNET_NO
2002     : GNUNET_YES;
2003
2004   /* FIXME: handle disconnecting clients */
2005   if (NULL != tmit_msg->client)
2006     send_message_ack (chn, tmit_msg->client);
2007
2008   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2009   GNUNET_free (tmit_msg);
2010
2011   if (NULL != chn->tmit_head)
2012   {
2013     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2014   }
2015   else if (GNUNET_YES == chn->is_disconnected
2016            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2017   {
2018     /* FIXME: handle partial message (when still in_transmit) */
2019     return GNUNET_SYSERR;
2020   }
2021   return ret;
2022 }
2023
2024
2025 /**
2026  * Callback for the transmit functions of multicast.
2027  */
2028 static int
2029 master_transmit_notify (void *cls, size_t *data_size, void *data)
2030 {
2031   int ret = transmit_notify (cls, data_size, data);
2032
2033   if (GNUNET_YES == ret)
2034   {
2035     struct Master *mst = cls;
2036     mst->tmit_handle = NULL;
2037   }
2038   return ret;
2039 }
2040
2041
2042 /**
2043  * Callback for the transmit functions of multicast.
2044  */
2045 static int
2046 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2047 {
2048   int ret = transmit_notify (cls, data_size, data);
2049
2050   if (GNUNET_YES == ret)
2051   {
2052     struct Slave *slv = cls;
2053     slv->tmit_handle = NULL;
2054   }
2055   return ret;
2056 }
2057
2058
2059 /**
2060  * Transmit a message from a channel master to the multicast group.
2061  */
2062 static void
2063 master_transmit_message (struct Master *mst)
2064 {
2065   struct Channel *chn = &mst->chn;
2066   struct TransmitMessage *tmit_msg = chn->tmit_head;
2067   if (NULL == tmit_msg)
2068     return;
2069   if (NULL == mst->tmit_handle)
2070   {
2071     mst->tmit_handle
2072       = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2073                                         mst->max_group_generation,
2074                                         master_transmit_notify, mst);
2075   }
2076   else
2077   {
2078     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2079   }
2080 }
2081
2082
2083 /**
2084  * Transmit a message from a channel slave to the multicast group.
2085  */
2086 static void
2087 slave_transmit_message (struct Slave *slv)
2088 {
2089   if (NULL == slv->chn.tmit_head)
2090     return;
2091   if (NULL == slv->tmit_handle)
2092   {
2093     slv->tmit_handle
2094       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2095                                            slave_transmit_notify, slv);
2096   }
2097   else
2098   {
2099     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2100   }
2101 }
2102
2103
2104 static void
2105 transmit_message (struct Channel *chn)
2106 {
2107   chn->is_master
2108     ? master_transmit_message ((struct Master *) chn)
2109     : slave_transmit_message ((struct Slave *) chn);
2110 }
2111
2112
2113 /**
2114  * Queue a message from a channel master for sending to the multicast group.
2115  */
2116 static void
2117 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2118 {
2119   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2120
2121   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2122   {
2123     tmit_msg->id = ++mst->max_message_id;
2124     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2125                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2126                 mst, tmit_msg->id);
2127     struct GNUNET_PSYC_MessageMethod *pmeth
2128       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2129
2130     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2131     {
2132       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2133     }
2134     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2135     {
2136       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2137                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2138                   mst, tmit_msg->id - mst->max_state_message_id);
2139       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2140                                           - mst->max_state_message_id);
2141       mst->max_state_message_id = tmit_msg->id;
2142     }
2143     else
2144     {
2145         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2146                     "%p master_queue_message: state not modified\n", mst);
2147       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2148     }
2149
2150     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2151     {
2152       /// @todo add state_hash to PSYC header
2153     }
2154   }
2155 }
2156
2157
2158 /**
2159  * Queue a message from a channel slave for sending to the multicast group.
2160  */
2161 static void
2162 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2163 {
2164   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2165   {
2166     struct GNUNET_PSYC_MessageMethod *pmeth
2167       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2168     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2169     tmit_msg->id = ++slv->max_request_id;
2170   }
2171 }
2172
2173
2174 /**
2175  * Queue PSYC message parts for sending to multicast.
2176  *
2177  * @param chn
2178  *        Channel to send to.
2179  * @param client
2180  *        Client the message originates from.
2181  * @param data_size
2182  *        Size of @a data.
2183  * @param data
2184  *        Concatenated message parts.
2185  * @param first_ptype
2186  *        First message part type in @a data.
2187  * @param last_ptype
2188  *        Last message part type in @a data.
2189  */
2190 static struct TransmitMessage *
2191 queue_message (struct Channel *chn,
2192                struct GNUNET_SERVER_Client *client,
2193                size_t data_size,
2194                const void *data,
2195                uint16_t first_ptype, uint16_t last_ptype)
2196 {
2197   struct TransmitMessage *
2198     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2199   memcpy (&tmit_msg[1], data, data_size);
2200   tmit_msg->client = client;
2201   tmit_msg->size = data_size;
2202   tmit_msg->first_ptype = first_ptype;
2203   tmit_msg->last_ptype = last_ptype;
2204
2205   /* FIXME: separate queue per message ID */
2206
2207   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2208
2209   chn->is_master
2210     ? master_queue_message ((struct Master *) chn, tmit_msg)
2211     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2212   return tmit_msg;
2213 }
2214
2215
2216 /**
2217  * Cancel transmission of current message.
2218  *
2219  * @param chn     Channel to send to.
2220  * @param client  Client the message originates from.
2221  */
2222 static void
2223 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2224 {
2225   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2226
2227   struct GNUNET_MessageHeader msg;
2228   msg.size = htons (sizeof (msg));
2229   msg.type = htons (type);
2230
2231   queue_message (chn, client, sizeof (msg), &msg, type, type);
2232   transmit_message (chn);
2233
2234   /* FIXME: cleanup */
2235 }
2236
2237
2238 /**
2239  * Incoming message from a master or slave client.
2240  */
2241 static void
2242 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2243                           const struct GNUNET_MessageHeader *msg)
2244 {
2245   struct Channel *
2246     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2247   GNUNET_assert (NULL != chn);
2248
2249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2250               "%p Received message from client.\n", chn);
2251   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2252
2253   if (GNUNET_YES != chn->is_ready)
2254   {
2255     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2256                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2257     GNUNET_break (0);
2258     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2259     return;
2260   }
2261
2262   uint16_t size = ntohs (msg->size);
2263   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2264   {
2265     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2266                 "%p Message payload too large: %u < %u.\n",
2267                 chn,
2268                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2269                 (unsigned int) (size - sizeof (*msg)));
2270     GNUNET_break (0);
2271     transmit_cancel (chn, client);
2272     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2273     return;
2274   }
2275
2276   uint16_t first_ptype = 0, last_ptype = 0;
2277   if (GNUNET_SYSERR
2278       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2279                                           (const char *) &msg[1],
2280                                           &first_ptype, &last_ptype))
2281   {
2282     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2283                 "%p Received invalid message part from client.\n", chn);
2284     GNUNET_break (0);
2285     transmit_cancel (chn, client);
2286     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2287     return;
2288   }
2289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2290               "%p Received message with first part type %u and last part type %u.\n",
2291               chn, first_ptype, last_ptype);
2292
2293   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2294                  first_ptype, last_ptype);
2295   transmit_message (chn);
2296   /* FIXME: send a few ACKs even before transmit_notify is called */
2297
2298   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2299 };
2300
2301
2302 /**
2303  * Received result of GNUNET_PSYCSTORE_membership_store()
2304  */
2305 static void
2306 store_recv_membership_store_result (void *cls,
2307                                     int64_t result,
2308                                     const char *err_msg,
2309                                     uint16_t err_msg_size)
2310 {
2311   struct Operation *op = cls;
2312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2313               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2314               op->chn,
2315               result,
2316               (int) err_msg_size,
2317               err_msg);
2318
2319   if (NULL != op->client)
2320     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2321   op_remove (op);
2322 }
2323
2324
2325 /**
2326  * Client requests to add/remove a slave in the membership database.
2327  */
2328 static void
2329 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2330                               const struct GNUNET_MessageHeader *msg)
2331 {
2332   struct Channel *
2333     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2334   GNUNET_assert (NULL != chn);
2335
2336   const struct ChannelMembershipStoreRequest *
2337     req = (const struct ChannelMembershipStoreRequest *) msg;
2338
2339   struct Operation *op = op_add (chn, client, req->op_id, 0);
2340
2341   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2342   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2344               "%p Received membership store request from client.\n", chn);
2345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2347               chn, req->did_join, announced_at, effective_since);
2348
2349   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2350                                      req->did_join, announced_at, effective_since,
2351                                      0, /* FIXME: group_generation */
2352                                      &store_recv_membership_store_result, op);
2353   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2354 }
2355
2356
2357 /**
2358  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2359  * in response to a history request from a client.
2360  */
2361 static int
2362 store_recv_fragment_history (void *cls,
2363                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2364                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2365 {
2366   struct Operation *op = cls;
2367   if (NULL == op->client)
2368   { /* Requesting client already disconnected. */
2369     return GNUNET_NO;
2370   }
2371   struct Channel *chn = op->chn;
2372
2373   struct GNUNET_PSYC_MessageHeader *pmsg;
2374   uint16_t msize = ntohs (mmsg->header.size);
2375   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2376
2377   struct GNUNET_OperationResultMessage *
2378     res = GNUNET_malloc (sizeof (*res) + psize);
2379   res->header.size = htons (sizeof (*res) + psize);
2380   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2381   res->op_id = op->op_id;
2382   res->result_code = GNUNET_htonll (GNUNET_OK);
2383
2384   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2385   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2386   memcpy (&res[1], pmsg, psize);
2387
2388   /** @todo FIXME: send only to requesting client */
2389   client_send_msg (chn, &res->header);
2390   return GNUNET_YES;
2391 }
2392
2393
2394 /**
2395  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2396  * in response to a history request from a client.
2397  */
2398 static void
2399 store_recv_fragment_history_result (void *cls, int64_t result,
2400                                     const char *err_msg, uint16_t err_msg_size)
2401 {
2402   struct Operation *op = cls;
2403   if (NULL == op->client)
2404   { /* Requesting client already disconnected. */
2405     return;
2406   }
2407
2408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2409               "%p History replay #%" PRIu64 ": "
2410               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2411               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2412
2413   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2414   {
2415     /** @todo Multicast replay request for messages not found locally. */
2416   }
2417
2418   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2419   op_remove (op);
2420 }
2421
2422
2423 /**
2424  * Client requests channel history.
2425  */
2426 static void
2427 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2428                             const struct GNUNET_MessageHeader *msg)
2429 {
2430   struct Channel *
2431     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2432   GNUNET_assert (NULL != chn);
2433
2434   const struct GNUNET_PSYC_HistoryRequestMessage *
2435     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2436   uint16_t size = ntohs (msg->size);
2437   const char *method_prefix = (const char *) &req[1];
2438
2439   if (size < sizeof (*req) + 1
2440       || '\0' != method_prefix[size - sizeof (*req) - 1])
2441   {
2442     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2443                 "%p History replay #%" PRIu64 ": "
2444                 "invalid method prefix. size: %u < %u?\n",
2445                 chn,
2446                 GNUNET_ntohll (req->op_id),
2447                 size,
2448                 (unsigned int) sizeof (*req) + 1);
2449     GNUNET_break (0);
2450     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2451     return;
2452   }
2453
2454   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2455
2456   if (0 == req->message_limit)
2457     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2458                                   GNUNET_ntohll (req->start_message_id),
2459                                   GNUNET_ntohll (req->end_message_id),
2460                                   0, method_prefix,
2461                                   &store_recv_fragment_history,
2462                                   &store_recv_fragment_history_result, op);
2463   else
2464     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2465                                          GNUNET_ntohll (req->message_limit),
2466                                          method_prefix,
2467                                          &store_recv_fragment_history,
2468                                          &store_recv_fragment_history_result,
2469                                          op);
2470
2471   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2472 }
2473
2474
2475 /**
2476  * Received state var from PSYCstore, send it to client.
2477  */
2478 static int
2479 store_recv_state_var (void *cls, const char *name,
2480                       const void *value, uint32_t value_size)
2481 {
2482   struct Operation *op = cls;
2483   struct GNUNET_OperationResultMessage *res;
2484
2485   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2487               op->chn, GNUNET_ntohll (op->op_id), name);
2488
2489   if (NULL != name) /* First part */
2490   {
2491     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2492     struct GNUNET_PSYC_MessageModifier *mod;
2493     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2494     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2495     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2496     res->op_id = op->op_id;
2497
2498     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2499     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2500     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2501     mod->name_size = htons (name_size);
2502     mod->value_size = htonl (value_size);
2503     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2504     memcpy (&mod[1], name, name_size);
2505     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2506   }
2507   else /* Continuation */
2508   {
2509     struct GNUNET_MessageHeader *mod;
2510     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2511     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2512     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2513     res->op_id = op->op_id;
2514
2515     mod = (struct GNUNET_MessageHeader *) &res[1];
2516     mod->size = htons (sizeof (*mod) + value_size);
2517     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2518     memcpy (&mod[1], value, value_size);
2519   }
2520
2521   // FIXME: client might have been disconnected
2522   GNUNET_SERVER_notification_context_add (nc, op->client);
2523   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2524                                               GNUNET_NO);
2525   return GNUNET_YES;
2526 }
2527
2528
2529 /**
2530  * Received result of GNUNET_PSYCSTORE_state_get()
2531  * or GNUNET_PSYCSTORE_state_get_prefix()
2532  */
2533 static void
2534 store_recv_state_result (void *cls, int64_t result,
2535                          const char *err_msg, uint16_t err_msg_size)
2536 {
2537   struct Operation *op = cls;
2538   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2539               "%p state_get #%" PRIu64 ": "
2540               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2541               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2542
2543   // FIXME: client might have been disconnected
2544   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2545   op_remove (op);
2546 }
2547
2548
2549 /**
2550  * Client requests best matching state variable from PSYCstore.
2551  */
2552 static void
2553 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2554                        const struct GNUNET_MessageHeader *msg)
2555 {
2556   struct Channel *
2557     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2558   GNUNET_assert (NULL != chn);
2559
2560   const struct StateRequest *
2561     req = (const struct StateRequest *) msg;
2562
2563   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2564   const char *name = (const char *) &req[1];
2565   if (0 == name_size || '\0' != name[name_size - 1])
2566   {
2567     GNUNET_break (0);
2568     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2569     return;
2570   }
2571
2572   struct Operation *op = op_add (chn, client, req->op_id, 0);
2573   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2574                               &store_recv_state_var,
2575                               &store_recv_state_result, op);
2576   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2577 }
2578
2579
2580 /**
2581  * Client requests state variables with a given prefix from PSYCstore.
2582  */
2583 static void
2584 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2585                               const struct GNUNET_MessageHeader *msg)
2586 {
2587   struct Channel *
2588     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2589   GNUNET_assert (NULL != chn);
2590
2591   const struct StateRequest *
2592     req = (const struct StateRequest *) msg;
2593
2594   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2595   const char *name = (const char *) &req[1];
2596   if (0 == name_size || '\0' != name[name_size - 1])
2597   {
2598     GNUNET_break (0);
2599     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2600     return;
2601   }
2602
2603   struct Operation *op = op_add (chn, client, req->op_id, 0);
2604   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2605                                      &store_recv_state_var,
2606                                      &store_recv_state_result, op);
2607   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2608 }
2609
2610
2611 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2612   { &client_recv_master_start, NULL,
2613     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2614
2615   { &client_recv_slave_join, NULL,
2616     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2617
2618   { &client_recv_join_decision, NULL,
2619     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2620
2621   { &client_recv_psyc_message, NULL,
2622     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2623
2624   { &client_recv_membership_store, NULL,
2625     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2626
2627   { &client_recv_history_replay, NULL,
2628     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2629
2630   { &client_recv_state_get, NULL,
2631     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2632
2633   { &client_recv_state_get_prefix, NULL,
2634     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2635
2636   { NULL, NULL, 0, 0 }
2637 };
2638
2639
2640 /**
2641  * Initialize the PSYC service.
2642  *
2643  * @param cls Closure.
2644  * @param server The initialized server.
2645  * @param c Configuration to use.
2646  */
2647 static void
2648 run (void *cls, struct GNUNET_SERVER_Handle *server,
2649      const struct GNUNET_CONFIGURATION_Handle *c)
2650 {
2651   cfg = c;
2652   store = GNUNET_PSYCSTORE_connect (cfg);
2653   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2654   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2655   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2656   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2657   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2658   nc = GNUNET_SERVER_notification_context_create (server, 1);
2659   GNUNET_SERVER_add_handlers (server, server_handlers);
2660   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2661   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2662 }
2663
2664
2665 /**
2666  * The main function for the service.
2667  *
2668  * @param argc number of arguments from the command line
2669  * @param argv command line arguments
2670  * @return 0 ok, 1 on error
2671  */
2672 int
2673 main (int argc, char *const *argv)
2674 {
2675   return (GNUNET_OK ==
2676           GNUNET_SERVICE_run (argc, argv, "psyc",
2677                               GNUNET_SERVICE_OPTION_NONE,
2678                               &run, NULL)) ? 0 : 1;
2679 }
2680
2681 /* end of gnunet-service-psyc.c */