/*
 * This file is part of GNUnet
 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
 *
 * GNUnet is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 3, or (at your
 * option) any later version.
 *
 * GNUnet is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNUnet; see the file COPYING.  If not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * @file psycstore/psyc_util_lib.c
 * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
 * @author Gabor X Toth
 */

#include "platform.h"

#include <inttypes.h>

#include "gnunet_util_lib.h"
#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
#include "gnunet_psyc_util_lib.h"

/** Log a message under the "psyc-util" component. */
#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)

38 struct GNUNET_PSYC_TransmitHandle
41 * Client connection to service.
43 struct GNUNET_CLIENT_MANAGER_Connection *client;
46 * Message currently being received from the client.
48 struct GNUNET_MessageHeader *msg;
51 * Callback to request next modifier from client.
53 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
56 * Closure for the notify callbacks.
61 * Callback to request next data fragment from client.
63 GNUNET_PSYC_TransmitNotifyData notify_data;
66 * Closure for the notify callbacks.
68 void *notify_data_cls;
71 * Modifier of the environment that is currently being transmitted.
73 struct GNUNET_ENV_Modifier *mod;
78 const char *mod_value;
81 * Number of bytes remaining to be transmitted from the current modifier value.
83 uint32_t mod_value_remaining;
86 * State of the current message being received from client.
88 enum GNUNET_PSYC_MessageState state;
91 * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
96 * Is transmission paused?
101 * Are we currently transmitting a message?
108 struct GNUNET_PSYC_ReceiveHandle
113 GNUNET_PSYC_MessageCallback message_cb;
116 * Message part callback.
118 GNUNET_PSYC_MessagePartCallback message_part_cb;
121 * Closure for the callbacks.
126 * ID of the message being received from the PSYC service.
131 * Public key of the slave from which a message is being received.
133 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
136 * State of the currently being received message from the PSYC service.
138 enum GNUNET_PSYC_MessageState state;
141 * Flags for the currently being received message from the PSYC service.
143 enum GNUNET_PSYC_MessageFlags flags;
146 * Expected value size for the modifier being received from the PSYC service.
148 uint32_t mod_value_size_expected;
151 * Actual value size for the modifier being received from the PSYC service.
153 uint32_t mod_value_size;
161 * Create a PSYC message.
164 * PSYC method for the message.
166 * Environment for the message.
168 * Data payload for the message.
172 * @return Message header with size information,
173 * followed by the message parts.
175 struct GNUNET_PSYC_Message *
176 GNUNET_PSYC_message_create (const char *method_name,
177 const struct GNUNET_ENV_Environment *env,
181 struct GNUNET_ENV_Modifier *mod = NULL;
182 struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
183 struct GNUNET_PSYC_MessageModifier *pmod = NULL;
184 struct GNUNET_MessageHeader *pmsg = NULL;
185 uint16_t env_size = 0;
188 mod = GNUNET_ENV_environment_head (env);
191 env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
196 struct GNUNET_PSYC_Message *msg;
197 uint16_t method_name_size = strlen (method_name) + 1;
198 if (method_name_size == 1)
201 uint16_t msg_size = sizeof (*msg) /* header */
202 + sizeof (*pmeth) + method_name_size /* method */
203 + env_size /* modifiers */
204 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
205 + sizeof (*pmsg); /* end of message */
206 msg = GNUNET_malloc (msg_size);
207 msg->header.size = htons (msg_size);
208 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
210 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
212 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
213 memcpy (&pmeth[1], method_name, method_name_size);
215 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
218 mod = GNUNET_ENV_environment_head (env);
221 uint16_t mod_name_size = strlen (mod->name) + 1;
222 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
223 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
224 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
225 p += pmod->header.size;
226 pmod->header.size = htons (pmod->header.size);
228 pmod->oper = mod->oper;
229 pmod->name_size = htons (mod_name_size);
230 pmod->value_size = htonl (mod->value_size);
232 memcpy (&pmod[1], mod->name, mod_name_size);
233 if (0 < mod->value_size)
234 memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
242 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
243 pmsg->size = sizeof (*pmsg) + data_size;
245 pmsg->size = htons (pmsg->size);
246 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
247 memcpy (&pmsg[1], data, data_size);
250 pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
251 pmsg->size = htons (sizeof (*pmsg));
252 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
254 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
260 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
261 const struct GNUNET_MessageHeader *msg)
263 uint16_t size = ntohs (msg->size);
264 uint16_t type = ntohs (msg->type);
265 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
268 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
270 struct GNUNET_PSYC_MessageHeader *pmsg
271 = (struct GNUNET_PSYC_MessageHeader *) msg;
272 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
273 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
276 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
278 struct GNUNET_PSYC_MessageMethod *meth
279 = (struct GNUNET_PSYC_MessageMethod *) msg;
280 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
283 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
285 struct GNUNET_PSYC_MessageModifier *mod
286 = (struct GNUNET_PSYC_MessageModifier *) msg;
287 uint16_t name_size = ntohs (mod->name_size);
288 char oper = ' ' < mod->oper ? mod->oper : ' ';
289 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
290 size - sizeof (*mod) - name_size,
291 ((char *) &mod[1]) + name_size);
294 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
295 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
296 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);

/**** Transmitting messages ****/

306 * Create a transmission handle.
308 struct GNUNET_PSYC_TransmitHandle *
309 GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
311 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
312 tmit->client = client;
318 * Destroy a transmission handle.
321 GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
328 * Queue a message part for transmission.
330 * The message part is added to the current message buffer.
331 * When this buffer is full, it is added to the transmission queue.
334 * Transmission handle.
336 * Message part, or NULL.
339 * #GNUNET_YES or #GNUNET_NO.
342 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343 const struct GNUNET_MessageHeader *msg,
346 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
348 LOG (GNUNET_ERROR_TYPE_DEBUG,
349 "Queueing message part of type %u and size %u (end: %u)).\n",
350 NULL != msg ? ntohs (msg->type) : 0, size, end);
352 if (NULL != tmit->msg)
355 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
357 /* End of message or buffer is full, add it to transmission queue
358 * and start with empty buffer */
359 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
360 tmit->msg->size = htons (tmit->msg->size);
361 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
363 tmit->acks_pending++;
367 /* Message fits in current buffer, append */
368 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
369 memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
370 tmit->msg->size += size;
374 if (NULL == tmit->msg && NULL != msg)
376 /* Empty buffer, copy over message. */
377 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
378 tmit->msg->size = sizeof (*tmit->msg) + size;
379 memcpy (&tmit->msg[1], msg, size);
382 if (NULL != tmit->msg
383 && (GNUNET_YES == end
384 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
385 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
387 /* End of message or buffer is full, add it to transmission queue. */
388 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
389 tmit->msg->size = htons (tmit->msg->size);
390 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
392 tmit->acks_pending++;
395 if (GNUNET_YES == end)
396 tmit->in_transmit = GNUNET_NO;
401 * Request data from client to transmit.
403 * @param tmit Transmission handle.
406 transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
408 int notify_ret = GNUNET_YES;
409 uint16_t data_size = 0;
410 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
411 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
412 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
414 if (NULL != tmit->notify_data)
416 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
417 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
419 LOG (GNUNET_ERROR_TYPE_DEBUG,
420 "transmit_data (ret: %d, size: %u): %.*s\n",
421 notify_ret, data_size, data_size, &msg[1]);
427 /* Transmission paused, nothing to send. */
428 tmit->paused = GNUNET_YES;
434 tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
438 LOG (GNUNET_ERROR_TYPE_ERROR,
439 "TransmitNotifyData callback returned error when requesting data.\n");
441 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg));
444 transmit_queue_insert (tmit, msg, GNUNET_YES);
450 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
451 msg->size = htons (sizeof (*msg) + data_size);
452 transmit_queue_insert (tmit, msg, !notify_ret);
455 /* End of message. */
456 if (GNUNET_YES == notify_ret)
458 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
459 msg->size = htons (sizeof (*msg));
460 transmit_queue_insert (tmit, msg, GNUNET_YES);
466 * Request a modifier from a client to transmit.
468 * @param tmit Transmission handle.
471 transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
473 uint16_t max_data_size = 0;
474 uint16_t data_size = 0;
475 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
476 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
477 int notify_ret = GNUNET_YES;
481 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
483 struct GNUNET_PSYC_MessageModifier *mod
484 = (struct GNUNET_PSYC_MessageModifier *) msg;
485 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
486 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
488 if (NULL != tmit->notify_mod)
490 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
491 data_size = max_data_size;
492 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
493 &mod->oper, &mod->value_size);
496 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
497 LOG (GNUNET_ERROR_TYPE_DEBUG,
498 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
499 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
500 if (mod->name_size < data_size)
502 tmit->mod_value_remaining
503 = mod->value_size - (data_size - mod->name_size);
504 mod->value_size = htonl (mod->value_size);
505 mod->name_size = htons (mod->name_size);
507 else if (0 < data_size)
509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
510 notify_ret = GNUNET_SYSERR;
514 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
516 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
517 msg->size = sizeof (struct GNUNET_MessageHeader);
519 if (NULL != tmit->notify_mod)
521 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
522 data_size = max_data_size;
523 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
524 &data_size, &msg[1], NULL, NULL);
526 tmit->mod_value_remaining -= data_size;
527 LOG (GNUNET_ERROR_TYPE_DEBUG,
528 "transmit_mod (ret: %d, size: %u): %.*s\n",
529 notify_ret, data_size, data_size, &msg[1]);
540 { /* Transmission paused, nothing to send. */
541 tmit->paused = GNUNET_YES;
545 = (0 == tmit->mod_value_remaining)
546 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
547 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
550 case GNUNET_YES: /* End of modifiers. */
551 GNUNET_assert (0 == tmit->mod_value_remaining);
555 LOG (GNUNET_ERROR_TYPE_ERROR,
556 "TransmitNotifyModifier callback returned with error.\n");
558 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
559 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
560 msg->size = htons (sizeof (*msg));
562 transmit_queue_insert (tmit, msg, GNUNET_YES);
568 GNUNET_assert (data_size <= max_data_size);
569 msg->size = htons (msg->size + data_size);
570 transmit_queue_insert (tmit, msg, GNUNET_NO);
573 if (GNUNET_YES == notify_ret)
575 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
576 if (0 == tmit->acks_pending)
577 transmit_data (tmit);
587 transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
588 uint32_t *full_value_size)
591 struct GNUNET_PSYC_TransmitHandle *tmit = cls;
592 uint16_t name_size = 0;
593 uint32_t value_size = 0;
594 const char *value = NULL;
598 if (NULL != tmit->mod)
599 tmit->mod = tmit->mod->next;
600 if (NULL == tmit->mod)
601 { /* No more modifiers, continue with data */
606 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
607 *full_value_size = tmit->mod->value_size;
608 *oper = tmit->mod->oper;
609 name_size = strlen (tmit->mod->name) + 1;
611 if (name_size + tmit->mod->value_size <= *data_size)
613 value_size = tmit->mod->value_size;
614 *data_size = name_size + value_size;
616 else /* full modifier does not fit in data, continuation needed */
618 value_size = *data_size - name_size;
619 tmit->mod_value = tmit->mod->value + value_size;
622 memcpy (data, tmit->mod->name, name_size);
623 memcpy ((char *)data + name_size, tmit->mod->value, value_size);
627 { /* Modifier continuation */
628 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
629 value = tmit->mod_value;
630 if (tmit->mod_value_remaining <= *data_size)
632 value_size = tmit->mod_value_remaining;
633 tmit->mod_value = NULL;
637 value_size = *data_size;
638 tmit->mod_value += value_size;
641 if (*data_size < value_size)
643 LOG (GNUNET_ERROR_TYPE_DEBUG,
644 "Value in environment larger than buffer: %u < %zu\n",
645 *data_size, value_size);
650 *data_size = value_size;
651 memcpy (data, value, value_size);
652 return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
658 * Transmit a message.
661 * Transmission handle.
663 * Which method should be invoked.
665 * Environment for the message.
666 * Should stay available until the first call to notify_data.
667 * Can be NULL if there are no modifiers or @a notify_mod is
670 * Function to call to obtain modifiers.
671 * Can be NULL if there are no modifiers or @a env is provided instead.
673 * Function to call to obtain fragments of the data.
675 * Closure for @a notify_mod and @a notify_data.
677 * Flags for the message being transmitted.
679 * @return #GNUNET_OK if the transmission was started.
680 * #GNUNET_SYSERR if another transmission is already going on.
683 GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
684 const char *method_name,
685 const struct GNUNET_ENV_Environment *env,
686 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
687 GNUNET_PSYC_TransmitNotifyData notify_data,
691 if (GNUNET_NO != tmit->in_transmit)
692 return GNUNET_SYSERR;
693 tmit->in_transmit = GNUNET_YES;
695 size_t size = strlen (method_name) + 1;
696 struct GNUNET_PSYC_MessageMethod *pmeth;
697 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
698 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
700 if (NULL != notify_mod)
702 tmit->notify_mod = notify_mod;
703 tmit->notify_mod_cls = notify_cls;
707 tmit->notify_mod = &transmit_notify_env;
708 tmit->notify_mod_cls = tmit;
711 struct GNUNET_ENV_Modifier mod = {};
712 mod.next = GNUNET_ENV_environment_head (env);
715 struct GNUNET_ENV_Modifier *m = tmit->mod;
716 while (NULL != (m = m->next))
718 if (m->oper != GNUNET_ENV_OP_SET)
719 flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
728 pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
729 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
730 pmeth->header.size = htons (sizeof (*pmeth) + size);
731 pmeth->flags = htonl (flags);
732 memcpy (&pmeth[1], method_name, size);
734 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
735 tmit->notify_data = notify_data;
736 tmit->notify_data_cls = notify_cls;
744 * Resume transmission.
746 * @param tmit Transmission handle.
749 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
751 if (0 == tmit->acks_pending)
753 tmit->paused = GNUNET_NO;
754 transmit_data (tmit);
760 * Abort transmission request.
762 * @param tmit Transmission handle.
765 GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
767 if (GNUNET_NO == tmit->in_transmit)
770 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
771 tmit->in_transmit = GNUNET_NO;
772 tmit->paused = GNUNET_NO;
775 struct GNUNET_MessageHeader msg;
776 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
777 msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
778 msg.size = htons (sizeof (msg));
779 transmit_queue_insert (tmit, &msg, GNUNET_YES);
784 * Got acknowledgement of a transmitted message part, continue transmission.
786 * @param tmit Transmission handle.
789 GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
791 if (0 == tmit->acks_pending)
793 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
797 tmit->acks_pending--;
801 case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
802 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
803 if (GNUNET_NO == tmit->paused)
807 case GNUNET_PSYC_MESSAGE_STATE_DATA:
808 if (GNUNET_NO == tmit->paused)
809 transmit_data (tmit);
812 case GNUNET_PSYC_MESSAGE_STATE_END:
813 case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
817 LOG (GNUNET_ERROR_TYPE_DEBUG,
818 "Ignoring message ACK in state %u.\n", tmit->state);

/**** Receiving messages ****/

827 * Create handle for receiving messages.
829 struct GNUNET_PSYC_ReceiveHandle *
830 GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
831 GNUNET_PSYC_MessagePartCallback message_part_cb,
834 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
835 recv->message_cb = message_cb;
836 recv->message_part_cb = message_part_cb;
837 recv->cb_cls = cb_cls;

/**
 * Destroy handle for receiving messages.
 *
 * @param recv  Receive handle to free.
 */
void
GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
{
  GNUNET_free (recv);
}

853 * Reset stored data related to the last received message.
856 GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
858 recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
860 recv->message_id = 0;
861 recv->mod_value_size = 0;
862 recv->mod_value_size_expected = 0;
867 recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
869 if (NULL != recv->message_part_cb)
870 recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
873 if (NULL != recv->message_cb)
874 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
876 GNUNET_PSYC_receive_reset (recv);
881 * Handle incoming PSYC message.
883 * @param recv Receive handle.
884 * @param msg The message.
886 * @return #GNUNET_OK on success,
887 * #GNUNET_SYSERR on receive error.
890 GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
891 const struct GNUNET_PSYC_MessageHeader *msg)
893 uint16_t size = ntohs (msg->header.size);
894 uint32_t flags = ntohl (msg->flags);
897 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
898 (struct GNUNET_MessageHeader *) msg);
900 if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
902 recv->message_id = GNUNET_ntohll (msg->message_id);
904 recv->slave_key = msg->slave_key;
905 recv->mod_value_size = 0;
906 recv->mod_value_size_expected = 0;
908 else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
911 LOG (GNUNET_ERROR_TYPE_WARNING,
912 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
913 GNUNET_ntohll (msg->message_id), recv->message_id);
916 return GNUNET_SYSERR;
918 else if (flags != recv->flags)
920 LOG (GNUNET_ERROR_TYPE_WARNING,
921 "Unexpected message flags. Got: %lu, expected: %lu\n",
925 return GNUNET_SYSERR;
927 message_id = recv->message_id;
929 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
931 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
933 const struct GNUNET_MessageHeader *pmsg
934 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
935 psize = ntohs (pmsg->size);
936 ptype = ntohs (pmsg->type);
937 size_eq = size_min = 0;
939 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
941 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
942 "Dropping message of type %u with invalid size %u.\n",
945 return GNUNET_SYSERR;
948 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949 "Received message part of type %u and size %u from PSYC.\n",
951 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
955 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
956 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
958 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
959 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
961 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
962 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
963 size_min = sizeof (struct GNUNET_MessageHeader);
965 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
966 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
967 size_eq = sizeof (struct GNUNET_MessageHeader);
972 return GNUNET_SYSERR;
975 if (! ((0 < size_eq && psize == size_eq)
976 || (0 < size_min && size_min <= psize)))
980 return GNUNET_SYSERR;
985 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
987 struct GNUNET_PSYC_MessageMethod *meth
988 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
990 if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
992 LOG (GNUNET_ERROR_TYPE_WARNING,
993 "Dropping out of order message method (%u).\n",
995 /* It is normal to receive an incomplete message right after connecting,
996 * but should not happen later.
997 * FIXME: add a check for this condition.
1001 return GNUNET_SYSERR;
1004 if ('\0' != *((char *) meth + psize - 1))
1006 LOG (GNUNET_ERROR_TYPE_WARNING,
1007 "Dropping message with malformed method. "
1008 "Message ID: %" PRIu64 "\n", recv->message_id);
1009 GNUNET_break_op (0);
1011 return GNUNET_SYSERR;
1013 recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1016 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1018 if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
1019 || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1020 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
1022 LOG (GNUNET_ERROR_TYPE_WARNING,
1023 "Dropping out of order message modifier (%u).\n",
1025 GNUNET_break_op (0);
1027 return GNUNET_SYSERR;
1030 struct GNUNET_PSYC_MessageModifier *mod
1031 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
1033 uint16_t name_size = ntohs (mod->name_size);
1034 recv->mod_value_size_expected = ntohl (mod->value_size);
1035 recv->mod_value_size = psize - sizeof (*mod) - name_size;
1037 if (psize < sizeof (*mod) + name_size
1038 || '\0' != *((char *) &mod[1] + name_size - 1)
1039 || recv->mod_value_size_expected < recv->mod_value_size)
1041 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
1042 GNUNET_break_op (0);
1044 return GNUNET_SYSERR;
1046 recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1049 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
1051 recv->mod_value_size += psize - sizeof (*pmsg);
1053 if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
1054 || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
1055 || recv->mod_value_size_expected < recv->mod_value_size)
1057 LOG (GNUNET_ERROR_TYPE_WARNING,
1058 "Dropping out of order message modifier continuation "
1059 "!(%u == %u || %u == %u) || %lu < %lu.\n",
1060 GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
1061 GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
1062 recv->mod_value_size_expected, recv->mod_value_size);
1063 GNUNET_break_op (0);
1065 return GNUNET_SYSERR;
1069 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1071 if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
1072 || recv->mod_value_size_expected != recv->mod_value_size)
1074 LOG (GNUNET_ERROR_TYPE_WARNING,
1075 "Dropping out of order message data fragment "
1076 "(%u < %u || %lu != %lu).\n",
1077 recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
1078 recv->mod_value_size_expected, recv->mod_value_size);
1080 GNUNET_break_op (0);
1082 return GNUNET_SYSERR;
1084 recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1089 if (NULL != recv->message_part_cb)
1090 recv->message_part_cb (recv->cb_cls, &recv->slave_key,
1091 recv->message_id, recv->flags,
1092 0, // FIXME: data_offset
1097 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1098 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
1099 GNUNET_PSYC_receive_reset (recv);
1104 if (NULL != recv->message_cb)
1105 recv->message_cb (recv->cb_cls, message_id, flags, msg);
1111 * Check if @a data contains a series of valid message parts.
1113 * @param data_size Size of @a data.
1115 * @param[out] first_ptype Type of first message part.
1116 * @param[out] last_ptype Type of last message part.
1118 * @return Number of message parts found in @a data.
1119 * or GNUNET_SYSERR if the message contains invalid parts.
1122 GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1123 uint16_t *first_ptype, uint16_t *last_ptype)
1125 const struct GNUNET_MessageHeader *pmsg;
1126 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
1127 if (NULL != first_ptype)
1129 if (NULL != last_ptype)
1132 for (pos = 0; pos < data_size; pos += psize, parts++)
1134 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1135 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1136 psize = ntohs (pmsg->size);
1137 ptype = ntohs (pmsg->type);
1138 if (0 == parts && NULL != first_ptype)
1139 *first_ptype = ptype;
1140 if (NULL != last_ptype
1141 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
1142 *last_ptype = ptype;
1143 if (psize < sizeof (*pmsg)
1144 || pos + psize > data_size
1145 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
1146 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
1148 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1149 "Invalid message part of type %u and size %u.\n",
1151 return GNUNET_SYSERR;
1153 /** @todo FIXME: check message part order */
1159 struct ParseMessageClosure
1161 struct GNUNET_ENV_Environment *env;
1162 const char **method_name;
1164 uint16_t *data_size;
1165 enum GNUNET_PSYC_MessageState msg_state;
1170 parse_message_part_cb (void *cls,
1171 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1172 uint64_t message_id, uint32_t flags, uint64_t data_offset,
1173 const struct GNUNET_MessageHeader *msg)
1175 struct ParseMessageClosure *pmc = cls;
1178 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1182 switch (ntohs (msg->type))
1184 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1186 struct GNUNET_PSYC_MessageMethod *
1187 pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1188 *pmc->method_name = (const char *) &pmeth[1];
1189 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1193 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1195 struct GNUNET_PSYC_MessageModifier *
1196 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1198 const char *name = (const char *) &pmod[1];
1199 const void *value = name + ntohs (pmod->name_size);
1200 GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1201 ntohl (pmod->value_size));
1202 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1206 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1207 *pmc->data = &msg[1];
1208 *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1209 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1212 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1213 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1217 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1223 * Parse PSYC message.
1226 * The PSYC message to parse.
1227 * @param[out] method_name
1228 * Pointer to the method name inside @a pmsg.
1230 * The environment for the message with a list of modifiers.
1232 * Pointer to data inside @a pmsg.
1233 * @param[out] data_size
1234 * Size of @data is written here.
1236 * @return #GNUNET_OK on success,
1237 * #GNUNET_SYSERR on parse error.
1240 GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
1241 const char **method_name,
1242 struct GNUNET_ENV_Environment *env,
1244 uint16_t *data_size)
1246 struct ParseMessageClosure cls;
1248 cls.method_name = method_name;
1250 cls.data_size = data_size;
1252 struct GNUNET_PSYC_ReceiveHandle *
1253 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1254 GNUNET_PSYC_receive_message (recv, msg);
1255 GNUNET_PSYC_receive_destroy (recv);
1257 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1264 * Initialize PSYC message header.
1267 GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
1268 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1271 uint16_t size = ntohs (mmsg->header.size);
1272 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1274 pmsg->header.size = htons (psize);
1275 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1276 pmsg->message_id = mmsg->message_id;
1277 pmsg->fragment_offset = mmsg->fragment_offset;
1278 pmsg->flags = htonl (flags);
1280 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
1285 * Create a new PSYC message header from a multicast message.
1287 struct GNUNET_PSYC_MessageHeader *
1288 GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
1291 struct GNUNET_PSYC_MessageHeader *pmsg;
1292 uint16_t size = ntohs (mmsg->header.size);
1293 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
1295 pmsg = GNUNET_malloc (psize);
1296 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
1302 * Create a new PSYC message header from a PSYC message.
1304 struct GNUNET_PSYC_MessageHeader *
1305 GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
1307 uint16_t msg_size = ntohs (msg->header.size);
1308 struct GNUNET_PSYC_MessageHeader *
1309 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1310 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1311 pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
1312 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));