X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffragmentation%2Ffragmentation.c;h=4667725de6854a76dcdaa28d7e181785807306f5;hb=4e29ecde9b3ad3e34af359f18b6679c06b17ce78;hp=bf749181590028406e5a75b3569f893cccaa1d8c;hpb=61c39c60565b386e0e12ea669556b030e8cd7180;p=oweals%2Fgnunet.git diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c index bf7491815..4667725de 100644 --- a/src/fragmentation/fragmentation.c +++ b/src/fragmentation/fragmentation.c @@ -1,21 +1,16 @@ /* This file is part of GNUnet - (C) 2009-2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2013 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Affero General Public License for more details. */ /** * @file src/fragmentation/fragmentation.c @@ -80,7 +75,7 @@ struct GNUNET_FRAGMENT_Context GNUNET_FRAGMENT_MessageProcessor proc; /** - * Closure for 'proc'. + * Closure for @e proc. */ void *proc_cls; @@ -90,7 +85,7 @@ struct GNUNET_FRAGMENT_Context uint64_t acks; /** - * Bitfield with all possible bits for 'acks' (used to mask the + * Bitfield with all possible bits for @e acks (used to mask the * ack we get back). */ uint64_t acks_mask; @@ -98,7 +93,7 @@ struct GNUNET_FRAGMENT_Context /** * Task performing work for the fragmenter. */ - GNUNET_SCHEDULER_TaskIdentifier task; + struct GNUNET_SCHEDULER_Task *task; /** * Our fragmentation ID. (chosen at random) @@ -121,12 +116,12 @@ struct GNUNET_FRAGMENT_Context unsigned int num_transmissions; /** - * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' + * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done() */ int8_t proc_busy; /** - * GNUNET_YES if we are waiting for an ACK. + * #GNUNET_YES if we are waiting for an ACK. */ int8_t wack; @@ -138,14 +133,38 @@ struct GNUNET_FRAGMENT_Context }; +/** + * Convert an ACK message to a printable format suitable for logging. + * + * @param ack message to print + * @return ack in human-readable format + */ +const char * +GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack) +{ + static char buf[128]; + const struct FragmentAcknowledgement *fa; + + if (sizeof (struct FragmentAcknowledgement) != + htons (ack->size)) + return ""; + fa = (const struct FragmentAcknowledgement *) ack; + GNUNET_snprintf (buf, + sizeof (buf), + "%u-%llX", + ntohl (fa->fragment_id), + GNUNET_ntohll (fa->bits)); + return buf; +} + + /** * Transmit the next fragment to the other peer. * - * @param cls the 'struct GNUNET_FRAGMENT_Context' - * @param tc scheduler context + * @param cls the `struct GNUNET_FRAGMENT_Context` */ static void -transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_next (void *cls) { struct GNUNET_FRAGMENT_Context *fc = cls; char msg[fc->mtu]; @@ -157,13 +176,13 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) size_t fsize; int wrap; - fc->task = GNUNET_SCHEDULER_NO_TASK; + fc->task = NULL; GNUNET_assert (GNUNET_NO == fc->proc_busy); if (0 == fc->acks) return; /* all done */ /* calculate delay */ wrap = 0; - while (0 == (fc->acks & (1LL << fc->next_transmission))) + while (0 == (fc->acks & (1LLU << fc->next_transmission))) { fc->next_transmission = (fc->next_transmission + 1) % 64; wrap |= (0 == fc->next_transmission); @@ -177,17 +196,24 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) else fsize = fc->mtu; if (NULL != fc->tracker) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize); + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); else delay = GNUNET_TIME_UNIT_ZERO; if (delay.rel_value_us > 0) { - fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Fragmentation logic delays transmission of next fragment by %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + fc->task = GNUNET_SCHEDULER_add_delayed (delay, + &transmit_next, + fc); return; } fc->next_transmission = (fc->next_transmission + 1) % 64; wrap |= (0 == fc->next_transmission); - while (0 == (fc->acks & (1LL << fc->next_transmission))) + while (0 == (fc->acks & (1LLU << fc->next_transmission))) { fc->next_transmission = (fc->next_transmission + 1) % 64; wrap |= (0 == fc->next_transmission); @@ -201,14 +227,18 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) fh->fragment_id = htonl (fc->fragment_id); fh->total_size = fc->msg->size; /* already in big-endian */ fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); - memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], + GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], fsize - sizeof (struct FragmentHeader)); if (NULL != fc->tracker) GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); - GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1, + GNUNET_STATISTICS_update (fc->stats, + _("# fragments transmitted"), + 1, GNUNET_NO); if (0 != fc->last_round.abs_value_us) - GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1, + GNUNET_STATISTICS_update (fc->stats, + _("# fragments retransmitted"), + 1, GNUNET_NO); /* select next message to calculate delay */ @@ -219,38 +249,46 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) else fsize = fc->mtu; if (NULL != fc->tracker) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize); + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); else delay = GNUNET_TIME_UNIT_ZERO; - delay = GNUNET_TIME_relative_max (delay, - GNUNET_TIME_relative_multiply (fc->msg_delay, - (1 << fc->num_rounds))); + if (fc->num_rounds < 64) + delay = GNUNET_TIME_relative_max (delay, + GNUNET_TIME_relative_saturating_multiply + (fc->msg_delay, + (1ULL << fc->num_rounds))); + else + delay = GNUNET_TIME_UNIT_FOREVER_REL; if (wrap) { /* full round transmitted wait 2x delay for ACK before going again */ fc->num_rounds++; - delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2); + delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2); /* never use zero, need some time for ACK always */ delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay); fc->wack = GNUNET_YES; fc->last_round = GNUNET_TIME_absolute_get (); - GNUNET_STATISTICS_update (fc->stats, _("# fragments wrap arounds"), 1, + GNUNET_STATISTICS_update (fc->stats, + _("# fragments wrap arounds"), + 1, GNUNET_NO); } fc->proc_busy = GNUNET_YES; fc->delay_until = GNUNET_TIME_relative_to_absolute (delay); fc->num_transmissions++; - fc->proc (fc->proc_cls, &fh->header); + fc->proc (fc->proc_cls, + &fh->header); } /** * Create a fragmentation context for the given message. - * Fragments the message into fragments of size "mtu" or - * less. Calls 'proc' on each un-acknowledged fragment, - * using both the expected 'delay' between messages and - * acknowledgements and the given 'tracker' to guide the - * frequency of calls to 'proc'. + * Fragments the message into fragments of size @a mtu or + * less. Calls @a proc on each un-acknowledged fragment, + * using both the expected @a msg_delay between messages and + * acknowledgements and the given @a tracker to guide the + * frequency of calls to @a proc. * * @param stats statistics context * @param mtu the maximum message size for each fragment @@ -261,7 +299,7 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * and ACK based on previous messages * @param msg the message to fragment * @param proc function to call for each fragment to transmit - * @param proc_cls closure for proc + * @param proc_cls closure for @a proc * @return the fragmentation context */ struct GNUNET_FRAGMENT_Context * @@ -278,10 +316,14 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, size_t size; uint64_t bits; - GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, + _("# messages fragmented"), + 1, + GNUNET_NO); GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); size = ntohs (msg->size); - GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"), + GNUNET_STATISTICS_update (stats, + _("# total size of fragmented messages"), size, GNUNET_NO); GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); @@ -294,8 +336,9 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, fc->proc = proc; fc->proc_cls = proc_cls; fc->fragment_id = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); - memcpy (&fc[1], msg, size); + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + GNUNET_memcpy (&fc[1], msg, size); bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct @@ -304,7 +347,7 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, if (bits == 64) fc->acks_mask = UINT64_MAX; /* set all 64 bit */ else - fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */ + fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */ fc->acks = fc->acks_mask; fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc); return fc; @@ -323,10 +366,11 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) { GNUNET_assert (fc->proc_busy == GNUNET_YES); fc->proc_busy = GNUNET_NO; - GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (fc->task == NULL); fc->task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (fc->delay_until), &transmit_next, fc); + GNUNET_SCHEDULER_add_at (fc->delay_until, + &transmit_next, + fc); } @@ -336,10 +380,10 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) * * @param fc fragmentation context * @param msg acknowledgement message we received - * @return GNUNET_OK if this ack completes the work of the 'fc' + * @return #GNUNET_OK if this ack completes the work of the 'fc' * (all fragments have been received); - * GNUNET_NO if more messages are pending - * GNUNET_SYSERR if this ack is not valid for this fc + * #GNUNET_NO if more messages are pending + * #GNUNET_SYSERR if this ack is not valid for this fc */ int GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, @@ -369,40 +413,43 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); fc->ack_delay.rel_value_us = (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4; - fc->num_transmissions = 0; /* calculate ratio msg sent vs. msg acked */ ack_cnt = 0; snd_cnt = 0; for (i=0;i<64;i++) { - if (1 == (fc->acks_mask & (1 << i))) + if (1 == (fc->acks_mask & (1ULL << i))) { snd_cnt++; - if (0 == (abits & (1 << i))) + if (0 == (abits & (1ULL << i))) ack_cnt++; } } if (0 == ack_cnt) { /* complete loss */ - fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, - snd_cnt); + fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay, + snd_cnt); } else if (snd_cnt > ack_cnt) { /* some loss, slow down proportionally */ - fprintf (stderr, "Prop loss\n"); fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt); } - else if (100 < fc->msg_delay.rel_value_us) + else if (snd_cnt == ack_cnt) { - fc->msg_delay.rel_value_us -= 100; /* try a bit faster */ + fc->msg_delay.rel_value_us = + (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5; } + fc->num_transmissions = 0; fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay, GNUNET_TIME_UNIT_SECONDS); + fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay, + GNUNET_TIME_UNIT_SECONDS); } GNUNET_STATISTICS_update (fc->stats, - _("# fragment acknowledgements received"), 1, + _("# fragment acknowledgements received"), + 1, GNUNET_NO); if (abits != (fc->acks & abits)) { @@ -415,7 +462,7 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, if (0 != fc->acks) { /* more to transmit, do so right now (if tracker permits...) */ - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (fc->task != NULL) { /* schedule next transmission now, no point in waiting... */ GNUNET_SCHEDULER_cancel (fc->task); @@ -432,12 +479,13 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, /* all done */ GNUNET_STATISTICS_update (fc->stats, - _("# fragmentation transmissions completed"), 1, + _("# fragmentation transmissions completed"), + 1, GNUNET_NO); - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != fc->task) { GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_NO_TASK; + fc->task = NULL; } return GNUNET_OK; } @@ -458,13 +506,13 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc, struct GNUNET_TIME_Relative *msg_delay, struct GNUNET_TIME_Relative *ack_delay) { - if (fc->task != GNUNET_SCHEDULER_NO_TASK) + if (fc->task != NULL) GNUNET_SCHEDULER_cancel (fc->task); if (NULL != ack_delay) *ack_delay = fc->ack_delay; if (NULL != msg_delay) - *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, - fc->num_rounds); + *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay, + fc->num_rounds); GNUNET_free (fc); }