*/
#define MAXIMUM_PENDING_PER_PEER 64
-/**
- * How often to update our preference levels for peers in our routing tables.
- */
-#define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
-
/**
* How long at least to wait before sending another find peer request.
*/
*/
struct GNUNET_CORE_TransmitHandle *th;
- /**
- * Task for scheduling preference updates
- */
- struct GNUNET_SCHEDULER_Task *preference_task;
-
/**
* What is the identity of the peer?
*/
};
+/**
+ * Information about a peer that we would like to connect to.
+ */
+struct ConnectInfo
+{
+
+ /**
+ * Handle to active HELLO offer operation, or NULL.
+ */
+ struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
+
+ /**
+ * Handle to active connectivity suggestion operation, or NULL.
+ */
+ struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
+
+ /**
+ * How much would we like to connect to this peer?
+ */
+ uint32_t strength;
+};
+
+
/**
* Do we cache all results that we are routing in the local datacache?
*/
/**
* Hash map of all CORE-connected peers, for easy removal from
- * #k_buckets on disconnect.
+ * #k_buckets on disconnect. Values are of type `struct PeerInfo`.
*/
static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
+/**
+ * Hash map of all peers we would like to be connected to.
+ * Values are of type `struct ConnectInfo`.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
+
/**
* Maximum size for each bucket.
*/
*/
static struct GNUNET_CORE_Handle *core_api;
-/**
- * Handle to ATS performance monitoring.
- */
-static struct GNUNET_ATS_PerformanceHandle *ats_perf;
-
/**
* Handle to ATS connectivity.
*/
static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
-
/**
* Find the optimal bucket for this key.
*
/**
- * Let GNUnet core know that we like the given peer.
+ * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
+ * Clean up the "oh" field in the @a cls
*
- * @param cls the `struct PeerInfo` of the peer
- * @param tc scheduler context.
+ * @param cls a `struct ConnectInfo`
+ * @param tc unused
*/
static void
-update_core_preference (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+offer_hello_done (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct PeerInfo *peer = cls;
- uint64_t preference;
- unsigned int matching;
- int bucket;
- struct GNUNET_HashCode phash;
+ struct ConnectInfo *ci = cls;
- peer->preference_task = NULL;
- if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
- return;
- GNUNET_CRYPTO_hash (&peer->id,
- sizeof (struct GNUNET_PeerIdentity),
- &phash);
- matching =
- GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
- &phash);
- if (matching >= 64)
- matching = 63;
- bucket = find_bucket (&phash);
- if (bucket == GNUNET_SYSERR)
- preference = 0;
- else
+ ci->oh = NULL;
+}
+
+
+/**
+ * Function called for all entries in #all_desired_peers to clean up.
+ *
+ * @param cls NULL
+ * @param peer peer the entry is for
+ * @param value the value to remove
+ * @return #GNUNET_YES
+ */
+static int
+free_connect_info (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ void *value)
+{
+ struct ConnectInfo *ci = cls;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
+ peer,
+ ci));
+ if (NULL != ci->sh)
+ {
+ GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
+ ci->sh = NULL;
+ }
+ if (NULL != ci->oh)
{
- GNUNET_assert (k_buckets[bucket].peers_size != 0);
- preference = (1LL << matching) / k_buckets[bucket].peers_size;
+ GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
+ ci->oh = NULL;
}
- if (preference == 0)
+ GNUNET_free (ci);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Consider if we want to connect to a given peer, and if so
+ * let ATS know. If applicable, the HELLO is offered to the
+ * TRANSPORT service.
+ *
+ * @param pid peer to consider connectivity requirements for
+ * @param h a HELLO message, or NULL
+ */
+static void
+try_connect (const struct GNUNET_PeerIdentity *pid,
+ const struct GNUNET_MessageHeader *h)
+{
+ int bucket;
+ struct GNUNET_HashCode pid_hash;
+ struct ConnectInfo *ci;
+ uint32_t strength;
+
+ GNUNET_CRYPTO_hash (pid,
+ sizeof (struct GNUNET_PeerIdentity),
+ &pid_hash);
+ bucket = find_bucket (&pid_hash);
+ if (bucket < 0)
+ return; /* self? */
+ ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
+ pid);
+
+ if (k_buckets[bucket].peers_size < bucket_size)
+ strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
+ else
+ strength = bucket; /* minimum value of connectivity */
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
+ pid))
+ strength *= 2; /* double for connected peers */
+ else if (k_buckets[bucket].peers_size > bucket_size)
+ strength = 0; /* bucket full, we really do not care about more */
+
+ if ( (0 == strength) &&
+ (NULL != ci) )
{
- peer->preference_task =
- GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
- &update_core_preference, peer);
+ /* release request */
+ GNUNET_assert (GNUNET_YES ==
+ free_connect_info (NULL,
+ pid,
+ ci));
return;
}
- GNUNET_STATISTICS_update (GDS_stats,
- gettext_noop ("# Preference updates given to core"),
- 1, GNUNET_NO);
- GNUNET_ATS_performance_change_preference (ats_perf,
- &peer->id,
- GNUNET_ATS_PREFERENCE_BANDWIDTH,
- (double) preference,
- GNUNET_ATS_PREFERENCE_END);
- peer->preference_task =
- GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
- &update_core_preference, peer);
+ if (NULL == ci)
+ ci = GNUNET_new (struct ConnectInfo);
+ if ( (NULL != GDS_transport_handle) &&
+ (NULL != ci->oh) &&
+ (NULL != h) )
+ GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
+ if ( (NULL != GDS_transport_handle) &&
+ (NULL != h) )
+ ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
+ h,
+ &offer_hello_done,
+ ci);
+ if ( (NULL != ci->sh) &&
+ (ci->strength != strength) )
+ GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
+ if (ci->strength != strength)
+ ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
+ pid,
+ strength);
+ ci->strength = strength;
+}
+
+
+/**
+ * Function called for each peer in #all_desired_peers during
+ * #update_connect_preferences() if we have reason to adjust
+ * the strength of our desire to keep connections to certain
+ * peers. Calls #try_connect() to update the calculations for
+ * the given @a pid.
+ *
+ * @param cls NULL
+ * @param pid peer to update
+ * @param value unused
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+update_desire_strength (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ try_connect (pid, NULL);
+ return GNUNET_YES;
+}
+/**
+ * Update our preferences for connectivity as given to ATS.
+ *
+ * @param cls the `struct PeerInfo` of the peer
+ * @param tc scheduler context.
+ */
+static void
+update_connect_preferences ()
+{
+ GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
+ &update_desire_strength,
+ NULL);
}
/**
- * Closure for 'add_known_to_bloom'.
+ * Closure for #add_known_to_bloom().
*/
struct BloomConstructorContext
{
struct GNUNET_HashCode key_hash;
struct GNUNET_HashCode mh;
- GNUNET_CRYPTO_hash (key, sizeof (struct GNUNET_PeerIdentity), &key_hash);
- GNUNET_BLOCK_mingle_hash (&key_hash, ctx->bf_mutator, &mh);
+ GNUNET_CRYPTO_hash (key,
+ sizeof (struct GNUNET_PeerIdentity),
+ &key_hash);
+ GNUNET_BLOCK_mingle_hash (&key_hash,
+ ctx->bf_mutator,
+ &mh);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
GNUNET_i2s (key), ctx->bf_mutator);
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# peers connected"),
+ 1,
GNUNET_NO);
GNUNET_CRYPTO_hash (peer,
sizeof (struct GNUNET_PeerIdentity),
k_buckets[peer_bucket].tail,
ret);
k_buckets[peer_bucket].peers_size++;
- closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket);
- if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size))
- {
- ret->preference_task =
- GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
- newly_found_peers++;
- }
+ closest_bucket = GNUNET_MAX (closest_bucket,
+ peer_bucket);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
peer,
ret,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ if ( (peer_bucket > 0) &&
+ (k_buckets[peer_bucket].peers_size <= bucket_size))
+ {
+ update_connect_preferences ();
+ newly_found_peers++;
+ }
if (1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) &&
(GNUNET_YES != disable_try_connect))
{
struct GNUNET_HashCode phash;
/* Check for disconnect from self message */
- if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
+ if (0 == memcmp (&my_identity,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity)))
return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnected %s\n",
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop ("# peers connected"),
+ -1,
GNUNET_NO);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
peer,
to_remove));
- if (NULL != to_remove->preference_task)
- {
- GNUNET_SCHEDULER_cancel (to_remove->preference_task);
- to_remove->preference_task = NULL;
- }
GNUNET_CRYPTO_hash (peer,
sizeof (struct GNUNET_PeerIdentity),
&phash);
current_bucket = find_bucket (&phash);
GNUNET_assert (current_bucket >= 0);
GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
- k_buckets[current_bucket].tail, to_remove);
+ k_buckets[current_bucket].tail,
+ to_remove);
GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
k_buckets[current_bucket].peers_size--;
- while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0))
+ while ( (closest_bucket > 0) &&
+ (0 == k_buckets[closest_bucket].peers_size) )
closest_bucket--;
-
- if (to_remove->th != NULL)
+ if (NULL != to_remove->th)
{
GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
to_remove->th = NULL;
discarded = 0;
while (NULL != (pos = to_remove->head))
{
- GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos);
+ GNUNET_CONTAINER_DLL_remove (to_remove->head,
+ to_remove->tail,
+ pos);
discarded++;
GNUNET_free (pos);
}
+ if (k_buckets[current_bucket].peers_size < bucket_size)
+ update_connect_preferences ();
GNUNET_STATISTICS_update (GDS_stats,
- gettext_noop
- ("# Queued messages discarded (peer disconnected)"),
- discarded, GNUNET_NO);
+ gettext_noop ("# Queued messages discarded (peer disconnected)"),
+ discarded,
+ GNUNET_NO);
GNUNET_free (to_remove);
}
* out to the destination.
*
* @param cls the 'struct PeerInfo' of the target peer
- * @param size number of bytes available in buf
+ * @param size number of bytes available in @a buf
* @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @return number of bytes written to @a buf
*/
static size_t
-core_transmit_notify (void *cls, size_t size, void *buf)
+core_transmit_notify (void *cls,
+ size_t size,
+ void *buf)
{
struct PeerInfo *peer = cls;
char *cbuf = buf;
GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
GNUNET_CORE_PRIO_BEST_EFFORT,
GNUNET_TIME_absolute_get_remaining
- (pending->timeout), &peer->id,
+ (pending->timeout),
+ &peer->id,
ntohs (pending->msg->size),
- &core_transmit_notify, peer);
+ &core_transmit_notify,
+ peer);
GNUNET_break (NULL != peer->th);
}
bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, key);
pos = k_buckets[bucket_num].head;
count = 0;
- while ((pos != NULL) && (count < bucket_size))
+ while ((NULL != pos) && (count < bucket_size))
{
GNUNET_CRYPTO_hash (&pos->id,
sizeof (struct GNUNET_PeerIdentity),
&phash);
- if ((bloom != NULL) &&
+ if ((NULL != bloom) &&
(GNUNET_YES ==
GNUNET_CONTAINER_bloomfilter_test (bloom, &phash)))
{
* @return Peer to route to, or NULL on error
*/
static struct PeerInfo *
-select_peer (const struct GNUNET_HashCode * key,
- const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops)
+select_peer (const struct GNUNET_HashCode *key,
+ const struct GNUNET_CONTAINER_BloomFilter *bloom,
+ uint32_t hops)
{
unsigned int bc;
unsigned int count;
ntohl (put->desired_replication_level),
GNUNET_TIME_absolute_ntoh (put->expiration_time),
ntohl (put->hop_count), bf,
- &put->key, putlen,
- pp, payload, payload_size);
+ &put->key,
+ putlen,
+ pp,
+ payload,
+ payload_size);
/* notify monitoring clients */
GDS_CLIENTS_process_put (options
- | (GNUNET_OK == forwarded)
- ? GNUNET_DHT_RO_LAST_HOP : 0,
+ | ( (GNUNET_OK == forwarded)
+ ? GNUNET_DHT_RO_LAST_HOP
+ : 0 ),
ntohl (put->type),
ntohl (put->hop_count),
ntohl (put->desired_replication_level),
char *tmp;
tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
- LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N RESULT %s: %s->%s (%u)\n",
- GNUNET_h2s (&prm->key), GNUNET_i2s (peer), tmp,
+ LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
+ "R5N RESULT %s: %s->%s (%u)\n",
+ GNUNET_h2s (&prm->key),
+ GNUNET_i2s (peer),
+ tmp,
get_path_length + 1);
GNUNET_free (tmp);
}
{
const struct GNUNET_MessageHeader *h;
struct GNUNET_PeerIdentity pid;
- int bucket;
/* Should be a HELLO, validate and consider using it! */
if (data_size < sizeof (struct GNUNET_MessageHeader))
(0 != memcmp (&my_identity,
&pid,
sizeof (struct GNUNET_PeerIdentity))) )
- {
- struct GNUNET_HashCode pid_hash;
-
- GNUNET_CRYPTO_hash (&pid,
- sizeof (struct GNUNET_PeerIdentity),
- &pid_hash);
- bucket = find_bucket (&pid_hash);
- if ( (bucket >= 0) &&
- (k_buckets[bucket].peers_size < bucket_size) &&
- (NULL != GDS_transport_handle) )
- {
- GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
- h,
- NULL,
- NULL);
- GNUNET_TRANSPORT_try_connect (GDS_transport_handle,
- &pid,
- NULL,
- NULL); /*FIXME TRY_CONNECT change */
- }
- }
+ try_connect (&pid,
+ h);
}
/* append 'peer' to 'get_path' */
{
struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
- memcpy (xget_path, get_path,
+ memcpy (xget_path,
+ get_path,
get_path_length * sizeof (struct GNUNET_PeerIdentity));
xget_path[get_path_length] = *peer;
get_path_length++;
/* forward to local clients */
GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
- &prm->key, get_path_length, xget_path,
- put_path_length, put_path, type, data_size, data);
+ &prm->key,
+ get_path_length,
+ xget_path,
+ put_path_length,
+ put_path,
+ type,
+ data_size,
+ data);
GDS_CLIENTS_process_get_resp (type,
- xget_path, get_path_length,
+ xget_path,
+ get_path_length,
put_path, put_path_length,
- GNUNET_TIME_absolute_ntoh (
- prm->expiration_time),
+ GNUNET_TIME_absolute_ntoh (prm->expiration_time),
&prm->key,
data,
data_size);
GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
&prm->key,
- get_path_length + put_path_length, xput_path,
- type, data_size, data);
+ get_path_length + put_path_length,
+ xput_path,
+ type,
+ data_size,
+ data);
}
/* forward to other peers */
- GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time),
- &prm->key, put_path_length, put_path, get_path_length,
- xget_path, data, data_size);
+ GDS_ROUTING_process (type,
+ GNUNET_TIME_absolute_ntoh (prm->expiration_time),
+ &prm->key,
+ put_path_length,
+ put_path,
+ get_path_length,
+ xget_path,
+ data,
+ data_size);
}
return GNUNET_YES;
/**
* Initialize neighbours subsystem.
*
- * @return GNUNET_OK on success, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
int
GDS_NEIGHBOURS_init ()
log_route_details_stderr =
(NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
- ats_perf = GNUNET_ATS_performance_init (GDS_cfg,
- NULL,
- NULL);
ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
core_api =
- GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect,
- &handle_core_disconnect, NULL, GNUNET_NO, NULL,
- GNUNET_NO, core_handlers);
+ GNUNET_CORE_connect (GDS_cfg, NULL,
+ &core_init,
+ &handle_core_connect,
+ &handle_core_disconnect,
+ NULL, GNUNET_NO,
+ NULL, GNUNET_NO,
+ core_handlers);
if (core_api == NULL)
return GNUNET_SYSERR;
all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
- GNUNET_NO);
+ GNUNET_NO);
+ all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
+ GNUNET_NO);
return GNUNET_OK;
}
return;
GNUNET_CORE_disconnect (core_api);
core_api = NULL;
- GNUNET_ATS_performance_done (ats_perf);
- ats_perf = NULL;
- GNUNET_ATS_connectivity_done (ats_ch);
- ats_ch = NULL;
GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
all_connected_peers = NULL;
+ GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
+ &free_connect_info,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
+ all_desired_peers = NULL;
+ GNUNET_ATS_connectivity_done (ats_ch);
+ ats_ch = NULL;
if (NULL != find_peer_task)
{
GNUNET_SCHEDULER_cancel (find_peer_task);