-Merge branch 'master' of ssh://gnunet.org/gnunet into gsoc2018/rest_api
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file src/fragmentation/fragmentation.c
20  * @brief library to help fragment messages
21  * @author Christian Grothoff
22  */
23 #include "platform.h"
24 #include "gnunet_fragmentation_lib.h"
25 #include "gnunet_protocols.h"
26 #include "fragmentation.h"
27
28
29 /**
30  * Absolute minimum delay we impose between sending and expecting ACK to arrive.
31  */
32 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
33
34
35 /**
36  * Fragmentation context.
37  */
38 struct GNUNET_FRAGMENT_Context
39 {
40   /**
41    * Statistics to use.
42    */
43   struct GNUNET_STATISTICS_Handle *stats;
44
45   /**
46    * Tracker for flow control.
47    */
48   struct GNUNET_BANDWIDTH_Tracker *tracker;
49
50   /**
51    * Current expected delay for ACKs.
52    */
53   struct GNUNET_TIME_Relative ack_delay;
54
55   /**
56    * Current expected delay between messages.
57    */
58   struct GNUNET_TIME_Relative msg_delay;
59
60   /**
61    * Next allowed transmission time.
62    */
63   struct GNUNET_TIME_Absolute delay_until;
64
65   /**
66    * Time we transmitted the last message of the last round.
67    */
68   struct GNUNET_TIME_Absolute last_round;
69
70   /**
71    * Message to fragment (allocated at the end of this struct).
72    */
73   const struct GNUNET_MessageHeader *msg;
74
75   /**
76    * Function to call for transmissions.
77    */
78   GNUNET_FRAGMENT_MessageProcessor proc;
79
80   /**
81    * Closure for @e proc.
82    */
83   void *proc_cls;
84
85   /**
86    * Bitfield, set to 1 for each unacknowledged fragment.
87    */
88   uint64_t acks;
89
90   /**
91    * Bitfield with all possible bits for @e acks (used to mask the
92    * ack we get back).
93    */
94   uint64_t acks_mask;
95
96   /**
97    * Task performing work for the fragmenter.
98    */
99   struct GNUNET_SCHEDULER_Task *task;
100
101   /**
102    * Our fragmentation ID. (chosen at random)
103    */
104   uint32_t fragment_id;
105
106   /**
107    * Round-robin selector for the next transmission.
108    */
109   unsigned int next_transmission;
110
111   /**
112    * How many rounds of transmission have we completed so far?
113    */
114   unsigned int num_rounds;
115
116   /**
117    * How many transmission have we completed in this round?
118    */
119   unsigned int num_transmissions;
120
121   /**
122    * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
123    */
124   int8_t proc_busy;
125
126   /**
127    * #GNUNET_YES if we are waiting for an ACK.
128    */
129   int8_t wack;
130
131   /**
132    * Target fragment size.
133    */
134   uint16_t mtu;
135
136 };
137
138
139 /**
140  * Convert an ACK message to a printable format suitable for logging.
141  *
142  * @param ack message to print
143  * @return ack in human-readable format
144  */
145 const char *
146 GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
147 {
148   static char buf[128];
149   const struct FragmentAcknowledgement *fa;
150
151   if (sizeof (struct FragmentAcknowledgement) !=
152       htons (ack->size))
153     return "<malformed ack>";
154   fa = (const struct FragmentAcknowledgement *) ack;
155   GNUNET_snprintf (buf,
156                    sizeof (buf),
157                    "%u-%llX",
158                    ntohl (fa->fragment_id),
159                    GNUNET_ntohll (fa->bits));
160   return buf;
161 }
162
163
164 /**
165  * Transmit the next fragment to the other peer.
166  *
167  * @param cls the `struct GNUNET_FRAGMENT_Context`
168  */
169 static void
170 transmit_next (void *cls)
171 {
172   struct GNUNET_FRAGMENT_Context *fc = cls;
173   char msg[fc->mtu];
174   const char *mbuf;
175   struct FragmentHeader *fh;
176   struct GNUNET_TIME_Relative delay;
177   unsigned int bit;
178   size_t size;
179   size_t fsize;
180   int wrap;
181
182   fc->task = NULL;
183   GNUNET_assert (GNUNET_NO == fc->proc_busy);
184   if (0 == fc->acks)
185     return;                     /* all done */
186   /* calculate delay */
187   wrap = 0;
188   while (0 == (fc->acks & (1LLU << fc->next_transmission)))
189   {
190     fc->next_transmission = (fc->next_transmission + 1) % 64;
191     wrap |= (0 == fc->next_transmission);
192   }
193   bit = fc->next_transmission;
194   size = ntohs (fc->msg->size);
195   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
196     fsize =
197         (size % (fc->mtu - sizeof (struct FragmentHeader))) +
198         sizeof (struct FragmentHeader);
199   else
200     fsize = fc->mtu;
201   if (NULL != fc->tracker)
202     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
203                                                 fsize);
204   else
205     delay = GNUNET_TIME_UNIT_ZERO;
206   if (delay.rel_value_us > 0)
207   {
208     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
209                 "Fragmentation logic delays transmission of next fragment by %s\n",
210                 GNUNET_STRINGS_relative_time_to_string (delay,
211                                                         GNUNET_YES));
212     fc->task = GNUNET_SCHEDULER_add_delayed (delay,
213                                              &transmit_next,
214                                              fc);
215     return;
216   }
217   fc->next_transmission = (fc->next_transmission + 1) % 64;
218   wrap |= (0 == fc->next_transmission);
219   while (0 == (fc->acks & (1LLU << fc->next_transmission)))
220   {
221     fc->next_transmission = (fc->next_transmission + 1) % 64;
222     wrap |= (0 == fc->next_transmission);
223   }
224
225   /* assemble fragmentation message */
226   mbuf = (const char *) &fc[1];
227   fh = (struct FragmentHeader *) msg;
228   fh->header.size = htons (fsize);
229   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
230   fh->fragment_id = htonl (fc->fragment_id);
231   fh->total_size = fc->msg->size;       /* already in big-endian */
232   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
233   GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
234           fsize - sizeof (struct FragmentHeader));
235   if (NULL != fc->tracker)
236     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
237   GNUNET_STATISTICS_update (fc->stats,
238                             _("# fragments transmitted"),
239                             1,
240                             GNUNET_NO);
241   if (0 != fc->last_round.abs_value_us)
242     GNUNET_STATISTICS_update (fc->stats,
243                               _("# fragments retransmitted"),
244                               1,
245                               GNUNET_NO);
246
247   /* select next message to calculate delay */
248   bit = fc->next_transmission;
249   size = ntohs (fc->msg->size);
250   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
251     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
252   else
253     fsize = fc->mtu;
254   if (NULL != fc->tracker)
255     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
256                                                 fsize);
257   else
258     delay = GNUNET_TIME_UNIT_ZERO;
259   if (fc->num_rounds < 64)
260     delay = GNUNET_TIME_relative_max (delay,
261                                       GNUNET_TIME_relative_saturating_multiply
262                                       (fc->msg_delay,
263                                        (1ULL << fc->num_rounds)));
264   else
265     delay = GNUNET_TIME_UNIT_FOREVER_REL;
266   if (wrap)
267   {
268     /* full round transmitted wait 2x delay for ACK before going again */
269     fc->num_rounds++;
270     delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
271     /* never use zero, need some time for ACK always */
272     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
273     fc->wack = GNUNET_YES;
274     fc->last_round = GNUNET_TIME_absolute_get ();
275     GNUNET_STATISTICS_update (fc->stats,
276                               _("# fragments wrap arounds"),
277                               1,
278                               GNUNET_NO);
279   }
280   fc->proc_busy = GNUNET_YES;
281   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
282   fc->num_transmissions++;
283   fc->proc (fc->proc_cls,
284             &fh->header);
285 }
286
287
288 /**
289  * Create a fragmentation context for the given message.
290  * Fragments the message into fragments of size @a mtu or
291  * less.  Calls @a proc on each un-acknowledged fragment,
292  * using both the expected @a msg_delay between messages and
293  * acknowledgements and the given @a tracker to guide the
294  * frequency of calls to @a proc.
295  *
296  * @param stats statistics context
297  * @param mtu the maximum message size for each fragment
298  * @param tracker bandwidth tracker to use for flow control (can be NULL)
299  * @param msg_delay initial delay to insert between fragment transmissions
300  *              based on previous messages
301  * @param ack_delay expected delay between fragment transmission
302  *              and ACK based on previous messages
303  * @param msg the message to fragment
304  * @param proc function to call for each fragment to transmit
305  * @param proc_cls closure for @a proc
306  * @return the fragmentation context
307  */
308 struct GNUNET_FRAGMENT_Context *
309 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
310                                 uint16_t mtu,
311                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
312                                 struct GNUNET_TIME_Relative msg_delay,
313                                 struct GNUNET_TIME_Relative ack_delay,
314                                 const struct GNUNET_MessageHeader *msg,
315                                 GNUNET_FRAGMENT_MessageProcessor proc,
316                                 void *proc_cls)
317 {
318   struct GNUNET_FRAGMENT_Context *fc;
319   size_t size;
320   uint64_t bits;
321
322   GNUNET_STATISTICS_update (stats,
323                             _("# messages fragmented"),
324                             1,
325                             GNUNET_NO);
326   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
327   size = ntohs (msg->size);
328   GNUNET_STATISTICS_update (stats,
329                             _("# total size of fragmented messages"),
330                             size, GNUNET_NO);
331   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
332   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
333   fc->stats = stats;
334   fc->mtu = mtu;
335   fc->tracker = tracker;
336   fc->ack_delay = ack_delay;
337   fc->msg_delay = msg_delay;
338   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
339   fc->proc = proc;
340   fc->proc_cls = proc_cls;
341   fc->fragment_id =
342       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
343                                 UINT32_MAX);
344   GNUNET_memcpy (&fc[1], msg, size);
345   bits =
346       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
347                                                            sizeof (struct
348                                                                    FragmentHeader));
349   GNUNET_assert (bits <= 64);
350   if (bits == 64)
351     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
352   else
353     fc->acks_mask = (1LLU << bits) - 1;  /* set lowest 'bits' bit */
354   fc->acks = fc->acks_mask;
355   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
356   return fc;
357 }
358
359
360 /**
361  * Continuation to call from the 'proc' function after the fragment
362  * has been transmitted (and hence the next fragment can now be
363  * given to proc).
364  *
365  * @param fc fragmentation context
366  */
367 void
368 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
369 {
370   GNUNET_assert (fc->proc_busy == GNUNET_YES);
371   fc->proc_busy = GNUNET_NO;
372   GNUNET_assert (fc->task == NULL);
373   fc->task =
374     GNUNET_SCHEDULER_add_at (fc->delay_until,
375                              &transmit_next,
376                              fc);
377 }
378
379
380 /**
381  * Process an acknowledgement message we got from the other
382  * side (to control re-transmits).
383  *
384  * @param fc fragmentation context
385  * @param msg acknowledgement message we received
386  * @return #GNUNET_OK if this ack completes the work of the 'fc'
387  *                   (all fragments have been received);
388  *         #GNUNET_NO if more messages are pending
389  *         #GNUNET_SYSERR if this ack is not valid for this fc
390  */
391 int
392 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
393                              const struct GNUNET_MessageHeader *msg)
394 {
395   const struct FragmentAcknowledgement *fa;
396   uint64_t abits;
397   struct GNUNET_TIME_Relative ndelay;
398   unsigned int ack_cnt;
399   unsigned int snd_cnt;
400   unsigned int i;
401
402   if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
403   {
404     GNUNET_break_op (0);
405     return GNUNET_SYSERR;
406   }
407   fa = (const struct FragmentAcknowledgement *) msg;
408   if (ntohl (fa->fragment_id) != fc->fragment_id)
409     return GNUNET_SYSERR;       /* not our ACK */
410   abits = GNUNET_ntohll (fa->bits);
411   if ( (GNUNET_YES == fc->wack) &&
412        (0 != fc->num_transmissions) )
413   {
414     /* normal ACK, can update running average of delay... */
415     fc->wack = GNUNET_NO;
416     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
417     fc->ack_delay.rel_value_us =
418         (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
419     /* calculate ratio msg sent vs. msg acked */
420     ack_cnt = 0;
421     snd_cnt = 0;
422     for (i=0;i<64;i++)
423     {
424       if (1 == (fc->acks_mask & (1ULL << i)))
425       {
426         snd_cnt++;
427         if (0 == (abits & (1ULL << i)))
428           ack_cnt++;
429       }
430     }
431     if (0 == ack_cnt)
432     {
433       /* complete loss */
434       fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
435                                                                 snd_cnt);
436     }
437     else if (snd_cnt > ack_cnt)
438     {
439       /* some loss, slow down proportionally */
440       fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
441     }
442     else if (snd_cnt == ack_cnt)
443     {
444       fc->msg_delay.rel_value_us =
445         (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
446     }
447     fc->num_transmissions = 0;
448     fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
449                                               GNUNET_TIME_UNIT_SECONDS);
450     fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
451                                               GNUNET_TIME_UNIT_SECONDS);
452   }
453   GNUNET_STATISTICS_update (fc->stats,
454                             _("# fragment acknowledgements received"),
455                             1,
456                             GNUNET_NO);
457   if (abits != (fc->acks & abits))
458   {
459     /* ID collission or message reordering, count! This should be rare! */
460     GNUNET_STATISTICS_update (fc->stats,
461                               _("# bits removed from fragmentation ACKs"), 1,
462                               GNUNET_NO);
463   }
464   fc->acks = abits & fc->acks_mask;
465   if (0 != fc->acks)
466   {
467     /* more to transmit, do so right now (if tracker permits...) */
468     if (fc->task != NULL)
469     {
470       /* schedule next transmission now, no point in waiting... */
471       GNUNET_SCHEDULER_cancel (fc->task);
472       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
473     }
474     else
475     {
476       /* only case where there is no task should be if we're waiting
477        * for the right to transmit again (proc_busy set to YES) */
478       GNUNET_assert (GNUNET_YES == fc->proc_busy);
479     }
480     return GNUNET_NO;
481   }
482
483   /* all done */
484   GNUNET_STATISTICS_update (fc->stats,
485                             _("# fragmentation transmissions completed"),
486                             1,
487                             GNUNET_NO);
488   if (NULL != fc->task)
489   {
490     GNUNET_SCHEDULER_cancel (fc->task);
491     fc->task = NULL;
492   }
493   return GNUNET_OK;
494 }
495
496
497 /**
498  * Destroy the given fragmentation context (stop calling 'proc', free
499  * resources).
500  *
501  * @param fc fragmentation context
502  * @param msg_delay where to store average delay between individual message transmissions the
503  *         last message (OUT only)
504  * @param ack_delay where to store average delay between transmission and ACK for the
505  *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
506  */
507 void
508 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
509                                  struct GNUNET_TIME_Relative *msg_delay,
510                                  struct GNUNET_TIME_Relative *ack_delay)
511 {
512   if (fc->task != NULL)
513     GNUNET_SCHEDULER_cancel (fc->task);
514   if (NULL != ack_delay)
515     *ack_delay = fc->ack_delay;
516   if (NULL != msg_delay)
517     *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
518                                                            fc->num_rounds);
519   GNUNET_free (fc);
520 }
521
522
523 /* end of fragmentation.c */