X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffragmentation%2Fdefragmentation.c;h=bbe6f3741c1e49cec545bdca24c6d764fbcf3dd2;hb=4e35e0bacaf1a6e2ab7a98f0a729f5ea493aef3d;hp=1d62484b58f0134ada00b92d18049ae45666b42d;hpb=8766ccd1af741881379c16f1c2947edd32819e62;p=oweals%2Fgnunet.git diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c index 1d62484b5..bbe6f3741 100644 --- a/src/fragmentation/defragmentation.c +++ b/src/fragmentation/defragmentation.c @@ -1,24 +1,24 @@ /* This file is part of GNUnet - (C) 2009, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009, 2011 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + SPDX-License-Identifier: AGPL3.0-or-later */ /** - * @file src/fragmentation/defragmentation_new.c + * @file src/fragmentation/defragmentation.c * @brief library to help defragment messages * @author Christian Grothoff */ @@ -69,7 +69,7 @@ struct MessageContext /** * Pointer to the assembled message, allocated at the * end of this struct. - */ + */ const struct GNUNET_MessageHeader *msg; /** @@ -83,7 +83,7 @@ struct MessageContext * Task scheduled for transmitting the next ACK to the * other peer. */ - GNUNET_SCHEDULER_TaskIdentifier ack_task; + struct GNUNET_SCHEDULER_Task * ack_task; /** * When did we receive which fragment? Used to calculate @@ -109,13 +109,13 @@ struct MessageContext /** * For the current ACK round, which is the first relevant - * offset in 'frag_times'? + * offset in @e frag_times? */ unsigned int frag_times_start_offset; /** * Which offset whould we write the next frag value into - * in the 'frag_times' array? All smaller entries are valid. + * in the @e frag_times array? All smaller entries are valid. */ unsigned int frag_times_write_offset; @@ -124,6 +124,11 @@ struct MessageContext */ uint16_t total_size; + /** + * Was the last fragment we got a duplicate? + */ + int16_t last_duplicate; + }; @@ -149,7 +154,7 @@ struct GNUNET_DEFRAGMENT_Context struct MessageContext *tail; /** - * Closure for 'proc' and 'ackp'. + * Closure for @e proc and @e ackp. */ void *cls; @@ -183,8 +188,9 @@ struct GNUNET_DEFRAGMENT_Context /** * Maximum message size for each fragment. - */ + */ uint16_t mtu; + }; @@ -192,10 +198,10 @@ struct GNUNET_DEFRAGMENT_Context * Create a defragmentation context. * * @param stats statistics context - * @param mtu the maximum message size for each fragment + * @param mtu the maximum message size for each fragment * @param num_msgs how many fragmented messages * to we defragment at most at the same time? - * @param cls closure for proc and ackp + * @param cls closure for @a proc and @a ackp * @param proc function to call with defragmented messages * @param ackp function to call with acknowledgements (to send * back to the other side) @@ -203,22 +209,21 @@ struct GNUNET_DEFRAGMENT_Context */ struct GNUNET_DEFRAGMENT_Context * GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, - uint16_t mtu, - unsigned int num_msgs, - void *cls, - GNUNET_FRAGMENT_MessageProcessor proc, - GNUNET_DEFRAGMENT_AckProcessor ackp) + uint16_t mtu, unsigned int num_msgs, + void *cls, + GNUNET_FRAGMENT_MessageProcessor proc, + GNUNET_DEFRAGMENT_AckProcessor ackp) { struct GNUNET_DEFRAGMENT_Context *dc; - dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context)); + dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context); dc->stats = stats; dc->cls = cls; dc->proc = proc; dc->ackp = ackp; dc->num_msgs = num_msgs; dc->mtu = mtu; - dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ + dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ return dc; } @@ -228,24 +233,22 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, * * @param dc defragmentation context */ -void +void GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) { struct MessageContext *mc; while (NULL != (mc = dc->head)) + { + GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc); + dc->list_size--; + if (NULL != mc->ack_task) { - GNUNET_CONTAINER_DLL_remove (dc->head, - dc->tail, - mc); - dc->list_size--; - if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) - { - GNUNET_SCHEDULER_cancel (mc->ack_task); - mc->ack_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_free (mc); + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = NULL; } + GNUNET_free (mc); + } GNUNET_assert (0 == dc->list_size); GNUNET_free (dc); } @@ -255,57 +258,57 @@ GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) * Send acknowledgement to the other peer now. * * @param cls the message context - * @param tc the scheduler context */ static void -send_ack (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +send_ack (void *cls) { struct MessageContext *mc = cls; struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; struct FragmentAcknowledgement fa; - mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + mc->ack_task = NULL; fa.header.size = htons (sizeof (struct FragmentAcknowledgement)); fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); fa.fragment_id = htonl (mc->fragment_id); fa.bits = GNUNET_htonll (mc->bits); GNUNET_STATISTICS_update (mc->dc->stats, - _("# acknowledgements sent for fragment"), - 1, - GNUNET_NO); - dc->ackp (dc->cls, mc->fragment_id, &fa.header); + _("# acknowledgements sent for fragment"), + 1, + GNUNET_NO); + mc->last_duplicate = GNUNET_NO; /* clear flag */ + dc->ackp (dc->cls, + mc->fragment_id, + &fa.header); } /** * This function is from the GNU Scientific Library, linear/fit.c, - * (C) 2000 Brian Gough + * Copyright (C) 2000 Brian Gough */ static void -gsl_fit_mul (const double *x, const size_t xstride, - const double *y, const size_t ystride, - const size_t n, - double *c1, double *cov_11, double *sumsq) +gsl_fit_mul (const double *x, const size_t xstride, const double *y, + const size_t ystride, const size_t n, double *c1, double *cov_11, + double *sumsq) { double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0; size_t i; for (i = 0; i < n; i++) - { - m_x += (x[i * xstride] - m_x) / (i + 1.0); - m_y += (y[i * ystride] - m_y) / (i + 1.0); - } + { + m_x += (x[i * xstride] - m_x) / (i + 1.0); + m_y += (y[i * ystride] - m_y) / (i + 1.0); + } for (i = 0; i < n; i++) - { - const double dx = x[i * xstride] - m_x; - const double dy = y[i * ystride] - m_y; + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; - m_dx2 += (dx * dx - m_dx2) / (i + 1.0); - m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); - } + m_dx2 += (dx * dx - m_dx2) / (i + 1.0); + m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); + } /* In terms of y = b x */ @@ -318,12 +321,13 @@ gsl_fit_mul (const double *x, const size_t xstride, /* Compute chi^2 = \sum (y_i - b * x_i)^2 */ for (i = 0; i < n; i++) - { - const double dx = x[i * xstride] - m_x; - const double dy = y[i * ystride] - m_y; - const double d = (m_y - b * m_x) + dy - b * dx; - d2 += d * d; - } + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; + const double d = (m_y - b * m_x) + dy - b * dx; + + d2 += d * d; + } s2 = d2 / (n - 1.0); /* chisq per degree of freedom */ @@ -356,15 +360,18 @@ estimate_latency (struct MessageContext *mc) first = &mc->frag_times[mc->frag_times_start_offset]; GNUNET_assert (total > 1); - for (i=0;ihead; while (NULL != pos) - { - if ( (old == NULL) || - (old->last_update.abs_value > pos->last_update.abs_value) ) - old = pos; - pos = pos->next; - } + { + if ((old == NULL) || + (old->last_update.abs_value_us > pos->last_update.abs_value_us)) + old = pos; + pos = pos->next; + } GNUNET_assert (NULL != old); - GNUNET_CONTAINER_DLL_remove (dc->head, - dc->tail, - old); + GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old); dc->list_size--; - if (GNUNET_SCHEDULER_NO_TASK != old->ack_task) + if (NULL != old->ack_task) + { GNUNET_SCHEDULER_cancel (old->ack_task); + old->ack_task = NULL; + } GNUNET_free (old); } @@ -403,11 +411,13 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc) * * @param dc the context * @param msg the message that was received - * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error + * @return #GNUNET_OK on success, + * #GNUNET_NO if this was a duplicate, + * #GNUNET_SYSERR on error */ -int +int GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_MessageHeader *msg) { struct MessageContext *mc; const struct FragmentHeader *fh; @@ -421,132 +431,160 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, unsigned int bc; unsigned int b; unsigned int n; + unsigned int num_fragments; int duplicate; + int last; - if (ntohs(msg->size) < sizeof (struct FragmentHeader)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + if (ntohs (msg->size) < sizeof (struct FragmentHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } if (ntohs (msg->size) > dc->mtu) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - fh = (const struct FragmentHeader*) msg; + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fh = (const struct FragmentHeader *) msg; msize = ntohs (fh->total_size); + if (msize < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } fid = ntohl (fh->fragment_id); foff = ntohs (fh->offset); if (foff >= msize) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader)))) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } GNUNET_STATISTICS_update (dc->stats, - _("# fragments received"), - 1, - GNUNET_NO); + _("# fragments received"), + 1, + GNUNET_NO); + num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader)); + last = 0; + for (mc = dc->head; NULL != mc; mc = mc->next) + if (mc->fragment_id > fid) + last++; + mc = dc->head; - while ( (NULL != mc) && - (fid != mc->fragment_id) ) + while ((NULL != mc) && (fid != mc->fragment_id)) mc = mc->next; bit = foff / (dc->mtu - sizeof (struct FragmentHeader)); - if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) - - sizeof (struct FragmentHeader) > msize) - { - /* payload extends past total message size */ - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if ( (NULL != mc) && (msize != mc->total_size) ) - { - /* inconsistent message size */ - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) - + sizeof (struct FragmentHeader) > msize) + { + /* payload extends past total message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ((NULL != mc) && (msize != mc->total_size)) + { + /* inconsistent message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } now = GNUNET_TIME_absolute_get (); if (NULL == mc) - { - mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); - mc->msg = (const struct GNUNET_MessageHeader*) &mc[1]; - mc->dc = dc; - mc->total_size = msize; - mc->fragment_id = fid; - mc->last_update = now; - n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader)); - if (n == 64) - mc->bits = UINT64_MAX; /* set all 64 bit */ - else - mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */ - GNUNET_CONTAINER_DLL_insert (dc->head, - dc->tail, - mc); - dc->list_size++; - if (dc->list_size > dc->num_msgs) - discard_oldest_mc (dc); - } + { + mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); + mc->msg = (const struct GNUNET_MessageHeader *) &mc[1]; + mc->dc = dc; + mc->total_size = msize; + mc->fragment_id = fid; + mc->last_update = now; + n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - + sizeof (struct + FragmentHeader)); + if (n == 64) + mc->bits = UINT64_MAX; /* set all 64 bit */ + else + mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */ + if (dc->list_size >= dc->num_msgs) + discard_oldest_mc (dc); + GNUNET_CONTAINER_DLL_insert (dc->head, + dc->tail, + mc); + dc->list_size++; + } /* copy data to 'mc' */ - if (0 != (mc->bits & (1LL << bit))) - { - mc->bits -= 1LL << bit; - mbuf = (char* )&mc[1]; - memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], - &fh[1], - ntohs (msg->size) - sizeof (struct FragmentHeader)); - mc->last_update = now; - if (bit < mc->last_bit) - mc->frag_times_start_offset = mc->frag_times_write_offset; - mc->last_bit = bit; - mc->frag_times[mc->frag_times_write_offset].time = now; - mc->frag_times[mc->frag_times_write_offset].bit = bit; - mc->frag_times_write_offset++; - duplicate = GNUNET_NO; - } + if (0 != (mc->bits & (1LLU << bit))) + { + mc->bits -= 1LLU << bit; + mbuf = (char *) &mc[1]; + GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1], + ntohs (msg->size) - sizeof (struct FragmentHeader)); + mc->last_update = now; + if (bit < mc->last_bit) + mc->frag_times_start_offset = mc->frag_times_write_offset; + mc->last_bit = bit; + mc->frag_times[mc->frag_times_write_offset].time = now; + mc->frag_times[mc->frag_times_write_offset].bit = bit; + mc->frag_times_write_offset++; + duplicate = GNUNET_NO; + } else - { - duplicate = GNUNET_YES; - GNUNET_STATISTICS_update (dc->stats, - _("# duplicate fragments received"), - 1, - GNUNET_NO); - } + { + duplicate = GNUNET_YES; + GNUNET_STATISTICS_update (dc->stats, + _("# duplicate fragments received"), + 1, + GNUNET_NO); + } - /* count number of missing fragments */ + /* count number of missing fragments after the current one */ bc = 0; - for (b=0;b<64;b++) - if (0 != (mc->bits & (1LL << b))) bc++; + for (b = bit; b < 64; b++) + if (0 != (mc->bits & (1LLU << b))) + bc++; + else + bc = 0; + + /* notify about complete message */ + if ( (GNUNET_NO == duplicate) && + (0 == mc->bits) ) + { + GNUNET_STATISTICS_update (dc->stats, + _("# messages defragmented"), + 1, + GNUNET_NO); + /* message complete, notify! */ + dc->proc (dc->cls, mc->msg); + } + /* send ACK */ if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) + { dc->latency = estimate_latency (mc); - delay = GNUNET_TIME_relative_multiply (dc->latency, - bc + 1); - if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */ + } + delay = GNUNET_TIME_relative_saturating_multiply (dc->latency, + bc + 1); + if ( (last + fid == num_fragments) || + (0 == mc->bits) || + (GNUNET_YES == duplicate) ) + { + /* message complete or duplicate or last missing fragment in + linear sequence; ACK now! */ delay = GNUNET_TIME_UNIT_ZERO; - if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + } + if (NULL != mc->ack_task) GNUNET_SCHEDULER_cancel (mc->ack_task); mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, - &send_ack, - mc); - if ( (duplicate == GNUNET_NO) && - (0 == mc->bits) ) - { - GNUNET_STATISTICS_update (dc->stats, - _("# messages defragmented"), - 1, - GNUNET_NO); - /* message complete, notify! */ - dc->proc (dc->cls, - mc->msg); - } - if (duplicate == GNUNET_YES) + &send_ack, + mc); + if (GNUNET_YES == duplicate) + { + mc->last_duplicate = GNUNET_YES; return GNUNET_NO; + } return GNUNET_YES; } -/* end of defragmentation_new.c */ - +/* end of defragmentation.c */