paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file src/fragmentation/defragmentation.c
20  * @brief library to help defragment messages
21  * @author Christian Grothoff
22  */
23 #include "platform.h"
24 #include "gnunet_fragmentation_lib.h"
25 #include "fragmentation.h"
26
27 /**
28  * Timestamps for fragments.
29  */
30 struct FragTimes
31 {
32   /**
33    * The time the fragment was received.
34    */
35   struct GNUNET_TIME_Absolute time;
36
37   /**
38    * Number of the bit for the fragment (in [0,..,63]).
39    */
40   unsigned int bit;
41 };
42
43
44 /**
45  * Information we keep for one message that is being assembled.  Note
46  * that we keep the context around even after the assembly is done to
47  * handle 'stray' messages that are received 'late'.  A message
48  * context is ONLY discarded when the queue gets too big.
49  */
50 struct MessageContext
51 {
52   /**
53    * This is a DLL.
54    */
55   struct MessageContext *next;
56
57   /**
58    * This is a DLL.
59    */
60   struct MessageContext *prev;
61
62   /**
63    * Associated defragmentation context.
64    */
65   struct GNUNET_DEFRAGMENT_Context *dc;
66
67   /**
68    * Pointer to the assembled message, allocated at the
69    * end of this struct.
70    */
71   const struct GNUNET_MessageHeader *msg;
72
73   /**
74    * Last time we received any update for this message
75    * (least-recently updated message will be discarded
76    * if we hit the queue size).
77    */
78   struct GNUNET_TIME_Absolute last_update;
79
80   /**
81    * Task scheduled for transmitting the next ACK to the
82    * other peer.
83    */
84   struct GNUNET_SCHEDULER_Task * ack_task;
85
86   /**
87    * When did we receive which fragment? Used to calculate
88    * the time we should send the ACK.
89    */
90   struct FragTimes frag_times[64];
91
92   /**
93    * Which fragments have we gotten yet? bits that are 1
94    * indicate missing fragments.
95    */
96   uint64_t bits;
97
98   /**
99    * Unique ID for this message.
100    */
101   uint32_t fragment_id;
102
103   /**
104    * Which 'bit' did the last fragment we received correspond to?
105    */
106   unsigned int last_bit;
107
108   /**
109    * For the current ACK round, which is the first relevant
110    * offset in @e frag_times?
111    */
112   unsigned int frag_times_start_offset;
113
114   /**
115    * Which offset whould we write the next frag value into
116    * in the @e frag_times array? All smaller entries are valid.
117    */
118   unsigned int frag_times_write_offset;
119
120   /**
121    * Total size of the message that we are assembling.
122    */
123   uint16_t total_size;
124
125   /**
126    * Was the last fragment we got a duplicate?
127    */
128   int16_t last_duplicate;
129
130 };
131
132
133 /**
134  * Defragmentation context (one per connection).
135  */
136 struct GNUNET_DEFRAGMENT_Context
137 {
138
139   /**
140    * For statistics.
141    */
142   struct GNUNET_STATISTICS_Handle *stats;
143
144   /**
145    * Head of list of messages we're defragmenting.
146    */
147   struct MessageContext *head;
148
149   /**
150    * Tail of list of messages we're defragmenting.
151    */
152   struct MessageContext *tail;
153
154   /**
155    * Closure for @e proc and @e ackp.
156    */
157   void *cls;
158
159   /**
160    * Function to call with defragmented messages.
161    */
162   GNUNET_FRAGMENT_MessageProcessor proc;
163
164   /**
165    * Function to call with acknowledgements.
166    */
167   GNUNET_DEFRAGMENT_AckProcessor ackp;
168
169   /**
170    * Running average of the latency (delay between messages) for this
171    * connection.
172    */
173   struct GNUNET_TIME_Relative latency;
174
175   /**
176    * num_msgs how many fragmented messages
177    * to we defragment at most at the same time?
178    */
179   unsigned int num_msgs;
180
181   /**
182    * Current number of messages in the 'struct MessageContext'
183    * DLL (smaller or equal to 'num_msgs').
184    */
185   unsigned int list_size;
186
187   /**
188    * Maximum message size for each fragment.
189    */
190   uint16_t mtu;
191
192 };
193
194
195 /**
196  * Create a defragmentation context.
197  *
198  * @param stats statistics context
199  * @param mtu the maximum message size for each fragment
200  * @param num_msgs how many fragmented messages
201  *                 to we defragment at most at the same time?
202  * @param cls closure for @a proc and @a ackp
203  * @param proc function to call with defragmented messages
204  * @param ackp function to call with acknowledgements (to send
205  *             back to the other side)
206  * @return the defragmentation context
207  */
208 struct GNUNET_DEFRAGMENT_Context *
209 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
210                                   uint16_t mtu, unsigned int num_msgs,
211                                   void *cls,
212                                   GNUNET_FRAGMENT_MessageProcessor proc,
213                                   GNUNET_DEFRAGMENT_AckProcessor ackp)
214 {
215   struct GNUNET_DEFRAGMENT_Context *dc;
216
217   dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
218   dc->stats = stats;
219   dc->cls = cls;
220   dc->proc = proc;
221   dc->ackp = ackp;
222   dc->num_msgs = num_msgs;
223   dc->mtu = mtu;
224   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
225   return dc;
226 }
227
228
229 /**
230  * Destroy the given defragmentation context.
231  *
232  * @param dc defragmentation context
233  */
234 void
235 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
236 {
237   struct MessageContext *mc;
238
239   while (NULL != (mc = dc->head))
240   {
241     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
242     dc->list_size--;
243     if (NULL != mc->ack_task)
244     {
245       GNUNET_SCHEDULER_cancel (mc->ack_task);
246       mc->ack_task = NULL;
247     }
248     GNUNET_free (mc);
249   }
250   GNUNET_assert (0 == dc->list_size);
251   GNUNET_free (dc);
252 }
253
254
255 /**
256  * Send acknowledgement to the other peer now.
257  *
258  * @param cls the message context
259  */
260 static void
261 send_ack (void *cls)
262 {
263   struct MessageContext *mc = cls;
264   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
265   struct FragmentAcknowledgement fa;
266
267   mc->ack_task = NULL;
268   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
269   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
270   fa.fragment_id = htonl (mc->fragment_id);
271   fa.bits = GNUNET_htonll (mc->bits);
272   GNUNET_STATISTICS_update (mc->dc->stats,
273                             _("# acknowledgements sent for fragment"),
274                             1,
275                             GNUNET_NO);
276   mc->last_duplicate = GNUNET_NO; /* clear flag */
277   dc->ackp (dc->cls,
278             mc->fragment_id,
279             &fa.header);
280 }
281
282
283 /**
284  * This function is from the GNU Scientific Library, linear/fit.c,
285  * Copyright (C) 2000 Brian Gough
286  */
287 static void
288 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
289              const size_t ystride, const size_t n, double *c1, double *cov_11,
290              double *sumsq)
291 {
292   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
293
294   size_t i;
295
296   for (i = 0; i < n; i++)
297   {
298     m_x += (x[i * xstride] - m_x) / (i + 1.0);
299     m_y += (y[i * ystride] - m_y) / (i + 1.0);
300   }
301
302   for (i = 0; i < n; i++)
303   {
304     const double dx = x[i * xstride] - m_x;
305     const double dy = y[i * ystride] - m_y;
306
307     m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
308     m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
309   }
310
311   /* In terms of y =  b x */
312
313   {
314     double s2 = 0, d2 = 0;
315     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
316
317     *c1 = b;
318
319     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
320
321     for (i = 0; i < n; i++)
322     {
323       const double dx = x[i * xstride] - m_x;
324       const double dy = y[i * ystride] - m_y;
325       const double d = (m_y - b * m_x) + dy - b * dx;
326
327       d2 += d * d;
328     }
329
330     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
331
332     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
333
334     *sumsq = d2;
335   }
336 }
337
338
339 /**
340  * Estimate the latency between messages based on the most recent
341  * message time stamps.
342  *
343  * @param mc context with time stamps
344  * @return average delay between time stamps (based on least-squares fit)
345  */
346 static struct GNUNET_TIME_Relative
347 estimate_latency (struct MessageContext *mc)
348 {
349   struct FragTimes *first;
350   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
351   double x[total];
352   double y[total];
353   size_t i;
354   double c1;
355   double cov11;
356   double sumsq;
357   struct GNUNET_TIME_Relative ret;
358
359   first = &mc->frag_times[mc->frag_times_start_offset];
360   GNUNET_assert (total > 1);
361   for (i = 0; i < total; i++)
362   {
363     x[i] = (double) i;
364     y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
365   }
366   gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
367   c1 += sqrt (sumsq);           /* add 1 std dev */
368   ret.rel_value_us = (uint64_t) c1;
369   if (0 == ret.rel_value_us)
370     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
371   return ret;
372 }
373
374
375 /**
376  * Discard the message context that was inactive for the longest time.
377  *
378  * @param dc defragmentation context
379  */
380 static void
381 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
382 {
383   struct MessageContext *old;
384   struct MessageContext *pos;
385
386   old = NULL;
387   pos = dc->head;
388   while (NULL != pos)
389   {
390     if ((old == NULL) ||
391         (old->last_update.abs_value_us > pos->last_update.abs_value_us))
392       old = pos;
393     pos = pos->next;
394   }
395   GNUNET_assert (NULL != old);
396   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
397   dc->list_size--;
398   if (NULL != old->ack_task)
399   {
400     GNUNET_SCHEDULER_cancel (old->ack_task);
401     old->ack_task = NULL;
402   }
403   GNUNET_free (old);
404 }
405
406
407 /**
408  * We have received a fragment.  Process it.
409  *
410  * @param dc the context
411  * @param msg the message that was received
412  * @return #GNUNET_OK on success,
413  *         #GNUNET_NO if this was a duplicate,
414  *         #GNUNET_SYSERR on error
415  */
416 int
417 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
418                                     const struct GNUNET_MessageHeader *msg)
419 {
420   struct MessageContext *mc;
421   const struct FragmentHeader *fh;
422   uint16_t msize;
423   uint16_t foff;
424   uint32_t fid;
425   char *mbuf;
426   unsigned int bit;
427   struct GNUNET_TIME_Absolute now;
428   struct GNUNET_TIME_Relative delay;
429   unsigned int bc;
430   unsigned int b;
431   unsigned int n;
432   unsigned int num_fragments;
433   int duplicate;
434   int last;
435
436   if (ntohs (msg->size) < sizeof (struct FragmentHeader))
437   {
438     GNUNET_break_op (0);
439     return GNUNET_SYSERR;
440   }
441   if (ntohs (msg->size) > dc->mtu)
442   {
443     GNUNET_break_op (0);
444     return GNUNET_SYSERR;
445   }
446   fh = (const struct FragmentHeader *) msg;
447   msize = ntohs (fh->total_size);
448   if (msize < sizeof (struct GNUNET_MessageHeader))
449   {
450     GNUNET_break_op (0);
451     return GNUNET_SYSERR;
452   }
453   fid = ntohl (fh->fragment_id);
454   foff = ntohs (fh->offset);
455   if (foff >= msize)
456   {
457     GNUNET_break_op (0);
458     return GNUNET_SYSERR;
459   }
460   if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
461   {
462     GNUNET_break_op (0);
463     return GNUNET_SYSERR;
464   }
465   GNUNET_STATISTICS_update (dc->stats,
466                             _("# fragments received"),
467                             1,
468                             GNUNET_NO);
469   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
470   last = 0;
471   for (mc = dc->head; NULL != mc; mc = mc->next)
472     if (mc->fragment_id > fid)
473       last++;
474
475   mc = dc->head;
476   while ((NULL != mc) && (fid != mc->fragment_id))
477     mc = mc->next;
478   bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
479   if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
480       sizeof (struct FragmentHeader) > msize)
481   {
482     /* payload extends past total message size */
483     GNUNET_break_op (0);
484     return GNUNET_SYSERR;
485   }
486   if ((NULL != mc) && (msize != mc->total_size))
487   {
488     /* inconsistent message size */
489     GNUNET_break_op (0);
490     return GNUNET_SYSERR;
491   }
492   now = GNUNET_TIME_absolute_get ();
493   if (NULL == mc)
494   {
495     mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
496     mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
497     mc->dc = dc;
498     mc->total_size = msize;
499     mc->fragment_id = fid;
500     mc->last_update = now;
501     n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
502                                                                   sizeof (struct
503                                                                           FragmentHeader));
504     if (n == 64)
505       mc->bits = UINT64_MAX;    /* set all 64 bit */
506     else
507       mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
508     if (dc->list_size >= dc->num_msgs)
509       discard_oldest_mc (dc);
510     GNUNET_CONTAINER_DLL_insert (dc->head,
511                                  dc->tail,
512                                  mc);
513     dc->list_size++;
514   }
515
516   /* copy data to 'mc' */
517   if (0 != (mc->bits & (1LLU << bit)))
518   {
519     mc->bits -= 1LLU << bit;
520     mbuf = (char *) &mc[1];
521     GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
522             ntohs (msg->size) - sizeof (struct FragmentHeader));
523     mc->last_update = now;
524     if (bit < mc->last_bit)
525       mc->frag_times_start_offset = mc->frag_times_write_offset;
526     mc->last_bit = bit;
527     mc->frag_times[mc->frag_times_write_offset].time = now;
528     mc->frag_times[mc->frag_times_write_offset].bit = bit;
529     mc->frag_times_write_offset++;
530     duplicate = GNUNET_NO;
531   }
532   else
533   {
534     duplicate = GNUNET_YES;
535     GNUNET_STATISTICS_update (dc->stats,
536                               _("# duplicate fragments received"),
537                               1,
538                               GNUNET_NO);
539   }
540
541   /* count number of missing fragments after the current one */
542   bc = 0;
543   for (b = bit; b < 64; b++)
544     if (0 != (mc->bits & (1LLU << b)))
545       bc++;
546     else
547       bc = 0;
548
549   /* notify about complete message */
550   if ( (GNUNET_NO == duplicate) &&
551        (0 == mc->bits) )
552   {
553     GNUNET_STATISTICS_update (dc->stats,
554                               _("# messages defragmented"),
555                               1,
556                               GNUNET_NO);
557     /* message complete, notify! */
558     dc->proc (dc->cls, mc->msg);
559   }
560   /* send ACK */
561   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
562   {
563     dc->latency = estimate_latency (mc);
564   }
565   delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
566                                                     bc + 1);
567   if ( (last + fid == num_fragments) ||
568        (0 == mc->bits) ||
569        (GNUNET_YES == duplicate) )
570   {
571     /* message complete or duplicate or last missing fragment in
572        linear sequence; ACK now! */
573     delay = GNUNET_TIME_UNIT_ZERO;
574   }
575   if (NULL != mc->ack_task)
576     GNUNET_SCHEDULER_cancel (mc->ack_task);
577   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
578                                                &send_ack,
579                                                mc);
580   if (GNUNET_YES == duplicate)
581   {
582     mc->last_duplicate = GNUNET_YES;
583     return GNUNET_NO;
584   }
585   return GNUNET_YES;
586 }
587
588 /* end of defragmentation.c */