2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
22 * @file psyc/psyc_util_lib.c
23 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
24 * @author Gabor X Toth
30 #include "gnunet_util_lib.h"
31 #include "gnunet_env_lib.h"
32 #include "gnunet_psyc_service.h"
33 #include "gnunet_psyc_util_lib.h"
35 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
38 * Message receive states.
/* Phases of a PSYC message while it is transmitted or received; stored in
 * the `state` member of both struct GNUNET_PSYC_TransmitHandle and
 * struct GNUNET_PSYC_ReceiveHandle. */
40 enum GNUNET_PSYC_MessageState
/* No part of a message handled yet; GNUNET_PSYC_receive_reset() returns to
 * this state and a method part is only accepted in it. */
42 GNUNET_PSYC_MESSAGE_STATE_START = 0,
/* NOTE(review): not referenced by any code visible in this file - confirm
 * it is still needed. */
43 GNUNET_PSYC_MESSAGE_STATE_HEADER = 1,
/* A well-formed method part has been received. */
44 GNUNET_PSYC_MESSAGE_STATE_METHOD = 2,
/* Modifiers are being requested for transmission, or a modifier part has
 * been received. */
45 GNUNET_PSYC_MESSAGE_STATE_MODIFIER = 3,
/* The value of the current modifier continues in further parts. */
46 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT = 4,
/* Data fragments are being transmitted or received. */
47 GNUNET_PSYC_MESSAGE_STATE_DATA = 5,
/* Set by transmit_data(); presumably when the data callback signals the
 * final fragment - TODO confirm. */
48 GNUNET_PSYC_MESSAGE_STATE_END = 6,
/* Transmission was aborted: set on the callback error paths and by
 * GNUNET_PSYC_transmit_cancel(). */
49 GNUNET_PSYC_MESSAGE_STATE_CANCEL = 7,
/* NOTE(review): not referenced by any code visible in this file - confirm
 * it is still needed. */
50 GNUNET_PSYC_MESSAGE_STATE_ERROR = 8,
/* Per-connection state for sending one PSYC message at a time: the buffer
 * being filled with message parts, the callbacks that supply modifiers and
 * data, and the bookkeeping for pausing and acknowledgements. */
54 struct GNUNET_PSYC_TransmitHandle
57 * Client connection to service.
59 struct GNUNET_CLIENT_MANAGER_Connection *client;
62 * Message currently being received from the client.
/* Buffer that transmit_queue_insert() fills with message parts.  While it is
 * being filled its size field is kept in host byte order; it is converted
 * right before the buffer is handed to the client manager. */
64 struct GNUNET_MessageHeader *msg;
67 * Callback to request next modifier from client.
69 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
72 * Closure for the notify callbacks.
/* NOTE(review): the member documented by the line above is used elsewhere
 * as tmit->notify_mod_cls; its declaration is missing from this copy. */
77 * Callback to request next data fragment from client.
79 GNUNET_PSYC_TransmitNotifyData notify_data;
82 * Closure for the notify callbacks.
84 void *notify_data_cls;
87 * Modifier of the environment that is currently being transmitted.
89 struct GNUNET_ENV_Modifier *mod;
/* Part of the current modifier value that transmit_notify_env() has not
 * handed out yet, and the number of bytes left in it. */
94 const char *mod_value;
95 size_t mod_value_size;
98 * State of the current message being received from client.
100 enum GNUNET_PSYC_MessageState state;
103 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
/* Incremented whenever a buffer is handed to the client manager and
 * decremented by GNUNET_PSYC_transmit_got_ack(). */
105 uint8_t acks_pending;
108 * Is transmission paused?
113 * Are we currently transmitting a message?
/* NOTE(review): the two flags documented above are used elsewhere as
 * tmit->paused and tmit->in_transmit; their declarations are missing from
 * this copy. */
/* State for reassembling and validating the parts of one incoming PSYC
 * message and for dispatching them to the registered callbacks. */
120 struct GNUNET_PSYC_ReceiveHandle
123 * Message part callback.
125 GNUNET_PSYC_MessageCallback message_cb;
128 * Message part callback for historic message.
/* Chosen instead of message_cb when GNUNET_PSYC_MESSAGE_HISTORIC is set in
 * the flags of the message being received. */
130 GNUNET_PSYC_MessageCallback hist_message_cb;
133 * Closure for the callbacks.
138 * ID of the message being received from the PSYC service.
/* NOTE(review): the members documented by the two lines above are used
 * elsewhere as recv->cb_cls and recv->message_id; their declarations are
 * missing from this copy. */
143 * Public key of the slave from which a message is being received.
145 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
148 * State of the currently being received message from the PSYC service.
150 enum GNUNET_PSYC_MessageState state;
153 * Flags for the currently being received message from the PSYC service.
155 enum GNUNET_PSYC_MessageFlags flags;
158 * Expected value size for the modifier being received from the PSYC service.
/* Taken from the value_size field of the modifier part. */
160 uint32_t mod_value_size_expected;
163 * Actual value size for the modifier being received from the PSYC service.
/* Running total over the modifier part and its continuation parts; data
 * parts are only accepted once it equals mod_value_size_expected. */
165 uint32_t mod_value_size;
170 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
171 const struct GNUNET_MessageHeader *msg)
173 uint16_t size = ntohs (msg->size);
174 uint16_t type = ntohs (msg->type);
175 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
178 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
180 struct GNUNET_PSYC_MessageHeader *pmsg
181 = (struct GNUNET_PSYC_MessageHeader *) msg;
182 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
183 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
186 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
188 struct GNUNET_PSYC_MessageMethod *meth
189 = (struct GNUNET_PSYC_MessageMethod *) msg;
190 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
193 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
195 struct GNUNET_PSYC_MessageModifier *mod
196 = (struct GNUNET_PSYC_MessageModifier *) msg;
197 uint16_t name_size = ntohs (mod->name_size);
198 char oper = ' ' < mod->oper ? mod->oper : ' ';
199 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
200 size - sizeof (*mod) - name_size - 1,
201 ((char *) &mod[1]) + name_size + 1);
204 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
205 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
206 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
212 /**** Transmitting messages ****/
216 * Create a transmission handle.
/* Allocates the handle with GNUNET_malloc() and stores @a client in it;
 * transmit_queue_insert() later hands finished message buffers to that
 * connection. */
218 struct GNUNET_PSYC_TransmitHandle *
219 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
221 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
222 tmit->client = client;
228 * Destroy a transmission handle.
231 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
238 * Queue a message part for transmission.
240 * The message part is added to the current message buffer.
241 * When this buffer is full, it is added to the transmission queue.
243 * @param tmit Transmission handle.
244 * @param msg Message part, or NULL.
245 * @param end End of message? #GNUNET_YES or #GNUNET_NO.
248 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
249 const struct GNUNET_MessageHeader *msg,
252 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
254 LOG (GNUNET_ERROR_TYPE_DEBUG,
255 "Queueing message of type %u and size %u (end: %u)).\n",
256 ntohs (msg->type), size, end);
258 if (NULL != tmit->msg)
261 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
263 /* End of message or buffer is full, add it to transmission queue
264 * and start with empty buffer */
265 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
266 tmit->msg->size = htons (tmit->msg->size);
267 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
269 tmit->acks_pending++;
273 /* Message fits in current buffer, append */
274 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
275 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
276 tmit->msg->size += size;
280 if (NULL == tmit->msg && NULL != msg)
282 /* Empty buffer, copy over message. */
283 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
284 tmit->msg->size = sizeof (*tmit->msg) + size;
285 memcpy (&tmit->msg[1], msg, size);
288 if (NULL != tmit->msg
289 && (GNUNET_YES == end
290 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
291 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
293 /* End of message or buffer is full, add it to transmission queue. */
294 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
295 tmit->msg->size = htons (tmit->msg->size);
296 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
298 tmit->acks_pending++;
301 if (GNUNET_YES == end)
302 tmit->in_transmit = GNUNET_NO;
307 * Request data from client to transmit.
309 * @param tmit Transmission handle.
/* Asks the notify_data callback for the next data fragment, wraps what it
 * wrote into a DATA message part and queues it.  An END part is queued when
 * the callback returns GNUNET_YES; the error path queues a CANCEL part. */
312 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
/* data_size goes in as the payload capacity; after the call it is asserted
 * to be within that capacity and used as the number of bytes written. */
314 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
/* Stack buffer for one message part: the header, followed by the payload
 * the callback writes at &msg[1]. */
315 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
316 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
317 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
319 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
325 /* Transmission paused, nothing to send. */
326 tmit->paused = GNUNET_YES;
332 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
336 LOG (GNUNET_ERROR_TYPE_ERROR,
337 "TransmitNotifyData callback returned error when requesting data.\n");
/* The buffer is reused: the DATA header is overwritten with a bare CANCEL
 * header before queueing. */
339 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
340 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
341 msg->size = htons (sizeof (*msg));
342 transmit_queue_insert (tmit, msg, GNUNET_YES);
348 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
349 msg->size = htons (sizeof (*msg) + data_size);
/* NOTE(review): `!notify_ret` is passed as the `end` argument.  Presumably
 * GNUNET_NO is 0, so a non-final fragment is queued with end set, which
 * makes transmit_queue_insert() flush the buffer but also clear
 * tmit->in_transmit while more data is still expected - TODO confirm this
 * is intended. */
350 transmit_queue_insert (tmit, msg, !notify_ret);
353 /* End of message. */
354 if (GNUNET_YES == notify_ret)
356 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
357 msg->size = htons (sizeof (*msg));
358 transmit_queue_insert (tmit, msg, GNUNET_YES);
364 * Request a modifier from a client to transmit.
366 * @param tmit Transmission handle.
/* Requests the next modifier (state MODIFIER) or the continuation of the
 * current modifier value (state MOD_CONT) from the notify_mod callback,
 * wraps the result in the matching message part and queues it.  The error
 * path queues a CANCEL part instead. */
369 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
371 uint16_t max_data_size, data_size;
/* Stack buffer for one message part: header first, payload behind it. */
372 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
373 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
378 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
380 struct GNUNET_PSYC_MessageModifier *mod
381 = (struct GNUNET_PSYC_MessageModifier *) msg;
382 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
383 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
/* Host byte order for now; the payload length is added and the sum is
 * converted where the part is queued at the end of this function. */
384 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
/* The callback fills the payload and writes the operator and the full value
 * size straight into the modifier header. */
385 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
386 &mod->oper, &mod->value_size);
/* The name is the NUL-terminated prefix of the payload.  If no terminator
 * is found within data_size bytes the name is treated as invalid. */
387 mod->name_size = strnlen ((char *) &mod[1], data_size);
388 if (mod->name_size < data_size)
390 mod->value_size = htonl (mod->value_size);
391 mod->name_size = htons (mod->name_size);
393 else if (0 < data_size)
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
396 notify_ret = GNUNET_SYSERR;
400 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
402 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
403 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
404 msg->size = sizeof (struct GNUNET_MessageHeader);
/* A continuation carries raw value bytes only, so no operator or full value
 * size is requested (both out-parameters are NULL). */
405 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
406 &data_size, &msg[1], NULL, NULL);
417 { /* Transmission paused, nothing to send. */
418 tmit->paused = GNUNET_YES;
421 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
427 /* End of modifiers. */
428 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
/* Data is requested right away only when no acknowledgement is pending;
 * otherwise GNUNET_PSYC_transmit_got_ack() continues in state DATA. */
429 if (0 == tmit->acks_pending)
430 transmit_data (tmit);
434 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
438 LOG (GNUNET_ERROR_TYPE_ERROR,
439 "TransmitNotifyModifier callback returned error "
440 "when requesting a modifier.\n");
442 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
443 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
444 msg->size = htons (sizeof (*msg));
446 transmit_queue_insert (tmit, msg, GNUNET_YES);
452 GNUNET_assert (data_size <= max_data_size);
453 msg->size = htons (msg->size + data_size);
454 transmit_queue_insert (tmit, msg, GNUNET_NO);
/* Built-in notify_mod callback, installed by GNUNET_PSYC_transmit_message()
 * when the caller passes no callback of its own.  @a cls is the transmit
 * handle.  With a non-NULL @a oper it serialises a modifier as name, NUL
 * byte and as much of the value as fits into @a data; otherwise it hands
 * out the next chunk of a value that did not fit.  @a data_size is the
 * buffer capacity on entry and the number of bytes written on return.
 * The visible return statement yields GNUNET_YES once no value bytes remain
 * outstanding and GNUNET_NO otherwise. */
462 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
463 uint32_t *full_value_size)
466 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
467 uint16_t name_size = 0;
468 size_t value_size = 0;
469 const char *value = NULL;
/* NOTE(review): the handle is advanced to mod->next before anything is
 * emitted - verify that the first modifier of the environment is not
 * skipped by this. */
471 if (NULL != oper && NULL != tmit->mod)
473 tmit->mod = tmit->mod->next;
474 if (NULL == tmit->mod)
475 { /* No more modifiers, continue with data */
480 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
481 *full_value_size = tmit->mod->value_size;
482 *oper = tmit->mod->oper;
/* NOTE(review): strlen() is narrowed to uint16_t here, and the subtraction
 * `*data_size - name_size - 1` below assumes the name and its terminator
 * fit into the buffer - TODO confirm names are bounded by the caller. */
483 name_size = strlen (tmit->mod->name);
/* Either the whole modifier fits, or the value is split and the remainder
 * is remembered in mod_value / mod_value_size for continuation calls. */
485 if (name_size + 1 + tmit->mod->value_size <= *data_size)
487 *data_size = name_size + 1 + tmit->mod->value_size;
491 tmit->mod_value_size = tmit->mod->value_size;
492 value_size = *data_size - name_size - 1;
493 tmit->mod_value_size -= value_size;
494 tmit->mod_value = tmit->mod->value + value_size;
497 memcpy (data, tmit->mod->name, name_size);
498 ((char *)data)[name_size] = '\0';
499 memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
501 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
502 { /* Modifier continuation */
503 value = tmit->mod_value;
504 if (tmit->mod_value_size <= *data_size)
506 value_size = tmit->mod_value_size;
507 tmit->mod_value = NULL;
511 value_size = *data_size;
512 tmit->mod_value += value_size;
514 tmit->mod_value_size -= value_size;
516 if (*data_size < value_size)
518 LOG (GNUNET_ERROR_TYPE_DEBUG,
519 "Value in environment larger than buffer: %u < %zu\n",
520 *data_size, value_size);
525 *data_size = value_size;
526 memcpy (data, value, value_size);
529 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
534 * Transmit a message.
536 * @param tmit Transmission handle.
537 * @param method_name Which method should be invoked.
538 * @param env Environment for the message.
539 * Should stay available until the first call to notify_data.
540 * Can be NULL if there are no modifiers or @a notify_mod is provided instead.
541 * @param notify_mod Function to call to obtain modifiers.
542 * Can be NULL if there are no modifiers or @a env is provided instead.
543 * @param notify_data Function to call to obtain fragments of the data.
544 * @param notify_cls Closure for @a notify_mod and @a notify_data.
545 * @param flags Flags for the message being transmitted.
547 * @return #GNUNET_OK if the transmission was started.
548 * #GNUNET_SYSERR if another transmission is already going on.
/* Starts a new transmission: refuses if one is already in progress, puts a
 * method part with @a method_name and @a flags into a fresh message buffer,
 * and installs the callbacks that supply modifiers and data. */
551 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
552 const char *method_name,
553 const struct GNUNET_ENV_Environment *env,
554 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
555 GNUNET_PSYC_TransmitNotifyData notify_data,
559 if (GNUNET_NO != tmit->in_transmit)
560 return GNUNET_SYSERR;
561 tmit->in_transmit = GNUNET_YES;
/* The method name is sent including its terminating NUL byte.
 * NOTE(review): the length is not bounded before it is added into the
 * header size fields; confirm the width of those fields and whether a very
 * long name can overflow them. */
563 size_t size = strlen (method_name) + 1;
564 struct GNUNET_PSYC_MessageMethod *pmeth;
/* The outer header size stays in host byte order while the buffer is being
 * filled; the embedded method part is stored in network byte order. */
565 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
566 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
568 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
569 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
570 pmeth->header.size = htons (sizeof (*pmeth) + size);
571 pmeth->flags = htonl (flags);
572 memcpy (&pmeth[1], method_name, size);
574 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
575 tmit->notify_data = notify_data;
576 tmit->notify_data_cls = notify_cls;
/* A caller-supplied modifier callback is used with the caller closure;
 * otherwise the built-in transmit_notify_env() is installed with the handle
 * itself as closure. */
578 if (NULL != notify_mod)
580 tmit->notify_mod = notify_mod;
581 tmit->notify_mod_cls = notify_cls;
585 tmit->notify_mod = &transmit_notify_env;
586 tmit->notify_mod_cls = tmit;
589 ? GNUNET_ENV_environment_head (env)
599 * Resume transmission.
601 * @param tmit Transmission handle.
/* Clears the paused flag and requests the next data fragment, but only
 * when no acknowledgement is outstanding.
 * NOTE(review): transmit_mod() can also set tmit->paused, yet this function
 * always continues with transmit_data() - confirm that resuming while in a
 * modifier state is handled as intended. */
604 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
606 if (0 == tmit->acks_pending)
608 tmit->paused = GNUNET_NO;
609 transmit_data (tmit);
615 * Abort transmission request.
617 * @param tmit Transmission handle.
620 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
622 if (GNUNET_NO == tmit->in_transmit)
625 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
626 tmit->in_transmit = GNUNET_NO;
627 tmit->paused = GNUNET_NO;
630 struct GNUNET_MessageHeader msg;
631 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
632 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
633 msg.size = htons (sizeof (msg));
634 transmit_queue_insert (tmit, &msg, GNUNET_YES);
639 * Got acknowledgement of a transmitted message part, continue transmission.
641 * @param tmit Transmission handle.
/* Accounts for one acknowledgement and, unless transmission is paused,
 * continues according to the current state.  An acknowledgement that
 * arrives while none is pending is logged as extraneous. */
644 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
646 if (0 == tmit->acks_pending)
648 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
652 tmit->acks_pending--;
/* NOTE(review): the statement guarded by the paused check in the modifier
 * states is missing from this copy; presumably it calls transmit_mod(),
 * mirroring the DATA case below - TODO confirm. */
656 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
657 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
658 if (GNUNET_NO == tmit->paused)
662 case GNUNET_PSYC_MESSAGE_STATE_DATA:
663 if (GNUNET_NO == tmit->paused)
664 transmit_data (tmit);
667 case GNUNET_PSYC_MESSAGE_STATE_END:
668 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
672 LOG (GNUNET_ERROR_TYPE_DEBUG,
673 "Ignoring message ACK in state %u.\n", tmit->state);
678 /**** Receiving messages ****/
682 * Create handle for receiving messages.
/* Allocates a receive handle with GNUNET_malloc() and stores the two
 * message-part callbacks together with their shared closure.  The closure
 * is later passed as first argument to whichever callback is invoked. */
684 struct GNUNET_PSYC_ReceiveHandle *
685 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
686 GNUNET_PSYC_MessageCallback hist_message_cb,
689 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
690 recv->message_cb = message_cb;
691 recv->hist_message_cb = hist_message_cb;
692 recv->cb_cls = cb_cls;
698 * Destroy handle for receiving messages.
701 GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
708 * Reset stored data related to the last received message.
/* Returns the handle to the START state and clears the per-message
 * bookkeeping, so that the next fragment is treated as the beginning of a
 * new message.  The callbacks and their closure are left untouched. */
711 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
713 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
715 recv->message_id = 0;
716 recv->mod_value_size = 0;
717 recv->mod_value_size_expected = 0;
/* Reports a receive error to the application by invoking the callback with
 * a NULL message part, then resets the handle for the next message. */
722 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
/* Historic messages are reported through hist_message_cb. */
724 GNUNET_PSYC_MessageCallback message_cb
725 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
726 ? recv->hist_message_cb
729 if (NULL != message_cb)
730 message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
732 GNUNET_PSYC_receive_reset (recv);
737 * Handle incoming PSYC message.
739 * @param recv Receive handle.
740 * @param msg The message.
742 * @return #GNUNET_OK on success,
743 * #GNUNET_SYSERR on receive error.
/* Validates one PSYC message fragment and dispatches the message parts it
 * contains.  The fragment must belong to the message currently being
 * received (same ID and flags), every part must have a plausible size, and
 * parts must arrive in the order method, modifiers, data, end/cancel.
 * Accepted parts are passed to the message callback; END and CANCEL parts
 * reset the handle.  The visible error paths return GNUNET_SYSERR. */
746 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
747 const struct GNUNET_PSYC_MessageHeader *msg)
749 uint16_t size = ntohs (msg->header.size);
750 uint32_t flags = ntohl (msg->flags);
752 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
753 (struct GNUNET_MessageHeader *) msg);
/* First fragment of a message: remember its identity so that later
 * fragments can be matched against it. */
755 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
757 recv->message_id = GNUNET_ntohll (msg->message_id);
759 recv->slave_key = msg->slave_key;
760 recv->mod_value_size = 0;
761 recv->mod_value_size_expected = 0;
763 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
766 LOG (GNUNET_ERROR_TYPE_WARNING,
767 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
768 GNUNET_ntohll (msg->message_id), recv->message_id);
771 return GNUNET_SYSERR;
773 else if (flags != recv->flags)
/* NOTE(review): the format below uses %lu; the arguments are missing from
 * this copy, but if they are the 32-bit flags the specifiers do not match
 * the argument types - TODO confirm and switch to PRIu32. */
775 LOG (GNUNET_ERROR_TYPE_WARNING,
776 "Unexpected message flags. Got: %lu, expected: %lu\n",
780 return GNUNET_SYSERR;
/* size_min is a lower bound for the part size, size_eq an exact size;
 * exactly one of them is set per part type further down. */
783 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
/* Walk the message parts packed behind the PSYC header.
 * NOTE(review): the loop condition only guarantees one byte at pmsg; a
 * trailing fragment shorter than a message header is still read by the two
 * ntohs() calls below - consider requiring sizeof (*pmsg) bytes first. */
785 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
787 const struct GNUNET_MessageHeader *pmsg
788 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
789 psize = ntohs (pmsg->size);
790 ptype = ntohs (pmsg->type);
791 size_eq = size_min = 0;
/* A part must at least hold its own header and must end inside the
 * enclosing message. */
793 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
795 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
796 "Dropping message of type %u with invalid size %u.\n",
799 return GNUNET_SYSERR;
802 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
803 "Received message part from PSYC.\n");
/* The part is logged before its size has been checked against the minimum
 * for its type below. */
804 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
808 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
809 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
811 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
812 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
814 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
815 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
816 size_min = sizeof (struct GNUNET_MessageHeader);
818 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
819 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
820 size_eq = sizeof (struct GNUNET_MessageHeader);
825 return GNUNET_SYSERR;
828 if (! ((0 < size_eq && psize == size_eq)
829 || (0 < size_min && size_min <= psize)))
833 return GNUNET_SYSERR;
838 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
840 struct GNUNET_PSYC_MessageMethod *meth
841 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
843 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
845 LOG (GNUNET_ERROR_TYPE_WARNING,
846 "Dropping out of order message method (%u).\n",
848 /* It is normal to receive an incomplete message right after connecting,
849 * but should not happen later.
850 * FIXME: add a check for this condition.
854 return GNUNET_SYSERR;
// The method name must end in a NUL byte, i.e. the last byte of the part.
857 if ('\0' != *((char *) meth + psize - 1))
859 LOG (GNUNET_ERROR_TYPE_WARNING,
860 "Dropping message with malformed method. "
861 "Message ID: %" PRIu64 "\n", recv->message_id);
864 return GNUNET_SYSERR;
866 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
869 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
871 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
872 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
873 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
875 LOG (GNUNET_ERROR_TYPE_WARNING,
876 "Dropping out of order message modifier (%u).\n",
880 return GNUNET_SYSERR;
883 struct GNUNET_PSYC_MessageModifier *mod
884 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
886 uint16_t name_size = ntohs (mod->name_size);
887 recv->mod_value_size_expected = ntohl (mod->value_size);
// NOTE(review): computed before the bounds check below; for a part that is
// too short the subtraction wraps around, but such parts are rejected by
// the first clause of that check.
888 recv->mod_value_size = psize - sizeof (*mod) - name_size - 1;
890 if (psize < sizeof (*mod) + name_size + 1
891 || '\0' != *((char *) &mod[1] + name_size)
892 || recv->mod_value_size_expected < recv->mod_value_size)
894 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
897 return GNUNET_SYSERR;
899 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
902 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
// The running total is updated before the state is checked.
904 recv->mod_value_size += psize - sizeof (*pmsg);
906 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
907 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
908 || recv->mod_value_size_expected < recv->mod_value_size)
// NOTE(review): %lu is paired with the two uint32_t size members here;
// PRIu32 would match the argument types.
910 LOG (GNUNET_ERROR_TYPE_WARNING,
911 "Dropping out of order message modifier continuation "
912 "!(%u == %u || %u == %u) || %lu < %lu.\n",
913 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
914 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
915 recv->mod_value_size_expected, recv->mod_value_size);
918 return GNUNET_SYSERR;
922 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
// Data is only accepted after a method part and once the value of the last
// modifier has been received completely.
924 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
925 || recv->mod_value_size_expected != recv->mod_value_size)
// NOTE(review): same %lu versus uint32_t mismatch as above.
927 LOG (GNUNET_ERROR_TYPE_WARNING,
928 "Dropping out of order message data fragment "
929 "(%u < %u || %lu != %lu).\n",
930 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
931 recv->mod_value_size_expected, recv->mod_value_size);
935 return GNUNET_SYSERR;
937 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
// Historic messages are delivered through hist_message_cb.
942 GNUNET_PSYC_MessageCallback message_cb
943 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC
944 ? recv->hist_message_cb
947 if (NULL != message_cb)
948 message_cb (recv->cb_cls, recv->message_id, recv->flags, pmsg);
// END and CANCEL finish the message: the handle goes back to START.
952 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
953 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
954 GNUNET_PSYC_receive_reset (recv);
963 * Check if @a data contains a series of valid message parts.
965 * @param data_size Size of @a data.
967 * @param[out] first_ptype Type of first message part.
968 * @param[out] last_ptype Type of last message part.
970 * @return Number of message parts found in @a data.
971 * or GNUNET_SYSERR if the message contains invalid parts.
974 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
975 uint16_t *first_ptype, uint16_t *last_ptype)
977 const struct GNUNET_MessageHeader *pmsg;
978 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
979 if (NULL != first_ptype)
981 if (NULL != last_ptype)
984 for (pos = 0; pos < data_size; pos += psize, parts++)
986 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
987 psize = ntohs (pmsg->size);
988 ptype = ntohs (pmsg->type);
989 if (0 == parts && NULL != first_ptype)
990 *first_ptype = ptype;
991 if (NULL != last_ptype
992 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
994 if (psize < sizeof (*pmsg)
995 || pos + psize > data_size
996 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
997 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
999 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1000 "Invalid message part of type %u and size %u.\n",
1002 return GNUNET_SYSERR;
1004 /* FIXME: check message part order */