psycstore: add fragment_limit arg for message_get
[oweals/gnunet.git] / src / psycstore / psyc_util_lib.c
1 /*
2  * This file is part of GNUnet
3  * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4  *
5  * GNUnet is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your
8  * option) any later version.
9  *
10  * GNUnet is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with GNUnet; see the file COPYING.  If not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file psycstore/psyc_util_lib.c
23  * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24  * @author Gabor X Toth
25  */
26
27 #include <inttypes.h>
28
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_env_lib.h"
32 #include "gnunet_psyc_service.h"
33 #include "gnunet_psyc_util_lib.h"
34
35 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
36
37
38 struct GNUNET_PSYC_TransmitHandle
39 {
40   /**
41    * Client connection to service.
42    */
43   struct GNUNET_CLIENT_MANAGER_Connection *client;
44
45   /**
46    * Message currently being received from the client.
47    */
48   struct GNUNET_MessageHeader *msg;
49
50   /**
51    * Callback to request next modifier from client.
52    */
53   GNUNET_PSYC_TransmitNotifyModifier notify_mod;
54
55   /**
56    * Closure for the notify callbacks.
57    */
58   void *notify_mod_cls;
59
60   /**
61    * Callback to request next data fragment from client.
62    */
63   GNUNET_PSYC_TransmitNotifyData notify_data;
64
65   /**
66    * Closure for the notify callbacks.
67    */
68   void *notify_data_cls;
69
70   /**
71    * Modifier of the environment that is currently being transmitted.
72    */
73   struct GNUNET_ENV_Modifier *mod;
74
75   /**
76    *
77    */
78   const char *mod_value;
79
80   /**
81    * Number of bytes remaining to be transmitted from the current modifier value.
82    */
83   uint32_t mod_value_remaining;
84
85   /**
86    * State of the current message being received from client.
87    */
88   enum GNUNET_PSYC_MessageState state;
89
90   /**
91    * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
92    */
93   uint8_t acks_pending;
94
95   /**
96    * Is transmission paused?
97    */
98   uint8_t paused;
99
100   /**
101    * Are we currently transmitting a message?
102    */
103   uint8_t in_transmit;
104 };
105
106
107
108 struct GNUNET_PSYC_ReceiveHandle
109 {
110   /**
111    * Message callback.
112    */
113   GNUNET_PSYC_MessageCallback message_cb;
114
115   /**
116    * Message part callback.
117    */
118   GNUNET_PSYC_MessagePartCallback message_part_cb;
119
120   /**
121    * Closure for the callbacks.
122    */
123   void *cb_cls;
124
125   /**
126    * ID of the message being received from the PSYC service.
127    */
128   uint64_t message_id;
129
130   /**
131    * Public key of the slave from which a message is being received.
132    */
133   struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
134
135   /**
136    * State of the currently being received message from the PSYC service.
137    */
138   enum GNUNET_PSYC_MessageState state;
139
140   /**
141    * Flags for the currently being received message from the PSYC service.
142    */
143   enum GNUNET_PSYC_MessageFlags flags;
144
145   /**
146    * Expected value size for the modifier being received from the PSYC service.
147    */
148   uint32_t mod_value_size_expected;
149
150   /**
151    * Actual value size for the modifier being received from the PSYC service.
152    */
153   uint32_t mod_value_size;
154 };
155
156
157 /**** Messages ****/
158
159
160 /**
161  * Create a PSYC message.
162  *
163  * @param method_name
164  *        PSYC method for the message.
165  * @param env
166  *        Environment for the message.
167  * @param data
168  *        Data payload for the message.
169  * @param data_size
170  *        Size of @a data.
171  *
172  * @return Message header with size information,
173  *         followed by the message parts.
174  */
175 struct GNUNET_PSYC_Message *
176 GNUNET_PSYC_message_create (const char *method_name,
177                             const struct GNUNET_ENV_Environment *env,
178                             const void *data,
179                             size_t data_size)
180 {
181   struct GNUNET_ENV_Modifier *mod = NULL;
182   struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
183   struct GNUNET_PSYC_MessageModifier *pmod = NULL;
184   struct GNUNET_MessageHeader *pmsg = NULL;
185   uint16_t env_size = 0;
186   if (NULL != env)
187   {
188     mod = GNUNET_ENV_environment_head (env);
189     while (NULL != mod)
190     {
191       env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
192       mod = mod->next;
193     }
194   }
195
196   struct GNUNET_PSYC_Message *msg;
197   uint16_t method_name_size = strlen (method_name) + 1;
198   if (method_name_size == 1)
199     return NULL;
200
201   uint16_t msg_size = sizeof (*msg)                      /* header */
202     + sizeof (*pmeth) + method_name_size                 /* method */
203     + env_size                                           /* modifiers */
204     + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
205     + sizeof (*pmsg);                                    /* end of message */
206   msg = GNUNET_malloc (msg_size);
207   msg->header.size = htons (msg_size);
208   msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
209
210   pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
212   pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
213   memcpy (&pmeth[1], method_name, method_name_size);
214
215   uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
216   if (NULL != env)
217   {
218     mod = GNUNET_ENV_environment_head (env);
219     while (NULL != mod)
220     {
221       uint16_t mod_name_size = strlen (mod->name) + 1;
222       pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
223       pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
224       pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
225       p += pmod->header.size;
226       pmod->header.size = htons (pmod->header.size);
227
228       pmod->oper = mod->oper;
229       pmod->name_size = htons (mod_name_size);
230       pmod->value_size = htonl (mod->value_size);
231
232       memcpy (&pmod[1], mod->name, mod_name_size);
233       if (0 < mod->value_size)
234         memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
235
236       mod = mod->next;
237     }
238   }
239
240   if (0 < data_size)
241   {
242     pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
243     pmsg->size = sizeof (*pmsg) + data_size;
244     p += pmsg->size;
245     pmsg->size = htons (pmsg->size);
246     pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
247     memcpy (&pmsg[1], data, data_size);
248   }
249
250   pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
251   pmsg->size = htons (sizeof (*pmsg));
252   pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
253
254   GNUNET_assert (p + sizeof (*pmsg) == msg_size);
255   return msg;
256 }
257
258
259 void
260 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
261                          const struct GNUNET_MessageHeader *msg)
262 {
263   uint16_t size = ntohs (msg->size);
264   uint16_t type = ntohs (msg->type);
265   GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
266   switch (type)
267   {
268   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
269   {
270     struct GNUNET_PSYC_MessageHeader *pmsg
271       = (struct GNUNET_PSYC_MessageHeader *) msg;
272     GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
273                 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
274     break;
275   }
276   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
277   {
278     struct GNUNET_PSYC_MessageMethod *meth
279       = (struct GNUNET_PSYC_MessageMethod *) msg;
280     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
281     break;
282   }
283   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
284   {
285     struct GNUNET_PSYC_MessageModifier *mod
286       = (struct GNUNET_PSYC_MessageModifier *) msg;
287     uint16_t name_size = ntohs (mod->name_size);
288     char oper = ' ' < mod->oper ? mod->oper : ' ';
289     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
290                 size - sizeof (*mod) - name_size,
291                 ((char *) &mod[1]) + name_size);
292     break;
293   }
294   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
295   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
296     GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
297     break;
298   }
299 }
300
301
302 /**** Transmitting messages ****/
303
304
305 /**
306  * Create a transmission handle.
307  */
308 struct GNUNET_PSYC_TransmitHandle *
309 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
310 {
311   struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
312   tmit->client = client;
313   return tmit;
314 }
315
316
317 /**
318  * Destroy a transmission handle.
319  */
320 void
321 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
322 {
323   GNUNET_free (tmit);
324 }
325
326
327 /**
328  * Queue a message part for transmission.
329  *
330  * The message part is added to the current message buffer.
331  * When this buffer is full, it is added to the transmission queue.
332  *
333  * @param tmit
334  *        Transmission handle.
335  * @param msg
336  *        Message part, or NULL.
337  * @param end
338  *        End of message?
339  *        #GNUNET_YES or #GNUNET_NO.
340  */
341 static void
342 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343                        const struct GNUNET_MessageHeader *msg,
344                        uint8_t end)
345 {
346   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
347
348   LOG (GNUNET_ERROR_TYPE_DEBUG,
349        "Queueing message part of type %u and size %u (end: %u)).\n",
350        NULL != msg ? ntohs (msg->type) : 0, size, end);
351
352   if (NULL != tmit->msg)
353   {
354     if (NULL == msg
355         || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
356     {
357       /* End of message or buffer is full, add it to transmission queue
358        * and start with empty buffer */
359       tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
360       tmit->msg->size = htons (tmit->msg->size);
361       GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
362       tmit->msg = NULL;
363       tmit->acks_pending++;
364     }
365     else
366     {
367       /* Message fits in current buffer, append */
368       tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
369       memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
370       tmit->msg->size += size;
371     }
372   }
373
374   if (NULL == tmit->msg && NULL != msg)
375   {
376     /* Empty buffer, copy over message. */
377     tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
378     tmit->msg->size = sizeof (*tmit->msg) + size;
379     memcpy (&tmit->msg[1], msg, size);
380   }
381
382   if (NULL != tmit->msg
383       && (GNUNET_YES == end
384           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
385               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
386   {
387     /* End of message or buffer is full, add it to transmission queue. */
388     tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
389     tmit->msg->size = htons (tmit->msg->size);
390     GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
391     tmit->msg = NULL;
392     tmit->acks_pending++;
393   }
394
395   if (GNUNET_YES == end)
396     tmit->in_transmit = GNUNET_NO;
397 }
398
399
400 /**
401  * Request data from client to transmit.
402  *
403  * @param tmit  Transmission handle.
404  */
405 static void
406 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
407 {
408   int notify_ret = GNUNET_YES;
409   uint16_t data_size = 0;
410   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
411   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
412   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
413
414   if (NULL != tmit->notify_data)
415   {
416     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
417     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
418   }
419   LOG (GNUNET_ERROR_TYPE_DEBUG,
420        "transmit_data (ret: %d, size: %u): %.*s\n",
421        notify_ret, data_size, data_size, &msg[1]);
422   switch (notify_ret)
423   {
424   case GNUNET_NO:
425     if (0 == data_size)
426     {
427       /* Transmission paused, nothing to send. */
428       tmit->paused = GNUNET_YES;
429       return;
430     }
431     break;
432
433   case GNUNET_YES:
434     tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
435     break;
436
437   default:
438     LOG (GNUNET_ERROR_TYPE_ERROR,
439          "TransmitNotifyData callback returned error when requesting data.\n");
440
441     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
442     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443     msg->size = htons (sizeof (*msg));
444     transmit_queue_insert (tmit, msg, GNUNET_YES);
445     return;
446   }
447
448   if (0 < data_size)
449   {
450     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
451     msg->size = htons (sizeof (*msg) + data_size);
452     transmit_queue_insert (tmit, msg, !notify_ret);
453   }
454
455   /* End of message. */
456   if (GNUNET_YES == notify_ret)
457   {
458     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
459     msg->size = htons (sizeof (*msg));
460     transmit_queue_insert (tmit, msg, GNUNET_YES);
461   }
462 }
463
464
465 /**
466  * Request a modifier from a client to transmit.
467  *
468  * @param tmit  Transmission handle.
469  */
470 static void
471 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
472 {
473   uint16_t max_data_size = 0;
474   uint16_t data_size = 0;
475   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
476   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
477   int notify_ret = GNUNET_YES;
478
479   switch (tmit->state)
480   {
481   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
482   {
483     struct GNUNET_PSYC_MessageModifier *mod
484       = (struct GNUNET_PSYC_MessageModifier *) msg;
485     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
486     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
487
488     if (NULL != tmit->notify_mod)
489     {
490       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
491       data_size = max_data_size;
492       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
493                                      &mod->oper, &mod->value_size);
494     }
495
496     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
497     LOG (GNUNET_ERROR_TYPE_DEBUG,
498          "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
499          notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
500     if (mod->name_size < data_size)
501     {
502       tmit->mod_value_remaining
503         = mod->value_size - (data_size - mod->name_size);
504       mod->value_size = htonl (mod->value_size);
505       mod->name_size = htons (mod->name_size);
506     }
507     else if (0 < data_size)
508     {
509       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
510       notify_ret = GNUNET_SYSERR;
511     }
512     break;
513   }
514   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
515   {
516     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
517     msg->size = sizeof (struct GNUNET_MessageHeader);
518
519     if (NULL != tmit->notify_mod)
520     {
521       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
522       data_size = max_data_size;
523       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
524                                      &data_size, &msg[1], NULL, NULL);
525     }
526     tmit->mod_value_remaining -= data_size;
527     LOG (GNUNET_ERROR_TYPE_DEBUG,
528          "transmit_mod (ret: %d, size: %u): %.*s\n",
529          notify_ret, data_size, data_size, &msg[1]);
530     break;
531   }
532   default:
533     GNUNET_assert (0);
534   }
535
536   switch (notify_ret)
537   {
538   case GNUNET_NO:
539     if (0 == data_size)
540     { /* Transmission paused, nothing to send. */
541       tmit->paused = GNUNET_YES;
542       return;
543     }
544     tmit->state
545       = (0 == tmit->mod_value_remaining)
546       ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
547       : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
548     break;
549
550   case GNUNET_YES: /* End of modifiers. */
551     GNUNET_assert (0 == tmit->mod_value_remaining);
552     break;
553
554   default:
555     LOG (GNUNET_ERROR_TYPE_ERROR,
556          "TransmitNotifyModifier callback returned with error.\n");
557
558     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
559     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
560     msg->size = htons (sizeof (*msg));
561
562     transmit_queue_insert (tmit, msg, GNUNET_YES);
563     return;
564   }
565
566   if (0 < data_size)
567   {
568     GNUNET_assert (data_size <= max_data_size);
569     msg->size = htons (msg->size + data_size);
570     transmit_queue_insert (tmit, msg, GNUNET_NO);
571   }
572
573   if (GNUNET_YES == notify_ret)
574   {
575     tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
576     if (0 == tmit->acks_pending)
577       transmit_data (tmit);
578   }
579   else
580   {
581     transmit_mod (tmit);
582   }
583 }
584
585
586 int
587 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
588                      uint32_t *full_value_size)
589
590 {
591   struct GNUNET_PSYC_TransmitHandle *tmit = cls;
592   uint16_t name_size = 0;
593   uint32_t value_size = 0;
594   const char *value = NULL;
595
596   if (NULL != oper)
597   { /* New modifier */
598     if (NULL != tmit->mod)
599       tmit->mod = tmit->mod->next;
600     if (NULL == tmit->mod)
601     { /* No more modifiers, continue with data */
602       *data_size = 0;
603       return GNUNET_YES;
604     }
605
606     GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
607     *full_value_size = tmit->mod->value_size;
608     *oper = tmit->mod->oper;
609     name_size = strlen (tmit->mod->name) + 1;
610
611     if (name_size + tmit->mod->value_size <= *data_size)
612     {
613       value_size = tmit->mod->value_size;
614       *data_size = name_size + value_size;
615     }
616     else /* full modifier does not fit in data, continuation needed */
617     {
618       value_size = *data_size - name_size;
619       tmit->mod_value = tmit->mod->value + value_size;
620     }
621
622     memcpy (data, tmit->mod->name, name_size);
623     memcpy ((char *)data + name_size, tmit->mod->value, value_size);
624     return GNUNET_NO;
625   }
626   else
627   { /* Modifier continuation */
628     GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
629     value = tmit->mod_value;
630     if (tmit->mod_value_remaining <= *data_size)
631     {
632       value_size = tmit->mod_value_remaining;
633       tmit->mod_value = NULL;
634     }
635     else
636     {
637       value_size = *data_size;
638       tmit->mod_value += value_size;
639     }
640
641     if (*data_size < value_size)
642     {
643       LOG (GNUNET_ERROR_TYPE_DEBUG,
644            "Value in environment larger than buffer: %u < %zu\n",
645            *data_size, value_size);
646       *data_size = 0;
647       return GNUNET_NO;
648     }
649
650     *data_size = value_size;
651     memcpy (data, value, value_size);
652     return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
653   }
654 }
655
656
657 /**
658  * Transmit a message.
659  *
660  * @param tmit
661  *        Transmission handle.
662  * @param method_name
663  *        Which method should be invoked.
664  * @param env
665  *        Environment for the message.
666  *        Should stay available until the first call to notify_data.
667  *        Can be NULL if there are no modifiers or @a notify_mod is
668  *        provided instead.
669  * @param notify_mod
670  *        Function to call to obtain modifiers.
671  *        Can be NULL if there are no modifiers or @a env is provided instead.
672  * @param notify_data
673  *        Function to call to obtain fragments of the data.
674  * @param notify_cls
675  *        Closure for @a notify_mod and @a notify_data.
676  * @param flags
677  *        Flags for the message being transmitted.
678  *
679  * @return #GNUNET_OK if the transmission was started.
680  *         #GNUNET_SYSERR if another transmission is already going on.
681  */
682 int
683 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
684                               const char *method_name,
685                               const struct GNUNET_ENV_Environment *env,
686                               GNUNET_PSYC_TransmitNotifyModifier notify_mod,
687                               GNUNET_PSYC_TransmitNotifyData notify_data,
688                               void *notify_cls,
689                               uint32_t flags)
690 {
691   if (GNUNET_NO != tmit->in_transmit)
692     return GNUNET_SYSERR;
693   tmit->in_transmit = GNUNET_YES;
694
695   size_t size = strlen (method_name) + 1;
696   struct GNUNET_PSYC_MessageMethod *pmeth;
697   tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
698   tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
699
700   if (NULL != notify_mod)
701   {
702     tmit->notify_mod = notify_mod;
703     tmit->notify_mod_cls = notify_cls;
704   }
705   else
706   {
707     tmit->notify_mod = &transmit_notify_env;
708     tmit->notify_mod_cls = tmit;
709     if (NULL != env)
710     {
711       struct GNUNET_ENV_Modifier mod = {};
712       mod.next = GNUNET_ENV_environment_head (env);
713       tmit->mod = &mod;
714
715       struct GNUNET_ENV_Modifier *m = tmit->mod;
716       while (NULL != (m = m->next))
717       {
718         if (m->oper != GNUNET_ENV_OP_SET)
719           flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
720       }
721     }
722     else
723     {
724       tmit->mod = NULL;
725     }
726   }
727
728   pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
729   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
730   pmeth->header.size = htons (sizeof (*pmeth) + size);
731   pmeth->flags = htonl (flags);
732   memcpy (&pmeth[1], method_name, size);
733
734   tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
735   tmit->notify_data = notify_data;
736   tmit->notify_data_cls = notify_cls;
737
738   transmit_mod (tmit);
739   return GNUNET_OK;
740 }
741
742
743 /**
744  * Resume transmission.
745  *
746  * @param tmit  Transmission handle.
747  */
748 void
749 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
750 {
751   if (0 == tmit->acks_pending)
752   {
753     tmit->paused = GNUNET_NO;
754     transmit_data (tmit);
755   }
756 }
757
758
759 /**
760  * Abort transmission request.
761  *
762  * @param tmit  Transmission handle.
763  */
764 void
765 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
766 {
767   if (GNUNET_NO == tmit->in_transmit)
768     return;
769
770   tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
771   tmit->in_transmit = GNUNET_NO;
772   tmit->paused = GNUNET_NO;
773
774   /* FIXME */
775   struct GNUNET_MessageHeader msg;
776   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
777   msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
778   msg.size = htons (sizeof (msg));
779   transmit_queue_insert (tmit, &msg, GNUNET_YES);
780 }
781
782
783 /**
784  * Got acknowledgement of a transmitted message part, continue transmission.
785  *
786  * @param tmit  Transmission handle.
787  */
788 void
789 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
790 {
791   if (0 == tmit->acks_pending)
792   {
793     LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
794     GNUNET_break (0);
795     return;
796   }
797   tmit->acks_pending--;
798
799   switch (tmit->state)
800   {
801   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
802   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
803     if (GNUNET_NO == tmit->paused)
804       transmit_mod (tmit);
805     break;
806
807   case GNUNET_PSYC_MESSAGE_STATE_DATA:
808     if (GNUNET_NO == tmit->paused)
809       transmit_data (tmit);
810     break;
811
812   case GNUNET_PSYC_MESSAGE_STATE_END:
813   case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
814     break;
815
816   default:
817     LOG (GNUNET_ERROR_TYPE_DEBUG,
818          "Ignoring message ACK in state %u.\n", tmit->state);
819   }
820 }
821
822
823 /**** Receiving messages ****/
824
825
826 /**
827  * Create handle for receiving messages.
828  */
829 struct GNUNET_PSYC_ReceiveHandle *
830 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
831                             GNUNET_PSYC_MessagePartCallback message_part_cb,
832                             void *cb_cls)
833 {
834   struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
835   recv->message_cb = message_cb;
836   recv->message_part_cb = message_part_cb;
837   recv->cb_cls = cb_cls;
838   return recv;
839 }
840
841
842 /**
843  * Destroy handle for receiving messages.
844  */
845 void
846 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
847 {
848   GNUNET_free (recv);
849 }
850
851
852 /**
853  * Reset stored data related to the last received message.
854  */
855 void
856 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
857 {
858   recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
859   recv->flags = 0;
860   recv->message_id = 0;
861   recv->mod_value_size = 0;
862   recv->mod_value_size_expected = 0;
863 }
864
865
866 static void
867 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
868 {
869   if (NULL != recv->message_part_cb)
870     recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
871                            0, NULL);
872
873   if (NULL != recv->message_cb)
874     recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
875
876   GNUNET_PSYC_receive_reset (recv);
877 }
878
879
880 /**
881  * Handle incoming PSYC message.
882  *
883  * @param recv  Receive handle.
884  * @param msg   The message.
885  *
886  * @return #GNUNET_OK on success,
887  *         #GNUNET_SYSERR on receive error.
888  */
889 int
890 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
891                              const struct GNUNET_PSYC_MessageHeader *msg)
892 {
893   uint16_t size = ntohs (msg->header.size);
894   uint32_t flags = ntohl (msg->flags);
895   uint64_t message_id;
896
897   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
898                            (struct GNUNET_MessageHeader *) msg);
899
900   if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
901   {
902     recv->message_id = GNUNET_ntohll (msg->message_id);
903     recv->flags = flags;
904     recv->slave_key = msg->slave_key;
905     recv->mod_value_size = 0;
906     recv->mod_value_size_expected = 0;
907   }
908   else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
909   {
910     // FIXME
911     LOG (GNUNET_ERROR_TYPE_WARNING,
912          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
913          GNUNET_ntohll (msg->message_id), recv->message_id);
914     GNUNET_break_op (0);
915     recv_error (recv);
916     return GNUNET_SYSERR;
917   }
918   else if (flags != recv->flags)
919   {
920     LOG (GNUNET_ERROR_TYPE_WARNING,
921          "Unexpected message flags. Got: %lu, expected: %lu\n",
922          flags, recv->flags);
923     GNUNET_break_op (0);
924     recv_error (recv);
925     return GNUNET_SYSERR;
926   }
927   message_id = recv->message_id;
928
929   uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
930
931   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
932   {
933     const struct GNUNET_MessageHeader *pmsg
934       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
935     psize = ntohs (pmsg->size);
936     ptype = ntohs (pmsg->type);
937     size_eq = size_min = 0;
938
939     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
940     {
941       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
942                   "Dropping message of type %u with invalid size %u.\n",
943                   ptype, psize);
944       recv_error (recv);
945       return GNUNET_SYSERR;
946     }
947
948     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949                 "Received message part of type %u and size %u from PSYC.\n",
950                 ptype, psize);
951     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
952
953     switch (ptype)
954     {
955     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
956       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
957       break;
958     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
959       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
960       break;
961     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
962     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
963       size_min = sizeof (struct GNUNET_MessageHeader);
964       break;
965     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
966     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
967       size_eq = sizeof (struct GNUNET_MessageHeader);
968       break;
969     default:
970       GNUNET_break_op (0);
971       recv_error (recv);
972       return GNUNET_SYSERR;
973     }
974
975     if (! ((0 < size_eq && psize == size_eq)
976            || (0 < size_min && size_min <= psize)))
977     {
978       GNUNET_break_op (0);
979       recv_error (recv);
980       return GNUNET_SYSERR;
981     }
982
983     switch (ptype)
984     {
985     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
986     {
987       struct GNUNET_PSYC_MessageMethod *meth
988         = (struct GNUNET_PSYC_MessageMethod *) pmsg;
989
990       if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
991       {
992         LOG (GNUNET_ERROR_TYPE_WARNING,
993              "Dropping out of order message method (%u).\n",
994              recv->state);
995         /* It is normal to receive an incomplete message right after connecting,
996          * but should not happen later.
997          * FIXME: add a check for this condition.
998          */
999         GNUNET_break_op (0);
1000         recv_error (recv);
1001         return GNUNET_SYSERR;
1002       }
1003
1004       if ('\0' != *((char *) meth + psize - 1))
1005       {
1006         LOG (GNUNET_ERROR_TYPE_WARNING,
1007              "Dropping message with malformed method. "
1008              "Message ID: %" PRIu64 "\n", recv->message_id);
1009         GNUNET_break_op (0);
1010         recv_error (recv);
1011         return GNUNET_SYSERR;
1012       }
1013       recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1014       break;
1015     }
1016     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1017     {
1018       if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1019             || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1020             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1021       {
1022         LOG (GNUNET_ERROR_TYPE_WARNING,
1023              "Dropping out of order message modifier (%u).\n",
1024              recv->state);
1025         GNUNET_break_op (0);
1026         recv_error (recv);
1027         return GNUNET_SYSERR;
1028       }
1029
1030       struct GNUNET_PSYC_MessageModifier *mod
1031         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1032
1033       uint16_t name_size = ntohs (mod->name_size);
1034       recv->mod_value_size_expected = ntohl (mod->value_size);
1035       recv->mod_value_size = psize - sizeof (*mod) - name_size;
1036
1037       if (psize < sizeof (*mod) + name_size
1038           || '\0' != *((char *) &mod[1] + name_size - 1)
1039           || recv->mod_value_size_expected < recv->mod_value_size)
1040       {
1041         LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1042         GNUNET_break_op (0);
1043         recv_error (recv);
1044         return GNUNET_SYSERR;
1045       }
1046       recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1047       break;
1048     }
1049     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1050     {
1051       recv->mod_value_size += psize - sizeof (*pmsg);
1052
1053       if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1054             || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1055           || recv->mod_value_size_expected < recv->mod_value_size)
1056       {
1057         LOG (GNUNET_ERROR_TYPE_WARNING,
1058              "Dropping out of order message modifier continuation "
1059              "!(%u == %u || %u == %u) || %lu < %lu.\n",
1060              GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1061              GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1062              recv->mod_value_size_expected, recv->mod_value_size);
1063         GNUNET_break_op (0);
1064         recv_error (recv);
1065         return GNUNET_SYSERR;
1066       }
1067       break;
1068     }
1069     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1070     {
1071       if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1072           || recv->mod_value_size_expected != recv->mod_value_size)
1073       {
1074         LOG (GNUNET_ERROR_TYPE_WARNING,
1075              "Dropping out of order message data fragment "
1076              "(%u < %u || %lu != %lu).\n",
1077              recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1078              recv->mod_value_size_expected, recv->mod_value_size);
1079
1080         GNUNET_break_op (0);
1081         recv_error (recv);
1082         return GNUNET_SYSERR;
1083       }
1084       recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1085       break;
1086     }
1087     }
1088
1089     if (NULL != recv->message_part_cb)
1090       recv->message_part_cb (recv->cb_cls, &recv->slave_key,
1091                              recv->message_id, recv->flags,
1092                              0, // FIXME: data_offset
1093                              pmsg);
1094
1095     switch (ptype)
1096     {
1097     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1098     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1099       GNUNET_PSYC_receive_reset (recv);
1100       break;
1101     }
1102   }
1103
1104   if (NULL != recv->message_cb)
1105     recv->message_cb (recv->cb_cls, message_id, flags, msg);
1106   return GNUNET_OK;
1107 }
1108
1109
1110 /**
1111  * Check if @a data contains a series of valid message parts.
1112  *
1113  * @param      data_size    Size of @a data.
1114  * @param      data         Data.
1115  * @param[out] first_ptype  Type of first message part.
1116  * @param[out] last_ptype   Type of last message part.
1117  *
1118  * @return Number of message parts found in @a data.
1119  *         or GNUNET_SYSERR if the message contains invalid parts.
1120  */
1121 int
1122 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1123                                  uint16_t *first_ptype, uint16_t *last_ptype)
1124 {
1125   const struct GNUNET_MessageHeader *pmsg;
1126   uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1127   if (NULL != first_ptype)
1128     *first_ptype = 0;
1129   if (NULL != last_ptype)
1130     *last_ptype = 0;
1131
1132   for (pos = 0; pos < data_size; pos += psize, parts++)
1133   {
1134     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1135     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1136     psize = ntohs (pmsg->size);
1137     ptype = ntohs (pmsg->type);
1138     if (0 == parts && NULL != first_ptype)
1139       *first_ptype = ptype;
1140     if (NULL != last_ptype
1141         && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1142       *last_ptype = ptype;
1143     if (psize < sizeof (*pmsg)
1144         || pos + psize > data_size
1145         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1146         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1147     {
1148       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1149                   "Invalid message part of type %u and size %u.\n",
1150                   ptype, psize);
1151       return GNUNET_SYSERR;
1152     }
1153     /** @todo FIXME: check message part order */
1154   }
1155   return parts;
1156 }
1157
1158
1159 struct ParseMessageClosure
1160 {
1161   struct GNUNET_ENV_Environment *env;
1162   const char **method_name;
1163   const void **data;
1164   uint16_t *data_size;
1165   enum GNUNET_PSYC_MessageState msg_state;
1166 };
1167
1168
1169 static void
1170 parse_message_part_cb (void *cls,
1171                        const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1172                        uint64_t message_id, uint32_t flags, uint64_t data_offset,
1173                        const struct GNUNET_MessageHeader *msg)
1174 {
1175   struct ParseMessageClosure *pmc = cls;
1176   if (NULL == msg)
1177   {
1178     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1179     return;
1180   }
1181
1182   switch (ntohs (msg->type))
1183   {
1184   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1185   {
1186     struct GNUNET_PSYC_MessageMethod *
1187       pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1188     *pmc->method_name = (const char *) &pmeth[1];
1189     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1190     break;
1191   }
1192
1193   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1194   {
1195     struct GNUNET_PSYC_MessageModifier *
1196       pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1197
1198     const char *name = (const char *) &pmod[1];
1199     const void *value = name + ntohs (pmod->name_size);
1200     GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1201                                 ntohl (pmod->value_size));
1202     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1203     break;
1204   }
1205
1206   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1207     *pmc->data = &msg[1];
1208     *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1209     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1210     break;
1211
1212   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1213     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1214     break;
1215
1216   default:
1217     pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1218   }
1219 }
1220
1221
1222 /**
1223  * Parse PSYC message.
1224  *
1225  * @param msg
1226  *        The PSYC message to parse.
1227  * @param[out] method_name
1228  *        Pointer to the method name inside @a pmsg.
1229  * @param env
1230  *        The environment for the message with a list of modifiers.
1231  * @param[out] data
1232  *        Pointer to data inside @a pmsg.
1233  * @param[out] data_size
1234  *        Size of @data is written here.
1235  *
1236  * @return #GNUNET_OK on success,
1237  *         #GNUNET_SYSERR on parse error.
1238  */
1239 int
1240 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1241                            const char **method_name,
1242                            struct GNUNET_ENV_Environment *env,
1243                            const void **data,
1244                            uint16_t *data_size)
1245 {
1246   struct ParseMessageClosure cls;
1247   cls.env = env;
1248   cls.method_name = method_name;
1249   cls.data = data;
1250   cls.data_size = data_size;
1251
1252   struct GNUNET_PSYC_ReceiveHandle *
1253     recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1254   GNUNET_PSYC_receive_message (recv, msg);
1255   GNUNET_PSYC_receive_destroy (recv);
1256
1257   return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1258     ? GNUNET_OK
1259     : GNUNET_SYSERR;
1260 }
1261
1262
1263 /**
1264  * Initialize PSYC message header.
1265  */
1266 void
1267 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1268                                  const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1269                                  uint32_t flags)
1270 {
1271   uint16_t size = ntohs (mmsg->header.size);
1272   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1273
1274   pmsg->header.size = htons (psize);
1275   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1276   pmsg->message_id = mmsg->message_id;
1277   pmsg->fragment_offset = mmsg->fragment_offset;
1278   pmsg->flags = htonl (flags);
1279
1280   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1281 }
1282
1283
1284 /**
1285  * Create a new PSYC message header from a multicast message.
1286  */
1287 struct GNUNET_PSYC_MessageHeader *
1288 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1289                                    uint32_t flags)
1290 {
1291   struct GNUNET_PSYC_MessageHeader *pmsg;
1292   uint16_t size = ntohs (mmsg->header.size);
1293   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1294
1295   pmsg = GNUNET_malloc (psize);
1296   GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1297   return pmsg;
1298 }
1299
1300
1301 /**
1302  * Create a new PSYC message header from a PSYC message.
1303  */
1304 struct GNUNET_PSYC_MessageHeader *
1305 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1306 {
1307   uint16_t msg_size = ntohs (msg->header.size);
1308   struct GNUNET_PSYC_MessageHeader *
1309     pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1310   pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1311   pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1312   memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1313   return pmsg;
1314 }