X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffragmentation%2Ffragmentation.c;h=3a55502e71b03adfcde7e71778b48c1b8a8af73a;hb=3b680a20ab2cbb98cfa658d85be7a44baaf95d2c;hp=77b4c2e495cafb4287ec3af4d25420435b04141c;hpb=22dafb003418aab11e036230b3761116c397d239;p=oweals%2Fgnunet.git diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c index 77b4c2e49..3a55502e7 100644 --- a/src/fragmentation/fragmentation.c +++ b/src/fragmentation/fragmentation.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,11 +14,11 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** - * @file src/fragmentation/fragmentation_new.c + * @file src/fragmentation/fragmentation.c * @brief library to help fragment messages * @author Christian Grothoff */ @@ -28,6 +28,12 @@ #include "fragmentation.h" +/** + * Absolute minimum delay we impose between sending and expecting ACK to arrive. + */ +#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1) + + /** * Fragmentation context. */ @@ -46,7 +52,12 @@ struct GNUNET_FRAGMENT_Context /** * Current expected delay for ACKs. */ - struct GNUNET_TIME_Relative delay; + struct GNUNET_TIME_Relative ack_delay; + + /** + * Current expected delay between messages. + */ + struct GNUNET_TIME_Relative msg_delay; /** * Next allowed transmission time. @@ -69,7 +80,7 @@ struct GNUNET_FRAGMENT_Context GNUNET_FRAGMENT_MessageProcessor proc; /** - * Closure for 'proc'. + * Closure for @e proc. */ void *proc_cls; @@ -78,10 +89,16 @@ struct GNUNET_FRAGMENT_Context */ uint64_t acks; + /** + * Bitfield with all possible bits for @e acks (used to mask the + * ack we get back). + */ + uint64_t acks_mask; + /** * Task performing work for the fragmenter. */ - GNUNET_SCHEDULER_TaskIdentifier task; + struct GNUNET_SCHEDULER_Task *task; /** * Our fragmentation ID. (chosen at random) @@ -94,12 +111,22 @@ struct GNUNET_FRAGMENT_Context unsigned int next_transmission; /** - * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' + * How many rounds of transmission have we completed so far? + */ + unsigned int num_rounds; + + /** + * How many transmission have we completed in this round? + */ + unsigned int num_transmissions; + + /** + * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done() */ int8_t proc_busy; /** - * GNUNET_YES if we are waiting for an ACK. + * #GNUNET_YES if we are waiting for an ACK. */ int8_t wack; @@ -107,19 +134,42 @@ struct GNUNET_FRAGMENT_Context * Target fragment size. */ uint16_t mtu; - + }; +/** + * Convert an ACK message to a printable format suitable for logging. + * + * @param ack message to print + * @return ack in human-readable format + */ +const char * +GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack) +{ + static char buf[128]; + const struct FragmentAcknowledgement *fa; + + if (sizeof (struct FragmentAcknowledgement) != + htons (ack->size)) + return ""; + fa = (const struct FragmentAcknowledgement *) ack; + GNUNET_snprintf (buf, + sizeof (buf), + "%u-%llX", + ntohl (fa->fragment_id), + GNUNET_ntohll (fa->bits)); + return buf; +} + + /** * Transmit the next fragment to the other peer. * - * @param cls the 'struct GNUNET_FRAGMENT_Context' - * @param tc scheduler context + * @param cls the `struct GNUNET_FRAGMENT_Context` */ static void -transmit_next (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_next (void *cls) { struct GNUNET_FRAGMENT_Context *fc = cls; char msg[fc->mtu]; @@ -131,59 +181,70 @@ transmit_next (void *cls, size_t fsize; int wrap; - fc->task = GNUNET_SCHEDULER_NO_TASK; + fc->task = NULL; GNUNET_assert (GNUNET_NO == fc->proc_busy); if (0 == fc->acks) - return; /* all done */ - + return; /* all done */ /* calculate delay */ wrap = 0; - while (0 == (fc->acks & (1LL << fc->next_transmission))) - { - fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); - } + while (0 == (fc->acks & (1LL << fc->next_transmission))) + { + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (0 == fc->next_transmission); + } bit = fc->next_transmission; size = ntohs (fc->msg->size); if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) - fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader); + fsize = + (size % (fc->mtu - sizeof (struct FragmentHeader))) + + sizeof (struct FragmentHeader); else fsize = fc->mtu; - if (fc->tracker != NULL) + if (NULL != fc->tracker) delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, - fsize); + fsize); else delay = GNUNET_TIME_UNIT_ZERO; - if (delay.rel_value > 0) - { - fc->task = GNUNET_SCHEDULER_add_delayed (delay, - &transmit_next, - fc); - return; - } + if (delay.rel_value_us > 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Fragmentation logic delays transmission of next fragment by %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + fc->task = GNUNET_SCHEDULER_add_delayed (delay, + &transmit_next, + fc); + return; + } fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); + wrap |= (0 == fc->next_transmission); + while (0 == (fc->acks & (1LL << fc->next_transmission))) + { + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (0 == fc->next_transmission); + } /* assemble fragmentation message */ - mbuf = (const char*) &fc[1]; - fh = (struct FragmentHeader*) msg; + mbuf = (const char *) &fc[1]; + fh = (struct FragmentHeader *) msg; fh->header.size = htons (fsize); fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); fh->fragment_id = htonl (fc->fragment_id); - fh->total_size = fc->msg->size; /* already in big-endian */ + fh->total_size = fc->msg->size; /* already in big-endian */ fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); - memcpy (&fh[1], - &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], - fsize - sizeof (struct FragmentHeader)); + memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], + fsize - sizeof (struct FragmentHeader)); if (NULL != fc->tracker) - GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); + GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); GNUNET_STATISTICS_update (fc->stats, - _("# fragments transmitted"), - 1, GNUNET_NO); - if (0 != fc->last_round.abs_value) + _("# fragments transmitted"), + 1, + GNUNET_NO); + if (0 != fc->last_round.abs_value_us) GNUNET_STATISTICS_update (fc->stats, - _("# fragments retransmitted"), - 1, GNUNET_NO); + _("# fragments retransmitted"), + 1, + GNUNET_NO); /* select next message to calculate delay */ bit = fc->next_transmission; @@ -194,85 +255,102 @@ transmit_next (void *cls, fsize = fc->mtu; if (NULL != fc->tracker) delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, - fsize); + fsize); else delay = GNUNET_TIME_UNIT_ZERO; + delay = GNUNET_TIME_relative_max (delay, + GNUNET_TIME_relative_multiply (fc->msg_delay, + (1ULL << fc->num_rounds))); if (wrap) - { - /* full round transmitted wait 2x delay for ACK before going again */ - delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), - fc->delay); - /* never use zero, need some time for ACK always */ - delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS, - delay); - fc->last_round = GNUNET_TIME_absolute_get (); - fc->wack = GNUNET_YES; - } + { + /* full round transmitted wait 2x delay for ACK before going again */ + fc->num_rounds++; + delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2); + /* never use zero, need some time for ACK always */ + delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay); + fc->wack = GNUNET_YES; + fc->last_round = GNUNET_TIME_absolute_get (); + GNUNET_STATISTICS_update (fc->stats, + _("# fragments wrap arounds"), + 1, + GNUNET_NO); + } fc->proc_busy = GNUNET_YES; fc->delay_until = GNUNET_TIME_relative_to_absolute (delay); - fc->proc (fc->proc_cls, &fh->header); + fc->num_transmissions++; + fc->proc (fc->proc_cls, + &fh->header); } /** * Create a fragmentation context for the given message. - * Fragments the message into fragments of size "mtu" or - * less. Calls 'proc' on each un-acknowledged fragment, - * using both the expected 'delay' between messages and - * acknowledgements and the given 'tracker' to guide the - * frequency of calls to 'proc'. + * Fragments the message into fragments of size @a mtu or + * less. Calls @a proc on each un-acknowledged fragment, + * using both the expected @a msg_delay between messages and + * acknowledgements and the given @a tracker to guide the + * frequency of calls to @a proc. * * @param stats statistics context * @param mtu the maximum message size for each fragment * @param tracker bandwidth tracker to use for flow control (can be NULL) - * @param delay expected delay between fragment transmission + * @param msg_delay initial delay to insert between fragment transmissions + * based on previous messages + * @param ack_delay expected delay between fragment transmission * and ACK based on previous messages * @param msg the message to fragment * @param proc function to call for each fragment to transmit - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return the fragmentation context */ struct GNUNET_FRAGMENT_Context * GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, - uint16_t mtu, - struct GNUNET_BANDWIDTH_Tracker *tracker, - struct GNUNET_TIME_Relative delay, - const struct GNUNET_MessageHeader *msg, - GNUNET_FRAGMENT_MessageProcessor proc, - void *proc_cls) + uint16_t mtu, + struct GNUNET_BANDWIDTH_Tracker *tracker, + struct GNUNET_TIME_Relative msg_delay, + struct GNUNET_TIME_Relative ack_delay, + const struct GNUNET_MessageHeader *msg, + GNUNET_FRAGMENT_MessageProcessor proc, + void *proc_cls) { struct GNUNET_FRAGMENT_Context *fc; size_t size; uint64_t bits; - + GNUNET_STATISTICS_update (stats, - _("# messages fragmented"), - 1, GNUNET_NO); + _("# messages fragmented"), + 1, + GNUNET_NO); GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); size = ntohs (msg->size); GNUNET_STATISTICS_update (stats, - _("# total size of fragmented messages"), - size, GNUNET_NO); - //GNUNET_assert (size > mtu); + _("# total size of fragmented messages"), + size, GNUNET_NO); + GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); fc->stats = stats; fc->mtu = mtu; fc->tracker = tracker; - fc->delay = delay; - fc->msg = (const struct GNUNET_MessageHeader*)&fc[1]; + fc->ack_delay = ack_delay; + fc->msg_delay = msg_delay; + fc->msg = (const struct GNUNET_MessageHeader *) &fc[1]; fc->proc = proc; fc->proc_cls = proc_cls; - fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); + fc->fragment_id = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); memcpy (&fc[1], msg, size); - bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader)); + bits = + (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - + sizeof (struct + FragmentHeader)); GNUNET_assert (bits <= 64); if (bits == 64) - fc->acks = UINT64_MAX; /* set all 64 bit */ + fc->acks_mask = UINT64_MAX; /* set all 64 bit */ else - fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */ - fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, - fc); + fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */ + fc->acks = fc->acks_mask; + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc); return fc; } @@ -289,10 +367,10 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) { GNUNET_assert (fc->proc_busy == GNUNET_YES); fc->proc_busy = GNUNET_NO; - GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); - fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (fc->delay_until), - &transmit_next, - fc); + GNUNET_assert (fc->task == NULL); + fc->task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining + (fc->delay_until), &transmit_next, fc); } @@ -302,77 +380,113 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) * * @param fc fragmentation context * @param msg acknowledgement message we received - * @return GNUNET_OK if this ack completes the work of the 'fc' + * @return #GNUNET_OK if this ack completes the work of the 'fc' * (all fragments have been received); - * GNUNET_NO if more messages are pending - * GNUNET_SYSERR if this ack is not valid for this fc + * #GNUNET_NO if more messages are pending + * #GNUNET_SYSERR if this ack is not valid for this fc */ -int +int GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_MessageHeader *msg) { const struct FragmentAcknowledgement *fa; uint64_t abits; struct GNUNET_TIME_Relative ndelay; - - if (sizeof (struct FragmentAcknowledgement) != - ntohs (msg->size)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + unsigned int ack_cnt; + unsigned int snd_cnt; + unsigned int i; + + if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } fa = (const struct FragmentAcknowledgement *) msg; if (ntohl (fa->fragment_id) != fc->fragment_id) - return GNUNET_SYSERR; /* not our ACK */ + return GNUNET_SYSERR; /* not our ACK */ abits = GNUNET_ntohll (fa->bits); - if (GNUNET_YES == fc->wack) + if ( (GNUNET_YES == fc->wack) && + (0 != fc->num_transmissions) ) + { + /* normal ACK, can update running average of delay... */ + fc->wack = GNUNET_NO; + ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); + fc->ack_delay.rel_value_us = + (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4; + /* calculate ratio msg sent vs. msg acked */ + ack_cnt = 0; + snd_cnt = 0; + for (i=0;i<64;i++) { - /* normal ACK, can update running average of delay... */ - fc->wack = GNUNET_NO; - ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); - fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; + if (1 == (fc->acks_mask & (1ULL << i))) + { + snd_cnt++; + if (0 == (abits & (1ULL << i))) + ack_cnt++; + } } + if (0 == ack_cnt) + { + /* complete loss */ + fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, + snd_cnt); + } + else if (snd_cnt > ack_cnt) + { + /* some loss, slow down proportionally */ + fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt); + } + else if (snd_cnt == ack_cnt) + { + fc->msg_delay.rel_value_us = + (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5; + } + fc->num_transmissions = 0; + fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay, + GNUNET_TIME_UNIT_SECONDS); + fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay, + GNUNET_TIME_UNIT_SECONDS); + } GNUNET_STATISTICS_update (fc->stats, - _("# fragment acknowledgements received"), - 1, - GNUNET_NO); + _("# fragment acknowledgements received"), + 1, + GNUNET_NO); if (abits != (fc->acks & abits)) + { + /* ID collission or message reordering, count! This should be rare! */ + GNUNET_STATISTICS_update (fc->stats, + _("# bits removed from fragmentation ACKs"), 1, + GNUNET_NO); + } + fc->acks = abits & fc->acks_mask; + if (0 != fc->acks) + { + /* more to transmit, do so right now (if tracker permits...) */ + if (fc->task != NULL) { - /* ID collission or message reordering, count! This should be rare! */ - GNUNET_STATISTICS_update (fc->stats, - _("# bits removed from fragmentation ACKs"), - 1, GNUNET_NO); + /* schedule next transmission now, no point in waiting... */ + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc); } - fc->acks = abits; - if (0 != fc->acks) + else { - /* more to transmit, do so right now (if tracker permits...) */ - if (fc->task != GNUNET_SCHEDULER_NO_TASK) - { - /* schedule next transmission now, no point in waiting... */ - GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, - fc); - } - else - { - /* only case where there is no task should be if we're waiting - for the right to transmit again (proc_busy set to YES) */ - GNUNET_assert (GNUNET_YES == fc->proc_busy); - } - return GNUNET_NO; + /* only case where there is no task should be if we're waiting + * for the right to transmit again (proc_busy set to YES) */ + GNUNET_assert (GNUNET_YES == fc->proc_busy); } + return GNUNET_NO; + } /* all done */ GNUNET_STATISTICS_update (fc->stats, - _("# fragmentation transmissions completed"), - 1, - GNUNET_NO); - if (fc->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_NO_TASK; - } + _("# fragmentation transmissions completed"), + 1, + GNUNET_NO); + if (NULL != fc->task) + { + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = NULL; + } return GNUNET_OK; } @@ -382,21 +496,25 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, * resources). * * @param fc fragmentation context - * @return average delay between transmission and ACK for the - * last message, FOREVER if the message was not fully transmitted + * @param msg_delay where to store average delay between individual message transmissions the + * last message (OUT only) + * @param ack_delay where to store average delay between transmission and ACK for the + * last message, set to FOREVER if the message was not fully transmitted (OUT only) */ -struct GNUNET_TIME_Relative -GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc) +void +GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc, + struct GNUNET_TIME_Relative *msg_delay, + struct GNUNET_TIME_Relative *ack_delay) { - struct GNUNET_TIME_Relative ret; - - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (fc->task != NULL) GNUNET_SCHEDULER_cancel (fc->task); - ret = fc->delay; + if (NULL != ack_delay) + *ack_delay = fc->ack_delay; + if (NULL != msg_delay) + *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, + fc->num_rounds); GNUNET_free (fc); - return ret; } -/* end of fragmentation_new.c */ - +/* end of fragmentation.c */