fix memleak
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Absolute minimum delay we impose between sending and expecting ACK to arrive.
33  */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
35
36
37 /**
38  * Fragmentation context.
39  */
40 struct GNUNET_FRAGMENT_Context
41 {
42   /**
43    * Statistics to use.
44    */
45   struct GNUNET_STATISTICS_Handle *stats;
46
47   /**
48    * Tracker for flow control.
49    */
50   struct GNUNET_BANDWIDTH_Tracker *tracker;
51
52   /**
53    * Current expected delay for ACKs.
54    */
55   struct GNUNET_TIME_Relative ack_delay;
56
57   /**
58    * Current expected delay between messages.
59    */
60   struct GNUNET_TIME_Relative msg_delay;
61
62   /**
63    * Next allowed transmission time.
64    */
65   struct GNUNET_TIME_Absolute delay_until;
66
67   /**
68    * Time we transmitted the last message of the last round.
69    */
70   struct GNUNET_TIME_Absolute last_round;
71
72   /**
73    * Message to fragment (allocated at the end of this struct).
74    */
75   const struct GNUNET_MessageHeader *msg;
76
77   /**
78    * Function to call for transmissions.
79    */
80   GNUNET_FRAGMENT_MessageProcessor proc;
81
82   /**
83    * Closure for 'proc'.
84    */
85   void *proc_cls;
86
87   /**
88    * Bitfield, set to 1 for each unacknowledged fragment.
89    */
90   uint64_t acks;
91
92   /**
93    * Bitfield with all possible bits for 'acks' (used to mask the
94    * ack we get back).
95    */
96   uint64_t acks_mask;
97
98   /**
99    * Task performing work for the fragmenter.
100    */
101   GNUNET_SCHEDULER_TaskIdentifier task;
102
103   /**
104    * Our fragmentation ID. (chosen at random)
105    */
106   uint32_t fragment_id;
107
108   /**
109    * Round-robin selector for the next transmission.
110    */
111   unsigned int next_transmission;
112
113   /**
114    * How many rounds of transmission have we completed so far?
115    */
116   unsigned int num_rounds;
117
118   /**
119    * How many transmission have we completed in this round?
120    */
121   unsigned int num_transmissions;
122
123   /**
124    * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
125    */
126   int8_t proc_busy;
127
128   /**
129    * GNUNET_YES if we are waiting for an ACK.
130    */
131   int8_t wack;
132
133   /**
134    * Target fragment size.
135    */
136   uint16_t mtu;
137
138 };
139
140
141 /**
142  * Transmit the next fragment to the other peer.
143  *
144  * @param cls the 'struct GNUNET_FRAGMENT_Context'
145  * @param tc scheduler context
146  */
147 static void
148 transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
149 {
150   struct GNUNET_FRAGMENT_Context *fc = cls;
151   char msg[fc->mtu];
152   const char *mbuf;
153   struct FragmentHeader *fh;
154   struct GNUNET_TIME_Relative delay;
155   unsigned int bit;
156   size_t size;
157   size_t fsize;
158   int wrap;
159
160   fc->task = GNUNET_SCHEDULER_NO_TASK;
161   GNUNET_assert (GNUNET_NO == fc->proc_busy);
162   if (0 == fc->acks)
163     return;                     /* all done */
164   /* calculate delay */
165   wrap = 0;
166   while (0 == (fc->acks & (1LL << fc->next_transmission)))
167   {
168     fc->next_transmission = (fc->next_transmission + 1) % 64;
169     wrap |= (0 == fc->next_transmission);
170   }
171   bit = fc->next_transmission;
172   size = ntohs (fc->msg->size);
173   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
174     fsize =
175         (size % (fc->mtu - sizeof (struct FragmentHeader))) +
176         sizeof (struct FragmentHeader);
177   else
178     fsize = fc->mtu;
179   if (NULL != fc->tracker)
180     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
181   else
182     delay = GNUNET_TIME_UNIT_ZERO;
183   if (delay.rel_value > 0)
184   {
185     fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
186     return;
187   }
188   fc->next_transmission = (fc->next_transmission + 1) % 64;
189   wrap |= (0 == fc->next_transmission);
190   while (0 == (fc->acks & (1LL << fc->next_transmission)))
191   {
192     fc->next_transmission = (fc->next_transmission + 1) % 64;
193     wrap |= (0 == fc->next_transmission);
194   }
195
196   /* assemble fragmentation message */
197   mbuf = (const char *) &fc[1];
198   fh = (struct FragmentHeader *) msg;
199   fh->header.size = htons (fsize);
200   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
201   fh->fragment_id = htonl (fc->fragment_id);
202   fh->total_size = fc->msg->size;       /* already in big-endian */
203   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
204   memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
205           fsize - sizeof (struct FragmentHeader));
206   if (NULL != fc->tracker)
207     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
208   GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1,
209                             GNUNET_NO);
210   if (0 != fc->last_round.abs_value)
211     GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1,
212                               GNUNET_NO);
213
214   /* select next message to calculate delay */
215   bit = fc->next_transmission;
216   size = ntohs (fc->msg->size);
217   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
218     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
219   else
220     fsize = fc->mtu;
221   if (NULL != fc->tracker)
222     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
223   else
224     delay = GNUNET_TIME_UNIT_ZERO;
225   delay = GNUNET_TIME_relative_max (delay,
226                                     GNUNET_TIME_relative_multiply (fc->msg_delay,
227                                                                    (1 << fc->num_rounds)));
228   if (wrap)
229   {
230     /* full round transmitted wait 2x delay for ACK before going again */
231     fc->num_rounds++;
232     delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2);
233     /* never use zero, need some time for ACK always */
234     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);    
235     fc->wack = GNUNET_YES;
236     fc->last_round = GNUNET_TIME_absolute_get ();
237     GNUNET_STATISTICS_update (fc->stats, _("# fragments wrap arounds"), 1,
238                               GNUNET_NO);
239   }
240   fc->proc_busy = GNUNET_YES;
241   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
242   fc->num_transmissions++;
243   fc->proc (fc->proc_cls, &fh->header);
244 }
245
246
247 /**
248  * Create a fragmentation context for the given message.
249  * Fragments the message into fragments of size "mtu" or
250  * less.  Calls 'proc' on each un-acknowledged fragment,
251  * using both the expected 'delay' between messages and
252  * acknowledgements and the given 'tracker' to guide the
253  * frequency of calls to 'proc'.
254  *
255  * @param stats statistics context
256  * @param mtu the maximum message size for each fragment
257  * @param tracker bandwidth tracker to use for flow control (can be NULL)
258  * @param msg_delay initial delay to insert between fragment transmissions
259  *              based on previous messages
260  * @param ack_delay expected delay between fragment transmission
261  *              and ACK based on previous messages
262  * @param msg the message to fragment
263  * @param proc function to call for each fragment to transmit
264  * @param proc_cls closure for proc
265  * @return the fragmentation context
266  */
267 struct GNUNET_FRAGMENT_Context *
268 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
269                                 uint16_t mtu,
270                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
271                                 struct GNUNET_TIME_Relative msg_delay,
272                                 struct GNUNET_TIME_Relative ack_delay,
273                                 const struct GNUNET_MessageHeader *msg,
274                                 GNUNET_FRAGMENT_MessageProcessor proc,
275                                 void *proc_cls)
276 {
277   struct GNUNET_FRAGMENT_Context *fc;
278   size_t size;
279   uint64_t bits;
280   
281   GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
282   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
283   size = ntohs (msg->size);
284   GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"),
285                             size, GNUNET_NO);
286   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
287   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
288   fc->stats = stats;
289   fc->mtu = mtu;
290   fc->tracker = tracker;
291   fc->ack_delay = ack_delay;
292   fc->msg_delay = msg_delay;
293   fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
294   fc->proc = proc;
295   fc->proc_cls = proc_cls;
296   fc->fragment_id =
297       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
298   memcpy (&fc[1], msg, size);
299   bits =
300       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
301                                                            sizeof (struct
302                                                                    FragmentHeader));
303   GNUNET_assert (bits <= 64);
304   if (bits == 64)
305     fc->acks_mask = UINT64_MAX; /* set all 64 bit */
306   else
307     fc->acks_mask = (1LL << bits) - 1;  /* set lowest 'bits' bit */
308   fc->acks = fc->acks_mask;
309   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
310   return fc;
311 }
312
313
314 /**
315  * Continuation to call from the 'proc' function after the fragment
316  * has been transmitted (and hence the next fragment can now be
317  * given to proc).
318  *
319  * @param fc fragmentation context
320  */
321 void
322 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
323 {
324   GNUNET_assert (fc->proc_busy == GNUNET_YES);
325   fc->proc_busy = GNUNET_NO;
326   GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
327   fc->task =
328       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
329                                     (fc->delay_until), &transmit_next, fc);
330 }
331
332
333 /**
334  * Process an acknowledgement message we got from the other
335  * side (to control re-transmits).
336  *
337  * @param fc fragmentation context
338  * @param msg acknowledgement message we received
339  * @return GNUNET_OK if this ack completes the work of the 'fc'
340  *                   (all fragments have been received);
341  *         GNUNET_NO if more messages are pending
342  *         GNUNET_SYSERR if this ack is not valid for this fc
343  */
344 int
345 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
346                              const struct GNUNET_MessageHeader *msg)
347 {
348   const struct FragmentAcknowledgement *fa;
349   uint64_t abits;
350   struct GNUNET_TIME_Relative ndelay;
351   unsigned int ack_cnt;
352   unsigned int snd_cnt;
353   unsigned int i;
354
355   if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
356   {
357     GNUNET_break_op (0);
358     return GNUNET_SYSERR;
359   }
360   fa = (const struct FragmentAcknowledgement *) msg;
361   if (ntohl (fa->fragment_id) != fc->fragment_id)
362     return GNUNET_SYSERR;       /* not our ACK */
363   abits = GNUNET_ntohll (fa->bits);
364   if ( (GNUNET_YES == fc->wack) &&
365        (0 != fc->num_transmissions) )
366   {
367     /* normal ACK, can update running average of delay... */
368     fc->wack = GNUNET_NO;
369     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
370     fc->ack_delay.rel_value =
371         (ndelay.rel_value / fc->num_transmissions + 3 * fc->ack_delay.rel_value) / 4;    
372     fc->num_transmissions = 0;
373     /* calculate ratio msg sent vs. msg acked */
374     ack_cnt = 0;
375     snd_cnt = 0;
376     for (i=0;i<64;i++)
377     {
378       if (1 == (fc->acks_mask & (1 << i)))
379       {
380         snd_cnt++;
381         if (0 == (abits & (1 << i)))
382           ack_cnt++;
383       }
384     }
385     if (0 == ack_cnt)
386     {
387       /* complete loss */
388       fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay, 
389                                                      snd_cnt);      
390     }
391     else if (snd_cnt > ack_cnt)
392     {
393       /* some loss, slow down proportionally */
394       fprintf (stderr, "Prop loss\n");
395       fc->msg_delay.rel_value = ((fc->msg_delay.rel_value * ack_cnt) / snd_cnt);
396     }
397     else if (1 < fc->msg_delay.rel_value)
398     {
399       fc->msg_delay.rel_value--; /* try a bit faster */
400     }
401     fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
402                                               GNUNET_TIME_UNIT_SECONDS);
403   }
404   GNUNET_STATISTICS_update (fc->stats,
405                             _("# fragment acknowledgements received"), 1,
406                             GNUNET_NO);
407   if (abits != (fc->acks & abits))
408   {
409     /* ID collission or message reordering, count! This should be rare! */
410     GNUNET_STATISTICS_update (fc->stats,
411                               _("# bits removed from fragmentation ACKs"), 1,
412                               GNUNET_NO);
413   }
414   fc->acks = abits & fc->acks_mask;
415   if (0 != fc->acks)
416   {
417     /* more to transmit, do so right now (if tracker permits...) */
418     if (fc->task != GNUNET_SCHEDULER_NO_TASK)
419     {
420       /* schedule next transmission now, no point in waiting... */
421       GNUNET_SCHEDULER_cancel (fc->task);
422       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
423     }
424     else
425     {
426       /* only case where there is no task should be if we're waiting
427        * for the right to transmit again (proc_busy set to YES) */
428       GNUNET_assert (GNUNET_YES == fc->proc_busy);
429     }
430     return GNUNET_NO;
431   }
432
433   /* all done */
434   GNUNET_STATISTICS_update (fc->stats,
435                             _("# fragmentation transmissions completed"), 1,
436                             GNUNET_NO);
437   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
438   {
439     GNUNET_SCHEDULER_cancel (fc->task);
440     fc->task = GNUNET_SCHEDULER_NO_TASK;
441   }
442   return GNUNET_OK;
443 }
444
445
446 /**
447  * Destroy the given fragmentation context (stop calling 'proc', free
448  * resources).
449  *
450  * @param fc fragmentation context
451  * @param msg_delay where to store average delay between individual message transmissions the
452  *         last message (OUT only)
453  * @param ack_delay where to store average delay between transmission and ACK for the
454  *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
455  */
456 void
457 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
458                                  struct GNUNET_TIME_Relative *msg_delay,
459                                  struct GNUNET_TIME_Relative *ack_delay)
460 {
461   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
462     GNUNET_SCHEDULER_cancel (fc->task);
463   if (NULL != ack_delay)
464     *ack_delay = fc->ack_delay;
465   if (NULL != msg_delay)
466     *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay,
467                                                 fc->num_rounds);
468   GNUNET_free (fc);
469 }
470
471
472 /* end of fragmentation.c */