* production). The associated code should also probably be removed
* once we're done with experiments.
*/
-#define ENABLE_HISTOGRAM GNUNET_NO
+#define ENABLE_HISTOGRAM GNUNET_YES
/**
* Over how many values do we calculate the weighted average?
struct NSEPeerEntry
{
- /**
- * Pending message for this peer.
- */
- struct GNUNET_MessageHeader *pending_message;
-
/**
* Core handle for sending messages to this peer.
*/
* been taken care of.
*/
int previous_round;
+
+#if ENABLE_HISTOGRAM
+
+ /**
+ * Amount of messages received from this peer on this round.
+ */
+ unsigned int received_messages;
+
+ /**
+ * Amount of messages transmitted to this peer on this round.
+ */
+ unsigned int transmitted_messages;
+
+ /**
+ * Which size did we tell the peer the network is?
+ */
+ unsigned int last_transmitted_size;
+
+#endif
+
};
+GNUNET_NETWORK_STRUCT_BEGIN
+
/**
* Network size estimate reply; sent when "this"
* peer's timer has run out before receiving a
*/
struct GNUNET_CRYPTO_RsaSignature signature;
};
-
+GNUNET_NETWORK_STRUCT_END
/**
* Handle to our current configuration.
double val;
double nsize;
+#define WEST 1
/* Weighted incremental algorithm for stddev according to West (1979) */
#if WEST
double sumweight;
mean = 0.0;
sum = 0.0;
sumweight = 0.0;
+ variance = 0.0;
for (i = 0; i < estimate_count; i++)
{
j = (estimate_index - i + HISTORY_SIZE) % HISTORY_SIZE;
val = htonl (size_estimate_messages[j].matching_bits);
- weight = 1.0; /* was: estimate_count + 1 - i; */
+ weight = estimate_count + 1 - i;
temp = weight + sumweight;
q = val - mean;
if (0 != estimate_count)
{
mean = sum / estimate_count;
- variance = (vsq - mean * sum) / (estimate_count - 1.0); // terrible for numerical stability...
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "(%f - %f) / %u = %f\n",
- vsq, mean * sum, estimate_count - 1, variance);
-
+ variance = (vsq - mean * sum) / (estimate_count - 1.0); // terrible for numerical stability...
}
#endif
- GNUNET_assert (variance >= 0);
- std_dev = sqrt (variance);
+ if (variance >= 0)
+ std_dev = sqrt (variance);
+ else
+ std_dev = variance; /* must be infinity due to estimate_count == 0 */
current_std_dev = std_dev;
current_size_estimate = mean;
em->header.type = htons (GNUNET_MESSAGE_TYPE_NSE_ESTIMATE);
em->reserved = htonl (0);
em->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
- em->size_estimate = mean - 0.332747;
+ double se = mean - 0.332747;
nsize = log2 (GNUNET_CONTAINER_multihashmap_size (peers) + 1);
- if (em->size_estimate < nsize)
- em->size_estimate = nsize;
- em->std_deviation = std_dev;
+ em->size_estimate = GNUNET_hton_double (GNUNET_MAX (se, nsize));
+ em->std_deviation = GNUNET_hton_double (std_dev);
GNUNET_STATISTICS_set (stats, "# nodes in the network (estimate)",
(uint64_t) pow (2, mean - 1.0 / 3.0), GNUNET_NO);
}
{
#if USE_RANDOM_DELAYS
struct GNUNET_TIME_Relative ret;
+ uint32_t i;
+ double d;
- if (matching_bits == 0)
- return GNUNET_TIME_UNIT_ZERO;
- ret.rel_value =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- (uint32_t) (get_matching_bits_delay
- (matching_bits -
- 1) / (double) (hop_count_max +
- 1)));
+ d = get_matching_bits_delay (matching_bits);
+ i = (uint32_t) (d / (double) (hop_count_max + 1));
+#if DEBUG_NSE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Randomizing flood using latencies up to %u ms\n",
+ (unsigned int) i);
+#endif
+ ret.rel_value = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, i + 1);
return ret;
#else
return GNUNET_TIME_UNIT_ZERO;
* @param tc scheduler context
*/
static void
-transmit_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+transmit_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
unsigned int idx;
peer_entry->th = NULL;
- if (buf == NULL)
+ if (NULL == buf)
{
/* client disconnected */
return 0;
}
GNUNET_assert (size >= sizeof (struct GNUNET_NSE_FloodMessage));
idx = estimate_index;
- if (peer_entry->previous_round == GNUNET_NO)
+ if (GNUNET_NO == peer_entry->previous_round)
{
idx = (idx + HISTORY_SIZE - 1) % HISTORY_SIZE;
peer_entry->previous_round = GNUNET_YES;
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), &transmit_task,
+ GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), &transmit_task_cb,
peer_entry);
}
if ((ntohl (size_estimate_messages[idx].hop_count) == 0) &&
1, GNUNET_NO);
return 0;
}
+ if (ntohs (size_estimate_messages[idx].header.size) == 0)
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# flood messages not generated (lack of history)",
+ 1, GNUNET_NO);
+ return 0;
+ }
#if DEBUG_NSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"In round %llu, sending to `%s' estimate with %u bits\n",
GNUNET_STATISTICS_update (stats, "# flood messages started", 1, GNUNET_NO);
GNUNET_STATISTICS_update (stats, "# flood messages transmitted", 1,
GNUNET_NO);
+#if ENABLE_HISTOGRAM
+ peer_entry->transmitted_messages++;
+ peer_entry->last_transmitted_size =
+ ntohl(size_estimate_messages[idx].matching_bits);
+#endif
memcpy (buf, &size_estimate_messages[idx],
sizeof (struct GNUNET_NSE_FloodMessage));
- GNUNET_STATISTICS_update (stats, "# flood messages sent", 1, GNUNET_NO);
return sizeof (struct GNUNET_NSE_FloodMessage);
}
* @param tc scheduler context
*/
static void
-transmit_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct NSEPeerEntry *peer_entry = cls;
peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
+
GNUNET_assert (NULL == peer_entry->th);
peer_entry->th =
GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_NO, NSE_PRIORITY,
fm->timestamp = GNUNET_TIME_absolute_hton (ts);
fm->pkey = my_public_key;
fm->proof_of_work = my_proof;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CRYPTO_rsa_sign (my_private_key, &fm->purpose,
- &fm->signature));
+ if (nse_work_required > 0)
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CRYPTO_rsa_sign (my_private_key, &fm->purpose,
+ &fm->signature));
+ else
+ memset (&fm->signature, 0, sizeof (fm->signature));
}
struct NSEPeerEntry *peer_entry = value;
struct GNUNET_TIME_Relative delay;
- if (peer_entry->th != NULL)
+ if (NULL != peer_entry->th)
{
peer_entry->previous_round = GNUNET_NO;
return GNUNET_OK;
}
- if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
+ if (GNUNET_SCHEDULER_NO_TASK != peer_entry->transmit_task)
{
GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
peer_entry->previous_round = GNUNET_NO;
}
+#if ENABLE_HISTOGRAM
+ if (peer_entry->received_messages > 1)
+ GNUNET_STATISTICS_update(stats, "# extra messages",
+ peer_entry->received_messages - 1, GNUNET_NO);
+ peer_entry->transmitted_messages = 0;
+ peer_entry->last_transmitted_size = 0;
+ peer_entry->received_messages = 0;
+#endif
delay =
get_transmit_delay ((peer_entry->previous_round == GNUNET_NO) ? -1 : 0);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (delay, &transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_delayed (delay, &transmit_task_cb, peer_entry);
return GNUNET_OK;
}
unsigned int i;
flood_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
offset = GNUNET_TIME_absolute_get_remaining (next_timestamp);
if (0 != offset.rel_value)
{
GNUNET_SCHEDULER_add_delayed (offset, &update_flood_message, NULL);
return;
}
- current_timestamp = next_timestamp;
- next_timestamp =
- GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval);
estimate_index = (estimate_index + 1) % HISTORY_SIZE;
if (estimate_count < HISTORY_SIZE)
estimate_count++;
- if (next_timestamp.abs_value ==
- GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value)
+ current_timestamp = next_timestamp;
+ next_timestamp =
+ GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval);
+ if ((current_timestamp.abs_value ==
+ GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value) &&
+ (get_matching_bits (current_timestamp, &my_identity) <
+ ntohl(next_message.matching_bits)))
{
/* we received a message for this round way early, use it! */
size_estimate_messages[estimate_index] = next_message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Proof of work found: %llu!\n",
(unsigned long long) GNUNET_ntohll (counter));
#endif
- for (i = 0; i < HISTORY_SIZE; i++)
- if (ntohl (size_estimate_messages[i].hop_count) == 0)
- {
- size_estimate_messages[i].proof_of_work = my_proof;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CRYPTO_rsa_sign (my_private_key,
- &size_estimate_messages
- [i].purpose,
- &size_estimate_messages
- [i].signature));
- }
write_proof ();
+ setup_flood_message (estimate_index, current_timestamp);
return;
}
counter++;
my_proof = counter;
}
proof_task =
- GNUNET_SCHEDULER_add_delayed (proof_find_delay, &find_proof, NULL);
+ GNUNET_SCHEDULER_add_delayed_with_priority (proof_find_delay,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ &find_proof, NULL);
}
GNUNET_break_op (0);
return GNUNET_NO;
}
- if (GNUNET_OK !=
- GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_NSE_SEND,
- &incoming_flood->purpose,
- &incoming_flood->signature,
- &incoming_flood->pkey))
+ if ((nse_work_required > 0) &&
+ (GNUNET_OK !=
+ GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_NSE_SEND,
+ &incoming_flood->purpose,
+ &incoming_flood->signature,
+ &incoming_flood->pkey)))
{
GNUNET_break_op (0);
return GNUNET_NO;
{
/* still stuck in previous round, no point to update, check that
* we are active here though... */
- GNUNET_break ((peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK) ||
- (peer_entry->th != NULL));
+ if (GNUNET_SCHEDULER_NO_TASK == peer_entry->transmit_task &&
+ NULL == peer_entry->th)
+ {
+ GNUNET_break (0);
+ }
return GNUNET_OK;
}
if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
}
delay = get_transmit_delay (0);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (delay, &transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_delayed (delay, &transmit_task_cb, peer_entry);
return GNUNET_OK;
}
GNUNET_break (0);
return GNUNET_OK;
}
+#if ENABLE_HISTOGRAM
+ peer_entry->received_messages++;
+ if (peer_entry->transmitted_messages > 0 &&
+ peer_entry->last_transmitted_size >= matching_bits)
+ GNUNET_STATISTICS_update(stats, "# cross messages", 1, GNUNET_NO);
+#endif
ts = GNUNET_TIME_absolute_ntoh (incoming_flood->timestamp);
-
if (ts.abs_value == current_timestamp.abs_value)
idx = estimate_index;
else if (ts.abs_value ==
current_timestamp.abs_value - gnunet_nse_interval.rel_value)
idx = (estimate_index + HISTORY_SIZE - 1) % HISTORY_SIZE;
- else if (ts.abs_value ==
- next_timestamp.abs_value - gnunet_nse_interval.rel_value)
+ else if (ts.abs_value == next_timestamp.abs_value)
{
if (matching_bits <= ntohl (next_message.matching_bits))
return GNUNET_OK; /* ignore, simply too early/late */
update_network_size_estimate ();
return GNUNET_OK;
}
- if (matching_bits >= ntohl (size_estimate_messages[idx].matching_bits))
+ if (matching_bits == ntohl (size_estimate_messages[idx].matching_bits))
{
- /* cancel transmission from us to this peer for this round */
- if (idx == estimate_index)
+ /* Cancel transmission in the other direction, as this peer clearly has
+ up-to-date information already. Even if we didn't talk to this peer in
+ the previous round, we should no longer send it stale information as it
+ told us about the current round! */
+ peer_entry->previous_round = GNUNET_YES;
+ if (idx != estimate_index)
{
- if (peer_entry->previous_round == GNUNET_YES)
- {
- /* cancel any activity for current round */
- if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
- peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (peer_entry->th != NULL)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
- peer_entry->th = NULL;
- }
- }
+ /* do not transmit information for the previous round to this peer
+ anymore (but allow current round) */
+ return GNUNET_OK;
}
- else
+ /* got up-to-date information for current round, cancel transmission to
+ * this peer altogether */
+ if (GNUNET_SCHEDULER_NO_TASK != peer_entry->transmit_task)
{
- /* cancel previous round only */
- peer_entry->previous_round = GNUNET_YES;
+ GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
+ peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (peer_entry->th != NULL)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
+ peer_entry->th = NULL;
}
- }
- if (matching_bits == ntohl (size_estimate_messages[idx].matching_bits))
return GNUNET_OK;
- if (matching_bits <= ntohl (size_estimate_messages[idx].matching_bits))
+ }
+ if (matching_bits < ntohl (size_estimate_messages[idx].matching_bits))
{
- if ((idx < estimate_index) && (peer_entry->previous_round == GNUNET_YES))
+ if ((idx < estimate_index) && (peer_entry->previous_round == GNUNET_YES)) {
peer_entry->previous_round = GNUNET_NO;
+ }
/* push back our result now, that peer is spreading bad information... */
if (NULL == peer_entry->th)
{
if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_now (&transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_now (&transmit_task_cb, peer_entry);
}
/* Not closer than our most recent message, no need to do work here */
GNUNET_STATISTICS_update (stats,
GNUNET_break_op (0);
return GNUNET_OK;
}
+ GNUNET_assert (matching_bits >
+ ntohl (size_estimate_messages[idx].matching_bits));
+ /* cancel transmission from us to this peer for this round */
+ if (idx == estimate_index)
+ {
+ /* Cancel transmission in the other direction, as this peer clearly has
+ up-to-date information already. Even if we didn't talk to this peer in
+ the previous round, we should no longer send it stale information as it
+ told us about the current round! */
+ peer_entry->previous_round = GNUNET_YES;
+ /* cancel any activity for current round */
+ if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
+ peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (peer_entry->th != NULL)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
+ peer_entry->th = NULL;
+ }
+ }
+ else
+ {
+ /* cancel previous round only */
+ peer_entry->previous_round = GNUNET_YES;
+ }
size_estimate_messages[idx] = *incoming_flood;
size_estimate_messages[idx].hop_count =
htonl (ntohl (incoming_flood->hop_count) + 1);
hop_count_max =
GNUNET_MAX (ntohl (incoming_flood->hop_count) + 1, hop_count_max);
+ GNUNET_STATISTICS_set (stats,
+ "# estimated network diameter",
+ hop_count_max, GNUNET_NO);
/* have a new, better size estimate, inform clients */
update_network_size_estimate ();
/**
- * Method called whenever a peer connects.
+ * Method called whenever a peer connects. Sets up the PeerEntry and
+ * schedules the initial size info transmission to this peer.
*
* @param cls closure
* @param peer peer identity this notification is about
#endif
peer_entry = GNUNET_malloc (sizeof (struct NSEPeerEntry));
peer_entry->id = *peer;
- GNUNET_CONTAINER_multihashmap_put (peers, &peer->hashPubKey, peer_entry,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (peers, &peer->hashPubKey,
+ peer_entry,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (get_transmit_delay (-1), &transmit_task,
+ GNUNET_SCHEDULER_add_delayed (get_transmit_delay (-1), &transmit_task_cb,
peer_entry);
+ GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO);
}
/**
- * Method called whenever a peer disconnects.
+ * Method called whenever a peer disconnects. Deletes the PeerEntry and cancels
+ * any pending transmission requests to that peer.
*
* @param cls closure
* @param peer peer identity this notification is about
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (peers, &peer->hashPubKey,
pos));
- if (pos->transmit_task != GNUNET_SCHEDULER_NO_TASK)
+ if (pos->transmit_task != GNUNET_SCHEDULER_NO_TASK) {
GNUNET_SCHEDULER_cancel (pos->transmit_task);
+ pos->transmit_task = GNUNET_SCHEDULER_NO_TASK;
+ }
if (pos->th != NULL)
{
GNUNET_CORE_notify_transmit_ready_cancel (pos->th);
pos->th = NULL;
}
GNUNET_free (pos);
+ GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
}
{
struct GNUNET_TIME_Absolute now;
struct GNUNET_TIME_Absolute prev_time;
- unsigned int i;
- if (server == NULL)
+ if (NULL == server)
{
-#if DEBUG_NSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection to core FAILED!\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Connection to core FAILED!\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
current_timestamp.abs_value =
(now.abs_value / gnunet_nse_interval.rel_value) *
gnunet_nse_interval.rel_value;
- next_timestamp.abs_value =
- current_timestamp.abs_value + gnunet_nse_interval.rel_value;
-
- for (i = 0; i < HISTORY_SIZE; i++)
+ next_timestamp =
+ GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval);
+ estimate_index = HISTORY_SIZE - 1;
+ estimate_count = 0;
+ if (GNUNET_YES == check_proof_of_work (&my_public_key, my_proof))
{
+ int idx = (estimate_index + HISTORY_SIZE - 1) % HISTORY_SIZE;
prev_time.abs_value =
- current_timestamp.abs_value - (HISTORY_SIZE - i -
- 1) * gnunet_nse_interval.rel_value;
- setup_flood_message (i, prev_time);
+ current_timestamp.abs_value - gnunet_nse_interval.rel_value;
+ setup_flood_message (idx, prev_time);
+ setup_flood_message (estimate_index, current_timestamp);
+ estimate_count++;
}
- estimate_index = HISTORY_SIZE - 1;
- estimate_count = 2;
flood_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
(next_timestamp), &update_flood_message,
nc = GNUNET_SERVER_notification_context_create (server, 1);
/* Connect to core service and register core handlers */
coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
- 1,
- NULL, /* Closure passed to functions */
+ 1, NULL, /* Closure passed to functions */
&core_init, /* Call core_init once connected */
&handle_core_connect, /* Handle connects */
&handle_core_disconnect, /* Handle disconnects */