2 This file is part of GNUnet
3 (C) 2009, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file src/fragmentation/fragmentation.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
26 #include "gnunet_fragmentation_lib.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
32 * Absolute minimum delay we impose between sending and expecting ACK to arrive.
/* Used in transmit_next as the lower bound for the wait before starting
 * another round, so the time allowed for an ACK is never zero. */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
38 * Fragmentation context.
/* State kept for one message while it is being sent as fragments.
 * GNUNET_FRAGMENT_context_create allocates this struct with a copy of the
 * message stored directly behind it and points 'msg' at that copy.
 * NOTE(review): this listing drops lines (see the gaps in the embedded
 * numbering).  The members proc_cls, acks, acks_mask, proc_busy, wack and
 * mtu are used by the functions in this file, but their declarations are
 * not visible here -- TODO confirm their types against the real source. */
40 struct GNUNET_FRAGMENT_Context
45 struct GNUNET_STATISTICS_Handle *stats;
48 * Tracker for flow control.
/* May be NULL: transmit_next tests for NULL before querying or charging it. */
50 struct GNUNET_BANDWIDTH_Tracker *tracker;
53 * Current expected delay for ACKs.
/* Updated in GNUNET_FRAGMENT_process_ack as a running average
 * (3/4 old value, 1/4 new measurement). */
55 struct GNUNET_TIME_Relative delay;
58 * Next allowed transmission time.
60 struct GNUNET_TIME_Absolute delay_until;
63 * Time we transmitted the last message of the last round.
/* transmit_next treats a non-zero value as "a round was already completed"
 * when counting retransmissions. */
65 struct GNUNET_TIME_Absolute last_round;
68 * Message to fragment (allocated at the end of this struct).
70 const struct GNUNET_MessageHeader *msg;
73 * Function to call for transmissions.
75 GNUNET_FRAGMENT_MessageProcessor proc;
83 * Bitfield, set to 1 for each unacknowledged fragment.
88 * Bitfield with all possible bits for 'acks' (used to mask the
94 * Task performing work for the fragmenter.
/* Holds GNUNET_SCHEDULER_NO_TASK while no transmit_next run is pending. */
96 GNUNET_SCHEDULER_TaskIdentifier task;
99 * Our fragmentation ID. (chosen at random)
101 uint32_t fragment_id;
104 * Round-robin selector for the next transmission.
/* Always kept in the range 0..63 (advanced modulo 64 in transmit_next). */
106 unsigned int next_transmission;
109 * How many rounds of transmission have we completed so far?
111 unsigned int num_rounds;
114 * How many transmission have we completed in this round?
/* Reset to 0 in GNUNET_FRAGMENT_process_ack after it has been used as the
 * divisor for the delay average. */
116 unsigned int num_transmissions;
119 * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
124 * GNUNET_YES if we are waiting for an ACK.
129 * Target fragment size.
137 * Transmit the next fragment to the other peer.
139 * @param cls the 'struct GNUNET_FRAGMENT_Context'
140 * @param tc scheduler context
/* Scheduler task: select the next fragment whose bit is still set in
 * fc->acks, build it and hand it to fc->proc.  If the bandwidth tracker
 * reports a non-zero delay for that fragment, the task re-schedules itself
 * instead of sending.
 * NOTE(review): this listing drops lines (braces, declarations, some
 * conditions and returns); the gaps in the embedded numbering show where.
 * 'msg', 'mbuf', 'wrap', 'bit', 'size' and 'fsize' are used below, but
 * their declarations are not visible. */
143 transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
145 struct GNUNET_FRAGMENT_Context *fc = cls;
148 struct FragmentHeader *fh;
149 struct GNUNET_TIME_Relative delay;
/* Record that no transmit_next run is pending any more. */
155 fc->task = GNUNET_SCHEDULER_NO_TASK;
156 GNUNET_assert (GNUNET_NO == fc->proc_busy);
/* NOTE(review): the condition guarding this return (listing line 157) is
 * not visible; presumably it tests whether fc->acks is zero -- TODO
 * confirm.  Without such a guard the search loop below would never
 * terminate when no bit is set. */
158 return; /* all done */
159 /* calculate delay */
/* Advance to the next fragment that is still unacknowledged; 'wrap' notes
 * whether the selector passed index 0 on the way.
 * NOTE(review): next_transmission can be 63, and left-shifting the signed
 * constant 1LL by 63 is undefined behavior in ISO C; 1ULL would avoid it
 * -- TODO confirm and fix in the real source. */
161 while (0 == (fc->acks & (1LL << fc->next_transmission)))
163 fc->next_transmission = (fc->next_transmission + 1) % 64;
164 wrap |= (0 == fc->next_transmission);
166 bit = fc->next_transmission;
167 size = ntohs (fc->msg->size);
/* Each fragment carries (mtu - header) payload bytes; the fragment with
 * index size / payload holds only the remainder, plus its header.
 * NOTE(review): the assignment target (line 169) and the else branch
 * (lines 172-173) are not visible in this listing. */
168 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
170 (size % (fc->mtu - sizeof (struct FragmentHeader))) +
171 sizeof (struct FragmentHeader);
/* Ask the tracker (if any) how long to wait before sending fsize bytes. */
174 if (NULL != fc->tracker)
175 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
177 delay = GNUNET_TIME_UNIT_ZERO;
/* Not allowed to send yet: run this task again after 'delay'.
 * NOTE(review): the return that presumably follows (line 181) is not
 * visible. */
178 if (delay.rel_value > 0)
180 fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
/* 'bit' holds the fragment to send now; move the selector on to the
 * following unacknowledged fragment for the next run. */
183 fc->next_transmission = (fc->next_transmission + 1) % 64;
184 wrap |= (fc->next_transmission == 0);
185 while (0 == (fc->acks & (1LL << fc->next_transmission)))
187 fc->next_transmission = (fc->next_transmission + 1) % 64;
188 wrap |= (fc->next_transmission == 0);
191 /* assemble fragmentation message */
/* The message copy lives directly behind the context structure. */
192 mbuf = (const char *) &fc[1];
193 fh = (struct FragmentHeader *) msg;
194 fh->header.size = htons (fsize);
195 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
196 fh->fragment_id = htonl (fc->fragment_id);
197 fh->total_size = fc->msg->size; /* already in big-endian */
/* Byte offset of this fragment's payload within the original message. */
198 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
199 memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
200 fsize - sizeof (struct FragmentHeader));
/* Charge the bytes about to be sent against the tracker. */
201 if (NULL != fc->tracker)
202 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
203 GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1,
/* last_round is only non-zero once a round has been completed, so any
 * send after that point is counted as a retransmission. */
205 if (0 != fc->last_round.abs_value)
206 GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1,
209 /* select next message to calculate delay */
210 bit = fc->next_transmission;
211 size = ntohs (fc->msg->size);
/* NOTE(review): unlike the size computation earlier in this function, the
 * last-fragment size here does not add sizeof (struct FragmentHeader); it
 * only feeds the delay estimate -- TODO confirm whether that is intended. */
212 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
213 fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
216 if (NULL != fc->tracker)
217 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
219 delay = GNUNET_TIME_UNIT_ZERO;
/* NOTE(review): the condition opening this section (line 220) is not
 * visible; presumably it tests 'wrap' -- TODO confirm. */
222 /* full round transmitted; wait 2x delay for ACK before going again */
225 GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
226 GNUNET_TIME_relative_multiply (fc->delay,
228 /* never use zero, need some time for ACK always */
229 delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
230 fc->wack = GNUNET_YES;
231 fc->last_round = GNUNET_TIME_absolute_get ();
232 GNUNET_STATISTICS_update (fc->stats, _("# fragments wrap arounds"), 1,
/* Mark the context busy before calling 'proc';
 * GNUNET_FRAGMENT_context_transmission_done clears the flag and schedules
 * the next run no earlier than delay_until. */
235 fc->proc_busy = GNUNET_YES;
236 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
237 fc->num_transmissions++;
238 fc->proc (fc->proc_cls, &fh->header);
243 * Create a fragmentation context for the given message.
244 * Fragments the message into fragments of size "mtu" or
245 * less. Calls 'proc' on each un-acknowledged fragment,
246 * using both the expected 'delay' between messages and
247 * acknowledgements and the given 'tracker' to guide the
248 * frequency of calls to 'proc'.
250 * @param stats statistics context
251 * @param mtu the maximum message size for each fragment
252 * @param tracker bandwidth tracker to use for flow control (can be NULL)
253 * @param delay expected delay between fragment transmission
254 * and ACK based on previous messages
255 * @param msg the message to fragment
256 * @param proc function to call for each fragment to transmit
257 * @param proc_cls closure for proc
258 * @return the fragmentation context
/* Build a fragmentation context for 'msg': allocate the context with room
 * for a copy of the message behind it, work out how many fragments are
 * needed, mark all of them as unacknowledged and schedule the first
 * transmit_next run immediately.
 * NOTE(review): this listing drops lines.  The 'mtu' and 'proc_cls'
 * parameters, the locals 'size' and 'bits', several assignments and the
 * return statement are not visible here. */
260 struct GNUNET_FRAGMENT_Context *
261 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
263 struct GNUNET_BANDWIDTH_Tracker *tracker,
264 struct GNUNET_TIME_Relative delay,
265 const struct GNUNET_MessageHeader *msg,
266 GNUNET_FRAGMENT_MessageProcessor proc,
269 struct GNUNET_FRAGMENT_Context *fc;
273 GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
/* Every fragment must be able to carry at least 1024 payload bytes.  As
 * 'size' comes from a 16-bit field (at most 65535), this also keeps the
 * fragment count within the 64 bits checked further down. */
274 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
275 size = ntohs (msg->size);
276 GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"),
278 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
/* One allocation holds the context followed by the message bytes. */
279 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
282 fc->tracker = tracker;
284 fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
286 fc->proc_cls = proc_cls;
/* NOTE(review): the assignment target of this call (line 287) is not
 * visible; presumably fc->fragment_id -- TODO confirm. */
288 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
289 memcpy (&fc[1], msg, size);
/* Number of fragments: size divided by the per-fragment payload, rounded
 * up.  NOTE(review): the assignment target (line 290) is not visible;
 * presumably 'bits', which is asserted right after. */
291 (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
294 GNUNET_assert (bits <= 64);
/* NOTE(review): the if/else selecting between the next two assignments
 * (lines 295 and 297) is not visible; presumably the 64-fragment case is
 * handled separately because shifting a 64-bit value by 64 is undefined
 * -- TODO confirm. */
296 fc->acks_mask = UINT64_MAX; /* set all 64 bit */
298 fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */
/* Initially every fragment is unacknowledged. */
299 fc->acks = fc->acks_mask;
300 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
306 * Continuation to call from the 'proc' function after the fragment
307 * has been transmitted (and hence the next fragment can now be
310 * @param fc fragmentation context
/* Called once the fragment handed to 'proc' has been sent: clears the
 * busy flag set by transmit_next and schedules transmit_next to run when
 * fc->delay_until is reached.  Asserts that 'proc' was outstanding and
 * that no task is already pending. */
313 GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
315 GNUNET_assert (fc->proc_busy == GNUNET_YES);
316 fc->proc_busy = GNUNET_NO;
317 GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
/* NOTE(review): the assignment target of this call (line 318) is not
 * visible in this listing; presumably fc->task -- TODO confirm. */
319 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
320 (fc->delay_until), &transmit_next, fc);
325 * Process an acknowledgement message we got from the other
326 * side (to control re-transmits).
328 * @param fc fragmentation context
329 * @param msg acknowledgement message we received
330 * @return GNUNET_OK if this ack completes the work of the 'fc'
331 * (all fragments have been received);
332 * GNUNET_NO if more messages are pending;
333 * GNUNET_SYSERR if this ack is not valid for this fc
/* Handle an acknowledgement from the receiver: validate it, update the
 * running ACK-delay estimate, replace the set of outstanding fragments
 * with the bits reported in the ACK, and either speed up the next
 * transmission or stop the pending task.
 * NOTE(review): this listing drops lines.  The declaration of 'abits',
 * the branch conditions around lines 373 and 390 and the final return
 * statements are not visible here. */
336 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
337 const struct GNUNET_MessageHeader *msg)
339 const struct FragmentAcknowledgement *fa;
341 struct GNUNET_TIME_Relative ndelay;
/* Reject anything that does not have exactly the size of an ACK. */
343 if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
346 return GNUNET_SYSERR;
348 fa = (const struct FragmentAcknowledgement *) msg;
349 if (ntohl (fa->fragment_id) != fc->fragment_id)
350 return GNUNET_SYSERR; /* not our ACK */
/* Bitmask from the ACK, converted to host byte order. */
351 abits = GNUNET_ntohll (fa->bits);
/* The num_transmissions test also protects the division below. */
352 if ( (GNUNET_YES == fc->wack) &&
353 (0 != fc->num_transmissions) )
355 /* normal ACK, can update running average of delay... */
356 fc->wack = GNUNET_NO;
357 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
/* New estimate = 1/4 of the time since last_round divided by the number
 * of transmissions, plus 3/4 of the previous estimate. */
358 fc->delay.rel_value =
359 (ndelay.rel_value / fc->num_transmissions + 3 * fc->delay.rel_value) / 4;
360 fc->num_transmissions = 0;
362 GNUNET_STATISTICS_update (fc->stats,
363 _("# fragment acknowledgements received"), 1,
/* True when the ACK has a bit set that is no longer set in fc->acks. */
365 if (abits != (fc->acks & abits))
367 /* ID collision or message reordering, count! This should be rare! */
368 GNUNET_STATISTICS_update (fc->stats,
369 _("# bits removed from fragmentation ACKs"), 1,
/* Take over the receiver's view, limited to bits that exist for this
 * message. */
372 fc->acks = abits & fc->acks_mask;
/* NOTE(review): the condition opening this branch (line 373) is not
 * visible; presumably it tests whether fc->acks is non-zero -- TODO
 * confirm. */
375 /* more to transmit, do so right now (if tracker permits...) */
376 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
378 /* schedule next transmission now, no point in waiting... */
379 GNUNET_SCHEDULER_cancel (fc->task);
380 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
384 /* only case where there is no task should be if we're waiting
385 * for the right to transmit again (proc_busy set to YES) */
386 GNUNET_assert (GNUNET_YES == fc->proc_busy);
/* Remaining path: transmission is counted as completed and any pending
 * transmit_next run is cancelled. */
392 GNUNET_STATISTICS_update (fc->stats,
393 _("# fragmentation transmissions completed"), 1,
395 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
397 GNUNET_SCHEDULER_cancel (fc->task);
398 fc->task = GNUNET_SCHEDULER_NO_TASK;
405 * Destroy the given fragmentation context (stop calling 'proc', free
408 * @param fc fragmentation context
409 * @return average delay between transmission and ACK for the
410 * last message, FOREVER if the message was not fully transmitted
/* Tear down a fragmentation context.  The visible part cancels a pending
 * transmit_next run so it cannot fire for this context afterwards.
 * NOTE(review): the rest of the body (computing 'ret', releasing 'fc' and
 * the return, lines 419-423) is not visible in this listing -- TODO
 * confirm against the real source. */
412 struct GNUNET_TIME_Relative
413 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
415 struct GNUNET_TIME_Relative ret;
417 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
418 GNUNET_SCHEDULER_cancel (fc->task);
425 /* end of fragmentation.c */