Merge branch 'license/spdx'
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
index ecd82b2c4ba4e1809ebfa6fae1bcaa28b46b0b61..9fca6eef024a693bcb425504f71e85bd3f8a8c6c 100644 (file)
 /*
      This file is part of GNUnet
-     (C) 2004, 2006, 2009 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009-2013 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
- */
+     SPDX-License-Identifier: AGPL3.0-or-later
+*/
 /**
- * @file fragmentation/fragmentation.c
- * @brief fragmentation and defragmentation, this code allows
- *        sending and receiving messages that are larger than
- *        the MTU of the transport.  Messages are still limited
- *        to a maximum size of 65535 bytes, which is a good
- *        idea because otherwise we may need ungainly fragmentation
- *        buffers.  Each connected peer can have at most one
- *        fragmented packet at any given point in time (prevents
- *        DoS attacks).  Fragmented messages that have not been
- *        completed after a certain amount of time are discarded.
- * @author Ji Lu
+ * @file src/fragmentation/fragmentation.c
+ * @brief library to help fragment messages
+ * @author Christian Grothoff
  */
-
 #include "platform.h"
 #include "gnunet_fragmentation_lib.h"
 #include "gnunet_protocols.h"
-#include "gnunet_util_lib.h"
+#include "fragmentation.h"
+
 
 /**
- * Message fragment.  This header is followed
- * by the actual data of the fragment.
+ * Absolute minimum delay we impose between sending and expecting ACK to arrive.
  */
-
-struct Fragment
-{
-
-       struct GNUNET_MessageHeader header;
-
-       /**
-        * Fragment offset.
-        */
-       uint16_t off GNUNET_PACKED;
-
-       /**
-       * "unique" id for the fragment
-        */
-       uint32_t id GNUNET_PACKED;
-       uint16_t mtu;
-       uint16_t totalNum;
-       uint16_t totalSize;
-
-};
-
-struct GNUNET_FRAGEMENT_Ctxbuffer{
-       struct GNUNET_FRAGEMENT_Ctxbuffer *next;
-       uint32_t id;
-       uint16_t size;
-       char * buff;
-       int counter;
-       struct GNUNET_TIME_Absolute receivedTime;
-       struct GNUNET_PeerIdentity peerID;
-       int * num;
-};
+#define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
 
 
 /**
- * Defragmentation context.
+ * Fragmentation context.
  */
 struct GNUNET_FRAGMENT_Context
 {
-       uint32_t maxNum;
-       struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
-       GNUNET_FRAGMENT_MessageProcessor proc;
-       void *proc_cls;
+  /**
+   * Statistics to use.
+   */
+  struct GNUNET_STATISTICS_Handle *stats;
+
+  /**
+   * Tracker for flow control.
+   */
+  struct GNUNET_BANDWIDTH_Tracker *tracker;
+
+  /**
+   * Current expected delay for ACKs.
+   */
+  struct GNUNET_TIME_Relative ack_delay;
+
+  /**
+   * Current expected delay between messages.
+   */
+  struct GNUNET_TIME_Relative msg_delay;
+
+  /**
+   * Next allowed transmission time.
+   */
+  struct GNUNET_TIME_Absolute delay_until;
+
+  /**
+   * Time we transmitted the last message of the last round.
+   */
+  struct GNUNET_TIME_Absolute last_round;
+
+  /**
+   * Message to fragment (allocated at the end of this struct).
+   */
+  const struct GNUNET_MessageHeader *msg;
+
+  /**
+   * Function to call for transmissions.
+   */
+  GNUNET_FRAGMENT_MessageProcessor proc;
+
+  /**
+   * Closure for @e proc.
+   */
+  void *proc_cls;
+
+  /**
+   * Bitfield, set to 1 for each unacknowledged fragment.
+   */
+  uint64_t acks;
+
+  /**
+   * Bitfield with all possible bits for @e acks (used to mask the
+   * ack we get back).
+   */
+  uint64_t acks_mask;
+
+  /**
+   * Task performing work for the fragmenter.
+   */
+  struct GNUNET_SCHEDULER_Task *task;
+
+  /**
+   * Our fragmentation ID. (chosen at random)
+   */
+  uint32_t fragment_id;
+
+  /**
+   * Round-robin selector for the next transmission.
+   */
+  unsigned int next_transmission;
+
+  /**
+   * How many rounds of transmission have we completed so far?
+   */
+  unsigned int num_rounds;
+
+  /**
+   * How many transmission have we completed in this round?
+   */
+  unsigned int num_transmissions;
+
+  /**
+   * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
+   */
+  int8_t proc_busy;
+
+  /**
+   * #GNUNET_YES if we are waiting for an ACK.
+   */
+  int8_t wack;
+
+  /**
+   * Target fragment size.
+   */
+  uint16_t mtu;
+
 };
 
 
 /**
- * Fragment an over-sized message.
+ * Convert an ACK message to a printable format suitable for logging.
  *
- * @param msg the message to fragment
- * @param mtu the maximum message size
- * @param proc function to call for each fragment
- * @param proc_cls closure for proc
- */
-void
-GNUNET_FRAGMENT_fragment (const struct GNUNET_MessageHeader *msg,
-               uint16_t mtu,
-               GNUNET_FRAGMENT_MessageProcessor proc,
-               void *proc_cls)
-{
-       uint32_t id = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 256);
-       size_t size = sizeof(struct Fragment);
-
-       if(ntohs(msg->size) > mtu-size){
-               uint16_t lastSize;
-               uint16_t num;
-               uint16_t i;
-               uint16_t actualNum;
-               lastSize = ntohs(msg->size) % (mtu-size);
-               num     = ntohs(msg->size) / (mtu - size);
-               actualNum = num;
-               if(lastSize!=0){
-                       actualNum = num+1;
-               }
-               for(i = 0; i<actualNum; i++)
-               {
-                       struct Fragment *frag;
-                       if(actualNum != num){
-                               if(i!=actualNum-1){
-                                       frag = (struct Fragment *)GNUNET_malloc(mtu);
-                               }
-                               else{
-                                       frag = (struct Fragment *)GNUNET_malloc(lastSize+size);
-                                       }
-                               }
-                       else{
-                                       frag = (struct Fragment *)GNUNET_malloc(mtu);
-                               }
-                       frag->header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT);
-                       frag->id = htonl(id);
-                       frag->off = htons((mtu-size)*i);
-                       frag->mtu = htons(mtu);
-                       frag->totalNum = htons(actualNum);
-                       frag->totalSize = msg->size;
-                       char *tmpMsg =  (char *)msg;
-                       if(actualNum != num){
-                               if(i!=actualNum-1){
-                                       frag->header.size = htons(mtu);
-                                       memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
-                               }
-                               else{
-                                       frag->header.size = htons(lastSize+size);
-                                       memcpy(&frag[1], tmpMsg + (mtu-size)*i, lastSize);
-                               }
-                       }
-                       else{
-                               frag->header.size = htons(mtu);
-                               memcpy(&frag[1], tmpMsg + (mtu-size)*i, mtu - size);
-                       }
-                       proc(proc_cls, &frag->header);
-                       GNUNET_free(frag);
-               }
-       }
-}
-
-/**
- * Create a defragmentation context.
- *
- * @param stats statistics context
- * @param proc function to call with defragmented messages
- * @param proc_cls closure for proc
- * @return the defragmentation context
- */
-struct GNUNET_FRAGMENT_Context *
-GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
-               GNUNET_FRAGMENT_MessageProcessor proc,
-               void *proc_cls)
-               {
-       struct GNUNET_FRAGMENT_Context *ctx = (struct GNUNET_FRAGMENT_Context*)GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context));
-       ctx->maxNum = 100;
-       ctx->proc = proc;
-       ctx->proc_cls = proc_cls;
-       ctx->buffer = NULL;
-       return ctx;
-               }
-
-
-/**
- * Destroy the given defragmentation context.
+ * @param ack message to print
+ * @return ack in human-readable format
  */
-void
-GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *ctx)
+const char *
+GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
 {
-       struct GNUNET_FRAGEMENT_Ctxbuffer *buffer;
-       struct GNUNET_FRAGEMENT_Ctxbuffer *temp;
-       buffer = ctx->buffer;
-
-       while (buffer != NULL)
-         {
-            temp = buffer->next;
-            GNUNET_free(buffer->num);
-            GNUNET_free(buffer);
-            buffer = temp;
-          }
-       GNUNET_free(ctx);
-       GNUNET_assert (0);
+  static char buf[128];
+  const struct FragmentAcknowledgement *fa;
+
+  if (sizeof (struct FragmentAcknowledgement) !=
+      htons (ack->size))
+    return "<malformed ack>";
+  fa = (const struct FragmentAcknowledgement *) ack;
+  GNUNET_snprintf (buf,
+                   sizeof (buf),
+                   "%u-%llX",
+                   ntohl (fa->fragment_id),
+                   GNUNET_ntohll (fa->bits));
+  return buf;
 }
 
 
 /**
- * We have received a fragment.  Process it.
+ * Transmit the next fragment to the other peer.
  *
- * @param ctx the context
- * @param sender who transmitted the fragment
- * @param msg the message that was received
+ * @param cls the `struct GNUNET_FRAGMENT_Context`
  */
-void
-GNUNET_FRAGMENT_process (struct GNUNET_FRAGMENT_Context *ctx,
-               const struct GNUNET_PeerIdentity *sender,
-               const struct GNUNET_MessageHeader *msg)
-{
-       uint16_t type = ntohs(msg->type);
-       int exist = 0, received = 0;
-       if(type!=GNUNET_MESSAGE_TYPE_FRAGMENT){
-               return;
-       }
-       struct Fragment *frag = (struct Fragment *)msg;
-       struct GNUNET_FRAGEMENT_Ctxbuffer* buffer;
-       struct GNUNET_FRAGEMENT_Ctxbuffer* prev;
-       prev = NULL;
-       buffer = ctx->buffer;
-       while (buffer != NULL)
-       {
-               if ((buffer->id == ntohl(frag->id))&&(0 == memcmp (&buffer->peerID,
-                             sender, sizeof (struct GNUNET_PeerIdentity)))){
-                       exist = 1;
-                       break;
-               }
-               prev = buffer;
-               buffer = buffer->next;
-       }
-
-       if (exist)
-       {
-               int i;
-               for(i = 0; i<ntohs(frag->totalNum); i++){
-                       if(buffer->num[i]==ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment))){
-                               received = 1;
-                               break;
-                               }
-               }
-       }
-
-       if(!exist){
-               buffer = GNUNET_malloc(sizeof(struct GNUNET_FRAGEMENT_Ctxbuffer));
-               buffer->num = GNUNET_malloc(ntohs(frag->totalNum)*sizeof(int));
-               int j;
-               for(j = 0; j<ntohs(frag->totalNum); j++){
-                       buffer->num[j] = -10;
-               }
-               buffer->peerID = *sender;
-               buffer->id = ntohl(frag->id);
-               buffer->receivedTime = GNUNET_TIME_absolute_get ();
-               uint16_t si = ntohs(frag->totalSize);
-               buffer->size = si;
-               buffer->buff = GNUNET_malloc(si);
-               buffer->next = ctx->buffer;
-               ctx->buffer = buffer;
-       }
-
-       if(!received){
-               buffer->num[buffer->counter++]=ntohs(frag->off)/(ntohs(frag->mtu)-sizeof(struct Fragment));
-               uint16_t sizeoffrag = ntohs(frag->header.size) - sizeof(struct Fragment);
-               memcpy(&buffer->buff[ntohs(frag->off)], &frag[1], sizeoffrag);
-               buffer->receivedTime = GNUNET_TIME_absolute_get ();
-       }
-
-       if(buffer->counter == ntohs(frag->totalNum))
-       {
-               ctx->proc(ctx->proc_cls, (struct GNUNET_MessageHeader *)buffer->buff);
-               if(prev==NULL){
-                       ctx->buffer = buffer->next;
-               }
-               else{
-                       prev->next = buffer->next;
-               }
-               GNUNET_free(buffer);
-               return;
-       }
-}
-
-
-
-#if 0
-
-/**
- * How many buckets does the fragment hash table
- * have?
- */
-#define DEFRAG_BUCKET_COUNT 16
-
-/**
- * After how long do fragments time out?
- */
-#ifndef DEFRAGMENTATION_TIMEOUT
-#define DEFRAGMENTATION_TIMEOUT (3 * GNUNET_CRON_MINUTES)
-#endif
-
-/**
- * Entry in the linked list of fragments.
- */
-typedef struct FL
-{
-       struct FL *link;
-       P2P_fragmentation_MESSAGE *frag;
-} FL;
-
-/**
- * Entry in the GNUNET_hash table of fragments.
- */
-typedef struct FC
-{
-       struct FC *next;
-       FL *head;
-       GNUNET_PeerIdentity sender;
-       int id;
-       GNUNET_CronTime ttl;
-} FC;
-
-#define FRAGSIZE(fl) ((ntohs(fl->frag->header.size)-sizeof(P2P_fragmentation_MESSAGE)))
-
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
-static GNUNET_Stats_ServiceAPI *stats;
-
-static int stat_defragmented;
-
-static int stat_fragmented;
-
-static int stat_discarded;
-
-/**
- * Hashtable *with* collision management!
- */
-static FC *defragmentationCache[DEFRAG_BUCKET_COUNT];
-
-/**
- * Lock for the defragmentation cache.
- */
-static struct GNUNET_Mutex *defragCacheLock;
-
 static void
-freeFL (FL * fl, int c)
+transmit_next (void *cls)
 {
-       while (fl != NULL)
-       {
-               FL *link = fl->link;
-               if (stats != NULL)
-                       stats->change (stat_discarded, c);
-               GNUNET_free (fl->frag);
-               GNUNET_free (fl);
-               fl = link;
-       }
+  struct GNUNET_FRAGMENT_Context *fc = cls;
+  char msg[fc->mtu];
+  const char *mbuf;
+  struct FragmentHeader *fh;
+  struct GNUNET_TIME_Relative delay;
+  unsigned int bit;
+  size_t size;
+  size_t fsize;
+  int wrap;
+
+  fc->task = NULL;
+  GNUNET_assert (GNUNET_NO == fc->proc_busy);
+  if (0 == fc->acks)
+    return;                     /* all done */
+  /* calculate delay */
+  wrap = 0;
+  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
+  {
+    fc->next_transmission = (fc->next_transmission + 1) % 64;
+    wrap |= (0 == fc->next_transmission);
+  }
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize =
+        (size % (fc->mtu - sizeof (struct FragmentHeader))) +
+        sizeof (struct FragmentHeader);
+  else
+    fsize = fc->mtu;
+  if (NULL != fc->tracker)
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
+  else
+    delay = GNUNET_TIME_UNIT_ZERO;
+  if (delay.rel_value_us > 0)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragmentation logic delays transmission of next fragment by %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+    fc->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                             &transmit_next,
+                                             fc);
+    return;
+  }
+  fc->next_transmission = (fc->next_transmission + 1) % 64;
+  wrap |= (0 == fc->next_transmission);
+  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
+  {
+    fc->next_transmission = (fc->next_transmission + 1) % 64;
+    wrap |= (0 == fc->next_transmission);
+  }
+
+  /* assemble fragmentation message */
+  mbuf = (const char *) &fc[1];
+  fh = (struct FragmentHeader *) msg;
+  fh->header.size = htons (fsize);
+  fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
+  fh->fragment_id = htonl (fc->fragment_id);
+  fh->total_size = fc->msg->size;       /* already in big-endian */
+  fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
+  GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
+          fsize - sizeof (struct FragmentHeader));
+  if (NULL != fc->tracker)
+    GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragments transmitted"),
+                            1,
+                            GNUNET_NO);
+  if (0 != fc->last_round.abs_value_us)
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments retransmitted"),
+                              1,
+                              GNUNET_NO);
+
+  /* select next message to calculate delay */
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+  else
+    fsize = fc->mtu;
+  if (NULL != fc->tracker)
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
+  else
+    delay = GNUNET_TIME_UNIT_ZERO;
+  if (fc->num_rounds < 64)
+    delay = GNUNET_TIME_relative_max (delay,
+                                      GNUNET_TIME_relative_saturating_multiply
+                                      (fc->msg_delay,
+                                       (1ULL << fc->num_rounds)));
+  else
+    delay = GNUNET_TIME_UNIT_FOREVER_REL;
+  if (wrap)
+  {
+    /* full round transmitted wait 2x delay for ACK before going again */
+    fc->num_rounds++;
+    delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
+    /* never use zero, need some time for ACK always */
+    delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
+    fc->wack = GNUNET_YES;
+    fc->last_round = GNUNET_TIME_absolute_get ();
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments wrap arounds"),
+                              1,
+                              GNUNET_NO);
+  }
+  fc->proc_busy = GNUNET_YES;
+  fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
+  fc->num_transmissions++;
+  fc->proc (fc->proc_cls,
+            &fh->header);
 }
 
-/**
- * This cron job ensures that we purge buffers of fragments
- * that have timed out.  It can run in much longer intervals
- * than the defragmentationCron, e.g. every 60s.
- * <p>
- * This method goes through the hashtable, finds entries that
- * have timed out and removes them (and all the fragments that
- * belong to the entry).  It's a bit more complicated as the
- * collision list is also collapsed.
- */
-static void
-defragmentationPurgeCron (void *unused)
-{
-       int i;
-       FC *smf;
-       FC *next;
-       FC *last;
-
-       GNUNET_mutex_lock (defragCacheLock);
-       for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-       {
-               last = NULL;
-               smf = defragmentationCache[i];
-               while (smf != NULL)
-               {
-                       if (smf->ttl < GNUNET_get_time ())
-                       {
-                               /* free linked list of fragments */
-                               freeFL (smf->head, 1);
-                               next = smf->next;
-                               GNUNET_free (smf);
-                               if (last == NULL)
-                                       defragmentationCache[i] = next;
-                               else
-                                       last->next = next;
-                               smf = next;
-                       }
-                       else
-                       {
-                               last = smf;
-                               smf = smf->next;
-                       }
-               }                       /* while smf != NULL */
-       }                           /* for all buckets */
-       GNUNET_mutex_unlock (defragCacheLock);
-}
 
 /**
- * Check if this fragment-list is complete.  If yes, put it together,
- * process and free all buffers.  Does not free the pep
- * itself (but sets the TTL to 0 to have the cron free it
- * in the next iteration).
+ * Create a fragmentation context for the given message.
+ * Fragments the message into fragments of size @a mtu or
+ * less.  Calls @a proc on each un-acknowledged fragment,
+ * using both the expected @a msg_delay between messages and
+ * acknowledgements and the given @a tracker to guide the
+ * frequency of calls to @a proc.
  *
- * @param pep the entry in the GNUNET_hash table
+ * @param stats statistics context
+ * @param mtu the maximum message size for each fragment
+ * @param tracker bandwidth tracker to use for flow control (can be NULL)
+ * @param msg_delay initial delay to insert between fragment transmissions
+ *              based on previous messages
+ * @param ack_delay expected delay between fragment transmission
+ *              and ACK based on previous messages
+ * @param msg the message to fragment
+ * @param proc function to call for each fragment to transmit
+ * @param proc_cls closure for @a proc
+ * @return the fragmentation context
  */
-static void
-checkComplete (FC * pep)
+struct GNUNET_FRAGMENT_Context *
+GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+                                uint16_t mtu,
+                                struct GNUNET_BANDWIDTH_Tracker *tracker,
+                                struct GNUNET_TIME_Relative msg_delay,
+                                struct GNUNET_TIME_Relative ack_delay,
+                                const struct GNUNET_MessageHeader *msg,
+                                GNUNET_FRAGMENT_MessageProcessor proc,
+                                void *proc_cls)
 {
-       FL *pos;
-       unsigned short off;
-       unsigned short len;
-       char *msg;
-
-       GNUNET_GE_ASSERT (NULL, pep != NULL);
-       pos = pep->head;
-       if (pos == NULL)
-               return;
-       len = ntohs (pos->frag->len);
-       if (len == 0)
-               goto CLEANUP;               /* really bad error! */
-       off = 0;
-       while ((pos != NULL) && (ntohs (pos->frag->off) <= off))
-       {
-               if (off >= off + FRAGSIZE (pos))
-                       goto CLEANUP;           /* error! */
-               if (ntohs (pos->frag->off) + FRAGSIZE (pos) > off)
-                       off = ntohs (pos->frag->off) + FRAGSIZE (pos);
-               else
-                       goto CLEANUP;           /* error! */
-               pos = pos->link;
-       }
-       if (off < len)
-               return;                     /* some fragment is still missing */
-
-       msg = GNUNET_malloc (len);
-       pos = pep->head;
-       while (pos != NULL)
-       {
-               memcpy (&msg[ntohs (pos->frag->off)], &pos->frag[1], FRAGSIZE (pos));
-               pos = pos->link;
-       }
-       if (stats != NULL)
-               stats->change (stat_defragmented, 1);
-#if 0
-       printf ("Finished defragmentation!\n");
-#endif
-       /* handle message! */
-       coreAPI->loopback_send (&pep->sender, msg, len, GNUNET_YES, NULL);
-       GNUNET_free (msg);
-       CLEANUP:
-       /* free fragment buffers */
-       freeFL (pep->head, 0);
-       pep->head = NULL;
-       pep->ttl = 0;
+  struct GNUNET_FRAGMENT_Context *fc;
+  size_t size;
+  uint64_t bits;
+
+  GNUNET_STATISTICS_update (stats,
+                            _("# messages fragmented"),
+                            1,
+                            GNUNET_NO);
+  GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
+  size = ntohs (msg->size);
+  GNUNET_STATISTICS_update (stats,
+                            _("# total size of fragmented messages"),
+                            size, GNUNET_NO);
+  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
+  fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
+  fc->stats = stats;
+  fc->mtu = mtu;
+  fc->tracker = tracker;
+  fc->ack_delay = ack_delay;
+  fc->msg_delay = msg_delay;
+  fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
+  fc->proc = proc;
+  fc->proc_cls = proc_cls;
+  fc->fragment_id =
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                UINT32_MAX);
+  GNUNET_memcpy (&fc[1], msg, size);
+  bits =
+      (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
+                                                           sizeof (struct
+                                                                   FragmentHeader));
+  GNUNET_assert (bits <= 64);
+  if (bits == 64)
+    fc->acks_mask = UINT64_MAX; /* set all 64 bit */
+  else
+    fc->acks_mask = (1LLU << bits) - 1;  /* set lowest 'bits' bit */
+  fc->acks = fc->acks_mask;
+  fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
+  return fc;
 }
 
-/**
- * See if the new fragment is a part of this entry and join them if
- * yes.  Return GNUNET_SYSERR if the fragments do not match.  Return GNUNET_OK if
- * the fragments do match and the fragment has been processed.  The
- * defragCacheLock is already acquired by the caller whenever this
- * method is called.<p>
- *
- * @param entry the entry in the cache
- * @param pep the new entry
- * @param packet the ip part in the new entry
- */
-static int
-tryJoin (FC * entry,
-               const GNUNET_PeerIdentity * sender,
-               const P2P_fragmentation_MESSAGE * packet)
-{
-       /* frame before ours; may end in the middle of
-     our frame or before it starts; NULL if we are
-     the earliest position we have received so far */
-       FL *before;
-       /* frame after ours; may start in the middle of
-     our frame or after it; NULL if we are the last
-     fragment we have received so far */
-       FL *after;
-       /* current position in the frame-list */
-       FL *pos;
-       /* the new entry that we're inserting */
-       FL *pep;
-       FL *tmp;
-       unsigned short end;
-
-       GNUNET_GE_ASSERT (NULL, entry != NULL);
-       if (0 != memcmp (sender, &entry->sender, sizeof (GNUNET_PeerIdentity)))
-               return GNUNET_SYSERR;       /* wrong fragment list, try another! */
-       if (ntohl (packet->id) != entry->id)
-               return GNUNET_SYSERR;       /* wrong fragment list, try another! */
-#if 0
-       printf ("Received fragment %u from %u to %u\n",
-                       ntohl (packet->id),
-                       ntohs (packet->off),
-                       ntohs (packet->off) + ntohs (packet->header.size) -
-                       sizeof (P2P_fragmentation_MESSAGE));
-#endif
-       pos = entry->head;
-       if ((pos != NULL) && (packet->len != pos->frag->len))
-               return GNUNET_SYSERR;       /* wrong fragment size */
-
-       before = NULL;
-       /* find the before-frame */
-       while ((pos != NULL) && (ntohs (pos->frag->off) < ntohs (packet->off)))
-       {
-               before = pos;
-               pos = pos->link;
-       }
-
-       /* find the after-frame */
-       end =
-                       ntohs (packet->off) + ntohs (packet->header.size) -
-                       sizeof (P2P_fragmentation_MESSAGE);
-       if (end <= ntohs (packet->off))
-       {
-               GNUNET_GE_LOG (NULL,
-                               GNUNET_GE_DEVELOPER | GNUNET_GE_DEBUG | GNUNET_GE_BULK,
-                               "Received invalid fragment at %s:%d\n", __FILE__,
-                               __LINE__);
-               return GNUNET_SYSERR;     /* yuck! integer overflow! */
-       }
-
-       if (before != NULL)
-               after = before;
-       else
-               after = entry->head;
-       while ((after != NULL) && (ntohs (after->frag->off) < end))
-               after = after->link;
-
-       if ((before != NULL) && (before == after))
-       {
-               /* this implies after or before != NULL and thereby the new
-         fragment is redundant as it is fully enclosed in an earlier
-         fragment */
-               if (stats != NULL)
-                       stats->change (stat_defragmented, 1);
-               return GNUNET_OK;         /* drop, there is a packet that spans our range! */
-       }
-
-       if ((before != NULL) &&
-                       (after != NULL) &&
-                       ((htons (before->frag->off) +
-                                       FRAGSIZE (before)) >= htons (after->frag->off)))
-       {
-               /* this implies that the fragment that starts before us and the
-         fragment that comes after this one leave no space in the middle
-         or even overlap; thus we can drop this redundant piece */
-               if (stats != NULL)
-                       stats->change (stat_defragmented, 1);
-               return GNUNET_OK;
-       }
-
-       /* allocate pep */
-       pep = GNUNET_malloc (sizeof (FC));
-       pep->frag = GNUNET_malloc (ntohs (packet->header.size));
-       memcpy (pep->frag, packet, ntohs (packet->header.size));
-       pep->link = NULL;
-
-       if (before == NULL)
-       {
-               pep->link = after;
-               pos = entry->head;
-               while (pos != after)
-               {
-                       tmp = pos->link;
-                       GNUNET_free (pos->frag);
-                       GNUNET_free (pos);
-                       pos = tmp;
-               }
-               entry->head = pep;
-               goto FINISH;
-               /* end of insert first */
-       }
-
-       if (after == NULL)
-       {
-               /* insert last: find the end, free everything after it */
-               freeFL (before->link, 1);
-               before->link = pep;
-               goto FINISH;
-       }
-
-       /* ok, we are filling the middle between two fragments; insert.  If
-     there is anything else in the middle, it can be dropped as we're
-     bigger & cover that area as well */
-       /* free everything between before and after */
-       pos = before->link;
-       while (pos != after)
-       {
-               tmp = pos->link;
-               GNUNET_free (pos->frag);
-               GNUNET_free (pos);
-               pos = tmp;
-       }
-       before->link = pep;
-       pep->link = after;
-
-       FINISH:
-       entry->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
-       checkComplete (entry);
-       return GNUNET_OK;
-}
 
 /**
- * Defragment the given fragment and pass to handler once
- * defragmentation is complete.
+ * Continuation to call from the 'proc' function after the fragment
+ * has been transmitted (and hence the next fragment can now be
+ * given to proc).
  *
- * @param frag the packet to defragment
- * @return GNUNET_SYSERR if the fragment is invalid
+ * @param fc fragmentation context
  */
-static int
-processFragment (const GNUNET_PeerIdentity * sender,
-               const GNUNET_MessageHeader * frag)
+void
+GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
 {
-       unsigned int hash;
-       FC *smf;
-
-       if (ntohs (frag->size) < sizeof (P2P_fragmentation_MESSAGE))
-               return GNUNET_SYSERR;
-
-       GNUNET_mutex_lock (defragCacheLock);
-       hash = sender->hashPubKey.bits[0] % DEFRAG_BUCKET_COUNT;
-       smf = defragmentationCache[hash];
-       while (smf != NULL)
-       {
-               if (GNUNET_OK ==
-                               tryJoin (smf, sender, (P2P_fragmentation_MESSAGE *) frag))
-               {
-                       GNUNET_mutex_unlock (defragCacheLock);
-                       return GNUNET_OK;
-               }
-               if (0 == memcmp (sender, &smf->sender, sizeof (GNUNET_PeerIdentity)))
-               {
-                       freeFL (smf->head, 1);
-                       break;
-               }
-               smf = smf->next;
-       }
-       if (smf == NULL)
-       {
-               smf = GNUNET_malloc (sizeof (FC));
-               smf->next = defragmentationCache[hash];
-               defragmentationCache[hash] = smf;
-               smf->ttl = GNUNET_get_time () + DEFRAGMENTATION_TIMEOUT;
-               smf->sender = *sender;
-       }
-       smf->id = ntohl (((P2P_fragmentation_MESSAGE *) frag)->id);
-       smf->head = GNUNET_malloc (sizeof (FL));
-       smf->head->link = NULL;
-       smf->head->frag = GNUNET_malloc (ntohs (frag->size));
-       memcpy (smf->head->frag, frag, ntohs (frag->size));
-
-       GNUNET_mutex_unlock (defragCacheLock);
-       return GNUNET_OK;
+  GNUNET_assert (fc->proc_busy == GNUNET_YES);
+  fc->proc_busy = GNUNET_NO;
+  GNUNET_assert (fc->task == NULL);
+  fc->task =
+    GNUNET_SCHEDULER_add_at (fc->delay_until,
+                             &transmit_next,
+                             fc);
 }
 
-typedef struct
-{
-       GNUNET_PeerIdentity sender;
-       /* maximums size of each fragment */
-       unsigned short mtu;
-       /** how long is this message part expected to be? */
-       unsigned short len;
-       /** when did we intend to transmit? */
-       GNUNET_CronTime transmissionTime;
-} FragmentBMC;
-
-/**
- * Send a message that had to be fragmented (right now!).  First grabs
- * the first part of the message (obtained from ctx->se) and stores
- * that in a P2P_fragmentation_MESSAGE envelope.  The remaining fragments are
- * added to the send queue with GNUNET_EXTREME_PRIORITY (to ensure that they
- * will be transmitted next).  The logic here is that if the priority
- * for the first fragment was sufficiently high, the priority should
- * also have been sufficiently high for all of the other fragments (at
- * this time) since they have the same priority.  And we want to make
- * sure that we send all of them since just sending the first fragment
- * and then going to other messages of equal priority would not be
- * such a great idea (i.e. would just waste bandwidth).
- */
-static int
-fragmentBMC (void *buf, void *cls, unsigned short len)
-{
-       FragmentBMC *ctx = cls;
-       static int idGen = 0;
-       P2P_fragmentation_MESSAGE *frag;
-       unsigned int pos;
-       int id;
-       unsigned short mlen;
-
-       if ((len < ctx->mtu) || (buf == NULL))
-       {
-               GNUNET_free (ctx);
-               return GNUNET_SYSERR;
-       }
-       if (stats != NULL)
-               stats->change (stat_fragmented, 1);
-       id = (idGen++) + GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 512);
-       /* write first fragment to buf */
-       frag = (P2P_fragmentation_MESSAGE *) buf;
-       frag->header.size = htons (len);
-       frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-       frag->id = id;
-       frag->off = htons (0);
-       frag->len = htons (ctx->len);
-       memcpy (&frag[1], &ctx[1], len - sizeof (P2P_fragmentation_MESSAGE));
-
-       /* create remaining fragments, add to queue! */
-       pos = len - sizeof (P2P_fragmentation_MESSAGE);
-       frag = GNUNET_malloc (ctx->mtu);
-       while (pos < ctx->len)
-       {
-               mlen = sizeof (P2P_fragmentation_MESSAGE) + ctx->len - pos;
-               if (mlen > ctx->mtu)
-                       mlen = ctx->mtu;
-               GNUNET_GE_ASSERT (NULL, mlen > sizeof (P2P_fragmentation_MESSAGE));
-               frag->header.size = htons (mlen);
-               frag->header.type = htons (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-               frag->id = id;
-               frag->off = htons (pos);
-               frag->len = htons (ctx->len);
-               memcpy (&frag[1],
-                               &((char *) (&ctx[1]))[pos],
-                               mlen - sizeof (P2P_fragmentation_MESSAGE));
-               coreAPI->ciphertext_send (&ctx->sender,
-                               &frag->header,
-                               GNUNET_EXTREME_PRIORITY,
-                               ctx->transmissionTime - GNUNET_get_time ());
-               pos += mlen - sizeof (P2P_fragmentation_MESSAGE);
-       }
-       GNUNET_GE_ASSERT (NULL, pos == ctx->len);
-       GNUNET_free (frag);
-       GNUNET_free (ctx);
-       return GNUNET_OK;
-}
 
 /**
- * The given message must be fragmented.  Produce a placeholder that
- * corresponds to the first fragment.  Once that fragment is scheduled
- * for transmission, the placeholder should automatically add all of
- * the other fragments (with very high priority).
+ * Process an acknowledgement message we got from the other
+ * side (to control re-transmits).
+ *
+ * @param fc fragmentation context
+ * @param msg acknowledgement message we received
+ * @return #GNUNET_OK if this ack completes the work of the 'fc'
+ *                   (all fragments have been received);
+ *         #GNUNET_NO if more messages are pending
+ *         #GNUNET_SYSERR if this ack is not valid for this fc
  */
-void
-fragment (const GNUNET_PeerIdentity * peer,
-               unsigned int mtu,
-               unsigned int prio,
-               unsigned int targetTime,
-               unsigned int len, GNUNET_BuildMessageCallback bmc, void *bmcClosure)
+int
+GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
+                             const struct GNUNET_MessageHeader *msg)
 {
-       FragmentBMC *fbmc;
-       int xlen;
-
-       GNUNET_GE_ASSERT (NULL, len > mtu);
-       GNUNET_GE_ASSERT (NULL, mtu > sizeof (P2P_fragmentation_MESSAGE));
-       fbmc = GNUNET_malloc (sizeof (FragmentBMC) + len);
-       fbmc->mtu = mtu;
-       fbmc->sender = *peer;
-       fbmc->transmissionTime = targetTime;
-       fbmc->len = len;
-       if (bmc == NULL)
-       {
-               memcpy (&fbmc[1], bmcClosure, len);
-               GNUNET_free (bmcClosure);
-       }
-       else
-       {
-               if (GNUNET_SYSERR == bmc (&fbmc[1], bmcClosure, len))
-               {
-                       GNUNET_free (fbmc);
-                       return;
-               }
-       }
-       xlen = mtu - sizeof (P2P_fragmentation_MESSAGE);
-       coreAPI->ciphertext_send_with_callback (peer, &fragmentBMC, fbmc, mtu, prio * xlen / len,     /* compute new priority */
-                       targetTime);
+  const struct FragmentAcknowledgement *fa;
+  uint64_t abits;
+  struct GNUNET_TIME_Relative ndelay;
+  unsigned int ack_cnt;
+  unsigned int snd_cnt;
+  unsigned int i;
+
+  if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  fa = (const struct FragmentAcknowledgement *) msg;
+  if (ntohl (fa->fragment_id) != fc->fragment_id)
+    return GNUNET_SYSERR;       /* not our ACK */
+  abits = GNUNET_ntohll (fa->bits);
+  if ( (GNUNET_YES == fc->wack) &&
+       (0 != fc->num_transmissions) )
+  {
+    /* normal ACK, can update running average of delay... */
+    fc->wack = GNUNET_NO;
+    ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
+    fc->ack_delay.rel_value_us =
+        (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
+    /* calculate ratio msg sent vs. msg acked */
+    ack_cnt = 0;
+    snd_cnt = 0;
+    for (i=0;i<64;i++)
+    {
+      if (1 == (fc->acks_mask & (1ULL << i)))
+      {
+       snd_cnt++;
+       if (0 == (abits & (1ULL << i)))
+         ack_cnt++;
+      }
+    }
+    if (0 == ack_cnt)
+    {
+      /* complete loss */
+      fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
+                                                                snd_cnt);
+    }
+    else if (snd_cnt > ack_cnt)
+    {
+      /* some loss, slow down proportionally */
+      fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
+    }
+    else if (snd_cnt == ack_cnt)
+    {
+      fc->msg_delay.rel_value_us =
+        (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
+    }
+    fc->num_transmissions = 0;
+    fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
+                                             GNUNET_TIME_UNIT_SECONDS);
+    fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
+                                             GNUNET_TIME_UNIT_SECONDS);
+  }
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragment acknowledgements received"),
+                            1,
+                            GNUNET_NO);
+  if (abits != (fc->acks & abits))
+  {
+    /* ID collission or message reordering, count! This should be rare! */
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# bits removed from fragmentation ACKs"), 1,
+                              GNUNET_NO);
+  }
+  fc->acks = abits & fc->acks_mask;
+  if (0 != fc->acks)
+  {
+    /* more to transmit, do so right now (if tracker permits...) */
+    if (fc->task != NULL)
+    {
+      /* schedule next transmission now, no point in waiting... */
+      GNUNET_SCHEDULER_cancel (fc->task);
+      fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
+    }
+    else
+    {
+      /* only case where there is no task should be if we're waiting
+       * for the right to transmit again (proc_busy set to YES) */
+      GNUNET_assert (GNUNET_YES == fc->proc_busy);
+    }
+    return GNUNET_NO;
+  }
+
+  /* all done */
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragmentation transmissions completed"),
+                            1,
+                            GNUNET_NO);
+  if (NULL != fc->task)
+  {
+    GNUNET_SCHEDULER_cancel (fc->task);
+    fc->task = NULL;
+  }
+  return GNUNET_OK;
 }
 
-/**
- * Initialize Fragmentation module.
- */
-GNUNET_Fragmentation_ServiceAPI *
-provide_module_fragmentation (GNUNET_CoreAPIForPlugins * capi)
-{
-       static GNUNET_Fragmentation_ServiceAPI ret;
-       int i;
-
-       coreAPI = capi;
-       stats = coreAPI->service_request ("stats");
-       if (stats != NULL)
-       {
-               stat_defragmented =
-                               stats->create (gettext_noop ("# messages defragmented"));
-               stat_fragmented =
-                               stats->create (gettext_noop ("# messages fragmented"));
-               stat_discarded = stats->create (gettext_noop ("# fragments discarded"));
-       }
-       for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-               defragmentationCache[i] = NULL;
-       defragCacheLock = GNUNET_mutex_create (GNUNET_NO);
-       GNUNET_cron_add_job (coreAPI->cron,
-                       &defragmentationPurgeCron,
-                       60 * GNUNET_CRON_SECONDS, 60 * GNUNET_CRON_SECONDS,
-                       NULL);
-       GNUNET_GE_LOG (capi->ectx,
-                       GNUNET_GE_INFO | GNUNET_GE_USER | GNUNET_GE_REQUEST,
-                       _("`%s' registering handler %d\n"), "fragmentation",
-                       GNUNET_P2P_PROTO_MESSAGE_FRAGMENT);
-       capi->p2p_ciphertext_handler_register (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT,
-                       &processFragment);
-
-       ret.fragment = &fragment;
-       return &ret;
-}
 
 /**
- * Shutdown fragmentation.
+ * Destroy the given fragmentation context (stop calling 'proc', free
+ * resources).
+ *
+ * @param fc fragmentation context
+ * @param msg_delay where to store average delay between individual message transmissions the
+ *         last message (OUT only)
+ * @param ack_delay where to store average delay between transmission and ACK for the
+ *         last message, set to FOREVER if the message was not fully transmitted (OUT only)
  */
 void
-release_module_fragmentation ()
+GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
+                                struct GNUNET_TIME_Relative *msg_delay,
+                                struct GNUNET_TIME_Relative *ack_delay)
 {
-       int i;
-
-       coreAPI->p2p_ciphertext_handler_unregister
-       (GNUNET_P2P_PROTO_MESSAGE_FRAGMENT, &processFragment);
-       GNUNET_cron_del_job (coreAPI->cron, &defragmentationPurgeCron,
-                       60 * GNUNET_CRON_SECONDS, NULL);
-       for (i = 0; i < DEFRAG_BUCKET_COUNT; i++)
-       {
-               FC *pos = defragmentationCache[i];
-               while (pos != NULL)
-               {
-                       FC *next = pos->next;
-                       freeFL (pos->head, 1);
-                       GNUNET_free (pos);
-                       pos = next;
-               }
-       }
-       if (stats != NULL)
-       {
-               coreAPI->service_release (stats);
-               stats = NULL;
-       }
-       GNUNET_mutex_destroy (defragCacheLock);
-       defragCacheLock = NULL;
-       coreAPI = NULL;
+  if (fc->task != NULL)
+    GNUNET_SCHEDULER_cancel (fc->task);
+  if (NULL != ack_delay)
+    *ack_delay = fc->ack_delay;
+  if (NULL != msg_delay)
+    *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
+                                                           fc->num_rounds);
+  GNUNET_free (fc);
 }
 
-#endif
 
 /* end of fragmentation.c */