2 This file is part of GNUnet
3 (C) 2009, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file src/fragmentation/fragmentation_new.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
32 * Fragmentation context.
34 struct GNUNET_FRAGMENT_Context
39 struct GNUNET_STATISTICS_Handle *stats;
42 * Tracker for flow control.
44 struct GNUNET_BANDWIDTH_Tracker *tracker;
47 * Current expected delay for ACKs.
49 struct GNUNET_TIME_Relative delay;
52 * Next allowed transmission time.
54 struct GNUNET_TIME_Absolute delay_until;
57 * Time we transmitted the last message of the last round.
59 struct GNUNET_TIME_Absolute last_round;
62 * Message to fragment (allocated at the end of this struct).
64 const struct GNUNET_MessageHeader *msg;
67 * Function to call for transmissions.
69 GNUNET_FRAGMENT_MessageProcessor proc;
77 * Bitfield, set to 1 for each unacknowledged fragment.
82 * Task performing work for the fragmenter.
84 GNUNET_SCHEDULER_TaskIdentifier task;
87 * Our fragmentation ID. (chosen at random)
92 * Round-robin selector for the next transmission.
94 unsigned int next_transmission;
97 * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
102 * GNUNET_YES if we are waiting for an ACK.
107 * Target fragment size.
115 * Transmit the next fragment to the other peer.
117 * @param cls the 'struct GNUNET_FRAGMENT_Context'
118 * @param tc scheduler context
121 transmit_next (void *cls,
122 const struct GNUNET_SCHEDULER_TaskContext *tc)
124 struct GNUNET_FRAGMENT_Context *fc = cls;
127 struct FragmentHeader *fh;
128 struct GNUNET_TIME_Relative delay;
134 fc->task = GNUNET_SCHEDULER_NO_TASK;
135 GNUNET_assert (GNUNET_NO == fc->proc_busy);
137 return; /* all done */
139 /* calculate delay */
141 while (0 == (fc->acks & (1LL << fc->next_transmission)))
143 fc->next_transmission = (fc->next_transmission + 1) % 64;
144 wrap |= (fc->next_transmission == 0);
146 bit = fc->next_transmission;
147 size = ntohs (fc->msg->size);
148 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
149 fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
152 if (fc->tracker != NULL)
153 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
156 delay = GNUNET_TIME_UNIT_ZERO;
157 if (delay.rel_value > 0)
159 fc->task = GNUNET_SCHEDULER_add_delayed (delay,
164 fc->next_transmission = (fc->next_transmission + 1) % 64;
165 wrap |= (fc->next_transmission == 0);
167 /* assemble fragmentation message */
168 mbuf = (const char*) &fc[1];
169 fh = (struct FragmentHeader*) msg;
170 fh->header.size = htons (fsize);
171 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
172 fh->fragment_id = htonl (fc->fragment_id);
173 fh->total_size = fc->msg->size; /* already in big-endian */
174 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
176 &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
177 fsize - sizeof (struct FragmentHeader));
178 if (NULL != fc->tracker)
179 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
180 GNUNET_STATISTICS_update (fc->stats,
181 _("# fragments transmitted"),
183 if (0 != fc->last_round.abs_value)
184 GNUNET_STATISTICS_update (fc->stats,
185 _("# fragments retransmitted"),
188 /* select next message to calculate delay */
189 bit = fc->next_transmission;
190 size = ntohs (fc->msg->size);
191 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
192 fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
195 if (NULL != fc->tracker)
196 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
199 delay = GNUNET_TIME_UNIT_ZERO;
202 /* full round transmitted wait 2x delay for ACK before going again */
203 delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
205 /* never use zero, need some time for ACK always */
206 delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS,
208 fc->last_round = GNUNET_TIME_absolute_get ();
209 fc->wack = GNUNET_YES;
211 fc->proc_busy = GNUNET_YES;
212 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
213 fc->proc (fc->proc_cls, &fh->header);
218 * Create a fragmentation context for the given message.
219 * Fragments the message into fragments of size "mtu" or
220 * less. Calls 'proc' on each un-acknowledged fragment,
221 * using both the expected 'delay' between messages and
222 * acknowledgements and the given 'tracker' to guide the
223 * frequency of calls to 'proc'.
225 * @param stats statistics context
226 * @param mtu the maximum message size for each fragment
227 * @param tracker bandwidth tracker to use for flow control (can be NULL)
228 * @param delay expected delay between fragment transmission
229 * and ACK based on previous messages
230 * @param msg the message to fragment
231 * @param proc function to call for each fragment to transmit
232 * @param proc_cls closure for proc
233 * @return the fragmentation context
235 struct GNUNET_FRAGMENT_Context *
236 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
238 struct GNUNET_BANDWIDTH_Tracker *tracker,
239 struct GNUNET_TIME_Relative delay,
240 const struct GNUNET_MessageHeader *msg,
241 GNUNET_FRAGMENT_MessageProcessor proc,
244 struct GNUNET_FRAGMENT_Context *fc;
248 GNUNET_STATISTICS_update (stats,
249 _("# messages fragmented"),
251 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
252 size = ntohs (msg->size);
253 GNUNET_STATISTICS_update (stats,
254 _("# total size of fragmented messages"),
256 //GNUNET_assert (size > mtu);
257 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
260 fc->tracker = tracker;
262 fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
264 fc->proc_cls = proc_cls;
265 fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
267 memcpy (&fc[1], msg, size);
268 bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader));
269 GNUNET_assert (bits <= 64);
271 fc->acks = UINT64_MAX; /* set all 64 bit */
273 fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */
274 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
281 * Continuation to call from the 'proc' function after the fragment
282 * has been transmitted (and hence the next fragment can now be
285 * @param fc fragmentation context
288 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
290 GNUNET_assert (fc->proc_busy == GNUNET_YES);
291 fc->proc_busy = GNUNET_NO;
292 GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
293 fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (fc->delay_until),
300 * Process an acknowledgement message we got from the other
301 * side (to control re-transmits).
303 * @param fc fragmentation context
304 * @param msg acknowledgement message we received
305 * @return GNUNET_OK if this ack completes the work of the 'fc'
306 * (all fragments have been received);
307 * GNUNET_NO if more messages are pending
308 * GNUNET_SYSERR if this ack is not valid for this fc
311 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
312 const struct GNUNET_MessageHeader *msg)
314 const struct FragmentAcknowledgement *fa;
316 struct GNUNET_TIME_Relative ndelay;
318 if (sizeof (struct FragmentAcknowledgement) !=
322 return GNUNET_SYSERR;
324 fa = (const struct FragmentAcknowledgement *) msg;
325 if (ntohl (fa->fragment_id) != fc->fragment_id)
326 return GNUNET_SYSERR; /* not our ACK */
327 abits = GNUNET_ntohll (fa->bits);
328 if (GNUNET_YES == fc->wack)
330 /* normal ACK, can update running average of delay... */
331 fc->wack = GNUNET_NO;
332 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
333 fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
335 GNUNET_STATISTICS_update (fc->stats,
336 _("# fragment acknowledgements received"),
339 if (abits != (fc->acks & abits))
341 /* ID collission or message reordering, count! This should be rare! */
342 GNUNET_STATISTICS_update (fc->stats,
343 _("# bits removed from fragmentation ACKs"),
349 /* more to transmit, do so right now (if tracker permits...) */
350 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
352 /* schedule next transmission now, no point in waiting... */
353 GNUNET_SCHEDULER_cancel (fc->task);
354 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
359 /* only case where there is no task should be if we're waiting
360 for the right to transmit again (proc_busy set to YES) */
361 GNUNET_assert (GNUNET_YES == fc->proc_busy);
367 GNUNET_STATISTICS_update (fc->stats,
368 _("# fragmentation transmissions completed"),
371 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
373 GNUNET_SCHEDULER_cancel (fc->task);
374 fc->task = GNUNET_SCHEDULER_NO_TASK;
381 * Destroy the given fragmentation context (stop calling 'proc', free
384 * @param fc fragmentation context
385 * @return average delay between transmission and ACK for the
386 * last message, FOREVER if the message was not fully transmitted
388 struct GNUNET_TIME_Relative
389 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
391 struct GNUNET_TIME_Relative ret;
393 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
394 GNUNET_SCHEDULER_cancel (fc->task);
401 /* end of fragmentation_new.c */