2 This file is part of GNUnet
3 (C) 2009, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file src/fragmentation/defragmentation.c
22 * @brief library to help defragment messages
23 * @author Christian Grothoff
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
30 * Timestamps for fragments.
35 * The time the fragment was received.
37 struct GNUNET_TIME_Absolute time;
40 * Number of the bit for the fragment (in [0,..,63]).
47 * Information we keep for one message that is being assembled. Note
48 * that we keep the context around even after the assembly is done to
49 * handle 'stray' messages that are received 'late'. A message
50 * context is ONLY discarded when the queue gets too big.
57 struct MessageContext *next;
62 struct MessageContext *prev;
65 * Associated defragmentation context.
67 struct GNUNET_DEFRAGMENT_Context *dc;
70 * Pointer to the assembled message, allocated at the
73 const struct GNUNET_MessageHeader *msg;
76 * Last time we received any update for this message
77 * (least-recently updated message will be discarded
78 * if we hit the queue size).
80 struct GNUNET_TIME_Absolute last_update;
83 * Task scheduled for transmitting the next ACK to the
86 GNUNET_SCHEDULER_TaskIdentifier ack_task;
89 * When did we receive which fragment? Used to calculate
90 * the time we should send the ACK.
92 struct FragTimes frag_times[64];
95 * Which fragments have we gotten yet? bits that are 1
96 * indicate missing fragments.
101 * Unique ID for this message.
103 uint32_t fragment_id;
106 * Which 'bit' did the last fragment we received correspond to?
108 unsigned int last_bit;
111 * For the current ACK round, which is the first relevant
112 * offset in 'frag_times'?
114 unsigned int frag_times_start_offset;
117 * Which offset whould we write the next frag value into
118 * in the 'frag_times' array? All smaller entries are valid.
120 unsigned int frag_times_write_offset;
123 * Total size of the message that we are assembling.
131 * Defragmentation context (one per connection).
133 struct GNUNET_DEFRAGMENT_Context
139 struct GNUNET_STATISTICS_Handle *stats;
142 * Head of list of messages we're defragmenting.
144 struct MessageContext *head;
147 * Tail of list of messages we're defragmenting.
149 struct MessageContext *tail;
152 * Closure for 'proc' and 'ackp'.
157 * Function to call with defragmented messages.
159 GNUNET_FRAGMENT_MessageProcessor proc;
162 * Function to call with acknowledgements.
164 GNUNET_DEFRAGMENT_AckProcessor ackp;
167 * Running average of the latency (delay between messages) for this
170 struct GNUNET_TIME_Relative latency;
173 * num_msgs how many fragmented messages
174 * to we defragment at most at the same time?
176 unsigned int num_msgs;
179 * Current number of messages in the 'struct MessageContext'
180 * DLL (smaller or equal to 'num_msgs').
182 unsigned int list_size;
185 * Maximum message size for each fragment.
192 * Create a defragmentation context.
194 * @param stats statistics context
195 * @param mtu the maximum message size for each fragment
196 * @param num_msgs how many fragmented messages
197 * to we defragment at most at the same time?
198 * @param cls closure for proc and ackp
199 * @param proc function to call with defragmented messages
200 * @param ackp function to call with acknowledgements (to send
201 * back to the other side)
202 * @return the defragmentation context
204 struct GNUNET_DEFRAGMENT_Context *
205 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
206 uint16_t mtu, unsigned int num_msgs,
208 GNUNET_FRAGMENT_MessageProcessor proc,
209 GNUNET_DEFRAGMENT_AckProcessor ackp)
211 struct GNUNET_DEFRAGMENT_Context *dc;
213 dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
218 dc->num_msgs = num_msgs;
220 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
226 * Destroy the given defragmentation context.
228 * @param dc defragmentation context
231 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
233 struct MessageContext *mc;
235 while (NULL != (mc = dc->head))
237 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
239 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
241 GNUNET_SCHEDULER_cancel (mc->ack_task);
242 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
246 GNUNET_assert (0 == dc->list_size);
252 * Send acknowledgement to the other peer now.
254 * @param cls the message context
255 * @param tc the scheduler context
258 send_ack (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
260 struct MessageContext *mc = cls;
261 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
262 struct FragmentAcknowledgement fa;
264 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
265 fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
266 fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
267 fa.fragment_id = htonl (mc->fragment_id);
268 fa.bits = GNUNET_htonll (mc->bits);
269 GNUNET_STATISTICS_update (mc->dc->stats,
270 _("# acknowledgements sent for fragment"), 1,
272 dc->ackp (dc->cls, mc->fragment_id, &fa.header);
/**
 * This function is from the GNU Scientific Library, linear/fit.c,
 * (C) 2000 Brian Gough
 *
 * Least-squares fit of the model y = c1 * x (a line through the origin).
 *
 * The caller must pass at least two points and at least one non-zero
 * x value; otherwise the divisions below have a zero denominator.
 *
 * @param x array with the x values
 * @param xstride distance (in elements) between consecutive x values
 * @param y array with the y values
 * @param ystride distance (in elements) between consecutive y values
 * @param n number of data points
 * @param c1 set to the fitted slope
 * @param cov_11 set to the variance estimate of the slope
 * @param sumsq set to the sum of squared residuals
 * @return 0 (always)
 */
static int
gsl_fit_mul (const double *x, const size_t xstride, const double *y,
             const size_t ystride, const size_t n, double *c1, double *cov_11,
             double *sumsq)
{
  double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
  size_t i;

  /* running means of x and y */
  for (i = 0; i < n; i++)
  {
    m_x += (x[i * xstride] - m_x) / (i + 1.0);
    m_y += (y[i * ystride] - m_y) / (i + 1.0);
  }

  /* running means of the squared deviations and the cross term */
  for (i = 0; i < n; i++)
  {
    const double dx = x[i * xstride] - m_x;
    const double dy = y[i * ystride] - m_y;

    m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
    m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
  }

  /* In terms of y = b x */
  {
    double s2 = 0, d2 = 0;
    double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);

    *c1 = b;

    /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
    for (i = 0; i < n; i++)
    {
      const double dx = x[i * xstride] - m_x;
      const double dy = y[i * ystride] - m_y;
      const double d = (m_y - b * m_x) + dy - b * dx;

      d2 += d * d;
    }

    s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */

    *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));

    *sumsq = d2;
  }

  return 0;
}
333 * Estimate the latency between messages based on the most recent
334 * message time stamps.
336 * @param mc context with time stamps
337 * @return average delay between time stamps (based on least-squares fit)
339 static struct GNUNET_TIME_Relative
340 estimate_latency (struct MessageContext *mc)
342 struct FragTimes *first;
343 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
350 struct GNUNET_TIME_Relative ret;
352 first = &mc->frag_times[mc->frag_times_start_offset];
353 GNUNET_assert (total > 1);
354 for (i = 0; i < total; i++)
357 y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value);
359 gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
360 c1 += sqrt (sumsq); /* add 1 std dev */
361 ret.rel_value = (uint64_t) c1;
362 if (ret.rel_value == 0)
363 ret = GNUNET_TIME_UNIT_MILLISECONDS; /* always at least 1 */
369 * Discard the message context that was inactive for the longest time.
371 * @param dc defragmentation context
374 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
376 struct MessageContext *old;
377 struct MessageContext *pos;
384 (old->last_update.abs_value > pos->last_update.abs_value))
388 GNUNET_assert (NULL != old);
389 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
391 if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
393 GNUNET_SCHEDULER_cancel (old->ack_task);
394 old->ack_task = GNUNET_SCHEDULER_NO_TASK;
401 * We have received a fragment. Process it.
403 * @param dc the context
404 * @param msg the message that was received
405 * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
408 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
409 const struct GNUNET_MessageHeader *msg)
411 struct MessageContext *mc;
412 const struct FragmentHeader *fh;
418 struct GNUNET_TIME_Absolute now;
419 struct GNUNET_TIME_Relative delay;
425 if (ntohs (msg->size) < sizeof (struct FragmentHeader))
428 return GNUNET_SYSERR;
430 if (ntohs (msg->size) > dc->mtu)
433 return GNUNET_SYSERR;
435 fh = (const struct FragmentHeader *) msg;
436 msize = ntohs (fh->total_size);
437 if (msize < sizeof (struct GNUNET_MessageHeader))
440 return GNUNET_SYSERR;
442 fid = ntohl (fh->fragment_id);
443 foff = ntohs (fh->offset);
447 return GNUNET_SYSERR;
449 if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
452 return GNUNET_SYSERR;
454 GNUNET_STATISTICS_update (dc->stats, _("# fragments received"), 1, GNUNET_NO);
456 while ((NULL != mc) && (fid != mc->fragment_id))
458 bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
459 if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
460 sizeof (struct FragmentHeader) > msize)
462 /* payload extends past total message size */
464 return GNUNET_SYSERR;
466 if ((NULL != mc) && (msize != mc->total_size))
468 /* inconsistent message size */
470 return GNUNET_SYSERR;
472 now = GNUNET_TIME_absolute_get ();
475 mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
476 mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
478 mc->total_size = msize;
479 mc->fragment_id = fid;
480 mc->last_update = now;
481 n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
485 mc->bits = UINT64_MAX; /* set all 64 bit */
487 mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
488 if (dc->list_size >= dc->num_msgs)
489 discard_oldest_mc (dc);
490 GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc);
494 /* copy data to 'mc' */
495 if (0 != (mc->bits & (1LL << bit)))
497 mc->bits -= 1LL << bit;
498 mbuf = (char *) &mc[1];
499 memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
500 ntohs (msg->size) - sizeof (struct FragmentHeader));
501 mc->last_update = now;
502 if (bit < mc->last_bit)
503 mc->frag_times_start_offset = mc->frag_times_write_offset;
505 mc->frag_times[mc->frag_times_write_offset].time = now;
506 mc->frag_times[mc->frag_times_write_offset].bit = bit;
507 mc->frag_times_write_offset++;
508 duplicate = GNUNET_NO;
512 duplicate = GNUNET_YES;
513 GNUNET_STATISTICS_update (dc->stats, _("# duplicate fragments received"), 1,
517 /* count number of missing fragments */
519 for (b = 0; b < 64; b++)
520 if (0 != (mc->bits & (1LL << b)))
523 /* notify about complete message */
524 if ((duplicate == GNUNET_NO) && (0 == mc->bits))
526 GNUNET_STATISTICS_update (dc->stats, _("# messages defragmented"), 1,
528 /* message complete, notify! */
529 dc->proc (dc->cls, mc->msg);
532 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
533 dc->latency = estimate_latency (mc);
534 delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1);
535 if ((0 == mc->bits) || (GNUNET_YES == duplicate)) /* message complete or duplicate, ACK now! */
537 delay = GNUNET_TIME_UNIT_ZERO;
539 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
540 GNUNET_SCHEDULER_cancel (mc->ack_task);
541 mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc);
542 if (duplicate == GNUNET_YES)
547 /* end of defragmentation.c */