X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdv%2Fdv_api.c;h=9ab15ea627a595159b03b0d83b2caaa326e96de3;hb=1bf55b33a415d584ae338a0afea5b51d059f11ab;hp=4418f3a0f98a015a5d6bc67dfef426701c591abf;hpb=7b8e790147f04a3223040999e38562b9c7218abd;p=oweals%2Fgnunet.git diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index 4418f3a0f..9ab15ea62 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -36,6 +36,7 @@ #include "gnunet_time_lib.h" #include "gnunet_dv_service.h" #include "dv.h" +#include "../transport/plugin_transport.h" struct PendingMessages @@ -109,12 +110,21 @@ struct GNUNET_DV_Handle */ void *receive_cls; + /** + * Current unique ID + */ + uint32_t uid_gen; + + /** + * Hashmap containing outstanding send requests awaiting confirmation. + */ + struct GNUNET_CONTAINER_MultiHashMap *send_callbacks; + }; struct StartContext { - /** * Start message */ @@ -126,6 +136,37 @@ struct StartContext struct GNUNET_DV_Handle *handle; }; +struct SendCallbackContext +{ + /** + * The continuation to call once a message is confirmed sent (or failed) + */ + GNUNET_TRANSPORT_TransmitContinuation cont; + + /** + * Closure to call with send continuation. + */ + void *cont_cls; + + /** + * Target of the message. + */ + struct GNUNET_PeerIdentity target; +}; + +/** + * Convert unique ID to hash code. + * + * @param uid unique ID to convert + * @param hash set to uid (extended with zeros) + */ +static void +hash_from_uid (uint32_t uid, + GNUNET_HashCode *hash) +{ + memset (hash, 0, sizeof(GNUNET_HashCode)); + *((uint32_t*)hash) = uid; +} /** * Try to (re)connect to the dv service. @@ -159,6 +200,7 @@ finish (struct GNUNET_DV_Handle *handle, int code) handle->current = NULL; process_pending_message (handle); + GNUNET_free(pos->msg); GNUNET_free (pos); } @@ -170,8 +212,16 @@ transmit_pending (void *cls, size_t size, void *buf) size_t ret; size_t tsize; +#if DEBUG_DV + if (handle->current != NULL) + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type)); +#endif + if (buf == NULL) { +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n"); +#endif finish(handle, GNUNET_SYSERR); return 0; } @@ -185,11 +235,13 @@ transmit_pending (void *cls, size_t size, void *buf) if (size >= tsize) { memcpy(buf, handle->current->msg, tsize); +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize); +#endif + finish(handle, GNUNET_OK); + return tsize; } - else - { - return ret; - } + } return ret; @@ -200,7 +252,6 @@ transmit_pending (void *cls, size_t size, void *buf) */ static void process_pending_message(struct GNUNET_DV_Handle *handle) { - struct GNUNET_TIME_Relative timeout; if (handle->current != NULL) return; /* action already pending */ @@ -224,11 +275,10 @@ static void process_pending_message(struct GNUNET_DV_Handle *handle) handle->pending_list = handle->pending_list->next; handle->current->next = NULL; - timeout = GNUNET_TIME_absolute_get_remaining (handle->current->timeout); if (NULL == (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs(handle->current->msg->msgbuf_size), - timeout, + ntohl(handle->current->msg->msgbuf_size), + handle->current->msg->timeout, GNUNET_YES, &transmit_pending, handle))) { @@ -263,62 +313,101 @@ static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMe last = pos; pos = pos->next; } - new_message->next = last->next; /* Should always be null */ last->next = new_message; } else { - new_message->next = handle->pending_list; /* Will always be null */ handle->pending_list = new_message; } process_pending_message(handle); } - +/** + * Handles a message sent from the DV service to us. + * Parse it out and give it to the plugin. + * + * @param cls the handle to the DV API + * @param msg the message that was received + */ void handle_message_receipt (void *cls, const struct GNUNET_MessageHeader * msg) { struct GNUNET_DV_Handle *handle = cls; struct GNUNET_DV_MessageReceived *received_msg; + struct GNUNET_DV_SendResultMessage *send_result_msg; size_t packed_msg_len; size_t sender_address_len; char *sender_address; char *packed_msg; + char *packed_msg_start; + GNUNET_HashCode uidhash; + struct SendCallbackContext *send_ctx; if (msg == NULL) { return; /* Connection closed? */ } - GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE); + GNUNET_assert((ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE) || (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT)); - if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived)) - return; + switch (ntohs(msg->type)) + { + case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE: + if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived)) + return; - received_msg = (struct GNUNET_DV_MessageReceived *)msg; - packed_msg_len = ntohs(received_msg->msg_len); - sender_address_len = ntohs(received_msg->sender_address_len); -#if DEBUG_DV - fprintf(stdout, "dv api receives message from service: total len: %lu, packed len: %lu, sender_address_len: %lu, base message len: %lu\ntotal is %lu, should be %lu\n", ntohs(msg->size), packed_msg_len, sender_address_len, sizeof(struct GNUNET_DV_MessageReceived), sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len, ntohs(msg->size)); -#endif - GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len)); + received_msg = (struct GNUNET_DV_MessageReceived *)msg; + packed_msg_len = ntohl(received_msg->msg_len); + sender_address_len = ntohl(received_msg->sender_address_len); - sender_address = GNUNET_malloc(sender_address_len); - memcpy(sender_address, &received_msg[1], sender_address_len); - packed_msg = GNUNET_malloc(packed_msg_len); - memcpy(packed_msg, &received_msg[1 + sender_address_len], packed_msg_len); + GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len)); + sender_address = GNUNET_malloc(sender_address_len); + memcpy(sender_address, &received_msg[1], sender_address_len); + packed_msg_start = (char *)&received_msg[1]; + packed_msg = GNUNET_malloc(packed_msg_len); + memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len); - handle->receive_handler(handle->receive_cls, - &received_msg->sender, - packed_msg, - packed_msg_len, - ntohl(received_msg->distance), - sender_address, - sender_address_len); +#if DEBUG_DV + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: message sender reported as %s\n", GNUNET_i2s(&received_msg->sender)); + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n", ntohl(received_msg->distance)); +#endif - GNUNET_free(sender_address); + handle->receive_handler(handle->receive_cls, + &received_msg->sender, + packed_msg, + packed_msg_len, + ntohl(received_msg->distance), + sender_address, + sender_address_len); + + GNUNET_free(sender_address); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT: + if (ntohs(msg->size) < sizeof(struct GNUNET_DV_SendResultMessage)) + return; + send_result_msg = (struct GNUNET_DV_SendResultMessage *)msg; + hash_from_uid(ntohl(send_result_msg->uid), &uidhash); + send_ctx = GNUNET_CONTAINER_multihashmap_get(handle->send_callbacks, &uidhash); + + if ((send_ctx != NULL) && (send_ctx->cont != NULL)) + { + if (ntohl(send_result_msg->result) == 0) + { + send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_OK); + } + else + { + send_ctx->cont(send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR); + } + } + GNUNET_free_non_null(send_ctx); + break; + default: + break; + } GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle, GNUNET_TIME_UNIT_FOREVER_REL); @@ -336,6 +425,8 @@ void handle_message_receipt (void *cls, * @param timeout how long can this message be delayed (pass through to core) * @param addr the address of this peer (internally known to DV) * @param addrlen the length of the peer address + * @param cont continuation to call once the message has been sent (or failed) + * @param cont_cls closure for continuation * */ int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, @@ -345,23 +436,42 @@ int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle, unsigned int priority, struct GNUNET_TIME_Relative timeout, const void *addr, - size_t addrlen) + size_t addrlen, + GNUNET_TRANSPORT_TransmitContinuation + cont, void *cont_cls) { struct GNUNET_DV_SendMessage *msg; + struct SendCallbackContext *send_ctx; + char *end_of_message; + GNUNET_HashCode uidhash; + int msize; +#if DEBUG_DV_MESSAGES + dv_handle->uid_gen = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); +#else + dv_handle->uid_gen++; +#endif - msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen); - msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen); + msize = sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size; + msg = GNUNET_malloc(msize); + msg->header.size = htons(msize); msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND); memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity)); - msg->msgbuf = GNUNET_malloc(msgbuf_size); - memcpy(msg->msgbuf, msgbuf, msgbuf_size); - msg->msgbuf_size = htons(msgbuf_size); + msg->msgbuf_size = htonl(msgbuf_size); msg->priority = htonl(priority); msg->timeout = timeout; - msg->addrlen = htons(addrlen); + msg->addrlen = htonl(addrlen); + msg->uid = htonl(dv_handle->uid_gen); memcpy(&msg[1], addr, addrlen); - + end_of_message = (char *)&msg[1]; + end_of_message = &end_of_message[addrlen]; + memcpy(end_of_message, msgbuf, msgbuf_size); add_pending(dv_handle, msg); + send_ctx = GNUNET_malloc(sizeof(struct SendCallbackContext)); + send_ctx->cont = cont; + send_ctx->cont_cls = cont_cls; + memcpy(&send_ctx->target, target, sizeof(struct GNUNET_PeerIdentity)); + hash_from_uid(dv_handle->uid_gen, &uidhash); + GNUNET_CONTAINER_multihashmap_put(dv_handle->send_callbacks, &uidhash, send_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); return GNUNET_OK; } @@ -388,6 +498,8 @@ transmit_start (void *cls, size_t size, void *buf) if (size >= tsize) { memcpy(buf, start_context->message, tsize); + GNUNET_free(start_context->message); + GNUNET_free(start_context); return tsize; } @@ -444,6 +556,8 @@ GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched, GNUNET_YES, &transmit_start, start_context); + handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create(100); + GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle, GNUNET_TIME_UNIT_FOREVER_REL);