error handling
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file src/fragmentation/fragmentation.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Absolute minimum delay we impose between sending and expecting ACK to arrive.
33  */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply ( \
35     GNUNET_TIME_UNIT_MILLISECONDS, 1)
36
37
38 /**
39  * Fragmentation context.
40  */
41 struct GNUNET_FRAGMENT_Context
42 {
43   /**
44    * Statistics to use.
45    */
46   struct GNUNET_STATISTICS_Handle *stats;
47
48   /**
49    * Tracker for flow control.
50    */
51   struct GNUNET_BANDWIDTH_Tracker *tracker;
52
53   /**
54    * Current expected delay for ACKs.
55    */
56   struct GNUNET_TIME_Relative ack_delay;
57
58   /**
59    * Current expected delay between messages.
60    */
61   struct GNUNET_TIME_Relative msg_delay;
62
63   /**
64    * Next allowed transmission time.
65    */
66   struct GNUNET_TIME_Absolute delay_until;
67
68   /**
69    * Time we transmitted the last message of the last round.
70    */
71   struct GNUNET_TIME_Absolute last_round;
72
73   /**
74    * Message to fragment (allocated at the end of this struct).
75    */
76   const struct GNUNET_MessageHeader *msg;
77
78   /**
79    * Function to call for transmissions.
80    */
81   GNUNET_FRAGMENT_MessageProcessor proc;
82
83   /**
84    * Closure for @e proc.
85    */
86   void *proc_cls;
87
88   /**
89    * Bitfield, set to 1 for each unacknowledged fragment.
90    */
91   uint64_t acks;
92
93   /**
94    * Bitfield with all possible bits for @e acks (used to mask the
95    * ack we get back).
96    */
97   uint64_t acks_mask;
98
99   /**
100    * Task performing work for the fragmenter.
101    */
102   struct GNUNET_SCHEDULER_Task *task;
103
104   /**
105    * Our fragmentation ID. (chosen at random)
106    */
107   uint32_t fragment_id;
108
109   /**
110    * Round-robin selector for the next transmission.
111    */
112   unsigned int next_transmission;
113
114   /**
115    * How many rounds of transmission have we completed so far?
116    */
117   unsigned int num_rounds;
118
119   /**
120    * How many transmission have we completed in this round?
121    */
122   unsigned int num_transmissions;
123
124   /**
125    * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
126    */
127   int8_t proc_busy;
128
129   /**
130    * #GNUNET_YES if we are waiting for an ACK.
131    */
132   int8_t wack;
133
134   /**
135    * Target fragment size.
136    */
137   uint16_t mtu;
138 };
139
140
141 /**
142  * Convert an ACK message to a printable format suitable for logging.
143  *
144  * @param ack message to print
145  * @return ack in human-readable format
146  */
147 const char *
148 GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
149 {
150   static char buf[128];
151   const struct FragmentAcknowledgement *fa;
152
153   if (sizeof(struct FragmentAcknowledgement) !=
154       htons (ack->size))
155     return "<malformed ack>";
156   fa = (const struct FragmentAcknowledgement *) ack;
157   GNUNET_snprintf (buf,
158                    sizeof(buf),
159                    "%u-%llX",
160                    ntohl (fa->fragment_id),
161                    GNUNET_ntohll (fa->bits));
162   return buf;
163 }
164
165
166 /**
167  * Transmit the next fragment to the other peer.
168  *
169  * @param cls the `struct GNUNET_FRAGMENT_Context`
170  */
171 static void
172 transmit_next (void *cls)
173 {
174   struct GNUNET_FRAGMENT_Context *fc = cls;
175   char msg[fc->mtu];
176   const char *mbuf;
177   struct FragmentHeader *fh;
178   struct GNUNET_TIME_Relative delay;
179   unsigned int bit;
180   size_t size;
181   size_t fsize;
182   int wrap;
183
184   fc->task = NULL;
185   GNUNET_assert (GNUNET_NO == fc->proc_busy);
186   if (0 == fc->acks)
187     return;                     /* all done */
188   /* calculate delay */
189   wrap = 0;
190   while (0 == (fc->acks & (1LLU << fc->next_transmission)))
191   {
192     fc->next_transmission = (fc->next_transmission + 1) % 64;
193     wrap |= (0 == fc->next_transmission);
194   }
195   bit = fc->next_transmission;
196   size = ntohs (fc->msg->size);
197   if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
198     fsize =
199       (size % (fc->mtu - sizeof(struct FragmentHeader)))
200       + sizeof(struct FragmentHeader);
201   else
202     fsize = fc->mtu;
203   if (NULL != fc->tracker)
204     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
205                                                 fsize);
206   else
207     delay = GNUNET_TIME_UNIT_ZERO;
208   if (delay.rel_value_us > 0)
209   {
210     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211                 "Fragmentation logic delays transmission of next fragment by %s\n",
212                 GNUNET_STRINGS_relative_time_to_string (delay,
213                                                         GNUNET_YES));
214     fc->task = GNUNET_SCHEDULER_add_delayed (delay,
215                                              &transmit_next,
216                                              fc);
217     return;
218   }
219   fc->next_transmission = (fc->next_transmission + 1) % 64;
220   wrap |= (0 == fc->next_transmission);
221   while (0 == (fc->acks & (1LLU << fc->next_transmission)))
222   {
223     fc->next_transmission = (fc->next_transmission + 1) % 64;
224     wrap |= (0 == fc->next_transmission);
225   }
226
227   /* assemble fragmentation message */
228   mbuf = (const char *) &fc[1];
229   fh = (struct FragmentHeader *) msg;
230   fh->header.size = htons (fsize);
231   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
232   fh->fragment_id = htonl (fc->fragment_id);
233   fh->total_size = fc->msg->size;       /* already in big-endian */
234   fh->offset = htons ((fc->mtu - sizeof(struct FragmentHeader)) * bit);
235   GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof(struct FragmentHeader))],
236                  fsize - sizeof(struct FragmentHeader));
237   if (NULL != fc->tracker)
238     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
239   GNUNET_STATISTICS_update (fc->stats,
240                             _ ("# fragments transmitted"),
241                             1,
242                             GNUNET_NO);
243   if (0 != fc->last_round.abs_value_us)
244     GNUNET_STATISTICS_update (fc->stats,
245                               _ ("# fragments retransmitted"),
246                               1,
247                               GNUNET_NO);
248
249   /* select next message to calculate delay */
250   bit = fc->next_transmission;
251   size = ntohs (fc->msg->size);
252   if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
253     fsize = size % (fc->mtu - sizeof(struct FragmentHeader));
254   else
255     fsize = fc->mtu;
256   if (NULL != fc->tracker)
257     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
258                                                 fsize);
259   else
260     delay = GNUNET_TIME_UNIT_ZERO;
261   if (fc->num_rounds < 64)
262     delay = GNUNET_TIME_relative_max (delay,
263                                       GNUNET_TIME_relative_saturating_multiply
264                                         (fc->msg_delay,
265                                         (1ULL << fc->num_rounds)));
266   else
267     delay = GNUNET_TIME_UNIT_FOREVER_REL;
268   if (wrap)
269   {
270     /* full round transmitted wait 2x delay for ACK before going again */
271     fc->num_rounds++;
272     delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
273     /* never use zero, need some time for ACK always */
274     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
275     fc->wack = GNUNET_YES;
276     fc->last_round = GNUNET_TIME_absolute_get ();
277     GNUNET_STATISTICS_update (fc->stats,
278                               _ ("# fragments wrap arounds"),
279                               1,
280                               GNUNET_NO);
281   }
282   fc->proc_busy = GNUNET_YES;
283   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
284   fc->num_transmissions++;
285   fc->proc (fc->proc_cls,
286             &fh->header);
287 }
288
289
290 /**
291  * Create a fragmentation context for the given message.
292  * Fragments the message into fragments of size @a mtu or
293  * less.  Calls @a proc on each un-acknowledged fragment,
294  * using both the expected @a msg_delay between messages and
295  * acknowledgements and the given @a tracker to guide the
296  * frequency of calls to @a proc.
297  *
298  * @param stats statistics context
299  * @param mtu the maximum message size for each fragment
300  * @param tracker bandwidth tracker to use for flow control (can be NULL)
301  * @param msg_delay initial delay to insert between fragment transmissions
302  *              based on previous messages
303  * @param ack_delay expected delay between fragment transmission
304  *              and ACK based on previous messages
305  * @param msg the message to fragment
306  * @param proc function to call for each fragment to transmit
307  * @param proc_cls closure for @a proc
308  * @return the fragmentation context
309  */
310 struct GNUNET_FRAGMENT_Context *
311 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
312                                 uint16_t mtu,
313                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
314                                 struct GNUNET_TIME_Relative msg_delay,
315                                 struct GNUNET_TIME_Relative ack_delay,
316                                 const struct GNUNET_MessageHeader *msg,
317                                 GNUNET_FRAGMENT_MessageProcessor proc,
318                                 void *proc_cls)
319 {
320   struct GNUNET_FRAGMENT_Context *fc;
321   size_t size;
322   uint64_t bits;
323
324   GNUNET_STATISTICS_update (stats,
325                             _ ("# messages fragmented"),
326                             1,
327                             GNUNET_NO);
328   GNUNET_assert (mtu >= 1024 + sizeof(struct FragmentHeader));
329   size = ntohs (msg->size);
330   GNUNET_STATISTICS_update (stats,
331                             _ ("# total size of fragmented messages"),
332                             size, GNUNET_NO);
333   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
334   fc = GNUNET_malloc (sizeof(struct GNUNET_FRAGMENT_Context) + size);
335   fc->stats = stats;
336   fc->mtu = mtu;
337   fc->tracker = tracker;
338   fc->ack_delay = ack_delay;
339   fc->msg_delay = msg_delay;
340   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
341   fc->proc = proc;
342   fc->proc_cls = proc_cls;
343   fc->fragment_id =
344     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
345                               UINT32_MAX);
346   GNUNET_memcpy (&fc[1], msg, size);
347   bits =
348     (size + mtu - sizeof(struct FragmentHeader) - 1) / (mtu
349                                                         - sizeof(struct
350                                                                  FragmentHeader));
351   GNUNET_assert (bits <= 64);
352   if (bits == 64)
353     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
354   else
355     fc->acks_mask = (1LLU << bits) - 1;  /* set lowest 'bits' bit */
356   fc->acks = fc->acks_mask;
357   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
358   return fc;
359 }
360
361
362 /**
363  * Continuation to call from the 'proc' function after the fragment
364  * has been transmitted (and hence the next fragment can now be
365  * given to proc).
366  *
367  * @param fc fragmentation context
368  */
369 void
370 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
371 {
372   GNUNET_assert (fc->proc_busy == GNUNET_YES);
373   fc->proc_busy = GNUNET_NO;
374   GNUNET_assert (fc->task == NULL);
375   fc->task =
376     GNUNET_SCHEDULER_add_at (fc->delay_until,
377                              &transmit_next,
378                              fc);
379 }
380
381
382 /**
383  * Process an acknowledgement message we got from the other
384  * side (to control re-transmits).
385  *
386  * @param fc fragmentation context
387  * @param msg acknowledgement message we received
388  * @return #GNUNET_OK if this ack completes the work of the 'fc'
389  *                   (all fragments have been received);
390  *         #GNUNET_NO if more messages are pending
391  *         #GNUNET_SYSERR if this ack is not valid for this fc
392  */
393 int
394 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
395                              const struct GNUNET_MessageHeader *msg)
396 {
397   const struct FragmentAcknowledgement *fa;
398   uint64_t abits;
399   struct GNUNET_TIME_Relative ndelay;
400   unsigned int ack_cnt;
401   unsigned int snd_cnt;
402   unsigned int i;
403
404   if (sizeof(struct FragmentAcknowledgement) != ntohs (msg->size))
405   {
406     GNUNET_break_op (0);
407     return GNUNET_SYSERR;
408   }
409   fa = (const struct FragmentAcknowledgement *) msg;
410   if (ntohl (fa->fragment_id) != fc->fragment_id)
411     return GNUNET_SYSERR;       /* not our ACK */
412   abits = GNUNET_ntohll (fa->bits);
413   if ((GNUNET_YES == fc->wack) &&
414       (0 != fc->num_transmissions))
415   {
416     /* normal ACK, can update running average of delay... */
417     fc->wack = GNUNET_NO;
418     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
419     fc->ack_delay.rel_value_us =
420       (ndelay.rel_value_us / fc->num_transmissions + 3
421        * fc->ack_delay.rel_value_us) / 4;
422     /* calculate ratio msg sent vs. msg acked */
423     ack_cnt = 0;
424     snd_cnt = 0;
425     for (i = 0; i < 64; i++)
426     {
427       if (1 == (fc->acks_mask & (1ULL << i)))
428       {
429         snd_cnt++;
430         if (0 == (abits & (1ULL << i)))
431           ack_cnt++;
432       }
433     }
434     if (0 == ack_cnt)
435     {
436       /* complete loss */
437       fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
438                                                                 snd_cnt);
439     }
440     else if (snd_cnt > ack_cnt)
441     {
442       /* some loss, slow down proportionally */
443       fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt)
444                                     / snd_cnt);
445     }
446     else if (snd_cnt == ack_cnt)
447     {
448       fc->msg_delay.rel_value_us =
449         (ndelay.rel_value_us / fc->num_transmissions + 3
450          * fc->msg_delay.rel_value_us) / 5;
451     }
452     fc->num_transmissions = 0;
453     fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
454                                               GNUNET_TIME_UNIT_SECONDS);
455     fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
456                                               GNUNET_TIME_UNIT_SECONDS);
457   }
458   GNUNET_STATISTICS_update (fc->stats,
459                             _ ("# fragment acknowledgements received"),
460                             1,
461                             GNUNET_NO);
462   if (abits != (fc->acks & abits))
463   {
464     /* ID collission or message reordering, count! This should be rare! */
465     GNUNET_STATISTICS_update (fc->stats,
466                               _ ("# bits removed from fragmentation ACKs"), 1,
467                               GNUNET_NO);
468   }
469   fc->acks = abits & fc->acks_mask;
470   if (0 != fc->acks)
471   {
472     /* more to transmit, do so right now (if tracker permits...) */
473     if (fc->task != NULL)
474     {
475       /* schedule next transmission now, no point in waiting... */
476       GNUNET_SCHEDULER_cancel (fc->task);
477       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
478     }
479     else
480     {
481       /* only case where there is no task should be if we're waiting
482        * for the right to transmit again (proc_busy set to YES) */
483       GNUNET_assert (GNUNET_YES == fc->proc_busy);
484     }
485     return GNUNET_NO;
486   }
487
488   /* all done */
489   GNUNET_STATISTICS_update (fc->stats,
490                             _ ("# fragmentation transmissions completed"),
491                             1,
492                             GNUNET_NO);
493   if (NULL != fc->task)
494   {
495     GNUNET_SCHEDULER_cancel (fc->task);
496     fc->task = NULL;
497   }
498   return GNUNET_OK;
499 }
500
501
502 /**
503  * Destroy the given fragmentation context (stop calling 'proc', free
504  * resources).
505  *
506  * @param fc fragmentation context
507  * @param msg_delay where to store average delay between individual message transmissions the
508  *         last message (OUT only)
509  * @param ack_delay where to store average delay between transmission and ACK for the
510  *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
511  */
512 void
513 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
514                                  struct GNUNET_TIME_Relative *msg_delay,
515                                  struct GNUNET_TIME_Relative *ack_delay)
516 {
517   if (fc->task != NULL)
518     GNUNET_SCHEDULER_cancel (fc->task);
519   if (NULL != ack_delay)
520     *ack_delay = fc->ack_delay;
521   if (NULL != msg_delay)
522     *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
523                                                            fc->num_rounds);
524   GNUNET_free (fc);
525 }
526
527
528 /* end of fragmentation.c */