Merge branch 'license/spdx'
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
index 34db0db3a499b23a6af2dc7f60ea3ee802359dac..bbe6f3741c1e49cec545bdca24c6d764fbcf3dd2 100644 (file)
@@ -1,24 +1,24 @@
 /*
      This file is part of GNUnet
-     (C) 2009, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2011 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     SPDX-License-Identifier: AGPL3.0-or-later
 */
 /**
- * @file src/fragmentation/defragmentation_new.c
+ * @file src/fragmentation/defragmentation.c
  * @brief library to help defragment messages
  * @author Christian Grothoff
  */
@@ -69,7 +69,7 @@ struct MessageContext
   /**
    * Pointer to the assembled message, allocated at the
    * end of this struct.
-   */ 
+   */
   const struct GNUNET_MessageHeader *msg;
 
   /**
@@ -83,7 +83,7 @@ struct MessageContext
    * Task scheduled for transmitting the next ACK to the
    * other peer.
    */
-  GNUNET_SCHEDULER_TaskIdentifier ack_task;
+  struct GNUNET_SCHEDULER_Task * ack_task;
 
   /**
    * When did we receive which fragment? Used to calculate
@@ -109,13 +109,13 @@ struct MessageContext
 
   /**
    * For the current ACK round, which is the first relevant
-   * offset in 'frag_times'?
+   * offset in @e frag_times?
    */
   unsigned int frag_times_start_offset;
 
   /**
    * Which offset whould we write the next frag value into
-   * in the 'frag_times' array? All smaller entries are valid.
+   * in the @e frag_times array? All smaller entries are valid.
    */
   unsigned int frag_times_write_offset;
 
@@ -124,6 +124,11 @@ struct MessageContext
    */
   uint16_t total_size;
 
+  /**
+   * Was the last fragment we got a duplicate?
+   */
+  int16_t last_duplicate;
+
 };
 
 
@@ -149,7 +154,7 @@ struct GNUNET_DEFRAGMENT_Context
   struct MessageContext *tail;
 
   /**
-   * Closure for 'proc' and 'ackp'.
+   * Closure for @e proc and @e ackp.
    */
   void *cls;
 
@@ -183,8 +188,9 @@ struct GNUNET_DEFRAGMENT_Context
 
   /**
    * Maximum message size for each fragment.
-   */ 
+   */
   uint16_t mtu;
+
 };
 
 
@@ -192,10 +198,10 @@ struct GNUNET_DEFRAGMENT_Context
  * Create a defragmentation context.
  *
  * @param stats statistics context
- * @param mtu the maximum message size for each fragment 
+ * @param mtu the maximum message size for each fragment
  * @param num_msgs how many fragmented messages
  *                 to we defragment at most at the same time?
- * @param cls closure for proc and ackp
+ * @param cls closure for @a proc and @a ackp
  * @param proc function to call with defragmented messages
  * @param ackp function to call with acknowledgements (to send
  *             back to the other side)
@@ -203,22 +209,21 @@ struct GNUNET_DEFRAGMENT_Context
  */
 struct GNUNET_DEFRAGMENT_Context *
 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
-                                 uint16_t mtu,
-                                 unsigned int num_msgs,
-                                 void *cls,
-                                 GNUNET_FRAGMENT_MessageProcessor proc,
-                                 GNUNET_DEFRAGMENT_AckProcessor ackp)
+                                  uint16_t mtu, unsigned int num_msgs,
+                                  void *cls,
+                                  GNUNET_FRAGMENT_MessageProcessor proc,
+                                  GNUNET_DEFRAGMENT_AckProcessor ackp)
 {
   struct GNUNET_DEFRAGMENT_Context *dc;
 
-  dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
+  dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
   dc->stats = stats;
   dc->cls = cls;
   dc->proc = proc;
   dc->ackp = ackp;
   dc->num_msgs = num_msgs;
   dc->mtu = mtu;
-  dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
+  dc->latency = GNUNET_TIME_UNIT_SECONDS;       /* start with likely overestimate */
   return dc;
 }
 
@@ -228,24 +233,22 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
  *
  * @param dc defragmentation context
  */
-void 
+void
 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
 {
   struct MessageContext *mc;
 
   while (NULL != (mc = dc->head))
+  {
+    GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
+    dc->list_size--;
+    if (NULL != mc->ack_task)
     {
-      GNUNET_CONTAINER_DLL_remove (dc->head,
-                                  dc->tail,
-                                  mc);
-      dc->list_size--;
-      if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
-       {
-         GNUNET_SCHEDULER_cancel (mc->ack_task);
-         mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
-       }
-      GNUNET_free (mc);
+      GNUNET_SCHEDULER_cancel (mc->ack_task);
+      mc->ack_task = NULL;
     }
+    GNUNET_free (mc);
+  }
   GNUNET_assert (0 == dc->list_size);
   GNUNET_free (dc);
 }
@@ -255,57 +258,57 @@ GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
  * Send acknowledgement to the other peer now.
  *
  * @param cls the message context
- * @param tc the scheduler context
  */
 static void
-send_ack (void *cls,
-         const struct GNUNET_SCHEDULER_TaskContext *tc)
+send_ack (void *cls)
 {
   struct MessageContext *mc = cls;
   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
   struct FragmentAcknowledgement fa;
 
-  mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+  mc->ack_task = NULL;
   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
   fa.fragment_id = htonl (mc->fragment_id);
   fa.bits = GNUNET_htonll (mc->bits);
   GNUNET_STATISTICS_update (mc->dc->stats,
-                           _("# acknowledgements sent for fragment"),
-                           1,
-                           GNUNET_NO);
-  dc->ackp (dc->cls, mc->fragment_id, &fa.header);
+                            _("# acknowledgements sent for fragment"),
+                            1,
+                            GNUNET_NO);
+  mc->last_duplicate = GNUNET_NO; /* clear flag */
+  dc->ackp (dc->cls,
+            mc->fragment_id,
+            &fa.header);
 }
 
 
 /**
  * This function is from the GNU Scientific Library, linear/fit.c,
- * (C) 2000 Brian Gough
+ * Copyright (C) 2000 Brian Gough
  */
 static void
-gsl_fit_mul (const double *x, const size_t xstride,
-             const double *y, const size_t ystride,
-             const size_t n, 
-             double *c1, double *cov_11, double *sumsq)
+gsl_fit_mul (const double *x, const size_t xstride, const double *y,
+             const size_t ystride, const size_t n, double *c1, double *cov_11,
+             double *sumsq)
 {
   double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
 
   size_t i;
 
   for (i = 0; i < n; i++)
-    {
-      m_x += (x[i * xstride] - m_x) / (i + 1.0);
-      m_y += (y[i * ystride] - m_y) / (i + 1.0);
-    }
+  {
+    m_x += (x[i * xstride] - m_x) / (i + 1.0);
+    m_y += (y[i * ystride] - m_y) / (i + 1.0);
+  }
 
   for (i = 0; i < n; i++)
-    {
-      const double dx = x[i * xstride] - m_x;
-      const double dy = y[i * ystride] - m_y;
+  {
+    const double dx = x[i * xstride] - m_x;
+    const double dy = y[i * ystride] - m_y;
 
-      m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
-      m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
-    }
+    m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
+    m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
+  }
 
   /* In terms of y =  b x */
 
@@ -318,12 +321,13 @@ gsl_fit_mul (const double *x, const size_t xstride,
     /* Compute chi^2 = \sum (y_i -  b * x_i)^2 */
 
     for (i = 0; i < n; i++)
-      {
-        const double dx = x[i * xstride] - m_x;
-        const double dy = y[i * ystride] - m_y;
-        const double d = (m_y - b * m_x) + dy - b * dx;
-        d2 += d * d;
-      }
+    {
+      const double dx = x[i * xstride] - m_x;
+      const double dy = y[i * ystride] - m_y;
+      const double d = (m_y - b * m_x) + dy - b * dx;
+
+      d2 += d * d;
+    }
 
     s2 = d2 / (n - 1.0);        /* chisq per degree of freedom */
 
@@ -356,15 +360,18 @@ estimate_latency (struct MessageContext *mc)
 
   first = &mc->frag_times[mc->frag_times_start_offset];
   GNUNET_assert (total > 1);
-  for (i=0;i<total;i++)
-    {
-      x[i] = (double) i;
-      y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value);
-    }
-  gsl_fit_mul (x, 1, y, 1, total,  &c1, &cov11, &sumsq);
-  ret.rel_value = (uint64_t) c1;
+  for (i = 0; i < total; i++)
+  {
+    x[i] = (double) i;
+    y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
+  }
+  gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
+  c1 += sqrt (sumsq);           /* add 1 std dev */
+  ret.rel_value_us = (uint64_t) c1;
+  if (0 == ret.rel_value_us)
+    ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
   return ret;
-};
+}
 
 
 /**
@@ -381,22 +388,20 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
   old = NULL;
   pos = dc->head;
   while (NULL != pos)
-    {
-      if ( (old == NULL) ||
-          (old->last_update.abs_value > pos->last_update.abs_value) )
-       old = pos;
-      pos = pos->next;
-    }
+  {
+    if ((old == NULL) ||
+        (old->last_update.abs_value_us > pos->last_update.abs_value_us))
+      old = pos;
+    pos = pos->next;
+  }
   GNUNET_assert (NULL != old);
-  GNUNET_CONTAINER_DLL_remove (dc->head,
-                              dc->tail,
-                              old);
+  GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
   dc->list_size--;
-  if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
-    {
-      GNUNET_SCHEDULER_cancel (old->ack_task);
-      old->ack_task = GNUNET_SCHEDULER_NO_TASK;
-    }
+  if (NULL != old->ack_task)
+  {
+    GNUNET_SCHEDULER_cancel (old->ack_task);
+    old->ack_task = NULL;
+  }
   GNUNET_free (old);
 }
 
@@ -406,11 +411,13 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
  *
  * @param dc the context
  * @param msg the message that was received
- * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success,
+ *         #GNUNET_NO if this was a duplicate,
+ *         #GNUNET_SYSERR on error
  */
-int 
+int
 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
-                                   const struct GNUNET_MessageHeader *msg)
+                                    const struct GNUNET_MessageHeader *msg)
 {
   struct MessageContext *mc;
   const struct FragmentHeader *fh;
@@ -424,132 +431,160 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
   unsigned int bc;
   unsigned int b;
   unsigned int n;
+  unsigned int num_fragments;
   int duplicate;
+  int last;
 
-  if (ntohs(msg->size) < sizeof (struct FragmentHeader))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
+  if (ntohs (msg->size) < sizeof (struct FragmentHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   if (ntohs (msg->size) > dc->mtu)
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  fh = (const struct FragmentHeader*) msg;
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  fh = (const struct FragmentHeader *) msg;
   msize = ntohs (fh->total_size);
+  if (msize < sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   fid = ntohl (fh->fragment_id);
   foff = ntohs (fh->offset);
   if (foff >= msize)
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
-    {
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   GNUNET_STATISTICS_update (dc->stats,
-                           _("# fragments received"),
-                           1,
-                           GNUNET_NO);
+                            _("# fragments received"),
+                            1,
+                            GNUNET_NO);
+  num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
+  last = 0;
+  for (mc = dc->head; NULL != mc; mc = mc->next)
+    if (mc->fragment_id > fid)
+      last++;
+
   mc = dc->head;
-  while ( (NULL != mc) &&
-         (fid != mc->fragment_id) )
+  while ((NULL != mc) && (fid != mc->fragment_id))
     mc = mc->next;
   bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
-  if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) 
-      sizeof (struct FragmentHeader) > msize)
-    {
-      /* payload extends past total message size */
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
-  if ( (NULL != mc) && (msize != mc->total_size) )
-    {
-      /* inconsistent message size */
-      GNUNET_break_op (0);
-      return GNUNET_SYSERR;
-    }
+  if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
+      sizeof (struct FragmentHeader) > msize)
+  {
+    /* payload extends past total message size */
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ((NULL != mc) && (msize != mc->total_size))
+  {
+    /* inconsistent message size */
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   now = GNUNET_TIME_absolute_get ();
   if (NULL == mc)
-    {
-      mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
-      mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
-      mc->dc = dc;
-      mc->total_size = msize;
-      mc->fragment_id = fid;      
-      mc->last_update = now;
-      n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader));   
-      if (n == 64)
-       mc->bits = UINT64_MAX;      /* set all 64 bit */
-      else
-       mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
-      if (dc->list_size >= dc->num_msgs)
-       discard_oldest_mc (dc);
-      GNUNET_CONTAINER_DLL_insert (dc->head,
-                                  dc->tail,
-                                  mc);
-      dc->list_size++;
-    }
+  {
+    mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
+    mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
+    mc->dc = dc;
+    mc->total_size = msize;
+    mc->fragment_id = fid;
+    mc->last_update = now;
+    n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
+                                                                  sizeof (struct
+                                                                          FragmentHeader));
+    if (n == 64)
+      mc->bits = UINT64_MAX;    /* set all 64 bit */
+    else
+      mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
+    if (dc->list_size >= dc->num_msgs)
+      discard_oldest_mc (dc);
+    GNUNET_CONTAINER_DLL_insert (dc->head,
+                                 dc->tail,
+                                 mc);
+    dc->list_size++;
+  }
 
   /* copy data to 'mc' */
-  if (0 != (mc->bits & (1LL << bit)))
-    {
-      mc->bits -= 1LL << bit;
-      mbuf = (char* )&mc[1];
-      memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))],
-             &fh[1],
-             ntohs (msg->size) - sizeof (struct FragmentHeader));
-      mc->last_update = now;
-      if (bit < mc->last_bit)
-       mc->frag_times_start_offset = mc->frag_times_write_offset;
-      mc->last_bit = bit;
-      mc->frag_times[mc->frag_times_write_offset].time = now;
-      mc->frag_times[mc->frag_times_write_offset].bit = bit;
-      mc->frag_times_write_offset++;
-      duplicate = GNUNET_NO;
-    }
+  if (0 != (mc->bits & (1LLU << bit)))
+  {
+    mc->bits -= 1LLU << bit;
+    mbuf = (char *) &mc[1];
+    GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
+            ntohs (msg->size) - sizeof (struct FragmentHeader));
+    mc->last_update = now;
+    if (bit < mc->last_bit)
+      mc->frag_times_start_offset = mc->frag_times_write_offset;
+    mc->last_bit = bit;
+    mc->frag_times[mc->frag_times_write_offset].time = now;
+    mc->frag_times[mc->frag_times_write_offset].bit = bit;
+    mc->frag_times_write_offset++;
+    duplicate = GNUNET_NO;
+  }
   else
-    {
-      duplicate = GNUNET_YES;
-      GNUNET_STATISTICS_update (dc->stats,
-                               _("# duplicate fragments received"),
-                               1,
-                               GNUNET_NO);
-    }
+  {
+    duplicate = GNUNET_YES;
+    GNUNET_STATISTICS_update (dc->stats,
+                              _("# duplicate fragments received"),
+                              1,
+                              GNUNET_NO);
+  }
 
-  /* count number of missing fragments */
+  /* count number of missing fragments after the current one */
   bc = 0;
-  for (b=0;b<64;b++)
-    if (0 != (mc->bits & (1LL << b))) bc++;
+  for (b = bit; b < 64; b++)
+    if (0 != (mc->bits & (1LLU << b)))
+      bc++;
+    else
+      bc = 0;
+
+  /* notify about complete message */
+  if ( (GNUNET_NO == duplicate) &&
+       (0 == mc->bits) )
+  {
+    GNUNET_STATISTICS_update (dc->stats,
+                              _("# messages defragmented"),
+                              1,
+                              GNUNET_NO);
+    /* message complete, notify! */
+    dc->proc (dc->cls, mc->msg);
+  }
+  /* send ACK */
   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
+  {
     dc->latency = estimate_latency (mc);
-  delay = GNUNET_TIME_relative_multiply (dc->latency,
-                                        bc + 1);
-  if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */
+  }
+  delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
+                                                    bc + 1);
+  if ( (last + fid == num_fragments) ||
+       (0 == mc->bits) ||
+       (GNUNET_YES == duplicate) )
+  {
+    /* message complete or duplicate or last missing fragment in
+       linear sequence; ACK now! */
     delay = GNUNET_TIME_UNIT_ZERO;
-  if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+  }
+  if (NULL != mc->ack_task)
     GNUNET_SCHEDULER_cancel (mc->ack_task);
   mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                              &send_ack,
-                                              mc);
-  if ( (duplicate == GNUNET_NO) &&
-       (0 == mc->bits) )
-    {
-      GNUNET_STATISTICS_update (dc->stats,
-                               _("# messages defragmented"),
-                               1,
-                               GNUNET_NO);
-      /* message complete, notify! */
-      dc->proc (dc->cls,
-               mc->msg);      
-    }
-  if (duplicate == GNUNET_YES)
+                                               &send_ack,
+                                               mc);
+  if (GNUNET_YES == duplicate)
+  {
+    mc->last_duplicate = GNUNET_YES;
     return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
-/* end of defragmentation_new.c */
-
+/* end of defragmentation.c */