reduce loop counters to more practical levels
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycutil/psyc_message.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_MQ_Handle *mq;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Envelope for @a msg
51    */
52   struct GNUNET_MQ_Envelope *env;
53
54   /**
55    * Callback to request next modifier from client.
56    */
57   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
58
59   /**
60    * Closure for the notify callbacks.
61    */
62   void *notify_mod_cls;
63
64   /**
65    * Callback to request next data fragment from client.
66    */
67   GNUNET_PSYC_TransmitNotifyData notify_data;
68
69   /**
70    * Closure for the notify callbacks.
71    */
72   void *notify_data_cls;
73
74   /**
75    * Modifier of the environment that is currently being transmitted.
76    */
77   struct GNUNET_PSYC_Modifier *mod;
78
79   /**
80    *
81    */
82   const char *mod_value;
83
84   /**
85    * Number of bytes remaining to be transmitted from the current modifier value.
86    */
87   uint32_t mod_value_remaining;
88
89   /**
90    * State of the current message being received from client.
91    */
92   enum GNUNET_PSYC_MessageState state;
93
94   /**
95    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
96    */
97   uint8_t acks_pending;
98
99   /**
100    * Is transmission paused?
101    */
102   uint8_t paused;
103
104   /**
105    * Are we currently transmitting a message?
106    */
107   uint8_t in_transmit;
108
109   /**
110    * Notify callback is currently being called.
111    */
112   uint8_t in_notify;
113
114 };
115
116
117
118 struct GNUNET_PSYC_ReceiveHandle
119 {
120   /**
121    * Message callback.
122    */
123   GNUNET_PSYC_MessageCallback message_cb;
124
125   /**
126    * Message part callback.
127    */
128   GNUNET_PSYC_MessagePartCallback message_part_cb;
129
130   /**
131    * Closure for the callbacks.
132    */
133   void *cb_cls;
134
135   /**
136    * ID of the message being received from the PSYC service.
137    */
138   uint64_t message_id;
139
140   /**
141    * Public key of the slave from which a message is being received.
142    */
143   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
144
145   /**
146    * State of the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageState state;
149
150   /**
151    * Flags for the currently being received message from the PSYC service.
152    */
153   enum GNUNET_PSYC_MessageFlags flags;
154
155   /**
156    * Expected value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size_expected;
159
160   /**
161    * Actual value size for the modifier being received from the PSYC service.
162    */
163   uint32_t mod_value_size;
164 };
165
166
167 /**** Messages ****/
168
169
170 /**
171  * Create a PSYC message.
172  *
173  * @param method_name
174  *        PSYC method for the message.
175  * @param env
176  *        Environment for the message.
177  * @param data
178  *        Data payload for the message.
179  * @param data_size
180  *        Size of @a data.
181  *
182  * @return Message header with size information,
183  *         followed by the message parts.
184  */
185 struct GNUNET_PSYC_Message *
186 GNUNET_PSYC_message_create (const char *method_name,
187                             const struct GNUNET_PSYC_Environment *env,
188                             const void *data,
189                             size_t data_size)
190 {
191   struct GNUNET_PSYC_Modifier *mod = NULL;
192   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
193   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
194   struct GNUNET_MessageHeader *pmsg = NULL;
195   uint16_t env_size = 0;
196   if (NULL != env)
197   {
198     mod = GNUNET_PSYC_env_head (env);
199     while (NULL != mod)
200     {
201       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
202       mod = mod->next;
203     }
204   }
205
206   struct GNUNET_PSYC_Message *msg;
207   uint16_t method_name_size = strlen (method_name) + 1;
208   if (method_name_size == 1)
209     return NULL;
210
211   uint16_t msg_size = sizeof (*msg)                      /* header */
212     + sizeof (*pmeth) + method_name_size                 /* method */
213     + env_size                                           /* modifiers */
214     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
215     + sizeof (*pmsg);                                    /* end of message */
216   msg = GNUNET_malloc (msg_size);
217   msg->header.size = htons (msg_size);
218   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
219
220   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
221   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
222   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
223   GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
224
225   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
226   if (NULL != env)
227   {
228     mod = GNUNET_PSYC_env_head (env);
229     while (NULL != mod)
230     {
231       uint16_t mod_name_size = strlen (mod->name) + 1;
232       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
233       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
234       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
235       p += pmod->header.size;
236       pmod->header.size = htons (pmod->header.size);
237
238       pmod->oper = mod->oper;
239       pmod->name_size = htons (mod_name_size);
240       pmod->value_size = htonl (mod->value_size);
241
242       GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
243       if (0 < mod->value_size)
244         GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
245
246       mod = mod->next;
247     }
248   }
249
250   if (0 < data_size)
251   {
252     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
253     pmsg->size = sizeof (*pmsg) + data_size;
254     p += pmsg->size;
255     pmsg->size = htons (pmsg->size);
256     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
257     GNUNET_memcpy (&pmsg[1], data, data_size);
258   }
259
260   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
261   pmsg->size = htons (sizeof (*pmsg));
262   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
263
264   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
265   return msg;
266 }
267
268
269 void
270 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
271                          const struct GNUNET_MessageHeader *msg)
272 {
273   uint16_t size = ntohs (msg->size);
274   uint16_t type = ntohs (msg->type);
275
276   GNUNET_log (kind,
277               "Message of type %d and size %u:\n",
278               type,
279               size);
280   switch (type)
281   {
282   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
283   {
284     const struct GNUNET_PSYC_MessageHeader *pmsg
285       = (const struct GNUNET_PSYC_MessageHeader *) msg;
286     GNUNET_log (kind,
287                 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
288                 GNUNET_ntohll (pmsg->message_id),
289                 ntohl (pmsg->flags));
290     break;
291   }
292   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
293   {
294     const struct GNUNET_PSYC_MessageMethod *meth
295       = (const struct GNUNET_PSYC_MessageMethod *) msg;
296     GNUNET_log (kind,
297                 "\t%.*s\n",
298                 (int) (size - sizeof (*meth)),
299                 (const char *) &meth[1]);
300     break;
301   }
302   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
303   {
304     const struct GNUNET_PSYC_MessageModifier *mod
305       = (const struct GNUNET_PSYC_MessageModifier *) msg;
306     uint16_t name_size = ntohs (mod->name_size);
307     char oper = ' ' < mod->oper ? mod->oper : ' ';
308     GNUNET_log (kind,
309                 "\t%c%.*s\t%.*s\n",
310                 oper,
311                 (int) name_size,
312                 (const char *) &mod[1],
313                 (int) (size - sizeof (*mod) - name_size),
314                 ((const char *) &mod[1]) + name_size);
315     break;
316   }
317   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
318   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
319     GNUNET_log (kind,
320                 "\t%.*s\n",
321                 (int) (size - sizeof (*msg)),
322                 (const char *) &msg[1]);
323     break;
324   }
325 }
326
327
328 /**** Transmitting messages ****/
329
330
331 /**
332  * Create a transmission handle.
333  */
334 struct GNUNET_PSYC_TransmitHandle *
335 GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
336 {
337   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
338
339   tmit->mq = mq;
340   return tmit;
341 }
342
343
344 /**
345  * Destroy a transmission handle.
346  */
347 void
348 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
349 {
350   GNUNET_free (tmit);
351 }
352
353
354 /**
355  * Queue a message part for transmission.
356  *
357  * The message part is added to the current message buffer.
358  * When this buffer is full, it is added to the transmission queue.
359  *
360  * @param tmit
361  *        Transmission handle.
362  * @param msg
363  *        Message part, or NULL.
364  * @param tmit_now
365  *        Transmit message now, or wait for buffer to fill up?
366  *        #GNUNET_YES or #GNUNET_NO.
367  */
368 static void
369 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
370                        const struct GNUNET_MessageHeader *msg,
371                        uint8_t tmit_now)
372 {
373   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
374
375   LOG (GNUNET_ERROR_TYPE_DEBUG,
376        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
377        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
378
379   if (NULL != tmit->msg)
380   {
381     if (NULL == msg
382         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
383     {
384       /* End of message or buffer is full, add it to transmission queue
385        * and start with empty buffer */
386       tmit->msg->size = htons (tmit->msg->size);
387       GNUNET_MQ_send (tmit->mq, tmit->env);
388       tmit->env = NULL;
389       tmit->msg = NULL;
390       tmit->acks_pending++;
391     }
392     else
393     {
394       /* Message fits in current buffer, append */
395       GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
396       tmit->msg->size += size;
397     }
398   }
399
400   if (NULL == tmit->msg && NULL != msg)
401   {
402     /* Empty buffer, copy over message. */
403     tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404                                      GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405                                      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406     /* store current message size in host byte order
407      * then later switch it to network byte order before sending */
408     tmit->msg->size = sizeof (*tmit->msg) + size;
409
410     GNUNET_memcpy (&tmit->msg[1], msg, size);
411   }
412
413   if (NULL != tmit->msg
414       && (GNUNET_YES == tmit_now
415           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
416               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
417   {
418     /* End of message or buffer is full, add it to transmission queue. */
419     tmit->msg->size = htons (tmit->msg->size);
420     GNUNET_MQ_send (tmit->mq, tmit->env);
421     tmit->env = NULL;
422     tmit->msg = NULL;
423     tmit->acks_pending++;
424   }
425 }
426
427
428 /**
429  * Request data from client to transmit.
430  *
431  * @param tmit  Transmission handle.
432  */
433 static void
434 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
435 {
436   int notify_ret = GNUNET_YES;
437   uint16_t data_size = 0;
438   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
439   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
440   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
441
442   if (NULL != tmit->notify_data)
443   {
444     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
445     tmit->in_notify = GNUNET_YES;
446     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
447     tmit->in_notify = GNUNET_NO;
448   }
449   LOG (GNUNET_ERROR_TYPE_DEBUG,
450        "transmit_data (ret: %d, size: %u): %.*s\n",
451        notify_ret, data_size, data_size, &msg[1]);
452   switch (notify_ret)
453   {
454   case GNUNET_NO:
455     if (0 == data_size)
456     {
457       /* Transmission paused, nothing to send. */
458       tmit->paused = GNUNET_YES;
459       return;
460     }
461     break;
462
463   case GNUNET_YES:
464     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
465     break;
466
467   default:
468     LOG (GNUNET_ERROR_TYPE_ERROR,
469          "TransmitNotifyData callback returned error when requesting data.\n");
470
471     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
472     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
473     msg->size = htons (sizeof (*msg));
474     transmit_queue_insert (tmit, msg, GNUNET_YES);
475     tmit->in_transmit = GNUNET_NO;
476     return;
477   }
478
479   if (0 < data_size)
480   {
481     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
482     msg->size = htons (sizeof (*msg) + data_size);
483     transmit_queue_insert (tmit, msg, !notify_ret);
484   }
485
486   /* End of message. */
487   if (GNUNET_YES == notify_ret)
488   {
489     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
490     msg->size = htons (sizeof (*msg));
491     transmit_queue_insert (tmit, msg, GNUNET_YES);
492     /* FIXME: wait for ACK before setting in_transmit to no */
493     tmit->in_transmit = GNUNET_NO;
494   }
495 }
496
497
498 /**
499  * Request a modifier from a client to transmit.
500  *
501  * @param tmit  Transmission handle.
502  */
503 static void
504 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
505 {
506   uint16_t max_data_size = 0;
507   uint16_t data_size = 0;
508   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
509   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
510   int notify_ret = GNUNET_YES;
511
512   switch (tmit->state)
513   {
514   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
515   {
516     struct GNUNET_PSYC_MessageModifier *mod
517       = (struct GNUNET_PSYC_MessageModifier *) msg;
518     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
519     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
520
521     if (NULL != tmit->notify_mod)
522     {
523       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
524       data_size = max_data_size;
525       tmit->in_notify = GNUNET_YES;
526       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
527                                      &mod->oper, &mod->value_size);
528       tmit->in_notify = GNUNET_NO;
529     }
530
531     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
532     LOG (GNUNET_ERROR_TYPE_DEBUG,
533          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
534          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
535     if (mod->name_size < data_size)
536     {
537       tmit->mod_value_remaining
538         = mod->value_size - (data_size - mod->name_size);
539       mod->value_size = htonl (mod->value_size);
540       mod->name_size = htons (mod->name_size);
541     }
542     else if (0 < data_size)
543     {
544       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
545       notify_ret = GNUNET_SYSERR;
546     }
547     break;
548   }
549   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
550   {
551     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
552     msg->size = sizeof (struct GNUNET_MessageHeader);
553
554     if (NULL != tmit->notify_mod)
555     {
556       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
557       data_size = max_data_size;
558       tmit->in_notify = GNUNET_YES;
559       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
560                                      &data_size, &msg[1], NULL, NULL);
561       tmit->in_notify = GNUNET_NO;
562     }
563     tmit->mod_value_remaining -= data_size;
564     LOG (GNUNET_ERROR_TYPE_DEBUG,
565          "transmit_mod (ret: %d, size: %u): %.*s\n",
566          notify_ret, data_size, data_size, &msg[1]);
567     break;
568   }
569   default:
570     GNUNET_assert (0);
571   }
572
573   switch (notify_ret)
574   {
575   case GNUNET_NO:
576     if (0 == data_size)
577     { /* Transmission paused, nothing to send. */
578       tmit->paused = GNUNET_YES;
579       return;
580     }
581     tmit->state
582       = (0 == tmit->mod_value_remaining)
583       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
584       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
585     break;
586
587   case GNUNET_YES: /* End of modifiers. */
588     GNUNET_assert (0 == tmit->mod_value_remaining);
589     break;
590
591   default:
592     LOG (GNUNET_ERROR_TYPE_ERROR,
593          "TransmitNotifyModifier callback returned with error.\n");
594
595     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
596     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
597     msg->size = htons (sizeof (*msg));
598     transmit_queue_insert (tmit, msg, GNUNET_YES);
599     tmit->in_transmit = GNUNET_NO;
600     return;
601   }
602
603   if (0 < data_size)
604   {
605     GNUNET_assert (data_size <= max_data_size);
606     msg->size = htons (msg->size + data_size);
607     transmit_queue_insert (tmit, msg, GNUNET_NO);
608   }
609
610   if (GNUNET_YES == notify_ret)
611   {
612     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
613     if (0 == tmit->acks_pending)
614       transmit_data (tmit);
615   }
616   else
617   {
618     transmit_mod (tmit);
619   }
620 }
621
622
623 int
624 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
625                      uint32_t *full_value_size)
626
627 {
628   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
629   uint16_t name_size = 0;
630   uint32_t value_size = 0;
631   const char *value = NULL;
632
633   if (NULL != oper)
634   { /* New modifier */
635     if (NULL != tmit->mod)
636       tmit->mod = tmit->mod->next;
637     if (NULL == tmit->mod)
638     { /* No more modifiers, continue with data */
639       *data_size = 0;
640       return GNUNET_YES;
641     }
642
643     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
644     *full_value_size = tmit->mod->value_size;
645     *oper = tmit->mod->oper;
646     name_size = strlen (tmit->mod->name) + 1;
647
648     if (name_size + tmit->mod->value_size <= *data_size)
649     {
650       value_size = tmit->mod->value_size;
651       *data_size = name_size + value_size;
652     }
653     else /* full modifier does not fit in data, continuation needed */
654     {
655       value_size = *data_size - name_size;
656       tmit->mod_value = tmit->mod->value + value_size;
657     }
658
659     GNUNET_memcpy (data, tmit->mod->name, name_size);
660     GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
661     return GNUNET_NO;
662   }
663   else
664   { /* Modifier continuation */
665     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
666     value = tmit->mod_value;
667     if (tmit->mod_value_remaining <= *data_size)
668     {
669       value_size = tmit->mod_value_remaining;
670       tmit->mod_value = NULL;
671     }
672     else
673     {
674       value_size = *data_size;
675       tmit->mod_value += value_size;
676     }
677
678     if (*data_size < value_size)
679     {
680       LOG (GNUNET_ERROR_TYPE_DEBUG,
681            "Value in environment larger than buffer: %u < %zu\n",
682            *data_size, value_size);
683       *data_size = 0;
684       return GNUNET_NO;
685     }
686
687     *data_size = value_size;
688     GNUNET_memcpy (data, value, value_size);
689     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
690   }
691 }
692
693
694 /**
695  * Transmit a message.
696  *
697  * @param tmit
698  *        Transmission handle.
699  * @param method_name
700  *        Which method should be invoked.
701  * @param env
702  *        Environment for the message.
703  *        Should stay available until the first call to notify_data.
704  *        Can be NULL if there are no modifiers or @a notify_mod is
705  *        provided instead.
706  * @param notify_mod
707  *        Function to call to obtain modifiers.
708  *        Can be NULL if there are no modifiers or @a env is provided instead.
709  * @param notify_data
710  *        Function to call to obtain fragments of the data.
711  * @param notify_cls
712  *        Closure for @a notify_mod and @a notify_data.
713  * @param flags
714  *        Flags for the message being transmitted.
715  *
716  * @return #GNUNET_OK if the transmission was started.
717  *         #GNUNET_SYSERR if another transmission is already going on.
718  */
719 int
720 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
721                               const char *method_name,
722                               const struct GNUNET_PSYC_Environment *env,
723                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
724                               GNUNET_PSYC_TransmitNotifyData notify_data,
725                               void *notify_cls,
726                               uint32_t flags)
727 {
728   if (GNUNET_NO != tmit->in_transmit)
729     return GNUNET_SYSERR;
730   tmit->in_transmit = GNUNET_YES;
731
732   size_t size = strlen (method_name) + 1;
733   struct GNUNET_PSYC_MessageMethod *pmeth;
734
735   tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736                                    GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737                                    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738   /* store current message size in host byte order
739    * then later switch it to network byte order before sending */
740   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
741
742   if (NULL != notify_mod)
743   {
744     tmit->notify_mod = notify_mod;
745     tmit->notify_mod_cls = notify_cls;
746   }
747   else
748   {
749     tmit->notify_mod = &transmit_notify_env;
750     tmit->notify_mod_cls = tmit;
751     if (NULL != env)
752     {
753       struct GNUNET_PSYC_Modifier mod = {};
754       mod.next = GNUNET_PSYC_env_head (env);
755       tmit->mod = &mod;
756
757       struct GNUNET_PSYC_Modifier *m = tmit->mod;
758       while (NULL != (m = m->next))
759       {
760         if (m->oper != GNUNET_PSYC_OP_SET)
761           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
762       }
763     }
764     else
765     {
766       tmit->mod = NULL;
767     }
768   }
769
770   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
771   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
772   pmeth->header.size = htons (sizeof (*pmeth) + size);
773   pmeth->flags = htonl (flags);
774   GNUNET_memcpy (&pmeth[1], method_name, size);
775
776   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
777   tmit->notify_data = notify_data;
778   tmit->notify_data_cls = notify_cls;
779
780   transmit_mod (tmit);
781   return GNUNET_OK;
782 }
783
784
785 /**
786  * Resume transmission.
787  *
788  * @param tmit  Transmission handle.
789  */
790 void
791 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
792 {
793   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
794     return;
795
796   if (0 == tmit->acks_pending)
797   {
798     tmit->paused = GNUNET_NO;
799     transmit_data (tmit);
800   }
801 }
802
803
804 /**
805  * Abort transmission request.
806  *
807  * @param tmit  Transmission handle.
808  */
809 void
810 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
811 {
812   if (GNUNET_NO == tmit->in_transmit)
813     return;
814
815   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
816   tmit->in_transmit = GNUNET_NO;
817   tmit->paused = GNUNET_NO;
818
819   /* FIXME */
820   struct GNUNET_MessageHeader msg;
821   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
822   msg.size = htons (sizeof (msg));
823   transmit_queue_insert (tmit, &msg, GNUNET_YES);
824 }
825
826
827 /**
828  * Got acknowledgement of a transmitted message part, continue transmission.
829  *
830  * @param tmit  Transmission handle.
831  */
832 void
833 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
834 {
835   if (0 == tmit->acks_pending)
836   {
837     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
838     GNUNET_break (0);
839     return;
840   }
841   tmit->acks_pending--;
842
843   if (GNUNET_YES == tmit->paused)
844     return;
845
846   switch (tmit->state)
847   {
848   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
849   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
850     transmit_mod (tmit);
851     break;
852
853   case GNUNET_PSYC_MESSAGE_STATE_DATA:
854     transmit_data (tmit);
855     break;
856
857   case GNUNET_PSYC_MESSAGE_STATE_END:
858   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
859     break;
860
861   default:
862     LOG (GNUNET_ERROR_TYPE_DEBUG,
863          "Ignoring message ACK in state %u.\n", tmit->state);
864   }
865 }
866
867
868 /**** Receiving messages ****/
869
870
871 /**
872  * Create handle for receiving messages.
873  */
874 struct GNUNET_PSYC_ReceiveHandle *
875 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
876                             GNUNET_PSYC_MessagePartCallback message_part_cb,
877                             void *cb_cls)
878 {
879   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
880   recv->message_cb = message_cb;
881   recv->message_part_cb = message_part_cb;
882   recv->cb_cls = cb_cls;
883   return recv;
884 }
885
886
887 /**
888  * Destroy handle for receiving messages.
889  */
890 void
891 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
892 {
893   GNUNET_free (recv);
894 }
895
896
897 /**
898  * Reset stored data related to the last received message.
899  */
900 void
901 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
902 {
903   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
904   recv->flags = 0;
905   recv->message_id = 0;
906   recv->mod_value_size = 0;
907   recv->mod_value_size_expected = 0;
908 }
909
910
911 static void
912 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
913 {
914   if (NULL != recv->message_part_cb)
915     recv->message_part_cb (recv->cb_cls, NULL, NULL);
916
917   if (NULL != recv->message_cb)
918     recv->message_cb (recv->cb_cls, NULL);
919
920   GNUNET_PSYC_receive_reset (recv);
921 }
922
923
924 /**
925  * Handle incoming PSYC message.
926  *
927  * @param recv  Receive handle.
928  * @param msg   The message.
929  *
930  * @return #GNUNET_OK on success,
931  *         #GNUNET_SYSERR on receive error.
932  */
933 int
934 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
935                              const struct GNUNET_PSYC_MessageHeader *msg)
936 {
937   uint16_t size = ntohs (msg->header.size);
938   uint32_t flags = ntohl (msg->flags);
939
940   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
941                            (struct GNUNET_MessageHeader *) msg);
942
943   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
944   {
945     recv->message_id = GNUNET_ntohll (msg->message_id);
946     recv->flags = flags;
947     recv->slave_pub_key = msg->slave_pub_key;
948     recv->mod_value_size = 0;
949     recv->mod_value_size_expected = 0;
950   }
951   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
952   {
953     // FIXME
954     LOG (GNUNET_ERROR_TYPE_WARNING,
955          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
956          GNUNET_ntohll (msg->message_id), recv->message_id);
957     GNUNET_break_op (0);
958     recv_error (recv);
959     return GNUNET_SYSERR;
960   }
961   else if (flags != recv->flags)
962   {
963     LOG (GNUNET_ERROR_TYPE_WARNING,
964          "Unexpected message flags. Got: %lu, expected: %lu\n",
965          flags, recv->flags);
966     GNUNET_break_op (0);
967     recv_error (recv);
968     return GNUNET_SYSERR;
969   }
970
971   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
972
973   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
974   {
975     const struct GNUNET_MessageHeader *pmsg
976       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
977     psize = ntohs (pmsg->size);
978     ptype = ntohs (pmsg->type);
979     size_eq = size_min = 0;
980
981     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
982     {
983       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
984                   "Dropping message of type %u with invalid size %u.\n",
985                   ptype, psize);
986       recv_error (recv);
987       return GNUNET_SYSERR;
988     }
989
990     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991                 "Received message part of type %u and size %u from PSYC.\n",
992                 ptype, psize);
993     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
994
995     switch (ptype)
996     {
997     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
998       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
999       break;
1000     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1001       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1002       break;
1003     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1004     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1005       size_min = sizeof (struct GNUNET_MessageHeader);
1006       break;
1007     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1008     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1009       size_eq = sizeof (struct GNUNET_MessageHeader);
1010       break;
1011     default:
1012       GNUNET_break_op (0);
1013       recv_error (recv);
1014       return GNUNET_SYSERR;
1015     }
1016
1017     if (! ((0 < size_eq && psize == size_eq)
1018            || (0 < size_min && size_min <= psize)))
1019     {
1020       GNUNET_break_op (0);
1021       recv_error (recv);
1022       return GNUNET_SYSERR;
1023     }
1024
1025     switch (ptype)
1026     {
1027     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1028     {
1029       struct GNUNET_PSYC_MessageMethod *meth
1030         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1031
1032       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1033       {
1034         LOG (GNUNET_ERROR_TYPE_WARNING,
1035              "Dropping out of order message method (%u).\n",
1036              recv->state);
1037         /* It is normal to receive an incomplete message right after connecting,
1038          * but should not happen later.
1039          * FIXME: add a check for this condition.
1040          */
1041         GNUNET_break_op (0);
1042         recv_error (recv);
1043         return GNUNET_SYSERR;
1044       }
1045
1046       if ('\0' != *((char *) meth + psize - 1))
1047       {
1048         LOG (GNUNET_ERROR_TYPE_WARNING,
1049              "Dropping message with malformed method. "
1050              "Message ID: %" PRIu64 "\n", recv->message_id);
1051         GNUNET_break_op (0);
1052         recv_error (recv);
1053         return GNUNET_SYSERR;
1054       }
1055       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1056       break;
1057     }
1058     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1059     {
1060       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1061             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1062             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1063       {
1064         LOG (GNUNET_ERROR_TYPE_WARNING,
1065              "Dropping out of order message modifier (%u).\n",
1066              recv->state);
1067         GNUNET_break_op (0);
1068         recv_error (recv);
1069         return GNUNET_SYSERR;
1070       }
1071
1072       struct GNUNET_PSYC_MessageModifier *mod
1073         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1074
1075       uint16_t name_size = ntohs (mod->name_size);
1076       recv->mod_value_size_expected = ntohl (mod->value_size);
1077       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1078
1079       if (psize < sizeof (*mod) + name_size
1080           || '\0' != *((char *) &mod[1] + name_size - 1)
1081           || recv->mod_value_size_expected < recv->mod_value_size)
1082       {
1083         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1084         GNUNET_break_op (0);
1085         recv_error (recv);
1086         return GNUNET_SYSERR;
1087       }
1088       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1089       break;
1090     }
1091     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1092     {
1093       recv->mod_value_size += psize - sizeof (*pmsg);
1094
1095       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1096             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1097           || recv->mod_value_size_expected < recv->mod_value_size)
1098       {
1099         LOG (GNUNET_ERROR_TYPE_WARNING,
1100              "Dropping out of order message modifier continuation "
1101              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1102              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1103              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1104              recv->mod_value_size_expected, recv->mod_value_size);
1105         GNUNET_break_op (0);
1106         recv_error (recv);
1107         return GNUNET_SYSERR;
1108       }
1109       break;
1110     }
1111     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1112     {
1113       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1114           || recv->mod_value_size_expected != recv->mod_value_size)
1115       {
1116         LOG (GNUNET_ERROR_TYPE_WARNING,
1117              "Dropping out of order message data fragment "
1118              "(%u < %u || %lu != %lu).\n",
1119              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1120              recv->mod_value_size_expected, recv->mod_value_size);
1121
1122         GNUNET_break_op (0);
1123         recv_error (recv);
1124         return GNUNET_SYSERR;
1125       }
1126       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1127       break;
1128     }
1129     }
1130
1131     if (NULL != recv->message_part_cb)
1132       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1133
1134     switch (ptype)
1135     {
1136     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1137     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1138       GNUNET_PSYC_receive_reset (recv);
1139       break;
1140     }
1141   }
1142
1143   if (NULL != recv->message_cb)
1144     recv->message_cb (recv->cb_cls, msg);
1145   return GNUNET_OK;
1146 }
1147
1148
1149 /**
1150  * Check if @a data contains a series of valid message parts.
1151  *
1152  * @param      data_size    Size of @a data.
1153  * @param      data         Data.
1154  * @param[out] first_ptype  Type of first message part.
1155  * @param[out] last_ptype   Type of last message part.
1156  *
1157  * @return Number of message parts found in @a data.
1158  *         or GNUNET_SYSERR if the message contains invalid parts.
1159  */
1160 int
1161 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1162                                  uint16_t *first_ptype, uint16_t *last_ptype)
1163 {
1164   const struct GNUNET_MessageHeader *pmsg;
1165   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1166   if (NULL != first_ptype)
1167     *first_ptype = 0;
1168   if (NULL != last_ptype)
1169     *last_ptype = 0;
1170
1171   for (pos = 0; pos < data_size; pos += psize, parts++)
1172   {
1173     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1174     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1175     psize = ntohs (pmsg->size);
1176     ptype = ntohs (pmsg->type);
1177     if (0 == parts && NULL != first_ptype)
1178       *first_ptype = ptype;
1179     if (NULL != last_ptype
1180         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1181       *last_ptype = ptype;
1182     if (psize < sizeof (*pmsg)
1183         || pos + psize > data_size
1184         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1185         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1186     {
1187       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188                   "Invalid message part of type %u and size %u.\n",
1189                   ptype, psize);
1190       return GNUNET_SYSERR;
1191     }
1192     /** @todo FIXME: check message part order */
1193   }
1194   return parts;
1195 }
1196
1197
1198 struct ParseMessageClosure
1199 {
1200   struct GNUNET_PSYC_Environment *env;
1201   const char **method_name;
1202   const void **data;
1203   uint16_t *data_size;
1204   enum GNUNET_PSYC_MessageState msg_state;
1205 };
1206
1207
1208 static void
1209 parse_message_part_cb (void *cls,
1210                        const struct GNUNET_PSYC_MessageHeader *msg,
1211                        const struct GNUNET_MessageHeader *pmsg)
1212 {
1213   struct ParseMessageClosure *pmc = cls;
1214   if (NULL == pmsg)
1215   {
1216     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1217     return;
1218   }
1219
1220   switch (ntohs (pmsg->type))
1221   {
1222   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1223   {
1224     struct GNUNET_PSYC_MessageMethod *
1225       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1226     *pmc->method_name = (const char *) &pmeth[1];
1227     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1228     break;
1229   }
1230
1231   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1232   {
1233     struct GNUNET_PSYC_MessageModifier *
1234       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1235
1236     const char *name = (const char *) &pmod[1];
1237     const void *value = name + ntohs (pmod->name_size);
1238     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1239                          ntohl (pmod->value_size));
1240     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1241     break;
1242   }
1243
1244   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1245     *pmc->data = &pmsg[1];
1246     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1247     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1248     break;
1249
1250   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1251     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1252     break;
1253
1254   default:
1255     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1256   }
1257 }
1258
1259
1260 /**
1261  * Parse PSYC message.
1262  *
1263  * @param msg
1264  *        The PSYC message to parse.
1265  * @param[out] method_name
1266  *        Pointer to the method name inside @a pmsg.
1267  * @param env
1268  *        The environment for the message with a list of modifiers.
1269  * @param[out] data
1270  *        Pointer to data inside @a msg.
1271  * @param[out] data_size
1272  *        Size of @data is written here.
1273  *
1274  * @return #GNUNET_OK on success,
1275  *         #GNUNET_SYSERR on parse error.
1276  */
1277 int
1278 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1279                            const char **method_name,
1280                            struct GNUNET_PSYC_Environment *env,
1281                            const void **data,
1282                            uint16_t *data_size)
1283 {
1284   struct ParseMessageClosure cls;
1285   cls.env = env;
1286   cls.method_name = method_name;
1287   cls.data = data;
1288   cls.data_size = data_size;
1289
1290   struct GNUNET_PSYC_ReceiveHandle *
1291     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1292   int ret = GNUNET_PSYC_receive_message (recv, msg);
1293   GNUNET_PSYC_receive_destroy (recv);
1294
1295   if (GNUNET_OK != ret)
1296     return GNUNET_SYSERR;
1297
1298   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1299     ? GNUNET_OK
1300     : GNUNET_NO;
1301 }
1302
1303
1304 /**
1305  * Initialize PSYC message header.
1306  */
1307 void
1308 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1309                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1310                                  uint32_t flags)
1311 {
1312   uint16_t size = ntohs (mmsg->header.size);
1313   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1314
1315   pmsg->header.size = htons (psize);
1316   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1317   pmsg->message_id = mmsg->message_id;
1318   pmsg->fragment_offset = mmsg->fragment_offset;
1319   pmsg->flags = htonl (flags);
1320
1321   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1322 }
1323
1324
1325 /**
1326  * Create a new PSYC message header from a multicast message.
1327  */
1328 struct GNUNET_PSYC_MessageHeader *
1329 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1330                                    uint32_t flags)
1331 {
1332   struct GNUNET_PSYC_MessageHeader *pmsg;
1333   uint16_t size = ntohs (mmsg->header.size);
1334   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1335
1336   pmsg = GNUNET_malloc (psize);
1337   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1338   return pmsg;
1339 }
1340
1341
1342 /**
1343  * Create a new PSYC message header from a PSYC message.
1344  */
1345 struct GNUNET_PSYC_MessageHeader *
1346 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1347 {
1348   uint16_t msg_size = ntohs (msg->header.size);
1349   struct GNUNET_PSYC_MessageHeader *
1350     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1351   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1352   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1353   GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1354   return pmsg;
1355 }