70322adaab028cdb65bb4ff48e2b463960355045
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_multicast_service.h"
35 #include "gnunet_psycstore_service.h"
36 #include "gnunet_psyc_service.h"
37 #include "psyc.h"
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * Handle to the PSYCstore.
57  */
58 static struct GNUNET_PSYCSTORE_Handle *store;
59
60 /**
61  * All connected masters.
62  * Channel's pub_key_hash -> struct Channel
63  */
64 static struct GNUNET_CONTAINER_MultiHashMap *masters;
65
66 /**
67  * All connected slaves.
68  * Channel's pub_key_hash -> struct Channel
69  */
70 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
71
72
73 /**
74  * Message in the transmission queue.
75  */
76 struct TransmitMessage
77 {
78   struct TransmitMessage *prev;
79   struct TransmitMessage *next;
80
81   /**
82    * ID assigned to the message.
83    */
84   uint64_t id;
85
86   /**
87    * Size of @a buf
88    */
89   uint16_t size;
90
91   /**
92    * @see enum MessageState
93    */
94   uint8_t state;
95
96   /* Followed by message */
97 };
98
99
100 /**
101  * Cache for received message fragments.
102  * Message fragments are only sent to clients after all modifiers arrived.
103  *
104  * chan_key -> MultiHashMap chan_msgs
105  */
106 static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
107
108
109 /**
110  * Entry in the chan_msgs hashmap of @a recv_cache:
111  * fragment_id -> RecvCacheEntry
112  */
113 struct RecvCacheEntry
114 {
115   struct GNUNET_MULTICAST_MessageHeader *mmsg;
116   uint16_t ref_count;
117 };
118
119
120 /**
121  * Entry in the @a recv_frags hash map of a @a Channel.
122  * message_id -> FragmentQueue
123  */
124 struct FragmentQueue
125 {
126   /**
127    * Fragment IDs stored in @a recv_cache.
128    */
129   struct GNUNET_CONTAINER_Heap *fragments;
130
131   /**
132    * Total size of received fragments.
133    */
134   uint64_t size;
135
136   /**
137    * Total size of received header fragments (METHOD & MODIFIERs)
138    */
139   uint64_t header_size;
140
141   /**
142    * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
143    */
144   uint64_t state_delta;
145
146   /**
147    * The @a flags field from struct GNUNET_PSYC_MessageMethod.
148    */
149   uint32_t flags;
150
151   /**
152    * Receive state of message.
153    *
154    * @see MessageFragmentState
155    */
156   uint8_t state;
157
158   /**
159    * Is the message queued for delivery to the client?
160    * i.e. added to the recv_msgs queue
161    */
162   uint8_t queued;
163 };
164
165
166 /**
167  * Common part of the client context for both a channel master and slave.
168  */
169 struct Channel
170 {
171   struct GNUNET_SERVER_Client *client;
172
173   struct TransmitMessage *tmit_head;
174   struct TransmitMessage *tmit_tail;
175
176   /**
177    * Received fragments not yet sent to the client.
178    * message_id -> FragmentQueue
179    */
180   struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
181
182   /**
183    * Received message IDs not yet sent to the client.
184    */
185   struct GNUNET_CONTAINER_Heap *recv_msgs;
186
187   /**
188    * FIXME: needed?
189    */
190   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
191
192   /**
193    * Public key of the channel.
194    */
195   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
196
197   /**
198    * Hash of @a pub_key.
199    */
200   struct GNUNET_HashCode pub_key_hash;
201
202   /**
203    * Last message ID sent to the client.
204    * 0 if there is no such message.
205    */
206   uint64_t max_message_id;
207
208   /**
209    * ID of the last stateful message, where the state operations has been
210    * processed and saved to PSYCstore and which has been sent to the client.
211    * 0 if there is no such message.
212    */
213   uint64_t max_state_message_id;
214
215   /**
216    * Expected value size for the modifier being received from the PSYC service.
217    */
218   uint32_t tmit_mod_value_size_expected;
219
220   /**
221    * Actual value size for the modifier being received from the PSYC service.
222    */
223   uint32_t tmit_mod_value_size;
224
225   /**
226    * @see enum MessageState
227    */
228   uint8_t tmit_state;
229
230   /**
231    * FIXME: needed?
232    */
233   uint8_t in_transmit;
234
235   /**
236    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
237    */
238   uint8_t is_master;
239
240   /**
241    * Ready to receive messages from client? #GNUNET_YES or #GNUNET_NO
242    */
243   uint8_t ready;
244
245   /**
246    * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
247    */
248   uint8_t disconnected;
249 };
250
251
252 /**
253  * Client context for a channel master.
254  */
255 struct Master
256 {
257   /**
258    * Channel struct common for Master and Slave
259    */
260   struct Channel channel;
261
262   /**
263    * Private key of the channel.
264    */
265   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
266
267   /**
268    * Handle for the multicast origin.
269    */
270   struct GNUNET_MULTICAST_Origin *origin;
271
272   /**
273    * Transmit handle for multicast.
274    */
275   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
276
277   /**
278    * Last message ID transmitted to this channel.
279    *
280    * Incremented before sending a message, thus the message_id in messages sent
281    * starts from 1.
282    */
283   uint64_t max_message_id;
284
285   /**
286    * ID of the last message with state operations transmitted to the channel.
287    * 0 if there is no such message.
288    */
289   uint64_t max_state_message_id;
290
291   /**
292    * Maximum group generation transmitted to the channel.
293    */
294   uint64_t max_group_generation;
295
296   /**
297    * @see enum GNUNET_PSYC_Policy
298    */
299   uint32_t policy;
300 };
301
302
303 /**
304  * Client context for a channel slave.
305  */
306 struct Slave
307 {
308   /**
309    * Channel struct common for Master and Slave
310    */
311   struct Channel channel;
312
313   /**
314    * Private key of the slave.
315    */
316   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
317
318   /**
319    * Handle for the multicast member.
320    */
321   struct GNUNET_MULTICAST_Member *member;
322
323   /**
324    * Transmit handle for multicast.
325    */
326   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
327
328   /**
329    * Peer identity of the origin.
330    */
331   struct GNUNET_PeerIdentity origin;
332
333   /**
334    * Number of items in @a relays.
335    */
336   uint32_t relay_count;
337
338   /**
339    * Relays that multicast can use to connect.
340    */
341   struct GNUNET_PeerIdentity *relays;
342
343   /**
344    * Join request to be transmitted to the master on join.
345    */
346   struct GNUNET_MessageHeader *join_req;
347
348   /**
349    * Maximum request ID for this channel.
350    */
351   uint64_t max_request_id;
352 };
353
354
355 static inline void
356 transmit_message (struct Channel *ch);
357
358
359 /**
360  * Task run during shutdown.
361  *
362  * @param cls unused
363  * @param tc unused
364  */
365 static void
366 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 {
368   if (NULL != nc)
369   {
370     GNUNET_SERVER_notification_context_destroy (nc);
371     nc = NULL;
372   }
373   if (NULL != stats)
374   {
375     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
376     stats = NULL;
377   }
378 }
379
380
381 static void
382 client_cleanup (struct Channel *ch)
383 {
384   /* FIXME: fragment_cache_clear */
385
386   if (ch->is_master)
387   {
388     struct Master *mst = (struct Master *) ch;
389     if (NULL != mst->origin)
390       GNUNET_MULTICAST_origin_stop (mst->origin);
391     GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
392   }
393   else
394   {
395     struct Slave *slv = (struct Slave *) ch;
396     if (NULL != slv->join_req)
397       GNUNET_free (slv->join_req);
398     if (NULL != slv->relays)
399       GNUNET_free (slv->relays);
400     if (NULL != slv->member)
401       GNUNET_MULTICAST_member_part (slv->member);
402     GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
403   }
404
405   GNUNET_free (ch);
406 }
407
408
409 /**
410  * Called whenever a client is disconnected.
411  * Frees our resources associated with that client.
412  *
413  * @param cls Closure.
414  * @param client Identification of the client.
415  */
416 static void
417 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
418 {
419   if (NULL == client)
420     return;
421
422   struct Channel *ch
423     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
425
426   if (NULL == ch)
427   {
428     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
429                 "%p User context is NULL in client_disconnect()\n", ch);
430     GNUNET_break (0);
431     return;
432   }
433
434   ch->disconnected = GNUNET_YES;
435
436   /* Send pending messages to multicast before cleanup. */
437   if (NULL != ch->tmit_head)
438   {
439     transmit_message (ch);
440   }
441   else
442   {
443     client_cleanup (ch);
444   }
445 }
446
447
448 /**
449  * Master receives a join request from a slave.
450  */
451 static void
452 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
453          const struct GNUNET_MessageHeader *join_req,
454          struct GNUNET_MULTICAST_JoinHandle *jh)
455 {
456
457 }
458
459
460 static void
461 membership_test_cb (void *cls,
462                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
463                     uint64_t message_id, uint64_t group_generation,
464                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
465 {
466
467 }
468
469
470 static void
471 replay_fragment_cb (void *cls,
472                     const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
473                     uint64_t fragment_id, uint64_t flags,
474                     struct GNUNET_MULTICAST_ReplayHandle *rh)
475
476 {
477 }
478
479
480 static void
481 replay_message_cb (void *cls,
482                    const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
483                    uint64_t message_id,
484                    uint64_t fragment_offset,
485                    uint64_t flags,
486                    struct GNUNET_MULTICAST_ReplayHandle *rh)
487 {
488
489 }
490
491
492 static void
493 fragment_store_result (void *cls, int64_t result, const char *err_msg)
494 {
495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496               "fragment_store() returned %l (%s)\n", result, err_msg);
497 }
498
499
500 static void
501 message_to_client (struct Channel *ch,
502                    const struct GNUNET_MULTICAST_MessageHeader *mmsg)
503 {
504   uint16_t size = ntohs (mmsg->header.size);
505   struct GNUNET_PSYC_MessageHeader *pmsg;
506   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
507
508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509               "%p Sending message to client. "
510               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
511               ch, GNUNET_ntohll (mmsg->fragment_id),
512               GNUNET_ntohll (mmsg->message_id));
513
514   pmsg = GNUNET_malloc (psize);
515   pmsg->header.size = htons (psize);
516   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
517   pmsg->message_id = mmsg->message_id;
518
519   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
520
521   GNUNET_SERVER_notification_context_add (nc, ch->client);
522   GNUNET_SERVER_notification_context_unicast (nc, ch->client,
523                                               (const struct GNUNET_MessageHeader *) pmsg,
524                                               GNUNET_NO);
525   GNUNET_free (pmsg);
526 }
527
528
529 /**
530  * Convert an uint64_t in network byte order to a HashCode
531  * that can be used as key in a MultiHashMap
532  */
533 static inline void
534 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
535 {
536   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
537   /* TODO: use built-in byte swap functions if available */
538
539   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 0x00FF00FF00FF00FFULL);
540   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
541
542   *key = (struct GNUNET_HashCode) {{ 0 }};
543   *((uint64_t *) key)
544     = (n << 32) | (n >> 32);
545 }
546
547
548 /**
549  * Convert an uint64_t in host byte order to a HashCode
550  * that can be used as key in a MultiHashMap
551  */
552 static inline void
553 hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
554 {
555 #if __BYTE_ORDER == __BIG_ENDIAN
556   hash_key_from_nll (key, n);
557 #elif __BYTE_ORDER == __LITTLE_ENDIAN
558   *key = (struct GNUNET_HashCode) {{ 0 }};
559   *((uint64_t *) key) = n;
560 #else
561   #error byteorder undefined
562 #endif
563 }
564
565
566 /**
567  * Insert a multicast message fragment into the queue belonging to the message.
568  *
569  * @param ch           Channel.
570  * @param mmsg         Multicast message fragment.
571  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
572  * @param first_ptype  First PSYC message part type in @a mmsg.
573  * @param last_ptype   Last PSYC message part type in @a mmsg.
574  */
575 static void
576 fragment_queue_insert (struct Channel *ch,
577                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
578                        uint16_t first_ptype, uint16_t last_ptype)
579 {
580   const uint16_t size = ntohs (mmsg->header.size);
581   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
582   struct GNUNET_CONTAINER_MultiHashMap
583     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
584                                                     &ch->pub_key_hash);
585
586   struct GNUNET_HashCode msg_id_hash;
587   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
588
589   struct FragmentQueue
590     *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
591
592   if (NULL == fragq)
593   {
594     fragq = GNUNET_new (struct FragmentQueue);
595     fragq->state = MSG_FRAG_STATE_HEADER;
596     fragq->fragments
597       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
598
599     GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
600                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
601
602     if (NULL == chan_msgs)
603     {
604       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
605       GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs,
606                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
607     }
608   }
609
610   struct GNUNET_HashCode frag_id_hash;
611   hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
612   struct RecvCacheEntry
613     *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
614   if (NULL == cache_entry)
615   {
616     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617                 "%p Adding message fragment to cache. "
618                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
619                 "header_size: %" PRIu64 " + %u).\n",
620                 ch, GNUNET_ntohll (mmsg->message_id),
621                 GNUNET_ntohll (mmsg->fragment_id),
622                 fragq->header_size, size);
623     cache_entry = GNUNET_new (struct RecvCacheEntry);
624     cache_entry->ref_count = 1;
625     cache_entry->mmsg = GNUNET_malloc (size);
626     memcpy (cache_entry->mmsg, mmsg, size);
627     GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
628                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
629   }
630   else
631   {
632     cache_entry->ref_count++;
633     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634                 "%p Message fragment is already in cache. "
635                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
636                 ", ref_count: %u\n",
637                 ch, GNUNET_ntohll (mmsg->message_id),
638                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
639   }
640
641   if (MSG_FRAG_STATE_HEADER == fragq->state)
642   {
643     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
644     {
645       struct GNUNET_PSYC_MessageMethod *
646         pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
647       fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
648       fragq->flags = ntohl (pmeth->flags);
649     }
650
651     if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
652     {
653       fragq->header_size += size;
654     }
655     else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
656              || frag_offset == fragq->header_size)
657     { /* header is now complete */
658       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
659                   "%p Header of message %" PRIu64 " is complete.\n",
660                   ch, GNUNET_ntohll (mmsg->message_id));
661
662       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
663                   "%p Adding message %" PRIu64 " to queue.\n",
664                   ch, GNUNET_ntohll (mmsg->message_id));
665       fragq->state = MSG_FRAG_STATE_DATA;
666     }
667     else
668     {
669       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
670                   "%p Header of message %" PRIu64 " is NOT complete yet: "
671                   "%" PRIu64 " != %" PRIu64 "\n",
672                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
673                   fragq->header_size);
674     }
675   }
676
677   switch (last_ptype)
678   {
679   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
680     if (frag_offset == fragq->size)
681       fragq->state = MSG_FRAG_STATE_END;
682     else
683       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
684                   "%p Message %" PRIu64 " is NOT complete yet: "
685                   "%" PRIu64 " != %" PRIu64 "\n",
686                   ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
687                   fragq->size);
688     break;
689
690   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
691     /* Drop message without delivering to client if it's a single fragment */
692     fragq->state =
693       (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
694       ? MSG_FRAG_STATE_DROP
695       : MSG_FRAG_STATE_CANCEL;
696   }
697
698   switch (fragq->state)
699   {
700   case MSG_FRAG_STATE_DATA:
701   case MSG_FRAG_STATE_END:
702   case MSG_FRAG_STATE_CANCEL:
703     if (GNUNET_NO == fragq->queued)
704     {
705       GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
706                                     GNUNET_ntohll (mmsg->message_id));
707       fragq->queued = GNUNET_YES;
708     }
709   }
710
711   fragq->size += size;
712   GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
713                                 GNUNET_ntohll (mmsg->fragment_id));
714 }
715
716
717 /**
718  * Run fragment queue of a message.
719  *
720  * Send fragments of a message in order to client, after all modifiers arrived
721  * from multicast.
722  *
723  * @param ch      Channel.
724  * @param msg_id  ID of the message @a fragq belongs to.
725  * @param fragq   Fragment queue of the message.
726  * @param drop    Drop message without delivering to client?
727  *                #GNUNET_YES or #GNUNET_NO.
728  */
729 static void
730 fragment_queue_run (struct Channel *ch, uint64_t msg_id,
731                     struct FragmentQueue *fragq, uint8_t drop)
732 {
733   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
734               "%p Running message fragment queue for message %" PRIu64
735               " (state: %u).\n",
736               ch, msg_id, fragq->state);
737
738   struct GNUNET_CONTAINER_MultiHashMap
739     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
740                                                     &ch->pub_key_hash);
741   GNUNET_assert (NULL != chan_msgs);
742   uint64_t frag_id;
743
744   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
745                                                     &frag_id))
746   {
747     struct GNUNET_HashCode frag_id_hash;
748     hash_key_from_hll (&frag_id_hash, frag_id);
749     struct RecvCacheEntry *cache_entry
750       = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
751     if (cache_entry != NULL)
752     {
753       if (GNUNET_NO == drop)
754       {
755         message_to_client (ch, cache_entry->mmsg);
756       }
757       if (cache_entry->ref_count <= 1)
758       {
759         GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
760                                               cache_entry);
761         GNUNET_free (cache_entry->mmsg);
762         GNUNET_free (cache_entry);
763       }
764       else
765       {
766         cache_entry->ref_count--;
767       }
768     }
769 #if CACHE_AGING_IMPLEMENTED
770     else if (GNUNET_NO == drop)
771     {
772       /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
773     }
774 #endif
775
776     GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
777   }
778
779   if (MSG_FRAG_STATE_END <= fragq->state)
780   {
781     struct GNUNET_HashCode msg_id_hash;
782     hash_key_from_nll (&msg_id_hash, msg_id);
783
784     GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
785     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
786     GNUNET_free (fragq);
787   }
788   else
789   {
790     fragq->queued = GNUNET_NO;
791   }
792 }
793
794
795 /**
796  * Run message queue.
797  *
798  * Send messages in queue to client in order after a message has arrived from
799  * multicast, according to the following:
800  * - A message is only sent if all of its modifiers arrived.
801  * - A stateful message is only sent if the previous stateful message
802  *   has already been delivered to the client.
803  *
804  * @param ch  Channel.
805  * @return Number of messages removed from queue and sent to client.
806  */
807 static uint64_t
808 message_queue_run (struct Channel *ch)
809 {
810   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
811               "%p Running message queue.\n", ch);
812   uint64_t n = 0;
813   uint64_t msg_id;
814   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
815                                                     &msg_id))
816   {
817     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
818                 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
819     struct GNUNET_HashCode msg_id_hash;
820     hash_key_from_hll (&msg_id_hash, msg_id);
821
822     struct FragmentQueue *
823       fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
824
825     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
826     {
827       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
828                   "%p No fragq (%p) or header not complete.\n",
829                   ch, fragq);
830       break;
831     }
832
833     if (MSG_FRAG_STATE_HEADER == fragq->state)
834     {
835       /* Check if there's a missing message before the current one */
836       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
837       {
838         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
839             && msg_id - 1 != ch->max_message_id)
840         {
841           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
842                       "%p Out of order message. "
843                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
844                       ch, msg_id, ch->max_message_id);
845           break;
846         }
847       }
848       else
849       {
850         if (msg_id - fragq->state_delta != ch->max_state_message_id)
851         {
852           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
853                       "%p Out of order stateful message. "
854                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
855                       ch, msg_id, fragq->state_delta, ch->max_state_message_id);
856           break;
857         }
858 #if TODO
859         /* FIXME: apply modifiers to state in PSYCstore */
860         GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
861                                        state_modify_result_cb, cls);
862 #endif
863         ch->max_state_message_id = msg_id;
864       }
865       ch->max_message_id = msg_id;
866     }
867     fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
868     GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
869     n++;
870   }
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872               "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
873   return n;
874 }
875
876
877 /**
878  * Handle incoming message from multicast.
879  *
880  * @param ch   Channel.
881  * @param mmsg Multicast message.
882  *
883  * @return #GNUNET_OK or #GNUNET_SYSERR
884  */
885 static int
886 handle_multicast_message (struct Channel *ch,
887                           const struct GNUNET_MULTICAST_MessageHeader *mmsg)
888 {
889   GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
890
891   uint16_t size = ntohs (mmsg->header.size);
892   uint16_t first_ptype = 0, last_ptype = 0;
893
894   if (GNUNET_SYSERR
895       == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
896                                           (const char *) &mmsg[1],
897                                           &first_ptype, &last_ptype))
898   {
899     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
900                 "%p Received message with invalid parts from multicast. "
901                 "Dropping message.\n", ch);
902     GNUNET_break_op (0);
903     return GNUNET_SYSERR;
904   }
905
906   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907               "Message parts: first: type %u, last: type %u\n",
908               first_ptype, last_ptype);
909
910   fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
911   message_queue_run (ch);
912
913   return GNUNET_OK;
914 }
915
916
917 /**
918  * Incoming message fragment from multicast.
919  *
920  * Store it using PSYCstore and send it to the client of the channel.
921  */
922 static void
923 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
924 {
925   struct Channel *ch = cls;
926   uint16_t type = ntohs (msg->type);
927   uint16_t size = ntohs (msg->size);
928
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930               "%p Received message of type %u and size %u from multicast.\n",
931               ch, type, size);
932
933   switch (type)
934   {
935   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
936   {
937     handle_multicast_message (ch, (const struct
938                                    GNUNET_MULTICAST_MessageHeader *) msg);
939     break;
940   }
941   default:
942     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
943                 "%p Dropping unknown message of type %u and size %u.\n",
944                 ch, type, size);
945   }
946 }
947
948
949 /**
950  * Incoming request fragment from multicast for a master.
951  *
952  * @param cls           Master.
953  * @param slave_key     Sending slave's public key.
954  * @param msg           The message.
955  * @param flags         Request flags.
956  */
957 static void
958 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
959             const struct GNUNET_MessageHeader *msg,
960             enum GNUNET_MULTICAST_MessageFlags flags)
961 {
962   struct Master *mst = cls;
963   struct Channel *ch = &mst->channel;
964
965   uint16_t type = ntohs (msg->type);
966   uint16_t size = ntohs (msg->size);
967
968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969               "%p Received request of type %u and size %u from multicast.\n",
970               ch, type, size);
971
972   switch (type)
973   {
974   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
975   {
976     const struct GNUNET_MULTICAST_RequestHeader *req
977       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
978
979     /* FIXME: see message_cb() */
980     if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
981                                                           (const char *) &req[1],
982                                                           NULL, NULL))
983     {
984       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985                   "%p Dropping request with invalid parts "
986                   "received from multicast.\n", ch);
987       GNUNET_break_op (0);
988       break;
989     }
990
991     struct GNUNET_PSYC_MessageHeader *pmsg;
992     uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
993     pmsg = GNUNET_malloc (psize);
994     pmsg->header.size = htons (psize);
995     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
996     pmsg->message_id = req->request_id;
997     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
998
999     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1000
1001     GNUNET_SERVER_notification_context_add (nc, ch->client);
1002     GNUNET_SERVER_notification_context_unicast (nc, ch->client,
1003                                                 (const struct GNUNET_MessageHeader *) pmsg,
1004                                                 GNUNET_NO);
1005     GNUNET_free (pmsg);
1006     break;
1007   }
1008   default:
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "%p Dropping unknown request of type %u and size %u.\n",
1011                 ch, type, size);
1012     GNUNET_break_op (0);
1013   }
1014 }
1015
1016
1017 /**
1018  * Response from PSYCstore with the current counter values for a channel master.
1019  */
1020 static void
1021 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1022                     uint64_t max_message_id, uint64_t max_group_generation,
1023                     uint64_t max_state_message_id)
1024 {
1025   struct Master *mst = cls;
1026   struct Channel *ch = &mst->channel;
1027
1028   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1029   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1030   res->header.size = htons (sizeof (*res));
1031   res->result_code = htonl (result);
1032   res->max_message_id = GNUNET_htonll (max_message_id);
1033
1034   if (GNUNET_OK == result || GNUNET_NO == result)
1035   {
1036     mst->max_message_id = max_message_id;
1037     ch->max_message_id = max_message_id;
1038     ch->max_state_message_id = max_state_message_id;
1039     mst->max_group_generation = max_group_generation;
1040     mst->origin
1041       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
1042                                        max_fragment_id,
1043                                        join_cb, membership_test_cb,
1044                                        replay_fragment_cb, replay_message_cb,
1045                                        request_cb, message_cb, ch);
1046     ch->ready = GNUNET_YES;
1047   }
1048   else
1049   {
1050     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1051                 "%p GNUNET_PSYCSTORE_counters_get() "
1052                 "returned %d for channel %s.\n",
1053                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1054   }
1055
1056   GNUNET_SERVER_notification_context_add (nc, ch->client);
1057   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1058                                               GNUNET_NO);
1059   GNUNET_free (res);
1060 }
1061
1062
1063 /**
1064  * Response from PSYCstore with the current counter values for a channel slave.
1065  */
1066 void
1067 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1068                    uint64_t max_message_id, uint64_t max_group_generation,
1069                    uint64_t max_state_message_id)
1070 {
1071   struct Slave *slv = cls;
1072   struct Channel *ch = &slv->channel;
1073
1074   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1075   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1076   res->header.size = htons (sizeof (*res));
1077   res->result_code = htonl (result);
1078   res->max_message_id = GNUNET_htonll (max_message_id);
1079
1080   if (GNUNET_OK == result || GNUNET_NO == result)
1081   {
1082     ch->max_message_id = max_message_id;
1083     ch->max_state_message_id = max_state_message_id;
1084     slv->member
1085       = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1086                                       &slv->origin,
1087                                       slv->relay_count, slv->relays,
1088                                       slv->join_req, join_cb,
1089                                       membership_test_cb,
1090                                       replay_fragment_cb, replay_message_cb,
1091                                       message_cb, ch);
1092     ch->ready = GNUNET_YES;
1093   }
1094   else
1095   {
1096     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1097                 "%p GNUNET_PSYCSTORE_counters_get() "
1098                 "returned %d for channel %s.\n",
1099                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1100   }
1101
1102   GNUNET_SERVER_notification_context_add (nc, ch->client);
1103   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1104                                               GNUNET_NO);
1105   GNUNET_free (res);
1106 }
1107
1108
1109 static void
1110 channel_init (struct Channel *ch)
1111 {
1112   ch->recv_msgs
1113     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1114   ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1115 }
1116
1117
1118 /**
1119  * Handle a connecting client starting a channel master.
1120  */
1121 static void
1122 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1123                      const struct GNUNET_MessageHeader *msg)
1124 {
1125   const struct MasterStartRequest *req
1126     = (const struct MasterStartRequest *) msg;
1127
1128   struct Master *mst = GNUNET_new (struct Master);
1129   mst->policy = ntohl (req->policy);
1130   mst->priv_key = req->channel_key;
1131
1132   struct Channel *ch = &mst->channel;
1133   ch->client = client;
1134   ch->is_master = GNUNET_YES;
1135   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
1136   GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
1137   channel_init (ch);
1138
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "%p Master connected to channel %s.\n",
1141               mst, GNUNET_h2s (&ch->pub_key_hash));
1142
1143   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
1144
1145   GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1146                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1147   GNUNET_SERVER_client_set_user_context (client, ch);
1148   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1149 }
1150
1151
1152 /**
1153  * Handle a connecting client joining as a channel slave.
1154  */
1155 static void
1156 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1157                    const struct GNUNET_MessageHeader *msg)
1158 {
1159   const struct SlaveJoinRequest *req
1160     = (const struct SlaveJoinRequest *) msg;
1161   struct Slave *slv = GNUNET_new (struct Slave);
1162   slv->priv_key = req->slave_key;
1163   slv->origin = req->origin;
1164   slv->relay_count = ntohl (req->relay_count);
1165   if (0 < slv->relay_count)
1166   {
1167     const struct GNUNET_PeerIdentity *relays
1168       = (const struct GNUNET_PeerIdentity *) &req[1];
1169     slv->relays
1170       = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1171     uint32_t i;
1172     for (i = 0; i < slv->relay_count; i++)
1173       memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1174   }
1175
1176   struct Channel *ch = &slv->channel;
1177   ch->client = client;
1178   ch->is_master = GNUNET_NO;
1179   ch->pub_key = req->channel_key;
1180   GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
1181                       &ch->pub_key_hash);
1182   channel_init (ch);
1183
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "%p Slave connected to channel %s.\n",
1186               slv, GNUNET_h2s (&ch->pub_key_hash));
1187
1188   GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
1189
1190   GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1191                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1192   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
1193   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1194 }
1195
1196
1197 /**
1198  * Send acknowledgement to a client.
1199  *
1200  * Sent after a message fragment has been passed on to multicast.
1201  *
1202  * @param ch The channel struct for the client.
1203  */
1204 static void
1205 send_message_ack (struct Channel *ch)
1206 {
1207   struct GNUNET_MessageHeader res;
1208   res.size = htons (sizeof (res));
1209   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1210
1211   GNUNET_SERVER_notification_context_add (nc, ch->client);
1212   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
1213 }
1214
1215
1216 /**
1217  * Callback for the transmit functions of multicast.
1218  */
1219 static int
1220 transmit_notify (void *cls, size_t *data_size, void *data)
1221 {
1222   struct Channel *ch = cls;
1223   struct TransmitMessage *tmit_msg = ch->tmit_head;
1224
1225   if (NULL == tmit_msg || *data_size < tmit_msg->size)
1226   {
1227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228                 "%p transmit_notify: nothing to send.\n", ch);
1229     *data_size = 0;
1230     return GNUNET_NO;
1231   }
1232
1233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
1235
1236   *data_size = tmit_msg->size;
1237   memcpy (data, &tmit_msg[1], *data_size);
1238
1239   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1240   GNUNET_free (tmit_msg);
1241
1242   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1243   send_message_ack (ch);
1244
1245   if (0 == ch->tmit_task)
1246   {
1247     if (NULL != ch->tmit_head)
1248     {
1249       transmit_message (ch);
1250     }
1251     else if (ch->disconnected)
1252     {
1253       /* FIXME: handle partial message (when still in_transmit) */
1254       client_cleanup (ch);
1255     }
1256   }
1257
1258   return ret;
1259 }
1260
1261
1262 /**
1263  * Callback for the transmit functions of multicast.
1264  */
1265 static int
1266 master_transmit_notify (void *cls, size_t *data_size, void *data)
1267 {
1268   int ret = transmit_notify (cls, data_size, data);
1269
1270   if (GNUNET_YES == ret)
1271   {
1272     struct Master *mst = cls;
1273     mst->tmit_handle = NULL;
1274   }
1275   return ret;
1276 }
1277
1278
1279 /**
1280  * Callback for the transmit functions of multicast.
1281  */
1282 static int
1283 slave_transmit_notify (void *cls, size_t *data_size, void *data)
1284 {
1285   int ret = transmit_notify (cls, data_size, data);
1286
1287   if (GNUNET_YES == ret)
1288   {
1289     struct Slave *slv = cls;
1290     slv->tmit_handle = NULL;
1291   }
1292   return ret;
1293 }
1294
1295
1296 /**
1297  * Transmit a message from a channel master to the multicast group.
1298  */
1299 static void
1300 master_transmit_message (struct Master *mst)
1301 {
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1303   mst->channel.tmit_task = 0;
1304   if (NULL == mst->tmit_handle)
1305   {
1306     mst->tmit_handle
1307       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1308                                         mst->max_group_generation,
1309                                         master_transmit_notify, mst);
1310   }
1311   else
1312   {
1313     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
1314   }
1315 }
1316
1317
1318 /**
1319  * Transmit a message from a channel slave to the multicast group.
1320  */
1321 static void
1322 slave_transmit_message (struct Slave *slv)
1323 {
1324   slv->channel.tmit_task = 0;
1325   if (NULL == slv->tmit_handle)
1326   {
1327     slv->tmit_handle
1328       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1329                                            slave_transmit_notify, slv);
1330   }
1331   else
1332   {
1333     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
1334   }
1335 }
1336
1337
1338 static inline void
1339 transmit_message (struct Channel *ch)
1340 {
1341   ch->is_master
1342     ? master_transmit_message ((struct Master *) ch)
1343     : slave_transmit_message ((struct Slave *) ch);
1344 }
1345
1346
1347 /**
1348  * Queue a message from a channel master for sending to the multicast group.
1349  */
1350 static void
1351 master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1352                      uint16_t first_ptype, uint16_t last_ptype)
1353 {
1354   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1355
1356   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1357   {
1358     tmit_msg->id = ++mst->max_message_id;
1359     struct GNUNET_PSYC_MessageMethod *pmeth
1360       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1361
1362     if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1363     {
1364       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1365     }
1366     else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1367     {
1368       pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1369                                           - mst->max_state_message_id);
1370     }
1371     else
1372     {
1373       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1374     }
1375   }
1376 }
1377
1378
1379 /**
1380  * Queue a message from a channel slave for sending to the multicast group.
1381  */
1382 static void
1383 slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1384                      uint16_t first_ptype, uint16_t last_ptype)
1385 {
1386   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1387   {
1388     struct GNUNET_PSYC_MessageMethod *pmeth
1389       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1390     pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1391     tmit_msg->id = ++slv->max_request_id;
1392   }
1393 }
1394
1395
1396 static void
1397 queue_message (struct Channel *ch,  const struct GNUNET_MessageHeader *msg,
1398                uint16_t first_ptype, uint16_t last_ptype)
1399 {
1400   uint16_t size = ntohs (msg->size) - sizeof (*msg);
1401   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1402   memcpy (&tmit_msg[1], &msg[1], size);
1403   tmit_msg->size = size;
1404   tmit_msg->state = ch->tmit_state;
1405
1406   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1407
1408   ch->is_master
1409     ? master_queue_message ((struct Master *) ch, tmit_msg,
1410                             first_ptype, last_ptype)
1411     : slave_queue_message ((struct Slave *) ch, tmit_msg,
1412                            first_ptype, last_ptype);
1413 }
1414
1415
1416 static void
1417 transmit_error (struct Channel *ch)
1418 {
1419   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1420
1421   struct GNUNET_MessageHeader msg;
1422   msg.size = ntohs (sizeof (msg));
1423   msg.type = ntohs (type);
1424
1425   queue_message (ch, &msg, type, type);
1426   transmit_message (ch);
1427
1428   /* FIXME: cleanup */
1429 }
1430
1431
1432 /**
1433  * Incoming message from a client.
1434  */
1435 static void
1436 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1437                      const struct GNUNET_MessageHeader *msg)
1438 {
1439   struct Channel *ch
1440     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1441   GNUNET_assert (NULL != ch);
1442
1443   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444               "%p Received message from client.\n", ch);
1445   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1446
1447   if (GNUNET_YES != ch->ready)
1448   {
1449     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1450                 "%p Dropping message from client, channel is not ready yet.\n",
1451                 ch);
1452     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1453     return;
1454   }
1455
1456   uint16_t size = ntohs (msg->size);
1457   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1458   {
1459     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1460     GNUNET_break (0);
1461     transmit_error (ch);
1462     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1463     return;
1464   }
1465
1466   uint16_t first_ptype = 0, last_ptype = 0;
1467   if (GNUNET_SYSERR
1468       == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1469                                           (const char *) &msg[1],
1470                                           &first_ptype, &last_ptype))
1471   {
1472     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1473                 "%p Received invalid message part from client.\n", ch);
1474     GNUNET_break (0);
1475     transmit_error (ch);
1476     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1477     return;
1478   }
1479
1480   queue_message (ch, msg, first_ptype, last_ptype);
1481   transmit_message (ch);
1482
1483   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1484 };
1485
1486
1487 /**
1488  * Client requests to add a slave to the membership database.
1489  */
1490 static void
1491 handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1492                   const struct GNUNET_MessageHeader *msg)
1493 {
1494
1495 }
1496
1497
1498 /**
1499  * Client requests to remove a slave from the membership database.
1500  */
1501 static void
1502 handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1503                      const struct GNUNET_MessageHeader *msg)
1504 {
1505
1506 }
1507
1508
1509 /**
1510  * Client requests channel history from PSYCstore.
1511  */
1512 static void
1513 handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1514                       const struct GNUNET_MessageHeader *msg)
1515 {
1516
1517 }
1518
1519
1520 /**
1521  * Client requests best matching state variable from PSYCstore.
1522  */
1523 static void
1524 handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1525                   const struct GNUNET_MessageHeader *msg)
1526 {
1527
1528 }
1529
1530
1531 /**
1532  * Client requests state variables with a given prefix from PSYCstore.
1533  */
1534 static void
1535 handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1536                          const struct GNUNET_MessageHeader *msg)
1537 {
1538
1539 }
1540
1541
1542 /**
1543  * Initialize the PSYC service.
1544  *
1545  * @param cls Closure.
1546  * @param server The initialized server.
1547  * @param c Configuration to use.
1548  */
1549 static void
1550 run (void *cls, struct GNUNET_SERVER_Handle *server,
1551      const struct GNUNET_CONFIGURATION_Handle *c)
1552 {
1553   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1554     { &handle_master_start, NULL,
1555       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
1556
1557     { &handle_slave_join, NULL,
1558       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
1559
1560     { &handle_psyc_message, NULL,
1561       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1562
1563     { &handle_slave_add, NULL,
1564       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
1565
1566     { &handle_slave_remove, NULL,
1567       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
1568
1569     { &handle_story_request, NULL,
1570       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
1571
1572     { &handle_state_get, NULL,
1573       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1574
1575     { &handle_state_get_prefix, NULL,
1576       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
1577   };
1578
1579   cfg = c;
1580   store = GNUNET_PSYCSTORE_connect (cfg);
1581   stats = GNUNET_STATISTICS_create ("psyc", cfg);
1582   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1583   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1584   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1585   nc = GNUNET_SERVER_notification_context_create (server, 1);
1586   GNUNET_SERVER_add_handlers (server, handlers);
1587   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
1588   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1589                                 &shutdown_task, NULL);
1590 }
1591
1592
1593 /**
1594  * The main function for the service.
1595  *
1596  * @param argc number of arguments from the command line
1597  * @param argv command line arguments
1598  * @return 0 ok, 1 on error
1599  */
1600 int
1601 main (int argc, char *const *argv)
1602 {
1603   return (GNUNET_OK ==
1604           GNUNET_SERVICE_run (argc, argv, "psyc",
1605                               GNUNET_SERVICE_OPTION_NONE,
1606                               &run, NULL)) ? 0 : 1;
1607 }
1608
1609 /* end of gnunet-service-psycstore.c */