From: Bart Polot Date: Wed, 14 Sep 2011 13:54:46 +0000 (+0000) Subject: Added code for handling a case when a PUT is received with a key for which there... X-Git-Tag: initial-import-from-subversion-38251~17114 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=89116db69e4e81337ddbd3c03e2bd50fe2bb8eab;p=oweals%2Fgnunet.git Added code for handling a case when a PUT is received with a key for which there is an active GET query. --- diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index c8d3a1afc..5ad1d296f 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -162,6 +162,7 @@ if ENABLE_TEST_RUN TESTS = test_dht_api $(check_SCRIPTS) \ test_dht_twopeer \ test_dht_twopeer_put_get \ + test_dht_twopeer_get_put \ test_dht_twopeer_path_tracking \ test_dht_multipeer \ test_dhtlog diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index a68ba5563..838744d9e 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -478,6 +478,11 @@ struct DHTRouteSource */ struct DHTRouteSource *prev; + /** + * UID of the request + */ + uint64_t uid; + /** * Source of the request. Replies should be forwarded to * this peer. @@ -2243,7 +2248,7 @@ handle_dht_get (const struct GNUNET_MessageHeader *msg, if (msize != sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size) { - GNUNET_break (0); + GNUNET_break_op (0); return 0; } end = (const char *) &get_msg[1]; @@ -2656,37 +2661,94 @@ handle_dht_put (const struct GNUNET_MessageHeader *msg, } #endif -// GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT 1\n"); - record = - GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key); + record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, + &msg_ctx->key); if (NULL != record) { struct DHTRouteSource *pos; - struct GNUNET_DHT_GetMessage *gmsg; - size_t gsize; + struct GNUNET_DHT_GetResultMessage *get_result; + struct DHT_MessageContext *new_msg_ctx; + size_t get_size; -// GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT 2\n"); pos = record->head; while (pos != NULL) { /* TODO: do only for local started requests? or also for remote peers? */ /* TODO: include this in statistics? under what? */ + /* TODO: reverse order of path_history? */ if (NULL == pos->client) + { + pos = pos->next; continue; + } - gsize = data_size + sizeof (struct GNUNET_DHT_GetMessage); - gmsg = GNUNET_malloc (gsize); - gmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); - gmsg->header.size = htons (gsize); - gmsg->type = put_msg->type; - memcpy (&gmsg[1], &put_msg[1], data_size); + /********** CODE ADAPTED FROM DATACHACHE_GET_ITERATOR BEGIN *************/ + new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); + memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext)); + if (GNUNET_DHT_RO_RECORD_ROUTE == + (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) + { + new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE; + new_msg_ctx->path_history_len = msg_ctx->path_history_len; + /* Assign to previous msg_ctx path history, caller should free after our return */ + new_msg_ctx->path_history = msg_ctx->path_history; +#if DEBUG_PATH + for (i = 0; i < new_msg_ctx->path_history_len; i++) + { + path_offset = + &new_msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)]; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "(put for active get) Key %s Found peer %d:%s\n", + GNUNET_h2s (&msg_ctx->key), i, + GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset)); + } +#endif + } - /* TODO: duplicate and reverse order of path_history? */ - send_reply_to_client (pos->client, &gmsg->header, msg_ctx); - GNUNET_free (gmsg); + get_size = + sizeof (struct GNUNET_DHT_GetResultMessage) + data_size + + (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); + get_result = GNUNET_malloc (get_size); + get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); + get_result->header.size = htons (get_size); + get_result->expiration = put_msg->expiration; + get_result->type = put_msg->type; + get_result->put_path_length = htons (msg_ctx->path_history_len); +#if DEBUG_PATH + path_offset = msg_ctx->path_history; + for (i = 0; i < msg_ctx->path_history_len; i++) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "(get_iterator PUT path) Key %s Found peer %d:%s\n", + GNUNET_h2s (&msg_ctx->key), i, + GNUNET_i2s ((struct GNUNET_PeerIdentity *) + &path_offset[i * + sizeof (struct + GNUNET_PeerIdentity)])); + } +#endif + /* Copy the actual data and the path_history to the end of the get result */ + memcpy (&get_result[1], &put_msg[1], data_size); + path_offset = (char *) &get_result[1]; + path_offset += data_size; + memcpy (path_offset, msg_ctx->path_history, + msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); + new_msg_ctx->peer = &my_identity; + new_msg_ctx->bloom = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + new_msg_ctx->hop_count = 0; + /* Make result routing a higher priority */ + new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; + new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; + new_msg_ctx->unique_id = pos->uid; + send_reply_to_client(pos->client, &get_result->header, new_msg_ctx); + // GNUNET_CONTAINER_bloomfilter_free (new_msg_ctx->bloom); + GNUNET_free (new_msg_ctx); + GNUNET_free (get_result); + /********** CODE ADAPTED FROM DATACHACHE_GET_ITERATOR END ***************/ + pos = pos->next; } } -// GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "******************************************************** PUT END\n"); if (msg_ctx->closest != GNUNET_YES) { @@ -3133,8 +3195,8 @@ cache_response (struct DHT_MessageContext *msg_ctx) source_info); source_info->find_peers_responded = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - memcpy (&source_info->source, msg_ctx->peer, - sizeof (struct GNUNET_PeerIdentity)); + /* FIXME bart assign instead of memcpy (more explicit) */ + source_info->source = *msg_ctx->peer; GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail, source_info); if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */ @@ -3145,6 +3207,7 @@ cache_response (struct DHT_MessageContext *msg_ctx) source_info->hnode = GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info, now.abs_value); + source_info->uid = msg_ctx->unique_id; #if DEBUG_DHT > 1 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Created new forward source info for %s uid %llu\n", diff --git a/src/dht/test_dht_twopeer_get_put.c b/src/dht/test_dht_twopeer_get_put.c index ef491031c..a629cdfd8 100644 --- a/src/dht/test_dht_twopeer_get_put.c +++ b/src/dht/test_dht_twopeer_get_put.c @@ -551,7 +551,7 @@ check () int ret; /* Arguments for GNUNET_PROGRAM_run */ - char *const argv[] = { "test-dht-twopeer-put-get", /* Name to give running binary */ + char *const argv[] = { "test-dht-twopeer-get-put", /* Name to give running binary */ "-c", "test_dht_twopeer_data.conf", /* Config file to use */ #if VERBOSE @@ -565,7 +565,7 @@ check () /* Run the run function as a new program */ ret = GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv, - "test-dht-twopeer-put-get", "nohelp", options, &run, + "test-dht-twopeer-get-put", "nohelp", options, &run, &ok); if (ret != GNUNET_OK) {