/*
     This file is part of GNUnet
     Copyright (C) 2009, 2011 GNUnet e.V.

     GNUnet is free software: you can redistribute it and/or modify it
     under the terms of the GNU Affero General Public License as published
     by the Free Software Foundation, either version 3 of the License,
     or (at your option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     Affero General Public License for more details.

     You should have received a copy of the GNU Affero General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.

     SPDX-License-Identifier: AGPL3.0-or-later
 */
/**
 * @file src/fragmentation/defragmentation.c
 * @brief library to help defragment messages
 * @author Christian Grothoff
 */
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
30 * Timestamps for fragments.
34 * The time the fragment was received.
36 struct GNUNET_TIME_Absolute time;
39 * Number of the bit for the fragment (in [0,..,63]).
46 * Information we keep for one message that is being assembled. Note
47 * that we keep the context around even after the assembly is done to
48 * handle 'stray' messages that are received 'late'. A message
49 * context is ONLY discarded when the queue gets too big.
51 struct MessageContext {
55 struct MessageContext *next;
60 struct MessageContext *prev;
63 * Associated defragmentation context.
65 struct GNUNET_DEFRAGMENT_Context *dc;
68 * Pointer to the assembled message, allocated at the
71 const struct GNUNET_MessageHeader *msg;
74 * Last time we received any update for this message
75 * (least-recently updated message will be discarded
76 * if we hit the queue size).
78 struct GNUNET_TIME_Absolute last_update;
81 * Task scheduled for transmitting the next ACK to the
84 struct GNUNET_SCHEDULER_Task * ack_task;
87 * When did we receive which fragment? Used to calculate
88 * the time we should send the ACK.
90 struct FragTimes frag_times[64];
93 * Which fragments have we gotten yet? bits that are 1
94 * indicate missing fragments.
99 * Unique ID for this message.
101 uint32_t fragment_id;
104 * Which 'bit' did the last fragment we received correspond to?
106 unsigned int last_bit;
109 * For the current ACK round, which is the first relevant
110 * offset in @e frag_times?
112 unsigned int frag_times_start_offset;
115 * Which offset whould we write the next frag value into
116 * in the @e frag_times array? All smaller entries are valid.
118 unsigned int frag_times_write_offset;
121 * Total size of the message that we are assembling.
126 * Was the last fragment we got a duplicate?
128 int16_t last_duplicate;
133 * Defragmentation context (one per connection).
135 struct GNUNET_DEFRAGMENT_Context {
139 struct GNUNET_STATISTICS_Handle *stats;
142 * Head of list of messages we're defragmenting.
144 struct MessageContext *head;
147 * Tail of list of messages we're defragmenting.
149 struct MessageContext *tail;
152 * Closure for @e proc and @e ackp.
157 * Function to call with defragmented messages.
159 GNUNET_FRAGMENT_MessageProcessor proc;
162 * Function to call with acknowledgements.
164 GNUNET_DEFRAGMENT_AckProcessor ackp;
167 * Running average of the latency (delay between messages) for this
170 struct GNUNET_TIME_Relative latency;
173 * num_msgs how many fragmented messages
174 * to we defragment at most at the same time?
176 unsigned int num_msgs;
179 * Current number of messages in the 'struct MessageContext'
180 * DLL (smaller or equal to 'num_msgs').
182 unsigned int list_size;
185 * Maximum message size for each fragment.
192 * Create a defragmentation context.
194 * @param stats statistics context
195 * @param mtu the maximum message size for each fragment
196 * @param num_msgs how many fragmented messages
197 * to we defragment at most at the same time?
198 * @param cls closure for @a proc and @a ackp
199 * @param proc function to call with defragmented messages
200 * @param ackp function to call with acknowledgements (to send
201 * back to the other side)
202 * @return the defragmentation context
204 struct GNUNET_DEFRAGMENT_Context *
205 GNUNET_DEFRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats,
206 uint16_t mtu, unsigned int num_msgs,
208 GNUNET_FRAGMENT_MessageProcessor proc,
209 GNUNET_DEFRAGMENT_AckProcessor ackp)
211 struct GNUNET_DEFRAGMENT_Context *dc;
213 dc = GNUNET_new(struct GNUNET_DEFRAGMENT_Context);
218 dc->num_msgs = num_msgs;
220 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
226 * Destroy the given defragmentation context.
228 * @param dc defragmentation context
231 GNUNET_DEFRAGMENT_context_destroy(struct GNUNET_DEFRAGMENT_Context *dc)
233 struct MessageContext *mc;
235 while (NULL != (mc = dc->head))
237 GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, mc);
239 if (NULL != mc->ack_task)
241 GNUNET_SCHEDULER_cancel(mc->ack_task);
246 GNUNET_assert(0 == dc->list_size);
252 * Send acknowledgement to the other peer now.
254 * @param cls the message context
259 struct MessageContext *mc = cls;
260 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
261 struct FragmentAcknowledgement fa;
264 fa.header.size = htons(sizeof(struct FragmentAcknowledgement));
265 fa.header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
266 fa.fragment_id = htonl(mc->fragment_id);
267 fa.bits = GNUNET_htonll(mc->bits);
268 GNUNET_STATISTICS_update(mc->dc->stats,
269 _("# acknowledgements sent for fragment"),
272 mc->last_duplicate = GNUNET_NO; /* clear flag */
/**
 * Least-squares fit of the model y = c1 * x (a line through the origin).
 *
 * This function is from the GNU Scientific Library, linear/fit.c,
 * Copyright (C) 2000 Brian Gough
 *
 * @param x array of x values, read at indices 0, xstride, 2*xstride, ...
 * @param xstride stride between consecutive x values
 * @param y array of y values, read at indices 0, ystride, 2*ystride, ...
 * @param ystride stride between consecutive y values
 * @param n number of data points (the variance estimate divides by
 *          n - 1, so n should be at least 2)
 * @param[out] c1 set to the fitted slope
 * @param[out] cov_11 set to the variance estimate of @a c1
 * @param[out] sumsq set to the sum of squared residuals
 * @return 0 (always)
 */
static int
gsl_fit_mul(const double *x, const size_t xstride, const double *y,
            const size_t ystride, const size_t n, double *c1, double *cov_11,
            double *sumsq)
{
  double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;

  size_t i;

  /* running means of x and y */
  for (i = 0; i < n; i++)
    {
      m_x += (x[i * xstride] - m_x) / (i + 1.0);
      m_y += (y[i * ystride] - m_y) / (i + 1.0);
    }

  /* running means of the centred second moments */
  for (i = 0; i < n; i++)
    {
      const double dx = x[i * xstride] - m_x;
      const double dy = y[i * ystride] - m_y;

      m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
      m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
    }

  /* In terms of y = b x */

  {
    double s2 = 0, d2 = 0;
    double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);

    *c1 = b;

    /* Compute chi^2 = \sum (y_i - b * x_i)^2 */

    for (i = 0; i < n; i++)
      {
        const double dx = x[i * xstride] - m_x;
        const double dy = y[i * ystride] - m_y;
        const double d = (m_y - b * m_x) + dy - b * dx;

        d2 += d * d;
      }

    s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */

    *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));

    *sumsq = d2;
  }

  return 0;
}
336 * Estimate the latency between messages based on the most recent
337 * message time stamps.
339 * @param mc context with time stamps
340 * @return average delay between time stamps (based on least-squares fit)
342 static struct GNUNET_TIME_Relative
343 estimate_latency(struct MessageContext *mc)
345 struct FragTimes *first;
346 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
353 struct GNUNET_TIME_Relative ret;
355 first = &mc->frag_times[mc->frag_times_start_offset];
356 GNUNET_assert(total > 1);
357 for (i = 0; i < total; i++)
360 y[i] = (double)(first[i].time.abs_value_us - first[0].time.abs_value_us);
362 gsl_fit_mul(x, 1, y, 1, total, &c1, &cov11, &sumsq);
363 c1 += sqrt(sumsq); /* add 1 std dev */
364 ret.rel_value_us = (uint64_t)c1;
365 if (0 == ret.rel_value_us)
366 ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
372 * Discard the message context that was inactive for the longest time.
374 * @param dc defragmentation context
377 discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc)
379 struct MessageContext *old;
380 struct MessageContext *pos;
387 (old->last_update.abs_value_us > pos->last_update.abs_value_us))
391 GNUNET_assert(NULL != old);
392 GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, old);
394 if (NULL != old->ack_task)
396 GNUNET_SCHEDULER_cancel(old->ack_task);
397 old->ack_task = NULL;
404 * We have received a fragment. Process it.
406 * @param dc the context
407 * @param msg the message that was received
408 * @return #GNUNET_OK on success,
409 * #GNUNET_NO if this was a duplicate,
410 * #GNUNET_SYSERR on error
413 GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc,
414 const struct GNUNET_MessageHeader *msg)
416 struct MessageContext *mc;
417 const struct FragmentHeader *fh;
423 struct GNUNET_TIME_Absolute now;
424 struct GNUNET_TIME_Relative delay;
428 unsigned int num_fragments;
432 if (ntohs(msg->size) < sizeof(struct FragmentHeader))
435 return GNUNET_SYSERR;
437 if (ntohs(msg->size) > dc->mtu)
440 return GNUNET_SYSERR;
442 fh = (const struct FragmentHeader *)msg;
443 msize = ntohs(fh->total_size);
444 if (msize < sizeof(struct GNUNET_MessageHeader))
447 return GNUNET_SYSERR;
449 fid = ntohl(fh->fragment_id);
450 foff = ntohs(fh->offset);
454 return GNUNET_SYSERR;
456 if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
459 return GNUNET_SYSERR;
461 GNUNET_STATISTICS_update(dc->stats,
462 _("# fragments received"),
465 num_fragments = (ntohs(msg->size) + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu - sizeof(struct FragmentHeader));
467 for (mc = dc->head; NULL != mc; mc = mc->next)
468 if (mc->fragment_id > fid)
472 while ((NULL != mc) && (fid != mc->fragment_id))
474 bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
475 if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs(msg->size) -
476 sizeof(struct FragmentHeader) > msize)
478 /* payload extends past total message size */
480 return GNUNET_SYSERR;
482 if ((NULL != mc) && (msize != mc->total_size))
484 /* inconsistent message size */
486 return GNUNET_SYSERR;
488 now = GNUNET_TIME_absolute_get();
491 mc = GNUNET_malloc(sizeof(struct MessageContext) + msize);
492 mc->msg = (const struct GNUNET_MessageHeader *)&mc[1];
494 mc->total_size = msize;
495 mc->fragment_id = fid;
496 mc->last_update = now;
497 n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu -
501 mc->bits = UINT64_MAX; /* set all 64 bit */
503 mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
504 if (dc->list_size >= dc->num_msgs)
505 discard_oldest_mc(dc);
506 GNUNET_CONTAINER_DLL_insert(dc->head,
512 /* copy data to 'mc' */
513 if (0 != (mc->bits & (1LLU << bit)))
515 mc->bits -= 1LLU << bit;
516 mbuf = (char *)&mc[1];
517 GNUNET_memcpy(&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))], &fh[1],
518 ntohs(msg->size) - sizeof(struct FragmentHeader));
519 mc->last_update = now;
520 if (bit < mc->last_bit)
521 mc->frag_times_start_offset = mc->frag_times_write_offset;
523 mc->frag_times[mc->frag_times_write_offset].time = now;
524 mc->frag_times[mc->frag_times_write_offset].bit = bit;
525 mc->frag_times_write_offset++;
526 duplicate = GNUNET_NO;
530 duplicate = GNUNET_YES;
531 GNUNET_STATISTICS_update(dc->stats,
532 _("# duplicate fragments received"),
537 /* count number of missing fragments after the current one */
539 for (b = bit; b < 64; b++)
540 if (0 != (mc->bits & (1LLU << b)))
545 /* notify about complete message */
546 if ((GNUNET_NO == duplicate) &&
549 GNUNET_STATISTICS_update(dc->stats,
550 _("# messages defragmented"),
553 /* message complete, notify! */
554 dc->proc(dc->cls, mc->msg);
557 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
559 dc->latency = estimate_latency(mc);
561 delay = GNUNET_TIME_relative_saturating_multiply(dc->latency,
563 if ((last + fid == num_fragments) ||
565 (GNUNET_YES == duplicate))
567 /* message complete or duplicate or last missing fragment in
568 linear sequence; ACK now! */
569 delay = GNUNET_TIME_UNIT_ZERO;
571 if (NULL != mc->ack_task)
572 GNUNET_SCHEDULER_cancel(mc->ack_task);
573 mc->ack_task = GNUNET_SCHEDULER_add_delayed(delay,
576 if (GNUNET_YES == duplicate)
578 mc->last_duplicate = GNUNET_YES;
584 /* end of defragmentation.c */