2 This file is part of GNUnet
3 (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file fragmentation/fragmentation.c
22 * @brief fragmentation and defragmentation, this code allows
23 * sending and receiving messages that are larger than
24 * the MTU of the transport. Messages are still limited
25 * to a maximum size of 65535 bytes, which is a good
26 * idea because otherwise we may need ungainly fragmentation
27 * buffers. Each connected peer can have at most one
28 * fragmented packet at any given point in time (prevents
29 * DoS attacks). Fragmented messages that have not been
30 * completed after a certain amount of time are discarded.
31 * @author Christian Grothoff
35 #include "gnunet_fragmentation_lib.h"
38 * Message fragment. This header is followed
39 * by the actual data of the fragment.
44 struct GNUNET_MessageHeader header;
49 uint32_t off GNUNET_PACKED;
52 * "unique" id for the fragment
54 uint64_t id GNUNET_PACKED;
60 * Defragmentation context.
62 struct GNUNET_FRAGMENT_Context
68 * Fragment an over-sized message.
70 * @param msg the message to fragment
71 * @param mtu the maximum message size
72 * @param proc function to call for each fragment
73 * @param proc_cls closure for proc
76 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
78 GNUNET_FRAGMENT_MessageProcessor proc,
86 * Create a defragmentation context.
88 * @param stats statistics context
89 * @param proc function to call with defragmented messages
90 * @param proc_cls closure for proc
91 * @return the defragmentation context
93 struct GNUNET_FRAGMENT_Context *
94 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
95 GNUNET_FRAGMENT_MessageProcessor proc,
103 * Destroy the given defragmentation context.
106 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
113 * We have received a fragment. Process it.
115 * @param ctx the context
116 * @param sender who transmitted the fragment
117 * @param msg the message that was received
120 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
121 const struct GNUNET_PeerIdentity *sender,
122 const struct GNUNET_MessageHeader *msg)
132 * How many buckets does the fragment hash table
135 #define DEFRAG_BUCKET_COUNT 16
138 * After how long do fragments time out?
140 #ifndef DEFRAGMENTATION_TIMEOUT
141 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
145 * Entry in the linked list of fragments.
150 P2P_fragmentation_MESSAGE *frag;
154 * Entry in the GNUNET_hash table of fragments.
160 GNUNET_PeerIdentity sender;
165 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
167 static GNUNET_CoreAPIForPlugins *coreAPI;
169 static GNUNET_Stats_ServiceAPI *stats;
171 static int stat_defragmented;
173 static int stat_fragmented;
175 static int stat_discarded;
178 * Hashtable *with* collision management!
180 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
183 * Lock for the defragmentation cache.
185 static struct GNUNET_Mutex *defragCacheLock;
188 freeFL (FL * fl, int c)
194 stats->change (stat_discarded, c);
195 GNUNET_free (fl->frag);
202 * This cron job ensures that we purge buffers of fragments
203 * that have timed out. It can run in much longer intervals
204 * than the defragmentationCron, e.g. every 60s.
206 * This method goes through the hashtable, finds entries that
207 * have timed out and removes them (and all the fragments that
208 * belong to the entry). It's a bit more complicated as the
209 * collision list is also collapsed.
212 defragmentationPurgeCron (void *unused)
219 GNUNET_mutex_lock (defragCacheLock);
220 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
223 smf = defragmentationCache[i];
226 if (smf->ttl < GNUNET_get_time ())
228 /* free linked list of fragments */
229 freeFL (smf->head, 1);
233 defragmentationCache[i] = next;
243 } /* while smf != NULL */
244 } /* for all buckets */
245 GNUNET_mutex_unlock (defragCacheLock);
249 * Check if this fragment-list is complete. If yes, put it together,
250 * process and free all buffers. Does not free the pep
251 * itself (but sets the TTL to 0 to have the cron free it
252 * in the next iteration).
254 * @param pep the entry in the GNUNET_hash table
257 checkComplete (FC * pep)
264 GNUNET_GE_ASSERT (NULL, pep != NULL);
268 len = ntohs (pos->frag->len);
270 goto CLEANUP; /* really bad error! */
272 while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
274 if (off >= off + FRAGSIZE (pos))
275 goto CLEANUP; /* error! */
276 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
277 off = ntohs (pos->frag->off) + FRAGSIZE (pos);
279 goto CLEANUP; /* error! */
283 return; /* some fragment is still missing */
285 msg = GNUNET_malloc (len);
289 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
293 stats->change (stat_defragmented, 1);
295 printf ("Finished defragmentation!\n");
297 /* handle message! */
298 coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
301 /* free fragment buffers */
302 freeFL (pep->head, 0);
308 * See if the new fragment is a part of this entry and join them if
309 * yes. Return GNUNET_SYSERR if the fragments do not match. Return GNUNET_OK if
310 * the fragments do match and the fragment has been processed. The
311 * defragCacheLock is already acquired by the caller whenever this
312 * method is called.<p>
314 * @param entry the entry in the cache
315 * @param pep the new entry
316 * @param packet the ip part in the new entry
320 const GNUNET_PeerIdentity * sender,
321 const P2P_fragmentation_MESSAGE * packet)
323 /* frame before ours; may end in the middle of
324 our frame or before it starts; NULL if we are
325 the earliest position we have received so far */
327 /* frame after ours; may start in the middle of
328 our frame or after it; NULL if we are the last
329 fragment we have received so far */
331 /* current position in the frame-list */
333 /* the new entry that we're inserting */
338 GNUNET_GE_ASSERT (NULL, entry != NULL);
339 if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
340 return GNUNET_SYSERR; /* wrong fragment list, try another! */
341 if (ntohl (packet->id) != entry->id)
342 return GNUNET_SYSERR; /* wrong fragment list, try another! */
344 printf ("Received fragment %u from %u to %u\n",
347 ntohs (packet->off) + ntohs (packet->header.size) -
348 sizeof (P2P_fragmentation_MESSAGE));
351 if ((pos != NULL) && (packet->len != pos->frag->len))
352 return GNUNET_SYSERR; /* wrong fragment size */
355 /* find the before-frame */
356 while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
362 /* find the after-frame */
364 ntohs (packet->off) + ntohs (packet->header.size) -
365 sizeof (P2P_fragmentation_MESSAGE);
366 if (end <= ntohs (packet->off))
369 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
370 "Received invalid fragment at %s:%d\n", __FILE__,
372 return GNUNET_SYSERR; /* yuck! integer overflow! */
379 while ((after != NULL) && (ntohs (after->frag->off) < end))
382 if ((before != NULL) && (before == after))
384 /* this implies after or before != NULL and thereby the new
385 fragment is redundant as it is fully enclosed in an earlier
388 stats->change (stat_defragmented, 1);
389 return GNUNET_OK; /* drop, there is a packet that spans our range! */
392 if ((before != NULL) &&
394 ((htons (before->frag->off) +
395 FRAGSIZE (before)) >= htons (after->frag->off)))
397 /* this implies that the fragment that starts before us and the
398 fragment that comes after this one leave no space in the middle
399 or even overlap; thus we can drop this redundant piece */
401 stats->change (stat_defragmented, 1);
406 pep = GNUNET_malloc (sizeof (FC));
407 pep->frag = GNUNET_malloc (ntohs (packet->header.size));
408 memcpy (pep->frag, packet, ntohs (packet->header.size));
418 GNUNET_free (pos->frag);
424 /* end of insert first */
429 /* insert last: find the end, free everything after it */
430 freeFL (before->link, 1);
435 /* ok, we are filling the middle between two fragments; insert. If
436 there is anything else in the middle, it can be dropped as we're
437 bigger & cover that area as well */
438 /* free everything between before and after */
443 GNUNET_free (pos->frag);
451 entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
452 checkComplete (entry);
457 * Defragment the given fragment and pass to handler once
458 * defragmentation is complete.
460 * @param frag the packet to defragment
461 * @return GNUNET_SYSERR if the fragment is invalid
464 processFragment (const GNUNET_PeerIdentity * sender,
465 const GNUNET_MessageHeader * frag)
470 if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
471 return GNUNET_SYSERR;
473 GNUNET_mutex_lock (defragCacheLock);
474 hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
475 smf = defragmentationCache[hash];
479 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
481 GNUNET_mutex_unlock (defragCacheLock);
484 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
486 freeFL (smf->head, 1);
493 smf = GNUNET_malloc (sizeof (FC));
494 smf->next = defragmentationCache[hash];
495 defragmentationCache[hash] = smf;
496 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
497 smf->sender = *sender;
499 smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
500 smf->head = GNUNET_malloc (sizeof (FL));
501 smf->head->link = NULL;
502 smf->head->frag = GNUNET_malloc (ntohs (frag->size));
503 memcpy (smf->head->frag, frag, ntohs (frag->size));
505 GNUNET_mutex_unlock (defragCacheLock);
511 GNUNET_PeerIdentity sender;
512 /* maximums size of each fragment */
514 /** how long is this message part expected to be? */
516 /** when did we intend to transmit? */
517 GNUNET_CronTime transmissionTime;
521 * Send a message that had to be fragmented (right now!). First grabs
522 * the first part of the message (obtained from ctx->se) and stores
523 * that in a P2P_fragmentation_MESSAGE envelope. The remaining fragments are
524 * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
525 * will be transmitted next). The logic here is that if the priority
526 * for the first fragment was sufficiently high, the priority should
527 * also have been sufficiently high for all of the other fragments (at
528 * this time) since they have the same priority. And we want to make
529 * sure that we send all of them since just sending the first fragment
530 * and then going to other messages of equal priority would not be
531 * such a great idea (i.e. would just waste bandwidth).
534 fragmentBMC (void *buf, void *cls, unsigned short len)
536 FragmentBMC *ctx = cls;
537 static int idGen = 0;
538 P2P_fragmentation_MESSAGE *frag;
543 if ((len < ctx->mtu) || (buf == NULL))
546 return GNUNET_SYSERR;
549 stats->change (stat_fragmented, 1);
550 id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
551 /* write first fragment to buf */
552 frag = (P2P_fragmentation_MESSAGE *) buf;
553 frag->header.size = htons (len);
554 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
556 frag->off = htons (0);
557 frag->len = htons (ctx->len);
558 memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
560 /* create remaining fragments, add to queue! */
561 pos = len - sizeof (P2P_fragmentation_MESSAGE);
562 frag = GNUNET_malloc (ctx->mtu);
563 while (pos < ctx->len)
565 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
568 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
569 frag->header.size = htons (mlen);
570 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
572 frag->off = htons (pos);
573 frag->len = htons (ctx->len);
575 &((char *) (&ctx[1]))[pos],
576 mlen - sizeof (P2P_fragmentation_MESSAGE));
577 coreAPI->ciphertext_send (&ctx->sender,
579 GNUNET_EXTREME_PRIORITY,
580 ctx->transmissionTime - GNUNET_get_time ());
581 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
583 GNUNET_GE_ASSERT (NULL, pos == ctx->len);
590 * The given message must be fragmented. Produce a placeholder that
591 * corresponds to the first fragment. Once that fragment is scheduled
592 * for transmission, the placeholder should automatically add all of
593 * the other fragments (with very high priority).
596 fragment (const GNUNET_PeerIdentity * peer,
599 unsigned int targetTime,
600 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
605 GNUNET_GE_ASSERT (NULL, len > mtu);
606 GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
607 fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
609 fbmc->sender = *peer;
610 fbmc->transmissionTime = targetTime;
614 memcpy (&fbmc[1], bmcClosure, len);
615 GNUNET_free (bmcClosure);
619 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
625 xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
626 coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len, /* compute new priority */
631 * Initialize Fragmentation module.
633 GNUNET_Fragmentation_ServiceAPI *
634 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
636 static GNUNET_Fragmentation_ServiceAPI ret;
640 stats = coreAPI->service_request ("stats");
644 stats->create (gettext_noop ("# messages defragmented"));
646 stats->create (gettext_noop ("# messages fragmented"));
647 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
649 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
650 defragmentationCache[i] = NULL;
651 defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
652 GNUNET_cron_add_job (coreAPI->cron,
653 &defragmentationPurgeCron,
654 60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
656 GNUNET_GE_LOG (capi->ectx,
657 GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
658 _("`%s' registering handler %d\n"), "fragmentation",
659 GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
660 capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
663 ret.fragment = &fragment;
668 * Shutdown fragmentation.
671 release_module_fragmentation ()
675 coreAPI->p2p_ciphertext_handler_unregister
676 (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
677 GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
678 60 * GNUNET_CRON_SECONDS, NULL);
679 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
681 FC *pos = defragmentationCache[i];
684 FC *next = pos->next;
685 freeFL (pos->head, 1);
692 coreAPI->service_release (stats);
695 GNUNET_mutex_destroy (defragCacheLock);
696 defragCacheLock = NULL;
702 /* end of fragmentation.c */