uncrustify as demanded.
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009, 2011 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file src/fragmentation/defragmentation.c
22  * @brief library to help defragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "fragmentation.h"
28
29 /**
30  * Timestamps for fragments.
31  */
32 struct FragTimes {
33   /**
34    * The time the fragment was received.
35    */
36   struct GNUNET_TIME_Absolute time;
37
38   /**
39    * Number of the bit for the fragment (in [0,..,63]).
40    */
41   unsigned int bit;
42 };
43
44
45 /**
46  * Information we keep for one message that is being assembled.  Note
47  * that we keep the context around even after the assembly is done to
48  * handle 'stray' messages that are received 'late'.  A message
49  * context is ONLY discarded when the queue gets too big.
50  */
51 struct MessageContext {
52   /**
53    * This is a DLL.
54    */
55   struct MessageContext *next;
56
57   /**
58    * This is a DLL.
59    */
60   struct MessageContext *prev;
61
62   /**
63    * Associated defragmentation context.
64    */
65   struct GNUNET_DEFRAGMENT_Context *dc;
66
67   /**
68    * Pointer to the assembled message, allocated at the
69    * end of this struct.
70    */
71   const struct GNUNET_MessageHeader *msg;
72
73   /**
74    * Last time we received any update for this message
75    * (least-recently updated message will be discarded
76    * if we hit the queue size).
77    */
78   struct GNUNET_TIME_Absolute last_update;
79
80   /**
81    * Task scheduled for transmitting the next ACK to the
82    * other peer.
83    */
84   struct GNUNET_SCHEDULER_Task * ack_task;
85
86   /**
87    * When did we receive which fragment? Used to calculate
88    * the time we should send the ACK.
89    */
90   struct FragTimes frag_times[64];
91
92   /**
93    * Which fragments have we gotten yet? bits that are 1
94    * indicate missing fragments.
95    */
96   uint64_t bits;
97
98   /**
99    * Unique ID for this message.
100    */
101   uint32_t fragment_id;
102
103   /**
104    * Which 'bit' did the last fragment we received correspond to?
105    */
106   unsigned int last_bit;
107
108   /**
109    * For the current ACK round, which is the first relevant
110    * offset in @e frag_times?
111    */
112   unsigned int frag_times_start_offset;
113
114   /**
115    * Which offset whould we write the next frag value into
116    * in the @e frag_times array? All smaller entries are valid.
117    */
118   unsigned int frag_times_write_offset;
119
120   /**
121    * Total size of the message that we are assembling.
122    */
123   uint16_t total_size;
124
125   /**
126    * Was the last fragment we got a duplicate?
127    */
128   int16_t last_duplicate;
129 };
130
131
132 /**
133  * Defragmentation context (one per connection).
134  */
135 struct GNUNET_DEFRAGMENT_Context {
136   /**
137    * For statistics.
138    */
139   struct GNUNET_STATISTICS_Handle *stats;
140
141   /**
142    * Head of list of messages we're defragmenting.
143    */
144   struct MessageContext *head;
145
146   /**
147    * Tail of list of messages we're defragmenting.
148    */
149   struct MessageContext *tail;
150
151   /**
152    * Closure for @e proc and @e ackp.
153    */
154   void *cls;
155
156   /**
157    * Function to call with defragmented messages.
158    */
159   GNUNET_FRAGMENT_MessageProcessor proc;
160
161   /**
162    * Function to call with acknowledgements.
163    */
164   GNUNET_DEFRAGMENT_AckProcessor ackp;
165
166   /**
167    * Running average of the latency (delay between messages) for this
168    * connection.
169    */
170   struct GNUNET_TIME_Relative latency;
171
172   /**
173    * num_msgs how many fragmented messages
174    * to we defragment at most at the same time?
175    */
176   unsigned int num_msgs;
177
178   /**
179    * Current number of messages in the 'struct MessageContext'
180    * DLL (smaller or equal to 'num_msgs').
181    */
182   unsigned int list_size;
183
184   /**
185    * Maximum message size for each fragment.
186    */
187   uint16_t mtu;
188 };
189
190
191 /**
192  * Create a defragmentation context.
193  *
194  * @param stats statistics context
195  * @param mtu the maximum message size for each fragment
196  * @param num_msgs how many fragmented messages
197  *                 to we defragment at most at the same time?
198  * @param cls closure for @a proc and @a ackp
199  * @param proc function to call with defragmented messages
200  * @param ackp function to call with acknowledgements (to send
201  *             back to the other side)
202  * @return the defragmentation context
203  */
204 struct GNUNET_DEFRAGMENT_Context *
205 GNUNET_DEFRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats,
206                                  uint16_t mtu, unsigned int num_msgs,
207                                  void *cls,
208                                  GNUNET_FRAGMENT_MessageProcessor proc,
209                                  GNUNET_DEFRAGMENT_AckProcessor ackp)
210 {
211   struct GNUNET_DEFRAGMENT_Context *dc;
212
213   dc = GNUNET_new(struct GNUNET_DEFRAGMENT_Context);
214   dc->stats = stats;
215   dc->cls = cls;
216   dc->proc = proc;
217   dc->ackp = ackp;
218   dc->num_msgs = num_msgs;
219   dc->mtu = mtu;
220   dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
221   return dc;
222 }
223
224
225 /**
226  * Destroy the given defragmentation context.
227  *
228  * @param dc defragmentation context
229  */
230 void
231 GNUNET_DEFRAGMENT_context_destroy(struct GNUNET_DEFRAGMENT_Context *dc)
232 {
233   struct MessageContext *mc;
234
235   while (NULL != (mc = dc->head))
236     {
237       GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, mc);
238       dc->list_size--;
239       if (NULL != mc->ack_task)
240         {
241           GNUNET_SCHEDULER_cancel(mc->ack_task);
242           mc->ack_task = NULL;
243         }
244       GNUNET_free(mc);
245     }
246   GNUNET_assert(0 == dc->list_size);
247   GNUNET_free(dc);
248 }
249
250
251 /**
252  * Send acknowledgement to the other peer now.
253  *
254  * @param cls the message context
255  */
256 static void
257 send_ack(void *cls)
258 {
259   struct MessageContext *mc = cls;
260   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
261   struct FragmentAcknowledgement fa;
262
263   mc->ack_task = NULL;
264   fa.header.size = htons(sizeof(struct FragmentAcknowledgement));
265   fa.header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
266   fa.fragment_id = htonl(mc->fragment_id);
267   fa.bits = GNUNET_htonll(mc->bits);
268   GNUNET_STATISTICS_update(mc->dc->stats,
269                            _("# acknowledgements sent for fragment"),
270                            1,
271                            GNUNET_NO);
272   mc->last_duplicate = GNUNET_NO; /* clear flag */
273   dc->ackp(dc->cls,
274            mc->fragment_id,
275            &fa.header);
276 }
277
278
279 /**
280  * This function is from the GNU Scientific Library, linear/fit.c,
281  * Copyright (C) 2000 Brian Gough
282  */
283 static void
284 gsl_fit_mul(const double *x, const size_t xstride, const double *y,
285             const size_t ystride, const size_t n, double *c1, double *cov_11,
286             double *sumsq)
287 {
288   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
289
290   size_t i;
291
292   for (i = 0; i < n; i++)
293     {
294       m_x += (x[i * xstride] - m_x) / (i + 1.0);
295       m_y += (y[i * ystride] - m_y) / (i + 1.0);
296     }
297
298   for (i = 0; i < n; i++)
299     {
300       const double dx = x[i * xstride] - m_x;
301       const double dy = y[i * ystride] - m_y;
302
303       m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
304       m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
305     }
306
307   /* In terms of y =  b x */
308
309   {
310     double s2 = 0, d2 = 0;
311     double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
312
313     *c1 = b;
314
315     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
316
317     for (i = 0; i < n; i++)
318       {
319         const double dx = x[i * xstride] - m_x;
320         const double dy = y[i * ystride] - m_y;
321         const double d = (m_y - b * m_x) + dy - b * dx;
322
323         d2 += d * d;
324       }
325
326     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
327
328     *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
329
330     *sumsq = d2;
331   }
332 }
333
334
335 /**
336  * Estimate the latency between messages based on the most recent
337  * message time stamps.
338  *
339  * @param mc context with time stamps
340  * @return average delay between time stamps (based on least-squares fit)
341  */
342 static struct GNUNET_TIME_Relative
343 estimate_latency(struct MessageContext *mc)
344 {
345   struct FragTimes *first;
346   size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
347   double x[total];
348   double y[total];
349   size_t i;
350   double c1;
351   double cov11;
352   double sumsq;
353   struct GNUNET_TIME_Relative ret;
354
355   first = &mc->frag_times[mc->frag_times_start_offset];
356   GNUNET_assert(total > 1);
357   for (i = 0; i < total; i++)
358     {
359       x[i] = (double)i;
360       y[i] = (double)(first[i].time.abs_value_us - first[0].time.abs_value_us);
361     }
362   gsl_fit_mul(x, 1, y, 1, total, &c1, &cov11, &sumsq);
363   c1 += sqrt(sumsq);            /* add 1 std dev */
364   ret.rel_value_us = (uint64_t)c1;
365   if (0 == ret.rel_value_us)
366     ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
367   return ret;
368 }
369
370
371 /**
372  * Discard the message context that was inactive for the longest time.
373  *
374  * @param dc defragmentation context
375  */
376 static void
377 discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc)
378 {
379   struct MessageContext *old;
380   struct MessageContext *pos;
381
382   old = NULL;
383   pos = dc->head;
384   while (NULL != pos)
385     {
386       if ((old == NULL) ||
387           (old->last_update.abs_value_us > pos->last_update.abs_value_us))
388         old = pos;
389       pos = pos->next;
390     }
391   GNUNET_assert(NULL != old);
392   GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, old);
393   dc->list_size--;
394   if (NULL != old->ack_task)
395     {
396       GNUNET_SCHEDULER_cancel(old->ack_task);
397       old->ack_task = NULL;
398     }
399   GNUNET_free(old);
400 }
401
402
403 /**
404  * We have received a fragment.  Process it.
405  *
406  * @param dc the context
407  * @param msg the message that was received
408  * @return #GNUNET_OK on success,
409  *         #GNUNET_NO if this was a duplicate,
410  *         #GNUNET_SYSERR on error
411  */
412 int
413 GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc,
414                                    const struct GNUNET_MessageHeader *msg)
415 {
416   struct MessageContext *mc;
417   const struct FragmentHeader *fh;
418   uint16_t msize;
419   uint16_t foff;
420   uint32_t fid;
421   char *mbuf;
422   unsigned int bit;
423   struct GNUNET_TIME_Absolute now;
424   struct GNUNET_TIME_Relative delay;
425   unsigned int bc;
426   unsigned int b;
427   unsigned int n;
428   unsigned int num_fragments;
429   int duplicate;
430   int last;
431
432   if (ntohs(msg->size) < sizeof(struct FragmentHeader))
433     {
434       GNUNET_break_op(0);
435       return GNUNET_SYSERR;
436     }
437   if (ntohs(msg->size) > dc->mtu)
438     {
439       GNUNET_break_op(0);
440       return GNUNET_SYSERR;
441     }
442   fh = (const struct FragmentHeader *)msg;
443   msize = ntohs(fh->total_size);
444   if (msize < sizeof(struct GNUNET_MessageHeader))
445     {
446       GNUNET_break_op(0);
447       return GNUNET_SYSERR;
448     }
449   fid = ntohl(fh->fragment_id);
450   foff = ntohs(fh->offset);
451   if (foff >= msize)
452     {
453       GNUNET_break_op(0);
454       return GNUNET_SYSERR;
455     }
456   if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
457     {
458       GNUNET_break_op(0);
459       return GNUNET_SYSERR;
460     }
461   GNUNET_STATISTICS_update(dc->stats,
462                            _("# fragments received"),
463                            1,
464                            GNUNET_NO);
465   num_fragments = (ntohs(msg->size) + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu - sizeof(struct FragmentHeader));
466   last = 0;
467   for (mc = dc->head; NULL != mc; mc = mc->next)
468     if (mc->fragment_id > fid)
469       last++;
470
471   mc = dc->head;
472   while ((NULL != mc) && (fid != mc->fragment_id))
473     mc = mc->next;
474   bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
475   if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs(msg->size) -
476       sizeof(struct FragmentHeader) > msize)
477     {
478       /* payload extends past total message size */
479       GNUNET_break_op(0);
480       return GNUNET_SYSERR;
481     }
482   if ((NULL != mc) && (msize != mc->total_size))
483     {
484       /* inconsistent message size */
485       GNUNET_break_op(0);
486       return GNUNET_SYSERR;
487     }
488   now = GNUNET_TIME_absolute_get();
489   if (NULL == mc)
490     {
491       mc = GNUNET_malloc(sizeof(struct MessageContext) + msize);
492       mc->msg = (const struct GNUNET_MessageHeader *)&mc[1];
493       mc->dc = dc;
494       mc->total_size = msize;
495       mc->fragment_id = fid;
496       mc->last_update = now;
497       n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu -
498                                                                    sizeof(struct
499                                                                           FragmentHeader));
500       if (n == 64)
501         mc->bits = UINT64_MAX;  /* set all 64 bit */
502       else
503         mc->bits = (1LLU << n) - 1;      /* set lowest 'bits' bit */
504       if (dc->list_size >= dc->num_msgs)
505         discard_oldest_mc(dc);
506       GNUNET_CONTAINER_DLL_insert(dc->head,
507                                   dc->tail,
508                                   mc);
509       dc->list_size++;
510     }
511
512   /* copy data to 'mc' */
513   if (0 != (mc->bits & (1LLU << bit)))
514     {
515       mc->bits -= 1LLU << bit;
516       mbuf = (char *)&mc[1];
517       GNUNET_memcpy(&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))], &fh[1],
518                     ntohs(msg->size) - sizeof(struct FragmentHeader));
519       mc->last_update = now;
520       if (bit < mc->last_bit)
521         mc->frag_times_start_offset = mc->frag_times_write_offset;
522       mc->last_bit = bit;
523       mc->frag_times[mc->frag_times_write_offset].time = now;
524       mc->frag_times[mc->frag_times_write_offset].bit = bit;
525       mc->frag_times_write_offset++;
526       duplicate = GNUNET_NO;
527     }
528   else
529     {
530       duplicate = GNUNET_YES;
531       GNUNET_STATISTICS_update(dc->stats,
532                                _("# duplicate fragments received"),
533                                1,
534                                GNUNET_NO);
535     }
536
537   /* count number of missing fragments after the current one */
538   bc = 0;
539   for (b = bit; b < 64; b++)
540     if (0 != (mc->bits & (1LLU << b)))
541       bc++;
542     else
543       bc = 0;
544
545   /* notify about complete message */
546   if ((GNUNET_NO == duplicate) &&
547       (0 == mc->bits))
548     {
549       GNUNET_STATISTICS_update(dc->stats,
550                                _("# messages defragmented"),
551                                1,
552                                GNUNET_NO);
553       /* message complete, notify! */
554       dc->proc(dc->cls, mc->msg);
555     }
556   /* send ACK */
557   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
558     {
559       dc->latency = estimate_latency(mc);
560     }
561   delay = GNUNET_TIME_relative_saturating_multiply(dc->latency,
562                                                    bc + 1);
563   if ((last + fid == num_fragments) ||
564       (0 == mc->bits) ||
565       (GNUNET_YES == duplicate))
566     {
567       /* message complete or duplicate or last missing fragment in
568          linear sequence; ACK now! */
569       delay = GNUNET_TIME_UNIT_ZERO;
570     }
571   if (NULL != mc->ack_task)
572     GNUNET_SCHEDULER_cancel(mc->ack_task);
573   mc->ack_task = GNUNET_SCHEDULER_add_delayed(delay,
574                                               &send_ack,
575                                               mc);
576   if (GNUNET_YES == duplicate)
577     {
578       mc->last_duplicate = GNUNET_YES;
579       return GNUNET_NO;
580     }
581   return GNUNET_YES;
582 }
583
584 /* end of defragmentation.c */