2 * This file is part of GNUnet
3 * Copyright (C) 2013 GNUnet e.V.
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
22 * @file psycstore/psyc_util_lib.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_psyc_util_lib.h"
32 #include "gnunet_psyc_service.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
37 struct GNUNET_PSYC_TransmitHandle
40 * Client connection to service.
42 struct GNUNET_CLIENT_MANAGER_Connection *client;
45 * Message currently being received from the client.
47 struct GNUNET_MessageHeader *msg;
50 * Callback to request next modifier from client.
52 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
55 * Closure for the notify callbacks.
60 * Callback to request next data fragment from client.
62 GNUNET_PSYC_TransmitNotifyData notify_data;
65 * Closure for the notify callbacks.
67 void *notify_data_cls;
70 * Modifier of the environment that is currently being transmitted.
72 struct GNUNET_PSYC_Modifier *mod;
77 const char *mod_value;
80 * Number of bytes remaining to be transmitted from the current modifier value.
82 uint32_t mod_value_remaining;
85 * State of the current message being received from client.
87 enum GNUNET_PSYC_MessageState state;
90 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
95 * Is transmission paused?
100 * Are we currently transmitting a message?
105 * Notify callback is currently being called.
113 struct GNUNET_PSYC_ReceiveHandle
118 GNUNET_PSYC_MessageCallback message_cb;
121 * Message part callback.
123 GNUNET_PSYC_MessagePartCallback message_part_cb;
126 * Closure for the callbacks.
131 * ID of the message being received from the PSYC service.
136 * Public key of the slave from which a message is being received.
138 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
141 * State of the currently being received message from the PSYC service.
143 enum GNUNET_PSYC_MessageState state;
146 * Flags for the currently being received message from the PSYC service.
148 enum GNUNET_PSYC_MessageFlags flags;
151 * Expected value size for the modifier being received from the PSYC service.
153 uint32_t mod_value_size_expected;
156 * Actual value size for the modifier being received from the PSYC service.
158 uint32_t mod_value_size;
166 * Create a PSYC message.
169 * PSYC method for the message.
171 * Environment for the message.
173 * Data payload for the message.
177 * @return Message header with size information,
178 * followed by the message parts.
180 struct GNUNET_PSYC_Message *
181 GNUNET_PSYC_message_create (const char *method_name,
182 const struct GNUNET_PSYC_Environment *env,
186 struct GNUNET_PSYC_Modifier *mod = NULL;
187 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
188 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
189 struct GNUNET_MessageHeader *pmsg = NULL;
190 uint16_t env_size = 0;
193 mod = GNUNET_PSYC_env_head (env);
196 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
201 struct GNUNET_PSYC_Message *msg;
202 uint16_t method_name_size = strlen (method_name) + 1;
203 if (method_name_size == 1)
206 uint16_t msg_size = sizeof (*msg) /* header */
207 + sizeof (*pmeth) + method_name_size /* method */
208 + env_size /* modifiers */
209 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
210 + sizeof (*pmsg); /* end of message */
211 msg = GNUNET_malloc (msg_size);
212 msg->header.size = htons (msg_size);
213 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
215 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
216 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
217 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
218 memcpy (&pmeth[1], method_name, method_name_size);
220 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
223 mod = GNUNET_PSYC_env_head (env);
226 uint16_t mod_name_size = strlen (mod->name) + 1;
227 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
228 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
229 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
230 p += pmod->header.size;
231 pmod->header.size = htons (pmod->header.size);
233 pmod->oper = mod->oper;
234 pmod->name_size = htons (mod_name_size);
235 pmod->value_size = htonl (mod->value_size);
237 memcpy (&pmod[1], mod->name, mod_name_size);
238 if (0 < mod->value_size)
239 memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
247 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
248 pmsg->size = sizeof (*pmsg) + data_size;
250 pmsg->size = htons (pmsg->size);
251 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
252 memcpy (&pmsg[1], data, data_size);
255 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
256 pmsg->size = htons (sizeof (*pmsg));
257 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
259 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
265 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
266 const struct GNUNET_MessageHeader *msg)
268 uint16_t size = ntohs (msg->size);
269 uint16_t type = ntohs (msg->type);
270 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
273 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
275 struct GNUNET_PSYC_MessageHeader *pmsg
276 = (struct GNUNET_PSYC_MessageHeader *) msg;
277 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
278 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
281 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
283 struct GNUNET_PSYC_MessageMethod *meth
284 = (struct GNUNET_PSYC_MessageMethod *) msg;
285 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
288 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
290 struct GNUNET_PSYC_MessageModifier *mod
291 = (struct GNUNET_PSYC_MessageModifier *) msg;
292 uint16_t name_size = ntohs (mod->name_size);
293 char oper = ' ' < mod->oper ? mod->oper : ' ';
294 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
295 size - sizeof (*mod) - name_size,
296 ((char *) &mod[1]) + name_size);
299 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
300 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
301 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
307 /**** Transmitting messages ****/
311 * Create a transmission handle.
313 struct GNUNET_PSYC_TransmitHandle *
314 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
316 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
317 tmit->client = client;
323 * Destroy a transmission handle.
326 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
333 * Queue a message part for transmission.
335 * The message part is added to the current message buffer.
336 * When this buffer is full, it is added to the transmission queue.
339 * Transmission handle.
341 * Message part, or NULL.
343 * Transmit message now, or wait for buffer to fill up?
344 * #GNUNET_YES or #GNUNET_NO.
347 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
348 const struct GNUNET_MessageHeader *msg,
351 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
353 LOG (GNUNET_ERROR_TYPE_DEBUG,
354 "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
355 NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
357 if (NULL != tmit->msg)
360 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
362 /* End of message or buffer is full, add it to transmission queue
363 * and start with empty buffer */
364 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
365 tmit->msg->size = htons (tmit->msg->size);
366 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
368 tmit->acks_pending++;
372 /* Message fits in current buffer, append */
373 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
374 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
375 tmit->msg->size += size;
379 if (NULL == tmit->msg && NULL != msg)
381 /* Empty buffer, copy over message. */
382 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
383 tmit->msg->size = sizeof (*tmit->msg) + size;
384 memcpy (&tmit->msg[1], msg, size);
387 if (NULL != tmit->msg
388 && (GNUNET_YES == tmit_now
389 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
390 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
392 /* End of message or buffer is full, add it to transmission queue. */
393 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
394 tmit->msg->size = htons (tmit->msg->size);
395 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
397 tmit->acks_pending++;
403 * Request data from client to transmit.
405 * @param tmit Transmission handle.
408 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
410 int notify_ret = GNUNET_YES;
411 uint16_t data_size = 0;
412 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
413 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
414 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
416 if (NULL != tmit->notify_data)
418 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
419 tmit->in_notify = GNUNET_YES;
420 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
421 tmit->in_notify = GNUNET_NO;
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "transmit_data (ret: %d, size: %u): %.*s\n",
425 notify_ret, data_size, data_size, &msg[1]);
431 /* Transmission paused, nothing to send. */
432 tmit->paused = GNUNET_YES;
438 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
442 LOG (GNUNET_ERROR_TYPE_ERROR,
443 "TransmitNotifyData callback returned error when requesting data.\n");
445 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
446 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
447 msg->size = htons (sizeof (*msg));
448 transmit_queue_insert (tmit, msg, GNUNET_YES);
449 tmit->in_transmit = GNUNET_NO;
455 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
456 msg->size = htons (sizeof (*msg) + data_size);
457 transmit_queue_insert (tmit, msg, !notify_ret);
460 /* End of message. */
461 if (GNUNET_YES == notify_ret)
463 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
464 msg->size = htons (sizeof (*msg));
465 transmit_queue_insert (tmit, msg, GNUNET_YES);
466 /* FIXME: wait for ACK before setting in_transmit to no */
467 tmit->in_transmit = GNUNET_NO;
473 * Request a modifier from a client to transmit.
475 * @param tmit Transmission handle.
478 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
480 uint16_t max_data_size = 0;
481 uint16_t data_size = 0;
482 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
483 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
484 int notify_ret = GNUNET_YES;
488 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
490 struct GNUNET_PSYC_MessageModifier *mod
491 = (struct GNUNET_PSYC_MessageModifier *) msg;
492 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
493 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
495 if (NULL != tmit->notify_mod)
497 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
498 data_size = max_data_size;
499 tmit->in_notify = GNUNET_YES;
500 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
501 &mod->oper, &mod->value_size);
502 tmit->in_notify = GNUNET_NO;
505 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
506 LOG (GNUNET_ERROR_TYPE_DEBUG,
507 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
508 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
509 if (mod->name_size < data_size)
511 tmit->mod_value_remaining
512 = mod->value_size - (data_size - mod->name_size);
513 mod->value_size = htonl (mod->value_size);
514 mod->name_size = htons (mod->name_size);
516 else if (0 < data_size)
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
519 notify_ret = GNUNET_SYSERR;
523 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
525 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
526 msg->size = sizeof (struct GNUNET_MessageHeader);
528 if (NULL != tmit->notify_mod)
530 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
531 data_size = max_data_size;
532 tmit->in_notify = GNUNET_YES;
533 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
534 &data_size, &msg[1], NULL, NULL);
535 tmit->in_notify = GNUNET_NO;
537 tmit->mod_value_remaining -= data_size;
538 LOG (GNUNET_ERROR_TYPE_DEBUG,
539 "transmit_mod (ret: %d, size: %u): %.*s\n",
540 notify_ret, data_size, data_size, &msg[1]);
551 { /* Transmission paused, nothing to send. */
552 tmit->paused = GNUNET_YES;
556 = (0 == tmit->mod_value_remaining)
557 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
558 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
561 case GNUNET_YES: /* End of modifiers. */
562 GNUNET_assert (0 == tmit->mod_value_remaining);
566 LOG (GNUNET_ERROR_TYPE_ERROR,
567 "TransmitNotifyModifier callback returned with error.\n");
569 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
570 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
571 msg->size = htons (sizeof (*msg));
572 transmit_queue_insert (tmit, msg, GNUNET_YES);
573 tmit->in_transmit = GNUNET_NO;
579 GNUNET_assert (data_size <= max_data_size);
580 msg->size = htons (msg->size + data_size);
581 transmit_queue_insert (tmit, msg, GNUNET_NO);
584 if (GNUNET_YES == notify_ret)
586 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
587 if (0 == tmit->acks_pending)
588 transmit_data (tmit);
598 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
599 uint32_t *full_value_size)
602 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
603 uint16_t name_size = 0;
604 uint32_t value_size = 0;
605 const char *value = NULL;
609 if (NULL != tmit->mod)
610 tmit->mod = tmit->mod->next;
611 if (NULL == tmit->mod)
612 { /* No more modifiers, continue with data */
617 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
618 *full_value_size = tmit->mod->value_size;
619 *oper = tmit->mod->oper;
620 name_size = strlen (tmit->mod->name) + 1;
622 if (name_size + tmit->mod->value_size <= *data_size)
624 value_size = tmit->mod->value_size;
625 *data_size = name_size + value_size;
627 else /* full modifier does not fit in data, continuation needed */
629 value_size = *data_size - name_size;
630 tmit->mod_value = tmit->mod->value + value_size;
633 memcpy (data, tmit->mod->name, name_size);
634 memcpy ((char *)data + name_size, tmit->mod->value, value_size);
638 { /* Modifier continuation */
639 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
640 value = tmit->mod_value;
641 if (tmit->mod_value_remaining <= *data_size)
643 value_size = tmit->mod_value_remaining;
644 tmit->mod_value = NULL;
648 value_size = *data_size;
649 tmit->mod_value += value_size;
652 if (*data_size < value_size)
654 LOG (GNUNET_ERROR_TYPE_DEBUG,
655 "Value in environment larger than buffer: %u < %zu\n",
656 *data_size, value_size);
661 *data_size = value_size;
662 memcpy (data, value, value_size);
663 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
669 * Transmit a message.
672 * Transmission handle.
674 * Which method should be invoked.
676 * Environment for the message.
677 * Should stay available until the first call to notify_data.
678 * Can be NULL if there are no modifiers or @a notify_mod is
681 * Function to call to obtain modifiers.
682 * Can be NULL if there are no modifiers or @a env is provided instead.
684 * Function to call to obtain fragments of the data.
686 * Closure for @a notify_mod and @a notify_data.
688 * Flags for the message being transmitted.
690 * @return #GNUNET_OK if the transmission was started.
691 * #GNUNET_SYSERR if another transmission is already going on.
694 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
695 const char *method_name,
696 const struct GNUNET_PSYC_Environment *env,
697 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
698 GNUNET_PSYC_TransmitNotifyData notify_data,
702 if (GNUNET_NO != tmit->in_transmit)
703 return GNUNET_SYSERR;
704 tmit->in_transmit = GNUNET_YES;
706 size_t size = strlen (method_name) + 1;
707 struct GNUNET_PSYC_MessageMethod *pmeth;
708 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
709 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
711 if (NULL != notify_mod)
713 tmit->notify_mod = notify_mod;
714 tmit->notify_mod_cls = notify_cls;
718 tmit->notify_mod = &transmit_notify_env;
719 tmit->notify_mod_cls = tmit;
722 struct GNUNET_PSYC_Modifier mod = {};
723 mod.next = GNUNET_PSYC_env_head (env);
726 struct GNUNET_PSYC_Modifier *m = tmit->mod;
727 while (NULL != (m = m->next))
729 if (m->oper != GNUNET_PSYC_OP_SET)
730 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
739 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
740 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
741 pmeth->header.size = htons (sizeof (*pmeth) + size);
742 pmeth->flags = htonl (flags);
743 memcpy (&pmeth[1], method_name, size);
745 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
746 tmit->notify_data = notify_data;
747 tmit->notify_data_cls = notify_cls;
755 * Resume transmission.
757 * @param tmit Transmission handle.
760 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
762 if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
765 if (0 == tmit->acks_pending)
767 tmit->paused = GNUNET_NO;
768 transmit_data (tmit);
774 * Abort transmission request.
776 * @param tmit Transmission handle.
779 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
781 if (GNUNET_NO == tmit->in_transmit)
784 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
785 tmit->in_transmit = GNUNET_NO;
786 tmit->paused = GNUNET_NO;
789 struct GNUNET_MessageHeader msg;
790 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
791 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
792 msg.size = htons (sizeof (msg));
793 transmit_queue_insert (tmit, &msg, GNUNET_YES);
798 * Got acknowledgement of a transmitted message part, continue transmission.
800 * @param tmit Transmission handle.
803 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
805 if (0 == tmit->acks_pending)
807 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
811 tmit->acks_pending--;
815 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
816 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
820 case GNUNET_PSYC_MESSAGE_STATE_DATA:
821 transmit_data (tmit);
824 case GNUNET_PSYC_MESSAGE_STATE_END:
825 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
829 LOG (GNUNET_ERROR_TYPE_DEBUG,
830 "Ignoring message ACK in state %u.\n", tmit->state);
835 /**** Receiving messages ****/
839 * Create handle for receiving messages.
841 struct GNUNET_PSYC_ReceiveHandle *
842 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
843 GNUNET_PSYC_MessagePartCallback message_part_cb,
846 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
847 recv->message_cb = message_cb;
848 recv->message_part_cb = message_part_cb;
849 recv->cb_cls = cb_cls;
855 * Destroy handle for receiving messages.
858 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
865 * Reset stored data related to the last received message.
868 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
870 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
872 recv->message_id = 0;
873 recv->mod_value_size = 0;
874 recv->mod_value_size_expected = 0;
879 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
881 if (NULL != recv->message_part_cb)
882 recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
885 if (NULL != recv->message_cb)
886 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
888 GNUNET_PSYC_receive_reset (recv);
893 * Handle incoming PSYC message.
895 * @param recv Receive handle.
896 * @param msg The message.
898 * @return #GNUNET_OK on success,
899 * #GNUNET_SYSERR on receive error.
902 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
903 const struct GNUNET_PSYC_MessageHeader *msg)
905 uint16_t size = ntohs (msg->header.size);
906 uint32_t flags = ntohl (msg->flags);
909 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
910 (struct GNUNET_MessageHeader *) msg);
912 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
914 recv->message_id = GNUNET_ntohll (msg->message_id);
916 recv->slave_pub_key = msg->slave_pub_key;
917 recv->mod_value_size = 0;
918 recv->mod_value_size_expected = 0;
920 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
923 LOG (GNUNET_ERROR_TYPE_WARNING,
924 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
925 GNUNET_ntohll (msg->message_id), recv->message_id);
928 return GNUNET_SYSERR;
930 else if (flags != recv->flags)
932 LOG (GNUNET_ERROR_TYPE_WARNING,
933 "Unexpected message flags. Got: %lu, expected: %lu\n",
937 return GNUNET_SYSERR;
939 message_id = recv->message_id;
941 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
943 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
945 const struct GNUNET_MessageHeader *pmsg
946 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
947 psize = ntohs (pmsg->size);
948 ptype = ntohs (pmsg->type);
949 size_eq = size_min = 0;
951 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
953 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
954 "Dropping message of type %u with invalid size %u.\n",
957 return GNUNET_SYSERR;
960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961 "Received message part of type %u and size %u from PSYC.\n",
963 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
967 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
968 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
970 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
971 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
973 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
974 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
975 size_min = sizeof (struct GNUNET_MessageHeader);
977 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
978 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
979 size_eq = sizeof (struct GNUNET_MessageHeader);
984 return GNUNET_SYSERR;
987 if (! ((0 < size_eq && psize == size_eq)
988 || (0 < size_min && size_min <= psize)))
992 return GNUNET_SYSERR;
997 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
999 struct GNUNET_PSYC_MessageMethod *meth
1000 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
1002 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
1004 LOG (GNUNET_ERROR_TYPE_WARNING,
1005 "Dropping out of order message method (%u).\n",
1007 /* It is normal to receive an incomplete message right after connecting,
1008 * but should not happen later.
1009 * FIXME: add a check for this condition.
1011 GNUNET_break_op (0);
1013 return GNUNET_SYSERR;
1016 if ('\0' != *((char *) meth + psize - 1))
1018 LOG (GNUNET_ERROR_TYPE_WARNING,
1019 "Dropping message with malformed method. "
1020 "Message ID: %" PRIu64 "\n", recv->message_id);
1021 GNUNET_break_op (0);
1023 return GNUNET_SYSERR;
1025 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1028 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1030 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1031 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1032 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1034 LOG (GNUNET_ERROR_TYPE_WARNING,
1035 "Dropping out of order message modifier (%u).\n",
1037 GNUNET_break_op (0);
1039 return GNUNET_SYSERR;
1042 struct GNUNET_PSYC_MessageModifier *mod
1043 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1045 uint16_t name_size = ntohs (mod->name_size);
1046 recv->mod_value_size_expected = ntohl (mod->value_size);
1047 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1049 if (psize < sizeof (*mod) + name_size
1050 || '\0' != *((char *) &mod[1] + name_size - 1)
1051 || recv->mod_value_size_expected < recv->mod_value_size)
1053 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1054 GNUNET_break_op (0);
1056 return GNUNET_SYSERR;
1058 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1061 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1063 recv->mod_value_size += psize - sizeof (*pmsg);
1065 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1066 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1067 || recv->mod_value_size_expected < recv->mod_value_size)
1069 LOG (GNUNET_ERROR_TYPE_WARNING,
1070 "Dropping out of order message modifier continuation "
1071 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1072 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1073 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1074 recv->mod_value_size_expected, recv->mod_value_size);
1075 GNUNET_break_op (0);
1077 return GNUNET_SYSERR;
1081 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1083 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1084 || recv->mod_value_size_expected != recv->mod_value_size)
1086 LOG (GNUNET_ERROR_TYPE_WARNING,
1087 "Dropping out of order message data fragment "
1088 "(%u < %u || %lu != %lu).\n",
1089 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1090 recv->mod_value_size_expected, recv->mod_value_size);
1092 GNUNET_break_op (0);
1094 return GNUNET_SYSERR;
1096 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1101 if (NULL != recv->message_part_cb)
1102 recv->message_part_cb (recv->cb_cls, &recv->slave_pub_key,
1103 recv->message_id, recv->flags,
1104 GNUNET_ntohll (msg->fragment_offset),
1109 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1110 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1111 GNUNET_PSYC_receive_reset (recv);
1116 if (NULL != recv->message_cb)
1117 recv->message_cb (recv->cb_cls, message_id, flags, msg);
1123 * Check if @a data contains a series of valid message parts.
1125 * @param data_size Size of @a data.
1127 * @param[out] first_ptype Type of first message part.
1128 * @param[out] last_ptype Type of last message part.
1130 * @return Number of message parts found in @a data.
1131 * or GNUNET_SYSERR if the message contains invalid parts.
1134 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1135 uint16_t *first_ptype, uint16_t *last_ptype)
1137 const struct GNUNET_MessageHeader *pmsg;
1138 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1139 if (NULL != first_ptype)
1141 if (NULL != last_ptype)
1144 for (pos = 0; pos < data_size; pos += psize, parts++)
1146 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1147 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1148 psize = ntohs (pmsg->size);
1149 ptype = ntohs (pmsg->type);
1150 if (0 == parts && NULL != first_ptype)
1151 *first_ptype = ptype;
1152 if (NULL != last_ptype
1153 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1154 *last_ptype = ptype;
1155 if (psize < sizeof (*pmsg)
1156 || pos + psize > data_size
1157 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1158 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1161 "Invalid message part of type %u and size %u.\n",
1163 return GNUNET_SYSERR;
1165 /** @todo FIXME: check message part order */
1171 struct ParseMessageClosure
1173 struct GNUNET_PSYC_Environment *env;
1174 const char **method_name;
1176 uint16_t *data_size;
1177 enum GNUNET_PSYC_MessageState msg_state;
1182 parse_message_part_cb (void *cls,
1183 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
1184 uint64_t message_id, uint32_t flags, uint64_t fragment_offset,
1185 const struct GNUNET_MessageHeader *msg)
1187 struct ParseMessageClosure *pmc = cls;
1190 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1194 switch (ntohs (msg->type))
1196 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1198 struct GNUNET_PSYC_MessageMethod *
1199 pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1200 *pmc->method_name = (const char *) &pmeth[1];
1201 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1205 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1207 struct GNUNET_PSYC_MessageModifier *
1208 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1210 const char *name = (const char *) &pmod[1];
1211 const void *value = name + ntohs (pmod->name_size);
1212 GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
1213 ntohl (pmod->value_size));
1214 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1218 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1219 *pmc->data = &msg[1];
1220 *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1221 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1224 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1225 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1229 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1235 * Parse PSYC message.
1238 * The PSYC message to parse.
1239 * @param[out] method_name
1240 * Pointer to the method name inside @a pmsg.
1242 * The environment for the message with a list of modifiers.
1244 * Pointer to data inside @a pmsg.
1245 * @param[out] data_size
1246 * Size of @data is written here.
1248 * @return #GNUNET_OK on success,
1249 * #GNUNET_SYSERR on parse error.
1252 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1253 const char **method_name,
1254 struct GNUNET_PSYC_Environment *env,
1256 uint16_t *data_size)
1258 struct ParseMessageClosure cls;
1260 cls.method_name = method_name;
1262 cls.data_size = data_size;
1264 struct GNUNET_PSYC_ReceiveHandle *
1265 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1266 int ret = GNUNET_PSYC_receive_message (recv, msg);
1267 GNUNET_PSYC_receive_destroy (recv);
1269 if (GNUNET_OK != ret)
1270 return GNUNET_SYSERR;
1272 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1279 * Initialize PSYC message header.
1282 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1283 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1286 uint16_t size = ntohs (mmsg->header.size);
1287 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1289 pmsg->header.size = htons (psize);
1290 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1291 pmsg->message_id = mmsg->message_id;
1292 pmsg->fragment_offset = mmsg->fragment_offset;
1293 pmsg->flags = htonl (flags);
1295 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1300 * Create a new PSYC message header from a multicast message.
1302 struct GNUNET_PSYC_MessageHeader *
1303 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1306 struct GNUNET_PSYC_MessageHeader *pmsg;
1307 uint16_t size = ntohs (mmsg->header.size);
1308 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1310 pmsg = GNUNET_malloc (psize);
1311 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1317 * Create a new PSYC message header from a PSYC message.
1319 struct GNUNET_PSYC_MessageHeader *
1320 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1322 uint16_t msg_size = ntohs (msg->header.size);
1323 struct GNUNET_PSYC_MessageHeader *
1324 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1325 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1326 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1327 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));