* been taken care of.
*/
int previous_round;
+
+#if ENABLE_HISTOGRAM
+
+ /**
+ * Number of messages received from this peer in the current round.
+ */
+ unsigned int received_messages;
+
+ /**
+ * Number of messages transmitted to this peer in the current round.
+ */
+ unsigned int transmitted_messages;
+
+ /**
+ * Network size (in matching bits) we last told this peer about.
+ */
+ unsigned int last_transmitted_size;
+
+#endif
+
};
+GNUNET_NETWORK_STRUCT_BEGIN
+
/**
* Network size estimate reply; sent when "this"
* peer's timer has run out before receiving a
*/
struct GNUNET_CRYPTO_RsaSignature signature;
};
-
+GNUNET_NETWORK_STRUCT_END
/**
* Handle to our current configuration.
em->header.type = htons (GNUNET_MESSAGE_TYPE_NSE_ESTIMATE);
em->reserved = htonl (0);
em->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
- em->size_estimate = mean - 0.332747;
+ double se = mean - 0.332747;
nsize = log2 (GNUNET_CONTAINER_multihashmap_size (peers) + 1);
- if (em->size_estimate < nsize)
- em->size_estimate = nsize;
- em->std_deviation = std_dev;
+ em->size_estimate = GNUNET_hton_double (GNUNET_MAX (se, nsize));
+ em->std_deviation = GNUNET_hton_double (std_dev);
GNUNET_STATISTICS_set (stats, "# nodes in the network (estimate)",
(uint64_t) pow (2, mean - 1.0 / 3.0), GNUNET_NO);
}
uint32_t i;
double d;
- if (matching_bits == 0)
- return GNUNET_TIME_UNIT_ZERO;
- d = get_matching_bits_delay (matching_bits - 1);
+ d = get_matching_bits_delay (matching_bits);
i = (uint32_t) (d / (double) (hop_count_max + 1));
+#if DEBUG_NSE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Randomizing flood using latencies up to %u ms\n",
+ (unsigned int) i);
+#endif
ret.rel_value = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, i + 1);
return ret;
#else
* @param tc scheduler context
*/
static void
-transmit_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+transmit_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
/**
idx = (idx + HISTORY_SIZE - 1) % HISTORY_SIZE;
peer_entry->previous_round = GNUNET_YES;
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), &transmit_task,
+ GNUNET_SCHEDULER_add_delayed (get_transmit_delay (0), &transmit_task_cb,
peer_entry);
}
if ((ntohl (size_estimate_messages[idx].hop_count) == 0) &&
GNUNET_STATISTICS_update (stats, "# flood messages started", 1, GNUNET_NO);
GNUNET_STATISTICS_update (stats, "# flood messages transmitted", 1,
GNUNET_NO);
+#if ENABLE_HISTOGRAM
+ peer_entry->transmitted_messages++;
+ peer_entry->last_transmitted_size =
+ ntohl(size_estimate_messages[idx].matching_bits);
+#endif
memcpy (buf, &size_estimate_messages[idx],
sizeof (struct GNUNET_NSE_FloodMessage));
- GNUNET_STATISTICS_update (stats, "# flood messages sent", 1, GNUNET_NO);
return sizeof (struct GNUNET_NSE_FloodMessage);
}
* @param tc scheduler context
*/
static void
-transmit_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct NSEPeerEntry *peer_entry = cls;
GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
peer_entry->previous_round = GNUNET_NO;
}
+#if ENABLE_HISTOGRAM
+ if (peer_entry->received_messages > 1)
+ GNUNET_STATISTICS_update(stats, "# extra messages",
+ peer_entry->received_messages - 1, GNUNET_NO);
+ peer_entry->transmitted_messages = 0;
+ peer_entry->last_transmitted_size = 0;
+ peer_entry->received_messages = 0;
+#endif
delay =
get_transmit_delay ((peer_entry->previous_round == GNUNET_NO) ? -1 : 0);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (delay, &transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_delayed (delay, &transmit_task_cb, peer_entry);
return GNUNET_OK;
}
estimate_index = (estimate_index + 1) % HISTORY_SIZE;
if (estimate_count < HISTORY_SIZE)
estimate_count++;
- if (next_timestamp.abs_value ==
- GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value)
+ if ((current_timestamp.abs_value ==
+ GNUNET_TIME_absolute_ntoh (next_message.timestamp).abs_value) &&
+ (get_matching_bits (current_timestamp, &my_identity) >
+ ntohl(next_message.matching_bits)))
{
/* we received a message for this round way early, use it! */
size_estimate_messages[estimate_index] = next_message;
my_proof = counter;
}
proof_task =
- GNUNET_SCHEDULER_add_delayed (proof_find_delay, &find_proof, NULL);
+ GNUNET_SCHEDULER_add_delayed_with_priority (proof_find_delay,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ &find_proof, NULL);
}
{
/* still stuck in previous round, no point to update, check that
* we are active here though... */
- GNUNET_break ((peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK) ||
- (peer_entry->th != NULL));
+ GNUNET_break (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK);
return GNUNET_OK;
}
if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
}
delay = get_transmit_delay (0);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (delay, &transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_delayed (delay, &transmit_task_cb, peer_entry);
return GNUNET_OK;
}
GNUNET_break (0);
return GNUNET_OK;
}
+#if ENABLE_HISTOGRAM
+ peer_entry->received_messages++;
+ if (peer_entry->transmitted_messages > 0 &&
+ peer_entry->last_transmitted_size >= matching_bits)
+ GNUNET_STATISTICS_update(stats, "# cross messages", 1, GNUNET_NO);
+#endif
ts = GNUNET_TIME_absolute_ntoh (incoming_flood->timestamp);
-
if (ts.abs_value == current_timestamp.abs_value)
idx = estimate_index;
else if (ts.abs_value ==
update_network_size_estimate ();
return GNUNET_OK;
}
- if (matching_bits >= ntohl (size_estimate_messages[idx].matching_bits))
- {
- /* cancel transmission from us to this peer for this round */
- if (idx == estimate_index)
- {
- if (peer_entry->previous_round == GNUNET_YES)
- {
- /* cancel any activity for current round */
- if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
- peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (peer_entry->th != NULL)
- {
- GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
- peer_entry->th = NULL;
- }
- }
- }
- else
- {
- /* cancel previous round only */
- peer_entry->previous_round = GNUNET_YES;
- }
- }
if (matching_bits == ntohl (size_estimate_messages[idx].matching_bits))
{
- /* cancel transmission in the other direction, as this peer clearly has
- up-to-date information already */
+ /* Cancel transmission in the other direction, as this peer clearly has
+ up-to-date information already. Even if we didn't talk to this peer in
+ the previous round, we should no longer send it stale information as it
+ told us about the current round! */
+ peer_entry->previous_round = GNUNET_YES;
if (idx != estimate_index)
{
/* do not transmit information for the previous round to this peer
- anymore (but allow current round) */
- peer_entry->previous_round = GNUNET_YES;
+ anymore (but allow current round) */
return GNUNET_OK;
}
- /* got up-to-date information for current round, cancel transmission altogether */
+ /* got up-to-date information for current round, cancel transmission to
+ * this peer altogether */
if (GNUNET_SCHEDULER_NO_TASK != peer_entry->transmit_task)
{
GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
}
return GNUNET_OK;
}
- if (matching_bits <= ntohl (size_estimate_messages[idx].matching_bits))
+ if (matching_bits < ntohl (size_estimate_messages[idx].matching_bits))
{
if ((idx < estimate_index) && (peer_entry->previous_round == GNUNET_YES))
peer_entry->previous_round = GNUNET_NO;
if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_now (&transmit_task, peer_entry);
+ GNUNET_SCHEDULER_add_now (&transmit_task_cb, peer_entry);
}
/* Not closer than our most recent message, no need to do work here */
GNUNET_STATISTICS_update (stats,
GNUNET_break_op (0);
return GNUNET_OK;
}
+ GNUNET_assert (matching_bits >
+ ntohl (size_estimate_messages[idx].matching_bits));
+ /* cancel transmission from us to this peer for this round */
+ if (idx == estimate_index)
+ {
+ /* cancel any activity for current round */
+ if (peer_entry->transmit_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (peer_entry->transmit_task);
+ peer_entry->transmit_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (peer_entry->th != NULL)
+ {
+ GNUNET_CORE_notify_transmit_ready_cancel (peer_entry->th);
+ peer_entry->th = NULL;
+ }
+ }
+ else
+ {
+ /* cancel previous round only */
+ peer_entry->previous_round = GNUNET_YES;
+ }
size_estimate_messages[idx] = *incoming_flood;
size_estimate_messages[idx].hop_count =
htonl (ntohl (incoming_flood->hop_count) + 1);
peer_entry,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
peer_entry->transmit_task =
- GNUNET_SCHEDULER_add_delayed (get_transmit_delay (-1), &transmit_task,
+ GNUNET_SCHEDULER_add_delayed (get_transmit_delay (-1), &transmit_task_cb,
peer_entry);
GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO);
}
if (server == NULL)
{
-#if DEBUG_NSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection to core FAILED!\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Connection to core FAILED!\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
current_timestamp.abs_value =
(now.abs_value / gnunet_nse_interval.rel_value) *
gnunet_nse_interval.rel_value;
- next_timestamp.abs_value =
- current_timestamp.abs_value + gnunet_nse_interval.rel_value;
+ next_timestamp =
+ GNUNET_TIME_absolute_add (current_timestamp, gnunet_nse_interval);
estimate_index = HISTORY_SIZE - 1;
estimate_count = 0;
if (GNUNET_YES == check_proof_of_work (&my_public_key, my_proof))
{
prev_time.abs_value =
- current_timestamp.abs_value - (estimate_index -
- 1) * gnunet_nse_interval.rel_value;
+ current_timestamp.abs_value - gnunet_nse_interval.rel_value;
setup_flood_message (estimate_index, prev_time);
estimate_count++;
}