- typo
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "psyc.h"
36
37
38 /**
39  * Handle to our current configuration.
40  */
41 static const struct GNUNET_CONFIGURATION_Handle *cfg;
42
43 /**
44  * Handle to the statistics service.
45  */
46 static struct GNUNET_STATISTICS_Handle *stats;
47
48 /**
49  * Notification context, simplifies client broadcasts.
50  */
51 static struct GNUNET_SERVER_NotificationContext *nc;
52
53 /**
54  * Handle to the PSYCstore.
55  */
56 static struct GNUNET_PSYCSTORE_Handle *store;
57
58 /**
59  * channel's pub_key_hash -> struct Channel
60  */
61 static struct GNUNET_CONTAINER_MultiHashMap *clients;
62
63 /**
64  * Message in the transmission queue.
65  */
66 struct TransmitMessage
67 {
68   struct TransmitMessage *prev;
69   struct TransmitMessage *next;
70
71   char *buf;
72   uint16_t size;
73   uint8_t status;
74 };
75
76 /**
77  * Common part of the client context for both a master and slave channel.
78  */
79 struct Channel
80 {
81   struct GNUNET_SERVER_Client *client;
82
83   struct TransmitMessage *tmit_head;
84   struct TransmitMessage *tmit_tail;
85
86   char *tmit_buf;
87   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
88   uint32_t tmit_mod_count;
89   uint32_t tmit_mod_recvd;
90   uint16_t tmit_size;
91   uint8_t tmit_status;
92
93   uint8_t in_transmit;
94   uint8_t is_master;
95 };
96
97 /**
98  * Client context for a channel master.
99  */
100 struct Master
101 {
102   struct Channel channel;
103   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
104   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
105   struct GNUNET_HashCode pub_key_hash;
106
107   struct GNUNET_MULTICAST_Origin *origin;
108   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
109
110   uint64_t max_message_id;
111   uint64_t max_state_message_id;
112   uint64_t max_group_generation;
113
114   /**
115    * enum GNUNET_PSYC_Policy
116    */
117   uint32_t policy;
118 };
119
120
121 /**
122  * Client context for a channel slave.
123  */
124 struct Slave
125 {
126   struct Channel channel;
127   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
128   struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
129   struct GNUNET_HashCode chan_key_hash;
130
131   struct GNUNET_MULTICAST_Member *member;
132   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
133
134   struct GNUNET_PeerIdentity origin;
135   struct GNUNET_PeerIdentity *relays;
136   struct GNUNET_MessageHeader *join_req;
137
138   uint64_t max_message_id;
139   uint64_t max_request_id;
140
141   uint32_t relay_count;
142 };
143
144
145 /**
146  * Task run during shutdown.
147  *
148  * @param cls unused
149  * @param tc unused
150  */
151 static void
152 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
153 {
154   if (NULL != nc)
155   {
156     GNUNET_SERVER_notification_context_destroy (nc);
157     nc = NULL;
158   }
159   if (NULL != stats)
160   {
161     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
162     stats = NULL;
163   }
164 }
165
166 /**
167  * Called whenever a client is disconnected.
168  * Frees our resources associated with that client.
169  *
170  * @param cls Closure.
171  * @param client Identification of the client.
172  */
173 static void
174 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
175 {
176   if (NULL == client)
177     return;
178
179   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
180
181   struct Channel *ch
182     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
183   if (NULL == ch)
184   {
185     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
186                 "User context is NULL in client_disconnect()\n");
187     GNUNET_break (0);
188     return;
189   }
190
191   if (NULL != ch->tmit_buf)
192   {
193     GNUNET_free (ch->tmit_buf);
194     ch->tmit_buf = NULL;
195   }
196
197   if (ch->is_master)
198   {
199     struct Master *mst = (struct Master *) ch;
200     if (NULL != mst->origin)
201       GNUNET_MULTICAST_origin_stop (mst->origin);
202   }
203   else
204   {
205     struct Slave *slv = (struct Slave *) ch;
206     if (NULL != slv->join_req)
207       GNUNET_free (slv->join_req);
208     if (NULL != slv->relays)
209       GNUNET_free (slv->relays);
210     if (NULL != slv->member)
211       GNUNET_MULTICAST_member_part (slv->member);
212   }
213
214   GNUNET_free (ch);
215 }
216
217 void
218 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
219          const struct GNUNET_MessageHeader *join_req,
220          struct GNUNET_MULTICAST_JoinHandle *jh)
221 {
222
223 }
224
225 void
226 membership_test_cb (void *cls,
227                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
228                     uint64_t message_id, uint64_t group_generation,
229                     struct GNUNET_MULTICAST_MembershipTestHandle *mth)
230 {
231
232 }
233
234 void
235 replay_fragment_cb (void *cls,
236                     const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
237                     uint64_t fragment_id, uint64_t flags,
238                     struct GNUNET_MULTICAST_ReplayHandle *rh)
239 {
240
241 }
242
243 void
244 replay_message_cb (void *cls,
245                    const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
246                    uint64_t message_id,
247                    uint64_t fragment_offset,
248                    uint64_t flags,
249                    struct GNUNET_MULTICAST_ReplayHandle *rh)
250 {
251
252 }
253
254 void
255 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
256             const struct GNUNET_MessageHeader *req,
257             enum GNUNET_MULTICAST_MessageFlags flags)
258 {
259
260 }
261
262 void
263 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
264 {
265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266               "Received message of type %u from multicast.\n",
267               ntohs (msg->type));
268 }
269
270 void
271 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
272                     uint64_t max_message_id, uint64_t max_group_generation,
273                     uint64_t max_state_message_id)
274 {
275   struct Master *mst = cls;
276   struct Channel *ch = &mst->channel;
277   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
278   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
279   res->header.size = htons (sizeof (*res));
280   res->result_code = htonl (result);
281   res->max_message_id = GNUNET_htonll (max_message_id);
282
283   if (GNUNET_OK == result || GNUNET_NO == result)
284   {
285     mst->max_message_id = max_message_id;
286     mst->max_state_message_id = max_state_message_id;
287     mst->max_group_generation = max_group_generation;
288     mst->origin
289       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
290                                        max_fragment_id + 1,
291                                        join_cb, membership_test_cb,
292                                        replay_fragment_cb, replay_message_cb,
293                                        request_cb, message_cb, ch);
294   }
295   GNUNET_SERVER_notification_context_add (nc, ch->client);
296   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
297                                               GNUNET_NO);
298   GNUNET_free (res);
299 }
300
301
302 void
303 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
304                    uint64_t max_message_id, uint64_t max_group_generation,
305                    uint64_t max_state_message_id)
306 {
307   struct Slave *slv = cls;
308   struct Channel *ch = &slv->channel;
309   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
310   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
311   res->header.size = htons (sizeof (*res));
312   res->result_code = htonl (result);
313   res->max_message_id = GNUNET_htonll (max_message_id);
314
315   if (GNUNET_OK == result || GNUNET_NO == result)
316   {
317     slv->max_message_id = max_message_id;
318     slv->member
319       = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
320                                       &slv->origin,
321                                       slv->relay_count, slv->relays,
322                                       slv->join_req, join_cb,
323                                       membership_test_cb,
324                                       replay_fragment_cb, replay_message_cb,
325                                       message_cb, ch);
326   }
327
328   GNUNET_SERVER_notification_context_add (nc, ch->client);
329   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
330                                               GNUNET_NO);
331   GNUNET_free (res);
332 }
333
334
335 static void
336 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
337                      const struct GNUNET_MessageHeader *msg)
338 {
339   const struct MasterStartRequest *req
340     = (const struct MasterStartRequest *) msg;
341   struct Master *mst = GNUNET_new (struct Master);
342   mst->channel.client = client;
343   mst->channel.is_master = GNUNET_YES;
344   mst->policy = ntohl (req->policy);
345   mst->priv_key = req->channel_key;
346   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
347                                                   &mst->pub_key);
348   GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
349
350   GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
351                                  master_counters_cb, mst);
352
353   GNUNET_SERVER_client_set_user_context (client, &mst->channel);
354   GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
355                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
356   GNUNET_SERVER_receive_done (client, GNUNET_OK);
357 }
358
359
360 static void
361 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
362                    const struct GNUNET_MessageHeader *msg)
363 {
364   const struct SlaveJoinRequest *req
365     = (const struct SlaveJoinRequest *) msg;
366   struct Slave *slv = GNUNET_new (struct Slave);
367   slv->channel.client = client;
368   slv->channel.is_master = GNUNET_NO;
369   slv->slave_key = req->slave_key;
370   slv->chan_key = req->channel_key;
371   GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
372                       &slv->chan_key_hash);
373   slv->origin = req->origin;
374   slv->relay_count = ntohl (req->relay_count);
375
376   const struct GNUNET_PeerIdentity *relays
377     = (const struct GNUNET_PeerIdentity *) &req[1];
378   slv->relays
379     = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
380   uint32_t i;
381   for (i = 0; i < slv->relay_count; i++)
382     memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
383
384   GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
385                                  slave_counters_cb, slv);
386
387   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
388   GNUNET_SERVER_receive_done (client, GNUNET_OK);
389 }
390
391
392 static void
393 send_transmit_ack (struct Channel *ch)
394 {
395   struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
396   res->header.size = htons (sizeof (*res));
397   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
398   res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
399
400   GNUNET_SERVER_notification_context_add (nc, ch->client);
401   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
402                                               GNUNET_NO);
403   GNUNET_free (res);
404 }
405
406
407 static int
408 transmit_notify (void *cls, size_t *data_size, void *data)
409 {
410   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
411   struct Channel *ch = cls;
412   struct TransmitMessage *msg = ch->tmit_head;
413
414   if (NULL == msg || *data_size < ntohs (msg->size))
415   {
416     *data_size = 0;
417     return GNUNET_NO;
418   }
419
420   *data_size = ntohs (msg->size);
421   memcpy (data, msg->buf, *data_size);
422
423   GNUNET_free (ch->tmit_buf);
424   ch->tmit_buf = NULL;
425   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
426
427   return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
428 }
429
430
431 static void
432 master_transmit_message (void *cls,
433                          const struct GNUNET_SCHEDULER_TaskContext *tc)
434 {
435   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
436   struct Master *mst = cls;
437   mst->channel.tmit_task = 0;
438   if (NULL == mst->tmit_handle)
439   {
440     mst->tmit_handle
441       = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
442                                         mst->max_group_generation,
443                                         transmit_notify, mst);
444   }
445   else
446   {
447     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
448   }
449 }
450
451
452 static void
453 slave_transmit_message (void *cls,
454                         const struct GNUNET_SCHEDULER_TaskContext *tc)
455 {
456   struct Slave *slv = cls;
457   slv->channel.tmit_task = 0;
458   if (NULL == slv->tmit_handle)
459   {
460     slv->tmit_handle
461       = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
462                                           transmit_notify, slv);
463   }
464   else
465   {
466     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
467   }
468 }
469
470
471 static int
472 buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
473 {
474   uint16_t size = ntohs (msg->size);
475   struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
476
477   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
478     return GNUNET_SYSERR;
479
480   if (0 == ch->tmit_size)
481   {
482     ch->tmit_buf = GNUNET_malloc (size);
483     memcpy (ch->tmit_buf, msg, size);
484     ch->tmit_size = size;
485   }
486   else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
487   {
488     ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
489     memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
490     ch->tmit_size += size;
491   }
492
493   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
494       < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
495   {
496     struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
497     tmit_msg->buf = (char *) msg;
498     tmit_msg->size = size;
499     tmit_msg->status = ch->tmit_status;
500     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
501     tmit_delay = GNUNET_TIME_UNIT_ZERO;
502   }
503
504   if (0 != ch->tmit_task)
505     GNUNET_SCHEDULER_cancel (ch->tmit_task);
506
507   ch->tmit_task
508     = ch->is_master
509     ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
510     : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
511
512   return GNUNET_OK;
513 }
514
515 static void
516 handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
517                         const struct GNUNET_MessageHeader *msg)
518 {
519   const struct GNUNET_PSYC_MessageMethod *meth
520     = (const struct GNUNET_PSYC_MessageMethod *) msg;
521   struct Channel *ch
522     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
523   GNUNET_assert (NULL != ch);
524
525   if (GNUNET_NO != ch->in_transmit)
526   {
527     // FIXME: already transmitting a message, send back error message.
528     return;
529   }
530
531   ch->in_transmit = GNUNET_YES;
532   ch->tmit_buf = NULL;
533   ch->tmit_size = 0;
534   ch->tmit_mod_recvd = 0;
535   ch->tmit_mod_count = ntohl (meth->mod_count);
536   ch->tmit_status = GNUNET_PSYC_DATA_CONT;
537
538   buffer_message (ch, msg);
539
540   if (0 == ch->tmit_mod_count)
541     send_transmit_ack (ch);
542
543   GNUNET_SERVER_receive_done (client, GNUNET_OK);
544 };
545
546
547 static void
548 handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
549                           const struct GNUNET_MessageHeader *msg)
550 {
551   const struct GNUNET_PSYC_MessageModifier *mod
552     = (const struct GNUNET_PSYC_MessageModifier *) msg;
553   struct Channel *ch
554     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
555   GNUNET_assert (NULL != ch);
556
557   ch->tmit_mod_recvd++;
558   buffer_message (ch, msg);
559
560   if (ch->tmit_mod_recvd == ch->tmit_mod_count)
561     send_transmit_ack (ch);
562
563   GNUNET_SERVER_receive_done (client, GNUNET_OK);
564 };
565
566
567 static void
568 handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
569                       const struct GNUNET_MessageHeader *msg)
570 {
571   const struct GNUNET_PSYC_MessageData *data
572     = (const struct GNUNET_PSYC_MessageData *) msg;
573   struct Channel *ch
574     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
575   GNUNET_assert (NULL != ch);
576
577   ch->tmit_status = ntohs (data->status);
578   buffer_message (ch, msg);
579   send_transmit_ack (ch);
580
581   if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
582     ch->in_transmit = GNUNET_NO;
583
584   GNUNET_SERVER_receive_done (client, GNUNET_OK);
585 };
586
587
588 /**
589  * Initialize the PSYC service.
590  *
591  * @param cls Closure.
592  * @param server The initialized server.
593  * @param c Configuration to use.
594  */
595 static void
596 run (void *cls, struct GNUNET_SERVER_Handle *server,
597      const struct GNUNET_CONFIGURATION_Handle *c)
598 {
599   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
600     { &handle_master_start, NULL,
601       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
602
603     { &handle_slave_join, NULL,
604       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
605
606     { &handle_transmit_method, NULL,
607       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
608
609     { &handle_transmit_modifier, NULL,
610       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
611
612     { &handle_transmit_data, NULL,
613       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
614
615     { NULL, NULL, 0, 0 }
616   };
617
618   cfg = c;
619   store = GNUNET_PSYCSTORE_connect (cfg);
620   stats = GNUNET_STATISTICS_create ("psyc", cfg);
621   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
622   nc = GNUNET_SERVER_notification_context_create (server, 1);
623   GNUNET_SERVER_add_handlers (server, handlers);
624   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
625   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
626                                 NULL);
627 }
628
629
630 /**
631  * The main function for the service.
632  *
633  * @param argc number of arguments from the command line
634  * @param argv command line arguments
635  * @return 0 ok, 1 on error
636  */
637 int
638 main (int argc, char *const *argv)
639 {
640   return (GNUNET_OK ==
641           GNUNET_SERVICE_run (argc, argv, "psyc",
642                               GNUNET_SERVICE_OPTION_NONE,
643                               &run, NULL)) ? 0 : 1;
644 }
645
646 /* end of gnunet-service-psycstore.c */