consider number of retransmissions for delay calculation
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Fragmentation context.
33  */
34 struct GNUNET_FRAGMENT_Context
35 {
36   /**
37    * Statistics to use.
38    */
39   struct GNUNET_STATISTICS_Handle *stats;
40
41   /**
42    * Tracker for flow control.
43    */
44   struct GNUNET_BANDWIDTH_Tracker *tracker;
45
46   /**
47    * Current expected delay for ACKs.
48    */
49   struct GNUNET_TIME_Relative delay;
50
51   /**
52    * Next allowed transmission time.
53    */
54   struct GNUNET_TIME_Absolute delay_until;
55
56   /**
57    * Time we transmitted the last message of the last round.
58    */
59   struct GNUNET_TIME_Absolute last_round;
60
61   /**
62    * Message to fragment (allocated at the end of this struct).
63    */
64   const struct GNUNET_MessageHeader *msg;
65
66   /**
67    * Function to call for transmissions.
68    */
69   GNUNET_FRAGMENT_MessageProcessor proc;
70
71   /**
72    * Closure for 'proc'.
73    */
74   void *proc_cls;
75
76   /**
77    * Bitfield, set to 1 for each unacknowledged fragment.
78    */
79   uint64_t acks;
80
81   /**
82    * Bitfield with all possible bits for 'acks' (used to mask the
83    * ack we get back).
84    */
85   uint64_t acks_mask;
86
87   /**
88    * Task performing work for the fragmenter.
89    */
90   GNUNET_SCHEDULER_TaskIdentifier task;
91
92   /**
93    * Our fragmentation ID. (chosen at random)
94    */
95   uint32_t fragment_id;
96
97   /**
98    * Round-robin selector for the next transmission.
99    */
100   unsigned int next_transmission;
101
102   /**
103    * How many rounds of transmission have we completed so far?
104    */
105   unsigned int num_rounds;
106
107   /**
108    * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
109    */
110   int8_t proc_busy;
111
112   /**
113    * GNUNET_YES if we are waiting for an ACK.
114    */
115   int8_t wack;
116
117   /**
118    * Target fragment size.
119    */
120   uint16_t mtu;  
121
122 };
123
124
125 /**
126  * Transmit the next fragment to the other peer.
127  *
128  * @param cls the 'struct GNUNET_FRAGMENT_Context'
129  * @param tc scheduler context
130  */
131 static void
132 transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
133 {
134   struct GNUNET_FRAGMENT_Context *fc = cls;
135   char msg[fc->mtu];
136   const char *mbuf;
137   struct FragmentHeader *fh;
138   struct GNUNET_TIME_Relative delay;
139   unsigned int bit;
140   size_t size;
141   size_t fsize;
142   int wrap;
143
144   fc->task = GNUNET_SCHEDULER_NO_TASK;
145   GNUNET_assert (GNUNET_NO == fc->proc_busy);
146   if (0 == fc->acks)
147     return;                     /* all done */
148
149   /* calculate delay */
150   wrap = 0;
151   while (0 == (fc->acks & (1LL << fc->next_transmission)))
152   {
153     fc->next_transmission = (fc->next_transmission + 1) % 64;
154     wrap |= (fc->next_transmission == 0);
155   }
156   bit = fc->next_transmission;
157   size = ntohs (fc->msg->size);
158   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
159     fsize =
160         (size % (fc->mtu - sizeof (struct FragmentHeader))) +
161         sizeof (struct FragmentHeader);
162   else
163     fsize = fc->mtu;
164   if (fc->tracker != NULL)
165     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
166   else
167     delay = GNUNET_TIME_UNIT_ZERO;
168   if (delay.rel_value > 0)
169   {
170     fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
171     return;
172   }
173   fc->next_transmission = (fc->next_transmission + 1) % 64;
174   wrap |= (fc->next_transmission == 0);
175
176   /* assemble fragmentation message */
177   mbuf = (const char *) &fc[1];
178   fh = (struct FragmentHeader *) msg;
179   fh->header.size = htons (fsize);
180   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
181   fh->fragment_id = htonl (fc->fragment_id);
182   fh->total_size = fc->msg->size;       /* already in big-endian */
183   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
184   memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
185           fsize - sizeof (struct FragmentHeader));
186   if (NULL != fc->tracker)
187     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
188   GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1,
189                             GNUNET_NO);
190   if (0 != fc->last_round.abs_value)
191     GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1,
192                               GNUNET_NO);
193
194   /* select next message to calculate delay */
195   bit = fc->next_transmission;
196   size = ntohs (fc->msg->size);
197   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
198     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
199   else
200     fsize = fc->mtu;
201   if (NULL != fc->tracker)
202     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
203   else
204     delay = GNUNET_TIME_UNIT_ZERO;
205   if (wrap)
206   {
207     /* full round transmitted wait 2x delay for ACK before going again */
208     fc->num_rounds++;
209     delay =
210         GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
211                                   GNUNET_TIME_relative_multiply (fc->delay, fc->num_rounds));
212     /* never use zero, need some time for ACK always */
213     delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS, delay);
214     fc->last_round = GNUNET_TIME_absolute_get ();
215     fc->wack = GNUNET_YES;
216   }
217   fc->proc_busy = GNUNET_YES;
218   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
219   fc->proc (fc->proc_cls, &fh->header);
220 }
221
222
223 /**
224  * Create a fragmentation context for the given message.
225  * Fragments the message into fragments of size "mtu" or
226  * less.  Calls 'proc' on each un-acknowledged fragment,
227  * using both the expected 'delay' between messages and
228  * acknowledgements and the given 'tracker' to guide the
229  * frequency of calls to 'proc'.
230  *
231  * @param stats statistics context
232  * @param mtu the maximum message size for each fragment
233  * @param tracker bandwidth tracker to use for flow control (can be NULL)
234  * @param delay expected delay between fragment transmission
235  *              and ACK based on previous messages
236  * @param msg the message to fragment
237  * @param proc function to call for each fragment to transmit
238  * @param proc_cls closure for proc
239  * @return the fragmentation context
240  */
241 struct GNUNET_FRAGMENT_Context *
242 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
243                                 uint16_t mtu,
244                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
245                                 struct GNUNET_TIME_Relative delay,
246                                 const struct GNUNET_MessageHeader *msg,
247                                 GNUNET_FRAGMENT_MessageProcessor proc,
248                                 void *proc_cls)
249 {
250   struct GNUNET_FRAGMENT_Context *fc;
251   size_t size;
252   uint64_t bits;
253
254   GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
255   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
256   size = ntohs (msg->size);
257   GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"),
258                             size, GNUNET_NO);
259   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
260   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
261   fc->stats = stats;
262   fc->mtu = mtu;
263   fc->tracker = tracker;
264   fc->delay = delay;
265   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
266   fc->proc = proc;
267   fc->proc_cls = proc_cls;
268   fc->fragment_id =
269       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
270   memcpy (&fc[1], msg, size);
271   bits =
272       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
273                                                            sizeof (struct
274                                                                    FragmentHeader));
275   GNUNET_assert (bits <= 64);
276   if (bits == 64)
277     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
278   else
279     fc->acks_mask = (1LL << bits) - 1;  /* set lowest 'bits' bit */
280   fc->acks = fc->acks_mask;
281   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
282   return fc;
283 }
284
285
286 /**
287  * Continuation to call from the 'proc' function after the fragment
288  * has been transmitted (and hence the next fragment can now be
289  * given to proc).
290  *
291  * @param fc fragmentation context
292  */
293 void
294 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
295 {
296   GNUNET_assert (fc->proc_busy == GNUNET_YES);
297   fc->proc_busy = GNUNET_NO;
298   GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
299   fc->task =
300       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
301                                     (fc->delay_until), &transmit_next, fc);
302 }
303
304
305 /**
306  * Process an acknowledgement message we got from the other
307  * side (to control re-transmits).
308  *
309  * @param fc fragmentation context
310  * @param msg acknowledgement message we received
311  * @return GNUNET_OK if this ack completes the work of the 'fc'
312  *                   (all fragments have been received);
313  *         GNUNET_NO if more messages are pending
314  *         GNUNET_SYSERR if this ack is not valid for this fc
315  */
316 int
317 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
318                              const struct GNUNET_MessageHeader *msg)
319 {
320   const struct FragmentAcknowledgement *fa;
321   uint64_t abits;
322   struct GNUNET_TIME_Relative ndelay;
323
324   if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
325   {
326     GNUNET_break_op (0);
327     return GNUNET_SYSERR;
328   }
329   fa = (const struct FragmentAcknowledgement *) msg;
330   if (ntohl (fa->fragment_id) != fc->fragment_id)
331     return GNUNET_SYSERR;       /* not our ACK */
332   abits = GNUNET_ntohll (fa->bits);
333   if ( (GNUNET_YES == fc->wack) &&
334        (abits == (fc->acks & abits)) )
335   {
336     /* normal ACK, can update running average of delay... */
337     fc->wack = GNUNET_NO;
338     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
339     fc->delay.rel_value = (ndelay.rel_value * fc->num_rounds + 3 * fc->delay.rel_value) / 4;
340   }
341   GNUNET_STATISTICS_update (fc->stats,
342                             _("# fragment acknowledgements received"), 1,
343                             GNUNET_NO);
344   if (abits != (fc->acks & abits))
345   {
346     /* ID collission or message reordering, count! This should be rare! */
347     GNUNET_STATISTICS_update (fc->stats,
348                               _("# bits removed from fragmentation ACKs"), 1,
349                               GNUNET_NO);
350   }
351   fc->acks = abits & fc->acks_mask;
352   if (0 != fc->acks)
353   {
354     /* more to transmit, do so right now (if tracker permits...) */
355     if (fc->task != GNUNET_SCHEDULER_NO_TASK)
356     {
357       /* schedule next transmission now, no point in waiting... */
358       GNUNET_SCHEDULER_cancel (fc->task);
359       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
360     }
361     else
362     {
363       /* only case where there is no task should be if we're waiting
364        * for the right to transmit again (proc_busy set to YES) */
365       GNUNET_assert (GNUNET_YES == fc->proc_busy);
366     }
367     return GNUNET_NO;
368   }
369
370   /* all done */
371   GNUNET_STATISTICS_update (fc->stats,
372                             _("# fragmentation transmissions completed"), 1,
373                             GNUNET_NO);
374   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
375   {
376     GNUNET_SCHEDULER_cancel (fc->task);
377     fc->task = GNUNET_SCHEDULER_NO_TASK;
378   }
379   return GNUNET_OK;
380 }
381
382
383 /**
384  * Destroy the given fragmentation context (stop calling 'proc', free
385  * resources).
386  *
387  * @param fc fragmentation context
388  * @return average delay between transmission and ACK for the
389  *         last message, FOREVER if the message was not fully transmitted
390  */
391 struct GNUNET_TIME_Relative
392 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
393 {
394   struct GNUNET_TIME_Relative ret;
395
396   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
397     GNUNET_SCHEDULER_cancel (fc->task);
398   ret = fc->delay;
399   GNUNET_free (fc);
400   return ret;
401 }
402
403
404 /* end of fragmentation.c */