-nicer logging
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * @see enum MessageState
102    */
103   uint8_t state;
104
105   /**
106    * Whether a message ACK has already been sent to the client.
107    * #GNUNET_YES or #GNUNET_NO
108    */
109   uint8_t ack_sent;
110
111   /* Followed by message */
112 };
113
114
115 /**
116  * Cache for received message fragments.
117  * Message fragments are only sent to clients after all modifiers arrived.
118  *
119  * chan_key -> MultiHashMap chan_msgs
120  */
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
122
123
124 /**
125  * Entry in the chan_msgs hashmap of @a recv_cache:
126  * fragment_id -> RecvCacheEntry
127  */
128 struct RecvCacheEntry
129 {
130   struct GNUNET_MULTICAST_MessageHeader *mmsg;
131   uint16_t ref_count;
132 };
133
134
135 /**
136  * Entry in the @a recv_frags hash map of a @a Channel.
137  * message_id -> FragmentQueue
138  */
139 struct FragmentQueue
140 {
141   /**
142    * Fragment IDs stored in @a recv_cache.
143    */
144   struct GNUNET_CONTAINER_Heap *fragments;
145
146   /**
147    * Total size of received fragments.
148    */
149   uint64_t size;
150
151   /**
152    * Total size of received header fragments (METHOD & MODIFIERs)
153    */
154   uint64_t header_size;
155
156   /**
157    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158    */
159   uint64_t state_delta;
160
161   /**
162    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
163    */
164   uint32_t flags;
165
166   /**
167    * Receive state of message.
168    *
169    * @see MessageFragmentState
170    */
171   uint8_t state;
172
173   /**
174    * Is the message queued for delivery to the client?
175    * i.e. added to the recv_msgs queue
176    */
177   uint8_t is_queued;
178 };
179
180
181 /**
182  * List of connected clients.
183  */
184 struct Client
185 {
186   struct Client *prev;
187   struct Client *next;
188
189   struct GNUNET_SERVER_Client *client;
190 };
191
192
193 struct Operation
194 {
195   struct Operation *prev;
196   struct Operation *next;
197
198   struct GNUNET_SERVER_Client *client;
199   struct Channel *chn;
200   uint64_t op_id;
201   uint32_t flags;
202 };
203
204
205 /**
206  * Common part of the client context for both a channel master and slave.
207  */
208 struct Channel
209 {
210   struct Client *clients_head;
211   struct Client *clients_tail;
212
213   struct Operation *op_head;
214   struct Operation *op_tail;
215
216   struct TransmitMessage *tmit_head;
217   struct TransmitMessage *tmit_tail;
218
219   /**
220    * Current PSYCstore operation.
221    */
222   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
223
224   /**
225    * Received fragments not yet sent to the client.
226    * message_id -> FragmentQueue
227    */
228   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
229
230   /**
231    * Received message IDs not yet sent to the client.
232    */
233   struct GNUNET_CONTAINER_Heap *recv_msgs;
234
235   /**
236    * Public key of the channel.
237    */
238   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
239
240   /**
241    * Hash of @a pub_key.
242    */
243   struct GNUNET_HashCode pub_key_hash;
244
245   /**
246    * Last message ID sent to the client.
247    * 0 if there is no such message.
248    */
249   uint64_t max_message_id;
250
251   /**
252    * ID of the last stateful message, where the state operations has been
253    * processed and saved to PSYCstore and which has been sent to the client.
254    * 0 if there is no such message.
255    */
256   uint64_t max_state_message_id;
257
258   /**
259    * Expected value size for the modifier being received from the PSYC service.
260    */
261   uint32_t tmit_mod_value_size_expected;
262
263   /**
264    * Actual value size for the modifier being received from the PSYC service.
265    */
266   uint32_t tmit_mod_value_size;
267
268   /**
269    * @see enum MessageState
270    */
271   uint8_t tmit_state;
272
273   /**
274    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
275    */
276   uint8_t is_master;
277
278   /**
279    * Is this channel ready to receive messages from client?
280    * #GNUNET_YES or #GNUNET_NO
281    */
282   uint8_t is_ready;
283
284   /**
285    * Is the client disconnected?
286    * #GNUNET_YES or #GNUNET_NO
287    */
288   uint8_t is_disconnected;
289 };
290
291
292 /**
293  * Client context for a channel master.
294  */
295 struct Master
296 {
297   /**
298    * Channel struct common for Master and Slave
299    */
300   struct Channel chn;
301
302   /**
303    * Private key of the channel.
304    */
305   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
306
307   /**
308    * Handle for the multicast origin.
309    */
310   struct GNUNET_MULTICAST_Origin *origin;
311
312   /**
313    * Transmit handle for multicast.
314    */
315   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
316
317   /**
318    * Incoming join requests from multicast.
319    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
320    */
321   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
322
323   /**
324    * Last message ID transmitted to this channel.
325    *
326    * Incremented before sending a message, thus the message_id in messages sent
327    * starts from 1.
328    */
329   uint64_t max_message_id;
330
331   /**
332    * ID of the last message with state operations transmitted to the channel.
333    * 0 if there is no such message.
334    */
335   uint64_t max_state_message_id;
336
337   /**
338    * Maximum group generation transmitted to the channel.
339    */
340   uint64_t max_group_generation;
341
342   /**
343    * @see enum GNUNET_PSYC_Policy
344    */
345   enum GNUNET_PSYC_Policy policy;
346 };
347
348
349 /**
350  * Client context for a channel slave.
351  */
352 struct Slave
353 {
354   /**
355    * Channel struct common for Master and Slave
356    */
357   struct Channel chn;
358
359   /**
360    * Private key of the slave.
361    */
362   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
363
364   /**
365    * Public key of the slave.
366    */
367   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
368
369   /**
370    * Hash of @a pub_key.
371    */
372   struct GNUNET_HashCode pub_key_hash;
373
374   /**
375    * Handle for the multicast member.
376    */
377   struct GNUNET_MULTICAST_Member *member;
378
379   /**
380    * Transmit handle for multicast.
381    */
382   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
383
384   /**
385    * Peer identity of the origin.
386    */
387   struct GNUNET_PeerIdentity origin;
388
389   /**
390    * Number of items in @a relays.
391    */
392   uint32_t relay_count;
393
394   /**
395    * Relays that multicast can use to connect.
396    */
397   struct GNUNET_PeerIdentity *relays;
398
399   /**
400    * Join request to be transmitted to the master on join.
401    */
402   struct GNUNET_PSYC_Message *join_msg;
403
404   /**
405    * Join decision received from multicast.
406    */
407   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
408
409   /**
410    * Maximum request ID for this channel.
411    */
412   uint64_t max_request_id;
413 };
414
415
416 static void
417 transmit_message (struct Channel *chn);
418
419
420 static uint64_t
421 message_queue_drop (struct Channel *chn);
422
423
424 /**
425  * Task run during shutdown.
426  *
427  * @param cls unused
428  * @param tc unused
429  */
430 static void
431 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
432 {
433   if (NULL != nc)
434   {
435     GNUNET_SERVER_notification_context_destroy (nc);
436     nc = NULL;
437   }
438   if (NULL != stats)
439   {
440     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
441     stats = NULL;
442   }
443 }
444
445
446 static struct Operation *
447 op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
448         uint64_t op_id, uint32_t flags)
449 {
450   struct Operation *op = GNUNET_malloc (sizeof (*op));
451   op->client = client;
452   op->chn = chn;
453   op->op_id = op_id;
454   op->flags = flags;
455   GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
456   return op;
457 }
458
459
460 static void
461 op_remove (struct Channel *chn, struct Operation *op)
462 {
463   GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
464   GNUNET_free (op);
465 }
466
467
468 /**
469  * Clean up master data structures after a client disconnected.
470  */
471 static void
472 cleanup_master (struct Master *mst)
473 {
474   struct Channel *chn = &mst->chn;
475
476   if (NULL != mst->origin)
477     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
478   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
479   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
480 }
481
482
483 /**
484  * Clean up slave data structures after a client disconnected.
485  */
486 static void
487 cleanup_slave (struct Slave *slv)
488 {
489   struct Channel *chn = &slv->chn;
490   struct GNUNET_CONTAINER_MultiHashMap *
491     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
492                                                 &chn->pub_key_hash);
493   GNUNET_assert (NULL != chn_slv);
494   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
495
496   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
497   {
498     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
499                                           chn_slv);
500     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
501   }
502   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
503
504   if (NULL != slv->join_msg)
505   {
506     GNUNET_free (slv->join_msg);
507     slv->join_msg = NULL;
508   }
509   if (NULL != slv->relays)
510   {
511     GNUNET_free (slv->relays);
512     slv->relays = NULL;
513   }
514   if (NULL != slv->member)
515   {
516     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
517     slv->member = NULL;
518   }
519   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
520 }
521
522
523 /**
524  * Clean up channel data structures after a client disconnected.
525  */
526 static void
527 cleanup_channel (struct Channel *chn)
528 {
529   message_queue_drop (chn);
530   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
531
532   if (NULL != chn->store_op)
533   {
534     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
535     chn->store_op = NULL;
536   }
537
538   (GNUNET_YES == chn->is_master)
539     ? cleanup_master ((struct Master *) chn)
540     : cleanup_slave ((struct Slave *) chn);
541   GNUNET_free (chn);
542 }
543
544
545 /**
546  * Called whenever a client is disconnected.
547  * Frees our resources associated with that client.
548  *
549  * @param cls Closure.
550  * @param client Identification of the client.
551  */
552 static void
553 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
554 {
555   if (NULL == client)
556     return;
557
558   struct Channel *
559     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
560
561   if (NULL == chn)
562   {
563     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564                 "%p User context is NULL in client_disconnect()\n", chn);
565     GNUNET_break (0);
566     return;
567   }
568
569   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570               "%p Client (%s) disconnected from channel %s\n",
571               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
572               GNUNET_h2s (&chn->pub_key_hash));
573
574   struct Client *cli = chn->clients_head;
575   while (NULL != cli)
576   {
577     if (cli->client == client)
578     {
579       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
580       GNUNET_free (cli);
581       break;
582     }
583     cli = cli->next;
584   }
585
586   struct Operation *op = chn->op_head;
587   while (NULL != op)
588   {
589     if (op->client == client)
590     {
591       op->client = NULL;
592       break;
593     }
594     op = op->next;
595   }
596
597   if (NULL == chn->clients_head)
598   { /* Last client disconnected. */
599     if (NULL != chn->tmit_head)
600     { /* Send pending messages to multicast before cleanup. */
601       transmit_message (chn);
602     }
603     else
604     {
605       cleanup_channel (chn);
606     }
607   }
608 }
609
610
611 /**
612  * Send message to all clients connected to the channel.
613  */
614 static void
615 client_send_msg (const struct Channel *chn,
616                  const struct GNUNET_MessageHeader *msg)
617 {
618   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619               "%p Sending message to clients.\n", chn);
620
621   struct Client *cli = chn->clients_head;
622   while (NULL != cli)
623   {
624     GNUNET_SERVER_notification_context_add (nc, cli->client);
625     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
626     cli = cli->next;
627   }
628 }
629
630
631 /**
632  * Send a result code back to the client.
633  *
634  * @param client
635  *        Client that should receive the result code.
636  * @param result_code
637  *        Code to transmit.
638  * @param op_id
639  *        Operation ID in network byte order.
640  * @param data
641  *        Data payload or NULL.
642  * @param data_size
643  *        Size of @a data.
644  */
645 static void
646 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
647                     int64_t result_code, const void *data, uint16_t data_size)
648 {
649   struct GNUNET_OperationResultMessage *res;
650
651   res = GNUNET_malloc (sizeof (*res) + data_size);
652   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
653   res->header.size = htons (sizeof (*res) + data_size);
654   res->result_code = GNUNET_htonll_signed (result_code);
655   res->op_id = op_id;
656   if (0 < data_size)
657     memcpy (&res[1], data, data_size);
658
659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660               "%p Sending result to client for operation #%" PRIu64 ": "
661               "%" PRId64 " (size: %u)\n",
662               client, GNUNET_ntohll (op_id), result_code, data_size);
663
664   GNUNET_SERVER_notification_context_add (nc, client);
665   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
666                                               GNUNET_NO);
667   GNUNET_free (res);
668 }
669
670
671 /**
672  * Closure for join_mem_test_cb()
673  */
674 struct JoinMemTestClosure
675 {
676   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
677   struct Channel *chn;
678   struct GNUNET_MULTICAST_JoinHandle *jh;
679   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
680 };
681
682
683 /**
684  * Membership test result callback used for join requests.
685  */
686 static void
687 join_mem_test_cb (void *cls, int64_t result,
688                   const char *err_msg, uint16_t err_msg_size)
689 {
690   struct JoinMemTestClosure *jcls = cls;
691
692   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
693   { /* Pass on join request to client if this is a master channel */
694     struct Master *mst = (struct Master *) jcls->chn;
695     struct GNUNET_HashCode slave_key_hash;
696     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
697                         &slave_key_hash);
698     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
699                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
700     client_send_msg (jcls->chn, &jcls->join_msg->header);
701   }
702   else
703   {
704     if (GNUNET_SYSERR == result)
705     {
706       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707                   "Could not perform membership test (%.*s)\n",
708                   err_msg_size, err_msg);
709     }
710     // FIXME: add relays
711     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
712   }
713   GNUNET_free (jcls->join_msg);
714   GNUNET_free (jcls);
715 }
716
717
718 /**
719  * Incoming join request from multicast.
720  */
721 static void
722 mcast_recv_join_request (void *cls,
723                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
724                          const struct GNUNET_MessageHeader *join_msg,
725                          struct GNUNET_MULTICAST_JoinHandle *jh)
726 {
727   struct Channel *chn = cls;
728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
729
730   uint16_t join_msg_size = 0;
731   if (NULL != join_msg)
732   {
733     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
734     {
735       join_msg_size = ntohs (join_msg->size);
736     }
737     else
738     {
739       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
740                   "%p Got join message with invalid type %u.\n",
741                   chn, ntohs (join_msg->type));
742     }
743   }
744
745   struct GNUNET_PSYC_JoinRequestMessage *
746     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
747   req->header.size = htons (sizeof (*req) + join_msg_size);
748   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
749   req->slave_key = *slave_key;
750   if (0 < join_msg_size)
751     memcpy (&req[1], join_msg, join_msg_size);
752
753   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
754   jcls->slave_key = *slave_key;
755   jcls->chn = chn;
756   jcls->jh = jh;
757   jcls->join_msg = req;
758
759   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
760                                     chn->max_message_id, 0,
761                                     &join_mem_test_cb, jcls);
762 }
763
764
765 /**
766  * Join decision received from multicast.
767  */
768 static void
769 mcast_recv_join_decision (void *cls, int is_admitted,
770                           const struct GNUNET_PeerIdentity *peer,
771                           uint16_t relay_count,
772                           const struct GNUNET_PeerIdentity *relays,
773                           const struct GNUNET_MessageHeader *join_resp)
774 {
775   struct Slave *slv = cls;
776   struct Channel *chn = &slv->chn;
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "%p Got join decision: %d\n", slv, is_admitted);
779
780   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
781   struct GNUNET_PSYC_JoinDecisionMessage *
782     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
783   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
784   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
785   dcsn->is_admitted = htonl (is_admitted);
786   if (0 < join_resp_size)
787     memcpy (&dcsn[1], join_resp, join_resp_size);
788
789   client_send_msg (chn, &dcsn->header);
790
791   if (GNUNET_YES == is_admitted)
792   {
793     chn->is_ready = GNUNET_YES;
794   }
795   else
796   {
797     slv->member = NULL;
798   }
799 }
800
801
802 /**
803  * Received result of GNUNET_PSYCSTORE_membership_test()
804  */
805 static void
806 store_recv_membership_test_result (void *cls, int64_t result,
807                                    const char *err_msg, uint16_t err_msg_size)
808 {
809   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
810   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811               "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
812               mth, result, err_msg_size, err_msg);
813
814   GNUNET_MULTICAST_membership_test_result (mth, result);
815 }
816
817
818 /**
819  * Incoming membership test request from multicast.
820  */
821 static void
822 mcast_recv_membership_test (void *cls,
823                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
824                             uint64_t message_id, uint64_t group_generation,
825                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
826 {
827   struct Channel *chn = cls;
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829               "%p Received membership test request from multicast.\n",
830               mth);
831   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
832                                     message_id, group_generation,
833                                     &store_recv_membership_test_result, mth);
834 }
835
836
837 static int
838 store_recv_fragment_replay (void *cls,
839                             struct GNUNET_MULTICAST_MessageHeader *msg,
840                             enum GNUNET_PSYCSTORE_MessageFlags flags)
841 {
842   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
843
844   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
845   return GNUNET_YES;
846 }
847
848
849 /**
850  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
851  */
852 static void
853 store_recv_fragment_replay_result (void *cls, int64_t result,
854                                    const char *err_msg, uint16_t err_msg_size)
855 {
856   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
859               rh, result, err_msg_size, err_msg);
860
861   switch (result)
862   {
863   case GNUNET_YES:
864     break;
865
866   case GNUNET_NO:
867     GNUNET_MULTICAST_replay_response (rh, NULL,
868                                       GNUNET_MULTICAST_REC_NOT_FOUND);
869     break;
870
871   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
872     GNUNET_MULTICAST_replay_response (rh, NULL,
873                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
874     break;
875
876   case GNUNET_SYSERR:
877     GNUNET_MULTICAST_replay_response (rh, NULL,
878                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
879     break;
880   }
881   GNUNET_MULTICAST_replay_response_end (rh);
882 }
883
884
885 /**
886  * Incoming fragment replay request from multicast.
887  */
888 static void
889 mcast_recv_replay_fragment (void *cls,
890                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
891                             uint64_t fragment_id, uint64_t flags,
892                             struct GNUNET_MULTICAST_ReplayHandle *rh)
893
894 {
895   struct Channel *chn = cls;
896   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
897                                  fragment_id, fragment_id,
898                                  &store_recv_fragment_replay,
899                                  &store_recv_fragment_replay_result, rh);
900 }
901
902
903 /**
904  * Incoming message replay request from multicast.
905  */
906 static void
907 mcast_recv_replay_message (void *cls,
908                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
909                            uint64_t message_id,
910                            uint64_t fragment_offset,
911                            uint64_t flags,
912                            struct GNUNET_MULTICAST_ReplayHandle *rh)
913 {
914   struct Channel *chn = cls;
915   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
916                                 message_id, message_id, NULL,
917                                 &store_recv_fragment_replay,
918                                 &store_recv_fragment_replay_result, rh);
919 }
920
921
922 /**
923  * Convert an uint64_t in network byte order to a HashCode
924  * that can be used as key in a MultiHashMap
925  */
926 static inline void
927 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
928 {
929   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
930   /* TODO: use built-in byte swap functions if available */
931
932   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
933   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
934
935   *key = (struct GNUNET_HashCode) {};
936   *((uint64_t *) key)
937     = (n << 32) | (n >> 32);
938 }
939
940
941 /**
942  * Convert an uint64_t in host byte order to a HashCode
943  * that can be used as key in a MultiHashMap
944  */
945 static inline void
946 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
947 {
948 #if __BYTE_ORDER == __BIG_ENDIAN
949   hash_key_from_nll (key, n);
950 #elif __BYTE_ORDER == __LITTLE_ENDIAN
951   *key = (struct GNUNET_HashCode) {};
952   *((uint64_t *) key) = n;
953 #else
954   #error byteorder undefined
955 #endif
956 }
957
958
959 /**
960  * Initialize PSYC message header.
961  */
962 static inline void
963 psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
964                const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
965 {
966   uint16_t size = ntohs (mmsg->header.size);
967   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
968
969   pmsg->header.size = htons (psize);
970   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
971   pmsg->message_id = mmsg->message_id;
972   pmsg->fragment_offset = mmsg->fragment_offset;
973   pmsg->flags = htonl (flags);
974
975   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
976 }
977
978
979 /**
980  * Create a new PSYC message from a multicast message for sending it to clients.
981  */
982 static inline struct GNUNET_PSYC_MessageHeader *
983 psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
984 {
985   struct GNUNET_PSYC_MessageHeader *pmsg;
986   uint16_t size = ntohs (mmsg->header.size);
987   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
988
989   pmsg = GNUNET_malloc (psize);
990   psyc_msg_init (pmsg, mmsg, flags);
991   return pmsg;
992 }
993
994
995 /**
996  * Send multicast message to all clients connected to the channel.
997  */
998 static void
999 client_send_mcast_msg (struct Channel *chn,
1000                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1001                        uint32_t flags)
1002 {
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "%p Sending multicast message to client. "
1005               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1006               chn, GNUNET_ntohll (mmsg->fragment_id),
1007               GNUNET_ntohll (mmsg->message_id));
1008
1009   struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags);
1010   client_send_msg (chn, &pmsg->header);
1011   GNUNET_free (pmsg);
1012 }
1013
1014
1015 /**
1016  * Send multicast request to all clients connected to the channel.
1017  */
1018 static void
1019 client_send_mcast_req (struct Master *mst,
1020                        const struct GNUNET_MULTICAST_RequestHeader *req)
1021 {
1022   struct Channel *chn = &mst->chn;
1023
1024   struct GNUNET_PSYC_MessageHeader *pmsg;
1025   uint16_t size = ntohs (req->header.size);
1026   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1027
1028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1029               "%p Sending multicast request to client. "
1030               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
1031               chn, GNUNET_ntohll (req->fragment_id),
1032               GNUNET_ntohll (req->request_id));
1033
1034   pmsg = GNUNET_malloc (psize);
1035   pmsg->header.size = htons (psize);
1036   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1037   pmsg->message_id = req->request_id;
1038   pmsg->fragment_offset = req->fragment_offset;
1039   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1040
1041   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1042   client_send_msg (chn, &pmsg->header);
1043   GNUNET_free (pmsg);
1044 }
1045
1046
1047 /**
1048  * Insert a multicast message fragment into the queue belonging to the message.
1049  *
1050  * @param chn           Channel.
1051  * @param mmsg         Multicast message fragment.
1052  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
1053  * @param first_ptype  First PSYC message part type in @a mmsg.
1054  * @param last_ptype   Last PSYC message part type in @a mmsg.
1055  */
1056 static void
1057 fragment_queue_insert (struct Channel *chn,
1058                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1059                        uint16_t first_ptype, uint16_t last_ptype)
1060 {
1061   const uint16_t size = ntohs (mmsg->header.size);
1062   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
1063   struct GNUNET_CONTAINER_MultiHashMap
1064     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1065                                                     &chn->pub_key_hash);
1066
1067   struct GNUNET_HashCode msg_id_hash;
1068   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
1069
1070   struct FragmentQueue
1071     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1072
1073   if (NULL == fragq)
1074   {
1075     fragq = GNUNET_new (struct FragmentQueue);
1076     fragq->state = MSG_FRAG_STATE_HEADER;
1077     fragq->fragments
1078       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1079
1080     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1081                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1082
1083     if (NULL == chan_msgs)
1084     {
1085       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1086       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1087                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1088     }
1089   }
1090
1091   struct GNUNET_HashCode frag_id_hash;
1092   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1093   struct RecvCacheEntry
1094     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1095   if (NULL == cache_entry)
1096   {
1097     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098                 "%p Adding message fragment to cache. "
1099                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1100                 chn, GNUNET_ntohll (mmsg->message_id),
1101                 GNUNET_ntohll (mmsg->fragment_id));
1102     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                 "%p header_size: %" PRIu64 " + %u\n",
1104                 chn, fragq->header_size, size);
1105     cache_entry = GNUNET_new (struct RecvCacheEntry);
1106     cache_entry->ref_count = 1;
1107     cache_entry->mmsg = GNUNET_malloc (size);
1108     memcpy (cache_entry->mmsg, mmsg, size);
1109     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1110                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1111   }
1112   else
1113   {
1114     cache_entry->ref_count++;
1115     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116                 "%p Message fragment is already in cache. "
1117                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1118                 ", ref_count: %u\n",
1119                 chn, GNUNET_ntohll (mmsg->message_id),
1120                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1121   }
1122
1123   if (MSG_FRAG_STATE_HEADER == fragq->state)
1124   {
1125     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1126     {
1127       struct GNUNET_PSYC_MessageMethod *
1128         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1129       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1130       fragq->flags = ntohl (pmeth->flags);
1131     }
1132
1133     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1134     {
1135       fragq->header_size += size;
1136     }
1137     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1138              || frag_offset == fragq->header_size)
1139     { /* header is now complete */
1140       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1141                   "%p Header of message %" PRIu64 " is complete.\n",
1142                   chn, GNUNET_ntohll (mmsg->message_id));
1143
1144       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1145                   "%p Adding message %" PRIu64 " to queue.\n",
1146                   chn, GNUNET_ntohll (mmsg->message_id));
1147       fragq->state = MSG_FRAG_STATE_DATA;
1148     }
1149     else
1150     {
1151       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1152                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1153                   "%" PRIu64 " != %" PRIu64 "\n",
1154                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1155                   fragq->header_size);
1156     }
1157   }
1158
1159   switch (last_ptype)
1160   {
1161   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1162     if (frag_offset == fragq->size)
1163       fragq->state = MSG_FRAG_STATE_END;
1164     else
1165       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1166                   "%p Message %" PRIu64 " is NOT complete yet: "
1167                   "%" PRIu64 " != %" PRIu64 "\n",
1168                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1169                   fragq->size);
1170     break;
1171
1172   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1173     /* Drop message without delivering to client if it's a single fragment */
1174     fragq->state =
1175       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1176       ? MSG_FRAG_STATE_DROP
1177       : MSG_FRAG_STATE_CANCEL;
1178   }
1179
1180   switch (fragq->state)
1181   {
1182   case MSG_FRAG_STATE_DATA:
1183   case MSG_FRAG_STATE_END:
1184   case MSG_FRAG_STATE_CANCEL:
1185     if (GNUNET_NO == fragq->is_queued)
1186     {
1187       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1188                                     GNUNET_ntohll (mmsg->message_id));
1189       fragq->is_queued = GNUNET_YES;
1190     }
1191   }
1192
1193   fragq->size += size;
1194   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1195                                 GNUNET_ntohll (mmsg->fragment_id));
1196 }
1197
1198
1199 /**
1200  * Run fragment queue of a message.
1201  *
1202  * Send fragments of a message in order to client, after all modifiers arrived
1203  * from multicast.
1204  *
1205  * @param chn      Channel.
1206  * @param msg_id  ID of the message @a fragq belongs to.
1207  * @param fragq   Fragment queue of the message.
1208  * @param drop    Drop message without delivering to client?
1209  *                #GNUNET_YES or #GNUNET_NO.
1210  */
1211 static void
1212 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1213                     struct FragmentQueue *fragq, uint8_t drop)
1214 {
1215   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1216               "%p Running message fragment queue for message %" PRIu64
1217               " (state: %u).\n",
1218               chn, msg_id, fragq->state);
1219
1220   struct GNUNET_CONTAINER_MultiHashMap
1221     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1222                                                     &chn->pub_key_hash);
1223   GNUNET_assert (NULL != chan_msgs);
1224   uint64_t frag_id;
1225
1226   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1227                                                     &frag_id))
1228   {
1229     struct GNUNET_HashCode frag_id_hash;
1230     hash_key_from_hll (&frag_id_hash, frag_id);
1231     struct RecvCacheEntry *cache_entry
1232       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1233     if (cache_entry != NULL)
1234     {
1235       if (GNUNET_NO == drop)
1236       {
1237         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1238       }
1239       if (cache_entry->ref_count <= 1)
1240       {
1241         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1242                                               cache_entry);
1243         GNUNET_free (cache_entry->mmsg);
1244         GNUNET_free (cache_entry);
1245       }
1246       else
1247       {
1248         cache_entry->ref_count--;
1249       }
1250     }
1251 #if CACHE_AGING_IMPLEMENTED
1252     else if (GNUNET_NO == drop)
1253     {
1254       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1255     }
1256 #endif
1257
1258     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1259   }
1260
1261   if (MSG_FRAG_STATE_END <= fragq->state)
1262   {
1263     struct GNUNET_HashCode msg_id_hash;
1264     hash_key_from_hll (&msg_id_hash, msg_id);
1265
1266     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1267     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1268     GNUNET_free (fragq);
1269   }
1270   else
1271   {
1272     fragq->is_queued = GNUNET_NO;
1273   }
1274 }
1275
1276
1277 /**
1278  * Run message queue.
1279  *
1280  * Send messages in queue to client in order after a message has arrived from
1281  * multicast, according to the following:
1282  * - A message is only sent if all of its modifiers arrived.
1283  * - A stateful message is only sent if the previous stateful message
1284  *   has already been delivered to the client.
1285  *
1286  * @param chn  Channel.
1287  *
1288  * @return Number of messages removed from queue and sent to client.
1289  */
1290 static uint64_t
1291 message_queue_run (struct Channel *chn)
1292 {
1293   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1294               "%p Running message queue.\n", chn);
1295   uint64_t n = 0;
1296   uint64_t msg_id;
1297   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1298                                                     &msg_id))
1299   {
1300     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1301                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1302     struct GNUNET_HashCode msg_id_hash;
1303     hash_key_from_hll (&msg_id_hash, msg_id);
1304
1305     struct FragmentQueue *
1306       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1307
1308     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1309     {
1310       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1311                   "%p No fragq (%p) or header not complete.\n",
1312                   chn, fragq);
1313       break;
1314     }
1315
1316     if (MSG_FRAG_STATE_HEADER == fragq->state)
1317     {
1318       /* Check if there's a missing message before the current one */
1319       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1320       {
1321         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1322             && msg_id - 1 != chn->max_message_id)
1323         {
1324           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1325                       "%p Out of order message. "
1326                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1327                       chn, msg_id, chn->max_message_id);
1328           break;
1329         }
1330       }
1331       else
1332       {
1333         if (msg_id - fragq->state_delta != chn->max_state_message_id)
1334         {
1335           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1336                       "%p Out of order stateful message. "
1337                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1338                       chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1339           break;
1340         }
1341 #if TODO
1342         /* FIXME: apply modifiers to state in PSYCstore */
1343         GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1344                                        store_recv_state_modify_result, cls);
1345 #endif
1346         chn->max_state_message_id = msg_id;
1347       }
1348       chn->max_message_id = msg_id;
1349     }
1350     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1351     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1352     n++;
1353   }
1354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1356   return n;
1357 }
1358
1359
1360 /**
1361  * Drop message queue of a channel.
1362  *
1363  * Remove all messages in queue without sending it to clients.
1364  *
1365  * @param chn  Channel.
1366  *
1367  * @return Number of messages removed from queue.
1368  */
1369 static uint64_t
1370 message_queue_drop (struct Channel *chn)
1371 {
1372   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1373               "%p Dropping message queue.\n", chn);
1374   uint64_t n = 0;
1375   uint64_t msg_id;
1376   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1377                                                     &msg_id))
1378   {
1379     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1380                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1381     struct GNUNET_HashCode msg_id_hash;
1382     hash_key_from_hll (&msg_id_hash, msg_id);
1383
1384     struct FragmentQueue *
1385       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1386     GNUNET_assert (NULL != fragq);
1387     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1388     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1389     n++;
1390   }
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1393   return n;
1394 }
1395
1396
1397 /**
1398  * Received result of GNUNET_PSYCSTORE_fragment_store().
1399  */
1400 static void
1401 store_recv_fragment_store_result (void *cls, int64_t result,
1402                                   const char *err_msg, uint16_t err_msg_size)
1403 {
1404   struct Channel *chn = cls;
1405   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1406               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1407               chn, result, err_msg_size, err_msg);
1408 }
1409
1410
1411 /**
1412  * Handle incoming message fragment from multicast.
1413  *
1414  * Store it using PSYCstore and send it to the clients of the channel in order.
1415  */
1416 static void
1417 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1418 {
1419   struct Channel *chn = cls;
1420   uint16_t size = ntohs (mmsg->header.size);
1421
1422   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423               "%p Received multicast message of size %u.\n",
1424               chn, size);
1425
1426   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1427                                    &store_recv_fragment_store_result, chn);
1428
1429   uint16_t first_ptype = 0, last_ptype = 0;
1430   if (GNUNET_SYSERR
1431       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1432                                           (const char *) &mmsg[1],
1433                                           &first_ptype, &last_ptype))
1434   {
1435     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1436                 "%p Dropping incoming multicast message with invalid parts.\n",
1437                 chn);
1438     GNUNET_break_op (0);
1439     return;
1440   }
1441
1442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443               "Message parts: first: type %u, last: type %u\n",
1444               first_ptype, last_ptype);
1445
1446   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1447   message_queue_run (chn);
1448 }
1449
1450
1451 /**
1452  * Incoming request fragment from multicast for a master.
1453  *
1454  * @param cls   Master.
1455  * @param req   The request.
1456  */
1457 static void
1458 mcast_recv_request (void *cls,
1459                     const struct GNUNET_MULTICAST_RequestHeader *req)
1460 {
1461   struct Master *mst = cls;
1462   uint16_t size = ntohs (req->header.size);
1463
1464   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1465               "%p Received multicast request of size %u.\n",
1466               mst, size);
1467
1468   uint16_t first_ptype = 0, last_ptype = 0;
1469   if (GNUNET_SYSERR
1470       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1471                                           (const char *) &req[1],
1472                                           &first_ptype, &last_ptype))
1473   {
1474     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1475                 "%p Dropping incoming multicast request with invalid parts.\n",
1476                 mst);
1477     GNUNET_break_op (0);
1478     return;
1479   }
1480
1481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1482               "Message parts: first: type %u, last: type %u\n",
1483               first_ptype, last_ptype);
1484
1485   /* FIXME: in-order delivery */
1486   client_send_mcast_req (mst, req);
1487 }
1488
1489
1490 /**
1491  * Response from PSYCstore with the current counter values for a channel master.
1492  */
1493 static void
1494 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1495                             uint64_t max_message_id, uint64_t max_group_generation,
1496                             uint64_t max_state_message_id)
1497 {
1498   struct Master *mst = cls;
1499   struct Channel *chn = &mst->chn;
1500   chn->store_op = NULL;
1501
1502   struct GNUNET_PSYC_CountersResultMessage res;
1503   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1504   res.header.size = htons (sizeof (res));
1505   res.result_code = GNUNET_htonl_signed (result);
1506   res.max_message_id = GNUNET_htonll (max_message_id);
1507
1508   if (GNUNET_OK == result || GNUNET_NO == result)
1509   {
1510     mst->max_message_id = max_message_id;
1511     chn->max_message_id = max_message_id;
1512     chn->max_state_message_id = max_state_message_id;
1513     mst->max_group_generation = max_group_generation;
1514     mst->origin
1515       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1516                                        &mcast_recv_join_request,
1517                                        &mcast_recv_membership_test,
1518                                        &mcast_recv_replay_fragment,
1519                                        &mcast_recv_replay_message,
1520                                        &mcast_recv_request,
1521                                        &mcast_recv_message, chn);
1522     chn->is_ready = GNUNET_YES;
1523   }
1524   else
1525   {
1526     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1527                 "%p GNUNET_PSYCSTORE_counters_get() "
1528                 "returned %d for channel %s.\n",
1529                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1530   }
1531
1532   client_send_msg (chn, &res.header);
1533 }
1534
1535
1536 /**
1537  * Response from PSYCstore with the current counter values for a channel slave.
1538  */
1539 void
1540 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1541                            uint64_t max_message_id, uint64_t max_group_generation,
1542                            uint64_t max_state_message_id)
1543 {
1544   struct Slave *slv = cls;
1545   struct Channel *chn = &slv->chn;
1546   chn->store_op = NULL;
1547
1548   struct GNUNET_PSYC_CountersResultMessage res;
1549   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1550   res.header.size = htons (sizeof (res));
1551   res.result_code = GNUNET_htonl_signed (result);
1552   res.max_message_id = GNUNET_htonll (max_message_id);
1553
1554   if (GNUNET_OK == result || GNUNET_NO == result)
1555   {
1556     chn->max_message_id = max_message_id;
1557     chn->max_state_message_id = max_state_message_id;
1558     slv->member
1559       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1560                                       &slv->origin,
1561                                       slv->relay_count, slv->relays,
1562                                       &slv->join_msg->header,
1563                                       &mcast_recv_join_request,
1564                                       &mcast_recv_join_decision,
1565                                       &mcast_recv_membership_test,
1566                                       &mcast_recv_replay_fragment,
1567                                       &mcast_recv_replay_message,
1568                                       &mcast_recv_message, chn);
1569     if (NULL != slv->join_msg)
1570     {
1571       GNUNET_free (slv->join_msg);
1572       slv->join_msg = NULL;
1573     }
1574   }
1575   else
1576   {
1577     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1578                 "%p GNUNET_PSYCSTORE_counters_get() "
1579                 "returned %d for channel %s.\n",
1580                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1581   }
1582
1583   client_send_msg (chn, &res.header);
1584 }
1585
1586
1587 static void
1588 channel_init (struct Channel *chn)
1589 {
1590   chn->recv_msgs
1591     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1592   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1593 }
1594
1595
1596 /**
1597  * Handle a connecting client starting a channel master.
1598  */
1599 static void
1600 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1601                           const struct GNUNET_MessageHeader *msg)
1602 {
1603   const struct MasterStartRequest *req
1604     = (const struct MasterStartRequest *) msg;
1605
1606   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1607   struct GNUNET_HashCode pub_key_hash;
1608
1609   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1610   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1611
1612   struct Master *
1613     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1614   struct Channel *chn;
1615
1616   if (NULL == mst)
1617   {
1618     mst = GNUNET_new (struct Master);
1619     mst->policy = ntohl (req->policy);
1620     mst->priv_key = req->channel_key;
1621     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1622
1623     chn = &mst->chn;
1624     chn->is_master = GNUNET_YES;
1625     chn->pub_key = pub_key;
1626     chn->pub_key_hash = pub_key_hash;
1627     channel_init (chn);
1628
1629     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1630                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1631     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1632                                                    store_recv_master_counters, mst);
1633   }
1634   else
1635   {
1636     chn = &mst->chn;
1637
1638     struct GNUNET_PSYC_CountersResultMessage res;
1639     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1640     res.header.size = htons (sizeof (res));
1641     res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1642     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1643
1644     GNUNET_SERVER_notification_context_add (nc, client);
1645     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1646                                                 GNUNET_NO);
1647   }
1648
1649   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1650               "%p Client connected as master to channel %s.\n",
1651               mst, GNUNET_h2s (&chn->pub_key_hash));
1652
1653   struct Client *cli = GNUNET_new (struct Client);
1654   cli->client = client;
1655   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1656
1657   GNUNET_SERVER_client_set_user_context (client, chn);
1658   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1659 }
1660
1661
1662 /**
1663  * Handle a connecting client joining as a channel slave.
1664  */
1665 static void
1666 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1667                         const struct GNUNET_MessageHeader *msg)
1668 {
1669   const struct SlaveJoinRequest *req
1670     = (const struct SlaveJoinRequest *) msg;
1671   uint16_t req_size = ntohs (req->header.size);
1672
1673   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1674   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1675
1676   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1677   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1678   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1679
1680   struct GNUNET_CONTAINER_MultiHashMap *
1681     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1682   struct Slave *slv = NULL;
1683   struct Channel *chn;
1684
1685   if (NULL != chn_slv)
1686   {
1687     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1688   }
1689   if (NULL == slv)
1690   {
1691     slv = GNUNET_new (struct Slave);
1692     slv->priv_key = req->slave_key;
1693     slv->pub_key = slv_pub_key;
1694     slv->pub_key_hash = slv_pub_key_hash;
1695     slv->origin = req->origin;
1696     slv->relay_count = ntohl (req->relay_count);
1697
1698     const struct GNUNET_PeerIdentity *
1699       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1700     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1701     uint16_t join_msg_size = 0;
1702
1703     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1704         <= req_size)
1705     {
1706       join_msg_size = ntohs (slv->join_msg->header.size);
1707       slv->join_msg = GNUNET_malloc (join_msg_size);
1708       memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1709     }
1710     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1711     {
1712       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1713                   "%u + %u + %u != %u\n",
1714                   sizeof (*req), relay_size, join_msg_size, req_size);
1715       GNUNET_break (0);
1716       GNUNET_SERVER_client_disconnect (client);
1717       GNUNET_free (slv);
1718       return;
1719     }
1720     if (0 < slv->relay_count)
1721     {
1722       slv->relays = GNUNET_malloc (relay_size);
1723       memcpy (slv->relays, &req[1], relay_size);
1724     }
1725
1726     chn = &slv->chn;
1727     chn->is_master = GNUNET_NO;
1728     chn->pub_key = req->channel_key;
1729     chn->pub_key_hash = pub_key_hash;
1730     channel_init (chn);
1731
1732     if (NULL == chn_slv)
1733     {
1734       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1735       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1736                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1737     }
1738     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1739                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1740     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1741                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1742     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1743                                                   &store_recv_slave_counters, slv);
1744   }
1745   else
1746   {
1747     chn = &slv->chn;
1748
1749     struct GNUNET_PSYC_CountersResultMessage res;
1750     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1751     res.header.size = htons (sizeof (res));
1752     res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1753     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1754
1755     GNUNET_SERVER_notification_context_add (nc, client);
1756     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1757                                                 GNUNET_NO);
1758
1759     if (NULL == slv->member)
1760     {
1761       slv->member
1762         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1763                                         &slv->origin,
1764                                         slv->relay_count, slv->relays,
1765                                         &slv->join_msg->header,
1766                                         &mcast_recv_join_request,
1767                                         &mcast_recv_join_decision,
1768                                         &mcast_recv_membership_test,
1769                                         &mcast_recv_replay_fragment,
1770                                         &mcast_recv_replay_message,
1771                                         &mcast_recv_message, chn);
1772       if (NULL != slv->join_msg)
1773       {
1774         GNUNET_free (slv->join_msg);
1775         slv->join_msg = NULL;
1776       }
1777     }
1778     else if (NULL != slv->join_dcsn)
1779     {
1780       GNUNET_SERVER_notification_context_add (nc, client);
1781       GNUNET_SERVER_notification_context_unicast (nc, client,
1782                                                   &slv->join_dcsn->header,
1783                                                   GNUNET_NO);
1784     }
1785   }
1786
1787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788               "%p Client connected as slave to channel %s.\n",
1789               slv, GNUNET_h2s (&chn->pub_key_hash));
1790
1791   struct Client *cli = GNUNET_new (struct Client);
1792   cli->client = client;
1793   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1794
1795   GNUNET_SERVER_client_set_user_context (client, chn);
1796   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1797 }
1798
1799
1800 struct JoinDecisionClosure
1801 {
1802   int32_t is_admitted;
1803   struct GNUNET_MessageHeader *msg;
1804 };
1805
1806
1807 /**
1808  * Iterator callback for sending join decisions to multicast.
1809  */
1810 static int
1811 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1812                           void *value)
1813 {
1814   struct JoinDecisionClosure *jcls = cls;
1815   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1816   // FIXME: add relays
1817   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1818   return GNUNET_YES;
1819 }
1820
1821
1822 /**
1823  * Join decision from client.
1824  */
1825 static void
1826 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1827                            const struct GNUNET_MessageHeader *msg)
1828 {
1829   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1830     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1831   struct Channel *chn;
1832   struct Master *mst;
1833   struct JoinDecisionClosure jcls;
1834
1835   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1836   if (NULL == chn)
1837   {
1838     GNUNET_break (0);
1839     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1840     return;
1841   }
1842   GNUNET_assert (GNUNET_YES == chn->is_master);
1843   mst = (struct Master *) chn;
1844   jcls.is_admitted = ntohl (dcsn->is_admitted);
1845   jcls.msg
1846     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1847     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1848     : NULL;
1849
1850   struct GNUNET_HashCode slave_key_hash;
1851   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1852                       &slave_key_hash);
1853
1854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1855               "%p Got join decision (%d) from client for channel %s..\n",
1856               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858               "%p ..and slave %s.\n",
1859               mst, GNUNET_h2s (&slave_key_hash));
1860
1861   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1862                                               &mcast_send_join_decision, &jcls);
1863   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1864   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1865 }
1866
1867
1868 /**
1869  * Send acknowledgement to a client.
1870  *
1871  * Sent after a message fragment has been passed on to multicast.
1872  *
1873  * @param chn The channel struct for the client.
1874  */
1875 static void
1876 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1877 {
1878   struct GNUNET_MessageHeader res;
1879   res.size = htons (sizeof (res));
1880   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1881
1882   /* FIXME */
1883   GNUNET_SERVER_notification_context_add (nc, client);
1884   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1885 }
1886
1887
1888 /**
1889  * Callback for the transmit functions of multicast.
1890  */
1891 static int
1892 transmit_notify (void *cls, size_t *data_size, void *data)
1893 {
1894   struct Channel *chn = cls;
1895   struct TransmitMessage *tmit_msg = chn->tmit_head;
1896
1897   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1898   {
1899     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900                 "%p transmit_notify: nothing to send.\n", chn);
1901     *data_size = 0;
1902     return GNUNET_NO;
1903   }
1904
1905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1907
1908   *data_size = tmit_msg->size;
1909   memcpy (data, &tmit_msg[1], *data_size);
1910
1911   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1912
1913   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1914     send_message_ack (chn, tmit_msg->client);
1915
1916   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1917   GNUNET_free (tmit_msg);
1918
1919   if (NULL != chn->tmit_head)
1920   {
1921     transmit_message (chn);
1922   }
1923   else if (GNUNET_YES == chn->is_disconnected)
1924   {
1925     /* FIXME: handle partial message (when still in_transmit) */
1926     cleanup_channel (chn);
1927   }
1928   return ret;
1929 }
1930
1931
1932 /**
1933  * Callback for the transmit functions of multicast.
1934  */
1935 static int
1936 master_transmit_notify (void *cls, size_t *data_size, void *data)
1937 {
1938   int ret = transmit_notify (cls, data_size, data);
1939
1940   if (GNUNET_YES == ret)
1941   {
1942     struct Master *mst = cls;
1943     mst->tmit_handle = NULL;
1944   }
1945   return ret;
1946 }
1947
1948
1949 /**
1950  * Callback for the transmit functions of multicast.
1951  */
1952 static int
1953 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1954 {
1955   int ret = transmit_notify (cls, data_size, data);
1956
1957   if (GNUNET_YES == ret)
1958   {
1959     struct Slave *slv = cls;
1960     slv->tmit_handle = NULL;
1961   }
1962   return ret;
1963 }
1964
1965
1966 /**
1967  * Transmit a message from a channel master to the multicast group.
1968  */
1969 static void
1970 master_transmit_message (struct Master *mst)
1971 {
1972   if (NULL == mst->tmit_handle)
1973   {
1974     mst->tmit_handle
1975       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1976                                         mst->max_group_generation,
1977                                         master_transmit_notify, mst);
1978   }
1979   else
1980   {
1981     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1982   }
1983 }
1984
1985
1986 /**
1987  * Transmit a message from a channel slave to the multicast group.
1988  */
1989 static void
1990 slave_transmit_message (struct Slave *slv)
1991 {
1992   if (NULL == slv->tmit_handle)
1993   {
1994     slv->tmit_handle
1995       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1996                                            slave_transmit_notify, slv);
1997   }
1998   else
1999   {
2000     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
2001   }
2002 }
2003
2004
2005 static void
2006 transmit_message (struct Channel *chn)
2007 {
2008   chn->is_master
2009     ? master_transmit_message ((struct Master *) chn)
2010     : slave_transmit_message ((struct Slave *) chn);
2011 }
2012
2013
2014 /**
2015  * Queue a message from a channel master for sending to the multicast group.
2016  */
2017 static void
2018 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2019                      uint16_t first_ptype, uint16_t last_ptype)
2020 {
2021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
2022
2023   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2024   {
2025     tmit_msg->id = ++mst->max_message_id;
2026     struct GNUNET_PSYC_MessageMethod *pmeth
2027       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2028
2029     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
2030     {
2031       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
2032     }
2033     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2034     {
2035       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2036                                           - mst->max_state_message_id);
2037     }
2038     else
2039     {
2040       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2041     }
2042   }
2043 }
2044
2045
2046 /**
2047  * Queue a message from a channel slave for sending to the multicast group.
2048  */
2049 static void
2050 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
2051                      uint16_t first_ptype, uint16_t last_ptype)
2052 {
2053   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2054   {
2055     struct GNUNET_PSYC_MessageMethod *pmeth
2056       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
2057     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2058     tmit_msg->id = ++slv->max_request_id;
2059   }
2060 }
2061
2062
2063 /**
2064  * Queue PSYC message parts for sending to multicast.
2065  *
2066  * @param chn           Channel to send to.
2067  * @param client       Client the message originates from.
2068  * @param data_size    Size of @a data.
2069  * @param data         Concatenated message parts.
2070  * @param first_ptype  First message part type in @a data.
2071  * @param last_ptype   Last message part type in @a data.
2072  */
2073 static struct TransmitMessage *
2074 queue_message (struct Channel *chn,
2075                struct GNUNET_SERVER_Client *client,
2076                size_t data_size,
2077                const void *data,
2078                uint16_t first_ptype, uint16_t last_ptype)
2079 {
2080   struct TransmitMessage *
2081     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2082   memcpy (&tmit_msg[1], data, data_size);
2083   tmit_msg->client = client;
2084   tmit_msg->size = data_size;
2085   tmit_msg->state = chn->tmit_state;
2086
2087   /* FIXME: separate queue per message ID */
2088
2089   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2090
2091   chn->is_master
2092     ? master_queue_message ((struct Master *) chn, tmit_msg,
2093                             first_ptype, last_ptype)
2094     : slave_queue_message ((struct Slave *) chn, tmit_msg,
2095                            first_ptype, last_ptype);
2096   return tmit_msg;
2097 }
2098
2099
2100 /**
2101  * Cancel transmission of current message.
2102  *
2103  * @param chn     Channel to send to.
2104  * @param client  Client the message originates from.
2105  */
2106 static void
2107 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2108 {
2109   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2110
2111   struct GNUNET_MessageHeader msg;
2112   msg.size = htons (sizeof (msg));
2113   msg.type = htons (type);
2114
2115   queue_message (chn, client, sizeof (msg), &msg, type, type);
2116   transmit_message (chn);
2117
2118   /* FIXME: cleanup */
2119 }
2120
2121
2122 /**
2123  * Incoming message from a master or slave client.
2124  */
2125 static void
2126 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2127                           const struct GNUNET_MessageHeader *msg)
2128 {
2129   struct Channel *
2130     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2131   GNUNET_assert (NULL != chn);
2132
2133   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134               "%p Received message from client.\n", chn);
2135   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2136
2137   if (GNUNET_YES != chn->is_ready)
2138   {
2139     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2140                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2141     GNUNET_break (0);
2142     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2143     return;
2144   }
2145
2146   uint16_t size = ntohs (msg->size);
2147   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2148   {
2149     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2150     GNUNET_break (0);
2151     transmit_cancel (chn, client);
2152     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2153     return;
2154   }
2155
2156   uint16_t first_ptype = 0, last_ptype = 0;
2157   if (GNUNET_SYSERR
2158       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2159                                           (const char *) &msg[1],
2160                                           &first_ptype, &last_ptype))
2161   {
2162     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2163                 "%p Received invalid message part from client.\n", chn);
2164     GNUNET_break (0);
2165     transmit_cancel (chn, client);
2166     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2167     return;
2168   }
2169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170               "%p Received message with first part type %u and last part type %u.\n",
2171               chn, first_ptype, last_ptype);
2172
2173   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2174                  first_ptype, last_ptype);
2175   transmit_message (chn);
2176   /* FIXME: send a few ACKs even before transmit_notify is called */
2177
2178   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2179 };
2180
2181
2182 struct MembershipStoreClosure
2183 {
2184   struct GNUNET_SERVER_Client *client;
2185   struct Channel *chn;
2186   uint64_t op_id;
2187 };
2188
2189
2190 /**
2191  * Received result of GNUNET_PSYCSTORE_membership_store()
2192  */
2193 static void
2194 store_recv_membership_store_result (void *cls, int64_t result,
2195                                     const char *err_msg, uint16_t err_msg_size)
2196 {
2197   struct MembershipStoreClosure *mcls = cls;
2198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2199               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2200               mcls->chn, result, err_msg_size, err_msg);
2201
2202   client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size);
2203 }
2204
2205
2206 /**
2207  * Client requests to add/remove a slave in the membership database.
2208  */
2209 static void
2210 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2211                               const struct GNUNET_MessageHeader *msg)
2212 {
2213   struct Channel *
2214     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2215   GNUNET_assert (NULL != chn);
2216
2217   const struct ChannelMembershipStoreRequest *
2218     req = (const struct ChannelMembershipStoreRequest *) msg;
2219
2220   struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2221   mcls->client = client;
2222   mcls->chn = chn;
2223   mcls->op_id = req->op_id;
2224
2225   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2226   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2227   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2228               "%p Received membership store request from client.\n", chn);
2229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2230               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2231               chn, req->did_join, announced_at, effective_since);
2232
2233   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2234                                      req->did_join, announced_at, effective_since,
2235                                      0, /* FIXME: group_generation */
2236                                      &store_recv_membership_store_result, mcls);
2237   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2238 }
2239
2240
2241 /**
2242  * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2243  * in response to a history request from a client.
2244  */
2245 static int
2246 store_recv_fragment_history (void *cls,
2247                              struct GNUNET_MULTICAST_MessageHeader *mmsg,
2248                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2249 {
2250   struct Operation *op = cls;
2251   if (NULL == op->client)
2252   { /* Requesting client already disconnected. */
2253     return GNUNET_NO;
2254   }
2255   struct Channel *chn = op->chn;
2256
2257   struct GNUNET_PSYC_MessageHeader *pmsg;
2258   uint16_t msize = ntohs (mmsg->header.size);
2259   uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2260
2261   struct GNUNET_OperationResultMessage *
2262     res = GNUNET_malloc (sizeof (*res) + psize);
2263   res->header.size = htons (sizeof (*res) + psize);
2264   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2265   res->op_id = op->op_id;
2266   res->result_code = GNUNET_htonll_signed (GNUNET_OK);
2267
2268   pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2269   psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2270   memcpy (&res[1], pmsg, psize);
2271
2272   /** @todo FIXME: send only to requesting client */
2273   client_send_msg (chn, &res->header);
2274   return GNUNET_YES;
2275 }
2276
2277
2278 /**
2279  * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2280  * in response to a history request from a client.
2281  */
2282 static void
2283 store_recv_fragment_history_result (void *cls, int64_t result,
2284                                     const char *err_msg, uint16_t err_msg_size)
2285 {
2286   struct Operation *op = cls;
2287   if (NULL == op->client)
2288   { /* Requesting client already disconnected. */
2289     return;
2290   }
2291
2292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2293               "%p History replay #%" PRIu64 ": "
2294               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2295               op->chn, op->op_id, result, err_msg_size, err_msg);
2296
2297   if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2298   {
2299     /** @todo Multicast replay request for messages not found locally. */
2300   }
2301
2302   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2303 }
2304
2305
2306 /**
2307  * Client requests channel history.
2308  */
2309 static void
2310 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2311                             const struct GNUNET_MessageHeader *msg)
2312 {
2313   struct Channel *
2314     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2315   GNUNET_assert (NULL != chn);
2316
2317   const struct GNUNET_PSYC_HistoryRequestMessage *
2318     req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2319   uint16_t size = ntohs (msg->size);
2320   const char *method_prefix = (const char *) &req[1];
2321
2322   if (size < sizeof (*req) + 1
2323       || '\0' != method_prefix[size - sizeof (*req) - 1])
2324   {
2325     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2326                 "%p History replay #%" PRIu64 ": "
2327                 "invalid method prefix. size: %u < %u?\n",
2328                 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2329     GNUNET_break (0);
2330     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2331     return;
2332   }
2333
2334   struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2335
2336   if (0 == req->message_limit)
2337     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2338                                   GNUNET_ntohll (req->start_message_id),
2339                                   GNUNET_ntohll (req->end_message_id),
2340                                   method_prefix,
2341                                   &store_recv_fragment_history,
2342                                   &store_recv_fragment_history_result, op);
2343   else
2344     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2345                                          GNUNET_ntohll (req->message_limit),
2346                                          method_prefix,
2347                                          &store_recv_fragment_history,
2348                                          &store_recv_fragment_history_result,
2349                                          op);
2350
2351   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2352 }
2353
2354
2355 /**
2356  * Received state var from PSYCstore, send it to client.
2357  */
2358 static int
2359 store_recv_state_var (void *cls, const char *name,
2360                       const void *value, size_t value_size)
2361 {
2362   struct Operation *op = cls;
2363   struct GNUNET_OperationResultMessage *res;
2364
2365   if (NULL != name)
2366   {
2367     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2368     struct GNUNET_PSYC_MessageModifier *mod;
2369     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2370     res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2371     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2372     res->op_id = op->op_id;
2373
2374     mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2375     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2376     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2377     mod->name_size = htons (name_size);
2378     mod->value_size = htonl (value_size);
2379     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2380     memcpy (&mod[1], name, name_size);
2381     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2382   }
2383   else
2384   {
2385     struct GNUNET_MessageHeader *mod;
2386     res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2387     res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2388     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2389     res->op_id = op->op_id;
2390
2391     mod = (struct GNUNET_MessageHeader *) &res[1];
2392     mod->size = htons (sizeof (*mod) + value_size);
2393     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2394     memcpy (&mod[1], value, value_size);
2395   }
2396
2397   // FIXME: client might have been disconnected
2398   GNUNET_SERVER_notification_context_add (nc, op->client);
2399   GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2400                                               GNUNET_NO);
2401   GNUNET_free (op);
2402   return GNUNET_YES;
2403 }
2404
2405
2406 /**
2407  * Received result of GNUNET_PSYCSTORE_state_get()
2408  * or GNUNET_PSYCSTORE_state_get_prefix()
2409  */
2410 static void
2411 store_recv_state_result (void *cls, int64_t result,
2412                          const char *err_msg, uint16_t err_msg_size)
2413 {
2414   struct Operation *op = cls;
2415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2416               "%p History replay #%" PRIu64 ": "
2417               "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2418               op->chn, op->op_id, result, err_msg_size, err_msg);
2419
2420   // FIXME: client might have been disconnected
2421   client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2422 }
2423
2424
2425 /**
2426  * Client requests best matching state variable from PSYCstore.
2427  */
2428 static void
2429 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2430                        const struct GNUNET_MessageHeader *msg)
2431 {
2432   struct Channel *
2433     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2434   GNUNET_assert (NULL != chn);
2435
2436   const struct StateRequest *
2437     req = (const struct StateRequest *) msg;
2438
2439   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2440   const char *name = (const char *) &req[1];
2441   if (0 == name_size || '\0' != name[name_size - 1])
2442   {
2443     GNUNET_break (0);
2444     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2445     return;
2446   }
2447
2448   struct Operation *op = op_add (chn, client, req->op_id, 0);
2449   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2450                               &store_recv_state_var,
2451                               &store_recv_state_result, op);
2452   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2453 }
2454
2455
2456 /**
2457  * Client requests state variables with a given prefix from PSYCstore.
2458  */
2459 static void
2460 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2461                               const struct GNUNET_MessageHeader *msg)
2462 {
2463   struct Channel *
2464     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2465   GNUNET_assert (NULL != chn);
2466
2467   const struct StateRequest *
2468     req = (const struct StateRequest *) msg;
2469
2470   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2471   const char *name = (const char *) &req[1];
2472   if (0 == name_size || '\0' != name[name_size - 1])
2473   {
2474     GNUNET_break (0);
2475     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2476     return;
2477   }
2478
2479   struct Operation *op = op_add (chn, client, req->op_id, 0);
2480   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2481                                      &store_recv_state_var,
2482                                      &store_recv_state_result, op);
2483   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2484 }
2485
2486
2487 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2488   { &client_recv_master_start, NULL,
2489     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2490
2491   { &client_recv_slave_join, NULL,
2492     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2493
2494   { &client_recv_join_decision, NULL,
2495     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2496
2497   { &client_recv_psyc_message, NULL,
2498     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2499
2500   { &client_recv_membership_store, NULL,
2501     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2502
2503   { &client_recv_history_replay, NULL,
2504     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2505
2506   { &client_recv_state_get, NULL,
2507     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2508
2509   { &client_recv_state_get_prefix, NULL,
2510     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2511
2512   { NULL, NULL, 0, 0 }
2513 };
2514
2515
2516 /**
2517  * Initialize the PSYC service.
2518  *
2519  * @param cls Closure.
2520  * @param server The initialized server.
2521  * @param c Configuration to use.
2522  */
2523 static void
2524 run (void *cls, struct GNUNET_SERVER_Handle *server,
2525      const struct GNUNET_CONFIGURATION_Handle *c)
2526 {
2527   cfg = c;
2528   store = GNUNET_PSYCSTORE_connect (cfg);
2529   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2530   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2531   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2532   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2533   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2534   nc = GNUNET_SERVER_notification_context_create (server, 1);
2535   GNUNET_SERVER_add_handlers (server, server_handlers);
2536   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2537   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2538                                 &shutdown_task, NULL);
2539 }
2540
2541
2542 /**
2543  * The main function for the service.
2544  *
2545  * @param argc number of arguments from the command line
2546  * @param argv command line arguments
2547  * @return 0 ok, 1 on error
2548  */
2549 int
2550 main (int argc, char *const *argv)
2551 {
2552   return (GNUNET_OK ==
2553           GNUNET_SERVICE_run (argc, argv, "psyc",
2554                               GNUNET_SERVICE_OPTION_NONE,
2555                               &run, NULL)) ? 0 : 1;
2556 }
2557
2558 /* end of gnunet-service-psyc.c */