e7020bc69a72d86a2a98ad29c176268a036de2ab
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * @see enum MessageState
102    */
103   uint8_t state;
104
105   /**
106    * Whether a message ACK has already been sent to the client.
107    * #GNUNET_YES or #GNUNET_NO
108    */
109   uint8_t ack_sent;
110
111   /* Followed by message */
112 };
113
114
115 /**
116  * Cache for received message fragments.
117  * Message fragments are only sent to clients after all modifiers arrived.
118  *
119  * chan_key -> MultiHashMap chan_msgs
120  */
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
122
123
124 /**
125  * Entry in the chan_msgs hashmap of @a recv_cache:
126  * fragment_id -> RecvCacheEntry
127  */
128 struct RecvCacheEntry
129 {
130   struct GNUNET_MULTICAST_MessageHeader *mmsg;
131   uint16_t ref_count;
132 };
133
134
135 /**
136  * Entry in the @a recv_frags hash map of a @a Channel.
137  * message_id -> FragmentQueue
138  */
139 struct FragmentQueue
140 {
141   /**
142    * Fragment IDs stored in @a recv_cache.
143    */
144   struct GNUNET_CONTAINER_Heap *fragments;
145
146   /**
147    * Total size of received fragments.
148    */
149   uint64_t size;
150
151   /**
152    * Total size of received header fragments (METHOD & MODIFIERs)
153    */
154   uint64_t header_size;
155
156   /**
157    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158    */
159   uint64_t state_delta;
160
161   /**
162    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
163    */
164   uint32_t flags;
165
166   /**
167    * Receive state of message.
168    *
169    * @see MessageFragmentState
170    */
171   uint8_t state;
172
173   /**
174    * Is the message queued for delivery to the client?
175    * i.e. added to the recv_msgs queue
176    */
177   uint8_t is_queued;
178 };
179
180
181 /**
182  * List of connected clients.
183  */
184 struct ClientListItem
185 {
186   struct ClientListItem *prev;
187   struct ClientListItem *next;
188   struct GNUNET_SERVER_Client *client;
189 };
190
191
192 /**
193  * Common part of the client context for both a channel master and slave.
194  */
195 struct Channel
196 {
197   struct ClientListItem *clients_head;
198   struct ClientListItem *clients_tail;
199
200   struct TransmitMessage *tmit_head;
201   struct TransmitMessage *tmit_tail;
202
203   /**
204    * Current PSYCstore operation.
205    */
206   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
207
208   /**
209    * Received fragments not yet sent to the client.
210    * message_id -> FragmentQueue
211    */
212   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
213
214   /**
215    * Received message IDs not yet sent to the client.
216    */
217   struct GNUNET_CONTAINER_Heap *recv_msgs;
218
219   /**
220    * Public key of the channel.
221    */
222   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
223
224   /**
225    * Hash of @a pub_key.
226    */
227   struct GNUNET_HashCode pub_key_hash;
228
229   /**
230    * Last message ID sent to the client.
231    * 0 if there is no such message.
232    */
233   uint64_t max_message_id;
234
235   /**
236    * ID of the last stateful message, where the state operations has been
237    * processed and saved to PSYCstore and which has been sent to the client.
238    * 0 if there is no such message.
239    */
240   uint64_t max_state_message_id;
241
242   /**
243    * Expected value size for the modifier being received from the PSYC service.
244    */
245   uint32_t tmit_mod_value_size_expected;
246
247   /**
248    * Actual value size for the modifier being received from the PSYC service.
249    */
250   uint32_t tmit_mod_value_size;
251
252   /**
253    * @see enum MessageState
254    */
255   uint8_t tmit_state;
256
257   /**
258    * FIXME: needed?
259    */
260   uint8_t in_transmit;
261
262   /**
263    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
264    */
265   uint8_t is_master;
266
267   /**
268    * Is this channel ready to receive messages from client?
269    * #GNUNET_YES or #GNUNET_NO
270    */
271   uint8_t is_ready;
272
273   /**
274    * Is the client disconnected?
275    * #GNUNET_YES or #GNUNET_NO
276    */
277   uint8_t is_disconnected;
278 };
279
280
281 /**
282  * Client context for a channel master.
283  */
284 struct Master
285 {
286   /**
287    * Channel struct common for Master and Slave
288    */
289   struct Channel chn;
290
291   /**
292    * Private key of the channel.
293    */
294   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
295
296   /**
297    * Handle for the multicast origin.
298    */
299   struct GNUNET_MULTICAST_Origin *origin;
300
301   /**
302    * Transmit handle for multicast.
303    */
304   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
305
306   /**
307    * Incoming join requests from multicast.
308    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
309    */
310   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
311
312   /**
313    * Last message ID transmitted to this channel.
314    *
315    * Incremented before sending a message, thus the message_id in messages sent
316    * starts from 1.
317    */
318   uint64_t max_message_id;
319
320   /**
321    * ID of the last message with state operations transmitted to the channel.
322    * 0 if there is no such message.
323    */
324   uint64_t max_state_message_id;
325
326   /**
327    * Maximum group generation transmitted to the channel.
328    */
329   uint64_t max_group_generation;
330
331   /**
332    * @see enum GNUNET_PSYC_Policy
333    */
334   enum GNUNET_PSYC_Policy policy;
335 };
336
337
338 /**
339  * Client context for a channel slave.
340  */
341 struct Slave
342 {
343   /**
344    * Channel struct common for Master and Slave
345    */
346   struct Channel chn;
347
348   /**
349    * Private key of the slave.
350    */
351   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
352
353   /**
354    * Public key of the slave.
355    */
356   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
357
358   /**
359    * Hash of @a pub_key.
360    */
361   struct GNUNET_HashCode pub_key_hash;
362
363   /**
364    * Handle for the multicast member.
365    */
366   struct GNUNET_MULTICAST_Member *member;
367
368   /**
369    * Transmit handle for multicast.
370    */
371   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
372
373   /**
374    * Peer identity of the origin.
375    */
376   struct GNUNET_PeerIdentity origin;
377
378   /**
379    * Number of items in @a relays.
380    */
381   uint32_t relay_count;
382
383   /**
384    * Relays that multicast can use to connect.
385    */
386   struct GNUNET_PeerIdentity *relays;
387
388   /**
389    * Join request to be transmitted to the master on join.
390    */
391   struct GNUNET_PSYC_Message *join_msg;
392
393   /**
394    * Join decision received from multicast.
395    */
396   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
397
398   /**
399    * Maximum request ID for this channel.
400    */
401   uint64_t max_request_id;
402 };
403
404
405 static void
406 transmit_message (struct Channel *chn);
407
408
409 static uint64_t
410 message_queue_drop (struct Channel *chn);
411
412
413 /**
414  * Task run during shutdown.
415  *
416  * @param cls unused
417  * @param tc unused
418  */
419 static void
420 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
421 {
422   if (NULL != nc)
423   {
424     GNUNET_SERVER_notification_context_destroy (nc);
425     nc = NULL;
426   }
427   if (NULL != stats)
428   {
429     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
430     stats = NULL;
431   }
432 }
433
434
435 /**
436  * Clean up master data structures after a client disconnected.
437  */
438 static void
439 cleanup_master (struct Master *mst)
440 {
441   struct Channel *chn = &mst->chn;
442
443   if (NULL != mst->origin)
444     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
445   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
446   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
447 }
448
449
450 /**
451  * Clean up slave data structures after a client disconnected.
452  */
453 static void
454 cleanup_slave (struct Slave *slv)
455 {
456   struct Channel *chn = &slv->chn;
457   struct GNUNET_CONTAINER_MultiHashMap *
458     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
459                                                 &chn->pub_key_hash);
460   GNUNET_assert (NULL != chn_slv);
461   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
462
463   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
464   {
465     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
466                                           chn_slv);
467     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
468   }
469   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
470
471   if (NULL != slv->join_msg)
472   {
473     GNUNET_free (slv->join_msg);
474     slv->join_msg = NULL;
475   }
476   if (NULL != slv->relays)
477   {
478     GNUNET_free (slv->relays);
479     slv->relays = NULL;
480   }
481   if (NULL != slv->member)
482   {
483     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
484     slv->member = NULL;
485   }
486   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
487 }
488
489
490 /**
491  * Clean up channel data structures after a client disconnected.
492  */
493 static void
494 cleanup_channel (struct Channel *chn)
495 {
496   message_queue_drop (chn);
497   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
498
499   if (NULL != chn->store_op)
500   {
501     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
502     chn->store_op = NULL;
503   }
504
505   (GNUNET_YES == chn->is_master)
506     ? cleanup_master ((struct Master *) chn)
507     : cleanup_slave ((struct Slave *) chn);
508   GNUNET_free (chn);
509 }
510
511
512 /**
513  * Called whenever a client is disconnected.
514  * Frees our resources associated with that client.
515  *
516  * @param cls Closure.
517  * @param client Identification of the client.
518  */
519 static void
520 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
521 {
522   if (NULL == client)
523     return;
524
525   struct Channel *
526     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
527
528   if (NULL == chn)
529   {
530     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
531                 "%p User context is NULL in client_disconnect()\n", chn);
532     GNUNET_break (0);
533     return;
534   }
535
536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
537               "%p Client (%s) disconnected from channel %s\n",
538               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
539               GNUNET_h2s (&chn->pub_key_hash));
540
541   struct ClientListItem *cli = chn->clients_head;
542   while (NULL != cli)
543   {
544     if (cli->client == client)
545     {
546       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
547       GNUNET_free (cli);
548       break;
549     }
550     cli = cli->next;
551   }
552
553   if (NULL == chn->clients_head)
554   { /* Last client disconnected. */
555     if (NULL != chn->tmit_head)
556     { /* Send pending messages to multicast before cleanup. */
557       transmit_message (chn);
558     }
559     else
560     {
561       cleanup_channel (chn);
562     }
563   }
564 }
565
566
567 /**
568  * Send message to all clients connected to the channel.
569  */
570 static void
571 client_send_msg (const struct Channel *chn,
572                  const struct GNUNET_MessageHeader *msg)
573 {
574   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
575               "%p Sending message to clients.\n", chn);
576
577   struct ClientListItem *cli = chn->clients_head;
578   while (NULL != cli)
579   {
580     GNUNET_SERVER_notification_context_add (nc, cli->client);
581     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
582     cli = cli->next;
583   }
584 }
585
586
587 /**
588  * Closure for join_mem_test_cb()
589  */
590 struct JoinMemTestClosure
591 {
592   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
593   struct Channel *chn;
594   struct GNUNET_MULTICAST_JoinHandle *jh;
595   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
596 };
597
598
599 /**
600  * Membership test result callback used for join requests.
601  */
602 static void
603 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
604 {
605   struct JoinMemTestClosure *jcls = cls;
606
607   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
608   { /* Pass on join request to client if this is a master channel */
609     struct Master *mst = (struct Master *) jcls->chn;
610     struct GNUNET_HashCode slave_key_hash;
611     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
612                         &slave_key_hash);
613     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
614                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
615     client_send_msg (jcls->chn, &jcls->join_msg->header);
616   }
617   else
618   {
619     // FIXME: add relays
620     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
621   }
622   GNUNET_free (jcls->join_msg);
623   GNUNET_free (jcls);
624 }
625
626
627 /**
628  * Incoming join request from multicast.
629  */
630 static void
631 mcast_recv_join_request (void *cls,
632                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
633                          const struct GNUNET_MessageHeader *join_msg,
634                          struct GNUNET_MULTICAST_JoinHandle *jh)
635 {
636   struct Channel *chn = cls;
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
638
639   uint16_t join_msg_size = 0;
640   if (NULL != join_msg)
641   {
642     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
643     {
644       join_msg_size = ntohs (join_msg->size);
645     }
646     else
647     {
648       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
649                   "%p Got join message with invalid type %u.\n",
650                   chn, ntohs (join_msg->type));
651     }
652   }
653
654   struct GNUNET_PSYC_JoinRequestMessage *
655     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
656   req->header.size = htons (sizeof (*req) + join_msg_size);
657   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
658   req->slave_key = *slave_key;
659   if (0 < join_msg_size)
660     memcpy (&req[1], join_msg, join_msg_size);
661
662   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
663   jcls->slave_key = *slave_key;
664   jcls->chn = chn;
665   jcls->jh = jh;
666   jcls->join_msg = req;
667
668   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
669                                     chn->max_message_id, 0,
670                                     &join_mem_test_cb, jcls);
671 }
672
673
674 /**
675  * Join decision received from multicast.
676  */
677 static void
678 mcast_recv_join_decision (void *cls, int is_admitted,
679                           const struct GNUNET_PeerIdentity *peer,
680                           uint16_t relay_count,
681                           const struct GNUNET_PeerIdentity *relays,
682                           const struct GNUNET_MessageHeader *join_resp)
683 {
684   struct Slave *slv = cls;
685   struct Channel *chn = &slv->chn;
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "%p Got join decision: %d\n", slv, is_admitted);
688
689   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
690   struct GNUNET_PSYC_JoinDecisionMessage *
691     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
692   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
693   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
694   dcsn->is_admitted = htonl (is_admitted);
695   if (0 < join_resp_size)
696     memcpy (&dcsn[1], join_resp, join_resp_size);
697
698   client_send_msg (chn, &dcsn->header);
699
700   if (GNUNET_YES == is_admitted)
701   {
702     chn->is_ready = GNUNET_YES;
703   }
704   else
705   {
706     slv->member = NULL;
707   }
708 }
709
710
711 /**
712  * Received result of GNUNET_PSYCSTORE_membership_test()
713  */
714 static void
715 store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
716 {
717   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719               "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
720               mth, result, err_msg);
721
722   GNUNET_MULTICAST_membership_test_result (mth, result);
723 }
724
725
726 /**
727  * Incoming membership test request from multicast.
728  */
729 static void
730 mcast_recv_membership_test (void *cls,
731                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
732                             uint64_t message_id, uint64_t group_generation,
733                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
734 {
735   struct Channel *chn = cls;
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737               "%p Received membership test request from multicast.\n",
738               mth);
739   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
740                                     message_id, group_generation,
741                                     &store_recv_membership_test_result, mth);
742 }
743
744
745 static int
746 store_recv_fragment_replay (void *cls,
747                             struct GNUNET_MULTICAST_MessageHeader *msg,
748                             enum GNUNET_PSYCSTORE_MessageFlags flags)
749 {
750   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
751
752   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
753   return GNUNET_YES;
754 }
755
756
757 /**
758  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
759  */
760 static void
761 store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
762 {
763   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
766               rh, result, err_msg);
767
768   switch (result)
769   {
770   case GNUNET_YES:
771     break;
772
773   case GNUNET_NO:
774     GNUNET_MULTICAST_replay_response (rh, NULL,
775                                       GNUNET_MULTICAST_REC_NOT_FOUND);
776     break;
777
778   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
779     GNUNET_MULTICAST_replay_response (rh, NULL,
780                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
781
782   case GNUNET_SYSERR:
783     GNUNET_MULTICAST_replay_response (rh, NULL,
784                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
785     break;
786   }
787   GNUNET_MULTICAST_replay_response_end (rh);
788 }
789
790
791 /**
792  * Incoming fragment replay request from multicast.
793  */
794 static void
795 mcast_recv_replay_fragment (void *cls,
796                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
797                             uint64_t fragment_id, uint64_t flags,
798                             struct GNUNET_MULTICAST_ReplayHandle *rh)
799
800 {
801   struct Channel *chn = cls;
802   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id,
803                                  &store_recv_fragment_replay,
804                                  &store_recv_fragment_replay_result, rh);
805 }
806
807
808 /**
809  * Incoming message replay request from multicast.
810  */
811 static void
812 mcast_recv_replay_message (void *cls,
813                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
814                            uint64_t message_id,
815                            uint64_t fragment_offset,
816                            uint64_t flags,
817                            struct GNUNET_MULTICAST_ReplayHandle *rh)
818 {
819   struct Channel *chn = cls;
820   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id,
821                                 &store_recv_fragment_replay,
822                                 &store_recv_fragment_replay_result, rh);
823 }
824
825
826 /**
827  * Convert an uint64_t in network byte order to a HashCode
828  * that can be used as key in a MultiHashMap
829  */
830 static inline void
831 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
832 {
833   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
834   /* TODO: use built-in byte swap functions if available */
835
836   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
837   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
838
839   *key = (struct GNUNET_HashCode) {};
840   *((uint64_t *) key)
841     = (n << 32) | (n >> 32);
842 }
843
844
845 /**
846  * Convert an uint64_t in host byte order to a HashCode
847  * that can be used as key in a MultiHashMap
848  */
849 static inline void
850 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
851 {
852 #if __BYTE_ORDER == __BIG_ENDIAN
853   hash_key_from_nll (key, n);
854 #elif __BYTE_ORDER == __LITTLE_ENDIAN
855   *key = (struct GNUNET_HashCode) {};
856   *((uint64_t *) key) = n;
857 #else
858   #error byteorder undefined
859 #endif
860 }
861
862
863 /**
864  * Send multicast message to all clients connected to the channel.
865  */
866 static void
867 client_send_mcast_msg (struct Channel *chn,
868                        const struct GNUNET_MULTICAST_MessageHeader *mmsg)
869 {
870   struct GNUNET_PSYC_MessageHeader *pmsg;
871   uint16_t size = ntohs (mmsg->header.size);
872   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
873
874   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875               "%p Sending multicast message to client. "
876               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
877               chn, GNUNET_ntohll (mmsg->fragment_id),
878               GNUNET_ntohll (mmsg->message_id));
879
880   pmsg = GNUNET_malloc (psize);
881   pmsg->header.size = htons (psize);
882   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
883   pmsg->message_id = mmsg->message_id;
884   pmsg->fragment_offset = mmsg->fragment_offset;
885
886   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
887   client_send_msg (chn, &pmsg->header);
888   GNUNET_free (pmsg);
889 }
890
891
892 /**
893  * Send multicast request to all clients connected to the channel.
894  */
895 static void
896 client_send_mcast_req (struct Master *mst,
897                        const struct GNUNET_MULTICAST_RequestHeader *req)
898 {
899   struct Channel *chn = &mst->chn;
900
901   struct GNUNET_PSYC_MessageHeader *pmsg;
902   uint16_t size = ntohs (req->header.size);
903   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
904
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "%p Sending multicast request to client. "
907               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
908               chn, GNUNET_ntohll (req->fragment_id),
909               GNUNET_ntohll (req->request_id));
910
911   pmsg = GNUNET_malloc (psize);
912   pmsg->header.size = htons (psize);
913   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
914   pmsg->message_id = req->request_id;
915   pmsg->fragment_offset = req->fragment_offset;
916   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
917
918   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
919   client_send_msg (chn, &pmsg->header);
920   GNUNET_free (pmsg);
921 }
922
923
924 /**
925  * Insert a multicast message fragment into the queue belonging to the message.
926  *
927  * @param chn           Channel.
928  * @param mmsg         Multicast message fragment.
929  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
930  * @param first_ptype  First PSYC message part type in @a mmsg.
931  * @param last_ptype   Last PSYC message part type in @a mmsg.
932  */
933 static void
934 fragment_queue_insert (struct Channel *chn,
935                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
936                        uint16_t first_ptype, uint16_t last_ptype)
937 {
938   const uint16_t size = ntohs (mmsg->header.size);
939   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
940   struct GNUNET_CONTAINER_MultiHashMap
941     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
942                                                     &chn->pub_key_hash);
943
944   struct GNUNET_HashCode msg_id_hash;
945   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
946
947   struct FragmentQueue
948     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
949
950   if (NULL == fragq)
951   {
952     fragq = GNUNET_new (struct FragmentQueue);
953     fragq->state = MSG_FRAG_STATE_HEADER;
954     fragq->fragments
955       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
956
957     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
958                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
959
960     if (NULL == chan_msgs)
961     {
962       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
963       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
964                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
965     }
966   }
967
968   struct GNUNET_HashCode frag_id_hash;
969   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
970   struct RecvCacheEntry
971     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
972   if (NULL == cache_entry)
973   {
974     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                 "%p Adding message fragment to cache. "
976                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
977                 chn, GNUNET_ntohll (mmsg->message_id),
978                 GNUNET_ntohll (mmsg->fragment_id));
979     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                 "%p header_size: %" PRIu64 " + %u\n",
981                 chn, fragq->header_size, size);
982     cache_entry = GNUNET_new (struct RecvCacheEntry);
983     cache_entry->ref_count = 1;
984     cache_entry->mmsg = GNUNET_malloc (size);
985     memcpy (cache_entry->mmsg, mmsg, size);
986     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
987                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
988   }
989   else
990   {
991     cache_entry->ref_count++;
992     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993                 "%p Message fragment is already in cache. "
994                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
995                 ", ref_count: %u\n",
996                 chn, GNUNET_ntohll (mmsg->message_id),
997                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
998   }
999
1000   if (MSG_FRAG_STATE_HEADER == fragq->state)
1001   {
1002     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1003     {
1004       struct GNUNET_PSYC_MessageMethod *
1005         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1006       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1007       fragq->flags = ntohl (pmeth->flags);
1008     }
1009
1010     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1011     {
1012       fragq->header_size += size;
1013     }
1014     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1015              || frag_offset == fragq->header_size)
1016     { /* header is now complete */
1017       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1018                   "%p Header of message %" PRIu64 " is complete.\n",
1019                   chn, GNUNET_ntohll (mmsg->message_id));
1020
1021       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1022                   "%p Adding message %" PRIu64 " to queue.\n",
1023                   chn, GNUNET_ntohll (mmsg->message_id));
1024       fragq->state = MSG_FRAG_STATE_DATA;
1025     }
1026     else
1027     {
1028       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1029                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1030                   "%" PRIu64 " != %" PRIu64 "\n",
1031                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1032                   fragq->header_size);
1033     }
1034   }
1035
1036   switch (last_ptype)
1037   {
1038   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1039     if (frag_offset == fragq->size)
1040       fragq->state = MSG_FRAG_STATE_END;
1041     else
1042       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1043                   "%p Message %" PRIu64 " is NOT complete yet: "
1044                   "%" PRIu64 " != %" PRIu64 "\n",
1045                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1046                   fragq->size);
1047     break;
1048
1049   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1050     /* Drop message without delivering to client if it's a single fragment */
1051     fragq->state =
1052       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1053       ? MSG_FRAG_STATE_DROP
1054       : MSG_FRAG_STATE_CANCEL;
1055   }
1056
1057   switch (fragq->state)
1058   {
1059   case MSG_FRAG_STATE_DATA:
1060   case MSG_FRAG_STATE_END:
1061   case MSG_FRAG_STATE_CANCEL:
1062     if (GNUNET_NO == fragq->is_queued)
1063     {
1064       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1065                                     GNUNET_ntohll (mmsg->message_id));
1066       fragq->is_queued = GNUNET_YES;
1067     }
1068   }
1069
1070   fragq->size += size;
1071   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1072                                 GNUNET_ntohll (mmsg->fragment_id));
1073 }
1074
1075
1076 /**
1077  * Run fragment queue of a message.
1078  *
1079  * Send fragments of a message in order to client, after all modifiers arrived
1080  * from multicast.
1081  *
1082  * @param chn      Channel.
1083  * @param msg_id  ID of the message @a fragq belongs to.
1084  * @param fragq   Fragment queue of the message.
1085  * @param drop    Drop message without delivering to client?
1086  *                #GNUNET_YES or #GNUNET_NO.
1087  */
1088 static void
1089 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1090                     struct FragmentQueue *fragq, uint8_t drop)
1091 {
1092   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1093               "%p Running message fragment queue for message %" PRIu64
1094               " (state: %u).\n",
1095               chn, msg_id, fragq->state);
1096
1097   struct GNUNET_CONTAINER_MultiHashMap
1098     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1099                                                     &chn->pub_key_hash);
1100   GNUNET_assert (NULL != chan_msgs);
1101   uint64_t frag_id;
1102
1103   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1104                                                     &frag_id))
1105   {
1106     struct GNUNET_HashCode frag_id_hash;
1107     hash_key_from_hll (&frag_id_hash, frag_id);
1108     struct RecvCacheEntry *cache_entry
1109       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1110     if (cache_entry != NULL)
1111     {
1112       if (GNUNET_NO == drop)
1113       {
1114         client_send_mcast_msg (chn, cache_entry->mmsg);
1115       }
1116       if (cache_entry->ref_count <= 1)
1117       {
1118         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1119                                               cache_entry);
1120         GNUNET_free (cache_entry->mmsg);
1121         GNUNET_free (cache_entry);
1122       }
1123       else
1124       {
1125         cache_entry->ref_count--;
1126       }
1127     }
1128 #if CACHE_AGING_IMPLEMENTED
1129     else if (GNUNET_NO == drop)
1130     {
1131       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1132     }
1133 #endif
1134
1135     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1136   }
1137
1138   if (MSG_FRAG_STATE_END <= fragq->state)
1139   {
1140     struct GNUNET_HashCode msg_id_hash;
1141     hash_key_from_hll (&msg_id_hash, msg_id);
1142
1143     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1144     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1145     GNUNET_free (fragq);
1146   }
1147   else
1148   {
1149     fragq->is_queued = GNUNET_NO;
1150   }
1151 }
1152
1153
1154 /**
1155  * Run message queue.
1156  *
1157  * Send messages in queue to client in order after a message has arrived from
1158  * multicast, according to the following:
1159  * - A message is only sent if all of its modifiers arrived.
1160  * - A stateful message is only sent if the previous stateful message
1161  *   has already been delivered to the client.
1162  *
1163  * @param chn  Channel.
1164  *
1165  * @return Number of messages removed from queue and sent to client.
1166  */
1167 static uint64_t
1168 message_queue_run (struct Channel *chn)
1169 {
1170   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171               "%p Running message queue.\n", chn);
1172   uint64_t n = 0;
1173   uint64_t msg_id;
1174   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1175                                                     &msg_id))
1176   {
1177     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1178                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1179     struct GNUNET_HashCode msg_id_hash;
1180     hash_key_from_hll (&msg_id_hash, msg_id);
1181
1182     struct FragmentQueue *
1183       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1184
1185     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1186     {
1187       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188                   "%p No fragq (%p) or header not complete.\n",
1189                   chn, fragq);
1190       break;
1191     }
1192
1193     if (MSG_FRAG_STATE_HEADER == fragq->state)
1194     {
1195       /* Check if there's a missing message before the current one */
1196       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1197       {
1198         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1199             && msg_id - 1 != chn->max_message_id)
1200         {
1201           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1202                       "%p Out of order message. "
1203                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1204                       chn, msg_id, chn->max_message_id);
1205           break;
1206         }
1207       }
1208       else
1209       {
1210         if (msg_id - fragq->state_delta != chn->max_state_message_id)
1211         {
1212           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1213                       "%p Out of order stateful message. "
1214                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1215                       chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1216           break;
1217         }
1218 #if TODO
1219         /* FIXME: apply modifiers to state in PSYCstore */
1220         GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1221                                        store_recv_state_modify_result, cls);
1222 #endif
1223         chn->max_state_message_id = msg_id;
1224       }
1225       chn->max_message_id = msg_id;
1226     }
1227     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1228     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1229     n++;
1230   }
1231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1233   return n;
1234 }
1235
1236
1237 /**
1238  * Drop message queue of a channel.
1239  *
1240  * Remove all messages in queue without sending it to clients.
1241  *
1242  * @param chn  Channel.
1243  *
1244  * @return Number of messages removed from queue.
1245  */
1246 static uint64_t
1247 message_queue_drop (struct Channel *chn)
1248 {
1249   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1250               "%p Dropping message queue.\n", chn);
1251   uint64_t n = 0;
1252   uint64_t msg_id;
1253   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1254                                                     &msg_id))
1255   {
1256     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1257                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1258     struct GNUNET_HashCode msg_id_hash;
1259     hash_key_from_hll (&msg_id_hash, msg_id);
1260
1261     struct FragmentQueue *
1262       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1263
1264     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1265     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1266     n++;
1267   }
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1270   return n;
1271 }
1272
1273
1274 /**
1275  * Received result of GNUNET_PSYCSTORE_fragment_store().
1276  */
1277 static void
1278 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1279 {
1280   struct Channel *chn = cls;
1281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1283               chn, result, err_msg);
1284 }
1285
1286
1287 /**
1288  * Handle incoming message fragment from multicast.
1289  *
1290  * Store it using PSYCstore and send it to the clients of the channel in order.
1291  */
1292 static void
1293 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1294 {
1295   struct Channel *chn = cls;
1296   uint16_t size = ntohs (mmsg->header.size);
1297
1298   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299               "%p Received multicast message of size %u.\n",
1300               chn, size);
1301
1302   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1303                                    &store_recv_fragment_store_result, chn);
1304
1305   uint16_t first_ptype = 0, last_ptype = 0;
1306   if (GNUNET_SYSERR
1307       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1308                                           (const char *) &mmsg[1],
1309                                           &first_ptype, &last_ptype))
1310   {
1311     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1312                 "%p Dropping incoming multicast message with invalid parts.\n",
1313                 chn);
1314     GNUNET_break_op (0);
1315     return;
1316   }
1317
1318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319               "Message parts: first: type %u, last: type %u\n",
1320               first_ptype, last_ptype);
1321
1322   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1323   message_queue_run (chn);
1324 }
1325
1326
1327 /**
1328  * Incoming request fragment from multicast for a master.
1329  *
1330  * @param cls   Master.
1331  * @param req   The request.
1332  */
1333 static void
1334 mcast_recv_request (void *cls,
1335                     const struct GNUNET_MULTICAST_RequestHeader *req)
1336 {
1337   struct Master *mst = cls;
1338   uint16_t size = ntohs (req->header.size);
1339
1340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1341               "%p Received multicast request of size %u.\n",
1342               mst, size);
1343
1344   uint16_t first_ptype = 0, last_ptype = 0;
1345   if (GNUNET_SYSERR
1346       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1347                                           (const char *) &req[1],
1348                                           &first_ptype, &last_ptype))
1349   {
1350     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1351                 "%p Dropping incoming multicast request with invalid parts.\n",
1352                 mst);
1353     GNUNET_break_op (0);
1354     return;
1355   }
1356
1357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358               "Message parts: first: type %u, last: type %u\n",
1359               first_ptype, last_ptype);
1360
1361   /* FIXME: in-order delivery */
1362   client_send_mcast_req (mst, req);
1363 }
1364
1365
1366 /**
1367  * Response from PSYCstore with the current counter values for a channel master.
1368  */
1369 static void
1370 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1371                             uint64_t max_message_id, uint64_t max_group_generation,
1372                             uint64_t max_state_message_id)
1373 {
1374   struct Master *mst = cls;
1375   struct Channel *chn = &mst->chn;
1376   chn->store_op = NULL;
1377
1378   struct CountersResult res;
1379   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1380   res.header.size = htons (sizeof (res));
1381   res.result_code = htonl (result);
1382   res.max_message_id = GNUNET_htonll (max_message_id);
1383
1384   if (GNUNET_OK == result || GNUNET_NO == result)
1385   {
1386     mst->max_message_id = max_message_id;
1387     chn->max_message_id = max_message_id;
1388     chn->max_state_message_id = max_state_message_id;
1389     mst->max_group_generation = max_group_generation;
1390     mst->origin
1391       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1392                                        &mcast_recv_join_request,
1393                                        &mcast_recv_membership_test,
1394                                        &mcast_recv_replay_fragment,
1395                                        &mcast_recv_replay_message,
1396                                        &mcast_recv_request,
1397                                        &mcast_recv_message, chn);
1398     chn->is_ready = GNUNET_YES;
1399   }
1400   else
1401   {
1402     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1403                 "%p GNUNET_PSYCSTORE_counters_get() "
1404                 "returned %d for channel %s.\n",
1405                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1406   }
1407
1408   client_send_msg (chn, &res.header);
1409 }
1410
1411
1412 /**
1413  * Response from PSYCstore with the current counter values for a channel slave.
1414  */
1415 void
1416 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1417                            uint64_t max_message_id, uint64_t max_group_generation,
1418                            uint64_t max_state_message_id)
1419 {
1420   struct Slave *slv = cls;
1421   struct Channel *chn = &slv->chn;
1422   chn->store_op = NULL;
1423
1424   struct CountersResult res;
1425   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1426   res.header.size = htons (sizeof (res));
1427   res.result_code = htonl (result);
1428   res.max_message_id = GNUNET_htonll (max_message_id);
1429
1430   if (GNUNET_OK == result || GNUNET_NO == result)
1431   {
1432     chn->max_message_id = max_message_id;
1433     chn->max_state_message_id = max_state_message_id;
1434     slv->member
1435       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1436                                       &slv->origin,
1437                                       slv->relay_count, slv->relays,
1438                                       &slv->join_msg->header,
1439                                       &mcast_recv_join_request,
1440                                       &mcast_recv_join_decision,
1441                                       &mcast_recv_membership_test,
1442                                       &mcast_recv_replay_fragment,
1443                                       &mcast_recv_replay_message,
1444                                       &mcast_recv_message, chn);
1445     if (NULL != slv->join_msg)
1446     {
1447       GNUNET_free (slv->join_msg);
1448       slv->join_msg = NULL;
1449     }
1450   }
1451   else
1452   {
1453     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1454                 "%p GNUNET_PSYCSTORE_counters_get() "
1455                 "returned %d for channel %s.\n",
1456                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1457   }
1458
1459   client_send_msg (chn, &res.header);
1460 }
1461
1462
1463 static void
1464 channel_init (struct Channel *chn)
1465 {
1466   chn->recv_msgs
1467     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1468   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1469 }
1470
1471
1472 /**
1473  * Handle a connecting client starting a channel master.
1474  */
1475 static void
1476 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1477                           const struct GNUNET_MessageHeader *msg)
1478 {
1479   const struct MasterStartRequest *req
1480     = (const struct MasterStartRequest *) msg;
1481
1482   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1483   struct GNUNET_HashCode pub_key_hash;
1484
1485   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1486   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1487
1488   struct Master *
1489     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1490   struct Channel *chn;
1491
1492   if (NULL == mst)
1493   {
1494     mst = GNUNET_new (struct Master);
1495     mst->policy = ntohl (req->policy);
1496     mst->priv_key = req->channel_key;
1497     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1498
1499     chn = &mst->chn;
1500     chn->is_master = GNUNET_YES;
1501     chn->pub_key = pub_key;
1502     chn->pub_key_hash = pub_key_hash;
1503     channel_init (chn);
1504
1505     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1506                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1507     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1508                                                    store_recv_master_counters, mst);
1509   }
1510   else
1511   {
1512     chn = &mst->chn;
1513
1514     struct CountersResult res;
1515     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1516     res.header.size = htons (sizeof (res));
1517     res.result_code = htonl (GNUNET_OK);
1518     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1519
1520     GNUNET_SERVER_notification_context_add (nc, client);
1521     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1522                                                 GNUNET_NO);
1523   }
1524
1525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526               "%p Client connected as master to channel %s.\n",
1527               mst, GNUNET_h2s (&chn->pub_key_hash));
1528
1529   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1530   cli->client = client;
1531   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1532
1533   GNUNET_SERVER_client_set_user_context (client, chn);
1534   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1535 }
1536
1537
1538 /**
1539  * Handle a connecting client joining as a channel slave.
1540  */
1541 static void
1542 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1543                         const struct GNUNET_MessageHeader *msg)
1544 {
1545   const struct SlaveJoinRequest *req
1546     = (const struct SlaveJoinRequest *) msg;
1547   uint16_t req_size = ntohs (req->header.size);
1548
1549   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1550   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1551
1552   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1553   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1554   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1555
1556   struct GNUNET_CONTAINER_MultiHashMap *
1557     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1558   struct Slave *slv = NULL;
1559   struct Channel *chn;
1560
1561   if (NULL != chn_slv)
1562   {
1563     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1564   }
1565   if (NULL == slv)
1566   {
1567     slv = GNUNET_new (struct Slave);
1568     slv->priv_key = req->slave_key;
1569     slv->pub_key = slv_pub_key;
1570     slv->pub_key_hash = slv_pub_key_hash;
1571     slv->origin = req->origin;
1572     slv->relay_count = ntohl (req->relay_count);
1573
1574     const struct GNUNET_PeerIdentity *
1575       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1576     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1577     uint16_t join_msg_size = 0;
1578
1579     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1580         <= req_size)
1581     {
1582       join_msg_size = ntohs (slv->join_msg->header.size);
1583       slv->join_msg = GNUNET_malloc (join_msg_size);
1584       memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1585     }
1586     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1587     {
1588       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1589                   "%u + %u + %u != %u\n",
1590                   sizeof (*req), relay_size, join_msg_size, req_size);
1591       GNUNET_break (0);
1592       GNUNET_SERVER_client_disconnect (client);
1593       return;
1594     }
1595     if (0 < slv->relay_count)
1596     {
1597       slv->relays = GNUNET_malloc (relay_size);
1598       memcpy (slv->relays, &req[1], relay_size);
1599     }
1600
1601     chn = &slv->chn;
1602     chn->is_master = GNUNET_NO;
1603     chn->pub_key = req->channel_key;
1604     chn->pub_key_hash = pub_key_hash;
1605     channel_init (chn);
1606
1607     if (NULL == chn_slv)
1608     {
1609       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1610       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1611                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1612     }
1613     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1614                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1615     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1616                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1617     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1618                                                   &store_recv_slave_counters, slv);
1619   }
1620   else
1621   {
1622     chn = &slv->chn;
1623
1624     struct CountersResult res;
1625     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1626     res.header.size = htons (sizeof (res));
1627     res.result_code = htonl (GNUNET_OK);
1628     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1629
1630     GNUNET_SERVER_notification_context_add (nc, client);
1631     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1632                                                 GNUNET_NO);
1633
1634     if (NULL == slv->member)
1635     {
1636       slv->member
1637         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1638                                         &slv->origin,
1639                                         slv->relay_count, slv->relays,
1640                                         &slv->join_msg->header,
1641                                         &mcast_recv_join_request,
1642                                         &mcast_recv_join_decision,
1643                                         &mcast_recv_membership_test,
1644                                         &mcast_recv_replay_fragment,
1645                                         &mcast_recv_replay_message,
1646                                         &mcast_recv_message, chn);
1647       if (NULL != slv->join_msg)
1648       {
1649         GNUNET_free (slv->join_msg);
1650         slv->join_msg = NULL;
1651       }
1652     }
1653     else if (NULL != slv->join_dcsn)
1654     {
1655       GNUNET_SERVER_notification_context_add (nc, client);
1656       GNUNET_SERVER_notification_context_unicast (nc, client,
1657                                                   &slv->join_dcsn->header,
1658                                                   GNUNET_NO);
1659     }
1660   }
1661
1662   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663               "%p Client connected as slave to channel %s.\n",
1664               slv, GNUNET_h2s (&chn->pub_key_hash));
1665
1666   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1667   cli->client = client;
1668   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1669
1670   GNUNET_SERVER_client_set_user_context (client, chn);
1671   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1672 }
1673
1674
1675 struct JoinDecisionClosure
1676 {
1677   int32_t is_admitted;
1678   struct GNUNET_MessageHeader *msg;
1679 };
1680
1681
1682 /**
1683  * Iterator callback for sending join decisions to multicast.
1684  */
1685 static int
1686 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1687                           void *value)
1688 {
1689   struct JoinDecisionClosure *jcls = cls;
1690   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1691   // FIXME: add relays
1692   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1693   return GNUNET_YES;
1694 }
1695
1696
1697 /**
1698  * Join decision from client.
1699  */
1700 static void
1701 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1702                            const struct GNUNET_MessageHeader *msg)
1703 {
1704   struct Channel *
1705     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1706   GNUNET_assert (GNUNET_YES == chn->is_master);
1707   struct Master *mst = (struct Master *) chn;
1708
1709   struct GNUNET_PSYC_JoinDecisionMessage *
1710     dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1711   struct JoinDecisionClosure jcls;
1712   jcls.is_admitted = ntohl (dcsn->is_admitted);
1713   jcls.msg
1714     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1715     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1716     : NULL;
1717
1718   struct GNUNET_HashCode slave_key_hash;
1719   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1720                       &slave_key_hash);
1721
1722   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723               "%p Got join decision (%d) from client for channel %s..\n",
1724               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1726               "%p ..and slave %s.\n",
1727               mst, GNUNET_h2s (&slave_key_hash));
1728
1729   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1730                                               &mcast_send_join_decision, &jcls);
1731   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1732   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1733 }
1734
1735
1736 /**
1737  * Send acknowledgement to a client.
1738  *
1739  * Sent after a message fragment has been passed on to multicast.
1740  *
1741  * @param chn The channel struct for the client.
1742  */
1743 static void
1744 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1745 {
1746   struct GNUNET_MessageHeader res;
1747   res.size = htons (sizeof (res));
1748   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1749
1750   /* FIXME */
1751   GNUNET_SERVER_notification_context_add (nc, client);
1752   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1753 }
1754
1755
1756 /**
1757  * Callback for the transmit functions of multicast.
1758  */
1759 static int
1760 transmit_notify (void *cls, size_t *data_size, void *data)
1761 {
1762   struct Channel *chn = cls;
1763   struct TransmitMessage *tmit_msg = chn->tmit_head;
1764
1765   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1766   {
1767     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768                 "%p transmit_notify: nothing to send.\n", chn);
1769     *data_size = 0;
1770     return GNUNET_NO;
1771   }
1772
1773   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1775
1776   *data_size = tmit_msg->size;
1777   memcpy (data, &tmit_msg[1], *data_size);
1778
1779   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1780
1781   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1782     send_message_ack (chn, tmit_msg->client);
1783
1784   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1785   GNUNET_free (tmit_msg);
1786
1787   if (NULL != chn->tmit_head)
1788   {
1789     transmit_message (chn);
1790   }
1791   else if (GNUNET_YES == chn->is_disconnected)
1792   {
1793     /* FIXME: handle partial message (when still in_transmit) */
1794     cleanup_channel (chn);
1795   }
1796   return ret;
1797 }
1798
1799
1800 /**
1801  * Callback for the transmit functions of multicast.
1802  */
1803 static int
1804 master_transmit_notify (void *cls, size_t *data_size, void *data)
1805 {
1806   int ret = transmit_notify (cls, data_size, data);
1807
1808   if (GNUNET_YES == ret)
1809   {
1810     struct Master *mst = cls;
1811     mst->tmit_handle = NULL;
1812   }
1813   return ret;
1814 }
1815
1816
1817 /**
1818  * Callback for the transmit functions of multicast.
1819  */
1820 static int
1821 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1822 {
1823   int ret = transmit_notify (cls, data_size, data);
1824
1825   if (GNUNET_YES == ret)
1826   {
1827     struct Slave *slv = cls;
1828     slv->tmit_handle = NULL;
1829   }
1830   return ret;
1831 }
1832
1833
1834 /**
1835  * Transmit a message from a channel master to the multicast group.
1836  */
1837 static void
1838 master_transmit_message (struct Master *mst)
1839 {
1840   if (NULL == mst->tmit_handle)
1841   {
1842     mst->tmit_handle
1843       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1844                                         mst->max_group_generation,
1845                                         master_transmit_notify, mst);
1846   }
1847   else
1848   {
1849     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1850   }
1851 }
1852
1853
1854 /**
1855  * Transmit a message from a channel slave to the multicast group.
1856  */
1857 static void
1858 slave_transmit_message (struct Slave *slv)
1859 {
1860   if (NULL == slv->tmit_handle)
1861   {
1862     slv->tmit_handle
1863       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1864                                            slave_transmit_notify, slv);
1865   }
1866   else
1867   {
1868     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1869   }
1870 }
1871
1872
1873 static void
1874 transmit_message (struct Channel *chn)
1875 {
1876   chn->is_master
1877     ? master_transmit_message ((struct Master *) chn)
1878     : slave_transmit_message ((struct Slave *) chn);
1879 }
1880
1881
1882 /**
1883  * Queue a message from a channel master for sending to the multicast group.
1884  */
1885 static void
1886 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1887                      uint16_t first_ptype, uint16_t last_ptype)
1888 {
1889   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1890
1891   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1892   {
1893     tmit_msg->id = ++mst->max_message_id;
1894     struct GNUNET_PSYC_MessageMethod *pmeth
1895       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1896
1897     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1898     {
1899       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1900     }
1901     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1902     {
1903       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1904                                           - mst->max_state_message_id);
1905     }
1906     else
1907     {
1908       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1909     }
1910   }
1911 }
1912
1913
1914 /**
1915  * Queue a message from a channel slave for sending to the multicast group.
1916  */
1917 static void
1918 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1919                      uint16_t first_ptype, uint16_t last_ptype)
1920 {
1921   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1922   {
1923     struct GNUNET_PSYC_MessageMethod *pmeth
1924       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1925     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1926     tmit_msg->id = ++slv->max_request_id;
1927   }
1928 }
1929
1930
1931 /**
1932  * Queue PSYC message parts for sending to multicast.
1933  *
1934  * @param chn           Channel to send to.
1935  * @param client       Client the message originates from.
1936  * @param data_size    Size of @a data.
1937  * @param data         Concatenated message parts.
1938  * @param first_ptype  First message part type in @a data.
1939  * @param last_ptype   Last message part type in @a data.
1940  */
1941 static struct TransmitMessage *
1942 queue_message (struct Channel *chn,
1943                struct GNUNET_SERVER_Client *client,
1944                size_t data_size,
1945                const void *data,
1946                uint16_t first_ptype, uint16_t last_ptype)
1947 {
1948   struct TransmitMessage *
1949     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
1950   memcpy (&tmit_msg[1], data, data_size);
1951   tmit_msg->client = client;
1952   tmit_msg->size = data_size;
1953   tmit_msg->state = chn->tmit_state;
1954
1955   /* FIXME: separate queue per message ID */
1956
1957   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
1958
1959   chn->is_master
1960     ? master_queue_message ((struct Master *) chn, tmit_msg,
1961                             first_ptype, last_ptype)
1962     : slave_queue_message ((struct Slave *) chn, tmit_msg,
1963                            first_ptype, last_ptype);
1964   return tmit_msg;
1965 }
1966
1967
1968 /**
1969  * Cancel transmission of current message.
1970  *
1971  * @param chn     Channel to send to.
1972  * @param client  Client the message originates from.
1973  */
1974 static void
1975 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1976 {
1977   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1978
1979   struct GNUNET_MessageHeader msg;
1980   msg.size = htons (sizeof (msg));
1981   msg.type = htons (type);
1982
1983   queue_message (chn, client, sizeof (msg), &msg, type, type);
1984   transmit_message (chn);
1985
1986   /* FIXME: cleanup */
1987 }
1988
1989
1990 /**
1991  * Incoming message from a master or slave client.
1992  */
1993 static void
1994 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1995                           const struct GNUNET_MessageHeader *msg)
1996 {
1997   struct Channel *
1998     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1999   GNUNET_assert (NULL != chn);
2000
2001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002               "%p Received message from client.\n", chn);
2003   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2004
2005   if (GNUNET_YES != chn->is_ready)
2006   {
2007     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2008                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2009     GNUNET_break (0);
2010     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2011     return;
2012   }
2013
2014   uint16_t size = ntohs (msg->size);
2015   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2016   {
2017     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2018     GNUNET_break (0);
2019     transmit_cancel (chn, client);
2020     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2021     return;
2022   }
2023
2024   uint16_t first_ptype = 0, last_ptype = 0;
2025   if (GNUNET_SYSERR
2026       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2027                                           (const char *) &msg[1],
2028                                           &first_ptype, &last_ptype))
2029   {
2030     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2031                 "%p Received invalid message part from client.\n", chn);
2032     GNUNET_break (0);
2033     transmit_cancel (chn, client);
2034     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2035     return;
2036   }
2037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038               "%p Received message with first part type %u and last part type %u.\n",
2039               chn, first_ptype, last_ptype);
2040
2041   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2042                  first_ptype, last_ptype);
2043   transmit_message (chn);
2044   /* FIXME: send a few ACKs even before transmit_notify is called */
2045
2046   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2047 };
2048
2049
2050 /**
2051  * Received result of GNUNET_PSYCSTORE_membership_store()
2052  */
2053 static void
2054 store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2055 {
2056   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
2057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2059               mth, result, err_msg);
2060   /* FIXME: send result to client */
2061 }
2062
2063
2064 /**
2065  * Client requests to add/remove a slave in the membership database.
2066  */
2067 static void
2068 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2069                               const struct GNUNET_MessageHeader *msg)
2070 {
2071   struct Channel *
2072     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2073   GNUNET_assert (NULL != chn);
2074
2075   const struct ChannelMembershipStoreRequest *
2076     req = (const struct ChannelMembershipStoreRequest *) msg;
2077
2078   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2079   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2080   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2081               "%p Received membership store request from client.\n", chn);
2082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2084               chn, req->did_join, announced_at, effective_since);
2085
2086   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2087                                      req->did_join, announced_at, effective_since,
2088                                      0, /* FIXME: group_generation */
2089                                      &store_recv_membership_store_result, chn);
2090   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2091 }
2092
2093
2094 /**
2095  * Client requests channel history from PSYCstore.
2096  */
2097 static void
2098 client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
2099                            const struct GNUNET_MessageHeader *msg)
2100 {
2101
2102 }
2103
2104
2105 /**
2106  * Client requests best matching state variable from PSYCstore.
2107  */
2108 static void
2109 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2110                        const struct GNUNET_MessageHeader *msg)
2111 {
2112
2113 }
2114
2115
2116 /**
2117  * Client requests state variables with a given prefix from PSYCstore.
2118  */
2119 static void
2120 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2121                               const struct GNUNET_MessageHeader *msg)
2122 {
2123
2124 }
2125
2126
2127 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2128   { &client_recv_master_start, NULL,
2129     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2130
2131   { &client_recv_slave_join, NULL,
2132     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2133
2134   { &client_recv_join_decision, NULL,
2135     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2136
2137   { &client_recv_psyc_message, NULL,
2138     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2139
2140   { &client_recv_membership_store, NULL,
2141     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2142
2143   { &client_recv_story_request, NULL,
2144     GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2145
2146   { &client_recv_state_get, NULL,
2147     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2148
2149   { &client_recv_state_get_prefix, NULL,
2150     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2151
2152   { NULL, NULL, 0, 0 }
2153 };
2154
2155
2156 /**
2157  * Initialize the PSYC service.
2158  *
2159  * @param cls Closure.
2160  * @param server The initialized server.
2161  * @param c Configuration to use.
2162  */
2163 static void
2164 run (void *cls, struct GNUNET_SERVER_Handle *server,
2165      const struct GNUNET_CONFIGURATION_Handle *c)
2166 {
2167   cfg = c;
2168   store = GNUNET_PSYCSTORE_connect (cfg);
2169   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2170   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2171   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2172   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2173   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2174   nc = GNUNET_SERVER_notification_context_create (server, 1);
2175   GNUNET_SERVER_add_handlers (server, server_handlers);
2176   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2177   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2178                                 &shutdown_task, NULL);
2179 }
2180
2181
2182 /**
2183  * The main function for the service.
2184  *
2185  * @param argc number of arguments from the command line
2186  * @param argv command line arguments
2187  * @return 0 ok, 1 on error
2188  */
2189 int
2190 main (int argc, char *const *argv)
2191 {
2192   return (GNUNET_OK ==
2193           GNUNET_SERVICE_run (argc, argv, "psyc",
2194                               GNUNET_SERVICE_OPTION_NONE,
2195                               &run, NULL)) ? 0 : 1;
2196 }
2197
2198 /* end of gnunet-service-psyc.c */