struct GNUNET_TIME_Relative flow_delay_for_other_peer;
/**
- * Desired delay for next sending we received from other peer
+ * Desired delay for transmissions we received from other peer.
+ * Adjusted to be per fragment (UDP_MTU), even though on the
+ * wire it was for "full messages".
*/
- struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+ struct GNUNET_TIME_Relative flow_delay_from_other_peer;
/**
* Session timeout task
*/
struct GNUNET_TIME_Absolute timeout;
+ /**
+ * What time did we last transmit?
+ */
+ struct GNUNET_TIME_Absolute last_transmit_time;
+
/**
* expected delay for ACKs
*/
*/
void *cont_cls;
+ /**
+ * Start time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Transmission time for the next fragment. Incremented by
+ * the "flow_delay_from_other_peer" for each fragment when
+ * we setup the fragments.
+ */
+ struct GNUNET_TIME_Absolute next_frag_time;
+
/**
* Message timeout
*/
*/
struct UDP_FragmentationContext *frag_ctx;
+ /**
+ * Message enqueue time.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Desired transmission time for this message, based on the
+ * flow limiting information we got from the other peer.
+ */
+ struct GNUNET_TIME_Absolute transmission_time;
+
/**
* Message timeout.
*/
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
if (NULL != plugin->select_task_v4)
GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+ if (NULL != plugin->ipv4_queue_head)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv4 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv4 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ }
plugin->select_task_v4
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv4,
min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
min_delay = GNUNET_TIME_relative_min (min_delay,
- GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
+ GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
if (NULL != plugin->select_task_v6)
GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
+ if (NULL != plugin->ipv6_queue_head)
+ {
+ if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Calculated flow delay for UDPv6 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calculated flow delay for UDPv6 at %s\n",
+ GNUNET_STRINGS_relative_time_to_string (min_delay,
+ GNUNET_YES));
+ }
+ }
plugin->select_task_v6
= GNUNET_SCHEDULER_add_read_net (min_delay,
plugin->sockv6,
struct UDP_MessageWrapper *udpw;
struct UDP_MessageWrapper *tmp;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p: Fragmented message removed with result %s\n",
overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
else
overhead = frag_ctx->on_wire_size;
+ delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Fragmented message acknowledged after %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Fragmented message acknowledged after %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+
if (NULL != frag_ctx->cont)
frag_ctx->cont (frag_ctx->cont_cls,
&s->target,
udpw->msg_size = msg_len;
udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
udpw->timeout = frag_ctx->timeout;
+ udpw->start_time = frag_ctx->start_time;
+ udpw->transmission_time = frag_ctx->next_frag_time;
+ frag_ctx->next_frag_time
+ = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
+ session->flow_delay_from_other_peer);
udpw->frag_ctx = frag_ctx;
udpw->qc = &qc_fragment_sent;
udpw->qc_cls = plugin;
{
struct Plugin *plugin = cls;
size_t overhead;
+ struct GNUNET_TIME_Relative delay;
if (udpw->msg_size >= udpw->payload_size)
overhead = udpw->msg_size - udpw->payload_size;
overhead = udpw->msg_size;
if (NULL != udpw->cont)
+ {
+ delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
+ if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message sent via UDP with delay of %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ }
udpw->cont (udpw->cont_cls,
&udpw->session->target,
result,
udpw->payload_size,
overhead);
+ }
if (GNUNET_OK == result)
{
GNUNET_STATISTICS_update (plugin->env->stats,
udpw->msg_buf = (char *) &udpw[1];
udpw->msg_size = udpmlen; /* message size with UDP overhead */
udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
+ udpw->transmission_time = s->last_transmit_time;
+ s->last_transmit_time
+ = GNUNET_TIME_absolute_add (s->last_transmit_time,
+ s->flow_delay_from_other_peer);
udpw->cont = cont;
udpw->cont_cls = cont_cls;
udpw->frag_ctx = NULL;
frag_ctx->session = s;
frag_ctx->cont = cont;
frag_ctx->cont_cls = cont_cls;
+ frag_ctx->start_time = GNUNET_TIME_absolute_get ();
+ frag_ctx->next_frag_time = s->last_transmit_time;
frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
&enqueue_fragment,
frag_ctx);
s->frag_ctx = frag_ctx;
+ s->last_transmit_time = frag_ctx->next_frag_time;
GNUNET_STATISTICS_update (plugin->env->stats,
"# UDP, fragmented messages active",
1,
GNUNET_STRINGS_relative_time_to_string (flow_delay,
GNUNET_YES),
GNUNET_i2s (&udp_ack->sender));
- s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
+ /* Flow delay is for the reassembled packet, however, our delay
+ is per packet, so we need to adjust: */
+ flow_delay = GNUNET_TIME_relative_divide (flow_delay,
+ 1 + (s->frag_ctx->payload_size /
+ UDP_MTU));
+ s->flow_delay_from_other_peer = flow_delay;
if (GNUNET_OK !=
s->plugin = plugin;
s->address = GNUNET_HELLO_address_copy (address);
s->target = address->peer;
+ s->last_transmit_time = GNUNET_TIME_absolute_get ();
s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
250);
s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
- s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
+ s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
udpw->msg_size = msize;
udpw->payload_size = 0;
udpw->session = s;
+ udpw->start_time = GNUNET_TIME_absolute_get ();
udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
udpw->msg_buf = (char *) &udpw[1];
udpw->qc = &ack_message_sent;
}
else
{
- /* Message did not time out, check flow delay */
- remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
+ /* Message did not time out, check transmission time */
+ remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
if (0 == remaining.rel_value_us)
{
/* this message is not delayed */
udpw->msg_size,
a,
slen);
+ udpw->session->last_transmit_time
+ = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
+ udpw->session->last_transmit_time);
dequeue (plugin,
udpw);
if (GNUNET_SYSERR == sent)