reading alpha, beta from .conf
[oweals/gnunet.git] / src / psyc / psyc_util_lib.c
1 /*
2  * This file is part of GNUnet
3  * (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file psyc/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_env_lib.h"
32 #include "gnunet_psyc_service.h"
33 #include "gnunet_psyc_util_lib.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37
38 struct GNUNET_PSYC_TransmitHandle
39 {
40   /**
41    * Client connection to service.
42    */
43   struct GNUNET_CLIENT_MANAGER_Connection *client;
44
45   /**
46    * Message currently being received from the client.
47    */
48   struct GNUNET_MessageHeader *msg;
49
50   /**
51    * Callback to request next modifier from client.
52    */
53   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
54
55   /**
56    * Closure for the notify callbacks.
57    */
58   void *notify_mod_cls;
59
60   /**
61    * Callback to request next data fragment from client.
62    */
63   GNUNET_PSYC_TransmitNotifyData notify_data;
64
65   /**
66    * Closure for the notify callbacks.
67    */
68   void *notify_data_cls;
69
70   /**
71    * Modifier of the environment that is currently being transmitted.
72    */
73   struct GNUNET_ENV_Modifier *mod;
74
75   /**
76    *
77    */
78   const char *mod_value;
79
80   /**
81    * Number of bytes remaining to be transmitted from the current modifier value.
82    */
83   uint32_t mod_value_remaining;
84
85   /**
86    * State of the current message being received from client.
87    */
88   enum GNUNET_PSYC_MessageState state;
89
90   /**
91    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
92    */
93   uint8_t acks_pending;
94
95   /**
96    * Is transmission paused?
97    */
98   uint8_t paused;
99
100   /**
101    * Are we currently transmitting a message?
102    */
103   uint8_t in_transmit;
104 };
105
106
107
108 struct GNUNET_PSYC_ReceiveHandle
109 {
110   /**
111    * Message callback.
112    */
113   GNUNET_PSYC_MessageCallback message_cb;
114
115   /**
116    * Message part callback.
117    */
118   GNUNET_PSYC_MessagePartCallback message_part_cb;
119
120   /**
121    * Closure for the callbacks.
122    */
123   void *cb_cls;
124
125   /**
126    * ID of the message being received from the PSYC service.
127    */
128   uint64_t message_id;
129
130   /**
131    * Public key of the slave from which a message is being received.
132    */
133   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
134
135   /**
136    * State of the currently being received message from the PSYC service.
137    */
138   enum GNUNET_PSYC_MessageState state;
139
140   /**
141    * Flags for the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageFlags flags;
144
145   /**
146    * Expected value size for the modifier being received from the PSYC service.
147    */
148   uint32_t mod_value_size_expected;
149
150   /**
151    * Actual value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size;
154 };
155
156
157 /**** Messages ****/
158
159
160 /**
161  * Create a PSYC message.
162  *
163  * @param method_name
164  *        PSYC method for the message.
165  * @param env
166  *        Environment for the message.
167  * @param data
168  *        Data payload for the message.
169  * @param data_size
170  *        Size of @a data.
171  *
172  * @return Message header with size information,
173  *         followed by the message parts.
174  */
175 struct GNUNET_PSYC_Message *
176 GNUNET_PSYC_message_create (const char *method_name,
177                             const struct GNUNET_ENV_Environment *env,
178                             const void *data,
179                             size_t data_size)
180 {
181   struct GNUNET_ENV_Modifier *mod = NULL;
182   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
183   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
184   struct GNUNET_MessageHeader *pmsg = NULL;
185   uint16_t env_size = 0;
186   if (NULL != env)
187   {
188     mod = GNUNET_ENV_environment_head (env);
189     while (NULL != mod)
190     {
191       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
192       mod = mod->next;
193     }
194   }
195
196   struct GNUNET_PSYC_Message *msg;
197   uint16_t method_name_size = strlen (method_name) + 1;
198   if (method_name_size == 1)
199     return NULL;
200
201   uint16_t msg_size = sizeof (*msg)                     /* header */
202     + sizeof (*pmeth) + method_name_size                /* method */
203     + env_size                                          /* modifiers */
204     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */
205     + sizeof (*pmsg);                                   /* end of message */
206   msg = GNUNET_malloc (msg_size);
207   msg->header.size = htons (msg_size);
208   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
209
210   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
212   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
213   memcpy (&pmeth[1], method_name, method_name_size);
214
215   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
216   if (NULL != env)
217   {
218     mod = GNUNET_ENV_environment_head (env);
219     while (NULL != mod)
220     {
221       uint16_t mod_name_size = strlen (mod->name) + 1;
222       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
223       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
224       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
225       p += pmod->header.size;
226       pmod->header.size = htons (pmod->header.size);
227
228       memcpy (&pmod[1], mod->name, mod_name_size);
229       if (0 < mod->value_size)
230         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
231
232       mod = mod->next;
233     }
234   }
235
236   if (0 < data_size)
237   {
238     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
239     pmsg->size = sizeof (*pmsg) + data_size;
240     p += pmsg->size;
241     pmsg->size = htons (pmsg->size);
242     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
243     memcpy (&pmsg[1], data, data_size);
244   }
245
246   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
247   pmsg->size = htons (sizeof (*pmsg));
248   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
249
250   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
251   return msg;
252 }
253
254
255 void
256 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
257                          const struct GNUNET_MessageHeader *msg)
258 {
259   uint16_t size = ntohs (msg->size);
260   uint16_t type = ntohs (msg->type);
261   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
262   switch (type)
263   {
264   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
265   {
266     struct GNUNET_PSYC_MessageHeader *pmsg
267       = (struct GNUNET_PSYC_MessageHeader *) msg;
268     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
269                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
270     break;
271   }
272   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
273   {
274     struct GNUNET_PSYC_MessageMethod *meth
275       = (struct GNUNET_PSYC_MessageMethod *) msg;
276     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
277     break;
278   }
279   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
280   {
281     struct GNUNET_PSYC_MessageModifier *mod
282       = (struct GNUNET_PSYC_MessageModifier *) msg;
283     uint16_t name_size = ntohs (mod->name_size);
284     char oper = ' ' < mod->oper ? mod->oper : ' ';
285     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
286                 size - sizeof (*mod) - name_size,
287                 ((char *) &mod[1]) + name_size);
288     break;
289   }
290   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
291   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
292     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
293     break;
294   }
295 }
296
297
298 /**** Transmitting messages ****/
299
300
301 /**
302  * Create a transmission handle.
303  */
304 struct GNUNET_PSYC_TransmitHandle *
305 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
306 {
307   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
308   tmit->client = client;
309   return tmit;
310 }
311
312
313 /**
314  * Destroy a transmission handle.
315  */
316 void
317 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
318 {
319   GNUNET_free (tmit);
320 }
321
322
323 /**
324  * Queue a message part for transmission.
325  *
326  * The message part is added to the current message buffer.
327  * When this buffer is full, it is added to the transmission queue.
328  *
329  * @param tmit  Transmission handle.
330  * @param msg  Message part, or NULL.
331  * @param end  End of message? #GNUNET_YES or #GNUNET_NO.
332  */
333 static void
334 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
335                        const struct GNUNET_MessageHeader *msg,
336                        uint8_t end)
337 {
338   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
339
340   LOG (GNUNET_ERROR_TYPE_DEBUG,
341        "Queueing message part of type %u and size %u (end: %u)).\n",
342        ntohs (msg->type), size, end);
343
344   if (NULL != tmit->msg)
345   {
346     if (NULL == msg
347         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
348     {
349       /* End of message or buffer is full, add it to transmission queue
350        * and start with empty buffer */
351       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
352       tmit->msg->size = htons (tmit->msg->size);
353       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
354       tmit->msg = NULL;
355       tmit->acks_pending++;
356     }
357     else
358     {
359       /* Message fits in current buffer, append */
360       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
361       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
362       tmit->msg->size += size;
363     }
364   }
365
366   if (NULL == tmit->msg && NULL != msg)
367   {
368     /* Empty buffer, copy over message. */
369     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
370     tmit->msg->size = sizeof (*tmit->msg) + size;
371     memcpy (&tmit->msg[1], msg, size);
372   }
373
374   if (NULL != tmit->msg
375       && (GNUNET_YES == end
376           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
377               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
378   {
379     /* End of message or buffer is full, add it to transmission queue. */
380     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
381     tmit->msg->size = htons (tmit->msg->size);
382     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
383     tmit->msg = NULL;
384     tmit->acks_pending++;
385   }
386
387   if (GNUNET_YES == end)
388     tmit->in_transmit = GNUNET_NO;
389 }
390
391
392 /**
393  * Request data from client to transmit.
394  *
395  * @param tmit  Transmission handle.
396  */
397 static void
398 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
399 {
400   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
401   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
402   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
403   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
404
405   int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
406   LOG (GNUNET_ERROR_TYPE_DEBUG,
407        "transmit_data (ret: %d, size: %u): %.*s\n",
408        notify_ret, data_size, data_size, &msg[1]);
409   switch (notify_ret)
410   {
411   case GNUNET_NO:
412     if (0 == data_size)
413     {
414       /* Transmission paused, nothing to send. */
415       tmit->paused = GNUNET_YES;
416       return;
417     }
418     break;
419
420   case GNUNET_YES:
421     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
422     break;
423
424   default:
425     LOG (GNUNET_ERROR_TYPE_ERROR,
426          "TransmitNotifyData callback returned error when requesting data.\n");
427
428     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
429     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
430     msg->size = htons (sizeof (*msg));
431     transmit_queue_insert (tmit, msg, GNUNET_YES);
432     return;
433   }
434
435   if (0 < data_size)
436   {
437     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
438     msg->size = htons (sizeof (*msg) + data_size);
439     transmit_queue_insert (tmit, msg, !notify_ret);
440   }
441
442   /* End of message. */
443   if (GNUNET_YES == notify_ret)
444   {
445     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
446     msg->size = htons (sizeof (*msg));
447     transmit_queue_insert (tmit, msg, GNUNET_YES);
448   }
449 }
450
451
452 /**
453  * Request a modifier from a client to transmit.
454  *
455  * @param tmit  Transmission handle.
456  */
457 static void
458 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
459 {
460   uint16_t max_data_size, data_size;
461   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
462   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
463   int notify_ret;
464
465   switch (tmit->state)
466   {
467   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
468   {
469     struct GNUNET_PSYC_MessageModifier *mod
470       = (struct GNUNET_PSYC_MessageModifier *) msg;
471     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
472     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
473     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
474     notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
475                                    &mod->oper, &mod->value_size);
476
477     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
478     LOG (GNUNET_ERROR_TYPE_DEBUG,
479          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
480          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
481     if (mod->name_size < data_size)
482     {
483       tmit->mod_value_remaining
484         = mod->value_size - (data_size - mod->name_size);
485       mod->value_size = htonl (mod->value_size);
486       mod->name_size = htons (mod->name_size);
487     }
488     else if (0 < data_size)
489     {
490       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
491       notify_ret = GNUNET_SYSERR;
492     }
493     break;
494   }
495   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
496   {
497     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
498     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
499     msg->size = sizeof (struct GNUNET_MessageHeader);
500     notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
501                                    &data_size, &msg[1], NULL, NULL);
502     tmit->mod_value_remaining -= data_size;
503     LOG (GNUNET_ERROR_TYPE_DEBUG,
504          "transmit_mod (ret: %d, size: %u): %.*s\n",
505          notify_ret, data_size, data_size, &msg[1]);
506     break;
507   }
508   default:
509     GNUNET_assert (0);
510   }
511
512   switch (notify_ret)
513   {
514   case GNUNET_NO:
515     if (0 == data_size)
516     { /* Transmission paused, nothing to send. */
517       tmit->paused = GNUNET_YES;
518       return;
519     }
520     tmit->state
521       = (0 == tmit->mod_value_remaining)
522       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
523       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
524     break;
525
526   case GNUNET_YES: /* End of modifiers. */
527     GNUNET_assert (0 == tmit->mod_value_remaining);
528     break;
529
530   default:
531     LOG (GNUNET_ERROR_TYPE_ERROR,
532          "TransmitNotifyModifier callback returned with error.\n");
533
534     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
535     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
536     msg->size = htons (sizeof (*msg));
537
538     transmit_queue_insert (tmit, msg, GNUNET_YES);
539     return;
540   }
541
542   if (0 < data_size)
543   {
544     GNUNET_assert (data_size <= max_data_size);
545     msg->size = htons (msg->size + data_size);
546     transmit_queue_insert (tmit, msg, GNUNET_NO);
547   }
548
549   if (GNUNET_YES == notify_ret)
550   {
551     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
552     if (0 == tmit->acks_pending)
553       transmit_data (tmit);
554   }
555   else
556   {
557     transmit_mod (tmit);
558   }
559 }
560
561
562 int
563 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
564                      uint32_t *full_value_size)
565
566 {
567   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
568   uint16_t name_size = 0;
569   size_t value_size = 0;
570   const char *value = NULL;
571
572   if (NULL != oper)
573   { /* New modifier */
574     if (NULL != tmit->mod)
575       tmit->mod = tmit->mod->next;
576     if (NULL == tmit->mod)
577     { /* No more modifiers, continue with data */
578       *data_size = 0;
579       return GNUNET_YES;
580     }
581
582     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
583     *full_value_size = tmit->mod->value_size;
584     *oper = tmit->mod->oper;
585     name_size = strlen (tmit->mod->name) + 1;
586
587     if (name_size + tmit->mod->value_size <= *data_size)
588     {
589       *data_size = name_size + tmit->mod->value_size;
590     }
591     else
592     {
593       value_size = *data_size - name_size;
594       tmit->mod_value = tmit->mod->value + value_size;
595     }
596
597     memcpy (data, tmit->mod->name, name_size);
598     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
599     return GNUNET_NO;
600   }
601   else
602   { /* Modifier continuation */
603     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
604     value = tmit->mod_value;
605     if (tmit->mod_value_remaining <= *data_size)
606     {
607       value_size = tmit->mod_value_remaining;
608       tmit->mod_value = NULL;
609     }
610     else
611     {
612       value_size = *data_size;
613       tmit->mod_value += value_size;
614     }
615
616     if (*data_size < value_size)
617     {
618       LOG (GNUNET_ERROR_TYPE_DEBUG,
619            "Value in environment larger than buffer: %u < %zu\n",
620            *data_size, value_size);
621       *data_size = 0;
622       return GNUNET_NO;
623     }
624
625     *data_size = value_size;
626     memcpy (data, value, value_size);
627     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
628   }
629 }
630
631
632 /**
633  * Transmit a message.
634  *
635  * @param tmit         Transmission handle.
636  * @param method_name  Which method should be invoked.
637  * @param env          Environment for the message.
638  *   Should stay available until the first call to notify_data.
639  *   Can be NULL if there are no modifiers or @a notify_mod is provided instead.
640  * @param notify_mod   Function to call to obtain modifiers.
641  *   Can be NULL if there are no modifiers or @a env is provided instead.
642  * @param notify_data  Function to call to obtain fragments of the data.
643  * @param notify_cls   Closure for @a notify_mod and @a notify_data.
644  * @param flags        Flags for the message being transmitted.
645  *
646  * @return #GNUNET_OK if the transmission was started.
647  *         #GNUNET_SYSERR if another transmission is already going on.
648  */
649 int
650 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
651                               const char *method_name,
652                               const struct GNUNET_ENV_Environment *env,
653                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
654                               GNUNET_PSYC_TransmitNotifyData notify_data,
655                               void *notify_cls,
656                               uint32_t flags)
657 {
658   if (GNUNET_NO != tmit->in_transmit)
659     return GNUNET_SYSERR;
660   tmit->in_transmit = GNUNET_YES;
661
662   size_t size = strlen (method_name) + 1;
663   struct GNUNET_PSYC_MessageMethod *pmeth;
664   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
665   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
666
667   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
668   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
669   pmeth->header.size = htons (sizeof (*pmeth) + size);
670   pmeth->flags = htonl (flags);
671   memcpy (&pmeth[1], method_name, size);
672
673   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
674   tmit->notify_data = notify_data;
675   tmit->notify_data_cls = notify_cls;
676
677   if (NULL != notify_mod)
678   {
679     tmit->notify_mod = notify_mod;
680     tmit->notify_mod_cls = notify_cls;
681   }
682   else
683   {
684     tmit->notify_mod = &transmit_notify_env;
685     tmit->notify_mod_cls = tmit;
686     if (NULL != env)
687     {
688       struct GNUNET_ENV_Modifier mod = {};
689       mod.next = GNUNET_ENV_environment_head (env);
690       tmit->mod = &mod;
691     }
692     else
693     {
694       tmit->mod = NULL;
695     }
696   }
697
698   transmit_mod (tmit);
699   return GNUNET_OK;
700 }
701
702
703 /**
704  * Resume transmission.
705  *
706  * @param tmit  Transmission handle.
707  */
708 void
709 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
710 {
711   if (0 == tmit->acks_pending)
712   {
713     tmit->paused = GNUNET_NO;
714     transmit_data (tmit);
715   }
716 }
717
718
719 /**
720  * Abort transmission request.
721  *
722  * @param tmit  Transmission handle.
723  */
724 void
725 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
726 {
727   if (GNUNET_NO == tmit->in_transmit)
728     return;
729
730   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
731   tmit->in_transmit = GNUNET_NO;
732   tmit->paused = GNUNET_NO;
733
734   /* FIXME */
735   struct GNUNET_MessageHeader msg;
736   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
737   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
738   msg.size = htons (sizeof (msg));
739   transmit_queue_insert (tmit, &msg, GNUNET_YES);
740 }
741
742
743 /**
744  * Got acknowledgement of a transmitted message part, continue transmission.
745  *
746  * @param tmit  Transmission handle.
747  */
748 void
749 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
750 {
751   if (0 == tmit->acks_pending)
752   {
753     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
754     GNUNET_break (0);
755     return;
756   }
757   tmit->acks_pending--;
758
759   switch (tmit->state)
760   {
761   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
762   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
763     if (GNUNET_NO == tmit->paused)
764       transmit_mod (tmit);
765     break;
766
767   case GNUNET_PSYC_MESSAGE_STATE_DATA:
768     if (GNUNET_NO == tmit->paused)
769       transmit_data (tmit);
770     break;
771
772   case GNUNET_PSYC_MESSAGE_STATE_END:
773   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
774     break;
775
776   default:
777     LOG (GNUNET_ERROR_TYPE_DEBUG,
778          "Ignoring message ACK in state %u.\n", tmit->state);
779   }
780 }
781
782
783 /**** Receiving messages ****/
784
785
786 /**
787  * Create handle for receiving messages.
788  */
789 struct GNUNET_PSYC_ReceiveHandle *
790 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
791                             GNUNET_PSYC_MessagePartCallback message_part_cb,
792                             void *cb_cls)
793 {
794   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
795   recv->message_cb = message_cb;
796   recv->message_part_cb = message_part_cb;
797   recv->cb_cls = cb_cls;
798   return recv;
799 }
800
801
802 /**
803  * Destroy handle for receiving messages.
804  */
805 void
806 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
807 {
808   GNUNET_free (recv);
809 }
810
811
812 /**
813  * Reset stored data related to the last received message.
814  */
815 void
816 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
817 {
818   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
819   recv->flags = 0;
820   recv->message_id = 0;
821   recv->mod_value_size = 0;
822   recv->mod_value_size_expected = 0;
823 }
824
825
826 static void
827 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
828 {
829   if (NULL != recv->message_part_cb)
830     recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL);
831
832   if (NULL != recv->message_cb)
833     recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
834
835   GNUNET_PSYC_receive_reset (recv);
836 }
837
838
839 /**
840  * Handle incoming PSYC message.
841  *
842  * @param recv  Receive handle.
843  * @param msg   The message.
844  *
845  * @return #GNUNET_OK on success,
846  *         #GNUNET_SYSERR on receive error.
847  */
848 int
849 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
850                              const struct GNUNET_PSYC_MessageHeader *msg)
851 {
852   uint16_t size = ntohs (msg->header.size);
853   uint32_t flags = ntohl (msg->flags);
854   uint64_t message_id;
855
856   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
857                            (struct GNUNET_MessageHeader *) msg);
858
859   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
860   {
861     recv->message_id = GNUNET_ntohll (msg->message_id);
862     recv->flags = flags;
863     recv->slave_key = msg->slave_key;
864     recv->mod_value_size = 0;
865     recv->mod_value_size_expected = 0;
866   }
867   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
868   {
869     // FIXME
870     LOG (GNUNET_ERROR_TYPE_WARNING,
871          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
872          GNUNET_ntohll (msg->message_id), recv->message_id);
873     GNUNET_break_op (0);
874     recv_error (recv);
875     return GNUNET_SYSERR;
876   }
877   else if (flags != recv->flags)
878   {
879     LOG (GNUNET_ERROR_TYPE_WARNING,
880          "Unexpected message flags. Got: %lu, expected: %lu\n",
881          flags, recv->flags);
882     GNUNET_break_op (0);
883     recv_error (recv);
884     return GNUNET_SYSERR;
885   }
886   message_id = recv->message_id;
887
888   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
889
890   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
891   {
892     const struct GNUNET_MessageHeader *pmsg
893       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
894     psize = ntohs (pmsg->size);
895     ptype = ntohs (pmsg->type);
896     size_eq = size_min = 0;
897
898     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
899     {
900       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
901                   "Dropping message of type %u with invalid size %u.\n",
902                   ptype, psize);
903       recv_error (recv);
904       return GNUNET_SYSERR;
905     }
906
907     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
908                 "Received message part from PSYC.\n");
909     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
910
911     switch (ptype)
912     {
913     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
914       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
915       break;
916     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
917       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
918       break;
919     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
920     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
921       size_min = sizeof (struct GNUNET_MessageHeader);
922       break;
923     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
924     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
925       size_eq = sizeof (struct GNUNET_MessageHeader);
926       break;
927     default:
928       GNUNET_break_op (0);
929       recv_error (recv);
930       return GNUNET_SYSERR;
931     }
932
933     if (! ((0 < size_eq && psize == size_eq)
934            || (0 < size_min && size_min <= psize)))
935     {
936       GNUNET_break_op (0);
937       recv_error (recv);
938       return GNUNET_SYSERR;
939     }
940
941     switch (ptype)
942     {
943     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
944     {
945       struct GNUNET_PSYC_MessageMethod *meth
946         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
947
948       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
949       {
950         LOG (GNUNET_ERROR_TYPE_WARNING,
951              "Dropping out of order message method (%u).\n",
952              recv->state);
953         /* It is normal to receive an incomplete message right after connecting,
954          * but should not happen later.
955          * FIXME: add a check for this condition.
956          */
957         GNUNET_break_op (0);
958         recv_error (recv);
959         return GNUNET_SYSERR;
960       }
961
962       if ('\0' != *((char *) meth + psize - 1))
963       {
964         LOG (GNUNET_ERROR_TYPE_WARNING,
965              "Dropping message with malformed method. "
966              "Message ID: %" PRIu64 "\n", recv->message_id);
967         GNUNET_break_op (0);
968         recv_error (recv);
969         return GNUNET_SYSERR;
970       }
971       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
972       break;
973     }
974     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
975     {
976       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
977             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
978             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
979       {
980         LOG (GNUNET_ERROR_TYPE_WARNING,
981              "Dropping out of order message modifier (%u).\n",
982              recv->state);
983         GNUNET_break_op (0);
984         recv_error (recv);
985         return GNUNET_SYSERR;
986       }
987
988       struct GNUNET_PSYC_MessageModifier *mod
989         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
990
991       uint16_t name_size = ntohs (mod->name_size);
992       recv->mod_value_size_expected = ntohl (mod->value_size);
993       recv->mod_value_size = psize - sizeof (*mod) - name_size;
994
995       if (psize < sizeof (*mod) + name_size
996           || '\0' != *((char *) &mod[1] + name_size - 1)
997           || recv->mod_value_size_expected < recv->mod_value_size)
998       {
999         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1000         GNUNET_break_op (0);
1001         recv_error (recv);
1002         return GNUNET_SYSERR;
1003       }
1004       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1005       break;
1006     }
1007     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1008     {
1009       recv->mod_value_size += psize - sizeof (*pmsg);
1010
1011       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1012             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1013           || recv->mod_value_size_expected < recv->mod_value_size)
1014       {
1015         LOG (GNUNET_ERROR_TYPE_WARNING,
1016              "Dropping out of order message modifier continuation "
1017              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1018              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1019              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1020              recv->mod_value_size_expected, recv->mod_value_size);
1021         GNUNET_break_op (0);
1022         recv_error (recv);
1023         return GNUNET_SYSERR;
1024       }
1025       break;
1026     }
1027     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1028     {
1029       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1030           || recv->mod_value_size_expected != recv->mod_value_size)
1031       {
1032         LOG (GNUNET_ERROR_TYPE_WARNING,
1033              "Dropping out of order message data fragment "
1034              "(%u < %u || %lu != %lu).\n",
1035              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1036              recv->mod_value_size_expected, recv->mod_value_size);
1037
1038         GNUNET_break_op (0);
1039         recv_error (recv);
1040         return GNUNET_SYSERR;
1041       }
1042       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1043       break;
1044     }
1045     }
1046
1047     if (NULL != recv->message_part_cb)
1048       recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset
1049                              recv->flags, pmsg);
1050
1051     switch (ptype)
1052     {
1053     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1054     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1055       GNUNET_PSYC_receive_reset (recv);
1056       break;
1057     }
1058   }
1059
1060   if (NULL != recv->message_cb)
1061     recv->message_cb (recv->cb_cls, message_id, flags, msg);
1062   return GNUNET_OK;
1063 }
1064
1065
1066 /**
1067  * Check if @a data contains a series of valid message parts.
1068  *
1069  * @param      data_size    Size of @a data.
1070  * @param      data         Data.
1071  * @param[out] first_ptype  Type of first message part.
1072  * @param[out] last_ptype   Type of last message part.
1073  *
1074  * @return Number of message parts found in @a data.
1075  *         or GNUNET_SYSERR if the message contains invalid parts.
1076  */
1077 int
1078 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1079                                  uint16_t *first_ptype, uint16_t *last_ptype)
1080 {
1081   const struct GNUNET_MessageHeader *pmsg;
1082   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1083   if (NULL != first_ptype)
1084     *first_ptype = 0;
1085   if (NULL != last_ptype)
1086     *last_ptype = 0;
1087
1088   for (pos = 0; pos < data_size; pos += psize, parts++)
1089   {
1090     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1091     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1092     psize = ntohs (pmsg->size);
1093     ptype = ntohs (pmsg->type);
1094     if (0 == parts && NULL != first_ptype)
1095       *first_ptype = ptype;
1096     if (NULL != last_ptype
1097         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1098       *last_ptype = ptype;
1099     if (psize < sizeof (*pmsg)
1100         || pos + psize > data_size
1101         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1102         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1103     {
1104       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1105                   "Invalid message part of type %u and size %u.\n",
1106                   ptype, psize);
1107       return GNUNET_SYSERR;
1108     }
1109     /* FIXME: check message part order */
1110   }
1111   return parts;
1112 }
1113
1114
1115 struct ParseMessageClosure
1116 {
1117   struct GNUNET_ENV_Environment *env;
1118   const char **method_name;
1119   const void **data;
1120   uint16_t *data_size;
1121   enum GNUNET_PSYC_MessageState msg_state;
1122 };
1123
1124
1125 static void
1126 parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset,
1127                        uint32_t flags, const struct GNUNET_MessageHeader *msg)
1128 {
1129   struct ParseMessageClosure *pmc = cls;
1130   if (NULL == msg)
1131   {
1132     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1133     return;
1134   }
1135
1136   switch (ntohs (msg->type))
1137   {
1138   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1139   {
1140     struct GNUNET_PSYC_MessageMethod *
1141       pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1142     *pmc->method_name = (const char *) &pmeth[1];
1143     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1144     break;
1145   }
1146
1147   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1148   {
1149     struct GNUNET_PSYC_MessageModifier *
1150       pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1151
1152     const char *name = (const char *) &pmod[1];
1153     const void *value = name + pmod->name_size;
1154     GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1155                                 pmod->value_size);
1156     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1157     break;
1158   }
1159
1160   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1161     *pmc->data = &msg[1];
1162     *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1163     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1164     break;
1165
1166   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1167     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1168     break;
1169
1170   default:
1171     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1172   }
1173 }
1174
1175
1176 /**
1177  * Parse PSYC message.
1178  *
1179  * @param msg
1180  *        The PSYC message to parse.
1181  * @param[out] method_name
1182  *        Pointer to the method name inside @a pmsg.
1183  * @param env
1184  *        The environment for the message with a list of modifiers.
1185  * @param[out] data
1186  *        Pointer to data inside @a pmsg.
1187  * @param[out] data_size
1188  *        Size of @data is written here.
1189  *
1190  * @return #GNUNET_OK on success,
1191  *         #GNUNET_SYSERR on parse error.
1192  */
1193 int
1194 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1195                            const char **method_name,
1196                            struct GNUNET_ENV_Environment *env,
1197                            const void **data,
1198                            uint16_t *data_size)
1199 {
1200   struct ParseMessageClosure cls;
1201   cls.env = env;
1202   cls.method_name = method_name;
1203   cls.data = data;
1204   cls.data_size = data_size;
1205
1206   uint16_t msg_size = ntohs (msg->header.size);
1207   struct GNUNET_PSYC_MessageHeader *
1208     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1209   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1210
1211   struct GNUNET_PSYC_ReceiveHandle *
1212     recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls);
1213   GNUNET_PSYC_receive_message (recv, pmsg);
1214   GNUNET_PSYC_receive_destroy (recv);
1215   GNUNET_free (pmsg);
1216
1217   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1218     ? GNUNET_OK
1219     : GNUNET_SYSERR;
1220 }