2 This file is part of GNUnet
3 Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file src/fragmentation/fragmentation.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
32 * Absolute minimum delay we impose between sending and expecting ACK to arrive.
/* GNUNET_TIME_UNIT_MILLISECONDS multiplied by 1.  transmit_next() applies it
 * through GNUNET_TIME_relative_max() as the floor for the wait it computes
 * after a full round, so that wait is never zero. */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
38 * Fragmentation context.
/* Per-message state of the fragmenter.  GNUNET_FRAGMENT_context_create()
 * allocates this struct with a copy of the message directly behind it
 * (`&fc[1]`) and points `msg` at that copy.
 * NOTE(review): the functions in this file also use the members `proc_cls`,
 * `acks`, `acks_mask`, `proc_busy`, `wack` and `mtu`; only their descriptions,
 * not their declarations, are visible in this view -- confirm their types
 * against the full file. */
40 struct GNUNET_FRAGMENT_Context
45 struct GNUNET_STATISTICS_Handle *stats;
48 * Tracker for flow control.
/* May be NULL; every use in this file is guarded by a NULL check. */
50 struct GNUNET_BANDWIDTH_Tracker *tracker;
53 * Current expected delay for ACKs.
55 struct GNUNET_TIME_Relative ack_delay;
58 * Current expected delay between messages.
60 struct GNUNET_TIME_Relative msg_delay;
63 * Next allowed transmission time.
/* Written by transmit_next(); GNUNET_FRAGMENT_context_transmission_done()
 * schedules the next run for the time remaining until this point. */
65 struct GNUNET_TIME_Absolute delay_until;
68 * Time we transmitted the last message of the last round.
/* A zero value is treated by transmit_next() as "no round recorded yet". */
70 struct GNUNET_TIME_Absolute last_round;
73 * Message to fragment (allocated at the end of this struct).
75 const struct GNUNET_MessageHeader *msg;
78 * Function to call for transmissions.
80 GNUNET_FRAGMENT_MessageProcessor proc;
83 * Closure for @e proc.
88 * Bitfield, set to 1 for each unacknowledged fragment.
93 * Bitfield with all possible bits for @e acks (used to mask the
99 * Task performing work for the fragmenter.
/* NULL-checked before it is cancelled; asserted to be NULL on entry to
 * GNUNET_FRAGMENT_context_transmission_done(). */
101 struct GNUNET_SCHEDULER_Task *task;
104 * Our fragmentation ID. (chosen at random)
106 uint32_t fragment_id;
109 * Round-robin selector for the next transmission.
/* Advanced modulo 64 in transmit_next() and used there as a bit index
 * into `acks`. */
111 unsigned int next_transmission;
114 * How many rounds of transmission have we completed so far?
116 unsigned int num_rounds;
119 * How many transmission have we completed in this round?
/* Incremented in transmit_next() for each call to `proc`; reset to 0 in
 * GNUNET_FRAGMENT_process_ack(). */
121 unsigned int num_transmissions;
124 * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
129 * #GNUNET_YES if we are waiting for an ACK.
134 * Target fragment size.
142 * Convert an ACK message to a printable format suitable for logging.
144 * @param ack message to print
145 * @return ack in human-readable format
/* Render an ACK message as text for log output.
 * The text is formatted into the function-local `static` buffer declared
 * below -- presumably the pointer that is returned -- so the result is
 * overwritten by the next call and the function is not reentrant.
 * A message that fails the size check yields the literal "<malformed ack>".
 * NOTE(review): the return-type line, the right-hand side of the size
 * comparison, the format string and the final return statement are not
 * visible in this view. */
148 GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
150 static char buf[128];
151 const struct FragmentAcknowledgement *fa;
153 if (sizeof (struct FragmentAcknowledgement) !=
155 return "<malformed ack>";
156 fa = (const struct FragmentAcknowledgement *) ack;
/* Both fields are converted with ntohl()/GNUNET_ntohll() before printing. */
157 GNUNET_snprintf (buf,
160 ntohl (fa->fragment_id),
161 GNUNET_ntohll (fa->bits));
167 * Transmit the next fragment to the other peer.
169 * @param cls the `struct GNUNET_FRAGMENT_Context`
170 * @param tc scheduler context
/* Scheduler task: hand one still-unacknowledged fragment to `fc->proc` and
 * work out how long to wait before the next one.
 * It skips acknowledged bits round-robin, builds a FragmentHeader followed by
 * the matching slice of the message, charges the bandwidth tracker (if any)
 * and stores the computed wait in `fc->delay_until`.  The follow-up run is
 * scheduled from GNUNET_FRAGMENT_context_transmission_done(), not from here.
 * NOTE(review): several lines of this function (local declarations, braces,
 * some conditions and else-branches) are not visible in this view. */
173 transmit_next (void *cls,
174 const struct GNUNET_SCHEDULER_TaskContext *tc)
176 struct GNUNET_FRAGMENT_Context *fc = cls;
179 struct FragmentHeader *fh;
180 struct GNUNET_TIME_Relative delay;
/* `proc` must not still be working on a previous fragment. */
187 GNUNET_assert (GNUNET_NO == fc->proc_busy);
189 return; /* all done */
190 /* calculate delay */
/* NOTE(review): `next_transmission` ranges over 0..63 (see the `% 64` below).
 * `1LL << 63` is not representable in a signed 64-bit type, which is
 * undefined behavior in C; `1ULL` would be well-defined.  The same applies
 * to the identical loop further down. */
192 while (0 == (fc->acks & (1LL << fc->next_transmission)))
194 fc->next_transmission = (fc->next_transmission + 1) % 64;
195 wrap |= (0 == fc->next_transmission);
197 bit = fc->next_transmission;
198 size = ntohs (fc->msg->size);
/* The fragment at index size / payload-per-fragment is the short final one:
 * its size is the remainder plus the fragment header. */
199 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
201 (size % (fc->mtu - sizeof (struct FragmentHeader))) +
202 sizeof (struct FragmentHeader);
205 if (NULL != fc->tracker)
206 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
208 delay = GNUNET_TIME_UNIT_ZERO;
/* The tracker asks us to wait: re-schedule this task for later.  Presumably
 * the function returns here without sending -- the return is not visible. */
209 if (delay.rel_value_us > 0)
211 fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
/* Move the round-robin cursor past the fragment being sent now and on to the
 * next unacknowledged bit; `wrap` records that the cursor passed bit 0. */
214 fc->next_transmission = (fc->next_transmission + 1) % 64;
215 wrap |= (0 == fc->next_transmission);
216 while (0 == (fc->acks & (1LL << fc->next_transmission)))
218 fc->next_transmission = (fc->next_transmission + 1) % 64;
219 wrap |= (0 == fc->next_transmission);
222 /* assemble fragmentation message */
/* The message bytes are stored directly behind the context struct. */
223 mbuf = (const char *) &fc[1];
224 fh = (struct FragmentHeader *) msg;
225 fh->header.size = htons (fsize);
226 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
227 fh->fragment_id = htonl (fc->fragment_id);
228 fh->total_size = fc->msg->size; /* already in big-endian */
229 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
230 memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
231 fsize - sizeof (struct FragmentHeader));
232 if (NULL != fc->tracker)
233 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
234 GNUNET_STATISTICS_update (fc->stats,
235 _("# fragments transmitted"),
/* `last_round` is only set further down, next to the wrap-around statistic,
 * so a non-zero value means this send is counted as a retransmission. */
238 if (0 != fc->last_round.abs_value_us)
239 GNUNET_STATISTICS_update (fc->stats,
240 _("# fragments retransmitted"),
244 /* select next message to calculate delay */
245 bit = fc->next_transmission;
246 size = ntohs (fc->msg->size);
/* NOTE(review): unlike the computation near the top of this function, the
 * final-fragment size here does not add sizeof (struct FragmentHeader), so
 * the delay is requested for fewer bytes than will be sent -- confirm
 * whether that is intended. */
247 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
248 fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
251 if (NULL != fc->tracker)
252 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
255 delay = GNUNET_TIME_UNIT_ZERO;
/* Wait at least msg_delay scaled by 2^num_rounds.
 * NOTE(review): `1 << fc->num_rounds` is an `int` shift and overflows once
 * num_rounds reaches 31; nothing visible in this view bounds num_rounds. */
256 delay = GNUNET_TIME_relative_max (delay,
257 GNUNET_TIME_relative_multiply (fc->msg_delay,
258 (1 << fc->num_rounds)));
261 /* full round transmitted wait 2x delay for ACK before going again */
263 delay = GNUNET_TIME_relative_multiply (fc->ack_delay, 2);
264 /* never use zero, need some time for ACK always */
265 delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
266 fc->wack = GNUNET_YES;
267 fc->last_round = GNUNET_TIME_absolute_get ();
268 GNUNET_STATISTICS_update (fc->stats,
269 _("# fragments wrap arounds"),
/* Stay busy until GNUNET_FRAGMENT_context_transmission_done() clears the
 * flag; that function also schedules the next run using `delay_until`. */
273 fc->proc_busy = GNUNET_YES;
274 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
275 fc->num_transmissions++;
276 fc->proc (fc->proc_cls, &fh->header);
281 * Create a fragmentation context for the given message.
282 * Fragments the message into fragments of size @a mtu or
283 * less. Calls @a proc on each un-acknowledged fragment,
284 * using both the expected @a msg_delay between messages and
285 * acknowledgements and the given @a tracker to guide the
286 * frequency of calls to @a proc.
288 * @param stats statistics context
289 * @param mtu the maximum message size for each fragment
290 * @param tracker bandwidth tracker to use for flow control (can be NULL)
291 * @param msg_delay initial delay to insert between fragment transmissions
292 * based on previous messages
293 * @param ack_delay expected delay between fragment transmission
294 * and ACK based on previous messages
295 * @param msg the message to fragment
296 * @param proc function to call for each fragment to transmit
297 * @param proc_cls closure for @a proc
298 * @return the fragmentation context
/* Allocate and start a fragmentation context for `msg`.
 * The context and a private copy of the message share one allocation, all
 * fragments start out unacknowledged, and transmit_next() is scheduled to run
 * immediately.
 * NOTE(review): some lines (the `mtu` and `proc_cls` parameters, local
 * declarations, a few assignments and the return) are not visible in this
 * view. */
300 struct GNUNET_FRAGMENT_Context *
301 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
303 struct GNUNET_BANDWIDTH_Tracker *tracker,
304 struct GNUNET_TIME_Relative msg_delay,
305 struct GNUNET_TIME_Relative ack_delay,
306 const struct GNUNET_MessageHeader *msg,
307 GNUNET_FRAGMENT_MessageProcessor proc,
310 struct GNUNET_FRAGMENT_Context *fc;
314 GNUNET_STATISTICS_update (stats,
315 _("# messages fragmented"),
/* The MTU must leave room for at least 1024 payload bytes per fragment. */
318 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
319 size = ntohs (msg->size);
320 GNUNET_STATISTICS_update (stats,
321 _("# total size of fragmented messages"),
323 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
/* One allocation: the context followed by `size` bytes for the message. */
324 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
327 fc->tracker = tracker;
328 fc->ack_delay = ack_delay;
329 fc->msg_delay = msg_delay;
330 fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
332 fc->proc_cls = proc_cls;
334 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
335 memcpy (&fc[1], msg, size);
/* Fragment count = ceil (size / payload bytes per fragment); it must fit the
 * 64-bit acknowledgement mask. */
337 (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
340 GNUNET_assert (bits <= 64);
/* NOTE(review): the condition choosing between the next two assignments is
 * not visible here.  `(1LL << bits) - 1` with bits == 63 shifts into the sign
 * bit of a signed 64-bit value (undefined behavior); `1ULL` would be
 * well-defined for every bits < 64. */
342 fc->acks_mask = UINT64_MAX; /* set all 64 bit */
344 fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */
/* Every fragment starts out unacknowledged. */
345 fc->acks = fc->acks_mask;
346 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
352 * Continuation to call from the 'proc' function after the fragment
353 * has been transmitted (and hence the next fragment can now be
356 * @param fc fragmentation context
/* Called once `proc` is finished with the fragment that transmit_next()
 * handed to it: clears `proc_busy` and schedules transmit_next() to run after
 * whatever time remains until `fc->delay_until`.
 * Asserts that a transmission was in flight and that no task is pending.
 * NOTE(review): the assignment target of the scheduler call (expected to be
 * fc->task) is not visible in this view. */
359 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
361 GNUNET_assert (fc->proc_busy == GNUNET_YES);
362 fc->proc_busy = GNUNET_NO;
363 GNUNET_assert (fc->task == NULL);
365 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
366 (fc->delay_until), &transmit_next, fc);
371 * Process an acknowledgement message we got from the other
372 * side (to control re-transmits).
374 * @param fc fragmentation context
375 * @param msg acknowledgement message we received
376 * @return #GNUNET_OK if this ack completes the work of the 'fc'
377 * (all fragments have been received);
378 * #GNUNET_NO if more messages are pending
379 * #GNUNET_SYSERR if this ack is not valid for this fc
/* Apply an acknowledgement received from the other side.
 * Rejects messages of the wrong size or with a foreign fragment ID with
 * GNUNET_SYSERR.  If this is the first ACK after a completed round, it updates
 * the running `ack_delay` / `msg_delay` estimates.  It then replaces
 * `fc->acks` with the (masked) bits from the ACK and either speeds up the
 * next transmission or cancels the pending task.
 * NOTE(review): several lines (declarations of `abits` and `i`, the loop
 * header, counters, braces and the final returns) are not visible in this
 * view. */
382 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
383 const struct GNUNET_MessageHeader *msg)
385 const struct FragmentAcknowledgement *fa;
387 struct GNUNET_TIME_Relative ndelay;
388 unsigned int ack_cnt;
389 unsigned int snd_cnt;
392 if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
395 return GNUNET_SYSERR;
397 fa = (const struct FragmentAcknowledgement *) msg;
398 if (ntohl (fa->fragment_id) != fc->fragment_id)
399 return GNUNET_SYSERR; /* not our ACK */
400 abits = GNUNET_ntohll (fa->bits);
/* Only the first ACK after a full round updates the estimates; requiring
 * num_transmissions != 0 also protects the divisions below. */
401 if ( (GNUNET_YES == fc->wack) &&
402 (0 != fc->num_transmissions) )
404 /* normal ACK, can update running average of delay... */
405 fc->wack = GNUNET_NO;
406 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
/* Weighted average: 3/4 old estimate, 1/4 new per-transmission sample. */
407 fc->ack_delay.rel_value_us =
408 (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
409 /* calculate ratio msg sent vs. msg acked */
/* NOTE(review): two problems in the next two tests.
 * (1) `mask & (1 << i)` is either 0 or the bit value itself, so `1 == ...`
 *     can only hold for i == 0; `0 != ...` looks like what was meant.
 * (2) `1 << i` is an `int` shift: undefined for i >= 31 and unable to reach
 *     the upper half of the 64-bit masks; `1ULL << i` would be needed.
 * The loop header is not visible here -- confirm the range of i. */
414 if (1 == (fc->acks_mask & (1 << i)))
417 if (0 == (abits & (1 << i)))
424 fc->msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay,
427 else if (snd_cnt > ack_cnt)
429 /* some loss, slow down proportionally */
/* NOTE(review): with ack_cnt < snd_cnt this scales msg_delay by a factor
 * below 1, i.e. it shortens the delay although the comment above says
 * "slow down" -- confirm the intended direction. */
430 fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
432 else if (snd_cnt == ack_cnt)
/* NOTE(review): the weights 1 + 3 are divided by 5 here (the ack_delay
 * average above divides by 4); possibly a deliberate speed-up when nothing
 * was lost -- confirm. */
434 fc->msg_delay.rel_value_us =
435 (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
437 fc->num_transmissions = 0;
/* Cap both estimates at one second. */
438 fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
439 GNUNET_TIME_UNIT_SECONDS);
440 fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
441 GNUNET_TIME_UNIT_SECONDS);
443 GNUNET_STATISTICS_update (fc->stats,
444 _("# fragment acknowledgements received"),
/* The ACK has bits set that are no longer set in our own `acks`. */
447 if (abits != (fc->acks & abits))
449 /* ID collision or message reordering, count! This should be rare! */
450 GNUNET_STATISTICS_update (fc->stats,
451 _("# bits removed from fragmentation ACKs"), 1,
/* Adopt the bits from the ACK, limited to bits that exist for this message. */
454 fc->acks = abits & fc->acks_mask;
457 /* more to transmit, do so right now (if tracker permits...) */
458 if (fc->task != NULL)
460 /* schedule next transmission now, no point in waiting... */
461 GNUNET_SCHEDULER_cancel (fc->task);
462 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
466 /* only case where there is no task should be if we're waiting
467 * for the right to transmit again (proc_busy set to YES) */
468 GNUNET_assert (GNUNET_YES == fc->proc_busy);
474 GNUNET_STATISTICS_update (fc->stats,
475 _("# fragmentation transmissions completed"),
478 if (NULL != fc->task)
480 GNUNET_SCHEDULER_cancel (fc->task);
488 * Destroy the given fragmentation context (stop calling 'proc', free
491 * @param fc fragmentation context
492 * @param msg_delay where to store average delay between individual message transmissions for the
493 * last message (OUT only)
494 * @param ack_delay where to store average delay between transmission and ACK for the
495 * last message, set to FOREVER if the message was not fully transmitted (OUT only)
/* Tear down a fragmentation context: cancel a pending scheduler task and
 * report the delay estimates through the out-parameters, each of which may
 * be NULL.
 * NOTE(review): the visible code copies `fc->ack_delay` unconditionally; the
 * "set to FOREVER" case promised by the API description is not visible in
 * this view.  The second operand of the msg_delay multiplication and the
 * release of `fc` are not visible here either -- confirm against the full
 * file. */
498 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
499 struct GNUNET_TIME_Relative *msg_delay,
500 struct GNUNET_TIME_Relative *ack_delay)
502 if (fc->task != NULL)
503 GNUNET_SCHEDULER_cancel (fc->task);
504 if (NULL != ack_delay)
505 *ack_delay = fc->ack_delay;
506 if (NULL != msg_delay)
507 *msg_delay = GNUNET_TIME_relative_multiply (fc->msg_delay,
513 /* end of fragmentation.c */