-do not run test without sqlite
[oweals/gnunet.git] / src / psycstore / psyc_util_lib.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_env_lib.h"
32 #include "gnunet_psyc_service.h"
33 #include "gnunet_psyc_util_lib.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37
38 struct GNUNET_PSYC_TransmitHandle
39 {
40   /**
41    * Client connection to service.
42    */
43   struct GNUNET_CLIENT_MANAGER_Connection *client;
44
45   /**
46    * Message currently being received from the client.
47    */
48   struct GNUNET_MessageHeader *msg;
49
50   /**
51    * Callback to request next modifier from client.
52    */
53   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
54
55   /**
56    * Closure for the notify callbacks.
57    */
58   void *notify_mod_cls;
59
60   /**
61    * Callback to request next data fragment from client.
62    */
63   GNUNET_PSYC_TransmitNotifyData notify_data;
64
65   /**
66    * Closure for the notify callbacks.
67    */
68   void *notify_data_cls;
69
70   /**
71    * Modifier of the environment that is currently being transmitted.
72    */
73   struct GNUNET_ENV_Modifier *mod;
74
75   /**
76    *
77    */
78   const char *mod_value;
79
80   /**
81    * Number of bytes remaining to be transmitted from the current modifier value.
82    */
83   uint32_t mod_value_remaining;
84
85   /**
86    * State of the current message being received from client.
87    */
88   enum GNUNET_PSYC_MessageState state;
89
90   /**
91    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
92    */
93   uint8_t acks_pending;
94
95   /**
96    * Is transmission paused?
97    */
98   uint8_t paused;
99
100   /**
101    * Are we currently transmitting a message?
102    */
103   uint8_t in_transmit;
104 };
105
106
107
108 struct GNUNET_PSYC_ReceiveHandle
109 {
110   /**
111    * Message callback.
112    */
113   GNUNET_PSYC_MessageCallback message_cb;
114
115   /**
116    * Message part callback.
117    */
118   GNUNET_PSYC_MessagePartCallback message_part_cb;
119
120   /**
121    * Closure for the callbacks.
122    */
123   void *cb_cls;
124
125   /**
126    * ID of the message being received from the PSYC service.
127    */
128   uint64_t message_id;
129
130   /**
131    * Public key of the slave from which a message is being received.
132    */
133   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
134
135   /**
136    * State of the currently being received message from the PSYC service.
137    */
138   enum GNUNET_PSYC_MessageState state;
139
140   /**
141    * Flags for the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageFlags flags;
144
145   /**
146    * Expected value size for the modifier being received from the PSYC service.
147    */
148   uint32_t mod_value_size_expected;
149
150   /**
151    * Actual value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size;
154 };
155
156
157 /**** Messages ****/
158
159
160 /**
161  * Create a PSYC message.
162  *
163  * @param method_name
164  *        PSYC method for the message.
165  * @param env
166  *        Environment for the message.
167  * @param data
168  *        Data payload for the message.
169  * @param data_size
170  *        Size of @a data.
171  *
172  * @return Message header with size information,
173  *         followed by the message parts.
174  */
175 struct GNUNET_PSYC_Message *
176 GNUNET_PSYC_message_create (const char *method_name,
177                             const struct GNUNET_ENV_Environment *env,
178                             const void *data,
179                             size_t data_size)
180 {
181   struct GNUNET_ENV_Modifier *mod = NULL;
182   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
183   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
184   struct GNUNET_MessageHeader *pmsg = NULL;
185   uint16_t env_size = 0;
186   if (NULL != env)
187   {
188     mod = GNUNET_ENV_environment_head (env);
189     while (NULL != mod)
190     {
191       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
192       mod = mod->next;
193     }
194   }
195
196   struct GNUNET_PSYC_Message *msg;
197   uint16_t method_name_size = strlen (method_name) + 1;
198   if (method_name_size == 1)
199     return NULL;
200
201   uint16_t msg_size = sizeof (*msg)                     /* header */
202     + sizeof (*pmeth) + method_name_size                /* method */
203     + env_size                                          /* modifiers */
204     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */
205     + sizeof (*pmsg);                                   /* end of message */
206   msg = GNUNET_malloc (msg_size);
207   msg->header.size = htons (msg_size);
208   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
209
210   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
212   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
213   memcpy (&pmeth[1], method_name, method_name_size);
214
215   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
216   if (NULL != env)
217   {
218     mod = GNUNET_ENV_environment_head (env);
219     while (NULL != mod)
220     {
221       uint16_t mod_name_size = strlen (mod->name) + 1;
222       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
223       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
224       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
225       p += pmod->header.size;
226       pmod->header.size = htons (pmod->header.size);
227
228       memcpy (&pmod[1], mod->name, mod_name_size);
229       if (0 < mod->value_size)
230         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
231
232       mod = mod->next;
233     }
234   }
235
236   if (0 < data_size)
237   {
238     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
239     pmsg->size = sizeof (*pmsg) + data_size;
240     p += pmsg->size;
241     pmsg->size = htons (pmsg->size);
242     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
243     memcpy (&pmsg[1], data, data_size);
244   }
245
246   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
247   pmsg->size = htons (sizeof (*pmsg));
248   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
249
250   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
251   return msg;
252 }
253
254
255 void
256 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
257                          const struct GNUNET_MessageHeader *msg)
258 {
259   uint16_t size = ntohs (msg->size);
260   uint16_t type = ntohs (msg->type);
261   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
262   switch (type)
263   {
264   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
265   {
266     struct GNUNET_PSYC_MessageHeader *pmsg
267       = (struct GNUNET_PSYC_MessageHeader *) msg;
268     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
269                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
270     break;
271   }
272   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
273   {
274     struct GNUNET_PSYC_MessageMethod *meth
275       = (struct GNUNET_PSYC_MessageMethod *) msg;
276     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
277     break;
278   }
279   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
280   {
281     struct GNUNET_PSYC_MessageModifier *mod
282       = (struct GNUNET_PSYC_MessageModifier *) msg;
283     uint16_t name_size = ntohs (mod->name_size);
284     char oper = ' ' < mod->oper ? mod->oper : ' ';
285     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
286                 size - sizeof (*mod) - name_size,
287                 ((char *) &mod[1]) + name_size);
288     break;
289   }
290   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
291   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
292     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
293     break;
294   }
295 }
296
297
298 /**** Transmitting messages ****/
299
300
301 /**
302  * Create a transmission handle.
303  */
304 struct GNUNET_PSYC_TransmitHandle *
305 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
306 {
307   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
308   tmit->client = client;
309   return tmit;
310 }
311
312
313 /**
314  * Destroy a transmission handle.
315  */
316 void
317 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
318 {
319   GNUNET_free (tmit);
320 }
321
322
323 /**
324  * Queue a message part for transmission.
325  *
326  * The message part is added to the current message buffer.
327  * When this buffer is full, it is added to the transmission queue.
328  *
329  * @param tmit
330  *        Transmission handle.
331  * @param msg
332  *        Message part, or NULL.
333  * @param end
334  *        End of message?
335  *        #GNUNET_YES or #GNUNET_NO.
336  */
337 static void
338 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
339                        const struct GNUNET_MessageHeader *msg,
340                        uint8_t end)
341 {
342   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
343
344   LOG (GNUNET_ERROR_TYPE_DEBUG,
345        "Queueing message part of type %u and size %u (end: %u)).\n",
346        NULL != msg ? ntohs (msg->type) : 0, size, end);
347
348   if (NULL != tmit->msg)
349   {
350     if (NULL == msg
351         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
352     {
353       /* End of message or buffer is full, add it to transmission queue
354        * and start with empty buffer */
355       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
356       tmit->msg->size = htons (tmit->msg->size);
357       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
358       tmit->msg = NULL;
359       tmit->acks_pending++;
360     }
361     else
362     {
363       /* Message fits in current buffer, append */
364       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
365       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
366       tmit->msg->size += size;
367     }
368   }
369
370   if (NULL == tmit->msg && NULL != msg)
371   {
372     /* Empty buffer, copy over message. */
373     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
374     tmit->msg->size = sizeof (*tmit->msg) + size;
375     memcpy (&tmit->msg[1], msg, size);
376   }
377
378   if (NULL != tmit->msg
379       && (GNUNET_YES == end
380           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
381               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
382   {
383     /* End of message or buffer is full, add it to transmission queue. */
384     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
385     tmit->msg->size = htons (tmit->msg->size);
386     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
387     tmit->msg = NULL;
388     tmit->acks_pending++;
389   }
390
391   if (GNUNET_YES == end)
392     tmit->in_transmit = GNUNET_NO;
393 }
394
395
396 /**
397  * Request data from client to transmit.
398  *
399  * @param tmit  Transmission handle.
400  */
401 static void
402 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
403 {
404   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
405   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
406   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
407   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
408
409   int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
410   LOG (GNUNET_ERROR_TYPE_DEBUG,
411        "transmit_data (ret: %d, size: %u): %.*s\n",
412        notify_ret, data_size, data_size, &msg[1]);
413   switch (notify_ret)
414   {
415   case GNUNET_NO:
416     if (0 == data_size)
417     {
418       /* Transmission paused, nothing to send. */
419       tmit->paused = GNUNET_YES;
420       return;
421     }
422     break;
423
424   case GNUNET_YES:
425     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
426     break;
427
428   default:
429     LOG (GNUNET_ERROR_TYPE_ERROR,
430          "TransmitNotifyData callback returned error when requesting data.\n");
431
432     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
433     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
434     msg->size = htons (sizeof (*msg));
435     transmit_queue_insert (tmit, msg, GNUNET_YES);
436     return;
437   }
438
439   if (0 < data_size)
440   {
441     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
442     msg->size = htons (sizeof (*msg) + data_size);
443     transmit_queue_insert (tmit, msg, !notify_ret);
444   }
445
446   /* End of message. */
447   if (GNUNET_YES == notify_ret)
448   {
449     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
450     msg->size = htons (sizeof (*msg));
451     transmit_queue_insert (tmit, msg, GNUNET_YES);
452   }
453 }
454
455
456 /**
457  * Request a modifier from a client to transmit.
458  *
459  * @param tmit  Transmission handle.
460  */
461 static void
462 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
463 {
464   uint16_t max_data_size, data_size;
465   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
466   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
467   int notify_ret;
468
469   switch (tmit->state)
470   {
471   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
472   {
473     struct GNUNET_PSYC_MessageModifier *mod
474       = (struct GNUNET_PSYC_MessageModifier *) msg;
475     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
476     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
477     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
478     notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
479                                    &mod->oper, &mod->value_size);
480
481     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
482     LOG (GNUNET_ERROR_TYPE_DEBUG,
483          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
484          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
485     if (mod->name_size < data_size)
486     {
487       tmit->mod_value_remaining
488         = mod->value_size - (data_size - mod->name_size);
489       mod->value_size = htonl (mod->value_size);
490       mod->name_size = htons (mod->name_size);
491     }
492     else if (0 < data_size)
493     {
494       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
495       notify_ret = GNUNET_SYSERR;
496     }
497     break;
498   }
499   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
500   {
501     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
502     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
503     msg->size = sizeof (struct GNUNET_MessageHeader);
504     notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
505                                    &data_size, &msg[1], NULL, NULL);
506     tmit->mod_value_remaining -= data_size;
507     LOG (GNUNET_ERROR_TYPE_DEBUG,
508          "transmit_mod (ret: %d, size: %u): %.*s\n",
509          notify_ret, data_size, data_size, &msg[1]);
510     break;
511   }
512   default:
513     GNUNET_assert (0);
514   }
515
516   switch (notify_ret)
517   {
518   case GNUNET_NO:
519     if (0 == data_size)
520     { /* Transmission paused, nothing to send. */
521       tmit->paused = GNUNET_YES;
522       return;
523     }
524     tmit->state
525       = (0 == tmit->mod_value_remaining)
526       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
527       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
528     break;
529
530   case GNUNET_YES: /* End of modifiers. */
531     GNUNET_assert (0 == tmit->mod_value_remaining);
532     break;
533
534   default:
535     LOG (GNUNET_ERROR_TYPE_ERROR,
536          "TransmitNotifyModifier callback returned with error.\n");
537
538     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
539     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
540     msg->size = htons (sizeof (*msg));
541
542     transmit_queue_insert (tmit, msg, GNUNET_YES);
543     return;
544   }
545
546   if (0 < data_size)
547   {
548     GNUNET_assert (data_size <= max_data_size);
549     msg->size = htons (msg->size + data_size);
550     transmit_queue_insert (tmit, msg, GNUNET_NO);
551   }
552
553   if (GNUNET_YES == notify_ret)
554   {
555     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
556     if (0 == tmit->acks_pending)
557       transmit_data (tmit);
558   }
559   else
560   {
561     transmit_mod (tmit);
562   }
563 }
564
565
566 int
567 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
568                      uint32_t *full_value_size)
569
570 {
571   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
572   uint16_t name_size = 0;
573   size_t value_size = 0;
574   const char *value = NULL;
575
576   if (NULL != oper)
577   { /* New modifier */
578     if (NULL != tmit->mod)
579       tmit->mod = tmit->mod->next;
580     if (NULL == tmit->mod)
581     { /* No more modifiers, continue with data */
582       *data_size = 0;
583       return GNUNET_YES;
584     }
585
586     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
587     *full_value_size = tmit->mod->value_size;
588     *oper = tmit->mod->oper;
589     name_size = strlen (tmit->mod->name) + 1;
590
591     if (name_size + tmit->mod->value_size <= *data_size)
592     {
593       *data_size = name_size + tmit->mod->value_size;
594     }
595     else
596     {
597       value_size = *data_size - name_size;
598       tmit->mod_value = tmit->mod->value + value_size;
599     }
600
601     memcpy (data, tmit->mod->name, name_size);
602     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
603     return GNUNET_NO;
604   }
605   else
606   { /* Modifier continuation */
607     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
608     value = tmit->mod_value;
609     if (tmit->mod_value_remaining <= *data_size)
610     {
611       value_size = tmit->mod_value_remaining;
612       tmit->mod_value = NULL;
613     }
614     else
615     {
616       value_size = *data_size;
617       tmit->mod_value += value_size;
618     }
619
620     if (*data_size < value_size)
621     {
622       LOG (GNUNET_ERROR_TYPE_DEBUG,
623            "Value in environment larger than buffer: %u < %zu\n",
624            *data_size, value_size);
625       *data_size = 0;
626       return GNUNET_NO;
627     }
628
629     *data_size = value_size;
630     memcpy (data, value, value_size);
631     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
632   }
633 }
634
635
636 /**
637  * Transmit a message.
638  *
639  * @param tmit
640  *        Transmission handle.
641  * @param method_name
642  *        Which method should be invoked.
643  * @param env
644  *        Environment for the message.
645  *        Should stay available until the first call to notify_data.
646  *        Can be NULL if there are no modifiers or @a notify_mod is
647  *        provided instead.
648  * @param notify_mod
649  *        Function to call to obtain modifiers.
650  *        Can be NULL if there are no modifiers or @a env is provided instead.
651  * @param notify_data
652  *        Function to call to obtain fragments of the data.
653  * @param notify_cls
654  *        Closure for @a notify_mod and @a notify_data.
655  * @param flags
656  *        Flags for the message being transmitted.
657  *
658  * @return #GNUNET_OK if the transmission was started.
659  *         #GNUNET_SYSERR if another transmission is already going on.
660  */
661 int
662 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
663                               const char *method_name,
664                               const struct GNUNET_ENV_Environment *env,
665                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
666                               GNUNET_PSYC_TransmitNotifyData notify_data,
667                               void *notify_cls,
668                               uint32_t flags)
669 {
670   if (GNUNET_NO != tmit->in_transmit)
671     return GNUNET_SYSERR;
672   tmit->in_transmit = GNUNET_YES;
673
674   size_t size = strlen (method_name) + 1;
675   struct GNUNET_PSYC_MessageMethod *pmeth;
676   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
677   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
678
679   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
680   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
681   pmeth->header.size = htons (sizeof (*pmeth) + size);
682   pmeth->flags = htonl (flags);
683   memcpy (&pmeth[1], method_name, size);
684
685   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
686   tmit->notify_data = notify_data;
687   tmit->notify_data_cls = notify_cls;
688
689   if (NULL != notify_mod)
690   {
691     tmit->notify_mod = notify_mod;
692     tmit->notify_mod_cls = notify_cls;
693   }
694   else
695   {
696     tmit->notify_mod = &transmit_notify_env;
697     tmit->notify_mod_cls = tmit;
698     if (NULL != env)
699     {
700       struct GNUNET_ENV_Modifier mod = {};
701       mod.next = GNUNET_ENV_environment_head (env);
702       tmit->mod = &mod;
703     }
704     else
705     {
706       tmit->mod = NULL;
707     }
708   }
709
710   transmit_mod (tmit);
711   return GNUNET_OK;
712 }
713
714
715 /**
716  * Resume transmission.
717  *
718  * @param tmit  Transmission handle.
719  */
720 void
721 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
722 {
723   if (0 == tmit->acks_pending)
724   {
725     tmit->paused = GNUNET_NO;
726     transmit_data (tmit);
727   }
728 }
729
730
731 /**
732  * Abort transmission request.
733  *
734  * @param tmit  Transmission handle.
735  */
736 void
737 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
738 {
739   if (GNUNET_NO == tmit->in_transmit)
740     return;
741
742   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
743   tmit->in_transmit = GNUNET_NO;
744   tmit->paused = GNUNET_NO;
745
746   /* FIXME */
747   struct GNUNET_MessageHeader msg;
748   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
749   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
750   msg.size = htons (sizeof (msg));
751   transmit_queue_insert (tmit, &msg, GNUNET_YES);
752 }
753
754
755 /**
756  * Got acknowledgement of a transmitted message part, continue transmission.
757  *
758  * @param tmit  Transmission handle.
759  */
760 void
761 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
762 {
763   if (0 == tmit->acks_pending)
764   {
765     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
766     GNUNET_break (0);
767     return;
768   }
769   tmit->acks_pending--;
770
771   switch (tmit->state)
772   {
773   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
774   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
775     if (GNUNET_NO == tmit->paused)
776       transmit_mod (tmit);
777     break;
778
779   case GNUNET_PSYC_MESSAGE_STATE_DATA:
780     if (GNUNET_NO == tmit->paused)
781       transmit_data (tmit);
782     break;
783
784   case GNUNET_PSYC_MESSAGE_STATE_END:
785   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
786     break;
787
788   default:
789     LOG (GNUNET_ERROR_TYPE_DEBUG,
790          "Ignoring message ACK in state %u.\n", tmit->state);
791   }
792 }
793
794
795 /**** Receiving messages ****/
796
797
798 /**
799  * Create handle for receiving messages.
800  */
801 struct GNUNET_PSYC_ReceiveHandle *
802 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
803                             GNUNET_PSYC_MessagePartCallback message_part_cb,
804                             void *cb_cls)
805 {
806   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
807   recv->message_cb = message_cb;
808   recv->message_part_cb = message_part_cb;
809   recv->cb_cls = cb_cls;
810   return recv;
811 }
812
813
814 /**
815  * Destroy handle for receiving messages.
816  */
817 void
818 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
819 {
820   GNUNET_free (recv);
821 }
822
823
824 /**
825  * Reset stored data related to the last received message.
826  */
827 void
828 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
829 {
830   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
831   recv->flags = 0;
832   recv->message_id = 0;
833   recv->mod_value_size = 0;
834   recv->mod_value_size_expected = 0;
835 }
836
837
838 static void
839 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
840 {
841   if (NULL != recv->message_part_cb)
842     recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL);
843
844   if (NULL != recv->message_cb)
845     recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
846
847   GNUNET_PSYC_receive_reset (recv);
848 }
849
850
851 /**
852  * Handle incoming PSYC message.
853  *
854  * @param recv  Receive handle.
855  * @param msg   The message.
856  *
857  * @return #GNUNET_OK on success,
858  *         #GNUNET_SYSERR on receive error.
859  */
860 int
861 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
862                              const struct GNUNET_PSYC_MessageHeader *msg)
863 {
864   uint16_t size = ntohs (msg->header.size);
865   uint32_t flags = ntohl (msg->flags);
866   uint64_t message_id;
867
868   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
869                            (struct GNUNET_MessageHeader *) msg);
870
871   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
872   {
873     recv->message_id = GNUNET_ntohll (msg->message_id);
874     recv->flags = flags;
875     recv->slave_key = msg->slave_key;
876     recv->mod_value_size = 0;
877     recv->mod_value_size_expected = 0;
878   }
879   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
880   {
881     // FIXME
882     LOG (GNUNET_ERROR_TYPE_WARNING,
883          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
884          GNUNET_ntohll (msg->message_id), recv->message_id);
885     GNUNET_break_op (0);
886     recv_error (recv);
887     return GNUNET_SYSERR;
888   }
889   else if (flags != recv->flags)
890   {
891     LOG (GNUNET_ERROR_TYPE_WARNING,
892          "Unexpected message flags. Got: %lu, expected: %lu\n",
893          flags, recv->flags);
894     GNUNET_break_op (0);
895     recv_error (recv);
896     return GNUNET_SYSERR;
897   }
898   message_id = recv->message_id;
899
900   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
901
902   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
903   {
904     const struct GNUNET_MessageHeader *pmsg
905       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
906     psize = ntohs (pmsg->size);
907     ptype = ntohs (pmsg->type);
908     size_eq = size_min = 0;
909
910     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
911     {
912       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
913                   "Dropping message of type %u with invalid size %u.\n",
914                   ptype, psize);
915       recv_error (recv);
916       return GNUNET_SYSERR;
917     }
918
919     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                 "Received message part of type %u and size %u from PSYC.\n",
921                 ptype, psize);
922     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
923
924     switch (ptype)
925     {
926     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
927       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
928       break;
929     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
930       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
931       break;
932     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
933     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
934       size_min = sizeof (struct GNUNET_MessageHeader);
935       break;
936     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
937     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
938       size_eq = sizeof (struct GNUNET_MessageHeader);
939       break;
940     default:
941       GNUNET_break_op (0);
942       recv_error (recv);
943       return GNUNET_SYSERR;
944     }
945
946     if (! ((0 < size_eq && psize == size_eq)
947            || (0 < size_min && size_min <= psize)))
948     {
949       GNUNET_break_op (0);
950       recv_error (recv);
951       return GNUNET_SYSERR;
952     }
953
954     switch (ptype)
955     {
956     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
957     {
958       struct GNUNET_PSYC_MessageMethod *meth
959         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
960
961       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
962       {
963         LOG (GNUNET_ERROR_TYPE_WARNING,
964              "Dropping out of order message method (%u).\n",
965              recv->state);
966         /* It is normal to receive an incomplete message right after connecting,
967          * but should not happen later.
968          * FIXME: add a check for this condition.
969          */
970         GNUNET_break_op (0);
971         recv_error (recv);
972         return GNUNET_SYSERR;
973       }
974
975       if ('\0' != *((char *) meth + psize - 1))
976       {
977         LOG (GNUNET_ERROR_TYPE_WARNING,
978              "Dropping message with malformed method. "
979              "Message ID: %" PRIu64 "\n", recv->message_id);
980         GNUNET_break_op (0);
981         recv_error (recv);
982         return GNUNET_SYSERR;
983       }
984       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
985       break;
986     }
987     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
988     {
989       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
990             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
991             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
992       {
993         LOG (GNUNET_ERROR_TYPE_WARNING,
994              "Dropping out of order message modifier (%u).\n",
995              recv->state);
996         GNUNET_break_op (0);
997         recv_error (recv);
998         return GNUNET_SYSERR;
999       }
1000
1001       struct GNUNET_PSYC_MessageModifier *mod
1002         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1003
1004       uint16_t name_size = ntohs (mod->name_size);
1005       recv->mod_value_size_expected = ntohl (mod->value_size);
1006       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1007
1008       if (psize < sizeof (*mod) + name_size
1009           || '\0' != *((char *) &mod[1] + name_size - 1)
1010           || recv->mod_value_size_expected < recv->mod_value_size)
1011       {
1012         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1013         GNUNET_break_op (0);
1014         recv_error (recv);
1015         return GNUNET_SYSERR;
1016       }
1017       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1018       break;
1019     }
1020     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1021     {
1022       recv->mod_value_size += psize - sizeof (*pmsg);
1023
1024       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1025             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1026           || recv->mod_value_size_expected < recv->mod_value_size)
1027       {
1028         LOG (GNUNET_ERROR_TYPE_WARNING,
1029              "Dropping out of order message modifier continuation "
1030              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1031              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1032              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1033              recv->mod_value_size_expected, recv->mod_value_size);
1034         GNUNET_break_op (0);
1035         recv_error (recv);
1036         return GNUNET_SYSERR;
1037       }
1038       break;
1039     }
1040     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1041     {
1042       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1043           || recv->mod_value_size_expected != recv->mod_value_size)
1044       {
1045         LOG (GNUNET_ERROR_TYPE_WARNING,
1046              "Dropping out of order message data fragment "
1047              "(%u < %u || %lu != %lu).\n",
1048              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1049              recv->mod_value_size_expected, recv->mod_value_size);
1050
1051         GNUNET_break_op (0);
1052         recv_error (recv);
1053         return GNUNET_SYSERR;
1054       }
1055       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1056       break;
1057     }
1058     }
1059
1060     if (NULL != recv->message_part_cb)
1061       recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset
1062                              recv->flags, pmsg);
1063
1064     switch (ptype)
1065     {
1066     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1067     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1068       GNUNET_PSYC_receive_reset (recv);
1069       break;
1070     }
1071   }
1072
1073   if (NULL != recv->message_cb)
1074     recv->message_cb (recv->cb_cls, message_id, flags, msg);
1075   return GNUNET_OK;
1076 }
1077
1078
1079 /**
1080  * Check if @a data contains a series of valid message parts.
1081  *
1082  * @param      data_size    Size of @a data.
1083  * @param      data         Data.
1084  * @param[out] first_ptype  Type of first message part.
1085  * @param[out] last_ptype   Type of last message part.
1086  *
1087  * @return Number of message parts found in @a data.
1088  *         or GNUNET_SYSERR if the message contains invalid parts.
1089  */
1090 int
1091 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1092                                  uint16_t *first_ptype, uint16_t *last_ptype)
1093 {
1094   const struct GNUNET_MessageHeader *pmsg;
1095   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1096   if (NULL != first_ptype)
1097     *first_ptype = 0;
1098   if (NULL != last_ptype)
1099     *last_ptype = 0;
1100
1101   for (pos = 0; pos < data_size; pos += psize, parts++)
1102   {
1103     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1104     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1105     psize = ntohs (pmsg->size);
1106     ptype = ntohs (pmsg->type);
1107     if (0 == parts && NULL != first_ptype)
1108       *first_ptype = ptype;
1109     if (NULL != last_ptype
1110         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1111       *last_ptype = ptype;
1112     if (psize < sizeof (*pmsg)
1113         || pos + psize > data_size
1114         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1115         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1116     {
1117       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1118                   "Invalid message part of type %u and size %u.\n",
1119                   ptype, psize);
1120       return GNUNET_SYSERR;
1121     }
1122     /** @todo FIXME: check message part order */
1123   }
1124   return parts;
1125 }
1126
1127
1128 struct ParseMessageClosure
1129 {
1130   struct GNUNET_ENV_Environment *env;
1131   const char **method_name;
1132   const void **data;
1133   uint16_t *data_size;
1134   enum GNUNET_PSYC_MessageState msg_state;
1135 };
1136
1137
1138 static void
1139 parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset,
1140                        uint32_t flags, const struct GNUNET_MessageHeader *msg)
1141 {
1142   struct ParseMessageClosure *pmc = cls;
1143   if (NULL == msg)
1144   {
1145     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1146     return;
1147   }
1148
1149   switch (ntohs (msg->type))
1150   {
1151   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1152   {
1153     struct GNUNET_PSYC_MessageMethod *
1154       pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1155     *pmc->method_name = (const char *) &pmeth[1];
1156     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1157     break;
1158   }
1159
1160   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1161   {
1162     struct GNUNET_PSYC_MessageModifier *
1163       pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1164
1165     const char *name = (const char *) &pmod[1];
1166     const void *value = name + pmod->name_size;
1167     GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1168                                 pmod->value_size);
1169     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1170     break;
1171   }
1172
1173   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1174     *pmc->data = &msg[1];
1175     *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1176     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1177     break;
1178
1179   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1180     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1181     break;
1182
1183   default:
1184     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1185   }
1186 }
1187
1188
1189 /**
1190  * Parse PSYC message.
1191  *
1192  * @param msg
1193  *        The PSYC message to parse.
1194  * @param[out] method_name
1195  *        Pointer to the method name inside @a pmsg.
1196  * @param env
1197  *        The environment for the message with a list of modifiers.
1198  * @param[out] data
1199  *        Pointer to data inside @a pmsg.
1200  * @param[out] data_size
1201  *        Size of @data is written here.
1202  *
1203  * @return #GNUNET_OK on success,
1204  *         #GNUNET_SYSERR on parse error.
1205  */
1206 int
1207 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1208                            const char **method_name,
1209                            struct GNUNET_ENV_Environment *env,
1210                            const void **data,
1211                            uint16_t *data_size)
1212 {
1213   struct ParseMessageClosure cls;
1214   cls.env = env;
1215   cls.method_name = method_name;
1216   cls.data = data;
1217   cls.data_size = data_size;
1218
1219   uint16_t msg_size = ntohs (msg->header.size);
1220   struct GNUNET_PSYC_MessageHeader *
1221     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1222   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1223
1224   struct GNUNET_PSYC_ReceiveHandle *
1225     recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls);
1226   GNUNET_PSYC_receive_message (recv, pmsg);
1227   GNUNET_PSYC_receive_destroy (recv);
1228   GNUNET_free (pmsg);
1229
1230   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1231     ? GNUNET_OK
1232     : GNUNET_SYSERR;
1233 }