-bringing copyright tags up to FSF standard
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "gnunet_psyc_util_lib.h"
38 #include "psyc.h"
39
40
41 /**
42  * Handle to our current configuration.
43  */
44 static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47  * Handle to the statistics service.
48  */
49 static struct GNUNET_STATISTICS_Handle *stats;
50
51 /**
52  * Notification context, simplifies client broadcasts.
53  */
54 static struct GNUNET_SERVER_NotificationContext *nc;
55
56 /**
57  * Handle to the PSYCstore.
58  */
59 static struct GNUNET_PSYCSTORE_Handle *store;
60
61 /**
62  * All connected masters.
63  * Channel's pub_key_hash -> struct Master
64  */
65 static struct GNUNET_CONTAINER_MultiHashMap *masters;
66
67 /**
68  * All connected slaves.
69  * Channel's pub_key_hash -> struct Slave
70  */
71 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
72
73 /**
74  * Connected slaves per channel.
75  * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
76  */
77 static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
78
79
80 /**
81  * Message in the transmission queue.
82  */
83 struct TransmitMessage
84 {
85   struct TransmitMessage *prev;
86   struct TransmitMessage *next;
87
88   struct GNUNET_SERVER_Client *client;
89
90   /**
91    * ID assigned to the message.
92    */
93   uint64_t id;
94
95   /**
96    * Size of message.
97    */
98   uint16_t size;
99
100   /**
101    * @see enum MessageState
102    */
103   uint8_t state;
104
105   /**
106    * Whether a message ACK has already been sent to the client.
107    * #GNUNET_YES or #GNUNET_NO
108    */
109   uint8_t ack_sent;
110
111   /* Followed by message */
112 };
113
114
115 /**
116  * Cache for received message fragments.
117  * Message fragments are only sent to clients after all modifiers arrived.
118  *
119  * chan_key -> MultiHashMap chan_msgs
120  */
121 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
122
123
124 /**
125  * Entry in the chan_msgs hashmap of @a recv_cache:
126  * fragment_id -> RecvCacheEntry
127  */
128 struct RecvCacheEntry
129 {
130   struct GNUNET_MULTICAST_MessageHeader *mmsg;
131   uint16_t ref_count;
132 };
133
134
135 /**
136  * Entry in the @a recv_frags hash map of a @a Channel.
137  * message_id -> FragmentQueue
138  */
139 struct FragmentQueue
140 {
141   /**
142    * Fragment IDs stored in @a recv_cache.
143    */
144   struct GNUNET_CONTAINER_Heap *fragments;
145
146   /**
147    * Total size of received fragments.
148    */
149   uint64_t size;
150
151   /**
152    * Total size of received header fragments (METHOD & MODIFIERs)
153    */
154   uint64_t header_size;
155
156   /**
157    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
158    */
159   uint64_t state_delta;
160
161   /**
162    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
163    */
164   uint32_t flags;
165
166   /**
167    * Receive state of message.
168    *
169    * @see MessageFragmentState
170    */
171   uint8_t state;
172
173   /**
174    * Is the message queued for delivery to the client?
175    * i.e. added to the recv_msgs queue
176    */
177   uint8_t is_queued;
178 };
179
180
181 /**
182  * List of connected clients.
183  */
184 struct ClientListItem
185 {
186   struct ClientListItem *prev;
187   struct ClientListItem *next;
188   struct GNUNET_SERVER_Client *client;
189 };
190
191
192 /**
193  * Common part of the client context for both a channel master and slave.
194  */
195 struct Channel
196 {
197   struct ClientListItem *clients_head;
198   struct ClientListItem *clients_tail;
199
200   struct TransmitMessage *tmit_head;
201   struct TransmitMessage *tmit_tail;
202
203   /**
204    * Current PSYCstore operation.
205    */
206   struct GNUNET_PSYCSTORE_OperationHandle *store_op;
207
208   /**
209    * Received fragments not yet sent to the client.
210    * message_id -> FragmentQueue
211    */
212   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
213
214   /**
215    * Received message IDs not yet sent to the client.
216    */
217   struct GNUNET_CONTAINER_Heap *recv_msgs;
218
219   /**
220    * Public key of the channel.
221    */
222   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
223
224   /**
225    * Hash of @a pub_key.
226    */
227   struct GNUNET_HashCode pub_key_hash;
228
229   /**
230    * Last message ID sent to the client.
231    * 0 if there is no such message.
232    */
233   uint64_t max_message_id;
234
235   /**
236    * ID of the last stateful message, where the state operations has been
237    * processed and saved to PSYCstore and which has been sent to the client.
238    * 0 if there is no such message.
239    */
240   uint64_t max_state_message_id;
241
242   /**
243    * Expected value size for the modifier being received from the PSYC service.
244    */
245   uint32_t tmit_mod_value_size_expected;
246
247   /**
248    * Actual value size for the modifier being received from the PSYC service.
249    */
250   uint32_t tmit_mod_value_size;
251
252   /**
253    * @see enum MessageState
254    */
255   uint8_t tmit_state;
256
257   /**
258    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
259    */
260   uint8_t is_master;
261
262   /**
263    * Is this channel ready to receive messages from client?
264    * #GNUNET_YES or #GNUNET_NO
265    */
266   uint8_t is_ready;
267
268   /**
269    * Is the client disconnected?
270    * #GNUNET_YES or #GNUNET_NO
271    */
272   uint8_t is_disconnected;
273 };
274
275
276 /**
277  * Client context for a channel master.
278  */
279 struct Master
280 {
281   /**
282    * Channel struct common for Master and Slave
283    */
284   struct Channel chn;
285
286   /**
287    * Private key of the channel.
288    */
289   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
290
291   /**
292    * Handle for the multicast origin.
293    */
294   struct GNUNET_MULTICAST_Origin *origin;
295
296   /**
297    * Transmit handle for multicast.
298    */
299   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
300
301   /**
302    * Incoming join requests from multicast.
303    * member_key -> struct GNUNET_MULTICAST_JoinHandle *
304    */
305   struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
306
307   /**
308    * Last message ID transmitted to this channel.
309    *
310    * Incremented before sending a message, thus the message_id in messages sent
311    * starts from 1.
312    */
313   uint64_t max_message_id;
314
315   /**
316    * ID of the last message with state operations transmitted to the channel.
317    * 0 if there is no such message.
318    */
319   uint64_t max_state_message_id;
320
321   /**
322    * Maximum group generation transmitted to the channel.
323    */
324   uint64_t max_group_generation;
325
326   /**
327    * @see enum GNUNET_PSYC_Policy
328    */
329   enum GNUNET_PSYC_Policy policy;
330 };
331
332
333 /**
334  * Client context for a channel slave.
335  */
336 struct Slave
337 {
338   /**
339    * Channel struct common for Master and Slave
340    */
341   struct Channel chn;
342
343   /**
344    * Private key of the slave.
345    */
346   struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
347
348   /**
349    * Public key of the slave.
350    */
351   struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
352
353   /**
354    * Hash of @a pub_key.
355    */
356   struct GNUNET_HashCode pub_key_hash;
357
358   /**
359    * Handle for the multicast member.
360    */
361   struct GNUNET_MULTICAST_Member *member;
362
363   /**
364    * Transmit handle for multicast.
365    */
366   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
367
368   /**
369    * Peer identity of the origin.
370    */
371   struct GNUNET_PeerIdentity origin;
372
373   /**
374    * Number of items in @a relays.
375    */
376   uint32_t relay_count;
377
378   /**
379    * Relays that multicast can use to connect.
380    */
381   struct GNUNET_PeerIdentity *relays;
382
383   /**
384    * Join request to be transmitted to the master on join.
385    */
386   struct GNUNET_PSYC_Message *join_msg;
387
388   /**
389    * Join decision received from multicast.
390    */
391   struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
392
393   /**
394    * Maximum request ID for this channel.
395    */
396   uint64_t max_request_id;
397 };
398
399
400 struct OperationClosure
401 {
402   struct GNUNET_SERVER_Client *client;
403   struct Channel *chn;
404   uint64_t op_id;
405 };
406
407
408 static void
409 transmit_message (struct Channel *chn);
410
411
412 static uint64_t
413 message_queue_drop (struct Channel *chn);
414
415
416 /**
417  * Task run during shutdown.
418  *
419  * @param cls unused
420  * @param tc unused
421  */
422 static void
423 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   if (NULL != nc)
426   {
427     GNUNET_SERVER_notification_context_destroy (nc);
428     nc = NULL;
429   }
430   if (NULL != stats)
431   {
432     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
433     stats = NULL;
434   }
435 }
436
437
438 /**
439  * Clean up master data structures after a client disconnected.
440  */
441 static void
442 cleanup_master (struct Master *mst)
443 {
444   struct Channel *chn = &mst->chn;
445
446   if (NULL != mst->origin)
447     GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
448   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
449   GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
450 }
451
452
453 /**
454  * Clean up slave data structures after a client disconnected.
455  */
456 static void
457 cleanup_slave (struct Slave *slv)
458 {
459   struct Channel *chn = &slv->chn;
460   struct GNUNET_CONTAINER_MultiHashMap *
461     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
462                                                 &chn->pub_key_hash);
463   GNUNET_assert (NULL != chn_slv);
464   GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
465
466   if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
467   {
468     GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
469                                           chn_slv);
470     GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
471   }
472   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
473
474   if (NULL != slv->join_msg)
475   {
476     GNUNET_free (slv->join_msg);
477     slv->join_msg = NULL;
478   }
479   if (NULL != slv->relays)
480   {
481     GNUNET_free (slv->relays);
482     slv->relays = NULL;
483   }
484   if (NULL != slv->member)
485   {
486     GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
487     slv->member = NULL;
488   }
489   GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
490 }
491
492
493 /**
494  * Clean up channel data structures after a client disconnected.
495  */
496 static void
497 cleanup_channel (struct Channel *chn)
498 {
499   message_queue_drop (chn);
500   GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
501
502   if (NULL != chn->store_op)
503   {
504     GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
505     chn->store_op = NULL;
506   }
507
508   (GNUNET_YES == chn->is_master)
509     ? cleanup_master ((struct Master *) chn)
510     : cleanup_slave ((struct Slave *) chn);
511   GNUNET_free (chn);
512 }
513
514
515 /**
516  * Called whenever a client is disconnected.
517  * Frees our resources associated with that client.
518  *
519  * @param cls Closure.
520  * @param client Identification of the client.
521  */
522 static void
523 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
524 {
525   if (NULL == client)
526     return;
527
528   struct Channel *
529     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
530
531   if (NULL == chn)
532   {
533     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534                 "%p User context is NULL in client_disconnect()\n", chn);
535     GNUNET_break (0);
536     return;
537   }
538
539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
540               "%p Client (%s) disconnected from channel %s\n",
541               chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
542               GNUNET_h2s (&chn->pub_key_hash));
543
544   struct ClientListItem *cli = chn->clients_head;
545   while (NULL != cli)
546   {
547     if (cli->client == client)
548     {
549       GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
550       GNUNET_free (cli);
551       break;
552     }
553     cli = cli->next;
554   }
555
556   if (NULL == chn->clients_head)
557   { /* Last client disconnected. */
558     if (NULL != chn->tmit_head)
559     { /* Send pending messages to multicast before cleanup. */
560       transmit_message (chn);
561     }
562     else
563     {
564       cleanup_channel (chn);
565     }
566   }
567 }
568
569
570 /**
571  * Send message to all clients connected to the channel.
572  */
573 static void
574 client_send_msg (const struct Channel *chn,
575                  const struct GNUNET_MessageHeader *msg)
576 {
577   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
578               "%p Sending message to clients.\n", chn);
579
580   struct ClientListItem *cli = chn->clients_head;
581   while (NULL != cli)
582   {
583     GNUNET_SERVER_notification_context_add (nc, cli->client);
584     GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
585     cli = cli->next;
586   }
587 }
588
589
590 /**
591  * Send a result code back to the client.
592  *
593  * @param client
594  *        Client that should receive the result code.
595  * @param result_code
596  *        Code to transmit.
597  * @param op_id
598  *        Operation ID in network byte order.
599  * @param err_msg
600  *        Error message to include (or NULL for none).
601  */
602 static void
603 client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
604                     int64_t result_code, const char *err_msg)
605 {
606   struct OperationResult *res;
607   size_t err_size = 0;
608
609   if (NULL != err_msg)
610     err_size = strnlen (err_msg,
611                         GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
612   res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
613   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
614   res->header.size = htons (sizeof (struct OperationResult) + err_size);
615   res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
616   res->op_id = op_id;
617   if (0 < err_size)
618   {
619     memcpy (&res[1], err_msg, err_size);
620     ((char *) &res[1])[err_size - 1] = '\0';
621   }
622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623               "%p Sending result to client for operation #%" PRIu64 ": "
624               "%" PRId64 " (%s)\n",
625               client, GNUNET_ntohll (op_id), result_code, err_msg);
626
627   GNUNET_SERVER_notification_context_add (nc, client);
628   GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
629                                               GNUNET_NO);
630   GNUNET_free (res);
631 }
632
633
634 /**
635  * Closure for join_mem_test_cb()
636  */
637 struct JoinMemTestClosure
638 {
639   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
640   struct Channel *chn;
641   struct GNUNET_MULTICAST_JoinHandle *jh;
642   struct GNUNET_PSYC_JoinRequestMessage *join_msg;
643 };
644
645
646 /**
647  * Membership test result callback used for join requests.
648  */
649 static void
650 join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
651 {
652   struct JoinMemTestClosure *jcls = cls;
653
654   if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
655   { /* Pass on join request to client if this is a master channel */
656     struct Master *mst = (struct Master *) jcls->chn;
657     struct GNUNET_HashCode slave_key_hash;
658     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
659                         &slave_key_hash);
660     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
661                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
662     client_send_msg (jcls->chn, &jcls->join_msg->header);
663   }
664   else
665   {
666     // FIXME: add relays
667     GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
668   }
669   GNUNET_free (jcls->join_msg);
670   GNUNET_free (jcls);
671 }
672
673
674 /**
675  * Incoming join request from multicast.
676  */
677 static void
678 mcast_recv_join_request (void *cls,
679                          const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
680                          const struct GNUNET_MessageHeader *join_msg,
681                          struct GNUNET_MULTICAST_JoinHandle *jh)
682 {
683   struct Channel *chn = cls;
684   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
685
686   uint16_t join_msg_size = 0;
687   if (NULL != join_msg)
688   {
689     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
690     {
691       join_msg_size = ntohs (join_msg->size);
692     }
693     else
694     {
695       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
696                   "%p Got join message with invalid type %u.\n",
697                   chn, ntohs (join_msg->type));
698     }
699   }
700
701   struct GNUNET_PSYC_JoinRequestMessage *
702     req = GNUNET_malloc (sizeof (*req) + join_msg_size);
703   req->header.size = htons (sizeof (*req) + join_msg_size);
704   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
705   req->slave_key = *slave_key;
706   if (0 < join_msg_size)
707     memcpy (&req[1], join_msg, join_msg_size);
708
709   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
710   jcls->slave_key = *slave_key;
711   jcls->chn = chn;
712   jcls->jh = jh;
713   jcls->join_msg = req;
714
715   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
716                                     chn->max_message_id, 0,
717                                     &join_mem_test_cb, jcls);
718 }
719
720
721 /**
722  * Join decision received from multicast.
723  */
724 static void
725 mcast_recv_join_decision (void *cls, int is_admitted,
726                           const struct GNUNET_PeerIdentity *peer,
727                           uint16_t relay_count,
728                           const struct GNUNET_PeerIdentity *relays,
729                           const struct GNUNET_MessageHeader *join_resp)
730 {
731   struct Slave *slv = cls;
732   struct Channel *chn = &slv->chn;
733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734               "%p Got join decision: %d\n", slv, is_admitted);
735
736   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
737   struct GNUNET_PSYC_JoinDecisionMessage *
738     dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
739   dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
740   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
741   dcsn->is_admitted = htonl (is_admitted);
742   if (0 < join_resp_size)
743     memcpy (&dcsn[1], join_resp, join_resp_size);
744
745   client_send_msg (chn, &dcsn->header);
746
747   if (GNUNET_YES == is_admitted)
748   {
749     chn->is_ready = GNUNET_YES;
750   }
751   else
752   {
753     slv->member = NULL;
754   }
755 }
756
757
758 /**
759  * Received result of GNUNET_PSYCSTORE_membership_test()
760  */
761 static void
762 store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
763 {
764   struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766               "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
767               mth, result, err_msg);
768
769   GNUNET_MULTICAST_membership_test_result (mth, result);
770 }
771
772
773 /**
774  * Incoming membership test request from multicast.
775  */
776 static void
777 mcast_recv_membership_test (void *cls,
778                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
779                             uint64_t message_id, uint64_t group_generation,
780                             struct GNUNET_MULTICAST_MembershipTestHandle *mth)
781 {
782   struct Channel *chn = cls;
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "%p Received membership test request from multicast.\n",
785               mth);
786   GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
787                                     message_id, group_generation,
788                                     &store_recv_membership_test_result, mth);
789 }
790
791
792 static int
793 store_recv_fragment_replay (void *cls,
794                             struct GNUNET_MULTICAST_MessageHeader *msg,
795                             enum GNUNET_PSYCSTORE_MessageFlags flags)
796 {
797   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
798
799   GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
800   return GNUNET_YES;
801 }
802
803
804 /**
805  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
806  */
807 static void
808 store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
809 {
810   struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
813               rh, result, err_msg);
814
815   switch (result)
816   {
817   case GNUNET_YES:
818     break;
819
820   case GNUNET_NO:
821     GNUNET_MULTICAST_replay_response (rh, NULL,
822                                       GNUNET_MULTICAST_REC_NOT_FOUND);
823     break;
824
825   case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
826     GNUNET_MULTICAST_replay_response (rh, NULL,
827                                       GNUNET_MULTICAST_REC_ACCESS_DENIED);
828     break;
829
830   case GNUNET_SYSERR:
831     GNUNET_MULTICAST_replay_response (rh, NULL,
832                                       GNUNET_MULTICAST_REC_INTERNAL_ERROR);
833     break;
834   }
835   GNUNET_MULTICAST_replay_response_end (rh);
836 }
837
838
839 /**
840  * Incoming fragment replay request from multicast.
841  */
842 static void
843 mcast_recv_replay_fragment (void *cls,
844                             const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
845                             uint64_t fragment_id, uint64_t flags,
846                             struct GNUNET_MULTICAST_ReplayHandle *rh)
847
848 {
849   struct Channel *chn = cls;
850   GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
851                                  fragment_id, fragment_id,
852                                  &store_recv_fragment_replay,
853                                  &store_recv_fragment_replay_result, rh);
854 }
855
856
857 /**
858  * Incoming message replay request from multicast.
859  */
860 static void
861 mcast_recv_replay_message (void *cls,
862                            const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
863                            uint64_t message_id,
864                            uint64_t fragment_offset,
865                            uint64_t flags,
866                            struct GNUNET_MULTICAST_ReplayHandle *rh)
867 {
868   struct Channel *chn = cls;
869   GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870                                 message_id, message_id,
871                                 &store_recv_fragment_replay,
872                                 &store_recv_fragment_replay_result, rh);
873 }
874
875
876 /**
877  * Convert an uint64_t in network byte order to a HashCode
878  * that can be used as key in a MultiHashMap
879  */
880 static inline void
881 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
882 {
883   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
884   /* TODO: use built-in byte swap functions if available */
885
886   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
887   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
888
889   *key = (struct GNUNET_HashCode) {};
890   *((uint64_t *) key)
891     = (n << 32) | (n >> 32);
892 }
893
894
895 /**
896  * Convert an uint64_t in host byte order to a HashCode
897  * that can be used as key in a MultiHashMap
898  */
899 static inline void
900 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
901 {
902 #if __BYTE_ORDER == __BIG_ENDIAN
903   hash_key_from_nll (key, n);
904 #elif __BYTE_ORDER == __LITTLE_ENDIAN
905   *key = (struct GNUNET_HashCode) {};
906   *((uint64_t *) key) = n;
907 #else
908   #error byteorder undefined
909 #endif
910 }
911
912
913 /**
914  * Send multicast message to all clients connected to the channel.
915  */
916 static void
917 client_send_mcast_msg (struct Channel *chn,
918                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
919                        uint32_t flags)
920 {
921   struct GNUNET_PSYC_MessageHeader *pmsg;
922   uint16_t size = ntohs (mmsg->header.size);
923   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
924
925   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926               "%p Sending multicast message to client. "
927               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
928               chn, GNUNET_ntohll (mmsg->fragment_id),
929               GNUNET_ntohll (mmsg->message_id));
930
931   pmsg = GNUNET_malloc (psize);
932   pmsg->header.size = htons (psize);
933   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
934   pmsg->message_id = mmsg->message_id;
935   pmsg->fragment_offset = mmsg->fragment_offset;
936   pmsg->flags = htonl (flags);
937
938   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
939   client_send_msg (chn, &pmsg->header);
940   GNUNET_free (pmsg);
941 }
942
943
944 /**
945  * Send multicast request to all clients connected to the channel.
946  */
947 static void
948 client_send_mcast_req (struct Master *mst,
949                        const struct GNUNET_MULTICAST_RequestHeader *req)
950 {
951   struct Channel *chn = &mst->chn;
952
953   struct GNUNET_PSYC_MessageHeader *pmsg;
954   uint16_t size = ntohs (req->header.size);
955   uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
956
957   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958               "%p Sending multicast request to client. "
959               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
960               chn, GNUNET_ntohll (req->fragment_id),
961               GNUNET_ntohll (req->request_id));
962
963   pmsg = GNUNET_malloc (psize);
964   pmsg->header.size = htons (psize);
965   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
966   pmsg->message_id = req->request_id;
967   pmsg->fragment_offset = req->fragment_offset;
968   pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
969
970   memcpy (&pmsg[1], &req[1], size - sizeof (*req));
971   client_send_msg (chn, &pmsg->header);
972   GNUNET_free (pmsg);
973 }
974
975
976 /**
977  * Insert a multicast message fragment into the queue belonging to the message.
978  *
979  * @param chn           Channel.
980  * @param mmsg         Multicast message fragment.
981  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
982  * @param first_ptype  First PSYC message part type in @a mmsg.
983  * @param last_ptype   Last PSYC message part type in @a mmsg.
984  */
985 static void
986 fragment_queue_insert (struct Channel *chn,
987                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
988                        uint16_t first_ptype, uint16_t last_ptype)
989 {
990   const uint16_t size = ntohs (mmsg->header.size);
991   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
992   struct GNUNET_CONTAINER_MultiHashMap
993     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
994                                                     &chn->pub_key_hash);
995
996   struct GNUNET_HashCode msg_id_hash;
997   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
998
999   struct FragmentQueue
1000     *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1001
1002   if (NULL == fragq)
1003   {
1004     fragq = GNUNET_new (struct FragmentQueue);
1005     fragq->state = MSG_FRAG_STATE_HEADER;
1006     fragq->fragments
1007       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1008
1009     GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
1010                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1011
1012     if (NULL == chan_msgs)
1013     {
1014       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1015       GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
1016                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1017     }
1018   }
1019
1020   struct GNUNET_HashCode frag_id_hash;
1021   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
1022   struct RecvCacheEntry
1023     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1024   if (NULL == cache_entry)
1025   {
1026     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                 "%p Adding message fragment to cache. "
1028                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
1029                 chn, GNUNET_ntohll (mmsg->message_id),
1030                 GNUNET_ntohll (mmsg->fragment_id));
1031     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032                 "%p header_size: %" PRIu64 " + %u\n",
1033                 chn, fragq->header_size, size);
1034     cache_entry = GNUNET_new (struct RecvCacheEntry);
1035     cache_entry->ref_count = 1;
1036     cache_entry->mmsg = GNUNET_malloc (size);
1037     memcpy (cache_entry->mmsg, mmsg, size);
1038     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
1039                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1040   }
1041   else
1042   {
1043     cache_entry->ref_count++;
1044     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1045                 "%p Message fragment is already in cache. "
1046                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
1047                 ", ref_count: %u\n",
1048                 chn, GNUNET_ntohll (mmsg->message_id),
1049                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
1050   }
1051
1052   if (MSG_FRAG_STATE_HEADER == fragq->state)
1053   {
1054     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1055     {
1056       struct GNUNET_PSYC_MessageMethod *
1057         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
1058       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
1059       fragq->flags = ntohl (pmeth->flags);
1060     }
1061
1062     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
1063     {
1064       fragq->header_size += size;
1065     }
1066     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1067              || frag_offset == fragq->header_size)
1068     { /* header is now complete */
1069       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1070                   "%p Header of message %" PRIu64 " is complete.\n",
1071                   chn, GNUNET_ntohll (mmsg->message_id));
1072
1073       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1074                   "%p Adding message %" PRIu64 " to queue.\n",
1075                   chn, GNUNET_ntohll (mmsg->message_id));
1076       fragq->state = MSG_FRAG_STATE_DATA;
1077     }
1078     else
1079     {
1080       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1081                   "%p Header of message %" PRIu64 " is NOT complete yet: "
1082                   "%" PRIu64 " != %" PRIu64 "\n",
1083                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1084                   fragq->header_size);
1085     }
1086   }
1087
1088   switch (last_ptype)
1089   {
1090   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1091     if (frag_offset == fragq->size)
1092       fragq->state = MSG_FRAG_STATE_END;
1093     else
1094       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1095                   "%p Message %" PRIu64 " is NOT complete yet: "
1096                   "%" PRIu64 " != %" PRIu64 "\n",
1097                   chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
1098                   fragq->size);
1099     break;
1100
1101   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1102     /* Drop message without delivering to client if it's a single fragment */
1103     fragq->state =
1104       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1105       ? MSG_FRAG_STATE_DROP
1106       : MSG_FRAG_STATE_CANCEL;
1107   }
1108
1109   switch (fragq->state)
1110   {
1111   case MSG_FRAG_STATE_DATA:
1112   case MSG_FRAG_STATE_END:
1113   case MSG_FRAG_STATE_CANCEL:
1114     if (GNUNET_NO == fragq->is_queued)
1115     {
1116       GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
1117                                     GNUNET_ntohll (mmsg->message_id));
1118       fragq->is_queued = GNUNET_YES;
1119     }
1120   }
1121
1122   fragq->size += size;
1123   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
1124                                 GNUNET_ntohll (mmsg->fragment_id));
1125 }
1126
1127
1128 /**
1129  * Run fragment queue of a message.
1130  *
1131  * Send fragments of a message in order to client, after all modifiers arrived
1132  * from multicast.
1133  *
1134  * @param chn      Channel.
1135  * @param msg_id  ID of the message @a fragq belongs to.
1136  * @param fragq   Fragment queue of the message.
1137  * @param drop    Drop message without delivering to client?
1138  *                #GNUNET_YES or #GNUNET_NO.
1139  */
1140 static void
1141 fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1142                     struct FragmentQueue *fragq, uint8_t drop)
1143 {
1144   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1145               "%p Running message fragment queue for message %" PRIu64
1146               " (state: %u).\n",
1147               chn, msg_id, fragq->state);
1148
1149   struct GNUNET_CONTAINER_MultiHashMap
1150     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1151                                                     &chn->pub_key_hash);
1152   GNUNET_assert (NULL != chan_msgs);
1153   uint64_t frag_id;
1154
1155   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
1156                                                     &frag_id))
1157   {
1158     struct GNUNET_HashCode frag_id_hash;
1159     hash_key_from_hll (&frag_id_hash, frag_id);
1160     struct RecvCacheEntry *cache_entry
1161       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
1162     if (cache_entry != NULL)
1163     {
1164       if (GNUNET_NO == drop)
1165       {
1166         client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1167       }
1168       if (cache_entry->ref_count <= 1)
1169       {
1170         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
1171                                               cache_entry);
1172         GNUNET_free (cache_entry->mmsg);
1173         GNUNET_free (cache_entry);
1174       }
1175       else
1176       {
1177         cache_entry->ref_count--;
1178       }
1179     }
1180 #if CACHE_AGING_IMPLEMENTED
1181     else if (GNUNET_NO == drop)
1182     {
1183       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
1184     }
1185 #endif
1186
1187     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
1188   }
1189
1190   if (MSG_FRAG_STATE_END <= fragq->state)
1191   {
1192     struct GNUNET_HashCode msg_id_hash;
1193     hash_key_from_hll (&msg_id_hash, msg_id);
1194
1195     GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1196     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1197     GNUNET_free (fragq);
1198   }
1199   else
1200   {
1201     fragq->is_queued = GNUNET_NO;
1202   }
1203 }
1204
1205
1206 /**
1207  * Run message queue.
1208  *
1209  * Send messages in queue to client in order after a message has arrived from
1210  * multicast, according to the following:
1211  * - A message is only sent if all of its modifiers arrived.
1212  * - A stateful message is only sent if the previous stateful message
1213  *   has already been delivered to the client.
1214  *
1215  * @param chn  Channel.
1216  *
1217  * @return Number of messages removed from queue and sent to client.
1218  */
1219 static uint64_t
1220 message_queue_run (struct Channel *chn)
1221 {
1222   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1223               "%p Running message queue.\n", chn);
1224   uint64_t n = 0;
1225   uint64_t msg_id;
1226   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1227                                                     &msg_id))
1228   {
1229     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1230                 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1231     struct GNUNET_HashCode msg_id_hash;
1232     hash_key_from_hll (&msg_id_hash, msg_id);
1233
1234     struct FragmentQueue *
1235       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1236
1237     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1238     {
1239       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1240                   "%p No fragq (%p) or header not complete.\n",
1241                   chn, fragq);
1242       break;
1243     }
1244
1245     if (MSG_FRAG_STATE_HEADER == fragq->state)
1246     {
1247       /* Check if there's a missing message before the current one */
1248       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1249       {
1250         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1251             && msg_id - 1 != chn->max_message_id)
1252         {
1253           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1254                       "%p Out of order message. "
1255                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1256                       chn, msg_id, chn->max_message_id);
1257           break;
1258         }
1259       }
1260       else
1261       {
1262         if (msg_id - fragq->state_delta != chn->max_state_message_id)
1263         {
1264           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1265                       "%p Out of order stateful message. "
1266                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1267                       chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1268           break;
1269         }
1270 #if TODO
1271         /* FIXME: apply modifiers to state in PSYCstore */
1272         GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1273                                        store_recv_state_modify_result, cls);
1274 #endif
1275         chn->max_state_message_id = msg_id;
1276       }
1277       chn->max_message_id = msg_id;
1278     }
1279     fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1280     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1281     n++;
1282   }
1283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1284               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1285   return n;
1286 }
1287
1288
1289 /**
1290  * Drop message queue of a channel.
1291  *
1292  * Remove all messages in queue without sending it to clients.
1293  *
1294  * @param chn  Channel.
1295  *
1296  * @return Number of messages removed from queue.
1297  */
1298 static uint64_t
1299 message_queue_drop (struct Channel *chn)
1300 {
1301   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1302               "%p Dropping message queue.\n", chn);
1303   uint64_t n = 0;
1304   uint64_t msg_id;
1305   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1306                                                     &msg_id))
1307   {
1308     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1309                 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1310     struct GNUNET_HashCode msg_id_hash;
1311     hash_key_from_hll (&msg_id_hash, msg_id);
1312
1313     struct FragmentQueue *
1314       fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1315     GNUNET_assert (NULL != fragq);
1316     fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1317     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1318     n++;
1319   }
1320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1322   return n;
1323 }
1324
1325
1326 /**
1327  * Received result of GNUNET_PSYCSTORE_fragment_store().
1328  */
1329 static void
1330 store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1331 {
1332   struct Channel *chn = cls;
1333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334               "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1335               chn, result, err_msg);
1336 }
1337
1338
1339 /**
1340  * Handle incoming message fragment from multicast.
1341  *
1342  * Store it using PSYCstore and send it to the clients of the channel in order.
1343  */
1344 static void
1345 mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1346 {
1347   struct Channel *chn = cls;
1348   uint16_t size = ntohs (mmsg->header.size);
1349
1350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1351               "%p Received multicast message of size %u.\n",
1352               chn, size);
1353
1354   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1355                                    &store_recv_fragment_store_result, chn);
1356
1357   uint16_t first_ptype = 0, last_ptype = 0;
1358   if (GNUNET_SYSERR
1359       == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1360                                           (const char *) &mmsg[1],
1361                                           &first_ptype, &last_ptype))
1362   {
1363     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1364                 "%p Dropping incoming multicast message with invalid parts.\n",
1365                 chn);
1366     GNUNET_break_op (0);
1367     return;
1368   }
1369
1370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371               "Message parts: first: type %u, last: type %u\n",
1372               first_ptype, last_ptype);
1373
1374   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1375   message_queue_run (chn);
1376 }
1377
1378
1379 /**
1380  * Incoming request fragment from multicast for a master.
1381  *
1382  * @param cls   Master.
1383  * @param req   The request.
1384  */
1385 static void
1386 mcast_recv_request (void *cls,
1387                     const struct GNUNET_MULTICAST_RequestHeader *req)
1388 {
1389   struct Master *mst = cls;
1390   uint16_t size = ntohs (req->header.size);
1391
1392   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393               "%p Received multicast request of size %u.\n",
1394               mst, size);
1395
1396   uint16_t first_ptype = 0, last_ptype = 0;
1397   if (GNUNET_SYSERR
1398       == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1399                                           (const char *) &req[1],
1400                                           &first_ptype, &last_ptype))
1401   {
1402     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1403                 "%p Dropping incoming multicast request with invalid parts.\n",
1404                 mst);
1405     GNUNET_break_op (0);
1406     return;
1407   }
1408
1409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410               "Message parts: first: type %u, last: type %u\n",
1411               first_ptype, last_ptype);
1412
1413   /* FIXME: in-order delivery */
1414   client_send_mcast_req (mst, req);
1415 }
1416
1417
1418 /**
1419  * Response from PSYCstore with the current counter values for a channel master.
1420  */
1421 static void
1422 store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1423                             uint64_t max_message_id, uint64_t max_group_generation,
1424                             uint64_t max_state_message_id)
1425 {
1426   struct Master *mst = cls;
1427   struct Channel *chn = &mst->chn;
1428   chn->store_op = NULL;
1429
1430   struct GNUNET_PSYC_CountersResultMessage res;
1431   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1432   res.header.size = htons (sizeof (res));
1433   res.result_code = htonl (result - INT32_MIN);
1434   res.max_message_id = GNUNET_htonll (max_message_id);
1435
1436   if (GNUNET_OK == result || GNUNET_NO == result)
1437   {
1438     mst->max_message_id = max_message_id;
1439     chn->max_message_id = max_message_id;
1440     chn->max_state_message_id = max_state_message_id;
1441     mst->max_group_generation = max_group_generation;
1442     mst->origin
1443       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1444                                        &mcast_recv_join_request,
1445                                        &mcast_recv_membership_test,
1446                                        &mcast_recv_replay_fragment,
1447                                        &mcast_recv_replay_message,
1448                                        &mcast_recv_request,
1449                                        &mcast_recv_message, chn);
1450     chn->is_ready = GNUNET_YES;
1451   }
1452   else
1453   {
1454     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1455                 "%p GNUNET_PSYCSTORE_counters_get() "
1456                 "returned %d for channel %s.\n",
1457                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1458   }
1459
1460   client_send_msg (chn, &res.header);
1461 }
1462
1463
1464 /**
1465  * Response from PSYCstore with the current counter values for a channel slave.
1466  */
1467 void
1468 store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1469                            uint64_t max_message_id, uint64_t max_group_generation,
1470                            uint64_t max_state_message_id)
1471 {
1472   struct Slave *slv = cls;
1473   struct Channel *chn = &slv->chn;
1474   chn->store_op = NULL;
1475
1476   struct GNUNET_PSYC_CountersResultMessage res;
1477   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1478   res.header.size = htons (sizeof (res));
1479   res.result_code = htonl (result - INT32_MIN);
1480   res.max_message_id = GNUNET_htonll (max_message_id);
1481
1482   if (GNUNET_OK == result || GNUNET_NO == result)
1483   {
1484     chn->max_message_id = max_message_id;
1485     chn->max_state_message_id = max_state_message_id;
1486     slv->member
1487       = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1488                                       &slv->origin,
1489                                       slv->relay_count, slv->relays,
1490                                       &slv->join_msg->header,
1491                                       &mcast_recv_join_request,
1492                                       &mcast_recv_join_decision,
1493                                       &mcast_recv_membership_test,
1494                                       &mcast_recv_replay_fragment,
1495                                       &mcast_recv_replay_message,
1496                                       &mcast_recv_message, chn);
1497     if (NULL != slv->join_msg)
1498     {
1499       GNUNET_free (slv->join_msg);
1500       slv->join_msg = NULL;
1501     }
1502   }
1503   else
1504   {
1505     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1506                 "%p GNUNET_PSYCSTORE_counters_get() "
1507                 "returned %d for channel %s.\n",
1508                 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1509   }
1510
1511   client_send_msg (chn, &res.header);
1512 }
1513
1514
1515 static void
1516 channel_init (struct Channel *chn)
1517 {
1518   chn->recv_msgs
1519     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1520   chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1521 }
1522
1523
1524 /**
1525  * Handle a connecting client starting a channel master.
1526  */
1527 static void
1528 client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1529                           const struct GNUNET_MessageHeader *msg)
1530 {
1531   const struct MasterStartRequest *req
1532     = (const struct MasterStartRequest *) msg;
1533
1534   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1535   struct GNUNET_HashCode pub_key_hash;
1536
1537   GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1538   GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1539
1540   struct Master *
1541     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1542   struct Channel *chn;
1543
1544   if (NULL == mst)
1545   {
1546     mst = GNUNET_new (struct Master);
1547     mst->policy = ntohl (req->policy);
1548     mst->priv_key = req->channel_key;
1549     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1550
1551     chn = &mst->chn;
1552     chn->is_master = GNUNET_YES;
1553     chn->pub_key = pub_key;
1554     chn->pub_key_hash = pub_key_hash;
1555     channel_init (chn);
1556
1557     GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1558                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1559     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1560                                                    store_recv_master_counters, mst);
1561   }
1562   else
1563   {
1564     chn = &mst->chn;
1565
1566     struct GNUNET_PSYC_CountersResultMessage res;
1567     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1568     res.header.size = htons (sizeof (res));
1569     res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
1570     res.max_message_id = GNUNET_htonll (mst->max_message_id);
1571
1572     GNUNET_SERVER_notification_context_add (nc, client);
1573     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1574                                                 GNUNET_NO);
1575   }
1576
1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578               "%p Client connected as master to channel %s.\n",
1579               mst, GNUNET_h2s (&chn->pub_key_hash));
1580
1581   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1582   cli->client = client;
1583   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1584
1585   GNUNET_SERVER_client_set_user_context (client, chn);
1586   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1587 }
1588
1589
1590 /**
1591  * Handle a connecting client joining as a channel slave.
1592  */
1593 static void
1594 client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1595                         const struct GNUNET_MessageHeader *msg)
1596 {
1597   const struct SlaveJoinRequest *req
1598     = (const struct SlaveJoinRequest *) msg;
1599   uint16_t req_size = ntohs (req->header.size);
1600
1601   struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1602   struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1603
1604   GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1605   GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1606   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1607
1608   struct GNUNET_CONTAINER_MultiHashMap *
1609     chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1610   struct Slave *slv = NULL;
1611   struct Channel *chn;
1612
1613   if (NULL != chn_slv)
1614   {
1615     slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1616   }
1617   if (NULL == slv)
1618   {
1619     slv = GNUNET_new (struct Slave);
1620     slv->priv_key = req->slave_key;
1621     slv->pub_key = slv_pub_key;
1622     slv->pub_key_hash = slv_pub_key_hash;
1623     slv->origin = req->origin;
1624     slv->relay_count = ntohl (req->relay_count);
1625
1626     const struct GNUNET_PeerIdentity *
1627       relays = (const struct GNUNET_PeerIdentity *) &req[1];
1628     uint16_t relay_size = slv->relay_count * sizeof (*relays);
1629     uint16_t join_msg_size = 0;
1630
1631     if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1632         <= req_size)
1633     {
1634       join_msg_size = ntohs (slv->join_msg->header.size);
1635       slv->join_msg = GNUNET_malloc (join_msg_size);
1636       memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1637     }
1638     if (sizeof (*req) + relay_size + join_msg_size != req_size)
1639     {
1640       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1641                   "%u + %u + %u != %u\n",
1642                   sizeof (*req), relay_size, join_msg_size, req_size);
1643       GNUNET_break (0);
1644       GNUNET_SERVER_client_disconnect (client);
1645       GNUNET_free (slv);
1646       return;
1647     }
1648     if (0 < slv->relay_count)
1649     {
1650       slv->relays = GNUNET_malloc (relay_size);
1651       memcpy (slv->relays, &req[1], relay_size);
1652     }
1653
1654     chn = &slv->chn;
1655     chn->is_master = GNUNET_NO;
1656     chn->pub_key = req->channel_key;
1657     chn->pub_key_hash = pub_key_hash;
1658     channel_init (chn);
1659
1660     if (NULL == chn_slv)
1661     {
1662       chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1663       GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1664                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1665     }
1666     GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1667                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1668     GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1669                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1670     chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1671                                                   &store_recv_slave_counters, slv);
1672   }
1673   else
1674   {
1675     chn = &slv->chn;
1676
1677     struct GNUNET_PSYC_CountersResultMessage res;
1678     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1679     res.header.size = htons (sizeof (res));
1680     res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
1681     res.max_message_id = GNUNET_htonll (chn->max_message_id);
1682
1683     GNUNET_SERVER_notification_context_add (nc, client);
1684     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1685                                                 GNUNET_NO);
1686
1687     if (NULL == slv->member)
1688     {
1689       slv->member
1690         = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1691                                         &slv->origin,
1692                                         slv->relay_count, slv->relays,
1693                                         &slv->join_msg->header,
1694                                         &mcast_recv_join_request,
1695                                         &mcast_recv_join_decision,
1696                                         &mcast_recv_membership_test,
1697                                         &mcast_recv_replay_fragment,
1698                                         &mcast_recv_replay_message,
1699                                         &mcast_recv_message, chn);
1700       if (NULL != slv->join_msg)
1701       {
1702         GNUNET_free (slv->join_msg);
1703         slv->join_msg = NULL;
1704       }
1705     }
1706     else if (NULL != slv->join_dcsn)
1707     {
1708       GNUNET_SERVER_notification_context_add (nc, client);
1709       GNUNET_SERVER_notification_context_unicast (nc, client,
1710                                                   &slv->join_dcsn->header,
1711                                                   GNUNET_NO);
1712     }
1713   }
1714
1715   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716               "%p Client connected as slave to channel %s.\n",
1717               slv, GNUNET_h2s (&chn->pub_key_hash));
1718
1719   struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1720   cli->client = client;
1721   GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1722
1723   GNUNET_SERVER_client_set_user_context (client, chn);
1724   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1725 }
1726
1727
1728 struct JoinDecisionClosure
1729 {
1730   int32_t is_admitted;
1731   struct GNUNET_MessageHeader *msg;
1732 };
1733
1734
1735 /**
1736  * Iterator callback for sending join decisions to multicast.
1737  */
1738 static int
1739 mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1740                           void *value)
1741 {
1742   struct JoinDecisionClosure *jcls = cls;
1743   struct GNUNET_MULTICAST_JoinHandle *jh = value;
1744   // FIXME: add relays
1745   GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1746   return GNUNET_YES;
1747 }
1748
1749
1750 /**
1751  * Join decision from client.
1752  */
1753 static void
1754 client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1755                            const struct GNUNET_MessageHeader *msg)
1756 {
1757   const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
1758     = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
1759   struct Channel *chn;
1760   struct Master *mst;
1761   struct JoinDecisionClosure jcls;
1762
1763   chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1764   if (NULL == chn)
1765   {
1766     GNUNET_break (0);
1767     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1768     return;
1769   }
1770   GNUNET_assert (GNUNET_YES == chn->is_master);
1771   mst = (struct Master *) chn;
1772   jcls.is_admitted = ntohl (dcsn->is_admitted);
1773   jcls.msg
1774     = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1775     ? (struct GNUNET_MessageHeader *) &dcsn[1]
1776     : NULL;
1777
1778   struct GNUNET_HashCode slave_key_hash;
1779   GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
1780                       &slave_key_hash);
1781
1782   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783               "%p Got join decision (%d) from client for channel %s..\n",
1784               mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786               "%p ..and slave %s.\n",
1787               mst, GNUNET_h2s (&slave_key_hash));
1788
1789   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1790                                               &mcast_send_join_decision, &jcls);
1791   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1792   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1793 }
1794
1795
1796 /**
1797  * Send acknowledgement to a client.
1798  *
1799  * Sent after a message fragment has been passed on to multicast.
1800  *
1801  * @param chn The channel struct for the client.
1802  */
1803 static void
1804 send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1805 {
1806   struct GNUNET_MessageHeader res;
1807   res.size = htons (sizeof (res));
1808   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1809
1810   /* FIXME */
1811   GNUNET_SERVER_notification_context_add (nc, client);
1812   GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1813 }
1814
1815
1816 /**
1817  * Callback for the transmit functions of multicast.
1818  */
1819 static int
1820 transmit_notify (void *cls, size_t *data_size, void *data)
1821 {
1822   struct Channel *chn = cls;
1823   struct TransmitMessage *tmit_msg = chn->tmit_head;
1824
1825   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1826   {
1827     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1828                 "%p transmit_notify: nothing to send.\n", chn);
1829     *data_size = 0;
1830     return GNUNET_NO;
1831   }
1832
1833   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834               "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1835
1836   *data_size = tmit_msg->size;
1837   memcpy (data, &tmit_msg[1], *data_size);
1838
1839   int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1840
1841   if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
1842     send_message_ack (chn, tmit_msg->client);
1843
1844   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1845   GNUNET_free (tmit_msg);
1846
1847   if (NULL != chn->tmit_head)
1848   {
1849     transmit_message (chn);
1850   }
1851   else if (GNUNET_YES == chn->is_disconnected)
1852   {
1853     /* FIXME: handle partial message (when still in_transmit) */
1854     cleanup_channel (chn);
1855   }
1856   return ret;
1857 }
1858
1859
1860 /**
1861  * Callback for the transmit functions of multicast.
1862  */
1863 static int
1864 master_transmit_notify (void *cls, size_t *data_size, void *data)
1865 {
1866   int ret = transmit_notify (cls, data_size, data);
1867
1868   if (GNUNET_YES == ret)
1869   {
1870     struct Master *mst = cls;
1871     mst->tmit_handle = NULL;
1872   }
1873   return ret;
1874 }
1875
1876
1877 /**
1878  * Callback for the transmit functions of multicast.
1879  */
1880 static int
1881 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1882 {
1883   int ret = transmit_notify (cls, data_size, data);
1884
1885   if (GNUNET_YES == ret)
1886   {
1887     struct Slave *slv = cls;
1888     slv->tmit_handle = NULL;
1889   }
1890   return ret;
1891 }
1892
1893
1894 /**
1895  * Transmit a message from a channel master to the multicast group.
1896  */
1897 static void
1898 master_transmit_message (struct Master *mst)
1899 {
1900   if (NULL == mst->tmit_handle)
1901   {
1902     mst->tmit_handle
1903       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1904                                         mst->max_group_generation,
1905                                         master_transmit_notify, mst);
1906   }
1907   else
1908   {
1909     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1910   }
1911 }
1912
1913
1914 /**
1915  * Transmit a message from a channel slave to the multicast group.
1916  */
1917 static void
1918 slave_transmit_message (struct Slave *slv)
1919 {
1920   if (NULL == slv->tmit_handle)
1921   {
1922     slv->tmit_handle
1923       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1924                                            slave_transmit_notify, slv);
1925   }
1926   else
1927   {
1928     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1929   }
1930 }
1931
1932
1933 static void
1934 transmit_message (struct Channel *chn)
1935 {
1936   chn->is_master
1937     ? master_transmit_message ((struct Master *) chn)
1938     : slave_transmit_message ((struct Slave *) chn);
1939 }
1940
1941
1942 /**
1943  * Queue a message from a channel master for sending to the multicast group.
1944  */
1945 static void
1946 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1947                      uint16_t first_ptype, uint16_t last_ptype)
1948 {
1949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1950
1951   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1952   {
1953     tmit_msg->id = ++mst->max_message_id;
1954     struct GNUNET_PSYC_MessageMethod *pmeth
1955       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1956
1957     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1958     {
1959       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1960     }
1961     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1962     {
1963       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1964                                           - mst->max_state_message_id);
1965     }
1966     else
1967     {
1968       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1969     }
1970   }
1971 }
1972
1973
1974 /**
1975  * Queue a message from a channel slave for sending to the multicast group.
1976  */
1977 static void
1978 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1979                      uint16_t first_ptype, uint16_t last_ptype)
1980 {
1981   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1982   {
1983     struct GNUNET_PSYC_MessageMethod *pmeth
1984       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1985     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1986     tmit_msg->id = ++slv->max_request_id;
1987   }
1988 }
1989
1990
1991 /**
1992  * Queue PSYC message parts for sending to multicast.
1993  *
1994  * @param chn           Channel to send to.
1995  * @param client       Client the message originates from.
1996  * @param data_size    Size of @a data.
1997  * @param data         Concatenated message parts.
1998  * @param first_ptype  First message part type in @a data.
1999  * @param last_ptype   Last message part type in @a data.
2000  */
2001 static struct TransmitMessage *
2002 queue_message (struct Channel *chn,
2003                struct GNUNET_SERVER_Client *client,
2004                size_t data_size,
2005                const void *data,
2006                uint16_t first_ptype, uint16_t last_ptype)
2007 {
2008   struct TransmitMessage *
2009     tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
2010   memcpy (&tmit_msg[1], data, data_size);
2011   tmit_msg->client = client;
2012   tmit_msg->size = data_size;
2013   tmit_msg->state = chn->tmit_state;
2014
2015   /* FIXME: separate queue per message ID */
2016
2017   GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2018
2019   chn->is_master
2020     ? master_queue_message ((struct Master *) chn, tmit_msg,
2021                             first_ptype, last_ptype)
2022     : slave_queue_message ((struct Slave *) chn, tmit_msg,
2023                            first_ptype, last_ptype);
2024   return tmit_msg;
2025 }
2026
2027
2028 /**
2029  * Cancel transmission of current message.
2030  *
2031  * @param chn     Channel to send to.
2032  * @param client  Client the message originates from.
2033  */
2034 static void
2035 transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2036 {
2037   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2038
2039   struct GNUNET_MessageHeader msg;
2040   msg.size = htons (sizeof (msg));
2041   msg.type = htons (type);
2042
2043   queue_message (chn, client, sizeof (msg), &msg, type, type);
2044   transmit_message (chn);
2045
2046   /* FIXME: cleanup */
2047 }
2048
2049
2050 /**
2051  * Incoming message from a master or slave client.
2052  */
2053 static void
2054 client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2055                           const struct GNUNET_MessageHeader *msg)
2056 {
2057   struct Channel *
2058     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2059   GNUNET_assert (NULL != chn);
2060
2061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062               "%p Received message from client.\n", chn);
2063   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
2064
2065   if (GNUNET_YES != chn->is_ready)
2066   {
2067     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2068                 "%p Channel is not ready yet, disconnecting client.\n", chn);
2069     GNUNET_break (0);
2070     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2071     return;
2072   }
2073
2074   uint16_t size = ntohs (msg->size);
2075   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2076   {
2077     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
2078     GNUNET_break (0);
2079     transmit_cancel (chn, client);
2080     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2081     return;
2082   }
2083
2084   uint16_t first_ptype = 0, last_ptype = 0;
2085   if (GNUNET_SYSERR
2086       == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
2087                                           (const char *) &msg[1],
2088                                           &first_ptype, &last_ptype))
2089   {
2090     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2091                 "%p Received invalid message part from client.\n", chn);
2092     GNUNET_break (0);
2093     transmit_cancel (chn, client);
2094     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2095     return;
2096   }
2097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2098               "%p Received message with first part type %u and last part type %u.\n",
2099               chn, first_ptype, last_ptype);
2100
2101   queue_message (chn, client, size - sizeof (*msg), &msg[1],
2102                  first_ptype, last_ptype);
2103   transmit_message (chn);
2104   /* FIXME: send a few ACKs even before transmit_notify is called */
2105
2106   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2107 };
2108
2109
2110 struct MembershipStoreClosure
2111 {
2112   struct GNUNET_SERVER_Client *client;
2113   struct Channel *chn;
2114   uint64_t op_id;
2115 };
2116
2117
2118 /**
2119  * Received result of GNUNET_PSYCSTORE_membership_store()
2120  */
2121 static void
2122 store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2123 {
2124   struct MembershipStoreClosure *mcls = cls;
2125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126               "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2127               mcls->chn, result, err_msg);
2128
2129   client_send_result (mcls->client, mcls->op_id, result, err_msg);
2130 }
2131
2132
2133 /**
2134  * Client requests to add/remove a slave in the membership database.
2135  */
2136 static void
2137 client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2138                               const struct GNUNET_MessageHeader *msg)
2139 {
2140   struct Channel *
2141     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2142   GNUNET_assert (NULL != chn);
2143
2144   const struct ChannelMembershipStoreRequest *
2145     req = (const struct ChannelMembershipStoreRequest *) msg;
2146
2147   struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2148   mcls->client = client;
2149   mcls->chn = chn;
2150   mcls->op_id = req->op_id;
2151
2152   uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2153   uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2155               "%p Received membership store request from client.\n", chn);
2156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2157               "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
2158               chn, req->did_join, announced_at, effective_since);
2159
2160   GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2161                                      req->did_join, announced_at, effective_since,
2162                                      0, /* FIXME: group_generation */
2163                                      &store_recv_membership_store_result, mcls);
2164   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2165 }
2166
2167
2168 static int
2169 store_recv_fragment_history (void *cls,
2170                              struct GNUNET_MULTICAST_MessageHeader *msg,
2171                              enum GNUNET_PSYCSTORE_MessageFlags flags)
2172 {
2173   struct OperationClosure *opcls = cls;
2174   struct Channel *chn = opcls->chn;
2175   client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
2176   return GNUNET_YES;
2177 }
2178
2179
2180 /**
2181  * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
2182  */
2183 static void
2184 store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
2185 {
2186   struct OperationClosure *opcls = cls;
2187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2188               "%p History replay #%" PRIu64 ": "
2189               "PSYCSTORE returned %" PRId64 " (%s)\n",
2190               opcls->chn, opcls->op_id, result, err_msg);
2191
2192   client_send_result (opcls->client, opcls->op_id, result, err_msg);
2193 }
2194
2195
2196 /**
2197  * Client requests channel history from PSYCstore.
2198  */
2199 static void
2200 client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2201                             const struct GNUNET_MessageHeader *msg)
2202 {
2203   struct Channel *
2204     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2205   GNUNET_assert (NULL != chn);
2206
2207   const struct HistoryRequest *
2208     req = (const struct HistoryRequest *) msg;
2209
2210   struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2211   opcls->client = client;
2212   opcls->chn = chn;
2213   opcls->op_id = req->op_id;
2214
2215   if (0 == req->message_limit)
2216     GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2217                                   GNUNET_ntohll (req->start_message_id),
2218                                   GNUNET_ntohll (req->end_message_id),
2219                                   &store_recv_fragment_history,
2220                                   &store_recv_fragment_history_result, opcls);
2221   else
2222     GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2223                                          GNUNET_ntohll (req->message_limit),
2224                                          &store_recv_fragment_history,
2225                                          &store_recv_fragment_history_result,
2226                                          opcls);
2227
2228   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2229 }
2230
2231
2232 /**
2233  * Received state var from PSYCstore, send it to client.
2234  */
2235 static int
2236 store_recv_state_var (void *cls, const char *name,
2237                       const void *value, size_t value_size)
2238 {
2239   struct OperationClosure *opcls = cls;
2240   struct OperationResult *op;
2241
2242   if (NULL != name)
2243   {
2244     uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2245     struct GNUNET_PSYC_MessageModifier *mod;
2246     op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2247     op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2248     op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2249     op->op_id = opcls->op_id;
2250
2251     mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
2252     mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2253     mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2254     mod->name_size = htons (name_size);
2255     mod->value_size = htonl (value_size);
2256     mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2257     memcpy (&mod[1], name, name_size);
2258     memcpy (((char *) &mod[1]) + name_size, value, value_size);
2259   }
2260   else
2261   {
2262     struct GNUNET_MessageHeader *mod;
2263     op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
2264     op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
2265     op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2266     op->op_id = opcls->op_id;
2267
2268     mod = (struct GNUNET_MessageHeader *) &op[1];
2269     mod->size = htons (sizeof (*mod) + value_size);
2270     mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2271     memcpy (&mod[1], value, value_size);
2272   }
2273
2274   GNUNET_SERVER_notification_context_add (nc, opcls->client);
2275   GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
2276                                               GNUNET_NO);
2277   GNUNET_free (op);
2278   return GNUNET_YES;
2279 }
2280
2281
2282 /**
2283  * Received result of GNUNET_PSYCSTORE_state_get()
2284  * or GNUNET_PSYCSTORE_state_get_prefix()
2285  */
2286 static void
2287 store_recv_state_result (void *cls, int64_t result, const char *err_msg)
2288 {
2289   struct OperationClosure *opcls = cls;
2290   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291               "%p History replay #%" PRIu64 ": "
2292               "PSYCSTORE returned %" PRId64 " (%s)\n",
2293               opcls->chn, opcls->op_id, result, err_msg);
2294
2295   client_send_result (opcls->client, opcls->op_id, result, err_msg);
2296 }
2297
2298
2299 /**
2300  * Client requests best matching state variable from PSYCstore.
2301  */
2302 static void
2303 client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2304                        const struct GNUNET_MessageHeader *msg)
2305 {
2306   struct Channel *
2307     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2308   GNUNET_assert (NULL != chn);
2309
2310   const struct StateRequest *
2311     req = (const struct StateRequest *) msg;
2312
2313   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2314   const char *name = (const char *) &req[1];
2315   if (0 == name_size || '\0' != name[name_size - 1])
2316   {
2317     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2318     return;
2319   }
2320
2321   struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2322   opcls->client = client;
2323   opcls->chn = chn;
2324   opcls->op_id = req->op_id;
2325
2326   GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2327                               &store_recv_state_var,
2328                               &store_recv_state_result, opcls);
2329   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2330 }
2331
2332
2333 /**
2334  * Client requests state variables with a given prefix from PSYCstore.
2335  */
2336 static void
2337 client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2338                               const struct GNUNET_MessageHeader *msg)
2339 {
2340   struct Channel *
2341     chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2342   GNUNET_assert (NULL != chn);
2343
2344   const struct StateRequest *
2345     req = (const struct StateRequest *) msg;
2346
2347   uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2348   const char *name = (const char *) &req[1];
2349   if (0 == name_size || '\0' != name[name_size - 1])
2350   {
2351     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2352     return;
2353   }
2354
2355   struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2356   opcls->client = client;
2357   opcls->chn = chn;
2358   opcls->op_id = req->op_id;
2359
2360   GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2361                                      &store_recv_state_var,
2362                                      &store_recv_state_result, opcls);
2363   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2364
2365 }
2366
2367
2368 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2369   { &client_recv_master_start, NULL,
2370     GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2371
2372   { &client_recv_slave_join, NULL,
2373     GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2374
2375   { &client_recv_join_decision, NULL,
2376     GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2377
2378   { &client_recv_psyc_message, NULL,
2379     GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2380
2381   { &client_recv_membership_store, NULL,
2382     GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2383
2384   { &client_recv_history_replay, NULL,
2385     GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2386
2387   { &client_recv_state_get, NULL,
2388     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2389
2390   { &client_recv_state_get_prefix, NULL,
2391     GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2392
2393   { NULL, NULL, 0, 0 }
2394 };
2395
2396
2397 /**
2398  * Initialize the PSYC service.
2399  *
2400  * @param cls Closure.
2401  * @param server The initialized server.
2402  * @param c Configuration to use.
2403  */
2404 static void
2405 run (void *cls, struct GNUNET_SERVER_Handle *server,
2406      const struct GNUNET_CONFIGURATION_Handle *c)
2407 {
2408   cfg = c;
2409   store = GNUNET_PSYCSTORE_connect (cfg);
2410   stats = GNUNET_STATISTICS_create ("psyc", cfg);
2411   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2412   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2413   channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2414   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2415   nc = GNUNET_SERVER_notification_context_create (server, 1);
2416   GNUNET_SERVER_add_handlers (server, server_handlers);
2417   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2418   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2419                                 &shutdown_task, NULL);
2420 }
2421
2422
2423 /**
2424  * The main function for the service.
2425  *
2426  * @param argc number of arguments from the command line
2427  * @param argv command line arguments
2428  * @return 0 ok, 1 on error
2429  */
2430 int
2431 main (int argc, char *const *argv)
2432 {
2433   return (GNUNET_OK ==
2434           GNUNET_SERVICE_run (argc, argv, "psyc",
2435                               GNUNET_SERVICE_OPTION_NONE,
2436                               &run, NULL)) ? 0 : 1;
2437 }
2438
2439 /* end of gnunet-service-psyc.c */