fixing compiler warnings
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_CLIENT_MANAGER_Connection *client;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Callback to request next modifier from client.
51    */
52   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
53
54   /**
55    * Closure for the notify callbacks.
56    */
57   void *notify_mod_cls;
58
59   /**
60    * Callback to request next data fragment from client.
61    */
62   GNUNET_PSYC_TransmitNotifyData notify_data;
63
64   /**
65    * Closure for the notify callbacks.
66    */
67   void *notify_data_cls;
68
69   /**
70    * Modifier of the environment that is currently being transmitted.
71    */
72   struct GNUNET_PSYC_Modifier *mod;
73
74   /**
75    *
76    */
77   const char *mod_value;
78
79   /**
80    * Number of bytes remaining to be transmitted from the current modifier value.
81    */
82   uint32_t mod_value_remaining;
83
84   /**
85    * State of the current message being received from client.
86    */
87   enum GNUNET_PSYC_MessageState state;
88
89   /**
90    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
91    */
92   uint8_t acks_pending;
93
94   /**
95    * Is transmission paused?
96    */
97   uint8_t paused;
98
99   /**
100    * Are we currently transmitting a message?
101    */
102   uint8_t in_transmit;
103
104   /**
105    * Notify callback is currently being called.
106    */
107   uint8_t in_notify;
108
109 };
110
111
112
113 struct GNUNET_PSYC_ReceiveHandle
114 {
115   /**
116    * Message callback.
117    */
118   GNUNET_PSYC_MessageCallback message_cb;
119
120   /**
121    * Message part callback.
122    */
123   GNUNET_PSYC_MessagePartCallback message_part_cb;
124
125   /**
126    * Closure for the callbacks.
127    */
128   void *cb_cls;
129
130   /**
131    * ID of the message being received from the PSYC service.
132    */
133   uint64_t message_id;
134
135   /**
136    * Public key of the slave from which a message is being received.
137    */
138   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
139
140   /**
141    * State of the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageState state;
144
145   /**
146    * Flags for the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageFlags flags;
149
150   /**
151    * Expected value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size_expected;
154
155   /**
156    * Actual value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size;
159 };
160
161
162 /**** Messages ****/
163
164
165 /**
166  * Create a PSYC message.
167  *
168  * @param method_name
169  *        PSYC method for the message.
170  * @param env
171  *        Environment for the message.
172  * @param data
173  *        Data payload for the message.
174  * @param data_size
175  *        Size of @a data.
176  *
177  * @return Message header with size information,
178  *         followed by the message parts.
179  */
180 struct GNUNET_PSYC_Message *
181 GNUNET_PSYC_message_create (const char *method_name,
182                             const struct GNUNET_PSYC_Environment *env,
183                             const void *data,
184                             size_t data_size)
185 {
186   struct GNUNET_PSYC_Modifier *mod = NULL;
187   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
188   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
189   struct GNUNET_MessageHeader *pmsg = NULL;
190   uint16_t env_size = 0;
191   if (NULL != env)
192   {
193     mod = GNUNET_PSYC_env_head (env);
194     while (NULL != mod)
195     {
196       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
197       mod = mod->next;
198     }
199   }
200
201   struct GNUNET_PSYC_Message *msg;
202   uint16_t method_name_size = strlen (method_name) + 1;
203   if (method_name_size == 1)
204     return NULL;
205
206   uint16_t msg_size = sizeof (*msg)                      /* header */
207     + sizeof (*pmeth) + method_name_size                 /* method */
208     + env_size                                           /* modifiers */
209     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
210     + sizeof (*pmsg);                                    /* end of message */
211   msg = GNUNET_malloc (msg_size);
212   msg->header.size = htons (msg_size);
213   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
214
215   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
216   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
217   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
218   memcpy (&pmeth[1], method_name, method_name_size);
219
220   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
221   if (NULL != env)
222   {
223     mod = GNUNET_PSYC_env_head (env);
224     while (NULL != mod)
225     {
226       uint16_t mod_name_size = strlen (mod->name) + 1;
227       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
228       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
229       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
230       p += pmod->header.size;
231       pmod->header.size = htons (pmod->header.size);
232
233       pmod->oper = mod->oper;
234       pmod->name_size = htons (mod_name_size);
235       pmod->value_size = htonl (mod->value_size);
236
237       memcpy (&pmod[1], mod->name, mod_name_size);
238       if (0 < mod->value_size)
239         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
240
241       mod = mod->next;
242     }
243   }
244
245   if (0 < data_size)
246   {
247     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
248     pmsg->size = sizeof (*pmsg) + data_size;
249     p += pmsg->size;
250     pmsg->size = htons (pmsg->size);
251     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
252     memcpy (&pmsg[1], data, data_size);
253   }
254
255   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
256   pmsg->size = htons (sizeof (*pmsg));
257   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
258
259   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
260   return msg;
261 }
262
263
264 void
265 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
266                          const struct GNUNET_MessageHeader *msg)
267 {
268   uint16_t size = ntohs (msg->size);
269   uint16_t type = ntohs (msg->type);
270   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
271   switch (type)
272   {
273   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
274   {
275     struct GNUNET_PSYC_MessageHeader *pmsg
276       = (struct GNUNET_PSYC_MessageHeader *) msg;
277     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
278                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
279     break;
280   }
281   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
282   {
283     struct GNUNET_PSYC_MessageMethod *meth
284       = (struct GNUNET_PSYC_MessageMethod *) msg;
285     GNUNET_log (kind,
286                 "\t%.*s\n",
287                 (int) (size - sizeof (*meth)),
288                 &meth[1]);
289     break;
290   }
291   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
292   {
293     struct GNUNET_PSYC_MessageModifier *mod
294       = (struct GNUNET_PSYC_MessageModifier *) msg;
295     uint16_t name_size = ntohs (mod->name_size);
296     char oper = ' ' < mod->oper ? mod->oper : ' ';
297     GNUNET_log (kind,
298                 "\t%c%.*s\t%.*s\n",
299                 oper,
300                 (int) name_size,
301                 &mod[1],
302                 (int) (size - sizeof (*mod) - name_size),
303                 ((char *) &mod[1]) + name_size);
304     break;
305   }
306   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
307   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
308     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
309     break;
310   }
311 }
312
313
314 /**** Transmitting messages ****/
315
316
317 /**
318  * Create a transmission handle.
319  */
320 struct GNUNET_PSYC_TransmitHandle *
321 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
322 {
323   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
324   tmit->client = client;
325   return tmit;
326 }
327
328
329 /**
330  * Destroy a transmission handle.
331  */
332 void
333 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
334 {
335   GNUNET_free (tmit);
336 }
337
338
339 /**
340  * Queue a message part for transmission.
341  *
342  * The message part is added to the current message buffer.
343  * When this buffer is full, it is added to the transmission queue.
344  *
345  * @param tmit
346  *        Transmission handle.
347  * @param msg
348  *        Message part, or NULL.
349  * @param tmit_now
350  *        Transmit message now, or wait for buffer to fill up?
351  *        #GNUNET_YES or #GNUNET_NO.
352  */
353 static void
354 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
355                        const struct GNUNET_MessageHeader *msg,
356                        uint8_t tmit_now)
357 {
358   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
359
360   LOG (GNUNET_ERROR_TYPE_DEBUG,
361        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
362        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
363
364   if (NULL != tmit->msg)
365   {
366     if (NULL == msg
367         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
368     {
369       /* End of message or buffer is full, add it to transmission queue
370        * and start with empty buffer */
371       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
372       tmit->msg->size = htons (tmit->msg->size);
373       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
374       tmit->msg = NULL;
375       tmit->acks_pending++;
376     }
377     else
378     {
379       /* Message fits in current buffer, append */
380       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
381       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
382       tmit->msg->size += size;
383     }
384   }
385
386   if (NULL == tmit->msg && NULL != msg)
387   {
388     /* Empty buffer, copy over message. */
389     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
390     tmit->msg->size = sizeof (*tmit->msg) + size;
391     memcpy (&tmit->msg[1], msg, size);
392   }
393
394   if (NULL != tmit->msg
395       && (GNUNET_YES == tmit_now
396           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
398   {
399     /* End of message or buffer is full, add it to transmission queue. */
400     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
401     tmit->msg->size = htons (tmit->msg->size);
402     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
403     tmit->msg = NULL;
404     tmit->acks_pending++;
405   }
406 }
407
408
409 /**
410  * Request data from client to transmit.
411  *
412  * @param tmit  Transmission handle.
413  */
414 static void
415 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
416 {
417   int notify_ret = GNUNET_YES;
418   uint16_t data_size = 0;
419   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
420   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
421   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
422
423   if (NULL != tmit->notify_data)
424   {
425     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
426     tmit->in_notify = GNUNET_YES;
427     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
428     tmit->in_notify = GNUNET_NO;
429   }
430   LOG (GNUNET_ERROR_TYPE_DEBUG,
431        "transmit_data (ret: %d, size: %u): %.*s\n",
432        notify_ret, data_size, data_size, &msg[1]);
433   switch (notify_ret)
434   {
435   case GNUNET_NO:
436     if (0 == data_size)
437     {
438       /* Transmission paused, nothing to send. */
439       tmit->paused = GNUNET_YES;
440       return;
441     }
442     break;
443
444   case GNUNET_YES:
445     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
446     break;
447
448   default:
449     LOG (GNUNET_ERROR_TYPE_ERROR,
450          "TransmitNotifyData callback returned error when requesting data.\n");
451
452     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
453     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
454     msg->size = htons (sizeof (*msg));
455     transmit_queue_insert (tmit, msg, GNUNET_YES);
456     tmit->in_transmit = GNUNET_NO;
457     return;
458   }
459
460   if (0 < data_size)
461   {
462     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
463     msg->size = htons (sizeof (*msg) + data_size);
464     transmit_queue_insert (tmit, msg, !notify_ret);
465   }
466
467   /* End of message. */
468   if (GNUNET_YES == notify_ret)
469   {
470     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
471     msg->size = htons (sizeof (*msg));
472     transmit_queue_insert (tmit, msg, GNUNET_YES);
473     /* FIXME: wait for ACK before setting in_transmit to no */
474     tmit->in_transmit = GNUNET_NO;
475   }
476 }
477
478
479 /**
480  * Request a modifier from a client to transmit.
481  *
482  * @param tmit  Transmission handle.
483  */
484 static void
485 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
486 {
487   uint16_t max_data_size = 0;
488   uint16_t data_size = 0;
489   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
490   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
491   int notify_ret = GNUNET_YES;
492
493   switch (tmit->state)
494   {
495   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
496   {
497     struct GNUNET_PSYC_MessageModifier *mod
498       = (struct GNUNET_PSYC_MessageModifier *) msg;
499     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
500     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
501
502     if (NULL != tmit->notify_mod)
503     {
504       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
505       data_size = max_data_size;
506       tmit->in_notify = GNUNET_YES;
507       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
508                                      &mod->oper, &mod->value_size);
509       tmit->in_notify = GNUNET_NO;
510     }
511
512     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
513     LOG (GNUNET_ERROR_TYPE_DEBUG,
514          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
515          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
516     if (mod->name_size < data_size)
517     {
518       tmit->mod_value_remaining
519         = mod->value_size - (data_size - mod->name_size);
520       mod->value_size = htonl (mod->value_size);
521       mod->name_size = htons (mod->name_size);
522     }
523     else if (0 < data_size)
524     {
525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
526       notify_ret = GNUNET_SYSERR;
527     }
528     break;
529   }
530   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
531   {
532     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
533     msg->size = sizeof (struct GNUNET_MessageHeader);
534
535     if (NULL != tmit->notify_mod)
536     {
537       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
538       data_size = max_data_size;
539       tmit->in_notify = GNUNET_YES;
540       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
541                                      &data_size, &msg[1], NULL, NULL);
542       tmit->in_notify = GNUNET_NO;
543     }
544     tmit->mod_value_remaining -= data_size;
545     LOG (GNUNET_ERROR_TYPE_DEBUG,
546          "transmit_mod (ret: %d, size: %u): %.*s\n",
547          notify_ret, data_size, data_size, &msg[1]);
548     break;
549   }
550   default:
551     GNUNET_assert (0);
552   }
553
554   switch (notify_ret)
555   {
556   case GNUNET_NO:
557     if (0 == data_size)
558     { /* Transmission paused, nothing to send. */
559       tmit->paused = GNUNET_YES;
560       return;
561     }
562     tmit->state
563       = (0 == tmit->mod_value_remaining)
564       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
565       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
566     break;
567
568   case GNUNET_YES: /* End of modifiers. */
569     GNUNET_assert (0 == tmit->mod_value_remaining);
570     break;
571
572   default:
573     LOG (GNUNET_ERROR_TYPE_ERROR,
574          "TransmitNotifyModifier callback returned with error.\n");
575
576     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
577     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
578     msg->size = htons (sizeof (*msg));
579     transmit_queue_insert (tmit, msg, GNUNET_YES);
580     tmit->in_transmit = GNUNET_NO;
581     return;
582   }
583
584   if (0 < data_size)
585   {
586     GNUNET_assert (data_size <= max_data_size);
587     msg->size = htons (msg->size + data_size);
588     transmit_queue_insert (tmit, msg, GNUNET_NO);
589   }
590
591   if (GNUNET_YES == notify_ret)
592   {
593     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
594     if (0 == tmit->acks_pending)
595       transmit_data (tmit);
596   }
597   else
598   {
599     transmit_mod (tmit);
600   }
601 }
602
603
604 int
605 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
606                      uint32_t *full_value_size)
607
608 {
609   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
610   uint16_t name_size = 0;
611   uint32_t value_size = 0;
612   const char *value = NULL;
613
614   if (NULL != oper)
615   { /* New modifier */
616     if (NULL != tmit->mod)
617       tmit->mod = tmit->mod->next;
618     if (NULL == tmit->mod)
619     { /* No more modifiers, continue with data */
620       *data_size = 0;
621       return GNUNET_YES;
622     }
623
624     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
625     *full_value_size = tmit->mod->value_size;
626     *oper = tmit->mod->oper;
627     name_size = strlen (tmit->mod->name) + 1;
628
629     if (name_size + tmit->mod->value_size <= *data_size)
630     {
631       value_size = tmit->mod->value_size;
632       *data_size = name_size + value_size;
633     }
634     else /* full modifier does not fit in data, continuation needed */
635     {
636       value_size = *data_size - name_size;
637       tmit->mod_value = tmit->mod->value + value_size;
638     }
639
640     memcpy (data, tmit->mod->name, name_size);
641     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
642     return GNUNET_NO;
643   }
644   else
645   { /* Modifier continuation */
646     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
647     value = tmit->mod_value;
648     if (tmit->mod_value_remaining <= *data_size)
649     {
650       value_size = tmit->mod_value_remaining;
651       tmit->mod_value = NULL;
652     }
653     else
654     {
655       value_size = *data_size;
656       tmit->mod_value += value_size;
657     }
658
659     if (*data_size < value_size)
660     {
661       LOG (GNUNET_ERROR_TYPE_DEBUG,
662            "Value in environment larger than buffer: %u < %zu\n",
663            *data_size, value_size);
664       *data_size = 0;
665       return GNUNET_NO;
666     }
667
668     *data_size = value_size;
669     memcpy (data, value, value_size);
670     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
671   }
672 }
673
674
675 /**
676  * Transmit a message.
677  *
678  * @param tmit
679  *        Transmission handle.
680  * @param method_name
681  *        Which method should be invoked.
682  * @param env
683  *        Environment for the message.
684  *        Should stay available until the first call to notify_data.
685  *        Can be NULL if there are no modifiers or @a notify_mod is
686  *        provided instead.
687  * @param notify_mod
688  *        Function to call to obtain modifiers.
689  *        Can be NULL if there are no modifiers or @a env is provided instead.
690  * @param notify_data
691  *        Function to call to obtain fragments of the data.
692  * @param notify_cls
693  *        Closure for @a notify_mod and @a notify_data.
694  * @param flags
695  *        Flags for the message being transmitted.
696  *
697  * @return #GNUNET_OK if the transmission was started.
698  *         #GNUNET_SYSERR if another transmission is already going on.
699  */
700 int
701 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
702                               const char *method_name,
703                               const struct GNUNET_PSYC_Environment *env,
704                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
705                               GNUNET_PSYC_TransmitNotifyData notify_data,
706                               void *notify_cls,
707                               uint32_t flags)
708 {
709   if (GNUNET_NO != tmit->in_transmit)
710     return GNUNET_SYSERR;
711   tmit->in_transmit = GNUNET_YES;
712
713   size_t size = strlen (method_name) + 1;
714   struct GNUNET_PSYC_MessageMethod *pmeth;
715   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
716   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
717
718   if (NULL != notify_mod)
719   {
720     tmit->notify_mod = notify_mod;
721     tmit->notify_mod_cls = notify_cls;
722   }
723   else
724   {
725     tmit->notify_mod = &transmit_notify_env;
726     tmit->notify_mod_cls = tmit;
727     if (NULL != env)
728     {
729       struct GNUNET_PSYC_Modifier mod = {};
730       mod.next = GNUNET_PSYC_env_head (env);
731       tmit->mod = &mod;
732
733       struct GNUNET_PSYC_Modifier *m = tmit->mod;
734       while (NULL != (m = m->next))
735       {
736         if (m->oper != GNUNET_PSYC_OP_SET)
737           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
738       }
739     }
740     else
741     {
742       tmit->mod = NULL;
743     }
744   }
745
746   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
747   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
748   pmeth->header.size = htons (sizeof (*pmeth) + size);
749   pmeth->flags = htonl (flags);
750   memcpy (&pmeth[1], method_name, size);
751
752   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
753   tmit->notify_data = notify_data;
754   tmit->notify_data_cls = notify_cls;
755
756   transmit_mod (tmit);
757   return GNUNET_OK;
758 }
759
760
761 /**
762  * Resume transmission.
763  *
764  * @param tmit  Transmission handle.
765  */
766 void
767 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
768 {
769   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
770     return;
771
772   if (0 == tmit->acks_pending)
773   {
774     tmit->paused = GNUNET_NO;
775     transmit_data (tmit);
776   }
777 }
778
779
780 /**
781  * Abort transmission request.
782  *
783  * @param tmit  Transmission handle.
784  */
785 void
786 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
787 {
788   if (GNUNET_NO == tmit->in_transmit)
789     return;
790
791   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
792   tmit->in_transmit = GNUNET_NO;
793   tmit->paused = GNUNET_NO;
794
795   /* FIXME */
796   struct GNUNET_MessageHeader msg;
797   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
798   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
799   msg.size = htons (sizeof (msg));
800   transmit_queue_insert (tmit, &msg, GNUNET_YES);
801 }
802
803
804 /**
805  * Got acknowledgement of a transmitted message part, continue transmission.
806  *
807  * @param tmit  Transmission handle.
808  */
809 void
810 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
811 {
812   if (0 == tmit->acks_pending)
813   {
814     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
815     GNUNET_break (0);
816     return;
817   }
818   tmit->acks_pending--;
819
820   switch (tmit->state)
821   {
822   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
823   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
824     transmit_mod (tmit);
825     break;
826
827   case GNUNET_PSYC_MESSAGE_STATE_DATA:
828     transmit_data (tmit);
829     break;
830
831   case GNUNET_PSYC_MESSAGE_STATE_END:
832   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
833     break;
834
835   default:
836     LOG (GNUNET_ERROR_TYPE_DEBUG,
837          "Ignoring message ACK in state %u.\n", tmit->state);
838   }
839 }
840
841
842 /**** Receiving messages ****/
843
844
845 /**
846  * Create handle for receiving messages.
847  */
848 struct GNUNET_PSYC_ReceiveHandle *
849 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
850                             GNUNET_PSYC_MessagePartCallback message_part_cb,
851                             void *cb_cls)
852 {
853   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
854   recv->message_cb = message_cb;
855   recv->message_part_cb = message_part_cb;
856   recv->cb_cls = cb_cls;
857   return recv;
858 }
859
860
861 /**
862  * Destroy handle for receiving messages.
863  */
864 void
865 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
866 {
867   GNUNET_free (recv);
868 }
869
870
871 /**
872  * Reset stored data related to the last received message.
873  */
874 void
875 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
876 {
877   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
878   recv->flags = 0;
879   recv->message_id = 0;
880   recv->mod_value_size = 0;
881   recv->mod_value_size_expected = 0;
882 }
883
884
885 static void
886 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
887 {
888   if (NULL != recv->message_part_cb)
889     recv->message_part_cb (recv->cb_cls, NULL, NULL);
890
891   if (NULL != recv->message_cb)
892     recv->message_cb (recv->cb_cls, NULL);
893
894   GNUNET_PSYC_receive_reset (recv);
895 }
896
897
898 /**
899  * Handle incoming PSYC message.
900  *
901  * @param recv  Receive handle.
902  * @param msg   The message.
903  *
904  * @return #GNUNET_OK on success,
905  *         #GNUNET_SYSERR on receive error.
906  */
907 int
908 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
909                              const struct GNUNET_PSYC_MessageHeader *msg)
910 {
911   uint16_t size = ntohs (msg->header.size);
912   uint32_t flags = ntohl (msg->flags);
913
914   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
915                            (struct GNUNET_MessageHeader *) msg);
916
917   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
918   {
919     recv->message_id = GNUNET_ntohll (msg->message_id);
920     recv->flags = flags;
921     recv->slave_pub_key = msg->slave_pub_key;
922     recv->mod_value_size = 0;
923     recv->mod_value_size_expected = 0;
924   }
925   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
926   {
927     // FIXME
928     LOG (GNUNET_ERROR_TYPE_WARNING,
929          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
930          GNUNET_ntohll (msg->message_id), recv->message_id);
931     GNUNET_break_op (0);
932     recv_error (recv);
933     return GNUNET_SYSERR;
934   }
935   else if (flags != recv->flags)
936   {
937     LOG (GNUNET_ERROR_TYPE_WARNING,
938          "Unexpected message flags. Got: %lu, expected: %lu\n",
939          flags, recv->flags);
940     GNUNET_break_op (0);
941     recv_error (recv);
942     return GNUNET_SYSERR;
943   }
944
945   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
946
947   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
948   {
949     const struct GNUNET_MessageHeader *pmsg
950       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
951     psize = ntohs (pmsg->size);
952     ptype = ntohs (pmsg->type);
953     size_eq = size_min = 0;
954
955     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
956     {
957       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
958                   "Dropping message of type %u with invalid size %u.\n",
959                   ptype, psize);
960       recv_error (recv);
961       return GNUNET_SYSERR;
962     }
963
964     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965                 "Received message part of type %u and size %u from PSYC.\n",
966                 ptype, psize);
967     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
968
969     switch (ptype)
970     {
971     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
972       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
973       break;
974     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
975       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
976       break;
977     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
978     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
979       size_min = sizeof (struct GNUNET_MessageHeader);
980       break;
981     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
982     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
983       size_eq = sizeof (struct GNUNET_MessageHeader);
984       break;
985     default:
986       GNUNET_break_op (0);
987       recv_error (recv);
988       return GNUNET_SYSERR;
989     }
990
991     if (! ((0 < size_eq && psize == size_eq)
992            || (0 < size_min && size_min <= psize)))
993     {
994       GNUNET_break_op (0);
995       recv_error (recv);
996       return GNUNET_SYSERR;
997     }
998
999     switch (ptype)
1000     {
1001     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1002     {
1003       struct GNUNET_PSYC_MessageMethod *meth
1004         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1005
1006       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1007       {
1008         LOG (GNUNET_ERROR_TYPE_WARNING,
1009              "Dropping out of order message method (%u).\n",
1010              recv->state);
1011         /* It is normal to receive an incomplete message right after connecting,
1012          * but should not happen later.
1013          * FIXME: add a check for this condition.
1014          */
1015         GNUNET_break_op (0);
1016         recv_error (recv);
1017         return GNUNET_SYSERR;
1018       }
1019
1020       if ('\0' != *((char *) meth + psize - 1))
1021       {
1022         LOG (GNUNET_ERROR_TYPE_WARNING,
1023              "Dropping message with malformed method. "
1024              "Message ID: %" PRIu64 "\n", recv->message_id);
1025         GNUNET_break_op (0);
1026         recv_error (recv);
1027         return GNUNET_SYSERR;
1028       }
1029       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1030       break;
1031     }
1032     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1033     {
1034       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1035             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1036             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1037       {
1038         LOG (GNUNET_ERROR_TYPE_WARNING,
1039              "Dropping out of order message modifier (%u).\n",
1040              recv->state);
1041         GNUNET_break_op (0);
1042         recv_error (recv);
1043         return GNUNET_SYSERR;
1044       }
1045
1046       struct GNUNET_PSYC_MessageModifier *mod
1047         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1048
1049       uint16_t name_size = ntohs (mod->name_size);
1050       recv->mod_value_size_expected = ntohl (mod->value_size);
1051       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1052
1053       if (psize < sizeof (*mod) + name_size
1054           || '\0' != *((char *) &mod[1] + name_size - 1)
1055           || recv->mod_value_size_expected < recv->mod_value_size)
1056       {
1057         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1058         GNUNET_break_op (0);
1059         recv_error (recv);
1060         return GNUNET_SYSERR;
1061       }
1062       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1063       break;
1064     }
1065     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1066     {
1067       recv->mod_value_size += psize - sizeof (*pmsg);
1068
1069       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1070             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1071           || recv->mod_value_size_expected < recv->mod_value_size)
1072       {
1073         LOG (GNUNET_ERROR_TYPE_WARNING,
1074              "Dropping out of order message modifier continuation "
1075              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1076              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1077              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1078              recv->mod_value_size_expected, recv->mod_value_size);
1079         GNUNET_break_op (0);
1080         recv_error (recv);
1081         return GNUNET_SYSERR;
1082       }
1083       break;
1084     }
1085     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1086     {
1087       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1088           || recv->mod_value_size_expected != recv->mod_value_size)
1089       {
1090         LOG (GNUNET_ERROR_TYPE_WARNING,
1091              "Dropping out of order message data fragment "
1092              "(%u < %u || %lu != %lu).\n",
1093              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1094              recv->mod_value_size_expected, recv->mod_value_size);
1095
1096         GNUNET_break_op (0);
1097         recv_error (recv);
1098         return GNUNET_SYSERR;
1099       }
1100       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1101       break;
1102     }
1103     }
1104
1105     if (NULL != recv->message_part_cb)
1106       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1107
1108     switch (ptype)
1109     {
1110     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1111     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1112       GNUNET_PSYC_receive_reset (recv);
1113       break;
1114     }
1115   }
1116
1117   if (NULL != recv->message_cb)
1118     recv->message_cb (recv->cb_cls, msg);
1119   return GNUNET_OK;
1120 }
1121
1122
1123 /**
1124  * Check if @a data contains a series of valid message parts.
1125  *
1126  * @param      data_size    Size of @a data.
1127  * @param      data         Data.
1128  * @param[out] first_ptype  Type of first message part.
1129  * @param[out] last_ptype   Type of last message part.
1130  *
1131  * @return Number of message parts found in @a data.
1132  *         or GNUNET_SYSERR if the message contains invalid parts.
1133  */
1134 int
1135 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1136                                  uint16_t *first_ptype, uint16_t *last_ptype)
1137 {
1138   const struct GNUNET_MessageHeader *pmsg;
1139   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1140   if (NULL != first_ptype)
1141     *first_ptype = 0;
1142   if (NULL != last_ptype)
1143     *last_ptype = 0;
1144
1145   for (pos = 0; pos < data_size; pos += psize, parts++)
1146   {
1147     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1148     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1149     psize = ntohs (pmsg->size);
1150     ptype = ntohs (pmsg->type);
1151     if (0 == parts && NULL != first_ptype)
1152       *first_ptype = ptype;
1153     if (NULL != last_ptype
1154         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1155       *last_ptype = ptype;
1156     if (psize < sizeof (*pmsg)
1157         || pos + psize > data_size
1158         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1159         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1160     {
1161       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1162                   "Invalid message part of type %u and size %u.\n",
1163                   ptype, psize);
1164       return GNUNET_SYSERR;
1165     }
1166     /** @todo FIXME: check message part order */
1167   }
1168   return parts;
1169 }
1170
1171
1172 struct ParseMessageClosure
1173 {
1174   struct GNUNET_PSYC_Environment *env;
1175   const char **method_name;
1176   const void **data;
1177   uint16_t *data_size;
1178   enum GNUNET_PSYC_MessageState msg_state;
1179 };
1180
1181
1182 static void
1183 parse_message_part_cb (void *cls,
1184                        const struct GNUNET_PSYC_MessageHeader *msg,
1185                        const struct GNUNET_MessageHeader *pmsg)
1186 {
1187   struct ParseMessageClosure *pmc = cls;
1188   if (NULL == pmsg)
1189   {
1190     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1191     return;
1192   }
1193
1194   switch (ntohs (pmsg->type))
1195   {
1196   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1197   {
1198     struct GNUNET_PSYC_MessageMethod *
1199       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1200     *pmc->method_name = (const char *) &pmeth[1];
1201     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1202     break;
1203   }
1204
1205   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1206   {
1207     struct GNUNET_PSYC_MessageModifier *
1208       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1209
1210     const char *name = (const char *) &pmod[1];
1211     const void *value = name + ntohs (pmod->name_size);
1212     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1213                          ntohl (pmod->value_size));
1214     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1215     break;
1216   }
1217
1218   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1219     *pmc->data = &pmsg[1];
1220     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1221     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1222     break;
1223
1224   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1225     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1226     break;
1227
1228   default:
1229     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1230   }
1231 }
1232
1233
1234 /**
1235  * Parse PSYC message.
1236  *
1237  * @param msg
1238  *        The PSYC message to parse.
1239  * @param[out] method_name
1240  *        Pointer to the method name inside @a pmsg.
1241  * @param env
1242  *        The environment for the message with a list of modifiers.
1243  * @param[out] data
1244  *        Pointer to data inside @a msg.
1245  * @param[out] data_size
1246  *        Size of @data is written here.
1247  *
1248  * @return #GNUNET_OK on success,
1249  *         #GNUNET_SYSERR on parse error.
1250  */
1251 int
1252 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1253                            const char **method_name,
1254                            struct GNUNET_PSYC_Environment *env,
1255                            const void **data,
1256                            uint16_t *data_size)
1257 {
1258   struct ParseMessageClosure cls;
1259   cls.env = env;
1260   cls.method_name = method_name;
1261   cls.data = data;
1262   cls.data_size = data_size;
1263
1264   struct GNUNET_PSYC_ReceiveHandle *
1265     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1266   int ret = GNUNET_PSYC_receive_message (recv, msg);
1267   GNUNET_PSYC_receive_destroy (recv);
1268
1269   if (GNUNET_OK != ret)
1270     return GNUNET_SYSERR;
1271
1272   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1273     ? GNUNET_OK
1274     : GNUNET_NO;
1275 }
1276
1277
1278 /**
1279  * Initialize PSYC message header.
1280  */
1281 void
1282 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1283                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1284                                  uint32_t flags)
1285 {
1286   uint16_t size = ntohs (mmsg->header.size);
1287   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1288
1289   pmsg->header.size = htons (psize);
1290   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1291   pmsg->message_id = mmsg->message_id;
1292   pmsg->fragment_offset = mmsg->fragment_offset;
1293   pmsg->flags = htonl (flags);
1294
1295   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1296 }
1297
1298
1299 /**
1300  * Create a new PSYC message header from a multicast message.
1301  */
1302 struct GNUNET_PSYC_MessageHeader *
1303 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1304                                    uint32_t flags)
1305 {
1306   struct GNUNET_PSYC_MessageHeader *pmsg;
1307   uint16_t size = ntohs (mmsg->header.size);
1308   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1309
1310   pmsg = GNUNET_malloc (psize);
1311   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1312   return pmsg;
1313 }
1314
1315
1316 /**
1317  * Create a new PSYC message header from a PSYC message.
1318  */
1319 struct GNUNET_PSYC_MessageHeader *
1320 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1321 {
1322   uint16_t msg_size = ntohs (msg->header.size);
1323   struct GNUNET_PSYC_MessageHeader *
1324     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1325   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1326   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1327   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1328   return pmsg;
1329 }