2 This file is part of GNUnet
3 (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file fragmentation/fragmentation.c
22 * @brief fragmentation and defragmentation, this code allows
23 * sending and receiving messages that are larger than
24 * the MTU of the transport. Messages are still limited
25 * to a maximum size of 65535 bytes, which is a good
26 * idea because otherwise we may need ungainly fragmentation
27 * buffers. Each connected peer can have at most one
28 * fragmented packet at any given point in time (prevents
29 * DoS attacks). Fragmented messages that have not been
30 * completed after a certain amount of time are discarded.
35 #include "gnunet_fragmentation_lib.h"
36 #include "gnunet_protocols.h"
37 #include "gnunet_util_lib.h"
39 * Message fragment. This header is followed
40 * by the actual data of the fragment.
/* Leading message header; GNUNET_FRAGMENT_fragment sets its type to
   GNUNET_MESSAGE_TYPE_FRAGMENT and its size to the fragment's total length. */
45 struct GNUNET_MessageHeader header;
/* Byte offset of this fragment's payload within the original message,
   kept in network byte order (written with htons, read with ntohs). */
50 uint16_t off GNUNET_PACKED;
53 * "unique" id for the fragment
/* Value shared by all fragments of one message, in network byte order; the
   receiver matches on it together with the sender's identity. */
55 uint32_t id GNUNET_PACKED;
/* Reassembly state for one partially received message; GNUNET_FRAGMENT_process
   keeps one such node per (sender, fragment id) pair. */
62 struct GNUNET_FRAGEMENT_Ctxbuffer{
/* Next node in the context's singly linked list. */
63 struct GNUNET_FRAGEMENT_Ctxbuffer *next;
/* Refreshed with GNUNET_TIME_absolute_get() when the node is created and each
   time a fragment is stored into it. */
68 struct GNUNET_TIME_Absolute receivedTime;
/* Copy of the identity of the peer that sent the fragments. */
69 struct GNUNET_PeerIdentity peerID;
75 * Defragmentation context.
77 struct GNUNET_FRAGMENT_Context
/* Head of the list of messages that are still being reassembled; new nodes
   are pushed at the front. */
80 struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
/* Callback invoked (with proc_cls) once all fragments of a message arrived. */
81 GNUNET_FRAGMENT_MessageProcessor proc;
87 * Fragment an over-sized message.
89 * @param msg the message to fragment
90 * @param mtu the maximum message size
91 * @param proc function to call for each fragment
92 * @param proc_cls closure for proc
/* Splits msg into fragments of at most mtu bytes, each starting with a
   struct Fragment header, and hands every fragment to proc.  Several
   declarations and branch conditions of this function fall on lines that are
   not part of this excerpt; comments below cover only what is visible. */
95 GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
97 GNUNET_FRAGMENT_MessageProcessor proc,
/* One id per call, stamped on every fragment so the receiver can group them.
   NOTE(review): 256 looks like an exclusive upper bound, which would leave
   very few distinct ids per sender -- confirm against
   GNUNET_CRYPTO_random_u32. */
100 uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
/* Per-fragment overhead: the header that precedes each payload. */
101 size_t size = sizeof(struct Fragment);
/* Fragment only when the message is larger than one fragment's payload.
   NOTE(review): no check that mtu > sizeof(struct Fragment) is visible; when
   the two are equal the modulo and division below divide by zero -- confirm
   that callers guarantee a larger mtu. */
103 if(ntohs(msg->size) > mtu-size){
/* lastSize: payload bytes left over for a final, shorter fragment.
   num: number of fragments carrying a full (mtu - size) byte payload. */
108 lastSize = ntohs(msg->size) % (mtu-size);
109 num = ntohs(msg->size) / (mtu - size);
/* actualNum is assigned on lines not shown; presumably num + 1 when lastSize
   is non-zero and num otherwise -- TODO confirm. */
114 for(i = 0; i<actualNum; i++)
116 struct Fragment *frag;
/* Allocation: mtu bytes for a full fragment, lastSize + size bytes for the
   short final one.  The condition choosing between the first two allocations
   is on a line not shown. */
117 if(actualNum != num){
119 frag = (struct Fragment *)GNUNET_malloc(mtu);
122 frag = (struct Fragment *)GNUNET_malloc(lastSize+size);
126 frag = (struct Fragment *)GNUNET_malloc(mtu);
/* Fill in the fragment header; multi-byte fields are converted to network
   byte order. */
128 frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
129 frag->id = htonl(id);
/* Offset of this fragment's payload within the original message. */
130 frag->off = htons((mtu-size)*i);
131 frag->mtu = htons(mtu);
132 frag->totalNum = htons(actualNum);
/* msg->size is already in network byte order, so it is copied unchanged. */
133 frag->totalSize = msg->size;
134 char *tmpMsg = (char *)msg;
/* Copy this fragment's slice of the original message directly behind the
   header (&frag[1]); the branches mirror the allocation choices above. */
135 if(actualNum != num){
137 frag->header.size = htons(mtu);
138 memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
141 frag->header.size = htons(lastSize+size);
142 memcpy(&frag[1], tmpMsg + (mtu-size)*i, lastSize);
146 frag->header.size = htons(mtu);
147 memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
/* Hand the finished fragment to the caller's callback.
   NOTE(review): no release of frag is visible after this call -- confirm it
   is freed on the following lines or that proc takes ownership. */
149 proc(proc_cls, &frag->header);
156 * Create a defragmentation context.
158 * @param stats statistics context
159 * @param proc function to call with defragmented messages
160 * @param proc_cls closure for proc
161 * @return the defragmentation context
/* Allocates a defragmentation context and records the callback closure in it.
   Other field initialisation happens on lines not shown in this excerpt. */
163 struct GNUNET_FRAGMENT_Context *
164 GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
165 GNUNET_FRAGMENT_MessageProcessor proc,
168 struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
/* Closure handed back as first argument whenever ctx->proc is invoked. */
171 ctx->proc_cls = proc_cls;
178 * Destroy the given defragmentation context.
/* Releases the reassembly buffers still attached to ctx. */
181 GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
183 struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
/* NOTE(review): the loop step reads buffer->next after the body has run; if
   the body (continuing on lines not shown) also frees buffer itself, that
   read touches freed memory -- confirm, and save next before freeing. */
184 for(buffer = ctx->buffer; buffer!=NULL; buffer = buffer->next){
/* Array of fragment indices received so far for this message. */
185 GNUNET_free(buffer->num);
194 * We have received a fragment. Process it.
196 * @param ctx the context
197 * @param sender who transmitted the fragment
198 * @param msg the message that was received
/* Handles one received fragment: finds or creates the reassembly buffer for
   (sender, fragment id), stores the payload at its offset and, once as many
   fragments as announced have been stored, passes the reassembled message to
   ctx->proc.  Several lines of this function are not part of this excerpt.
   NOTE(review): off, mtu, totalNum, totalSize and header.size come straight
   from the received message and no validation of them is visible here; they
   feed a division, two allocations and a memcpy below -- confirm bounds
   checks exist on the lines not shown. */
201 GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
202 const struct GNUNET_PeerIdentity *sender,
203 const struct GNUNET_MessageHeader *msg)
205 uint16_t type = ntohs(msg->type);
/* exist / received: flags whose updates are on lines not shown; presumably
   "buffer found" and "fragment already stored" -- TODO confirm. */
206 int exist = 0, received = 0;
/* Anything that is not a fragment message takes a separate path. */
207 if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
210 struct Fragment *frag = (struct Fragment *)msg;
211 struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
/* NOTE(review): no initialisation of prev is visible, yet it is dereferenced
   when unlinking below -- confirm it is set while walking the list. */
212 struct GNUNET_FRAGEMENT_Ctxbuffer* prev;
/* Look for an existing buffer with the same fragment id and the same sender. */
214 buffer = ctx->buffer;
215 while (buffer != NULL)
217 if ((buffer->id == ntohl(frag->id))&&(0 == memcmp (&buffer->peerID,
218 sender, sizeof (struct GNUNET_PeerIdentity)))){
223 buffer = buffer->next;
/* Duplicate check: compare this fragment's index (payload offset divided by
   payload bytes per fragment) with the indices already recorded.
   NOTE(review): the divisor is zero when the received mtu field equals
   sizeof(struct Fragment). */
229 for(i = 0; i<ntohs(frag->totalNum); i++){
230 if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){
/* No matching buffer: create one sized from the fragment's announced totals. */
238 buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
239 buffer->num = GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
/* -10 marks a slot as unused; the indices computed above are quotients of
   unsigned values, so it is evidently meant as a value no index can take. */
241 for(j = 0; j<ntohs(frag->totalNum); j++){
242 buffer->num[j] = -10;
244 buffer->peerID = *sender;
245 buffer->id = ntohl(frag->id);
246 buffer->receivedTime = GNUNET_TIME_absolute_get ();
/* Size of the whole original message as announced by the sender. */
247 uint16_t si = ntohs(frag->totalSize);
249 buffer->buff = GNUNET_malloc(si);
/* Push the new buffer at the front of the context's list. */
250 buffer->next = ctx->buffer;
251 ctx->buffer = buffer;
/* Record this fragment's index and copy its payload to its offset.
   NOTE(review): no explicit initialisation of counter is visible; confirm
   GNUNET_malloc returns zeroed memory.  sizeoffrag wraps if header.size is
   smaller than the fragment header, and off + sizeoffrag is not visibly
   checked against the size of buff. */
255 buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment));
256 uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment);
257 memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag);
258 buffer->receivedTime = GNUNET_TIME_absolute_get ();
/* All announced fragments stored: deliver the message, then unlink the
   buffer from the list. */
261 if(buffer->counter == ntohs(frag->totalNum))
263 ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff);
265 ctx->buffer = buffer->next;
268 prev->next = buffer->next;
280 * How many buckets does the fragment hash table
/* Number of buckets in defragmentationCache; processFragment selects a bucket
   as sender->hashPubKey.bits[0] modulo this value. */
283 #define DEFRAG_BUCKET_COUNT 16
286 * After how long do fragments time out?
/* Added to the current time to form a cache entry's ttl.  The #ifndef lets a
   definition made before this point take precedence. */
288 #ifndef DEFRAGMENTATION_TIMEOUT
289 #define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
293 * Entry in the linked list of fragments.
/* Heap copy of one received fragment message (header followed by payload). */
298 P2P_fragmentation_MESSAGE *frag;
302 * Entry in the GNUNET_hash table of fragments.
/* Peer the fragments of this entry came from; compared with memcmp when
   looking for the entry a new fragment belongs to. */
308 GNUNET_PeerIdentity sender;
/* Payload length of the fragment held by list node fl: the message size from
   its header (converted to host order) minus the fragment header itself. */
313 #define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
/* Core API used for sending, handler registration and service lookup. */
315 static GNUNET_CoreAPIForPlugins *coreAPI;
/* Statistics service obtained via coreAPI->service_request ("stats"). */
317 static GNUNET_Stats_ServiceAPI *stats;
/* Statistic ids passed to stats->change; stat_discarded is the id returned by
   stats->create for "# fragments discarded". */
319 static int stat_defragmented;
321 static int stat_fragmented;
323 static int stat_discarded;
326 * Hashtable *with* collision management!
/* One list of FC entries per bucket; new entries are pushed at the head. */
328 static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
331 * Lock for the defragmentation cache.
/* Taken by processFragment and defragmentationPurgeCron around cache access. */
333 static struct GNUNET_Mutex *defragCacheLock;
/* Releases fragment-list storage starting at fl.  c is the amount added to
   the "discarded" statistic: callers in this file pass 1 when fragments are
   thrown away and 0 after a successful reassembly.  The list traversal itself
   is on lines not shown in this excerpt. */
336 freeFL (FL * fl, int c)
342 stats->change (stat_discarded, c);
343 GNUNET_free (fl->frag);
350 * This cron job ensures that we purge buffers of fragments
351 * that have timed out. It can run in much longer intervals
352 * than the defragmentationCron, e.g. every 60s.
354 * This method goes through the hashtable, finds entries that
355 * have timed out and removes them (and all the fragments that
356 * belong to the entry). It's a bit more complicated as the
357 * collision list is also collapsed.
/* Cron callback: walks every bucket of defragmentationCache and drops the
   entries whose ttl lies in the past. */
360 defragmentationPurgeCron (void *unused)
/* The cache lock is held for the whole sweep. */
367 GNUNET_mutex_lock (defragCacheLock);
368 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
371 smf = defragmentationCache[i];
/* ttl is an absolute time; the entry has expired once it is before now. */
374 if (smf->ttl < GNUNET_get_time ())
376 /* free linked list of fragments */
/* c = 1: these fragments count as discarded. */
377 freeFL (smf->head, 1);
/* Presumably the branch for an expired entry at the head of the bucket: the
   bucket then starts at its successor -- the condition is not shown. */
381 defragmentationCache[i] = next;
391 } /* while smf != NULL */
392 } /* for all buckets */
393 GNUNET_mutex_unlock (defragCacheLock);
397 * Check if this fragment-list is complete. If yes, put it together,
398 * process and free all buffers. Does not free the pep
399 * itself (but sets the TTL to 0 to have the cron free it
400 * in the next iteration).
402 * @param pep the entry in the GNUNET_hash table
/* Tests whether the fragments stored in pep cover the whole message without a
   gap; if so, copies them into one buffer, passes it to
   coreAPI->loopback_send and frees the fragment list.  Several lines of this
   function are not part of this excerpt. */
405 checkComplete (FC * pep)
412 GNUNET_GE_ASSERT (NULL, pep != NULL);
/* Total message length as announced in the fragment's len field. */
416 len = ntohs (pos->frag->len);
418 goto CLEANUP; /* really bad error! */
/* Walk the list while the next fragment starts at or before off, i.e. while
   the covered prefix [0, off) has no gap. */
420 while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
/* True when the fragment payload is empty or the sum wraps around (assuming
   off is an unsigned type -- its declaration is not shown). */
422 if (off >= off + FRAGSIZE (pos))
423 goto CLEANUP; /* error! */
/* Extend the covered prefix to the end of this fragment. */
424 if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
425 off = ntohs (pos->frag->off) + FRAGSIZE (pos);
427 goto CLEANUP; /* error! */
431 return; /* some fragment is still missing */
/* Assemble: every payload is copied to its offset inside msg.
   NOTE(review): confirm that off + FRAGSIZE <= len is enforced on the lines
   not shown before this copy. */
433 msg = GNUNET_malloc (len);
437 memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
441 stats->change (stat_defragmented, 1);
443 printf ("Finished defragmentation!\n");
445 /* handle message! */
/* The reassembled message is handed to the core together with the identity of
   the peer that sent the fragments. */
446 coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
449 /* free fragment buffers */
/* c = 0: fragments of a completed message are not counted as discarded. */
450 freeFL (pep->head, 0);
456 * See if the new fragment is a part of this entry and join them if
457 * yes. Return GNUNET_SYSERR if the fragments do not match. Return GNUNET_OK if
458 * the fragments do match and the fragment has been processed. The
459 * defragCacheLock is already acquired by the caller whenever this
460 * method is called.<p>
462 * @param entry the entry in the cache
463 * @param sender the peer that sent the new fragment
464 * @param packet the new fragment to merge into the entry
/* Parameters visible here: sender is the peer the fragment came from, packet
   is the received fragment.  The first parameter (entry, the cache entry to
   join) is declared on a line not shown in this excerpt. */
468 const GNUNET_PeerIdentity * sender,
469 const P2P_fragmentation_MESSAGE * packet)
471 /* frame before ours; may end in the middle of
472 our frame or before it starts; NULL if we are
473 the earliest position we have received so far */
475 /* frame after ours; may start in the middle of
476 our frame or after it; NULL if we are the last
477 fragment we have received so far */
479 /* current position in the frame-list */
481 /* the new entry that we're inserting */
486 GNUNET_GE_ASSERT (NULL, entry != NULL);
/* The entry only matches if both the sender and the fragment id agree. */
487 if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
488 return GNUNET_SYSERR; /* wrong fragment list, try another! */
489 if (ntohl (packet->id) != entry->id)
490 return GNUNET_SYSERR; /* wrong fragment list, try another! */
492 printf ("Received fragment %u from %u to %u\n",
495 ntohs (packet->off) + ntohs (packet->header.size) -
496 sizeof (P2P_fragmentation_MESSAGE));
/* All fragments of one message must announce the same total length; both
   values are in network byte order, so they are compared without conversion. */
499 if ((pos != NULL) && (packet->len != pos->frag->len))
500 return GNUNET_SYSERR; /* wrong fragment size */
503 /* find the before-frame */
504 while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
510 /* find the after-frame */
/* end: offset one past the last payload byte of the new fragment. */
512 ntohs (packet->off) + ntohs (packet->header.size) -
513 sizeof (P2P_fragmentation_MESSAGE);
/* An end that is not beyond the start means an empty payload or wrap-around. */
514 if (end <= ntohs (packet->off))
517 GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
518 "Received invalid fragment at %s:%d\n", __FILE__,
520 return GNUNET_SYSERR; /* yuck! integer overflow! */
527 while ((after != NULL) && (ntohs (after->frag->off) < end))
/* NOTE(review): both redundant-fragment paths below change stat_defragmented
   rather than stat_discarded although the fragment is dropped -- confirm that
   this is the intended counter. */
530 if ((before != NULL) && (before == after))
532 /* this implies after or before != NULL and thereby the new
533 fragment is redundant as it is fully enclosed in an earlier
536 stats->change (stat_defragmented, 1);
537 return GNUNET_OK; /* drop, there is a packet that spans our range! */
/* htons is used on received offsets here where ntohs is meant; both apply the
   same 16-bit byte swap, so the comparison result is the same. */
540 if ((before != NULL) &&
542 ((htons (before->frag->off) +
543 FRAGSIZE (before)) >= htons (after->frag->off)))
545 /* this implies that the fragment that starts before us and the
546 fragment that comes after this one leave no space in the middle
547 or even overlap; thus we can drop this redundant piece */
549 stats->change (stat_defragmented, 1);
/* Keep a private copy of the fragment in a new list node.
   NOTE(review): the node is sized with sizeof (FC) although it is used as a
   fragment-list element (pep->frag) -- confirm the intended type. */
554 pep = GNUNET_malloc (sizeof (FC));
555 pep->frag = GNUNET_malloc (ntohs (packet->header.size));
556 memcpy (pep->frag, packet, ntohs (packet->header.size));
566 GNUNET_free (pos->frag);
572 /* end of insert first */
577 /* insert last: find the end, free everything after it */
578 freeFL (before->link, 1);
583 /* ok, we are filling the middle between two fragments; insert. If
584 there is anything else in the middle, it can be dropped as we're
585 bigger & cover that area as well */
586 /* free everything between before and after */
591 GNUNET_free (pos->frag);
/* A successful join restarts the entry's timeout and may complete the message. */
599 entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
600 checkComplete (entry);
605 * Defragment the given fragment and pass to handler once
606 * defragmentation is complete.
608 * @param frag the packet to defragment
609 * @return GNUNET_SYSERR if the fragment is invalid
/* Handler for received fragment messages: tries to join the fragment into an
   existing cache entry of the sender's bucket and otherwise starts a new
   entry holding just this fragment.  The loop over the bucket is partly on
   lines not shown in this excerpt. */
612 processFragment (const GNUNET_PeerIdentity * sender,
613 const GNUNET_MessageHeader * frag)
/* Too short to contain a fragment header: reject. */
618 if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
619 return GNUNET_SYSERR;
621 GNUNET_mutex_lock (defragCacheLock);
/* The bucket is chosen from the first word of the sender's hashed identity. */
622 hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
623 smf = defragmentationCache[hash];
627 tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
629 GNUNET_mutex_unlock (defragCacheLock);
/* An entry from the same sender that the fragment could not join: its
   fragments are dropped and counted as discarded.  Presumably this enforces
   one message in reassembly per peer -- the surrounding lines are not shown. */
632 if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
634 freeFL (smf->head, 1);
/* Start a new entry at the head of the bucket. */
641 smf = GNUNET_malloc (sizeof (FC));
642 smf->next = defragmentationCache[hash];
643 defragmentationCache[hash] = smf;
644 smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
645 smf->sender = *sender;
647 smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
/* Its fragment list initially holds a copy of this one fragment. */
648 smf->head = GNUNET_malloc (sizeof (FL));
649 smf->head->link = NULL;
650 smf->head->frag = GNUNET_malloc (ntohs (frag->size));
651 memcpy (smf->head->frag, frag, ntohs (frag->size));
653 GNUNET_mutex_unlock (defragCacheLock);
/* Peer the fragments are addressed to; fragmentBMC passes it to
   coreAPI->ciphertext_send. */
659 GNUNET_PeerIdentity sender;
660 /* maximum size of each fragment */
662 /** how long is this message part expected to be? */
664 /** when did we intend to transmit? */
665 GNUNET_CronTime transmissionTime;
669 * Send a message that had to be fragmented (right now!). First grabs
670 * the first part of the message (obtained from ctx->se) and stores
671 * that in a P2P_fragmentation_MESSAGE envelope. The remaining fragments are
672 * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
673 * will be transmitted next). The logic here is that if the priority
674 * for the first fragment was sufficiently high, the priority should
675 * also have been sufficiently high for all of the other fragments (at
676 * this time) since they have the same priority. And we want to make
677 * sure that we send all of them since just sending the first fragment
678 * and then going to other messages of equal priority would not be
679 * such a great idea (i.e. would just waste bandwidth).
/* Build-message callback: writes the first fragment of the message stored
   behind cls into buf and queues all remaining fragments through
   coreAPI->ciphertext_send.  Some lines of this function are not part of this
   excerpt. */
682 fragmentBMC (void *buf, void *cls, unsigned short len)
/* cls is the FragmentBMC set up by fragment (); the message bytes are stored
   directly behind that structure (&ctx[1]). */
684 FragmentBMC *ctx = cls;
/* Counter kept across calls so that successive messages get different ids. */
685 static int idGen = 0;
686 P2P_fragmentation_MESSAGE *frag;
/* Refuse when the offered buffer is smaller than one fragment or missing. */
691 if ((len < ctx->mtu) || (buf == NULL))
694 return GNUNET_SYSERR;
697 stats->change (stat_fragmented, 1);
/* id = running counter plus a weak random offset (the 512 is presumably an
   exclusive upper bound -- TODO confirm). */
698 id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
699 /* write first fragment to buf */
700 frag = (P2P_fragmentation_MESSAGE *) buf;
701 frag->header.size = htons (len);
702 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
704 frag->off = htons (0);
705 frag->len = htons (ctx->len);
706 memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
708 /* create remaining fragments, add to queue! */
/* pos: number of message bytes already placed into fragments. */
709 pos = len - sizeof (P2P_fragmentation_MESSAGE);
/* One scratch buffer of mtu bytes is used for the following fragments. */
710 frag = GNUNET_malloc (ctx->mtu);
711 while (pos < ctx->len)
/* Header plus all remaining bytes; presumably capped to ctx->mtu on the lines
   not shown -- TODO confirm. */
713 mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
716 GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
717 frag->header.size = htons (mlen);
718 frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
720 frag->off = htons (pos);
721 frag->len = htons (ctx->len);
723 &((char *) (&ctx[1]))[pos],
724 mlen - sizeof (P2P_fragmentation_MESSAGE));
725 coreAPI->ciphertext_send (&ctx->sender,
727 GNUNET_EXTREME_PRIORITY,
728 ctx->transmissionTime - GNUNET_get_time ());
729 pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
/* Every byte of the message must have been placed into exactly one fragment. */
731 GNUNET_GE_ASSERT (NULL, pos == ctx->len);
738 * The given message must be fragmented. Produce a placeholder that
739 * corresponds to the first fragment. Once that fragment is scheduled
740 * for transmission, the placeholder should automatically add all of
741 * the other fragments (with very high priority).
/* Queues a message of len bytes, larger than mtu, for fragmented
   transmission to peer: the message bytes are stored behind a FragmentBMC
   and fragmentBMC is registered as the build callback for the first
   fragment.  Some parameters and lines are not part of this excerpt. */
744 fragment (const GNUNET_PeerIdentity * peer,
747 unsigned int targetTime,
748 unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
/* Preconditions: the message really needs fragmenting, and a fragment can
   carry at least one payload byte behind its header. */
753 GNUNET_GE_ASSERT (NULL, len > mtu);
754 GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
/* Context and message share one allocation; the message starts at &fbmc[1]. */
755 fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
757 fbmc->sender = *peer;
758 fbmc->transmissionTime = targetTime;
/* Presumably the path taken when no build callback is given: bmcClosure then
   holds the message itself, which is copied and released -- the selecting
   condition is not shown. */
762 memcpy (&fbmc[1], bmcClosure, len);
763 GNUNET_free (bmcClosure);
/* Otherwise the callback writes the message into the space behind fbmc. */
767 if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
/* xlen: payload bytes one fragment can carry; the priority passed on is
   scaled by the share of the message that fits into one fragment. */
773 xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
774 coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len, /* compute new priority */
779 * Initialize Fragmentation module.
/* Module entry point: sets up statistics, the defragmentation cache, its
   lock and the purge cron job, registers the fragment handler and fills in
   the service API structure.  Some lines are not part of this excerpt. */
781 GNUNET_Fragmentation_ServiceAPI *
782 provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
/* Static, so the structure stays valid after this function returns. */
784 static GNUNET_Fragmentation_ServiceAPI ret;
788 stats = coreAPI->service_request ("stats");
/* Create the three counters; the assignment targets of the first two calls
   are on lines not shown. */
792 stats->create (gettext_noop ("# messages defragmented"));
794 stats->create (gettext_noop ("# messages fragmented"));
795 stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
/* Start with every bucket empty. */
797 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
798 defragmentationCache[i] = NULL;
799 defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
/* Schedule defragmentationPurgeCron; both timing arguments are 60 seconds. */
800 GNUNET_cron_add_job (coreAPI->cron,
801 &defragmentationPurgeCron,
802 60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
804 GNUNET_GE_LOG (capi->ectx,
805 GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
806 _("`%s' registering handler %d\n"), "fragmentation",
807 GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
808 capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
/* Expose fragment () through the service API. */
811 ret.fragment = &fragment;
816 * Shutdown fragmentation.
/* Module shutdown: undoes the registrations made in
   provide_module_fragmentation and releases the cache contents, the
   statistics service and the lock.  Some lines are not part of this excerpt. */
819 release_module_fragmentation ()
823 coreAPI->p2p_ciphertext_handler_unregister
824 (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
825 GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
826 60 * GNUNET_CRON_SECONDS, NULL);
/* Drop whatever is still waiting for reassembly in each bucket. */
827 for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
829 FC *pos = defragmentationCache[i];
/* The successor is saved first, before the current entry is released. */
832 FC *next = pos->next;
/* c = 1: unfinished fragments count as discarded. */
833 freeFL (pos->head, 1);
840 coreAPI->service_release (stats);
843 GNUNET_mutex_destroy (defragCacheLock);
844 defragCacheLock = NULL;
850 /* end of fragmentation.c */