4749f537819b8d10a9a11bcde50166b86d4535e3
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Absolute minimum delay we impose between sending and expecting ACK to arrive.
33  */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
35
36
37 /**
38  * Fragmentation context.
39  */
40 struct GNUNET_FRAGMENT_Context
41 {
42   /**
43    * Statistics to use.
44    */
45   struct GNUNET_STATISTICS_Handle *stats;
46
47   /**
48    * Tracker for flow control.
49    */
50   struct GNUNET_BANDWIDTH_Tracker *tracker;
51
52   /**
53    * Current expected delay for ACKs.
54    */
55   struct GNUNET_TIME_Relative delay;
56
57   /**
58    * Next allowed transmission time.
59    */
60   struct GNUNET_TIME_Absolute delay_until;
61
62   /**
63    * Time we transmitted the last message of the last round.
64    */
65   struct GNUNET_TIME_Absolute last_round;
66
67   /**
68    * Message to fragment (allocated at the end of this struct).
69    */
70   const struct GNUNET_MessageHeader *msg;
71
72   /**
73    * Function to call for transmissions.
74    */
75   GNUNET_FRAGMENT_MessageProcessor proc;
76
77   /**
78    * Closure for 'proc'.
79    */
80   void *proc_cls;
81
82   /**
83    * Bitfield, set to 1 for each unacknowledged fragment.
84    */
85   uint64_t acks;
86
87   /**
88    * Bitfield with all possible bits for 'acks' (used to mask the
89    * ack we get back).
90    */
91   uint64_t acks_mask;
92
93   /**
94    * Task performing work for the fragmenter.
95    */
96   GNUNET_SCHEDULER_TaskIdentifier task;
97
98   /**
99    * Our fragmentation ID. (chosen at random)
100    */
101   uint32_t fragment_id;
102
103   /**
104    * Round-robin selector for the next transmission.
105    */
106   unsigned int next_transmission;
107
108   /**
109    * How many rounds of transmission have we completed so far?
110    */
111   unsigned int num_rounds;
112
113   /**
114    * How many transmission have we completed in this round?
115    */
116   unsigned int num_transmissions;
117
118   /**
119    * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
120    */
121   int8_t proc_busy;
122
123   /**
124    * GNUNET_YES if we are waiting for an ACK.
125    */
126   int8_t wack;
127
128   /**
129    * Target fragment size.
130    */
131   uint16_t mtu;
132
133 };
134
135
136 /**
137  * Transmit the next fragment to the other peer.
138  *
139  * @param cls the 'struct GNUNET_FRAGMENT_Context'
140  * @param tc scheduler context
141  */
142 static void
143 transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
144 {
145   struct GNUNET_FRAGMENT_Context *fc = cls;
146   char msg[fc->mtu];
147   const char *mbuf;
148   struct FragmentHeader *fh;
149   struct GNUNET_TIME_Relative delay;
150   unsigned int bit;
151   size_t size;
152   size_t fsize;
153   int wrap;
154
155   fc->task = GNUNET_SCHEDULER_NO_TASK;
156   GNUNET_assert (GNUNET_NO == fc->proc_busy);
157   if (0 == fc->acks)
158     return;                     /* all done */
159   /* calculate delay */
160   wrap = 0;
161   while (0 == (fc->acks & (1LL << fc->next_transmission)))
162   {
163     fc->next_transmission = (fc->next_transmission + 1) % 64;
164     wrap |= (0 == fc->next_transmission);
165   }
166   bit = fc->next_transmission;
167   size = ntohs (fc->msg->size);
168   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
169     fsize =
170         (size % (fc->mtu - sizeof (struct FragmentHeader))) +
171         sizeof (struct FragmentHeader);
172   else
173     fsize = fc->mtu;
174   if (NULL != fc->tracker)
175     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
176   else
177     delay = GNUNET_TIME_UNIT_ZERO;
178   if (delay.rel_value > 0)
179   {
180     fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
181     return;
182   }
183   fc->next_transmission = (fc->next_transmission + 1) % 64;
184   wrap |= (fc->next_transmission == 0);
185   while (0 == (fc->acks & (1LL << fc->next_transmission)))
186   {
187     fc->next_transmission = (fc->next_transmission + 1) % 64;
188     wrap |= (fc->next_transmission == 0);
189   }
190
191   /* assemble fragmentation message */
192   mbuf = (const char *) &fc[1];
193   fh = (struct FragmentHeader *) msg;
194   fh->header.size = htons (fsize);
195   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
196   fh->fragment_id = htonl (fc->fragment_id);
197   fh->total_size = fc->msg->size;       /* already in big-endian */
198   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
199   memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
200           fsize - sizeof (struct FragmentHeader));
201   if (NULL != fc->tracker)
202     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
203   GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1,
204                             GNUNET_NO);
205   if (0 != fc->last_round.abs_value)
206     GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1,
207                               GNUNET_NO);
208
209   /* select next message to calculate delay */
210   bit = fc->next_transmission;
211   size = ntohs (fc->msg->size);
212   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
213     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
214   else
215     fsize = fc->mtu;
216   if (NULL != fc->tracker)
217     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
218   else
219     delay = GNUNET_TIME_UNIT_ZERO;
220   if (wrap)
221   {
222     /* full round transmitted wait 2x delay for ACK before going again */
223     fc->num_rounds++;
224     delay =
225         GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
226                                   GNUNET_TIME_relative_multiply (fc->delay,
227                                                                  fc->num_rounds));
228     /* never use zero, need some time for ACK always */
229     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
230     fc->wack = GNUNET_YES;
231     fc->last_round = GNUNET_TIME_absolute_get ();
232     GNUNET_STATISTICS_update (fc->stats, _("# fragments wrap arounds"), 1,
233                               GNUNET_NO);
234   }
235   fc->proc_busy = GNUNET_YES;
236   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
237   fc->num_transmissions++;
238   fc->proc (fc->proc_cls, &fh->header);
239 }
240
241
242 /**
243  * Create a fragmentation context for the given message.
244  * Fragments the message into fragments of size "mtu" or
245  * less.  Calls 'proc' on each un-acknowledged fragment,
246  * using both the expected 'delay' between messages and
247  * acknowledgements and the given 'tracker' to guide the
248  * frequency of calls to 'proc'.
249  *
250  * @param stats statistics context
251  * @param mtu the maximum message size for each fragment
252  * @param tracker bandwidth tracker to use for flow control (can be NULL)
253  * @param delay expected delay between fragment transmission
254  *              and ACK based on previous messages
255  * @param msg the message to fragment
256  * @param proc function to call for each fragment to transmit
257  * @param proc_cls closure for proc
258  * @return the fragmentation context
259  */
260 struct GNUNET_FRAGMENT_Context *
261 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
262                                 uint16_t mtu,
263                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
264                                 struct GNUNET_TIME_Relative delay,
265                                 const struct GNUNET_MessageHeader *msg,
266                                 GNUNET_FRAGMENT_MessageProcessor proc,
267                                 void *proc_cls)
268 {
269   struct GNUNET_FRAGMENT_Context *fc;
270   size_t size;
271   uint64_t bits;
272   
273   GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
274   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
275   size = ntohs (msg->size);
276   GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"),
277                             size, GNUNET_NO);
278   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
279   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
280   fc->stats = stats;
281   fc->mtu = mtu;
282   fc->tracker = tracker;
283   fc->delay = delay;
284   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
285   fc->proc = proc;
286   fc->proc_cls = proc_cls;
287   fc->fragment_id =
288       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
289   memcpy (&fc[1], msg, size);
290   bits =
291       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
292                                                            sizeof (struct
293                                                                    FragmentHeader));
294   GNUNET_assert (bits <= 64);
295   if (bits == 64)
296     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
297   else
298     fc->acks_mask = (1LL << bits) - 1;  /* set lowest 'bits' bit */
299   fc->acks = fc->acks_mask;
300   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
301   return fc;
302 }
303
304
305 /**
306  * Continuation to call from the 'proc' function after the fragment
307  * has been transmitted (and hence the next fragment can now be
308  * given to proc).
309  *
310  * @param fc fragmentation context
311  */
312 void
313 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
314 {
315   GNUNET_assert (fc->proc_busy == GNUNET_YES);
316   fc->proc_busy = GNUNET_NO;
317   GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
318   fc->task =
319       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
320                                     (fc->delay_until), &transmit_next, fc);
321 }
322
323
324 /**
325  * Process an acknowledgement message we got from the other
326  * side (to control re-transmits).
327  *
328  * @param fc fragmentation context
329  * @param msg acknowledgement message we received
330  * @return GNUNET_OK if this ack completes the work of the 'fc'
331  *                   (all fragments have been received);
332  *         GNUNET_NO if more messages are pending
333  *         GNUNET_SYSERR if this ack is not valid for this fc
334  */
335 int
336 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
337                              const struct GNUNET_MessageHeader *msg)
338 {
339   const struct FragmentAcknowledgement *fa;
340   uint64_t abits;
341   struct GNUNET_TIME_Relative ndelay;
342
343   if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
344   {
345     GNUNET_break_op (0);
346     return GNUNET_SYSERR;
347   }
348   fa = (const struct FragmentAcknowledgement *) msg;
349   if (ntohl (fa->fragment_id) != fc->fragment_id)
350     return GNUNET_SYSERR;       /* not our ACK */
351   abits = GNUNET_ntohll (fa->bits);
352   if ( (GNUNET_YES == fc->wack) &&
353        (0 != fc->num_transmissions) )
354   {
355     /* normal ACK, can update running average of delay... */
356     fc->wack = GNUNET_NO;
357     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
358     fc->delay.rel_value =
359         (ndelay.rel_value / fc->num_transmissions + 3 * fc->delay.rel_value) / 4;
360     fc->num_transmissions = 0;
361   }
362   GNUNET_STATISTICS_update (fc->stats,
363                             _("# fragment acknowledgements received"), 1,
364                             GNUNET_NO);
365   if (abits != (fc->acks & abits))
366   {
367     /* ID collission or message reordering, count! This should be rare! */
368     GNUNET_STATISTICS_update (fc->stats,
369                               _("# bits removed from fragmentation ACKs"), 1,
370                               GNUNET_NO);
371   }
372   fc->acks = abits & fc->acks_mask;
373   if (0 != fc->acks)
374   {
375     /* more to transmit, do so right now (if tracker permits...) */
376     if (fc->task != GNUNET_SCHEDULER_NO_TASK)
377     {
378       /* schedule next transmission now, no point in waiting... */
379       GNUNET_SCHEDULER_cancel (fc->task);
380       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
381     }
382     else
383     {
384       /* only case where there is no task should be if we're waiting
385        * for the right to transmit again (proc_busy set to YES) */
386       GNUNET_assert (GNUNET_YES == fc->proc_busy);
387     }
388     return GNUNET_NO;
389   }
390
391   /* all done */
392   GNUNET_STATISTICS_update (fc->stats,
393                             _("# fragmentation transmissions completed"), 1,
394                             GNUNET_NO);
395   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
396   {
397     GNUNET_SCHEDULER_cancel (fc->task);
398     fc->task = GNUNET_SCHEDULER_NO_TASK;
399   }
400   return GNUNET_OK;
401 }
402
403
404 /**
405  * Destroy the given fragmentation context (stop calling 'proc', free
406  * resources).
407  *
408  * @param fc fragmentation context
409  * @return average delay between transmission and ACK for the
410  *         last message, FOREVER if the message was not fully transmitted
411  */
412 struct GNUNET_TIME_Relative
413 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
414 {
415   struct GNUNET_TIME_Relative ret;
416
417   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
418     GNUNET_SCHEDULER_cancel (fc->task);
419   ret = fc->delay;
420   GNUNET_free (fc);
421   return ret;
422 }
423
424
425 /* end of fragmentation.c */