-only trigger check config if we actually need it
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_CLIENT_MANAGER_Connection *client;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Callback to request next modifier from client.
51    */
52   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
53
54   /**
55    * Closure for the notify callbacks.
56    */
57   void *notify_mod_cls;
58
59   /**
60    * Callback to request next data fragment from client.
61    */
62   GNUNET_PSYC_TransmitNotifyData notify_data;
63
64   /**
65    * Closure for the notify callbacks.
66    */
67   void *notify_data_cls;
68
69   /**
70    * Modifier of the environment that is currently being transmitted.
71    */
72   struct GNUNET_PSYC_Modifier *mod;
73
74   /**
75    *
76    */
77   const char *mod_value;
78
79   /**
80    * Number of bytes remaining to be transmitted from the current modifier value.
81    */
82   uint32_t mod_value_remaining;
83
84   /**
85    * State of the current message being received from client.
86    */
87   enum GNUNET_PSYC_MessageState state;
88
89   /**
90    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
91    */
92   uint8_t acks_pending;
93
94   /**
95    * Is transmission paused?
96    */
97   uint8_t paused;
98
99   /**
100    * Are we currently transmitting a message?
101    */
102   uint8_t in_transmit;
103
104   /**
105    * Notify callback is currently being called.
106    */
107   uint8_t in_notify;
108
109 };
110
111
112
113 struct GNUNET_PSYC_ReceiveHandle
114 {
115   /**
116    * Message callback.
117    */
118   GNUNET_PSYC_MessageCallback message_cb;
119
120   /**
121    * Message part callback.
122    */
123   GNUNET_PSYC_MessagePartCallback message_part_cb;
124
125   /**
126    * Closure for the callbacks.
127    */
128   void *cb_cls;
129
130   /**
131    * ID of the message being received from the PSYC service.
132    */
133   uint64_t message_id;
134
135   /**
136    * Public key of the slave from which a message is being received.
137    */
138   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
139
140   /**
141    * State of the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageState state;
144
145   /**
146    * Flags for the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageFlags flags;
149
150   /**
151    * Expected value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size_expected;
154
155   /**
156    * Actual value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size;
159 };
160
161
162 /**** Messages ****/
163
164
165 /**
166  * Create a PSYC message.
167  *
168  * @param method_name
169  *        PSYC method for the message.
170  * @param env
171  *        Environment for the message.
172  * @param data
173  *        Data payload for the message.
174  * @param data_size
175  *        Size of @a data.
176  *
177  * @return Message header with size information,
178  *         followed by the message parts.
179  */
180 struct GNUNET_PSYC_Message *
181 GNUNET_PSYC_message_create (const char *method_name,
182                             const struct GNUNET_PSYC_Environment *env,
183                             const void *data,
184                             size_t data_size)
185 {
186   struct GNUNET_PSYC_Modifier *mod = NULL;
187   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
188   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
189   struct GNUNET_MessageHeader *pmsg = NULL;
190   uint16_t env_size = 0;
191   if (NULL != env)
192   {
193     mod = GNUNET_PSYC_env_head (env);
194     while (NULL != mod)
195     {
196       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
197       mod = mod->next;
198     }
199   }
200
201   struct GNUNET_PSYC_Message *msg;
202   uint16_t method_name_size = strlen (method_name) + 1;
203   if (method_name_size == 1)
204     return NULL;
205
206   uint16_t msg_size = sizeof (*msg)                      /* header */
207     + sizeof (*pmeth) + method_name_size                 /* method */
208     + env_size                                           /* modifiers */
209     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
210     + sizeof (*pmsg);                                    /* end of message */
211   msg = GNUNET_malloc (msg_size);
212   msg->header.size = htons (msg_size);
213   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
214
215   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
216   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
217   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
218   memcpy (&pmeth[1], method_name, method_name_size);
219
220   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
221   if (NULL != env)
222   {
223     mod = GNUNET_PSYC_env_head (env);
224     while (NULL != mod)
225     {
226       uint16_t mod_name_size = strlen (mod->name) + 1;
227       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
228       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
229       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
230       p += pmod->header.size;
231       pmod->header.size = htons (pmod->header.size);
232
233       pmod->oper = mod->oper;
234       pmod->name_size = htons (mod_name_size);
235       pmod->value_size = htonl (mod->value_size);
236
237       memcpy (&pmod[1], mod->name, mod_name_size);
238       if (0 < mod->value_size)
239         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
240
241       mod = mod->next;
242     }
243   }
244
245   if (0 < data_size)
246   {
247     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
248     pmsg->size = sizeof (*pmsg) + data_size;
249     p += pmsg->size;
250     pmsg->size = htons (pmsg->size);
251     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
252     memcpy (&pmsg[1], data, data_size);
253   }
254
255   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
256   pmsg->size = htons (sizeof (*pmsg));
257   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
258
259   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
260   return msg;
261 }
262
263
264 void
265 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
266                          const struct GNUNET_MessageHeader *msg)
267 {
268   uint16_t size = ntohs (msg->size);
269   uint16_t type = ntohs (msg->type);
270
271   GNUNET_log (kind,
272               "Message of type %d and size %u:\n",
273               type,
274               size);
275   switch (type)
276   {
277   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
278   {
279     const struct GNUNET_PSYC_MessageHeader *pmsg
280       = (const struct GNUNET_PSYC_MessageHeader *) msg;
281     GNUNET_log (kind,
282                 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
283                 GNUNET_ntohll (pmsg->message_id),
284                 ntohl (pmsg->flags));
285     break;
286   }
287   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
288   {
289     const struct GNUNET_PSYC_MessageMethod *meth
290       = (const struct GNUNET_PSYC_MessageMethod *) msg;
291     GNUNET_log (kind,
292                 "\t%.*s\n",
293                 (int) (size - sizeof (*meth)),
294                 (const char *) &meth[1]);
295     break;
296   }
297   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
298   {
299     const struct GNUNET_PSYC_MessageModifier *mod
300       = (const struct GNUNET_PSYC_MessageModifier *) msg;
301     uint16_t name_size = ntohs (mod->name_size);
302     char oper = ' ' < mod->oper ? mod->oper : ' ';
303     GNUNET_log (kind,
304                 "\t%c%.*s\t%.*s\n",
305                 oper,
306                 (int) name_size,
307                 (const char *) &mod[1],
308                 (int) (size - sizeof (*mod) - name_size),
309                 ((const char *) &mod[1]) + name_size);
310     break;
311   }
312   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
313   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
314     GNUNET_log (kind,
315                 "\t%.*s\n",
316                 (int) (size - sizeof (*msg)),
317                 (const char *) &msg[1]);
318     break;
319   }
320 }
321
322
323 /**** Transmitting messages ****/
324
325
326 /**
327  * Create a transmission handle.
328  */
329 struct GNUNET_PSYC_TransmitHandle *
330 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
331 {
332   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
333
334   tmit->client = client;
335   return tmit;
336 }
337
338
339 /**
340  * Destroy a transmission handle.
341  */
342 void
343 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
344 {
345   GNUNET_free (tmit);
346 }
347
348
349 /**
350  * Queue a message part for transmission.
351  *
352  * The message part is added to the current message buffer.
353  * When this buffer is full, it is added to the transmission queue.
354  *
355  * @param tmit
356  *        Transmission handle.
357  * @param msg
358  *        Message part, or NULL.
359  * @param tmit_now
360  *        Transmit message now, or wait for buffer to fill up?
361  *        #GNUNET_YES or #GNUNET_NO.
362  */
363 static void
364 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
365                        const struct GNUNET_MessageHeader *msg,
366                        uint8_t tmit_now)
367 {
368   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
369
370   LOG (GNUNET_ERROR_TYPE_DEBUG,
371        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
372        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
373
374   if (NULL != tmit->msg)
375   {
376     if (NULL == msg
377         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
378     {
379       /* End of message or buffer is full, add it to transmission queue
380        * and start with empty buffer */
381       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
382       tmit->msg->size = htons (tmit->msg->size);
383       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
384       tmit->msg = NULL;
385       tmit->acks_pending++;
386     }
387     else
388     {
389       /* Message fits in current buffer, append */
390       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
391       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
392       tmit->msg->size += size;
393     }
394   }
395
396   if (NULL == tmit->msg && NULL != msg)
397   {
398     /* Empty buffer, copy over message. */
399     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
400     tmit->msg->size = sizeof (*tmit->msg) + size;
401     memcpy (&tmit->msg[1], msg, size);
402   }
403
404   if (NULL != tmit->msg
405       && (GNUNET_YES == tmit_now
406           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
407               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
408   {
409     /* End of message or buffer is full, add it to transmission queue. */
410     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
411     tmit->msg->size = htons (tmit->msg->size);
412     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
413     tmit->msg = NULL;
414     tmit->acks_pending++;
415   }
416 }
417
418
419 /**
420  * Request data from client to transmit.
421  *
422  * @param tmit  Transmission handle.
423  */
424 static void
425 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
426 {
427   int notify_ret = GNUNET_YES;
428   uint16_t data_size = 0;
429   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
430   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
431   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
432
433   if (NULL != tmit->notify_data)
434   {
435     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
436     tmit->in_notify = GNUNET_YES;
437     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
438     tmit->in_notify = GNUNET_NO;
439   }
440   LOG (GNUNET_ERROR_TYPE_DEBUG,
441        "transmit_data (ret: %d, size: %u): %.*s\n",
442        notify_ret, data_size, data_size, &msg[1]);
443   switch (notify_ret)
444   {
445   case GNUNET_NO:
446     if (0 == data_size)
447     {
448       /* Transmission paused, nothing to send. */
449       tmit->paused = GNUNET_YES;
450       return;
451     }
452     break;
453
454   case GNUNET_YES:
455     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
456     break;
457
458   default:
459     LOG (GNUNET_ERROR_TYPE_ERROR,
460          "TransmitNotifyData callback returned error when requesting data.\n");
461
462     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
463     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
464     msg->size = htons (sizeof (*msg));
465     transmit_queue_insert (tmit, msg, GNUNET_YES);
466     tmit->in_transmit = GNUNET_NO;
467     return;
468   }
469
470   if (0 < data_size)
471   {
472     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
473     msg->size = htons (sizeof (*msg) + data_size);
474     transmit_queue_insert (tmit, msg, !notify_ret);
475   }
476
477   /* End of message. */
478   if (GNUNET_YES == notify_ret)
479   {
480     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
481     msg->size = htons (sizeof (*msg));
482     transmit_queue_insert (tmit, msg, GNUNET_YES);
483     /* FIXME: wait for ACK before setting in_transmit to no */
484     tmit->in_transmit = GNUNET_NO;
485   }
486 }
487
488
489 /**
490  * Request a modifier from a client to transmit.
491  *
492  * @param tmit  Transmission handle.
493  */
494 static void
495 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
496 {
497   uint16_t max_data_size = 0;
498   uint16_t data_size = 0;
499   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
500   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
501   int notify_ret = GNUNET_YES;
502
503   switch (tmit->state)
504   {
505   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
506   {
507     struct GNUNET_PSYC_MessageModifier *mod
508       = (struct GNUNET_PSYC_MessageModifier *) msg;
509     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
510     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
511
512     if (NULL != tmit->notify_mod)
513     {
514       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
515       data_size = max_data_size;
516       tmit->in_notify = GNUNET_YES;
517       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
518                                      &mod->oper, &mod->value_size);
519       tmit->in_notify = GNUNET_NO;
520     }
521
522     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
523     LOG (GNUNET_ERROR_TYPE_DEBUG,
524          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
525          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
526     if (mod->name_size < data_size)
527     {
528       tmit->mod_value_remaining
529         = mod->value_size - (data_size - mod->name_size);
530       mod->value_size = htonl (mod->value_size);
531       mod->name_size = htons (mod->name_size);
532     }
533     else if (0 < data_size)
534     {
535       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
536       notify_ret = GNUNET_SYSERR;
537     }
538     break;
539   }
540   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
541   {
542     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
543     msg->size = sizeof (struct GNUNET_MessageHeader);
544
545     if (NULL != tmit->notify_mod)
546     {
547       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
548       data_size = max_data_size;
549       tmit->in_notify = GNUNET_YES;
550       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
551                                      &data_size, &msg[1], NULL, NULL);
552       tmit->in_notify = GNUNET_NO;
553     }
554     tmit->mod_value_remaining -= data_size;
555     LOG (GNUNET_ERROR_TYPE_DEBUG,
556          "transmit_mod (ret: %d, size: %u): %.*s\n",
557          notify_ret, data_size, data_size, &msg[1]);
558     break;
559   }
560   default:
561     GNUNET_assert (0);
562   }
563
564   switch (notify_ret)
565   {
566   case GNUNET_NO:
567     if (0 == data_size)
568     { /* Transmission paused, nothing to send. */
569       tmit->paused = GNUNET_YES;
570       return;
571     }
572     tmit->state
573       = (0 == tmit->mod_value_remaining)
574       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
575       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
576     break;
577
578   case GNUNET_YES: /* End of modifiers. */
579     GNUNET_assert (0 == tmit->mod_value_remaining);
580     break;
581
582   default:
583     LOG (GNUNET_ERROR_TYPE_ERROR,
584          "TransmitNotifyModifier callback returned with error.\n");
585
586     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
587     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
588     msg->size = htons (sizeof (*msg));
589     transmit_queue_insert (tmit, msg, GNUNET_YES);
590     tmit->in_transmit = GNUNET_NO;
591     return;
592   }
593
594   if (0 < data_size)
595   {
596     GNUNET_assert (data_size <= max_data_size);
597     msg->size = htons (msg->size + data_size);
598     transmit_queue_insert (tmit, msg, GNUNET_NO);
599   }
600
601   if (GNUNET_YES == notify_ret)
602   {
603     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
604     if (0 == tmit->acks_pending)
605       transmit_data (tmit);
606   }
607   else
608   {
609     transmit_mod (tmit);
610   }
611 }
612
613
614 int
615 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
616                      uint32_t *full_value_size)
617
618 {
619   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
620   uint16_t name_size = 0;
621   uint32_t value_size = 0;
622   const char *value = NULL;
623
624   if (NULL != oper)
625   { /* New modifier */
626     if (NULL != tmit->mod)
627       tmit->mod = tmit->mod->next;
628     if (NULL == tmit->mod)
629     { /* No more modifiers, continue with data */
630       *data_size = 0;
631       return GNUNET_YES;
632     }
633
634     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
635     *full_value_size = tmit->mod->value_size;
636     *oper = tmit->mod->oper;
637     name_size = strlen (tmit->mod->name) + 1;
638
639     if (name_size + tmit->mod->value_size <= *data_size)
640     {
641       value_size = tmit->mod->value_size;
642       *data_size = name_size + value_size;
643     }
644     else /* full modifier does not fit in data, continuation needed */
645     {
646       value_size = *data_size - name_size;
647       tmit->mod_value = tmit->mod->value + value_size;
648     }
649
650     memcpy (data, tmit->mod->name, name_size);
651     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
652     return GNUNET_NO;
653   }
654   else
655   { /* Modifier continuation */
656     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
657     value = tmit->mod_value;
658     if (tmit->mod_value_remaining <= *data_size)
659     {
660       value_size = tmit->mod_value_remaining;
661       tmit->mod_value = NULL;
662     }
663     else
664     {
665       value_size = *data_size;
666       tmit->mod_value += value_size;
667     }
668
669     if (*data_size < value_size)
670     {
671       LOG (GNUNET_ERROR_TYPE_DEBUG,
672            "Value in environment larger than buffer: %u < %zu\n",
673            *data_size, value_size);
674       *data_size = 0;
675       return GNUNET_NO;
676     }
677
678     *data_size = value_size;
679     memcpy (data, value, value_size);
680     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
681   }
682 }
683
684
685 /**
686  * Transmit a message.
687  *
688  * @param tmit
689  *        Transmission handle.
690  * @param method_name
691  *        Which method should be invoked.
692  * @param env
693  *        Environment for the message.
694  *        Should stay available until the first call to notify_data.
695  *        Can be NULL if there are no modifiers or @a notify_mod is
696  *        provided instead.
697  * @param notify_mod
698  *        Function to call to obtain modifiers.
699  *        Can be NULL if there are no modifiers or @a env is provided instead.
700  * @param notify_data
701  *        Function to call to obtain fragments of the data.
702  * @param notify_cls
703  *        Closure for @a notify_mod and @a notify_data.
704  * @param flags
705  *        Flags for the message being transmitted.
706  *
707  * @return #GNUNET_OK if the transmission was started.
708  *         #GNUNET_SYSERR if another transmission is already going on.
709  */
710 int
711 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
712                               const char *method_name,
713                               const struct GNUNET_PSYC_Environment *env,
714                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
715                               GNUNET_PSYC_TransmitNotifyData notify_data,
716                               void *notify_cls,
717                               uint32_t flags)
718 {
719   if (GNUNET_NO != tmit->in_transmit)
720     return GNUNET_SYSERR;
721   tmit->in_transmit = GNUNET_YES;
722
723   size_t size = strlen (method_name) + 1;
724   struct GNUNET_PSYC_MessageMethod *pmeth;
725   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
726   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
727
728   if (NULL != notify_mod)
729   {
730     tmit->notify_mod = notify_mod;
731     tmit->notify_mod_cls = notify_cls;
732   }
733   else
734   {
735     tmit->notify_mod = &transmit_notify_env;
736     tmit->notify_mod_cls = tmit;
737     if (NULL != env)
738     {
739       struct GNUNET_PSYC_Modifier mod = {};
740       mod.next = GNUNET_PSYC_env_head (env);
741       tmit->mod = &mod;
742
743       struct GNUNET_PSYC_Modifier *m = tmit->mod;
744       while (NULL != (m = m->next))
745       {
746         if (m->oper != GNUNET_PSYC_OP_SET)
747           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
748       }
749     }
750     else
751     {
752       tmit->mod = NULL;
753     }
754   }
755
756   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
757   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
758   pmeth->header.size = htons (sizeof (*pmeth) + size);
759   pmeth->flags = htonl (flags);
760   memcpy (&pmeth[1], method_name, size);
761
762   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
763   tmit->notify_data = notify_data;
764   tmit->notify_data_cls = notify_cls;
765
766   transmit_mod (tmit);
767   return GNUNET_OK;
768 }
769
770
771 /**
772  * Resume transmission.
773  *
774  * @param tmit  Transmission handle.
775  */
776 void
777 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
778 {
779   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
780     return;
781
782   if (0 == tmit->acks_pending)
783   {
784     tmit->paused = GNUNET_NO;
785     transmit_data (tmit);
786   }
787 }
788
789
790 /**
791  * Abort transmission request.
792  *
793  * @param tmit  Transmission handle.
794  */
795 void
796 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
797 {
798   if (GNUNET_NO == tmit->in_transmit)
799     return;
800
801   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
802   tmit->in_transmit = GNUNET_NO;
803   tmit->paused = GNUNET_NO;
804
805   /* FIXME */
806   struct GNUNET_MessageHeader msg;
807   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
808   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
809   msg.size = htons (sizeof (msg));
810   transmit_queue_insert (tmit, &msg, GNUNET_YES);
811 }
812
813
814 /**
815  * Got acknowledgement of a transmitted message part, continue transmission.
816  *
817  * @param tmit  Transmission handle.
818  */
819 void
820 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
821 {
822   if (0 == tmit->acks_pending)
823   {
824     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
825     GNUNET_break (0);
826     return;
827   }
828   tmit->acks_pending--;
829
830   switch (tmit->state)
831   {
832   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
833   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
834     transmit_mod (tmit);
835     break;
836
837   case GNUNET_PSYC_MESSAGE_STATE_DATA:
838     transmit_data (tmit);
839     break;
840
841   case GNUNET_PSYC_MESSAGE_STATE_END:
842   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
843     break;
844
845   default:
846     LOG (GNUNET_ERROR_TYPE_DEBUG,
847          "Ignoring message ACK in state %u.\n", tmit->state);
848   }
849 }
850
851
852 /**** Receiving messages ****/
853
854
855 /**
856  * Create handle for receiving messages.
857  */
858 struct GNUNET_PSYC_ReceiveHandle *
859 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
860                             GNUNET_PSYC_MessagePartCallback message_part_cb,
861                             void *cb_cls)
862 {
863   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
864   recv->message_cb = message_cb;
865   recv->message_part_cb = message_part_cb;
866   recv->cb_cls = cb_cls;
867   return recv;
868 }
869
870
871 /**
872  * Destroy handle for receiving messages.
873  */
874 void
875 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
876 {
877   GNUNET_free (recv);
878 }
879
880
881 /**
882  * Reset stored data related to the last received message.
883  */
884 void
885 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
886 {
887   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
888   recv->flags = 0;
889   recv->message_id = 0;
890   recv->mod_value_size = 0;
891   recv->mod_value_size_expected = 0;
892 }
893
894
895 static void
896 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
897 {
898   if (NULL != recv->message_part_cb)
899     recv->message_part_cb (recv->cb_cls, NULL, NULL);
900
901   if (NULL != recv->message_cb)
902     recv->message_cb (recv->cb_cls, NULL);
903
904   GNUNET_PSYC_receive_reset (recv);
905 }
906
907
908 /**
909  * Handle incoming PSYC message.
910  *
911  * @param recv  Receive handle.
912  * @param msg   The message.
913  *
914  * @return #GNUNET_OK on success,
915  *         #GNUNET_SYSERR on receive error.
916  */
917 int
918 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
919                              const struct GNUNET_PSYC_MessageHeader *msg)
920 {
921   uint16_t size = ntohs (msg->header.size);
922   uint32_t flags = ntohl (msg->flags);
923
924   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
925                            (struct GNUNET_MessageHeader *) msg);
926
927   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
928   {
929     recv->message_id = GNUNET_ntohll (msg->message_id);
930     recv->flags = flags;
931     recv->slave_pub_key = msg->slave_pub_key;
932     recv->mod_value_size = 0;
933     recv->mod_value_size_expected = 0;
934   }
935   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
936   {
937     // FIXME
938     LOG (GNUNET_ERROR_TYPE_WARNING,
939          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
940          GNUNET_ntohll (msg->message_id), recv->message_id);
941     GNUNET_break_op (0);
942     recv_error (recv);
943     return GNUNET_SYSERR;
944   }
945   else if (flags != recv->flags)
946   {
947     LOG (GNUNET_ERROR_TYPE_WARNING,
948          "Unexpected message flags. Got: %lu, expected: %lu\n",
949          flags, recv->flags);
950     GNUNET_break_op (0);
951     recv_error (recv);
952     return GNUNET_SYSERR;
953   }
954
955   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
956
957   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
958   {
959     const struct GNUNET_MessageHeader *pmsg
960       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
961     psize = ntohs (pmsg->size);
962     ptype = ntohs (pmsg->type);
963     size_eq = size_min = 0;
964
965     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
966     {
967       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
968                   "Dropping message of type %u with invalid size %u.\n",
969                   ptype, psize);
970       recv_error (recv);
971       return GNUNET_SYSERR;
972     }
973
974     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                 "Received message part of type %u and size %u from PSYC.\n",
976                 ptype, psize);
977     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
978
979     switch (ptype)
980     {
981     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
982       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
983       break;
984     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
985       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
986       break;
987     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
988     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
989       size_min = sizeof (struct GNUNET_MessageHeader);
990       break;
991     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
992     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
993       size_eq = sizeof (struct GNUNET_MessageHeader);
994       break;
995     default:
996       GNUNET_break_op (0);
997       recv_error (recv);
998       return GNUNET_SYSERR;
999     }
1000
1001     if (! ((0 < size_eq && psize == size_eq)
1002            || (0 < size_min && size_min <= psize)))
1003     {
1004       GNUNET_break_op (0);
1005       recv_error (recv);
1006       return GNUNET_SYSERR;
1007     }
1008
1009     switch (ptype)
1010     {
1011     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1012     {
1013       struct GNUNET_PSYC_MessageMethod *meth
1014         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1015
1016       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1017       {
1018         LOG (GNUNET_ERROR_TYPE_WARNING,
1019              "Dropping out of order message method (%u).\n",
1020              recv->state);
1021         /* It is normal to receive an incomplete message right after connecting,
1022          * but should not happen later.
1023          * FIXME: add a check for this condition.
1024          */
1025         GNUNET_break_op (0);
1026         recv_error (recv);
1027         return GNUNET_SYSERR;
1028       }
1029
1030       if ('\0' != *((char *) meth + psize - 1))
1031       {
1032         LOG (GNUNET_ERROR_TYPE_WARNING,
1033              "Dropping message with malformed method. "
1034              "Message ID: %" PRIu64 "\n", recv->message_id);
1035         GNUNET_break_op (0);
1036         recv_error (recv);
1037         return GNUNET_SYSERR;
1038       }
1039       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1040       break;
1041     }
1042     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1043     {
1044       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1045             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1046             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1047       {
1048         LOG (GNUNET_ERROR_TYPE_WARNING,
1049              "Dropping out of order message modifier (%u).\n",
1050              recv->state);
1051         GNUNET_break_op (0);
1052         recv_error (recv);
1053         return GNUNET_SYSERR;
1054       }
1055
1056       struct GNUNET_PSYC_MessageModifier *mod
1057         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1058
1059       uint16_t name_size = ntohs (mod->name_size);
1060       recv->mod_value_size_expected = ntohl (mod->value_size);
1061       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1062
1063       if (psize < sizeof (*mod) + name_size
1064           || '\0' != *((char *) &mod[1] + name_size - 1)
1065           || recv->mod_value_size_expected < recv->mod_value_size)
1066       {
1067         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1068         GNUNET_break_op (0);
1069         recv_error (recv);
1070         return GNUNET_SYSERR;
1071       }
1072       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1073       break;
1074     }
1075     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1076     {
1077       recv->mod_value_size += psize - sizeof (*pmsg);
1078
1079       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1080             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1081           || recv->mod_value_size_expected < recv->mod_value_size)
1082       {
1083         LOG (GNUNET_ERROR_TYPE_WARNING,
1084              "Dropping out of order message modifier continuation "
1085              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1086              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1087              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1088              recv->mod_value_size_expected, recv->mod_value_size);
1089         GNUNET_break_op (0);
1090         recv_error (recv);
1091         return GNUNET_SYSERR;
1092       }
1093       break;
1094     }
1095     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1096     {
1097       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1098           || recv->mod_value_size_expected != recv->mod_value_size)
1099       {
1100         LOG (GNUNET_ERROR_TYPE_WARNING,
1101              "Dropping out of order message data fragment "
1102              "(%u < %u || %lu != %lu).\n",
1103              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1104              recv->mod_value_size_expected, recv->mod_value_size);
1105
1106         GNUNET_break_op (0);
1107         recv_error (recv);
1108         return GNUNET_SYSERR;
1109       }
1110       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1111       break;
1112     }
1113     }
1114
1115     if (NULL != recv->message_part_cb)
1116       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1117
1118     switch (ptype)
1119     {
1120     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1121     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1122       GNUNET_PSYC_receive_reset (recv);
1123       break;
1124     }
1125   }
1126
1127   if (NULL != recv->message_cb)
1128     recv->message_cb (recv->cb_cls, msg);
1129   return GNUNET_OK;
1130 }
1131
1132
1133 /**
1134  * Check if @a data contains a series of valid message parts.
1135  *
1136  * @param      data_size    Size of @a data.
1137  * @param      data         Data.
1138  * @param[out] first_ptype  Type of first message part.
1139  * @param[out] last_ptype   Type of last message part.
1140  *
1141  * @return Number of message parts found in @a data.
1142  *         or GNUNET_SYSERR if the message contains invalid parts.
1143  */
1144 int
1145 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1146                                  uint16_t *first_ptype, uint16_t *last_ptype)
1147 {
1148   const struct GNUNET_MessageHeader *pmsg;
1149   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1150   if (NULL != first_ptype)
1151     *first_ptype = 0;
1152   if (NULL != last_ptype)
1153     *last_ptype = 0;
1154
1155   for (pos = 0; pos < data_size; pos += psize, parts++)
1156   {
1157     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1158     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1159     psize = ntohs (pmsg->size);
1160     ptype = ntohs (pmsg->type);
1161     if (0 == parts && NULL != first_ptype)
1162       *first_ptype = ptype;
1163     if (NULL != last_ptype
1164         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1165       *last_ptype = ptype;
1166     if (psize < sizeof (*pmsg)
1167         || pos + psize > data_size
1168         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1169         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1170     {
1171       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1172                   "Invalid message part of type %u and size %u.\n",
1173                   ptype, psize);
1174       return GNUNET_SYSERR;
1175     }
1176     /** @todo FIXME: check message part order */
1177   }
1178   return parts;
1179 }
1180
1181
1182 struct ParseMessageClosure
1183 {
1184   struct GNUNET_PSYC_Environment *env;
1185   const char **method_name;
1186   const void **data;
1187   uint16_t *data_size;
1188   enum GNUNET_PSYC_MessageState msg_state;
1189 };
1190
1191
1192 static void
1193 parse_message_part_cb (void *cls,
1194                        const struct GNUNET_PSYC_MessageHeader *msg,
1195                        const struct GNUNET_MessageHeader *pmsg)
1196 {
1197   struct ParseMessageClosure *pmc = cls;
1198   if (NULL == pmsg)
1199   {
1200     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1201     return;
1202   }
1203
1204   switch (ntohs (pmsg->type))
1205   {
1206   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1207   {
1208     struct GNUNET_PSYC_MessageMethod *
1209       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1210     *pmc->method_name = (const char *) &pmeth[1];
1211     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1212     break;
1213   }
1214
1215   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1216   {
1217     struct GNUNET_PSYC_MessageModifier *
1218       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1219
1220     const char *name = (const char *) &pmod[1];
1221     const void *value = name + ntohs (pmod->name_size);
1222     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1223                          ntohl (pmod->value_size));
1224     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1225     break;
1226   }
1227
1228   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1229     *pmc->data = &pmsg[1];
1230     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1231     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1232     break;
1233
1234   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1235     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1236     break;
1237
1238   default:
1239     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1240   }
1241 }
1242
1243
1244 /**
1245  * Parse PSYC message.
1246  *
1247  * @param msg
1248  *        The PSYC message to parse.
1249  * @param[out] method_name
1250  *        Pointer to the method name inside @a pmsg.
1251  * @param env
1252  *        The environment for the message with a list of modifiers.
1253  * @param[out] data
1254  *        Pointer to data inside @a msg.
1255  * @param[out] data_size
1256  *        Size of @data is written here.
1257  *
1258  * @return #GNUNET_OK on success,
1259  *         #GNUNET_SYSERR on parse error.
1260  */
1261 int
1262 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1263                            const char **method_name,
1264                            struct GNUNET_PSYC_Environment *env,
1265                            const void **data,
1266                            uint16_t *data_size)
1267 {
1268   struct ParseMessageClosure cls;
1269   cls.env = env;
1270   cls.method_name = method_name;
1271   cls.data = data;
1272   cls.data_size = data_size;
1273
1274   struct GNUNET_PSYC_ReceiveHandle *
1275     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1276   int ret = GNUNET_PSYC_receive_message (recv, msg);
1277   GNUNET_PSYC_receive_destroy (recv);
1278
1279   if (GNUNET_OK != ret)
1280     return GNUNET_SYSERR;
1281
1282   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1283     ? GNUNET_OK
1284     : GNUNET_NO;
1285 }
1286
1287
1288 /**
1289  * Initialize PSYC message header.
1290  */
1291 void
1292 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1293                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1294                                  uint32_t flags)
1295 {
1296   uint16_t size = ntohs (mmsg->header.size);
1297   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1298
1299   pmsg->header.size = htons (psize);
1300   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1301   pmsg->message_id = mmsg->message_id;
1302   pmsg->fragment_offset = mmsg->fragment_offset;
1303   pmsg->flags = htonl (flags);
1304
1305   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1306 }
1307
1308
1309 /**
1310  * Create a new PSYC message header from a multicast message.
1311  */
1312 struct GNUNET_PSYC_MessageHeader *
1313 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1314                                    uint32_t flags)
1315 {
1316   struct GNUNET_PSYC_MessageHeader *pmsg;
1317   uint16_t size = ntohs (mmsg->header.size);
1318   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1319
1320   pmsg = GNUNET_malloc (psize);
1321   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1322   return pmsg;
1323 }
1324
1325
1326 /**
1327  * Create a new PSYC message header from a PSYC message.
1328  */
1329 struct GNUNET_PSYC_MessageHeader *
1330 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1331 {
1332   uint16_t msg_size = ntohs (msg->header.size);
1333   struct GNUNET_PSYC_MessageHeader *
1334     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1335   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1336   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1337   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1338   return pmsg;
1339 }