X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fpsycutil%2Fpsyc_message.c;h=3421fd0ba8c384dbf2bc31610bc483034fb3053e;hb=79fb947eb8fba243ea65e19b40b65e04f8806865;hp=8c214d2b642f4f90a607ccd503461e878d577e85;hpb=50eaf8d7de763d25b7dae7ffdee8d7c6b5fe71ea;p=oweals%2Fgnunet.git diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c index 8c214d2b6..3421fd0ba 100644 --- a/src/psycutil/psyc_message.c +++ b/src/psycutil/psyc_message.c @@ -1,6 +1,6 @@ /* * This file is part of GNUnet - * Copyright (C) 2013 Christian Grothoff (and other contributing authors) + * Copyright (C) 2013 GNUnet e.V. * * GNUnet is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published @@ -19,7 +19,7 @@ */ /** - * @file psycstore/psyc_util_lib.c + * @file psycutil/psyc_message.c * @brief PSYC utilities; receiving/transmitting/logging PSYC messages. * @author Gabor X Toth */ @@ -39,13 +39,18 @@ struct GNUNET_PSYC_TransmitHandle /** * Client connection to service. */ - struct GNUNET_CLIENT_MANAGER_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Message currently being received from the client. */ struct GNUNET_MessageHeader *msg; + /** + * Envelope for @a msg + */ + struct GNUNET_MQ_Envelope *env; + /** * Callback to request next modifier from client. */ @@ -135,7 +140,7 @@ struct GNUNET_PSYC_ReceiveHandle /** * Public key of the slave from which a message is being received. */ - struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; /** * State of the currently being received message from the PSYC service. @@ -215,7 +220,7 @@ GNUNET_PSYC_message_create (const char *method_name, pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); - memcpy (&pmeth[1], method_name, method_name_size); + GNUNET_memcpy (&pmeth[1], method_name, method_name_size); uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; if (NULL != env) @@ -234,9 +239,9 @@ GNUNET_PSYC_message_create (const char *method_name, pmod->name_size = htons (mod_name_size); pmod->value_size = htonl (mod->value_size); - memcpy (&pmod[1], mod->name, mod_name_size); + GNUNET_memcpy (&pmod[1], mod->name, mod_name_size); if (0 < mod->value_size) - memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size); + GNUNET_memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size); mod = mod->next; } @@ -249,7 +254,7 @@ GNUNET_PSYC_message_create (const char *method_name, p += pmsg->size; pmsg->size = htons (pmsg->size); pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); - memcpy (&pmsg[1], data, data_size); + GNUNET_memcpy (&pmsg[1], data, data_size); } pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p); @@ -267,38 +272,54 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, { uint16_t size = ntohs (msg->size); uint16_t type = ntohs (msg->type); - GNUNET_log (kind, "Message of type %d and size %u:\n", type, size); + + GNUNET_log (kind, + "Message of type %d and size %u:\n", + type, + size); switch (type) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: { - struct GNUNET_PSYC_MessageHeader *pmsg - = (struct GNUNET_PSYC_MessageHeader *) msg; - GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n", - GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags)); + const struct GNUNET_PSYC_MessageHeader *pmsg + = (const struct GNUNET_PSYC_MessageHeader *) msg; + GNUNET_log (kind, + "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n", + GNUNET_ntohll (pmsg->message_id), + ntohl (pmsg->flags)); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { - struct GNUNET_PSYC_MessageMethod *meth - = (struct GNUNET_PSYC_MessageMethod *) msg; - GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]); + const struct GNUNET_PSYC_MessageMethod *meth + = (const struct GNUNET_PSYC_MessageMethod *) msg; + GNUNET_log (kind, + "\t%.*s\n", + (int) (size - sizeof (*meth)), + (const char *) &meth[1]); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { - struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) msg; + const struct GNUNET_PSYC_MessageModifier *mod + = (const struct GNUNET_PSYC_MessageModifier *) msg; uint16_t name_size = ntohs (mod->name_size); char oper = ' ' < mod->oper ? mod->oper : ' '; - GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], - size - sizeof (*mod) - name_size, - ((char *) &mod[1]) + name_size); + GNUNET_log (kind, + "\t%c%.*s\t%.*s\n", + oper, + (int) name_size, + (const char *) &mod[1], + (int) (size - sizeof (*mod) - name_size), + ((const char *) &mod[1]) + name_size); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]); + GNUNET_log (kind, + "\t%.*s\n", + (int) (size - sizeof (*msg)), + (const char *) &msg[1]); break; } } @@ -311,10 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, * Create a transmission handle. */ struct GNUNET_PSYC_TransmitHandle * -GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) +GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) { - struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit)); - tmit->client = client; + struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); + + tmit->mq = mq; return tmit; } @@ -361,17 +383,16 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, { /* End of message or buffer is full, add it to transmission queue * and start with empty buffer */ - tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); tmit->msg->size = htons (tmit->msg->size); - GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); + GNUNET_MQ_send (tmit->mq, tmit->env); + tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } else { /* Message fits in current buffer, append */ - tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size); - memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); + GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); tmit->msg->size += size; } } @@ -379,9 +400,14 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, if (NULL == tmit->msg && NULL != msg) { /* Empty buffer, copy over message. */ - tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); + tmit->env = GNUNET_MQ_msg_extra (tmit->msg, + GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + /* store current message size in host byte order + * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + size; - memcpy (&tmit->msg[1], msg, size); + + GNUNET_memcpy (&tmit->msg[1], msg, size); } if (NULL != tmit->msg @@ -390,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) { /* End of message or buffer is full, add it to transmission queue. */ - tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); tmit->msg->size = htons (tmit->msg->size); - GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); + GNUNET_MQ_send (tmit->mq, tmit->env); + tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } @@ -630,8 +656,8 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, tmit->mod_value = tmit->mod->value + value_size; } - memcpy (data, tmit->mod->name, name_size); - memcpy ((char *)data + name_size, tmit->mod->value, value_size); + GNUNET_memcpy (data, tmit->mod->name, name_size); + GNUNET_memcpy ((char *)data + name_size, tmit->mod->value, value_size); return GNUNET_NO; } else @@ -659,7 +685,7 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, } *data_size = value_size; - memcpy (data, value, value_size); + GNUNET_memcpy (data, value, value_size); return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO; } } @@ -705,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; - tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); + + tmit->env = GNUNET_MQ_msg_extra (tmit->msg, + GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + /* store current message size in host byte order + * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; if (NULL != notify_mod) @@ -740,7 +771,7 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + size); pmeth->flags = htonl (flags); - memcpy (&pmeth[1], method_name, size); + GNUNET_memcpy (&pmeth[1], method_name, size); tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; tmit->notify_data = notify_data; @@ -810,6 +841,9 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) } tmit->acks_pending--; + if (GNUNET_YES == tmit->paused) + return; + switch (tmit->state) { case GNUNET_PSYC_MESSAGE_STATE_MODIFIER: @@ -879,11 +913,10 @@ static void recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) { if (NULL != recv->message_part_cb) - recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags, - 0, NULL); + recv->message_part_cb (recv->cb_cls, NULL, NULL); if (NULL != recv->message_cb) - recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); + recv->message_cb (recv->cb_cls, NULL); GNUNET_PSYC_receive_reset (recv); } @@ -904,7 +937,6 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, { uint16_t size = ntohs (msg->header.size); uint32_t flags = ntohl (msg->flags); - uint64_t message_id; GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, (struct GNUNET_MessageHeader *) msg); @@ -913,7 +945,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, { recv->message_id = GNUNET_ntohll (msg->message_id); recv->flags = flags; - recv->slave_key = msg->slave_key; + recv->slave_pub_key = msg->slave_pub_key; recv->mod_value_size = 0; recv->mod_value_size_expected = 0; } @@ -936,7 +968,6 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, recv_error (recv); return GNUNET_SYSERR; } - message_id = recv->message_id; uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; @@ -1099,10 +1130,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, } if (NULL != recv->message_part_cb) - recv->message_part_cb (recv->cb_cls, &recv->slave_key, - recv->message_id, recv->flags, - 0, // FIXME: data_offset - pmsg); + recv->message_part_cb (recv->cb_cls, msg, pmsg); switch (ptype) { @@ -1114,7 +1142,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, } if (NULL != recv->message_cb) - recv->message_cb (recv->cb_cls, message_id, flags, msg); + recv->message_cb (recv->cb_cls, msg); return GNUNET_OK; } @@ -1180,23 +1208,22 @@ struct ParseMessageClosure static void parse_message_part_cb (void *cls, - const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t message_id, uint32_t flags, uint64_t data_offset, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_PSYC_MessageHeader *msg, + const struct GNUNET_MessageHeader *pmsg) { struct ParseMessageClosure *pmc = cls; - if (NULL == msg) + if (NULL == pmsg) { pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; return; } - switch (ntohs (msg->type)) + switch (ntohs (pmsg->type)) { case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: { struct GNUNET_PSYC_MessageMethod * - pmeth = (struct GNUNET_PSYC_MessageMethod *) msg; + pmeth = (struct GNUNET_PSYC_MessageMethod *) pmsg; *pmc->method_name = (const char *) &pmeth[1]; pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; break; @@ -1205,7 +1232,7 @@ parse_message_part_cb (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: { struct GNUNET_PSYC_MessageModifier * - pmod = (struct GNUNET_PSYC_MessageModifier *) msg; + pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg; const char *name = (const char *) &pmod[1]; const void *value = name + ntohs (pmod->name_size); @@ -1216,8 +1243,8 @@ parse_message_part_cb (void *cls, } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - *pmc->data = &msg[1]; - *pmc->data_size = ntohs (msg->size) - sizeof (*msg); + *pmc->data = &pmsg[1]; + *pmc->data_size = ntohs (pmsg->size) - sizeof (*pmsg); pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; break; @@ -1241,7 +1268,7 @@ parse_message_part_cb (void *cls, * @param env * The environment for the message with a list of modifiers. * @param[out] data - * Pointer to data inside @a pmsg. + * Pointer to data inside @a msg. * @param[out] data_size * Size of @data is written here. * @@ -1292,7 +1319,7 @@ GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg, pmsg->fragment_offset = mmsg->fragment_offset; pmsg->flags = htonl (flags); - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + GNUNET_memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); } @@ -1324,6 +1351,6 @@ GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *m pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg)); pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg)); - memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); + GNUNET_memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); return pmsg; }