X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdv%2Fdv_api.c;h=d954d718b30b45ba9da651e67d45f7f2710916ba;hb=19377520016cc644070d207af34ae3e76618fdc8;hp=9e99790fbcae290976f03e17ec67ef0e9311c9a0;hpb=54ce17ec50827d3a905a3cb182ccd3c18bbd3668;p=oweals%2Fgnunet.git diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index 9e99790fb..d954d718b 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -4,7 +4,7 @@ GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -22,7 +22,7 @@ * @file dv/dv_api.c * @brief library to access the DV service * @author Christian Grothoff - * @author Not Nathan Evans + * @author Nathan Evans */ #include "platform.h" #include "gnunet_bandwidth_lib.h" @@ -36,8 +36,11 @@ #include "gnunet_time_lib.h" #include "gnunet_dv_service.h" #include "dv.h" +#include "gnunet_transport_plugin.h" - +/** + * Store ready to send messages + */ struct PendingMessages { /** @@ -62,10 +65,6 @@ struct PendingMessages */ struct GNUNET_DV_Handle { - /** - * Our scheduler. - */ - struct GNUNET_SCHEDULER_Handle *sched; /** * Configuration to use. @@ -92,11 +91,6 @@ struct GNUNET_DV_Handle */ struct PendingMessages *current; - /** - * Kill off the connection and any pending messages. - */ - int do_destroy; - /** * Handler for messages we receive from the DV service */ @@ -107,12 +101,69 @@ struct GNUNET_DV_Handle */ void *receive_cls; + /** + * Current unique ID + */ + uint32_t uid_gen; + + /** + * Hashmap containing outstanding send requests awaiting confirmation. + */ + struct GNUNET_CONTAINER_MultiHashMap *send_callbacks; + }; +struct StartContext +{ + /** + * Start message + */ + struct GNUNET_MessageHeader *message; + + /** + * Handle to service, in case of timeout + */ + struct GNUNET_DV_Handle *handle; +}; + +struct SendCallbackContext +{ + /** + * The continuation to call once a message is confirmed sent (or failed) + */ + GNUNET_TRANSPORT_TransmitContinuation cont; + + /** + * Closure to call with send continuation. + */ + void *cont_cls; + + /** + * Target of the message. + */ + struct GNUNET_PeerIdentity target; +}; + +/** + * Convert unique ID to hash code. + * + * @param uid unique ID to convert + * @param hash set to uid (extended with zeros) + */ +static void +hash_from_uid (uint32_t uid, + GNUNET_HashCode *hash) +{ + memset (hash, 0, sizeof(GNUNET_HashCode)); + *((uint32_t*)hash) = uid; +} + /** * Try to (re)connect to the dv service. * + * @param ret handle to the (disconnected) dv service + * * @return GNUNET_YES on success, GNUNET_NO on failure. */ static int @@ -120,10 +171,10 @@ try_connect (struct GNUNET_DV_Handle *ret) { if (ret->client != NULL) return GNUNET_OK; - ret->client = GNUNET_CLIENT_connect (ret->sched, "dv", ret->cfg); + ret->client = GNUNET_CLIENT_connect ("dv", ret->cfg); if (ret->client != NULL) return GNUNET_YES; -#if DEBUG_STATISTICS +#if DEBUG_DV_MESSAGES GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _("Failed to connect to the dv service!\n")); #endif @@ -134,6 +185,9 @@ static void process_pending_message(struct GNUNET_DV_Handle *handle); /** * Send complete, schedule next + * + * @param handle handle to the dv service + * @param code return code for send (unused) */ static void finish (struct GNUNET_DV_Handle *handle, int code) @@ -142,32 +196,68 @@ finish (struct GNUNET_DV_Handle *handle, int code) handle->current = NULL; process_pending_message (handle); + GNUNET_free(pos->msg); GNUNET_free (pos); } - +/** + * Notification that we can send data + * + * @param cls handle to the dv service (struct GNUNET_DV_Handle) + * @param size how many bytes can we send + * @param buf where to copy the message to send + * + * @return how many bytes we copied to buf + */ static size_t transmit_pending (void *cls, size_t size, void *buf) { struct GNUNET_DV_Handle *handle = cls; size_t ret; + size_t tsize; + +#if DEBUG_DV + if (handle->current != NULL) + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type)); +#endif if (buf == NULL) { +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n"); +#endif finish(handle, GNUNET_SYSERR); return 0; } handle->th = NULL; + ret = 0; + + if (handle->current != NULL) + { + tsize = ntohs(handle->current->msg->header.size); + if (size >= tsize) + { + memcpy(buf, handle->current->msg, tsize); +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize); +#endif + finish(handle, GNUNET_OK); + return tsize; + } + + } + return ret; } /** * Try to send messages from list of messages to send + * + * @param handle handle to the distance vector service */ static void process_pending_message(struct GNUNET_DV_Handle *handle) { - struct GNUNET_TIME_Relative timeout; if (handle->current != NULL) return; /* action already pending */ @@ -181,25 +271,19 @@ static void process_pending_message(struct GNUNET_DV_Handle *handle) handle->current = handle->pending_list; if (NULL == handle->current) { - if (handle->do_destroy) - { - handle->do_destroy = GNUNET_NO; - //GNUNET_DV_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */ - } return; } handle->pending_list = handle->pending_list->next; handle->current->next = NULL; - timeout = GNUNET_TIME_absolute_get_remaining (handle->current->timeout); if (NULL == (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs(handle->current->msg->msgbuf_size), - timeout, - GNUNET_YES, - &transmit_pending, handle))) + ntohs(handle->current->msg->header.size), + handle->current->msg->timeout, + GNUNET_YES, + &transmit_pending, handle))) { -#if DEBUG_STATISTICS +#if DEBUG_DV GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to dv service.\n"); #endif @@ -230,56 +314,103 @@ static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMe last = pos; pos = pos->next; } - new_message->next = last->next; /* Should always be null */ last->next = new_message; } else { - new_message->next = handle->pending_list; /* Will always be null */ handle->pending_list = new_message; } process_pending_message(handle); } - - - +/** + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. + * + * @param cls the handle to the DV API + * @param msg the message that was received + */ void handle_message_receipt (void *cls, const struct GNUNET_MessageHeader * msg) { struct GNUNET_DV_Handle *handle = cls; struct GNUNET_DV_MessageReceived *received_msg; + struct GNUNET_DV_SendResultMessage *send_result_msg; size_t packed_msg_len; size_t sender_address_len; char *sender_address; char *packed_msg; + char *packed_msg_start; + GNUNET_HashCode uidhash; + struct SendCallbackContext *send_ctx; + + if (msg == NULL) + { +#if DEBUG_DV_MESSAGES + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: connection closed\n"); +#endif + return; /* Connection closed? */ + } - GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE); - - if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived)) - return; - - received_msg = (struct GNUNET_DV_MessageReceived *)msg; - packed_msg_len = ntohs(received_msg->msg_len); - sender_address_len = ntohs(received_msg->sender_address_len); - GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len)); + GNUNET_assert((ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) || (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT)); - sender_address = GNUNET_malloc(sender_address_len); - memcpy(sender_address, &received_msg[1], sender_address_len); - packed_msg = GNUNET_malloc(packed_msg_len); - memcpy(packed_msg, &received_msg[1 + sender_address_len], packed_msg_len); + switch (ntohs(msg->type)) + { + case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE: + if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived)) + return; - handle->receive_handler(handle->receive_cls, - &received_msg->sender, - packed_msg, - packed_msg_len, - ntohl(received_msg->distance), - sender_address, - sender_address_len); + received_msg = (struct GNUNET_DV_MessageReceived *)msg; + packed_msg_len = ntohl(received_msg->msg_len); + sender_address_len = ntohs(msg->size) - packed_msg_len - sizeof(struct GNUNET_DV_MessageReceived); + GNUNET_assert(sender_address_len > 0); + sender_address = GNUNET_malloc(sender_address_len); + memcpy(sender_address, &received_msg[1], sender_address_len); + packed_msg_start = (char *)&received_msg[1]; + packed_msg = GNUNET_malloc(packed_msg_len); + memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len); + +#if DEBUG_DV_MESSAGES + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender)); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance)); +#endif - GNUNET_free(sender_address); + handle->receive_handler(handle->receive_cls, + &received_msg->sender, + packed_msg, + packed_msg_len, + ntohl(received_msg->distance), + sender_address, + sender_address_len); + + GNUNET_free(sender_address); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT: + if (ntohs(msg->size) < sizeof(struct GNUNET_DV_SendResultMessage)) + return; + send_result_msg = (struct GNUNET_DV_SendResultMessage *)msg; + hash_from_uid(ntohl(send_result_msg->uid), &uidhash); + send_ctx = GNUNET_CONTAINER_multihashmap_get(handle->send_callbacks, &uidhash); + + if ((send_ctx != NULL) && (send_ctx->cont != NULL)) + { + if (ntohl(send_result_msg->result) == 0) + { + send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_OK); + } + else + { + send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR); + } + } + GNUNET_free_non_null(send_ctx); + break; + default: + break; + } GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle, GNUNET_TIME_UNIT_FOREVER_REL); @@ -297,6 +428,8 @@ void handle_message_receipt (void *cls, * @param timeout how long can this message be delayed (pass through to core) * @param addr the address of this peer (internally known to DV) * @param addrlen the length of the peer address + * @param cont continuation to call once the message has been sent (or failed) + * @param cont_cls closure for continuation * */ int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, @@ -306,32 +439,92 @@ int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, unsigned int priority, struct GNUNET_TIME_Relative timeout, const void *addr, - size_t addrlen) + size_t addrlen, + GNUNET_TRANSPORT_TransmitContinuation + cont, void *cont_cls) { struct GNUNET_DV_SendMessage *msg; + struct SendCallbackContext *send_ctx; + char *end_of_message; + GNUNET_HashCode uidhash; + int msize; +#if DEBUG_DV_MESSAGES + dv_handle->uid_gen = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); +#else + dv_handle->uid_gen++; +#endif - msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen); - msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen); + msize = sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size; + msg = GNUNET_malloc(msize); + msg->header.size = htons(msize); msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND); memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity)); - msg->msgbuf = GNUNET_malloc(msgbuf_size); - memcpy(msg->msgbuf, msgbuf, msgbuf_size); - msg->msgbuf_size = htons(msgbuf_size); msg->priority = htonl(priority); msg->timeout = timeout; - msg->addrlen = htons(addrlen); + msg->addrlen = htonl(addrlen); + msg->uid = htonl(dv_handle->uid_gen); memcpy(&msg[1], addr, addrlen); - + end_of_message = (char *)&msg[1]; + end_of_message = &end_of_message[addrlen]; + memcpy(end_of_message, msgbuf, msgbuf_size); add_pending(dv_handle, msg); - process_pending_message(dv_handle); + send_ctx = GNUNET_malloc(sizeof(struct SendCallbackContext)); + send_ctx->cont = cont; + send_ctx->cont_cls = cont_cls; + memcpy(&send_ctx->target, target, sizeof(struct GNUNET_PeerIdentity)); + hash_from_uid(dv_handle->uid_gen, &uidhash); + GNUNET_CONTAINER_multihashmap_put(dv_handle->send_callbacks, &uidhash, send_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); return GNUNET_OK; } +/** + * Callback to transmit a start message to + * the DV service, once we can send + * + * @param cls struct StartContext + * @param size how much can we send + * @param buf where to copy the message + * + * @return number of bytes copied to buf + */ +static size_t +transmit_start (void *cls, size_t size, void *buf) +{ + struct StartContext *start_context = cls; + struct GNUNET_DV_Handle *handle = start_context->handle; + size_t tsize; +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: sending start request to service\n"); +#endif + if (buf == NULL) + { + GNUNET_free(start_context->message); + GNUNET_free(start_context); + GNUNET_DV_disconnect(handle); + return 0; + } + + tsize = ntohs(start_context->message->size); + if (size >= tsize) + { + memcpy(buf, start_context->message, tsize); + GNUNET_free(start_context->message); + GNUNET_free(start_context); + GNUNET_CLIENT_receive (handle->client, + &handle_message_receipt, + handle, GNUNET_TIME_UNIT_FOREVER_REL); + + + return tsize; + } + + return 0; +} + /** * Connect to the DV service * - * @param sched the scheduler to use * @param cfg the configuration to use * @param receive_handler method call when on receipt from the service * @param receive_handler_cls closure for receive_handler @@ -339,31 +532,43 @@ int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, * @return handle to the DV service */ struct GNUNET_DV_Handle * -GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched, - const struct GNUNET_CONFIGURATION_Handle *cfg, +GNUNET_DV_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_DV_MessageReceivedHandler receive_handler, void *receive_handler_cls) { struct GNUNET_DV_Handle *handle; - + struct GNUNET_MessageHeader *start_message; + struct StartContext *start_context; handle = GNUNET_malloc(sizeof(struct GNUNET_DV_Handle)); handle->cfg = cfg; - handle->sched = sched; handle->pending_list = NULL; handle->current = NULL; - handle->do_destroy = GNUNET_NO; handle->th = NULL; - handle->client = GNUNET_CLIENT_connect(sched, "dv", cfg); + handle->client = GNUNET_CLIENT_connect("dv", cfg); handle->receive_handler = receive_handler; handle->receive_cls = receive_handler_cls; if (handle->client == NULL) - return NULL; + { + GNUNET_free(handle); + return NULL; + } - GNUNET_CLIENT_receive (handle->client, - &handle_message_receipt, - handle, GNUNET_TIME_UNIT_FOREVER_REL); + start_message = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader)); + start_message->size = htons(sizeof(struct GNUNET_MessageHeader)); + start_message->type = htons(GNUNET_MESSAGE_TYPE_DV_START); + + start_context = GNUNET_malloc(sizeof(struct StartContext)); + start_context->handle = handle; + start_context->message = start_message; + GNUNET_CLIENT_notify_transmit_ready (handle->client, + sizeof(struct GNUNET_MessageHeader), + GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60), + GNUNET_YES, + &transmit_start, start_context); + + handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create(100); return handle; }