- ttl is deprecated, don't warn
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Absolute minimum delay we impose between sending and expecting ACK to arrive.
33  */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
35
36
37 /**
38  * Fragmentation context.
39  */
40 struct GNUNET_FRAGMENT_Context
41 {
42   /**
43    * Statistics to use.
44    */
45   struct GNUNET_STATISTICS_Handle *stats;
46
47   /**
48    * Tracker for flow control.
49    */
50   struct GNUNET_BANDWIDTH_Tracker *tracker;
51
52   /**
53    * Current expected delay for ACKs.
54    */
55   struct GNUNET_TIME_Relative ack_delay;
56
57   /**
58    * Current expected delay between messages.
59    */
60   struct GNUNET_TIME_Relative msg_delay;
61
62   /**
63    * Next allowed transmission time.
64    */
65   struct GNUNET_TIME_Absolute delay_until;
66
67   /**
68    * Time we transmitted the last message of the last round.
69    */
70   struct GNUNET_TIME_Absolute last_round;
71
72   /**
73    * Message to fragment (allocated at the end of this struct).
74    */
75   const struct GNUNET_MessageHeader *msg;
76
77   /**
78    * Function to call for transmissions.
79    */
80   GNUNET_FRAGMENT_MessageProcessor proc;
81
82   /**
83    * Closure for @e proc.
84    */
85   void *proc_cls;
86
87   /**
88    * Bitfield, set to 1 for each unacknowledged fragment.
89    */
90   uint64_t acks;
91
92   /**
93    * Bitfield with all possible bits for @e acks (used to mask the
94    * ack we get back).
95    */
96   uint64_t acks_mask;
97
98   /**
99    * Task performing work for the fragmenter.
100    */
101   struct GNUNET_SCHEDULER_Task *task;
102
103   /**
104    * Our fragmentation ID. (chosen at random)
105    */
106   uint32_t fragment_id;
107
108   /**
109    * Round-robin selector for the next transmission.
110    */
111   unsigned int next_transmission;
112
113   /**
114    * How many rounds of transmission have we completed so far?
115    */
116   unsigned int num_rounds;
117
118   /**
119    * How many transmission have we completed in this round?
120    */
121   unsigned int num_transmissions;
122
123   /**
124    * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
125    */
126   int8_t proc_busy;
127
128   /**
129    * #GNUNET_YES if we are waiting for an ACK.
130    */
131   int8_t wack;
132
133   /**
134    * Target fragment size.
135    */
136   uint16_t mtu;
137
138 };
139
140
141 /**
142  * Convert an ACK message to a printable format suitable for logging.
143  *
144  * @param ack message to print
145  * @return ack in human-readable format
146  */
147 const char *
148 GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
149 {
150   static char buf[128];
151   const struct FragmentAcknowledgement *fa;
152
153   if (sizeof (struct FragmentAcknowledgement) !=
154       htons (ack->size))
155     return "<malformed ack>";
156   fa = (const struct FragmentAcknowledgement *) ack;
157   GNUNET_snprintf (buf,
158                    sizeof (buf),
159                    "%u-%llX",
160                    ntohl (fa->fragment_id),
161                    GNUNET_ntohll (fa->bits));
162   return buf;
163 }
164
165
166 /**
167  * Transmit the next fragment to the other peer.
168  *
169  * @param cls the `struct GNUNET_FRAGMENT_Context`
170  * @param tc scheduler context
171  */
172 static void
173 transmit_next (void *cls,
174                const struct GNUNET_SCHEDULER_TaskContext *tc)
175 {
176   struct GNUNET_FRAGMENT_Context *fc = cls;
177   char msg[fc->mtu];
178   const char *mbuf;
179   struct FragmentHeader *fh;
180   struct GNUNET_TIME_Relative delay;
181   unsigned int bit;
182   size_t size;
183   size_t fsize;
184   int wrap;
185
186   fc->task = NULL;
187   GNUNET_assert (GNUNET_NO == fc->proc_busy);
188   if (0 == fc->acks)
189     return;                     /* all done */
190   /* calculate delay */
191   wrap = 0;
192   while (0 == (fc->acks & (1LL << fc->next_transmission)))
193   {
194     fc->next_transmission = (fc->next_transmission + 1) % 64;
195     wrap |= (0 == fc->next_transmission);
196   }
197   bit = fc->next_transmission;
198   size = ntohs (fc->msg->size);
199   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
200     fsize =
201         (size % (fc->mtu - sizeof (struct FragmentHeader))) +
202         sizeof (struct FragmentHeader);
203   else
204     fsize = fc->mtu;
205   if (NULL != fc->tracker)
206     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
207   else
208     delay = GNUNET_TIME_UNIT_ZERO;
209   if (delay.rel_value_us > 0)
210   {
211     fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
212     return;
213   }
214   fc->next_transmission = (fc->next_transmission + 1) % 64;
215   wrap |= (0 == fc->next_transmission);
216   while (0 == (fc->acks & (1LL << fc->next_transmission)))
217   {
218     fc->next_transmission = (fc->next_transmission + 1) % 64;
219     wrap |= (0 == fc->next_transmission);
220   }
221
222   /* assemble fragmentation message */
223   mbuf = (const char *) &fc[1];
224   fh = (struct FragmentHeader *) msg;
225   fh->header.size = htons (fsize);
226   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
227   fh->fragment_id = htonl (fc->fragment_id);
228   fh->total_size = fc->msg->size;       /* already in big-endian */
229   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
230   memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
231           fsize - sizeof (struct FragmentHeader));
232   if (NULL != fc->tracker)
233     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
234   GNUNET_STATISTICS_update (fc->stats,
235                             _("# fragments transmitted"),
236                             1,
237                             GNUNET_NO);
238   if (0 != fc->last_round.abs_value_us)
239     GNUNET_STATISTICS_update (fc->stats,
240                               _("# fragments retransmitted"),
241                               1,
242                               GNUNET_NO);
243
244   /* select next message to calculate delay */
245   bit = fc->next_transmission;
246   size = ntohs (fc->msg->size);
247   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
248     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
249   else
250     fsize = fc->mtu;
251   if (NULL != fc->tracker)
252     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
253                                                 fsize);
254   else
255     delay = GNUNET_TIME_UNIT_ZERO;
256   delay = GNUNET_TIME_relative_max (delay,
257                                     GNUNET_TIME_relative_multiply (fc->msg_delay,
258                                                                    (1 << fc->num_rounds)));
259   if (wrap)
260   {
261     /* full round transmitted wait 2x delay for ACK before going again */
262     fc->num_rounds++;
263     delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2);
264     /* never use zero, need some time for ACK always */
265     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
266     fc->wack = GNUNET_YES;
267     fc->last_round = GNUNET_TIME_absolute_get ();
268     GNUNET_STATISTICS_update (fc->stats,
269                               _("# fragments wrap arounds"),
270                               1,
271                               GNUNET_NO);
272   }
273   fc->proc_busy = GNUNET_YES;
274   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
275   fc->num_transmissions++;
276   fc->proc (fc->proc_cls, &fh->header);
277 }
278
279
280 /**
281  * Create a fragmentation context for the given message.
282  * Fragments the message into fragments of size @a mtu or
283  * less.  Calls @a proc on each un-acknowledged fragment,
284  * using both the expected @a msg_delay between messages and
285  * acknowledgements and the given @a tracker to guide the
286  * frequency of calls to @a proc.
287  *
288  * @param stats statistics context
289  * @param mtu the maximum message size for each fragment
290  * @param tracker bandwidth tracker to use for flow control (can be NULL)
291  * @param msg_delay initial delay to insert between fragment transmissions
292  *              based on previous messages
293  * @param ack_delay expected delay between fragment transmission
294  *              and ACK based on previous messages
295  * @param msg the message to fragment
296  * @param proc function to call for each fragment to transmit
297  * @param proc_cls closure for @a proc
298  * @return the fragmentation context
299  */
300 struct GNUNET_FRAGMENT_Context *
301 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
302                                 uint16_t mtu,
303                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
304                                 struct GNUNET_TIME_Relative msg_delay,
305                                 struct GNUNET_TIME_Relative ack_delay,
306                                 const struct GNUNET_MessageHeader *msg,
307                                 GNUNET_FRAGMENT_MessageProcessor proc,
308                                 void *proc_cls)
309 {
310   struct GNUNET_FRAGMENT_Context *fc;
311   size_t size;
312   uint64_t bits;
313
314   GNUNET_STATISTICS_update (stats,
315                             _("# messages fragmented"),
316                             1,
317                             GNUNET_NO);
318   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
319   size = ntohs (msg->size);
320   GNUNET_STATISTICS_update (stats,
321                             _("# total size of fragmented messages"),
322                             size, GNUNET_NO);
323   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
324   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
325   fc->stats = stats;
326   fc->mtu = mtu;
327   fc->tracker = tracker;
328   fc->ack_delay = ack_delay;
329   fc->msg_delay = msg_delay;
330   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
331   fc->proc = proc;
332   fc->proc_cls = proc_cls;
333   fc->fragment_id =
334       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
335   memcpy (&fc[1], msg, size);
336   bits =
337       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
338                                                            sizeof (struct
339                                                                    FragmentHeader));
340   GNUNET_assert (bits <= 64);
341   if (bits == 64)
342     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
343   else
344     fc->acks_mask = (1LL << bits) - 1;  /* set lowest 'bits' bit */
345   fc->acks = fc->acks_mask;
346   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
347   return fc;
348 }
349
350
351 /**
352  * Continuation to call from the 'proc' function after the fragment
353  * has been transmitted (and hence the next fragment can now be
354  * given to proc).
355  *
356  * @param fc fragmentation context
357  */
358 void
359 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
360 {
361   GNUNET_assert (fc->proc_busy == GNUNET_YES);
362   fc->proc_busy = GNUNET_NO;
363   GNUNET_assert (fc->task == NULL);
364   fc->task =
365       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
366                                     (fc->delay_until), &transmit_next, fc);
367 }
368
369
370 /**
371  * Process an acknowledgement message we got from the other
372  * side (to control re-transmits).
373  *
374  * @param fc fragmentation context
375  * @param msg acknowledgement message we received
376  * @return #GNUNET_OK if this ack completes the work of the 'fc'
377  *                   (all fragments have been received);
378  *         #GNUNET_NO if more messages are pending
379  *         #GNUNET_SYSERR if this ack is not valid for this fc
380  */
381 int
382 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
383                              const struct GNUNET_MessageHeader *msg)
384 {
385   const struct FragmentAcknowledgement *fa;
386   uint64_t abits;
387   struct GNUNET_TIME_Relative ndelay;
388   unsigned int ack_cnt;
389   unsigned int snd_cnt;
390   unsigned int i;
391
392   if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
393   {
394     GNUNET_break_op (0);
395     return GNUNET_SYSERR;
396   }
397   fa = (const struct FragmentAcknowledgement *) msg;
398   if (ntohl (fa->fragment_id) != fc->fragment_id)
399     return GNUNET_SYSERR;       /* not our ACK */
400   abits = GNUNET_ntohll (fa->bits);
401   if ( (GNUNET_YES == fc->wack) &&
402        (0 != fc->num_transmissions) )
403   {
404     /* normal ACK, can update running average of delay... */
405     fc->wack = GNUNET_NO;
406     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
407     fc->ack_delay.rel_value_us =
408         (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
409     /* calculate ratio msg sent vs. msg acked */
410     ack_cnt = 0;
411     snd_cnt = 0;
412     for (i=0;i<64;i++)
413     {
414       if (1 == (fc->acks_mask & (1 << i)))
415       {
416         snd_cnt++;
417         if (0 == (abits & (1 << i)))
418           ack_cnt++;
419       }
420     }
421     if (0 == ack_cnt)
422     {
423       /* complete loss */
424       fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay,
425                                                      snd_cnt);
426     }
427     else if (snd_cnt > ack_cnt)
428     {
429       /* some loss, slow down proportionally */
430       fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
431     }
432     else if (snd_cnt == ack_cnt)
433     {
434       fc->msg_delay.rel_value_us =
435         (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
436     }
437     fc->num_transmissions = 0;
438     fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
439                                               GNUNET_TIME_UNIT_SECONDS);
440     fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
441                                               GNUNET_TIME_UNIT_SECONDS);
442   }
443   GNUNET_STATISTICS_update (fc->stats,
444                             _("# fragment acknowledgements received"),
445                             1,
446                             GNUNET_NO);
447   if (abits != (fc->acks & abits))
448   {
449     /* ID collission or message reordering, count! This should be rare! */
450     GNUNET_STATISTICS_update (fc->stats,
451                               _("# bits removed from fragmentation ACKs"), 1,
452                               GNUNET_NO);
453   }
454   fc->acks = abits & fc->acks_mask;
455   if (0 != fc->acks)
456   {
457     /* more to transmit, do so right now (if tracker permits...) */
458     if (fc->task != NULL)
459     {
460       /* schedule next transmission now, no point in waiting... */
461       GNUNET_SCHEDULER_cancel (fc->task);
462       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
463     }
464     else
465     {
466       /* only case where there is no task should be if we're waiting
467        * for the right to transmit again (proc_busy set to YES) */
468       GNUNET_assert (GNUNET_YES == fc->proc_busy);
469     }
470     return GNUNET_NO;
471   }
472
473   /* all done */
474   GNUNET_STATISTICS_update (fc->stats,
475                             _("# fragmentation transmissions completed"),
476                             1,
477                             GNUNET_NO);
478   if (NULL != fc->task)
479   {
480     GNUNET_SCHEDULER_cancel (fc->task);
481     fc->task = NULL;
482   }
483   return GNUNET_OK;
484 }
485
486
487 /**
488  * Destroy the given fragmentation context (stop calling 'proc', free
489  * resources).
490  *
491  * @param fc fragmentation context
492  * @param msg_delay where to store average delay between individual message transmissions the
493  *         last message (OUT only)
494  * @param ack_delay where to store average delay between transmission and ACK for the
495  *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
496  */
497 void
498 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
499                                  struct GNUNET_TIME_Relative *msg_delay,
500                                  struct GNUNET_TIME_Relative *ack_delay)
501 {
502   if (fc->task != NULL)
503     GNUNET_SCHEDULER_cancel (fc->task);
504   if (NULL != ack_delay)
505     *ack_delay = fc->ack_delay;
506   if (NULL != msg_delay)
507     *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay,
508                                                 fc->num_rounds);
509   GNUNET_free (fc);
510 }
511
512
513 /* end of fragmentation.c */