2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycutil/psyc_message.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
37 struct GNUNET_PSYC_TransmitHandle
40 * Client connection to service.
42 struct GNUNET_MQ_Handle *mq;
45 * Message currently being received from the client.
47 struct GNUNET_MessageHeader *msg;
52 struct GNUNET_MQ_Envelope *env;
55 * Callback to request next modifier from client.
57 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
60 * Closure for the notify callbacks.
65 * Callback to request next data fragment from client.
67 GNUNET_PSYC_TransmitNotifyData notify_data;
70 * Closure for the notify callbacks.
72 void *notify_data_cls;
75 * Modifier of the environment that is currently being transmitted.
77 struct GNUNET_PSYC_Modifier *mod;
82 const char *mod_value;
85 * Number of bytes remaining to be transmitted from the current modifier value.
87 uint32_t mod_value_remaining;
90 * State of the current message being received from client.
92 enum GNUNET_PSYC_MessageState state;
95 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
100 * Is transmission paused?
105 * Are we currently transmitting a message?
110 * Notify callback is currently being called.
118 struct GNUNET_PSYC_ReceiveHandle
123 GNUNET_PSYC_MessageCallback message_cb;
126 * Message part callback.
128 GNUNET_PSYC_MessagePartCallback message_part_cb;
131 * Closure for the callbacks.
136 * ID of the message being received from the PSYC service.
141 * Public key of the slave from which a message is being received.
143 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
146 * State of the currently being received message from the PSYC service.
148 enum GNUNET_PSYC_MessageState state;
151 * Flags for the currently being received message from the PSYC service.
153 enum GNUNET_PSYC_MessageFlags flags;
156 * Expected value size for the modifier being received from the PSYC service.
158 uint32_t mod_value_size_expected;
161 * Actual value size for the modifier being received from the PSYC service.
163 uint32_t mod_value_size;
171 * Create a PSYC message.
174 * PSYC method for the message.
176 * Environment for the message.
178 * Data payload for the message.
182 * @return Message header with size information,
183 * followed by the message parts.
185 struct GNUNET_PSYC_Message *
186 GNUNET_PSYC_message_create (const char *method_name,
187 const struct GNUNET_PSYC_Environment *env,
191 struct GNUNET_PSYC_Modifier *mod = NULL;
192 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
193 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
194 struct GNUNET_MessageHeader *pmsg = NULL;
195 uint16_t env_size = 0;
198 mod = GNUNET_PSYC_env_head (env);
201 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
206 struct GNUNET_PSYC_Message *msg;
207 uint16_t method_name_size = strlen (method_name) + 1;
208 if (method_name_size == 1)
211 uint16_t msg_size = sizeof (*msg) /* header */
212 + sizeof (*pmeth) + method_name_size /* method */
213 + env_size /* modifiers */
214 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
215 + sizeof (*pmsg); /* end of message */
216 msg = GNUNET_malloc (msg_size);
217 msg->header.size = htons (msg_size);
218 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
220 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
221 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
222 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
223 GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
225 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
228 mod = GNUNET_PSYC_env_head (env);
231 uint16_t mod_name_size = strlen (mod->name) + 1;
232 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
233 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
234 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
235 p += pmod->header.size;
236 pmod->header.size = htons (pmod->header.size);
238 pmod->oper = mod->oper;
239 pmod->name_size = htons (mod_name_size);
240 pmod->value_size = htonl (mod->value_size);
242 GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
243 if (0 < mod->value_size)
244 GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
252 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
253 pmsg->size = sizeof (*pmsg) + data_size;
255 pmsg->size = htons (pmsg->size);
256 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
257 GNUNET_memcpy (&pmsg[1], data, data_size);
260 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
261 pmsg->size = htons (sizeof (*pmsg));
262 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
264 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
270 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
271 const struct GNUNET_MessageHeader *msg)
273 uint16_t size = ntohs (msg->size);
274 uint16_t type = ntohs (msg->type);
277 "Message of type %d and size %u:\n",
282 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
284 const struct GNUNET_PSYC_MessageHeader *pmsg
285 = (const struct GNUNET_PSYC_MessageHeader *) msg;
287 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
288 GNUNET_ntohll (pmsg->message_id),
289 ntohl (pmsg->flags));
292 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
294 const struct GNUNET_PSYC_MessageMethod *meth
295 = (const struct GNUNET_PSYC_MessageMethod *) msg;
298 (int) (size - sizeof (*meth)),
299 (const char *) &meth[1]);
302 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
304 const struct GNUNET_PSYC_MessageModifier *mod
305 = (const struct GNUNET_PSYC_MessageModifier *) msg;
306 uint16_t name_size = ntohs (mod->name_size);
307 char oper = ' ' < mod->oper ? mod->oper : ' ';
312 (const char *) &mod[1],
313 (int) (size - sizeof (*mod) - name_size),
314 ((const char *) &mod[1]) + name_size);
317 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
318 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
321 (int) (size - sizeof (*msg)),
322 (const char *) &msg[1]);
328 /**** Transmitting messages ****/
332 * Create a transmission handle.
334 struct GNUNET_PSYC_TransmitHandle *
335 GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
337 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
345 * Destroy a transmission handle.
348 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
355 * Queue a message part for transmission.
357 * The message part is added to the current message buffer.
358 * When this buffer is full, it is added to the transmission queue.
361 * Transmission handle.
363 * Message part, or NULL.
365 * Transmit message now, or wait for buffer to fill up?
366 * #GNUNET_YES or #GNUNET_NO.
369 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
370 const struct GNUNET_MessageHeader *msg,
373 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
375 LOG (GNUNET_ERROR_TYPE_DEBUG,
376 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
377 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
379 if (NULL != tmit->msg)
382 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
384 /* End of message or buffer is full, add it to transmission queue
385 * and start with empty buffer */
386 tmit->msg->size = htons (tmit->msg->size);
387 GNUNET_MQ_send (tmit->mq, tmit->env);
390 tmit->acks_pending++;
394 /* Message fits in current buffer, append */
395 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
396 tmit->msg->size += size;
400 if (NULL == tmit->msg && NULL != msg)
402 /* Empty buffer, copy over message. */
403 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406 /* store current message size in host byte order
407 * then later switch it to network byte order before sending */
408 tmit->msg->size = sizeof (*tmit->msg) + size;
410 GNUNET_memcpy (&tmit->msg[1], msg, size);
413 if (NULL != tmit->msg
414 && (GNUNET_YES == tmit_now
415 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
416 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
418 /* End of message or buffer is full, add it to transmission queue. */
419 tmit->msg->size = htons (tmit->msg->size);
420 GNUNET_MQ_send (tmit->mq, tmit->env);
423 tmit->acks_pending++;
429 * Request data from client to transmit.
431 * @param tmit Transmission handle.
434 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
436 int notify_ret = GNUNET_YES;
437 uint16_t data_size = 0;
438 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
439 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
440 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
442 if (NULL != tmit->notify_data)
444 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
445 tmit->in_notify = GNUNET_YES;
446 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
447 tmit->in_notify = GNUNET_NO;
449 LOG (GNUNET_ERROR_TYPE_DEBUG,
450 "transmit_data (ret: %d, size: %u): %.*s\n",
451 notify_ret, data_size, data_size, &msg[1]);
457 /* Transmission paused, nothing to send. */
458 tmit->paused = GNUNET_YES;
464 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
468 LOG (GNUNET_ERROR_TYPE_ERROR,
469 "TransmitNotifyData callback returned error when requesting data.\n");
471 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
472 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
473 msg->size = htons (sizeof (*msg));
474 transmit_queue_insert (tmit, msg, GNUNET_YES);
475 tmit->in_transmit = GNUNET_NO;
481 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
482 msg->size = htons (sizeof (*msg) + data_size);
483 transmit_queue_insert (tmit, msg, !notify_ret);
486 /* End of message. */
487 if (GNUNET_YES == notify_ret)
489 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
490 msg->size = htons (sizeof (*msg));
491 transmit_queue_insert (tmit, msg, GNUNET_YES);
492 /* FIXME: wait for ACK before setting in_transmit to no */
493 tmit->in_transmit = GNUNET_NO;
499 * Request a modifier from a client to transmit.
501 * @param tmit Transmission handle.
504 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
506 uint16_t max_data_size = 0;
507 uint16_t data_size = 0;
508 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
509 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
510 int notify_ret = GNUNET_YES;
514 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
516 struct GNUNET_PSYC_MessageModifier *mod
517 = (struct GNUNET_PSYC_MessageModifier *) msg;
518 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
519 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
521 if (NULL != tmit->notify_mod)
523 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
524 data_size = max_data_size;
525 tmit->in_notify = GNUNET_YES;
526 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
527 &mod->oper, &mod->value_size);
528 tmit->in_notify = GNUNET_NO;
531 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
532 LOG (GNUNET_ERROR_TYPE_DEBUG,
533 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
534 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
535 if (mod->name_size < data_size)
537 tmit->mod_value_remaining
538 = mod->value_size - (data_size - mod->name_size);
539 mod->value_size = htonl (mod->value_size);
540 mod->name_size = htons (mod->name_size);
542 else if (0 < data_size)
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
545 notify_ret = GNUNET_SYSERR;
549 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
551 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
552 msg->size = sizeof (struct GNUNET_MessageHeader);
554 if (NULL != tmit->notify_mod)
556 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
557 data_size = max_data_size;
558 tmit->in_notify = GNUNET_YES;
559 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
560 &data_size, &msg[1], NULL, NULL);
561 tmit->in_notify = GNUNET_NO;
563 tmit->mod_value_remaining -= data_size;
564 LOG (GNUNET_ERROR_TYPE_DEBUG,
565 "transmit_mod (ret: %d, size: %u): %.*s\n",
566 notify_ret, data_size, data_size, &msg[1]);
577 { /* Transmission paused, nothing to send. */
578 tmit->paused = GNUNET_YES;
582 = (0 == tmit->mod_value_remaining)
583 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
584 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
587 case GNUNET_YES: /* End of modifiers. */
588 GNUNET_assert (0 == tmit->mod_value_remaining);
592 LOG (GNUNET_ERROR_TYPE_ERROR,
593 "TransmitNotifyModifier callback returned with error.\n");
595 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
596 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
597 msg->size = htons (sizeof (*msg));
598 transmit_queue_insert (tmit, msg, GNUNET_YES);
599 tmit->in_transmit = GNUNET_NO;
605 GNUNET_assert (data_size <= max_data_size);
606 msg->size = htons (msg->size + data_size);
607 transmit_queue_insert (tmit, msg, GNUNET_NO);
610 if (GNUNET_YES == notify_ret)
612 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
613 if (0 == tmit->acks_pending)
614 transmit_data (tmit);
624 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
625 uint32_t *full_value_size)
628 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
629 uint16_t name_size = 0;
630 uint32_t value_size = 0;
631 const char *value = NULL;
635 if (NULL != tmit->mod)
636 tmit->mod = tmit->mod->next;
637 if (NULL == tmit->mod)
638 { /* No more modifiers, continue with data */
643 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
644 *full_value_size = tmit->mod->value_size;
645 *oper = tmit->mod->oper;
646 name_size = strlen (tmit->mod->name) + 1;
648 if (name_size + tmit->mod->value_size <= *data_size)
650 value_size = tmit->mod->value_size;
651 *data_size = name_size + value_size;
653 else /* full modifier does not fit in data, continuation needed */
655 value_size = *data_size - name_size;
656 tmit->mod_value = tmit->mod->value + value_size;
659 GNUNET_memcpy (data, tmit->mod->name, name_size);
660 GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
664 { /* Modifier continuation */
665 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
666 value = tmit->mod_value;
667 if (tmit->mod_value_remaining <= *data_size)
669 value_size = tmit->mod_value_remaining;
670 tmit->mod_value = NULL;
674 value_size = *data_size;
675 tmit->mod_value += value_size;
678 if (*data_size < value_size)
680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Value in environment larger than buffer: %u < %zu\n",
682 *data_size, value_size);
687 *data_size = value_size;
688 GNUNET_memcpy (data, value, value_size);
689 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
695 * Transmit a message.
698 * Transmission handle.
700 * Which method should be invoked.
702 * Environment for the message.
703 * Should stay available until the first call to notify_data.
704 * Can be NULL if there are no modifiers or @a notify_mod is
707 * Function to call to obtain modifiers.
708 * Can be NULL if there are no modifiers or @a env is provided instead.
710 * Function to call to obtain fragments of the data.
712 * Closure for @a notify_mod and @a notify_data.
714 * Flags for the message being transmitted.
716 * @return #GNUNET_OK if the transmission was started.
717 * #GNUNET_SYSERR if another transmission is already going on.
720 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
721 const char *method_name,
722 const struct GNUNET_PSYC_Environment *env,
723 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
724 GNUNET_PSYC_TransmitNotifyData notify_data,
728 if (GNUNET_NO != tmit->in_transmit)
729 return GNUNET_SYSERR;
730 tmit->in_transmit = GNUNET_YES;
732 size_t size = strlen (method_name) + 1;
733 struct GNUNET_PSYC_MessageMethod *pmeth;
735 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738 /* store current message size in host byte order
739 * then later switch it to network byte order before sending */
740 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
742 if (NULL != notify_mod)
744 tmit->notify_mod = notify_mod;
745 tmit->notify_mod_cls = notify_cls;
749 tmit->notify_mod = &transmit_notify_env;
750 tmit->notify_mod_cls = tmit;
753 struct GNUNET_PSYC_Modifier mod = {};
754 mod.next = GNUNET_PSYC_env_head (env);
757 struct GNUNET_PSYC_Modifier *m = tmit->mod;
758 while (NULL != (m = m->next))
760 if (m->oper != GNUNET_PSYC_OP_SET)
761 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
770 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
771 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
772 pmeth->header.size = htons (sizeof (*pmeth) + size);
773 pmeth->flags = htonl (flags);
774 GNUNET_memcpy (&pmeth[1], method_name, size);
776 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
777 tmit->notify_data = notify_data;
778 tmit->notify_data_cls = notify_cls;
786 * Resume transmission.
788 * @param tmit Transmission handle.
791 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
793 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
796 if (0 == tmit->acks_pending)
798 tmit->paused = GNUNET_NO;
799 transmit_data (tmit);
805 * Abort transmission request.
807 * @param tmit Transmission handle.
810 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
812 if (GNUNET_NO == tmit->in_transmit)
815 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
816 tmit->in_transmit = GNUNET_NO;
817 tmit->paused = GNUNET_NO;
820 struct GNUNET_MessageHeader msg;
821 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
822 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
823 msg.size = htons (sizeof (msg));
824 transmit_queue_insert (tmit, &msg, GNUNET_YES);
829 * Got acknowledgement of a transmitted message part, continue transmission.
831 * @param tmit Transmission handle.
834 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
836 if (0 == tmit->acks_pending)
838 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
842 tmit->acks_pending--;
844 if (GNUNET_YES == tmit->paused)
849 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
850 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
854 case GNUNET_PSYC_MESSAGE_STATE_DATA:
855 transmit_data (tmit);
858 case GNUNET_PSYC_MESSAGE_STATE_END:
859 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
863 LOG (GNUNET_ERROR_TYPE_DEBUG,
864 "Ignoring message ACK in state %u.\n", tmit->state);
869 /**** Receiving messages ****/
873 * Create handle for receiving messages.
875 struct GNUNET_PSYC_ReceiveHandle *
876 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
877 GNUNET_PSYC_MessagePartCallback message_part_cb,
880 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
881 recv->message_cb = message_cb;
882 recv->message_part_cb = message_part_cb;
883 recv->cb_cls = cb_cls;
889 * Destroy handle for receiving messages.
892 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
899 * Reset stored data related to the last received message.
902 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
904 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
906 recv->message_id = 0;
907 recv->mod_value_size = 0;
908 recv->mod_value_size_expected = 0;
913 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
915 if (NULL != recv->message_part_cb)
916 recv->message_part_cb (recv->cb_cls, NULL, NULL);
918 if (NULL != recv->message_cb)
919 recv->message_cb (recv->cb_cls, NULL);
921 GNUNET_PSYC_receive_reset (recv);
926 * Handle incoming PSYC message.
928 * @param recv Receive handle.
929 * @param msg The message.
931 * @return #GNUNET_OK on success,
932 * #GNUNET_SYSERR on receive error.
935 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
936 const struct GNUNET_PSYC_MessageHeader *msg)
938 uint16_t size = ntohs (msg->header.size);
939 uint32_t flags = ntohl (msg->flags);
941 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
942 (struct GNUNET_MessageHeader *) msg);
944 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
946 recv->message_id = GNUNET_ntohll (msg->message_id);
948 recv->slave_pub_key = msg->slave_pub_key;
949 recv->mod_value_size = 0;
950 recv->mod_value_size_expected = 0;
952 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
955 LOG (GNUNET_ERROR_TYPE_WARNING,
956 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
957 GNUNET_ntohll (msg->message_id), recv->message_id);
960 return GNUNET_SYSERR;
962 else if (flags != recv->flags)
964 LOG (GNUNET_ERROR_TYPE_WARNING,
965 "Unexpected message flags. Got: %lu, expected: %lu\n",
969 return GNUNET_SYSERR;
972 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
974 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
976 const struct GNUNET_MessageHeader *pmsg
977 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
978 psize = ntohs (pmsg->size);
979 ptype = ntohs (pmsg->type);
980 size_eq = size_min = 0;
982 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
984 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985 "Dropping message of type %u with invalid size %u.\n",
988 return GNUNET_SYSERR;
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992 "Received message part of type %u and size %u from PSYC.\n",
994 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
998 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
999 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
1001 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1002 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1004 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1005 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1006 size_min = sizeof (struct GNUNET_MessageHeader);
1008 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1009 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1010 size_eq = sizeof (struct GNUNET_MessageHeader);
1013 GNUNET_break_op (0);
1015 return GNUNET_SYSERR;
1018 if (! ((0 < size_eq && psize == size_eq)
1019 || (0 < size_min && size_min <= psize)))
1021 GNUNET_break_op (0);
1023 return GNUNET_SYSERR;
1028 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1030 struct GNUNET_PSYC_MessageMethod *meth
1031 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1033 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1035 LOG (GNUNET_ERROR_TYPE_WARNING,
1036 "Dropping out of order message method (%u).\n",
1038 /* It is normal to receive an incomplete message right after connecting,
1039 * but should not happen later.
1040 * FIXME: add a check for this condition.
1042 GNUNET_break_op (0);
1044 return GNUNET_SYSERR;
1047 if ('\0' != *((char *) meth + psize - 1))
1049 LOG (GNUNET_ERROR_TYPE_WARNING,
1050 "Dropping message with malformed method. "
1051 "Message ID: %" PRIu64 "\n", recv->message_id);
1052 GNUNET_break_op (0);
1054 return GNUNET_SYSERR;
1056 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1059 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1061 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1062 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1063 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1065 LOG (GNUNET_ERROR_TYPE_WARNING,
1066 "Dropping out of order message modifier (%u).\n",
1068 GNUNET_break_op (0);
1070 return GNUNET_SYSERR;
1073 struct GNUNET_PSYC_MessageModifier *mod
1074 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1076 uint16_t name_size = ntohs (mod->name_size);
1077 recv->mod_value_size_expected = ntohl (mod->value_size);
1078 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1080 if (psize < sizeof (*mod) + name_size
1081 || '\0' != *((char *) &mod[1] + name_size - 1)
1082 || recv->mod_value_size_expected < recv->mod_value_size)
1084 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1085 GNUNET_break_op (0);
1087 return GNUNET_SYSERR;
1089 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1092 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1094 recv->mod_value_size += psize - sizeof (*pmsg);
1096 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1097 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1098 || recv->mod_value_size_expected < recv->mod_value_size)
1100 LOG (GNUNET_ERROR_TYPE_WARNING,
1101 "Dropping out of order message modifier continuation "
1102 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1103 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1104 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1105 recv->mod_value_size_expected, recv->mod_value_size);
1106 GNUNET_break_op (0);
1108 return GNUNET_SYSERR;
1112 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1114 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1115 || recv->mod_value_size_expected != recv->mod_value_size)
1117 LOG (GNUNET_ERROR_TYPE_WARNING,
1118 "Dropping out of order message data fragment "
1119 "(%u < %u || %lu != %lu).\n",
1120 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1121 recv->mod_value_size_expected, recv->mod_value_size);
1123 GNUNET_break_op (0);
1125 return GNUNET_SYSERR;
1127 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1132 if (NULL != recv->message_part_cb)
1133 recv->message_part_cb (recv->cb_cls, msg, pmsg);
1137 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1138 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1139 GNUNET_PSYC_receive_reset (recv);
1144 if (NULL != recv->message_cb)
1145 recv->message_cb (recv->cb_cls, msg);
1151 * Check if @a data contains a series of valid message parts.
1153 * @param data_size Size of @a data.
1155 * @param[out] first_ptype Type of first message part.
1156 * @param[out] last_ptype Type of last message part.
1158 * @return Number of message parts found in @a data.
1159 * or GNUNET_SYSERR if the message contains invalid parts.
1162 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1163 uint16_t *first_ptype, uint16_t *last_ptype)
1165 const struct GNUNET_MessageHeader *pmsg;
1166 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1167 if (NULL != first_ptype)
1169 if (NULL != last_ptype)
1172 for (pos = 0; pos < data_size; pos += psize, parts++)
1174 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1175 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1176 psize = ntohs (pmsg->size);
1177 ptype = ntohs (pmsg->type);
1178 if (0 == parts && NULL != first_ptype)
1179 *first_ptype = ptype;
1180 if (NULL != last_ptype
1181 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1182 *last_ptype = ptype;
1183 if (psize < sizeof (*pmsg)
1184 || pos + psize > data_size
1185 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1186 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1188 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1189 "Invalid message part of type %u and size %u.\n",
1191 return GNUNET_SYSERR;
1193 /** @todo FIXME: check message part order */
1199 struct ParseMessageClosure
1201 struct GNUNET_PSYC_Environment *env;
1202 const char **method_name;
1204 uint16_t *data_size;
1205 enum GNUNET_PSYC_MessageState msg_state;
1210 parse_message_part_cb (void *cls,
1211 const struct GNUNET_PSYC_MessageHeader *msg,
1212 const struct GNUNET_MessageHeader *pmsg)
1214 struct ParseMessageClosure *pmc = cls;
1217 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1221 switch (ntohs (pmsg->type))
1223 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1225 struct GNUNET_PSYC_MessageMethod *
1226 pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1227 *pmc->method_name = (const char *) &pmeth[1];
1228 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1232 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1234 struct GNUNET_PSYC_MessageModifier *
1235 pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1237 const char *name = (const char *) &pmod[1];
1238 const void *value = name + ntohs (pmod->name_size);
1239 GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1240 ntohl (pmod->value_size));
1241 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1245 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1246 *pmc->data = &pmsg[1];
1247 *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1248 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1251 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1252 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1256 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1262 * Parse PSYC message.
1265 * The PSYC message to parse.
1266 * @param[out] method_name
1267 * Pointer to the method name inside @a pmsg.
1269 * The environment for the message with a list of modifiers.
1271 * Pointer to data inside @a msg.
1272 * @param[out] data_size
1273 * Size of @data is written here.
1275 * @return #GNUNET_OK on success,
1276 * #GNUNET_SYSERR on parse error.
1279 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1280 const char **method_name,
1281 struct GNUNET_PSYC_Environment *env,
1283 uint16_t *data_size)
1285 struct ParseMessageClosure cls;
1287 cls.method_name = method_name;
1289 cls.data_size = data_size;
1291 struct GNUNET_PSYC_ReceiveHandle *
1292 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1293 int ret = GNUNET_PSYC_receive_message (recv, msg);
1294 GNUNET_PSYC_receive_destroy (recv);
1296 if (GNUNET_OK != ret)
1297 return GNUNET_SYSERR;
1299 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1306 * Initialize PSYC message header.
1309 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1310 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1313 uint16_t size = ntohs (mmsg->header.size);
1314 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1316 pmsg->header.size = htons (psize);
1317 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1318 pmsg->message_id = mmsg->message_id;
1319 pmsg->fragment_offset = mmsg->fragment_offset;
1320 pmsg->flags = htonl (flags);
1322 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1327 * Create a new PSYC message header from a multicast message.
1329 struct GNUNET_PSYC_MessageHeader *
1330 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1333 struct GNUNET_PSYC_MessageHeader *pmsg;
1334 uint16_t size = ntohs (mmsg->header.size);
1335 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1337 pmsg = GNUNET_malloc (psize);
1338 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1344 * Create a new PSYC message header from a PSYC message.
1346 struct GNUNET_PSYC_MessageHeader *
1347 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1349 uint16_t msg_size = ntohs (msg->header.size);
1350 struct GNUNET_PSYC_MessageHeader *
1351 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1352 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1353 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1354 GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));