8c214d2b642f4f90a607ccd503461e878d577e85
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_CLIENT_MANAGER_Connection *client;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Callback to request next modifier from client.
51    */
52   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
53
54   /**
55    * Closure for the notify callbacks.
56    */
57   void *notify_mod_cls;
58
59   /**
60    * Callback to request next data fragment from client.
61    */
62   GNUNET_PSYC_TransmitNotifyData notify_data;
63
64   /**
65    * Closure for the notify callbacks.
66    */
67   void *notify_data_cls;
68
69   /**
70    * Modifier of the environment that is currently being transmitted.
71    */
72   struct GNUNET_PSYC_Modifier *mod;
73
74   /**
75    *
76    */
77   const char *mod_value;
78
79   /**
80    * Number of bytes remaining to be transmitted from the current modifier value.
81    */
82   uint32_t mod_value_remaining;
83
84   /**
85    * State of the current message being received from client.
86    */
87   enum GNUNET_PSYC_MessageState state;
88
89   /**
90    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
91    */
92   uint8_t acks_pending;
93
94   /**
95    * Is transmission paused?
96    */
97   uint8_t paused;
98
99   /**
100    * Are we currently transmitting a message?
101    */
102   uint8_t in_transmit;
103
104   /**
105    * Notify callback is currently being called.
106    */
107   uint8_t in_notify;
108
109 };
110
111
112
113 struct GNUNET_PSYC_ReceiveHandle
114 {
115   /**
116    * Message callback.
117    */
118   GNUNET_PSYC_MessageCallback message_cb;
119
120   /**
121    * Message part callback.
122    */
123   GNUNET_PSYC_MessagePartCallback message_part_cb;
124
125   /**
126    * Closure for the callbacks.
127    */
128   void *cb_cls;
129
130   /**
131    * ID of the message being received from the PSYC service.
132    */
133   uint64_t message_id;
134
135   /**
136    * Public key of the slave from which a message is being received.
137    */
138   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
139
140   /**
141    * State of the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageState state;
144
145   /**
146    * Flags for the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageFlags flags;
149
150   /**
151    * Expected value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size_expected;
154
155   /**
156    * Actual value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size;
159 };
160
161
162 /**** Messages ****/
163
164
165 /**
166  * Create a PSYC message.
167  *
168  * @param method_name
169  *        PSYC method for the message.
170  * @param env
171  *        Environment for the message.
172  * @param data
173  *        Data payload for the message.
174  * @param data_size
175  *        Size of @a data.
176  *
177  * @return Message header with size information,
178  *         followed by the message parts.
179  */
180 struct GNUNET_PSYC_Message *
181 GNUNET_PSYC_message_create (const char *method_name,
182                             const struct GNUNET_PSYC_Environment *env,
183                             const void *data,
184                             size_t data_size)
185 {
186   struct GNUNET_PSYC_Modifier *mod = NULL;
187   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
188   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
189   struct GNUNET_MessageHeader *pmsg = NULL;
190   uint16_t env_size = 0;
191   if (NULL != env)
192   {
193     mod = GNUNET_PSYC_env_head (env);
194     while (NULL != mod)
195     {
196       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
197       mod = mod->next;
198     }
199   }
200
201   struct GNUNET_PSYC_Message *msg;
202   uint16_t method_name_size = strlen (method_name) + 1;
203   if (method_name_size == 1)
204     return NULL;
205
206   uint16_t msg_size = sizeof (*msg)                      /* header */
207     + sizeof (*pmeth) + method_name_size                 /* method */
208     + env_size                                           /* modifiers */
209     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
210     + sizeof (*pmsg);                                    /* end of message */
211   msg = GNUNET_malloc (msg_size);
212   msg->header.size = htons (msg_size);
213   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
214
215   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
216   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
217   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
218   memcpy (&pmeth[1], method_name, method_name_size);
219
220   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
221   if (NULL != env)
222   {
223     mod = GNUNET_PSYC_env_head (env);
224     while (NULL != mod)
225     {
226       uint16_t mod_name_size = strlen (mod->name) + 1;
227       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
228       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
229       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
230       p += pmod->header.size;
231       pmod->header.size = htons (pmod->header.size);
232
233       pmod->oper = mod->oper;
234       pmod->name_size = htons (mod_name_size);
235       pmod->value_size = htonl (mod->value_size);
236
237       memcpy (&pmod[1], mod->name, mod_name_size);
238       if (0 < mod->value_size)
239         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
240
241       mod = mod->next;
242     }
243   }
244
245   if (0 < data_size)
246   {
247     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
248     pmsg->size = sizeof (*pmsg) + data_size;
249     p += pmsg->size;
250     pmsg->size = htons (pmsg->size);
251     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
252     memcpy (&pmsg[1], data, data_size);
253   }
254
255   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
256   pmsg->size = htons (sizeof (*pmsg));
257   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
258
259   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
260   return msg;
261 }
262
263
264 void
265 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
266                          const struct GNUNET_MessageHeader *msg)
267 {
268   uint16_t size = ntohs (msg->size);
269   uint16_t type = ntohs (msg->type);
270   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
271   switch (type)
272   {
273   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
274   {
275     struct GNUNET_PSYC_MessageHeader *pmsg
276       = (struct GNUNET_PSYC_MessageHeader *) msg;
277     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
278                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
279     break;
280   }
281   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
282   {
283     struct GNUNET_PSYC_MessageMethod *meth
284       = (struct GNUNET_PSYC_MessageMethod *) msg;
285     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
286     break;
287   }
288   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
289   {
290     struct GNUNET_PSYC_MessageModifier *mod
291       = (struct GNUNET_PSYC_MessageModifier *) msg;
292     uint16_t name_size = ntohs (mod->name_size);
293     char oper = ' ' < mod->oper ? mod->oper : ' ';
294     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
295                 size - sizeof (*mod) - name_size,
296                 ((char *) &mod[1]) + name_size);
297     break;
298   }
299   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
300   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
301     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
302     break;
303   }
304 }
305
306
307 /**** Transmitting messages ****/
308
309
310 /**
311  * Create a transmission handle.
312  */
313 struct GNUNET_PSYC_TransmitHandle *
314 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
315 {
316   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
317   tmit->client = client;
318   return tmit;
319 }
320
321
322 /**
323  * Destroy a transmission handle.
324  */
325 void
326 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
327 {
328   GNUNET_free (tmit);
329 }
330
331
332 /**
333  * Queue a message part for transmission.
334  *
335  * The message part is added to the current message buffer.
336  * When this buffer is full, it is added to the transmission queue.
337  *
338  * @param tmit
339  *        Transmission handle.
340  * @param msg
341  *        Message part, or NULL.
342  * @param tmit_now
343  *        Transmit message now, or wait for buffer to fill up?
344  *        #GNUNET_YES or #GNUNET_NO.
345  */
346 static void
347 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
348                        const struct GNUNET_MessageHeader *msg,
349                        uint8_t tmit_now)
350 {
351   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
352
353   LOG (GNUNET_ERROR_TYPE_DEBUG,
354        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
355        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
356
357   if (NULL != tmit->msg)
358   {
359     if (NULL == msg
360         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
361     {
362       /* End of message or buffer is full, add it to transmission queue
363        * and start with empty buffer */
364       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
365       tmit->msg->size = htons (tmit->msg->size);
366       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
367       tmit->msg = NULL;
368       tmit->acks_pending++;
369     }
370     else
371     {
372       /* Message fits in current buffer, append */
373       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
374       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
375       tmit->msg->size += size;
376     }
377   }
378
379   if (NULL == tmit->msg && NULL != msg)
380   {
381     /* Empty buffer, copy over message. */
382     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
383     tmit->msg->size = sizeof (*tmit->msg) + size;
384     memcpy (&tmit->msg[1], msg, size);
385   }
386
387   if (NULL != tmit->msg
388       && (GNUNET_YES == tmit_now
389           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
390               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
391   {
392     /* End of message or buffer is full, add it to transmission queue. */
393     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
394     tmit->msg->size = htons (tmit->msg->size);
395     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
396     tmit->msg = NULL;
397     tmit->acks_pending++;
398   }
399 }
400
401
402 /**
403  * Request data from client to transmit.
404  *
405  * @param tmit  Transmission handle.
406  */
407 static void
408 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
409 {
410   int notify_ret = GNUNET_YES;
411   uint16_t data_size = 0;
412   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
413   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
414   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
415
416   if (NULL != tmit->notify_data)
417   {
418     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
419     tmit->in_notify = GNUNET_YES;
420     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
421     tmit->in_notify = GNUNET_NO;
422   }
423   LOG (GNUNET_ERROR_TYPE_DEBUG,
424        "transmit_data (ret: %d, size: %u): %.*s\n",
425        notify_ret, data_size, data_size, &msg[1]);
426   switch (notify_ret)
427   {
428   case GNUNET_NO:
429     if (0 == data_size)
430     {
431       /* Transmission paused, nothing to send. */
432       tmit->paused = GNUNET_YES;
433       return;
434     }
435     break;
436
437   case GNUNET_YES:
438     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
439     break;
440
441   default:
442     LOG (GNUNET_ERROR_TYPE_ERROR,
443          "TransmitNotifyData callback returned error when requesting data.\n");
444
445     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
446     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
447     msg->size = htons (sizeof (*msg));
448     transmit_queue_insert (tmit, msg, GNUNET_YES);
449     tmit->in_transmit = GNUNET_NO;
450     return;
451   }
452
453   if (0 < data_size)
454   {
455     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
456     msg->size = htons (sizeof (*msg) + data_size);
457     transmit_queue_insert (tmit, msg, !notify_ret);
458   }
459
460   /* End of message. */
461   if (GNUNET_YES == notify_ret)
462   {
463     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
464     msg->size = htons (sizeof (*msg));
465     transmit_queue_insert (tmit, msg, GNUNET_YES);
466     /* FIXME: wait for ACK before setting in_transmit to no */
467     tmit->in_transmit = GNUNET_NO;
468   }
469 }
470
471
472 /**
473  * Request a modifier from a client to transmit.
474  *
475  * @param tmit  Transmission handle.
476  */
477 static void
478 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
479 {
480   uint16_t max_data_size = 0;
481   uint16_t data_size = 0;
482   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
483   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
484   int notify_ret = GNUNET_YES;
485
486   switch (tmit->state)
487   {
488   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
489   {
490     struct GNUNET_PSYC_MessageModifier *mod
491       = (struct GNUNET_PSYC_MessageModifier *) msg;
492     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
493     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
494
495     if (NULL != tmit->notify_mod)
496     {
497       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
498       data_size = max_data_size;
499       tmit->in_notify = GNUNET_YES;
500       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
501                                      &mod->oper, &mod->value_size);
502       tmit->in_notify = GNUNET_NO;
503     }
504
505     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
506     LOG (GNUNET_ERROR_TYPE_DEBUG,
507          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
508          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
509     if (mod->name_size < data_size)
510     {
511       tmit->mod_value_remaining
512         = mod->value_size - (data_size - mod->name_size);
513       mod->value_size = htonl (mod->value_size);
514       mod->name_size = htons (mod->name_size);
515     }
516     else if (0 < data_size)
517     {
518       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
519       notify_ret = GNUNET_SYSERR;
520     }
521     break;
522   }
523   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
524   {
525     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
526     msg->size = sizeof (struct GNUNET_MessageHeader);
527
528     if (NULL != tmit->notify_mod)
529     {
530       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
531       data_size = max_data_size;
532       tmit->in_notify = GNUNET_YES;
533       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
534                                      &data_size, &msg[1], NULL, NULL);
535       tmit->in_notify = GNUNET_NO;
536     }
537     tmit->mod_value_remaining -= data_size;
538     LOG (GNUNET_ERROR_TYPE_DEBUG,
539          "transmit_mod (ret: %d, size: %u): %.*s\n",
540          notify_ret, data_size, data_size, &msg[1]);
541     break;
542   }
543   default:
544     GNUNET_assert (0);
545   }
546
547   switch (notify_ret)
548   {
549   case GNUNET_NO:
550     if (0 == data_size)
551     { /* Transmission paused, nothing to send. */
552       tmit->paused = GNUNET_YES;
553       return;
554     }
555     tmit->state
556       = (0 == tmit->mod_value_remaining)
557       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
558       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
559     break;
560
561   case GNUNET_YES: /* End of modifiers. */
562     GNUNET_assert (0 == tmit->mod_value_remaining);
563     break;
564
565   default:
566     LOG (GNUNET_ERROR_TYPE_ERROR,
567          "TransmitNotifyModifier callback returned with error.\n");
568
569     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
570     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
571     msg->size = htons (sizeof (*msg));
572     transmit_queue_insert (tmit, msg, GNUNET_YES);
573     tmit->in_transmit = GNUNET_NO;
574     return;
575   }
576
577   if (0 < data_size)
578   {
579     GNUNET_assert (data_size <= max_data_size);
580     msg->size = htons (msg->size + data_size);
581     transmit_queue_insert (tmit, msg, GNUNET_NO);
582   }
583
584   if (GNUNET_YES == notify_ret)
585   {
586     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
587     if (0 == tmit->acks_pending)
588       transmit_data (tmit);
589   }
590   else
591   {
592     transmit_mod (tmit);
593   }
594 }
595
596
597 int
598 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
599                      uint32_t *full_value_size)
600
601 {
602   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
603   uint16_t name_size = 0;
604   uint32_t value_size = 0;
605   const char *value = NULL;
606
607   if (NULL != oper)
608   { /* New modifier */
609     if (NULL != tmit->mod)
610       tmit->mod = tmit->mod->next;
611     if (NULL == tmit->mod)
612     { /* No more modifiers, continue with data */
613       *data_size = 0;
614       return GNUNET_YES;
615     }
616
617     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
618     *full_value_size = tmit->mod->value_size;
619     *oper = tmit->mod->oper;
620     name_size = strlen (tmit->mod->name) + 1;
621
622     if (name_size + tmit->mod->value_size <= *data_size)
623     {
624       value_size = tmit->mod->value_size;
625       *data_size = name_size + value_size;
626     }
627     else /* full modifier does not fit in data, continuation needed */
628     {
629       value_size = *data_size - name_size;
630       tmit->mod_value = tmit->mod->value + value_size;
631     }
632
633     memcpy (data, tmit->mod->name, name_size);
634     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
635     return GNUNET_NO;
636   }
637   else
638   { /* Modifier continuation */
639     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
640     value = tmit->mod_value;
641     if (tmit->mod_value_remaining <= *data_size)
642     {
643       value_size = tmit->mod_value_remaining;
644       tmit->mod_value = NULL;
645     }
646     else
647     {
648       value_size = *data_size;
649       tmit->mod_value += value_size;
650     }
651
652     if (*data_size < value_size)
653     {
654       LOG (GNUNET_ERROR_TYPE_DEBUG,
655            "Value in environment larger than buffer: %u < %zu\n",
656            *data_size, value_size);
657       *data_size = 0;
658       return GNUNET_NO;
659     }
660
661     *data_size = value_size;
662     memcpy (data, value, value_size);
663     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
664   }
665 }
666
667
668 /**
669  * Transmit a message.
670  *
671  * @param tmit
672  *        Transmission handle.
673  * @param method_name
674  *        Which method should be invoked.
675  * @param env
676  *        Environment for the message.
677  *        Should stay available until the first call to notify_data.
678  *        Can be NULL if there are no modifiers or @a notify_mod is
679  *        provided instead.
680  * @param notify_mod
681  *        Function to call to obtain modifiers.
682  *        Can be NULL if there are no modifiers or @a env is provided instead.
683  * @param notify_data
684  *        Function to call to obtain fragments of the data.
685  * @param notify_cls
686  *        Closure for @a notify_mod and @a notify_data.
687  * @param flags
688  *        Flags for the message being transmitted.
689  *
690  * @return #GNUNET_OK if the transmission was started.
691  *         #GNUNET_SYSERR if another transmission is already going on.
692  */
693 int
694 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
695                               const char *method_name,
696                               const struct GNUNET_PSYC_Environment *env,
697                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
698                               GNUNET_PSYC_TransmitNotifyData notify_data,
699                               void *notify_cls,
700                               uint32_t flags)
701 {
702   if (GNUNET_NO != tmit->in_transmit)
703     return GNUNET_SYSERR;
704   tmit->in_transmit = GNUNET_YES;
705
706   size_t size = strlen (method_name) + 1;
707   struct GNUNET_PSYC_MessageMethod *pmeth;
708   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
709   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
710
711   if (NULL != notify_mod)
712   {
713     tmit->notify_mod = notify_mod;
714     tmit->notify_mod_cls = notify_cls;
715   }
716   else
717   {
718     tmit->notify_mod = &transmit_notify_env;
719     tmit->notify_mod_cls = tmit;
720     if (NULL != env)
721     {
722       struct GNUNET_PSYC_Modifier mod = {};
723       mod.next = GNUNET_PSYC_env_head (env);
724       tmit->mod = &mod;
725
726       struct GNUNET_PSYC_Modifier *m = tmit->mod;
727       while (NULL != (m = m->next))
728       {
729         if (m->oper != GNUNET_PSYC_OP_SET)
730           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
731       }
732     }
733     else
734     {
735       tmit->mod = NULL;
736     }
737   }
738
739   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
740   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
741   pmeth->header.size = htons (sizeof (*pmeth) + size);
742   pmeth->flags = htonl (flags);
743   memcpy (&pmeth[1], method_name, size);
744
745   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
746   tmit->notify_data = notify_data;
747   tmit->notify_data_cls = notify_cls;
748
749   transmit_mod (tmit);
750   return GNUNET_OK;
751 }
752
753
754 /**
755  * Resume transmission.
756  *
757  * @param tmit  Transmission handle.
758  */
759 void
760 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
761 {
762   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
763     return;
764
765   if (0 == tmit->acks_pending)
766   {
767     tmit->paused = GNUNET_NO;
768     transmit_data (tmit);
769   }
770 }
771
772
773 /**
774  * Abort transmission request.
775  *
776  * @param tmit  Transmission handle.
777  */
778 void
779 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
780 {
781   if (GNUNET_NO == tmit->in_transmit)
782     return;
783
784   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
785   tmit->in_transmit = GNUNET_NO;
786   tmit->paused = GNUNET_NO;
787
788   /* FIXME */
789   struct GNUNET_MessageHeader msg;
790   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
791   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
792   msg.size = htons (sizeof (msg));
793   transmit_queue_insert (tmit, &msg, GNUNET_YES);
794 }
795
796
797 /**
798  * Got acknowledgement of a transmitted message part, continue transmission.
799  *
800  * @param tmit  Transmission handle.
801  */
802 void
803 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
804 {
805   if (0 == tmit->acks_pending)
806   {
807     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
808     GNUNET_break (0);
809     return;
810   }
811   tmit->acks_pending--;
812
813   switch (tmit->state)
814   {
815   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
816   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
817     transmit_mod (tmit);
818     break;
819
820   case GNUNET_PSYC_MESSAGE_STATE_DATA:
821     transmit_data (tmit);
822     break;
823
824   case GNUNET_PSYC_MESSAGE_STATE_END:
825   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
826     break;
827
828   default:
829     LOG (GNUNET_ERROR_TYPE_DEBUG,
830          "Ignoring message ACK in state %u.\n", tmit->state);
831   }
832 }
833
834
835 /**** Receiving messages ****/
836
837
838 /**
839  * Create handle for receiving messages.
840  */
841 struct GNUNET_PSYC_ReceiveHandle *
842 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
843                             GNUNET_PSYC_MessagePartCallback message_part_cb,
844                             void *cb_cls)
845 {
846   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
847   recv->message_cb = message_cb;
848   recv->message_part_cb = message_part_cb;
849   recv->cb_cls = cb_cls;
850   return recv;
851 }
852
853
854 /**
855  * Destroy handle for receiving messages.
856  */
857 void
858 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
859 {
860   GNUNET_free (recv);
861 }
862
863
864 /**
865  * Reset stored data related to the last received message.
866  */
867 void
868 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
869 {
870   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
871   recv->flags = 0;
872   recv->message_id = 0;
873   recv->mod_value_size = 0;
874   recv->mod_value_size_expected = 0;
875 }
876
877
878 static void
879 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
880 {
881   if (NULL != recv->message_part_cb)
882     recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
883                            0, NULL);
884
885   if (NULL != recv->message_cb)
886     recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
887
888   GNUNET_PSYC_receive_reset (recv);
889 }
890
891
892 /**
893  * Handle incoming PSYC message.
894  *
895  * @param recv  Receive handle.
896  * @param msg   The message.
897  *
898  * @return #GNUNET_OK on success,
899  *         #GNUNET_SYSERR on receive error.
900  */
901 int
902 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
903                              const struct GNUNET_PSYC_MessageHeader *msg)
904 {
905   uint16_t size = ntohs (msg->header.size);
906   uint32_t flags = ntohl (msg->flags);
907   uint64_t message_id;
908
909   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
910                            (struct GNUNET_MessageHeader *) msg);
911
912   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
913   {
914     recv->message_id = GNUNET_ntohll (msg->message_id);
915     recv->flags = flags;
916     recv->slave_key = msg->slave_key;
917     recv->mod_value_size = 0;
918     recv->mod_value_size_expected = 0;
919   }
920   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
921   {
922     // FIXME
923     LOG (GNUNET_ERROR_TYPE_WARNING,
924          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
925          GNUNET_ntohll (msg->message_id), recv->message_id);
926     GNUNET_break_op (0);
927     recv_error (recv);
928     return GNUNET_SYSERR;
929   }
930   else if (flags != recv->flags)
931   {
932     LOG (GNUNET_ERROR_TYPE_WARNING,
933          "Unexpected message flags. Got: %lu, expected: %lu\n",
934          flags, recv->flags);
935     GNUNET_break_op (0);
936     recv_error (recv);
937     return GNUNET_SYSERR;
938   }
939   message_id = recv->message_id;
940
941   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
942
943   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
944   {
945     const struct GNUNET_MessageHeader *pmsg
946       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
947     psize = ntohs (pmsg->size);
948     ptype = ntohs (pmsg->type);
949     size_eq = size_min = 0;
950
951     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
952     {
953       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
954                   "Dropping message of type %u with invalid size %u.\n",
955                   ptype, psize);
956       recv_error (recv);
957       return GNUNET_SYSERR;
958     }
959
960     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961                 "Received message part of type %u and size %u from PSYC.\n",
962                 ptype, psize);
963     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
964
965     switch (ptype)
966     {
967     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
968       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
969       break;
970     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
971       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
972       break;
973     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
974     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
975       size_min = sizeof (struct GNUNET_MessageHeader);
976       break;
977     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
978     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
979       size_eq = sizeof (struct GNUNET_MessageHeader);
980       break;
981     default:
982       GNUNET_break_op (0);
983       recv_error (recv);
984       return GNUNET_SYSERR;
985     }
986
987     if (! ((0 < size_eq && psize == size_eq)
988            || (0 < size_min && size_min <= psize)))
989     {
990       GNUNET_break_op (0);
991       recv_error (recv);
992       return GNUNET_SYSERR;
993     }
994
995     switch (ptype)
996     {
997     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
998     {
999       struct GNUNET_PSYC_MessageMethod *meth
1000         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1001
1002       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1003       {
1004         LOG (GNUNET_ERROR_TYPE_WARNING,
1005              "Dropping out of order message method (%u).\n",
1006              recv->state);
1007         /* It is normal to receive an incomplete message right after connecting,
1008          * but should not happen later.
1009          * FIXME: add a check for this condition.
1010          */
1011         GNUNET_break_op (0);
1012         recv_error (recv);
1013         return GNUNET_SYSERR;
1014       }
1015
1016       if ('\0' != *((char *) meth + psize - 1))
1017       {
1018         LOG (GNUNET_ERROR_TYPE_WARNING,
1019              "Dropping message with malformed method. "
1020              "Message ID: %" PRIu64 "\n", recv->message_id);
1021         GNUNET_break_op (0);
1022         recv_error (recv);
1023         return GNUNET_SYSERR;
1024       }
1025       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1026       break;
1027     }
1028     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1029     {
1030       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1031             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1032             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1033       {
1034         LOG (GNUNET_ERROR_TYPE_WARNING,
1035              "Dropping out of order message modifier (%u).\n",
1036              recv->state);
1037         GNUNET_break_op (0);
1038         recv_error (recv);
1039         return GNUNET_SYSERR;
1040       }
1041
1042       struct GNUNET_PSYC_MessageModifier *mod
1043         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1044
1045       uint16_t name_size = ntohs (mod->name_size);
1046       recv->mod_value_size_expected = ntohl (mod->value_size);
1047       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1048
1049       if (psize < sizeof (*mod) + name_size
1050           || '\0' != *((char *) &mod[1] + name_size - 1)
1051           || recv->mod_value_size_expected < recv->mod_value_size)
1052       {
1053         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1054         GNUNET_break_op (0);
1055         recv_error (recv);
1056         return GNUNET_SYSERR;
1057       }
1058       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1059       break;
1060     }
1061     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1062     {
1063       recv->mod_value_size += psize - sizeof (*pmsg);
1064
1065       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1066             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1067           || recv->mod_value_size_expected < recv->mod_value_size)
1068       {
1069         LOG (GNUNET_ERROR_TYPE_WARNING,
1070              "Dropping out of order message modifier continuation "
1071              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1072              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1073              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1074              recv->mod_value_size_expected, recv->mod_value_size);
1075         GNUNET_break_op (0);
1076         recv_error (recv);
1077         return GNUNET_SYSERR;
1078       }
1079       break;
1080     }
1081     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1082     {
1083       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1084           || recv->mod_value_size_expected != recv->mod_value_size)
1085       {
1086         LOG (GNUNET_ERROR_TYPE_WARNING,
1087              "Dropping out of order message data fragment "
1088              "(%u < %u || %lu != %lu).\n",
1089              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1090              recv->mod_value_size_expected, recv->mod_value_size);
1091
1092         GNUNET_break_op (0);
1093         recv_error (recv);
1094         return GNUNET_SYSERR;
1095       }
1096       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1097       break;
1098     }
1099     }
1100
1101     if (NULL != recv->message_part_cb)
1102       recv->message_part_cb (recv->cb_cls, &recv->slave_key,
1103                              recv->message_id, recv->flags,
1104                              0, // FIXME: data_offset
1105                              pmsg);
1106
1107     switch (ptype)
1108     {
1109     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1110     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1111       GNUNET_PSYC_receive_reset (recv);
1112       break;
1113     }
1114   }
1115
1116   if (NULL != recv->message_cb)
1117     recv->message_cb (recv->cb_cls, message_id, flags, msg);
1118   return GNUNET_OK;
1119 }
1120
1121
1122 /**
1123  * Check if @a data contains a series of valid message parts.
1124  *
1125  * @param      data_size    Size of @a data.
1126  * @param      data         Data.
1127  * @param[out] first_ptype  Type of first message part.
1128  * @param[out] last_ptype   Type of last message part.
1129  *
1130  * @return Number of message parts found in @a data.
1131  *         or GNUNET_SYSERR if the message contains invalid parts.
1132  */
1133 int
1134 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1135                                  uint16_t *first_ptype, uint16_t *last_ptype)
1136 {
1137   const struct GNUNET_MessageHeader *pmsg;
1138   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1139   if (NULL != first_ptype)
1140     *first_ptype = 0;
1141   if (NULL != last_ptype)
1142     *last_ptype = 0;
1143
1144   for (pos = 0; pos < data_size; pos += psize, parts++)
1145   {
1146     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1147     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1148     psize = ntohs (pmsg->size);
1149     ptype = ntohs (pmsg->type);
1150     if (0 == parts && NULL != first_ptype)
1151       *first_ptype = ptype;
1152     if (NULL != last_ptype
1153         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1154       *last_ptype = ptype;
1155     if (psize < sizeof (*pmsg)
1156         || pos + psize > data_size
1157         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1158         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1159     {
1160       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161                   "Invalid message part of type %u and size %u.\n",
1162                   ptype, psize);
1163       return GNUNET_SYSERR;
1164     }
1165     /** @todo FIXME: check message part order */
1166   }
1167   return parts;
1168 }
1169
1170
1171 struct ParseMessageClosure
1172 {
1173   struct GNUNET_PSYC_Environment *env;
1174   const char **method_name;
1175   const void **data;
1176   uint16_t *data_size;
1177   enum GNUNET_PSYC_MessageState msg_state;
1178 };
1179
1180
1181 static void
1182 parse_message_part_cb (void *cls,
1183                        const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1184                        uint64_t message_id, uint32_t flags, uint64_t data_offset,
1185                        const struct GNUNET_MessageHeader *msg)
1186 {
1187   struct ParseMessageClosure *pmc = cls;
1188   if (NULL == msg)
1189   {
1190     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1191     return;
1192   }
1193
1194   switch (ntohs (msg->type))
1195   {
1196   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1197   {
1198     struct GNUNET_PSYC_MessageMethod *
1199       pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1200     *pmc->method_name = (const char *) &pmeth[1];
1201     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1202     break;
1203   }
1204
1205   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1206   {
1207     struct GNUNET_PSYC_MessageModifier *
1208       pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1209
1210     const char *name = (const char *) &pmod[1];
1211     const void *value = name + ntohs (pmod->name_size);
1212     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1213                          ntohl (pmod->value_size));
1214     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1215     break;
1216   }
1217
1218   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1219     *pmc->data = &msg[1];
1220     *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1221     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1222     break;
1223
1224   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1225     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1226     break;
1227
1228   default:
1229     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1230   }
1231 }
1232
1233
1234 /**
1235  * Parse PSYC message.
1236  *
1237  * @param msg
1238  *        The PSYC message to parse.
1239  * @param[out] method_name
1240  *        Pointer to the method name inside @a pmsg.
1241  * @param env
1242  *        The environment for the message with a list of modifiers.
1243  * @param[out] data
1244  *        Pointer to data inside @a pmsg.
1245  * @param[out] data_size
1246  *        Size of @data is written here.
1247  *
1248  * @return #GNUNET_OK on success,
1249  *         #GNUNET_SYSERR on parse error.
1250  */
1251 int
1252 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1253                            const char **method_name,
1254                            struct GNUNET_PSYC_Environment *env,
1255                            const void **data,
1256                            uint16_t *data_size)
1257 {
1258   struct ParseMessageClosure cls;
1259   cls.env = env;
1260   cls.method_name = method_name;
1261   cls.data = data;
1262   cls.data_size = data_size;
1263
1264   struct GNUNET_PSYC_ReceiveHandle *
1265     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1266   int ret = GNUNET_PSYC_receive_message (recv, msg);
1267   GNUNET_PSYC_receive_destroy (recv);
1268
1269   if (GNUNET_OK != ret)
1270     return GNUNET_SYSERR;
1271
1272   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1273     ? GNUNET_OK
1274     : GNUNET_NO;
1275 }
1276
1277
1278 /**
1279  * Initialize PSYC message header.
1280  */
1281 void
1282 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1283                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1284                                  uint32_t flags)
1285 {
1286   uint16_t size = ntohs (mmsg->header.size);
1287   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1288
1289   pmsg->header.size = htons (psize);
1290   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1291   pmsg->message_id = mmsg->message_id;
1292   pmsg->fragment_offset = mmsg->fragment_offset;
1293   pmsg->flags = htonl (flags);
1294
1295   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1296 }
1297
1298
1299 /**
1300  * Create a new PSYC message header from a multicast message.
1301  */
1302 struct GNUNET_PSYC_MessageHeader *
1303 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1304                                    uint32_t flags)
1305 {
1306   struct GNUNET_PSYC_MessageHeader *pmsg;
1307   uint16_t size = ntohs (mmsg->header.size);
1308   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1309
1310   pmsg = GNUNET_malloc (psize);
1311   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1312   return pmsg;
1313 }
1314
1315
1316 /**
1317  * Create a new PSYC message header from a PSYC message.
1318  */
1319 struct GNUNET_PSYC_MessageHeader *
1320 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1321 {
1322   uint16_t msg_size = ntohs (msg->header.size);
1323   struct GNUNET_PSYC_MessageHeader *
1324     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1325   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1326   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1327   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1328   return pmsg;
1329 }