From 1f6511d450641f20c69f616dbdbbbb1badbbbc5a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 26 Sep 2011 22:02:03 +0000 Subject: [PATCH] more wild hxing --- src/dht/gnunet-service-dht-new.c | 1719 +---------------------- src/dht/gnunet-service-dht_clients.c | 9 +- src/dht/gnunet-service-dht_clients.h | 2 +- src/dht/gnunet-service-dht_datacache.c | 4 +- src/dht/gnunet-service-dht_datacache.h | 4 +- src/dht/gnunet-service-dht_neighbours.c | 259 ++-- src/dht/gnunet-service-dht_neighbours.h | 12 +- src/dht/gnunet-service-dht_nse.c | 6 +- src/dht/gnunet-service-dht_nse.h | 6 +- src/dht/gnunet-service-dht_routing.c | 211 +++ src/dht/gnunet-service-dht_routing.h | 93 ++ 11 files changed, 534 insertions(+), 1791 deletions(-) create mode 100644 src/dht/gnunet-service-dht_routing.c create mode 100644 src/dht/gnunet-service-dht_routing.h diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c index 87cf97a40..17330ddb6 100644 --- a/src/dht/gnunet-service-dht-new.c +++ b/src/dht/gnunet-service-dht-new.c @@ -66,11 +66,6 @@ */ #define MINIMUM_PEER_THRESHOLD 20 -/** - * Number of requests we track at most (for routing replies). - */ -#define DHT_MAX_RECENT (1024 * 16) - /** * How long do we wait at most when queueing messages with core * that we are sending on behalf of other peers. @@ -132,258 +127,6 @@ -/** - * Context containing information about a DHT message received. - */ -struct DHT_MessageContext -{ - /** - * The client this request was received from. - * (NULL if received from another peer) - */ - struct ClientList *client; - - /** - * The peer this request was received from. - */ - struct GNUNET_PeerIdentity peer; - - /** - * Bloomfilter for this routing request. - */ - struct GNUNET_CONTAINER_BloomFilter *bloom; - - /** - * extended query (see gnunet_block_lib.h). - */ - const void *xquery; - - /** - * Bloomfilter to filter out duplicate replies. - */ - struct GNUNET_CONTAINER_BloomFilter *reply_bf; - - /** - * The key this request was about - */ - GNUNET_HashCode key; - - /** - * How long should we wait to transmit this request? - */ - struct GNUNET_TIME_Relative timeout; - - /** - * The unique identifier of this request - */ - uint64_t unique_id; - - /** - * Number of bytes in xquery. - */ - size_t xquery_size; - - /** - * Mutator value for the reply_bf, see gnunet_block_lib.h - */ - uint32_t reply_bf_mutator; - - /** - * Desired replication level - */ - uint32_t replication; - - /** - * Network size estimate, either ours or the sum of - * those routed to thus far. =~ Log of number of peers - * chosen from for this request. - */ - uint32_t network_size; - - /** - * Any message options for this request - */ - uint32_t msg_options; - - /** - * How many hops has the message already traversed? - */ - uint32_t hop_count; - - /** - * How many peer identities are present in the path history? - */ - uint32_t path_history_len; - - /** - * Path history. - */ - char *path_history; - - /** - * How important is this message? - */ - unsigned int importance; - - /** - * Should we (still) forward the request on to other peers? - */ - int do_forward; - - /** - * Did we forward this message? (may need to remember it!) - */ - int forwarded; - - /** - * Are we the closest known peer to this key (out of our neighbors?) - */ - int closest; -}; - - -/** - * Record used for remembering what peers are waiting for what - * responses (based on search key). - */ -struct DHTRouteSource -{ - /** - * This is a DLL. - */ - struct DHTRouteSource *next; - - /** - * This is a DLL. - */ - struct DHTRouteSource *prev; - - /** - * UID of the request, 0 if from another peer. - */ - uint64_t uid; - - /** - * Source of the request. Replies should be forwarded to - * this peer. - */ - struct GNUNET_PeerIdentity source; - - /** - * If this was a local request, remember the client; otherwise NULL. - */ - struct ClientList *client; - - /** - * Pointer to this nodes heap location (for removal) - */ - struct GNUNET_CONTAINER_HeapNode *hnode; - - /** - * Back pointer to the record storing this information. - */ - struct DHTQueryRecord *record; - - /** - * Task to remove this entry on timeout. - */ - GNUNET_SCHEDULER_TaskIdentifier delete_task; - - /** - * Bloomfilter of peers we have already sent back as - * replies to the initial request. Allows us to not - * forward the same peer multiple times for a find peer - * request. - */ - struct GNUNET_CONTAINER_BloomFilter *find_peers_responded; - -}; - - -/** - * Entry in the DHT routing table. - */ -struct DHTQueryRecord -{ - /** - * Head of DLL for result forwarding. - */ - struct DHTRouteSource *head; - - /** - * Tail of DLL for result forwarding. - */ - struct DHTRouteSource *tail; - - /** - * Key that the record concerns. - */ - GNUNET_HashCode key; - -}; - - -/** - * Context used to calculate the number of find peer messages - * per X time units since our last scheduled find peer message - * was sent. If we have seen too many messages, delay or don't - * send our own out. - */ -struct FindPeerMessageContext -{ - unsigned int count; - - struct GNUNET_TIME_Absolute start; - -}; - - -struct RecentRequest -{ - /** - * Position of this node in the min heap. - */ - struct GNUNET_CONTAINER_HeapNode *heap_node; - - /** - * Bloomfilter containing entries for peers - * we forwarded this request to. - */ - struct GNUNET_CONTAINER_BloomFilter *bloom; - - /** - * Timestamp of this request, for ordering - * the min heap. - */ - struct GNUNET_TIME_Absolute timestamp; - - /** - * Key of this request. - */ - GNUNET_HashCode key; - - /** - * Unique identifier for this request, 0 if from another peer. - */ - uint64_t uid; - - /** - * Task to remove this entry on timeout. - */ - GNUNET_SCHEDULER_TaskIdentifier remove_task; -}; - - -/** - * Recent requests by time inserted. - */ -static struct GNUNET_CONTAINER_Heap *recent_heap; - -/** - * Context to use to calculate find peer rates. - */ -static struct FindPeerMessageContext find_peer_context; - /** * How many peers have we added since we sent out our last * find peer request? @@ -420,11 +163,6 @@ static struct GNUNET_TRANSPORT_Handle *transport_handle; */ static struct GNUNET_PeerIdentity my_identity; -/** - * Short id of the peer, for printing - */ -static char *my_short_id; - /** * Our HELLO */ @@ -440,17 +178,6 @@ static GNUNET_SCHEDULER_TaskIdentifier cleanup_task; */ static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests; -/** - * Reply times for requests, if we are busy, don't send any - * more requests! - */ -static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES]; - -/** - * Current counter for replies. - */ -static unsigned int reply_counter; - /** * Our handle to the BLOCK library. */ @@ -458,11 +185,6 @@ static struct GNUNET_BLOCK_Context *block_context; -/** Declare here so retry_core_send is aware of it */ -static size_t -core_transmit_notify (void *cls, size_t size, void *buf); - - /** * Given the largest send delay, artificially decrease it @@ -531,170 +253,6 @@ decrement_stats (const char *value) } -/** - * Try to send another message from our core send list - */ -static void -try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct PeerInfo *peer = cls; - struct P2PPendingMessage *pending; - size_t ssize; - - peer->send_task = GNUNET_SCHEDULER_NO_TASK; - - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) - return; - - if (peer->th != NULL) - return; /* Message send already in progress */ - - pending = peer->head; - if (pending != NULL) - { - ssize = ntohs (pending->msg->size); -#if DEBUG_DHT > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", - my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id)); -#endif - pending->scheduled = GNUNET_TIME_absolute_get (); - reply_counter++; - if (reply_counter >= MAX_REPLY_TIMES) - reply_counter = 0; - peer->th = - GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, - pending->importance, - pending->timeout, &peer->id, ssize, - &core_transmit_notify, peer); - if (peer->th == NULL) - increment_stats ("# notify transmit ready failed"); - } -} - - -/** - * Function called to send a request out to another peer. - * Called both for locally initiated requests and those - * received from other peers. - * - * @param msg the encapsulated message - * @param peer the peer to forward the message to - * @param msg_ctx the context of the message (hop count, bloom, etc.) - */ -static void -forward_result_message (const struct GNUNET_MessageHeader *msg, - struct PeerInfo *peer, - struct DHT_MessageContext *msg_ctx) -{ - struct GNUNET_DHT_P2PRouteResultMessage *result_message; - struct P2PPendingMessage *pending; - size_t msize; - size_t psize; - char *path_start; - char *path_offset; - - increment_stats (STAT_RESULT_FORWARDS); - msize = - sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) + - (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len); - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - psize = sizeof (struct P2PPendingMessage) + msize; - pending = GNUNET_malloc (psize); - pending->msg = (struct GNUNET_MessageHeader *) &pending[1]; - pending->importance = DHT_SEND_PRIORITY; - pending->timeout = GNUNET_TIME_relative_get_forever (); - result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg; - result_message->header.size = htons (msize); - result_message->header.type = - htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT); - result_message->outgoing_path_length = htonl (msg_ctx->path_history_len); - if (msg_ctx->path_history_len > 0) - { - /* End of pending is where enc_msg starts */ - path_start = (char *) &pending[1]; - /* Offset by the size of the enc_msg */ - path_start += ntohs (msg->size); - memcpy (path_start, msg_ctx->path_history, - msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity))); - } - result_message->options = htonl (msg_ctx->msg_options); - result_message->hop_count = htonl (msg_ctx->hop_count + 1); - memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); - /* Copy the enc_msg, then the path history as well! */ - memcpy (&result_message[1], msg, ntohs (msg->size)); - path_offset = (char *) &result_message[1]; - path_offset += ntohs (msg->size); - /* If we have path history, copy it to the end of the whole thing */ - if (msg_ctx->path_history_len > 0) - memcpy (path_offset, msg_ctx->path_history, - msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity))); -#if DEBUG_DHT > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Adding pending message size %d for peer %s\n", my_short_id, - "DHT", msize, GNUNET_i2s (&peer->id)); -#endif - peer->pending_count++; - increment_stats ("# pending messages scheduled"); - GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail, - pending); - if (peer->send_task == GNUNET_SCHEDULER_NO_TASK) - peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer); -} - - -/** - * Called when core is ready to send a message we asked for - * out to the destination. - * - * @param cls closure (NULL) - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -core_transmit_notify (void *cls, size_t size, void *buf) -{ - struct PeerInfo *peer = cls; - char *cbuf = buf; - struct P2PPendingMessage *pending; - - size_t off; - size_t msize; - - peer->th = NULL; - if (buf == NULL) - { - /* client disconnected */ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", - my_short_id, "DHT"); -#endif - return 0; - } - - if (peer->head == NULL) - return 0; - - off = 0; - pending = peer->head; - while (NULL != pending && - (size - off >= (msize = ntohs (pending->msg->size)))) - { - memcpy (&cbuf[off], pending->msg, msize); - off += msize; - peer->pending_count--; - increment_stats ("# pending messages sent"); - GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending); - GNUNET_free (pending); - pending = peer->head; - } - if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK)) - peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer); - - return off; -} - /** * Compute the distance between have and target as a 32-bit value. @@ -763,618 +321,82 @@ distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have) * Must fudge the value if NO bits match. */ static unsigned int -inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have) -{ - if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0) - return 1; /* Never return 0! */ - return ((unsigned int) -1) - distance (target, have); -} - - -/** - * Find which k-bucket this peer should go into, - * taking into account the size of the k-bucket - * array. This means that if more bits match than - * there are currently buckets, lowest_bucket will - * be returned. - * - * @param hc GNUNET_HashCode we are finding the bucket for. - * - * @return the proper bucket index for this key, - * or GNUNET_SYSERR on error (same hashcode) - */ -static int -find_current_bucket (const GNUNET_HashCode * hc) -{ - int actual_bucket; - - actual_bucket = find_bucket (hc); - if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */ - return lowest_bucket; - if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */ - return lowest_bucket; - return actual_bucket; -} - - -/** - * Find a routing table entry from a peer identity - * - * @param peer the peer identity to look up - * - * @return the routing table entry, or NULL if not found - */ -static struct PeerInfo * -find_peer_by_id (const struct GNUNET_PeerIdentity *peer) -{ - int bucket; - struct PeerInfo *pos; - - bucket = find_current_bucket (&peer->hashPubKey); - - if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) - return NULL; - - pos = k_buckets[bucket].head; - while (pos != NULL) - { - if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity))) - return pos; - pos = pos->next; - } - return NULL; /* No such peer. */ -} - -/* Forward declaration */ -static void -update_core_preference (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - -/** - * Function called with statistics about the given peer. - * - * @param cls closure - * @param peer identifies the peer - * @param bpm_out set to the current bandwidth limit (sending) for this peer - * @param amount set to the amount that was actually reserved or unreserved; - * either the full requested amount or zero (no partial reservations) - * @param res_delay if the reservation could not be satisfied (amount was 0), how - * long should the client wait until re-trying? - * @param preference current traffic preference for the given peer - */ -static void -update_core_preference_finish (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GNUNET_BANDWIDTH_Value32NBO bpm_out, - int32_t amount, - struct GNUNET_TIME_Relative res_delay, - uint64_t preference) -{ - struct PeerInfo *peer_info = cls; - - peer_info->info_ctx = NULL; - GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, - &update_core_preference, peer_info); -} - -static void -update_core_preference (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct PeerInfo *peer = cls; - uint64_t preference; - unsigned int matching; - - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) - { - return; - } - matching = - GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, - &peer->id.hashPubKey); - if (matching >= 64) - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Peer identifier matches by %u bits, only shifting as much as we can!\n", - matching); -#endif - matching = 63; - } - preference = 1LL << matching; - peer->info_ctx = - GNUNET_CORE_peer_change_preference (coreAPI, &peer->id, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_BANDWIDTH_VALUE_MAX, 0, - preference, - &update_core_preference_finish, peer); -} - -/** - * Find the closest peer in our routing table to the - * given hashcode. - * - * @return The closest peer in our routing table to the - * key, or NULL on error. - */ -static struct PeerInfo * -find_closest_peer (const GNUNET_HashCode * hc) -{ - struct PeerInfo *pos; - struct PeerInfo *current_closest; - unsigned int lowest_distance; - unsigned int temp_distance; - int bucket; - int count; - - lowest_distance = -1; - - if (k_buckets[lowest_bucket].peers_size == 0) - return NULL; - - current_closest = NULL; - for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++) - { - pos = k_buckets[bucket].head; - count = 0; - while ((pos != NULL) && (count < bucket_size)) - { - temp_distance = distance (&pos->id.hashPubKey, hc); - if (temp_distance <= lowest_distance) - { - lowest_distance = temp_distance; - current_closest = pos; - } - pos = pos->next; - count++; - } - } - GNUNET_assert (current_closest != NULL); - return current_closest; -} - - -/** - * Function called to send a request out to another peer. - * Called both for locally initiated requests and those - * received from other peers. - * - * @param msg the encapsulated message - * @param peer the peer to forward the message to - * @param msg_ctx the context of the message (hop count, bloom, etc.) - */ -static void -forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer, - struct DHT_MessageContext *msg_ctx) -{ - struct GNUNET_DHT_P2PRouteMessage *route_message; - struct P2PPendingMessage *pending; - size_t msize; - size_t psize; - char *route_path; - - increment_stats (STAT_ROUTE_FORWARDS); - GNUNET_assert (peer != NULL); - if ((msg_ctx->closest != GNUNET_YES) && - (peer == find_closest_peer (&msg_ctx->key))) - increment_stats (STAT_ROUTE_FORWARDS_CLOSEST); - - msize = - sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) + - (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - psize = sizeof (struct P2PPendingMessage) + msize; - pending = GNUNET_malloc (psize); - pending->msg = (struct GNUNET_MessageHeader *) &pending[1]; - pending->importance = msg_ctx->importance; - pending->timeout = msg_ctx->timeout; - route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg; - route_message->header.size = htons (msize); - route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE); - route_message->options = htonl (msg_ctx->msg_options); - route_message->hop_count = htonl (msg_ctx->hop_count + 1); - route_message->network_size = htonl (msg_ctx->network_size); - route_message->desired_replication_level = htonl (msg_ctx->replication); - if (msg_ctx->bloom != NULL) - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom, - route_message-> - bloomfilter, - DHT_BLOOM_SIZE)); - memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); - memcpy (&route_message[1], msg, ntohs (msg->size)); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) - { - route_message->outgoing_path_length = htonl (msg_ctx->path_history_len); - /* Set pointer to start of enc_msg */ - route_path = (char *) &route_message[1]; - /* Offset to the end of the enc_msg */ - route_path += ntohs (msg->size); - /* Copy the route_path after enc_msg */ - memcpy (route_path, msg_ctx->path_history, - msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - } -#if DEBUG_DHT > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Adding pending message size %d for peer %s\n", my_short_id, - "DHT", msize, GNUNET_i2s (&peer->id)); -#endif - peer->pending_count++; - increment_stats ("# pending messages scheduled"); - GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail, - pending); - if (peer->send_task == GNUNET_SCHEDULER_NO_TASK) - peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer); -} - - - - -/** - * Called when a reply needs to be sent to a client, as - * a result it found to a GET or FIND PEER request. - * - * @param client the client to send the reply to - * @param message the encapsulated message to send - * @param msg_ctx the context of the received message - */ -static void -send_reply_to_client (struct ClientList *client, - const struct GNUNET_MessageHeader *message, - struct DHT_MessageContext *msg_ctx) -{ - struct GNUNET_DHT_RouteResultMessage *reply; - struct PendingMessage *pending_message; - uint16_t msize; - size_t tsize; - char *reply_offset; - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n", - my_short_id, "DHT"); -#endif - msize = ntohs (message->size); - tsize = - sizeof (struct GNUNET_DHT_RouteResultMessage) + msize + - (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break_op (0); - return; - } - pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize); - pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1]; - reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1]; - reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT); - reply->header.size = htons (tsize); - reply->outgoing_path_length = htonl (msg_ctx->path_history_len); - reply->unique_id = GNUNET_htonll (msg_ctx->unique_id); - memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); - reply_offset = (char *) &reply[1]; - memcpy (&reply[1], message, msize); - if (msg_ctx->path_history_len > 0) - { - reply_offset += msize; - memcpy (reply_offset, msg_ctx->path_history, - msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - } - add_pending_message (client, pending_message); -} - -/** - * Consider whether or not we would like to have this peer added to - * our routing table. Check whether bucket for this peer is full, - * if so return negative; if not return positive. Since peers are - * only added on CORE level connect, this doesn't actually add the - * peer to the routing table. - * - * @param peer the peer we are considering adding - * - * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket - * already full) - */ -static int -consider_peer (struct GNUNET_PeerIdentity *peer) -{ - int bucket; - - if ((GNUNET_YES == - GNUNET_CONTAINER_multihashmap_contains (all_known_peers, - &peer->hashPubKey)) || - (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))) - return GNUNET_NO; /* We already know this peer (are connected even!) */ - bucket = find_current_bucket (&peer->hashPubKey); - - if ((k_buckets[bucket].peers_size < bucket_size) || - ((bucket == lowest_bucket) && (lowest_bucket > 0))) - return GNUNET_YES; - - return GNUNET_NO; -} - - -/** - * Task used to remove forwarding entries, either - * after timeout, when full, or on shutdown. - * - * @param cls the entry to remove - * @param tc context, reason, etc. - */ -static void -remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct DHTRouteSource *source_info = cls; - struct DHTQueryRecord *record; - - source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode); - record = source_info->record; - GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info); - - if (record->head == NULL) /* No more entries in DLL */ - { - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap, - &record->key, record)); - GNUNET_free (record); - } - if (source_info->find_peers_responded != NULL) - GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded); - GNUNET_free (source_info); -} - -/** - * Main function that handles whether or not to route a result - * message to other peers, or to send to our local client. - * - * @param msg the result message to be routed - * @param msg_ctx context of the message we are routing - * - * @return the number of peers the message was routed to, - * GNUNET_SYSERR on failure - */ -static int -route_result_message (struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx) -{ - struct GNUNET_PeerIdentity new_peer; - struct DHTQueryRecord *record; - struct DHTRouteSource *pos; - struct PeerInfo *peer_info; - const struct GNUNET_MessageHeader *hello_msg; - -#if DEBUG_DHT > 1 - unsigned int i; -#endif - - increment_stats (STAT_RESULTS); - /** - * If a find peer result message is received and contains a valid - * HELLO for another peer, offer it to the transport service. - */ - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) - { - if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader)) - GNUNET_break_op (0); - - hello_msg = &msg[1]; - if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) || - (GNUNET_SYSERR == - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg, - &new_peer))) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%s:%s Received non-HELLO message type in find peer result message!\n", - my_short_id, "DHT"); - GNUNET_break_op (0); - return GNUNET_NO; - } - else /* We have a valid hello, and peer id stored in new_peer */ - { - find_peer_context.count++; - increment_stats (STAT_FIND_PEER_REPLY); - if (GNUNET_YES == consider_peer (&new_peer)) - { - increment_stats (STAT_HELLOS_PROVIDED); - GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL); - GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL); - } - } - } - - record = - GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key); - - if (record == NULL) /* No record of this message! */ - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Have no record of response key %s uid %llu\n", - my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - return 0; - } - - pos = record->head; - while (pos != NULL) - { - if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */ - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Sending response key %s uid %llu to client\n", - my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - increment_stats (STAT_RESULTS_TO_CLIENT); - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT) - increment_stats (STAT_GET_REPLY); -#if DEBUG_DHT > 1 - for (i = 0; i < msg_ctx->path_history_len; i++) - { - char *path_offset; - - path_offset = - &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)]; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "(before client) Key %s Found peer %d:%s\n", - GNUNET_h2s (&msg_ctx->key), i, - GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset)); - } -#endif - send_reply_to_client (pos->client, msg, msg_ctx); - } - else /* Send to peer */ - { - peer_info = find_peer_by_id (&pos->source); - if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */ - { - pos = pos->next; - continue; - } -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", - my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id, GNUNET_i2s (&peer_info->id)); -#endif - forward_result_message (msg, peer_info, msg_ctx); - /* Try removing forward entries after sending once, only allows ONE response per request */ - if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (pos->delete_task); - pos->delete_task = - GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos); - } - } - pos = pos->next; - } - return 0; +inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have) +{ + if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0) + return 1; /* Never return 0! */ + return ((unsigned int) -1) - distance (target, have); } +/* Forward declaration */ +static void +update_core_preference (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + /** - * Main function that handles whether or not to route a message to other - * peers. + * Function called with statistics about the given peer. * - * @param msg the message to be routed - * @param msg_ctx the context containing all pertinent information about the message + * @param cls closure + * @param peer identifies the peer + * @param bpm_out set to the current bandwidth limit (sending) for this peer + * @param amount set to the amount that was actually reserved or unreserved; + * either the full requested amount or zero (no partial reservations) + * @param res_delay if the reservation could not be satisfied (amount was 0), how + * long should the client wait until re-trying? + * @param preference current traffic preference for the given peer */ static void -route_message (const struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx); +update_core_preference_finish (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_BANDWIDTH_Value32NBO bpm_out, + int32_t amount, + struct GNUNET_TIME_Relative res_delay, + uint64_t preference) +{ + struct PeerInfo *peer_info = cls; + peer_info->info_ctx = NULL; + GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, + &update_core_preference, peer_info); +} -/** - * Server handler for all dht get requests, look for data, - * if found, send response either to clients or other peers. - * - * @param msg the actual get message - * @param msg_ctx struct containing pertinent information about the get request - * - * @return number of items found for GET request - */ -static unsigned int -handle_dht_get (const struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx) +static void +update_core_preference (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - const struct GNUNET_DHT_GetMessage *get_msg; - uint16_t msize; - uint16_t bf_size; - unsigned int results; - const char *end; - enum GNUNET_BLOCK_Type type; - - msize = ntohs (msg->size); - if (msize < sizeof (struct GNUNET_DHT_GetMessage)) - { - GNUNET_break (0); - return 0; - } - get_msg = (const struct GNUNET_DHT_GetMessage *) msg; - bf_size = ntohs (get_msg->bf_size); - msg_ctx->xquery_size = ntohs (get_msg->xquery_size); - msg_ctx->reply_bf_mutator = get_msg->bf_mutator; - if (msize != - sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size) - { - GNUNET_break_op (0); - return 0; - } - end = (const char *) &get_msg[1]; - if (msg_ctx->xquery_size == 0) - { - msg_ctx->xquery = NULL; - } - else - { - msg_ctx->xquery = (const void *) end; - end += msg_ctx->xquery_size; - } - if (bf_size == 0) + struct PeerInfo *peer = cls; + uint64_t preference; + unsigned int matching; + + if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) { - msg_ctx->reply_bf = NULL; + return; } - else + matching = + GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, + &peer->id.hashPubKey); + if (matching >= 64) { - msg_ctx->reply_bf = - GNUNET_CONTAINER_bloomfilter_init (end, bf_size, - GNUNET_DHT_GET_BLOOMFILTER_K); - } - type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type); -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n", - my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - increment_stats (STAT_GETS); - results = 0; - msg_ctx->do_forward = GNUNET_YES; #if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Found %d results for `%s' request uid %llu\n", - my_short_id, "DHT", results, "GET", msg_ctx->unique_id); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Peer identifier matches by %u bits, only shifting as much as we can!\n", + matching); #endif - if (results >= 1) - { - } - else - { - /* check query valid */ - if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID == - GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key, - &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator, - msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0)) - { - GNUNET_break_op (0); - msg_ctx->do_forward = GNUNET_NO; - } + matching = 63; } - - if (msg_ctx->do_forward == GNUNET_YES) - route_message (msg, msg_ctx); - GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf); - return results; + preference = 1LL << matching; + peer->info_ctx = + GNUNET_CORE_peer_change_preference (coreAPI, &peer->id, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_BANDWIDTH_VALUE_MAX, 0, + preference, + &update_core_preference_finish, peer); } -static void -remove_recent_find_peer (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_HashCode *key = cls; - - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove - (recent_find_peer_requests, key, NULL)); - GNUNET_free (key); -} - /** * Server handler for initiating local dht find peer requests @@ -1571,592 +593,6 @@ handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg, } -/** - * Server handler for initiating local dht put requests - * - * @param msg the actual put message - * @param msg_ctx struct containing pertinent information about the request - */ -static void -handle_dht_put (const struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx) -{ - const struct GNUNET_DHT_PutMessage *put_msg; - struct DHTPutEntry *put_entry; - unsigned int put_size; - char *path_offset; - enum GNUNET_BLOCK_Type put_type; - size_t data_size; - int ret; - GNUNET_HashCode key; - struct DHTQueryRecord *record; - - GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage)); - - put_msg = (const struct GNUNET_DHT_PutMessage *) msg; - put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type); - data_size = - ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); - ret = - GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size, - &key); - if (GNUNET_NO == ret) - { - /* invalid reply */ - GNUNET_break_op (0); - return; - } - if ((GNUNET_YES == ret) && - (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode)))) - { - /* invalid wrapper: key mismatch! */ - GNUNET_break_op (0); - return; - } - /* ret == GNUNET_SYSERR means that there is no known relationship between - * data and the key, so we cannot check it */ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n", - my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - - record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, - &msg_ctx->key); - if (NULL != record) - { - struct DHTRouteSource *pos; - struct GNUNET_DHT_GetResultMessage *get_result; - struct DHT_MessageContext new_msg_ctx; - size_t get_size; - - pos = record->head; - while (pos != NULL) - { - /* TODO: do only for local started requests? or also for remote peers? */ - /* TODO: include this in statistics? under what? */ - /* TODO: reverse order of path_history? */ - if (NULL == pos->client) - { - pos = pos->next; - continue; - } - - memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext)); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) - { - new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE; - } - - get_size = - sizeof (struct GNUNET_DHT_GetResultMessage) + data_size + - (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - get_result = GNUNET_malloc (get_size); - get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); - get_result->header.size = htons (get_size); - get_result->expiration = put_msg->expiration; - get_result->type = put_msg->type; - get_result->put_path_length = htons (msg_ctx->path_history_len); - - /* Copy the actual data and the path_history to the end of the get result */ - memcpy (&get_result[1], &put_msg[1], data_size); - path_offset = (char *) &get_result[1]; - path_offset += data_size; - memcpy (path_offset, msg_ctx->path_history, - msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - new_msg_ctx.peer = my_identity; - new_msg_ctx.bloom = NULL; - new_msg_ctx.hop_count = 0; - /* Make result routing a higher priority */ - new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; - new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT; - new_msg_ctx.unique_id = pos->uid; - send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx); - GNUNET_free (get_result); - pos = pos->next; - } - } - - if (msg_ctx->closest != GNUNET_YES) - { - route_message (msg, msg_ctx); - return; - } - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n", - my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - - increment_stats (STAT_PUTS_INSERTED); - - route_message (msg, msg_ctx); -} - - -/** - * To how many peers should we (on average) - * forward the request to obtain the desired - * target_replication count (on average). - * - * returns: target_replication / (est. hops) + (target_replication * hop_count) - * where est. hops is typically 2 * the routing table depth - * - * @param hop_count number of hops the message has traversed - * @param target_replication the number of total paths desired - * - * @return Some number of peers to forward the message to - */ -static unsigned int -get_forward_count (unsigned int hop_count, size_t target_replication) -{ - uint32_t random_value; - unsigned int forward_count; - float target_value; - - if (hop_count > log_of_network_size_estimate * 4.0) - { - /* forcefully terminate */ - return 0; - } - - if (hop_count > log_of_network_size_estimate * 2.0) - { - /* keep forwarding, but no more replication */ - return 1; - } - - target_value = - 1 + (target_replication - 1.0) / (log_of_network_size_estimate + - ((float) (target_replication - 1.0) * - hop_count)); - /* Set forward count to floor of target_value */ - forward_count = (unsigned int) target_value; - /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */ - target_value = target_value - forward_count; - random_value = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); - if (random_value < (target_value * UINT32_MAX)) - forward_count++; - return forward_count; -} - - -/** - * Check whether my identity is closer than any known peers. - * If a non-null bloomfilter is given, check if this is the closest - * peer that hasn't already been routed to. - * - * @param target hash code to check closeness to - * @param bloom bloomfilter, exclude these entries from the decision - * @return GNUNET_YES if node location is closest, - * GNUNET_NO otherwise. - */ -static int -am_closest_peer (const GNUNET_HashCode * target, - struct GNUNET_CONTAINER_BloomFilter *bloom) -{ - int bits; - int other_bits; - int bucket_num; - int count; - struct PeerInfo *pos; - unsigned int my_distance; - - if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode))) - return GNUNET_YES; - - bucket_num = find_current_bucket (target); - - bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target); - my_distance = distance (&my_identity.hashPubKey, target); - pos = k_buckets[bucket_num].head; - count = 0; - while ((pos != NULL) && (count < bucket_size)) - { - if ((bloom != NULL) && - (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) - { - pos = pos->next; - continue; /* Skip already checked entries */ - } - - other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target); - if (other_bits > bits) - return GNUNET_NO; - else if (other_bits == bits) /* We match the same number of bits */ - { - if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */ - return GNUNET_NO; - } - pos = pos->next; - } - - /* No peers closer, we are the closest! */ - return GNUNET_YES; -} - - -/** - * Select a peer from the routing table that would be a good routing - * destination for sending a message for "target". The resulting peer - * must not be in the set of blocked peers.

- * - * Note that we should not ALWAYS select the closest peer to the - * target, peers further away from the target should be chosen with - * exponentially declining probability. - * - * @param target the key we are selecting a peer to route to - * @param bloom a bloomfilter containing entries this request has seen already - * @param hops how many hops has this message traversed thus far - * - * @return Peer to route to, or NULL on error - */ -static struct PeerInfo * -select_peer (const GNUNET_HashCode * target, - struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops) -{ - unsigned int bc; - unsigned int count; - unsigned int selected; - struct PeerInfo *pos; - unsigned int distance; - unsigned int largest_distance; - struct PeerInfo *chosen; - - if (hops >= log_of_network_size_estimate) - { - /* greedy selection (closest peer that is not in bloomfilter) */ - largest_distance = 0; - chosen = NULL; - for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) - { - pos = k_buckets[bc].head; - count = 0; - while ((pos != NULL) && (count < bucket_size)) - { - /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */ - if (GNUNET_NO == - GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) - { - distance = inverse_distance (target, &pos->id.hashPubKey); - if (distance > largest_distance) - { - chosen = pos; - largest_distance = distance; - } - } - count++; - pos = pos->next; - } - } - if ((largest_distance > 0) && (chosen != NULL)) - { - GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey); - return chosen; - } - return NULL; /* no peer available or we are the closest */ - } - - - /* select "random" peer */ - /* count number of peers that are available and not filtered */ - count = 0; - for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) - { - pos = k_buckets[bc].head; - while ((pos != NULL) && (count < bucket_size)) - { - if (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) - { - pos = pos->next; - increment_stats ("# peer blocked from selection by Bloom filter"); - continue; /* Ignore bloomfiltered peers */ - } - count++; - pos = pos->next; - } - } - if (count == 0) /* No peers to select from! */ - { - increment_stats ("# failed to select peer"); - return NULL; - } - /* Now actually choose a peer */ - selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count); - count = 0; - for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) - { - pos = k_buckets[bc].head; - while ((pos != NULL) && (count < bucket_size)) - { - if (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) - { - pos = pos->next; - continue; /* Ignore bloomfiltered peers */ - } - if (0 == selected--) - return pos; - pos = pos->next; - } - } - GNUNET_break (0); - return NULL; -} - - -/** - * Remember this routing request so that if a reply is - * received we can either forward it to the correct peer - * or return the result locally. - * - * @param msg_ctx Context of the route request - * - * @return GNUNET_YES if this response was cached, GNUNET_NO if not - */ -static int -cache_response (struct DHT_MessageContext *msg_ctx) -{ - struct DHTQueryRecord *record; - struct DHTRouteSource *source_info; - struct DHTRouteSource *pos; - struct GNUNET_TIME_Absolute now; - unsigned int current_size; - - current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap); - - while (current_size >= MAX_OUTSTANDING_FORWARDS) - { - source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap); - GNUNET_assert (source_info != NULL); - record = source_info->record; - GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info); - if (record->head == NULL) /* No more entries in DLL */ - { - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap, - &record->key, - record)); - GNUNET_free (record); - } - if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (source_info->delete_task); - source_info->delete_task = GNUNET_SCHEDULER_NO_TASK; - } - if (source_info->find_peers_responded != NULL) - GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded); - GNUNET_free (source_info); - current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap); - } - - /** Non-local request and have too many outstanding forwards, discard! */ - if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL)) - return GNUNET_NO; - - now = GNUNET_TIME_absolute_get (); - record = - GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key); - if (record != NULL) /* Already know this request! */ - { - pos = record->head; - while (pos != NULL) - { - if (0 == - memcmp (&msg_ctx->peer, &pos->source, - sizeof (struct GNUNET_PeerIdentity))) - break; /* Already have this peer in reply list! */ - pos = pos->next; - } - if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */ - { - GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode, - now.abs_value); - return GNUNET_NO; - } - } - else - { - record = GNUNET_malloc (sizeof (struct DHTQueryRecord)); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap, - &msg_ctx->key, record, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); - } - - source_info = GNUNET_malloc (sizeof (struct DHTRouteSource)); - source_info->record = record; - source_info->delete_task = - GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry, - source_info); - source_info->find_peers_responded = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - source_info->source = msg_ctx->peer; - GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail, - source_info); - if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */ - { - source_info->client = msg_ctx->client; - now = GNUNET_TIME_absolute_get_forever (); - } - source_info->hnode = - GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info, - now.abs_value); - source_info->uid = msg_ctx->unique_id; -#if DEBUG_DHT > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Created new forward source info for %s uid %llu\n", - my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key), - msg_ctx->unique_id); -#endif - return GNUNET_YES; -} - - -/** - * Main function that handles whether or not to route a message to other - * peers. - * - * @param msg the message to be routed - * @param msg_ctx the context containing all pertinent information about the message - */ -static void -route_message (const struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx) -{ - int i; - struct PeerInfo *selected; - unsigned int target_forward_count; - unsigned int forward_count; - struct RecentRequest *recent_req; - char *stat_forward_count; - char *temp_stat_str; - - increment_stats (STAT_ROUTES); - target_forward_count = - get_forward_count (msg_ctx->hop_count, msg_ctx->replication); - GNUNET_asprintf (&stat_forward_count, "# forward counts of %d", - target_forward_count); - increment_stats (stat_forward_count); - GNUNET_free (stat_forward_count); - if (msg_ctx->bloom == NULL) - msg_ctx->bloom = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - - if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT) - { - recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); - GNUNET_assert (recent_req != NULL); - GNUNET_SCHEDULER_cancel (recent_req->remove_task); - GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); - GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom); - GNUNET_free (recent_req); - } - - recent_req = GNUNET_malloc (sizeof (struct RecentRequest)); - recent_req->uid = msg_ctx->unique_id; - memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); - recent_req->heap_node = - GNUNET_CONTAINER_heap_insert (recent_heap, recent_req, - GNUNET_TIME_absolute_get ().abs_value); - recent_req->bloom = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - - forward_count = 0; - for (i = 0; i < target_forward_count; i++) - { - selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count); - if (selected == NULL) - break; - forward_count++; - if (GNUNET_CRYPTO_hash_matching_bits - (&selected->id.hashPubKey, - &msg_ctx->key) >= - GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, - &msg_ctx->key)) - GNUNET_asprintf (&temp_stat_str, - "# requests routed to close(r) peer hop %u", - msg_ctx->hop_count); - else - GNUNET_asprintf (&temp_stat_str, - "# requests routed to less close peer hop %u", - msg_ctx->hop_count); - if (temp_stat_str != NULL) - { - increment_stats (temp_stat_str); - GNUNET_free (temp_stat_str); - } - GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom, - &selected->id.hashPubKey); - forward_message (msg, selected, msg_ctx); - } - - if (msg_ctx->bloom != NULL) - { - GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom, - DHT_BLOOM_SIZE); - GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); - msg_ctx->bloom = NULL; - } -} - - -/** - * Main function that handles whether or not to route a message to other - * peers. - * - * @param msg the message to be routed - * @param msg_ctx the context containing all pertinent information about the message - */ -static void -demultiplex_message (const struct GNUNET_MessageHeader *msg, - struct DHT_MessageContext *msg_ctx) -{ - /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */ - msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom); - - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */ - cache_response (msg_ctx); - handle_dht_get (msg, msg_ctx); - break; - case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. */ - increment_stats (STAT_PUTS); - handle_dht_put (msg, msg_ctx); - break; - case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */ - increment_stats (STAT_FIND_PEER); - if (((msg_ctx->hop_count > 0) && - (0 != - memcmp (&msg_ctx->peer, &my_identity, - sizeof (struct GNUNET_PeerIdentity)))) || - (msg_ctx->client != NULL)) - { - cache_response (msg_ctx); - if ((msg_ctx->closest == GNUNET_YES) || - (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) - handle_dht_find_peer (msg, msg_ctx); - } - else - route_message (msg, msg_ctx); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "`%s': Message type (%d) not handled, forwarding anyway!\n", - "DHT", ntohs (msg->type)); - route_message (msg, msg_ctx); - } -} - /** * Receive the HELLO from transport service, @@ -2188,9 +624,6 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message) static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int bucket_count; - struct PeerInfo *pos; - if (NULL != ghh) { GNUNET_TRANSPORT_get_hello_cancel (ghh); @@ -2204,20 +637,9 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } GDS_NEIGHBOURS_done (); GDS_DATACACHE_done (); + GDS_ROUTING_done (); + GDS_CLIENT_done (); GDS_NSE_done (); - for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++) - { - while (k_buckets[bucket_count].head != NULL) - { - pos = k_buckets[bucket_count].head; -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Removing peer %s from bucket %d!\n", my_short_id, - "DHT", GNUNET_i2s (&pos->id), bucket_count); -#endif - delete_peer (pos, bucket_count); - } - } if (stats != NULL) { GNUNET_STATISTICS_destroy (stats, GNUNET_YES); @@ -2310,30 +732,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, int main (int argc, char *const *argv) { - int ret; - struct RecentRequest *recent_req; - - recent_heap = - GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - recent_find_peer_requests = - GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8); - ret = - (GNUNET_OK == - GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run, - NULL)) ? 0 : 1; - while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0) - { - recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); - GNUNET_assert (recent_req != NULL); - GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); - GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom); - GNUNET_free (recent_req); - } - GNUNET_CONTAINER_heap_destroy (recent_heap); - recent_heap = NULL; - GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests); - recent_find_peer_requests = NULL; - return ret; + return (GNUNET_OK == + GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run, + NULL)) ? 0 : 1; } /* end of gnunet-service-dht.c */ diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 95a0d68d0..8790d8fbb 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -177,10 +177,9 @@ struct ClientQueryRecord uint32_t msg_options; /** - * The type for the data for the GET request; actually an 'enum - * GNUNET_BLOCK_Type'. + * The type for the data for the GET request. */ - uint32_t msg_type; + enum GNUNET_BLOCK_Type msg_type; }; @@ -662,7 +661,7 @@ struct ForwardReplyContext /** * Type of the data. */ - uint32_t type; + enum GNUNET_BLOCK_Type type; /** * Number of bytes in data. @@ -795,7 +794,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, - uint32_t type, + enum GNUNET_BLOCK_Type type, size_t data_size, const void *data) { diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h index db4f0b9fe..0bb548f71 100644 --- a/src/dht/gnunet-service-dht_clients.h +++ b/src/dht/gnunet-service-dht_clients.h @@ -50,7 +50,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, - uint32_t type, + enum GNUNET_BLOCK_Type type, size_t data_size, const void *data); diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c index b2dd05ac9..2c1a3fe20 100644 --- a/src/dht/gnunet-service-dht_datacache.c +++ b/src/dht/gnunet-service-dht_datacache.c @@ -72,7 +72,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration, const GNUNET_HashCode *key, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, - uint32_t type, + enum GNUNET_BLOCK_Type type, size_t data_size, const void *data) { @@ -268,7 +268,7 @@ struct GetRequestContext */ void GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, - uint32_t type, + enum GNUNET_BLOCK_Type type, const void *xquery, size_t xquery_size, struct GNUNET_CONTAINER_BloomFilter **reply_bf, diff --git a/src/dht/gnunet-service-dht_datacache.h b/src/dht/gnunet-service-dht_datacache.h index 0501e9e4c..ecb3a24a1 100644 --- a/src/dht/gnunet-service-dht_datacache.h +++ b/src/dht/gnunet-service-dht_datacache.h @@ -44,7 +44,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration, const GNUNET_HashCode *key, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, - uint32_t type, + enum GNUNET_BLOCK_Type type, size_t data_size, const void *data); @@ -61,7 +61,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration, */ void GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, - uint32_t type, + enum GNUNET_BLOCK_Type type, const void *xquery, size_t xquery_size, struct GNUNET_CONTAINER_BloomFilter **reply_bf, diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index b7cc2048e..425411b75 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -38,6 +38,7 @@ #include "gnunet_statistics_service.h" #include "dht.h" #include "gnunet-service-dht_datacache.h" +#include "gnunet-service-dht_routing.h" #include /** @@ -138,6 +139,11 @@ struct PeerResultMessage */ uint32_t get_path_length GNUNET_PACKED; + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; + /** * The key of the corresponding GET request. */ @@ -582,8 +588,6 @@ process_peer_queue (struct PeerInfo *peer) * To how many peers should we (on average) forward the request to * obtain the desired target_replication count (on average). * - * FIXME: double-check that this is fine - * * @param hop_count number of hops the message has traversed * @param target_replication the number of total paths desired * @return Some number of peers to forward the message to @@ -596,14 +600,19 @@ get_forward_count (uint32_t hop_count, uint32_t forward_count; float target_value; - /* bound by system-wide maximum */ - target_replication = GNUNET_MIN (16 /* FIXME: use named constant */, - target_replication); + if (hop_count > log_of_network_size_estimate * 4.0) + { + /* forcefully terminate */ + return 0; + } if (hop_count > log_of_network_size_estimate * 2.0) { /* Once we have reached our ideal number of hops, only forward to 1 peer */ return 1; } + /* bound by system-wide maximum */ + target_replication = GNUNET_MIN (16 /* FIXME: use named constant */, + target_replication); target_value = 1 + (target_replication - 1.0) / (log_of_network_size_estimate + ((float) (target_replication - 1.0) * @@ -847,8 +856,8 @@ get_target_peers (const GNUNET_HashCode *key, * @param data_size number of bytes in data */ void -GDS_NEIGHBOURS_handle_put (uint32_t type, - uint32_t options, +GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, GNUNET_TIME_Absolute expiration_time, uint32_t hop_count, @@ -936,8 +945,8 @@ GDS_NEIGHBOURS_handle_put (uint32_t type, * @param peer_bf filter for peers not to select (again) */ void -GDS_NEIGHBOURS_handle_get (uint32_t type, - uint32_t options, +GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, uint32_t hop_count, const GNUNET_HashCode *key, @@ -969,6 +978,7 @@ GDS_NEIGHBOURS_handle_get (uint32_t type, GNUNET_break (0); return; } + /* forward request */ for (i=0;i= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + (get_path_length + put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || + (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) ) + { + GNUNET_break (0); + return; + } + pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers, + &target->hashPubKey); + if (NULL == pi) + { + /* peer disconnected in the meantime, drop reply */ + return; + } + pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); + pending->importance = 0; /* FIXME */ + pending->timeout = expiration_time; + prm = (struct PeerResultMessage*) &pending[1]; + pending->msg = &prm->header; + prm->header.size = htons (msize); + prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); + prm->type = htonl (type); + prm->put_path_length = htonl (put_path_length); + prm->get_path_length = htonl (get_path_length); + prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); + prm->key = *key; + paths = (struct GNUNET_PeerIdentity) &prm[1]; + memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&paths[put_path_length], + get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&paths[put_path_length + get_path_length], + data, data_size); + GNUNET_CONTAINER_DLL_insert (target->head, + target->tail, + pending); + target->pending_count++; + process_peer_queue (target); } @@ -1281,79 +1335,91 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { - // 1) validate GET - // 2) store in routing table - // 3) check options (i.e. FIND PEER) - // 4) local lookup (=> need eval result!) - // 5) p2p forwarding - - - struct GNUNET_DHT_P2PRouteMessage *incoming = - (struct GNUNET_DHT_P2PRouteMessage *) message; - struct GNUNET_MessageHeader *enc_msg = - (struct GNUNET_MessageHeader *) &incoming[1]; - struct DHT_MessageContext *msg_ctx; - char *route_path; - int path_size; - - // FIXME - if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) + struct PeerGetMessage *get; + uint32_t xquery_size; + size_t reply_bf_size; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + enum GNUNET_DHT_RouteOption options; + enum GNUNET_BLOCK_EvaluationResult eval; + struct GNUNET_CONTAINER_BloomFilter *reply_bf; + struct GNUNET_CONTAINER_BloomFilter *peer_bf; + const char *xquery; + + /* parse and validate message */ + msize = ntohs (message->size); + if (msize < sizeof (struct PeerGetMessage)) { GNUNET_break_op (0); return GNUNET_YES; } - - if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) + get = (struct PeerGetMessage *) message; + xquery_size = ntohl (get->xquery_size); + if (msize < sizeof (struct PeerGetMessage) + xquery_size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending of previous replies took too long, backing off!\n"); - increment_stats ("# route requests dropped due to high load"); - decrease_max_send_delay (get_max_send_delay ()); + GNUNET_break_op (0); return GNUNET_YES; } - msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); - msg_ctx->bloom = - GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, - DHT_BLOOM_K); - GNUNET_assert (msg_ctx->bloom != NULL); - msg_ctx->hop_count = ntohl (incoming->hop_count); - memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); - msg_ctx->replication = ntohl (incoming->desired_replication_level); - msg_ctx->msg_options = ntohl (incoming->options); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) - { - path_size = - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity); - if (ntohs (message->size) != - (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + - path_size)) - { - GNUNET_break_op (0); - GNUNET_free (msg_ctx); - return GNUNET_YES; - } - route_path = (char *) &incoming[1]; - route_path = route_path + ntohs (enc_msg->size); - msg_ctx->path_history = - GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); - memcpy (msg_ctx->path_history, route_path, path_size); - memcpy (&msg_ctx->path_history[path_size], &my_identity, - sizeof (struct GNUNET_PeerIdentity)); - msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1; - } - msg_ctx->network_size = ntohl (incoming->network_size); - msg_ctx->peer = *peer; - msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; - msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; - demultiplex_message (enc_msg, msg_ctx); - if (msg_ctx->bloom != NULL) + reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size); + type = ntohl (get->type); + options = ntohl (get->options); + xquery = (const char*) &get[1]; + reply_bf = NULL; + if (reply_bf_size > 0) + reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], + reply_bf_size, + GNUNET_DHT_GET_BLOOMFILTER_K); + eval = GNUNET_BLOCK_evaluate (block_context, + type, + &get->key, + &reply_bf, + get->bf_mutator, + xquery, xquery_size, + NULL, 0); + if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID) { - GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); - msg_ctx->bloom = NULL; + /* request invalid or block type not supported */ + GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED); + if (NULL != reply_bf) + GNUNET_CONTAINER_bloomfilter_free (reply_bf); + return GNUNET_YES; } - GNUNET_free (msg_ctx); + peer_bf = + GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, + DHT_BLOOM_SIZE, + DHT_BLOOM_K); + + /* remember request for routing replies */ + GDS_ROUTING_add (peer, + type, + &get->key, + xquery, xquery_size, + reply_bf, get->reply_bf_mutator); + /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */ + + /* local lookup (this may update the reply_bf) */ + GDS_DATACACHE_handle_get (&get->key, + type, + xquery, xquery_size, + &reply_bf, + get->reply_bf_mutator); + /* FIXME: should track if the local lookup resulted in a + definitive result and then NOT do P2P forwarding */ + + /* P2P forwarding */ + GDS_NEIGHBOURS_handle_get (type, + options, + ntohl (get->desired_replication_level), + ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */ + &get->key, + xquery, xquery_size, + reply_bf, + get->reply_bf_mutator, + peer_bf); + /* clean up */ + if (NULL != reply_bf) + GNUNET_CONTAINER_bloomfilter_free (reply_bf); + GNUNET_CONTAINER_bloomfilter_free (peer_bf); return GNUNET_YES; } @@ -1365,7 +1431,7 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, * @param message message * @param peer peer identity this notification is about * @param atsi performance data - * + * @return GNUNET_YES (do not cut p2p connection) */ static int handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, @@ -1373,6 +1439,7 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { + // FIXME! // 1) validate result format // 2) append 'peer' to put path // 3) forward to local clients @@ -1383,38 +1450,8 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, (struct GNUNET_MessageHeader *) &incoming[1]; struct DHT_MessageContext msg_ctx; - // FIXME - if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) - { - GNUNET_break_op (0); - return GNUNET_YES; - } - memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); - memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode)); - msg_ctx.msg_options = ntohl (incoming->options); - msg_ctx.hop_count = ntohl (incoming->hop_count); - msg_ctx.peer = *peer; - msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */ - msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT; - if ((GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) && - (ntohl (incoming->outgoing_path_length) > 0)) - { - if (ntohs (message->size) - - sizeof (struct GNUNET_DHT_P2PRouteResultMessage) - - ntohs (enc_msg->size) != - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity)) - { - GNUNET_break_op (0); - return GNUNET_NO; - } - msg_ctx.path_history = (char *) &incoming[1]; - msg_ctx.path_history += ntohs (enc_msg->size); - msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length); - } - route_result_message (enc_msg, &msg_ctx); + return GNUNET_YES; } diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h index 2c20df2c4..70723deac 100644 --- a/src/dht/gnunet-service-dht_neighbours.h +++ b/src/dht/gnunet-service-dht_neighbours.h @@ -48,8 +48,8 @@ * @param data_size number of bytes in data */ void -GDS_NEIGHBOURS_handle_put (uint32_t type, - uint32_t options, +GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, GNUNET_TIME_Absolute expiration_time, uint32_t hop_count, @@ -79,8 +79,8 @@ GDS_NEIGHBOURS_handle_put (uint32_t type, * @param peer_bf filter for peers not to select (again) */ void -GDS_NEIGHBOURS_handle_get (uint32_t type, - uint32_t options, +GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, uint32_t hop_count, const GNUNET_HashCode *key, @@ -96,6 +96,7 @@ GDS_NEIGHBOURS_handle_get (uint32_t type, * other peers waiting for it. Does not do local caching or * forwarding to local clients. * + * @param target neighbour that should receive the block (if still connected) * @param type type of the block * @param expiration_time when does the content expire * @param key key for the content @@ -107,7 +108,8 @@ GDS_NEIGHBOURS_handle_get (uint32_t type, * @param data_size number of bytes in data */ void -GDS_NEIGHBOURS_handle_reply (uint32_t type, +GDS_NEIGHBOURS_handle_reply (const GNUNET_PeerIdentity *target, + enum GNUNET_BLOCK_Type type, GNUNET_TIME_Absolute expiration_time, const GNUNET_HashCode *key, unsigned int put_path_length, diff --git a/src/dht/gnunet-service-dht_nse.c b/src/dht/gnunet-service-dht_nse.c index 4711c9c31..0715465f5 100644 --- a/src/dht/gnunet-service-dht_nse.c +++ b/src/dht/gnunet-service-dht_nse.c @@ -58,21 +58,21 @@ update_network_size_estimate (void *cls, struct GNUNET_TIME_Absolute timestamp, double -GDS_nse_get () +GDS_NSE_get () { return log_of_network_size_estimate; } void -GDS_nse_init () +GDS_NSE_init () { nse = GNUNET_NSE_connect (GDS_cfg, &update_network_size_estimate, NULL); } void -GDS_nse_done () +GDS_NSE_done () { if (NULL != nse) { diff --git a/src/dht/gnunet-service-dht_nse.h b/src/dht/gnunet-service-dht_nse.h index 4642d4d9c..e2f73a9dd 100644 --- a/src/dht/gnunet-service-dht_nse.h +++ b/src/dht/gnunet-service-dht_nse.h @@ -28,13 +28,13 @@ double -GDS_nse_get (void); +GDS_NSE_get (void); void -GDS_nse_init (void); +GDS_NSE_init (void); void -GDS_nse_done (void); +GDS_NSE_done (void); #endif diff --git a/src/dht/gnunet-service-dht_routing.c b/src/dht/gnunet-service-dht_routing.c new file mode 100644 index 000000000..535a63267 --- /dev/null +++ b/src/dht/gnunet-service-dht_routing.c @@ -0,0 +1,211 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht_routing.c + * @brief GNUnet DHT tracking of requests for routing replies + * @author Christian Grothoff + */ + +#include "gnunet-service-dht_routing.h" + + +/** + * Number of requests we track at most (for routing replies). + */ +#define DHT_MAX_RECENT (1024 * 16) + + +/** + * Information we keep about all recent GET requests + * so that we can route replies. + */ +struct RecentRequest +{ + + /** + * The peer this request was received from. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Position of this node in the min heap. + */ + struct GNUNET_CONTAINER_HeapNode *heap_node; + + /** + * Bloomfilter for replies to drop. + */ + struct GNUNET_CONTAINER_BloomFilter *reply_bf; + + /** + * Timestamp of this request, for ordering + * the min heap. + */ + struct GNUNET_TIME_Absolute timestamp; + + /** + * Type of the requested block. + */ + enum GNUNET_BLOCK_Type type; + + /** + * extended query (see gnunet_block_lib.h). Allocated at the + * end of this struct. + */ + const void *xquery; + + /** + * Number of bytes in xquery. + */ + size_t xquery_size; + + /** + * Mutator value for the reply_bf, see gnunet_block_lib.h + */ + uint32_t reply_bf_mutator; + + /** + * Key of this request. + */ + GNUNET_HashCode key; + +}; + + +/** + * Recent requests by time inserted. + */ +static struct GNUNET_CONTAINER_Heap *recent_heap; + +/** + * Recently seen requests by key. + */ +static struct GNUNET_CONTAINER_MultiHashMap *recent_map; + + +/** + * Handle a reply (route to origin). Only forwards the reply back to + * other peers waiting for it. Does not do local caching or + * forwarding to local clients. Essentially calls + * GDS_NEIGHBOURS_handle_reply for all peers that sent us a matching + * request recently. + * + * @param type type of the block + * @param expiration_time when does the content expire + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers the original PUT traversed (if tracked) + * @param get_path_length number of entries in put_path + * @param get_path peers this reply has traversed so far (if tracked) + * @param data payload of the reply + * @param data_size number of bytes in data + */ +void +GDS_ROUTING_process (enum GNUNET_BLOCK_Type type, + GNUNET_TIME_Absolute expiration_time, + const GNUNET_HashCode *key, + unsigned int put_path_length, + struct GNUNET_PeerIdentity *put_path, + unsigned int get_path_length, + struct GNUNET_PeerIdentity *get_path, + const void *data, + size_t data_size) +{ +} + + +/** + * Add a new entry to our routing table. + * + * @param sender peer that originated the request + * @param type type of the block + * @param key key for the content + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf bloomfilter to filter duplicates + * @param reply_bf_mutator mutator for reply_bf +*/ +void +GDS_ROUTING_add (const GNUNET_PeerIdentity *sender, + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode *key, + const void *xquery, + size_t xquery_size, + const struct GNUNET_CONTAINER_BloomFilter *reply_bf, + uint32_t reply_bf_mutator) +{ + if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT) + { + recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); + GNUNET_assert (recent_req != NULL); + GNUNET_SCHEDULER_cancel (recent_req->remove_task); + GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); + GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom); + GNUNET_free (recent_req); + } + + recent_req = GNUNET_malloc (sizeof (struct RecentRequest)); + recent_req->uid = msg_ctx->unique_id; + memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode)); + recent_req->heap_node = + GNUNET_CONTAINER_heap_insert (recent_heap, recent_req, + GNUNET_TIME_absolute_get ().abs_value); + recent_req->bloom = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + + +} + + +/** + * Initialize routing subsystem. + */ +void +GDS_ROUTING_init () +{ + recent_heap = + GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + recent_map = + GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8); +} + + +/** + * Shutdown routing subsystem. + */ +void +GDS_ROUTING_done () +{ + while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0) + { + recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); + GNUNET_assert (recent_req != NULL); + GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); + GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom); + GNUNET_free (recent_req); + } + GNUNET_CONTAINER_heap_destroy (recent_heap); + recent_heap = NULL; + GNUNET_CONTAINER_multihashmap_destroy (recent_map); + recent_map = NULL; +} + +/* end of gnunet-service-dht_routing.c */ diff --git a/src/dht/gnunet-service-dht_routing.h b/src/dht/gnunet-service-dht_routing.h new file mode 100644 index 000000000..3ddfcc66e --- /dev/null +++ b/src/dht/gnunet-service-dht_routing.h @@ -0,0 +1,93 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht_routing.h + * @brief GNUnet DHT tracking of requests for routing replies + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_DHT_ROUTING_H +#define GNUNET_SERVICE_DHT_ROUTING_H + + +/** + * Handle a reply (route to origin). Only forwards the reply back to + * other peers waiting for it. Does not do local caching or + * forwarding to local clients. Essentially calls + * GDS_NEIGHBOURS_handle_reply for all peers that sent us a matching + * request recently. + * + * @param type type of the block + * @param expiration_time when does the content expire + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers the original PUT traversed (if tracked) + * @param get_path_length number of entries in put_path + * @param get_path peers this reply has traversed so far (if tracked) + * @param data payload of the reply + * @param data_size number of bytes in data + */ +void +GDS_ROUTING_process (uint32_t type, + GNUNET_TIME_Absolute expiration_time, + const GNUNET_HashCode *key, + unsigned int put_path_length, + struct GNUNET_PeerIdentity *put_path, + unsigned int get_path_length, + struct GNUNET_PeerIdentity *get_path, + const void *data, + size_t data_size); + + +/** + * Add a new entry to our routing table. + * + * @param sender peer that originated the request + * @param type type of the block + * @param key key for the content + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf bloomfilter to filter duplicates + * @param reply_bf_mutator mutator for reply_bf +*/ +void +GDS_ROUTING_add (const GNUNET_PeerIdentity *sender, + uint32_t type, + const GNUNET_HashCode *key, + const void *xquery, + size_t xquery_size, + const struct GNUNET_CONTAINER_BloomFilter *reply_bf, + uint32_t reply_bf_mutator); + + +/** + * Initialize routing subsystem. + */ +void +GDS_ROUTING_init (void); + + +/** + * Shutdown routing subsystem. + */ +void +GDS_ROUTING_done (void); + +#endif -- 2.25.1