From: Nathan S. Evans Date: Mon, 30 Aug 2010 16:33:42 +0000 (+0000) Subject: assorted dht changes, fixes, etc. X-Git-Tag: initial-import-from-subversion-38251~20515 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=810b7ad575e584d4a3fa9923d2c8df85976739d5;p=oweals%2Fgnunet.git assorted dht changes, fixes, etc. --- diff --git a/src/dht/dht.h b/src/dht/dht.h index 0080bec8f..c73a07a66 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -61,6 +61,8 @@ #define STAT_GET_RESPONSE_START "# DHT GET Responses Initiated" #define STAT_HELLOS_PROVIDED "# HELLO Messages given to transport" #define STAT_DISCONNECTS "# Disconnects received" +#define STAT_DUPLICATE_UID "# Duplicate UID's encountered (bad if any!)" +#define STAT_RECENT_SEEN "# recent requests seen again (routing loops, alternate paths)" typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, const struct GNUNET_MessageHeader diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index cb2adf978..4d671148d 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -871,6 +871,55 @@ int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequ return GNUNET_YES; } +/** + * Send a message to the DHT telling it to issue a single find + * peer request using the peers unique identifier as key. This + * is used to fill the routing table, and is normally controlled + * by the DHT itself. However, for testing and perhaps more + * close control over the DHT, this can be explicitly managed. + * + * @param handle handle to the DHT service + * @param cont continuation to call once the message is sent + * @param cont_cls closure for continuation + * + * @return GNUNET_YES if the control message was sent, GNUNET_NO if not + */ +int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle, + GNUNET_SCHEDULER_Task cont, void *cont_cls) +{ + struct GNUNET_DHT_ControlMessage *msg; + struct PendingMessage *pending; + + if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) + return GNUNET_NO; + + msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL); + msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + + pending = GNUNET_malloc (sizeof (struct PendingMessage)); + pending->msg = &msg->header; + pending->timeout = GNUNET_TIME_relative_get_forever(); + pending->free_on_send = GNUNET_YES; + pending->cont = cont; + pending->cont_cls = cont_cls; + pending->unique_id = 0; + + if (handle->current == NULL) + { + handle->current = pending; + process_pending_message (handle); + } + else + { + handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; + handle->retransmission_buffer = pending; + } + + return GNUNET_YES; +} + /** * Send a message to the DHT telling it to start issuing random PUT * requests every 'frequency' milliseconds. diff --git a/src/dht/gnunet-dht-driver.c b/src/dht/gnunet-dht-driver.c index 78e6850f4..db7bf2159 100644 --- a/src/dht/gnunet-dht-driver.c +++ b/src/dht/gnunet-dht-driver.c @@ -50,12 +50,21 @@ /* Timeout for waiting for puts to be sent to the service */ #define DEFAULT_PUT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 10) +/* Timeout for waiting for puts to be sent to the service */ +#define DEFAULT_FIND_PEER_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 40) + #define DEFAULT_SECONDS_PER_PEER_START GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 45) #define DEFAULT_TEST_DATA_SIZE 8 +#define DEFAULT_BUCKET_SIZE 4 + +#define FIND_PEER_THRESHOLD DEFAULT_BUCKET_SIZE * 2 + #define DEFAULT_MAX_OUTSTANDING_PUTS 10 +#define DEFAULT_MAX_OUTSTANDING_FIND_PEERS 1 + #define DEFAULT_MAX_OUTSTANDING_GETS 10 #define DEFAULT_CONNECT_TIMEOUT 60 @@ -97,6 +106,30 @@ struct MaliciousContext int malicious_type; }; +struct TestFindPeer +{ + /* This is a linked list */ + struct TestFindPeer *next; + + /* Handle to the bigger context */ + struct FindPeerContext *find_peer_context; + + /** + * Handle to the peer's DHT service (via the API) + */ + struct GNUNET_DHT_Handle *dht_handle; + + /** + * Handle to the peer daemon + */ + struct GNUNET_TESTING_Daemon *daemon; + + /** + * Task for disconnecting DHT handles + */ + GNUNET_SCHEDULER_TaskIdentifier disconnect_task; +}; + struct TestPutContext { /* This is a linked list */ @@ -232,8 +265,12 @@ static struct GNUNET_TIME_Relative get_delay; static struct GNUNET_TIME_Relative put_delay; +static struct GNUNET_TIME_Relative find_peer_delay; + static struct GNUNET_TIME_Relative seconds_per_peer_start; +static int do_find_peer; + static unsigned long long test_data_size = DEFAULT_TEST_DATA_SIZE; static unsigned long long max_outstanding_puts = DEFAULT_MAX_OUTSTANDING_PUTS; @@ -242,6 +279,8 @@ static unsigned long long max_outstanding_gets = DEFAULT_MAX_OUTSTANDING_GETS; static unsigned long long malicious_getters; +static unsigned long long max_outstanding_find_peers; + static unsigned long long malicious_putters; static unsigned long long malicious_droppers; @@ -651,6 +690,7 @@ static int stats_handle (void *cls, stats_ctx->peer = peer; GNUNET_CONTAINER_multihashmap_put(stats_map, &peer->hashPubKey, stats_ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } + GNUNET_assert(stats_ctx != NULL); if (strcmp(name, STAT_ROUTES) == 0) stats_ctx->stat_routes = value; @@ -696,6 +736,7 @@ static void log_dht_statistics (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) { stats_map = GNUNET_CONTAINER_multihashmap_create(num_peers); + fprintf(stderr, "Starting statistics logging\n"); GNUNET_TESTING_get_statistics(pg, &stats_finished, &stats_handle, NULL); } @@ -1005,6 +1046,169 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, rand), &do_put, test_put->next); } +/** + * Context for sending out find peer requests. + */ +struct FindPeerContext +{ + struct GNUNET_DHT_Handle *dht_handle; + struct GNUNET_TIME_Absolute endtime; + unsigned int current_peers; + unsigned int previous_peers; + unsigned int outstanding; + unsigned int total; +}; + +static void +schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc); + +/** + * Given a number of total peers and a bucket size, estimate the number of + * connections in a perfect kademlia topology. + */ +static unsigned int connection_estimate(unsigned int peer_count, unsigned int bucket_size) +{ + unsigned int i; + unsigned int filled; + i = num_peers; + + filled = 0; + while (i > bucket_size) + { + filled++; + i = i/2; + } + return filled * bucket_size * peer_count; + +} + +/** + * Callback for iterating over all the peer connections of a peer group. + */ +void count_peers_cb (void *cls, + const struct GNUNET_PeerIdentity *first, + const struct GNUNET_PeerIdentity *second, + struct GNUNET_TIME_Relative latency, + uint32_t distance, + const char *emsg) +{ + struct FindPeerContext *find_peer_context = cls; + if ((first != NULL) && (second != NULL)) + { + find_peer_context->current_peers++; + } + else + { + GNUNET_assert(dhtlog_handle != NULL); + fprintf(stderr, "peer count finished (%u connections), %u new peers, connection estimate %u\n", find_peer_context->current_peers, find_peer_context->current_peers - find_peer_context->previous_peers, connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)); + if ((find_peer_context->current_peers - find_peer_context->previous_peers > FIND_PEER_THRESHOLD) && + (find_peer_context->current_peers < connection_estimate(num_peers, DEFAULT_BUCKET_SIZE)) && + (GNUNET_TIME_absolute_get_remaining(find_peer_context->endtime).value > 0)) + { + fprintf(stderr, "Scheduling another round of find peer requests.\n"); + GNUNET_SCHEDULER_add_now(sched, schedule_find_peer_requests, find_peer_context); + } + else + { + fprintf(stderr, "Not sending any more find peer requests.\n"); + } + } +} + +/** + * Connect to all peers in the peer group and iterate over their + * connections. + */ +static void +count_new_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct FindPeerContext *find_peer_context = cls; + find_peer_context->previous_peers = find_peer_context->current_peers; + find_peer_context->current_peers = 0; + GNUNET_TESTING_get_topology (pg, &count_peers_cb, find_peer_context); +} + + +static void +decrement_find_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + GNUNET_assert(test_find_peer->find_peer_context->outstanding > 0); + test_find_peer->find_peer_context->outstanding--; + test_find_peer->find_peer_context->total--; + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%d find_peers remaining\n", test_find_peer->find_peer_context->total); + if ((0 == test_find_peer->find_peer_context->total) && + (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value > 0)) + { + GNUNET_SCHEDULER_add_now(sched, &count_new_peers, test_find_peer->find_peer_context); + } + GNUNET_free(test_find_peer); +} + +/** + * A find peer request has been sent to the server, now we will schedule a task + * to wait the appropriate time to allow the request to go out and back. + * + * @param cls closure - a TestFindPeer struct + * @param tc context the task is being called with + */ +static void +handle_find_peer_sent (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + + GNUNET_DHT_disconnect(test_find_peer->dht_handle); + GNUNET_SCHEDULER_add_delayed(sched, find_peer_delay, &decrement_find_peers, test_find_peer); +} + +static void +send_find_peer_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct TestFindPeer *test_find_peer = cls; + + if (test_find_peer->find_peer_context->outstanding > max_outstanding_find_peers) + { + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 300), &send_find_peer_request, test_find_peer); + return; + } + + test_find_peer->find_peer_context->outstanding++; + if (GNUNET_TIME_absolute_get_remaining(test_find_peer->find_peer_context->endtime).value == 0) + { + GNUNET_SCHEDULER_add_now(sched, &decrement_find_peers, test_find_peer); + return; + } + + test_find_peer->dht_handle = GNUNET_DHT_connect(sched, test_find_peer->daemon->cfg, 1); + GNUNET_assert(test_find_peer->dht_handle != NULL); + fprintf(stderr, "calling GNUNET_DHT_find_peers\n"); + GNUNET_DHT_find_peers (test_find_peer->dht_handle, + &handle_find_peer_sent, test_find_peer); +} + +/** + * Set up a single find peer request for each peer in the topology. Do this + * until the settle time is over, limited by the number of outstanding requests + * and the time allowed for each one! + */ +static void +schedule_find_peer_requests (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc) +{ + struct FindPeerContext *find_peer_ctx = cls; + struct TestFindPeer *test_find_peer; + uint32_t i; + uint32_t random; + + for (i = 0; i < max_outstanding_find_peers; i++) + { + test_find_peer = GNUNET_malloc(sizeof(struct TestFindPeer)); + random = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, num_peers); + test_find_peer->daemon = GNUNET_TESTING_daemon_get(pg, random); + test_find_peer->find_peer_context = find_peer_ctx; + find_peer_ctx->total++; + GNUNET_SCHEDULER_add_now(sched, &send_find_peer_request, test_find_peer); + } +} /** * Set up some all of the put and get operations we want @@ -1061,6 +1265,7 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t int i; int max; struct TopologyIteratorContext *topo_ctx; + struct FindPeerContext *find_peer_context; if (dhtlog_handle != NULL) { if (settle_time >= 60 * 2) @@ -1078,7 +1283,14 @@ continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * t GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), &capture_current_topology, topo_ctx); } else - GNUNET_SCHEDULER_add_now (sched, &setup_puts_and_gets, NULL); + GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), &setup_puts_and_gets, NULL); + + if (GNUNET_YES == do_find_peer) + { + find_peer_context = GNUNET_malloc(sizeof(struct FindPeerContext)); + find_peer_context->endtime = GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time)); + GNUNET_SCHEDULER_add_now(sched, &schedule_find_peer_requests, find_peer_context); + } } /** @@ -1509,6 +1721,7 @@ run (void *cls, hostfile = NULL; hosts = NULL; + temphost = NULL; if (hostfile != NULL) { if (GNUNET_OK != GNUNET_DISK_file_test (hostfile)) @@ -1533,6 +1746,7 @@ run (void *cls, "Could not read file %s specified for host list, ending test!", hostfile); GNUNET_free (hostfile); GNUNET_free (data); + GNUNET_free_non_null(trialmessage); return; } @@ -1587,6 +1801,20 @@ run (void *cls, &num_gets)) num_gets = num_peers; + if (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "find_peer_delay", + &temp_config_number)) + find_peer_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, temp_config_number); + else + find_peer_delay = DEFAULT_FIND_PEER_DELAY; + + if (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "concurrent_find_peers", + &temp_config_number)) + max_outstanding_find_peers = temp_config_number; + else + max_outstanding_find_peers = DEFAULT_MAX_OUTSTANDING_FIND_PEERS; + if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (cfg, "dht_testing", "get_timeout", &temp_config_number)) @@ -1658,6 +1886,15 @@ run (void *cls, &malicious_put_frequency)) malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY; + if (GNUNET_NO == + GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht", + "find_peers")) + { + do_find_peer = GNUNET_NO; + } + else + do_find_peer = GNUNET_YES; + topology_str = NULL; if ((GNUNET_YES == GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology", @@ -1745,7 +1982,7 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Number of peers must be specified in section %s option %s\n", topology_str, "TESTING", "NUM_PEERS"); } - + GNUNET_assert(num_peers > 0 && num_peers < (unsigned long long)-1); /* Set peers_left so we know when all peers started */ peers_left = num_peers; @@ -1800,6 +2037,7 @@ run (void *cls, &topology_callback, NULL, hosts); + GNUNET_free_non_null(temphost); } diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 98e02d2a0..e28682ee4 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -67,24 +67,29 @@ */ #define MINIMUM_PEER_THRESHOLD 20 - - #define DHT_MAX_RECENT 100 +#define FIND_PEER_CALC_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) + /** * Default time to wait to send messages on behalf of other peers. */ -#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10); +#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) /** * Default importance for handling messages on behalf of other peers. */ #define DHT_DEFAULT_P2P_IMPORTANCE 0 +/** + * How long to keep recent requests arounds by default. + */ +#define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60) + /** * Default time to wait to send find peer messages sent by the dht service. */ -#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); +#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * Default importance for find peer messages sent by the dht service. @@ -99,7 +104,7 @@ /** * Default options for find peer requests sent by the dht service. */ -#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE +#define DHT_DEFAULT_FIND_PEER_OPTIONS GNUNET_DHT_RO_NONE /** * How long at least to wait before sending another find peer request. @@ -109,13 +114,19 @@ /** * How long at most to wait before sending another find peer request. */ -#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5) +#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8) /** * How often to update our preference levels for peers in our routing tables. */ #define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) +/** + * How long at most on average will we allow a reply forward to take + * (before we quit sending out new requests) + */ +#define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) + /** * How many initial requests to send out (in true Kademlia fashion) */ @@ -144,6 +155,12 @@ */ #define MAX_HOPS 20 +/** + * How many time differences between requesting a core send and + * the actual callback to remember. + */ +#define MAX_REPLY_TIMES 8 + /** * Linked list of messages to send to clients. */ @@ -164,6 +181,11 @@ struct P2PPendingMessage */ unsigned int importance; + /** + * Time when this request was scheduled to be sent. + */ + struct GNUNET_TIME_Absolute scheduled; + /** * How long to wait before sending message. */ @@ -251,7 +273,6 @@ struct PeerInfo * Task for scheduling periodic ping messages for this peer. */ GNUNET_SCHEDULER_TaskIdentifier ping_task; - }; /** @@ -329,7 +350,6 @@ struct ClientList * Tail of linked list of pending messages for this client */ struct PendingMessage *pending_tail; - }; @@ -353,7 +373,7 @@ struct DHT_MessageContext /** * The key this request was about */ - const GNUNET_HashCode *key; + GNUNET_HashCode key; /** * The unique identifier of this request @@ -483,6 +503,21 @@ struct DHTQueryRecord }; +/** + * Context used to calculate the number of find peer messages + * per X time units since our last scheduled find peer message + * was sent. If we have seen too many messages, delay or don't + * send our own out. + */ +struct FindPeerMessageContext +{ + unsigned int count; + + struct GNUNET_TIME_Absolute start; + + struct GNUNET_TIME_Absolute end; +}; + /** * DHT Routing results structure */ @@ -518,17 +553,49 @@ struct RecentRequests struct RecentRequest { + /** + * Position of this node in the min heap. + */ + struct GNUNET_CONTAINER_HeapNode *heap_node; + + /** + * Bloomfilter containing entries for peers + * we forwarded this request to. + */ + struct GNUNET_CONTAINER_BloomFilter *bloom; + + /** + * Timestamp of this request, for ordering + * the min heap. + */ + struct GNUNET_TIME_Absolute timestamp; + + /** + * Key of this request. + */ GNUNET_HashCode key; + + /** + * Unique identifier for this request. + */ uint64_t uid; -}; + /** + * Task to remove this entry on timeout. + */ + GNUNET_SCHEDULER_TaskIdentifier remove_task; +}; -#if 0 /** * Recent requests by hash/uid and by time inserted. */ static struct RecentRequests recent; -#endif + +/** + * Context to use to calculate find peer rates. + */ +static struct FindPeerMessageContext find_peer_context; + /** * Don't use our routing algorithm, always route * to closest peer; initially send requests to 3 @@ -546,6 +613,12 @@ static int stop_on_closest; */ static int stop_on_found; +/** + * Whether DHT needs to manage find peer requests, or + * an external force will do it on behalf of the DHT. + */ +static int do_find_peer; + /** * How many peers have we added since we sent out our last * find peer request? @@ -662,25 +735,113 @@ static unsigned int malicious_dropper; */ static unsigned int malicious_getter; -/* +/** * GNUNET_YES or GNUNET_NO, whether or not to act as * a malicious node which sends out lots of PUTS */ static unsigned int malicious_putter; +/** + * Frequency for malicious get requests. + */ static unsigned long long malicious_get_frequency; +/** + * Frequency for malicious put requests. + */ static unsigned long long malicious_put_frequency; +/** + * Reply times for requests, if we are busy, don't send any + * more requests! + */ +static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES]; + +/** + * Current counter for replies. + */ +static unsigned int reply_counter; + /** * Forward declaration. */ static size_t send_generic_reply (void *cls, size_t size, void *buf); -/* Declare here so retry_core_send is aware of it */ +/** Declare here so retry_core_send is aware of it */ size_t core_transmit_notify (void *cls, size_t size, void *buf); +/** + * Convert unique ID to hash code. + * + * @param uid unique ID to convert + * @param hash set to uid (extended with zeros) + */ +static void +hash_from_uid (uint64_t uid, + GNUNET_HashCode *hash) +{ + memset (hash, 0, sizeof(GNUNET_HashCode)); + *((uint64_t*)hash) = uid; +} + +#if AVG +/** + * Calculate the average send time between messages so that we can + * ignore certain requests if we get too busy. + * + * @return the average time between asking core to send a message + * and when the buffer for copying it is passed + */ +static struct GNUNET_TIME_Relative get_average_send_delay() +{ + unsigned int i; + unsigned int divisor; + struct GNUNET_TIME_Relative average_time; + average_time = GNUNET_TIME_relative_get_zero(); + divisor = 0; + for (i = 0; i < MAX_REPLY_TIMES; i++) + { + average_time = GNUNET_TIME_relative_add(average_time, reply_times[i]); + if (reply_times[i].value == (uint64_t)0) + continue; + else + divisor++; + } + if (divisor == 0) + { + return average_time; + } + + average_time = GNUNET_TIME_relative_divide(average_time, divisor); + fprintf(stderr, "Avg send delay: %u sends is %llu\n", divisor, (long long unsigned int)average_time.value); + return average_time; +} +#endif + +/** + * Find the maximum send time of the recently sent values. + * + * @return the average time between asking core to send a message + * and when the buffer for copying it is passed + */ +static struct GNUNET_TIME_Relative get_max_send_delay() +{ + unsigned int i; + struct GNUNET_TIME_Relative max_time; + max_time = GNUNET_TIME_relative_get_zero(); + + for (i = 0; i < MAX_REPLY_TIMES; i++) + { + if (reply_times[i].value > max_time.value) + max_time.value = reply_times[i].value; + } + + if (max_time.value > MAX_REQUEST_TIME.value) + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n", (long long unsigned int)max_time.value); + return max_time; +} + static void increment_stats(const char *value) { @@ -718,6 +879,10 @@ try_core_send (void *cls, "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id, "DHT", ssize, GNUNET_i2s(&peer->id)); #endif + pending->scheduled = GNUNET_TIME_absolute_get(); + reply_counter++; + if (reply_counter >= MAX_REPLY_TIMES) + reply_counter = 0; peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance, pending->timeout, &peer->id, ssize, &core_transmit_notify, peer); @@ -759,7 +924,7 @@ static void forward_result_message (void *cls, result_message->hop_count = htonl(msg_ctx->hop_count + 1); GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE)); result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id); - memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode)); + memcpy(&result_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode)); memcpy(&result_message[1], msg, ntohs(msg->size)); #if DEBUG_DHT > 1 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id)); @@ -802,6 +967,7 @@ size_t core_transmit_notify (void *cls, peer->th = NULL; off = 0; pending = peer->head; + reply_times[reply_counter] = GNUNET_TIME_absolute_get_difference(pending->scheduled, GNUNET_TIME_absolute_get()); msize = ntohs(pending->msg->size); if (msize <= size) { @@ -1246,6 +1412,7 @@ static int move_lowest_bucket (void *cls, struct PeerInfo *peer = value; int new_bucket; + GNUNET_assert(lowest_bucket > 0); new_bucket = lowest_bucket - 1; remove_peer(peer, lowest_bucket); GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head, @@ -1358,7 +1525,7 @@ static void forward_message (void *cls, increment_stats(STAT_ROUTE_FORWARDS); - if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(msg_ctx->key))) + if ((msg_ctx->closest != GNUNET_YES) && (peer == find_closest_peer(&msg_ctx->key))) increment_stats(STAT_ROUTE_FORWARDS_CLOSEST); msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size); @@ -1378,8 +1545,7 @@ static void forward_message (void *cls, route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id); if (msg_ctx->bloom != NULL) GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE)); - if (msg_ctx->key != NULL) - memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode)); + memcpy(&route_message->key, &msg_ctx->key, sizeof(GNUNET_HashCode)); memcpy(&route_message[1], msg, ntohs(msg->size)); #if DEBUG_DHT > 1 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id)); @@ -1625,6 +1791,8 @@ static int consider_peer (struct GNUNET_PeerIdentity *peer) if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->hashPubKey)) return GNUNET_NO; /* We already know this peer (are connected even!) */ bucket = find_current_bucket(&peer->hashPubKey); + if (bucket == GNUNET_SYSERR) + return GNUNET_NO; if ((k_buckets[bucket].peers_size < bucket_size) || ((bucket == lowest_bucket) && (lowest_bucket > 0))) return GNUNET_YES; @@ -1667,6 +1835,7 @@ static int route_result_message(void *cls, } else /* We have a valid hello, and peer id stored in new_peer */ { + find_peer_context.count++; increment_stats(STAT_FIND_PEER_REPLY); if (GNUNET_YES == consider_peer(&new_peer)) { @@ -1682,7 +1851,7 @@ static int route_result_message(void *cls, if (malicious_dropper == GNUNET_YES) record = NULL; else - record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key); + record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &message_context->key); if (record == NULL) /* No record of this message! */ { @@ -1701,7 +1870,7 @@ static int route_result_message(void *cls, message_context->hop_count, GNUNET_SYSERR, &my_identity, - message_context->key, + &message_context->key, message_context->peer, NULL); } #endif @@ -1728,7 +1897,7 @@ static int route_result_message(void *cls, { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT, message_context->hop_count, - GNUNET_YES, &my_identity, message_context->key, + GNUNET_YES, &my_identity, &message_context->key, message_context->peer, NULL); } #endif @@ -1763,7 +1932,7 @@ static int route_result_message(void *cls, dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT, message_context->hop_count, - GNUNET_NO, &my_identity, message_context->key, + GNUNET_NO, &my_identity, &message_context->key, message_context->peer, &pos->source); } #endif @@ -1876,7 +2045,7 @@ handle_dht_get (void *cls, if (datacache != NULL) results = - GNUNET_DATACACHE_get (datacache, message_context->key, get_type, + GNUNET_DATACACHE_get (datacache, &message_context->key, get_type, &datacache_get_iterator, message_context); if (results >= 1) @@ -1891,14 +2060,14 @@ handle_dht_get (void *cls, { dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET, message_context->hop_count, GNUNET_YES, &my_identity, - message_context->key); + &message_context->key); } if ((debug_routes_extended) && (dhtlog_handle != NULL)) { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE, message_context->hop_count, GNUNET_YES, - &my_identity, message_context->key, message_context->peer, + &my_identity, &message_context->key, message_context->peer, NULL); } #endif @@ -1911,7 +2080,7 @@ handle_dht_get (void *cls, { dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET, message_context->hop_count, GNUNET_NO, &my_identity, - message_context->key); + &message_context->key); } #endif } @@ -2000,7 +2169,7 @@ handle_dht_find_peer (void *cls, { dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER, message_context->hop_count, GNUNET_YES, &my_identity, - message_context->key); + &message_context->key); } #endif GNUNET_free(find_peer_result); @@ -2046,7 +2215,7 @@ handle_dht_put (void *cls, { dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT, message_context->hop_count, GNUNET_NO, &my_identity, - message_context->key); + &message_context->key); } } #endif @@ -2059,7 +2228,7 @@ handle_dht_put (void *cls, { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE, message_context->hop_count, GNUNET_YES, - &my_identity, message_context->key, message_context->peer, + &my_identity, &message_context->key, message_context->peer, NULL); } @@ -2067,13 +2236,13 @@ handle_dht_put (void *cls, { dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT, message_context->hop_count, GNUNET_YES, &my_identity, - message_context->key); + &message_context->key); } #endif increment_stats(STAT_PUTS_INSERTED); if (datacache != NULL) - GNUNET_DATACACHE_put (datacache, message_context->key, data_size, + GNUNET_DATACACHE_put (datacache, &message_context->key, data_size, (char *) &put_msg[1], put_type, GNUNET_TIME_absolute_ntoh(put_msg->expiration)); else @@ -2154,9 +2323,8 @@ get_forward_count (unsigned int hop_count, size_t target_replication) } target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */ target_replication / (target_replication * (hop_count + 1) + diameter); - target_value = 0; - #if NONSENSE + target_value = 0; while (target_value < target_count) target_value++; /* target_value is ALWAYS 1 after this "loop" */ #else @@ -2171,33 +2339,44 @@ get_forward_count (unsigned int hop_count, size_t target_replication) /* * Check whether my identity is closer than any known peers. + * If a non-null bloomfilter is given, check if this is the closest + * peer that hasn't already been routed to. * * @param target hash code to check closeness to + * @param bloom bloomfilter, exclude these entries from the decision * * Return GNUNET_YES if node location is closest, GNUNET_NO * otherwise. */ int -am_closest_peer (const GNUNET_HashCode * target) +am_closest_peer (const GNUNET_HashCode * target, struct GNUNET_CONTAINER_BloomFilter *bloom) { int bits; int other_bits; int bucket_num; int count; struct PeerInfo *pos; +#if INTEGER_DISTANCE unsigned int my_distance; - +#endif bucket_num = find_current_bucket(target); if (bucket_num == GNUNET_SYSERR) /* Same key! */ return GNUNET_YES; bits = matching_bits(&my_identity.hashPubKey, target); +#if INTEGER_DISTANCE my_distance = distance(&my_identity.hashPubKey, target); - +#endif pos = k_buckets[bucket_num].head; count = 0; while ((pos != NULL) && (count < bucket_size)) { + if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test(bloom, &pos->id.hashPubKey))) + { + pos = pos->next; + continue; /* Skip already checked entries */ + } + other_bits = matching_bits(&pos->id.hashPubKey, target); if (other_bits > bits) return GNUNET_NO; @@ -2361,6 +2540,33 @@ select_peer (const GNUNET_HashCode * target, } } +/** + * Task used to remove recent entries, either + * after timeout, when full, or on shutdown. + * + * @param cls the entry to remove + * @param tc context, reason, etc. + */ +static void +remove_recent (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct RecentRequest *req = cls; + static GNUNET_HashCode hash; + + GNUNET_assert(req != NULL); + hash_from_uid(req->uid, &hash); + GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(recent.hashmap, &hash, req)); + GNUNET_CONTAINER_heap_remove_node(recent.minHeap, req->heap_node); + GNUNET_CONTAINER_bloomfilter_free(req->bloom); + GNUNET_free(req); + + if ((tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) && (0 == GNUNET_CONTAINER_multihashmap_size(recent.hashmap)) && (0 == GNUNET_CONTAINER_heap_get_size(recent.minHeap))) + { + GNUNET_CONTAINER_multihashmap_destroy(recent.hashmap); + GNUNET_CONTAINER_heap_destroy(recent.minHeap); + } +} + /** * Task used to remove forwarding entries, either @@ -2408,6 +2614,7 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx) while (current_size >= MAX_OUTSTANDING_FORWARDS) { source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap); + GNUNET_assert(source_info != NULL); record = source_info->record; GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info); if (record->head == NULL) /* No more entries in DLL */ @@ -2420,7 +2627,7 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx) current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap); } now = GNUNET_TIME_absolute_get(); - record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key); + record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &msg_ctx->key); if (record != NULL) /* Already know this request! */ { pos = record->head; @@ -2439,8 +2646,8 @@ static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx) else { record = GNUNET_malloc(sizeof (struct DHTQueryRecord)); - GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode)); + GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, &msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + memcpy(&record->key, &msg_ctx->key, sizeof(GNUNET_HashCode)); } source_info = GNUNET_malloc(sizeof(struct DHTRouteSource)); @@ -2480,11 +2687,12 @@ static int route_message(void *cls, { int i; struct PeerInfo *selected; +#if DEBUG_DHT_ROUTING > 1 struct PeerInfo *nearest; - unsigned int forward_count; -#if DEBUG_DHT - char *nearest_buf; #endif + unsigned int forward_count; + struct RecentRequest *recent_req; + GNUNET_HashCode unique_hash; #if DEBUG_DHT_ROUTING int ret; #endif @@ -2496,7 +2704,7 @@ static int route_message(void *cls, { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE, message_context->hop_count, GNUNET_SYSERR, - &my_identity, message_context->key, message_context->peer, + &my_identity, &message_context->key, message_context->peer, NULL); } #endif @@ -2506,15 +2714,16 @@ static int route_message(void *cls, } increment_stats(STAT_ROUTES); - message_context->closest = am_closest_peer(message_context->key); + /* Semantics of this call means we find whether we are the closest peer out of those already + * routed to on this messages path. + */ + message_context->closest = am_closest_peer(&message_context->key, message_context->bloom); forward_count = get_forward_count(message_context->hop_count, message_context->replication); - nearest = find_closest_peer(message_context->key); - + if (message_context->bloom == NULL) message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT)) -/* || ((strict_kademlia == GNUNET_YES) && (message_context->closest == GNUNET_YES))) */ forward_count = 0; #if DEBUG_DHT_ROUTING @@ -2527,7 +2736,7 @@ static int route_message(void *cls, { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE, message_context->hop_count, ret, - &my_identity, message_context->key, message_context->peer, + &my_identity, &message_context->key, message_context->peer, NULL); } #endif @@ -2556,10 +2765,10 @@ static int route_message(void *cls, { if ((debug_routes) && (dhtlog_handle != NULL)) { - dhtlog_handle->insert_dhtkey(NULL, message_context->key); + dhtlog_handle->insert_dhtkey(NULL, &message_context->key); dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_FIND_PEER, message_context->hop_count, GNUNET_NO, &my_identity, - message_context->key); + &message_context->key); } } #endif @@ -2570,42 +2779,47 @@ static int route_message(void *cls, } GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey); -#if 0 - if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent->hashmap, message_context->key)) - { - if (GNUNET_SYSERR = GNUNET_CONTAINER_multihashmap_get_multiple (recent->hashmap, message_context->key, &find_matching_recent, &message_context)) /* Have too recently seen this request! */ - { - forward_count = 0; - } - else /* Exact match not found, but same key found */ - { - recent_req = GNUNET_CONTAINER_multihashmap_get(recent->hashmap, message_context->key); - } + hash_from_uid(message_context->unique_id, &unique_hash); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(recent.hashmap, &unique_hash)) + { + recent_req = GNUNET_CONTAINER_multihashmap_get(recent.hashmap, &unique_hash); + GNUNET_assert(recent_req != NULL); + if (0 != memcmp(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode))) + increment_stats(STAT_DUPLICATE_UID); + else + { + increment_stats(STAT_RECENT_SEEN); + GNUNET_CONTAINER_bloomfilter_or2(message_context->bloom, recent_req->bloom, DHT_BLOOM_SIZE); + } } else { recent_req = GNUNET_malloc(sizeof(struct RecentRequest)); recent_req->uid = message_context->unique_id; - memcmp(&recent_req->key, message_context->key, sizeof(GNUNET_HashCode)); + memcpy(&recent_req->key, &message_context->key, sizeof(GNUNET_HashCode)); recent_req->remove_task = GNUNET_SCHEDULER_add_delayed(sched, DEFAULT_RECENT_REMOVAL, &remove_recent, recent_req); - GNUNET_CONTAINER_heap_insert(recent->minHeap, recent_req, GNUNET_TIME_absolute_get()); - GNUNET_CONTAINER_multihashmap_put(recent->hashmap, message_context->key, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + recent_req->heap_node = GNUNET_CONTAINER_heap_insert(recent.minHeap, recent_req, GNUNET_TIME_absolute_get().value); + recent_req->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + GNUNET_CONTAINER_multihashmap_put(recent.hashmap, &unique_hash, recent_req, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } - if (GNUNET_CONTAINER_multihashmap_size(recent->hashmap) > DHT_MAX_RECENT) + if (GNUNET_CONTAINER_multihashmap_size(recent.hashmap) > DHT_MAX_RECENT) { - remove_oldest_recent(); + recent_req = GNUNET_CONTAINER_heap_peek(recent.minHeap); + GNUNET_assert(recent_req != NULL); + GNUNET_SCHEDULER_cancel(sched, recent_req->remove_task); + GNUNET_SCHEDULER_add_now(sched, &remove_recent, recent_req); } -#endif for (i = 0; i < forward_count; i++) { - selected = select_peer(message_context->key, message_context->bloom); + selected = select_peer(&message_context->key, message_context->bloom); if (selected != NULL) { GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey); #if DEBUG_DHT_ROUTING > 1 + nearest = find_closest_peer(&message_context->key); nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id, @@ -2616,7 +2830,7 @@ static int route_message(void *cls, { dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE, message_context->hop_count, GNUNET_NO, - &my_identity, message_context->key, message_context->peer, + &my_identity, &message_context->key, message_context->peer, &selected->id); } forward_message(cls, msg, selected, message_context); @@ -2640,7 +2854,10 @@ static int route_message(void *cls, #endif if (message_context->bloom != NULL) - GNUNET_CONTAINER_bloomfilter_free(message_context->bloom); + { + GNUNET_CONTAINER_bloomfilter_or2(recent_req->bloom, message_context->bloom, DHT_BLOOM_SIZE); + GNUNET_CONTAINER_bloomfilter_free(message_context->bloom); + } return forward_count; } @@ -2684,7 +2901,6 @@ malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static struct GNUNET_DHT_PutMessage put_message; static struct DHT_MessageContext message_context; static GNUNET_HashCode key; - unsigned int mcsize; uint32_t random_key; if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) @@ -2694,12 +2910,11 @@ malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE); put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever()); - mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.client = NULL; random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1); GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key); - message_context.key = &key; + memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode)); message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1)); message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION); message_context.msg_options = ntohl (0); @@ -2726,9 +2941,8 @@ static void malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { static struct GNUNET_DHT_GetMessage get_message; - static struct DHT_MessageContext message_context; + struct DHT_MessageContext message_context; static GNUNET_HashCode key; - unsigned int mcsize; uint32_t random_key; if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) @@ -2737,12 +2951,11 @@ malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE); - mcsize = sizeof(struct DHT_MessageContext) + sizeof(GNUNET_HashCode); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.client = NULL; random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1); GNUNET_CRYPTO_hash(&random_key, sizeof(uint32_t), &key); - message_context.key = &key; + memcpy(&message_context.key, &key, sizeof(GNUNET_HashCode)); message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, (uint64_t)-1)); message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION); message_context.msg_options = ntohl (0); @@ -2754,13 +2967,10 @@ malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) dhtlog_handle->insert_dhtkey(NULL, &key); increment_stats(STAT_GET_START); GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Sending malicious GET message with hash %s", my_short_id, "DHT", GNUNET_h2s(&key)); - route_message(NULL, &get_message.header, &message_context); + route_message (NULL, &get_message.header, &message_context); GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, malicious_get_frequency), &malicious_get_task, NULL); } -#if DO_FIND_PEER - - /** * Iterator over hash map entries. * @@ -2797,11 +3007,41 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc int ret; struct GNUNET_TIME_Relative next_send_time; struct GNUNET_CONTAINER_BloomFilter *temp_bloom; - +#if COUNT_INTERVAL + struct GNUNET_TIME_Relative time_diff; + struct GNUNET_TIME_Absolute end; + double multiplier; + double count_per_interval; +#endif if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; + if ((newly_found_peers > bucket_size) && (GNUNET_YES == do_find_peer)) /* If we are finding peers already, no need to send out our request right now! */ + { + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Have %d newly found peers since last find peer message sent!\n", newly_found_peers); + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_TIME_UNIT_MINUTES, + &send_find_peer_message, NULL); + newly_found_peers = 0; + return; + } + increment_stats(STAT_FIND_PEER_START); +#if COUNT_INTERVAL + end = GNUNET_TIME_absolute_get(); + time_diff = GNUNET_TIME_absolute_get_difference(find_peer_context.start, end); + + if (time_diff.value > FIND_PEER_CALC_INTERVAL.value) + { + multiplier = time_diff.value / FIND_PEER_CALC_INTERVAL.value; + count_per_interval = find_peer_context.count / multiplier; + } + else + { + multiplier = FIND_PEER_CALC_INTERVAL.value / time_diff.value; + count_per_interval = find_peer_context.count * multiplier; + } +#endif find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage)); find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage)); @@ -2810,10 +3050,10 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc GNUNET_CONTAINER_multihashmap_iterate(all_known_peers, &add_known_to_bloom, temp_bloom); GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(temp_bloom, find_peer_msg->bloomfilter, DHT_BLOOM_SIZE)); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); - message_context.key = &my_identity.hashPubKey; + memcpy(&message_context.key, &my_identity.hashPubKey, sizeof(GNUNET_HashCode)); message_context.unique_id = GNUNET_ntohll (GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, (uint64_t)-1)); - message_context.replication = ntohl (DHT_DEFAULT_FIND_PEER_REPLICATION); - message_context.msg_options = ntohl (DHT_DEFAULT_FIND_PEER_OPTIONS); + message_context.replication = DHT_DEFAULT_FIND_PEER_REPLICATION; + message_context.msg_options = DHT_DEFAULT_FIND_PEER_OPTIONS; message_context.network_size = estimate_diameter(); message_context.peer = &my_identity; message_context.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE; @@ -2836,12 +3076,18 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, DHT_MAXIMUM_FIND_PEER_INTERVAL.value - DHT_MINIMUM_FIND_PEER_INTERVAL.value); } + + GNUNET_assert (next_send_time.value != 0); + find_peer_context.count = 0; newly_found_peers = 0; - GNUNET_SCHEDULER_add_delayed (sched, - next_send_time, - &send_find_peer_message, NULL); + find_peer_context.start = GNUNET_TIME_absolute_get(); + if (GNUNET_YES == do_find_peer) + { + GNUNET_SCHEDULER_add_delayed (sched, + next_send_time, + &send_find_peer_message, NULL); + } } -#endif /** * Handler for any generic DHT messages, calls the appropriate handler @@ -2859,11 +3105,7 @@ handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message; const struct GNUNET_MessageHeader *enc_msg; struct DHT_MessageContext message_context; - size_t enc_type; - enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; - enc_type = ntohs (enc_msg->type); - #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", @@ -2876,7 +3118,7 @@ handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client, #endif memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.client = find_active_client (client); - message_context.key = &dht_msg->key; + memcpy(&message_context.key, &dht_msg->key, sizeof(GNUNET_HashCode)); message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id); message_context.replication = ntohl (dht_msg->desired_replication_level); message_context.msg_options = ntohl (dht_msg->options); @@ -2920,6 +3162,10 @@ handle_dht_control_message (void *cls, struct GNUNET_SERVER_Client *client, switch (ntohs(dht_control_msg->command)) { + case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Sending self seeking find peer request!\n"); + GNUNET_SCHEDULER_add_now(sched, &send_find_peer_message, NULL); + break; case GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET: if (ntohs(dht_control_msg->variable) > 0) malicious_get_frequency = ntohs(dht_control_msg->variable); @@ -2971,15 +3217,11 @@ handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client, (const struct GNUNET_DHT_StopMessage *) message; struct DHTQueryRecord *record; struct DHTRouteSource *pos; - uint64_t uid; #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT", "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); #endif - - uid = GNUNET_ntohll(dht_stop_msg->unique_id); - record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key); if (record != NULL) { @@ -3017,6 +3259,12 @@ handle_dht_p2p_route_request (void *cls, struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1]; struct DHT_MessageContext *message_context; + if (get_max_send_delay().value > MAX_REQUEST_TIME.value) + { + fprintf(stderr, "Sending of previous requests has taken far too long, backing off!\n"); + return GNUNET_YES; + } + if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/ { #if DEBUG_PING @@ -3034,7 +3282,7 @@ handle_dht_p2p_route_request (void *cls, message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K); GNUNET_assert(message_context->bloom != NULL); message_context->hop_count = ntohl(incoming->hop_count); - message_context->key = &incoming->key; + memcpy(&message_context->key, &incoming->key, sizeof(GNUNET_HashCode)); message_context->replication = ntohl(incoming->desired_replication_level); message_context->unique_id = GNUNET_ntohll(incoming->unique_id); message_context->msg_options = ntohl(incoming->options); @@ -3074,7 +3322,7 @@ handle_dht_p2p_route_result (void *cls, memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K); GNUNET_assert(message_context.bloom != NULL); - message_context.key = &incoming->key; + memcpy(&message_context.key, &incoming->key, sizeof(GNUNET_HashCode)); message_context.unique_id = GNUNET_ntohll(incoming->unique_id); message_context.msg_options = ntohl(incoming->options); message_context.hop_count = ntohl(incoming->hop_count); @@ -3322,7 +3570,7 @@ run (void *cls, coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */ cfg, /* Main configuration */ GNUNET_TIME_UNIT_FOREVER_REL, - NULL, /* Closure passed to DHT functionas around? */ + NULL, /* Closure passed to DHT functions */ &core_init, /* Call core_init once connected */ &handle_core_connect, /* Handle connects */ &handle_core_disconnect, /* remove peers on disconnects */ @@ -3402,6 +3650,15 @@ run (void *cls, malicious_dropper = GNUNET_YES; } + if (GNUNET_NO == + GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht", + "do_find_peer")) + { + do_find_peer = GNUNET_NO; + } + else + do_find_peer = GNUNET_YES; + if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging_extended")) @@ -3445,14 +3702,19 @@ run (void *cls, GNUNET_STATISTICS_set(stats, STAT_HELLOS_PROVIDED, 0, GNUNET_NO); GNUNET_STATISTICS_set(stats, STAT_DISCONNECTS, 0, GNUNET_NO); } -#if DO_FIND_PEER - next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value + - GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, - (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value); - GNUNET_SCHEDULER_add_delayed (sched, - next_send_time, - &send_find_peer_message, NULL); -#endif + /* FIXME: if there are no recent requests then these never get freed, but alternative is _annoying_! */ + recent.hashmap = GNUNET_CONTAINER_multihashmap_create(DHT_MAX_RECENT / 2); + recent.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); + if (GNUNET_YES == do_find_peer) + { + next_send_time.value = DHT_MINIMUM_FIND_PEER_INTERVAL.value + + GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, + (DHT_MAXIMUM_FIND_PEER_INTERVAL.value / 2) - DHT_MINIMUM_FIND_PEER_INTERVAL.value); + find_peer_context.start = GNUNET_TIME_absolute_get(); + GNUNET_SCHEDULER_add_delayed (sched, + next_send_time, + &send_find_peer_message, &find_peer_context); + } /* Scheduled the task to clean up when shutdown is called */ cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,