- use tunnel encryption state to select decryption key
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "psyc.h"
36
37
38 /**
39  * Handle to our current configuration.
40  */
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43 /**
44  * Handle to the statistics service.
45  */
46 static struct GNUNET_STATISTICS_Handle *stats;
47
48 /**
49  * Notification context, simplifies client broadcasts.
50  */
51 static struct GNUNET_SERVER_NotificationContext *nc;
52
53 /**
54  * Handle to the PSYCstore.
55  */
56 static struct GNUNET_PSYCSTORE_Handle *store;
57
58 /**
59  * Channel's pub_key_hash -> struct Channel
60  */
61 static struct GNUNET_CONTAINER_MultiHashMap *clients;
62
63 /**
64  * Message in the transmission queue.
65  */
66 struct TransmitMessage
67 {
68   struct TransmitMessage *prev;
69   struct TransmitMessage *next;
70
71   char *buf;
72   uint16_t size;
73   /**
74    * enum GNUNET_PSYC_DataStatus
75    */
76   uint8_t status;
77 };
78
79 /**
80  * Common part of the client context for both a master and slave channel.
81  */
82 struct Channel
83 {
84   struct GNUNET_SERVER_Client *client;
85
86   struct TransmitMessage *tmit_head;
87   struct TransmitMessage *tmit_tail;
88
89   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
90   uint32_t tmit_mod_count;
91   uint32_t tmit_mod_recvd;
92   /**
93    * enum GNUNET_PSYC_DataStatus
94    */
95   uint8_t tmit_status;
96
97   uint8_t in_transmit;
98   uint8_t is_master;
99   uint8_t disconnected;
100 };
101
102 /**
103  * Client context for a channel master.
104  */
105 struct Master
106 {
107   struct Channel channel;
108   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
109   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
110   struct GNUNET_HashCode pub_key_hash;
111
112   struct GNUNET_MULTICAST_Origin *origin;
113   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
114
115   uint64_t max_message_id;
116   uint64_t max_state_message_id;
117   uint64_t max_group_generation;
118
119   /**
120    * enum GNUNET_PSYC_Policy
121    */
122   uint32_t policy;
123 };
124
125
126 /**
127  * Client context for a channel slave.
128  */
129 struct Slave
130 {
131   struct Channel channel;
132   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
133   struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
134   struct GNUNET_HashCode chan_key_hash;
135
136   struct GNUNET_MULTICAST_Member *member;
137   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
138
139   struct GNUNET_PeerIdentity origin;
140   struct GNUNET_PeerIdentity *relays;
141   struct GNUNET_MessageHeader *join_req;
142
143   uint64_t max_message_id;
144   uint64_t max_request_id;
145
146   uint32_t relay_count;
147 };
148
149
150 static void
151 transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
152
153
154 /**
155  * Task run during shutdown.
156  *
157  * @param cls unused
158  * @param tc unused
159  */
160 static void
161 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
162 {
163   if (NULL != nc)
164   {
165     GNUNET_SERVER_notification_context_destroy (nc);
166     nc = NULL;
167   }
168   if (NULL != stats)
169   {
170     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
171     stats = NULL;
172   }
173 }
174
175
176 static void
177 client_cleanup (struct Channel *ch)
178 {
179   if (ch->is_master)
180   {
181     struct Master *mst = (struct Master *) ch;
182     if (NULL != mst->origin)
183       GNUNET_MULTICAST_origin_stop (mst->origin);
184   }
185   else
186   {
187     struct Slave *slv = (struct Slave *) ch;
188     if (NULL != slv->join_req)
189       GNUNET_free (slv->join_req);
190     if (NULL != slv->relays)
191       GNUNET_free (slv->relays);
192     if (NULL != slv->member)
193       GNUNET_MULTICAST_member_part (slv->member);
194   }
195
196   GNUNET_free (ch);
197 }
198
199 /**
200  * Called whenever a client is disconnected.
201  * Frees our resources associated with that client.
202  *
203  * @param cls Closure.
204  * @param client Identification of the client.
205  */
206 static void
207 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
208 {
209   if (NULL == client)
210     return;
211
212   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
213
214   struct Channel *ch
215     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
216   if (NULL == ch)
217   {
218     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
219                 "User context is NULL in client_disconnect()\n");
220     GNUNET_break (0);
221     return;
222   }
223
224   ch->disconnected = GNUNET_YES;
225
226   /* Send pending messages to multicast before cleanup. */
227   if (NULL != ch->tmit_head)
228   {
229     transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
230   }
231   else
232   {
233     client_cleanup (ch);
234   }
235 }
236
237 void
238 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
239          const struct GNUNET_MessageHeader *join_req,
240          struct GNUNET_MULTICAST_JoinHandle *jh)
241 {
242
243 }
244
245 void
246 membership_test_cb (void *cls,
247                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
248                     uint64_t message_id, uint64_t group_generation,
249                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
250 {
251
252 }
253
254 void
255 replay_fragment_cb (void *cls,
256                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
257                     uint64_t fragment_id, uint64_t flags,
258                     struct GNUNET_MULTICAST_ReplayHandle *rh)
259 {
260
261 }
262
263 void
264 replay_message_cb (void *cls,
265                    const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
266                    uint64_t message_id,
267                    uint64_t fragment_offset,
268                    uint64_t flags,
269                    struct GNUNET_MULTICAST_ReplayHandle *rh)
270 {
271
272 }
273
274 void
275 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
276             const struct GNUNET_MessageHeader *req,
277             enum GNUNET_MULTICAST_MessageFlags flags)
278 {
279
280 }
281
282
283 void
284 fragment_store_result (void *cls, int64_t result, const char *err_msg)
285 {
286   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
287               "fragment_store() returned %l (%s)\n", result, err_msg);
288 }
289
290 /**
291  * Send PSYC messages in an incoming multicast message to a client.
292  */
293 int
294 send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan)
295 {
296   const struct GNUNET_MULTICAST_MessageHeader *msg = cls;
297   struct Channel *ch = chan;
298
299   uint16_t size = ntohs (msg->header.size);
300   uint16_t pos = 0;
301
302   while (sizeof (*msg) + pos < size)
303   {
304     const struct GNUNET_MessageHeader *pmsg
305       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
306     uint16_t psize = ntohs (pmsg->size);
307     if (sizeof (*msg) + pos + psize > size)
308     {
309       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
310                   "Ignoring message of type %u with invalid size. "
311                   "(%u + %u + %u > %u)\n", ntohs (pmsg->type),
312                   sizeof (*msg), pos, psize, size);
313       break;
314     }
315     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
316                 "Sending message of type %u and size %u to client.\n",
317                 ntohs (pmsg->type), psize);
318
319     GNUNET_SERVER_notification_context_add (nc, ch->client);
320     GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg,
321                                                 GNUNET_NO);
322     pos += psize;
323   }
324   return GNUNET_YES;
325 }
326
327
328 /**
329  * Incoming message fragment from multicast.
330  *
331  * Store it using PSYCstore and send it to all clients of the channel.
332  */
333 void
334 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
335 {
336   uint16_t type = ntohs (msg->type);
337   uint16_t size = ntohs (msg->size);
338
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Received message of type %u and size %u from multicast.\n",
341               type, size);
342
343   struct Channel *ch = cls;
344   struct Master *mst = cls;
345   struct Slave *slv = cls;
346
347   struct GNUNET_CRYPTO_EddsaPublicKey *ch_key
348     = ch->is_master ? &mst->pub_key : &slv->chan_key;
349   struct GNUNET_HashCode *ch_key_hash
350     = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
351
352   switch (type)
353   {
354   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
355     GNUNET_PSYCSTORE_fragment_store (store, ch_key,
356                                      (const struct
357                                       GNUNET_MULTICAST_MessageHeader *) msg,
358                                      0, NULL, NULL);
359     GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash,
360                                                 send_to_client, (void *) msg);
361     break;
362
363   default:
364     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
365                 "Ignoring unknown message of type %u and size %u.\n",
366                 type, size);
367   }
368 }
369
370
371 /**
372  * Response from PSYCstore with the current counter values for a channel master.
373  */
374 void
375 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
376                     uint64_t max_message_id, uint64_t max_group_generation,
377                     uint64_t max_state_message_id)
378 {
379   struct Master *mst = cls;
380   struct Channel *ch = &mst->channel;
381   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
382   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
383   res->header.size = htons (sizeof (*res));
384   res->result_code = htonl (result);
385   res->max_message_id = GNUNET_htonll (max_message_id);
386
387   if (GNUNET_OK == result || GNUNET_NO == result)
388   {
389     mst->max_message_id = max_message_id;
390     mst->max_state_message_id = max_state_message_id;
391     mst->max_group_generation = max_group_generation;
392     mst->origin
393       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
394                                        max_fragment_id + 1,
395                                        join_cb, membership_test_cb,
396                                        replay_fragment_cb, replay_message_cb,
397                                        request_cb, message_cb, ch);
398   }
399   GNUNET_SERVER_notification_context_add (nc, ch->client);
400   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
401                                               GNUNET_NO);
402   GNUNET_free (res);
403 }
404
405
406 /**
407  * Response from PSYCstore with the current counter values for a channel slave.
408  */
409 void
410 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
411                    uint64_t max_message_id, uint64_t max_group_generation,
412                    uint64_t max_state_message_id)
413 {
414   struct Slave *slv = cls;
415   struct Channel *ch = &slv->channel;
416   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
417   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
418   res->header.size = htons (sizeof (*res));
419   res->result_code = htonl (result);
420   res->max_message_id = GNUNET_htonll (max_message_id);
421
422   if (GNUNET_OK == result || GNUNET_NO == result)
423   {
424     slv->max_message_id = max_message_id;
425     slv->member
426       = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
427                                       &slv->origin,
428                                       slv->relay_count, slv->relays,
429                                       slv->join_req, join_cb,
430                                       membership_test_cb,
431                                       replay_fragment_cb, replay_message_cb,
432                                       message_cb, ch);
433   }
434
435   GNUNET_SERVER_notification_context_add (nc, ch->client);
436   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
437                                               GNUNET_NO);
438   GNUNET_free (res);
439 }
440
441
442 /**
443  * Handle a connecting client starting a channel master.
444  */
445 static void
446 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
447                      const struct GNUNET_MessageHeader *msg)
448 {
449   const struct MasterStartRequest *req
450     = (const struct MasterStartRequest *) msg;
451   struct Master *mst = GNUNET_new (struct Master);
452   mst->channel.client = client;
453   mst->channel.is_master = GNUNET_YES;
454   mst->policy = ntohl (req->policy);
455   mst->priv_key = req->channel_key;
456   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
457                                                   &mst->pub_key);
458   GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
459
460   GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
461                                  master_counters_cb, mst);
462
463   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
464   GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
465                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
466   GNUNET_SERVER_receive_done (client, GNUNET_OK);
467 }
468
469
470 /**
471  * Handle a connecting client joining as a channel slave.
472  */
473 static void
474 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
475                    const struct GNUNET_MessageHeader *msg)
476 {
477   const struct SlaveJoinRequest *req
478     = (const struct SlaveJoinRequest *) msg;
479   struct Slave *slv = GNUNET_new (struct Slave);
480   slv->channel.client = client;
481   slv->channel.is_master = GNUNET_NO;
482   slv->slave_key = req->slave_key;
483   slv->chan_key = req->channel_key;
484   GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
485                       &slv->chan_key_hash);
486   slv->origin = req->origin;
487   slv->relay_count = ntohl (req->relay_count);
488
489   const struct GNUNET_PeerIdentity *relays
490     = (const struct GNUNET_PeerIdentity *) &req[1];
491   slv->relays
492     = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
493   uint32_t i;
494   for (i = 0; i < slv->relay_count; i++)
495     memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
496
497   GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
498                                  slave_counters_cb, slv);
499
500   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
501   GNUNET_SERVER_receive_done (client, GNUNET_OK);
502 }
503
504
505 /**
506  * Send transmission acknowledgement to a client.
507  *
508  * Sent after the last GNUNET_PSYC_MessageModifier and after each
509  * GNUNET_PSYC_MessageData.
510  *
511  * @param ch The channel struct for the client.
512  */
513 static void
514 send_transmit_ack (struct Channel *ch)
515 {
516   struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
517   res->header.size = htons (sizeof (*res));
518   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
519
520   res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
521   struct TransmitMessage *tmit_msg = ch->tmit_tail;
522   if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status)
523     res->buf_avail -= tmit_msg->size;
524   res->buf_avail = htons (res->buf_avail);
525
526   GNUNET_SERVER_notification_context_add (nc, ch->client);
527   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
528                                               GNUNET_NO);
529   GNUNET_free (res);
530 }
531
532
533 /**
534  * Callback for the transmit functions of multicast.
535  */
536 static int
537 transmit_notify (void *cls, size_t *data_size, void *data)
538 {
539   struct Channel *ch = cls;
540   struct TransmitMessage *msg = ch->tmit_head;
541
542   if (NULL == msg || *data_size < msg->size)
543   {
544     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
545     *data_size = 0;
546     return GNUNET_NO;
547   }
548
549   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550               "transmit_notify: sending %u bytes.\n", msg->size);
551
552   *data_size = msg->size;
553   memcpy (data, msg->buf, *data_size);
554
555   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
556   GNUNET_free (msg);
557
558   int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
559
560   if (0 == ch->tmit_task)
561   {
562     if (NULL != ch->tmit_head)
563     {
564       transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
565     }
566     else if (ch->disconnected)
567     {
568       /* FIXME: handle partial message (when still in_transmit) */
569       client_cleanup (ch);
570     }
571   }
572
573   return ret;
574 }
575
576
577 /**
578  * Transmit a message from a channel master to the multicast group.
579  */
580 static void
581 master_transmit_message (void *cls,
582                          const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
585   struct Master *mst = cls;
586   mst->channel.tmit_task = 0;
587   if (NULL == mst->tmit_handle)
588   {
589     mst->tmit_handle
590       = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
591                                         mst->max_group_generation,
592                                         transmit_notify, mst);
593   }
594   else
595   {
596     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
597   }
598 }
599
600
601 /**
602  * Transmit a message from a channel slave to the multicast group.
603  */
604 static void
605 slave_transmit_message (void *cls,
606                         const struct GNUNET_SCHEDULER_TaskContext *tc)
607 {
608   struct Slave *slv = cls;
609   slv->channel.tmit_task = 0;
610   if (NULL == slv->tmit_handle)
611   {
612     slv->tmit_handle
613       = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
614                                           transmit_notify, slv);
615   }
616   else
617   {
618     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
619   }
620 }
621
622
623 /**
624  * Schedule message transmission from a channel to the multicast group.
625  *
626  * @param ch The channel.
627  * @param delay Transmission delay.
628  */
629 static void
630 transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
631 {
632   if (0 != ch->tmit_task)
633     GNUNET_SCHEDULER_cancel (ch->tmit_task);
634
635   ch->tmit_task
636     = ch->is_master
637     ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
638     : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
639 }
640
641 /**
642  * Queue incoming message parts from a client for transmission, and send them to
643  * the multicast group when the buffer is full or reached the end of message.
644  *
645  * @param ch Channel struct for the client.
646  * @param msg Message from the client.
647  *
648  * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
649  */
650 static int
651 queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
652 {
653   uint16_t size = ntohs (msg->size);
654   struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
655   struct TransmitMessage *tmit_msg = ch->tmit_tail;
656
657   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658               "Queueing message of type %u and size %u "
659               "for transmission to multicast.\n",
660               ntohs (msg->type), size);
661
662   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
663     return GNUNET_SYSERR;
664
665   if (NULL == tmit_msg
666       || tmit_msg->status != GNUNET_PSYC_DATA_CONT
667       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size)
668   {
669     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670                 "Appending message qto new buffer.\n");
671     /* Start filling up new buffer */
672     tmit_msg = GNUNET_new (struct TransmitMessage);
673     tmit_msg->buf = GNUNET_malloc (size);
674     memcpy (tmit_msg->buf, msg, size);
675     tmit_msg->size = size;
676     tmit_msg->status = ch->tmit_status;
677     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
678   }
679   else
680   {
681     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682                 "Appending message to existing buffer.\n");
683     /* Append to existing buffer */
684     tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
685     memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
686     tmit_msg->size += size;
687     tmit_msg->status = ch->tmit_status;
688   }
689
690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
691
692   /* Wait a bit for the remaining message parts from the client
693      if there's still some space left in the buffer. */
694   if (GNUNET_PSYC_DATA_CONT == tmit_msg->status
695       && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData)
696           < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE))
697     tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
698
699   transmit_message (ch, tmit_delay);
700
701   return GNUNET_OK;
702 }
703
704 /**
705  * Incoming method from a client.
706  */
707 static void
708 handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
709                         const struct GNUNET_MessageHeader *msg)
710 {
711   const struct GNUNET_PSYC_MessageMethod *meth
712     = (const struct GNUNET_PSYC_MessageMethod *) msg;
713   struct Channel *ch
714     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
715   GNUNET_assert (NULL != ch);
716
717   if (GNUNET_NO != ch->in_transmit)
718   {
719     /* FIXME: already transmitting a message, send back error message. */
720     return;
721   }
722
723   ch->in_transmit = GNUNET_YES;
724   ch->tmit_mod_recvd = 0;
725   ch->tmit_mod_count = ntohl (meth->mod_count);
726   ch->tmit_status = GNUNET_PSYC_DATA_CONT;
727
728   queue_message (ch, msg);
729
730   if (0 == ch->tmit_mod_count)
731     send_transmit_ack (ch);
732
733   GNUNET_SERVER_receive_done (client, GNUNET_OK);
734 };
735
736
737 /**
738  * Incoming modifier from a client.
739  */
740 static void
741 handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
742                           const struct GNUNET_MessageHeader *msg)
743 {
744   /*
745   const struct GNUNET_PSYC_MessageModifier *mod
746     = (const struct GNUNET_PSYC_MessageModifier *) msg;
747   */
748   struct Channel *ch
749     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
750   GNUNET_assert (NULL != ch);
751
752   ch->tmit_mod_recvd++;
753   queue_message (ch, msg);
754
755   if (ch->tmit_mod_recvd == ch->tmit_mod_count)
756     send_transmit_ack (ch);
757
758   GNUNET_SERVER_receive_done (client, GNUNET_OK);
759 };
760
761
762 /**
763  * Incoming data from a client.
764  */
765 static void
766 handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
767                       const struct GNUNET_MessageHeader *msg)
768 {
769   const struct GNUNET_PSYC_MessageData *data
770     = (const struct GNUNET_PSYC_MessageData *) msg;
771   struct Channel *ch
772     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
773   GNUNET_assert (NULL != ch);
774
775   ch->tmit_status = ntohs (data->status);
776   queue_message (ch, msg);
777   send_transmit_ack (ch);
778
779   if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
780     ch->in_transmit = GNUNET_NO;
781
782   GNUNET_SERVER_receive_done (client, GNUNET_OK);
783 };
784
785
786 /**
787  * Initialize the PSYC service.
788  *
789  * @param cls Closure.
790  * @param server The initialized server.
791  * @param c Configuration to use.
792  */
793 static void
794 run (void *cls, struct GNUNET_SERVER_Handle *server,
795      const struct GNUNET_CONFIGURATION_Handle *c)
796 {
797   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
798     { &handle_master_start, NULL,
799       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
800
801     { &handle_slave_join, NULL,
802       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
803
804     { &handle_transmit_method, NULL,
805       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
806
807     { &handle_transmit_modifier, NULL,
808       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
809
810     { &handle_transmit_data, NULL,
811       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
812
813     { NULL, NULL, 0, 0 }
814   };
815
816   cfg = c;
817   store = GNUNET_PSYCSTORE_connect (cfg);
818   stats = GNUNET_STATISTICS_create ("psyc", cfg);
819   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
820   nc = GNUNET_SERVER_notification_context_create (server, 1);
821   GNUNET_SERVER_add_handlers (server, handlers);
822   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
823   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
824                                 NULL);
825 }
826
827
828 /**
829  * The main function for the service.
830  *
831  * @param argc number of arguments from the command line
832  * @param argv command line arguments
833  * @return 0 ok, 1 on error
834  */
835 int
836 main (int argc, char *const *argv)
837 {
838   return (GNUNET_OK ==
839           GNUNET_SERVICE_run (argc, argv, "psyc",
840                               GNUNET_SERVICE_OPTION_NONE,
841                               &run, NULL)) ? 0 : 1;
842 }
843
844 /* end of gnunet-service-psycstore.c */