#include "gnunet_time_lib.h"
#include "gnunet_dv_service.h"
#include "dv.h"
+#include "../transport/plugin_transport.h"
struct PendingMessages
*/
void *receive_cls;
+ /**
+ * Current unique ID
+ */
+ uint32_t uid_gen;
+
+ /**
+ * Hashmap containing outstanding send requests awaiting confirmation.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
+
};
struct StartContext
{
-
/**
* Start message
*/
struct GNUNET_DV_Handle *handle;
};
+struct SendCallbackContext
+{
+ /**
+ * The continuation to call once a message is confirmed sent (or failed)
+ */
+ GNUNET_TRANSPORT_TransmitContinuation cont;
+
+ /**
+ * Closure to call with send continuation.
+ */
+ void *cont_cls;
+
+ /**
+ * Target of the message.
+ */
+ struct GNUNET_PeerIdentity target;
+};
+
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint32_t uid,
+ GNUNET_HashCode *hash)
+{
+ memset (hash, 0, sizeof(GNUNET_HashCode));
+ *((uint32_t*)hash) = uid;
+}
/**
* Try to (re)connect to the dv service.
handle->current = NULL;
process_pending_message (handle);
+ GNUNET_free(pos->msg);
GNUNET_free (pos);
}
size_t ret;
size_t tsize;
+#if DEBUG_DV
+ if (handle->current != NULL)
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type));
+#endif
+
if (buf == NULL)
{
+#if DEBUG_DV
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n");
+#endif
finish(handle, GNUNET_SYSERR);
return 0;
}
if (size >= tsize)
{
memcpy(buf, handle->current->msg, tsize);
+#if DEBUG_DV
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize);
+#endif
+ finish(handle, GNUNET_OK);
+ return tsize;
}
- else
- {
- return ret;
- }
+
}
return ret;
*/
static void process_pending_message(struct GNUNET_DV_Handle *handle)
{
- struct GNUNET_TIME_Relative timeout;
if (handle->current != NULL)
return; /* action already pending */
handle->pending_list = handle->pending_list->next;
handle->current->next = NULL;
- timeout = GNUNET_TIME_absolute_get_remaining (handle->current->timeout);
if (NULL ==
(handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
- ntohs(handle->current->msg->msgbuf_size),
- timeout,
+ ntohl(handle->current->msg->msgbuf_size),
+ handle->current->msg->timeout,
GNUNET_YES,
&transmit_pending, handle)))
{
last = pos;
pos = pos->next;
}
- new_message->next = last->next; /* Should always be null */
last->next = new_message;
}
else
{
- new_message->next = handle->pending_list; /* Will always be null */
handle->pending_list = new_message;
}
process_pending_message(handle);
}
-
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
+ */
void handle_message_receipt (void *cls,
const struct GNUNET_MessageHeader * msg)
{
struct GNUNET_DV_Handle *handle = cls;
struct GNUNET_DV_MessageReceived *received_msg;
+ struct GNUNET_DV_SendResultMessage *send_result_msg;
size_t packed_msg_len;
size_t sender_address_len;
char *sender_address;
char *packed_msg;
+ char *packed_msg_start;
+ GNUNET_HashCode uidhash;
+ struct SendCallbackContext *send_ctx;
if (msg == NULL)
{
return; /* Connection closed? */
}
-#if DEBUG_DV
- fprintf(stdout, "dv api receives message of type %d or raw %d\n", ntohs(msg->type), msg->type);
-#endif
- GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
+ GNUNET_assert((ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) || (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT));
- if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
- return;
+ switch (ntohs(msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE:
+ if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
+ return;
- received_msg = (struct GNUNET_DV_MessageReceived *)msg;
- packed_msg_len = ntohs(received_msg->msg_len);
- sender_address_len = ntohs(received_msg->sender_address_len);
- GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
+ received_msg = (struct GNUNET_DV_MessageReceived *)msg;
+ packed_msg_len = ntohl(received_msg->msg_len);
+ sender_address_len = ntohl(received_msg->sender_address_len);
- sender_address = GNUNET_malloc(sender_address_len);
- memcpy(sender_address, &received_msg[1], sender_address_len);
- packed_msg = GNUNET_malloc(packed_msg_len);
- memcpy(packed_msg, &received_msg[1 + sender_address_len], packed_msg_len);
+ GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
+ sender_address = GNUNET_malloc(sender_address_len);
+ memcpy(sender_address, &received_msg[1], sender_address_len);
+ packed_msg_start = (char *)&received_msg[1];
+ packed_msg = GNUNET_malloc(packed_msg_len);
+ memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
- handle->receive_handler(handle->receive_cls,
- &received_msg->sender,
- packed_msg,
- packed_msg_len,
- ntohl(received_msg->distance),
- sender_address,
- sender_address_len);
+#if DEBUG_DV
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance));
+#endif
- GNUNET_free(sender_address);
+ handle->receive_handler(handle->receive_cls,
+ &received_msg->sender,
+ packed_msg,
+ packed_msg_len,
+ ntohl(received_msg->distance),
+ sender_address,
+ sender_address_len);
+
+ GNUNET_free(sender_address);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT:
+ if (ntohs(msg->size) < sizeof(struct GNUNET_DV_SendResultMessage))
+ return;
+ send_result_msg = (struct GNUNET_DV_SendResultMessage *)msg;
+ hash_from_uid(ntohl(send_result_msg->uid), &uidhash);
+ send_ctx = GNUNET_CONTAINER_multihashmap_get(handle->send_callbacks, &uidhash);
+
+ if ((send_ctx != NULL) && (send_ctx->cont != NULL))
+ {
+ if (ntohl(send_result_msg->result) == 0)
+ {
+ send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_OK);
+ }
+ else
+ {
+ send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR);
+ }
+ }
+ GNUNET_free_non_null(send_ctx);
+ break;
+ default:
+ break;
+ }
GNUNET_CLIENT_receive (handle->client,
&handle_message_receipt,
handle, GNUNET_TIME_UNIT_FOREVER_REL);
* @param timeout how long can this message be delayed (pass through to core)
* @param addr the address of this peer (internally known to DV)
* @param addrlen the length of the peer address
+ * @param cont continuation to call once the message has been sent (or failed)
+ * @param cont_cls closure for continuation
*
*/
int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
unsigned int priority,
struct GNUNET_TIME_Relative timeout,
const void *addr,
- size_t addrlen)
+ size_t addrlen,
+ GNUNET_TRANSPORT_TransmitContinuation
+ cont, void *cont_cls)
{
struct GNUNET_DV_SendMessage *msg;
+ struct SendCallbackContext *send_ctx;
+ char *end_of_message;
+ GNUNET_HashCode uidhash;
+ int msize;
+#if DEBUG_DV_MESSAGES
+ dv_handle->uid_gen = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
+#else
+ dv_handle->uid_gen++;
+#endif
- msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
- msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
+ msize = sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size;
+ msg = GNUNET_malloc(msize);
+ msg->header.size = htons(msize);
msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity));
- msg->msgbuf = GNUNET_malloc(msgbuf_size);
- memcpy(msg->msgbuf, msgbuf, msgbuf_size);
- msg->msgbuf_size = htons(msgbuf_size);
+ msg->msgbuf_size = htonl(msgbuf_size);
msg->priority = htonl(priority);
msg->timeout = timeout;
- msg->addrlen = htons(addrlen);
+ msg->addrlen = htonl(addrlen);
+ msg->uid = htonl(dv_handle->uid_gen);
memcpy(&msg[1], addr, addrlen);
-
+ end_of_message = (char *)&msg[1];
+ end_of_message = &end_of_message[addrlen];
+ memcpy(end_of_message, msgbuf, msgbuf_size);
add_pending(dv_handle, msg);
+ send_ctx = GNUNET_malloc(sizeof(struct SendCallbackContext));
+ send_ctx->cont = cont;
+ send_ctx->cont_cls = cont_cls;
+ memcpy(&send_ctx->target, target, sizeof(struct GNUNET_PeerIdentity));
+ hash_from_uid(dv_handle->uid_gen, &uidhash);
+ GNUNET_CONTAINER_multihashmap_put(dv_handle->send_callbacks, &uidhash, send_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
return GNUNET_OK;
}
if (size >= tsize)
{
memcpy(buf, start_context->message, tsize);
+ GNUNET_free(start_context->message);
+ GNUNET_free(start_context);
return tsize;
}
GNUNET_YES,
&transmit_start, start_context);
+ handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create(100);
+
GNUNET_CLIENT_receive (handle->client,
&handle_message_receipt,
handle, GNUNET_TIME_UNIT_FOREVER_REL);