From e86fd05ff7bbef8aa1741671163564fb35beafb5 Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Thu, 18 Mar 2010 16:32:41 +0000 Subject: [PATCH] getting dht closer to being this crazy meta dht thing --- src/dht/dht.h | 42 ++- src/dht/dht_api.c | 551 +++++++++++++++++++++++------------ src/dht/gnunet-service-dht.c | 179 ++++++------ 3 files changed, 480 insertions(+), 292 deletions(-) diff --git a/src/dht/dht.h b/src/dht/dht.h index 82b0df9a2..0cfd9b3bf 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -32,6 +32,24 @@ typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, struct GNUNET_MessageHeader *msg); +/** + * Generic DHT message, wrapper for other message types + */ +struct GNUNET_DHT_StopMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE + */ + struct GNUNET_MessageHeader header; + + /** + * Unique ID identifying this request + */ + uint64_t unique_id; + +}; + + /** * Generic DHT message, wrapper for other message types */ @@ -58,12 +76,18 @@ struct GNUNET_DHT_Message uint16_t options; /** - * Is this message uniquely identified? If so it has - * a unique_id appended to it. + * Is this message uniquely identified? If so it will + * be fire and forget, if not we will wait for a receipt + * from the service. */ - /* uint16_t unique; I don't think we need this, it should be held in the encapsulated message */ + uint16_t unique; + + + /** + * Unique ID identifying this request + */ + uint64_t unique_id; - /* uint64_t unique_id*/ /* */ /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ @@ -112,11 +136,6 @@ struct GNUNET_DHT_GetMessage */ size_t type; - /** - * The key to search for - */ - GNUNET_HashCode key; - }; /** @@ -156,11 +175,6 @@ struct GNUNET_DHT_FindPeerMessage */ struct GNUNET_MessageHeader header; - /** - * The key being looked up - */ - GNUNET_HashCode key; - }; /** diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index a55508b07..7ded088d8 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -23,6 +23,13 @@ * @brief library to access the DHT service * @author Christian Grothoff * @author Nathan Evans + * + * TODO: Only allow a single message until confirmed as received by + * the service. For put messages call continuation as soon as + * receipt acknowledged (then remove), for GET or other messages + * only call continuation when data received. + * Add unique identifier to message types requesting data to be + * returned. */ #include "platform.h" #include "gnunet_bandwidth_lib.h" @@ -39,13 +46,10 @@ #define DEBUG_DHT_API GNUNET_YES -struct PendingMessages -{ - /** - * Linked list of pending messages - */ - struct PendingMessages *next; +#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +struct PendingMessage +{ /** * Message that is pending */ @@ -56,8 +60,109 @@ struct PendingMessages */ struct GNUNET_TIME_Relative timeout; + /** + * Continuation to call on message send + * or message receipt confirmation + */ + GNUNET_DHT_MessageCallback cont; + + /** + * Continuation closure + */ + void *cont_cls; + + /** + * Whether or not to await verification the message + * was received by the service + */ + size_t is_unique; + + /** + * Unique ID for this request + */ + uint64_t unique_id; + +}; + +struct GNUNET_DHT_GetContext +{ + + + /** + * Iterator to call on data receipt + */ + GNUNET_DHT_GetIterator iter; + + /** + * Closure for the iterator callback + */ + void *iter_cls; + }; +/** + * Handle to control a unique operation (one that is + * expected to return results) + */ +struct GNUNET_DHT_RouteHandle +{ + + /** + * Unique identifier for this request (for key collisions) + */ + uint64_t uid; + + /** + * Key that this get request is for + */ + GNUNET_HashCode key; + + /** + * Iterator to call on data receipt + */ + GNUNET_DHT_ReplyProcessor iter; + + /** + * Closure for the iterator callback + */ + void *iter_cls; + + /** + * Main handle to this DHT api + */ + struct GNUNET_DHT_Handle *dht_handle; +}; + +/** + * Handle for a non unique request, holds callback + * which needs to be called before we allow other + * messages to be processed and sent to the DHT service + */ +struct GNUNET_DHT_NonUniqueHandle +{ + /** + * Key that this get request is for + */ + GNUNET_HashCode key; + + /** + * Type of data get request was for + */ + uint32_t type; + + /** + * Continuation to call on service + * confirmation of message receipt. + */ + GNUNET_SCHEDULER_Task cont; + + /** + * Send continuation cls + */ + void *cont_cls; +}; + + /** * Connection to the DHT service. */ @@ -84,25 +189,24 @@ struct GNUNET_DHT_Handle struct GNUNET_CLIENT_TransmitHandle *th; /** - * List of the currently pending messages for the DHT service. - */ - struct PendingMessages *pending_list; - - /** - * Message we are currently sending. + * Message we are currently sending, only allow + * a single message to be queued. If not unique + * (typically a put request), await a confirmation + * from the service that the message was received. + * If unique, just fire and forget. */ - struct PendingMessages *current; + struct PendingMessage *current; /** - * Hash map containing the current outstanding get requests + * Hash map containing the current outstanding unique requests */ - struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests; + struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; /** - * Hash map containing the current outstanding put requests, awaiting - * a response + * Non unique handle. If set don't schedule another non + * unique request. */ - struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests; + struct GNUNET_DHT_NonUniqueHandle *non_unique_request; /** * Kill off the connection and any pending messages. @@ -116,6 +220,27 @@ static struct GNUNET_TIME_Relative default_request_timeout; /* Forward declaration */ static void process_pending_message(struct GNUNET_DHT_Handle *handle); +static GNUNET_HashCode * hash_from_uid(uint64_t uid) +{ + int count; + int remaining; + GNUNET_HashCode *hash; + hash = GNUNET_malloc(sizeof(GNUNET_HashCode)); + count = 0; + + while (count < sizeof(GNUNET_HashCode)) + { + remaining = sizeof(GNUNET_HashCode) - count; + if (remaining > sizeof(uid)) + remaining = sizeof(uid); + + memcpy(hash, &uid, remaining); + count += remaining; + } + + return hash; +} + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types @@ -124,9 +249,52 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle); void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { - + struct GNUNET_DHT_Handle *handle = cls; + struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)msg; + struct GNUNET_MessageHeader *enc_msg; + struct GNUNET_DHT_RouteHandle *route_handle; + GNUNET_HashCode *uid_hash; + size_t enc_size; /* TODO: find out message type, handle callbacks for different types of messages. - * Should be a put acknowledgment, get data or find node result. */ + * Should be a non unique acknowledgment, or unique result. */ + +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu)\n", "DHT API", ntohl(dht_msg->unique_id)); +#endif + + if (ntohs(dht_msg->unique)) + { + uid_hash = hash_from_uid(ntohl(dht_msg->unique_id)); + + route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash); + if (route_handle == NULL) /* We have no recollection of this request */ + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id)); +#endif + } + else + { + enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message); + GNUNET_assert(enc_size > 0); + enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; + route_handle->iter(route_handle->iter_cls, enc_msg); + } + } + else + { + if (handle->current->unique_id == ntohl(dht_msg->unique_id)) + { + handle->current->cont(handle->current->cont_cls, GNUNET_OK); + GNUNET_free(handle->current->msg); + handle->current = NULL; + GNUNET_free(handle->current); + } + } + + } @@ -151,14 +319,14 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); handle->cfg = cfg; handle->sched = sched; - handle->pending_list = NULL; + handle->current = NULL; handle->do_destroy = GNUNET_NO; handle->th = NULL; handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg); - handle->outstanding_get_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */ - handle->outstanding_put_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */ + handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len); + if (handle->client == NULL) return NULL; #if DEBUG_DHT_API @@ -181,7 +349,6 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, void GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) { - struct PendingMessages *pos; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); @@ -196,11 +363,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) if (handle->current != NULL) /* We are trying to send something now, clean it up */ GNUNET_free(handle->current); - while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */ - { - handle->pending_list = pos->next; - GNUNET_free(pos); - } if (handle->client != NULL) /* Finally, disconnect from the service */ { GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); @@ -211,64 +373,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) } -/** - * Handle to control a GET operation. - */ -struct GNUNET_DHT_GetHandle -{ - - /** - * Key that this get request is for - */ - GNUNET_HashCode key; - - /** - * Type of data get request was for - */ - uint32_t type; - - /** - * Iterator to call on data receipt - */ - GNUNET_DHT_Iterator iter; - - /** - * Closure for the iterator callback - */ - void *iter_cls; - - /** - * Main handle to this DHT api - */ - struct GNUNET_DHT_Handle *dht_handle; -}; - -/** - * Handle for a PUT request, holds callback - */ -struct GNUNET_DHT_PutHandle -{ - /** - * Key that this get request is for - */ - GNUNET_HashCode key; - - /** - * Type of data get request was for - */ - uint32_t type; - - /** - * Continuation to call on put send - */ - GNUNET_SCHEDULER_Task cont; - - /** - * Send continuation cls - */ - void *cont_cls; -}; - /** * Send complete (or failed), schedule next (or don't) */ @@ -276,32 +380,20 @@ static void finish (struct GNUNET_DHT_Handle *handle, int code) { /* TODO: if code is not GNUNET_OK, do something! */ - struct PendingMessages *pos = handle->current; - struct GNUNET_DHT_GetMessage *get; - struct GNUNET_DHT_PutMessage *put; + struct PendingMessage *pos = handle->current; GNUNET_assert(pos != NULL); - switch (ntohs(pos->msg->type)) - { - case GNUNET_MESSAGE_TYPE_DHT_GET: - get = (struct GNUNET_DHT_GetMessage *)pos->msg; - GNUNET_free(get); - break; - case GNUNET_MESSAGE_TYPE_DHT_PUT: - put = (struct GNUNET_DHT_PutMessage *)pos->msg; - GNUNET_free(put); - break; - default: - GNUNET_break(0); - } - - handle->current = NULL; - - if (code != GNUNET_SYSERR) - process_pending_message (handle); + if (pos->is_unique) + { + if (pos->cont != NULL) + pos->cont(pos->cont_cls, code); - GNUNET_free(pos); + GNUNET_free(pos->msg); + handle->current = NULL; + GNUNET_free(pos); + } + /* Otherwise we need to wait for a response to this message! */ } /** @@ -389,14 +481,6 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle) //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */ } - /* schedule next action */ - handle->current = handle->pending_list; - if (NULL == handle->current) - { - return; - } - handle->pending_list = handle->pending_list->next; - handle->current->next = NULL; if (NULL == (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, @@ -418,39 +502,120 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle) } /** - * Add a pending message to the linked list of messages which need to be sent + * Iterator called on each result obtained from a generic route + * operation + */ +void get_reply_iterator (void *cls, + const struct GNUNET_MessageHeader *reply) +{ + +} + +/** + * Perform an asynchronous FIND_PEER operation on the DHT. * - * @param handle handle to the specified DHT api - * @param msg the message to add to the list + * @param handle handle to the DHT service + * @param key the key to look up + * @param desired_replication_level how many peers should ultimately receive + * this message (advisory only, target may be too high for the + * given DHT or not hit exactly). + * @param options options for routing + * @param enc send the encapsulated message to a peer close to the key + * @param iter function to call on each result, NULL if no replies are expected + * @param iter_cls closure for iter + * @param timeout when to abort with an error if we fail to get + * a confirmation for the PUT from the local DHT service + * @param cont continuation to call when done; + * reason will be TIMEOUT on error, + * reason will be PREREQ_DONE on success + * @param cont_cls closure for cont + * @return handle to stop the request */ -static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageHeader *msg) +struct GNUNET_DHT_RouteHandle * +GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, + const GNUNET_HashCode *key, + unsigned int desired_replication_level, + enum GNUNET_DHT_RouteOption options, + const struct GNUNET_MessageHeader *enc, + struct GNUNET_TIME_Relative timeout, + GNUNET_DHT_ReplyProcessor iter, + void *iter_cls, + GNUNET_DHT_MessageCallback cont, + void *cont_cls) { - struct PendingMessages *new_message; - struct PendingMessages *pos; - struct PendingMessages *last; + struct GNUNET_DHT_RouteHandle *route_handle; + struct PendingMessage *pending; + struct GNUNET_DHT_Message *message; + size_t is_unique; + size_t msize; + GNUNET_HashCode *uid_key; + int count; - new_message = GNUNET_malloc(sizeof(struct PendingMessages)); - new_message->msg = msg; - new_message->timeout = default_request_timeout; + is_unique = GNUNET_YES; + if (iter == NULL) + is_unique = GNUNET_NO; - if (handle->pending_list != NULL) + route_handle = NULL; + + if (is_unique) { - pos = handle->pending_list; - while(pos != NULL) + route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle)); + memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode)); + route_handle->iter = iter; + route_handle->iter_cls = iter_cls; + route_handle->dht_handle = handle; + route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); + + count = 0; + uid_key = hash_from_uid(route_handle->uid); + /* While we have an outstanding request with the same identifier! */ + while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES) { - last = pos; - pos = pos->next; + GNUNET_free(uid_key); + uid_key = hash_from_uid(route_handle->uid); } - new_message->next = last->next; /* Should always be null */ - last->next = new_message; + /** + * Store based on random identifier! + */ + GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size) + sizeof(route_handle->uid); + GNUNET_free(uid_key); } else { - new_message->next = handle->pending_list; /* Will always be null */ - handle->pending_list = new_message; + msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size); } + message = GNUNET_malloc(msize); + message->header.size = htons(msize); + message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT); + memcpy(&message->key, key, sizeof(GNUNET_HashCode)); + message->options = htons(options); + message->desired_replication_level = htons(options); + message->unique = htons(is_unique); + + pending = GNUNET_malloc(sizeof(struct PendingMessage)); + pending->msg = &message->header; + pending->timeout = timeout; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->is_unique = is_unique; + + GNUNET_assert(handle->current == NULL); + process_pending_message(handle); + + return route_handle; +} + +void +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph); + + +void dht_get_processor (void *cls, + const struct GNUNET_MessageHeader *reply) +{ + } /** @@ -463,48 +628,76 @@ static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageH * @param iter_cls closure for iter * @return handle to stop the async get */ -struct GNUNET_DHT_GetHandle * +struct GNUNET_DHT_RouteHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, + struct GNUNET_TIME_Relative timeout, uint32_t type, const GNUNET_HashCode * key, - GNUNET_DHT_Iterator iter, + GNUNET_DHT_GetIterator iter, void *iter_cls) { + struct GNUNET_DHT_GetContext *get_context; struct GNUNET_DHT_GetMessage *get_msg; - struct GNUNET_DHT_GetHandle *get_handle; - - get_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key); - if (get_handle != NULL) - { - /* - * A get has been previously sent, return existing handle. - * FIXME: should we re-transmit the request to the DHT service? - */ - return get_handle; - } + if (handle->current != NULL) /* Can't send right now, we have a pending message... */ + return NULL; - get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle)); - get_handle->type = type; - memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode)); - get_handle->iter = iter; - get_handle->iter_cls = iter_cls; + get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext)); + get_context->iter = iter; + get_context->iter_cls = iter_cls; #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key)); #endif - GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); get_msg->type = htonl(type); - memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode)); - add_pending(handle, &get_msg->header); + return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, NULL, NULL); - return get_handle; +} + + +void +GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle) +{ + struct PendingMessage *pending; + struct GNUNET_DHT_StopMessage *message; + size_t msize; + GNUNET_HashCode *uid_key; + + msize = sizeof(struct GNUNET_DHT_StopMessage); + + message = GNUNET_malloc(msize); + message->header.size = htons(msize); + message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP); + message->unique_id = htonl(route_handle->uid); + + pending = GNUNET_malloc(sizeof(struct PendingMessage)); + pending->msg = (struct GNUNET_MessageHeader *)message; + pending->timeout = DEFAULT_DHT_TIMEOUT; + pending->cont = NULL; + pending->cont_cls = NULL; + pending->is_unique = GNUNET_NO; + + GNUNET_assert(route_handle->dht_handle->current == NULL); + + process_pending_message(route_handle->dht_handle); + + uid_key = hash_from_uid(route_handle->uid); + + if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES) + { +#if DEBUG_DHT_API + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid); +#endif + } + + return; } @@ -514,27 +707,33 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, * @param record GET operation to stop. */ void -GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) +GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *handle) { +#if OLDREMOVE struct GNUNET_DHT_GetMessage *get_msg; struct GNUNET_DHT_Handle *handle; + GNUNET_HashCode *uid_key; +#endif + + GNUNET_DHT_route_stop(handle); + +#if OLDREMOVE + uid_key = hash_from_uid(get_handle->uid); + GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES); if (handle->do_destroy == GNUNET_NO) { get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP); get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); - get_msg->type = htonl(get_handle->type); - memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode)); - add_pending(handle, &get_msg->header); + } +#endif #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Removing pending get request with key %s\n", "DHT API", GNUNET_h2s(&get_handle->key)); + "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&handle->key), handle->uid); #endif - GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests, &get_handle->key, get_handle) == GNUNET_YES); - GNUNET_free(get_handle); } @@ -562,44 +761,30 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const char *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, - GNUNET_SCHEDULER_Task cont, + GNUNET_DHT_MessageCallback cont, void *cont_cls) { struct GNUNET_DHT_PutMessage *put_msg; - struct GNUNET_DHT_PutHandle *put_handle; size_t msize; - put_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key); - - if (put_handle != NULL) + if (handle->current != NULL) { - /* - * A put has been previously queued, but not yet sent. - * FIXME: change the continuation function and callback or something? - */ + cont(cont_cls, GNUNET_SYSERR); return; } - put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle)); - put_handle->type = type; - memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode)); - #if DEBUG_DHT_API GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key)); #endif - GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - msize = sizeof(struct GNUNET_DHT_PutMessage) + size; put_msg = GNUNET_malloc(msize); put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); put_msg->header.size = htons(msize); put_msg->type = htonl(type); - memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode)); memcpy(&put_msg[1], data, size); - add_pending(handle, &put_msg->header); + GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls); - return; } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 5bbb2e739..215d39c44 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -88,42 +88,15 @@ struct ClientList /** * Server handler for initiating local dht get requests */ -static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message); - -/** - * Server handler for stopping local dht get requests - */ -static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message); - -/** - * Server handler for initiating local dht find peer requests - */ -static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client * - client, const struct GNUNET_MessageHeader * - message); - -/** - * Server handler for stopping local dht find peer requests - */ -static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client * - client, const struct GNUNET_MessageHeader * - message); - -/** - * Server handler for initiating local dht put requests - */ -static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message); - +static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message); static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { - {&handle_dht_get, NULL, GNUNET_MESSAGE_TYPE_DHT_GET, 0}, - {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0}, + {&handle_dht_plugin_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, +/* {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0}, {&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0}, {&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0}, - {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 0}, + {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 0},*/ {NULL, NULL, 0, 0} }; @@ -163,18 +136,16 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = { }; + /** * Server handler for initiating local dht get requests */ -static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) +static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, GNUNET_HashCode *key) { - struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage *)message; GNUNET_HashCode get_key; size_t get_type; GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage)); - memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode)); get_type = ntohs(get_msg->type); #if DEBUG_DHT @@ -182,92 +153,38 @@ static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client, "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key)); #endif - /* FIXME: Implement get stop functionality here */ - + /* FIXME: Implement get functionality here */ } -/** - * Server handler for stopping local dht get requests - */ -static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage *)message; /* Get message and get stop message are the same except for type */ - GNUNET_HashCode get_key; - size_t get_type; - - GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage)); - - memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode)); - get_type = ntohs(get_msg->type); - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET STOP", get_type, GNUNET_h2s(&get_key)); -#endif - - /* FIXME: Implement get stop functionality here */ - -} /** * Server handler for initiating local dht find peer requests */ -static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client * - client, const struct GNUNET_MessageHeader * - message) +static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, GNUNET_HashCode *key) { - struct GNUNET_DHT_FindPeerMessage *find_msg = (struct GNUNET_DHT_FindPeerMessage *)message; - struct GNUNET_PeerIdentity peer; GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct GNUNET_DHT_FindPeerMessage)); - memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity)); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, peer id %s\n", "DHT", "FIND PEER", GNUNET_i2s(&peer)); + "`%s': Received `%s' request from client, key %s\n", "DHT", "FIND PEER", GNUNET_h2s(key)); #endif /* FIXME: Implement find peer functionality here */ } -/** - * Server handler for stopping local dht find peer requests - */ -static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client * - client, const struct GNUNET_MessageHeader * - message) -{ - struct GNUNET_DHT_FindPeerMessage *find_msg = (struct GNUNET_DHT_FindPeerMessage *)message; /* Find peer stop message is identical to find peer message */ - struct GNUNET_PeerIdentity peer; - - GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct GNUNET_DHT_FindPeerMessage)); - memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity)); - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, for peer id %s\n", "DHT", "FIND PEER STOP", GNUNET_i2s(&peer)); -#endif - - /* FIXME: Implement find peer stop functionality here */ - -} /** * Server handler for initiating local dht put requests */ -static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client, - const struct GNUNET_MessageHeader *message) +static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GNUNET_HashCode *key) { - struct GNUNET_DHT_PutMessage *put_msg = (struct GNUNET_DHT_PutMessage *)message; - GNUNET_HashCode put_key; size_t put_type; size_t data_size; char *data; GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct GNUNET_DHT_PutMessage)); - memcpy(&put_key, &put_msg->key, sizeof(GNUNET_HashCode)); put_type = ntohs(put_msg->type); data_size = ntohs(put_msg->data_size); GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size); @@ -276,7 +193,7 @@ static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client, #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(&put_key)); + "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(key)); #endif /** @@ -284,7 +201,79 @@ static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client, */ GNUNET_free(data); +} + + +static void +handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg) +{ + struct GNUNET_MessageHeader *enc_msg; + size_t enc_type; + + enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1]; + enc_type = ntohs(enc_msg->type); + + +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), ntohl(dht_msg->unique_id)); +#endif + + /* FIXME: Implement demultiplexing functionality here */ + switch (enc_type) + { + case GNUNET_MESSAGE_TYPE_DHT_GET: + handle_dht_get(cls, (struct GNUNET_DHT_GetMessage *)enc_msg, &dht_msg->key); + break; + case GNUNET_MESSAGE_TYPE_DHT_PUT: + handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, &dht_msg->key); + break; + case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: + handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, &dht_msg->key); + break; + default: +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Message type (%d) not handled\n", "DHT", enc_type); +#endif + } + +} + + +static void +handle_dht_stop_message(void *cls, struct GNUNET_DHT_StopMessage *dht_stop_msg) +{ + +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", ntohl(dht_stop_msg->unique_id)); +#endif +} + + + +/** + * Server handler for initiating local dht get requests + */ +static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client, + const struct GNUNET_MessageHeader *message) +{ + + #if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "`%s': Received `%s' request from client, message type %d, size %d\n", "DHT", "GENERIC", ntohs(message->type), ntohs(message->size)); +#endif + + switch(ntohs(message->type)) + { + case GNUNET_MESSAGE_TYPE_DHT: + handle_dht_start_message(cls, (struct GNUNET_DHT_Message *)message); + case GNUNET_MESSAGE_TYPE_DHT_STOP: + handle_dht_stop_message(cls, (struct GNUNET_DHT_StopMessage *)message); + } + GNUNET_SERVER_receive_done(client, GNUNET_OK); } /** -- 2.25.1