6a2747e4d1bc8e3cf05cb7f0ef420cc4d1f35e6d
[oweals/gnunet.git] / src / fragmentation / fragmentation_new.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation_new.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29
30
31 /**
32  * Fragmentation context.
33  */
34 struct GNUNET_FRAGMENT_Context
35 {
36   /**
37    * Statistics to use.
38    */
39   struct GNUNET_STATISTICS_Handle *stats;
40
41   /**
42    * Tracker for flow control.
43    */
44   struct GNUNET_BANDWIDTH_Tracker *tracker;
45
46   /**
47    * Current expected delay for ACKs.
48    */
49   struct GNUNET_TIME_Relative delay;
50
51   /**
52    * Time we transmitted the last message of the last round.
53    */
54   struct GNUNET_TIME_Absolute last_round;
55
56   /**
57    * Message to fragment (allocated at the end of this struct).
58    */
59   const struct GNUNET_MessageHeader *msg;
60
61   /**
62    * Function to call for transmissions.
63    */
64   GNUNET_FRAGMENT_MessageProcessor proc;
65
66   /**
67    * Closure for 'proc'.
68    */
69   void *proc_cls;
70
71   /**
72    * Bitfield, set to 1 for each unacknowledged fragment.
73    */
74   uint64_t acks;
75
76   /**
77    * Task performing work for the fragmenter.
78    */
79   GNUNET_SCHEDULER_TaskIdentifier task;
80
81   /**
82    * Our fragmentation ID. (chosen at random)
83    */
84   uint32_t fragment_id;
85
86   /**
87    * Round-robin selector for the next transmission.
88    */
89   unsigned int next_transmission;
90
91   /**
92    * GNUNET_YES if we are waiting for an ACK.
93    */
94   int wack;
95
96   /**
97    * Target fragment size.
98    */
99   uint16_t mtu;
100   
101 };
102
103
104 /**
105  * Transmit the next fragment to the other peer.
106  *
107  * @param cls the 'struct GNUNET_FRAGMENT_Context'
108  * @param tc scheduler context
109  */
110 static void
111 transmit_next (void *cls,
112                const struct GNUNET_SCHEDULER_TaskContext *tc)
113 {
114   struct GNUNET_FRAGMENT_Context *fc = cls;
115   char msg[fc->mtu];
116   const char *mbuf;
117   struct FragmentHeader *fh;
118   struct GNUNET_TIME_Relative delay;
119   unsigned int bit;
120   size_t size;
121   size_t fsize;
122   int wrap;
123
124   fc->task = GNUNET_SCHEDULER_NO_TASK;
125   if (0 == fc->acks)
126     return; /* all done */
127
128   /* calculate delay */
129   wrap = 0;
130   while (0 == (fc->acks & (1 << fc->next_transmission)))    
131     {
132       fc->next_transmission = (fc->next_transmission + 1) % 64;
133       wrap |= (fc->next_transmission == 0);
134     }
135   bit = fc->next_transmission;
136   size = ntohs (fc->msg->size);
137   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
138     fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
139   else
140     fsize = fc->mtu;
141   delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
142                                               fsize);
143   if (delay.rel_value > 0)
144     {
145       fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
146                                                                                    fc->mtu),
147                                                &transmit_next,
148                                                fc);
149       return;
150     }
151   fc->next_transmission = (fc->next_transmission + 1) % 64;
152   wrap |= (fc->next_transmission == 0);
153
154   /* assemble fragmentation message */
155   mbuf = (const char*) &fc[1];
156   fh = (struct FragmentHeader*) msg;
157   fh->header.size = htons (fsize);
158   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
159   fh->fragment_id = htonl (fc->fragment_id);
160   fh->total_size = fc->msg->size; /* already in big-endian */
161   fh->offset = htons (fc->mtu * bit);
162   memcpy (&fc[1],
163           &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], 
164           fsize - sizeof (struct FragmentHeader));
165   fc->proc (fc->proc_cls, &fh->header);
166   GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
167   GNUNET_STATISTICS_update (fc->stats,
168                             _("Fragments transmitted"),
169                             1, GNUNET_NO);
170   if (0 != fc->last_round.abs_value)
171     GNUNET_STATISTICS_update (fc->stats,
172                               _("Fragments retransmitted"),
173                               1, GNUNET_NO);
174
175   /* select next message to calculate delay */
176   bit = fc->next_transmission;
177   size = ntohs (fc->msg->size);
178   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
179     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
180   else
181     fsize = fc->mtu;
182   delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
183                                               fsize);
184   if (wrap)
185     {
186       /* full round transmitted wait 2x delay for ACK before going again */
187       delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
188                                         fc->delay);
189       fc->last_round = GNUNET_TIME_absolute_get ();
190       fc->wack = GNUNET_YES;
191     }
192   fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
193                                                                                fc->mtu),
194                                            &transmit_next,
195                                            fc);
196 }
197
198
199 /**
200  * Create a fragmentation context for the given message.
201  * Fragments the message into fragments of size "mtu" or
202  * less.  Calls 'proc' on each un-acknowledged fragment,
203  * using both the expected 'delay' between messages and
204  * acknowledgements and the given 'tracker' to guide the
205  * frequency of calls to 'proc'.
206  *
207  * @param stats statistics context
208  * @param mtu the maximum message size for each fragment
209  * @param tracker bandwidth tracker to use for flow control (can be NULL)
210  * @param delay expected delay between fragment transmission
211  *              and ACK based on previous messages
212  * @param msg the message to fragment
213  * @param proc function to call for each fragment to transmit
214  * @param proc_cls closure for proc
215  * @return the fragmentation context
216  */
217 struct GNUNET_FRAGMENT_Context *
218 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
219                                 uint16_t mtu,
220                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
221                                 struct GNUNET_TIME_Relative delay,
222                                 const struct GNUNET_MessageHeader *msg,
223                                 GNUNET_FRAGMENT_MessageProcessor proc,
224                                 void *proc_cls)
225 {
226   struct GNUNET_FRAGMENT_Context *fc;
227   size_t size;
228   uint64_t bits;
229   
230   GNUNET_STATISTICS_update (stats,
231                             _("Messages fragmented"),
232                             1, GNUNET_NO);
233   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
234   size = ntohs (msg->size);
235   GNUNET_STATISTICS_update (stats,
236                             _("Total size of fragmented messages"),
237                             size, GNUNET_NO);
238   GNUNET_assert (size > mtu);
239   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
240   fc->stats = stats;
241   fc->mtu = mtu;
242   fc->tracker = tracker;
243   fc->delay = delay;
244   fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
245   fc->proc = proc;
246   fc->proc_cls = proc_cls;
247   fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
248                                               UINT32_MAX);
249   memcpy (&fc[1], msg, size);
250   bits = (size + mtu - 1) / (mtu - sizeof (struct FragmentHeader));
251   GNUNET_assert (bits <= 64);
252   if (bits == 64)
253     fc->acks = UINT64_MAX;      /* set all 64 bit */
254   else
255     fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */
256   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
257                                        fc);
258   return fc;
259 }
260
261
262 /**
263  * Process an acknowledgement message we got from the other
264  * side (to control re-transmits).
265  *
266  * @param fc fragmentation context
267  * @param msg acknowledgement message we received
268  * @return GNUNET_OK if this ack completes the work of the 'fc'
269  *                   (all fragments have been received);
270  *         GNUNET_NO if more messages are pending
271  *         GNUNET_SYSERR if this ack is not valid for this fc
272  */
273 int 
274 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
275                              const struct GNUNET_MessageHeader *msg)
276 {
277   const struct FragmentAcknowledgement *fa;
278   uint64_t abits;
279   struct GNUNET_TIME_Relative ndelay;
280
281   if (sizeof (struct FragmentAcknowledgement) !=
282       ntohs (msg->size))
283     {
284       GNUNET_break_op (0);
285       return GNUNET_SYSERR;
286     }
287   fa = (const struct FragmentAcknowledgement *) msg;
288   if (ntohl (fa->fragment_id) != fc->fragment_id)
289     return GNUNET_SYSERR; /* not our ACK */
290   abits = GNUNET_ntohll (fa->bits);
291   if (GNUNET_YES == fc->wack)
292     {
293       /* normal ACK, can update running average of delay... */
294       fc->wack = GNUNET_NO;
295       ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
296       fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
297     }
298     
299   fc->acks &= abits;
300   if (0 != fc->acks)
301     {
302       /* more to transmit, do so right now (if tracker permits...) */
303       GNUNET_SCHEDULER_cancel (fc->task);
304       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
305                                            fc);
306       return GNUNET_NO;
307     }
308
309   /* all done */
310   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
311     {
312       GNUNET_SCHEDULER_cancel (fc->task);
313       fc->task = GNUNET_SCHEDULER_NO_TASK;
314     }
315   return GNUNET_OK;
316 }
317
318
319 /**
320  * Destroy the given fragmentation context (stop calling 'proc', free
321  * resources).
322  *
323  * @param fc fragmentation context
324  * @return average delay between transmission and ACK for the
325  *         last message, FOREVER if the message was not fully transmitted
326  */
327 struct GNUNET_TIME_Relative
328 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
329 {
330   struct GNUNET_TIME_Relative ret;
331
332   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
333     GNUNET_SCHEDULER_cancel (fc->task);
334   ret = fc->delay;
335   GNUNET_free (fc);
336   return ret;
337 }
338
339
340 /* end of fragmentation_new.c */
341