77b4c2e495cafb4287ec3af4d25420435b04141c
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation_new.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Fragmentation context.
33  */
34 struct GNUNET_FRAGMENT_Context
35 {
36   /**
37    * Statistics to use.
38    */
39   struct GNUNET_STATISTICS_Handle *stats;
40
41   /**
42    * Tracker for flow control.
43    */
44   struct GNUNET_BANDWIDTH_Tracker *tracker;
45
46   /**
47    * Current expected delay for ACKs.
48    */
49   struct GNUNET_TIME_Relative delay;
50
51   /**
52    * Next allowed transmission time.
53    */
54   struct GNUNET_TIME_Absolute delay_until;
55
56   /**
57    * Time we transmitted the last message of the last round.
58    */
59   struct GNUNET_TIME_Absolute last_round;
60
61   /**
62    * Message to fragment (allocated at the end of this struct).
63    */
64   const struct GNUNET_MessageHeader *msg;
65
66   /**
67    * Function to call for transmissions.
68    */
69   GNUNET_FRAGMENT_MessageProcessor proc;
70
71   /**
72    * Closure for 'proc'.
73    */
74   void *proc_cls;
75
76   /**
77    * Bitfield, set to 1 for each unacknowledged fragment.
78    */
79   uint64_t acks;
80
81   /**
82    * Task performing work for the fragmenter.
83    */
84   GNUNET_SCHEDULER_TaskIdentifier task;
85
86   /**
87    * Our fragmentation ID. (chosen at random)
88    */
89   uint32_t fragment_id;
90
91   /**
92    * Round-robin selector for the next transmission.
93    */
94   unsigned int next_transmission;
95
96   /**
97    * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
98    */
99   int8_t proc_busy;
100
101   /**
102    * GNUNET_YES if we are waiting for an ACK.
103    */
104   int8_t wack;
105
106   /**
107    * Target fragment size.
108    */
109   uint16_t mtu;
110   
111 };
112
113
114 /**
115  * Transmit the next fragment to the other peer.
116  *
117  * @param cls the 'struct GNUNET_FRAGMENT_Context'
118  * @param tc scheduler context
119  */
120 static void
121 transmit_next (void *cls,
122                const struct GNUNET_SCHEDULER_TaskContext *tc)
123 {
124   struct GNUNET_FRAGMENT_Context *fc = cls;
125   char msg[fc->mtu];
126   const char *mbuf;
127   struct FragmentHeader *fh;
128   struct GNUNET_TIME_Relative delay;
129   unsigned int bit;
130   size_t size;
131   size_t fsize;
132   int wrap;
133
134   fc->task = GNUNET_SCHEDULER_NO_TASK;
135   GNUNET_assert (GNUNET_NO == fc->proc_busy);
136   if (0 == fc->acks)
137     return; /* all done */
138
139   /* calculate delay */
140   wrap = 0;
141   while (0 == (fc->acks & (1LL << fc->next_transmission)))    
142     {
143       fc->next_transmission = (fc->next_transmission + 1) % 64;
144       wrap |= (fc->next_transmission == 0);
145     }
146   bit = fc->next_transmission;
147   size = ntohs (fc->msg->size);
148   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
149     fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
150   else
151     fsize = fc->mtu;
152   if (fc->tracker != NULL)
153     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
154                                                 fsize);
155   else
156     delay = GNUNET_TIME_UNIT_ZERO;
157   if (delay.rel_value > 0)
158     {
159       fc->task = GNUNET_SCHEDULER_add_delayed (delay,
160                                                &transmit_next,
161                                                fc);
162       return;
163     }
164   fc->next_transmission = (fc->next_transmission + 1) % 64;
165   wrap |= (fc->next_transmission == 0);
166
167   /* assemble fragmentation message */
168   mbuf = (const char*) &fc[1];
169   fh = (struct FragmentHeader*) msg;
170   fh->header.size = htons (fsize);
171   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
172   fh->fragment_id = htonl (fc->fragment_id);
173   fh->total_size = fc->msg->size; /* already in big-endian */
174   fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
175   memcpy (&fh[1],
176           &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], 
177           fsize - sizeof (struct FragmentHeader));
178   if (NULL != fc->tracker)
179     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);    
180   GNUNET_STATISTICS_update (fc->stats,
181                             _("# fragments transmitted"),
182                             1, GNUNET_NO);
183   if (0 != fc->last_round.abs_value)
184     GNUNET_STATISTICS_update (fc->stats,
185                               _("# fragments retransmitted"),
186                               1, GNUNET_NO);
187
188   /* select next message to calculate delay */
189   bit = fc->next_transmission;
190   size = ntohs (fc->msg->size);
191   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
192     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
193   else
194     fsize = fc->mtu;
195   if (NULL != fc->tracker)
196     delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
197                                                 fsize);
198   else
199     delay = GNUNET_TIME_UNIT_ZERO;
200   if (wrap)
201     {
202       /* full round transmitted wait 2x delay for ACK before going again */
203       delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
204                                         fc->delay);
205       /* never use zero, need some time for ACK always */
206       delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS,
207                                         delay);
208       fc->last_round = GNUNET_TIME_absolute_get ();
209       fc->wack = GNUNET_YES;
210     }
211   fc->proc_busy = GNUNET_YES;
212   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
213   fc->proc (fc->proc_cls, &fh->header);
214 }
215
216
217 /**
218  * Create a fragmentation context for the given message.
219  * Fragments the message into fragments of size "mtu" or
220  * less.  Calls 'proc' on each un-acknowledged fragment,
221  * using both the expected 'delay' between messages and
222  * acknowledgements and the given 'tracker' to guide the
223  * frequency of calls to 'proc'.
224  *
225  * @param stats statistics context
226  * @param mtu the maximum message size for each fragment
227  * @param tracker bandwidth tracker to use for flow control (can be NULL)
228  * @param delay expected delay between fragment transmission
229  *              and ACK based on previous messages
230  * @param msg the message to fragment
231  * @param proc function to call for each fragment to transmit
232  * @param proc_cls closure for proc
233  * @return the fragmentation context
234  */
235 struct GNUNET_FRAGMENT_Context *
236 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
237                                 uint16_t mtu,
238                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
239                                 struct GNUNET_TIME_Relative delay,
240                                 const struct GNUNET_MessageHeader *msg,
241                                 GNUNET_FRAGMENT_MessageProcessor proc,
242                                 void *proc_cls)
243 {
244   struct GNUNET_FRAGMENT_Context *fc;
245   size_t size;
246   uint64_t bits;
247   
248   GNUNET_STATISTICS_update (stats,
249                             _("# messages fragmented"),
250                             1, GNUNET_NO);
251   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
252   size = ntohs (msg->size);
253   GNUNET_STATISTICS_update (stats,
254                             _("# total size of fragmented messages"),
255                             size, GNUNET_NO);
256   //GNUNET_assert (size > mtu);
257   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
258   fc->stats = stats;
259   fc->mtu = mtu;
260   fc->tracker = tracker;
261   fc->delay = delay;
262   fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
263   fc->proc = proc;
264   fc->proc_cls = proc_cls;
265   fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
266                                               UINT32_MAX);
267   memcpy (&fc[1], msg, size);
268   bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader));
269   GNUNET_assert (bits <= 64);
270   if (bits == 64)
271     fc->acks = UINT64_MAX;      /* set all 64 bit */
272   else
273     fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */
274   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
275                                        fc);
276   return fc;
277 }
278
279
280 /**
281  * Continuation to call from the 'proc' function after the fragment
282  * has been transmitted (and hence the next fragment can now be
283  * given to proc).
284  *
285  * @param fc fragmentation context
286  */
287 void
288 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
289 {
290   GNUNET_assert (fc->proc_busy == GNUNET_YES);
291   fc->proc_busy = GNUNET_NO;
292   GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
293   fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (fc->delay_until),
294                                            &transmit_next,
295                                            fc);
296 }
297
298
299 /**
300  * Process an acknowledgement message we got from the other
301  * side (to control re-transmits).
302  *
303  * @param fc fragmentation context
304  * @param msg acknowledgement message we received
305  * @return GNUNET_OK if this ack completes the work of the 'fc'
306  *                   (all fragments have been received);
307  *         GNUNET_NO if more messages are pending
308  *         GNUNET_SYSERR if this ack is not valid for this fc
309  */
310 int 
311 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
312                              const struct GNUNET_MessageHeader *msg)
313 {
314   const struct FragmentAcknowledgement *fa;
315   uint64_t abits;
316   struct GNUNET_TIME_Relative ndelay;
317
318   if (sizeof (struct FragmentAcknowledgement) !=
319       ntohs (msg->size))
320     {
321       GNUNET_break_op (0);
322       return GNUNET_SYSERR;
323     }
324   fa = (const struct FragmentAcknowledgement *) msg;
325   if (ntohl (fa->fragment_id) != fc->fragment_id)
326     return GNUNET_SYSERR; /* not our ACK */
327   abits = GNUNET_ntohll (fa->bits);
328   if (GNUNET_YES == fc->wack)
329     {
330       /* normal ACK, can update running average of delay... */
331       fc->wack = GNUNET_NO;
332       ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
333       fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
334     }
335   GNUNET_STATISTICS_update (fc->stats,
336                             _("# fragment acknowledgements received"),
337                             1,
338                             GNUNET_NO);
339   if (abits != (fc->acks & abits))
340     {
341       /* ID collission or message reordering, count! This should be rare! */
342       GNUNET_STATISTICS_update (fc->stats,
343                                 _("# bits removed from fragmentation ACKs"),
344                                 1, GNUNET_NO);
345     }
346   fc->acks = abits;
347   if (0 != fc->acks)
348     {
349       /* more to transmit, do so right now (if tracker permits...) */
350       if (fc->task != GNUNET_SCHEDULER_NO_TASK)
351         {
352           /* schedule next transmission now, no point in waiting... */
353           GNUNET_SCHEDULER_cancel (fc->task);
354           fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
355                                                fc);
356         }
357       else
358         {
359           /* only case where there is no task should be if we're waiting
360              for the right to transmit again (proc_busy set to YES) */
361           GNUNET_assert (GNUNET_YES == fc->proc_busy);
362         }
363       return GNUNET_NO;
364     }
365
366   /* all done */
367   GNUNET_STATISTICS_update (fc->stats,
368                             _("# fragmentation transmissions completed"),
369                             1,
370                             GNUNET_NO);
371   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
372     {
373       GNUNET_SCHEDULER_cancel (fc->task);
374       fc->task = GNUNET_SCHEDULER_NO_TASK;
375     }
376   return GNUNET_OK;
377 }
378
379
380 /**
381  * Destroy the given fragmentation context (stop calling 'proc', free
382  * resources).
383  *
384  * @param fc fragmentation context
385  * @return average delay between transmission and ACK for the
386  *         last message, FOREVER if the message was not fully transmitted
387  */
388 struct GNUNET_TIME_Relative
389 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
390 {
391   struct GNUNET_TIME_Relative ret;
392
393   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
394     GNUNET_SCHEDULER_cancel (fc->task);
395   ret = fc->delay;
396   GNUNET_free (fc);
397   return ret;
398 }
399
400
401 /* end of fragmentation_new.c */
402