- re-added testcase for crypto-paillier
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "psyc.h"
36
37
38 /**
39  * Handle to our current configuration.
40  */
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43 /**
44  * Handle to the statistics service.
45  */
46 static struct GNUNET_STATISTICS_Handle *stats;
47
48 /**
49  * Notification context, simplifies client broadcasts.
50  */
51 static struct GNUNET_SERVER_NotificationContext *nc;
52
53 /**
54  * Handle to the PSYCstore.
55  */
56 static struct GNUNET_PSYCSTORE_Handle *store;
57
58 /**
59  * Channel's pub_key_hash -> struct Channel
60  */
61 static struct GNUNET_CONTAINER_MultiHashMap *clients;
62
63 /**
64  * Message in the transmission queue.
65  */
66 struct TransmitMessage
67 {
68   struct TransmitMessage *prev;
69   struct TransmitMessage *next;
70
71   char *buf;
72   uint16_t size;
73   /**
74    * enum MessageState
75    */
76   uint8_t state;
77 };
78
79 /**
80  * Common part of the client context for both a master and slave channel.
81  */
82 struct Channel
83 {
84   struct GNUNET_SERVER_Client *client;
85
86   struct TransmitMessage *tmit_head;
87   struct TransmitMessage *tmit_tail;
88
89   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
90
91   /**
92    * Expected value size for the modifier being received from the PSYC service.
93    */
94   uint32_t tmit_mod_value_size_expected;
95
96   /**
97    * Actual value size for the modifier being received from the PSYC service.
98    */
99   uint32_t tmit_mod_value_size;
100
101   /**
102    * enum MessageState
103    */
104   uint8_t tmit_state;
105
106   uint8_t in_transmit;
107   uint8_t is_master;
108   uint8_t disconnected;
109 };
110
111 /**
112  * Client context for a channel master.
113  */
114 struct Master
115 {
116   struct Channel channel;
117   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
119   struct GNUNET_HashCode pub_key_hash;
120
121   struct GNUNET_MULTICAST_Origin *origin;
122   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
123
124   /**
125    * Maximum message ID for this channel.
126    *
127    * Incremented before sending a message, thus the message_id in messages sent
128    * starts from 1.
129    */
130   uint64_t max_message_id;
131
132   /**
133    * ID of the last message that contains any state operations.
134    * 0 if there is no such message.
135    */
136   uint64_t max_state_message_id;
137
138   /**
139    * Maximum group generation for this channel.
140    */
141   uint64_t max_group_generation;
142
143   /**
144    * @see enum GNUNET_PSYC_Policy
145    */
146   uint32_t policy;
147 };
148
149
150 /**
151  * Client context for a channel slave.
152  */
153 struct Slave
154 {
155   struct Channel channel;
156   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
157   struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
158   struct GNUNET_HashCode chan_key_hash;
159
160   struct GNUNET_MULTICAST_Member *member;
161   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
162
163   struct GNUNET_PeerIdentity origin;
164   struct GNUNET_PeerIdentity *relays;
165   struct GNUNET_MessageHeader *join_req;
166
167   uint64_t max_message_id;
168   uint64_t max_request_id;
169
170   uint32_t relay_count;
171 };
172
173
174 static inline void
175 transmit_message (struct Channel *ch);
176
177
178 /**
179  * Task run during shutdown.
180  *
181  * @param cls unused
182  * @param tc unused
183  */
184 static void
185 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
186 {
187   if (NULL != nc)
188   {
189     GNUNET_SERVER_notification_context_destroy (nc);
190     nc = NULL;
191   }
192   if (NULL != stats)
193   {
194     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
195     stats = NULL;
196   }
197 }
198
199
200 static void
201 client_cleanup (struct Channel *ch)
202 {
203   if (ch->is_master)
204   {
205     struct Master *mst = (struct Master *) ch;
206     if (NULL != mst->origin)
207       GNUNET_MULTICAST_origin_stop (mst->origin);
208     GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
209   }
210   else
211   {
212     struct Slave *slv = (struct Slave *) ch;
213     if (NULL != slv->join_req)
214       GNUNET_free (slv->join_req);
215     if (NULL != slv->relays)
216       GNUNET_free (slv->relays);
217     if (NULL != slv->member)
218       GNUNET_MULTICAST_member_part (slv->member);
219   }
220
221   GNUNET_free (ch);
222 }
223
224
225 /**
226  * Called whenever a client is disconnected.
227  * Frees our resources associated with that client.
228  *
229  * @param cls Closure.
230  * @param client Identification of the client.
231  */
232 static void
233 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
234 {
235   if (NULL == client)
236     return;
237
238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
239
240   struct Channel *ch
241     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
242   if (NULL == ch)
243   {
244     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
245                 "User context is NULL in client_disconnect()\n");
246     GNUNET_break (0);
247     return;
248   }
249
250   ch->disconnected = GNUNET_YES;
251
252   /* Send pending messages to multicast before cleanup. */
253   if (NULL != ch->tmit_head)
254   {
255     transmit_message (ch);
256   }
257   else
258   {
259     client_cleanup (ch);
260   }
261 }
262
263
264 static void
265 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
266          const struct GNUNET_MessageHeader *join_req,
267          struct GNUNET_MULTICAST_JoinHandle *jh)
268 {
269
270 }
271
272
273 static void
274 membership_test_cb (void *cls,
275                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
276                     uint64_t message_id, uint64_t group_generation,
277                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
278 {
279
280 }
281
282
283 static void
284 replay_fragment_cb (void *cls,
285                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
286                     uint64_t fragment_id, uint64_t flags,
287                     struct GNUNET_MULTICAST_ReplayHandle *rh)
288
289 {
290 }
291
292
293 static void
294 replay_message_cb (void *cls,
295                    const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
296                    uint64_t message_id,
297                    uint64_t fragment_offset,
298                    uint64_t flags,
299                    struct GNUNET_MULTICAST_ReplayHandle *rh)
300 {
301
302 }
303
304
305 static void
306 fragment_store_result (void *cls, int64_t result, const char *err_msg)
307 {
308   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
309               "fragment_store() returned %l (%s)\n", result, err_msg);
310 }
311
312
313 /**
314  * Iterator callback for sending a message to a client.
315  *
316  * @see message_cb()
317  */
318 static int
319 message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
320                    void *chan)
321 {
322   const struct GNUNET_MessageHeader *msg = cls;
323   struct Channel *ch = chan;
324
325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326               "Sending message of type %u and size %u to client 0x%zx.\n",
327               ntohs (msg->type), ntohs (msg->size), ch->client);
328
329   GNUNET_SERVER_notification_context_add (nc, ch->client);
330   GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
331
332   return GNUNET_YES;
333 }
334
335
336 /**
337  * Incoming message fragment from multicast.
338  *
339  * Store it using PSYCstore and send it to all clients of the channel.
340  */
341 static void
342 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
343 {
344   uint16_t type = ntohs (msg->type);
345   uint16_t size = ntohs (msg->size);
346
347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348               "Received message of type %u and size %u from multicast.\n",
349               type, size);
350
351   struct Channel *ch = cls;
352   struct Master *mst = cls;
353   struct Slave *slv = cls;
354
355   /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
356      = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
357   struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
358     = ch->is_master ? &mst->pub_key : &slv->chan_key;
359   struct GNUNET_HashCode *chan_key_hash
360     = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
361
362   switch (type)
363   {
364   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
365   {
366     GNUNET_PSYCSTORE_fragment_store (store, chan_key,
367                                      (const struct
368                                       GNUNET_MULTICAST_MessageHeader *) msg,
369                                      0, NULL, NULL);
370
371 #if TODO
372     /* FIXME: apply modifiers to state in PSYCstore */
373     GNUNET_PSYCSTORE_state_modify (store, chan_key,
374                                    GNUNET_ntohll (mmsg->message_id),
375                                    meth->mod_count, mods,
376                                    rcb, rcb_cls);
377 #endif
378
379     const struct GNUNET_MULTICAST_MessageHeader *mmsg
380       = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
381     struct GNUNET_PSYC_MessageHeader *pmsg;
382
383     uint16_t size = ntohs (msg->size);
384     uint16_t psize = 0;
385     uint16_t pos = 0;
386
387     for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
388     {
389       const struct GNUNET_MessageHeader *pmsg
390         = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
391       psize = ntohs (pmsg->size);
392       if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
393       {
394         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395                     "Received invalid message part of type %u and size %u "
396                     "from multicast. Not sending to clients.\n",
397                     ntohs (pmsg->type), psize);
398         GNUNET_break_op (0);
399         return;
400       }
401     }
402
403     psize = sizeof (*pmsg) + size - sizeof (*mmsg);
404     pmsg = GNUNET_malloc (psize);
405     pmsg->header.size = htons (psize);
406     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
407     pmsg->message_id = mmsg->message_id;
408
409     memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
410
411     GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash,
412                                                 message_to_client,
413                                                 (void *) pmsg);
414     GNUNET_free (pmsg);
415     break;
416   }
417   default:
418     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419                 "Discarding unknown message of type %u and size %u.\n",
420                 type, size);
421   }
422 }
423
424
425 /**
426  * Send a request received from multicast to a client.
427  */
428 static int
429 request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
430                    void *chan)
431 {
432   /* TODO */
433
434   return GNUNET_YES;
435 }
436
437
438 static void
439 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
440             const struct GNUNET_MessageHeader *req,
441             enum GNUNET_MULTICAST_MessageFlags flags)
442 {
443
444 }
445
446
447 /**
448  * Response from PSYCstore with the current counter values for a channel master.
449  */
450 static void
451 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
452                     uint64_t max_message_id, uint64_t max_group_generation,
453                     uint64_t max_state_message_id)
454 {
455   struct Master *mst = cls;
456   struct Channel *ch = &mst->channel;
457   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
458   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
459   res->header.size = htons (sizeof (*res));
460   res->result_code = htonl (result);
461   res->max_message_id = GNUNET_htonll (max_message_id);
462
463   if (GNUNET_OK == result || GNUNET_NO == result)
464   {
465     mst->max_message_id = max_message_id;
466     mst->max_state_message_id = max_state_message_id;
467     mst->max_group_generation = max_group_generation;
468     mst->origin
469       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
470                                        max_fragment_id + 1,
471                                        join_cb, membership_test_cb,
472                                        replay_fragment_cb, replay_message_cb,
473                                        request_cb, message_cb, ch);
474   }
475   GNUNET_SERVER_notification_context_add (nc, ch->client);
476   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
477                                               GNUNET_NO);
478   GNUNET_free (res);
479 }
480
481
482 /**
483  * Response from PSYCstore with the current counter values for a channel slave.
484  */
485 void
486 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
487                    uint64_t max_message_id, uint64_t max_group_generation,
488                    uint64_t max_state_message_id)
489 {
490   struct Slave *slv = cls;
491   struct Channel *ch = &slv->channel;
492   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
493   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
494   res->header.size = htons (sizeof (*res));
495   res->result_code = htonl (result);
496   res->max_message_id = GNUNET_htonll (max_message_id);
497
498   if (GNUNET_OK == result || GNUNET_NO == result)
499   {
500     slv->max_message_id = max_message_id;
501     slv->member
502       = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
503                                       &slv->origin,
504                                       slv->relay_count, slv->relays,
505                                       slv->join_req, join_cb,
506                                       membership_test_cb,
507                                       replay_fragment_cb, replay_message_cb,
508                                       message_cb, ch);
509   }
510
511   GNUNET_SERVER_notification_context_add (nc, ch->client);
512   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
513                                               GNUNET_NO);
514   GNUNET_free (res);
515 }
516
517
518 /**
519  * Handle a connecting client starting a channel master.
520  */
521 static void
522 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
523                      const struct GNUNET_MessageHeader *msg)
524 {
525   const struct MasterStartRequest *req
526     = (const struct MasterStartRequest *) msg;
527   struct Master *mst = GNUNET_new (struct Master);
528   mst->channel.client = client;
529   mst->channel.is_master = GNUNET_YES;
530   mst->policy = ntohl (req->policy);
531   mst->priv_key = req->channel_key;
532   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
533                                                   &mst->pub_key);
534   GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
535
536   GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
537                                  master_counters_cb, mst);
538
539   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
540   GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
541                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
542   GNUNET_SERVER_receive_done (client, GNUNET_OK);
543 }
544
545
546 /**
547  * Handle a connecting client joining as a channel slave.
548  */
549 static void
550 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
551                    const struct GNUNET_MessageHeader *msg)
552 {
553   const struct SlaveJoinRequest *req
554     = (const struct SlaveJoinRequest *) msg;
555   struct Slave *slv = GNUNET_new (struct Slave);
556   slv->channel.client = client;
557   slv->channel.is_master = GNUNET_NO;
558   slv->slave_key = req->slave_key;
559   slv->chan_key = req->channel_key;
560   GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
561                       &slv->chan_key_hash);
562   slv->origin = req->origin;
563   slv->relay_count = ntohl (req->relay_count);
564
565   const struct GNUNET_PeerIdentity *relays
566     = (const struct GNUNET_PeerIdentity *) &req[1];
567   slv->relays
568     = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
569   uint32_t i;
570   for (i = 0; i < slv->relay_count; i++)
571     memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
572
573   GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
574                                  slave_counters_cb, slv);
575
576   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
577   GNUNET_SERVER_receive_done (client, GNUNET_OK);
578 }
579
580
581 /**
582  * Send acknowledgement to a client.
583  *
584  * Sent after a message fragment has been passed on to multicast.
585  *
586  * @param ch The channel struct for the client.
587  */
588 static void
589 send_message_ack (struct Channel *ch)
590 {
591   struct GNUNET_MessageHeader res;
592   res.size = htons (sizeof (res));
593   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
594
595   GNUNET_SERVER_notification_context_add (nc, ch->client);
596   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
597                                               GNUNET_NO);
598 }
599
600
601 /**
602  * Callback for the transmit functions of multicast.
603  */
604 static int
605 transmit_notify (void *cls, size_t *data_size, void *data)
606 {
607   struct Channel *ch = cls;
608   struct TransmitMessage *tmit_msg = ch->tmit_head;
609
610   if (NULL == tmit_msg || *data_size < tmit_msg->size)
611   {
612     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
613     *data_size = 0;
614     return GNUNET_NO;
615   }
616
617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618               "transmit_notify: sending %u bytes.\n", tmit_msg->size);
619
620   *data_size = tmit_msg->size;
621   memcpy (data, tmit_msg->buf, *data_size);
622
623   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
624   GNUNET_free (tmit_msg);
625
626   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
627   send_message_ack (ch);
628
629   if (0 == ch->tmit_task)
630   {
631     if (NULL != ch->tmit_head)
632     {
633       transmit_message (ch);
634     }
635     else if (ch->disconnected)
636     {
637       /* FIXME: handle partial message (when still in_transmit) */
638       client_cleanup (ch);
639     }
640   }
641
642   return ret;
643 }
644
645
646 /**
647  * Transmit a message from a channel master to the multicast group.
648  */
649 static void
650 master_transmit_message (struct Master *mst)
651 {
652   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
653   mst->channel.tmit_task = 0;
654   if (NULL == mst->tmit_handle)
655   {
656     mst->tmit_handle
657       = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
658                                         mst->max_group_generation,
659                                         transmit_notify, mst);
660   }
661   else
662   {
663     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
664   }
665 }
666
667
668 /**
669  * Transmit a message from a channel slave to the multicast group.
670  */
671 static void
672 slave_transmit_message (struct Slave *slv)
673 {
674   slv->channel.tmit_task = 0;
675   if (NULL == slv->tmit_handle)
676   {
677     slv->tmit_handle
678       = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
679                                           transmit_notify, slv);
680   }
681   else
682   {
683     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
684   }
685 }
686
687
688 static inline void
689 transmit_message (struct Channel *ch)
690 {
691   ch->is_master
692     ? master_transmit_message ((struct Master *) ch)
693     : slave_transmit_message ((struct Slave *) ch);
694 }
695
696
697 static void
698 transmit_error (struct Channel *ch)
699 {
700   struct GNUNET_MessageHeader *msg;
701   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
702                                                     + sizeof (*msg));
703   msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
704   msg->size = ntohs (sizeof (*msg));
705   msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
706
707   tmit_msg->buf = (char *) &tmit_msg[1];
708   tmit_msg->size = sizeof (*msg);
709   tmit_msg->state = ch->tmit_state;
710   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711   transmit_message (ch);
712
713   /* FIXME: cleanup */
714   GNUNET_SERVER_client_disconnect (ch->client);
715 }
716
717
718 /**
719  * Incoming message from a client.
720  */
721 static void
722 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
723                          const struct GNUNET_MessageHeader *msg)
724 {
725   struct Channel *ch
726     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
727   GNUNET_assert (NULL != ch);
728
729   uint16_t size = ntohs (msg->size);
730   uint16_t psize = 0, pos = 0;
731
732   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
733   {
734     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
735     GNUNET_break (0);
736     transmit_error (ch);
737     return;
738   }
739
740   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
741   {
742     const struct GNUNET_MessageHeader *pmsg
743       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744     psize = ntohs (pmsg->size);
745     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746                 "Received message part of type %u and size %u "
747                 "from client.\n", ntohs (pmsg->type), psize);
748     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
749     {
750       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751                   "Received invalid message part of type %u and size %u "
752                   "from client.\n", ntohs (pmsg->type), psize);
753       GNUNET_break (0);
754       transmit_error (ch);
755       return;
756     }
757   }
758
759   size -= sizeof (*msg);
760   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
761   tmit_msg->buf = (char *) &tmit_msg[1];
762   memcpy (tmit_msg->buf, &msg[1], size);
763   tmit_msg->size = size;
764   tmit_msg->state = ch->tmit_state;
765   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766   transmit_message (ch);
767
768   GNUNET_SERVER_receive_done (client, GNUNET_OK);
769 };
770
771
772 /**
773  * Initialize the PSYC service.
774  *
775  * @param cls Closure.
776  * @param server The initialized server.
777  * @param c Configuration to use.
778  */
779 static void
780 run (void *cls, struct GNUNET_SERVER_Handle *server,
781      const struct GNUNET_CONFIGURATION_Handle *c)
782 {
783   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
784     { &handle_master_start, NULL,
785       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
786
787     { &handle_slave_join, NULL,
788       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
789
790     { &handle_psyc_message, NULL,
791       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
792   };
793
794   cfg = c;
795   store = GNUNET_PSYCSTORE_connect (cfg);
796   stats = GNUNET_STATISTICS_create ("psyc", cfg);
797   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
798   nc = GNUNET_SERVER_notification_context_create (server, 1);
799   GNUNET_SERVER_add_handlers (server, handlers);
800   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
801   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
802                                 NULL);
803 }
804
805
806 /**
807  * The main function for the service.
808  *
809  * @param argc number of arguments from the command line
810  * @param argv command line arguments
811  * @return 0 ok, 1 on error
812  */
813 int
814 main (int argc, char *const *argv)
815 {
816   return (GNUNET_OK ==
817           GNUNET_SERVICE_run (argc, argv, "psyc",
818                               GNUNET_SERVICE_OPTION_NONE,
819                               &run, NULL)) ? 0 : 1;
820 }
821
822 /* end of gnunet-service-psycstore.c */