fix RPS service: Provide closure for view insertion
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software: you can redistribute it and/or modify it
6  * under the terms of the GNU Affero General Public License as published
7  * by the Free Software Foundation, either version 3 of the License,
8  * or (at your option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 /**
20  * @file psycutil/psyc_message.c
21  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
22  * @author Gabor X Toth
23  */
24
25 #include <inttypes.h>
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_psyc_util_lib.h"
30 #include "gnunet_psyc_service.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
33
34
35 struct GNUNET_PSYC_TransmitHandle
36 {
37   /**
38    * Client connection to service.
39    */
40   struct GNUNET_MQ_Handle *mq;
41
42   /**
43    * Message currently being received from the client.
44    */
45   struct GNUNET_MessageHeader *msg;
46
47   /**
48    * Envelope for @a msg
49    */
50   struct GNUNET_MQ_Envelope *env;
51
52   /**
53    * Callback to request next modifier from client.
54    */
55   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
56
57   /**
58    * Closure for the notify callbacks.
59    */
60   void *notify_mod_cls;
61
62   /**
63    * Callback to request next data fragment from client.
64    */
65   GNUNET_PSYC_TransmitNotifyData notify_data;
66
67   /**
68    * Closure for the notify callbacks.
69    */
70   void *notify_data_cls;
71
72   /**
73    * Modifier of the environment that is currently being transmitted.
74    */
75   struct GNUNET_PSYC_Modifier *mod;
76
77   /**
78    *
79    */
80   const char *mod_value;
81
82   /**
83    * Number of bytes remaining to be transmitted from the current modifier value.
84    */
85   uint32_t mod_value_remaining;
86
87   /**
88    * State of the current message being received from client.
89    */
90   enum GNUNET_PSYC_MessageState state;
91
92   /**
93    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
94    */
95   uint8_t acks_pending;
96
97   /**
98    * Is transmission paused?
99    */
100   uint8_t paused;
101
102   /**
103    * Are we currently transmitting a message?
104    */
105   uint8_t in_transmit;
106
107   /**
108    * Notify callback is currently being called.
109    */
110   uint8_t in_notify;
111
112 };
113
114
115
116 struct GNUNET_PSYC_ReceiveHandle
117 {
118   /**
119    * Message callback.
120    */
121   GNUNET_PSYC_MessageCallback message_cb;
122
123   /**
124    * Message part callback.
125    */
126   GNUNET_PSYC_MessagePartCallback message_part_cb;
127
128   /**
129    * Closure for the callbacks.
130    */
131   void *cb_cls;
132
133   /**
134    * ID of the message being received from the PSYC service.
135    */
136   uint64_t message_id;
137
138   /**
139    * Public key of the slave from which a message is being received.
140    */
141   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
142
143   /**
144    * State of the currently being received message from the PSYC service.
145    */
146   enum GNUNET_PSYC_MessageState state;
147
148   /**
149    * Flags for the currently being received message from the PSYC service.
150    */
151   enum GNUNET_PSYC_MessageFlags flags;
152
153   /**
154    * Expected value size for the modifier being received from the PSYC service.
155    */
156   uint32_t mod_value_size_expected;
157
158   /**
159    * Actual value size for the modifier being received from the PSYC service.
160    */
161   uint32_t mod_value_size;
162 };
163
164
165 /**** Messages ****/
166
167
168 /**
169  * Create a PSYC message.
170  *
171  * @param method_name
172  *        PSYC method for the message.
173  * @param env
174  *        Environment for the message.
175  * @param data
176  *        Data payload for the message.
177  * @param data_size
178  *        Size of @a data.
179  *
180  * @return Message header with size information,
181  *         followed by the message parts.
182  */
183 struct GNUNET_PSYC_Message *
184 GNUNET_PSYC_message_create (const char *method_name,
185                             const struct GNUNET_PSYC_Environment *env,
186                             const void *data,
187                             size_t data_size)
188 {
189   struct GNUNET_PSYC_Modifier *mod = NULL;
190   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
191   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
192   struct GNUNET_MessageHeader *pmsg = NULL;
193   uint16_t env_size = 0;
194   if (NULL != env)
195   {
196     mod = GNUNET_PSYC_env_head (env);
197     while (NULL != mod)
198     {
199       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
200       mod = mod->next;
201     }
202   }
203
204   struct GNUNET_PSYC_Message *msg;
205   uint16_t method_name_size = strlen (method_name) + 1;
206   if (method_name_size == 1)
207     return NULL;
208
209   uint16_t msg_size = sizeof (*msg)                      /* header */
210     + sizeof (*pmeth) + method_name_size                 /* method */
211     + env_size                                           /* modifiers */
212     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
213     + sizeof (*pmsg);                                    /* end of message */
214   msg = GNUNET_malloc (msg_size);
215   msg->header.size = htons (msg_size);
216   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
217
218   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
219   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
220   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
221   GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
222
223   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
224   if (NULL != env)
225   {
226     mod = GNUNET_PSYC_env_head (env);
227     while (NULL != mod)
228     {
229       uint16_t mod_name_size = strlen (mod->name) + 1;
230       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
231       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
232       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
233       p += pmod->header.size;
234       pmod->header.size = htons (pmod->header.size);
235
236       pmod->oper = mod->oper;
237       pmod->name_size = htons (mod_name_size);
238       pmod->value_size = htonl (mod->value_size);
239
240       GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
241       if (0 < mod->value_size)
242         GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
243
244       mod = mod->next;
245     }
246   }
247
248   if (0 < data_size)
249   {
250     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
251     pmsg->size = sizeof (*pmsg) + data_size;
252     p += pmsg->size;
253     pmsg->size = htons (pmsg->size);
254     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
255     GNUNET_memcpy (&pmsg[1], data, data_size);
256   }
257
258   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
259   pmsg->size = htons (sizeof (*pmsg));
260   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
261
262   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
263   return msg;
264 }
265
266
267 void
268 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
269                          const struct GNUNET_MessageHeader *msg)
270 {
271   uint16_t size = ntohs (msg->size);
272   uint16_t type = ntohs (msg->type);
273
274   GNUNET_log (kind,
275               "Message of type %d and size %u:\n",
276               type,
277               size);
278   switch (type)
279   {
280   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
281   {
282     const struct GNUNET_PSYC_MessageHeader *pmsg
283       = (const struct GNUNET_PSYC_MessageHeader *) msg;
284     GNUNET_log (kind,
285                 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
286                 GNUNET_ntohll (pmsg->message_id),
287                 ntohl (pmsg->flags));
288     break;
289   }
290   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
291   {
292     const struct GNUNET_PSYC_MessageMethod *meth
293       = (const struct GNUNET_PSYC_MessageMethod *) msg;
294     GNUNET_log (kind,
295                 "\t%.*s\n",
296                 (int) (size - sizeof (*meth)),
297                 (const char *) &meth[1]);
298     break;
299   }
300   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
301   {
302     const struct GNUNET_PSYC_MessageModifier *mod
303       = (const struct GNUNET_PSYC_MessageModifier *) msg;
304     uint16_t name_size = ntohs (mod->name_size);
305     char oper = ' ' < mod->oper ? mod->oper : ' ';
306     GNUNET_log (kind,
307                 "\t%c%.*s\t%.*s\n",
308                 oper,
309                 (int) name_size,
310                 (const char *) &mod[1],
311                 (int) (size - sizeof (*mod) - name_size),
312                 ((const char *) &mod[1]) + name_size);
313     break;
314   }
315   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
316   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
317     GNUNET_log (kind,
318                 "\t%.*s\n",
319                 (int) (size - sizeof (*msg)),
320                 (const char *) &msg[1]);
321     break;
322   }
323 }
324
325
326 /**** Transmitting messages ****/
327
328
329 /**
330  * Create a transmission handle.
331  */
332 struct GNUNET_PSYC_TransmitHandle *
333 GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
334 {
335   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
336
337   tmit->mq = mq;
338   return tmit;
339 }
340
341
342 /**
343  * Destroy a transmission handle.
344  */
345 void
346 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
347 {
348   GNUNET_free (tmit);
349 }
350
351
352 /**
353  * Queue a message part for transmission.
354  *
355  * The message part is added to the current message buffer.
356  * When this buffer is full, it is added to the transmission queue.
357  *
358  * @param tmit
359  *        Transmission handle.
360  * @param msg
361  *        Message part, or NULL.
362  * @param tmit_now
363  *        Transmit message now, or wait for buffer to fill up?
364  *        #GNUNET_YES or #GNUNET_NO.
365  */
366 static void
367 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
368                        const struct GNUNET_MessageHeader *msg,
369                        uint8_t tmit_now)
370 {
371   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
372
373   LOG (GNUNET_ERROR_TYPE_DEBUG,
374        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
375        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
376
377   if (NULL != tmit->msg)
378   {
379     if (NULL == msg
380         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
381     {
382       /* End of message or buffer is full, add it to transmission queue
383        * and start with empty buffer */
384       tmit->msg->size = htons (tmit->msg->size);
385       GNUNET_MQ_send (tmit->mq, tmit->env);
386       tmit->env = NULL;
387       tmit->msg = NULL;
388       tmit->acks_pending++;
389     }
390     else
391     {
392       /* Message fits in current buffer, append */
393       GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
394       tmit->msg->size += size;
395     }
396   }
397
398   if (NULL == tmit->msg && NULL != msg)
399   {
400     /* Empty buffer, copy over message. */
401     tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
402                                      GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
403                                      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
404     /* store current message size in host byte order
405      * then later switch it to network byte order before sending */
406     tmit->msg->size = sizeof (*tmit->msg) + size;
407
408     GNUNET_memcpy (&tmit->msg[1], msg, size);
409   }
410
411   if (NULL != tmit->msg
412       && (GNUNET_YES == tmit_now
413           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
414               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
415   {
416     /* End of message or buffer is full, add it to transmission queue. */
417     tmit->msg->size = htons (tmit->msg->size);
418     GNUNET_MQ_send (tmit->mq, tmit->env);
419     tmit->env = NULL;
420     tmit->msg = NULL;
421     tmit->acks_pending++;
422   }
423 }
424
425
426 /**
427  * Request data from client to transmit.
428  *
429  * @param tmit  Transmission handle.
430  */
431 static void
432 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
433 {
434   int notify_ret = GNUNET_YES;
435   uint16_t data_size = 0;
436   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
437   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
438   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
439
440   if (NULL != tmit->notify_data)
441   {
442     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
443     tmit->in_notify = GNUNET_YES;
444     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
445     tmit->in_notify = GNUNET_NO;
446   }
447   LOG (GNUNET_ERROR_TYPE_DEBUG,
448        "transmit_data (ret: %d, size: %u): %.*s\n",
449        notify_ret, data_size, data_size, &msg[1]);
450   switch (notify_ret)
451   {
452   case GNUNET_NO:
453     if (0 == data_size)
454     {
455       /* Transmission paused, nothing to send. */
456       tmit->paused = GNUNET_YES;
457       return;
458     }
459     break;
460
461   case GNUNET_YES:
462     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
463     break;
464
465   default:
466     LOG (GNUNET_ERROR_TYPE_ERROR,
467          "TransmitNotifyData callback returned error when requesting data.\n");
468
469     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
470     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
471     msg->size = htons (sizeof (*msg));
472     transmit_queue_insert (tmit, msg, GNUNET_YES);
473     tmit->in_transmit = GNUNET_NO;
474     return;
475   }
476
477   if (0 < data_size)
478   {
479     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
480     msg->size = htons (sizeof (*msg) + data_size);
481     transmit_queue_insert (tmit, msg, !notify_ret);
482   }
483
484   /* End of message. */
485   if (GNUNET_YES == notify_ret)
486   {
487     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
488     msg->size = htons (sizeof (*msg));
489     transmit_queue_insert (tmit, msg, GNUNET_YES);
490     /* FIXME: wait for ACK before setting in_transmit to no */
491     tmit->in_transmit = GNUNET_NO;
492   }
493 }
494
495
496 /**
497  * Request a modifier from a client to transmit.
498  *
499  * @param tmit  Transmission handle.
500  */
501 static void
502 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
503 {
504   uint16_t max_data_size = 0;
505   uint16_t data_size = 0;
506   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
507   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
508   int notify_ret = GNUNET_YES;
509
510   switch (tmit->state)
511   {
512   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
513   {
514     struct GNUNET_PSYC_MessageModifier *mod
515       = (struct GNUNET_PSYC_MessageModifier *) msg;
516     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
517     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
518
519     if (NULL != tmit->notify_mod)
520     {
521       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
522       data_size = max_data_size;
523       tmit->in_notify = GNUNET_YES;
524       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
525                                      &mod->oper, &mod->value_size);
526       tmit->in_notify = GNUNET_NO;
527     }
528
529     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
530     LOG (GNUNET_ERROR_TYPE_DEBUG,
531          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
532          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
533     if (mod->name_size < data_size)
534     {
535       tmit->mod_value_remaining
536         = mod->value_size - (data_size - mod->name_size);
537       mod->value_size = htonl (mod->value_size);
538       mod->name_size = htons (mod->name_size);
539     }
540     else if (0 < data_size)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
543       notify_ret = GNUNET_SYSERR;
544     }
545     break;
546   }
547   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
548   {
549     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
550     msg->size = sizeof (struct GNUNET_MessageHeader);
551
552     if (NULL != tmit->notify_mod)
553     {
554       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
555       data_size = max_data_size;
556       tmit->in_notify = GNUNET_YES;
557       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
558                                      &data_size, &msg[1], NULL, NULL);
559       tmit->in_notify = GNUNET_NO;
560     }
561     tmit->mod_value_remaining -= data_size;
562     LOG (GNUNET_ERROR_TYPE_DEBUG,
563          "transmit_mod (ret: %d, size: %u): %.*s\n",
564          notify_ret, data_size, data_size, &msg[1]);
565     break;
566   }
567   default:
568     GNUNET_assert (0);
569   }
570
571   switch (notify_ret)
572   {
573   case GNUNET_NO:
574     if (0 == data_size)
575     { /* Transmission paused, nothing to send. */
576       tmit->paused = GNUNET_YES;
577       return;
578     }
579     tmit->state
580       = (0 == tmit->mod_value_remaining)
581       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
582       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
583     break;
584
585   case GNUNET_YES: /* End of modifiers. */
586     GNUNET_assert (0 == tmit->mod_value_remaining);
587     break;
588
589   default:
590     LOG (GNUNET_ERROR_TYPE_ERROR,
591          "TransmitNotifyModifier callback returned with error.\n");
592
593     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
594     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
595     msg->size = htons (sizeof (*msg));
596     transmit_queue_insert (tmit, msg, GNUNET_YES);
597     tmit->in_transmit = GNUNET_NO;
598     return;
599   }
600
601   if (0 < data_size)
602   {
603     GNUNET_assert (data_size <= max_data_size);
604     msg->size = htons (msg->size + data_size);
605     transmit_queue_insert (tmit, msg, GNUNET_NO);
606   }
607
608   if (GNUNET_YES == notify_ret)
609   {
610     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
611     if (0 == tmit->acks_pending)
612       transmit_data (tmit);
613   }
614   else
615   {
616     transmit_mod (tmit);
617   }
618 }
619
620
621 int
622 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
623                      uint32_t *full_value_size)
624
625 {
626   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
627   uint16_t name_size = 0;
628   uint32_t value_size = 0;
629   const char *value = NULL;
630
631   if (NULL != oper)
632   { /* New modifier */
633     if (NULL != tmit->mod)
634       tmit->mod = tmit->mod->next;
635     if (NULL == tmit->mod)
636     { /* No more modifiers, continue with data */
637       *data_size = 0;
638       return GNUNET_YES;
639     }
640
641     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
642     *full_value_size = tmit->mod->value_size;
643     *oper = tmit->mod->oper;
644     name_size = strlen (tmit->mod->name) + 1;
645
646     if (name_size + tmit->mod->value_size <= *data_size)
647     {
648       value_size = tmit->mod->value_size;
649       *data_size = name_size + value_size;
650     }
651     else /* full modifier does not fit in data, continuation needed */
652     {
653       value_size = *data_size - name_size;
654       tmit->mod_value = tmit->mod->value + value_size;
655     }
656
657     GNUNET_memcpy (data, tmit->mod->name, name_size);
658     GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
659     return GNUNET_NO;
660   }
661   else
662   { /* Modifier continuation */
663     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
664     value = tmit->mod_value;
665     if (tmit->mod_value_remaining <= *data_size)
666     {
667       value_size = tmit->mod_value_remaining;
668       tmit->mod_value = NULL;
669     }
670     else
671     {
672       value_size = *data_size;
673       tmit->mod_value += value_size;
674     }
675
676     if (*data_size < value_size)
677     {
678       LOG (GNUNET_ERROR_TYPE_DEBUG,
679            "Value in environment larger than buffer: %u < %zu\n",
680            *data_size, value_size);
681       *data_size = 0;
682       return GNUNET_NO;
683     }
684
685     *data_size = value_size;
686     GNUNET_memcpy (data, value, value_size);
687     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
688   }
689 }
690
691
692 /**
693  * Transmit a message.
694  *
695  * @param tmit
696  *        Transmission handle.
697  * @param method_name
698  *        Which method should be invoked.
699  * @param env
700  *        Environment for the message.
701  *        Should stay available until the first call to notify_data.
702  *        Can be NULL if there are no modifiers or @a notify_mod is
703  *        provided instead.
704  * @param notify_mod
705  *        Function to call to obtain modifiers.
706  *        Can be NULL if there are no modifiers or @a env is provided instead.
707  * @param notify_data
708  *        Function to call to obtain fragments of the data.
709  * @param notify_cls
710  *        Closure for @a notify_mod and @a notify_data.
711  * @param flags
712  *        Flags for the message being transmitted.
713  *
714  * @return #GNUNET_OK if the transmission was started.
715  *         #GNUNET_SYSERR if another transmission is already going on.
716  */
717 int
718 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
719                               const char *method_name,
720                               const struct GNUNET_PSYC_Environment *env,
721                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
722                               GNUNET_PSYC_TransmitNotifyData notify_data,
723                               void *notify_cls,
724                               uint32_t flags)
725 {
726   if (GNUNET_NO != tmit->in_transmit)
727     return GNUNET_SYSERR;
728   tmit->in_transmit = GNUNET_YES;
729
730   size_t size = strlen (method_name) + 1;
731   struct GNUNET_PSYC_MessageMethod *pmeth;
732
733   tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
734                                    GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
735                                    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
736   /* store current message size in host byte order
737    * then later switch it to network byte order before sending */
738   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
739
740   if (NULL != notify_mod)
741   {
742     tmit->notify_mod = notify_mod;
743     tmit->notify_mod_cls = notify_cls;
744   }
745   else
746   {
747     tmit->notify_mod = &transmit_notify_env;
748     tmit->notify_mod_cls = tmit;
749     if (NULL != env)
750     {
751       struct GNUNET_PSYC_Modifier mod = {};
752       mod.next = GNUNET_PSYC_env_head (env);
753       tmit->mod = &mod;
754
755       struct GNUNET_PSYC_Modifier *m = tmit->mod;
756       while (NULL != (m = m->next))
757       {
758         if (m->oper != GNUNET_PSYC_OP_SET)
759           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
760       }
761     }
762     else
763     {
764       tmit->mod = NULL;
765     }
766   }
767
768   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
769   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
770   pmeth->header.size = htons (sizeof (*pmeth) + size);
771   pmeth->flags = htonl (flags);
772   GNUNET_memcpy (&pmeth[1], method_name, size);
773
774   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
775   tmit->notify_data = notify_data;
776   tmit->notify_data_cls = notify_cls;
777
778   transmit_mod (tmit);
779   return GNUNET_OK;
780 }
781
782
783 /**
784  * Resume transmission.
785  *
786  * @param tmit  Transmission handle.
787  */
788 void
789 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
790 {
791   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
792     return;
793
794   if (0 == tmit->acks_pending)
795   {
796     tmit->paused = GNUNET_NO;
797     transmit_data (tmit);
798   }
799 }
800
801
802 /**
803  * Abort transmission request.
804  *
805  * @param tmit  Transmission handle.
806  */
807 void
808 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
809 {
810   if (GNUNET_NO == tmit->in_transmit)
811     return;
812
813   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
814   tmit->in_transmit = GNUNET_NO;
815   tmit->paused = GNUNET_NO;
816
817   /* FIXME */
818   struct GNUNET_MessageHeader msg;
819   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
820   msg.size = htons (sizeof (msg));
821   transmit_queue_insert (tmit, &msg, GNUNET_YES);
822 }
823
824
825 /**
826  * Got acknowledgement of a transmitted message part, continue transmission.
827  *
828  * @param tmit  Transmission handle.
829  */
830 void
831 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
832 {
833   if (0 == tmit->acks_pending)
834   {
835     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
836     GNUNET_break (0);
837     return;
838   }
839   tmit->acks_pending--;
840
841   if (GNUNET_YES == tmit->paused)
842     return;
843
844   switch (tmit->state)
845   {
846   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
847   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
848     transmit_mod (tmit);
849     break;
850
851   case GNUNET_PSYC_MESSAGE_STATE_DATA:
852     transmit_data (tmit);
853     break;
854
855   case GNUNET_PSYC_MESSAGE_STATE_END:
856   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
857     break;
858
859   default:
860     LOG (GNUNET_ERROR_TYPE_DEBUG,
861          "Ignoring message ACK in state %u.\n", tmit->state);
862   }
863 }
864
865
866 /**** Receiving messages ****/
867
868
869 /**
870  * Create handle for receiving messages.
871  */
872 struct GNUNET_PSYC_ReceiveHandle *
873 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
874                             GNUNET_PSYC_MessagePartCallback message_part_cb,
875                             void *cb_cls)
876 {
877   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
878   recv->message_cb = message_cb;
879   recv->message_part_cb = message_part_cb;
880   recv->cb_cls = cb_cls;
881   return recv;
882 }
883
884
885 /**
886  * Destroy handle for receiving messages.
887  */
888 void
889 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
890 {
891   GNUNET_free (recv);
892 }
893
894
895 /**
896  * Reset stored data related to the last received message.
897  */
898 void
899 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
900 {
901   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
902   recv->flags = 0;
903   recv->message_id = 0;
904   recv->mod_value_size = 0;
905   recv->mod_value_size_expected = 0;
906 }
907
908
909 static void
910 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
911 {
912   if (NULL != recv->message_part_cb)
913     recv->message_part_cb (recv->cb_cls, NULL, NULL);
914
915   if (NULL != recv->message_cb)
916     recv->message_cb (recv->cb_cls, NULL);
917
918   GNUNET_PSYC_receive_reset (recv);
919 }
920
921
922 /**
923  * Handle incoming PSYC message.
924  *
925  * @param recv  Receive handle.
926  * @param msg   The message.
927  *
928  * @return #GNUNET_OK on success,
929  *         #GNUNET_SYSERR on receive error.
930  */
931 int
932 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
933                              const struct GNUNET_PSYC_MessageHeader *msg)
934 {
935   uint16_t size = ntohs (msg->header.size);
936   uint32_t flags = ntohl (msg->flags);
937
938   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
939                            (struct GNUNET_MessageHeader *) msg);
940
941   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
942   {
943     recv->message_id = GNUNET_ntohll (msg->message_id);
944     recv->flags = flags;
945     recv->slave_pub_key = msg->slave_pub_key;
946     recv->mod_value_size = 0;
947     recv->mod_value_size_expected = 0;
948   }
949   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
950   {
951     // FIXME
952     LOG (GNUNET_ERROR_TYPE_WARNING,
953          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
954          GNUNET_ntohll (msg->message_id), recv->message_id);
955     GNUNET_break_op (0);
956     recv_error (recv);
957     return GNUNET_SYSERR;
958   }
959   else if (flags != recv->flags)
960   {
961     LOG (GNUNET_ERROR_TYPE_WARNING,
962          "Unexpected message flags. Got: %lu, expected: %lu\n",
963          flags, recv->flags);
964     GNUNET_break_op (0);
965     recv_error (recv);
966     return GNUNET_SYSERR;
967   }
968
969   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
970
971   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
972   {
973     const struct GNUNET_MessageHeader *pmsg
974       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
975     psize = ntohs (pmsg->size);
976     ptype = ntohs (pmsg->type);
977     size_eq = size_min = 0;
978
979     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
980     {
981       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
982                   "Dropping message of type %u with invalid size %u.\n",
983                   ptype, psize);
984       recv_error (recv);
985       return GNUNET_SYSERR;
986     }
987
988     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989                 "Received message part of type %u and size %u from PSYC.\n",
990                 ptype, psize);
991     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
992
993     switch (ptype)
994     {
995     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
996       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
997       break;
998     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
999       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1000       break;
1001     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1002     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1003       size_min = sizeof (struct GNUNET_MessageHeader);
1004       break;
1005     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1006     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1007       size_eq = sizeof (struct GNUNET_MessageHeader);
1008       break;
1009     default:
1010       GNUNET_break_op (0);
1011       recv_error (recv);
1012       return GNUNET_SYSERR;
1013     }
1014
1015     if (! ((0 < size_eq && psize == size_eq)
1016            || (0 < size_min && size_min <= psize)))
1017     {
1018       GNUNET_break_op (0);
1019       recv_error (recv);
1020       return GNUNET_SYSERR;
1021     }
1022
1023     switch (ptype)
1024     {
1025     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1026     {
1027       struct GNUNET_PSYC_MessageMethod *meth
1028         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1029
1030       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1031       {
1032         LOG (GNUNET_ERROR_TYPE_WARNING,
1033              "Dropping out of order message method (%u).\n",
1034              recv->state);
1035         /* It is normal to receive an incomplete message right after connecting,
1036          * but should not happen later.
1037          * FIXME: add a check for this condition.
1038          */
1039         GNUNET_break_op (0);
1040         recv_error (recv);
1041         return GNUNET_SYSERR;
1042       }
1043
1044       if ('\0' != *((char *) meth + psize - 1))
1045       {
1046         LOG (GNUNET_ERROR_TYPE_WARNING,
1047              "Dropping message with malformed method. "
1048              "Message ID: %" PRIu64 "\n", recv->message_id);
1049         GNUNET_break_op (0);
1050         recv_error (recv);
1051         return GNUNET_SYSERR;
1052       }
1053       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1054       break;
1055     }
1056     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1057     {
1058       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1059             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1060             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1061       {
1062         LOG (GNUNET_ERROR_TYPE_WARNING,
1063              "Dropping out of order message modifier (%u).\n",
1064              recv->state);
1065         GNUNET_break_op (0);
1066         recv_error (recv);
1067         return GNUNET_SYSERR;
1068       }
1069
1070       struct GNUNET_PSYC_MessageModifier *mod
1071         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1072
1073       uint16_t name_size = ntohs (mod->name_size);
1074       recv->mod_value_size_expected = ntohl (mod->value_size);
1075       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1076
1077       if (psize < sizeof (*mod) + name_size
1078           || '\0' != *((char *) &mod[1] + name_size - 1)
1079           || recv->mod_value_size_expected < recv->mod_value_size)
1080       {
1081         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1082         GNUNET_break_op (0);
1083         recv_error (recv);
1084         return GNUNET_SYSERR;
1085       }
1086       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1087       break;
1088     }
1089     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1090     {
1091       recv->mod_value_size += psize - sizeof (*pmsg);
1092
1093       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1094             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1095           || recv->mod_value_size_expected < recv->mod_value_size)
1096       {
1097         LOG (GNUNET_ERROR_TYPE_WARNING,
1098              "Dropping out of order message modifier continuation "
1099              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1100              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1101              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1102              recv->mod_value_size_expected, recv->mod_value_size);
1103         GNUNET_break_op (0);
1104         recv_error (recv);
1105         return GNUNET_SYSERR;
1106       }
1107       break;
1108     }
1109     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1110     {
1111       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1112           || recv->mod_value_size_expected != recv->mod_value_size)
1113       {
1114         LOG (GNUNET_ERROR_TYPE_WARNING,
1115              "Dropping out of order message data fragment "
1116              "(%u < %u || %lu != %lu).\n",
1117              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1118              recv->mod_value_size_expected, recv->mod_value_size);
1119
1120         GNUNET_break_op (0);
1121         recv_error (recv);
1122         return GNUNET_SYSERR;
1123       }
1124       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1125       break;
1126     }
1127     }
1128
1129     if (NULL != recv->message_part_cb)
1130       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1131
1132     switch (ptype)
1133     {
1134     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1135     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1136       GNUNET_PSYC_receive_reset (recv);
1137       break;
1138     }
1139   }
1140
1141   if (NULL != recv->message_cb)
1142     recv->message_cb (recv->cb_cls, msg);
1143   return GNUNET_OK;
1144 }
1145
1146
1147 /**
1148  * Check if @a data contains a series of valid message parts.
1149  *
1150  * @param      data_size    Size of @a data.
1151  * @param      data         Data.
1152  * @param[out] first_ptype  Type of first message part.
1153  * @param[out] last_ptype   Type of last message part.
1154  *
1155  * @return Number of message parts found in @a data.
1156  *         or GNUNET_SYSERR if the message contains invalid parts.
1157  */
1158 int
1159 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1160                                  uint16_t *first_ptype, uint16_t *last_ptype)
1161 {
1162   const struct GNUNET_MessageHeader *pmsg;
1163   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1164   if (NULL != first_ptype)
1165     *first_ptype = 0;
1166   if (NULL != last_ptype)
1167     *last_ptype = 0;
1168
1169   for (pos = 0; pos < data_size; pos += psize, parts++)
1170   {
1171     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1172     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1173     psize = ntohs (pmsg->size);
1174     ptype = ntohs (pmsg->type);
1175     if (0 == parts && NULL != first_ptype)
1176       *first_ptype = ptype;
1177     if (NULL != last_ptype
1178         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1179       *last_ptype = ptype;
1180     if (psize < sizeof (*pmsg)
1181         || pos + psize > data_size
1182         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1183         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1184     {
1185       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1186                   "Invalid message part of type %u and size %u.\n",
1187                   ptype, psize);
1188       return GNUNET_SYSERR;
1189     }
1190     /** @todo FIXME: check message part order */
1191   }
1192   return parts;
1193 }
1194
1195
1196 struct ParseMessageClosure
1197 {
1198   struct GNUNET_PSYC_Environment *env;
1199   const char **method_name;
1200   const void **data;
1201   uint16_t *data_size;
1202   enum GNUNET_PSYC_MessageState msg_state;
1203 };
1204
1205
1206 static void
1207 parse_message_part_cb (void *cls,
1208                        const struct GNUNET_PSYC_MessageHeader *msg,
1209                        const struct GNUNET_MessageHeader *pmsg)
1210 {
1211   struct ParseMessageClosure *pmc = cls;
1212   if (NULL == pmsg)
1213   {
1214     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1215     return;
1216   }
1217
1218   switch (ntohs (pmsg->type))
1219   {
1220   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1221   {
1222     struct GNUNET_PSYC_MessageMethod *
1223       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1224     *pmc->method_name = (const char *) &pmeth[1];
1225     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1226     break;
1227   }
1228
1229   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1230   {
1231     struct GNUNET_PSYC_MessageModifier *
1232       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1233
1234     const char *name = (const char *) &pmod[1];
1235     const void *value = name + ntohs (pmod->name_size);
1236     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1237                          ntohl (pmod->value_size));
1238     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1239     break;
1240   }
1241
1242   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1243     *pmc->data = &pmsg[1];
1244     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1245     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1246     break;
1247
1248   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1249     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1250     break;
1251
1252   default:
1253     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1254   }
1255 }
1256
1257
1258 /**
1259  * Parse PSYC message.
1260  *
1261  * @param msg
1262  *        The PSYC message to parse.
1263  * @param[out] method_name
1264  *        Pointer to the method name inside @a pmsg.
1265  * @param env
1266  *        The environment for the message with a list of modifiers.
1267  * @param[out] data
1268  *        Pointer to data inside @a msg.
1269  * @param[out] data_size
1270  *        Size of @data is written here.
1271  *
1272  * @return #GNUNET_OK on success,
1273  *         #GNUNET_SYSERR on parse error.
1274  */
1275 int
1276 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1277                            const char **method_name,
1278                            struct GNUNET_PSYC_Environment *env,
1279                            const void **data,
1280                            uint16_t *data_size)
1281 {
1282   struct ParseMessageClosure cls;
1283   cls.env = env;
1284   cls.method_name = method_name;
1285   cls.data = data;
1286   cls.data_size = data_size;
1287
1288   struct GNUNET_PSYC_ReceiveHandle *
1289     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1290   int ret = GNUNET_PSYC_receive_message (recv, msg);
1291   GNUNET_PSYC_receive_destroy (recv);
1292
1293   if (GNUNET_OK != ret)
1294     return GNUNET_SYSERR;
1295
1296   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1297     ? GNUNET_OK
1298     : GNUNET_NO;
1299 }
1300
1301
1302 /**
1303  * Initialize PSYC message header.
1304  */
1305 void
1306 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1307                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1308                                  uint32_t flags)
1309 {
1310   uint16_t size = ntohs (mmsg->header.size);
1311   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1312
1313   pmsg->header.size = htons (psize);
1314   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1315   pmsg->message_id = mmsg->message_id;
1316   pmsg->fragment_offset = mmsg->fragment_offset;
1317   pmsg->flags = htonl (flags);
1318
1319   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1320 }
1321
1322
1323 /**
1324  * Create a new PSYC message header from a multicast message.
1325  */
1326 struct GNUNET_PSYC_MessageHeader *
1327 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1328                                    uint32_t flags)
1329 {
1330   struct GNUNET_PSYC_MessageHeader *pmsg;
1331   uint16_t size = ntohs (mmsg->header.size);
1332   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1333
1334   pmsg = GNUNET_malloc (psize);
1335   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1336   return pmsg;
1337 }
1338
1339
1340 /**
1341  * Create a new PSYC message header from a PSYC message.
1342  */
1343 struct GNUNET_PSYC_MessageHeader *
1344 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1345 {
1346   uint16_t msg_size = ntohs (msg->header.size);
1347   struct GNUNET_PSYC_MessageHeader *
1348     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1349   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1350   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1351   GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1352   return pmsg;
1353 }