changing time measurement from milliseconds to microseconds
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/defragmentation.c
22  * @brief library to help defragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
28
29 /**
30  * Timestamps for fragments.
31  */
32 struct FragTimes
33 {
34   /**
35    * The time the fragment was received.
36    */
37   struct GNUNET_TIME_Absolute time;
38
39   /**
40    * Number of the bit for the fragment (in [0,..,63]).
41    */
42   unsigned int bit;
43 };
44
45
46 /**
47  * Information we keep for one message that is being assembled.  Note
48  * that we keep the context around even after the assembly is done to
49  * handle 'stray' messages that are received 'late'.  A message
50  * context is ONLY discarded when the queue gets too big.
51  */
52 struct MessageContext
53 {
54   /**
55    * This is a DLL.
56    */
57   struct MessageContext *next;
58
59   /**
60    * This is a DLL.
61    */
62   struct MessageContext *prev;
63
64   /**
65    * Associated defragmentation context.
66    */
67   struct GNUNET_DEFRAGMENT_Context *dc;
68
69   /**
70    * Pointer to the assembled message, allocated at the
71    * end of this struct.
72    */
73   const struct GNUNET_MessageHeader *msg;
74
75   /**
76    * Last time we received any update for this message
77    * (least-recently updated message will be discarded
78    * if we hit the queue size).
79    */
80   struct GNUNET_TIME_Absolute last_update;
81
82   /**
83    * Task scheduled for transmitting the next ACK to the
84    * other peer.
85    */
86   GNUNET_SCHEDULER_TaskIdentifier ack_task;
87
88   /**
89    * When did we receive which fragment? Used to calculate
90    * the time we should send the ACK.
91    */
92   struct FragTimes frag_times[64];
93
94   /**
95    * Which fragments have we gotten yet? bits that are 1
96    * indicate missing fragments.
97    */
98   uint64_t bits;
99
100   /**
101    * Unique ID for this message.
102    */
103   uint32_t fragment_id;
104
105   /**
106    * Which 'bit' did the last fragment we received correspond to?
107    */
108   unsigned int last_bit;
109
110   /**
111    * For the current ACK round, which is the first relevant
112    * offset in 'frag_times'?
113    */
114   unsigned int frag_times_start_offset;
115
116   /**
117    * Which offset whould we write the next frag value into
118    * in the 'frag_times' array? All smaller entries are valid.
119    */
120   unsigned int frag_times_write_offset;
121
122   /**
123    * Total size of the message that we are assembling.
124    */
125   uint16_t total_size;
126
127 };
128
129
130 /**
131  * Defragmentation context (one per connection).
132  */
133 struct GNUNET_DEFRAGMENT_Context
134 {
135
136   /**
137    * For statistics.
138    */
139   struct GNUNET_STATISTICS_Handle *stats;
140
141   /**
142    * Head of list of messages we're defragmenting.
143    */
144   struct MessageContext *head;
145
146   /**
147    * Tail of list of messages we're defragmenting.
148    */
149   struct MessageContext *tail;
150
151   /**
152    * Closure for 'proc' and 'ackp'.
153    */
154   void *cls;
155
156   /**
157    * Function to call with defragmented messages.
158    */
159   GNUNET_FRAGMENT_MessageProcessor proc;
160
161   /**
162    * Function to call with acknowledgements.
163    */
164   GNUNET_DEFRAGMENT_AckProcessor ackp;
165
166   /**
167    * Running average of the latency (delay between messages) for this
168    * connection.
169    */
170   struct GNUNET_TIME_Relative latency;
171
172   /**
173    * num_msgs how many fragmented messages
174    * to we defragment at most at the same time?
175    */
176   unsigned int num_msgs;
177
178   /**
179    * Current number of messages in the 'struct MessageContext'
180    * DLL (smaller or equal to 'num_msgs').
181    */
182   unsigned int list_size;
183
184   /**
185    * Maximum message size for each fragment.
186    */
187   uint16_t mtu;
188 };
189
190
191 /**
192  * Create a defragmentation context.
193  *
194  * @param stats statistics context
195  * @param mtu the maximum message size for each fragment
196  * @param num_msgs how many fragmented messages
197  *                 to we defragment at most at the same time?
198  * @param cls closure for proc and ackp
199  * @param proc function to call with defragmented messages
200  * @param ackp function to call with acknowledgements (to send
201  *             back to the other side)
202  * @return the defragmentation context
203  */
204 struct GNUNET_DEFRAGMENT_Context *
205 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
206                                   uint16_t mtu, unsigned int num_msgs,
207                                   void *cls,
208                                   GNUNET_FRAGMENT_MessageProcessor proc,
209                                   GNUNET_DEFRAGMENT_AckProcessor ackp)
210 {
211   struct GNUNET_DEFRAGMENT_Context *dc;
212
213   dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
214   dc->stats = stats;
215   dc->cls = cls;
216   dc->proc = proc;
217   dc->ackp = ackp;
218   dc->num_msgs = num_msgs;
219   dc->mtu = mtu;
220   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
221   return dc;
222 }
223
224
225 /**
226  * Destroy the given defragmentation context.
227  *
228  * @param dc defragmentation context
229  */
230 void
231 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
232 {
233   struct MessageContext *mc;
234
235   while (NULL != (mc = dc->head))
236   {
237     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
238     dc->list_size--;
239     if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
240     {
241       GNUNET_SCHEDULER_cancel (mc->ack_task);
242       mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
243     }
244     GNUNET_free (mc);
245   }
246   GNUNET_assert (0 == dc->list_size);
247   GNUNET_free (dc);
248 }
249
250
251 /**
252  * Send acknowledgement to the other peer now.
253  *
254  * @param cls the message context
255  * @param tc the scheduler context
256  */
257 static void
258 send_ack (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
259 {
260   struct MessageContext *mc = cls;
261   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
262   struct FragmentAcknowledgement fa;
263
264   mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
265   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
266   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
267   fa.fragment_id = htonl (mc->fragment_id);
268   fa.bits = GNUNET_htonll (mc->bits);
269   GNUNET_STATISTICS_update (mc->dc->stats,
270                             _("# acknowledgements sent for fragment"), 1,
271                             GNUNET_NO);
272   dc->ackp (dc->cls, mc->fragment_id, &fa.header);
273 }
274
275
276 /**
277  * This function is from the GNU Scientific Library, linear/fit.c,
278  * (C) 2000 Brian Gough
279  */
280 static void
281 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
282              const size_t ystride, const size_t n, double *c1, double *cov_11,
283              double *sumsq)
284 {
285   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
286
287   size_t i;
288
289   for (i = 0; i < n; i++)
290   {
291     m_x += (x[i * xstride] - m_x) / (i + 1.0);
292     m_y += (y[i * ystride] - m_y) / (i + 1.0);
293   }
294
295   for (i = 0; i < n; i++)
296   {
297     const double dx = x[i * xstride] - m_x;
298     const double dy = y[i * ystride] - m_y;
299
300     m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
301     m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
302   }
303
304   /* In terms of y =  b x */
305
306   {
307     double s2 = 0, d2 = 0;
308     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
309
310     *c1 = b;
311
312     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
313
314     for (i = 0; i < n; i++)
315     {
316       const double dx = x[i * xstride] - m_x;
317       const double dy = y[i * ystride] - m_y;
318       const double d = (m_y - b * m_x) + dy - b * dx;
319
320       d2 += d * d;
321     }
322
323     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
324
325     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
326
327     *sumsq = d2;
328   }
329 }
330
331
332 /**
333  * Estimate the latency between messages based on the most recent
334  * message time stamps.
335  *
336  * @param mc context with time stamps
337  * @return average delay between time stamps (based on least-squares fit)
338  */
339 static struct GNUNET_TIME_Relative
340 estimate_latency (struct MessageContext *mc)
341 {
342   struct FragTimes *first;
343   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
344   double x[total];
345   double y[total];
346   size_t i;
347   double c1;
348   double cov11;
349   double sumsq;
350   struct GNUNET_TIME_Relative ret;
351
352   first = &mc->frag_times[mc->frag_times_start_offset];
353   GNUNET_assert (total > 1);
354   for (i = 0; i < total; i++)
355   {
356     x[i] = (double) i;
357     y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
358   }
359   gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
360   c1 += sqrt (sumsq);           /* add 1 std dev */
361   ret.rel_value_us = (uint64_t) c1;
362   if (0 == ret.rel_value_us)
363     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
364   return ret;
365 }
366
367
368 /**
369  * Discard the message context that was inactive for the longest time.
370  *
371  * @param dc defragmentation context
372  */
373 static void
374 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
375 {
376   struct MessageContext *old;
377   struct MessageContext *pos;
378
379   old = NULL;
380   pos = dc->head;
381   while (NULL != pos)
382   {
383     if ((old == NULL) ||
384         (old->last_update.abs_value_us > pos->last_update.abs_value_us))
385       old = pos;
386     pos = pos->next;
387   }
388   GNUNET_assert (NULL != old);
389   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
390   dc->list_size--;
391   if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
392   {
393     GNUNET_SCHEDULER_cancel (old->ack_task);
394     old->ack_task = GNUNET_SCHEDULER_NO_TASK;
395   }
396   GNUNET_free (old);
397 }
398
399
400 /**
401  * We have received a fragment.  Process it.
402  *
403  * @param dc the context
404  * @param msg the message that was received
405  * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
406  */
407 int
408 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
409                                     const struct GNUNET_MessageHeader *msg)
410 {
411   struct MessageContext *mc;
412   const struct FragmentHeader *fh;
413   uint16_t msize;
414   uint16_t foff;
415   uint32_t fid;
416   char *mbuf;
417   unsigned int bit;
418   struct GNUNET_TIME_Absolute now;
419   struct GNUNET_TIME_Relative delay;
420   unsigned int bc;
421   unsigned int b;
422   unsigned int n;
423   unsigned int num_fragments;
424   int duplicate;
425   int last;
426
427   if (ntohs (msg->size) < sizeof (struct FragmentHeader))
428   {
429     GNUNET_break_op (0);
430     return GNUNET_SYSERR;
431   }
432   if (ntohs (msg->size) > dc->mtu)
433   {
434     GNUNET_break_op (0);
435     return GNUNET_SYSERR;
436   }
437   fh = (const struct FragmentHeader *) msg;
438   msize = ntohs (fh->total_size);
439   if (msize < sizeof (struct GNUNET_MessageHeader))
440   {
441     GNUNET_break_op (0);
442     return GNUNET_SYSERR;
443   }
444   fid = ntohl (fh->fragment_id);
445   foff = ntohs (fh->offset);
446   if (foff >= msize)
447   {
448     GNUNET_break_op (0);
449     return GNUNET_SYSERR;
450   }
451   if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
452   {
453     GNUNET_break_op (0);
454     return GNUNET_SYSERR;
455   }
456   GNUNET_STATISTICS_update (dc->stats, _("# fragments received"), 1, GNUNET_NO);
457   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
458   last = 0;
459   for (mc = dc->head; NULL != mc; mc = mc->next)
460     if (mc->fragment_id > fid)
461       last++;
462   
463   mc = dc->head;
464   while ((NULL != mc) && (fid != mc->fragment_id))
465     mc = mc->next;
466   bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
467   if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
468       sizeof (struct FragmentHeader) > msize)
469   {
470     /* payload extends past total message size */
471     GNUNET_break_op (0);
472     return GNUNET_SYSERR;
473   }
474   if ((NULL != mc) && (msize != mc->total_size))
475   {
476     /* inconsistent message size */
477     GNUNET_break_op (0);
478     return GNUNET_SYSERR;
479   }
480   now = GNUNET_TIME_absolute_get ();
481   if (NULL == mc)
482   {
483     mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
484     mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
485     mc->dc = dc;
486     mc->total_size = msize;
487     mc->fragment_id = fid;
488     mc->last_update = now;
489     n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
490                                                                   sizeof (struct
491                                                                           FragmentHeader));
492     if (n == 64)
493       mc->bits = UINT64_MAX;    /* set all 64 bit */
494     else
495       mc->bits = (1LL << n) - 1;        /* set lowest 'bits' bit */
496     if (dc->list_size >= dc->num_msgs)
497       discard_oldest_mc (dc);
498     GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc);
499     dc->list_size++;
500   }
501
502   /* copy data to 'mc' */
503   if (0 != (mc->bits & (1LL << bit)))
504   {
505     mc->bits -= 1LL << bit;
506     mbuf = (char *) &mc[1];
507     memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
508             ntohs (msg->size) - sizeof (struct FragmentHeader));
509     mc->last_update = now;
510     if (bit < mc->last_bit)
511       mc->frag_times_start_offset = mc->frag_times_write_offset;
512     mc->last_bit = bit;
513     mc->frag_times[mc->frag_times_write_offset].time = now;
514     mc->frag_times[mc->frag_times_write_offset].bit = bit;
515     mc->frag_times_write_offset++;
516     duplicate = GNUNET_NO;
517   }
518   else
519   {
520     duplicate = GNUNET_YES;
521     GNUNET_STATISTICS_update (dc->stats, _("# duplicate fragments received"), 1,
522                               GNUNET_NO);
523   }
524
525   /* count number of missing fragments */
526   bc = 0;
527   for (b = 0; b < 64; b++)
528     if (0 != (mc->bits & (1LL << b)))
529       bc++;
530
531   /* notify about complete message */
532   if ((duplicate == GNUNET_NO) && (0 == mc->bits))
533   {
534     GNUNET_STATISTICS_update (dc->stats, _("# messages defragmented"), 1,
535                               GNUNET_NO);
536     /* message complete, notify! */
537     dc->proc (dc->cls, mc->msg);
538   }
539   /* send ACK */
540   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
541   { 
542     dc->latency = estimate_latency (mc);
543   }
544   delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1);
545   if ( (last + fid == num_fragments) ||
546        (0 == mc->bits) || 
547        (GNUNET_YES == duplicate))     
548   {
549     /* message complete or duplicate or last missing fragment in
550        linear sequence; ACK now! */
551     delay = GNUNET_TIME_UNIT_ZERO;
552   }
553   if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
554     GNUNET_SCHEDULER_cancel (mc->ack_task);
555   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc);
556   if (duplicate == GNUNET_YES)
557     return GNUNET_NO;
558   return GNUNET_YES;
559 }
560
561 /* end of defragmentation.c */