fragging
[oweals/gnunet.git] / src / fragmentation / fragmentation_new.c
1 /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file src/fragmentation/fragmentation_new.c
22  * @brief library to help fragment messages
23  * @author Christian Grothoff
24  */
25
26 #include "platform.h"
27 #include "gnunet_fragmentation_lib.h"
28 #include "fragmentation.h"
29
30 /**
31  * Fragmentation context.
32  */
33 struct GNUNET_FRAGMENT_Context
34 {
35   /**
36    * Statistics to use.
37    */
38   struct GNUNET_STATISTICS_Handle *stats;
39
40   /**
41    * Tracker for flow control.
42    */
43   struct GNUNET_BANDWIDTH_Tracker *tracker;
44
45   /**
46    * Current expected delay for ACKs.
47    */
48   struct GNUNET_TIME_Relative delay;
49
50   /**
51    * Time we transmitted the last message of the last round.
52    */
53   struct GNUNET_TIME_Absolute last_round;
54
55   /**
56    * Message to fragment (allocated at the end of this struct).
57    */
58   const struct GNUNET_MessageHeader *msg;
59
60   /**
61    * Function to call for transmissions.
62    */
63   GNUNET_FRAGMENT_MessageProcessor proc;
64
65   /**
66    * Closure for 'proc'.
67    */
68   void *proc_cls;
69
70   /**
71    * Bitfield, set to 1 for each unacknowledged fragment.
72    */
73   uint64_t acks;
74
75   /**
76    * Task performing work for the fragmenter.
77    */
78   GNUNET_SCHEDULER_TaskIdentifier task;
79
80   /**
81    * Round-robin selector for the next transmission.
82    */
83   unsigned int next_transmission;
84
85   /**
86    * GNUNET_YES if we are waiting for an ACK.
87    */
88   int wack;
89
90   /**
91    * Target fragment size.
92    */
93   uint16_t mtu;
94   
95 };
96
97
98 /**
99  * Transmit the next fragment to the other peer.
100  *
101  * @param cls the 'struct GNUNET_FRAGMENT_Context'
102  * @param tc scheduler context
103  */
104 static void
105 transmit_next (void *cls,
106                const struct GNUNET_SCHEDULER_TaskContext *tc)
107 {
108   struct GNUNET_FRAGMENT_Context *fc = cls;
109   char msg[fc->mtu];
110   const char *mbuf;
111   struct FragmentHeader *fh;
112   struct GNUNET_TIME_Relative delay;
113   unsigned int bit;
114   size_t size;
115   size_t fsize;
116   int wrap;
117
118   fc->task = GNUNET_SCHEDULER_NO_TASK;
119   if (0 == fc->acks)
120     return; /* all done */
121
122   /* calculate delay */
123   wrap = 0;
124   while (0 == (fc->acks & (1 << fc->next_transmission)))    
125     {
126       fc->next_transmission = (fc->next_transmission + 1) % 64;
127       wrap |= (fc->next_transmission == 0);
128     }
129   bit = fc->next_transmission;
130   size = ntohs (fc->msg->size);
131   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
132     fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
133   else
134     fsize = fc->mtu;
135   delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
136                                               fsize);
137   if (delay.rel_value > 0)
138     {
139       fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
140                                                                                    fc->mtu),
141                                                &transmit_next,
142                                                fc);
143       return;
144     }
145   fc->next_transmission = (fc->next_transmission + 1) % 64;
146   wrap |= (fc->next_transmission == 0);
147
148   /* assemble fragmentation message */
149   mbuf = (const char*) &fc[1];
150   fh = (struct FragmentHeader*) msg;
151   fh->header.size = htons (fsize);
152   fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
153   /* FIXME: add specific ID info... */
154   memcpy (&fc[1],
155           &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], 
156           fsize - sizeof (struct FragmentHeader));
157   fc->proc (fc->proc_cls, &fh->header);
158   GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
159   GNUNET_STATISTICS_update (fc->stats,
160                             _("Fragments transmitted"),
161                             1, GNUNET_NO);
162   if (0 != fc->last_round.abs_value)
163     GNUNET_STATISTICS_update (fc->stats,
164                               _("Fragments retransmitted"),
165                               1, GNUNET_NO);
166
167   /* select next message to calculate delay */
168   bit = fc->next_transmission;
169   size = ntohs (fc->msg->size);
170   if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
171     fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
172   else
173     fsize = fc->mtu;
174   delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
175                                               fsize);
176   if (wrap)
177     {
178       /* full round transmitted wait 2x delay for ACK before going again */
179       delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
180                                         fc->delay);
181       fc->last_round = GNUNET_TIME_absolute_get ();
182       fc->wack = GNUNET_YES;
183     }
184   fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
185                                                                                fc->mtu),
186                                            &transmit_next,
187                                            fc);
188 }
189
190
191 /**
192  * Create a fragmentation context for the given message.
193  * Fragments the message into fragments of size "mtu" or
194  * less.  Calls 'proc' on each un-acknowledged fragment,
195  * using both the expected 'delay' between messages and
196  * acknowledgements and the given 'tracker' to guide the
197  * frequency of calls to 'proc'.
198  *
199  * @param stats statistics context
200  * @param mtu the maximum message size for each fragment
201  * @param tracker bandwidth tracker to use for flow control (can be NULL)
202  * @param delay expected delay between fragment transmission
203  *              and ACK based on previous messages
204  * @param msg the message to fragment
205  * @param proc function to call for each fragment to transmit
206  * @param proc_cls closure for proc
207  * @return the fragmentation context
208  */
209 struct GNUNET_FRAGMENT_Context *
210 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
211                                 uint16_t mtu,
212                                 struct GNUNET_BANDWIDTH_Tracker *tracker,
213                                 struct GNUNET_TIME_Relative delay,
214                                 const struct GNUNET_MessageHeader *msg,
215                                 GNUNET_FRAGMENT_MessageProcessor proc,
216                                 void *proc_cls)
217 {
218   struct GNUNET_FRAGMENT_Context *fc;
219   size_t size;
220   uint64_t bits;
221   
222   GNUNET_STATISTICS_update (stats,
223                             _("Messages fragmented"),
224                             1, GNUNET_NO);
225   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
226   size = ntohs (msg->size);
227   GNUNET_STATISTICS_update (stats,
228                             _("Total size of fragmented messages"),
229                             size, GNUNET_NO);
230   GNUNET_assert (size > mtu);
231   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
232   fc->stats = stats;
233   fc->mtu = mtu;
234   fc->tracker = tracker;
235   fc->delay = delay;
236   fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
237   fc->proc = proc;
238   fc->proc_cls = proc_cls;
239   memcpy (&fc[1], msg, size);
240   bits = (size + mtu - 1) / (mtu - sizeof (struct FragmentHeader));
241   GNUNET_assert (bits <= 64);
242   if (bits == 64)
243     fc->acks = UINT64_MAX;      /* set all 64 bit */
244   else
245     fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */
246   fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
247                                        fc);
248   return fc;
249 }
250
251
252 /**
253  * Process an acknowledgement message we got from the other
254  * side (to control re-transmits).
255  *
256  * @param fc fragmentation context
257  * @param msg acknowledgement message we received
258  * @return GNUNET_OK if this ack completes the work of the 'fc'
259  *                   (all fragments have been received);
260  *         GNUNET_NO if more messages are pending
261  *         GNUNET_SYSERR if this ack is not valid for this fc
262  */
263 int 
264 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
265                              const struct GNUNET_MessageHeader *msg)
266 {
267   const struct FragmentAcknowledgement *fa;
268   uint64_t abits;
269   struct GNUNET_TIME_Relative ndelay;
270
271   if (sizeof (struct FragmentAcknowledgement) !=
272       ntohs (msg->size))
273     {
274       GNUNET_break_op (0);
275       return GNUNET_SYSERR;
276     }
277   fa = (const struct FragmentAcknowledgement *) msg;
278   abits = GNUNET_ntohll (fa->bits);
279   /* FIXME: match FA to us... */
280
281   if (GNUNET_YES == fc->wack)
282     {
283       /* normal ACK, can update running average of delay... */
284       fc->wack = GNUNET_NO;
285       ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
286       fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
287     }
288     
289   fc->acks &= abits;
290   if (0 != fc->acks)
291     {
292       /* more to transmit, do so right now (if tracker permits...) */
293       GNUNET_SCHEDULER_cancel (fc->task);
294       fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
295                                            fc);
296       return GNUNET_NO;
297     }
298
299   /* all done */
300   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
301     {
302       GNUNET_SCHEDULER_cancel (fc->task);
303       fc->task = GNUNET_SCHEDULER_NO_TASK;
304     }
305   return GNUNET_OK;
306 }
307
308
309 /**
310  * Destroy the given fragmentation context (stop calling 'proc', free
311  * resources).
312  *
313  * @param fc fragmentation context
314  * @return average delay between transmission and ACK for the
315  *         last message, FOREVER if the message was not fully transmitted
316  */
317 struct GNUNET_TIME_Relative
318 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
319 {
320   struct GNUNET_TIME_Relative ret;
321
322   if (fc->task != GNUNET_SCHEDULER_NO_TASK)
323     GNUNET_SCHEDULER_cancel (fc->task);
324   ret = fc->delay;
325   GNUNET_free (fc);
326   return ret;
327 }
328
329 /* end of fragmentation_new.c */
330