*/
struct GNUNET_TIME_Relative delay;
+ /**
+ * Time we transmitted the last message of the last round.
+ */
+ struct GNUNET_TIME_Absolute last_round;
+
/**
* Message to fragment (allocated at the end of this struct).
*/
*/
GNUNET_SCHEDULER_TaskIdentifier task;
+ /**
+ * Round-robin selector for the next transmission.
+ */
+ unsigned int next_transmission;
+
+ /**
+ * GNUNET_YES if we are waiting for an ACK.
+ */
+ int wack;
+
/**
* Target fragment size.
*/
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_FRAGMENT_Context *fc = cls;
+ char msg[fc->mtu];
+ const char *mbuf;
+ struct FragmentHeader *fh;
+ struct GNUNET_TIME_Relative delay;
+ unsigned int bit;
+ size_t size;
+ size_t fsize;
+ int wrap;
fc->task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 == fc->acks)
+ return; /* all done */
+
+ /* calculate delay */
+ wrap = 0;
+ while (0 == (fc->acks & (1 << fc->next_transmission)))
+ {
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (fc->next_transmission == 0);
+ }
+ bit = fc->next_transmission;
+ size = ntohs (fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+ fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
+ else
+ fsize = fc->mtu;
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fsize);
+ if (delay.rel_value > 0)
+ {
+ fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fc->mtu),
+ &transmit_next,
+ fc);
+ return;
+ }
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (fc->next_transmission == 0);
+
+ /* assemble fragmentation message */
+ mbuf = (const char*) &fc[1];
+ fh = (struct FragmentHeader*) msg;
+ fh->header.size = htons (fsize);
+ fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
+ /* FIXME: add specific ID info... */
+ memcpy (&fc[1],
+ &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
+ fsize - sizeof (struct FragmentHeader));
+ fc->proc (fc->proc_cls, &fh->header);
+ GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
+ GNUNET_STATISTICS_update (fc->stats,
+ _("Fragments transmitted"),
+ 1, GNUNET_NO);
+ if (0 != fc->last_round.abs_value)
+ GNUNET_STATISTICS_update (fc->stats,
+ _("Fragments retransmitted"),
+ 1, GNUNET_NO);
+
+ /* select next message to calculate delay */
+ bit = fc->next_transmission;
+ size = ntohs (fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+ fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+ else
+ fsize = fc->mtu;
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fsize);
+ if (wrap)
+ {
+ /* full round transmitted wait 2x delay for ACK before going again */
+ delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
+ fc->delay);
+ fc->last_round = GNUNET_TIME_absolute_get ();
+ fc->wack = GNUNET_YES;
+ }
+ fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fc->mtu),
+ &transmit_next,
+ fc);
}
size_t size;
uint64_t bits;
+ GNUNET_STATISTICS_update (stats,
+ _("Messages fragmented"),
+ 1, GNUNET_NO);
GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
size = ntohs (msg->size);
+ GNUNET_STATISTICS_update (stats,
+ _("Total size of fragmented messages"),
+ size, GNUNET_NO);
GNUNET_assert (size > mtu);
fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
fc->stats = stats;
fc->acks = UINT64_MAX; /* set all 64 bit */
else
fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */
- fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (tracker, mtu),
- &transmit_next,
- fc);
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
return fc;
}
GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
const struct GNUNET_MessageHeader *msg)
{
- return GNUNET_SYSERR;
+ const struct FragmentAcknowledgement *fa;
+ uint64_t abits;
+ struct GNUNET_TIME_Relative ndelay;
+
+ if (sizeof (struct FragmentAcknowledgement) !=
+ ntohs (msg->size))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ fa = (const struct FragmentAcknowledgement *) msg;
+ abits = GNUNET_ntohll (fa->bits);
+ /* FIXME: match FA to us... */
+
+ if (GNUNET_YES == fc->wack)
+ {
+ /* normal ACK, can update running average of delay... */
+ fc->wack = GNUNET_NO;
+ ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
+ fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
+ }
+
+ fc->acks &= abits;
+ if (0 != fc->acks)
+ {
+ /* more to transmit, do so right now (if tracker permits...) */
+ GNUNET_SCHEDULER_cancel (fc->task);
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
+ return GNUNET_NO;
+ }
+
+ /* all done */
+ if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (fc->task);
+ fc->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ return GNUNET_OK;
}