From 3b2b7374d2d31f7b2638c82711f40f7113e9f7f0 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 10 Nov 2012 21:21:01 +0000 Subject: [PATCH] -implementing #2435 --- src/dht/dht.h | 35 ++++++- src/dht/dht_api.c | 131 +++++++++++++++++++++++++-- src/dht/gnunet-service-dht_clients.c | 103 ++++++++++++++++++++- src/include/gnunet_dht_service.h | 24 ++++- src/include/gnunet_protocols.h | 5 + 5 files changed, 284 insertions(+), 14 deletions(-) diff --git a/src/dht/dht.h b/src/dht/dht.h index 772471a7c..f736c8d75 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -108,6 +108,39 @@ struct GNUNET_DHT_ClientGetMessage }; +/** + * DHT GET RESULTS KNOWN message sent from clients to service. Indicates that a GET + * request should exclude certain results which are already known. + */ +struct GNUNET_DHT_ClientGetResultSeenMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN + */ + struct GNUNET_MessageHeader header; + + /** + * Reserved, always 0. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * The key we are searching for (to make it easy to find the corresponding + * GET inside the service). + */ + struct GNUNET_HashCode key; + + /** + * Unique ID identifying this request. + */ + uint64_t unique_id GNUNET_PACKED; + + /* Followed by an array of the hash codes of known results */ + +}; + + + /** * Reply to a GET send from the service to a client. */ @@ -325,7 +358,7 @@ struct GNUNET_DHT_MonitorStartStopMessage struct GNUNET_DHT_MonitorGetMessage { /** - * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET */ struct GNUNET_MessageHeader header; diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 7964ba98f..f46900778 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -172,16 +172,43 @@ struct GNUNET_DHT_GetHandle */ struct PendingMessage *message; + /** + * Array of hash codes over the results that we have already + * seen. + */ + struct GNUNET_HashCode *seen_results; + /** * Key that this get request is for */ - struct GNUNET_HashCode key; + struct GNUNET_HashCode key; /** * Unique identifier for this request (for key collisions). */ uint64_t unique_id; + /** + * Size of the 'seen_results' array. Note that not + * all positions might be used (as we over-allocate). + */ + unsigned int seen_results_size; + + /** + * Offset into the 'seen_results' array marking the + * end of the positions that are actually used. + */ + unsigned int seen_results_end; + + /** + * Offset into the 'seen_results' array marking the + * position up to where we've send the hash codes to + * the DHT for blocking (needed as we might not be + * able to send all hash codes at once). + */ + unsigned int seen_results_transmission_offset; + + }; @@ -352,6 +379,50 @@ try_connect (struct GNUNET_DHT_Handle *handle) } +/** + * Queue messages to DHT to block certain results from the result set. + * + * @param get_handle GET to generate messages for. + */ +static void +queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle) +{ + struct PendingMessage *pm; + struct GNUNET_DHT_ClientGetResultSeenMessage *msg; + uint16_t msize; + unsigned int delta; + unsigned int max; + + while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end) + { + delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset; + max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); + if (delta > max) + delta = max; + msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode); + + pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1]; + pm->msg = &msg->header; + pm->handle = get_handle->dht_handle; + pm->unique_id = get_handle->unique_id; + pm->free_on_send = GNUNET_YES; + pm->in_pending_queue = GNUNET_YES; + msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); + msg->header.size = htons (msize); + msg->key = get_handle->key; + msg->unique_id = get_handle->unique_id; + memcpy (&msg[1], + &get_handle->seen_results[get_handle->seen_results_transmission_offset], + sizeof (struct GNUNET_HashCode) * delta); + get_handle->seen_results_transmission_offset += delta; + GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head, + get_handle->dht_handle->pending_tail, + pm); + } +} + + /** * Add the request corresponding to the given route handle * to the pending queue (if it is not already in there). @@ -365,16 +436,18 @@ static int add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value) { struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_GetHandle *rh = value; + struct GNUNET_DHT_GetHandle *get_handle = value; - if (GNUNET_NO == rh->message->in_pending_queue) + if (GNUNET_NO == get_handle->message->in_pending_queue) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), handle); + get_handle->seen_results_transmission_offset = 0; GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, - rh->message); - rh->message->in_pending_queue = GNUNET_YES; + get_handle->message); + queue_filter_messages (get_handle); + get_handle->message->in_pending_queue = GNUNET_YES; } return GNUNET_YES; } @@ -578,6 +651,7 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value) struct GNUNET_DHT_GetHandle *get_handle = value; const struct GNUNET_PeerIdentity *put_path; const struct GNUNET_PeerIdentity *get_path; + struct GNUNET_HashCode hc; uint32_t put_path_length; uint32_t get_path_length; size_t data_length; @@ -614,6 +688,17 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value) put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; get_path = &put_path[put_path_length]; data = &get_path[get_path_length]; + /* remember that we've seen this result */ + GNUNET_CRYPTO_hash (data, data_length, &hc); + if (get_handle->seen_results_size == get_handle->seen_results_end) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + get_handle->seen_results_size * 2 + 1); + GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset); + get_handle->seen_results[get_handle->seen_results_end++] = hc; + /* no need to block it explicitly, service already knows about it! */ + get_handle->seen_results_transmission_offset++; + get_handle->iter (get_handle->iter_cls, GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, get_path, get_path_length, put_path, put_path_length, @@ -1194,6 +1279,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, } + +/** + * Tell the DHT not to return any of the following known results + * to this client. + * + * @param get_handle get operation for which results should be filtered + * @param num_results number of results to be blocked that are + * provided in this call (size of the 'results' array) + * @param results array of hash codes over the 'data' of the results + * to be blocked + */ +void +GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, + unsigned int num_results, + const struct GNUNET_HashCode *results) +{ + unsigned int needed; + + needed = get_handle->seen_results_end + num_results; + if (needed > get_handle->seen_results_size) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + needed); + memcpy (&get_handle->seen_results[get_handle->seen_results_end], + results, + num_results * sizeof (struct GNUNET_HashCode)); + get_handle->seen_results_end += num_results; + queue_filter_messages (get_handle); + process_pending_messages (get_handle->dht_handle); +} + + /** * Stop async DHT-get. * @@ -1242,8 +1359,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) get_handle->message->in_pending_queue = GNUNET_NO; } GNUNET_free (get_handle->message); + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_end, + 0); GNUNET_free (get_handle); - process_pending_messages (handle); } diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 1d2e1e9bb..593674569 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -551,9 +551,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, /** - * Handler for any generic DHT messages, calls the appropriate handler - * depending on message type, sends confirmation if responses aren't otherwise - * expected. + * Handler for DHT GET messages from the client. * * @param cls closure for the service * @param client the client we received this message from @@ -620,6 +618,103 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * Closure for 'find_by_unique_id'. + */ +struct FindByUniqueIdContext +{ + /** + * Where to store the result, if found. + */ + struct ClientQueryRecord *cqr; + + uint64_t unique_id; +}; + + +/** + * Function called for each existing DHT record for the given + * query. Checks if it matches the UID given in the closure + * and if so returns the entry as a result. + * + * @param cls the search context + * @param key query for the lookup (not used) + * @param value the 'struct ClientQueryRecord' + * @return GNUNET_YES to continue iteration (result not yet found) + */ +static int +find_by_unique_id (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct FindByUniqueIdContext *fui_ctx = cls; + struct ClientQueryRecord *cqr = value; + + if (cqr->unique_id != fui_ctx->unique_id) + return GNUNET_YES; + fui_ctx->cqr = cqr; + return GNUNET_NO; +} + + +/** + * Handler for "GET result seen" messages from the client. + * + * @param cls closure for the service + * @param client the client we received this message from + * @param message the actual message received + */ +static void +handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; + uint16_t size; + unsigned int hash_count; + unsigned int old_count; + const struct GNUNET_HashCode *hc; + struct FindByUniqueIdContext fui_ctx; + struct ClientQueryRecord *cqr; + + size = ntohs (message->size); + if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message; + hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); + if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + hc = (const struct GNUNET_HashCode*) &seen[1]; + fui_ctx.unique_id = seen->unique_id; + fui_ctx.cqr = NULL; + GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, + &seen->key, + &find_by_unique_id, + &fui_ctx); + if (NULL == (cqr = fui_ctx.cqr)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + /* finally, update 'seen' list */ + old_count = cqr->seen_replies_count; + GNUNET_array_grow (cqr->seen_replies, + cqr->seen_replies_count, + cqr->seen_replies_count + hash_count); + memcpy (&cqr->seen_replies[old_count], + hc, + sizeof (struct GNUNET_HashCode) * hash_count); +} + + /** * Closure for 'remove_by_unique_id'. */ @@ -1350,6 +1445,8 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) {&handle_dht_local_monitor_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, + {&handle_dht_local_get_result_seen, NULL, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0}, {NULL, NULL, 0, 0} }; forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index 89f42acb4..0de7551a4 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -236,13 +236,29 @@ typedef void (*GNUNET_DHT_GetIterator) (void *cls, */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_HashCode *key, uint32_t desired_replication_level, - enum GNUNET_DHT_RouteOption options, const void *xquery, - size_t xquery_size, GNUNET_DHT_GetIterator iter, - void *iter_cls); + enum GNUNET_DHT_RouteOption options, + const void *xquery, size_t xquery_size, + GNUNET_DHT_GetIterator iter, void *iter_cls); +/** + * Tell the DHT not to return any of the following known results + * to this client. + * + * @param get_handle get operation for which results should be filtered + * @param num_results number of results to be blocked that are + * provided in this call (size of the 'results' array) + * @param results array of hash codes over the 'data' of the results + * to be blocked + */ +void +GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, + unsigned int num_results, + const struct GNUNET_HashCode *results); + /** * Stop async DHT-get. Frees associated resources. * diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index df123ceb1..8abaf35b4 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -558,6 +558,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK 155 +/** + * Certain results are already known to the client, filter those. + */ +#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN 156 + /******************************************************************************* * HOSTLIST message types -- 2.25.1