social cli
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_CLIENT_MANAGER_Connection *client;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Callback to request next modifier from client.
51    */
52   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
53
54   /**
55    * Closure for the notify callbacks.
56    */
57   void *notify_mod_cls;
58
59   /**
60    * Callback to request next data fragment from client.
61    */
62   GNUNET_PSYC_TransmitNotifyData notify_data;
63
64   /**
65    * Closure for the notify callbacks.
66    */
67   void *notify_data_cls;
68
69   /**
70    * Modifier of the environment that is currently being transmitted.
71    */
72   struct GNUNET_PSYC_Modifier *mod;
73
74   /**
75    *
76    */
77   const char *mod_value;
78
79   /**
80    * Number of bytes remaining to be transmitted from the current modifier value.
81    */
82   uint32_t mod_value_remaining;
83
84   /**
85    * State of the current message being received from client.
86    */
87   enum GNUNET_PSYC_MessageState state;
88
89   /**
90    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
91    */
92   uint8_t acks_pending;
93
94   /**
95    * Is transmission paused?
96    */
97   uint8_t paused;
98
99   /**
100    * Are we currently transmitting a message?
101    */
102   uint8_t in_transmit;
103
104   /**
105    * Notify callback is currently being called.
106    */
107   uint8_t in_notify;
108
109 };
110
111
112
113 struct GNUNET_PSYC_ReceiveHandle
114 {
115   /**
116    * Message callback.
117    */
118   GNUNET_PSYC_MessageCallback message_cb;
119
120   /**
121    * Message part callback.
122    */
123   GNUNET_PSYC_MessagePartCallback message_part_cb;
124
125   /**
126    * Closure for the callbacks.
127    */
128   void *cb_cls;
129
130   /**
131    * ID of the message being received from the PSYC service.
132    */
133   uint64_t message_id;
134
135   /**
136    * Public key of the slave from which a message is being received.
137    */
138   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
139
140   /**
141    * State of the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageState state;
144
145   /**
146    * Flags for the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageFlags flags;
149
150   /**
151    * Expected value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size_expected;
154
155   /**
156    * Actual value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size;
159 };
160
161
162 /**** Messages ****/
163
164
165 /**
166  * Create a PSYC message.
167  *
168  * @param method_name
169  *        PSYC method for the message.
170  * @param env
171  *        Environment for the message.
172  * @param data
173  *        Data payload for the message.
174  * @param data_size
175  *        Size of @a data.
176  *
177  * @return Message header with size information,
178  *         followed by the message parts.
179  */
180 struct GNUNET_PSYC_Message *
181 GNUNET_PSYC_message_create (const char *method_name,
182                             const struct GNUNET_PSYC_Environment *env,
183                             const void *data,
184                             size_t data_size)
185 {
186   struct GNUNET_PSYC_Modifier *mod = NULL;
187   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
188   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
189   struct GNUNET_MessageHeader *pmsg = NULL;
190   uint16_t env_size = 0;
191   if (NULL != env)
192   {
193     mod = GNUNET_PSYC_env_head (env);
194     while (NULL != mod)
195     {
196       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
197       mod = mod->next;
198     }
199   }
200
201   struct GNUNET_PSYC_Message *msg;
202   uint16_t method_name_size = strlen (method_name) + 1;
203   if (method_name_size == 1)
204     return NULL;
205
206   uint16_t msg_size = sizeof (*msg)                      /* header */
207     + sizeof (*pmeth) + method_name_size                 /* method */
208     + env_size                                           /* modifiers */
209     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
210     + sizeof (*pmsg);                                    /* end of message */
211   msg = GNUNET_malloc (msg_size);
212   msg->header.size = htons (msg_size);
213   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
214
215   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
216   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
217   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
218   memcpy (&pmeth[1], method_name, method_name_size);
219
220   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
221   if (NULL != env)
222   {
223     mod = GNUNET_PSYC_env_head (env);
224     while (NULL != mod)
225     {
226       uint16_t mod_name_size = strlen (mod->name) + 1;
227       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
228       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
229       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
230       p += pmod->header.size;
231       pmod->header.size = htons (pmod->header.size);
232
233       pmod->oper = mod->oper;
234       pmod->name_size = htons (mod_name_size);
235       pmod->value_size = htonl (mod->value_size);
236
237       memcpy (&pmod[1], mod->name, mod_name_size);
238       if (0 < mod->value_size)
239         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
240
241       mod = mod->next;
242     }
243   }
244
245   if (0 < data_size)
246   {
247     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
248     pmsg->size = sizeof (*pmsg) + data_size;
249     p += pmsg->size;
250     pmsg->size = htons (pmsg->size);
251     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
252     memcpy (&pmsg[1], data, data_size);
253   }
254
255   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
256   pmsg->size = htons (sizeof (*pmsg));
257   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
258
259   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
260   return msg;
261 }
262
263
264 void
265 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
266                          const struct GNUNET_MessageHeader *msg)
267 {
268   uint16_t size = ntohs (msg->size);
269   uint16_t type = ntohs (msg->type);
270   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
271   switch (type)
272   {
273   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
274   {
275     struct GNUNET_PSYC_MessageHeader *pmsg
276       = (struct GNUNET_PSYC_MessageHeader *) msg;
277     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
278                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
279     break;
280   }
281   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
282   {
283     struct GNUNET_PSYC_MessageMethod *meth
284       = (struct GNUNET_PSYC_MessageMethod *) msg;
285     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
286     break;
287   }
288   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
289   {
290     struct GNUNET_PSYC_MessageModifier *mod
291       = (struct GNUNET_PSYC_MessageModifier *) msg;
292     uint16_t name_size = ntohs (mod->name_size);
293     char oper = ' ' < mod->oper ? mod->oper : ' ';
294     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
295                 size - sizeof (*mod) - name_size,
296                 ((char *) &mod[1]) + name_size);
297     break;
298   }
299   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
300   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
301     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
302     break;
303   }
304 }
305
306
307 /**** Transmitting messages ****/
308
309
310 /**
311  * Create a transmission handle.
312  */
313 struct GNUNET_PSYC_TransmitHandle *
314 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
315 {
316   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
317   tmit->client = client;
318   return tmit;
319 }
320
321
322 /**
323  * Destroy a transmission handle.
324  */
325 void
326 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
327 {
328   GNUNET_free (tmit);
329 }
330
331
332 /**
333  * Queue a message part for transmission.
334  *
335  * The message part is added to the current message buffer.
336  * When this buffer is full, it is added to the transmission queue.
337  *
338  * @param tmit
339  *        Transmission handle.
340  * @param msg
341  *        Message part, or NULL.
342  * @param tmit_now
343  *        Transmit message now, or wait for buffer to fill up?
344  *        #GNUNET_YES or #GNUNET_NO.
345  */
346 static void
347 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
348                        const struct GNUNET_MessageHeader *msg,
349                        uint8_t tmit_now)
350 {
351   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
352
353   LOG (GNUNET_ERROR_TYPE_DEBUG,
354        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
355        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
356
357   if (NULL != tmit->msg)
358   {
359     if (NULL == msg
360         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
361     {
362       /* End of message or buffer is full, add it to transmission queue
363        * and start with empty buffer */
364       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
365       tmit->msg->size = htons (tmit->msg->size);
366       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
367       tmit->msg = NULL;
368       tmit->acks_pending++;
369     }
370     else
371     {
372       /* Message fits in current buffer, append */
373       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
374       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
375       tmit->msg->size += size;
376     }
377   }
378
379   if (NULL == tmit->msg && NULL != msg)
380   {
381     /* Empty buffer, copy over message. */
382     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
383     tmit->msg->size = sizeof (*tmit->msg) + size;
384     memcpy (&tmit->msg[1], msg, size);
385   }
386
387   if (NULL != tmit->msg
388       && (GNUNET_YES == tmit_now
389           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
390               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
391   {
392     /* End of message or buffer is full, add it to transmission queue. */
393     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
394     tmit->msg->size = htons (tmit->msg->size);
395     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
396     tmit->msg = NULL;
397     tmit->acks_pending++;
398   }
399 }
400
401
402 /**
403  * Request data from client to transmit.
404  *
405  * @param tmit  Transmission handle.
406  */
407 static void
408 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
409 {
410   int notify_ret = GNUNET_YES;
411   uint16_t data_size = 0;
412   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
413   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
414   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
415
416   if (NULL != tmit->notify_data)
417   {
418     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
419     tmit->in_notify = GNUNET_YES;
420     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
421     tmit->in_notify = GNUNET_NO;
422   }
423   LOG (GNUNET_ERROR_TYPE_DEBUG,
424        "transmit_data (ret: %d, size: %u): %.*s\n",
425        notify_ret, data_size, data_size, &msg[1]);
426   switch (notify_ret)
427   {
428   case GNUNET_NO:
429     if (0 == data_size)
430     {
431       /* Transmission paused, nothing to send. */
432       tmit->paused = GNUNET_YES;
433       return;
434     }
435     break;
436
437   case GNUNET_YES:
438     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
439     break;
440
441   default:
442     LOG (GNUNET_ERROR_TYPE_ERROR,
443          "TransmitNotifyData callback returned error when requesting data.\n");
444
445     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
446     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
447     msg->size = htons (sizeof (*msg));
448     transmit_queue_insert (tmit, msg, GNUNET_YES);
449     tmit->in_transmit = GNUNET_NO;
450     return;
451   }
452
453   if (0 < data_size)
454   {
455     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
456     msg->size = htons (sizeof (*msg) + data_size);
457     transmit_queue_insert (tmit, msg, !notify_ret);
458   }
459
460   /* End of message. */
461   if (GNUNET_YES == notify_ret)
462   {
463     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
464     msg->size = htons (sizeof (*msg));
465     transmit_queue_insert (tmit, msg, GNUNET_YES);
466     /* FIXME: wait for ACK before setting in_transmit to no */
467     tmit->in_transmit = GNUNET_NO;
468   }
469 }
470
471
472 /**
473  * Request a modifier from a client to transmit.
474  *
475  * @param tmit  Transmission handle.
476  */
477 static void
478 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
479 {
480   uint16_t max_data_size = 0;
481   uint16_t data_size = 0;
482   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
483   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
484   int notify_ret = GNUNET_YES;
485
486   switch (tmit->state)
487   {
488   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
489   {
490     struct GNUNET_PSYC_MessageModifier *mod
491       = (struct GNUNET_PSYC_MessageModifier *) msg;
492     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
493     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
494
495     if (NULL != tmit->notify_mod)
496     {
497       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
498       data_size = max_data_size;
499       tmit->in_notify = GNUNET_YES;
500       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
501                                      &mod->oper, &mod->value_size);
502       tmit->in_notify = GNUNET_NO;
503     }
504
505     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
506     LOG (GNUNET_ERROR_TYPE_DEBUG,
507          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
508          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
509     if (mod->name_size < data_size)
510     {
511       tmit->mod_value_remaining
512         = mod->value_size - (data_size - mod->name_size);
513       mod->value_size = htonl (mod->value_size);
514       mod->name_size = htons (mod->name_size);
515     }
516     else if (0 < data_size)
517     {
518       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
519       notify_ret = GNUNET_SYSERR;
520     }
521     break;
522   }
523   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
524   {
525     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
526     msg->size = sizeof (struct GNUNET_MessageHeader);
527
528     if (NULL != tmit->notify_mod)
529     {
530       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
531       data_size = max_data_size;
532       tmit->in_notify = GNUNET_YES;
533       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
534                                      &data_size, &msg[1], NULL, NULL);
535       tmit->in_notify = GNUNET_NO;
536     }
537     tmit->mod_value_remaining -= data_size;
538     LOG (GNUNET_ERROR_TYPE_DEBUG,
539          "transmit_mod (ret: %d, size: %u): %.*s\n",
540          notify_ret, data_size, data_size, &msg[1]);
541     break;
542   }
543   default:
544     GNUNET_assert (0);
545   }
546
547   switch (notify_ret)
548   {
549   case GNUNET_NO:
550     if (0 == data_size)
551     { /* Transmission paused, nothing to send. */
552       tmit->paused = GNUNET_YES;
553       return;
554     }
555     tmit->state
556       = (0 == tmit->mod_value_remaining)
557       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
558       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
559     break;
560
561   case GNUNET_YES: /* End of modifiers. */
562     GNUNET_assert (0 == tmit->mod_value_remaining);
563     break;
564
565   default:
566     LOG (GNUNET_ERROR_TYPE_ERROR,
567          "TransmitNotifyModifier callback returned with error.\n");
568
569     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
570     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
571     msg->size = htons (sizeof (*msg));
572     transmit_queue_insert (tmit, msg, GNUNET_YES);
573     tmit->in_transmit = GNUNET_NO;
574     return;
575   }
576
577   if (0 < data_size)
578   {
579     GNUNET_assert (data_size <= max_data_size);
580     msg->size = htons (msg->size + data_size);
581     transmit_queue_insert (tmit, msg, GNUNET_NO);
582   }
583
584   if (GNUNET_YES == notify_ret)
585   {
586     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
587     if (0 == tmit->acks_pending)
588       transmit_data (tmit);
589   }
590   else
591   {
592     transmit_mod (tmit);
593   }
594 }
595
596
597 int
598 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
599                      uint32_t *full_value_size)
600
601 {
602   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
603   uint16_t name_size = 0;
604   uint32_t value_size = 0;
605   const char *value = NULL;
606
607   if (NULL != oper)
608   { /* New modifier */
609     if (NULL != tmit->mod)
610       tmit->mod = tmit->mod->next;
611     if (NULL == tmit->mod)
612     { /* No more modifiers, continue with data */
613       *data_size = 0;
614       return GNUNET_YES;
615     }
616
617     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
618     *full_value_size = tmit->mod->value_size;
619     *oper = tmit->mod->oper;
620     name_size = strlen (tmit->mod->name) + 1;
621
622     if (name_size + tmit->mod->value_size <= *data_size)
623     {
624       value_size = tmit->mod->value_size;
625       *data_size = name_size + value_size;
626     }
627     else /* full modifier does not fit in data, continuation needed */
628     {
629       value_size = *data_size - name_size;
630       tmit->mod_value = tmit->mod->value + value_size;
631     }
632
633     memcpy (data, tmit->mod->name, name_size);
634     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
635     return GNUNET_NO;
636   }
637   else
638   { /* Modifier continuation */
639     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
640     value = tmit->mod_value;
641     if (tmit->mod_value_remaining <= *data_size)
642     {
643       value_size = tmit->mod_value_remaining;
644       tmit->mod_value = NULL;
645     }
646     else
647     {
648       value_size = *data_size;
649       tmit->mod_value += value_size;
650     }
651
652     if (*data_size < value_size)
653     {
654       LOG (GNUNET_ERROR_TYPE_DEBUG,
655            "Value in environment larger than buffer: %u < %zu\n",
656            *data_size, value_size);
657       *data_size = 0;
658       return GNUNET_NO;
659     }
660
661     *data_size = value_size;
662     memcpy (data, value, value_size);
663     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
664   }
665 }
666
667
668 /**
669  * Transmit a message.
670  *
671  * @param tmit
672  *        Transmission handle.
673  * @param method_name
674  *        Which method should be invoked.
675  * @param env
676  *        Environment for the message.
677  *        Should stay available until the first call to notify_data.
678  *        Can be NULL if there are no modifiers or @a notify_mod is
679  *        provided instead.
680  * @param notify_mod
681  *        Function to call to obtain modifiers.
682  *        Can be NULL if there are no modifiers or @a env is provided instead.
683  * @param notify_data
684  *        Function to call to obtain fragments of the data.
685  * @param notify_cls
686  *        Closure for @a notify_mod and @a notify_data.
687  * @param flags
688  *        Flags for the message being transmitted.
689  *
690  * @return #GNUNET_OK if the transmission was started.
691  *         #GNUNET_SYSERR if another transmission is already going on.
692  */
693 int
694 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
695                               const char *method_name,
696                               const struct GNUNET_PSYC_Environment *env,
697                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
698                               GNUNET_PSYC_TransmitNotifyData notify_data,
699                               void *notify_cls,
700                               uint32_t flags)
701 {
702   if (GNUNET_NO != tmit->in_transmit)
703     return GNUNET_SYSERR;
704   tmit->in_transmit = GNUNET_YES;
705
706   size_t size = strlen (method_name) + 1;
707   struct GNUNET_PSYC_MessageMethod *pmeth;
708   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
709   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
710
711   if (NULL != notify_mod)
712   {
713     tmit->notify_mod = notify_mod;
714     tmit->notify_mod_cls = notify_cls;
715   }
716   else
717   {
718     tmit->notify_mod = &transmit_notify_env;
719     tmit->notify_mod_cls = tmit;
720     if (NULL != env)
721     {
722       struct GNUNET_PSYC_Modifier mod = {};
723       mod.next = GNUNET_PSYC_env_head (env);
724       tmit->mod = &mod;
725
726       struct GNUNET_PSYC_Modifier *m = tmit->mod;
727       while (NULL != (m = m->next))
728       {
729         if (m->oper != GNUNET_PSYC_OP_SET)
730           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
731       }
732     }
733     else
734     {
735       tmit->mod = NULL;
736     }
737   }
738
739   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
740   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
741   pmeth->header.size = htons (sizeof (*pmeth) + size);
742   pmeth->flags = htonl (flags);
743   memcpy (&pmeth[1], method_name, size);
744
745   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
746   tmit->notify_data = notify_data;
747   tmit->notify_data_cls = notify_cls;
748
749   transmit_mod (tmit);
750   return GNUNET_OK;
751 }
752
753
754 /**
755  * Resume transmission.
756  *
757  * @param tmit  Transmission handle.
758  */
759 void
760 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
761 {
762   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
763     return;
764
765   if (0 == tmit->acks_pending)
766   {
767     tmit->paused = GNUNET_NO;
768     transmit_data (tmit);
769   }
770 }
771
772
773 /**
774  * Abort transmission request.
775  *
776  * @param tmit  Transmission handle.
777  */
778 void
779 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
780 {
781   if (GNUNET_NO == tmit->in_transmit)
782     return;
783
784   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
785   tmit->in_transmit = GNUNET_NO;
786   tmit->paused = GNUNET_NO;
787
788   /* FIXME */
789   struct GNUNET_MessageHeader msg;
790   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
791   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
792   msg.size = htons (sizeof (msg));
793   transmit_queue_insert (tmit, &msg, GNUNET_YES);
794 }
795
796
797 /**
798  * Got acknowledgement of a transmitted message part, continue transmission.
799  *
800  * @param tmit  Transmission handle.
801  */
802 void
803 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
804 {
805   if (0 == tmit->acks_pending)
806   {
807     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
808     GNUNET_break (0);
809     return;
810   }
811   tmit->acks_pending--;
812
813   switch (tmit->state)
814   {
815   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
816   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
817     transmit_mod (tmit);
818     break;
819
820   case GNUNET_PSYC_MESSAGE_STATE_DATA:
821     transmit_data (tmit);
822     break;
823
824   case GNUNET_PSYC_MESSAGE_STATE_END:
825   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
826     break;
827
828   default:
829     LOG (GNUNET_ERROR_TYPE_DEBUG,
830          "Ignoring message ACK in state %u.\n", tmit->state);
831   }
832 }
833
834
835 /**** Receiving messages ****/
836
837
838 /**
839  * Create handle for receiving messages.
840  */
841 struct GNUNET_PSYC_ReceiveHandle *
842 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
843                             GNUNET_PSYC_MessagePartCallback message_part_cb,
844                             void *cb_cls)
845 {
846   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
847   recv->message_cb = message_cb;
848   recv->message_part_cb = message_part_cb;
849   recv->cb_cls = cb_cls;
850   return recv;
851 }
852
853
854 /**
855  * Destroy handle for receiving messages.
856  */
857 void
858 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
859 {
860   GNUNET_free (recv);
861 }
862
863
864 /**
865  * Reset stored data related to the last received message.
866  */
867 void
868 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
869 {
870   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
871   recv->flags = 0;
872   recv->message_id = 0;
873   recv->mod_value_size = 0;
874   recv->mod_value_size_expected = 0;
875 }
876
877
878 static void
879 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
880 {
881   if (NULL != recv->message_part_cb)
882     recv->message_part_cb (recv->cb_cls, NULL, NULL);
883
884   if (NULL != recv->message_cb)
885     recv->message_cb (recv->cb_cls, NULL);
886
887   GNUNET_PSYC_receive_reset (recv);
888 }
889
890
891 /**
892  * Handle incoming PSYC message.
893  *
894  * @param recv  Receive handle.
895  * @param msg   The message.
896  *
897  * @return #GNUNET_OK on success,
898  *         #GNUNET_SYSERR on receive error.
899  */
900 int
901 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
902                              const struct GNUNET_PSYC_MessageHeader *msg)
903 {
904   uint16_t size = ntohs (msg->header.size);
905   uint32_t flags = ntohl (msg->flags);
906
907   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
908                            (struct GNUNET_MessageHeader *) msg);
909
910   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
911   {
912     recv->message_id = GNUNET_ntohll (msg->message_id);
913     recv->flags = flags;
914     recv->slave_pub_key = msg->slave_pub_key;
915     recv->mod_value_size = 0;
916     recv->mod_value_size_expected = 0;
917   }
918   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
919   {
920     // FIXME
921     LOG (GNUNET_ERROR_TYPE_WARNING,
922          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
923          GNUNET_ntohll (msg->message_id), recv->message_id);
924     GNUNET_break_op (0);
925     recv_error (recv);
926     return GNUNET_SYSERR;
927   }
928   else if (flags != recv->flags)
929   {
930     LOG (GNUNET_ERROR_TYPE_WARNING,
931          "Unexpected message flags. Got: %lu, expected: %lu\n",
932          flags, recv->flags);
933     GNUNET_break_op (0);
934     recv_error (recv);
935     return GNUNET_SYSERR;
936   }
937
938   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
939
940   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
941   {
942     const struct GNUNET_MessageHeader *pmsg
943       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
944     psize = ntohs (pmsg->size);
945     ptype = ntohs (pmsg->type);
946     size_eq = size_min = 0;
947
948     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
949     {
950       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
951                   "Dropping message of type %u with invalid size %u.\n",
952                   ptype, psize);
953       recv_error (recv);
954       return GNUNET_SYSERR;
955     }
956
957     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958                 "Received message part of type %u and size %u from PSYC.\n",
959                 ptype, psize);
960     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
961
962     switch (ptype)
963     {
964     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
965       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
966       break;
967     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
968       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
969       break;
970     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
971     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
972       size_min = sizeof (struct GNUNET_MessageHeader);
973       break;
974     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
975     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
976       size_eq = sizeof (struct GNUNET_MessageHeader);
977       break;
978     default:
979       GNUNET_break_op (0);
980       recv_error (recv);
981       return GNUNET_SYSERR;
982     }
983
984     if (! ((0 < size_eq && psize == size_eq)
985            || (0 < size_min && size_min <= psize)))
986     {
987       GNUNET_break_op (0);
988       recv_error (recv);
989       return GNUNET_SYSERR;
990     }
991
992     switch (ptype)
993     {
994     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
995     {
996       struct GNUNET_PSYC_MessageMethod *meth
997         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
998
999       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1000       {
1001         LOG (GNUNET_ERROR_TYPE_WARNING,
1002              "Dropping out of order message method (%u).\n",
1003              recv->state);
1004         /* It is normal to receive an incomplete message right after connecting,
1005          * but should not happen later.
1006          * FIXME: add a check for this condition.
1007          */
1008         GNUNET_break_op (0);
1009         recv_error (recv);
1010         return GNUNET_SYSERR;
1011       }
1012
1013       if ('\0' != *((char *) meth + psize - 1))
1014       {
1015         LOG (GNUNET_ERROR_TYPE_WARNING,
1016              "Dropping message with malformed method. "
1017              "Message ID: %" PRIu64 "\n", recv->message_id);
1018         GNUNET_break_op (0);
1019         recv_error (recv);
1020         return GNUNET_SYSERR;
1021       }
1022       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1023       break;
1024     }
1025     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1026     {
1027       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1028             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1029             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1030       {
1031         LOG (GNUNET_ERROR_TYPE_WARNING,
1032              "Dropping out of order message modifier (%u).\n",
1033              recv->state);
1034         GNUNET_break_op (0);
1035         recv_error (recv);
1036         return GNUNET_SYSERR;
1037       }
1038
1039       struct GNUNET_PSYC_MessageModifier *mod
1040         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1041
1042       uint16_t name_size = ntohs (mod->name_size);
1043       recv->mod_value_size_expected = ntohl (mod->value_size);
1044       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1045
1046       if (psize < sizeof (*mod) + name_size
1047           || '\0' != *((char *) &mod[1] + name_size - 1)
1048           || recv->mod_value_size_expected < recv->mod_value_size)
1049       {
1050         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1051         GNUNET_break_op (0);
1052         recv_error (recv);
1053         return GNUNET_SYSERR;
1054       }
1055       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1056       break;
1057     }
1058     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1059     {
1060       recv->mod_value_size += psize - sizeof (*pmsg);
1061
1062       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1063             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1064           || recv->mod_value_size_expected < recv->mod_value_size)
1065       {
1066         LOG (GNUNET_ERROR_TYPE_WARNING,
1067              "Dropping out of order message modifier continuation "
1068              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1069              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1070              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1071              recv->mod_value_size_expected, recv->mod_value_size);
1072         GNUNET_break_op (0);
1073         recv_error (recv);
1074         return GNUNET_SYSERR;
1075       }
1076       break;
1077     }
1078     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1079     {
1080       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1081           || recv->mod_value_size_expected != recv->mod_value_size)
1082       {
1083         LOG (GNUNET_ERROR_TYPE_WARNING,
1084              "Dropping out of order message data fragment "
1085              "(%u < %u || %lu != %lu).\n",
1086              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1087              recv->mod_value_size_expected, recv->mod_value_size);
1088
1089         GNUNET_break_op (0);
1090         recv_error (recv);
1091         return GNUNET_SYSERR;
1092       }
1093       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1094       break;
1095     }
1096     }
1097
1098     if (NULL != recv->message_part_cb)
1099       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1100
1101     switch (ptype)
1102     {
1103     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1104     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1105       GNUNET_PSYC_receive_reset (recv);
1106       break;
1107     }
1108   }
1109
1110   if (NULL != recv->message_cb)
1111     recv->message_cb (recv->cb_cls, msg);
1112   return GNUNET_OK;
1113 }
1114
1115
1116 /**
1117  * Check if @a data contains a series of valid message parts.
1118  *
1119  * @param      data_size    Size of @a data.
1120  * @param      data         Data.
1121  * @param[out] first_ptype  Type of first message part.
1122  * @param[out] last_ptype   Type of last message part.
1123  *
1124  * @return Number of message parts found in @a data.
1125  *         or GNUNET_SYSERR if the message contains invalid parts.
1126  */
1127 int
1128 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1129                                  uint16_t *first_ptype, uint16_t *last_ptype)
1130 {
1131   const struct GNUNET_MessageHeader *pmsg;
1132   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1133   if (NULL != first_ptype)
1134     *first_ptype = 0;
1135   if (NULL != last_ptype)
1136     *last_ptype = 0;
1137
1138   for (pos = 0; pos < data_size; pos += psize, parts++)
1139   {
1140     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1141     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1142     psize = ntohs (pmsg->size);
1143     ptype = ntohs (pmsg->type);
1144     if (0 == parts && NULL != first_ptype)
1145       *first_ptype = ptype;
1146     if (NULL != last_ptype
1147         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1148       *last_ptype = ptype;
1149     if (psize < sizeof (*pmsg)
1150         || pos + psize > data_size
1151         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1152         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1153     {
1154       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1155                   "Invalid message part of type %u and size %u.\n",
1156                   ptype, psize);
1157       return GNUNET_SYSERR;
1158     }
1159     /** @todo FIXME: check message part order */
1160   }
1161   return parts;
1162 }
1163
1164
1165 struct ParseMessageClosure
1166 {
1167   struct GNUNET_PSYC_Environment *env;
1168   const char **method_name;
1169   const void **data;
1170   uint16_t *data_size;
1171   enum GNUNET_PSYC_MessageState msg_state;
1172 };
1173
1174
1175 static void
1176 parse_message_part_cb (void *cls,
1177                        const struct GNUNET_PSYC_MessageHeader *msg,
1178                        const struct GNUNET_MessageHeader *pmsg)
1179 {
1180   struct ParseMessageClosure *pmc = cls;
1181   if (NULL == pmsg)
1182   {
1183     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1184     return;
1185   }
1186
1187   switch (ntohs (pmsg->type))
1188   {
1189   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1190   {
1191     struct GNUNET_PSYC_MessageMethod *
1192       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1193     *pmc->method_name = (const char *) &pmeth[1];
1194     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1195     break;
1196   }
1197
1198   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1199   {
1200     struct GNUNET_PSYC_MessageModifier *
1201       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1202
1203     const char *name = (const char *) &pmod[1];
1204     const void *value = name + ntohs (pmod->name_size);
1205     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1206                          ntohl (pmod->value_size));
1207     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1208     break;
1209   }
1210
1211   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1212     *pmc->data = &pmsg[1];
1213     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1214     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1215     break;
1216
1217   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1218     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1219     break;
1220
1221   default:
1222     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1223   }
1224 }
1225
1226
1227 /**
1228  * Parse PSYC message.
1229  *
1230  * @param msg
1231  *        The PSYC message to parse.
1232  * @param[out] method_name
1233  *        Pointer to the method name inside @a pmsg.
1234  * @param env
1235  *        The environment for the message with a list of modifiers.
1236  * @param[out] data
1237  *        Pointer to data inside @a msg.
1238  * @param[out] data_size
1239  *        Size of @data is written here.
1240  *
1241  * @return #GNUNET_OK on success,
1242  *         #GNUNET_SYSERR on parse error.
1243  */
1244 int
1245 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1246                            const char **method_name,
1247                            struct GNUNET_PSYC_Environment *env,
1248                            const void **data,
1249                            uint16_t *data_size)
1250 {
1251   struct ParseMessageClosure cls;
1252   cls.env = env;
1253   cls.method_name = method_name;
1254   cls.data = data;
1255   cls.data_size = data_size;
1256
1257   struct GNUNET_PSYC_ReceiveHandle *
1258     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1259   int ret = GNUNET_PSYC_receive_message (recv, msg);
1260   GNUNET_PSYC_receive_destroy (recv);
1261
1262   if (GNUNET_OK != ret)
1263     return GNUNET_SYSERR;
1264
1265   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1266     ? GNUNET_OK
1267     : GNUNET_NO;
1268 }
1269
1270
1271 /**
1272  * Initialize PSYC message header.
1273  */
1274 void
1275 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1276                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1277                                  uint32_t flags)
1278 {
1279   uint16_t size = ntohs (mmsg->header.size);
1280   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1281
1282   pmsg->header.size = htons (psize);
1283   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1284   pmsg->message_id = mmsg->message_id;
1285   pmsg->fragment_offset = mmsg->fragment_offset;
1286   pmsg->flags = htonl (flags);
1287
1288   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1289 }
1290
1291
1292 /**
1293  * Create a new PSYC message header from a multicast message.
1294  */
1295 struct GNUNET_PSYC_MessageHeader *
1296 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1297                                    uint32_t flags)
1298 {
1299   struct GNUNET_PSYC_MessageHeader *pmsg;
1300   uint16_t size = ntohs (mmsg->header.size);
1301   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1302
1303   pmsg = GNUNET_malloc (psize);
1304   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1305   return pmsg;
1306 }
1307
1308
1309 /**
1310  * Create a new PSYC message header from a PSYC message.
1311  */
1312 struct GNUNET_PSYC_MessageHeader *
1313 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1314 {
1315   uint16_t msg_size = ntohs (msg->header.size);
1316   struct GNUNET_PSYC_MessageHeader *
1317     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1318   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1319   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1320   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1321   return pmsg;
1322 }