unset XDG environment variable during testcases, as they can screw things up badly
[oweals/gnunet.git] / src / psycutil / psyc_message.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 GNUnet e.V.
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycutil/psyc_message.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
35
36
37 struct GNUNET_PSYC_TransmitHandle
38 {
39   /**
40    * Client connection to service.
41    */
42   struct GNUNET_MQ_Handle *mq;
43
44   /**
45    * Message currently being received from the client.
46    */
47   struct GNUNET_MessageHeader *msg;
48
49   /**
50    * Envelope for @a msg
51    */
52   struct GNUNET_MQ_Envelope *env;
53
54   /**
55    * Callback to request next modifier from client.
56    */
57   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
58
59   /**
60    * Closure for the notify callbacks.
61    */
62   void *notify_mod_cls;
63
64   /**
65    * Callback to request next data fragment from client.
66    */
67   GNUNET_PSYC_TransmitNotifyData notify_data;
68
69   /**
70    * Closure for the notify callbacks.
71    */
72   void *notify_data_cls;
73
74   /**
75    * Modifier of the environment that is currently being transmitted.
76    */
77   struct GNUNET_PSYC_Modifier *mod;
78
79   /**
80    *
81    */
82   const char *mod_value;
83
84   /**
85    * Number of bytes remaining to be transmitted from the current modifier value.
86    */
87   uint32_t mod_value_remaining;
88
89   /**
90    * State of the current message being received from client.
91    */
92   enum GNUNET_PSYC_MessageState state;
93
94   /**
95    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
96    */
97   uint8_t acks_pending;
98
99   /**
100    * Is transmission paused?
101    */
102   uint8_t paused;
103
104   /**
105    * Are we currently transmitting a message?
106    */
107   uint8_t in_transmit;
108
109   /**
110    * Notify callback is currently being called.
111    */
112   uint8_t in_notify;
113
114 };
115
116
117
118 struct GNUNET_PSYC_ReceiveHandle
119 {
120   /**
121    * Message callback.
122    */
123   GNUNET_PSYC_MessageCallback message_cb;
124
125   /**
126    * Message part callback.
127    */
128   GNUNET_PSYC_MessagePartCallback message_part_cb;
129
130   /**
131    * Closure for the callbacks.
132    */
133   void *cb_cls;
134
135   /**
136    * ID of the message being received from the PSYC service.
137    */
138   uint64_t message_id;
139
140   /**
141    * Public key of the slave from which a message is being received.
142    */
143   struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
144
145   /**
146    * State of the currently being received message from the PSYC service.
147    */
148   enum GNUNET_PSYC_MessageState state;
149
150   /**
151    * Flags for the currently being received message from the PSYC service.
152    */
153   enum GNUNET_PSYC_MessageFlags flags;
154
155   /**
156    * Expected value size for the modifier being received from the PSYC service.
157    */
158   uint32_t mod_value_size_expected;
159
160   /**
161    * Actual value size for the modifier being received from the PSYC service.
162    */
163   uint32_t mod_value_size;
164 };
165
166
167 /**** Messages ****/
168
169
170 /**
171  * Create a PSYC message.
172  *
173  * @param method_name
174  *        PSYC method for the message.
175  * @param env
176  *        Environment for the message.
177  * @param data
178  *        Data payload for the message.
179  * @param data_size
180  *        Size of @a data.
181  *
182  * @return Message header with size information,
183  *         followed by the message parts.
184  */
185 struct GNUNET_PSYC_Message *
186 GNUNET_PSYC_message_create (const char *method_name,
187                             const struct GNUNET_PSYC_Environment *env,
188                             const void *data,
189                             size_t data_size)
190 {
191   struct GNUNET_PSYC_Modifier *mod = NULL;
192   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
193   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
194   struct GNUNET_MessageHeader *pmsg = NULL;
195   uint16_t env_size = 0;
196   if (NULL != env)
197   {
198     mod = GNUNET_PSYC_env_head (env);
199     while (NULL != mod)
200     {
201       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
202       mod = mod->next;
203     }
204   }
205
206   struct GNUNET_PSYC_Message *msg;
207   uint16_t method_name_size = strlen (method_name) + 1;
208   if (method_name_size == 1)
209     return NULL;
210
211   uint16_t msg_size = sizeof (*msg)                      /* header */
212     + sizeof (*pmeth) + method_name_size                 /* method */
213     + env_size                                           /* modifiers */
214     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
215     + sizeof (*pmsg);                                    /* end of message */
216   msg = GNUNET_malloc (msg_size);
217   msg->header.size = htons (msg_size);
218   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
219
220   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
221   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
222   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
223   GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
224
225   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
226   if (NULL != env)
227   {
228     mod = GNUNET_PSYC_env_head (env);
229     while (NULL != mod)
230     {
231       uint16_t mod_name_size = strlen (mod->name) + 1;
232       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
233       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
234       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
235       p += pmod->header.size;
236       pmod->header.size = htons (pmod->header.size);
237
238       pmod->oper = mod->oper;
239       pmod->name_size = htons (mod_name_size);
240       pmod->value_size = htonl (mod->value_size);
241
242       GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
243       if (0 < mod->value_size)
244         GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
245
246       mod = mod->next;
247     }
248   }
249
250   if (0 < data_size)
251   {
252     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
253     pmsg->size = sizeof (*pmsg) + data_size;
254     p += pmsg->size;
255     pmsg->size = htons (pmsg->size);
256     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
257     GNUNET_memcpy (&pmsg[1], data, data_size);
258   }
259
260   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
261   pmsg->size = htons (sizeof (*pmsg));
262   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
263
264   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
265   return msg;
266 }
267
268
269 void
270 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
271                          const struct GNUNET_MessageHeader *msg)
272 {
273   uint16_t size = ntohs (msg->size);
274   uint16_t type = ntohs (msg->type);
275
276   GNUNET_log (kind,
277               "Message of type %d and size %u:\n",
278               type,
279               size);
280   switch (type)
281   {
282   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
283   {
284     const struct GNUNET_PSYC_MessageHeader *pmsg
285       = (const struct GNUNET_PSYC_MessageHeader *) msg;
286     GNUNET_log (kind,
287                 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
288                 GNUNET_ntohll (pmsg->message_id),
289                 ntohl (pmsg->flags));
290     break;
291   }
292   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
293   {
294     const struct GNUNET_PSYC_MessageMethod *meth
295       = (const struct GNUNET_PSYC_MessageMethod *) msg;
296     GNUNET_log (kind,
297                 "\t%.*s\n",
298                 (int) (size - sizeof (*meth)),
299                 (const char *) &meth[1]);
300     break;
301   }
302   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
303   {
304     const struct GNUNET_PSYC_MessageModifier *mod
305       = (const struct GNUNET_PSYC_MessageModifier *) msg;
306     uint16_t name_size = ntohs (mod->name_size);
307     char oper = ' ' < mod->oper ? mod->oper : ' ';
308     GNUNET_log (kind,
309                 "\t%c%.*s\t%.*s\n",
310                 oper,
311                 (int) name_size,
312                 (const char *) &mod[1],
313                 (int) (size - sizeof (*mod) - name_size),
314                 ((const char *) &mod[1]) + name_size);
315     break;
316   }
317   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
318   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
319     GNUNET_log (kind,
320                 "\t%.*s\n",
321                 (int) (size - sizeof (*msg)),
322                 (const char *) &msg[1]);
323     break;
324   }
325 }
326
327
328 /**** Transmitting messages ****/
329
330
331 /**
332  * Create a transmission handle.
333  */
334 struct GNUNET_PSYC_TransmitHandle *
335 GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
336 {
337   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
338
339   tmit->mq = mq;
340   return tmit;
341 }
342
343
344 /**
345  * Destroy a transmission handle.
346  */
347 void
348 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
349 {
350   GNUNET_free (tmit);
351 }
352
353
354 /**
355  * Queue a message part for transmission.
356  *
357  * The message part is added to the current message buffer.
358  * When this buffer is full, it is added to the transmission queue.
359  *
360  * @param tmit
361  *        Transmission handle.
362  * @param msg
363  *        Message part, or NULL.
364  * @param tmit_now
365  *        Transmit message now, or wait for buffer to fill up?
366  *        #GNUNET_YES or #GNUNET_NO.
367  */
368 static void
369 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
370                        const struct GNUNET_MessageHeader *msg,
371                        uint8_t tmit_now)
372 {
373   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
374
375   LOG (GNUNET_ERROR_TYPE_DEBUG,
376        "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
377        NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
378
379   if (NULL != tmit->msg)
380   {
381     if (NULL == msg
382         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
383     {
384       /* End of message or buffer is full, add it to transmission queue
385        * and start with empty buffer */
386       tmit->msg->size = htons (tmit->msg->size);
387       GNUNET_MQ_send (tmit->mq, tmit->env);
388       tmit->env = NULL;
389       tmit->msg = NULL;
390       tmit->acks_pending++;
391     }
392     else
393     {
394       /* Message fits in current buffer, append */
395       GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
396       tmit->msg->size += size;
397     }
398   }
399
400   if (NULL == tmit->msg && NULL != msg)
401   {
402     /* Empty buffer, copy over message. */
403     tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404                                      GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405                                      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406     /* store current message size in host byte order
407      * then later switch it to network byte order before sending */
408     tmit->msg->size = sizeof (*tmit->msg) + size;
409
410     GNUNET_memcpy (&tmit->msg[1], msg, size);
411   }
412
413   if (NULL != tmit->msg
414       && (GNUNET_YES == tmit_now
415           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
416               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
417   {
418     /* End of message or buffer is full, add it to transmission queue. */
419     tmit->msg->size = htons (tmit->msg->size);
420     GNUNET_MQ_send (tmit->mq, tmit->env);
421     tmit->env = NULL;
422     tmit->msg = NULL;
423     tmit->acks_pending++;
424   }
425 }
426
427
428 /**
429  * Request data from client to transmit.
430  *
431  * @param tmit  Transmission handle.
432  */
433 static void
434 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
435 {
436   int notify_ret = GNUNET_YES;
437   uint16_t data_size = 0;
438   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
439   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
440   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
441
442   if (NULL != tmit->notify_data)
443   {
444     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
445     tmit->in_notify = GNUNET_YES;
446     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
447     tmit->in_notify = GNUNET_NO;
448   }
449   LOG (GNUNET_ERROR_TYPE_DEBUG,
450        "transmit_data (ret: %d, size: %u): %.*s\n",
451        notify_ret, data_size, data_size, &msg[1]);
452   switch (notify_ret)
453   {
454   case GNUNET_NO:
455     if (0 == data_size)
456     {
457       /* Transmission paused, nothing to send. */
458       tmit->paused = GNUNET_YES;
459       return;
460     }
461     break;
462
463   case GNUNET_YES:
464     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
465     break;
466
467   default:
468     LOG (GNUNET_ERROR_TYPE_ERROR,
469          "TransmitNotifyData callback returned error when requesting data.\n");
470
471     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
472     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
473     msg->size = htons (sizeof (*msg));
474     transmit_queue_insert (tmit, msg, GNUNET_YES);
475     tmit->in_transmit = GNUNET_NO;
476     return;
477   }
478
479   if (0 < data_size)
480   {
481     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
482     msg->size = htons (sizeof (*msg) + data_size);
483     transmit_queue_insert (tmit, msg, !notify_ret);
484   }
485
486   /* End of message. */
487   if (GNUNET_YES == notify_ret)
488   {
489     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
490     msg->size = htons (sizeof (*msg));
491     transmit_queue_insert (tmit, msg, GNUNET_YES);
492     /* FIXME: wait for ACK before setting in_transmit to no */
493     tmit->in_transmit = GNUNET_NO;
494   }
495 }
496
497
498 /**
499  * Request a modifier from a client to transmit.
500  *
501  * @param tmit  Transmission handle.
502  */
503 static void
504 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
505 {
506   uint16_t max_data_size = 0;
507   uint16_t data_size = 0;
508   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
509   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
510   int notify_ret = GNUNET_YES;
511
512   switch (tmit->state)
513   {
514   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
515   {
516     struct GNUNET_PSYC_MessageModifier *mod
517       = (struct GNUNET_PSYC_MessageModifier *) msg;
518     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
519     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
520
521     if (NULL != tmit->notify_mod)
522     {
523       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
524       data_size = max_data_size;
525       tmit->in_notify = GNUNET_YES;
526       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
527                                      &mod->oper, &mod->value_size);
528       tmit->in_notify = GNUNET_NO;
529     }
530
531     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
532     LOG (GNUNET_ERROR_TYPE_DEBUG,
533          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
534          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
535     if (mod->name_size < data_size)
536     {
537       tmit->mod_value_remaining
538         = mod->value_size - (data_size - mod->name_size);
539       mod->value_size = htonl (mod->value_size);
540       mod->name_size = htons (mod->name_size);
541     }
542     else if (0 < data_size)
543     {
544       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
545       notify_ret = GNUNET_SYSERR;
546     }
547     break;
548   }
549   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
550   {
551     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
552     msg->size = sizeof (struct GNUNET_MessageHeader);
553
554     if (NULL != tmit->notify_mod)
555     {
556       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
557       data_size = max_data_size;
558       tmit->in_notify = GNUNET_YES;
559       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
560                                      &data_size, &msg[1], NULL, NULL);
561       tmit->in_notify = GNUNET_NO;
562     }
563     tmit->mod_value_remaining -= data_size;
564     LOG (GNUNET_ERROR_TYPE_DEBUG,
565          "transmit_mod (ret: %d, size: %u): %.*s\n",
566          notify_ret, data_size, data_size, &msg[1]);
567     break;
568   }
569   default:
570     GNUNET_assert (0);
571   }
572
573   switch (notify_ret)
574   {
575   case GNUNET_NO:
576     if (0 == data_size)
577     { /* Transmission paused, nothing to send. */
578       tmit->paused = GNUNET_YES;
579       return;
580     }
581     tmit->state
582       = (0 == tmit->mod_value_remaining)
583       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
584       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
585     break;
586
587   case GNUNET_YES: /* End of modifiers. */
588     GNUNET_assert (0 == tmit->mod_value_remaining);
589     break;
590
591   default:
592     LOG (GNUNET_ERROR_TYPE_ERROR,
593          "TransmitNotifyModifier callback returned with error.\n");
594
595     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
596     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
597     msg->size = htons (sizeof (*msg));
598     transmit_queue_insert (tmit, msg, GNUNET_YES);
599     tmit->in_transmit = GNUNET_NO;
600     return;
601   }
602
603   if (0 < data_size)
604   {
605     GNUNET_assert (data_size <= max_data_size);
606     msg->size = htons (msg->size + data_size);
607     transmit_queue_insert (tmit, msg, GNUNET_NO);
608   }
609
610   if (GNUNET_YES == notify_ret)
611   {
612     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
613     if (0 == tmit->acks_pending)
614       transmit_data (tmit);
615   }
616   else
617   {
618     transmit_mod (tmit);
619   }
620 }
621
622
623 int
624 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
625                      uint32_t *full_value_size)
626
627 {
628   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
629   uint16_t name_size = 0;
630   uint32_t value_size = 0;
631   const char *value = NULL;
632
633   if (NULL != oper)
634   { /* New modifier */
635     if (NULL != tmit->mod)
636       tmit->mod = tmit->mod->next;
637     if (NULL == tmit->mod)
638     { /* No more modifiers, continue with data */
639       *data_size = 0;
640       return GNUNET_YES;
641     }
642
643     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
644     *full_value_size = tmit->mod->value_size;
645     *oper = tmit->mod->oper;
646     name_size = strlen (tmit->mod->name) + 1;
647
648     if (name_size + tmit->mod->value_size <= *data_size)
649     {
650       value_size = tmit->mod->value_size;
651       *data_size = name_size + value_size;
652     }
653     else /* full modifier does not fit in data, continuation needed */
654     {
655       value_size = *data_size - name_size;
656       tmit->mod_value = tmit->mod->value + value_size;
657     }
658
659     GNUNET_memcpy (data, tmit->mod->name, name_size);
660     GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
661     return GNUNET_NO;
662   }
663   else
664   { /* Modifier continuation */
665     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
666     value = tmit->mod_value;
667     if (tmit->mod_value_remaining <= *data_size)
668     {
669       value_size = tmit->mod_value_remaining;
670       tmit->mod_value = NULL;
671     }
672     else
673     {
674       value_size = *data_size;
675       tmit->mod_value += value_size;
676     }
677
678     if (*data_size < value_size)
679     {
680       LOG (GNUNET_ERROR_TYPE_DEBUG,
681            "Value in environment larger than buffer: %u < %zu\n",
682            *data_size, value_size);
683       *data_size = 0;
684       return GNUNET_NO;
685     }
686
687     *data_size = value_size;
688     GNUNET_memcpy (data, value, value_size);
689     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
690   }
691 }
692
693
694 /**
695  * Transmit a message.
696  *
697  * @param tmit
698  *        Transmission handle.
699  * @param method_name
700  *        Which method should be invoked.
701  * @param env
702  *        Environment for the message.
703  *        Should stay available until the first call to notify_data.
704  *        Can be NULL if there are no modifiers or @a notify_mod is
705  *        provided instead.
706  * @param notify_mod
707  *        Function to call to obtain modifiers.
708  *        Can be NULL if there are no modifiers or @a env is provided instead.
709  * @param notify_data
710  *        Function to call to obtain fragments of the data.
711  * @param notify_cls
712  *        Closure for @a notify_mod and @a notify_data.
713  * @param flags
714  *        Flags for the message being transmitted.
715  *
716  * @return #GNUNET_OK if the transmission was started.
717  *         #GNUNET_SYSERR if another transmission is already going on.
718  */
719 int
720 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
721                               const char *method_name,
722                               const struct GNUNET_PSYC_Environment *env,
723                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
724                               GNUNET_PSYC_TransmitNotifyData notify_data,
725                               void *notify_cls,
726                               uint32_t flags)
727 {
728   if (GNUNET_NO != tmit->in_transmit)
729     return GNUNET_SYSERR;
730   tmit->in_transmit = GNUNET_YES;
731
732   size_t size = strlen (method_name) + 1;
733   struct GNUNET_PSYC_MessageMethod *pmeth;
734
735   tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736                                    GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737                                    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738   /* store current message size in host byte order
739    * then later switch it to network byte order before sending */
740   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
741
742   if (NULL != notify_mod)
743   {
744     tmit->notify_mod = notify_mod;
745     tmit->notify_mod_cls = notify_cls;
746   }
747   else
748   {
749     tmit->notify_mod = &transmit_notify_env;
750     tmit->notify_mod_cls = tmit;
751     if (NULL != env)
752     {
753       struct GNUNET_PSYC_Modifier mod = {};
754       mod.next = GNUNET_PSYC_env_head (env);
755       tmit->mod = &mod;
756
757       struct GNUNET_PSYC_Modifier *m = tmit->mod;
758       while (NULL != (m = m->next))
759       {
760         if (m->oper != GNUNET_PSYC_OP_SET)
761           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
762       }
763     }
764     else
765     {
766       tmit->mod = NULL;
767     }
768   }
769
770   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
771   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
772   pmeth->header.size = htons (sizeof (*pmeth) + size);
773   pmeth->flags = htonl (flags);
774   GNUNET_memcpy (&pmeth[1], method_name, size);
775
776   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
777   tmit->notify_data = notify_data;
778   tmit->notify_data_cls = notify_cls;
779
780   transmit_mod (tmit);
781   return GNUNET_OK;
782 }
783
784
785 /**
786  * Resume transmission.
787  *
788  * @param tmit  Transmission handle.
789  */
790 void
791 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
792 {
793   if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
794     return;
795
796   if (0 == tmit->acks_pending)
797   {
798     tmit->paused = GNUNET_NO;
799     transmit_data (tmit);
800   }
801 }
802
803
804 /**
805  * Abort transmission request.
806  *
807  * @param tmit  Transmission handle.
808  */
809 void
810 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
811 {
812   if (GNUNET_NO == tmit->in_transmit)
813     return;
814
815   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
816   tmit->in_transmit = GNUNET_NO;
817   tmit->paused = GNUNET_NO;
818
819   /* FIXME */
820   struct GNUNET_MessageHeader msg;
821   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
822   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
823   msg.size = htons (sizeof (msg));
824   transmit_queue_insert (tmit, &msg, GNUNET_YES);
825 }
826
827
828 /**
829  * Got acknowledgement of a transmitted message part, continue transmission.
830  *
831  * @param tmit  Transmission handle.
832  */
833 void
834 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
835 {
836   if (0 == tmit->acks_pending)
837   {
838     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
839     GNUNET_break (0);
840     return;
841   }
842   tmit->acks_pending--;
843
844   if (GNUNET_YES == tmit->paused)
845     return;
846
847   switch (tmit->state)
848   {
849   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
850   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
851     transmit_mod (tmit);
852     break;
853
854   case GNUNET_PSYC_MESSAGE_STATE_DATA:
855     transmit_data (tmit);
856     break;
857
858   case GNUNET_PSYC_MESSAGE_STATE_END:
859   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
860     break;
861
862   default:
863     LOG (GNUNET_ERROR_TYPE_DEBUG,
864          "Ignoring message ACK in state %u.\n", tmit->state);
865   }
866 }
867
868
869 /**** Receiving messages ****/
870
871
872 /**
873  * Create handle for receiving messages.
874  */
875 struct GNUNET_PSYC_ReceiveHandle *
876 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
877                             GNUNET_PSYC_MessagePartCallback message_part_cb,
878                             void *cb_cls)
879 {
880   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
881   recv->message_cb = message_cb;
882   recv->message_part_cb = message_part_cb;
883   recv->cb_cls = cb_cls;
884   return recv;
885 }
886
887
888 /**
889  * Destroy handle for receiving messages.
890  */
891 void
892 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
893 {
894   GNUNET_free (recv);
895 }
896
897
898 /**
899  * Reset stored data related to the last received message.
900  */
901 void
902 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
903 {
904   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
905   recv->flags = 0;
906   recv->message_id = 0;
907   recv->mod_value_size = 0;
908   recv->mod_value_size_expected = 0;
909 }
910
911
912 static void
913 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
914 {
915   if (NULL != recv->message_part_cb)
916     recv->message_part_cb (recv->cb_cls, NULL, NULL);
917
918   if (NULL != recv->message_cb)
919     recv->message_cb (recv->cb_cls, NULL);
920
921   GNUNET_PSYC_receive_reset (recv);
922 }
923
924
925 /**
926  * Handle incoming PSYC message.
927  *
928  * @param recv  Receive handle.
929  * @param msg   The message.
930  *
931  * @return #GNUNET_OK on success,
932  *         #GNUNET_SYSERR on receive error.
933  */
934 int
935 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
936                              const struct GNUNET_PSYC_MessageHeader *msg)
937 {
938   uint16_t size = ntohs (msg->header.size);
939   uint32_t flags = ntohl (msg->flags);
940
941   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
942                            (struct GNUNET_MessageHeader *) msg);
943
944   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
945   {
946     recv->message_id = GNUNET_ntohll (msg->message_id);
947     recv->flags = flags;
948     recv->slave_pub_key = msg->slave_pub_key;
949     recv->mod_value_size = 0;
950     recv->mod_value_size_expected = 0;
951   }
952   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
953   {
954     // FIXME
955     LOG (GNUNET_ERROR_TYPE_WARNING,
956          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
957          GNUNET_ntohll (msg->message_id), recv->message_id);
958     GNUNET_break_op (0);
959     recv_error (recv);
960     return GNUNET_SYSERR;
961   }
962   else if (flags != recv->flags)
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965          "Unexpected message flags. Got: %lu, expected: %lu\n",
966          flags, recv->flags);
967     GNUNET_break_op (0);
968     recv_error (recv);
969     return GNUNET_SYSERR;
970   }
971
972   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
973
974   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
975   {
976     const struct GNUNET_MessageHeader *pmsg
977       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
978     psize = ntohs (pmsg->size);
979     ptype = ntohs (pmsg->type);
980     size_eq = size_min = 0;
981
982     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
983     {
984       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985                   "Dropping message of type %u with invalid size %u.\n",
986                   ptype, psize);
987       recv_error (recv);
988       return GNUNET_SYSERR;
989     }
990
991     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992                 "Received message part of type %u and size %u from PSYC.\n",
993                 ptype, psize);
994     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
995
996     switch (ptype)
997     {
998     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
999       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
1000       break;
1001     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1002       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1003       break;
1004     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1005     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1006       size_min = sizeof (struct GNUNET_MessageHeader);
1007       break;
1008     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1009     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1010       size_eq = sizeof (struct GNUNET_MessageHeader);
1011       break;
1012     default:
1013       GNUNET_break_op (0);
1014       recv_error (recv);
1015       return GNUNET_SYSERR;
1016     }
1017
1018     if (! ((0 < size_eq && psize == size_eq)
1019            || (0 < size_min && size_min <= psize)))
1020     {
1021       GNUNET_break_op (0);
1022       recv_error (recv);
1023       return GNUNET_SYSERR;
1024     }
1025
1026     switch (ptype)
1027     {
1028     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1029     {
1030       struct GNUNET_PSYC_MessageMethod *meth
1031         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1032
1033       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1034       {
1035         LOG (GNUNET_ERROR_TYPE_WARNING,
1036              "Dropping out of order message method (%u).\n",
1037              recv->state);
1038         /* It is normal to receive an incomplete message right after connecting,
1039          * but should not happen later.
1040          * FIXME: add a check for this condition.
1041          */
1042         GNUNET_break_op (0);
1043         recv_error (recv);
1044         return GNUNET_SYSERR;
1045       }
1046
1047       if ('\0' != *((char *) meth + psize - 1))
1048       {
1049         LOG (GNUNET_ERROR_TYPE_WARNING,
1050              "Dropping message with malformed method. "
1051              "Message ID: %" PRIu64 "\n", recv->message_id);
1052         GNUNET_break_op (0);
1053         recv_error (recv);
1054         return GNUNET_SYSERR;
1055       }
1056       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1057       break;
1058     }
1059     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1060     {
1061       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1062             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1063             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1064       {
1065         LOG (GNUNET_ERROR_TYPE_WARNING,
1066              "Dropping out of order message modifier (%u).\n",
1067              recv->state);
1068         GNUNET_break_op (0);
1069         recv_error (recv);
1070         return GNUNET_SYSERR;
1071       }
1072
1073       struct GNUNET_PSYC_MessageModifier *mod
1074         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1075
1076       uint16_t name_size = ntohs (mod->name_size);
1077       recv->mod_value_size_expected = ntohl (mod->value_size);
1078       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1079
1080       if (psize < sizeof (*mod) + name_size
1081           || '\0' != *((char *) &mod[1] + name_size - 1)
1082           || recv->mod_value_size_expected < recv->mod_value_size)
1083       {
1084         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1085         GNUNET_break_op (0);
1086         recv_error (recv);
1087         return GNUNET_SYSERR;
1088       }
1089       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1090       break;
1091     }
1092     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1093     {
1094       recv->mod_value_size += psize - sizeof (*pmsg);
1095
1096       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1097             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1098           || recv->mod_value_size_expected < recv->mod_value_size)
1099       {
1100         LOG (GNUNET_ERROR_TYPE_WARNING,
1101              "Dropping out of order message modifier continuation "
1102              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1103              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1104              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1105              recv->mod_value_size_expected, recv->mod_value_size);
1106         GNUNET_break_op (0);
1107         recv_error (recv);
1108         return GNUNET_SYSERR;
1109       }
1110       break;
1111     }
1112     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1113     {
1114       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1115           || recv->mod_value_size_expected != recv->mod_value_size)
1116       {
1117         LOG (GNUNET_ERROR_TYPE_WARNING,
1118              "Dropping out of order message data fragment "
1119              "(%u < %u || %lu != %lu).\n",
1120              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1121              recv->mod_value_size_expected, recv->mod_value_size);
1122
1123         GNUNET_break_op (0);
1124         recv_error (recv);
1125         return GNUNET_SYSERR;
1126       }
1127       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1128       break;
1129     }
1130     }
1131
1132     if (NULL != recv->message_part_cb)
1133       recv->message_part_cb (recv->cb_cls, msg, pmsg);
1134
1135     switch (ptype)
1136     {
1137     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1138     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1139       GNUNET_PSYC_receive_reset (recv);
1140       break;
1141     }
1142   }
1143
1144   if (NULL != recv->message_cb)
1145     recv->message_cb (recv->cb_cls, msg);
1146   return GNUNET_OK;
1147 }
1148
1149
1150 /**
1151  * Check if @a data contains a series of valid message parts.
1152  *
1153  * @param      data_size    Size of @a data.
1154  * @param      data         Data.
1155  * @param[out] first_ptype  Type of first message part.
1156  * @param[out] last_ptype   Type of last message part.
1157  *
1158  * @return Number of message parts found in @a data.
1159  *         or GNUNET_SYSERR if the message contains invalid parts.
1160  */
1161 int
1162 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1163                                  uint16_t *first_ptype, uint16_t *last_ptype)
1164 {
1165   const struct GNUNET_MessageHeader *pmsg;
1166   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1167   if (NULL != first_ptype)
1168     *first_ptype = 0;
1169   if (NULL != last_ptype)
1170     *last_ptype = 0;
1171
1172   for (pos = 0; pos < data_size; pos += psize, parts++)
1173   {
1174     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1175     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1176     psize = ntohs (pmsg->size);
1177     ptype = ntohs (pmsg->type);
1178     if (0 == parts && NULL != first_ptype)
1179       *first_ptype = ptype;
1180     if (NULL != last_ptype
1181         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1182       *last_ptype = ptype;
1183     if (psize < sizeof (*pmsg)
1184         || pos + psize > data_size
1185         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1186         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1187     {
1188       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1189                   "Invalid message part of type %u and size %u.\n",
1190                   ptype, psize);
1191       return GNUNET_SYSERR;
1192     }
1193     /** @todo FIXME: check message part order */
1194   }
1195   return parts;
1196 }
1197
1198
1199 struct ParseMessageClosure
1200 {
1201   struct GNUNET_PSYC_Environment *env;
1202   const char **method_name;
1203   const void **data;
1204   uint16_t *data_size;
1205   enum GNUNET_PSYC_MessageState msg_state;
1206 };
1207
1208
1209 static void
1210 parse_message_part_cb (void *cls,
1211                        const struct GNUNET_PSYC_MessageHeader *msg,
1212                        const struct GNUNET_MessageHeader *pmsg)
1213 {
1214   struct ParseMessageClosure *pmc = cls;
1215   if (NULL == pmsg)
1216   {
1217     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1218     return;
1219   }
1220
1221   switch (ntohs (pmsg->type))
1222   {
1223   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1224   {
1225     struct GNUNET_PSYC_MessageMethod *
1226       pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1227     *pmc->method_name = (const char *) &pmeth[1];
1228     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1229     break;
1230   }
1231
1232   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1233   {
1234     struct GNUNET_PSYC_MessageModifier *
1235       pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1236
1237     const char *name = (const char *) &pmod[1];
1238     const void *value = name + ntohs (pmod->name_size);
1239     GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1240                          ntohl (pmod->value_size));
1241     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1242     break;
1243   }
1244
1245   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1246     *pmc->data = &pmsg[1];
1247     *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1248     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1249     break;
1250
1251   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1252     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1253     break;
1254
1255   default:
1256     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1257   }
1258 }
1259
1260
1261 /**
1262  * Parse PSYC message.
1263  *
1264  * @param msg
1265  *        The PSYC message to parse.
1266  * @param[out] method_name
1267  *        Pointer to the method name inside @a pmsg.
1268  * @param env
1269  *        The environment for the message with a list of modifiers.
1270  * @param[out] data
1271  *        Pointer to data inside @a msg.
1272  * @param[out] data_size
1273  *        Size of @data is written here.
1274  *
1275  * @return #GNUNET_OK on success,
1276  *         #GNUNET_SYSERR on parse error.
1277  */
1278 int
1279 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1280                            const char **method_name,
1281                            struct GNUNET_PSYC_Environment *env,
1282                            const void **data,
1283                            uint16_t *data_size)
1284 {
1285   struct ParseMessageClosure cls;
1286   cls.env = env;
1287   cls.method_name = method_name;
1288   cls.data = data;
1289   cls.data_size = data_size;
1290
1291   struct GNUNET_PSYC_ReceiveHandle *
1292     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1293   int ret = GNUNET_PSYC_receive_message (recv, msg);
1294   GNUNET_PSYC_receive_destroy (recv);
1295
1296   if (GNUNET_OK != ret)
1297     return GNUNET_SYSERR;
1298
1299   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1300     ? GNUNET_OK
1301     : GNUNET_NO;
1302 }
1303
1304
1305 /**
1306  * Initialize PSYC message header.
1307  */
1308 void
1309 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1310                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1311                                  uint32_t flags)
1312 {
1313   uint16_t size = ntohs (mmsg->header.size);
1314   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1315
1316   pmsg->header.size = htons (psize);
1317   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1318   pmsg->message_id = mmsg->message_id;
1319   pmsg->fragment_offset = mmsg->fragment_offset;
1320   pmsg->flags = htonl (flags);
1321
1322   GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1323 }
1324
1325
1326 /**
1327  * Create a new PSYC message header from a multicast message.
1328  */
1329 struct GNUNET_PSYC_MessageHeader *
1330 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1331                                    uint32_t flags)
1332 {
1333   struct GNUNET_PSYC_MessageHeader *pmsg;
1334   uint16_t size = ntohs (mmsg->header.size);
1335   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1336
1337   pmsg = GNUNET_malloc (psize);
1338   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1339   return pmsg;
1340 }
1341
1342
1343 /**
1344  * Create a new PSYC message header from a PSYC message.
1345  */
1346 struct GNUNET_PSYC_MessageHeader *
1347 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1348 {
1349   uint16_t msg_size = ntohs (msg->header.size);
1350   struct GNUNET_PSYC_MessageHeader *
1351     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1352   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1353   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1354   GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1355   return pmsg;
1356 }