/**
* How many buckets will we allow total.
*/
-#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
+#define MAX_BUCKETS sizeof (struct GNUNET_HashCode) * 8
/**
* What is the maximum number of peers in a given bucket.
*/
#define MAXIMUM_REPLICATION_LEVEL 16
+/**
+ * Maximum allowed number of pending messages per peer.
+ */
+#define MAXIMUM_PENDING_PER_PEER 64
+
/**
* How often to update our preference levels for peers in our routing tables.
*/
/**
* The key we are storing under.
*/
- GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/* put path (if tracked) */
/**
* The key of the corresponding GET request.
*/
- GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/* put path (if tracked) */
/**
* The key we are looking for.
*/
- GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/* xquery */
struct PeerInfo *prev;
/**
- * Count of outstanding messages for peer. FIXME: NEEDED?
- * FIXME: bound queue size!?
+ * Count of outstanding messages for peer.
*/
unsigned int pending_count;
* on error (same hashcode)
*/
static int
-find_bucket (const GNUNET_HashCode * hc)
+find_bucket (const struct GNUNET_HashCode * hc)
{
unsigned int bits;
* @return GNUNET_YES (we should continue to iterate)
*/
static int
-add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
+add_known_to_bloom (void *cls, const struct GNUNET_HashCode * key, void *value)
{
struct BloomConstructorContext *ctx = cls;
- GNUNET_HashCode mh;
+ struct GNUNET_HashCode mh;
GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
GNUNET_h2s (key), ctx->bf_mutator);
-#endif
GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
return GNUNET_YES;
}
/* Check for connect to self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
if (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
&peer->hashPubKey))
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1,
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
GNUNET_NO);
peer_bucket = find_bucket (&peer->hashPubKey);
GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
/* Check for disconnect from self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
to_remove =
GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
if (NULL == to_remove)
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1,
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
GNUNET_NO);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
* the two hash codes increases
*/
static unsigned int
-get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
+get_distance (const struct GNUNET_HashCode * target, const struct GNUNET_HashCode * have)
{
unsigned int bucket;
unsigned int msb;
* mismatching bit at 'bucket' */
lsb = 0;
for (i = bucket + 1;
- (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
+ (i < sizeof (struct GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
{
if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
GNUNET_CRYPTO_hash_get_bit (have, i))
* GNUNET_NO otherwise.
*/
static int
-am_closest_peer (const GNUNET_HashCode * key,
+am_closest_peer (const struct GNUNET_HashCode * key,
const struct GNUNET_CONTAINER_BloomFilter *bloom)
{
int bits;
int count;
struct PeerInfo *pos;
- if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+ if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
return GNUNET_YES;
bucket_num = find_bucket (key);
GNUNET_assert (bucket_num >= 0);
* @return Peer to route to, or NULL on error
*/
static struct PeerInfo *
-select_peer (const GNUNET_HashCode * key,
+select_peer (const struct GNUNET_HashCode * key,
const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
{
unsigned int bc;
}
else
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Excluded peer `%s' due to BF match in greedy routing for %s\n",
GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop
("# Peers excluded from routing due to Bloomfilter"),
gettext_noop
("# Peers excluded from routing due to Bloomfilter"),
1, GNUNET_NO);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Excluded peer `%s' due to BF match in random routing for %s\n",
GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
pos = pos->next;
continue; /* Ignore bloomfiltered peers */
}
* @return number of peers returned in 'targets'.
*/
static unsigned int
-get_target_peers (const GNUNET_HashCode * key,
+get_target_peers (const struct GNUNET_HashCode * key,
struct GNUNET_CONTAINER_BloomFilter *bloom,
uint32_t hop_count, uint32_t target_replication,
struct PeerInfo ***targets)
&nxt->id.hashPubKey));
GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
}
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
GNUNET_CONTAINER_multihashmap_size (all_known_peers),
(unsigned int) hop_count, GNUNET_h2s (key), ret);
-#endif
if (0 == off)
{
GNUNET_free (rtargets);
struct GNUNET_TIME_Absolute expiration_time,
uint32_t hop_count,
struct GNUNET_CONTAINER_BloomFilter *bf,
- const GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * key,
unsigned int put_path_length,
struct GNUNET_PeerIdentity *put_path,
const void *data, size_t data_size)
struct GNUNET_PeerIdentity *pp;
GNUNET_assert (NULL != bf);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding myself (%s) to PUT bloomfilter for %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1, GNUNET_NO);
&targets);
if (0 == target_count)
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing PUT for %s terminates after %u hops at %s\n",
GNUNET_h2s (key), (unsigned int) hop_count,
GNUNET_i2s (&my_identity));
-#endif
return;
}
msize =
for (i = 0; i < target_count; i++)
{
target = targets[i];
-#if DEBUG_DHT
+ if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
+ {
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+ 1, GNUNET_NO);
+ continue; /* skip */
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
(unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
pending->importance = 0; /* FIXME */
pending->timeout = expiration_time;
GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
enum GNUNET_DHT_RouteOption options,
uint32_t desired_replication_level,
- uint32_t hop_count, const GNUNET_HashCode * key,
+ uint32_t hop_count, const struct GNUNET_HashCode * key,
const void *xquery, size_t xquery_size,
const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
uint32_t reply_bf_mutator,
target_count =
get_target_peers (key, peer_bf, hop_count, desired_replication_level,
&targets);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding myself (%s) to GET bloomfilter for %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
if (0 == target_count)
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing GET for %s terminates after %u hops at %s\n",
GNUNET_h2s (key), (unsigned int) hop_count,
GNUNET_i2s (&my_identity));
-#endif
return;
}
reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
for (i = 0; i < target_count; i++)
{
target = targets[i];
-#if DEBUG_DHT
+ if (target->pending_count >= MAXIMUM_PENDING_PER_PEER)
+ {
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+ 1, GNUNET_NO);
+ continue; /* skip */
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
(unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
pending->importance = 0; /* FIXME */
pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
enum GNUNET_BLOCK_Type type,
struct GNUNET_TIME_Absolute expiration_time,
- const GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * key,
unsigned int put_path_length,
const struct GNUNET_PeerIdentity *put_path,
unsigned int get_path_length,
/* peer disconnected in the meantime, drop reply */
return;
}
+ if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER)
+ {
+ /* skip */
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"),
+ 1, GNUNET_NO);
+ return;
+ }
+
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop
("# RESULT messages queued for transmission"), 1,
size_t payload_size;
enum GNUNET_DHT_RouteOption options;
struct GNUNET_CONTAINER_BloomFilter *bf;
- GNUNET_HashCode test_key;
+ struct GNUNET_HashCode test_key;
msize = ntohs (message->size);
if (msize < sizeof (struct PeerPutMessage))
&test_key))
{
case GNUNET_YES:
- if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
+ if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
{
GNUNET_break_op (0);
return GNUNET_YES;
/* cannot verify, good luck */
break;
}
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n",
- GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity));
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
+ GNUNET_h2s (&put->key), GNUNET_i2s (peer));
bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
GNUNET_CONSTANTS_BLOOMFILTER_K);
GNUNET_break_op (GNUNET_YES ==
pp, payload, payload_size);
}
GNUNET_CONTAINER_bloomfilter_free (bf);
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
- GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key,
- putlen, put_path, 0, NULL, ntohl(put->desired_replication_level),
- ntohl (put->type), payload, payload_size);
+ GDS_CLIENTS_process_put (options,
+ ntohl (put->type),
+ ntohl (put->hop_count),
+ ntohl (put->desired_replication_level),
+ putlen, put_path,
+ GNUNET_TIME_absolute_ntoh (put->expiration_time),
+ &put->key,
+ payload,
+ payload_size);
return GNUNET_YES;
}
*/
static void
handle_find_peer (const struct GNUNET_PeerIdentity *sender,
- const GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * key,
struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator)
{
int bucket_idx;
struct PeerBucket *bucket;
struct PeerInfo *peer;
unsigned int choice;
- GNUNET_HashCode mhash;
+ struct GNUNET_HashCode mhash;
const struct GNUNET_HELLO_Message *hello;
/* first, check about our own HELLO */
}
/* then, also consider sending a random HELLO from the closest bucket */
- if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+ if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (struct GNUNET_HashCode)))
bucket_idx = closest_bucket;
else
bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key));
/* remember request for routing replies */
GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
reply_bf, get->bf_mutator);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
(unsigned int) ntohl (get->hop_count));
-#endif
/* local lookup (this may update the reply_bf) */
if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
(am_closest_peer (&get->key, peer_bf)))
1, GNUNET_NO);
}
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
- GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL,
- ntohl (get->desired_replication_level), type, NULL, 0);
+ /* FIXME Path */
+ GDS_CLIENTS_process_get (options,
+ type,
+ ntohl(get->hop_count),
+ ntohl(get->desired_replication_level),
+ 0, NULL,
+ &get->key);
/* P2P forwarding */
if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
xget_path, data, data_size);
}
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
- GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key,
- put_path_length, put_path, get_path_length, get_path,
- 0, type, data, data_size);
+ GDS_CLIENTS_process_get_resp (type,
+ get_path,
+ get_path_length,
+ put_path,
+ put_path_length,
+ GNUNET_TIME_absolute_ntoh (
+ prm->expiration_time),
+ &prm->key,
+ data,
+ data_size);
return GNUNET_YES;
}
bucket_size = (unsigned int) temp_config_num;
atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL);
coreAPI =
- GNUNET_CORE_connect (GDS_cfg, 1, NULL, &core_init, &handle_core_connect,
+ GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect,
&handle_core_disconnect, NULL, GNUNET_NO, NULL,
GNUNET_NO, core_handlers);
if (coreAPI == NULL)
}
}
+/**
+ * Get the ID of the local node.
+ *
+ * @return identity of the local node
+ */
+struct GNUNET_PeerIdentity *
+GDS_NEIGHBOURS_get_id ()
+{
+ return &my_identity;
+}
+
/* end of gnunet-service-dht_neighbours.c */