error handling
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file src/fragmentation/defragmentation.c
22  * @brief library to help defragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
28
29 /**
30  * Timestamps for fragments.
31  */
32 struct FragTimes
33 {
34   /**
35    * The time the fragment was received.
36    */
37   struct GNUNET_TIME_Absolute time;
38
39   /**
40    * Number of the bit for the fragment (in [0,..,63]).
41    */
42   unsigned int bit;
43 };
44
45
46 /**
47  * Information we keep for one message that is being assembled.  Note
48  * that we keep the context around even after the assembly is done to
49  * handle 'stray' messages that are received 'late'.  A message
50  * context is ONLY discarded when the queue gets too big.
51  */
52 struct MessageContext
53 {
54   /**
55    * This is a DLL.
56    */
57   struct MessageContext *next;
58
59   /**
60    * This is a DLL.
61    */
62   struct MessageContext *prev;
63
64   /**
65    * Associated defragmentation context.
66    */
67   struct GNUNET_DEFRAGMENT_Context *dc;
68
69   /**
70    * Pointer to the assembled message, allocated at the
71    * end of this struct.
72    */
73   const struct GNUNET_MessageHeader *msg;
74
75   /**
76    * Last time we received any update for this message
77    * (least-recently updated message will be discarded
78    * if we hit the queue size).
79    */
80   struct GNUNET_TIME_Absolute last_update;
81
82   /**
83    * Task scheduled for transmitting the next ACK to the
84    * other peer.
85    */
86   struct GNUNET_SCHEDULER_Task *ack_task;
87
88   /**
89    * When did we receive which fragment? Used to calculate
90    * the time we should send the ACK.
91    */
92   struct FragTimes frag_times[64];
93
94   /**
95    * Which fragments have we gotten yet? bits that are 1
96    * indicate missing fragments.
97    */
98   uint64_t bits;
99
100   /**
101    * Unique ID for this message.
102    */
103   uint32_t fragment_id;
104
105   /**
106    * Which 'bit' did the last fragment we received correspond to?
107    */
108   unsigned int last_bit;
109
110   /**
111    * For the current ACK round, which is the first relevant
112    * offset in @e frag_times?
113    */
114   unsigned int frag_times_start_offset;
115
116   /**
117    * Which offset whould we write the next frag value into
118    * in the @e frag_times array? All smaller entries are valid.
119    */
120   unsigned int frag_times_write_offset;
121
122   /**
123    * Total size of the message that we are assembling.
124    */
125   uint16_t total_size;
126
127   /**
128    * Was the last fragment we got a duplicate?
129    */
130   int16_t last_duplicate;
131 };
132
133
134 /**
135  * Defragmentation context (one per connection).
136  */
137 struct GNUNET_DEFRAGMENT_Context
138 {
139   /**
140    * For statistics.
141    */
142   struct GNUNET_STATISTICS_Handle *stats;
143
144   /**
145    * Head of list of messages we're defragmenting.
146    */
147   struct MessageContext *head;
148
149   /**
150    * Tail of list of messages we're defragmenting.
151    */
152   struct MessageContext *tail;
153
154   /**
155    * Closure for @e proc and @e ackp.
156    */
157   void *cls;
158
159   /**
160    * Function to call with defragmented messages.
161    */
162   GNUNET_FRAGMENT_MessageProcessor proc;
163
164   /**
165    * Function to call with acknowledgements.
166    */
167   GNUNET_DEFRAGMENT_AckProcessor ackp;
168
169   /**
170    * Running average of the latency (delay between messages) for this
171    * connection.
172    */
173   struct GNUNET_TIME_Relative latency;
174
175   /**
176    * num_msgs how many fragmented messages
177    * to we defragment at most at the same time?
178    */
179   unsigned int num_msgs;
180
181   /**
182    * Current number of messages in the 'struct MessageContext'
183    * DLL (smaller or equal to 'num_msgs').
184    */
185   unsigned int list_size;
186
187   /**
188    * Maximum message size for each fragment.
189    */
190   uint16_t mtu;
191 };
192
193
194 /**
195  * Create a defragmentation context.
196  *
197  * @param stats statistics context
198  * @param mtu the maximum message size for each fragment
199  * @param num_msgs how many fragmented messages
200  *                 to we defragment at most at the same time?
201  * @param cls closure for @a proc and @a ackp
202  * @param proc function to call with defragmented messages
203  * @param ackp function to call with acknowledgements (to send
204  *             back to the other side)
205  * @return the defragmentation context
206  */
207 struct GNUNET_DEFRAGMENT_Context *
208 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
209                                   uint16_t mtu, unsigned int num_msgs,
210                                   void *cls,
211                                   GNUNET_FRAGMENT_MessageProcessor proc,
212                                   GNUNET_DEFRAGMENT_AckProcessor ackp)
213 {
214   struct GNUNET_DEFRAGMENT_Context *dc;
215
216   dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
217   dc->stats = stats;
218   dc->cls = cls;
219   dc->proc = proc;
220   dc->ackp = ackp;
221   dc->num_msgs = num_msgs;
222   dc->mtu = mtu;
223   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
224   return dc;
225 }
226
227
228 /**
229  * Destroy the given defragmentation context.
230  *
231  * @param dc defragmentation context
232  */
233 void
234 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
235 {
236   struct MessageContext *mc;
237
238   while (NULL != (mc = dc->head))
239   {
240     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
241     dc->list_size--;
242     if (NULL != mc->ack_task)
243     {
244       GNUNET_SCHEDULER_cancel (mc->ack_task);
245       mc->ack_task = NULL;
246     }
247     GNUNET_free (mc);
248   }
249   GNUNET_assert (0 == dc->list_size);
250   GNUNET_free (dc);
251 }
252
253
254 /**
255  * Send acknowledgement to the other peer now.
256  *
257  * @param cls the message context
258  */
259 static void
260 send_ack (void *cls)
261 {
262   struct MessageContext *mc = cls;
263   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
264   struct FragmentAcknowledgement fa;
265
266   mc->ack_task = NULL;
267   fa.header.size = htons (sizeof(struct FragmentAcknowledgement));
268   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
269   fa.fragment_id = htonl (mc->fragment_id);
270   fa.bits = GNUNET_htonll (mc->bits);
271   GNUNET_STATISTICS_update (mc->dc->stats,
272                             _ ("# acknowledgements sent for fragment"),
273                             1,
274                             GNUNET_NO);
275   mc->last_duplicate = GNUNET_NO; /* clear flag */
276   dc->ackp (dc->cls,
277             mc->fragment_id,
278             &fa.header);
279 }
280
281
282 /**
283  * This function is from the GNU Scientific Library, linear/fit.c,
284  * Copyright (C) 2000 Brian Gough
285  */
286 static void
287 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
288              const size_t ystride, const size_t n, double *c1, double *cov_11,
289              double *sumsq)
290 {
291   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
292
293   size_t i;
294
295   for (i = 0; i < n; i++)
296   {
297     m_x += (x[i * xstride] - m_x) / (i + 1.0);
298     m_y += (y[i * ystride] - m_y) / (i + 1.0);
299   }
300
301   for (i = 0; i < n; i++)
302   {
303     const double dx = x[i * xstride] - m_x;
304     const double dy = y[i * ystride] - m_y;
305
306     m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
307     m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
308   }
309
310   /* In terms of y =  b x */
311
312   {
313     double s2 = 0, d2 = 0;
314     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
315
316     *c1 = b;
317
318     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
319
320     for (i = 0; i < n; i++)
321     {
322       const double dx = x[i * xstride] - m_x;
323       const double dy = y[i * ystride] - m_y;
324       const double d = (m_y - b * m_x) + dy - b * dx;
325
326       d2 += d * d;
327     }
328
329     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
330
331     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
332
333     *sumsq = d2;
334   }
335 }
336
337
338 /**
339  * Estimate the latency between messages based on the most recent
340  * message time stamps.
341  *
342  * @param mc context with time stamps
343  * @return average delay between time stamps (based on least-squares fit)
344  */
345 static struct GNUNET_TIME_Relative
346 estimate_latency (struct MessageContext *mc)
347 {
348   struct FragTimes *first;
349   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
350   double x[total];
351   double y[total];
352   size_t i;
353   double c1;
354   double cov11;
355   double sumsq;
356   struct GNUNET_TIME_Relative ret;
357
358   first = &mc->frag_times[mc->frag_times_start_offset];
359   GNUNET_assert (total > 1);
360   for (i = 0; i < total; i++)
361   {
362     x[i] = (double) i;
363     y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
364   }
365   gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
366   c1 += sqrt (sumsq);            /* add 1 std dev */
367   ret.rel_value_us = (uint64_t) c1;
368   if (0 == ret.rel_value_us)
369     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
370   return ret;
371 }
372
373
374 /**
375  * Discard the message context that was inactive for the longest time.
376  *
377  * @param dc defragmentation context
378  */
379 static void
380 discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
381 {
382   struct MessageContext *old;
383   struct MessageContext *pos;
384
385   old = NULL;
386   pos = dc->head;
387   while (NULL != pos)
388   {
389     if ((old == NULL) ||
390         (old->last_update.abs_value_us > pos->last_update.abs_value_us))
391       old = pos;
392     pos = pos->next;
393   }
394   GNUNET_assert (NULL != old);
395   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
396   dc->list_size--;
397   if (NULL != old->ack_task)
398   {
399     GNUNET_SCHEDULER_cancel (old->ack_task);
400     old->ack_task = NULL;
401   }
402   GNUNET_free (old);
403 }
404
405
406 /**
407  * We have received a fragment.  Process it.
408  *
409  * @param dc the context
410  * @param msg the message that was received
411  * @return #GNUNET_OK on success,
412  *         #GNUNET_NO if this was a duplicate,
413  *         #GNUNET_SYSERR on error
414  */
415 int
416 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
417                                     const struct GNUNET_MessageHeader *msg)
418 {
419   struct MessageContext *mc;
420   const struct FragmentHeader *fh;
421   uint16_t msize;
422   uint16_t foff;
423   uint32_t fid;
424   char *mbuf;
425   unsigned int bit;
426   struct GNUNET_TIME_Absolute now;
427   struct GNUNET_TIME_Relative delay;
428   unsigned int bc;
429   unsigned int b;
430   unsigned int n;
431   unsigned int num_fragments;
432   int duplicate;
433   int last;
434
435   if (ntohs (msg->size) < sizeof(struct FragmentHeader))
436   {
437     GNUNET_break_op (0);
438     return GNUNET_SYSERR;
439   }
440   if (ntohs (msg->size) > dc->mtu)
441   {
442     GNUNET_break_op (0);
443     return GNUNET_SYSERR;
444   }
445   fh = (const struct FragmentHeader *) msg;
446   msize = ntohs (fh->total_size);
447   if (msize < sizeof(struct GNUNET_MessageHeader))
448   {
449     GNUNET_break_op (0);
450     return GNUNET_SYSERR;
451   }
452   fid = ntohl (fh->fragment_id);
453   foff = ntohs (fh->offset);
454   if (foff >= msize)
455   {
456     GNUNET_break_op (0);
457     return GNUNET_SYSERR;
458   }
459   if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
460   {
461     GNUNET_break_op (0);
462     return GNUNET_SYSERR;
463   }
464   GNUNET_STATISTICS_update (dc->stats,
465                             _ ("# fragments received"),
466                             1,
467                             GNUNET_NO);
468   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof(struct FragmentHeader)
469                    - 1) / (dc->mtu - sizeof(struct FragmentHeader));
470   last = 0;
471   for (mc = dc->head; NULL != mc; mc = mc->next)
472     if (mc->fragment_id > fid)
473       last++;
474
475   mc = dc->head;
476   while ((NULL != mc) && (fid != mc->fragment_id))
477     mc = mc->next;
478   bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
479   if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs (msg->size)
480       - sizeof(struct FragmentHeader) > msize)
481   {
482     /* payload extends past total message size */
483     GNUNET_break_op (0);
484     return GNUNET_SYSERR;
485   }
486   if ((NULL != mc) && (msize != mc->total_size))
487   {
488     /* inconsistent message size */
489     GNUNET_break_op (0);
490     return GNUNET_SYSERR;
491   }
492   now = GNUNET_TIME_absolute_get ();
493   if (NULL == mc)
494   {
495     mc = GNUNET_malloc (sizeof(struct MessageContext) + msize);
496     mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
497     mc->dc = dc;
498     mc->total_size = msize;
499     mc->fragment_id = fid;
500     mc->last_update = now;
501     n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu
502                                                                  - sizeof(struct
503                                                                           FragmentHeader));
504     if (n == 64)
505       mc->bits = UINT64_MAX;    /* set all 64 bit */
506     else
507       mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
508     if (dc->list_size >= dc->num_msgs)
509       discard_oldest_mc (dc);
510     GNUNET_CONTAINER_DLL_insert (dc->head,
511                                  dc->tail,
512                                  mc);
513     dc->list_size++;
514   }
515
516   /* copy data to 'mc' */
517   if (0 != (mc->bits & (1LLU << bit)))
518   {
519     mc->bits -= 1LLU << bit;
520     mbuf = (char *) &mc[1];
521     GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))],
522                    &fh[1],
523                    ntohs (msg->size) - sizeof(struct FragmentHeader));
524     mc->last_update = now;
525     if (bit < mc->last_bit)
526       mc->frag_times_start_offset = mc->frag_times_write_offset;
527     mc->last_bit = bit;
528     mc->frag_times[mc->frag_times_write_offset].time = now;
529     mc->frag_times[mc->frag_times_write_offset].bit = bit;
530     mc->frag_times_write_offset++;
531     duplicate = GNUNET_NO;
532   }
533   else
534   {
535     duplicate = GNUNET_YES;
536     GNUNET_STATISTICS_update (dc->stats,
537                               _ ("# duplicate fragments received"),
538                               1,
539                               GNUNET_NO);
540   }
541
542   /* count number of missing fragments after the current one */
543   bc = 0;
544   for (b = bit; b < 64; b++)
545     if (0 != (mc->bits & (1LLU << b)))
546       bc++;
547     else
548       bc = 0;
549
550   /* notify about complete message */
551   if ((GNUNET_NO == duplicate) &&
552       (0 == mc->bits))
553   {
554     GNUNET_STATISTICS_update (dc->stats,
555                               _ ("# messages defragmented"),
556                               1,
557                               GNUNET_NO);
558     /* message complete, notify! */
559     dc->proc (dc->cls, mc->msg);
560   }
561   /* send ACK */
562   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
563   {
564     dc->latency = estimate_latency (mc);
565   }
566   delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
567                                                     bc + 1);
568   if ((last + fid == num_fragments) ||
569       (0 == mc->bits) ||
570       (GNUNET_YES == duplicate))
571   {
572     /* message complete or duplicate or last missing fragment in
573        linear sequence; ACK now! */
574     delay = GNUNET_TIME_UNIT_ZERO;
575   }
576   if (NULL != mc->ack_task)
577     GNUNET_SCHEDULER_cancel (mc->ack_task);
578   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
579                                                &send_ack,
580                                                mc);
581   if (GNUNET_YES == duplicate)
582   {
583     mc->last_duplicate = GNUNET_YES;
584     return GNUNET_NO;
585   }
586   return GNUNET_YES;
587 }
588
589
590 /* end of defragmentation.c */