2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software: you can redistribute it and/or modify it
6 * under the terms of the GNU Affero General Public License as published
7 * by the Free Software Foundation, either version 3 of the License,
8 * or (at your option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Affero General Public License for more details.
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file psycutil/psyc_message.c
21 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
22 * @author Gabor X Toth
28 #include "gnunet_util_lib.h"
29 #include "gnunet_psyc_util_lib.h"
30 #include "gnunet_psyc_service.h"
32 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
/*
 * Per-connection state for sending one PSYC message at a time.
 * NOTE(review): this listing has lost lines (braces, comment delimiters and
 * several member declarations); only what is visible is described here.
 */
35 struct GNUNET_PSYC_TransmitHandle
38 * Client connection to service.
/* Queue that finished buffers are handed to via GNUNET_MQ_send(). */
40 struct GNUNET_MQ_Handle *mq;
43 * Message currently being received from the client.
/* Buffer that message parts are appended to; its size field is kept in host
 * byte order while filling and converted with htons() right before sending. */
45 struct GNUNET_MessageHeader *msg;
/* Envelope returned by GNUNET_MQ_msg_extra() together with msg. */
50 struct GNUNET_MQ_Envelope *env;
53 * Callback to request next modifier from client.
55 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
58 * Closure for the notify callbacks.
/* NOTE(review): the notify_mod_cls declaration is not visible in this listing. */
63 * Callback to request next data fragment from client.
65 GNUNET_PSYC_TransmitNotifyData notify_data;
68 * Closure for the notify callbacks.
70 void *notify_data_cls;
73 * Modifier of the environment that is currently being transmitted.
75 struct GNUNET_PSYC_Modifier *mod;
/* Position inside the current modifier value from which the next
 * continuation is copied; NULL once the value has been fully handed out. */
80 const char *mod_value;
83 * Number of bytes remaining to be transmitted from the current modifier value.
85 uint32_t mod_value_remaining;
88 * State of the current message being received from client.
90 enum GNUNET_PSYC_MessageState state;
93 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
/* NOTE(review): declarations of acks_pending, paused, in_transmit and
 * in_notify are not visible here; the code compares the latter three
 * against GNUNET_YES / GNUNET_NO. */
98 * Is transmission paused?
103 * Are we currently transmitting a message?
108 * Notify callback is currently being called.
/*
 * State of the receive-side parser: the callbacks to invoke plus what is
 * known about the message currently being assembled.
 * NOTE(review): some member declarations (e.g. cb_cls, message_id) are not
 * visible in this listing.
 */
116 struct GNUNET_PSYC_ReceiveHandle
/* Called once per processed message; called with NULL on a receive error. */
121 GNUNET_PSYC_MessageCallback message_cb;
124 * Message part callback.
/* Called once per accepted part; called with NULL arguments on error. */
126 GNUNET_PSYC_MessagePartCallback message_part_cb;
129 * Closure for the callbacks.
134 * ID of the message being received from the PSYC service.
139 * Public key of the slave from which a message is being received.
141 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
144 * State of the currently being received message from the PSYC service.
146 enum GNUNET_PSYC_MessageState state;
149 * Flags for the currently being received message from the PSYC service.
151 enum GNUNET_PSYC_MessageFlags flags;
154 * Expected value size for the modifier being received from the PSYC service.
/* Taken from the value_size field of the MODIFIER part (host byte order). */
156 uint32_t mod_value_size_expected;
159 * Actual value size for the modifier being received from the PSYC service.
/* Running total across the MODIFIER part and its MOD_CONT parts. */
161 uint32_t mod_value_size;
169 * Create a PSYC message.
172 * PSYC method for the message.
174 * Environment for the message.
176 * Data payload for the message.
180 * @return Message header with size information,
181 * followed by the message parts.
183 struct GNUNET_PSYC_Message *
184 GNUNET_PSYC_message_create (const char *method_name,
185 const struct GNUNET_PSYC_Environment *env,
189 struct GNUNET_PSYC_Modifier *mod = NULL;
190 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
191 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
192 struct GNUNET_MessageHeader *pmsg = NULL;
193 uint16_t env_size = 0;
196 mod = GNUNET_PSYC_env_head (env);
199 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
204 struct GNUNET_PSYC_Message *msg;
205 uint16_t method_name_size = strlen (method_name) + 1;
206 if (method_name_size == 1)
209 uint16_t msg_size = sizeof (*msg) /* header */
210 + sizeof (*pmeth) + method_name_size /* method */
211 + env_size /* modifiers */
212 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
213 + sizeof (*pmsg); /* end of message */
214 msg = GNUNET_malloc (msg_size);
215 msg->header.size = htons (msg_size);
216 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
218 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
219 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
220 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
221 GNUNET_memcpy (&pmeth[1], method_name, method_name_size);
223 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
226 mod = GNUNET_PSYC_env_head (env);
229 uint16_t mod_name_size = strlen (mod->name) + 1;
230 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
231 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
232 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
233 p += pmod->header.size;
234 pmod->header.size = htons (pmod->header.size);
236 pmod->oper = mod->oper;
237 pmod->name_size = htons (mod_name_size);
238 pmod->value_size = htonl (mod->value_size);
240 GNUNET_memcpy (&pmod[1], mod->name, mod_name_size);
241 if (0 < mod->value_size)
242 GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
250 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
251 pmsg->size = sizeof (*pmsg) + data_size;
253 pmsg->size = htons (pmsg->size);
254 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
255 GNUNET_memcpy (&pmsg[1], data, data_size);
258 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
259 pmsg->size = htons (sizeof (*pmsg));
260 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
262 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
268 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
269 const struct GNUNET_MessageHeader *msg)
271 uint16_t size = ntohs (msg->size);
272 uint16_t type = ntohs (msg->type);
275 "Message of type %d and size %u:\n",
280 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
282 const struct GNUNET_PSYC_MessageHeader *pmsg
283 = (const struct GNUNET_PSYC_MessageHeader *) msg;
285 "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
286 GNUNET_ntohll (pmsg->message_id),
287 ntohl (pmsg->flags));
290 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
292 const struct GNUNET_PSYC_MessageMethod *meth
293 = (const struct GNUNET_PSYC_MessageMethod *) msg;
296 (int) (size - sizeof (*meth)),
297 (const char *) &meth[1]);
300 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
302 const struct GNUNET_PSYC_MessageModifier *mod
303 = (const struct GNUNET_PSYC_MessageModifier *) msg;
304 uint16_t name_size = ntohs (mod->name_size);
305 char oper = ' ' < mod->oper ? mod->oper : ' ';
310 (const char *) &mod[1],
311 (int) (size - sizeof (*mod) - name_size),
312 ((const char *) &mod[1]) + name_size);
315 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
316 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
319 (int) (size - sizeof (*msg)),
320 (const char *) &msg[1]);
326 /**** Transmitting messages ****/
330 * Create a transmission handle.
/* Allocate a transmit handle for the message queue @a mq.
 * NOTE(review): the remainder of the body (storing mq, returning the handle)
 * is not visible in this listing. */
332 struct GNUNET_PSYC_TransmitHandle *
333 GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
335 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
343 * Destroy a transmission handle.
/* Dispose of the transmit handle @a tmit.
 * NOTE(review): the body is not visible in this listing; confirm what is
 * released (e.g. a partially filled envelope) before relying on it. */
346 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
353 * Queue a message part for transmission.
355 * The message part is added to the current message buffer.
356 * When this buffer is full, it is added to the transmission queue.
359 * Transmission handle.
361 * Message part, or NULL.
363 * Transmit message now, or wait for buffer to fill up?
364 * #GNUNET_YES or #GNUNET_NO.
/*
 * Append one message part to the handle's buffer message and send the
 * buffer when required.
 *
 * tmit->msg->size is kept in host byte order while the buffer is being
 * filled and converted with htons() immediately before GNUNET_MQ_send().
 * Every send increments tmit->acks_pending.
 *
 * NOTE(review): some lines are missing from this listing (the tmit_now
 * parameter, part of the first condition, and the statements between
 * GNUNET_MQ_send() and the acks_pending increment).
 */
367 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
368 const struct GNUNET_MessageHeader *msg,
/* A NULL msg contributes zero bytes. */
371 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
373 LOG (GNUNET_ERROR_TYPE_DEBUG,
374 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
375 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
/* Step 1: a buffer exists -- either flush it (part would not fit) or append. */
377 if (NULL != tmit->msg)
380 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
382 /* End of message or buffer is full, add it to transmission queue
383 * and start with empty buffer */
384 tmit->msg->size = htons (tmit->msg->size);
385 GNUNET_MQ_send (tmit->mq, tmit->env);
388 tmit->acks_pending++;
392 /* Message fits in current buffer, append */
393 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
394 tmit->msg->size += size;
/* Step 2: no buffer (never had one, or just flushed) -- start a new one. */
398 if (NULL == tmit->msg && NULL != msg)
400 /* Empty buffer, copy over message. */
401 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
402 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
403 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
404 /* store current message size in host byte order
405 * then later switch it to network byte order before sending */
406 tmit->msg->size = sizeof (*tmit->msg) + size;
408 GNUNET_memcpy (&tmit->msg[1], msg, size);
/* Step 3: send right away if requested, or if not even an empty part header
 * would fit into the remaining space. */
411 if (NULL != tmit->msg
412 && (GNUNET_YES == tmit_now
413 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
414 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
416 /* End of message or buffer is full, add it to transmission queue. */
417 tmit->msg->size = htons (tmit->msg->size);
418 GNUNET_MQ_send (tmit->mq, tmit->env);
421 tmit->acks_pending++;
427 * Request data from client to transmit.
429 * @param tmit Transmission handle.
/*
 * Ask the client's notify_data callback for the next data fragment and
 * queue it as a DATA part; on completion also queue the END part, on error
 * queue a CANCEL part.
 *
 * NOTE(review): the switch on notify_ret and its case labels are missing
 * from this listing; the grouping below follows the visible comments.
 */
432 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
434 int notify_ret = GNUNET_YES;
435 uint16_t data_size = 0;
/* Stack buffer holding the part header followed by the payload. */
436 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
437 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
438 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
440 if (NULL != tmit->notify_data)
/* data_size is in/out: capacity on entry, bytes written on return. */
442 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
/* in_notify marks that the callback is running; GNUNET_PSYC_transmit_resume()
 * returns early while it is set. */
443 tmit->in_notify = GNUNET_YES;
444 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
445 tmit->in_notify = GNUNET_NO;
/* NOTE(review): "%.*s" is given &msg[1], which has type
 * struct GNUNET_MessageHeader * rather than char *. */
447 LOG (GNUNET_ERROR_TYPE_DEBUG,
448 "transmit_data (ret: %d, size: %u): %.*s\n",
449 notify_ret, data_size, data_size, &msg[1]);
455 /* Transmission paused, nothing to send. */
456 tmit->paused = GNUNET_YES;
462 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
466 LOG (GNUNET_ERROR_TYPE_ERROR,
467 "TransmitNotifyData callback returned error when requesting data.\n");
/* Error path: replace the part by a bare CANCEL header and send immediately. */
469 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
470 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
471 msg->size = htons (sizeof (*msg));
472 transmit_queue_insert (tmit, msg, GNUNET_YES);
473 tmit->in_transmit = GNUNET_NO;
479 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
480 msg->size = htons (sizeof (*msg) + data_size);
/* !notify_ret is nonzero only when the callback returned GNUNET_NO; note that
 * the tmit_now test in transmit_queue_insert() compares against GNUNET_YES. */
481 transmit_queue_insert (tmit, msg, !notify_ret);
484 /* End of message. */
485 if (GNUNET_YES == notify_ret)
/* The same stack buffer is reused for the END part header. */
487 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
488 msg->size = htons (sizeof (*msg));
489 transmit_queue_insert (tmit, msg, GNUNET_YES);
490 /* FIXME: wait for ACK before setting in_transmit to no */
491 tmit->in_transmit = GNUNET_NO;
497 * Request a modifier from a client to transmit.
499 * @param tmit Transmission handle.
/*
 * Ask the notify_mod callback for the next modifier (state MODIFIER) or the
 * next chunk of a modifier value (state MOD_CONT) and queue the result as a
 * message part. When the callback reports the end of the modifiers, switch
 * to state DATA and continue with transmit_data() if no ACKs are pending.
 *
 * msg->size is assembled in host byte order (fixed header size first, payload
 * added at the end) and converted with htons() just before queueing.
 *
 * NOTE(review): the two switch statements (on tmit->state and on notify_ret)
 * and several case labels / breaks are missing from this listing.
 */
502 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
504 uint16_t max_data_size = 0;
505 uint16_t data_size = 0;
506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
508 int notify_ret = GNUNET_YES;
512 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
514 struct GNUNET_PSYC_MessageModifier *mod
515 = (struct GNUNET_PSYC_MessageModifier *) msg;
516 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
517 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
519 if (NULL != tmit->notify_mod)
521 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
522 data_size = max_data_size;
523 tmit->in_notify = GNUNET_YES;
/* The callback writes name + value after the modifier header and fills in
 * the operator and the full value size directly in the header. */
524 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
525 &mod->oper, &mod->value_size);
526 tmit->in_notify = GNUNET_NO;
/* Name length including its terminator, bounded by the bytes written. */
529 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
530 LOG (GNUNET_ERROR_TYPE_DEBUG,
531 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
532 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
/* NOTE(review): strict "<" means a name that exactly fills data_size (i.e. an
 * empty value) takes the "invalid modifier name" branch -- confirm intent. */
533 if (mod->name_size < data_size)
/* Value bytes still to be sent in MOD_CONT parts. */
535 tmit->mod_value_remaining
536 = mod->value_size - (data_size - mod->name_size);
/* From here on the header fields are in network byte order. */
537 mod->value_size = htonl (mod->value_size);
538 mod->name_size = htons (mod->name_size);
540 else if (0 < data_size)
542 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
543 notify_ret = GNUNET_SYSERR;
547 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
549 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
550 msg->size = sizeof (struct GNUNET_MessageHeader);
552 if (NULL != tmit->notify_mod)
554 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
555 data_size = max_data_size;
556 tmit->in_notify = GNUNET_YES;
/* NULL oper / value-size pointers tell the callback this is a continuation. */
557 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
558 &data_size, &msg[1], NULL, NULL);
559 tmit->in_notify = GNUNET_NO;
561 tmit->mod_value_remaining -= data_size;
562 LOG (GNUNET_ERROR_TYPE_DEBUG,
563 "transmit_mod (ret: %d, size: %u): %.*s\n",
564 notify_ret, data_size, data_size, &msg[1]);
575 { /* Transmission paused, nothing to send. */
576 tmit->paused = GNUNET_YES;
/* Next state: a new modifier once the value is complete, otherwise another
 * continuation. */
580 = (0 == tmit->mod_value_remaining)
581 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
582 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
585 case GNUNET_YES: /* End of modifiers. */
586 GNUNET_assert (0 == tmit->mod_value_remaining);
590 LOG (GNUNET_ERROR_TYPE_ERROR,
591 "TransmitNotifyModifier callback returned with error.\n");
/* Error path: send a bare CANCEL part and stop transmitting. */
593 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
594 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
595 msg->size = htons (sizeof (*msg));
596 transmit_queue_insert (tmit, msg, GNUNET_YES);
597 tmit->in_transmit = GNUNET_NO;
603 GNUNET_assert (data_size <= max_data_size);
604 msg->size = htons (msg->size + data_size);
605 transmit_queue_insert (tmit, msg, GNUNET_NO);
608 if (GNUNET_YES == notify_ret)
610 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
611 if (0 == tmit->acks_pending)
612 transmit_data (tmit);
622 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
623 uint32_t *full_value_size)
626 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
627 uint16_t name_size = 0;
628 uint32_t value_size = 0;
629 const char *value = NULL;
633 if (NULL != tmit->mod)
634 tmit->mod = tmit->mod->next;
635 if (NULL == tmit->mod)
636 { /* No more modifiers, continue with data */
641 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
642 *full_value_size = tmit->mod->value_size;
643 *oper = tmit->mod->oper;
644 name_size = strlen (tmit->mod->name) + 1;
646 if (name_size + tmit->mod->value_size <= *data_size)
648 value_size = tmit->mod->value_size;
649 *data_size = name_size + value_size;
651 else /* full modifier does not fit in data, continuation needed */
653 value_size = *data_size - name_size;
654 tmit->mod_value = tmit->mod->value + value_size;
657 GNUNET_memcpy (data, tmit->mod->name, name_size);
658 GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size);
662 { /* Modifier continuation */
663 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
664 value = tmit->mod_value;
665 if (tmit->mod_value_remaining <= *data_size)
667 value_size = tmit->mod_value_remaining;
668 tmit->mod_value = NULL;
672 value_size = *data_size;
673 tmit->mod_value += value_size;
676 if (*data_size < value_size)
678 LOG (GNUNET_ERROR_TYPE_DEBUG,
679 "Value in environment larger than buffer: %u < %zu\n",
680 *data_size, value_size);
685 *data_size = value_size;
686 GNUNET_memcpy (data, value, value_size);
687 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
693 * Transmit a message.
696 * Transmission handle.
698 * Which method should be invoked.
700 * Environment for the message.
701 * Should stay available until the first call to notify_data.
702 * Can be NULL if there are no modifiers or @a notify_mod is
705 * Function to call to obtain modifiers.
706 * Can be NULL if there are no modifiers or @a env is provided instead.
708 * Function to call to obtain fragments of the data.
710 * Closure for @a notify_mod and @a notify_data.
712 * Flags for the message being transmitted.
714 * @return #GNUNET_OK if the transmission was started.
715 * #GNUNET_SYSERR if another transmission is already going on.
/*
 * Start transmitting a message: refuse if another transmission is running,
 * allocate the buffer message, write the METHOD part into it, and select
 * where modifiers come from (the caller's notify_mod, or the built-in
 * transmit_notify_env walking @a env).
 *
 * NOTE(review): some lines are missing from this listing (the notify_cls and
 * flags parameters, else branches, and the end of the function).
 */
718 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
719 const char *method_name,
720 const struct GNUNET_PSYC_Environment *env,
721 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
722 GNUNET_PSYC_TransmitNotifyData notify_data,
/* Only one message may be in flight per handle. */
726 if (GNUNET_NO != tmit->in_transmit)
727 return GNUNET_SYSERR;
728 tmit->in_transmit = GNUNET_YES;
/* size = method name length including the terminating NUL. */
730 size_t size = strlen (method_name) + 1;
731 struct GNUNET_PSYC_MessageMethod *pmeth;
733 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
734 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
735 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
736 /* store current message size in host byte order
737 * then later switch it to network byte order before sending */
738 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
740 if (NULL != notify_mod)
742 tmit->notify_mod = notify_mod;
743 tmit->notify_mod_cls = notify_cls;
/* No caller-supplied callback: serve modifiers from env via
 * transmit_notify_env, with the handle itself as closure. */
747 tmit->notify_mod = &transmit_notify_env;
748 tmit->notify_mod_cls = tmit;
/* A dummy list head placed in front of the first real modifier.
 * NOTE(review): the walk below starts from tmit->mod, so presumably tmit->mod
 * is pointed at this local (the assignment is not visible here). If so, the
 * pointer must not be dereferenced after this scope ends -- confirm. */
751 struct GNUNET_PSYC_Modifier mod = {};
752 mod.next = GNUNET_PSYC_env_head (env);
755 struct GNUNET_PSYC_Modifier *m = tmit->mod;
756 while (NULL != (m = m->next))
/* Any operator other than SET turns on the STATE_MODIFY flag. */
758 if (m->oper != GNUNET_PSYC_OP_SET)
759 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
/* The METHOD part is the first part inside the buffer message. */
768 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
769 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
770 pmeth->header.size = htons (sizeof (*pmeth) + size);
771 pmeth->flags = htonl (flags);
772 GNUNET_memcpy (&pmeth[1], method_name, size);
774 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
775 tmit->notify_data = notify_data;
776 tmit->notify_data_cls = notify_cls;
784 * Resume transmission.
786 * @param tmit Transmission handle.
/*
 * Resume a transmission: when no ACKs are pending, clear the paused flag and
 * request the next data fragment.
 * NOTE(review): the statement guarded by the first condition is not visible
 * in this listing (presumably an early return -- confirm).
 */
789 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
/* Applies when no transmission is running or a notify callback is executing. */
791 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
794 if (0 == tmit->acks_pending)
796 tmit->paused = GNUNET_NO;
797 transmit_data (tmit);
803 * Abort transmission request.
805 * @param tmit Transmission handle.
/*
 * Abort the current transmission: mark the handle as cancelled and idle,
 * then queue a bare CANCEL part for immediate sending.
 * NOTE(review): the statement guarded by the first condition is not visible
 * in this listing (presumably an early return -- confirm).
 */
808 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
810 if (GNUNET_NO == tmit->in_transmit)
813 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
814 tmit->in_transmit = GNUNET_NO;
815 tmit->paused = GNUNET_NO;
/* The CANCEL part consists of a header only. */
818 struct GNUNET_MessageHeader msg;
819 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
820 msg.size = htons (sizeof (msg));
821 transmit_queue_insert (tmit, &msg, GNUNET_YES);
826 * Got acknowledgement of a transmitted message part, continue transmission.
828 * @param tmit Transmission handle.
/*
 * Account for one received ACK and continue the transmission according to
 * the current state.
 * NOTE(review): the switch on tmit->state, the action for the
 * MODIFIER / MOD_CONT states and the early returns are not visible in this
 * listing.
 */
831 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
/* An ACK without anything outstanding is logged as extraneous; the guard
 * also keeps the decrement below from running on a zero counter, provided
 * the (not visible) branch body returns. */
833 if (0 == tmit->acks_pending)
835 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
839 tmit->acks_pending--;
841 if (GNUNET_YES == tmit->paused)
846 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
847 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
851 case GNUNET_PSYC_MESSAGE_STATE_DATA:
852 transmit_data (tmit);
855 case GNUNET_PSYC_MESSAGE_STATE_END:
856 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
860 LOG (GNUNET_ERROR_TYPE_DEBUG,
861 "Ignoring message ACK in state %u.\n", tmit->state);
866 /**** Receiving messages ****/
870 * Create handle for receiving messages.
/*
 * Allocate a receive handle and store the two callbacks and their closure.
 * Either callback may be NULL: every call site in this file checks for NULL
 * before invoking them.
 * NOTE(review): the cb_cls parameter line and the return statement are not
 * visible in this listing.
 */
872 struct GNUNET_PSYC_ReceiveHandle *
873 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
874 GNUNET_PSYC_MessagePartCallback message_part_cb,
877 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
878 recv->message_cb = message_cb;
879 recv->message_part_cb = message_part_cb;
880 recv->cb_cls = cb_cls;
886 * Destroy handle for receiving messages.
/* Dispose of the receive handle @a recv.
 * NOTE(review): the body is not visible in this listing. */
889 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
896 * Reset stored data related to the last received message.
/*
 * Put the receive state machine back to its start state and forget the
 * message ID and modifier bookkeeping of the previous message.
 * NOTE(review): one line between the state and message_id resets is not
 * visible in this listing.
 */
899 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
901 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
903 recv->message_id = 0;
904 recv->mod_value_size = 0;
905 recv->mod_value_size_expected = 0;
/*
 * Report a receive error: invoke each configured callback with NULL message
 * arguments, then reset the receive state.
 */
910 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
912 if (NULL != recv->message_part_cb)
913 recv->message_part_cb (recv->cb_cls, NULL, NULL);
915 if (NULL != recv->message_cb)
916 recv->message_cb (recv->cb_cls, NULL);
918 GNUNET_PSYC_receive_reset (recv);
923 * Handle incoming PSYC message.
925 * @param recv Receive handle.
926 * @param msg The message.
928 * @return #GNUNET_OK on success,
929 * #GNUNET_SYSERR on receive error.
/*
 * Feed one PSYC message (header followed by a sequence of message parts)
 * into the receive state machine.
 *
 * Visible flow:
 *  1. In state START, remember the message ID and the sender's public key
 *     and clear the modifier counters; otherwise the ID and the flags must
 *     match those of the message in progress.
 *  2. Walk the parts. Each part must be at least a header long and must lie
 *     inside the message. Per type, either an exact size (END, CANCEL) or a
 *     minimum size (all other known types) is enforced; unknown types are
 *     rejected.
 *  3. Enforce part order: METHOD only in state START; MODIFIER after METHOD,
 *     MODIFIER or MOD_CONT; MOD_CONT after MODIFIER or MOD_CONT; DATA only
 *     once a method was seen and the modifier value is complete.
 *  4. Hand every accepted part to message_part_cb; END and CANCEL reset the
 *     handle. message_cb is invoked once with the whole message.
 *
 * NOTE(review):
 *  - Many lines are missing from this listing (switch heads, breaks, braces,
 *    some log arguments and the calls made on the error paths). In
 *    particular the multi-line comment in the METHOD case has lost its
 *    terminator here.
 *  - The "%lu" conversions in the modifier-size warnings are paired with the
 *    uint32_t members mod_value_size_expected and mod_value_size; the flags
 *    warning also uses "%lu" (its arguments are not visible, presumably the
 *    32-bit flags values). "%" PRIu32 would match these types.
 *  - The loop condition only guarantees that at least one byte remains
 *    before pmsg->size and pmsg->type are read; fewer than sizeof (*pmsg)
 *    bytes may be left at that point.
 *  - GNUNET_PSYC_log_message() is called on each part before the per-type
 *    minimum size has been checked.
 *  - In the MODIFIER case mod_value_size is derived from psize and name_size
 *    before psize is compared against sizeof (*mod) + name_size, and a
 *    name_size of 0 makes the terminator test inspect the byte in front of
 *    the name.
 */
932 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
933 const struct GNUNET_PSYC_MessageHeader *msg)
935 uint16_t size = ntohs (msg->header.size);
936 uint32_t flags = ntohl (msg->flags);
938 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
939 (struct GNUNET_MessageHeader *) msg);
941 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
943 recv->message_id = GNUNET_ntohll (msg->message_id);
945 recv->slave_pub_key = msg->slave_pub_key;
946 recv->mod_value_size = 0;
947 recv->mod_value_size_expected = 0;
949 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
952 LOG (GNUNET_ERROR_TYPE_WARNING,
953 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
954 GNUNET_ntohll (msg->message_id), recv->message_id);
957 return GNUNET_SYSERR;
959 else if (flags != recv->flags)
961 LOG (GNUNET_ERROR_TYPE_WARNING,
962 "Unexpected message flags. Got: %lu, expected: %lu\n",
966 return GNUNET_SYSERR;
/* pos is the offset of the current part relative to the end of the PSYC
 * header; size_eq / size_min hold the per-type size requirement. */
969 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
971 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
973 const struct GNUNET_MessageHeader *pmsg
974 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
975 psize = ntohs (pmsg->size);
976 ptype = ntohs (pmsg->type);
977 size_eq = size_min = 0;
/* Generic check: a part is at least a header and ends inside the message. */
979 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
981 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
982 "Dropping message of type %u with invalid size %u.\n",
985 return GNUNET_SYSERR;
988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989 "Received message part of type %u and size %u from PSYC.\n",
991 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
/* First pass over the type: pick the size rule for this part. */
995 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
996 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
998 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
999 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
1001 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1002 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1003 size_min = sizeof (struct GNUNET_MessageHeader);
1005 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1006 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1007 size_eq = sizeof (struct GNUNET_MessageHeader);
1010 GNUNET_break_op (0);
1012 return GNUNET_SYSERR;
/* Exactly one of size_eq / size_min is nonzero for a known type. */
1015 if (! ((0 < size_eq && psize == size_eq)
1016 || (0 < size_min && size_min <= psize)))
1018 GNUNET_break_op (0);
1020 return GNUNET_SYSERR;
/* Second pass over the type: enforce part order and update the state. */
1025 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1027 struct GNUNET_PSYC_MessageMethod *meth
1028 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1030 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1032 LOG (GNUNET_ERROR_TYPE_WARNING,
1033 "Dropping out of order message method (%u).\n",
1035 /* It is normal to receive an incomplete message right after connecting,
1036 * but should not happen later.
1037 * FIXME: add a check for this condition.
1039 GNUNET_break_op (0);
1041 return GNUNET_SYSERR;
1044 if ('\0' != *((char *) meth + psize - 1))
1046 LOG (GNUNET_ERROR_TYPE_WARNING,
1047 "Dropping message with malformed method. "
1048 "Message ID: %" PRIu64 "\n", recv->message_id);
1049 GNUNET_break_op (0);
1051 return GNUNET_SYSERR;
1053 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1056 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1058 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1059 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1060 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1062 LOG (GNUNET_ERROR_TYPE_WARNING,
1063 "Dropping out of order message modifier (%u).\n",
1065 GNUNET_break_op (0);
1067 return GNUNET_SYSERR;
1070 struct GNUNET_PSYC_MessageModifier *mod
1071 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1073 uint16_t name_size = ntohs (mod->name_size);
1074 recv->mod_value_size_expected = ntohl (mod->value_size);
1075 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1077 if (psize < sizeof (*mod) + name_size
1078 || '\0' != *((char *) &mod[1] + name_size - 1)
1079 || recv->mod_value_size_expected < recv->mod_value_size)
1081 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1082 GNUNET_break_op (0);
1084 return GNUNET_SYSERR;
1086 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1089 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1091 recv->mod_value_size += psize - sizeof (*pmsg);
1093 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1094 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1095 || recv->mod_value_size_expected < recv->mod_value_size)
1097 LOG (GNUNET_ERROR_TYPE_WARNING,
1098 "Dropping out of order message modifier continuation "
1099 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1100 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1101 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1102 recv->mod_value_size_expected, recv->mod_value_size);
1103 GNUNET_break_op (0);
1105 return GNUNET_SYSERR;
1109 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1111 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1112 || recv->mod_value_size_expected != recv->mod_value_size)
1114 LOG (GNUNET_ERROR_TYPE_WARNING,
1115 "Dropping out of order message data fragment "
1116 "(%u < %u || %lu != %lu).\n",
1117 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1118 recv->mod_value_size_expected, recv->mod_value_size);
1120 GNUNET_break_op (0);
1122 return GNUNET_SYSERR;
1124 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1129 if (NULL != recv->message_part_cb)
1130 recv->message_part_cb (recv->cb_cls, msg, pmsg);
1134 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1135 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1136 GNUNET_PSYC_receive_reset (recv);
1141 if (NULL != recv->message_cb)
1142 recv->message_cb (recv->cb_cls, msg);
1148 * Check if @a data contains a series of valid message parts.
1150 * @param data_size Size of @a data.
1152 * @param[out] first_ptype Type of first message part.
1153 * @param[out] last_ptype Type of last message part.
1155 * @return Number of message parts found in @a data.
1156 * or GNUNET_SYSERR if the message contains invalid parts.
1159 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1160 uint16_t *first_ptype, uint16_t *last_ptype)
1162 const struct GNUNET_MessageHeader *pmsg;
1163 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1164 if (NULL != first_ptype)
1166 if (NULL != last_ptype)
1169 for (pos = 0; pos < data_size; pos += psize, parts++)
1171 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1172 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1173 psize = ntohs (pmsg->size);
1174 ptype = ntohs (pmsg->type);
1175 if (0 == parts && NULL != first_ptype)
1176 *first_ptype = ptype;
1177 if (NULL != last_ptype
1178 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1179 *last_ptype = ptype;
1180 if (psize < sizeof (*pmsg)
1181 || pos + psize > data_size
1182 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1183 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1185 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1186 "Invalid message part of type %u and size %u.\n",
1188 return GNUNET_SYSERR;
1190 /** @todo FIXME: check message part order */
/*
 * Closure handed to parse_message_part_cb(): the output locations supplied
 * by the caller of GNUNET_PSYC_message_parse() plus the parser state.
 * NOTE(review): the data member (assigned in the callback) is not visible
 * in this listing.
 */
1196 struct ParseMessageClosure
/* Environment that receives one entry per MODIFIER part. */
1198 struct GNUNET_PSYC_Environment *env;
/* Receives a pointer to the method name inside the parsed message. */
1199 const char **method_name;
/* Receives the payload size of the DATA part. */
1201 uint16_t *data_size;
/* State reached by the last part seen; ERROR on failure. */
1202 enum GNUNET_PSYC_MessageState msg_state;
/*
 * Message part callback used by GNUNET_PSYC_message_parse(): records the
 * method name, adds each modifier to the environment, records the data
 * pointer and size, and tracks the state reached in pmc->msg_state.
 * Part types not handled below end up in state ERROR.
 *
 * All stored pointers refer into the message being parsed; nothing is copied
 * by this function itself.
 *
 * NOTE(review): the condition guarding the first ERROR assignment is not
 * visible in this listing (presumably a NULL-message check, matching how
 * recv_error() invokes the callback -- confirm).
 * NOTE(review): the modifier is added with the value_size from the part
 * header, which elsewhere in this file is the full value size and may exceed
 * the bytes actually present in this part when the value is continued in
 * MOD_CONT parts -- confirm that this cannot over-read.
 */
1207 parse_message_part_cb (void *cls,
1208 const struct GNUNET_PSYC_MessageHeader *msg,
1209 const struct GNUNET_MessageHeader *pmsg)
1211 struct ParseMessageClosure *pmc = cls;
1214 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1218 switch (ntohs (pmsg->type))
1220 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1222 struct GNUNET_PSYC_MessageMethod *
1223 pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg;
/* The method name directly follows the method header. */
1224 *pmc->method_name = (const char *) &pmeth[1];
1225 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1229 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1231 struct GNUNET_PSYC_MessageModifier *
1232 pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg;
/* Layout after the header: name (name_size bytes), then the value. */
1234 const char *name = (const char *) &pmod[1];
1235 const void *value = name + ntohs (pmod->name_size);
1236 GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1237 ntohl (pmod->value_size));
1238 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1242 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1243 *pmc->data = &pmsg[1];
1244 *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg);
1245 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1248 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1249 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1253 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1259 * Parse PSYC message.
1262 * The PSYC message to parse.
1263 * @param[out] method_name
1264 * Pointer to the method name inside @a msg.
1266 * The environment for the message with a list of modifiers.
1268 * Pointer to data inside @a msg.
1269 * @param[out] data_size
1270 * Size of @a data is written here.
1272 * @return #GNUNET_OK on success,
1273 * #GNUNET_SYSERR on parse error.
/*
 * Parse a complete PSYC message by running it through a temporary receive
 * handle whose part callback is parse_message_part_cb(). Returns
 * GNUNET_SYSERR if receiving fails; otherwise the result depends on whether
 * the END part was reached.
 *
 * NOTE(review): some lines are missing from this listing (the data parameter,
 * further cls assignments and the tail of the final return expression).
 * cls is declared without an initializer; unless a hidden line sets
 * cls.msg_state, it is read uninitialized when the callback never runs --
 * confirm.
 */
1276 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1277 const char **method_name,
1278 struct GNUNET_PSYC_Environment *env,
1280 uint16_t *data_size)
1282 struct ParseMessageClosure cls;
1284 cls.method_name = method_name;
1286 cls.data_size = data_size;
/* No whole-message callback is needed, only the per-part callback. */
1288 struct GNUNET_PSYC_ReceiveHandle *
1289 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1290 int ret = GNUNET_PSYC_receive_message (recv, msg);
1291 GNUNET_PSYC_receive_destroy (recv);
1293 if (GNUNET_OK != ret)
1294 return GNUNET_SYSERR;
1296 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1303 * Initialize PSYC message header.
/*
 * Fill the caller-provided @a pmsg from the multicast message @a mmsg: set
 * size and type, copy message ID and fragment offset as they are (no byte
 * order conversion), store @a flags in network byte order, and copy the
 * payload that follows the multicast header.
 *
 * The caller must provide room for sizeof (*pmsg) plus the multicast payload.
 *
 * NOTE(review): the flags parameter line is not visible in this listing.
 * NOTE(review): size - sizeof (*mmsg) assumes the multicast message is at
 * least as large as its header; a smaller declared size would wrap -- confirm
 * that callers validate this.
 */
1306 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1307 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1310 uint16_t size = ntohs (mmsg->header.size);
/* Same payload, PSYC header in place of the multicast header. */
1311 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1313 pmsg->header.size = htons (psize);
1314 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1315 pmsg->message_id = mmsg->message_id;
1316 pmsg->fragment_offset = mmsg->fragment_offset;
1317 pmsg->flags = htonl (flags);
1319 GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1324 * Create a new PSYC message header from a multicast message.
/*
 * Allocate a PSYC message header large enough for the payload of @a mmsg and
 * initialize it with GNUNET_PSYC_message_header_init().
 * NOTE(review): the flags parameter line and the return statement are not
 * visible in this listing; presumably the caller owns the returned buffer.
 */
1326 struct GNUNET_PSYC_MessageHeader *
1327 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1330 struct GNUNET_PSYC_MessageHeader *pmsg;
1331 uint16_t size = ntohs (mmsg->header.size);
/* Must match the size computed inside GNUNET_PSYC_message_header_init(). */
1332 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1334 pmsg = GNUNET_malloc (psize);
1335 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1341 * Create a new PSYC message header from a PSYC message.
/*
 * Allocate a PSYC message header and copy the message parts of @a msg
 * (everything after its own header) behind it. Only the header size and type
 * are set explicitly in the visible lines.
 * NOTE(review): the end of this function is not visible in this listing.
 * NOTE(review): msg_size - sizeof (*msg) assumes the declared size covers at
 * least the header of @a msg -- confirm that callers validate this.
 */
1343 struct GNUNET_PSYC_MessageHeader *
1344 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1346 uint16_t msg_size = ntohs (msg->header.size);
1347 struct GNUNET_PSYC_MessageHeader *
1348 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1349 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1350 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1351 GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));