dcb5031f175385b6f2ab89716e118f0548f9018c
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "psyc.h"
36
37
38 /**
39  * Handle to our current configuration.
40  */
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43 /**
44  * Handle to the statistics service.
45  */
46 static struct GNUNET_STATISTICS_Handle *stats;
47
48 /**
49  * Notification context, simplifies client broadcasts.
50  */
51 static struct GNUNET_SERVER_NotificationContext *nc;
52
53 /**
54  * Handle to the PSYCstore.
55  */
56 static struct GNUNET_PSYCSTORE_Handle *store;
57
58 /**
59  * All connected masters and slaves.
60  * Channel's pub_key_hash -> struct Channel
61  */
62 static struct GNUNET_CONTAINER_MultiHashMap *clients;
63
64 /**
65  * Message in the transmission queue.
66  */
67 struct TransmitMessage
68 {
69   struct TransmitMessage *prev;
70   struct TransmitMessage *next;
71
72   char *buf;
73   uint16_t size;
74   /**
75    * enum MessageState
76    */
77   uint8_t state;
78 };
79
80 /**
81  * Common part of the client context for both a master and slave channel.
82  */
83 struct Channel
84 {
85   struct GNUNET_SERVER_Client *client;
86
87   struct TransmitMessage *tmit_head;
88   struct TransmitMessage *tmit_tail;
89
90   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
91
92   /**
93    * Expected value size for the modifier being received from the PSYC service.
94    */
95   uint32_t tmit_mod_value_size_expected;
96
97   /**
98    * Actual value size for the modifier being received from the PSYC service.
99    */
100   uint32_t tmit_mod_value_size;
101
102   /**
103    * enum MessageState
104    */
105   uint8_t tmit_state;
106
107   uint8_t in_transmit;
108   uint8_t is_master;
109
110   /**
111    * Ready to receive messages from client.
112    */
113   uint8_t ready;
114
115   /**
116    * Client disconnected.
117    */
118   uint8_t disconnected;
119 };
120
121 /**
122  * Client context for a channel master.
123  */
124 struct Master
125 {
126   struct Channel channel;
127   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
128   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
129
130   struct GNUNET_MULTICAST_Origin *origin;
131   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
132
133   /**
134    * Maximum message ID for this channel.
135    *
136    * Incremented before sending a message, thus the message_id in messages sent
137    * starts from 1.
138    */
139   uint64_t max_message_id;
140
141   /**
142    * ID of the last message that contains any state operations.
143    * 0 if there is no such message.
144    */
145   uint64_t max_state_message_id;
146
147   /**
148    * Maximum group generation for this channel.
149    */
150   uint64_t max_group_generation;
151
152   /**
153    * @see enum GNUNET_PSYC_Policy
154    */
155   uint32_t policy;
156
157   struct GNUNET_HashCode pub_key_hash;
158 };
159
160
161 /**
162  * Client context for a channel slave.
163  */
164 struct Slave
165 {
166   struct Channel channel;
167   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
168   struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
169
170   struct GNUNET_MULTICAST_Member *member;
171   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
172
173   struct GNUNET_PeerIdentity origin;
174
175   uint32_t relay_count;
176   struct GNUNET_PeerIdentity *relays;
177
178   struct GNUNET_MessageHeader *join_req;
179
180   uint64_t max_message_id;
181   uint64_t max_request_id;
182
183   struct GNUNET_HashCode chan_key_hash;
184 };
185
186
187 static inline void
188 transmit_message (struct Channel *ch, uint8_t inc_msg_id);
189
190
191 /**
192  * Task run during shutdown.
193  *
194  * @param cls unused
195  * @param tc unused
196  */
197 static void
198 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
199 {
200   if (NULL != nc)
201   {
202     GNUNET_SERVER_notification_context_destroy (nc);
203     nc = NULL;
204   }
205   if (NULL != stats)
206   {
207     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
208     stats = NULL;
209   }
210 }
211
212
213 static void
214 client_cleanup (struct Channel *ch)
215 {
216   if (ch->is_master)
217   {
218     struct Master *mst = (struct Master *) ch;
219     if (NULL != mst->origin)
220       GNUNET_MULTICAST_origin_stop (mst->origin);
221     GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
222   }
223   else
224   {
225     struct Slave *slv = (struct Slave *) ch;
226     if (NULL != slv->join_req)
227       GNUNET_free (slv->join_req);
228     if (NULL != slv->relays)
229       GNUNET_free (slv->relays);
230     if (NULL != slv->member)
231       GNUNET_MULTICAST_member_part (slv->member);
232   }
233
234   GNUNET_free (ch);
235 }
236
237
238 /**
239  * Called whenever a client is disconnected.
240  * Frees our resources associated with that client.
241  *
242  * @param cls Closure.
243  * @param client Identification of the client.
244  */
245 static void
246 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
247 {
248   if (NULL == client)
249     return;
250
251   struct Channel *ch
252     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
254
255   if (NULL == ch)
256   {
257     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
258                 "%p User context is NULL in client_disconnect()\n", ch);
259     GNUNET_break (0);
260     return;
261   }
262
263   ch->disconnected = GNUNET_YES;
264
265   /* Send pending messages to multicast before cleanup. */
266   if (NULL != ch->tmit_head)
267   {
268     transmit_message (ch, GNUNET_NO);
269   }
270   else
271   {
272     client_cleanup (ch);
273   }
274 }
275
276
277 static void
278 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
279          const struct GNUNET_MessageHeader *join_req,
280          struct GNUNET_MULTICAST_JoinHandle *jh)
281 {
282
283 }
284
285
286 static void
287 membership_test_cb (void *cls,
288                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
289                     uint64_t message_id, uint64_t group_generation,
290                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
291 {
292
293 }
294
295
296 static void
297 replay_fragment_cb (void *cls,
298                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
299                     uint64_t fragment_id, uint64_t flags,
300                     struct GNUNET_MULTICAST_ReplayHandle *rh)
301
302 {
303 }
304
305
306 static void
307 replay_message_cb (void *cls,
308                    const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
309                    uint64_t message_id,
310                    uint64_t fragment_offset,
311                    uint64_t flags,
312                    struct GNUNET_MULTICAST_ReplayHandle *rh)
313 {
314
315 }
316
317
318 static void
319 fragment_store_result (void *cls, int64_t result, const char *err_msg)
320 {
321   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
322               "fragment_store() returned %l (%s)\n", result, err_msg);
323 }
324
325
326 /**
327  * Incoming message fragment from multicast.
328  *
329  * Store it using PSYCstore and send it to the client of the channel.
330  */
331 static void
332 message_cb (struct Channel *ch,
333             const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key,
334             const struct GNUNET_HashCode *chan_key_hash,
335             const struct GNUNET_MessageHeader *msg)
336 {
337   uint16_t type = ntohs (msg->type);
338   uint16_t size = ntohs (msg->size);
339
340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341               "%p Received message of type %u and size %u from multicast.\n",
342               ch, type, size);
343
344   switch (type)
345   {
346   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
347   {
348     GNUNET_PSYCSTORE_fragment_store (store, chan_key,
349                                      (const struct
350                                       GNUNET_MULTICAST_MessageHeader *) msg,
351                                      0, NULL, NULL);
352
353 #if TODO
354     /* FIXME: apply modifiers to state in PSYCstore */
355     GNUNET_PSYCSTORE_state_modify (store, chan_key,
356                                    GNUNET_ntohll (mmsg->message_id),
357                                    meth->mod_count, mods,
358                                    rcb, rcb_cls);
359 #endif
360
361     const struct GNUNET_MULTICAST_MessageHeader *mmsg
362       = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
363
364     if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
365                                                        (const char *) &mmsg[1]))
366     {
367       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
368                   "%p Received message with invalid parts from multicast. "
369                   "Dropping message.\n", ch);
370       GNUNET_break_op (0);
371       break;
372     }
373
374     struct GNUNET_PSYC_MessageHeader *pmsg;
375     uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
376     pmsg = GNUNET_malloc (psize);
377     pmsg->header.size = htons (psize);
378     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
379     pmsg->message_id = mmsg->message_id;
380
381     memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
382
383     GNUNET_SERVER_notification_context_add (nc, ch->client);
384     GNUNET_SERVER_notification_context_unicast (nc, ch->client,
385                                                 (const struct GNUNET_MessageHeader *) pmsg,
386                                                 GNUNET_NO);
387     GNUNET_free (pmsg);
388     break;
389   }
390   default:
391     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
392                 "%p Dropping unknown message of type %u and size %u.\n",
393                 ch, type, size);
394   }
395 }
396
397
398 /**
399  * Incoming message fragment from multicast for a master.
400  */
401 static void
402 master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
403 {
404   struct Master *mst = cls;
405   GNUNET_assert (NULL != mst);
406
407   struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key;
408   struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash;
409
410   message_cb (&mst->channel, chan_key, chan_key_hash, msg);
411 }
412
413
414 /**
415  * Incoming message fragment from multicast for a slave.
416  */
417 static void
418 slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
419 {
420   struct Slave *slv = cls;
421   GNUNET_assert (NULL != slv);
422
423   struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key;
424   struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash;
425
426   message_cb (&slv->channel, chan_key, chan_key_hash, msg);
427 }
428
429
430 /**
431  * Incoming request fragment from multicast for a master.
432  *
433  * @param cls           Master.
434  * @param member_key    Sending member's public key.
435  * @param msg           The message.
436  * @param flags         Request flags.
437  */
438 static void
439 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
440             const struct GNUNET_MessageHeader *msg,
441             enum GNUNET_MULTICAST_MessageFlags flags)
442 {
443   struct Master *mst = cls;
444   struct Channel *ch = &mst->channel;
445
446   uint16_t type = ntohs (msg->type);
447   uint16_t size = ntohs (msg->size);
448
449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450               "%p Received request of type %u and size %u from multicast.\n",
451               ch, type, size);
452
453   switch (type)
454   {
455   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
456   {
457     const struct GNUNET_MULTICAST_RequestHeader *req
458       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
459
460     if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
461                                                        (const char *) &req[1]))
462     {
463       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464                   "%p Dropping message with invalid parts "
465                   "received from multicast.\n", ch);
466       GNUNET_break_op (0);
467       break;
468     }
469
470     struct GNUNET_PSYC_MessageHeader *pmsg;
471     uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
472     pmsg = GNUNET_malloc (psize);
473     pmsg->header.size = htons (psize);
474     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
475     pmsg->message_id = req->request_id;
476     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
477
478     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
479
480     GNUNET_SERVER_notification_context_add (nc, ch->client);
481     GNUNET_SERVER_notification_context_unicast (nc, ch->client,
482                                                 (const struct GNUNET_MessageHeader *) pmsg,
483                                                 GNUNET_NO);
484     GNUNET_free (pmsg);
485     break;
486   }
487   default:
488     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489                 "%p Dropping unknown request of type %u and size %u.\n",
490                 ch, type, size);
491     GNUNET_break_op (0);
492   }
493 }
494
495
496 /**
497  * Response from PSYCstore with the current counter values for a channel master.
498  */
499 static void
500 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
501                     uint64_t max_message_id, uint64_t max_group_generation,
502                     uint64_t max_state_message_id)
503 {
504   struct Master *mst = cls;
505   struct Channel *ch = &mst->channel;
506   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
507   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
508   res->header.size = htons (sizeof (*res));
509   res->result_code = htonl (result);
510   res->max_message_id = GNUNET_htonll (max_message_id);
511
512   if (GNUNET_OK == result || GNUNET_NO == result)
513   {
514     mst->max_message_id = max_message_id;
515     mst->max_state_message_id = max_state_message_id;
516     mst->max_group_generation = max_group_generation;
517     mst->origin
518       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
519                                        max_fragment_id + 1,
520                                        join_cb, membership_test_cb,
521                                        replay_fragment_cb, replay_message_cb,
522                                        request_cb, master_message_cb, ch);
523     ch->ready = GNUNET_YES;
524   }
525   GNUNET_SERVER_notification_context_add (nc, ch->client);
526   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
527                                               GNUNET_NO);
528   GNUNET_free (res);
529 }
530
531
532 /**
533  * Response from PSYCstore with the current counter values for a channel slave.
534  */
535 void
536 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
537                    uint64_t max_message_id, uint64_t max_group_generation,
538                    uint64_t max_state_message_id)
539 {
540   struct Slave *slv = cls;
541   struct Channel *ch = &slv->channel;
542   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
543   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
544   res->header.size = htons (sizeof (*res));
545   res->result_code = htonl (result);
546   res->max_message_id = GNUNET_htonll (max_message_id);
547
548   if (GNUNET_OK == result || GNUNET_NO == result)
549   {
550     slv->max_message_id = max_message_id;
551     slv->member
552       = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
553                                       &slv->origin,
554                                       slv->relay_count, slv->relays,
555                                       slv->join_req, join_cb,
556                                       membership_test_cb,
557                                       replay_fragment_cb, replay_message_cb,
558                                       slave_message_cb, ch);
559     ch->ready = GNUNET_YES;
560   }
561
562   GNUNET_SERVER_notification_context_add (nc, ch->client);
563   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
564                                               GNUNET_NO);
565   GNUNET_free (res);
566 }
567
568
569 /**
570  * Handle a connecting client starting a channel master.
571  */
572 static void
573 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
574                      const struct GNUNET_MessageHeader *msg)
575 {
576   const struct MasterStartRequest *req
577     = (const struct MasterStartRequest *) msg;
578   struct Master *mst = GNUNET_new (struct Master);
579   mst->channel.client = client;
580   mst->channel.is_master = GNUNET_YES;
581   mst->policy = ntohl (req->policy);
582   mst->priv_key = req->channel_key;
583   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key);
584   GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "%p Master connected to channel %s.\n",
587               mst, GNUNET_h2s (&mst->pub_key_hash));
588
589   GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
590                                  master_counters_cb, mst);
591
592   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
593   GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
594                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
595   GNUNET_SERVER_receive_done (client, GNUNET_OK);
596 }
597
598
599 /**
600  * Handle a connecting client joining as a channel slave.
601  */
602 static void
603 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
604                    const struct GNUNET_MessageHeader *msg)
605 {
606   const struct SlaveJoinRequest *req
607     = (const struct SlaveJoinRequest *) msg;
608   struct Slave *slv = GNUNET_new (struct Slave);
609   slv->channel.client = client;
610   slv->channel.is_master = GNUNET_NO;
611   slv->slave_key = req->slave_key;
612   slv->chan_key = req->channel_key;
613   GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
614                       &slv->chan_key_hash);
615   slv->origin = req->origin;
616   slv->relay_count = ntohl (req->relay_count);
617   if (0 < slv->relay_count)
618   {
619     const struct GNUNET_PeerIdentity *relays
620       = (const struct GNUNET_PeerIdentity *) &req[1];
621     slv->relays
622       = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
623     uint32_t i;
624     for (i = 0; i < slv->relay_count; i++)
625       memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
626   }
627
628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629               "%p Slave connected to channel %s.\n",
630               slv, GNUNET_h2s (&slv->chan_key_hash));
631
632   GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
633                                  slave_counters_cb, slv);
634
635   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
636   GNUNET_SERVER_receive_done (client, GNUNET_OK);
637 }
638
639
640 /**
641  * Send acknowledgement to a client.
642  *
643  * Sent after a message fragment has been passed on to multicast.
644  *
645  * @param ch The channel struct for the client.
646  */
647 static void
648 send_message_ack (struct Channel *ch)
649 {
650   struct GNUNET_MessageHeader res;
651   res.size = htons (sizeof (res));
652   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
653
654   GNUNET_SERVER_notification_context_add (nc, ch->client);
655   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
656                                               GNUNET_NO);
657 }
658
659
660 /**
661  * Callback for the transmit functions of multicast.
662  */
663 static int
664 transmit_notify (void *cls, size_t *data_size, void *data)
665 {
666   struct Channel *ch = cls;
667   struct TransmitMessage *tmit_msg = ch->tmit_head;
668
669   if (NULL == tmit_msg || *data_size < tmit_msg->size)
670   {
671     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672                 "%p transmit_notify: nothing to send.\n", ch);
673     *data_size = 0;
674     return GNUNET_NO;
675   }
676
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
678               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
679
680   *data_size = tmit_msg->size;
681   memcpy (data, tmit_msg->buf, *data_size);
682
683   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
684   GNUNET_free (tmit_msg);
685
686   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
687   send_message_ack (ch);
688
689   if (0 == ch->tmit_task)
690   {
691     if (NULL != ch->tmit_head)
692     {
693       transmit_message (ch, GNUNET_NO);
694     }
695     else if (ch->disconnected)
696     {
697       /* FIXME: handle partial message (when still in_transmit) */
698       client_cleanup (ch);
699     }
700   }
701
702   return ret;
703 }
704
705
706 /**
707  * Callback for the transmit functions of multicast.
708  */
709 static int
710 master_transmit_notify (void *cls, size_t *data_size, void *data)
711 {
712   int ret = transmit_notify (cls, data_size, data);
713
714   if (GNUNET_YES == ret)
715   {
716     struct Master *mst = cls;
717     mst->tmit_handle = NULL;
718   }
719   return ret;
720 }
721
722
723 /**
724  * Callback for the transmit functions of multicast.
725  */
726 static int
727 slave_transmit_notify (void *cls, size_t *data_size, void *data)
728 {
729   int ret = transmit_notify (cls, data_size, data);
730
731   if (GNUNET_YES == ret)
732   {
733     struct Slave *slv = cls;
734     slv->tmit_handle = NULL;
735   }
736   return ret;
737 }
738
739
740 /**
741  * Transmit a message from a channel master to the multicast group.
742  */
743 static void
744 master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
745 {
746   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
747   mst->channel.tmit_task = 0;
748   if (NULL == mst->tmit_handle)
749   {
750     if (GNUNET_YES == inc_msg_id)
751       mst->max_message_id++;
752     mst->tmit_handle
753       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
754                                         mst->max_group_generation,
755                                         master_transmit_notify, mst);
756   }
757   else
758   {
759     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
760   }
761 }
762
763
764 /**
765  * Transmit a message from a channel slave to the multicast group.
766  */
767 static void
768 slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
769 {
770   slv->channel.tmit_task = 0;
771   if (NULL == slv->tmit_handle)
772   {
773     if (GNUNET_YES == inc_msg_id)
774       slv->max_message_id++;
775     slv->tmit_handle
776       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
777                                            slave_transmit_notify, slv);
778   }
779   else
780   {
781     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
782   }
783 }
784
785
786 static inline void
787 transmit_message (struct Channel *ch, uint8_t inc_msg_id)
788 {
789   ch->is_master
790     ? master_transmit_message ((struct Master *) ch, inc_msg_id)
791     : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
792 }
793
794
795 static void
796 transmit_error (struct Channel *ch)
797 {
798   struct GNUNET_MessageHeader *msg;
799   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
800                                                     + sizeof (*msg));
801   msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
802   msg->size = ntohs (sizeof (*msg));
803   msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
804
805   tmit_msg->buf = (char *) &tmit_msg[1];
806   tmit_msg->size = sizeof (*msg);
807   tmit_msg->state = ch->tmit_state;
808   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
809   transmit_message (ch, GNUNET_NO);
810
811   /* FIXME: cleanup */
812 }
813
814
815 /**
816  * Incoming message from a client.
817  */
818 static void
819 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
820                      const struct GNUNET_MessageHeader *msg)
821 {
822   struct Channel *ch
823     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
824   GNUNET_assert (NULL != ch);
825
826   if (GNUNET_YES != ch->ready)
827   {
828     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
829                 "%p Ignoring message from client, channel is not ready yet.\n",
830                 ch);
831     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832     return;
833   }
834
835   uint8_t inc_msg_id = GNUNET_NO;
836   uint16_t size = ntohs (msg->size);
837   uint16_t psize = 0, ptype = 0, pos = 0;
838
839   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
840   {
841     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
842     GNUNET_break (0);
843     transmit_error (ch);
844     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
845     return;
846   }
847
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "%p Received message from client.\n", ch);
850   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
851
852   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
853   {
854     const struct GNUNET_MessageHeader *pmsg
855       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
856     psize = ntohs (pmsg->size);
857     ptype = ntohs (pmsg->type);
858     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
859     {
860       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
861                   "%p Received invalid message part of type %u and size %u "
862                   "from client.\n", ch, ptype, psize);
863       GNUNET_break (0);
864       transmit_error (ch);
865       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
866       return;
867     }
868     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869                 "%p Received message part from client.\n", ch);
870     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
871
872     if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
873       inc_msg_id = GNUNET_YES;
874   }
875
876   size -= sizeof (*msg);
877   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
878   tmit_msg->buf = (char *) &tmit_msg[1];
879   memcpy (tmit_msg->buf, &msg[1], size);
880   tmit_msg->size = size;
881   tmit_msg->state = ch->tmit_state;
882   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
883   transmit_message (ch, inc_msg_id);
884
885   GNUNET_SERVER_receive_done (client, GNUNET_OK);
886 };
887
888
889 /**
890  * Initialize the PSYC service.
891  *
892  * @param cls Closure.
893  * @param server The initialized server.
894  * @param c Configuration to use.
895  */
896 static void
897 run (void *cls, struct GNUNET_SERVER_Handle *server,
898      const struct GNUNET_CONFIGURATION_Handle *c)
899 {
900   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
901     { &handle_master_start, NULL,
902       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
903
904     { &handle_slave_join, NULL,
905       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
906
907     { &handle_psyc_message, NULL,
908       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
909   };
910
911   cfg = c;
912   store = GNUNET_PSYCSTORE_connect (cfg);
913   stats = GNUNET_STATISTICS_create ("psyc", cfg);
914   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
915   nc = GNUNET_SERVER_notification_context_create (server, 1);
916   GNUNET_SERVER_add_handlers (server, handlers);
917   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
918   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
919                                 NULL);
920 }
921
922
923 /**
924  * The main function for the service.
925  *
926  * @param argc number of arguments from the command line
927  * @param argv command line arguments
928  * @return 0 ok, 1 on error
929  */
930 int
931 main (int argc, char *const *argv)
932 {
933   return (GNUNET_OK ==
934           GNUNET_SERVICE_run (argc, argv, "psyc",
935                               GNUNET_SERVICE_OPTION_NONE,
936                               &run, NULL)) ? 0 : 1;
937 }
938
939 /* end of gnunet-service-psycstore.c */