X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Ffragmentation%2Ffragmentation.c;h=3a55502e71b03adfcde7e71778b48c1b8a8af73a;hb=3b680a20ab2cbb98cfa658d85be7a44baaf95d2c;hp=8fab3fee4c74f7c2b6ce3ba990d50361e6677b59;hpb=71ea5bd2d05058008e604ffd42993be9c7250e04;p=oweals%2Fgnunet.git diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c index 8fab3fee4..3a55502e7 100644 --- a/src/fragmentation/fragmentation.c +++ b/src/fragmentation/fragmentation.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2009, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file src/fragmentation/fragmentation.c @@ -28,6 +28,12 @@ #include "fragmentation.h" +/** + * Absolute minimum delay we impose between sending and expecting ACK to arrive. + */ +#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1) + + /** * Fragmentation context. */ @@ -46,7 +52,12 @@ struct GNUNET_FRAGMENT_Context /** * Current expected delay for ACKs. */ - struct GNUNET_TIME_Relative delay; + struct GNUNET_TIME_Relative ack_delay; + + /** + * Current expected delay between messages. + */ + struct GNUNET_TIME_Relative msg_delay; /** * Next allowed transmission time. @@ -69,7 +80,7 @@ struct GNUNET_FRAGMENT_Context GNUNET_FRAGMENT_MessageProcessor proc; /** - * Closure for 'proc'. + * Closure for @e proc. */ void *proc_cls; @@ -79,7 +90,7 @@ struct GNUNET_FRAGMENT_Context uint64_t acks; /** - * Bitfield with all possible bits for 'acks' (used to mask the + * Bitfield with all possible bits for @e acks (used to mask the * ack we get back). */ uint64_t acks_mask; @@ -87,7 +98,7 @@ struct GNUNET_FRAGMENT_Context /** * Task performing work for the fragmenter. */ - GNUNET_SCHEDULER_TaskIdentifier task; + struct GNUNET_SCHEDULER_Task *task; /** * Our fragmentation ID. (chosen at random) @@ -105,12 +116,17 @@ struct GNUNET_FRAGMENT_Context unsigned int num_rounds; /** - * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' + * How many transmission have we completed in this round? + */ + unsigned int num_transmissions; + + /** + * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done() */ int8_t proc_busy; /** - * GNUNET_YES if we are waiting for an ACK. + * #GNUNET_YES if we are waiting for an ACK. */ int8_t wack; @@ -122,14 +138,38 @@ struct GNUNET_FRAGMENT_Context }; +/** + * Convert an ACK message to a printable format suitable for logging. + * + * @param ack message to print + * @return ack in human-readable format + */ +const char * +GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack) +{ + static char buf[128]; + const struct FragmentAcknowledgement *fa; + + if (sizeof (struct FragmentAcknowledgement) != + htons (ack->size)) + return ""; + fa = (const struct FragmentAcknowledgement *) ack; + GNUNET_snprintf (buf, + sizeof (buf), + "%u-%llX", + ntohl (fa->fragment_id), + GNUNET_ntohll (fa->bits)); + return buf; +} + + /** * Transmit the next fragment to the other peer. * - * @param cls the 'struct GNUNET_FRAGMENT_Context' - * @param tc scheduler context + * @param cls the `struct GNUNET_FRAGMENT_Context` */ static void -transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_next (void *cls) { struct GNUNET_FRAGMENT_Context *fc = cls; char msg[fc->mtu]; @@ -141,17 +181,16 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) size_t fsize; int wrap; - fc->task = GNUNET_SCHEDULER_NO_TASK; + fc->task = NULL; GNUNET_assert (GNUNET_NO == fc->proc_busy); if (0 == fc->acks) return; /* all done */ - /* calculate delay */ wrap = 0; while (0 == (fc->acks & (1LL << fc->next_transmission))) { fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); + wrap |= (0 == fc->next_transmission); } bit = fc->next_transmission; size = ntohs (fc->msg->size); @@ -161,17 +200,29 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) sizeof (struct FragmentHeader); else fsize = fc->mtu; - if (fc->tracker != NULL) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize); + if (NULL != fc->tracker) + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); else delay = GNUNET_TIME_UNIT_ZERO; - if (delay.rel_value > 0) + if (delay.rel_value_us > 0) { - fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Fragmentation logic delays transmission of next fragment by %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + fc->task = GNUNET_SCHEDULER_add_delayed (delay, + &transmit_next, + fc); return; } fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); + wrap |= (0 == fc->next_transmission); + while (0 == (fc->acks & (1LL << fc->next_transmission))) + { + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (0 == fc->next_transmission); + } /* assemble fragmentation message */ mbuf = (const char *) &fc[1]; @@ -185,10 +236,14 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) fsize - sizeof (struct FragmentHeader)); if (NULL != fc->tracker) GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); - GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1, + GNUNET_STATISTICS_update (fc->stats, + _("# fragments transmitted"), + 1, GNUNET_NO); - if (0 != fc->last_round.abs_value) - GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1, + if (0 != fc->last_round.abs_value_us) + GNUNET_STATISTICS_update (fc->stats, + _("# fragments retransmitted"), + 1, GNUNET_NO); /* select next message to calculate delay */ @@ -199,51 +254,61 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) else fsize = fc->mtu; if (NULL != fc->tracker) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize); + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); else delay = GNUNET_TIME_UNIT_ZERO; + delay = GNUNET_TIME_relative_max (delay, + GNUNET_TIME_relative_multiply (fc->msg_delay, + (1ULL << fc->num_rounds))); if (wrap) { /* full round transmitted wait 2x delay for ACK before going again */ fc->num_rounds++; - delay = - GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), - GNUNET_TIME_relative_multiply (fc->delay, - fc->num_rounds)); + delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2); /* never use zero, need some time for ACK always */ - delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS, delay); - fc->last_round = GNUNET_TIME_absolute_get (); + delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay); fc->wack = GNUNET_YES; + fc->last_round = GNUNET_TIME_absolute_get (); + GNUNET_STATISTICS_update (fc->stats, + _("# fragments wrap arounds"), + 1, + GNUNET_NO); } fc->proc_busy = GNUNET_YES; fc->delay_until = GNUNET_TIME_relative_to_absolute (delay); - fc->proc (fc->proc_cls, &fh->header); + fc->num_transmissions++; + fc->proc (fc->proc_cls, + &fh->header); } /** * Create a fragmentation context for the given message. - * Fragments the message into fragments of size "mtu" or - * less. Calls 'proc' on each un-acknowledged fragment, - * using both the expected 'delay' between messages and - * acknowledgements and the given 'tracker' to guide the - * frequency of calls to 'proc'. + * Fragments the message into fragments of size @a mtu or + * less. Calls @a proc on each un-acknowledged fragment, + * using both the expected @a msg_delay between messages and + * acknowledgements and the given @a tracker to guide the + * frequency of calls to @a proc. * * @param stats statistics context * @param mtu the maximum message size for each fragment * @param tracker bandwidth tracker to use for flow control (can be NULL) - * @param delay expected delay between fragment transmission + * @param msg_delay initial delay to insert between fragment transmissions + * based on previous messages + * @param ack_delay expected delay between fragment transmission * and ACK based on previous messages * @param msg the message to fragment * @param proc function to call for each fragment to transmit - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return the fragmentation context */ struct GNUNET_FRAGMENT_Context * GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, uint16_t mtu, struct GNUNET_BANDWIDTH_Tracker *tracker, - struct GNUNET_TIME_Relative delay, + struct GNUNET_TIME_Relative msg_delay, + struct GNUNET_TIME_Relative ack_delay, const struct GNUNET_MessageHeader *msg, GNUNET_FRAGMENT_MessageProcessor proc, void *proc_cls) @@ -252,22 +317,28 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, size_t size; uint64_t bits; - GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, + _("# messages fragmented"), + 1, + GNUNET_NO); GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); size = ntohs (msg->size); - GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"), + GNUNET_STATISTICS_update (stats, + _("# total size of fragmented messages"), size, GNUNET_NO); GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); fc->stats = stats; fc->mtu = mtu; fc->tracker = tracker; - fc->delay = delay; + fc->ack_delay = ack_delay; + fc->msg_delay = msg_delay; fc->msg = (const struct GNUNET_MessageHeader *) &fc[1]; fc->proc = proc; fc->proc_cls = proc_cls; fc->fragment_id = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); memcpy (&fc[1], msg, size); bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - @@ -296,7 +367,7 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) { GNUNET_assert (fc->proc_busy == GNUNET_YES); fc->proc_busy = GNUNET_NO; - GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (fc->task == NULL); fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (fc->delay_until), &transmit_next, fc); @@ -309,10 +380,10 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) * * @param fc fragmentation context * @param msg acknowledgement message we received - * @return GNUNET_OK if this ack completes the work of the 'fc' + * @return #GNUNET_OK if this ack completes the work of the 'fc' * (all fragments have been received); - * GNUNET_NO if more messages are pending - * GNUNET_SYSERR if this ack is not valid for this fc + * #GNUNET_NO if more messages are pending + * #GNUNET_SYSERR if this ack is not valid for this fc */ int GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, @@ -321,6 +392,9 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, const struct FragmentAcknowledgement *fa; uint64_t abits; struct GNUNET_TIME_Relative ndelay; + unsigned int ack_cnt; + unsigned int snd_cnt; + unsigned int i; if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size)) { @@ -331,16 +405,51 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, if (ntohl (fa->fragment_id) != fc->fragment_id) return GNUNET_SYSERR; /* not our ACK */ abits = GNUNET_ntohll (fa->bits); - if ((GNUNET_YES == fc->wack) && (abits == (fc->acks & abits))) + if ( (GNUNET_YES == fc->wack) && + (0 != fc->num_transmissions) ) { /* normal ACK, can update running average of delay... */ fc->wack = GNUNET_NO; ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); - fc->delay.rel_value = - (ndelay.rel_value * fc->num_rounds + 3 * fc->delay.rel_value) / 4; + fc->ack_delay.rel_value_us = + (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4; + /* calculate ratio msg sent vs. msg acked */ + ack_cnt = 0; + snd_cnt = 0; + for (i=0;i<64;i++) + { + if (1 == (fc->acks_mask & (1ULL << i))) + { + snd_cnt++; + if (0 == (abits & (1ULL << i))) + ack_cnt++; + } + } + if (0 == ack_cnt) + { + /* complete loss */ + fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, + snd_cnt); + } + else if (snd_cnt > ack_cnt) + { + /* some loss, slow down proportionally */ + fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt); + } + else if (snd_cnt == ack_cnt) + { + fc->msg_delay.rel_value_us = + (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5; + } + fc->num_transmissions = 0; + fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay, + GNUNET_TIME_UNIT_SECONDS); + fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay, + GNUNET_TIME_UNIT_SECONDS); } GNUNET_STATISTICS_update (fc->stats, - _("# fragment acknowledgements received"), 1, + _("# fragment acknowledgements received"), + 1, GNUNET_NO); if (abits != (fc->acks & abits)) { @@ -353,7 +462,7 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, if (0 != fc->acks) { /* more to transmit, do so right now (if tracker permits...) */ - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (fc->task != NULL) { /* schedule next transmission now, no point in waiting... */ GNUNET_SCHEDULER_cancel (fc->task); @@ -370,12 +479,13 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, /* all done */ GNUNET_STATISTICS_update (fc->stats, - _("# fragmentation transmissions completed"), 1, + _("# fragmentation transmissions completed"), + 1, GNUNET_NO); - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != fc->task) { GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_NO_TASK; + fc->task = NULL; } return GNUNET_OK; } @@ -386,19 +496,24 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, * resources). * * @param fc fragmentation context - * @return average delay between transmission and ACK for the - * last message, FOREVER if the message was not fully transmitted + * @param msg_delay where to store average delay between individual message transmissions the + * last message (OUT only) + * @param ack_delay where to store average delay between transmission and ACK for the + * last message, set to FOREVER if the message was not fully transmitted (OUT only) */ -struct GNUNET_TIME_Relative -GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc) +void +GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc, + struct GNUNET_TIME_Relative *msg_delay, + struct GNUNET_TIME_Relative *ack_delay) { - struct GNUNET_TIME_Relative ret; - - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (fc->task != NULL) GNUNET_SCHEDULER_cancel (fc->task); - ret = fc->delay; + if (NULL != ack_delay) + *ack_delay = fc->ack_delay; + if (NULL != msg_delay) + *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, + fc->num_rounds); GNUNET_free (fc); - return ret; }