commented out wrong message type
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
index 49680bf4287f94dc2e46d0855db2d5b5297fc615..f686351004171c94c118610a522277d34f5174f9 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet
-     (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009-2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 /**
- * @file fragmentation/fragmentation.c
- * @brief fragmentation and defragmentation, this code allows
- *        sending and receiving messages that are larger than
- *        the MTU of the transport.  Messages are still limited
- *        to a maximum size of 65535 bytes, which is a good
- *        idea because otherwise we may need ungainly fragmentation
- *        buffers.  Each connected peer can have at most one
- *        fragmented packet at any given point in time (prevents
- *        DoS attacks).  Fragmented messages that have not been
- *        completed after a certain amount of time are discarded.
+ * @file src/fragmentation/fragmentation.c
+ * @brief library to help fragment messages
  * @author Christian Grothoff
  */
-
 #include "platform.h"
 #include "gnunet_fragmentation_lib.h"
 #include "gnunet_protocols.h"
-#include "gnunet_util_lib.h"
-/**
- * Message fragment.  This header is followed
- * by the actual data of the fragment.
- */
-struct Fragment
-{
-
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Fragment offset.
-   */
-  uint32_t off GNUNET_PACKED;
-
-  /**
-   * "unique" id for the fragment
-   */
-  uint64_t id GNUNET_PACKED;
-
-  size_t mtu;
-  uint32_t totalNum;
-
-};
-
-struct GNUNET_FRAGEMENT_Ctxbuffer{
-       uint64_t id;
-    uint16_t size;
-    char * buff;
-    int counter;
-    struct GNUNET_TIME_Absolute receivedTime;
-    struct GNUNET_PeerIdentity *peerID;
-       struct GNUNET_FRAGEMENT_Ctxbuffer *next;
-       int * num;
-};
+#include "fragmentation.h"
 
 
 /**
- * Defragmentation context.
+ * Absolute minimum delay we impose between sending and expecting ACK to arrive.
  */
-struct GNUNET_FRAGMENT_Context
-{
-       uint32_t maxNum;
-       struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
-};
+#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
 
 
 /**
- * Fragment an over-sized message.
- *
- * @param msg the message to fragment
- * @param mtu the maximum message size
- * @param proc function to call for each fragment
- * @param proc_cls closure for proc
+ * Fragmentation context.
  */
-void
-GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
-                          uint16_t mtu,
-                          GNUNET_FRAGMENT_MessageProcessor proc,
-                          void *proc_cls)
+struct GNUNET_FRAGMENT_Context
 {
-       uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
-       size_t size = sizeof(struct Fragment);
-       if(msg->size > mtu){
-               uint16_t lastSize = (msg->size) % (mtu-size);
-               int num = ceil(msg->size / mtu - size);
-               int i;
-               for(i = 0; i<num; i++){
-                       struct Fragment *frag = (struct Fragment *)GNUNET_malloc(size);
-                       frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
-                       frag->id = htonl(id);
-                       frag->off = htons(mtu*i);
-                       frag->mtu = htons(mtu);
-                       if(lastSize!=0){
-                               frag->totalNum = htons(num+1);
-                       }
-                       else{
-                               frag->totalNum = htons(num);
-                       }
-            if(i!=num-1){
-               frag->header.size = htons(mtu - size);
-               memcpy((char*)&frag[1], (char *)&msg[1]+frag->off, mtu - size);
-            }
-            else{
-               frag->header.size = htons(lastSize);
-                memcpy((char*)&frag[1], (char *)&msg[1]+frag->off, lastSize);
-            }
-            proc(proc_cls, &frag->header);
-            free(frag);
-               }
-       }
-}
-
+  /**
+   * Statistics to use.
+   */
+  struct GNUNET_STATISTICS_Handle *stats;
 
-/**
- * Create a defragmentation context.
- *
- * @param stats statistics context
- * @param proc function to call with defragmented messages
- * @param proc_cls closure for proc
- * @return the defragmentation context
- */
-struct GNUNET_FRAGMENT_Context *
-GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
-                                GNUNET_FRAGMENT_MessageProcessor proc,
-                                void *proc_cls)
-{
-       struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
-       ctx->maxNum = 100;
-       ctx->buffer = NULL;
-       return ctx;
-}
+  /**
+   * Tracker for flow control.
+   */
+  struct GNUNET_BANDWIDTH_Tracker *tracker;
 
+  /**
+   * Current expected delay for ACKs.
+   */
+  struct GNUNET_TIME_Relative ack_delay;
 
-/**
- * Destroy the given defragmentation context.
- */
-void
-GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
-{
-       struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
-       for(buffer = ctx->buffer; buffer!=NULL; buffer = buffer->next){
-               GNUNET_free(buffer->num);
-               GNUNET_free(buffer);
-       }
-       GNUNET_free(ctx);
-       GNUNET_assert (0);
-}
+  /**
+   * Current expected delay between messages.
+   */
+  struct GNUNET_TIME_Relative msg_delay;
 
+  /**
+   * Next allowed transmission time.
+   */
+  struct GNUNET_TIME_Absolute delay_until;
 
-/**
- * We have received a fragment.  Process it.
- *
- * @param ctx the context
- * @param sender who transmitted the fragment
- * @param msg the message that was received
- */
-void
-GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
-                         const struct GNUNET_PeerIdentity *sender,
-                         const struct GNUNET_MessageHeader *msg)
-{
-          uint16_t type = ntohs(msg->type);
-          int exited = 0, received = 0;
-       if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
-          return;
-       }
-       struct Fragment *frag = (struct Fragment *)msg;
-       struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
-       for(buffer = ctx->buffer; buffer!= NULL; buffer = buffer->next){
-          if(ctx->buffer->counter == ntohs(frag->totalNum)){return;}
-          if(buffer->id == ntohl(frag->id)&&(buffer->peerID==sender)){
-                  exited = 1;
-                  int i;
-                  for(i = 0; i<ntohs(frag->totalNum); i++){
-                    if(buffer->num[i]==ntohs(frag->off)/ntohs(frag->mtu)){
-                        received = 1;
-                        break;
-                    }
-              }
-                  if(!received){
-                          buffer->num[buffer->counter++]=ntohs(frag->off)/ntohs(frag->mtu);
-                            }
-                  buffer->receivedTime = GNUNET_TIME_absolute_get ();
-                  uint16_t size = ntohs(frag->header.size);
-                  memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], size);
-                  break;
-          }
-       }
-       if(!exited){
-          buffer = (struct GNUNET_FRAGEMENT_Ctxbuffer* )GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
-          buffer->num = (int*)GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
-          buffer->num[buffer->counter++]=ntohs(frag->off)/ntohs(frag->mtu);
-          memcpy(buffer->peerID,sender,sizeof(struct GNUNET_PeerIdentity));
-          buffer->receivedTime = GNUNET_TIME_absolute_get ();
-                  uint16_t size = ntohs(frag->header.size);
-                  memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], size);
-       }
+  /**
+   * Time we transmitted the last message of the last round.
+   */
+  struct GNUNET_TIME_Absolute last_round;
 
-}
+  /**
+   * Message to fragment (allocated at the end of this struct).
+   */
+  const struct GNUNET_MessageHeader *msg;
 
+  /**
+   * Function to call for transmissions.
+   */
+  GNUNET_FRAGMENT_MessageProcessor proc;
 
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
 
-#if 0
+  /**
+   * Bitfield, set to 1 for each unacknowledged fragment.
+   */
+  uint64_t acks;
 
-/**
* How many buckets does the fragment hash table
- * have?
- */
-#define DEFRAG_BUCKET_COUNT 16
+  /**
  * Bitfield with all possible bits for @e acks (used to mask the
+   * ack we get back).
  */
+  uint64_t acks_mask;
 
-/**
- * After how long do fragments time out?
- */
-#ifndef DEFRAGMENTATION_TIMEOUT
-#define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
-#endif
+  /**
+   * Task performing work for the fragmenter.
+   */
+  struct GNUNET_SCHEDULER_Task *task;
 
-/**
- * Entry in the linked list of fragments.
- */
-typedef struct FL
-{
-  struct FL *link;
-  P2P_fragmentation_MESSAGE *frag;
-} FL;
+  /**
+   * Our fragmentation ID. (chosen at random)
+   */
+  uint32_t fragment_id;
 
-/**
- * Entry in the GNUNET_hash table of fragments.
- */
-typedef struct FC
-{
-  struct FC *next;
-  FL *head;
-  GNUNET_PeerIdentity sender;
-  int id;
-  GNUNET_CronTime ttl;
-} FC;
+  /**
+   * Round-robin selector for the next transmission.
+   */
+  unsigned int next_transmission;
 
-#define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
+  /**
+   * How many rounds of transmission have we completed so far?
+   */
+  unsigned int num_rounds;
 
-static GNUNET_CoreAPIForPlugins *coreAPI;
+  /**
+   * How many transmission have we completed in this round?
+   */
+  unsigned int num_transmissions;
 
-static GNUNET_Stats_ServiceAPI *stats;
+  /**
+   * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
+   */
+  int8_t proc_busy;
 
-static int stat_defragmented;
+  /**
+   * #GNUNET_YES if we are waiting for an ACK.
+   */
+  int8_t wack;
 
-static int stat_fragmented;
+  /**
+   * Target fragment size.
+   */
+  uint16_t mtu;
 
-static int stat_discarded;
+};
 
-/**
- * Hashtable *with* collision management!
- */
-static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
 
 /**
- * Lock for the defragmentation cache.
+ * Convert an ACK message to a printable format suitable for logging.
+ *
+ * @param ack message to print
+ * @return ack in human-readable format
  */
-static struct GNUNET_Mutex *defragCacheLock;
-
-static void
-freeFL (FL * fl, int c)
+const char *
+GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
 {
-  while (fl != NULL)
-    {
-      FL *link = fl->link;
-      if (stats != NULL)
-        stats->change (stat_discarded, c);
-      GNUNET_free (fl->frag);
-      GNUNET_free (fl);
-      fl = link;
-    }
+  static char buf[128];
+  const struct FragmentAcknowledgement *fa;
+
+  if (sizeof (struct FragmentAcknowledgement) !=
+      htons (ack->size))
+    return "<malformed ack>";
+  fa = (const struct FragmentAcknowledgement *) ack;
+  GNUNET_snprintf (buf,
+                   sizeof (buf),
+                   "%u-%llX",
+                   ntohl (fa->fragment_id),
+                   GNUNET_ntohll (fa->bits));
+  return buf;
 }
 
-/**
- * This cron job ensures that we purge buffers of fragments
- * that have timed out.  It can run in much longer intervals
- * than the defragmentationCron, e.g. every 60s.
- * <p>
- * This method goes through the hashtable, finds entries that
- * have timed out and removes them (and all the fragments that
- * belong to the entry).  It's a bit more complicated as the
- * collision list is also collapsed.
- */
-static void
-defragmentationPurgeCron (void *unused)
-{
-  int i;
-  FC *smf;
-  FC *next;
-  FC *last;
-
-  GNUNET_mutex_lock (defragCacheLock);
-  for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-    {
-      last = NULL;
-      smf = defragmentationCache[i];
-      while (smf != NULL)
-        {
-          if (smf->ttl < GNUNET_get_time ())
-            {
-              /* free linked list of fragments */
-              freeFL (smf->head, 1);
-              next = smf->next;
-              GNUNET_free (smf);
-              if (last == NULL)
-                defragmentationCache[i] = next;
-              else
-                last->next = next;
-              smf = next;
-            }
-          else
-            {
-              last = smf;
-              smf = smf->next;
-            }
-        }                       /* while smf != NULL */
-    }                           /* for all buckets */
-  GNUNET_mutex_unlock (defragCacheLock);
-}
 
 /**
- * Check if this fragment-list is complete.  If yes, put it together,
- * process and free all buffers.  Does not free the pep
- * itself (but sets the TTL to 0 to have the cron free it
- * in the next iteration).
+ * Transmit the next fragment to the other peer.
  *
- * @param pep the entry in the GNUNET_hash table
+ * @param cls the `struct GNUNET_FRAGMENT_Context`
  */
 static void
-checkComplete (FC * pep)
+transmit_next (void *cls)
 {
-  FL *pos;
-  unsigned short off;
-  unsigned short len;
-  char *msg;
-
-  GNUNET_GE_ASSERT (NULL, pep != NULL);
-  pos = pep->head;
-  if (pos == NULL)
+  struct GNUNET_FRAGMENT_Context *fc = cls;
+  char msg[fc->mtu];
+  const char *mbuf;
+  struct FragmentHeader *fh;
+  struct GNUNET_TIME_Relative delay;
+  unsigned int bit;
+  size_t size;
+  size_t fsize;
+  int wrap;
+
+  fc->task = NULL;
+  GNUNET_assert (GNUNET_NO == fc->proc_busy);
+  if (0 == fc->acks)
+    return;                     /* all done */
+  /* calculate delay */
+  wrap = 0;
+  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
+  {
+    fc->next_transmission = (fc->next_transmission + 1) % 64;
+    wrap |= (0 == fc->next_transmission);
+  }
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize =
+        (size % (fc->mtu - sizeof (struct FragmentHeader))) +
+        sizeof (struct FragmentHeader);
+  else
+    fsize = fc->mtu;
+  if (NULL != fc->tracker)
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
+  else
+    delay = GNUNET_TIME_UNIT_ZERO;
+  if (delay.rel_value_us > 0)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragmentation logic delays transmission of next fragment by %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+    fc->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                             &transmit_next,
+                                             fc);
     return;
-  len = ntohs (pos->frag->len);
-  if (len == 0)
-    goto CLEANUP;               /* really bad error! */
-  off = 0;
-  while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
-    {
-      if (off >= off + FRAGSIZE (pos))
-        goto CLEANUP;           /* error! */
-      if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
-        off = ntohs (pos->frag->off) + FRAGSIZE (pos);
-      else
-        goto CLEANUP;           /* error! */
-      pos = pos->link;
-    }
-  if (off < len)
-    return;                     /* some fragment is still missing */
-
-  msg = GNUNET_malloc (len);
-  pos = pep->head;
-  while (pos != NULL)
-    {
-      memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
-      pos = pos->link;
-    }
-  if (stats != NULL)
-    stats->change (stat_defragmented, 1);
-#if 0
-  printf ("Finished defragmentation!\n");
-#endif
-  /* handle message! */
-  coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
-  GNUNET_free (msg);
-CLEANUP:
-  /* free fragment buffers */
-  freeFL (pep->head, 0);
-  pep->head = NULL;
-  pep->ttl = 0;
+  }
+  fc->next_transmission = (fc->next_transmission + 1) % 64;
+  wrap |= (0 == fc->next_transmission);
+  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
+  {
+    fc->next_transmission = (fc->next_transmission + 1) % 64;
+    wrap |= (0 == fc->next_transmission);
+  }
+
+  /* assemble fragmentation message */
+  mbuf = (const char *) &fc[1];
+  fh = (struct FragmentHeader *) msg;
+  fh->header.size = htons (fsize);
+  fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
+  fh->fragment_id = htonl (fc->fragment_id);
+  fh->total_size = fc->msg->size;       /* already in big-endian */
+  fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
+  GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
+          fsize - sizeof (struct FragmentHeader));
+  if (NULL != fc->tracker)
+    GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragments transmitted"),
+                            1,
+                            GNUNET_NO);
+  if (0 != fc->last_round.abs_value_us)
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments retransmitted"),
+                              1,
+                              GNUNET_NO);
+
+  /* select next message to calculate delay */
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+  else
+    fsize = fc->mtu;
+  if (NULL != fc->tracker)
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
+  else
+    delay = GNUNET_TIME_UNIT_ZERO;
+  if (fc->num_rounds < 64)
+    delay = GNUNET_TIME_relative_max (delay,
+                                      GNUNET_TIME_relative_saturating_multiply
+                                      (fc->msg_delay,
+                                       (1ULL << fc->num_rounds)));
+  else
+    delay = GNUNET_TIME_UNIT_FOREVER_REL;
+  if (wrap)
+  {
+    /* full round transmitted wait 2x delay for ACK before going again */
+    fc->num_rounds++;
+    delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
+    /* never use zero, need some time for ACK always */
+    delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
+    fc->wack = GNUNET_YES;
+    fc->last_round = GNUNET_TIME_absolute_get ();
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments wrap arounds"),
+                              1,
+                              GNUNET_NO);
+  }
+  fc->proc_busy = GNUNET_YES;
+  fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
+  fc->num_transmissions++;
+  fc->proc (fc->proc_cls,
+            &fh->header);
 }
 
+
 /**
- * See if the new fragment is a part of this entry and join them if
- * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
- * the fragments do match and the fragment has been processed.  The
- * defragCacheLock is already acquired by the caller whenever this
- * method is called.<p>
+ * Create a fragmentation context for the given message.
+ * Fragments the message into fragments of size @a mtu or
+ * less.  Calls @a proc on each un-acknowledged fragment,
+ * using both the expected @a msg_delay between messages and
+ * acknowledgements and the given @a tracker to guide the
+ * frequency of calls to @a proc.
  *
- * @param entry the entry in the cache
- * @param pep the new entry
- * @param packet the ip part in the new entry
+ * @param stats statistics context
+ * @param mtu the maximum message size for each fragment
+ * @param tracker bandwidth tracker to use for flow control (can be NULL)
+ * @param msg_delay initial delay to insert between fragment transmissions
+ *              based on previous messages
+ * @param ack_delay expected delay between fragment transmission
+ *              and ACK based on previous messages
+ * @param msg the message to fragment
+ * @param proc function to call for each fragment to transmit
+ * @param proc_cls closure for @a proc
+ * @return the fragmentation context
  */
-static int
-tryJoin (FC * entry,
-         const GNUNET_PeerIdentity * sender,
-         const P2P_fragmentation_MESSAGE * packet)
+struct GNUNET_FRAGMENT_Context *
+GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+                                uint16_t mtu,
+                                struct GNUNET_BANDWIDTH_Tracker *tracker,
+                                struct GNUNET_TIME_Relative msg_delay,
+                                struct GNUNET_TIME_Relative ack_delay,
+                                const struct GNUNET_MessageHeader *msg,
+                                GNUNET_FRAGMENT_MessageProcessor proc,
+                                void *proc_cls)
 {
-  /* frame before ours; may end in the middle of
-     our frame or before it starts; NULL if we are
-     the earliest position we have received so far */
-  FL *before;
-  /* frame after ours; may start in the middle of
-     our frame or after it; NULL if we are the last
-     fragment we have received so far */
-  FL *after;
-  /* current position in the frame-list */
-  FL *pos;
-  /* the new entry that we're inserting */
-  FL *pep;
-  FL *tmp;
-  unsigned short end;
-
-  GNUNET_GE_ASSERT (NULL, entry != NULL);
-  if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
-    return GNUNET_SYSERR;       /* wrong fragment list, try another! */
-  if (ntohl (packet->id) != entry->id)
-    return GNUNET_SYSERR;       /* wrong fragment list, try another! */
-#if 0
-  printf ("Received fragment %u from %u to %u\n",
-          ntohl (packet->id),
-          ntohs (packet->off),
-          ntohs (packet->off) + ntohs (packet->header.size) -
-          sizeof (P2P_fragmentation_MESSAGE));
-#endif
-  pos = entry->head;
-  if ((pos != NULL) && (packet->len != pos->frag->len))
-    return GNUNET_SYSERR;       /* wrong fragment size */
-
-  before = NULL;
-  /* find the before-frame */
-  while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
-    {
-      before = pos;
-      pos = pos->link;
-    }
-
-  /* find the after-frame */
-  end =
-    ntohs (packet->off) + ntohs (packet->header.size) -
-    sizeof (P2P_fragmentation_MESSAGE);
-  if (end <= ntohs (packet->off))
-    {
-      GNUNET_GE_LOG (NULL,
-                     GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
-                     "Received invalid fragment at %s:%d\n", __FILE__,
-                     __LINE__);
-      return GNUNET_SYSERR;     /* yuck! integer overflow! */
-    }
-
-  if (before != NULL)
-    after = before;
+  struct GNUNET_FRAGMENT_Context *fc;
+  size_t size;
+  uint64_t bits;
+
+  GNUNET_STATISTICS_update (stats,
+                            _("# messages fragmented"),
+                            1,
+                            GNUNET_NO);
+  GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
+  size = ntohs (msg->size);
+  GNUNET_STATISTICS_update (stats,
+                            _("# total size of fragmented messages"),
+                            size, GNUNET_NO);
+  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
+  fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
+  fc->stats = stats;
+  fc->mtu = mtu;
+  fc->tracker = tracker;
+  fc->ack_delay = ack_delay;
+  fc->msg_delay = msg_delay;
+  fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
+  fc->proc = proc;
+  fc->proc_cls = proc_cls;
+  fc->fragment_id =
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                UINT32_MAX);
+  GNUNET_memcpy (&fc[1], msg, size);
+  bits =
+      (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
+                                                           sizeof (struct
+                                                                   FragmentHeader));
+  GNUNET_assert (bits <= 64);
+  if (bits == 64)
+    fc->acks_mask = UINT64_MAX; /* set all 64 bit */
   else
-    after = entry->head;
-  while ((after != NULL) && (ntohs (after->frag->off) < end))
-    after = after->link;
-
-  if ((before != NULL) && (before == after))
-    {
-      /* this implies after or before != NULL and thereby the new
-         fragment is redundant as it is fully enclosed in an earlier
-         fragment */
-      if (stats != NULL)
-        stats->change (stat_defragmented, 1);
-      return GNUNET_OK;         /* drop, there is a packet that spans our range! */
-    }
-
-  if ((before != NULL) &&
-      (after != NULL) &&
-      ((htons (before->frag->off) +
-        FRAGSIZE (before)) >= htons (after->frag->off)))
-    {
-      /* this implies that the fragment that starts before us and the
-         fragment that comes after this one leave no space in the middle
-         or even overlap; thus we can drop this redundant piece */
-      if (stats != NULL)
-        stats->change (stat_defragmented, 1);
-      return GNUNET_OK;
-    }
-
-  /* allocate pep */
-  pep = GNUNET_malloc (sizeof (FC));
-  pep->frag = GNUNET_malloc (ntohs (packet->header.size));
-  memcpy (pep->frag, packet, ntohs (packet->header.size));
-  pep->link = NULL;
-
-  if (before == NULL)
-    {
-      pep->link = after;
-      pos = entry->head;
-      while (pos != after)
-        {
-          tmp = pos->link;
-          GNUNET_free (pos->frag);
-          GNUNET_free (pos);
-          pos = tmp;
-        }
-      entry->head = pep;
-      goto FINISH;
-      /* end of insert first */
-    }
-
-  if (after == NULL)
-    {
-      /* insert last: find the end, free everything after it */
-      freeFL (before->link, 1);
-      before->link = pep;
-      goto FINISH;
-    }
-
-  /* ok, we are filling the middle between two fragments; insert.  If
-     there is anything else in the middle, it can be dropped as we're
-     bigger & cover that area as well */
-  /* free everything between before and after */
-  pos = before->link;
-  while (pos != after)
-    {
-      tmp = pos->link;
-      GNUNET_free (pos->frag);
-      GNUNET_free (pos);
-      pos = tmp;
-    }
-  before->link = pep;
-  pep->link = after;
-
-FINISH:
-  entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
-  checkComplete (entry);
-  return GNUNET_OK;
+    fc->acks_mask = (1LLU << bits) - 1;  /* set lowest 'bits' bit */
+  fc->acks = fc->acks_mask;
+  fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
+  return fc;
 }
 
+
 /**
- * Defragment the given fragment and pass to handler once
- * defragmentation is complete.
+ * Continuation to call from the 'proc' function after the fragment
+ * has been transmitted (and hence the next fragment can now be
+ * given to proc).
  *
- * @param frag the packet to defragment
- * @return GNUNET_SYSERR if the fragment is invalid
+ * @param fc fragmentation context
  */
-static int
-processFragment (const GNUNET_PeerIdentity * sender,
-                 const GNUNET_MessageHeader * frag)
+void
+GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
 {
-  unsigned int hash;
-  FC *smf;
-
-  if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
-    return GNUNET_SYSERR;
-
-  GNUNET_mutex_lock (defragCacheLock);
-  hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
-  smf = defragmentationCache[hash];
-  while (smf != NULL)
-    {
-      if (GNUNET_OK ==
-          tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
-        {
-          GNUNET_mutex_unlock (defragCacheLock);
-          return GNUNET_OK;
-        }
-      if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
-        {
-          freeFL (smf->head, 1);
-          break;
-        }
-      smf = smf->next;
-    }
-  if (smf == NULL)
-    {
-      smf = GNUNET_malloc (sizeof (FC));
-      smf->next = defragmentationCache[hash];
-      defragmentationCache[hash] = smf;
-      smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
-      smf->sender = *sender;
-    }
-  smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
-  smf->head = GNUNET_malloc (sizeof (FL));
-  smf->head->link = NULL;
-  smf->head->frag = GNUNET_malloc (ntohs (frag->size));
-  memcpy (smf->head->frag, frag, ntohs (frag->size));
-
-  GNUNET_mutex_unlock (defragCacheLock);
-  return GNUNET_OK;
+  GNUNET_assert (fc->proc_busy == GNUNET_YES);
+  fc->proc_busy = GNUNET_NO;
+  GNUNET_assert (fc->task == NULL);
+  fc->task =
+    GNUNET_SCHEDULER_add_at (fc->delay_until,
+                             &transmit_next,
+                             fc);
 }
 
-typedef struct
-{
-  GNUNET_PeerIdentity sender;
-  /* maximums size of each fragment */
-  unsigned short mtu;
-  /** how long is this message part expected to be? */
-  unsigned short len;
-  /** when did we intend to transmit? */
-  GNUNET_CronTime transmissionTime;
-} FragmentBMC;
 
 /**
- * Send a message that had to be fragmented (right now!).  First grabs
- * the first part of the message (obtained from ctx->se) and stores
- * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
- * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
- * will be transmitted next).  The logic here is that if the priority
- * for the first fragment was sufficiently high, the priority should
- * also have been sufficiently high for all of the other fragments (at
- * this time) since they have the same priority.  And we want to make
- * sure that we send all of them since just sending the first fragment
- * and then going to other messages of equal priority would not be
- * such a great idea (i.e. would just waste bandwidth).
+ * Process an acknowledgement message we got from the other
+ * side (to control re-transmits).
+ *
+ * @param fc fragmentation context
+ * @param msg acknowledgement message we received
+ * @return #GNUNET_OK if this ack completes the work of the 'fc'
+ *                   (all fragments have been received);
+ *         #GNUNET_NO if more messages are pending
+ *         #GNUNET_SYSERR if this ack is not valid for this fc
  */
-static int
-fragmentBMC (void *buf, void *cls, unsigned short len)
+int
+GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
+                             const struct GNUNET_MessageHeader *msg)
 {
-  FragmentBMC *ctx = cls;
-  static int idGen = 0;
-  P2P_fragmentation_MESSAGE *frag;
-  unsigned int pos;
-  int id;
-  unsigned short mlen;
-
-  if ((len < ctx->mtu) || (buf == NULL))
+  const struct FragmentAcknowledgement *fa;
+  uint64_t abits;
+  struct GNUNET_TIME_Relative ndelay;
+  unsigned int ack_cnt;
+  unsigned int snd_cnt;
+  unsigned int i;
+
+  if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  fa = (const struct FragmentAcknowledgement *) msg;
+  if (ntohl (fa->fragment_id) != fc->fragment_id)
+    return GNUNET_SYSERR;       /* not our ACK */
+  abits = GNUNET_ntohll (fa->bits);
+  if ( (GNUNET_YES == fc->wack) &&
+       (0 != fc->num_transmissions) )
+  {
+    /* normal ACK, can update running average of delay... */
+    fc->wack = GNUNET_NO;
+    ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
+    fc->ack_delay.rel_value_us =
+        (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
+    /* calculate ratio msg sent vs. msg acked */
+    ack_cnt = 0;
+    snd_cnt = 0;
+    for (i=0;i<64;i++)
     {
-      GNUNET_free (ctx);
-      return GNUNET_SYSERR;
+      if (1 == (fc->acks_mask & (1ULL << i)))
+      {
+       snd_cnt++;
+       if (0 == (abits & (1ULL << i)))
+         ack_cnt++;
+      }
     }
-  if (stats != NULL)
-    stats->change (stat_fragmented, 1);
-  id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
-  /* write first fragment to buf */
-  frag = (P2P_fragmentation_MESSAGE *) buf;
-  frag->header.size = htons (len);
-  frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-  frag->id = id;
-  frag->off = htons (0);
-  frag->len = htons (ctx->len);
-  memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
-
-  /* create remaining fragments, add to queue! */
-  pos = len - sizeof (P2P_fragmentation_MESSAGE);
-  frag = GNUNET_malloc (ctx->mtu);
-  while (pos < ctx->len)
+    if (0 == ack_cnt)
     {
-      mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
-      if (mlen > ctx->mtu)
-        mlen = ctx->mtu;
-      GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
-      frag->header.size = htons (mlen);
-      frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-      frag->id = id;
-      frag->off = htons (pos);
-      frag->len = htons (ctx->len);
-      memcpy (&frag[1],
-              &((char *) (&ctx[1]))[pos],
-              mlen - sizeof (P2P_fragmentation_MESSAGE));
-      coreAPI->ciphertext_send (&ctx->sender,
-                                &frag->header,
-                                GNUNET_EXTREME_PRIORITY,
-                                ctx->transmissionTime - GNUNET_get_time ());
-      pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
+      /* complete loss */
+      fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
+                                                                snd_cnt);
     }
-  GNUNET_GE_ASSERT (NULL, pos == ctx->len);
-  GNUNET_free (frag);
-  GNUNET_free (ctx);
-  return GNUNET_OK;
-}
-
-/**
- * The given message must be fragmented.  Produce a placeholder that
- * corresponds to the first fragment.  Once that fragment is scheduled
- * for transmission, the placeholder should automatically add all of
- * the other fragments (with very high priority).
- */
-void
-fragment (const GNUNET_PeerIdentity * peer,
-          unsigned int mtu,
-          unsigned int prio,
-          unsigned int targetTime,
-          unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
-{
-  FragmentBMC *fbmc;
-  int xlen;
-
-  GNUNET_GE_ASSERT (NULL, len > mtu);
-  GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
-  fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
-  fbmc->mtu = mtu;
-  fbmc->sender = *peer;
-  fbmc->transmissionTime = targetTime;
-  fbmc->len = len;
-  if (bmc == NULL)
+    else if (snd_cnt > ack_cnt)
     {
-      memcpy (&fbmc[1], bmcClosure, len);
-      GNUNET_free (bmcClosure);
+      /* some loss, slow down proportionally */
+      fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
     }
-  else
+    else if (snd_cnt == ack_cnt)
     {
-      if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
-        {
-          GNUNET_free (fbmc);
-          return;
-        }
+      fc->msg_delay.rel_value_us =
+        (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
     }
-  xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
-  coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
-                                          targetTime);
-}
-
-/**
- * Initialize Fragmentation module.
- */
-GNUNET_Fragmentation_ServiceAPI *
-provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
-{
-  static GNUNET_Fragmentation_ServiceAPI ret;
-  int i;
-
-  coreAPI = capi;
-  stats = coreAPI->service_request ("stats");
-  if (stats != NULL)
+    fc->num_transmissions = 0;
+    fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
+                                             GNUNET_TIME_UNIT_SECONDS);
+    fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
+                                             GNUNET_TIME_UNIT_SECONDS);
+  }
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragment acknowledgements received"),
+                            1,
+                            GNUNET_NO);
+  if (abits != (fc->acks & abits))
+  {
+    /* ID collission or message reordering, count! This should be rare! */
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# bits removed from fragmentation ACKs"), 1,
+                              GNUNET_NO);
+  }
+  fc->acks = abits & fc->acks_mask;
+  if (0 != fc->acks)
+  {
+    /* more to transmit, do so right now (if tracker permits...) */
+    if (fc->task != NULL)
+    {
+      /* schedule next transmission now, no point in waiting... */
+      GNUNET_SCHEDULER_cancel (fc->task);
+      fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
+    }
+    else
     {
-      stat_defragmented =
-        stats->create (gettext_noop ("# messages defragmented"));
-      stat_fragmented =
-        stats->create (gettext_noop ("# messages fragmented"));
-      stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
+      /* only case where there is no task should be if we're waiting
+       * for the right to transmit again (proc_busy set to YES) */
+      GNUNET_assert (GNUNET_YES == fc->proc_busy);
     }
-  for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-    defragmentationCache[i] = NULL;
-  defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
-  GNUNET_cron_add_job (coreAPI->cron,
-                       &defragmentationPurgeCron,
-                       60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
-                       NULL);
-  GNUNET_GE_LOG (capi->ectx,
-                 GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
-                 _("`%s' registering handler %d\n"), "fragmentation",
-                 GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-  capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
-                                         &processFragment);
-
-  ret.fragment = &fragment;
-  return &ret;
+    return GNUNET_NO;
+  }
+
+  /* all done */
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragmentation transmissions completed"),
+                            1,
+                            GNUNET_NO);
+  if (NULL != fc->task)
+  {
+    GNUNET_SCHEDULER_cancel (fc->task);
+    fc->task = NULL;
+  }
+  return GNUNET_OK;
 }
 
+
 /**
- * Shutdown fragmentation.
+ * Destroy the given fragmentation context (stop calling 'proc', free
+ * resources).
+ *
+ * @param fc fragmentation context
+ * @param msg_delay where to store average delay between individual message transmissions the
+ *         last message (OUT only)
+ * @param ack_delay where to store average delay between transmission and ACK for the
+ *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
  */
 void
-release_module_fragmentation ()
+GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
+                                struct GNUNET_TIME_Relative *msg_delay,
+                                struct GNUNET_TIME_Relative *ack_delay)
 {
-  int i;
-
-  coreAPI->p2p_ciphertext_handler_unregister
-    (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
-  GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
-                       60 * GNUNET_CRON_SECONDS, NULL);
-  for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-    {
-      FC *pos = defragmentationCache[i];
-      while (pos != NULL)
-        {
-          FC *next = pos->next;
-          freeFL (pos->head, 1);
-          GNUNET_free (pos);
-          pos = next;
-        }
-    }
-  if (stats != NULL)
-    {
-      coreAPI->service_release (stats);
-      stats = NULL;
-    }
-  GNUNET_mutex_destroy (defragCacheLock);
-  defragCacheLock = NULL;
-  coreAPI = NULL;
+  if (fc->task != NULL)
+    GNUNET_SCHEDULER_cancel (fc->task);
+  if (NULL != ack_delay)
+    *ack_delay = fc->ack_delay;
+  if (NULL != msg_delay)
+    *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
+                                                           fc->num_rounds);
+  GNUNET_free (fc);
 }
 
-#endif
 
 /* end of fragmentation.c */