social needs to start as user service, not system
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * Type of first message part.
102    */
103   uint16_t first_ptype;
104
105   /**
106    * Type of last message part.
107    */
108   uint16_t last_ptype;
109
110   /* Followed by message */
111 };
112
113
114 /**
115  * Cache for received message fragments.
116  * Message fragments are only sent to clients after all modifiers arrived.
117  *
118  * chan_key -> MultiHashMap chan_msgs
119  */
120 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
121
122
123 /**
124  * Entry in the chan_msgs hashmap of @a recv_cache:
125  * fragment_id -> RecvCacheEntry
126  */
127 struct RecvCacheEntry
128 {
129   struct GNUNET_MULTICAST_MessageHeader *mmsg;
130   uint16_t ref_count;
131 };
132
133
134 /**
135  * Entry in the @a recv_frags hash map of a @a Channel.
136  * message_id -> FragmentQueue
137  */
138 struct FragmentQueue
139 {
140   /**
141    * Fragment IDs stored in @a recv_cache.
142    */
143   struct GNUNET_CONTAINER_Heap *fragments;
144
145   /**
146    * Total size of received fragments.
147    */
148   uint64_t size;
149
150   /**
151    * Total size of received header fragments (METHOD & MODIFIERs)
152    */
153   uint64_t header_size;
154
155   /**
156    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
157    */
158   uint64_t state_delta;
159
160   /**
161    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
162    */
163   uint32_t flags;
164
165   /**
166    * Receive state of message.
167    *
168    * @see MessageFragmentState
169    */
170   uint8_t state;
171
172   /**
173    * Whether the state is already modified in PSYCstore.
174    */
175   uint8_t state_is_modified;
176
177   /**
178    * Is the message queued for delivery to the client?
179    * i.e. added to the recv_msgs queue
180    */
181   uint8_t is_queued;
182 };
183
184
185 /**
186  * List of connected clients.
187  */
188 struct Client
189 {
190   struct Client *prev;
191   struct Client *next;
192
193   struct GNUNET_SERVER_Client *client;
194 };
195
196
197 struct Operation
198 {
199   struct Operation *prev;
200   struct Operation *next;
201
202   struct GNUNET_SERVER_Client *client;
203   struct Channel *chn;
204   uint64_t op_id;
205   uint32_t flags;
206 };
207
208
209 /**
210  * Common part of the client context for both a channel master and slave.
211  */
212 struct Channel
213 {
214   struct Client *clients_head;
215   struct Client *clients_tail;
216
217   struct Operation *op_head;
218   struct Operation *op_tail;
219
220   struct TransmitMessage *tmit_head;
221   struct TransmitMessage *tmit_tail;
222
223   /**
224    * Current PSYCstore operation.
225    */
226   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
227
228   /**
229    * Received fragments not yet sent to the client.
230    * message_id -> FragmentQueue
231    */
232   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
233
234   /**
235    * Received message IDs not yet sent to the client.
236    */
237   struct GNUNET_CONTAINER_Heap *recv_msgs;
238
239   /**
240    * Public key of the channel.
241    */
242   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
243
244   /**
245    * Hash of @a pub_key.
246    */
247   struct GNUNET_HashCode pub_key_hash;
248
249   /**
250    * Last message ID sent to the client.
251    * 0 if there is no such message.
252    */
253   uint64_t max_message_id;
254
255   /**
256    * ID of the last stateful message, where the state operations has been
257    * processed and saved to PSYCstore and which has been sent to the client.
258    * 0 if there is no such message.
259    */
260   uint64_t max_state_message_id;
261
262   /**
263    * Expected value size for the modifier being received from the PSYC service.
264    */
265   uint32_t tmit_mod_value_size_expected;
266
267   /**
268    * Actual value size for the modifier being received from the PSYC service.
269    */
270   uint32_t tmit_mod_value_size;
271
272   /**
273    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274    */
275   uint8_t is_master;
276
277   /**
278    * Is this channel ready to receive messages from client?
279    * #GNUNET_YES or #GNUNET_NO
280    */
281   uint8_t is_ready;
282
283   /**
284    * Is the client disconnected?
285    * #GNUNET_YES or #GNUNET_NO
286    */
287   uint8_t is_disconnected;
288 };
289
290
291 /**
292  * Client context for a channel master.
293  */
294 struct Master
295 {
296   /**
297    * Channel struct common for Master and Slave
298    */
299   struct Channel chn;
300
301   /**
302    * Private key of the channel.
303    */
304   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
305
306   /**
307    * Handle for the multicast origin.
308    */
309   struct GNUNET_MULTICAST_Origin *origin;
310
311   /**
312    * Transmit handle for multicast.
313    */
314   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
315
316   /**
317    * Incoming join requests from multicast.
318    * member_pub_key -> struct GNUNET_MULTICAST_JoinHandle *
319    */
320   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
321
322   /**
323    * Last message ID transmitted to this channel.
324    *
325    * Incremented before sending a message, thus the message_id in messages sent
326    * starts from 1.
327    */
328   uint64_t max_message_id;
329
330   /**
331    * ID of the last message with state operations transmitted to the channel.
332    * 0 if there is no such message.
333    */
334   uint64_t max_state_message_id;
335
336   /**
337    * Maximum group generation transmitted to the channel.
338    */
339   uint64_t max_group_generation;
340
341   /**
342    * @see enum GNUNET_PSYC_Policy
343    */
344   enum GNUNET_PSYC_Policy policy;
345 };
346
347
348 /**
349  * Client context for a channel slave.
350  */
351 struct Slave
352 {
353   /**
354    * Channel struct common for Master and Slave
355    */
356   struct Channel chn;
357
358   /**
359    * Private key of the slave.
360    */
361   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
362
363   /**
364    * Public key of the slave.
365    */
366   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
367
368   /**
369    * Hash of @a pub_key.
370    */
371   struct GNUNET_HashCode pub_key_hash;
372
373   /**
374    * Handle for the multicast member.
375    */
376   struct GNUNET_MULTICAST_Member *member;
377
378   /**
379    * Transmit handle for multicast.
380    */
381   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
382
383   /**
384    * Peer identity of the origin.
385    */
386   struct GNUNET_PeerIdentity origin;
387
388   /**
389    * Number of items in @a relays.
390    */
391   uint32_t relay_count;
392
393   /**
394    * Relays that multicast can use to connect.
395    */
396   struct GNUNET_PeerIdentity *relays;
397
398   /**
399    * Join request to be transmitted to the master on join.
400    */
401   struct GNUNET_PSYC_Message *join_msg;
402
403   /**
404    * Join decision received from multicast.
405    */
406   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
407
408   /**
409    * Maximum request ID for this channel.
410    */
411   uint64_t max_request_id;
412
413   /**
414    * Join flags.
415    */
416   enum GNUNET_PSYC_SlaveJoinFlags join_flags;
417 };
418
419
420 static void
421 transmit_message (struct Channel *chn);
422
423 static uint64_t
424 message_queue_run (struct Channel *chn);
425
426 static uint64_t
427 message_queue_drop (struct Channel *chn);
428
429
430 static void
431 schedule_transmit_message (void *cls)
432 {
433   struct Channel *chn = cls;
434
435   transmit_message (chn);
436 }
437
438
439 /**
440  * Task run during shutdown.
441  *
442  * @param cls unused
443  */
444 static void
445 shutdown_task (void *cls)
446 {
447   if (NULL != nc)
448   {
449     GNUNET_SERVER_notification_context_destroy (nc);
450     nc = NULL;
451   }
452   if (NULL != stats)
453   {
454     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
455     stats = NULL;
456   }
457 }
458
459
460 static struct Operation *
461 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
462         uint64_t op_id, uint32_t flags)
463 {
464   struct Operation *op = GNUNET_malloc (sizeof (*op));
465   op->client = client;
466   op->chn = chn;
467   op->op_id = op_id;
468   op->flags = flags;
469   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
470   return op;
471 }
472
473
474 static void
475 op_remove (struct Operation *op)
476 {
477   GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
478   GNUNET_free (op);
479 }
480
481
482 /**
483  * Clean up master data structures after a client disconnected.
484  */
485 static void
486 cleanup_master (struct Master *mst)
487 {
488   struct Channel *chn = &mst->chn;
489
490   if (NULL != mst->origin)
491     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
492   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
493   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
494 }
495
496
497 /**
498  * Clean up slave data structures after a client disconnected.
499  */
500 static void
501 cleanup_slave (struct Slave *slv)
502 {
503   struct Channel *chn = &slv->chn;
504   struct GNUNET_CONTAINER_MultiHashMap *
505     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
506                                                 &chn->pub_key_hash);
507   GNUNET_assert (NULL != chn_slv);
508   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
509
510   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
511   {
512     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
513                                           chn_slv);
514     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
515   }
516   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
517
518   if (NULL != slv->join_msg)
519   {
520     GNUNET_free (slv->join_msg);
521     slv->join_msg = NULL;
522   }
523   if (NULL != slv->relays)
524   {
525     GNUNET_free (slv->relays);
526     slv->relays = NULL;
527   }
528   if (NULL != slv->member)
529   {
530     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
531     slv->member = NULL;
532   }
533   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
534 }
535
536
537 /**
538  * Clean up channel data structures after a client disconnected.
539  */
540 static void
541 cleanup_channel (struct Channel *chn)
542 {
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
544               "%p Cleaning up channel %s. master? %u\n",
545               chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
546   message_queue_drop (chn);
547   GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
548   chn->recv_frags = NULL;
549
550   if (NULL != chn->store_op)
551   {
552     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
553     chn->store_op = NULL;
554   }
555
556   (GNUNET_YES == chn->is_master)
557     ? cleanup_master ((struct Master *) chn)
558     : cleanup_slave ((struct Slave *) chn);
559   GNUNET_free (chn);
560 }
561
562
563 /**
564  * Called whenever a client is disconnected.
565  * Frees our resources associated with that client.
566  *
567  * @param cls Closure.
568  * @param client Identification of the client.
569  */
570 static void
571 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
572 {
573   if (NULL == client)
574     return;
575
576   struct Channel *
577     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
578
579   if (NULL == chn)
580   {
581     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582                 "%p User context is NULL in client_disconnect()\n", chn);
583     GNUNET_break (0);
584     return;
585   }
586
587   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588               "%p Client (%s) disconnected from channel %s\n",
589               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
590               GNUNET_h2s (&chn->pub_key_hash));
591
592   struct Client *cli = chn->clients_head;
593   while (NULL != cli)
594   {
595     if (cli->client == client)
596     {
597       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
598       GNUNET_free (cli);
599       break;
600     }
601     cli = cli->next;
602   }
603
604   struct Operation *op = chn->op_head;
605   while (NULL != op)
606   {
607     if (op->client == client)
608     {
609       op->client = NULL;
610       break;
611     }
612     op = op->next;
613   }
614
615   if (NULL == chn->clients_head)
616   { /* Last client disconnected. */
617     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                 "%p Last client (%s) disconnected from channel %s\n",
619                 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
620                 GNUNET_h2s (&chn->pub_key_hash));
621     chn->is_disconnected = GNUNET_YES;
622     if (NULL != chn->tmit_head)
623     { /* Send pending messages to multicast before cleanup. */
624       transmit_message (chn);
625     }
626     else
627     {
628       cleanup_channel (chn);
629     }
630   }
631 }
632
633
634 /**
635  * Send message to all clients connected to the channel.
636  */
637 static void
638 client_send_msg (const struct Channel *chn,
639                  const struct GNUNET_MessageHeader *msg)
640 {
641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
642               "%p Sending message to clients.\n", chn);
643
644   struct Client *cli = chn->clients_head;
645   while (NULL != cli)
646   {
647     GNUNET_SERVER_notification_context_add (nc, cli->client);
648     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
649     cli = cli->next;
650   }
651 }
652
653
654 /**
655  * Send a result code back to the client.
656  *
657  * @param client
658  *        Client that should receive the result code.
659  * @param result_code
660  *        Code to transmit.
661  * @param op_id
662  *        Operation ID in network byte order.
663  * @param data
664  *        Data payload or NULL.
665  * @param data_size
666  *        Size of @a data.
667  */
668 static void
669 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
670                     int64_t result_code, const void *data, uint16_t data_size)
671 {
672   struct GNUNET_OperationResultMessage *res;
673
674   res = GNUNET_malloc (sizeof (*res) + data_size);
675   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
676   res->header.size = htons (sizeof (*res) + data_size);
677   res->result_code = GNUNET_htonll (result_code);
678   res->op_id = op_id;
679   if (0 < data_size)
680     memcpy (&res[1], data, data_size);
681
682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683               "%p Sending result to client for operation #%" PRIu64 ": "
684               "%" PRId64 " (size: %u)\n",
685               client, GNUNET_ntohll (op_id), result_code, data_size);
686
687   GNUNET_SERVER_notification_context_add (nc, client);
688   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
689                                               GNUNET_NO);
690   GNUNET_free (res);
691 }
692
693
694 /**
695  * Closure for join_mem_test_cb()
696  */
697 struct JoinMemTestClosure
698 {
699   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
700   struct Channel *chn;
701   struct GNUNET_MULTICAST_JoinHandle *jh;
702   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
703 };
704
705
706 /**
707  * Membership test result callback used for join requests.
708  */
709 static void
710 join_mem_test_cb (void *cls, int64_t result,
711                   const char *err_msg, uint16_t err_msg_size)
712 {
713   struct JoinMemTestClosure *jcls = cls;
714
715   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
716   { /* Pass on join request to client if this is a master channel */
717     struct Master *mst = (struct Master *) jcls->chn;
718     struct GNUNET_HashCode slave_pub_hash;
719     GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
720                         &slave_pub_hash);
721     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
722                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
723     client_send_msg (jcls->chn, &jcls->join_msg->header);
724   }
725   else
726   {
727     if (GNUNET_SYSERR == result)
728     {
729       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
730                   "Could not perform membership test (%.*s)\n",
731                   err_msg_size, err_msg);
732     }
733     // FIXME: add relays
734     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
735   }
736   GNUNET_free (jcls->join_msg);
737   GNUNET_free (jcls);
738 }
739
740
741 /**
742  * Incoming join request from multicast.
743  */
744 static void
745 mcast_recv_join_request (void *cls,
746                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
747                          const struct GNUNET_MessageHeader *join_msg,
748                          struct GNUNET_MULTICAST_JoinHandle *jh)
749 {
750   struct Channel *chn = cls;
751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
752
753   uint16_t join_msg_size = 0;
754   if (NULL != join_msg)
755   {
756     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
757     {
758       join_msg_size = ntohs (join_msg->size);
759     }
760     else
761     {
762       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
763                   "%p Got join message with invalid type %u.\n",
764                   chn, ntohs (join_msg->type));
765     }
766   }
767
768   struct GNUNET_PSYC_JoinRequestMessage *
769     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
770   req->header.size = htons (sizeof (*req) + join_msg_size);
771   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
772   req->slave_pub_key = *slave_pub_key;
773   if (0 < join_msg_size)
774     memcpy (&req[1], join_msg, join_msg_size);
775
776   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
777   jcls->slave_pub_key = *slave_pub_key;
778   jcls->chn = chn;
779   jcls->jh = jh;
780   jcls->join_msg = req;
781
782   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
783                                     chn->max_message_id, 0,
784                                     &join_mem_test_cb, jcls);
785 }
786
787
788 /**
789  * Join decision received from multicast.
790  */
791 static void
792 mcast_recv_join_decision (void *cls, int is_admitted,
793                           const struct GNUNET_PeerIdentity *peer,
794                           uint16_t relay_count,
795                           const struct GNUNET_PeerIdentity *relays,
796                           const struct GNUNET_MessageHeader *join_resp)
797 {
798   struct Slave *slv = cls;
799   struct Channel *chn = &slv->chn;
800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801               "%p Got join decision: %d\n", slv, is_admitted);
802   if (GNUNET_YES == chn->is_ready)
803   {
804     /* Already admitted */
805     return;
806   }
807
808   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
809   struct GNUNET_PSYC_JoinDecisionMessage *
810     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
811   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
812   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
813   dcsn->is_admitted = htonl (is_admitted);
814   if (0 < join_resp_size)
815     memcpy (&dcsn[1], join_resp, join_resp_size);
816
817   client_send_msg (chn, &dcsn->header);
818
819   if (GNUNET_YES == is_admitted
820       && ! (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags))
821   {
822     chn->is_ready = GNUNET_YES;
823   }
824 }
825
826
827 static int
828 store_recv_fragment_replay (void *cls,
829                             struct GNUNET_MULTICAST_MessageHeader *msg,
830                             enum GNUNET_PSYCSTORE_MessageFlags flags)
831 {
832   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
833
834   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
835   return GNUNET_YES;
836 }
837
838
839 /**
840  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
841  */
842 static void
843 store_recv_fragment_replay_result (void *cls, int64_t result,
844                                    const char *err_msg, uint16_t err_msg_size)
845 {
846   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
849               rh, result, err_msg_size, err_msg);
850
851   switch (result)
852   {
853   case GNUNET_YES:
854     break;
855
856   case GNUNET_NO:
857     GNUNET_MULTICAST_replay_response (rh, NULL,
858                                       GNUNET_MULTICAST_REC_NOT_FOUND);
859     break;
860
861   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
862     GNUNET_MULTICAST_replay_response (rh, NULL,
863                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
864     break;
865
866   case GNUNET_SYSERR:
867     GNUNET_MULTICAST_replay_response (rh, NULL,
868                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
869     break;
870   }
871   GNUNET_MULTICAST_replay_response_end (rh);
872 }
873
874
875 /**
876  * Incoming fragment replay request from multicast.
877  */
878 static void
879 mcast_recv_replay_fragment (void *cls,
880                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
881                             uint64_t fragment_id, uint64_t flags,
882                             struct GNUNET_MULTICAST_ReplayHandle *rh)
883
884 {
885   struct Channel *chn = cls;
886   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_pub_key,
887                                  fragment_id, fragment_id,
888                                  &store_recv_fragment_replay,
889                                  &store_recv_fragment_replay_result, rh);
890 }
891
892
893 /**
894  * Incoming message replay request from multicast.
895  */
896 static void
897 mcast_recv_replay_message (void *cls,
898                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
899                            uint64_t message_id,
900                            uint64_t fragment_offset,
901                            uint64_t flags,
902                            struct GNUNET_MULTICAST_ReplayHandle *rh)
903 {
904   struct Channel *chn = cls;
905   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_pub_key,
906                                 message_id, message_id, 1, NULL,
907                                 &store_recv_fragment_replay,
908                                 &store_recv_fragment_replay_result, rh);
909 }
910
911
912 /**
913  * Convert an uint64_t in network byte order to a HashCode
914  * that can be used as key in a MultiHashMap
915  */
916 static inline void
917 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
918 {
919   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
920   /* TODO: use built-in byte swap functions if available */
921
922   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
923   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
924
925   *key = (struct GNUNET_HashCode) {};
926   *((uint64_t *) key)
927     = (n << 32) | (n >> 32);
928 }
929
930
931 /**
932  * Convert an uint64_t in host byte order to a HashCode
933  * that can be used as key in a MultiHashMap
934  */
935 static inline void
936 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
937 {
938 #if __BYTE_ORDER == __BIG_ENDIAN
939   hash_key_from_nll (key, n);
940 #elif __BYTE_ORDER == __LITTLE_ENDIAN
941   *key = (struct GNUNET_HashCode) {};
942   *((uint64_t *) key) = n;
943 #else
944   #error byteorder undefined
945 #endif
946 }
947
948
949 /**
950  * Initialize PSYC message header.
951  */
952 static inline void
953 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
954                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
955 {
956   uint16_t size = ntohs (mmsg->header.size);
957   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
958
959   pmsg->header.size = htons (psize);
960   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
961   pmsg->message_id = mmsg->message_id;
962   pmsg->fragment_offset = mmsg->fragment_offset;
963   pmsg->flags = htonl (flags);
964
965   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
966 }
967
968
969 /**
970  * Create a new PSYC message from a multicast message for sending it to clients.
971  */
972 static inline struct GNUNET_PSYC_MessageHeader *
973 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
974 {
975   struct GNUNET_PSYC_MessageHeader *pmsg;
976   uint16_t size = ntohs (mmsg->header.size);
977   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
978
979   pmsg = GNUNET_malloc (psize);
980   psyc_msg_init (pmsg, mmsg, flags);
981   return pmsg;
982 }
983
984
985 /**
986  * Send multicast message to all clients connected to the channel.
987  */
988 static void
989 client_send_mcast_msg (struct Channel *chn,
990                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
991                        uint32_t flags)
992 {
993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994               "%p Sending multicast message to client. "
995               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
996               chn, GNUNET_ntohll (mmsg->fragment_id),
997               GNUNET_ntohll (mmsg->message_id));
998
999   struct GNUNET_PSYC_MessageHeader *
1000     pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1001   client_send_msg (chn, &pmsg->header);
1002   GNUNET_free (pmsg);
1003 }
1004
1005
1006 /**
1007  * Send multicast request to all clients connected to the channel.
1008  */
1009 static void
1010 client_send_mcast_req (struct Master *mst,
1011                        const struct GNUNET_MULTICAST_RequestHeader *req)
1012 {
1013   struct Channel *chn = &mst->chn;
1014
1015   struct GNUNET_PSYC_MessageHeader *pmsg;
1016   uint16_t size = ntohs (req->header.size);
1017   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "%p Sending multicast request to client. "
1021               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1022               chn, GNUNET_ntohll (req->fragment_id),
1023               GNUNET_ntohll (req->request_id));
1024
1025   pmsg = GNUNET_malloc (psize);
1026   pmsg->header.size = htons (psize);
1027   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1028   pmsg->message_id = req->request_id;
1029   pmsg->fragment_offset = req->fragment_offset;
1030   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1031   pmsg->slave_pub_key = req->member_pub_key;
1032   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1033
1034   client_send_msg (chn, &pmsg->header);
1035
1036   /* FIXME: save req to PSYCstore so that it can be resent later to clients */
1037
1038   GNUNET_free (pmsg);
1039 }
1040
1041
1042 /**
1043  * Insert a multicast message fragment into the queue belonging to the message.
1044  *
1045  * @param chn          Channel.
1046  * @param mmsg         Multicast message fragment.
1047  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1048  * @param first_ptype  First PSYC message part type in @a mmsg.
1049  * @param last_ptype   Last PSYC message part type in @a mmsg.
1050  */
1051 static void
1052 fragment_queue_insert (struct Channel *chn,
1053                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1054                        uint16_t first_ptype, uint16_t last_ptype)
1055 {
1056   const uint16_t size = ntohs (mmsg->header.size);
1057   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1058   struct GNUNET_CONTAINER_MultiHashMap
1059     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1060                                                     &chn->pub_key_hash);
1061
1062   struct GNUNET_HashCode msg_id_hash;
1063   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1064
1065   struct FragmentQueue
1066     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1067
1068   if (NULL == fragq)
1069   {
1070     fragq = GNUNET_new (struct FragmentQueue);
1071     fragq->state = MSG_FRAG_STATE_HEADER;
1072     fragq->fragments
1073       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1074
1075     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1076                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1077
1078     if (NULL == chan_msgs)
1079     {
1080       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1081       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1082                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1083     }
1084   }
1085
1086   struct GNUNET_HashCode frag_id_hash;
1087   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1088   struct RecvCacheEntry
1089     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1090   if (NULL == cache_entry)
1091   {
1092     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093                 "%p Adding message fragment to cache. "
1094                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1095                 chn, GNUNET_ntohll (mmsg->message_id),
1096                 GNUNET_ntohll (mmsg->fragment_id));
1097     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                 "%p header_size: %" PRIu64 " + %u\n",
1099                 chn, fragq->header_size, size);
1100     cache_entry = GNUNET_new (struct RecvCacheEntry);
1101     cache_entry->ref_count = 1;
1102     cache_entry->mmsg = GNUNET_malloc (size);
1103     memcpy (cache_entry->mmsg, mmsg, size);
1104     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1105                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1106   }
1107   else
1108   {
1109     cache_entry->ref_count++;
1110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111                 "%p Message fragment is already in cache. "
1112                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1113                 ", ref_count: %u\n",
1114                 chn, GNUNET_ntohll (mmsg->message_id),
1115                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1116   }
1117
1118   if (MSG_FRAG_STATE_HEADER == fragq->state)
1119   {
1120     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1121     {
1122       struct GNUNET_PSYC_MessageMethod *
1123         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1124       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1125       fragq->flags = ntohl (pmeth->flags);
1126     }
1127
1128     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1129     {
1130       fragq->header_size += size;
1131     }
1132     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1133              || frag_offset == fragq->header_size)
1134     { /* header is now complete */
1135       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1136                   "%p Header of message %" PRIu64 " is complete.\n",
1137                   chn, GNUNET_ntohll (mmsg->message_id));
1138
1139       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1140                   "%p Adding message %" PRIu64 " to queue.\n",
1141                   chn, GNUNET_ntohll (mmsg->message_id));
1142       fragq->state = MSG_FRAG_STATE_DATA;
1143     }
1144     else
1145     {
1146       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1147                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1148                   "%" PRIu64 " != %" PRIu64 "\n",
1149                   chn, GNUNET_ntohll (mmsg->message_id),
1150                   frag_offset, fragq->header_size);
1151     }
1152   }
1153
1154   switch (last_ptype)
1155   {
1156   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1157     if (frag_offset == fragq->size)
1158       fragq->state = MSG_FRAG_STATE_END;
1159     else
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "%p Message %" PRIu64 " is NOT complete yet: "
1162                   "%" PRIu64 " != %" PRIu64 "\n",
1163                   chn, GNUNET_ntohll (mmsg->message_id),
1164                   frag_offset, fragq->size);
1165     break;
1166
1167   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1168     /* Drop message without delivering to client if it's a single fragment */
1169     fragq->state =
1170       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1171       ? MSG_FRAG_STATE_DROP
1172       : MSG_FRAG_STATE_CANCEL;
1173   }
1174
1175   switch (fragq->state)
1176   {
1177   case MSG_FRAG_STATE_DATA:
1178   case MSG_FRAG_STATE_END:
1179   case MSG_FRAG_STATE_CANCEL:
1180     if (GNUNET_NO == fragq->is_queued)
1181     {
1182       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1183                                     GNUNET_ntohll (mmsg->message_id));
1184       fragq->is_queued = GNUNET_YES;
1185     }
1186   }
1187
1188   fragq->size += size;
1189   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1190                                 GNUNET_ntohll (mmsg->fragment_id));
1191 }
1192
1193
1194 /**
1195  * Run fragment queue of a message.
1196  *
1197  * Send fragments of a message in order to client, after all modifiers arrived
1198  * from multicast.
1199  *
1200  * @param chn
1201  *        Channel.
1202  * @param msg_id
1203  *        ID of the message @a fragq belongs to.
1204  * @param fragq
1205  *        Fragment queue of the message.
1206  * @param drop
1207  *        Drop message without delivering to client?
1208  *        #GNUNET_YES or #GNUNET_NO.
1209  */
1210 static void
1211 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1212                     struct FragmentQueue *fragq, uint8_t drop)
1213 {
1214   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1215               "%p Running message fragment queue for message %" PRIu64
1216               " (state: %u).\n",
1217               chn, msg_id, fragq->state);
1218
1219   struct GNUNET_CONTAINER_MultiHashMap
1220     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1221                                                     &chn->pub_key_hash);
1222   GNUNET_assert (NULL != chan_msgs);
1223   uint64_t frag_id;
1224
1225   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1226                                                     &frag_id))
1227   {
1228     struct GNUNET_HashCode frag_id_hash;
1229     hash_key_from_hll (&frag_id_hash, frag_id);
1230     struct RecvCacheEntry *cache_entry
1231       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1232     if (cache_entry != NULL)
1233     {
1234       if (GNUNET_NO == drop)
1235       {
1236         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1237       }
1238       if (cache_entry->ref_count <= 1)
1239       {
1240         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1241                                               cache_entry);
1242         GNUNET_free (cache_entry->mmsg);
1243         GNUNET_free (cache_entry);
1244       }
1245       else
1246       {
1247         cache_entry->ref_count--;
1248       }
1249     }
1250 #if CACHE_AGING_IMPLEMENTED
1251     else if (GNUNET_NO == drop)
1252     {
1253       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1254     }
1255 #endif
1256
1257     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1258   }
1259
1260   if (MSG_FRAG_STATE_END <= fragq->state)
1261   {
1262     struct GNUNET_HashCode msg_id_hash;
1263     hash_key_from_hll (&msg_id_hash, msg_id);
1264
1265     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1266     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1267     GNUNET_free (fragq);
1268   }
1269   else
1270   {
1271     fragq->is_queued = GNUNET_NO;
1272   }
1273 }
1274
1275
1276 struct StateModifyClosure
1277 {
1278   struct Channel *chn;
1279   uint64_t msg_id;
1280   struct GNUNET_HashCode msg_id_hash;
1281 };
1282
1283
1284 void
1285 store_recv_state_modify_result (void *cls, int64_t result,
1286                                 const char *err_msg, uint16_t err_msg_size)
1287 {
1288   struct StateModifyClosure *mcls = cls;
1289   struct Channel *chn = mcls->chn;
1290   uint64_t msg_id = mcls->msg_id;
1291
1292   struct FragmentQueue *
1293     fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1294
1295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1296               "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1297               chn, result, err_msg_size, err_msg);
1298
1299   switch (result)
1300   {
1301   case GNUNET_OK:
1302   case GNUNET_NO:
1303     if (NULL != fragq)
1304       fragq->state_is_modified = GNUNET_YES;
1305     if (chn->max_state_message_id < msg_id)
1306       chn->max_state_message_id = msg_id;
1307     if (chn->max_message_id < msg_id)
1308       chn->max_message_id = msg_id;
1309
1310     if (NULL != fragq)
1311       fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1312     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1313     message_queue_run (chn);
1314     break;
1315
1316   default:
1317     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1318                 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1319                 chn, result, err_msg_size, err_msg);
1320     /** @todo FIXME: handle state_modify error */
1321   }
1322 }
1323
1324
1325 /**
1326  * Run message queue.
1327  *
1328  * Send messages in queue to client in order after a message has arrived from
1329  * multicast, according to the following:
1330  * - A message is only sent if all of its modifiers arrived.
1331  * - A stateful message is only sent if the previous stateful message
1332  *   has already been delivered to the client.
1333  *
1334  * @param chn  Channel.
1335  *
1336  * @return Number of messages removed from queue and sent to client.
1337  */
1338 static uint64_t
1339 message_queue_run (struct Channel *chn)
1340 {
1341   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1342               "%p Running message queue.\n", chn);
1343   uint64_t n = 0;
1344   uint64_t msg_id;
1345
1346   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1347                                                     &msg_id))
1348   {
1349     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1350                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1351     struct GNUNET_HashCode msg_id_hash;
1352     hash_key_from_hll (&msg_id_hash, msg_id);
1353
1354     struct FragmentQueue *
1355       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1356
1357     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1358     {
1359       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1360                   "%p No fragq (%p) or header not complete.\n",
1361                   chn, fragq);
1362       break;
1363     }
1364
1365     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366                 "%p Fragment queue entry:  state: %u, state delta: "
1367                 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1368                 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1369
1370     if (MSG_FRAG_STATE_DATA <= fragq->state)
1371     {
1372       /* Check if there's a missing message before the current one */
1373       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1374       {
1375         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1376
1377         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1378             && (chn->max_message_id != msg_id - 1
1379                 && chn->max_message_id != msg_id))
1380         {
1381           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1382                       "%p Out of order message. "
1383                       "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1384                       chn, chn->max_message_id, msg_id);
1385           break;
1386           // FIXME: keep track of messages processed in this queue run,
1387           //        and only stop after reaching the end
1388         }
1389       }
1390       else
1391       {
1392         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1393         if (GNUNET_YES != fragq->state_is_modified)
1394         {
1395           if (msg_id - fragq->state_delta != chn->max_state_message_id)
1396           {
1397             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1398                         "%p Out of order stateful message. "
1399                         "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1400                         chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1401             break;
1402             // FIXME: keep track of messages processed in this queue run,
1403             //        and only stop after reaching the end
1404           }
1405
1406           struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1407           mcls->chn = chn;
1408           mcls->msg_id = msg_id;
1409           mcls->msg_id_hash = msg_id_hash;
1410
1411           /* Apply modifiers to state in PSYCstore */
1412           GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1413                                          fragq->state_delta,
1414                                          store_recv_state_modify_result, mcls);
1415           break; // continue after asynchronous state modify result
1416         }
1417       }
1418       chn->max_message_id = msg_id;
1419     }
1420     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1421     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1422     n++;
1423   }
1424
1425   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1427   return n;
1428 }
1429
1430
1431 /**
1432  * Drop message queue of a channel.
1433  *
1434  * Remove all messages in queue without sending it to clients.
1435  *
1436  * @param chn  Channel.
1437  *
1438  * @return Number of messages removed from queue.
1439  */
1440 static uint64_t
1441 message_queue_drop (struct Channel *chn)
1442 {
1443   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1444               "%p Dropping message queue.\n", chn);
1445   uint64_t n = 0;
1446   uint64_t msg_id;
1447   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1448                                                     &msg_id))
1449   {
1450     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1451                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1452     struct GNUNET_HashCode msg_id_hash;
1453     hash_key_from_hll (&msg_id_hash, msg_id);
1454
1455     struct FragmentQueue *
1456       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1457     GNUNET_assert (NULL != fragq);
1458     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1459     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1460     n++;
1461   }
1462   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1463               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1464   return n;
1465 }
1466
1467
1468 /**
1469  * Received result of GNUNET_PSYCSTORE_fragment_store().
1470  */
1471 static void
1472 store_recv_fragment_store_result (void *cls, int64_t result,
1473                                   const char *err_msg, uint16_t err_msg_size)
1474 {
1475   struct Channel *chn = cls;
1476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1478               chn, result, err_msg_size, err_msg);
1479 }
1480
1481
1482 /**
1483  * Handle incoming message fragment from multicast.
1484  *
1485  * Store it using PSYCstore and send it to the clients of the channel in order.
1486  */
1487 static void
1488 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1489 {
1490   struct Channel *chn = cls;
1491   uint16_t size = ntohs (mmsg->header.size);
1492
1493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1494               "%p Received multicast message of size %u. "
1495               "fragment_id=%" PRIu64 ", message_id=%" PRIu64
1496               ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
1497               chn, size,
1498               GNUNET_ntohll (mmsg->fragment_id),
1499               GNUNET_ntohll (mmsg->message_id),
1500               GNUNET_ntohll (mmsg->fragment_offset),
1501               GNUNET_ntohll (mmsg->flags));
1502
1503   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1504                                    &store_recv_fragment_store_result, chn);
1505
1506   uint16_t first_ptype = 0, last_ptype = 0;
1507   int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1508                                                (const char *) &mmsg[1],
1509                                                &first_ptype, &last_ptype);
1510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511               "%p Message check result %d, first part type %u, last part type %u\n",
1512               chn, check, first_ptype, last_ptype);
1513   if (GNUNET_SYSERR == check)
1514   {
1515     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1516                 "%p Dropping incoming multicast message with invalid parts.\n",
1517                 chn);
1518     GNUNET_break_op (0);
1519     return;
1520   }
1521
1522   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1523   message_queue_run (chn);
1524 }
1525
1526
1527 /**
1528  * Incoming request fragment from multicast for a master.
1529  *
1530  * @param cls   Master.
1531  * @param req   The request.
1532  */
1533 static void
1534 mcast_recv_request (void *cls,
1535                     const struct GNUNET_MULTICAST_RequestHeader *req)
1536 {
1537   struct Master *mst = cls;
1538   uint16_t size = ntohs (req->header.size);
1539
1540   char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&req->member_pub_key);
1541   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542               "%p Received multicast request of size %u from %s.\n",
1543               mst, size, str);
1544   GNUNET_free (str);
1545
1546   uint16_t first_ptype = 0, last_ptype = 0;
1547   if (GNUNET_SYSERR
1548       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1549                                           (const char *) &req[1],
1550                                           &first_ptype, &last_ptype))
1551   {
1552     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1553                 "%p Dropping incoming multicast request with invalid parts.\n",
1554                 mst);
1555     GNUNET_break_op (0);
1556     return;
1557   }
1558
1559   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1560               "Message parts: first: type %u, last: type %u\n",
1561               first_ptype, last_ptype);
1562
1563   /* FIXME: in-order delivery */
1564   client_send_mcast_req (mst, req);
1565 }
1566
1567
1568 /**
1569  * Response from PSYCstore with the current counter values for a channel master.
1570  */
1571 static void
1572 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1573                             uint64_t max_message_id, uint64_t max_group_generation,
1574                             uint64_t max_state_message_id)
1575 {
1576   struct Master *mst = cls;
1577   struct Channel *chn = &mst->chn;
1578   chn->store_op = NULL;
1579
1580   struct GNUNET_PSYC_CountersResultMessage res;
1581   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1582   res.header.size = htons (sizeof (res));
1583   res.result_code = htonl (result);
1584   res.max_message_id = GNUNET_htonll (max_message_id);
1585
1586   if (GNUNET_OK == result || GNUNET_NO == result)
1587   {
1588     mst->max_message_id = max_message_id;
1589     chn->max_message_id = max_message_id;
1590     chn->max_state_message_id = max_state_message_id;
1591     mst->max_group_generation = max_group_generation;
1592     mst->origin
1593       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1594                                        mcast_recv_join_request,
1595                                        mcast_recv_replay_fragment,
1596                                        mcast_recv_replay_message,
1597                                        mcast_recv_request,
1598                                        mcast_recv_message, chn);
1599     chn->is_ready = GNUNET_YES;
1600   }
1601   else
1602   {
1603     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1604                 "%p GNUNET_PSYCSTORE_counters_get() "
1605                 "returned %d for channel %s.\n",
1606                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1607   }
1608
1609   client_send_msg (chn, &res.header);
1610 }
1611
1612
1613 /**
1614  * Response from PSYCstore with the current counter values for a channel slave.
1615  */
1616 void
1617 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1618                            uint64_t max_message_id, uint64_t max_group_generation,
1619                            uint64_t max_state_message_id)
1620 {
1621   struct Slave *slv = cls;
1622   struct Channel *chn = &slv->chn;
1623   chn->store_op = NULL;
1624
1625   struct GNUNET_PSYC_CountersResultMessage res;
1626   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1627   res.header.size = htons (sizeof (res));
1628   res.result_code = htonl (result);
1629   res.max_message_id = GNUNET_htonll (max_message_id);
1630
1631   if (GNUNET_OK == result || GNUNET_NO == result)
1632   {
1633     chn->max_message_id = max_message_id;
1634     chn->max_state_message_id = max_state_message_id;
1635     slv->member
1636       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1637                                       &slv->origin,
1638                                       slv->relay_count, slv->relays,
1639                                       &slv->join_msg->header,
1640                                       mcast_recv_join_request,
1641                                       mcast_recv_join_decision,
1642                                       mcast_recv_replay_fragment,
1643                                       mcast_recv_replay_message,
1644                                       mcast_recv_message, chn);
1645     if (NULL != slv->join_msg)
1646     {
1647       GNUNET_free (slv->join_msg);
1648       slv->join_msg = NULL;
1649     }
1650   }
1651   else
1652   {
1653     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1654                 "%p GNUNET_PSYCSTORE_counters_get() "
1655                 "returned %d for channel %s.\n",
1656                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1657   }
1658
1659   client_send_msg (chn, &res.header);
1660 }
1661
1662
1663 static void
1664 channel_init (struct Channel *chn)
1665 {
1666   chn->recv_msgs
1667     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1668   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1669 }
1670
1671
1672 /**
1673  * Handle a connecting client starting a channel master.
1674  */
1675 static void
1676 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1677                           const struct GNUNET_MessageHeader *msg)
1678 {
1679   const struct MasterStartRequest *req
1680     = (const struct MasterStartRequest *) msg;
1681
1682   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1683   struct GNUNET_HashCode pub_key_hash;
1684
1685   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1686   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1687
1688   struct Master *
1689     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1690   struct Channel *chn;
1691
1692   if (NULL == mst)
1693   {
1694     mst = GNUNET_new (struct Master);
1695     mst->policy = ntohl (req->policy);
1696     mst->priv_key = req->channel_key;
1697     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1698
1699     chn = &mst->chn;
1700     chn->is_master = GNUNET_YES;
1701     chn->pub_key = pub_key;
1702     chn->pub_key_hash = pub_key_hash;
1703     channel_init (chn);
1704
1705     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1706                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1707     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1708                                                    store_recv_master_counters, mst);
1709   }
1710   else
1711   {
1712     chn = &mst->chn;
1713
1714     struct GNUNET_PSYC_CountersResultMessage res;
1715     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1716     res.header.size = htons (sizeof (res));
1717     res.result_code = htonl (GNUNET_OK);
1718     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1719
1720     GNUNET_SERVER_notification_context_add (nc, client);
1721     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1722                                                 GNUNET_NO);
1723   }
1724
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "%p Client connected as master to channel %s.\n",
1727               mst, GNUNET_h2s (&chn->pub_key_hash));
1728
1729   struct Client *cli = GNUNET_new (struct Client);
1730   cli->client = client;
1731   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1732
1733   GNUNET_SERVER_client_set_user_context (client, chn);
1734   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1735 }
1736
1737
1738 /**
1739  * Handle a connecting client joining as a channel slave.
1740  */
1741 static void
1742 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1743                         const struct GNUNET_MessageHeader *msg)
1744 {
1745   const struct SlaveJoinRequest *req
1746     = (const struct SlaveJoinRequest *) msg;
1747   uint16_t req_size = ntohs (req->header.size);
1748
1749   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1750   struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1751
1752   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1753   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1754   GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
1755
1756   struct GNUNET_CONTAINER_MultiHashMap *
1757     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1758   struct Slave *slv = NULL;
1759   struct Channel *chn;
1760
1761   if (NULL != chn_slv)
1762   {
1763     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_hash);
1764   }
1765   if (NULL == slv)
1766   {
1767     slv = GNUNET_new (struct Slave);
1768     slv->priv_key = req->slave_key;
1769     slv->pub_key = slv_pub_key;
1770     slv->pub_key_hash = slv_pub_hash;
1771     slv->origin = req->origin;
1772     slv->relay_count = ntohl (req->relay_count);
1773     slv->join_flags = ntohl (req->flags);
1774
1775     const struct GNUNET_PeerIdentity *
1776       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1777     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1778     uint16_t join_msg_size = 0;
1779
1780     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1781         <= req_size)
1782     {
1783       struct GNUNET_PSYC_Message *
1784         join_msg = (struct GNUNET_PSYC_Message *) (((char *) &req[1]) + relay_size);
1785       join_msg_size = ntohs (join_msg->header.size);
1786       slv->join_msg = GNUNET_malloc (join_msg_size);
1787       memcpy (slv->join_msg, join_msg, join_msg_size);
1788     }
1789     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1790     {
1791       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1792                   "%u + %u + %u != %u\n",
1793                   (unsigned int) sizeof (*req),
1794                   relay_size,
1795                   join_msg_size,
1796                   req_size);
1797       GNUNET_break (0);
1798       GNUNET_SERVER_client_disconnect (client);
1799       GNUNET_free (slv);
1800       return;
1801     }
1802     if (0 < slv->relay_count)
1803     {
1804       slv->relays = GNUNET_malloc (relay_size);
1805       memcpy (slv->relays, &req[1], relay_size);
1806     }
1807
1808     chn = &slv->chn;
1809     chn->is_master = GNUNET_NO;
1810     chn->pub_key = req->channel_pub_key;
1811     chn->pub_key_hash = pub_key_hash;
1812     channel_init (chn);
1813
1814     if (NULL == chn_slv)
1815     {
1816       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1817       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1818                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1819     }
1820     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1821                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1822     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1823                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1824     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1825                                                   &store_recv_slave_counters, slv);
1826   }
1827   else
1828   {
1829     chn = &slv->chn;
1830
1831     struct GNUNET_PSYC_CountersResultMessage res;
1832     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1833     res.header.size = htons (sizeof (res));
1834     res.result_code = htonl (GNUNET_OK);
1835     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1836
1837     GNUNET_SERVER_notification_context_add (nc, client);
1838     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1839                                                 GNUNET_NO);
1840
1841     if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1842     {
1843       mcast_recv_join_decision (slv, GNUNET_YES,
1844                                 NULL, 0, NULL, NULL);
1845     }
1846     else if (NULL == slv->member)
1847     {
1848       slv->member
1849         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1850                                         &slv->origin,
1851                                         slv->relay_count, slv->relays,
1852                                         &slv->join_msg->header,
1853                                         &mcast_recv_join_request,
1854                                         &mcast_recv_join_decision,
1855                                         &mcast_recv_replay_fragment,
1856                                         &mcast_recv_replay_message,
1857                                         &mcast_recv_message, chn);
1858       if (NULL != slv->join_msg)
1859       {
1860         GNUNET_free (slv->join_msg);
1861         slv->join_msg = NULL;
1862       }
1863     }
1864     else if (NULL != slv->join_dcsn)
1865     {
1866       GNUNET_SERVER_notification_context_add (nc, client);
1867       GNUNET_SERVER_notification_context_unicast (nc, client,
1868                                                   &slv->join_dcsn->header,
1869                                                   GNUNET_NO);
1870     }
1871   }
1872
1873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874               "%p Client connected as slave to channel %s.\n",
1875               slv, GNUNET_h2s (&chn->pub_key_hash));
1876
1877   struct Client *cli = GNUNET_new (struct Client);
1878   cli->client = client;
1879   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1880
1881   GNUNET_SERVER_client_set_user_context (client, chn);
1882   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1883 }
1884
1885
1886 struct JoinDecisionClosure
1887 {
1888   int32_t is_admitted;
1889   struct GNUNET_MessageHeader *msg;
1890 };
1891
1892
1893 /**
1894  * Iterator callback for sending join decisions to multicast.
1895  */
1896 static int
1897 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1898                           void *value)
1899 {
1900   struct JoinDecisionClosure *jcls = cls;
1901   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1902   // FIXME: add relays
1903   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1904   return GNUNET_YES;
1905 }
1906
1907
1908 /**
1909  * Join decision from client.
1910  */
1911 static void
1912 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1913                            const struct GNUNET_MessageHeader *msg)
1914 {
1915   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1916     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1917   struct Channel *chn;
1918   struct Master *mst;
1919   struct JoinDecisionClosure jcls;
1920
1921   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1922   if (NULL == chn)
1923   {
1924     GNUNET_break (0);
1925     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1926     return;
1927   }
1928   GNUNET_assert (GNUNET_YES == chn->is_master);
1929   mst = (struct Master *) chn;
1930   jcls.is_admitted = ntohl (dcsn->is_admitted);
1931   jcls.msg
1932     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1933     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1934     : NULL;
1935
1936   struct GNUNET_HashCode slave_pub_hash;
1937   GNUNET_CRYPTO_hash (&dcsn->slave_pub_key, sizeof (dcsn->slave_pub_key),
1938                       &slave_pub_hash);
1939
1940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1941               "%p Got join decision (%d) from client for channel %s..\n",
1942               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1944               "%p ..and slave %s.\n",
1945               mst, GNUNET_h2s (&slave_pub_hash));
1946
1947   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1948                                               &mcast_send_join_decision, &jcls);
1949   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1950   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1951 }
1952
1953
1954 /**
1955  * Send acknowledgement to a client.
1956  *
1957  * Sent after a message fragment has been passed on to multicast.
1958  *
1959  * @param chn The channel struct for the client.
1960  */
1961 static void
1962 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1963 {
1964   struct GNUNET_MessageHeader res;
1965   res.size = htons (sizeof (res));
1966   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1967
1968   /* FIXME */
1969   GNUNET_SERVER_notification_context_add (nc, client);
1970   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1971 }
1972
1973
1974 /**
1975  * Callback for the transmit functions of multicast.
1976  */
1977 static int
1978 transmit_notify (void *cls, size_t *data_size, void *data)
1979 {
1980   struct Channel *chn = cls;
1981   struct TransmitMessage *tmit_msg = chn->tmit_head;
1982
1983   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1984   {
1985     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1986                 "%p transmit_notify: nothing to send.\n", chn);
1987     if (NULL != tmit_msg && *data_size < tmit_msg->size)
1988       GNUNET_break (0);
1989     *data_size = 0;
1990     return GNUNET_NO;
1991   }
1992
1993   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1995
1996   *data_size = tmit_msg->size;
1997   memcpy (data, &tmit_msg[1], *data_size);
1998
1999   int ret
2000     = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2001     ? GNUNET_NO
2002     : GNUNET_YES;
2003
2004   /* FIXME: handle disconnecting clients */
2005   if (NULL != tmit_msg->client)
2006     send_message_ack (chn, tmit_msg->client);
2007
2008   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
2009   GNUNET_free (tmit_msg);
2010
2011   if (NULL != chn->tmit_head)
2012   {
2013     GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2014   }
2015   else if (GNUNET_YES == chn->is_disconnected
2016            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2017   {
2018     /* FIXME: handle partial message (when still in_transmit) */
2019     return GNUNET_SYSERR;
2020   }
2021   return ret;
2022 }
2023
2024
2025 /**
2026  * Callback for the transmit functions of multicast.
2027  */
2028 static int
2029 master_transmit_notify (void *cls, size_t *data_size, void *data)
2030 {
2031   int ret = transmit_notify (cls, data_size, data);
2032
2033   if (GNUNET_YES == ret)
2034   {
2035     struct Master *mst = cls;
2036     mst->tmit_handle = NULL;
2037   }
2038   return ret;
2039 }
2040
2041
2042 /**
2043  * Callback for the transmit functions of multicast.
2044  */
2045 static int
2046 slave_transmit_notify (void *cls, size_t *data_size, void *data)
2047 {
2048   int ret = transmit_notify (cls, data_size, data);
2049
2050   if (GNUNET_YES == ret)
2051   {
2052     struct Slave *slv = cls;
2053     slv->tmit_handle = NULL;
2054   }
2055   return ret;
2056 }
2057
2058
2059 /**
2060  * Transmit a message from a channel master to the multicast group.
2061  */
2062 static void
2063 master_transmit_message (struct Master *mst)
2064 {
2065   struct Channel *chn = &mst->chn;
2066   struct TransmitMessage *tmit_msg = chn->tmit_head;
2067   if (NULL == tmit_msg)
2068     return;
2069   if (NULL == mst->tmit_handle)
2070   {
2071     mst->tmit_handle = (void *) &mst->tmit_handle;
2072     struct GNUNET_MULTICAST_OriginTransmitHandle *
2073       tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, tmit_msg->id,
2074                                                     mst->max_group_generation,
2075                                                     master_transmit_notify, mst);
2076     if (NULL != mst->tmit_handle)
2077       mst->tmit_handle = tmit_handle;
2078   }
2079   else
2080   {
2081     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
2082   }
2083 }
2084
2085
2086 /**
2087  * Transmit a message from a channel slave to the multicast group.
2088  */
2089 static void
2090 slave_transmit_message (struct Slave *slv)
2091 {
2092   if (NULL == slv->chn.tmit_head)
2093     return;
2094   if (NULL == slv->tmit_handle)
2095   {
2096     slv->tmit_handle = (void *) &slv->tmit_handle;
2097     struct GNUNET_MULTICAST_MemberTransmitHandle *
2098       tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
2099                                                        slave_transmit_notify, slv);
2100     if (NULL != slv->tmit_handle)
2101       slv->tmit_handle = tmit_handle;
2102   }
2103   else
2104   {
2105     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2106   }
2107 }
2108
2109
2110 static void
2111 transmit_message (struct Channel *chn)
2112 {
2113   chn->is_master
2114     ? master_transmit_message ((struct Master *) chn)
2115     : slave_transmit_message ((struct Slave *) chn);
2116 }
2117
2118
2119 /**
2120  * Queue a message from a channel master for sending to the multicast group.
2121  */
2122 static void
2123 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2124 {
2125   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2126
2127   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2128   {
2129     tmit_msg->id = ++mst->max_message_id;
2130     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2131                 "%p master_queue_message: message_id=%" PRIu64 "\n",
2132                 mst, tmit_msg->id);
2133     struct GNUNET_PSYC_MessageMethod *pmeth
2134       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2135
2136     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2137     {
2138       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2139     }
2140     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2141     {
2142       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2143                   "%p master_queue_message: state_delta=%" PRIu64 "\n",
2144                   mst, tmit_msg->id - mst->max_state_message_id);
2145       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2146                                           - mst->max_state_message_id);
2147       mst->max_state_message_id = tmit_msg->id;
2148     }
2149     else
2150     {
2151         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2152                     "%p master_queue_message: state not modified\n", mst);
2153       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2154     }
2155
2156     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2157     {
2158       /// @todo add state_hash to PSYC header
2159     }
2160   }
2161 }
2162
2163
2164 /**
2165  * Queue a message from a channel slave for sending to the multicast group.
2166  */
2167 static void
2168 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2169 {
2170   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2171   {
2172     struct GNUNET_PSYC_MessageMethod *pmeth
2173       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2174     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2175     tmit_msg->id = ++slv->max_request_id;
2176   }
2177 }
2178
2179
2180 /**
2181  * Queue PSYC message parts for sending to multicast.
2182  *
2183  * @param chn
2184  *        Channel to send to.
2185  * @param client
2186  *        Client the message originates from.
2187  * @param data_size
2188  *        Size of @a data.
2189  * @param data
2190  *        Concatenated message parts.
2191  * @param first_ptype
2192  *        First message part type in @a data.
2193  * @param last_ptype
2194  *        Last message part type in @a data.
2195  */
2196 static struct TransmitMessage *
2197 queue_message (struct Channel *chn,
2198                struct GNUNET_SERVER_Client *client,
2199                size_t data_size,
2200                const void *data,
2201                uint16_t first_ptype, uint16_t last_ptype)
2202 {
2203   struct TransmitMessage *
2204     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2205   memcpy (&tmit_msg[1], data, data_size);
2206   tmit_msg->client = client;
2207   tmit_msg->size = data_size;
2208   tmit_msg->first_ptype = first_ptype;
2209   tmit_msg->last_ptype = last_ptype;
2210
2211   /* FIXME: separate queue per message ID */
2212
2213   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2214
2215   chn->is_master
2216     ? master_queue_message ((struct Master *) chn, tmit_msg)
2217     : slave_queue_message ((struct Slave *) chn, tmit_msg);
2218   return tmit_msg;
2219 }
2220
2221
2222 /**
2223  * Cancel transmission of current message.
2224  *
2225  * @param chn     Channel to send to.
2226  * @param client  Client the message originates from.
2227  */
2228 static void
2229 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2230 {
2231   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2232
2233   struct GNUNET_MessageHeader msg;
2234   msg.size = htons (sizeof (msg));
2235   msg.type = htons (type);
2236
2237   queue_message (chn, client, sizeof (msg), &msg, type, type);
2238   transmit_message (chn);
2239
2240   /* FIXME: cleanup */
2241 }
2242
2243
2244 /**
2245  * Incoming message from a master or slave client.
2246  */
2247 static void
2248 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2249                           const struct GNUNET_MessageHeader *msg)
2250 {
2251   struct Channel *
2252     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2253   GNUNET_assert (NULL != chn);
2254
2255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2256               "%p Received message from client.\n", chn);
2257   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2258
2259   if (GNUNET_YES != chn->is_ready)
2260   {
2261     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2262                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2263     GNUNET_break (0);
2264     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2265     return;
2266   }
2267
2268   uint16_t size = ntohs (msg->size);
2269   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2270   {
2271     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2272                 "%p Message payload too large: %u < %u.\n",
2273                 chn,
2274                 (unsigned int) GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
2275                 (unsigned int) (size - sizeof (*msg)));
2276     GNUNET_break (0);
2277     transmit_cancel (chn, client);
2278     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2279     return;
2280   }
2281
2282   uint16_t first_ptype = 0, last_ptype = 0;
2283   if (GNUNET_SYSERR
2284       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2285                                           (const char *) &msg[1],
2286                                           &first_ptype, &last_ptype))
2287   {
2288     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2289                 "%p Received invalid message part from client.\n", chn);
2290     GNUNET_break (0);
2291     transmit_cancel (chn, client);
2292     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2293     return;
2294   }
2295   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2296               "%p Received message with first part type %u and last part type %u.\n",
2297               chn, first_ptype, last_ptype);
2298
2299   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2300                  first_ptype, last_ptype);
2301   transmit_message (chn);
2302   /* FIXME: send a few ACKs even before transmit_notify is called */
2303
2304   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2305 };
2306
2307
2308 /**
2309  * Received result of GNUNET_PSYCSTORE_membership_store()
2310  */
2311 static void
2312 store_recv_membership_store_result (void *cls,
2313                                     int64_t result,
2314                                     const char *err_msg,
2315                                     uint16_t err_msg_size)
2316 {
2317   struct Operation *op = cls;
2318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2320               op->chn,
2321               result,
2322               (int) err_msg_size,
2323               err_msg);
2324
2325   if (NULL != op->client)
2326     client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2327   op_remove (op);
2328 }
2329
2330
2331 /**
2332  * Client requests to add/remove a slave in the membership database.
2333  */
2334 static void
2335 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2336                               const struct GNUNET_MessageHeader *msg)
2337 {
2338   struct Channel *
2339     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2340   GNUNET_assert (NULL != chn);
2341
2342   const struct ChannelMembershipStoreRequest *
2343     req = (const struct ChannelMembershipStoreRequest *) msg;
2344
2345   struct Operation *op = op_add (chn, client, req->op_id, 0);
2346
2347   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2348   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350               "%p Received membership store request from client.\n", chn);
2351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2353               chn, req->did_join, announced_at, effective_since);
2354
2355   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_pub_key,
2356                                      req->did_join, announced_at, effective_since,
2357                                      0, /* FIXME: group_generation */
2358                                      &store_recv_membership_store_result, op);
2359   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2360 }
2361
2362
2363 /**
2364  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2365  * in response to a history request from a client.
2366  */
2367 static int
2368 store_recv_fragment_history (void *cls,
2369                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2370                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2371 {
2372   struct Operation *op = cls;
2373   if (NULL == op->client)
2374   { /* Requesting client already disconnected. */
2375     return GNUNET_NO;
2376   }
2377   struct Channel *chn = op->chn;
2378
2379   struct GNUNET_PSYC_MessageHeader *pmsg;
2380   uint16_t msize = ntohs (mmsg->header.size);
2381   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2382
2383   struct GNUNET_OperationResultMessage *
2384     res = GNUNET_malloc (sizeof (*res) + psize);
2385   res->header.size = htons (sizeof (*res) + psize);
2386   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2387   res->op_id = op->op_id;
2388   res->result_code = GNUNET_htonll (GNUNET_OK);
2389
2390   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2391   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2392   memcpy (&res[1], pmsg, psize);
2393
2394   /** @todo FIXME: send only to requesting client */
2395   client_send_msg (chn, &res->header);
2396   return GNUNET_YES;
2397 }
2398
2399
2400 /**
2401  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2402  * in response to a history request from a client.
2403  */
2404 static void
2405 store_recv_fragment_history_result (void *cls, int64_t result,
2406                                     const char *err_msg, uint16_t err_msg_size)
2407 {
2408   struct Operation *op = cls;
2409   if (NULL == op->client)
2410   { /* Requesting client already disconnected. */
2411     return;
2412   }
2413
2414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2415               "%p History replay #%" PRIu64 ": "
2416               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2417               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2418
2419   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2420   {
2421     /** @todo Multicast replay request for messages not found locally. */
2422   }
2423
2424   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2425   op_remove (op);
2426 }
2427
2428
2429 /**
2430  * Client requests channel history.
2431  */
2432 static void
2433 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2434                             const struct GNUNET_MessageHeader *msg)
2435 {
2436   struct Channel *
2437     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2438   GNUNET_assert (NULL != chn);
2439
2440   const struct GNUNET_PSYC_HistoryRequestMessage *
2441     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2442   uint16_t size = ntohs (msg->size);
2443   const char *method_prefix = (const char *) &req[1];
2444
2445   if (size < sizeof (*req) + 1
2446       || '\0' != method_prefix[size - sizeof (*req) - 1])
2447   {
2448     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2449                 "%p History replay #%" PRIu64 ": "
2450                 "invalid method prefix. size: %u < %u?\n",
2451                 chn,
2452                 GNUNET_ntohll (req->op_id),
2453                 size,
2454                 (unsigned int) sizeof (*req) + 1);
2455     GNUNET_break (0);
2456     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2457     return;
2458   }
2459
2460   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2461
2462   if (0 == req->message_limit)
2463     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2464                                   GNUNET_ntohll (req->start_message_id),
2465                                   GNUNET_ntohll (req->end_message_id),
2466                                   0, method_prefix,
2467                                   &store_recv_fragment_history,
2468                                   &store_recv_fragment_history_result, op);
2469   else
2470     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2471                                          GNUNET_ntohll (req->message_limit),
2472                                          method_prefix,
2473                                          &store_recv_fragment_history,
2474                                          &store_recv_fragment_history_result,
2475                                          op);
2476
2477   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2478 }
2479
2480
2481 /**
2482  * Received state var from PSYCstore, send it to client.
2483  */
2484 static int
2485 store_recv_state_var (void *cls, const char *name,
2486                       const void *value, uint32_t value_size)
2487 {
2488   struct Operation *op = cls;
2489   struct GNUNET_OperationResultMessage *res;
2490
2491   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2492               "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2493               op->chn, GNUNET_ntohll (op->op_id), name);
2494
2495   if (NULL != name) /* First part */
2496   {
2497     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2498     struct GNUNET_PSYC_MessageModifier *mod;
2499     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2500     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2501     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2502     res->op_id = op->op_id;
2503
2504     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2505     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2506     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2507     mod->name_size = htons (name_size);
2508     mod->value_size = htonl (value_size);
2509     mod->oper = htons (GNUNET_PSYC_OP_ASSIGN);
2510     memcpy (&mod[1], name, name_size);
2511     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2512   }
2513   else /* Continuation */
2514   {
2515     struct GNUNET_MessageHeader *mod;
2516     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2517     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2518     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2519     res->op_id = op->op_id;
2520
2521     mod = (struct GNUNET_MessageHeader *) &res[1];
2522     mod->size = htons (sizeof (*mod) + value_size);
2523     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2524     memcpy (&mod[1], value, value_size);
2525   }
2526
2527   // FIXME: client might have been disconnected
2528   GNUNET_SERVER_notification_context_add (nc, op->client);
2529   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2530                                               GNUNET_NO);
2531   return GNUNET_YES;
2532 }
2533
2534
2535 /**
2536  * Received result of GNUNET_PSYCSTORE_state_get()
2537  * or GNUNET_PSYCSTORE_state_get_prefix()
2538  */
2539 static void
2540 store_recv_state_result (void *cls, int64_t result,
2541                          const char *err_msg, uint16_t err_msg_size)
2542 {
2543   struct Operation *op = cls;
2544   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2545               "%p state_get #%" PRIu64 ": "
2546               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2547               op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2548
2549   // FIXME: client might have been disconnected
2550   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2551   op_remove (op);
2552 }
2553
2554
2555 /**
2556  * Client requests best matching state variable from PSYCstore.
2557  */
2558 static void
2559 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2560                        const struct GNUNET_MessageHeader *msg)
2561 {
2562   struct Channel *
2563     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2564   GNUNET_assert (NULL != chn);
2565
2566   const struct StateRequest *
2567     req = (const struct StateRequest *) msg;
2568
2569   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2570   const char *name = (const char *) &req[1];
2571   if (0 == name_size || '\0' != name[name_size - 1])
2572   {
2573     GNUNET_break (0);
2574     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2575     return;
2576   }
2577
2578   struct Operation *op = op_add (chn, client, req->op_id, 0);
2579   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2580                               &store_recv_state_var,
2581                               &store_recv_state_result, op);
2582   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2583 }
2584
2585
2586 /**
2587  * Client requests state variables with a given prefix from PSYCstore.
2588  */
2589 static void
2590 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2591                               const struct GNUNET_MessageHeader *msg)
2592 {
2593   struct Channel *
2594     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2595   GNUNET_assert (NULL != chn);
2596
2597   const struct StateRequest *
2598     req = (const struct StateRequest *) msg;
2599
2600   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2601   const char *name = (const char *) &req[1];
2602   if (0 == name_size || '\0' != name[name_size - 1])
2603   {
2604     GNUNET_break (0);
2605     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2606     return;
2607   }
2608
2609   struct Operation *op = op_add (chn, client, req->op_id, 0);
2610   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2611                                      &store_recv_state_var,
2612                                      &store_recv_state_result, op);
2613   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2614 }
2615
2616
2617 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2618   { &client_recv_master_start, NULL,
2619     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2620
2621   { &client_recv_slave_join, NULL,
2622     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2623
2624   { &client_recv_join_decision, NULL,
2625     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2626
2627   { &client_recv_psyc_message, NULL,
2628     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2629
2630   { &client_recv_membership_store, NULL,
2631     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2632
2633   { &client_recv_history_replay, NULL,
2634     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2635
2636   { &client_recv_state_get, NULL,
2637     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2638
2639   { &client_recv_state_get_prefix, NULL,
2640     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2641
2642   { NULL, NULL, 0, 0 }
2643 };
2644
2645
2646 /**
2647  * Initialize the PSYC service.
2648  *
2649  * @param cls Closure.
2650  * @param server The initialized server.
2651  * @param c Configuration to use.
2652  */
2653 static void
2654 run (void *cls, struct GNUNET_SERVER_Handle *server,
2655      const struct GNUNET_CONFIGURATION_Handle *c)
2656 {
2657   cfg = c;
2658   store = GNUNET_PSYCSTORE_connect (cfg);
2659   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2660   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2661   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2662   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2663   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2664   nc = GNUNET_SERVER_notification_context_create (server, 1);
2665   GNUNET_SERVER_add_handlers (server, server_handlers);
2666   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2667   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2668 }
2669
2670
2671 /**
2672  * The main function for the service.
2673  *
2674  * @param argc number of arguments from the command line
2675  * @param argv command line arguments
2676  * @return 0 ok, 1 on error
2677  */
2678 int
2679 main (int argc, char *const *argv)
2680 {
2681   return (GNUNET_OK ==
2682           GNUNET_SERVICE_run (argc, argv, "psyc",
2683                               GNUNET_SERVICE_OPTION_NONE,
2684                               &run, NULL)) ? 0 : 1;
2685 }
2686
2687 /* end of gnunet-service-psyc.c */