From 3d3cedf2e2c41883771cc2170b761840ac26b869 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 1 Aug 2016 12:27:45 +0000 Subject: [PATCH] -migrate to new core MQ API --- src/dht/gnunet-service-dht_neighbours.c | 958 ++++++++++-------------- 1 file changed, 413 insertions(+), 545 deletions(-) diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index b7a2f89a2..1a2fa32e4 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2015 GNUnet e.V. + Copyright (C) 2009-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -255,40 +255,6 @@ struct PeerGetMessage }; GNUNET_NETWORK_STRUCT_END -/** - * Linked list of messages to send to a particular other peer. - */ -struct P2PPendingMessage -{ - /** - * Pointer to next item in the list - */ - struct P2PPendingMessage *next; - - /** - * Pointer to previous item in the list - */ - struct P2PPendingMessage *prev; - - /** - * Message importance level. FIXME: used? useful? - */ - unsigned int importance; - - /** - * When does this message time out? - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Actual message to be sent, allocated at the end of the struct: - * // msg = (cast) &pm[1]; - * // GNUNET_memcpy (&pm[1], data, len); - */ - const struct GNUNET_MessageHeader *msg; - -}; - /** * Entry for a peer in a bucket. @@ -306,41 +272,14 @@ struct PeerInfo struct PeerInfo *prev; /** - * Count of outstanding messages for peer. - */ - unsigned int pending_count; - - /** - * Head of pending messages to be sent to this peer. - */ - struct P2PPendingMessage *head; - - /** - * Tail of pending messages to be sent to this peer. - */ - struct P2PPendingMessage *tail; - - /** - * Core handle for sending messages to this peer. + * Handle for sending messages to this peer. */ - struct GNUNET_CORE_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * What is the identity of the peer? */ - struct GNUNET_PeerIdentity id; - -#if 0 - /** - * What is the average latency for replies received? - */ - struct GNUNET_TIME_Relative latency; - - /** - * Transport level distance to peer. - */ - unsigned int distance; -#endif + const struct GNUNET_PeerIdentity *id; }; @@ -766,28 +705,29 @@ send_find_peer_message (void *cls) * * @param cls closure * @param peer peer identity this notification is about + * @param mq message queue for sending messages to @a peer + * @return our `struct PeerInfo` for @a peer */ -static void +static void * handle_core_connect (void *cls, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { struct PeerInfo *ret; struct GNUNET_HashCode phash; int peer_bucket; /* Check for connect to self message */ - if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) - return; + if (0 == memcmp (&my_identity, + peer, + sizeof (struct GNUNET_PeerIdentity))) + return NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to %s\n", GNUNET_i2s (peer)); - if (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_contains (all_connected_peers, - peer)) - { - GNUNET_break (0); - return; - } + GNUNET_assert (GNUNET_NO == + GNUNET_CONTAINER_multipeermap_get (all_connected_peers, + peer)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1, @@ -798,11 +738,8 @@ handle_core_connect (void *cls, peer_bucket = find_bucket (&phash); GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS)); ret = GNUNET_new (struct PeerInfo); -#if 0 - ret->latency = latency; - ret->distance = distance; -#endif - ret->id = *peer; + ret->id = peer; + ret->mq = mq; GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head, k_buckets[peer_bucket].tail, ret); @@ -811,7 +748,7 @@ handle_core_connect (void *cls, peer_bucket); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (all_connected_peers, - peer, + ret->id, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); if ( (peer_bucket > 0) && @@ -828,6 +765,7 @@ handle_core_connect (void *cls, find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL); } + return ret; } @@ -836,33 +774,23 @@ handle_core_connect (void *cls, * * @param cls closure * @param peer peer identity this notification is about + * @param internal_cls our `struct PeerInfo` for @a peer */ static void handle_core_disconnect (void *cls, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + void *internal_cls) { - struct PeerInfo *to_remove; + struct PeerInfo *to_remove = internal_cls; int current_bucket; - struct P2PPendingMessage *pos; - unsigned int discarded; struct GNUNET_HashCode phash; /* Check for disconnect from self message */ - if (0 == memcmp (&my_identity, - peer, - sizeof (struct GNUNET_PeerIdentity))) + if (NULL == to_remove) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s\n", GNUNET_i2s (peer)); - to_remove = - GNUNET_CONTAINER_multipeermap_get (all_connected_peers, - peer); - if (NULL == to_remove) - { - GNUNET_break (0); - return; - } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1, @@ -871,8 +799,8 @@ handle_core_disconnect (void *cls, GNUNET_CONTAINER_multipeermap_remove (all_connected_peers, peer, to_remove)); - if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) && - (GNUNET_YES != disable_try_connect)) ) + if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) && + (GNUNET_YES != disable_try_connect) ) { GNUNET_SCHEDULER_cancel (find_peer_task); find_peer_task = NULL; @@ -890,178 +818,12 @@ handle_core_disconnect (void *cls, while ( (closest_bucket > 0) && (0 == k_buckets[closest_bucket].peers_size) ) closest_bucket--; - if (NULL != to_remove->th) - { - GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th); - to_remove->th = NULL; - } - discarded = 0; - while (NULL != (pos = to_remove->head)) - { - GNUNET_CONTAINER_DLL_remove (to_remove->head, - to_remove->tail, - pos); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping message of type %u due to disconnect\n", - ntohs (pos->msg->type)); - discarded++; - GNUNET_free (pos); - } if (k_buckets[current_bucket].peers_size < bucket_size) update_connect_preferences (); - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop ("# Queued messages discarded (peer disconnected)"), - discarded, - GNUNET_NO); GNUNET_free (to_remove); } -/** - * Called when core is ready to send a message we asked for - * out to the destination. - * - * @param cls the 'struct PeerInfo' of the target peer - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf - */ -static size_t -core_transmit_notify (void *cls, - size_t size, - void *buf) -{ - struct PeerInfo *peer = cls; - char *cbuf = buf; - struct P2PPendingMessage *pending; - size_t off; - size_t msize; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "DHT ctn called with buffer of %u bytes\n", - (unsigned int) size); - peer->th = NULL; - while ((NULL != (pending = peer->head)) && - (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us)) - { - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# Messages dropped (CORE timeout)"), - 1, - GNUNET_NO); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropping message of type %u due to timeout\n", - ntohs (pending->msg->type)); - peer->pending_count--; - GNUNET_CONTAINER_DLL_remove (peer->head, - peer->tail, - pending); - GNUNET_free (pending); - } - if (NULL == pending) - { - /* no messages pending */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages pending\n"); - return 0; - } - if (NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got NULL buffer, trying again\n"); - peer->th = - GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO, - GNUNET_CORE_PRIO_BEST_EFFORT, - GNUNET_TIME_absolute_get_remaining - (pending->timeout), - &peer->id, - ntohs (pending->msg->size), - &core_transmit_notify, - peer); - GNUNET_break (NULL != peer->th); - return 0; - } - off = 0; - while ((NULL != (pending = peer->head)) && - (size - off >= (msize = ntohs (pending->msg->size)))) - { - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# Bytes transmitted to other peers"), msize, - GNUNET_NO); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting message of type %u to %s\n", - ntohs (pending->msg->type), - GNUNET_i2s (&peer->id)); - GNUNET_memcpy (&cbuf[off], - pending->msg, - msize); - off += msize; - peer->pending_count--; - GNUNET_CONTAINER_DLL_remove (peer->head, - peer->tail, - pending); - GNUNET_free (pending); - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u bytes fit in %u bytes available, next message is %u bytes\n", - (unsigned int) off, - (unsigned int) size, - (NULL != peer->head) ? ntohs (peer->head->msg->size) : 0); - if (NULL != (pending = peer->head)) - { - /* technically redundant, but easier to read and - avoids bogus gcc warning... */ - msize = ntohs (pending->msg->size); - peer->th = - GNUNET_CORE_notify_transmit_ready (core_api, - GNUNET_NO, - GNUNET_CORE_PRIO_BEST_EFFORT, - GNUNET_TIME_absolute_get_remaining (pending->timeout), - &peer->id, - msize, - &core_transmit_notify, - peer); - GNUNET_break (NULL != peer->th); - } - return off; -} - - -/** - * Transmit all messages in the peer's message queue. - * - * @param peer message queue to process - */ -static void -process_peer_queue (struct PeerInfo *peer) -{ - struct P2PPendingMessage *pending; - - if (NULL == (pending = peer->head)) - return; - if (NULL != peer->th) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer->th); - peer->th = NULL; - } - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# Bytes of bandwidth requested from core"), - ntohs (pending->msg->size), GNUNET_NO); - peer->th = - GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO, - GNUNET_CORE_PRIO_BEST_EFFORT, - GNUNET_TIME_absolute_get_remaining - (pending->timeout), - &peer->id, - ntohs (pending->msg->size), - &core_transmit_notify, - peer); - GNUNET_break (NULL != peer->th); -} - - /** * To how many peers should we (on average) forward the request to * obtain the desired target_replication count (on average). @@ -1071,7 +833,8 @@ process_peer_queue (struct PeerInfo *peer) * @return Some number of peers to forward the message to */ static unsigned int -get_forward_count (uint32_t hop_count, uint32_t target_replication) +get_forward_count (uint32_t hop_count, + uint32_t target_replication) { uint32_t random_value; uint32_t forward_count; @@ -1201,7 +964,7 @@ am_closest_peer (const struct GNUNET_HashCode *key, count = 0; while ((NULL != pos) && (count < bucket_size)) { - GNUNET_CRYPTO_hash (&pos->id, + GNUNET_CRYPTO_hash (pos->id, sizeof (struct GNUNET_PeerIdentity), &phash); if ((NULL != bloom) && @@ -1265,7 +1028,7 @@ select_peer (const struct GNUNET_HashCode *key, count = 0; while ((pos != NULL) && (count < bucket_size)) { - GNUNET_CRYPTO_hash (&pos->id, + GNUNET_CRYPTO_hash (pos->id, sizeof (struct GNUNET_PeerIdentity), &phash); if ((bloom == NULL) || @@ -1283,7 +1046,8 @@ select_peer (const struct GNUNET_HashCode *key, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in greedy routing for %s\n", - GNUNET_i2s (&pos->id), GNUNET_h2s (key)); + GNUNET_i2s (pos->id), + GNUNET_h2s (key)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers excluded from routing due to Bloomfilter"), @@ -1314,7 +1078,7 @@ select_peer (const struct GNUNET_HashCode *key, pos = k_buckets[bc].head; while ((pos != NULL) && (count < bucket_size)) { - GNUNET_CRYPTO_hash (&pos->id, + GNUNET_CRYPTO_hash (pos->id, sizeof (struct GNUNET_PeerIdentity), &phash); if ((bloom != NULL) && @@ -1327,7 +1091,8 @@ select_peer (const struct GNUNET_HashCode *key, 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in random routing for %s\n", - GNUNET_i2s (&pos->id), GNUNET_h2s (key)); + GNUNET_i2s (pos->id), + GNUNET_h2s (key)); pos = pos->next; continue; /* Ignore bloomfiltered peers */ } @@ -1349,7 +1114,7 @@ select_peer (const struct GNUNET_HashCode *key, { for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next) { - GNUNET_CRYPTO_hash (&pos->id, + GNUNET_CRYPTO_hash (pos->id, sizeof (struct GNUNET_PeerIdentity), &phash); if ((bloom != NULL) && @@ -1383,7 +1148,8 @@ select_peer (const struct GNUNET_HashCode *key, static unsigned int get_target_peers (const struct GNUNET_HashCode *key, struct GNUNET_CONTAINER_BloomFilter *bloom, - uint32_t hop_count, uint32_t target_replication, + uint32_t hop_count, + uint32_t target_replication, struct PeerInfo ***targets) { unsigned int ret; @@ -1393,20 +1159,22 @@ get_target_peers (const struct GNUNET_HashCode *key, struct GNUNET_HashCode nhash; GNUNET_assert (NULL != bloom); - ret = get_forward_count (hop_count, target_replication); + ret = get_forward_count (hop_count, + target_replication); if (0 == ret) { *targets = NULL; return 0; } - rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret); + rtargets = GNUNET_new_array (ret, + struct PeerInfo *); for (off = 0; off < ret; off++) { nxt = select_peer (key, bloom, hop_count); if (NULL == nxt) break; rtargets[off] = nxt; - GNUNET_CRYPTO_hash (&nxt->id, + GNUNET_CRYPTO_hash (nxt->id, sizeof (struct GNUNET_PeerIdentity), &nhash); GNUNET_break (GNUNET_NO == @@ -1467,14 +1235,15 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *key, unsigned int put_path_length, struct GNUNET_PeerIdentity *put_path, - const void *data, size_t data_size) + const void *data, + size_t data_size) { unsigned int target_count; unsigned int i; struct PeerInfo **targets; struct PeerInfo *target; - struct P2PPendingMessage *pending; size_t msize; + struct GNUNET_MQ_Envelope *env; struct PeerPutMessage *ppm; struct GNUNET_PeerIdentity *pp; struct GNUNET_HashCode thash; @@ -1486,47 +1255,54 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, GNUNET_i2s (&my_identity), GNUNET_h2s (key)); GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash); - GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"), - 1, GNUNET_NO); + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# PUT requests routed"), + 1, + GNUNET_NO); target_count = - get_target_peers (key, bf, hop_count, desired_replication_level, + get_target_peers (key, + bf, + hop_count, + desired_replication_level, &targets); if (0 == target_count) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s terminates after %u hops at %s\n", - GNUNET_h2s (key), (unsigned int) hop_count, + GNUNET_h2s (key), + (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } - msize = - put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + - sizeof (struct PeerPutMessage); - if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size; + if (msize + sizeof (struct PeerPutMessage) + >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { put_path_length = 0; - msize = data_size + sizeof (struct PeerPutMessage); + msize = data_size; } - if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + if (msize + sizeof (struct PeerPutMessage) + >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# PUT messages queued for transmission"), - target_count, GNUNET_NO); + gettext_noop ("# PUT messages queued for transmission"), + target_count, + GNUNET_NO); skip_count = 0; for (i = 0; i < target_count; i++) { target = targets[i]; - if (target->pending_count >= MAXIMUM_PENDING_PER_PEER) + if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"), - 1, GNUNET_NO); + 1, + GNUNET_NO); skip_count++; continue; } @@ -1534,21 +1310,17 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, - GNUNET_i2s (&target->id)); - pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); - pending->importance = 0; /* FIXME */ - pending->timeout = expiration_time; - ppm = (struct PeerPutMessage *) &pending[1]; - pending->msg = &ppm->header; - ppm->header.size = htons (msize); - ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); + GNUNET_i2s (target->id)); + env = GNUNET_MQ_msg_extra (ppm, + msize, + GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); ppm->options = htonl (options); ppm->type = htonl (type); ppm->hop_count = htonl (hop_count + 1); ppm->desired_replication_level = htonl (desired_replication_level); ppm->put_path_length = htonl (put_path_length); ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); - GNUNET_CRYPTO_hash (&target->id, + GNUNET_CRYPTO_hash (target->id, sizeof (struct GNUNET_PeerIdentity), &thash); GNUNET_break (GNUNET_YES == @@ -1560,17 +1332,14 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, DHT_BLOOM_SIZE)); ppm->key = *key; pp = (struct GNUNET_PeerIdentity *) &ppm[1]; - GNUNET_memcpy (pp, put_path, - sizeof (struct GNUNET_PeerIdentity) * put_path_length); + GNUNET_memcpy (pp, + put_path, + sizeof (struct GNUNET_PeerIdentity) * put_path_length); GNUNET_memcpy (&pp[put_path_length], - data, - data_size); - GNUNET_CONTAINER_DLL_insert_tail (target->head, - target->tail, - pending); - target->pending_count++; - if (pending == target->head) - process_peer_queue (target); + data, + data_size); + GNUNET_MQ_send (target->mq, + env); } GNUNET_free (targets); return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; @@ -1609,7 +1378,7 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, unsigned int i; struct PeerInfo **targets; struct PeerInfo *target; - struct P2PPendingMessage *pending; + struct GNUNET_MQ_Envelope *env; size_t msize; struct PeerGetMessage *pgm; char *xq; @@ -1618,11 +1387,15 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, unsigned int skip_count; GNUNET_assert (NULL != peer_bf); - GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"), - 1, GNUNET_NO); - target_count = - get_target_peers (key, peer_bf, hop_count, desired_replication_level, - &targets); + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# GET requests routed"), + 1, + GNUNET_NO); + target_count = get_target_peers (key, + peer_bf, + hop_count, + desired_replication_level, + &targets); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to GET bloomfilter for %s\n", GNUNET_i2s (&my_identity), @@ -1633,28 +1406,29 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s terminates after %u hops at %s\n", - GNUNET_h2s (key), (unsigned int) hop_count, + GNUNET_h2s (key), + (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf); - msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size; - if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + msize = xquery_size + reply_bf_size; + if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# GET messages queued for transmission"), - target_count, GNUNET_NO); + gettext_noop ("# GET messages queued for transmission"), + target_count, + GNUNET_NO); /* forward request */ skip_count = 0; for (i = 0; i < target_count; i++) { target = targets[i]; - if (target->pending_count >= MAXIMUM_PENDING_PER_PEER) + if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, @@ -1667,21 +1441,17 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, - GNUNET_i2s (&target->id)); - pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); - pending->importance = 0; /* FIXME */ - pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT); - pgm = (struct PeerGetMessage *) &pending[1]; - pending->msg = &pgm->header; - pgm->header.size = htons (msize); - pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET); + GNUNET_i2s (target->id)); + env = GNUNET_MQ_msg_extra (pgm, + msize, + GNUNET_MESSAGE_TYPE_DHT_P2P_GET); pgm->options = htonl (options); pgm->type = htonl (type); pgm->hop_count = htonl (hop_count + 1); pgm->desired_replication_level = htonl (desired_replication_level); pgm->xquery_size = htonl (xquery_size); pgm->bf_mutator = reply_bf_mutator; - GNUNET_CRYPTO_hash (&target->id, + GNUNET_CRYPTO_hash (target->id, sizeof (struct GNUNET_PeerIdentity), &thash); GNUNET_break (GNUNET_YES == @@ -1693,19 +1463,17 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, DHT_BLOOM_SIZE)); pgm->key = *key; xq = (char *) &pgm[1]; - GNUNET_memcpy (xq, xquery, xquery_size); + GNUNET_memcpy (xq, + xquery, + xquery_size); if (NULL != reply_bf) GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf, &xq [xquery_size], reply_bf_size)); - GNUNET_CONTAINER_DLL_insert_tail (target->head, - target->tail, - pending); - target->pending_count++; - if (pending == target->head) - process_peer_queue (target); + GNUNET_MQ_send (target->mq, + env); } GNUNET_free (targets); return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; @@ -1741,16 +1509,14 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, size_t data_size) { struct PeerInfo *pi; - struct P2PPendingMessage *pending; + struct GNUNET_MQ_Envelope *env; size_t msize; struct PeerResultMessage *prm; struct GNUNET_PeerIdentity *paths; - msize = - data_size + sizeof (struct PeerResultMessage) + (get_path_length + - put_path_length) * + msize = data_size + (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); - if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (get_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || (put_path_length > @@ -1770,7 +1536,7 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, GNUNET_h2s (key)); return; } - if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER) + if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, @@ -1791,13 +1557,9 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, gettext_noop ("# RESULT messages queued for transmission"), 1, GNUNET_NO); - pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); - pending->importance = 0; /* FIXME */ - pending->timeout = expiration_time; - prm = (struct PeerResultMessage *) &pending[1]; - pending->msg = &prm->header; - prm->header.size = htons (msize); - prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); + env = GNUNET_MQ_msg_extra (prm, + msize, + GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); prm->type = htonl (type); prm->put_path_length = htonl (put_path_length); prm->get_path_length = htonl (get_path_length); @@ -1805,19 +1567,16 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, prm->key = *key; paths = (struct GNUNET_PeerIdentity *) &prm[1]; GNUNET_memcpy (paths, - put_path, - put_path_length * sizeof (struct GNUNET_PeerIdentity)); + put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[put_path_length], - get_path, - get_path_length * sizeof (struct GNUNET_PeerIdentity)); + get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[put_path_length + get_path_length], - data, - data_size); - GNUNET_CONTAINER_DLL_insert (pi->head, - pi->tail, - pending); - pi->pending_count++; - process_peer_queue (pi); + data, + data_size); + GNUNET_MQ_send (pi->mq, + env); } @@ -1843,21 +1602,45 @@ core_init (void *cls, /** - * Core handler for p2p put requests. + * Check validity of a p2p put request. * - * @param cls closure - * @param peer sender of the request + * @param cls closure with the `struct PeerInfo` of the sender * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @return #GNUNET_OK if the message is valid */ static int +check_dht_p2p_put (void *cls, + const struct PeerPutMessage *put) +{ + uint32_t putlen; + uint16_t msize; + + msize = ntohs (put->header.size); + putlen = ntohl (put->put_path_length); + if ((msize < + sizeof (struct PeerPutMessage) + + putlen * sizeof (struct GNUNET_PeerIdentity)) || + (putlen > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Core handler for p2p put requests. + * + * @param cls closure with the `struct PeerInfo` of the sender + * @param message message + */ +static void handle_dht_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) + const struct PeerPutMessage *put) { - const struct PeerPutMessage *put; + struct PeerInfo *peer = cls; const struct GNUNET_PeerIdentity *put_path; const void *payload; uint32_t putlen; @@ -1869,23 +1652,8 @@ handle_dht_p2p_put (void *cls, struct GNUNET_HashCode phash; int forwarded; - msize = ntohs (message->size); - if (msize < sizeof (struct PeerPutMessage)) - { - GNUNET_break_op (0); - return GNUNET_YES; - } - put = (const struct PeerPutMessage *) message; + msize = ntohs (put->header.size); putlen = ntohl (put->put_path_length); - if ((msize < - sizeof (struct PeerPutMessage) + - putlen * sizeof (struct GNUNET_PeerIdentity)) || - (putlen > - GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) - { - GNUNET_break_op (0); - return GNUNET_YES; - } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P PUT requests received"), 1, @@ -1897,13 +1665,16 @@ handle_dht_p2p_put (void *cls, put_path = (const struct GNUNET_PeerIdentity *) &put[1]; payload = &put_path[putlen]; options = ntohl (put->options); - payload_size = - msize - (sizeof (struct PeerPutMessage) + - putlen * sizeof (struct GNUNET_PeerIdentity)); + payload_size = msize - (sizeof (struct PeerPutMessage) + + putlen * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", - GNUNET_h2s (&put->key), GNUNET_i2s (peer)); - GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "PUT for `%s' from %s\n", + GNUNET_h2s (&put->key), + GNUNET_i2s (peer->id)); + GNUNET_CRYPTO_hash (peer->id, + sizeof (struct GNUNET_PeerIdentity), + &phash); if (GNUNET_YES == log_route_details_stderr) { char *tmp; @@ -1912,7 +1683,7 @@ handle_dht_p2p_put (void *cls, LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N PUT %s: %s->%s (%u, %u=>%u)\n", GNUNET_h2s (&put->key), - GNUNET_i2s (peer), + GNUNET_i2s (peer->id), tmp, ntohl(put->hop_count), GNUNET_CRYPTO_hash_matching_bits (&phash, @@ -1923,25 +1694,31 @@ handle_dht_p2p_put (void *cls, GNUNET_free (tmp); } switch (GNUNET_BLOCK_get_key - (GDS_block_context, ntohl (put->type), payload, payload_size, + (GDS_block_context, + ntohl (put->type), + payload, + payload_size, &test_key)) { case GNUNET_YES: - if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode))) + if (0 != memcmp (&test_key, + &put->key, + sizeof (struct GNUNET_HashCode))) { char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key)); GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "PUT with key `%s' for block with key %s\n", - put_s, GNUNET_h2s_full (&test_key)); + put_s, + GNUNET_h2s_full (&test_key)); GNUNET_free (put_s); - return GNUNET_YES; + return; } break; case GNUNET_NO: GNUNET_break_op (0); - return GNUNET_YES; + return; case GNUNET_SYSERR: /* cannot verify, good luck */ break; @@ -1968,7 +1745,7 @@ handle_dht_p2p_put (void *cls, case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: default: GNUNET_break_op (0); - return GNUNET_OK; + return; } } @@ -1976,15 +1753,18 @@ handle_dht_p2p_put (void *cls, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (bf, &phash)); + GNUNET_CONTAINER_bloomfilter_test (bf, + &phash)); { struct GNUNET_PeerIdentity pp[putlen + 1]; /* extend 'put path' by sender */ if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE)) { - GNUNET_memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity)); - pp[putlen] = *peer; + GNUNET_memcpy (pp, + put_path, + putlen * sizeof (struct GNUNET_PeerIdentity)); + pp[putlen] = *peer->id; putlen++; } else @@ -1992,8 +1772,14 @@ handle_dht_p2p_put (void *cls, /* give to local clients */ GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time), - &put->key, 0, NULL, putlen, pp, ntohl (put->type), - payload_size, payload); + &put->key, + 0, + NULL, + putlen, + pp, + ntohl (put->type), + payload_size, + payload); /* store locally */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (am_closest_peer (&put->key, bf))) @@ -2001,10 +1787,12 @@ handle_dht_p2p_put (void *cls, (put->expiration_time), &put->key, putlen, pp, ntohl (put->type), payload_size, payload); /* route to other peers */ - forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options, + forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), + options, ntohl (put->desired_replication_level), GNUNET_TIME_absolute_ntoh (put->expiration_time), - ntohl (put->hop_count), bf, + ntohl (put->hop_count), + bf, &put->key, putlen, pp, @@ -2025,7 +1813,6 @@ handle_dht_p2p_put (void *cls, payload_size); } GNUNET_CONTAINER_bloomfilter_free (bf); - return GNUNET_YES; } @@ -2040,8 +1827,9 @@ handle_dht_p2p_put (void *cls, */ static void handle_find_peer (const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_HashCode * key, - struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator) + const struct GNUNET_HashCode *key, + struct GNUNET_CONTAINER_BloomFilter *bf, + uint32_t bf_mutator) { int bucket_idx; struct PeerBucket *bucket; @@ -2058,10 +1846,16 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender, if ((NULL == bf) || (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))) { - GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, + GDS_NEIGHBOURS_handle_reply (sender, + GNUNET_BLOCK_TYPE_DHT_HELLO, GNUNET_TIME_relative_to_absolute (hello_expiration), - key, 0, NULL, 0, NULL, GDS_my_hello, + key, + 0, + NULL, + 0, + NULL, + GDS_my_hello, GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello)); @@ -2069,31 +1863,34 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender, else { GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# FIND PEER requests ignored due to Bloomfilter"), - 1, GNUNET_NO); + gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"), + 1, + GNUNET_NO); } } else { GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# FIND PEER requests ignored due to lack of HELLO"), - 1, GNUNET_NO); + gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"), + 1, + GNUNET_NO); } /* then, also consider sending a random HELLO from the closest bucket */ - if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode))) + if (0 == memcmp (&my_identity_hash, + key, + sizeof (struct GNUNET_HashCode))) bucket_idx = closest_bucket; else - bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key)); + bucket_idx = GNUNET_MIN (closest_bucket, + find_bucket (key)); if (bucket_idx == GNUNET_SYSERR) return; bucket = &k_buckets[bucket_idx]; if (bucket->peers_size == 0) return; - choice = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size); + choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + bucket->peers_size); peer = bucket->head; while (choice > 0) { @@ -2107,41 +1904,69 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender, peer = peer->next; if (choice-- == 0) return; /* no non-masked peer available */ - if (peer == NULL) + if (NULL == peer) peer = bucket->head; - GNUNET_CRYPTO_hash (&peer->id, + GNUNET_CRYPTO_hash (peer->id, sizeof (struct GNUNET_PeerIdentity), &phash); GNUNET_BLOCK_mingle_hash (&phash, bf_mutator, &mhash); - hello = GDS_HELLO_get (&peer->id); - } - while ((hello == NULL) || - (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))); - GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, + hello = GDS_HELLO_get (peer->id); + } while ( (hello == NULL) || + (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bf, + &mhash)) ); + GDS_NEIGHBOURS_handle_reply (sender, + GNUNET_BLOCK_TYPE_DHT_HELLO, GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key, - 0, NULL, 0, NULL, hello, + (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), + key, + 0, + NULL, + 0, + NULL, + hello, GNUNET_HELLO_size (hello)); } /** - * Core handler for p2p get requests. + * Check validity of p2p get request. * - * @param cls closure - * @param peer sender of the request - * @param message message - * @return #GNUNET_OK to keep the connection open, - * #GNUNET_SYSERR to close it (signal serious error) + * @param cls closure with the `struct PeerInfo` of the sender + * @param get the message + * @return #GNUNET_OK if the message is well-formed */ static int +check_dht_p2p_get (void *cls, + const struct PeerGetMessage *get) +{ + uint32_t xquery_size; + uint16_t msize; + + msize = ntohs (get->header.size); + xquery_size = ntohl (get->xquery_size); + if (msize < sizeof (struct PeerGetMessage) + xquery_size) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Core handler for p2p get requests. + * + * @param cls closure with the `struct PeerInfo` of the sender + * @param get the message + */ +static void handle_dht_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) + const struct PeerGetMessage *get) { - struct PeerGetMessage *get; + struct PeerInfo *peer = cls; uint32_t xquery_size; size_t reply_bf_size; uint16_t msize; @@ -2154,36 +1979,28 @@ handle_dht_p2p_get (void *cls, struct GNUNET_HashCode phash; int forwarded; - GNUNET_break (0 != - memcmp (peer, - &my_identity, - sizeof (struct GNUNET_PeerIdentity))); - /* parse and validate message */ - msize = ntohs (message->size); - if (msize < sizeof (struct PeerGetMessage)) + if (NULL == peer) { - GNUNET_break_op (0); - return GNUNET_YES; + GNUNET_break (0); + return; } - get = (struct PeerGetMessage *) message; + /* parse and validate message */ + msize = ntohs (get->header.size); xquery_size = ntohl (get->xquery_size); - if (msize < sizeof (struct PeerGetMessage) + xquery_size) - { - GNUNET_break_op (0); - return GNUNET_YES; - } reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size); type = ntohl (get->type); options = ntohl (get->options); xquery = (const char *) &get[1]; reply_bf = NULL; GNUNET_STATISTICS_update (GDS_stats, - gettext_noop ("# P2P GET requests received"), 1, + gettext_noop ("# P2P GET requests received"), + 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, - gettext_noop ("# P2P GET bytes received"), msize, + gettext_noop ("# P2P GET bytes received"), + msize, GNUNET_NO); - GNUNET_CRYPTO_hash (peer, + GNUNET_CRYPTO_hash (peer->id, sizeof (struct GNUNET_PeerIdentity), &phash); if (GNUNET_YES == log_route_details_stderr) @@ -2193,17 +2010,23 @@ handle_dht_p2p_get (void *cls, tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n", - GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp, + GNUNET_h2s (&get->key), + GNUNET_i2s (peer->id), + tmp, ntohl(get->hop_count), - GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key), - GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key), - ntohl(get->xquery_size), xquery); + GNUNET_CRYPTO_hash_matching_bits (&phash, + &get->key), + GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, + &get->key), + ntohl(get->xquery_size), + xquery); GNUNET_free (tmp); } if (reply_bf_size > 0) reply_bf = - GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size, + GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], + reply_bf_size, GNUNET_CONSTANTS_BLOOMFILTER_K); eval = GNUNET_BLOCK_evaluate (GDS_block_context, @@ -2222,17 +2045,23 @@ handle_dht_p2p_get (void *cls, GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED); if (NULL != reply_bf) GNUNET_CONTAINER_bloomfilter_free (reply_bf); - return GNUNET_YES; + return; } - peer_bf = - GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE, - GNUNET_CONSTANTS_BLOOMFILTER_K); + peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, + DHT_BLOOM_SIZE, + GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &phash)); /* remember request for routing replies */ - GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size, - reply_bf, get->bf_mutator); + GDS_ROUTING_add (peer->id, + type, + options, + &get->key, + xquery, + xquery_size, + reply_bf, + get->bf_mutator); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n", GNUNET_h2s (&get->key), @@ -2240,48 +2069,59 @@ handle_dht_p2p_get (void *cls, (unsigned int) ntohl (get->hop_count)); /* local lookup (this may update the reply_bf) */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || - (am_closest_peer (&get->key, peer_bf))) + (am_closest_peer (&get->key, + peer_bf))) { if ((0 != (options & GNUNET_DHT_RO_FIND_PEER))) { GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# P2P FIND PEER requests processed"), 1, + gettext_noop ("# P2P FIND PEER requests processed"), + 1, GNUNET_NO); - handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator); + handle_find_peer (peer->id, + &get->key, + reply_bf, + get->bf_mutator); } else { - eval = - GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size, - &reply_bf, get->bf_mutator); + eval = GDS_DATACACHE_handle_get (&get->key, + type, + xquery, + xquery_size, + &reply_bf, + get->bf_mutator); } } else { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET requests ONLY routed"), - 1, GNUNET_NO); + 1, + GNUNET_NO); } /* P2P forwarding */ forwarded = GNUNET_NO; if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) - forwarded = GDS_NEIGHBOURS_handle_get (type, options, + forwarded = GDS_NEIGHBOURS_handle_get (type, + options, ntohl (get->desired_replication_level), ntohl (get->hop_count), &get->key, xquery, xquery_size, reply_bf, - get->bf_mutator, peer_bf); + get->bf_mutator, + peer_bf); GDS_CLIENTS_process_get (options | (GNUNET_OK == forwarded) ? GNUNET_DHT_RO_LAST_HOP : 0, type, ntohl (get->hop_count), ntohl (get->desired_replication_level), - 0, NULL, + 0, + NULL, &get->key); @@ -2289,41 +2129,25 @@ handle_dht_p2p_get (void *cls, if (NULL != reply_bf) GNUNET_CONTAINER_bloomfilter_free (reply_bf); GNUNET_CONTAINER_bloomfilter_free (peer_bf); - return GNUNET_YES; } /** - * Core handler for p2p result messages. + * Check validity of p2p result message. * * @param cls closure * @param message message - * @param peer peer identity this notification is about - * @return #GNUNET_YES (do not cut p2p connection) + * @return #GNUNET_YES if the message is well-formed */ static int -handle_dht_p2p_result (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +check_dht_p2p_result (void *cls, + const struct PeerResultMessage *prm) { - const struct PeerResultMessage *prm; - const struct GNUNET_PeerIdentity *put_path; - const struct GNUNET_PeerIdentity *get_path; - const void *data; uint32_t get_path_length; uint32_t put_path_length; uint16_t msize; - size_t data_size; - enum GNUNET_BLOCK_Type type; - /* parse and validate message */ - msize = ntohs (message->size); - if (msize < sizeof (struct PeerResultMessage)) - { - GNUNET_break_op (0); - return GNUNET_YES; - } - prm = (struct PeerResultMessage *) message; + msize = ntohs (prm->header.size); put_path_length = ntohl (prm->put_path_length); get_path_length = ntohl (prm->get_path_length); if ((msize < @@ -2336,21 +2160,51 @@ handle_dht_p2p_result (void *cls, GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); - return GNUNET_YES; + return GNUNET_SYSERR; } + return GNUNET_OK; +} + + +/** + * Core handler for p2p result messages. + * + * @param cls closure + * @param message message + */ +static void +handle_dht_p2p_result (void *cls, + const struct PeerResultMessage *prm) +{ + struct PeerInfo *peer = cls; + const struct GNUNET_PeerIdentity *put_path; + const struct GNUNET_PeerIdentity *get_path; + const void *data; + uint32_t get_path_length; + uint32_t put_path_length; + uint16_t msize; + size_t data_size; + enum GNUNET_BLOCK_Type type; + + /* parse and validate message */ + msize = ntohs (prm->header.size); + put_path_length = ntohl (prm->put_path_length); + get_path_length = ntohl (prm->get_path_length); put_path = (const struct GNUNET_PeerIdentity *) &prm[1]; get_path = &put_path[put_path_length]; type = ntohl (prm->type); data = (const void *) &get_path[get_path_length]; - data_size = - msize - (sizeof (struct PeerResultMessage) + - (get_path_length + - put_path_length) * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"), - 1, GNUNET_NO); + data_size = msize - (sizeof (struct PeerResultMessage) + + (get_path_length + + put_path_length) * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# P2P RESULTS received"), + 1, + GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULT bytes received"), - msize, GNUNET_NO); + msize, + GNUNET_NO); if (GNUNET_YES == log_route_details_stderr) { char *tmp; @@ -2359,7 +2213,7 @@ handle_dht_p2p_result (void *cls, LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N RESULT %s: %s->%s (%u)\n", GNUNET_h2s (&prm->key), - GNUNET_i2s (peer), + GNUNET_i2s (peer->id), tmp, get_path_length + 1); GNUNET_free (tmp); @@ -2371,23 +2225,23 @@ handle_dht_p2p_result (void *cls, struct GNUNET_PeerIdentity pid; /* Should be a HELLO, validate and consider using it! */ - if (data_size < sizeof (struct GNUNET_MessageHeader)) + if (data_size < sizeof (struct GNUNET_HELLO_Message)) { GNUNET_break_op (0); - return GNUNET_YES; + return; } h = data; if (data_size != ntohs (h->size)) { GNUNET_break_op (0); - return GNUNET_YES; + return; } if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid)) { GNUNET_break_op (0); - return GNUNET_YES; + return; } if ( (GNUNET_YES != disable_try_connect) && (0 != memcmp (&my_identity, @@ -2402,9 +2256,9 @@ handle_dht_p2p_result (void *cls, struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; GNUNET_memcpy (xget_path, - get_path, - get_path_length * sizeof (struct GNUNET_PeerIdentity)); - xget_path[get_path_length] = *peer; + get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + xget_path[get_path_length] = *peer->id; get_path_length++; /* forward to local clients */ @@ -2429,10 +2283,12 @@ handle_dht_p2p_result (void *cls, { struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length]; - GNUNET_memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity)); + GNUNET_memcpy (xput_path, + put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&xput_path[put_path_length], - xget_path, - get_path_length * sizeof (struct GNUNET_PeerIdentity)); + xget_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key, @@ -2453,8 +2309,6 @@ handle_dht_p2p_result (void *cls, data, data_size); } - - return GNUNET_YES; } @@ -2466,38 +2320,51 @@ handle_dht_p2p_result (void *cls, int GDS_NEIGHBOURS_init () { - static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, - {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0}, - {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0}, - {NULL, 0, 0} + GNUNET_MQ_hd_var_size (dht_p2p_get, + GNUNET_MESSAGE_TYPE_DHT_P2P_GET, + struct PeerGetMessage); + GNUNET_MQ_hd_var_size (dht_p2p_put, + GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, + struct PeerPutMessage); + GNUNET_MQ_hd_var_size (dht_p2p_result, + GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, + struct PeerResultMessage); + struct GNUNET_MQ_MessageHandler core_handlers[] = { + make_dht_p2p_get_handler (NULL), + make_dht_p2p_put_handler (NULL), + make_dht_p2p_result_handler (NULL), + GNUNET_MQ_handler_end () }; unsigned long long temp_config_num; disable_try_connect - = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT"); + = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, + "DHT", + "DISABLE_TRY_CONNECT"); if (GNUNET_OK == - GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size", + GNUNET_CONFIGURATION_get_value_number (GDS_cfg, + "DHT", + "bucket_size", &temp_config_num)) bucket_size = (unsigned int) temp_config_num; cache_results - = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS"); + = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, + "DHT", + "CACHE_RESULTS"); log_route_details_stderr = (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO; ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg); - core_api = - GNUNET_CORE_connect (GDS_cfg, NULL, - &core_init, - &handle_core_connect, - &handle_core_disconnect, - NULL, GNUNET_NO, - NULL, GNUNET_NO, - core_handlers); - if (core_api == NULL) + core_api = GNUNET_CORE_connecT (GDS_cfg, + NULL, + &core_init, + &handle_core_connect, + &handle_core_disconnect, + core_handlers); + if (NULL == core_api) return GNUNET_SYSERR; all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256, - GNUNET_NO); + GNUNET_YES); all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_NO); return GNUNET_OK; @@ -2512,9 +2379,10 @@ GDS_NEIGHBOURS_done () { if (NULL == core_api) return; - GNUNET_CORE_disconnect (core_api); + GNUNET_CORE_disconnecT (core_api); core_api = NULL; - GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); + GNUNET_assert (0 == + GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers); all_connected_peers = NULL; GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers, -- 2.25.1