From 54a83ae9ffdf895596369a78929a8213fb8d900c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 9 Jul 2011 16:21:14 +0000 Subject: [PATCH] fragging --- src/fragmentation/fragmentation.h | 7 ++ src/fragmentation/fragmentation_new.c | 143 +++++++++++++++++++++++++- 2 files changed, 146 insertions(+), 4 deletions(-) diff --git a/src/fragmentation/fragmentation.h b/src/fragmentation/fragmentation.h index e9b3faba5..9473fa31a 100644 --- a/src/fragmentation/fragmentation.h +++ b/src/fragmentation/fragmentation.h @@ -46,6 +46,13 @@ struct FragmentAcknowledgement struct GNUNET_MessageHeader header; + /** + * Bits that are being acknowledged, in big-endian. + * (bits that are set correspond to fragments that + * have not yet been received). + */ + uint64_t bits; + }; diff --git a/src/fragmentation/fragmentation_new.c b/src/fragmentation/fragmentation_new.c index a95afc4a4..66633e4c1 100644 --- a/src/fragmentation/fragmentation_new.c +++ b/src/fragmentation/fragmentation_new.c @@ -47,6 +47,11 @@ struct GNUNET_FRAGMENT_Context */ struct GNUNET_TIME_Relative delay; + /** + * Time we transmitted the last message of the last round. + */ + struct GNUNET_TIME_Absolute last_round; + /** * Message to fragment (allocated at the end of this struct). */ @@ -72,6 +77,16 @@ struct GNUNET_FRAGMENT_Context */ GNUNET_SCHEDULER_TaskIdentifier task; + /** + * Round-robin selector for the next transmission. + */ + unsigned int next_transmission; + + /** + * GNUNET_YES if we are waiting for an ACK. + */ + int wack; + /** * Target fragment size. */ @@ -91,8 +106,85 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_FRAGMENT_Context *fc = cls; + char msg[fc->mtu]; + const char *mbuf; + struct FragmentHeader *fh; + struct GNUNET_TIME_Relative delay; + unsigned int bit; + size_t size; + size_t fsize; + int wrap; fc->task = GNUNET_SCHEDULER_NO_TASK; + if (0 == fc->acks) + return; /* all done */ + + /* calculate delay */ + wrap = 0; + while (0 == (fc->acks & (1 << fc->next_transmission))) + { + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (fc->next_transmission == 0); + } + bit = fc->next_transmission; + size = ntohs (fc->msg->size); + if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) + fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader); + else + fsize = fc->mtu; + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); + if (delay.rel_value > 0) + { + fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fc->mtu), + &transmit_next, + fc); + return; + } + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (fc->next_transmission == 0); + + /* assemble fragmentation message */ + mbuf = (const char*) &fc[1]; + fh = (struct FragmentHeader*) msg; + fh->header.size = htons (fsize); + fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); + /* FIXME: add specific ID info... */ + memcpy (&fc[1], + &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], + fsize - sizeof (struct FragmentHeader)); + fc->proc (fc->proc_cls, &fh->header); + GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); + GNUNET_STATISTICS_update (fc->stats, + _("Fragments transmitted"), + 1, GNUNET_NO); + if (0 != fc->last_round.abs_value) + GNUNET_STATISTICS_update (fc->stats, + _("Fragments retransmitted"), + 1, GNUNET_NO); + + /* select next message to calculate delay */ + bit = fc->next_transmission; + size = ntohs (fc->msg->size); + if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) + fsize = size % (fc->mtu - sizeof (struct FragmentHeader)); + else + fsize = fc->mtu; + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); + if (wrap) + { + /* full round transmitted wait 2x delay for ACK before going again */ + delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), + fc->delay); + fc->last_round = GNUNET_TIME_absolute_get (); + fc->wack = GNUNET_YES; + } + fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fc->mtu), + &transmit_next, + fc); } @@ -127,8 +219,14 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, size_t size; uint64_t bits; + GNUNET_STATISTICS_update (stats, + _("Messages fragmented"), + 1, GNUNET_NO); GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); size = ntohs (msg->size); + GNUNET_STATISTICS_update (stats, + _("Total size of fragmented messages"), + size, GNUNET_NO); GNUNET_assert (size > mtu); fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); fc->stats = stats; @@ -145,9 +243,8 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, fc->acks = UINT64_MAX; /* set all 64 bit */ else fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */ - fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (tracker, mtu), - &transmit_next, - fc); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); return fc; } @@ -167,7 +264,45 @@ int GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, const struct GNUNET_MessageHeader *msg) { - return GNUNET_SYSERR; + const struct FragmentAcknowledgement *fa; + uint64_t abits; + struct GNUNET_TIME_Relative ndelay; + + if (sizeof (struct FragmentAcknowledgement) != + ntohs (msg->size)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fa = (const struct FragmentAcknowledgement *) msg; + abits = GNUNET_ntohll (fa->bits); + /* FIXME: match FA to us... */ + + if (GNUNET_YES == fc->wack) + { + /* normal ACK, can update running average of delay... */ + fc->wack = GNUNET_NO; + ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); + fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; + } + + fc->acks &= abits; + if (0 != fc->acks) + { + /* more to transmit, do so right now (if tracker permits...) */ + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); + return GNUNET_NO; + } + + /* all done */ + if (fc->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; } -- 2.25.1