From 90c0d0137a71dc752066ae2922ae0dedf6f8b63e Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Sun, 4 Apr 2010 14:14:15 +0000 Subject: [PATCH] service does simple put and get into datacache, test case verifies it works --- src/dht/dht.h | 8 +- src/dht/dht_api.c | 511 +++++++++++++++++++++-------------- src/dht/gnunet-service-dht.c | 433 ++++++++++++++++++++--------- src/dht/test_dht_api.c | 128 +++++---- 4 files changed, 695 insertions(+), 385 deletions(-) diff --git a/src/dht/dht.h b/src/dht/dht.h index 68591a97e..6c2f4cbd8 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -30,7 +30,8 @@ #define DEBUG_DHT GNUNET_NO typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, - struct GNUNET_MessageHeader *msg); + struct GNUNET_MessageHeader + * msg); /** * Generic DHT message, wrapper for other message types @@ -158,6 +159,11 @@ struct GNUNET_DHT_GetResultMessage */ GNUNET_HashCode key; + /** + * When does this entry expire? + */ + struct GNUNET_TIME_Absolute expiration; + /** * The size of the data, appended to the end of this message. */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index f935d69d3..d4fc9296b 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -39,7 +39,7 @@ #include "gnunet_dht_service.h" #include "dht.h" -#define DEBUG_DHT_API GNUNET_NO +#define DEBUG_DHT_API GNUNET_YES #define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) @@ -193,12 +193,12 @@ struct GNUNET_DHT_FindPeerHandle /** * Handle to the actual route operation for the request */ - struct GNUNET_DHT_RouteHandle *route_handle; + struct GNUNET_DHT_RouteHandle *route_handle; /** * The context of the get request */ - struct GNUNET_DHT_FindPeerContext find_peer_context; + struct GNUNET_DHT_FindPeerContext find_peer_context; }; @@ -257,23 +257,24 @@ struct GNUNET_DHT_Handle static struct GNUNET_TIME_Relative default_request_timeout; /* Forward declaration */ -static void process_pending_message(struct GNUNET_DHT_Handle *handle); +static void process_pending_message (struct GNUNET_DHT_Handle *handle); -static GNUNET_HashCode * hash_from_uid(uint64_t uid) +static GNUNET_HashCode * +hash_from_uid (uint64_t uid) { int count; int remaining; GNUNET_HashCode *hash; - hash = GNUNET_malloc(sizeof(GNUNET_HashCode)); + hash = GNUNET_malloc (sizeof (GNUNET_HashCode)); count = 0; - while (count < sizeof(GNUNET_HashCode)) + while (count < sizeof (GNUNET_HashCode)) { - remaining = sizeof(GNUNET_HashCode) - count; - if (remaining > sizeof(uid)) - remaining = sizeof(uid); + remaining = sizeof (GNUNET_HashCode) - count; + if (remaining > sizeof (uid)) + remaining = sizeof (uid); - memcpy(hash, &uid, remaining); + memcpy (hash, &uid, remaining); count += remaining; } @@ -285,8 +286,8 @@ static GNUNET_HashCode * hash_from_uid(uint64_t uid) * a demultiplexer which handles numerous message types * */ -void service_message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +void +service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; struct GNUNET_DHT_Message *dht_msg; @@ -300,73 +301,89 @@ void service_message_handler (void *cls, * Should be a non unique acknowledgment, or unique result. */ if (msg == NULL) - { + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received NULL from server, connection down?\n", "DHT API"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received NULL from server, connection down?\n", + "DHT API"); #endif - return; - } + return; + } - if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT) - { - dht_msg = (struct GNUNET_DHT_Message *)msg; - uid = GNUNET_ntohll(dht_msg->unique_id); + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_DHT: + { + dht_msg = (struct GNUNET_DHT_Message *) msg; + uid = GNUNET_ntohll (dht_msg->unique_id); #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu)\n", "DHT API", uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu)\n", + "DHT API", uid); #endif - if (ntohs(dht_msg->unique)) - { - uid_hash = hash_from_uid(ntohl(dht_msg->unique_id)); - route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash); - GNUNET_free(uid_hash); - if (route_handle == NULL) /* We have no recollection of this request */ + if (ntohs (dht_msg->unique)) { + uid_hash = hash_from_uid (uid); + route_handle = + GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests, + uid_hash); + GNUNET_free (uid_hash); + if (route_handle == NULL) /* We have no recollection of this request */ + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu), but have no recollection of it!\n", + "DHT API", uid); #endif + } + else + { + enc_size = + ntohs (dht_msg->header.size) - + sizeof (struct GNUNET_DHT_Message); + GNUNET_assert (enc_size > 0); + enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; + route_handle->iter (route_handle->iter_cls, enc_msg); + + } } - else - { - enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message); - GNUNET_assert(enc_size > 0); - enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; - route_handle->iter(route_handle->iter_cls, enc_msg); - } + break; } - } - else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP) - { - stop_msg = (struct GNUNET_DHT_StopMessage *)msg; - uid = GNUNET_ntohll(stop_msg->unique_id); -#if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id); -#endif - if (handle->current->unique_id == uid) + case GNUNET_MESSAGE_TYPE_DHT_STOP: { + stop_msg = (struct GNUNET_DHT_StopMessage *) msg; + uid = GNUNET_ntohll (stop_msg->unique_id); #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Have pending confirmation for this message!\n", "DHT API", uid); + "`%s': Received response to message (uid %llu), current uid %llu\n", + "DHT API", uid, handle->current->unique_id); #endif - if (handle->current->cont != NULL) - GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); - - GNUNET_free(handle->current->msg); - GNUNET_free(handle->current); - handle->current = NULL; - } - } - else - { + if (handle->current->unique_id == uid) + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Have pending confirmation for this message!\n", + "DHT API", uid); #endif - } - + if (handle->current->cont != NULL) + GNUNET_SCHEDULER_add_continuation (handle->sched, + handle->current->cont, + handle->current->cont_cls, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + + GNUNET_free (handle->current->msg); + GNUNET_free (handle->current); + handle->current = NULL; + } + break; + } + default: + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "`%s': Received unknown message type %d\n", "DHT API", + ntohs (msg->type)); + } + } GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); @@ -391,9 +408,10 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, { struct GNUNET_DHT_Handle *handle; - handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle)); + handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); - default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); + default_request_timeout = + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); handle->cfg = cfg; handle->sched = sched; @@ -401,12 +419,13 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, handle->do_destroy = GNUNET_NO; handle->th = NULL; - handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg); - handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len); + handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); + handle->outstanding_requests = + GNUNET_CONTAINER_multihashmap_create (ht_len); if (handle->client == NULL) { - GNUNET_free(handle); + GNUNET_free (handle); return NULL; } #if DEBUG_DHT_API @@ -433,17 +452,17 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); #endif - GNUNET_assert(handle != NULL); + GNUNET_assert (handle != NULL); - if (handle->th != NULL) /* We have a live transmit request in the Aether */ + if (handle->th != NULL) /* We have a live transmit request in the Aether */ { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; } - if (handle->current != NULL) /* We are trying to send something now, clean it up */ - GNUNET_free(handle->current); + if (handle->current != NULL) /* We are trying to send something now, clean it up */ + GNUNET_free (handle->current); - if (handle->client != NULL) /* Finally, disconnect from the service */ + if (handle->client != NULL) /* Finally, disconnect from the service */ { GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); handle->client = NULL; @@ -462,24 +481,27 @@ finish (struct GNUNET_DHT_Handle *handle, int code) /* TODO: if code is not GNUNET_OK, do something! */ struct PendingMessage *pos = handle->current; #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Finish called!\n", "DHT API"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); #endif - GNUNET_assert(pos != NULL); + GNUNET_assert (pos != NULL); if (pos->is_unique) { if (pos->cont != NULL) - { - if (code == GNUNET_SYSERR) - GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); - else - GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE); - } - - GNUNET_free(pos->msg); + { + if (code == GNUNET_SYSERR) + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); + else + GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, + pos->cont_cls, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + } + + GNUNET_free (pos->msg); handle->current = NULL; - GNUNET_free(pos); + GNUNET_free (pos); } /* Otherwise we need to wait for a response to this message! */ } @@ -494,8 +516,8 @@ transmit_pending (void *cls, size_t size, void *buf) size_t tsize; #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': In transmit_pending\n", "DHT API"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': In transmit_pending\n", "DHT API"); #endif if (buf == NULL) { @@ -504,30 +526,30 @@ transmit_pending (void *cls, size_t size, void *buf) "`%s': In transmit_pending buf is NULL\n", "DHT API"); #endif /* FIXME: free associated resources or summat */ - finish(handle, GNUNET_SYSERR); + finish (handle, GNUNET_SYSERR); return 0; } handle->th = NULL; if (handle->current != NULL) - { - tsize = ntohs(handle->current->msg->size); - if (size >= tsize) { + tsize = ntohs (handle->current->msg->size); + if (size >= tsize) + { #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending message size %d\n", "DHT API", tsize); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending message size %d\n", "DHT API", tsize); #endif - memcpy(buf, handle->current->msg, tsize); - finish(handle, GNUNET_OK); - return tsize; + memcpy (buf, handle->current->msg, tsize); + finish (handle, GNUNET_OK); + return tsize; + } + else + { + return 0; + } } - else - { - return 0; - } - } /* Have no pending request */ return 0; } @@ -557,7 +579,8 @@ try_connect (struct GNUNET_DHT_Handle *handle) /** * Try to send messages from list of messages to send */ -static void process_pending_message(struct GNUNET_DHT_Handle *handle) +static void +process_pending_message (struct GNUNET_DHT_Handle *handle) { if (handle->current == NULL) @@ -577,10 +600,13 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle) if (NULL == (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, - ntohs(handle->current->msg->size), - handle->current->timeout, - GNUNET_YES, - &transmit_pending, handle))) + ntohs (handle-> + current->msg-> + size), + handle->current-> + timeout, GNUNET_YES, + &transmit_pending, + handle))) { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -590,7 +616,8 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle) } #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size)); + "`%s': Scheduled sending message of size %d to service\n", + "DHT API", ntohs (handle->current->msg->size)); #endif } @@ -598,10 +625,28 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle) * Iterator called on each result obtained from a generic route * operation */ -void get_reply_iterator (void *cls, - const struct GNUNET_MessageHeader *reply) +void +get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) { + struct GNUNET_DHT_GetHandle *get_handle = cls; + struct GNUNET_DHT_GetResultMessage *result; + size_t data_size; + char *result_data; + if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT) + return; + + GNUNET_assert (ntohs (reply->size) >= + sizeof (struct GNUNET_DHT_GetResultMessage)); + result = (struct GNUNET_DHT_GetResultMessage *) reply; + data_size = ntohs (result->data_size); + GNUNET_assert (ntohs (reply->size) == + sizeof (struct GNUNET_DHT_GetResultMessage) + data_size); + result_data = (char *) &result[1]; /* Set data pointer to end of message */ + + get_handle->get_context.iter (get_handle->get_context.iter_cls, + result->expiration, &result->key, + ntohs (result->type), data_size, result_data); } @@ -609,10 +654,32 @@ void get_reply_iterator (void *cls, * Iterator called on each result obtained from a generic route * operation */ -void find_peer_reply_iterator (void *cls, - const struct GNUNET_MessageHeader *reply) +void +find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) { + struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; + struct GNUNET_DHT_FindPeerResultMessage *result; + size_t data_size; + struct GNUNET_MessageHeader *result_data; + if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) + return; + + GNUNET_assert (ntohs (reply->size) >= + sizeof (struct GNUNET_DHT_FindPeerResultMessage)); + result = (struct GNUNET_DHT_FindPeerResultMessage *) reply; + data_size = ntohs (result->data_size); + GNUNET_assert (ntohs (reply->size) == + sizeof (struct GNUNET_DHT_GetResultMessage) + data_size); + + if (data_size > 0) + result_data = (struct GNUNET_MessageHeader *) &result[1]; /* Set data pointer to end of message */ + else + result_data = NULL; + + find_peer_handle->find_peer_context.proc (find_peer_handle-> + find_peer_context.proc_cls, + &result->peer, result_data); } /** @@ -639,15 +706,14 @@ void find_peer_reply_iterator (void *cls, */ struct GNUNET_DHT_RouteHandle * GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, - const GNUNET_HashCode *key, + const GNUNET_HashCode * key, unsigned int desired_replication_level, enum GNUNET_DHT_RouteOption options, const struct GNUNET_MessageHeader *enc, struct GNUNET_TIME_Relative timeout, GNUNET_DHT_ReplyProcessor iter, void *iter_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_RouteHandle *route_handle; struct PendingMessage *pending; @@ -665,48 +731,52 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, uid_key = NULL; do - { - GNUNET_free_non_null(uid_key); - uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); - uid_key = hash_from_uid(uid); - } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES); + { + GNUNET_free_non_null (uid_key); + uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1); + uid_key = hash_from_uid (uid); + } + while (GNUNET_CONTAINER_multihashmap_contains + (handle->outstanding_requests, uid_key) == GNUNET_YES); if (is_unique) { - route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle)); - memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode)); + route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); + memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); route_handle->iter = iter; route_handle->iter_cls = iter_cls; route_handle->dht_handle = handle; route_handle->uid = uid; #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Unique ID is %llu\n", "DHT API", uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Unique ID is %llu\n", "DHT API", uid); #endif /** * Store based on random identifier! */ - GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); + GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, + uid_key, route_handle, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size); } else { - msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); + msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size); } - GNUNET_free(uid_key); - message = GNUNET_malloc(msize); - message->header.size = htons(msize); - message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT); - memcpy(&message->key, key, sizeof(GNUNET_HashCode)); - message->options = htons(options); - message->desired_replication_level = htons(options); - message->unique = htons(is_unique); - message->unique_id = GNUNET_htonll(uid); - memcpy(&message[1], enc, ntohs(enc->size)); - - pending = GNUNET_malloc(sizeof(struct PendingMessage)); + GNUNET_free (uid_key); + message = GNUNET_malloc (msize); + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT); + memcpy (&message->key, key, sizeof (GNUNET_HashCode)); + message->options = htons (options); + message->desired_replication_level = htons (options); + message->unique = htons (is_unique); + message->unique_id = GNUNET_htonll (uid); + memcpy (&message[1], enc, ntohs (enc->size)); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); pending->msg = &message->header; pending->timeout = timeout; pending->cont = cont; @@ -714,17 +784,18 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, pending->is_unique = is_unique; pending->unique_id = uid; - GNUNET_assert(handle->current == NULL); + GNUNET_assert (handle->current == NULL); handle->current = pending; - process_pending_message(handle); + process_pending_message (handle); return route_handle; } void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls); +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls); /** * Perform an asynchronous GET operation on the DHT identified. @@ -747,79 +818,86 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, GNUNET_DHT_GetIterator iter, void *iter_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_GetHandle *get_handle; struct GNUNET_DHT_GetMessage *get_msg; - if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + if (handle->current != NULL) /* Can't send right now, we have a pending message... */ return NULL; - get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle)); + get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); get_handle->get_context.iter = iter; get_handle->get_context.iter_cls = iter_cls; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key)); + "`%s': Inserting pending get request with key %s\n", "DHT API", + GNUNET_h2s (key)); #endif - get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); - get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->type = htonl(type); + get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage)); + get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); + get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); + get_msg->type = htonl (type); - get_handle->route_handle = GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_handle, cont, cont_cls); + get_handle->route_handle = + GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout, + &get_reply_iterator, get_handle, cont, cont_cls); return get_handle; } void -GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct PendingMessage *pending; struct GNUNET_DHT_StopMessage *message; size_t msize; GNUNET_HashCode *uid_key; - msize = sizeof(struct GNUNET_DHT_StopMessage); + msize = sizeof (struct GNUNET_DHT_StopMessage); - message = GNUNET_malloc(msize); - message->header.size = htons(msize); - message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); + message = GNUNET_malloc (msize); + message->header.size = htons (msize); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); #if DEBUG_DHT_API - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Remove outstanding request for uid %llu\n", "DHT API", + route_handle->uid); #endif - message->unique_id = GNUNET_htonll(route_handle->uid); + message->unique_id = GNUNET_htonll (route_handle->uid); - GNUNET_assert(route_handle->dht_handle->current == NULL); + GNUNET_assert (route_handle->dht_handle->current == NULL); - pending = GNUNET_malloc(sizeof(struct PendingMessage)); - pending->msg = (struct GNUNET_MessageHeader *)message; + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = (struct GNUNET_MessageHeader *) message; pending->timeout = DEFAULT_DHT_TIMEOUT; pending->cont = cont; pending->cont_cls = cont_cls; pending->is_unique = GNUNET_NO; pending->unique_id = route_handle->uid; - GNUNET_assert(route_handle->dht_handle->current == NULL); + GNUNET_assert (route_handle->dht_handle->current == NULL); route_handle->dht_handle->current = pending; - process_pending_message(route_handle->dht_handle); + process_pending_message (route_handle->dht_handle); - uid_key = hash_from_uid(route_handle->uid); + uid_key = hash_from_uid (route_handle->uid); - if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES) + if (GNUNET_CONTAINER_multihashmap_remove + (route_handle->dht_handle->outstanding_requests, uid_key, + route_handle) != GNUNET_YES) { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid); + "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", + "DHT API", GNUNET_h2s (uid_key), route_handle->uid); #endif } - GNUNET_free(uid_key); + GNUNET_free (uid_key); return; } @@ -830,14 +908,17 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHED * @param get_handle handle to the GET operation to stop */ void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->route_handle->key), get_handle->route_handle->uid); + "`%s': Removing pending get request with key %s, uid %llu\n", + "DHT API", GNUNET_h2s (&get_handle->route_handle->key), + get_handle->route_handle->uid); #endif - GNUNET_DHT_route_stop(get_handle->route_handle, cont, cont_cls); - GNUNET_free(get_handle); + GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); + GNUNET_free (get_handle); } @@ -860,47 +941,53 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_T */ struct GNUNET_DHT_FindPeerHandle * GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, - enum GNUNET_DHT_RouteOption options, - struct GNUNET_MessageHeader *message, - const GNUNET_HashCode * key, - GNUNET_DHT_FindPeerProcessor proc, - void *proc_cls, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + struct GNUNET_TIME_Relative timeout, + enum GNUNET_DHT_RouteOption options, + struct GNUNET_MessageHeader *message, + const GNUNET_HashCode * key, + GNUNET_DHT_FindPeerProcessor proc, + void *proc_cls, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_FindPeerHandle *find_peer_handle; struct GNUNET_DHT_FindPeerMessage *find_peer_msg; size_t msize; - if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + if (handle->current != NULL) /* Can't send right now, we have a pending message... */ return NULL; if (message != NULL) - msize = ntohs(message->size); + msize = ntohs (message->size); else msize = 0; - find_peer_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerHandle)); + find_peer_handle = + GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle)); find_peer_handle->find_peer_context.proc = proc; find_peer_handle->find_peer_context.proc_cls = proc_cls; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending `%s' request with key %s\n", "DHT API", "FIND PEER", GNUNET_h2s(key)); + "`%s': Inserting pending `%s' request with key %s\n", "DHT API", + "FIND PEER", GNUNET_h2s (key)); #endif - find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage) + msize); - find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage)); + find_peer_msg = + GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize); + find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + find_peer_msg->header.size = + htons (sizeof (struct GNUNET_DHT_FindPeerMessage)); find_peer_msg->msg_len = msize; if (message != NULL) - { - memcpy(&find_peer_msg[1], message, msize); - } + { + memcpy (&find_peer_msg[1], message, msize); + } - find_peer_handle->route_handle = GNUNET_DHT_route_start(handle, key, 0, options, &find_peer_msg->header, timeout, &find_peer_reply_iterator, find_peer_handle, cont, cont_cls); + find_peer_handle->route_handle = + GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header, + timeout, &find_peer_reply_iterator, + find_peer_handle, cont, cont_cls); return find_peer_handle; } @@ -910,14 +997,18 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, * @param find_peer_handle GET operation to stop. */ void -GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) +GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) { #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending `%s' request with key %s, uid %llu\n", "DHT API", "FIND PEER", GNUNET_h2s(&find_peer_handle->route_handle->key), find_peer_handle->route_handle->uid); + "`%s': Removing pending `%s' request with key %s, uid %llu\n", + "DHT API", "FIND PEER", + GNUNET_h2s (&find_peer_handle->route_handle->key), + find_peer_handle->route_handle->uid); #endif - GNUNET_DHT_route_stop(find_peer_handle->route_handle, cont, cont_cls); - GNUNET_free(find_peer_handle); + GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls); + GNUNET_free (find_peer_handle); } @@ -946,33 +1037,35 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const char *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, - void *cont_cls) + GNUNET_SCHEDULER_Task cont, void *cont_cls) { struct GNUNET_DHT_PutMessage *put_msg; size_t msize; if (handle->current != NULL) { - GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT); + GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, + GNUNET_SCHEDULER_REASON_TIMEOUT); return; } #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key)); + "`%s': Inserting pending put request with key %s\n", "DHT API", + GNUNET_h2s (key)); #endif - msize = sizeof(struct GNUNET_DHT_PutMessage) + size; - put_msg = GNUNET_malloc(msize); - put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); - put_msg->header.size = htons(msize); - put_msg->type = htonl(type); - put_msg->data_size = htons(size); + msize = sizeof (struct GNUNET_DHT_PutMessage) + size; + put_msg = GNUNET_malloc (msize); + put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT); + put_msg->header.size = htons (msize); + put_msg->type = htonl (type); + put_msg->data_size = htons (size); put_msg->expiration = exp; - memcpy(&put_msg[1], data, size); + memcpy (&put_msg[1], data, size); - GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls); + GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, + NULL, cont, cont_cls); - GNUNET_free(put_msg); + GNUNET_free (put_msg); } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index c6ddb0ab9..f03cb379b 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -40,7 +40,7 @@ /** * Handle to the datacache service (for inserting/retrieving data) */ - struct GNUNET_DATACACHE_Handle *datacache; +struct GNUNET_DATACACHE_Handle *datacache; /** * The main scheduler to use for the DHT service @@ -72,28 +72,60 @@ static struct GNUNET_PeerIdentity my_identity; */ static GNUNET_SCHEDULER_TaskIdentifier cleanup_task; -struct ClientList +/** + * Context for handling results from a get request. + */ +struct DatacacheGetContext { /** - * This is a linked list + * The client to send the result to. + */ + struct GNUNET_SERVER_Client *client; + + /** + * The unique id of this request */ - struct ClientList *next; + unsigned long long unique_id; +}; + +struct DHT_MessageContext +{ /** - * The client in question + * The client this request was received from. */ struct GNUNET_SERVER_Client *client; + + /** + * The key this request was about + */ + GNUNET_HashCode *key; + + /** + * The unique identifier of this request + */ + unsigned long long unique_id; + + /** + * Desired replication level + */ + size_t replication; + + /** + * Any message options for this request + */ + size_t msg_options; }; /** * Server handler for handling locally received dht requests */ static void -handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message); +handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message); static void -handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client, +handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message); static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { @@ -107,28 +139,29 @@ static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { * Core handler for p2p dht get requests. */ static int handle_dht_p2p_get (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance); /** * Core handler for p2p dht put requests. */ static int handle_dht_p2p_put (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance); /** * Core handler for p2p dht find peer requests. */ static int handle_dht_p2p_find_peer (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance); + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader + *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance); static struct GNUNET_CORE_MessageHandler core_handlers[] = { {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_GET, 0}, @@ -138,25 +171,131 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = { }; +static size_t +send_reply (void *cls, size_t size, void *buf) +{ + struct GNUNET_DHT_Message *reply = cls; + + if (buf == NULL) /* Message timed out, that's crappy... */ + { + GNUNET_free (reply); + return 0; + } + + if (size >= ntohs (reply->header.size)) + { + memcpy (buf, reply, ntohs (reply->header.size)); + return ntohs (reply->header.size); + } + else + return 0; +} + + +static void +send_reply_to_client (struct GNUNET_SERVER_Client *client, + struct GNUNET_MessageHeader *message, + unsigned long long uid) +{ + struct GNUNET_DHT_Message *reply; + size_t msize; + size_t tsize; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Sending reply to client.\n", "DHT"); +#endif + msize = ntohs (message->size); + tsize = sizeof (struct GNUNET_DHT_Message) + msize; + reply = GNUNET_malloc (tsize); + reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT); + reply->header.size = htons (tsize); + if (uid != 0) + reply->unique = htons (GNUNET_YES); + reply->unique_id = GNUNET_htonll (uid); + memcpy (&reply[1], message, msize); + + GNUNET_SERVER_notify_transmit_ready (client, + tsize, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &send_reply, reply); + +} + + +/** + * Iterator for local get request results, return + * GNUNET_OK to continue iteration, anything else + * to stop iteration. + */ +static int +datacache_get_iterator (void *cls, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + uint32_t size, const char *data, uint32_t type) +{ + struct DatacacheGetContext *datacache_get_ctx = cls; + struct GNUNET_DHT_GetResultMessage *get_result; + + get_result = + GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size); + get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); + get_result->header.size = + htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); + get_result->data_size = htons (size); + get_result->expiration = exp; + memcpy (&get_result->key, key, sizeof (GNUNET_HashCode)); + get_result->type = htons (type); + memcpy (&get_result[1], data, size); + + send_reply_to_client (datacache_get_ctx->client, &get_result->header, + datacache_get_ctx->unique_id); + + GNUNET_free (get_result); + return GNUNET_OK; +} /** * Server handler for initiating local dht get requests */ -static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, GNUNET_HashCode *key) +static void +handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, + struct DHT_MessageContext *message_context) { #if DEBUG_DHT GNUNET_HashCode get_key; #endif size_t get_type; + unsigned int results; + struct DatacacheGetContext *datacache_get_context; - GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage)); - get_type = ntohs(get_msg->type); + GNUNET_assert (ntohs (get_msg->header.size) >= + sizeof (struct GNUNET_DHT_GetMessage)); + get_type = ntohs (get_msg->type); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key)); + "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", + "DHT", "GET", get_type, GNUNET_h2s (&get_key), + message_context->unique_id); #endif + datacache_get_context = GNUNET_malloc (sizeof (struct DatacacheGetContext)); + datacache_get_context->client = message_context->client; + datacache_get_context->unique_id = message_context->unique_id; + + results = 0; + if (datacache != NULL) + results = + GNUNET_DATACACHE_get (datacache, message_context->key, get_type, + datacache_get_iterator, datacache_get_context); + +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Found %d results for local `%s' request\n", "DHT", + results, "GET"); +#endif + GNUNET_free (datacache_get_context); /* FIXME: Implement get functionality here */ } @@ -164,14 +303,20 @@ static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, GN /** * Server handler for initiating local dht find peer requests */ -static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, GNUNET_HashCode *key) +static void +handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, + struct DHT_MessageContext *message_context) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", "DHT", "FIND PEER", GNUNET_h2s(key), ntohs(find_msg->header.size), sizeof(struct GNUNET_DHT_FindPeerMessage)); + "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", + "DHT", "FIND PEER", GNUNET_h2s (message_context->key), + ntohs (find_msg->header.size), + sizeof (struct GNUNET_DHT_FindPeerMessage)); #endif - GNUNET_assert(ntohs(find_msg->header.size) >= sizeof(struct GNUNET_DHT_FindPeerMessage)); + GNUNET_assert (ntohs (find_msg->header.size) >= + sizeof (struct GNUNET_DHT_FindPeerMessage)); /* FIXME: Implement find peer functionality here */ } @@ -180,35 +325,44 @@ static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage * /** * Server handler for initiating local dht put requests */ -static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GNUNET_HashCode *key) +static void +handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, + struct DHT_MessageContext *message_context) { size_t put_type; size_t data_size; - char *data; - GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct GNUNET_DHT_PutMessage)); + GNUNET_assert (ntohs (put_msg->header.size) >= + sizeof (struct GNUNET_DHT_PutMessage)); - put_type = ntohs(put_msg->type); - data_size = ntohs(put_msg->data_size); + put_type = ntohs (put_msg->type); + data_size = ntohs (put_msg->data_size); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': %s msg total size is %d, data size %d, struct size %d\n", "DHT", "PUT", ntohs(put_msg->header.size), data_size, sizeof(struct GNUNET_DHT_PutMessage)); + "`%s': %s msg total size is %d, data size %d, struct size %d\n", + "DHT", "PUT", ntohs (put_msg->header.size), data_size, + sizeof (struct GNUNET_DHT_PutMessage)); #endif - GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size); - data = GNUNET_malloc(data_size); - memcpy(data, &put_msg[1], data_size); + GNUNET_assert (ntohs (put_msg->header.size) == + sizeof (struct GNUNET_DHT_PutMessage) + data_size); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(key)); + "`%s': Received `%s' request from client, message type %d, key %s\n", + "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); #endif - + /** + * Simplest DHT functionality, store any message we receive a put request for. + */ + if (datacache != NULL) + GNUNET_DATACACHE_put (datacache, message_context->key, data_size, + (char *) &put_msg[1], put_type, + put_msg->expiration); /** * FIXME: Implement dht put request functionality here! */ - GNUNET_free(data); } /** @@ -224,117 +378,145 @@ struct SendConfirmationContext /** * Transmit handle. */ - struct GNUNET_CONNECTION_TransmitHandle * transmit_handle; + struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; }; -static size_t send_confirmation (void *cls, - size_t size, void *buf) +static size_t +send_confirmation (void *cls, size_t size, void *buf) { struct GNUNET_DHT_StopMessage *confirmation_message = cls; - if (buf == NULL) /* Message timed out, that's crappy... */ - { - GNUNET_free(confirmation_message); - return 0; - } + if (buf == NULL) /* Message timed out, that's crappy... */ + { + GNUNET_free (confirmation_message); + return 0; + } - if (size >= ntohs(confirmation_message->header.size)) - { - memcpy(buf, confirmation_message, ntohs(confirmation_message->header.size)); - return ntohs(confirmation_message->header.size); - } + if (size >= ntohs (confirmation_message->header.size)) + { + memcpy (buf, confirmation_message, + ntohs (confirmation_message->header.size)); + return ntohs (confirmation_message->header.size); + } else return 0; } + static void -send_client_receipt_confirmation(struct GNUNET_SERVER_Client *client, uint64_t uid) +send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client, + uint64_t uid) { struct GNUNET_DHT_StopMessage *confirm_message; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Sending receipt confirmation for uid %llu\n", "DHT", uid); + "`%s': Sending receipt confirmation for uid %llu\n", "DHT", + uid); #endif - confirm_message = GNUNET_malloc(sizeof(struct GNUNET_DHT_StopMessage)); - confirm_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); - confirm_message->header.size = htons(sizeof(struct GNUNET_DHT_StopMessage)); - confirm_message->unique_id = GNUNET_htonll(uid); + confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage)); + confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); + confirm_message->header.size = + htons (sizeof (struct GNUNET_DHT_StopMessage)); + confirm_message->unique_id = GNUNET_htonll (uid); GNUNET_SERVER_notify_transmit_ready (client, - sizeof(struct GNUNET_DHT_StopMessage), - GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5), + sizeof (struct GNUNET_DHT_StopMessage), + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), &send_confirmation, confirm_message); } static void -handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) +handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)message; + struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message; struct GNUNET_MessageHeader *enc_msg; + struct DHT_MessageContext *message_context; size_t enc_type; - enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; - enc_type = ntohs(enc_msg->type); + enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; + enc_type = ntohs (enc_msg->type); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), GNUNET_ntohll(dht_msg->unique_id)); + "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", + "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key), + GNUNET_ntohll (dht_msg->unique_id)); #endif - /* FIXME: Implement demultiplexing functionality here */ + message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext)); + message_context->client = client; + message_context->key = &dht_msg->key; + message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id); + message_context->replication = ntohs (dht_msg->desired_replication_level); + message_context->msg_options = ntohs (dht_msg->options); + switch (enc_type) { case GNUNET_MESSAGE_TYPE_DHT_GET: - handle_dht_get(cls, (struct GNUNET_DHT_GetMessage *)enc_msg, &dht_msg->key); + handle_dht_get (cls, (struct GNUNET_DHT_GetMessage *) enc_msg, + message_context); break; case GNUNET_MESSAGE_TYPE_DHT_PUT: - handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, &dht_msg->key); - send_client_receipt_confirmation(client, GNUNET_ntohll(dht_msg->unique_id)); + handle_dht_put (cls, (struct GNUNET_DHT_PutMessage *) enc_msg, + message_context); + send_client_receipt_confirmation (client, + GNUNET_ntohll (dht_msg->unique_id)); break; case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: - handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, &dht_msg->key); + handle_dht_find_peer (cls, + (struct GNUNET_DHT_FindPeerMessage *) enc_msg, + message_context); break; default: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s': Message type (%d) not handled\n", "DHT", enc_type); } - GNUNET_SERVER_receive_done(client, GNUNET_OK); + GNUNET_free (message_context); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } static void -handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) +handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - struct GNUNET_DHT_StopMessage * dht_stop_msg = (struct GNUNET_DHT_StopMessage *)message; + struct GNUNET_DHT_StopMessage *dht_stop_msg = + (struct GNUNET_DHT_StopMessage *) message; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll(dht_stop_msg->unique_id)); + "`%s': Received `%s' request from client, uid %llu\n", "DHT", + "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); #endif - send_client_receipt_confirmation(client, GNUNET_ntohll(dht_stop_msg->unique_id)); - GNUNET_SERVER_receive_done(client, GNUNET_OK); + + /* TODO: Put in demultiplexing here */ + + send_client_receipt_confirmation (client, + GNUNET_ntohll (dht_stop_msg->unique_id)); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** * Core handler for p2p dht get requests. */ -static int handle_dht_p2p_get (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) +static int +handle_dht_p2p_get (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, uint32_t distance) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from another peer\n", "DHT", "GET"); + "`%s': Received `%s' request from another peer\n", "DHT", + "GET"); #endif return GNUNET_YES; @@ -343,15 +525,16 @@ static int handle_dht_p2p_get (void *cls, /** * Core handler for p2p dht put requests. */ -static int handle_dht_p2p_put (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) +static int +handle_dht_p2p_put (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, uint32_t distance) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from another peer\n", "DHT", "PUT"); + "`%s': Received `%s' request from another peer\n", "DHT", + "PUT"); #endif return GNUNET_YES; @@ -360,15 +543,17 @@ static int handle_dht_p2p_put (void *cls, /** * Core handler for p2p dht find peer requests. */ -static int handle_dht_p2p_find_peer (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) +static int +handle_dht_p2p_find_peer (void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance) { #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from another peer\n", "DHT", "FIND PEER"); + "`%s': Received `%s' request from another peer\n", "DHT", + "FIND PEER"); #endif return GNUNET_YES; @@ -381,8 +566,7 @@ static int handle_dht_p2p_find_peer (void *cls, * @param tc unused */ static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_CORE_disconnect (coreAPI); } @@ -390,23 +574,25 @@ shutdown_task (void *cls, /** * To be called on core init/fail. */ -void core_init (void *cls, - struct GNUNET_CORE_Handle * server, - const struct GNUNET_PeerIdentity *identity, - const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded * publicKey) +void +core_init (void *cls, + struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *identity, + const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey) { if (server == NULL) { - GNUNET_SCHEDULER_cancel(sched, cleanup_task); - GNUNET_SCHEDULER_add_now(sched, &shutdown_task, NULL); + GNUNET_SCHEDULER_cancel (sched, cleanup_task); + GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); return; } #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Core connection initialized, I am peer: %s\n", "dht", GNUNET_i2s(identity)); + "%s: Core connection initialized, I am peer: %s\n", "dht", + GNUNET_i2s (identity)); #endif - memcpy(&my_identity, identity, sizeof(struct GNUNET_PeerIdentity)); + memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity)); coreAPI = server; } @@ -429,32 +615,31 @@ run (void *cls, datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache"); - client_transmit_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); + client_transmit_timeout = + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); GNUNET_SERVER_add_handlers (server, plugin_handlers); - coreAPI = - GNUNET_CORE_connect (sched, /* Main scheduler */ - cfg, /* Main configuration */ - client_transmit_timeout, /* Delay for connecting */ - NULL, /* FIXME: anything we want to pass around? */ - &core_init, /* Call core_init once connected */ - NULL, /* Don't care about pre-connects */ - NULL, /* Don't care about connects */ - NULL, /* Don't care about disconnects */ - NULL, /* Don't want notified about all incoming messages */ - GNUNET_NO, /* For header only inbound notification */ - NULL, /* Don't want notified about all outbound messages */ - GNUNET_NO, /* For header only outbound notification */ - core_handlers); /* Register these handlers */ + coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */ + cfg, /* Main configuration */ + client_transmit_timeout, /* Delay for connecting */ + NULL, /* FIXME: anything we want to pass around? */ + &core_init, /* Call core_init once connected */ + NULL, /* Don't care about pre-connects */ + NULL, /* Don't care about connects */ + NULL, /* Don't care about disconnects */ + NULL, /* Don't want notified about all incoming messages */ + GNUNET_NO, /* For header only inbound notification */ + NULL, /* Don't want notified about all outbound messages */ + GNUNET_NO, /* For header only outbound notification */ + core_handlers); /* Register these handlers */ if (coreAPI == NULL) return; /* Scheduled the task to clean up when shutdown is called */ cleanup_task = GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, - NULL); + GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, NULL); } diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index 9dc429148..f9d9e008c 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -75,8 +75,7 @@ GNUNET_SCHEDULER_TaskIdentifier die_task; static void -end (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +end (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { /* do work here */ GNUNET_SCHEDULER_cancel (sched, die_task); @@ -86,15 +85,17 @@ end (void *cls, die_task = GNUNET_SCHEDULER_NO_TASK; if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning FAIL!\n"); - ok = 365; - } + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DHT disconnected, returning FAIL!\n"); + ok = 365; + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning success!\n"); - ok = 0; - } + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DHT disconnected, returning success!\n"); + ok = 0; + } } static void @@ -114,7 +115,7 @@ end_badly () { /* do work here */ #if VERBOSE - fprintf(stderr, "Ending on an unhappy note.\n"); + fprintf (stderr, "Ending on an unhappy note.\n"); #endif GNUNET_DHT_disconnect (p1.dht_handle); @@ -129,18 +130,18 @@ end_badly () * @param cls closure * @param tc context information (why was this task triggered now) */ -void test_find_peer_stop (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +void +test_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerContext *peer = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer_stop!\n"); if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) - GNUNET_SCHEDULER_add_now(sched, &end_badly, NULL); + GNUNET_SCHEDULER_add_now (sched, &end_badly, NULL); GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_find_peer_stop(peer->find_peer_handle, &end, &p1); + GNUNET_DHT_find_peer_stop (peer->find_peer_handle, &end, &p1); //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &end, &p1); @@ -152,20 +153,22 @@ void test_find_peer_stop (void *cls, * @param cls closure * @param tc context information (why was this task triggered now) */ -void test_find_peer (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +void +test_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerContext *peer = cls; GNUNET_HashCode hash; - memset(&hash, 42, sizeof(GNUNET_HashCode)); + memset (&hash, 42, sizeof (GNUNET_HashCode)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n"); GNUNET_assert (peer->dht_handle != NULL); - peer->find_peer_handle = GNUNET_DHT_find_peer_start(peer->dht_handle, TIMEOUT, 0, NULL, &hash, NULL, NULL, &test_find_peer_stop, &p1); + peer->find_peer_handle = + GNUNET_DHT_find_peer_start (peer->dht_handle, TIMEOUT, 0, NULL, &hash, + NULL, NULL, &test_find_peer_stop, &p1); if (peer->find_peer_handle == NULL) - GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1); + GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); } /** @@ -174,21 +177,34 @@ void test_find_peer (void *cls, * @param cls closure * @param tc context information (why was this task triggered now) */ -void test_put (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +void +test_get_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerContext *peer = cls; - GNUNET_HashCode hash; - char *data; - size_t data_size = 42; - memset(&hash, 42, sizeof(GNUNET_HashCode)); - data = GNUNET_malloc(data_size); - memset(data, 43, data_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_put!\n"); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get_stop!\n"); + if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) + GNUNET_SCHEDULER_add_now (sched, &end_badly, NULL); + GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(TIMEOUT), TIMEOUT, &test_find_peer, &p1); + GNUNET_DHT_get_stop (peer->get_handle, &test_find_peer, &p1); + + //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); + +} + +void +test_get_iterator (void *cls, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + uint32_t type, uint32_t size, const void *data) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "test_get_iterator called (we got a result), stopping get request!\n"); + GNUNET_SCHEDULER_add_continuation (sched, &test_get_stop, &p1, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } /** @@ -197,21 +213,23 @@ void test_put (void *cls, * @param cls closure * @param tc context information (why was this task triggered now) */ -void test_get_stop (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +void +test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerContext *peer = cls; + GNUNET_HashCode hash; + memset (&hash, 42, sizeof (GNUNET_HashCode)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get_stop!\n"); - if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT) - GNUNET_SCHEDULER_add_now(sched, &end_badly, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get!\n"); GNUNET_assert (peer->dht_handle != NULL); - GNUNET_DHT_get_stop(peer->get_handle, &test_put, &p1); - - //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1); + peer->get_handle = + GNUNET_DHT_get_start (peer->dht_handle, TIMEOUT, 42, &hash, + &test_get_iterator, NULL, NULL, NULL); + if (peer->get_handle == NULL) + GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); } /** @@ -220,21 +238,25 @@ void test_get_stop (void *cls, * @param cls closure * @param tc context information (why was this task triggered now) */ -void test_get (void *cls, - const struct GNUNET_SCHEDULER_TaskContext * tc) +void +test_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct PeerContext *peer = cls; GNUNET_HashCode hash; - memset(&hash, 42, sizeof(GNUNET_HashCode)); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get!\n"); + char *data; + size_t data_size = 42; + memset (&hash, 42, sizeof (GNUNET_HashCode)); + data = GNUNET_malloc (data_size); + memset (data, 43, data_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_put!\n"); peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg, 100); + GNUNET_assert (peer->dht_handle != NULL); - peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, TIMEOUT, 42, &hash, NULL, NULL, &test_get_stop, &p1); + GNUNET_DHT_put (peer->dht_handle, &hash, 0, data_size, data, + GNUNET_TIME_relative_to_absolute (TIMEOUT), TIMEOUT, + &test_get, &p1); - if (peer->get_handle == NULL) - GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1); } static void @@ -264,11 +286,16 @@ run (void *cls, sched = s; die_task = GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1), &end_badly, NULL); + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MINUTES, 1), + &end_badly, NULL); setup_peer (&p1, "test_dht_api_peer1.conf"); - GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_get, &p1); + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 1), &test_put, + &p1); } static int @@ -290,8 +317,7 @@ check () ok = 1; GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, - argv, "test-dht-api", "nohelp", - options, &run, &ok); + argv, "test-dht-api", "nohelp", options, &run, &ok); stop_arm (&p1); return ok; } @@ -314,7 +340,7 @@ main (int argc, char *argv[]) NULL); ret = check (); - //GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-dht-peer-1"); + GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-dht-peer-1"); return ret; } -- 2.25.1