GNUNET_assert(dhtlog_handle != NULL);
fprintf(stderr, "topology iteration finished (%u connections), scheduling continuation\n", topo_ctx->total_connections);
dhtlog_handle->update_topology(topo_ctx->total_connections);
- GNUNET_SCHEDULER_add_now (sched, topo_ctx->cont, topo_ctx->cls);
+ if (topo_ctx->cont != NULL)
+ GNUNET_SCHEDULER_add_now (sched, topo_ctx->cont, topo_ctx->cls);
GNUNET_free(topo_ctx);
}
}
struct TestGetContext *test_get;
int remember[num_puts][num_peers];
+ memset(&remember, 0, sizeof(int) * num_puts * num_peers);
for (i = 0; i < num_puts; i++)
{
test_put = GNUNET_malloc(sizeof(struct TestPutContext));
static void
continue_puts_and_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
{
+ int i;
struct TopologyIteratorContext *topo_ctx;
if (dhtlog_handle != NULL)
{
+ for (i = 1; i < (settle_time / 60) - 2; i++)
+ {
+ topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
+ fprintf(stderr, "scheduled topology iteration in %d minutes\n", i);
+ GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, i), &capture_current_topology, topo_ctx);
+ }
topo_ctx = GNUNET_malloc(sizeof(struct TopologyIteratorContext));
topo_ctx->cont = &setup_puts_and_gets;
GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, settle_time), &capture_current_topology, topo_ctx);
die_task = GNUNET_SCHEDULER_add_now (sched,
&end_badly, "from create topology (bad return)");
}
+ GNUNET_free_non_null(blacklist_transports);
GNUNET_SCHEDULER_cancel (sched, die_task);
die_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_relative_multiply(seconds_per_peer_start, num_peers),
return;
}
+ GNUNET_free_non_null(hostfile);
+
buf = data;
count = 0;
while (count < frstat.st_size)
/**
* Get testing related options.
*/
+ topology_str = NULL;
if ((GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "topology",
&topology_str)) && (GNUNET_NO == GNUNET_TESTING_topology_get(&topology, topology_str)))
else
{
topology_percentage = atof (topology_percentage_str);
+ GNUNET_free(topology_percentage_str);
}
if (GNUNET_OK !=
else
{
topology_probability = atof (topology_probability_str);
+ GNUNET_free(topology_probability_str);
}
if ((GNUNET_YES ==
"Invalid connect topology `%s' given for section %s option %s\n", connect_topology_str, "TESTING", "CONNECT_TOPOLOGY");
}
GNUNET_free_non_null(connect_topology_str);
+
if ((GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_string(cfg, "testing", "connect_topology_option",
&connect_topology_option_str)) && (GNUNET_NO == GNUNET_TESTING_topology_option_get(&connect_topology_option, connect_topology_option_str)))
connect_topology_option = GNUNET_TESTING_TOPOLOGY_OPTION_ALL; /* Defaults to NONE, set to ALL */
}
GNUNET_free_non_null(connect_topology_option_str);
+
if (GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_string (cfg, "testing", "connect_topology_option_modifier",
&connect_topology_option_modifier_string))
malicious_droppers, "");
}
+ GNUNET_free_non_null(trialmessage);
+
hostkey_meter = create_meter(peers_left, "Hostkeys created ", GNUNET_YES);
peer_start_meter = create_meter(peers_left, "Peers started ", GNUNET_YES);
#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
+#define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
+
/**
* Real maximum number of hops, at which point we refuse
* to forward the message.
*/
unsigned int distance;
+ /**
+ * Task for scheduling periodic ping messages for this peer.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ping_task;
+
};
/**
{
struct P2PPendingMessage *pos;
struct P2PPendingMessage *next;
- //fprintf(stderr, "BEFORE REMOVAL\n");
- //print_routing_table();
#if EXTRA_CHECKS
struct PeerInfo *peer_pos;
}
GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
- GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer));
+ GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer);
GNUNET_free(peer);
- //fprintf(stderr, "AFTER REMOVAL\n");
- //print_routing_table();
}
/* Remove peers from lowest bucket, insert into next lowest bucket */
GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy(to_remove);
lowest_bucket = lowest_bucket - 1;
#if PRINT_TABLES
fprintf(stderr, "Printing RT after new bucket\n");
#endif
}
+/**
+ * Function called to send a request out to another peer.
+ * Called both for locally initiated requests and those
+ * received from other peers.
+ *
+ * @param cls DHT service closure argument (unused)
+ * @param msg the encapsulated message
+ * @param peer the peer to forward the message to
+ * @param msg_ctx the context of the message (hop count, bloom, etc.)
+ */
+static void forward_message (void *cls,
+ const struct GNUNET_MessageHeader *msg,
+ struct PeerInfo *peer,
+ struct DHT_MessageContext *msg_ctx)
+{
+ struct GNUNET_DHT_P2PRouteMessage *route_message;
+ struct P2PPendingMessage *pending;
+ size_t msize;
+ size_t psize;
+
+ msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
+ GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ psize = sizeof(struct P2PPendingMessage) + msize;
+ pending = GNUNET_malloc(psize);
+ pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
+ pending->importance = DHT_SEND_PRIORITY;
+ pending->timeout = GNUNET_TIME_relative_get_forever();
+ route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
+ route_message->header.size = htons(msize);
+ route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
+ route_message->options = htonl(msg_ctx->msg_options);
+ route_message->hop_count = htonl(msg_ctx->hop_count + 1);
+ route_message->network_size = htonl(msg_ctx->network_size);
+ route_message->desired_replication_level = htonl(msg_ctx->replication);
+ route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
+ if (msg_ctx->bloom != NULL)
+ GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
+ if (msg_ctx->key != NULL)
+ memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
+ memcpy(&route_message[1], msg, ntohs(msg->size));
+#if DEBUG_DHT > 1
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
+#endif
+ GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
+ if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
+ peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
+}
+
+#if DO_PING
+/**
+ * Task used to send ping messages to peers so that
+ * they don't get disconnected.
+ *
+ * @param cls the peer to send a ping message to
+ * @param tc context, reason, etc.
+ */
+static void
+periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerInfo *peer = cls;
+ struct GNUNET_MessageHeader ping_message;
+ struct DHT_MessageContext message_context;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+ return;
+
+ ping_message.size = htons(sizeof(struct GNUNET_MessageHeader));
+ ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
+
+ memset(&message_context, 0, sizeof(struct DHT_MessageContext));
+#if DEBUG_PING
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id));
+#endif
+ forward_message(NULL, &ping_message, peer, &message_context);
+ peer->ping_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer);
+}
+
+/**
+ * Schedule PING messages for the top X peers in each
+ * bucket of the routing table (so core won't disconnect them!)
+ */
+void schedule_ping_messages()
+{
+ unsigned int bucket;
+ unsigned int count;
+ struct PeerInfo *pos;
+ for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
+ {
+ pos = k_buckets[bucket].head;
+ count = 0;
+ while (pos != NULL)
+ {
+ if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
+ GNUNET_SCHEDULER_add_now(sched, &periodic_ping_task, pos);
+ else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
+ {
+ GNUNET_SCHEDULER_cancel(sched, pos->ping_task);
+ pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ pos = pos->next;
+ count++;
+ }
+ }
+}
+#endif
/**
* Attempt to add a peer to our k-buckets.
if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
enable_next_bucket();
-
+#if DO_PING
+ schedule_ping_messages();
+#endif
return new_peer;
}
/**
* If a find peer result message is received and contains a valid
* HELLO for another peer, offer it to the transport service.
- *
- * FIXME: Check whether we need this peer (based on routing table
- * fullness) and only try to connect to it conditionally. This should
- * reduce trying to connect to say (500) peers when the bucket size will
- * discard most of them.
*/
if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
{
new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
new_msg_ctx->hop_count = 0;
route_result_message(cls, find_peer_result, new_msg_ctx);
+ GNUNET_free(new_msg_ctx);
#if DEBUG_DHT_ROUTING
if ((debug_routes) && (dhtlog_handle != NULL))
{
return GNUNET_NO;
else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
{
+ return GNUNET_YES;
/* FIXME: why not just return GNUNET_YES here? We are certainly close. */
- if (distance(&pos->id.hashPubKey, target) < my_distance)
- return GNUNET_NO;
+ /*if (distance(&pos->id.hashPubKey, target) < my_distance)
+ return GNUNET_NO;*/
}
pos = pos->next;
}
#endif
}
-/**
- * Function called to send a request out to another peer.
- * Called both for locally initiated requests and those
- * received from other peers.
- *
- * @param cls DHT service closure argument
- * @param msg the encapsulated message
- * @param peer the peer to forward the message to
- * @param msg_ctx the context of the message (hop count, bloom, etc.)
- */
-static void forward_message (void *cls,
- const struct GNUNET_MessageHeader *msg,
- struct PeerInfo *peer,
- struct DHT_MessageContext *msg_ctx)
-{
- struct GNUNET_DHT_P2PRouteMessage *route_message;
- struct P2PPendingMessage *pending;
- size_t msize;
- size_t psize;
-
- msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
- GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
- psize = sizeof(struct P2PPendingMessage) + msize;
- pending = GNUNET_malloc(psize);
- pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
- pending->importance = DHT_SEND_PRIORITY;
- pending->timeout = GNUNET_TIME_relative_get_forever();
- route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
- route_message->header.size = htons(msize);
- route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
- route_message->options = htonl(msg_ctx->msg_options);
- route_message->hop_count = htonl(msg_ctx->hop_count + 1);
- route_message->network_size = htonl(msg_ctx->network_size);
- route_message->desired_replication_level = htonl(msg_ctx->replication);
- route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
- GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
- memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
- memcpy(&route_message[1], msg, ntohs(msg->size));
-#if DEBUG_DHT > 1
- GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
-#endif
- GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
- if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
- peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
-}
/**
* Task used to remove forwarding entries, either
struct GNUNET_MessageHeader *find_peer_msg;
struct DHT_MessageContext message_context;
int ret;
+ struct GNUNET_TIME_Relative next_send_time;
if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
return;
message_context.peer = &my_identity;
ret = route_message(NULL, find_peer_msg, &message_context);
-
+ GNUNET_free(find_peer_msg);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
"FIND PEER", ret);
+ next_send_time = DHT_DEFAULT_FIND_PEER_INTERVAL;
+ next_send_time.value = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, next_send_time.value * 3);
GNUNET_SCHEDULER_add_delayed (sched,
- DHT_DEFAULT_FIND_PEER_INTERVAL,
+ next_send_time,
&send_find_peer_message, NULL);
}
struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
struct DHT_MessageContext *message_context;
+ if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
+ {
+#if DEBUG_PING
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT");
+#endif
+ return GNUNET_YES;
+ }
+
if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
GNUNET_break_op(0);
latency,
distance);
if (ret != NULL)
- GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ {
+ GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ }
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ int random_seconds;
sched = scheduler;
cfg = c;
datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
}
#if DO_FIND_PEER
+ random_seconds = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 180);
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
&send_find_peer_message, NULL);
static unsigned long long current_trial = 0; /* I like to assign 0, just to remember */
-static char *user;
-
-static char *password;
-
-static char *server;
-
-static char *database;
-
-static unsigned long long port;
-
/**
* Connection to the MySQL Server.
*/
}
/**
- * Create a prepared statement.
+ * Close a prepared statement.
*
* @return NULL on error
*/
if (s == NULL)
return;
- if (s->query != NULL)
- GNUNET_free(s->query);
+ GNUNET_free_non_null(s->query);
if (s->valid == GNUNET_YES)
mysql_stmt_close(s->statement);
GNUNET_free(s);
* Initialize the prepared statements for use with dht test logging
*/
static int
-iopen ()
+iopen (struct GNUNET_DHTLOG_Plugin *plugin)
{
int ret;
+ my_bool reconnect;
+ unsigned int timeout;
+ char *user;
+ char *password;
+ char *server;
+ char *database;
+ unsigned long long port;
conn = mysql_init (NULL);
if (conn == NULL)
return GNUNET_SYSERR;
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
+ "MYSQL", "DATABASE",
+ &database))
+ {
+ database = GNUNET_strdup("gnunet");
+ }
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
+ "MYSQL", "USER", &user))
+ {
+ user = GNUNET_strdup("dht");
+ }
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
+ "MYSQL", "PASSWORD", &password))
+ {
+ password = GNUNET_strdup("dhttest**");
+ }
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
+ "MYSQL", "SERVER", &server))
+ {
+ server = GNUNET_strdup("localhost");
+ }
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (plugin->cfg,
+ "MYSQL", "MYSQL_PORT", &port))
+ {
+ port = 0;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to mysql with: user %s, pass %s, server %s, database %s, port %d\n",
user, password, server, database, port);
+ reconnect = 0;
+ timeout = 60; /* in seconds */
+ mysql_options (conn, MYSQL_OPT_RECONNECT, &reconnect);
+ mysql_options (conn,
+ MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
+ mysql_options(conn, MYSQL_SET_CHARSET_NAME, "UTF8");
+ mysql_options (conn, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
+ mysql_options (conn, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
mysql_real_connect (conn, server, user, password,
- database, (unsigned int) port, NULL, 0);
+ database, (unsigned int) port, NULL, CLIENT_IGNORE_SIGPIPE);
+
+ GNUNET_free_non_null(server);
+ GNUNET_free_non_null(password);
+ GNUNET_free_non_null(user);
+ GNUNET_free_non_null(database);
if (mysql_error (conn)[0])
{
va_start (ap, insert_id);
- if (mysql_stmt_prepare (s->statement, s->query, strlen (s->query)))
- {
- GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "mysql_stmt_prepare ERROR");
- return GNUNET_SYSERR;
- }
-
if (GNUNET_OK != init_params (s, ap))
{
va_end (ap);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MySQL DHT Logger: initializing database\n");
#endif
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
- "MYSQL", "DATABASE",
- &database))
- {
- database = GNUNET_strdup("gnunet");
- }
-
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
- "MYSQL", "USER", &user))
- {
- user = GNUNET_strdup("dht");
- }
-
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
- "MYSQL", "PASSWORD", &password))
- {
- password = GNUNET_strdup("dhttest**");
- }
-
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (plugin->cfg,
- "MYSQL", "SERVER", &server))
- {
- server = GNUNET_strdup("localhost");
- }
-
- if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (plugin->cfg,
- "MYSQL", "MYSQL_PORT", &port))
- {
- port = 0;
- }
-
- if (iopen () != GNUNET_OK)
+ if (iopen (plugin) != GNUNET_OK)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Failed to initialize MySQL database connection for dhtlog.\n"));
return NULL;
}
+
GNUNET_assert(plugin->dhtlog_api == NULL);
plugin->dhtlog_api = GNUNET_malloc(sizeof(struct GNUNET_DHTLOG_Handle));
plugin->dhtlog_api->insert_trial = &add_trial;
if (conn != NULL)
mysql_close (conn);
-
+ conn = NULL;
+ mysql_library_end();
GNUNET_free(dhtlog_api);
return NULL;
}