- Convince 'make -j 10' to work nicely.
[oweals/gnunet.git] / src / psyc / psyc_util_lib.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_env_lib.h"
32 #include "gnunet_psyc_service.h"
33 #include "gnunet_psyc_util_lib.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37 /**
38  * Message receive states.
39  */
40 enum GNUNET_PSYC_MessageState
41 {
42   GNUNET_PSYC_MESSAGE_STATE_START    = 0,
43   GNUNET_PSYC_MESSAGE_STATE_HEADER   = 1,
44   GNUNET_PSYC_MESSAGE_STATE_METHOD   = 2,
45   GNUNET_PSYC_MESSAGE_STATE_MODIFIER = 3,
46   GNUNET_PSYC_MESSAGE_STATE_MOD_CONT = 4,
47   GNUNET_PSYC_MESSAGE_STATE_DATA     = 5,
48   GNUNET_PSYC_MESSAGE_STATE_END      = 6,
49   GNUNET_PSYC_MESSAGE_STATE_CANCEL   = 7,
50   GNUNET_PSYC_MESSAGE_STATE_ERROR    = 8,
51 };
52
53
54 struct GNUNET_PSYC_TransmitHandle
55 {
56   /**
57    * Client connection to service.
58    */
59   struct GNUNET_CLIENT_MANAGER_Connection *client;
60
61   /**
62    * Message currently being received from the client.
63    */
64   struct GNUNET_MessageHeader *msg;
65
66   /**
67    * Callback to request next modifier from client.
68    */
69   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
70
71   /**
72    * Closure for the notify callbacks.
73    */
74   void *notify_mod_cls;
75
76   /**
77    * Callback to request next data fragment from client.
78    */
79   GNUNET_PSYC_TransmitNotifyData notify_data;
80
81   /**
82    * Closure for the notify callbacks.
83    */
84   void *notify_data_cls;
85
86   /**
87    * Modifier of the environment that is currently being transmitted.
88    */
89   struct GNUNET_ENV_Modifier *mod;
90
91   /**
92    *
93    */
94   const char *mod_value;
95   size_t mod_value_size;
96
97   /**
98    * State of the current message being received from client.
99    */
100   enum GNUNET_PSYC_MessageState state;
101
102   /**
103    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
104    */
105   uint8_t acks_pending;
106
107   /**
108    * Is transmission paused?
109    */
110   uint8_t paused;
111
112   /**
113    * Are we currently transmitting a message?
114    */
115   uint8_t in_transmit;
116 };
117
118
119
120 struct GNUNET_PSYC_ReceiveHandle
121 {
122   /**
123    * Message part callback.
124    */
125   GNUNET_PSYC_MessageCallback message_cb;
126
127   /**
128    * Message part callback for historic message.
129    */
130   GNUNET_PSYC_MessageCallback hist_message_cb;
131
132   /**
133    * Closure for the callbacks.
134    */
135   void *cb_cls;
136
137   /**
138    * ID of the message being received from the PSYC service.
139    */
140   uint64_t message_id;
141
142   /**
143    * Public key of the slave from which a message is being received.
144    */
145   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
146
147   /**
148    * State of the currently being received message from the PSYC service.
149    */
150   enum GNUNET_PSYC_MessageState state;
151
152   /**
153    * Flags for the currently being received message from the PSYC service.
154    */
155   enum GNUNET_PSYC_MessageFlags flags;
156
157   /**
158    * Expected value size for the modifier being received from the PSYC service.
159    */
160   uint32_t mod_value_size_expected;
161
162   /**
163    * Actual value size for the modifier being received from the PSYC service.
164    */
165   uint32_t mod_value_size;
166 };
167
168
169 void
170 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
171                          const struct GNUNET_MessageHeader *msg)
172 {
173   uint16_t size = ntohs (msg->size);
174   uint16_t type = ntohs (msg->type);
175   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
176   switch (type)
177   {
178   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
179   {
180     struct GNUNET_PSYC_MessageHeader *pmsg
181       = (struct GNUNET_PSYC_MessageHeader *) msg;
182     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
183                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
184     break;
185   }
186   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
187   {
188     struct GNUNET_PSYC_MessageMethod *meth
189       = (struct GNUNET_PSYC_MessageMethod *) msg;
190     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
191     break;
192   }
193   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
194   {
195     struct GNUNET_PSYC_MessageModifier *mod
196       = (struct GNUNET_PSYC_MessageModifier *) msg;
197     uint16_t name_size = ntohs (mod->name_size);
198     char oper = ' ' < mod->oper ? mod->oper : ' ';
199     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
200                 size - sizeof (*mod) - name_size - 1,
201                 ((char *) &mod[1]) + name_size + 1);
202     break;
203   }
204   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
205   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
206     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
207     break;
208   }
209 }
210
211
212 /**** Transmitting messages ****/
213
214
215 /**
216  * Create a transmission handle.
217  */
218 struct GNUNET_PSYC_TransmitHandle *
219 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
220 {
221   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
222   tmit->client = client;
223   return tmit;
224 }
225
226
227 /**
228  * Destroy a transmission handle.
229  */
230 void
231 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
232 {
233   GNUNET_free (tmit);
234 }
235
236
237 /**
238  * Queue a message part for transmission.
239  *
240  * The message part is added to the current message buffer.
241  * When this buffer is full, it is added to the transmission queue.
242  *
243  * @param tmit  Transmission handle.
244  * @param msg  Message part, or NULL.
245  * @param end  End of message? #GNUNET_YES or #GNUNET_NO.
246  */
247 static void
248 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
249                        const struct GNUNET_MessageHeader *msg,
250                        uint8_t end)
251 {
252   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
253
254   LOG (GNUNET_ERROR_TYPE_DEBUG,
255        "Queueing message of type %u and size %u (end: %u)).\n",
256        ntohs (msg->type), size, end);
257
258   if (NULL != tmit->msg)
259   {
260     if (NULL == msg
261         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
262     {
263       /* End of message or buffer is full, add it to transmission queue
264        * and start with empty buffer */
265       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
266       tmit->msg->size = htons (tmit->msg->size);
267       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
268       tmit->msg = NULL;
269       tmit->acks_pending++;
270     }
271     else
272     {
273       /* Message fits in current buffer, append */
274       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
275       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
276       tmit->msg->size += size;
277     }
278   }
279
280   if (NULL == tmit->msg && NULL != msg)
281   {
282     /* Empty buffer, copy over message. */
283     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
284     tmit->msg->size = sizeof (*tmit->msg) + size;
285     memcpy (&tmit->msg[1], msg, size);
286   }
287
288   if (NULL != tmit->msg
289       && (GNUNET_YES == end
290           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
291               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
292   {
293     /* End of message or buffer is full, add it to transmission queue. */
294     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
295     tmit->msg->size = htons (tmit->msg->size);
296     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
297     tmit->msg = NULL;
298     tmit->acks_pending++;
299   }
300
301   if (GNUNET_YES == end)
302     tmit->in_transmit = GNUNET_NO;
303 }
304
305
306 /**
307  * Request data from client to transmit.
308  *
309  * @param tmit  Transmission handle.
310  */
311 static void
312 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
313 {
314   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
315   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
316   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
317   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
318
319   int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
320   switch (notify_ret)
321   {
322   case GNUNET_NO:
323     if (0 == data_size)
324     {
325       /* Transmission paused, nothing to send. */
326       tmit->paused = GNUNET_YES;
327       return;
328     }
329     break;
330
331   case GNUNET_YES:
332     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
333     break;
334
335   default:
336     LOG (GNUNET_ERROR_TYPE_ERROR,
337          "TransmitNotifyData callback returned error when requesting data.\n");
338
339     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
340     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
341     msg->size = htons (sizeof (*msg));
342     transmit_queue_insert (tmit, msg, GNUNET_YES);
343     return;
344   }
345
346   if (0 < data_size)
347   {
348     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
349     msg->size = htons (sizeof (*msg) + data_size);
350     transmit_queue_insert (tmit, msg, !notify_ret);
351   }
352
353   /* End of message. */
354   if (GNUNET_YES == notify_ret)
355   {
356     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
357     msg->size = htons (sizeof (*msg));
358     transmit_queue_insert (tmit, msg, GNUNET_YES);
359   }
360 }
361
362
363 /**
364  * Request a modifier from a client to transmit.
365  *
366  * @param tmit  Transmission handle.
367  */
368 static void
369 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
370 {
371   uint16_t max_data_size, data_size;
372   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
373   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
374   int notify_ret;
375
376   switch (tmit->state)
377   {
378   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
379   {
380     struct GNUNET_PSYC_MessageModifier *mod
381       = (struct GNUNET_PSYC_MessageModifier *) msg;
382     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
383     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
384     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
385     notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
386                                    &mod->oper, &mod->value_size);
387     mod->name_size = strnlen ((char *) &mod[1], data_size);
388     if (mod->name_size < data_size)
389     {
390       mod->value_size = htonl (mod->value_size);
391       mod->name_size = htons (mod->name_size);
392     }
393     else if (0 < data_size)
394     {
395       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
396       notify_ret = GNUNET_SYSERR;
397     }
398     break;
399   }
400   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
401   {
402     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
403     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
404     msg->size = sizeof (struct GNUNET_MessageHeader);
405     notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
406                                    &data_size, &msg[1], NULL, NULL);
407     break;
408   }
409   default:
410     GNUNET_assert (0);
411   }
412
413   switch (notify_ret)
414   {
415   case GNUNET_NO:
416     if (0 == data_size)
417     { /* Transmission paused, nothing to send. */
418       tmit->paused = GNUNET_YES;
419       return;
420     }
421     tmit->state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
422     break;
423
424   case GNUNET_YES:
425     if (0 == data_size)
426     {
427       /* End of modifiers. */
428       tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
429       if (0 == tmit->acks_pending)
430         transmit_data (tmit);
431
432       return;
433     }
434     tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
435     break;
436
437   default:
438     LOG (GNUNET_ERROR_TYPE_ERROR,
439          "TransmitNotifyModifier callback returned error "
440          "when requesting a modifier.\n");
441
442     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
443     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
444     msg->size = htons (sizeof (*msg));
445
446     transmit_queue_insert (tmit, msg, GNUNET_YES);
447     return;
448   }
449
450   if (0 < data_size)
451   {
452     GNUNET_assert (data_size <= max_data_size);
453     msg->size = htons (msg->size + data_size);
454     transmit_queue_insert (tmit, msg, GNUNET_NO);
455   }
456
457   transmit_mod (tmit);
458 }
459
460
461 int
462 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
463                      uint32_t *full_value_size)
464
465 {
466   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
467   uint16_t name_size = 0;
468   size_t value_size = 0;
469   const char *value = NULL;
470
471   if (NULL != oper && NULL != tmit->mod)
472   { /* New modifier */
473     tmit->mod = tmit->mod->next;
474     if (NULL == tmit->mod)
475     { /* No more modifiers, continue with data */
476       *data_size = 0;
477       return GNUNET_YES;
478     }
479
480     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
481     *full_value_size = tmit->mod->value_size;
482     *oper = tmit->mod->oper;
483     name_size = strlen (tmit->mod->name);
484
485     if (name_size + 1 + tmit->mod->value_size <= *data_size)
486     {
487       *data_size = name_size + 1 + tmit->mod->value_size;
488     }
489     else
490     {
491       tmit->mod_value_size = tmit->mod->value_size;
492       value_size = *data_size - name_size - 1;
493       tmit->mod_value_size -= value_size;
494       tmit->mod_value = tmit->mod->value + value_size;
495     }
496
497     memcpy (data, tmit->mod->name, name_size);
498     ((char *)data)[name_size] = '\0';
499     memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
500   }
501   else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
502   { /* Modifier continuation */
503     value = tmit->mod_value;
504     if (tmit->mod_value_size <= *data_size)
505     {
506       value_size = tmit->mod_value_size;
507       tmit->mod_value = NULL;
508     }
509     else
510     {
511       value_size = *data_size;
512       tmit->mod_value += value_size;
513     }
514     tmit->mod_value_size -= value_size;
515
516     if (*data_size < value_size)
517     {
518       LOG (GNUNET_ERROR_TYPE_DEBUG,
519            "Value in environment larger than buffer: %u < %zu\n",
520            *data_size, value_size);
521       *data_size = 0;
522       return GNUNET_NO;
523     }
524
525     *data_size = value_size;
526     memcpy (data, value, value_size);
527   }
528
529   return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
530 }
531
532
533 /**
534  * Transmit a message.
535  *
536  * @param tmit         Transmission handle.
537  * @param method_name  Which method should be invoked.
538  * @param env          Environment for the message.
539  *   Should stay available until the first call to notify_data.
540  *   Can be NULL if there are no modifiers or @a notify_mod is provided instead.
541  * @param notify_mod   Function to call to obtain modifiers.
542  *   Can be NULL if there are no modifiers or @a env is provided instead.
543  * @param notify_data  Function to call to obtain fragments of the data.
544  * @param notify_cls   Closure for @a notify_mod and @a notify_data.
545  * @param flags        Flags for the message being transmitted.
546  *
547  * @return #GNUNET_OK if the transmission was started.
548  *         #GNUNET_SYSERR if another transmission is already going on.
549  */
550 int
551 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
552                               const char *method_name,
553                               const struct GNUNET_ENV_Environment *env,
554                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
555                               GNUNET_PSYC_TransmitNotifyData notify_data,
556                               void *notify_cls,
557                               uint32_t flags)
558 {
559   if (GNUNET_NO != tmit->in_transmit)
560     return GNUNET_SYSERR;
561   tmit->in_transmit = GNUNET_YES;
562
563   size_t size = strlen (method_name) + 1;
564   struct GNUNET_PSYC_MessageMethod *pmeth;
565   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
566   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
567
568   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
569   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
570   pmeth->header.size = htons (sizeof (*pmeth) + size);
571   pmeth->flags = htonl (flags);
572   memcpy (&pmeth[1], method_name, size);
573
574   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
575   tmit->notify_data = notify_data;
576   tmit->notify_data_cls = notify_cls;
577
578   if (NULL != notify_mod)
579   {
580     tmit->notify_mod = notify_mod;
581     tmit->notify_mod_cls = notify_cls;
582   }
583   else
584   {
585     tmit->notify_mod = &transmit_notify_env;
586     tmit->notify_mod_cls = tmit;
587     tmit->mod
588       = (NULL != env)
589       ? GNUNET_ENV_environment_head (env)
590       : NULL;
591   }
592
593   transmit_mod (tmit);
594   return GNUNET_OK;
595 }
596
597
598 /**
599  * Resume transmission.
600  *
601  * @param tmit  Transmission handle.
602  */
603 void
604 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
605 {
606   if (0 == tmit->acks_pending)
607   {
608     tmit->paused = GNUNET_NO;
609     transmit_data (tmit);
610   }
611 }
612
613
614 /**
615  * Abort transmission request.
616  *
617  * @param tmit  Transmission handle.
618  */
619 void
620 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
621 {
622   if (GNUNET_NO == tmit->in_transmit)
623     return;
624
625   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
626   tmit->in_transmit = GNUNET_NO;
627   tmit->paused = GNUNET_NO;
628
629   /* FIXME */
630   struct GNUNET_MessageHeader msg;
631   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
632   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
633   msg.size = htons (sizeof (msg));
634   transmit_queue_insert (tmit, &msg, GNUNET_YES);
635 }
636
637
638 /**
639  * Got acknowledgement of a transmitted message part, continue transmission.
640  *
641  * @param tmit  Transmission handle.
642  */
643 void
644 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
645 {
646   if (0 == tmit->acks_pending)
647   {
648     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
649     GNUNET_break (0);
650     return;
651   }
652   tmit->acks_pending--;
653
654   switch (tmit->state)
655   {
656   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
657   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
658     if (GNUNET_NO == tmit->paused)
659       transmit_mod (tmit);
660     break;
661
662   case GNUNET_PSYC_MESSAGE_STATE_DATA:
663     if (GNUNET_NO == tmit->paused)
664       transmit_data (tmit);
665     break;
666
667   case GNUNET_PSYC_MESSAGE_STATE_END:
668   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
669     break;
670
671   default:
672     LOG (GNUNET_ERROR_TYPE_DEBUG,
673          "Ignoring message ACK in state %u.\n", tmit->state);
674   }
675 }
676
677
678 /**** Receiving messages ****/
679
680
681 /**
682  * Create handle for receiving messages.
683  */
684 struct GNUNET_PSYC_ReceiveHandle *
685 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
686                             GNUNET_PSYC_MessageCallback hist_message_cb,
687                             void *cb_cls)
688 {
689   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
690   recv->message_cb = message_cb;
691   recv->hist_message_cb = hist_message_cb;
692   recv->cb_cls = cb_cls;
693   return recv;
694 }
695
696
697 /**
698  * Destroy handle for receiving messages.
699  */
700 void
701 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
702 {
703   GNUNET_free (recv);
704 }
705
706
707 /**
708  * Reset stored data related to the last received message.
709  */
710 void
711 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
712 {
713   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
714   recv->flags = 0;
715   recv->message_id = 0;
716   recv->mod_value_size = 0;
717   recv->mod_value_size_expected = 0;
718 }
719
720
721 static void
722 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
723 {
724   GNUNET_PSYC_MessageCallback message_cb
725     = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
726     ? recv->hist_message_cb
727     : recv->message_cb;
728
729   if (NULL != message_cb)
730     message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
731
732   GNUNET_PSYC_receive_reset (recv);
733 }
734
735
736 /**
737  * Handle incoming PSYC message.
738  *
739  * @param recv  Receive handle.
740  * @param msg   The message.
741  *
742  * @return #GNUNET_OK on success,
743  *         #GNUNET_SYSERR on receive error.
744  */
745 int
746 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
747                              const struct GNUNET_PSYC_MessageHeader *msg)
748 {
749   uint16_t size = ntohs (msg->header.size);
750   uint32_t flags = ntohl (msg->flags);
751
752   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
753                            (struct GNUNET_MessageHeader *) msg);
754
755   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
756   {
757     recv->message_id = GNUNET_ntohll (msg->message_id);
758     recv->flags = flags;
759     recv->slave_key = msg->slave_key;
760     recv->mod_value_size = 0;
761     recv->mod_value_size_expected = 0;
762   }
763   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
764   {
765     // FIXME
766     LOG (GNUNET_ERROR_TYPE_WARNING,
767          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
768          GNUNET_ntohll (msg->message_id), recv->message_id);
769     GNUNET_break_op (0);
770     recv_error (recv);
771     return GNUNET_SYSERR;
772   }
773   else if (flags != recv->flags)
774   {
775     LOG (GNUNET_ERROR_TYPE_WARNING,
776          "Unexpected message flags. Got: %lu, expected: %lu\n",
777          flags, recv->flags);
778     GNUNET_break_op (0);
779     recv_error (recv);
780     return GNUNET_SYSERR;
781   }
782
783   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
784
785   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
786   {
787     const struct GNUNET_MessageHeader *pmsg
788       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
789     psize = ntohs (pmsg->size);
790     ptype = ntohs (pmsg->type);
791     size_eq = size_min = 0;
792
793     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
794     {
795       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
796                   "Dropping message of type %u with invalid size %u.\n",
797                   ptype, psize);
798       recv_error (recv);
799       return GNUNET_SYSERR;
800     }
801
802     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803                 "Received message part from PSYC.\n");
804     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
805
806     switch (ptype)
807     {
808     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
809       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
810       break;
811     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
812       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
813       break;
814     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
815     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
816       size_min = sizeof (struct GNUNET_MessageHeader);
817       break;
818     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
819     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
820       size_eq = sizeof (struct GNUNET_MessageHeader);
821       break;
822     default:
823       GNUNET_break_op (0);
824       recv_error (recv);
825       return GNUNET_SYSERR;
826     }
827
828     if (! ((0 < size_eq && psize == size_eq)
829            || (0 < size_min && size_min <= psize)))
830     {
831       GNUNET_break_op (0);
832       recv_error (recv);
833       return GNUNET_SYSERR;
834     }
835
836     switch (ptype)
837     {
838     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
839     {
840       struct GNUNET_PSYC_MessageMethod *meth
841         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
842
843       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
844       {
845         LOG (GNUNET_ERROR_TYPE_WARNING,
846              "Dropping out of order message method (%u).\n",
847              recv->state);
848         /* It is normal to receive an incomplete message right after connecting,
849          * but should not happen later.
850          * FIXME: add a check for this condition.
851          */
852         GNUNET_break_op (0);
853         recv_error (recv);
854         return GNUNET_SYSERR;
855       }
856
857       if ('\0' != *((char *) meth + psize - 1))
858       {
859         LOG (GNUNET_ERROR_TYPE_WARNING,
860              "Dropping message with malformed method. "
861              "Message ID: %" PRIu64 "\n", recv->message_id);
862         GNUNET_break_op (0);
863         recv_error (recv);
864         return GNUNET_SYSERR;
865       }
866       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
867       break;
868     }
869     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
870     {
871       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
872             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
873             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
874       {
875         LOG (GNUNET_ERROR_TYPE_WARNING,
876              "Dropping out of order message modifier (%u).\n",
877              recv->state);
878         GNUNET_break_op (0);
879         recv_error (recv);
880         return GNUNET_SYSERR;
881       }
882
883       struct GNUNET_PSYC_MessageModifier *mod
884         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
885
886       uint16_t name_size = ntohs (mod->name_size);
887       recv->mod_value_size_expected = ntohl (mod->value_size);
888       recv->mod_value_size = psize - sizeof (*mod) - name_size - 1;
889
890       if (psize < sizeof (*mod) + name_size + 1
891           || '\0' != *((char *) &mod[1] + name_size)
892           || recv->mod_value_size_expected < recv->mod_value_size)
893       {
894         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
895         GNUNET_break_op (0);
896         recv_error (recv);
897         return GNUNET_SYSERR;
898       }
899       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
900       break;
901     }
902     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
903     {
904       recv->mod_value_size += psize - sizeof (*pmsg);
905
906       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
907             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
908           || recv->mod_value_size_expected < recv->mod_value_size)
909       {
910         LOG (GNUNET_ERROR_TYPE_WARNING,
911              "Dropping out of order message modifier continuation "
912              "!(%u == %u || %u == %u) || %lu < %lu.\n",
913              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
914              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
915              recv->mod_value_size_expected, recv->mod_value_size);
916         GNUNET_break_op (0);
917         recv_error (recv);
918         return GNUNET_SYSERR;
919       }
920       break;
921     }
922     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
923     {
924       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
925           || recv->mod_value_size_expected != recv->mod_value_size)
926       {
927         LOG (GNUNET_ERROR_TYPE_WARNING,
928              "Dropping out of order message data fragment "
929              "(%u < %u || %lu != %lu).\n",
930              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
931              recv->mod_value_size_expected, recv->mod_value_size);
932
933         GNUNET_break_op (0);
934         recv_error (recv);
935         return GNUNET_SYSERR;
936       }
937       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
938       break;
939     }
940     }
941
942     GNUNET_PSYC_MessageCallback message_cb
943       = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
944       ? recv->hist_message_cb
945       : recv->message_cb;
946
947     if (NULL != message_cb)
948       message_cb (recv->cb_cls, recv->message_id, recv->flags, pmsg);
949
950     switch (ptype)
951     {
952     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
953     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
954       GNUNET_PSYC_receive_reset (recv);
955       break;
956     }
957   }
958   return GNUNET_OK;
959 }
960
961
962 /**
963  * Check if @a data contains a series of valid message parts.
964  *
965  * @param      data_size    Size of @a data.
966  * @param      data         Data.
967  * @param[out] first_ptype  Type of first message part.
968  * @param[out] last_ptype   Type of last message part.
969  *
970  * @return Number of message parts found in @a data.
971  *         or GNUNET_SYSERR if the message contains invalid parts.
972  */
973 int
974 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
975                                  uint16_t *first_ptype, uint16_t *last_ptype)
976 {
977   const struct GNUNET_MessageHeader *pmsg;
978   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
979   if (NULL != first_ptype)
980     *first_ptype = 0;
981   if (NULL != last_ptype)
982     *last_ptype = 0;
983
984   for (pos = 0; pos < data_size; pos += psize, parts++)
985   {
986     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
987     psize = ntohs (pmsg->size);
988     ptype = ntohs (pmsg->type);
989     if (0 == parts && NULL != first_ptype)
990       *first_ptype = ptype;
991     if (NULL != last_ptype
992         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
993       *last_ptype = ptype;
994     if (psize < sizeof (*pmsg)
995         || pos + psize > data_size
996         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
997         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
998     {
999       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1000                   "Invalid message part of type %u and size %u.\n",
1001                   ptype, psize);
1002       return GNUNET_SYSERR;
1003     }
1004     /* FIXME: check message part order */
1005   }
1006   return parts;
1007 }