glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15 /**
16  * @file src/fragmentation/defragmentation.c
17  * @brief library to help defragment messages
18  * @author Christian Grothoff
19  */
20 #include "platform.h"
21 #include "gnunet_fragmentation_lib.h"
22 #include "fragmentation.h"
23
24 /**
25  * Timestamps for fragments.
26  */
27 struct FragTimes
28 {
29   /**
30    * The time the fragment was received.
31    */
32   struct GNUNET_TIME_Absolute time;
33
34   /**
35    * Number of the bit for the fragment (in [0,..,63]).
36    */
37   unsigned int bit;
38 };
39
40
41 /**
42  * Information we keep for one message that is being assembled.  Note
43  * that we keep the context around even after the assembly is done to
44  * handle 'stray' messages that are received 'late'.  A message
45  * context is ONLY discarded when the queue gets too big.
46  */
47 struct MessageContext
48 {
49   /**
50    * This is a DLL.
51    */
52   struct MessageContext *next;
53
54   /**
55    * This is a DLL.
56    */
57   struct MessageContext *prev;
58
59   /**
60    * Associated defragmentation context.
61    */
62   struct GNUNET_DEFRAGMENT_Context *dc;
63
64   /**
65    * Pointer to the assembled message, allocated at the
66    * end of this struct.
67    */
68   const struct GNUNET_MessageHeader *msg;
69
70   /**
71    * Last time we received any update for this message
72    * (least-recently updated message will be discarded
73    * if we hit the queue size).
74    */
75   struct GNUNET_TIME_Absolute last_update;
76
77   /**
78    * Task scheduled for transmitting the next ACK to the
79    * other peer.
80    */
81   struct GNUNET_SCHEDULER_Task * ack_task;
82
83   /**
84    * When did we receive which fragment? Used to calculate
85    * the time we should send the ACK.
86    */
87   struct FragTimes frag_times[64];
88
89   /**
90    * Which fragments have we gotten yet? bits that are 1
91    * indicate missing fragments.
92    */
93   uint64_t bits;
94
95   /**
96    * Unique ID for this message.
97    */
98   uint32_t fragment_id;
99
100   /**
101    * Which 'bit' did the last fragment we received correspond to?
102    */
103   unsigned int last_bit;
104
105   /**
106    * For the current ACK round, which is the first relevant
107    * offset in @e frag_times?
108    */
109   unsigned int frag_times_start_offset;
110
111   /**
112    * Which offset whould we write the next frag value into
113    * in the @e frag_times array? All smaller entries are valid.
114    */
115   unsigned int frag_times_write_offset;
116
117   /**
118    * Total size of the message that we are assembling.
119    */
120   uint16_t total_size;
121
122   /**
123    * Was the last fragment we got a duplicate?
124    */
125   int16_t last_duplicate;
126
127 };
128
129
130 /**
131  * Defragmentation context (one per connection).
132  */
133 struct GNUNET_DEFRAGMENT_Context
134 {
135
136   /**
137    * For statistics.
138    */
139   struct GNUNET_STATISTICS_Handle *stats;
140
141   /**
142    * Head of list of messages we're defragmenting.
143    */
144   struct MessageContext *head;
145
146   /**
147    * Tail of list of messages we're defragmenting.
148    */
149   struct MessageContext *tail;
150
151   /**
152    * Closure for @e proc and @e ackp.
153    */
154   void *cls;
155
156   /**
157    * Function to call with defragmented messages.
158    */
159   GNUNET_FRAGMENT_MessageProcessor proc;
160
161   /**
162    * Function to call with acknowledgements.
163    */
164   GNUNET_DEFRAGMENT_AckProcessor ackp;
165
166   /**
167    * Running average of the latency (delay between messages) for this
168    * connection.
169    */
170   struct GNUNET_TIME_Relative latency;
171
172   /**
173    * num_msgs how many fragmented messages
174    * to we defragment at most at the same time?
175    */
176   unsigned int num_msgs;
177
178   /**
179    * Current number of messages in the 'struct MessageContext'
180    * DLL (smaller or equal to 'num_msgs').
181    */
182   unsigned int list_size;
183
184   /**
185    * Maximum message size for each fragment.
186    */
187   uint16_t mtu;
188
189 };
190
191
192 /**
193  * Create a defragmentation context.
194  *
195  * @param stats statistics context
196  * @param mtu the maximum message size for each fragment
197  * @param num_msgs how many fragmented messages
198  *                 to we defragment at most at the same time?
199  * @param cls closure for @a proc and @a ackp
200  * @param proc function to call with defragmented messages
201  * @param ackp function to call with acknowledgements (to send
202  *             back to the other side)
203  * @return the defragmentation context
204  */
205 struct GNUNET_DEFRAGMENT_Context *
206 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
207                                   uint16_t mtu, unsigned int num_msgs,
208                                   void *cls,
209                                   GNUNET_FRAGMENT_MessageProcessor proc,
210                                   GNUNET_DEFRAGMENT_AckProcessor ackp)
211 {
212   struct GNUNET_DEFRAGMENT_Context *dc;
213
214   dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
215   dc->stats = stats;
216   dc->cls = cls;
217   dc->proc = proc;
218   dc->ackp = ackp;
219   dc->num_msgs = num_msgs;
220   dc->mtu = mtu;
221   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
222   return dc;
223 }
224
225
226 /**
227  * Destroy the given defragmentation context.
228  *
229  * @param dc defragmentation context
230  */
231 void
232 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
233 {
234   struct MessageContext *mc;
235
236   while (NULL != (mc = dc->head))
237   {
238     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
239     dc->list_size--;
240     if (NULL != mc->ack_task)
241     {
242       GNUNET_SCHEDULER_cancel (mc->ack_task);
243       mc->ack_task = NULL;
244     }
245     GNUNET_free (mc);
246   }
247   GNUNET_assert (0 == dc->list_size);
248   GNUNET_free (dc);
249 }
250
251
252 /**
253  * Send acknowledgement to the other peer now.
254  *
255  * @param cls the message context
256  */
257 static void
258 send_ack (void *cls)
259 {
260   struct MessageContext *mc = cls;
261   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
262   struct FragmentAcknowledgement fa;
263
264   mc->ack_task = NULL;
265   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
266   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
267   fa.fragment_id = htonl (mc->fragment_id);
268   fa.bits = GNUNET_htonll (mc->bits);
269   GNUNET_STATISTICS_update (mc->dc->stats,
270                             _("# acknowledgements sent for fragment"),
271                             1,
272                             GNUNET_NO);
273   mc->last_duplicate = GNUNET_NO; /* clear flag */
274   dc->ackp (dc->cls,
275             mc->fragment_id,
276             &fa.header);
277 }
278
279
280 /**
281  * This function is from the GNU Scientific Library, linear/fit.c,
282  * Copyright (C) 2000 Brian Gough
283  */
284 static void
285 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
286              const size_t ystride, const size_t n, double *c1, double *cov_11,
287              double *sumsq)
288 {
289   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
290
291   size_t i;
292
293   for (i = 0; i < n; i++)
294   {
295     m_x += (x[i * xstride] - m_x) / (i + 1.0);
296     m_y += (y[i * ystride] - m_y) / (i + 1.0);
297   }
298
299   for (i = 0; i < n; i++)
300   {
301     const double dx = x[i * xstride] - m_x;
302     const double dy = y[i * ystride] - m_y;
303
304     m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
305     m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
306   }
307
308   /* In terms of y =  b x */
309
310   {
311     double s2 = 0, d2 = 0;
312     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
313
314     *c1 = b;
315
316     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
317
318     for (i = 0; i < n; i++)
319     {
320       const double dx = x[i * xstride] - m_x;
321       const double dy = y[i * ystride] - m_y;
322       const double d = (m_y - b * m_x) + dy - b * dx;
323
324       d2 += d * d;
325     }
326
327     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
328
329     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
330
331     *sumsq = d2;
332   }
333 }
334
335
336 /**
337  * Estimate the latency between messages based on the most recent
338  * message time stamps.
339  *
340  * @param mc context with time stamps
341  * @return average delay between time stamps (based on least-squares fit)
342  */
343 static struct GNUNET_TIME_Relative
344 estimate_latency (struct MessageContext *mc)
345 {
346   struct FragTimes *first;
347   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
348   double x[total];
349   double y[total];
350   size_t i;
351   double c1;
352   double cov11;
353   double sumsq;
354   struct GNUNET_TIME_Relative ret;
355
356   first = &mc->frag_times[mc->frag_times_start_offset];
357   GNUNET_assert (total > 1);
358   for (i = 0; i < total; i++)
359   {
360     x[i] = (double) i;
361     y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
362   }
363   gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
364   c1 += sqrt (sumsq);           /* add 1 std dev */
365   ret.rel_value_us = (uint64_t) c1;
366   if (0 == ret.rel_value_us)
367     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
368   return ret;
369 }
370
371
372 /**
373  * Discard the message context that was inactive for the longest time.
374  *
375  * @param dc defragmentation context
376  */
377 static void
378 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
379 {
380   struct MessageContext *old;
381   struct MessageContext *pos;
382
383   old = NULL;
384   pos = dc->head;
385   while (NULL != pos)
386   {
387     if ((old == NULL) ||
388         (old->last_update.abs_value_us > pos->last_update.abs_value_us))
389       old = pos;
390     pos = pos->next;
391   }
392   GNUNET_assert (NULL != old);
393   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
394   dc->list_size--;
395   if (NULL != old->ack_task)
396   {
397     GNUNET_SCHEDULER_cancel (old->ack_task);
398     old->ack_task = NULL;
399   }
400   GNUNET_free (old);
401 }
402
403
404 /**
405  * We have received a fragment.  Process it.
406  *
407  * @param dc the context
408  * @param msg the message that was received
409  * @return #GNUNET_OK on success,
410  *         #GNUNET_NO if this was a duplicate,
411  *         #GNUNET_SYSERR on error
412  */
413 int
414 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
415                                     const struct GNUNET_MessageHeader *msg)
416 {
417   struct MessageContext *mc;
418   const struct FragmentHeader *fh;
419   uint16_t msize;
420   uint16_t foff;
421   uint32_t fid;
422   char *mbuf;
423   unsigned int bit;
424   struct GNUNET_TIME_Absolute now;
425   struct GNUNET_TIME_Relative delay;
426   unsigned int bc;
427   unsigned int b;
428   unsigned int n;
429   unsigned int num_fragments;
430   int duplicate;
431   int last;
432
433   if (ntohs (msg->size) < sizeof (struct FragmentHeader))
434   {
435     GNUNET_break_op (0);
436     return GNUNET_SYSERR;
437   }
438   if (ntohs (msg->size) > dc->mtu)
439   {
440     GNUNET_break_op (0);
441     return GNUNET_SYSERR;
442   }
443   fh = (const struct FragmentHeader *) msg;
444   msize = ntohs (fh->total_size);
445   if (msize < sizeof (struct GNUNET_MessageHeader))
446   {
447     GNUNET_break_op (0);
448     return GNUNET_SYSERR;
449   }
450   fid = ntohl (fh->fragment_id);
451   foff = ntohs (fh->offset);
452   if (foff >= msize)
453   {
454     GNUNET_break_op (0);
455     return GNUNET_SYSERR;
456   }
457   if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
458   {
459     GNUNET_break_op (0);
460     return GNUNET_SYSERR;
461   }
462   GNUNET_STATISTICS_update (dc->stats,
463                             _("# fragments received"),
464                             1,
465                             GNUNET_NO);
466   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
467   last = 0;
468   for (mc = dc->head; NULL != mc; mc = mc->next)
469     if (mc->fragment_id > fid)
470       last++;
471
472   mc = dc->head;
473   while ((NULL != mc) && (fid != mc->fragment_id))
474     mc = mc->next;
475   bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
476   if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
477       sizeof (struct FragmentHeader) > msize)
478   {
479     /* payload extends past total message size */
480     GNUNET_break_op (0);
481     return GNUNET_SYSERR;
482   }
483   if ((NULL != mc) && (msize != mc->total_size))
484   {
485     /* inconsistent message size */
486     GNUNET_break_op (0);
487     return GNUNET_SYSERR;
488   }
489   now = GNUNET_TIME_absolute_get ();
490   if (NULL == mc)
491   {
492     mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
493     mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
494     mc->dc = dc;
495     mc->total_size = msize;
496     mc->fragment_id = fid;
497     mc->last_update = now;
498     n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
499                                                                   sizeof (struct
500                                                                           FragmentHeader));
501     if (n == 64)
502       mc->bits = UINT64_MAX;    /* set all 64 bit */
503     else
504       mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
505     if (dc->list_size >= dc->num_msgs)
506       discard_oldest_mc (dc);
507     GNUNET_CONTAINER_DLL_insert (dc->head,
508                                  dc->tail,
509                                  mc);
510     dc->list_size++;
511   }
512
513   /* copy data to 'mc' */
514   if (0 != (mc->bits & (1LLU << bit)))
515   {
516     mc->bits -= 1LLU << bit;
517     mbuf = (char *) &mc[1];
518     GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
519             ntohs (msg->size) - sizeof (struct FragmentHeader));
520     mc->last_update = now;
521     if (bit < mc->last_bit)
522       mc->frag_times_start_offset = mc->frag_times_write_offset;
523     mc->last_bit = bit;
524     mc->frag_times[mc->frag_times_write_offset].time = now;
525     mc->frag_times[mc->frag_times_write_offset].bit = bit;
526     mc->frag_times_write_offset++;
527     duplicate = GNUNET_NO;
528   }
529   else
530   {
531     duplicate = GNUNET_YES;
532     GNUNET_STATISTICS_update (dc->stats,
533                               _("# duplicate fragments received"),
534                               1,
535                               GNUNET_NO);
536   }
537
538   /* count number of missing fragments after the current one */
539   bc = 0;
540   for (b = bit; b < 64; b++)
541     if (0 != (mc->bits & (1LLU << b)))
542       bc++;
543     else
544       bc = 0;
545
546   /* notify about complete message */
547   if ( (GNUNET_NO == duplicate) &&
548        (0 == mc->bits) )
549   {
550     GNUNET_STATISTICS_update (dc->stats,
551                               _("# messages defragmented"),
552                               1,
553                               GNUNET_NO);
554     /* message complete, notify! */
555     dc->proc (dc->cls, mc->msg);
556   }
557   /* send ACK */
558   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
559   {
560     dc->latency = estimate_latency (mc);
561   }
562   delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
563                                                     bc + 1);
564   if ( (last + fid == num_fragments) ||
565        (0 == mc->bits) ||
566        (GNUNET_YES == duplicate) )
567   {
568     /* message complete or duplicate or last missing fragment in
569        linear sequence; ACK now! */
570     delay = GNUNET_TIME_UNIT_ZERO;
571   }
572   if (NULL != mc->ack_task)
573     GNUNET_SCHEDULER_cancel (mc->ack_task);
574   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
575                                                &send_ack,
576                                                mc);
577   if (GNUNET_YES == duplicate)
578   {
579     mc->last_duplicate = GNUNET_YES;
580     return GNUNET_NO;
581   }
582   return GNUNET_YES;
583 }
584
585 /* end of defragmentation.c */