a890842873f7dbfbfefa2ccc016f27239a3c9036
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/defragmentation.c
22  * @brief library to help defragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
28
29 /**
30  * Timestamps for fragments.
31  */
32 struct FragTimes
33 {
34   /**
35    * The time the fragment was received.
36    */
37   struct GNUNET_TIME_Absolute time;
38
39   /**
40    * Number of the bit for the fragment (in [0,..,63]).
41    */
42   unsigned int bit;
43 };
44
45
46 /**
47  * Information we keep for one message that is being assembled.  Note
48  * that we keep the context around even after the assembly is done to
49  * handle 'stray' messages that are received 'late'.  A message
50  * context is ONLY discarded when the queue gets too big.
51  */
52 struct MessageContext
53 {
54   /**
55    * This is a DLL.
56    */
57   struct MessageContext *next;
58
59   /**
60    * This is a DLL.
61    */
62   struct MessageContext *prev;
63
64   /**
65    * Associated defragmentation context.
66    */
67   struct GNUNET_DEFRAGMENT_Context *dc;
68
69   /**
70    * Pointer to the assembled message, allocated at the
71    * end of this struct.
72    */
73   const struct GNUNET_MessageHeader *msg;
74
75   /**
76    * Last time we received any update for this message
77    * (least-recently updated message will be discarded
78    * if we hit the queue size).
79    */
80   struct GNUNET_TIME_Absolute last_update;
81
82   /**
83    * Task scheduled for transmitting the next ACK to the
84    * other peer.
85    */
86   struct GNUNET_SCHEDULER_Task * ack_task;
87
88   /**
89    * When did we receive which fragment? Used to calculate
90    * the time we should send the ACK.
91    */
92   struct FragTimes frag_times[64];
93
94   /**
95    * Which fragments have we gotten yet? bits that are 1
96    * indicate missing fragments.
97    */
98   uint64_t bits;
99
100   /**
101    * Unique ID for this message.
102    */
103   uint32_t fragment_id;
104
105   /**
106    * Which 'bit' did the last fragment we received correspond to?
107    */
108   unsigned int last_bit;
109
110   /**
111    * For the current ACK round, which is the first relevant
112    * offset in @e frag_times?
113    */
114   unsigned int frag_times_start_offset;
115
116   /**
117    * Which offset whould we write the next frag value into
118    * in the @e frag_times array? All smaller entries are valid.
119    */
120   unsigned int frag_times_write_offset;
121
122   /**
123    * Total size of the message that we are assembling.
124    */
125   uint16_t total_size;
126
127   /**
128    * Was the last fragment we got a duplicate?
129    */
130   int16_t last_duplicate;
131
132 };
133
134
135 /**
136  * Defragmentation context (one per connection).
137  */
138 struct GNUNET_DEFRAGMENT_Context
139 {
140
141   /**
142    * For statistics.
143    */
144   struct GNUNET_STATISTICS_Handle *stats;
145
146   /**
147    * Head of list of messages we're defragmenting.
148    */
149   struct MessageContext *head;
150
151   /**
152    * Tail of list of messages we're defragmenting.
153    */
154   struct MessageContext *tail;
155
156   /**
157    * Closure for @e proc and @e ackp.
158    */
159   void *cls;
160
161   /**
162    * Function to call with defragmented messages.
163    */
164   GNUNET_FRAGMENT_MessageProcessor proc;
165
166   /**
167    * Function to call with acknowledgements.
168    */
169   GNUNET_DEFRAGMENT_AckProcessor ackp;
170
171   /**
172    * Running average of the latency (delay between messages) for this
173    * connection.
174    */
175   struct GNUNET_TIME_Relative latency;
176
177   /**
178    * num_msgs how many fragmented messages
179    * to we defragment at most at the same time?
180    */
181   unsigned int num_msgs;
182
183   /**
184    * Current number of messages in the 'struct MessageContext'
185    * DLL (smaller or equal to 'num_msgs').
186    */
187   unsigned int list_size;
188
189   /**
190    * Maximum message size for each fragment.
191    */
192   uint16_t mtu;
193
194 };
195
196
197 /**
198  * Create a defragmentation context.
199  *
200  * @param stats statistics context
201  * @param mtu the maximum message size for each fragment
202  * @param num_msgs how many fragmented messages
203  *                 to we defragment at most at the same time?
204  * @param cls closure for @a proc and @a ackp
205  * @param proc function to call with defragmented messages
206  * @param ackp function to call with acknowledgements (to send
207  *             back to the other side)
208  * @return the defragmentation context
209  */
210 struct GNUNET_DEFRAGMENT_Context *
211 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
212                                   uint16_t mtu, unsigned int num_msgs,
213                                   void *cls,
214                                   GNUNET_FRAGMENT_MessageProcessor proc,
215                                   GNUNET_DEFRAGMENT_AckProcessor ackp)
216 {
217   struct GNUNET_DEFRAGMENT_Context *dc;
218
219   dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
220   dc->stats = stats;
221   dc->cls = cls;
222   dc->proc = proc;
223   dc->ackp = ackp;
224   dc->num_msgs = num_msgs;
225   dc->mtu = mtu;
226   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
227   return dc;
228 }
229
230
231 /**
232  * Destroy the given defragmentation context.
233  *
234  * @param dc defragmentation context
235  */
236 void
237 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
238 {
239   struct MessageContext *mc;
240
241   while (NULL != (mc = dc->head))
242   {
243     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
244     dc->list_size--;
245     if (NULL != mc->ack_task)
246     {
247       GNUNET_SCHEDULER_cancel (mc->ack_task);
248       mc->ack_task = NULL;
249     }
250     GNUNET_free (mc);
251   }
252   GNUNET_assert (0 == dc->list_size);
253   GNUNET_free (dc);
254 }
255
256
257 /**
258  * Send acknowledgement to the other peer now.
259  *
260  * @param cls the message context
261  * @param tc the scheduler context
262  */
263 static void
264 send_ack (void *cls,
265           const struct GNUNET_SCHEDULER_TaskContext *tc)
266 {
267   struct MessageContext *mc = cls;
268   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
269   struct FragmentAcknowledgement fa;
270
271   mc->ack_task = NULL;
272   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
273   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
274   fa.fragment_id = htonl (mc->fragment_id);
275   fa.bits = GNUNET_htonll (mc->bits);
276   GNUNET_STATISTICS_update (mc->dc->stats,
277                             _("# acknowledgements sent for fragment"),
278                             1,
279                             GNUNET_NO);
280   mc->last_duplicate = GNUNET_NO; /* clear flag */
281   dc->ackp (dc->cls,
282             mc->fragment_id,
283             &fa.header);
284 }
285
286
287 /**
288  * This function is from the GNU Scientific Library, linear/fit.c,
289  * Copyright (C) 2000 Brian Gough
290  */
291 static void
292 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
293              const size_t ystride, const size_t n, double *c1, double *cov_11,
294              double *sumsq)
295 {
296   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
297
298   size_t i;
299
300   for (i = 0; i < n; i++)
301   {
302     m_x += (x[i * xstride] - m_x) / (i + 1.0);
303     m_y += (y[i * ystride] - m_y) / (i + 1.0);
304   }
305
306   for (i = 0; i < n; i++)
307   {
308     const double dx = x[i * xstride] - m_x;
309     const double dy = y[i * ystride] - m_y;
310
311     m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
312     m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
313   }
314
315   /* In terms of y =  b x */
316
317   {
318     double s2 = 0, d2 = 0;
319     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
320
321     *c1 = b;
322
323     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
324
325     for (i = 0; i < n; i++)
326     {
327       const double dx = x[i * xstride] - m_x;
328       const double dy = y[i * ystride] - m_y;
329       const double d = (m_y - b * m_x) + dy - b * dx;
330
331       d2 += d * d;
332     }
333
334     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
335
336     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
337
338     *sumsq = d2;
339   }
340 }
341
342
343 /**
344  * Estimate the latency between messages based on the most recent
345  * message time stamps.
346  *
347  * @param mc context with time stamps
348  * @return average delay between time stamps (based on least-squares fit)
349  */
350 static struct GNUNET_TIME_Relative
351 estimate_latency (struct MessageContext *mc)
352 {
353   struct FragTimes *first;
354   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
355   double x[total];
356   double y[total];
357   size_t i;
358   double c1;
359   double cov11;
360   double sumsq;
361   struct GNUNET_TIME_Relative ret;
362
363   first = &mc->frag_times[mc->frag_times_start_offset];
364   GNUNET_assert (total > 1);
365   for (i = 0; i < total; i++)
366   {
367     x[i] = (double) i;
368     y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
369   }
370   gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
371   c1 += sqrt (sumsq);           /* add 1 std dev */
372   ret.rel_value_us = (uint64_t) c1;
373   if (0 == ret.rel_value_us)
374     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
375   return ret;
376 }
377
378
379 /**
380  * Discard the message context that was inactive for the longest time.
381  *
382  * @param dc defragmentation context
383  */
384 static void
385 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
386 {
387   struct MessageContext *old;
388   struct MessageContext *pos;
389
390   old = NULL;
391   pos = dc->head;
392   while (NULL != pos)
393   {
394     if ((old == NULL) ||
395         (old->last_update.abs_value_us > pos->last_update.abs_value_us))
396       old = pos;
397     pos = pos->next;
398   }
399   GNUNET_assert (NULL != old);
400   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
401   dc->list_size--;
402   if (NULL != old->ack_task)
403   {
404     GNUNET_SCHEDULER_cancel (old->ack_task);
405     old->ack_task = NULL;
406   }
407   GNUNET_free (old);
408 }
409
410
411 /**
412  * We have received a fragment.  Process it.
413  *
414  * @param dc the context
415  * @param msg the message that was received
416  * @return #GNUNET_OK on success,
417  *         #GNUNET_NO if this was a duplicate,
418  *         #GNUNET_SYSERR on error
419  */
420 int
421 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
422                                     const struct GNUNET_MessageHeader *msg)
423 {
424   struct MessageContext *mc;
425   const struct FragmentHeader *fh;
426   uint16_t msize;
427   uint16_t foff;
428   uint32_t fid;
429   char *mbuf;
430   unsigned int bit;
431   struct GNUNET_TIME_Absolute now;
432   struct GNUNET_TIME_Relative delay;
433   unsigned int bc;
434   unsigned int b;
435   unsigned int n;
436   unsigned int num_fragments;
437   int duplicate;
438   int last;
439
440   if (ntohs (msg->size) < sizeof (struct FragmentHeader))
441   {
442     GNUNET_break_op (0);
443     return GNUNET_SYSERR;
444   }
445   if (ntohs (msg->size) > dc->mtu)
446   {
447     GNUNET_break_op (0);
448     return GNUNET_SYSERR;
449   }
450   fh = (const struct FragmentHeader *) msg;
451   msize = ntohs (fh->total_size);
452   if (msize < sizeof (struct GNUNET_MessageHeader))
453   {
454     GNUNET_break_op (0);
455     return GNUNET_SYSERR;
456   }
457   fid = ntohl (fh->fragment_id);
458   foff = ntohs (fh->offset);
459   if (foff >= msize)
460   {
461     GNUNET_break_op (0);
462     return GNUNET_SYSERR;
463   }
464   if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
465   {
466     GNUNET_break_op (0);
467     return GNUNET_SYSERR;
468   }
469   GNUNET_STATISTICS_update (dc->stats,
470                             _("# fragments received"),
471                             1,
472                             GNUNET_NO);
473   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
474   last = 0;
475   for (mc = dc->head; NULL != mc; mc = mc->next)
476     if (mc->fragment_id > fid)
477       last++;
478
479   mc = dc->head;
480   while ((NULL != mc) && (fid != mc->fragment_id))
481     mc = mc->next;
482   bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
483   if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
484       sizeof (struct FragmentHeader) > msize)
485   {
486     /* payload extends past total message size */
487     GNUNET_break_op (0);
488     return GNUNET_SYSERR;
489   }
490   if ((NULL != mc) && (msize != mc->total_size))
491   {
492     /* inconsistent message size */
493     GNUNET_break_op (0);
494     return GNUNET_SYSERR;
495   }
496   now = GNUNET_TIME_absolute_get ();
497   if (NULL == mc)
498   {
499     mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
500     mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
501     mc->dc = dc;
502     mc->total_size = msize;
503     mc->fragment_id = fid;
504     mc->last_update = now;
505     n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
506                                                                   sizeof (struct
507                                                                           FragmentHeader));
508     if (n == 64)
509       mc->bits = UINT64_MAX;    /* set all 64 bit */
510     else
511       mc->bits = (1LL << n) - 1;        /* set lowest 'bits' bit */
512     if (dc->list_size >= dc->num_msgs)
513       discard_oldest_mc (dc);
514     GNUNET_CONTAINER_DLL_insert (dc->head,
515                                  dc->tail,
516                                  mc);
517     dc->list_size++;
518   }
519
520   /* copy data to 'mc' */
521   if (0 != (mc->bits & (1LL << bit)))
522   {
523     mc->bits -= 1LL << bit;
524     mbuf = (char *) &mc[1];
525     memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
526             ntohs (msg->size) - sizeof (struct FragmentHeader));
527     mc->last_update = now;
528     if (bit < mc->last_bit)
529       mc->frag_times_start_offset = mc->frag_times_write_offset;
530     mc->last_bit = bit;
531     mc->frag_times[mc->frag_times_write_offset].time = now;
532     mc->frag_times[mc->frag_times_write_offset].bit = bit;
533     mc->frag_times_write_offset++;
534     duplicate = GNUNET_NO;
535   }
536   else
537   {
538     duplicate = GNUNET_YES;
539     GNUNET_STATISTICS_update (dc->stats,
540                               _("# duplicate fragments received"),
541                               1,
542                               GNUNET_NO);
543   }
544
545   /* count number of missing fragments after the current one */
546   bc = 0;
547   for (b = bit; b < 64; b++)
548     if (0 != (mc->bits & (1LL << b)))
549       bc++;
550     else
551       bc = 0;
552
553   /* notify about complete message */
554   if ( (GNUNET_NO == duplicate) &&
555        (0 == mc->bits) )
556   {
557     GNUNET_STATISTICS_update (dc->stats,
558                               _("# messages defragmented"),
559                               1,
560                               GNUNET_NO);
561     /* message complete, notify! */
562     dc->proc (dc->cls, mc->msg);
563   }
564   /* send ACK */
565   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
566   {
567     dc->latency = estimate_latency (mc);
568   }
569   delay = GNUNET_TIME_relative_multiply (dc->latency,
570                                          bc + 1);
571   if ( (last + fid == num_fragments) ||
572        (0 == mc->bits) ||
573        (GNUNET_YES == duplicate) )
574   {
575     /* message complete or duplicate or last missing fragment in
576        linear sequence; ACK now! */
577     delay = GNUNET_TIME_UNIT_ZERO;
578   }
579   if (NULL != mc->ack_task)
580     GNUNET_SCHEDULER_cancel (mc->ack_task);
581   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
582                                                &send_ack,
583                                                mc);
584   if (GNUNET_YES == duplicate)
585   {
586     mc->last_duplicate = GNUNET_YES;
587     return GNUNET_NO;
588   }
589   return GNUNET_YES;
590 }
591
592 /* end of defragmentation.c */