made the service more resilient against out of order and simply incorrect messages
[oweals/gnunet.git] / src / psyc / gnunet-service-psyc.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/gnunet-service-psyc.c
23  * @brief PSYC service
24  * @author Gabor X Toth
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_multicast_service.h"
33 #include "gnunet_psycstore_service.h"
34 #include "gnunet_psyc_service.h"
35 #include "psyc.h"
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
38
39
40 /**
41  * Handle to our current configuration.
42  */
43 static const struct GNUNET_CONFIGURATION_Handle *cfg;
44
45 /**
46  * Handle to the statistics service.
47  */
48 static struct GNUNET_STATISTICS_Handle *stats;
49
50 /**
51  * Notification context, simplifies client broadcasts.
52  */
53 static struct GNUNET_SERVER_NotificationContext *nc;
54
55 /**
56  * Handle to the PSYCstore.
57  */
58 static struct GNUNET_PSYCSTORE_Handle *store;
59
60 /**
61  * Message in the transmission queue.
62  */
63 struct TransmitMessage
64 {
65   struct TransmitMessage *prev;
66   struct TransmitMessage *next;
67
68   char *buf;
69   uint16_t size;
70   uint8_t status;
71 };
72
73 /**
74  * Common part of the client context for both a master and slave channel.
75  */
76 struct Channel
77 {
78   struct GNUNET_SERVER_Client *client;
79
80   struct TransmitMessage *tmit_head;
81   struct TransmitMessage *tmit_tail;
82
83   char *tmit_buf;
84   uint32_t tmit_mod_count;
85   uint32_t tmit_mod_recvd;
86   uint16_t tmit_size;
87   uint8_t tmit_status;
88
89   uint8_t in_transmit;
90   uint8_t is_master;
91 };
92
93 /**
94  * Client context for a channel master.
95  */
96 struct Master
97 {
98   struct Channel channel;
99   struct GNUNET_CRYPTO_EccPrivateKey private_key;
100   struct GNUNET_CRYPTO_EccPublicSignKey public_key;
101
102   struct GNUNET_MULTICAST_Origin *origin;
103   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
104
105   uint64_t max_message_id;
106   uint64_t max_state_message_id;
107   uint64_t max_group_generation;
108
109   /**
110    * enum GNUNET_PSYC_Policy
111    */
112   uint32_t policy;
113 };
114
115
116 /**
117  * Client context for a channel slave.
118  */
119 struct Slave
120 {
121   struct Channel channel;
122   struct GNUNET_CRYPTO_EccPrivateKey slave_key;
123   struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
124
125   struct GNUNET_MULTICAST_Member *member;
126   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
127
128   uint64_t max_message_id;
129   uint64_t max_request_id;
130 };
131
132
133 /**
134  * Task run during shutdown.
135  *
136  * @param cls unused
137  * @param tc unused
138  */
139 static void
140 shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
141 {
142   if (NULL != nc)
143   {
144     GNUNET_SERVER_notification_context_destroy (nc);
145     nc = NULL;
146   }
147   if (NULL != stats)
148   {
149     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
150     stats = NULL;
151   }
152 }
153
154 /**
155  * Called whenever a client is disconnected.
156  * Frees our resources associated with that client.
157  *
158  * @param cls Closure.
159  * @param client Identification of the client.
160  */
161 static void
162 client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
163 {
164   if (NULL == client)
165     return;
166
167   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
168
169   struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
170                                                               struct Channel);
171   GNUNET_assert (NULL != ch);
172
173   if (NULL != ch->tmit_buf)
174   {
175     GNUNET_free (ch->tmit_buf);
176     ch->tmit_buf = NULL;
177   }
178   GNUNET_free (ch);
179 }
180
181
182 void
183 counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id,
184              uint64_t max_group_generation, uint64_t max_state_message_id)
185 {
186   struct Channel *ch = cls;
187   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
188   res->header.size = htons (sizeof (*res));
189   res->max_message_id = GNUNET_htonll (max_message_id);
190
191   if (ch->is_master)
192   {
193     struct Master *mst = cls;
194     mst->max_message_id = max_message_id;
195     mst->max_state_message_id = max_state_message_id;
196     mst->max_group_generation = max_group_generation;
197     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
198   }
199   else
200   {
201     struct Slave *slv = cls;
202     slv->max_message_id = max_message_id;
203     res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
204   }
205
206   GNUNET_SERVER_notification_context_add (nc, ch->client);
207   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
208                                               GNUNET_NO);
209   GNUNET_free (res);
210 }
211
212
213 static void
214 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
215                      const struct GNUNET_MessageHeader *msg)
216 {
217   const struct MasterStartRequest *req
218     = (const struct MasterStartRequest *) msg;
219   struct Master *mst = GNUNET_new (struct Master);
220   mst->channel.client = client;
221   mst->channel.is_master = GNUNET_YES;
222   mst->policy = ntohl (req->policy);
223   mst->private_key = req->channel_key;
224   GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key,
225                                                   &mst->public_key);
226
227   GNUNET_PSYCSTORE_counters_get (store, &mst->public_key,
228                                  counters_cb, mst);
229
230   GNUNET_SERVER_client_set_user_context (client, mst);
231   GNUNET_SERVER_receive_done (client, GNUNET_OK);
232 }
233
234
235 static void
236 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
237                    const struct GNUNET_MessageHeader *msg)
238 {
239   const struct SlaveJoinRequest *req
240     = (const struct SlaveJoinRequest *) msg;
241   struct Slave *slv = GNUNET_new (struct Slave);
242   slv->channel.client = client;
243   slv->channel.is_master = GNUNET_NO;
244   slv->channel_key = req->channel_key;
245   slv->slave_key = req->slave_key;
246
247   GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key,
248                                  counters_cb, slv);
249
250   GNUNET_SERVER_client_set_user_context (client, slv);
251   GNUNET_SERVER_receive_done (client, GNUNET_OK);
252 }
253
254
255 static void
256 send_transmit_ack (struct Channel *ch)
257 {
258   struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
259   res->header.size = htons (sizeof (*res));
260   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
261   res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
262
263   GNUNET_SERVER_notification_context_add (nc, ch->client);
264   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
265                                               GNUNET_NO);
266   GNUNET_free (res);
267 }
268
269
270 static int
271 transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data)
272 {
273   struct Channel *ch = cls;
274   struct TransmitMessage *msg = ch->tmit_head;
275
276   if (NULL == msg || *data_size < msg->size)
277   {
278     *data_size = 0;
279     return GNUNET_NO;
280   }
281
282   memcpy (data, msg->buf, msg->size);
283   *data_size = msg->size;
284
285   GNUNET_free (ch->tmit_buf);
286   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
287
288   return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
289 }
290
291
292 static int
293 master_transmit_message (struct Master *mst)
294 {
295   if (NULL == mst->tmit_handle)
296   {
297     mst->tmit_handle
298       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
299                                         mst->max_group_generation,
300                                         transmit_notify, mst);
301   }
302   else
303   {
304     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
305   }
306   return GNUNET_OK;
307 }
308
309
310 static int
311 slave_transmit_message (struct Slave *slv)
312 {
313   if (NULL == slv->tmit_handle)
314   {
315     slv->tmit_handle
316       = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id,
317                                           transmit_notify, slv);
318   }
319   else
320   {
321     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
322   }
323   return GNUNET_OK;
324 }
325
326
327 static int
328 buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
329 {
330   uint16_t size = ntohs (msg->size);
331
332   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
333     return GNUNET_SYSERR;
334
335   if (0 == ch->tmit_size)
336   {
337     ch->tmit_buf = GNUNET_malloc (size);
338     memcpy (ch->tmit_buf, msg, size);
339     ch->tmit_size = size;
340   }
341   else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
342   {
343     ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
344     memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
345     ch->tmit_size += size;
346   }
347
348   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
349       < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
350   {
351     struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
352     tmit_msg->buf = (char *) msg;
353     tmit_msg->size = size;
354     tmit_msg->status = ch->tmit_status;
355     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
356
357     ch->is_master
358       ? master_transmit_message ((struct Master *) ch)
359       : slave_transmit_message ((struct Slave *) ch);
360   }
361
362   return GNUNET_OK;
363 }
364
365 static void
366 handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
367                         const struct GNUNET_MessageHeader *msg)
368 {
369   const struct GNUNET_PSYC_MessageMethod *meth
370     = (const struct GNUNET_PSYC_MessageMethod *) msg;
371   struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
372                                                               struct Channel);
373   GNUNET_assert (NULL != ch);
374
375   if (GNUNET_NO != ch->in_transmit)
376   {
377     // FIXME: already transmitting a message, send back error message.
378     return;
379   }
380
381   ch->tmit_buf = NULL;
382   ch->tmit_size = 0;
383   ch->tmit_mod_recvd = 0;
384   ch->tmit_mod_count = ntohl (meth->mod_count);
385   ch->tmit_status = GNUNET_PSYC_DATA_CONT;
386
387   buffer_message (ch, msg);
388
389   if (0 == ch->tmit_mod_count)
390     send_transmit_ack (ch);
391 };
392
393
394 static void
395 handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
396                           const struct GNUNET_MessageHeader *msg)
397 {
398   const struct GNUNET_PSYC_MessageModifier *mod
399     = (const struct GNUNET_PSYC_MessageModifier *) msg;
400   struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
401                                                               struct Channel);
402   GNUNET_assert (NULL != ch);
403
404   ch->tmit_mod_recvd++;
405   buffer_message (ch, msg);
406
407   if (ch->tmit_mod_recvd == ch->tmit_mod_count)
408     send_transmit_ack (ch);
409 };
410
411
412 static void
413 handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
414                       const struct GNUNET_MessageHeader *msg)
415 {
416   const struct GNUNET_PSYC_MessageData *data
417     = (const struct GNUNET_PSYC_MessageData *) msg;
418   struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
419                                                               struct Channel);
420   GNUNET_assert (NULL != ch);
421
422   ch->tmit_status = data->status;
423   buffer_message (ch, msg);
424   send_transmit_ack (ch);
425 };
426
427
428 /**
429  * Initialize the PSYC service.
430  *
431  * @param cls Closure.
432  * @param server The initialized server.
433  * @param c Configuration to use.
434  */
435 static void
436 run (void *cls, struct GNUNET_SERVER_Handle *server,
437      const struct GNUNET_CONFIGURATION_Handle *c)
438 {
439   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
440     { &handle_master_start, NULL,
441       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
442
443     { &handle_slave_join, NULL,
444       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
445
446     { &handle_transmit_method, NULL,
447       GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 },
448
449     { &handle_transmit_modifier, NULL,
450       GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 },
451
452     { &handle_transmit_data, NULL,
453       GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 },
454
455     { NULL, NULL, 0, 0 }
456   };
457
458   cfg = c;
459   store = GNUNET_PSYCSTORE_connect (cfg);
460   stats = GNUNET_STATISTICS_create ("psyc", cfg);
461   nc = GNUNET_SERVER_notification_context_create (server, 1);
462   GNUNET_SERVER_add_handlers (server, handlers);
463   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
464   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
465                                 NULL);
466 }
467
468
469 /**
470  * The main function for the service.
471  *
472  * @param argc number of arguments from the command line
473  * @param argv command line arguments
474  * @return 0 ok, 1 on error
475  */
476 int
477 main (int argc, char *const *argv)
478 {
479   return (GNUNET_OK ==
480           GNUNET_SERVICE_run (argc, argv, "psyc",
481                               GNUNET_SERVICE_OPTION_NONE,
482                               &run, NULL)) ? 0 : 1;
483 }
484
485 /* end of gnunet-service-psycstore.c */