cleaning up set handlers, eliminating 2nd level demultiplexing and improving use...
[oweals/gnunet.git] / src / fragmentation / defragmentation.c
index f422070469281386d77083434d9e8f9a2e9feaf5..cc0f5a8c5434f7e102f840c7773ee4f25f01273e 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2009, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2011 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 /**
  * @file src/fragmentation/defragmentation.c
@@ -83,7 +83,7 @@ struct MessageContext
    * Task scheduled for transmitting the next ACK to the
    * other peer.
    */
-  GNUNET_SCHEDULER_TaskIdentifier ack_task;
+  struct GNUNET_SCHEDULER_Task * ack_task;
 
   /**
    * When did we receive which fragment? Used to calculate
@@ -109,13 +109,13 @@ struct MessageContext
 
   /**
    * For the current ACK round, which is the first relevant
-   * offset in 'frag_times'?
+   * offset in @e frag_times?
    */
   unsigned int frag_times_start_offset;
 
   /**
    * Which offset whould we write the next frag value into
-   * in the 'frag_times' array? All smaller entries are valid.
+   * in the @e frag_times array? All smaller entries are valid.
    */
   unsigned int frag_times_write_offset;
 
@@ -124,6 +124,11 @@ struct MessageContext
    */
   uint16_t total_size;
 
+  /**
+   * Was the last fragment we got a duplicate?
+   */
+  int16_t last_duplicate;
+
 };
 
 
@@ -149,7 +154,7 @@ struct GNUNET_DEFRAGMENT_Context
   struct MessageContext *tail;
 
   /**
-   * Closure for 'proc' and 'ackp'.
+   * Closure for @e proc and @e ackp.
    */
   void *cls;
 
@@ -185,6 +190,7 @@ struct GNUNET_DEFRAGMENT_Context
    * Maximum message size for each fragment.
    */
   uint16_t mtu;
+
 };
 
 
@@ -195,7 +201,7 @@ struct GNUNET_DEFRAGMENT_Context
  * @param mtu the maximum message size for each fragment
  * @param num_msgs how many fragmented messages
  *                 to we defragment at most at the same time?
- * @param cls closure for proc and ackp
+ * @param cls closure for @a proc and @a ackp
  * @param proc function to call with defragmented messages
  * @param ackp function to call with acknowledgements (to send
  *             back to the other side)
@@ -210,7 +216,7 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
 {
   struct GNUNET_DEFRAGMENT_Context *dc;
 
-  dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
+  dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
   dc->stats = stats;
   dc->cls = cls;
   dc->proc = proc;
@@ -236,10 +242,10 @@ GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
   {
     GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
     dc->list_size--;
-    if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+    if (NULL != mc->ack_task)
     {
       GNUNET_SCHEDULER_cancel (mc->ack_task);
-      mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+      mc->ack_task = NULL;
     }
     GNUNET_free (mc);
   }
@@ -252,30 +258,33 @@ GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
  * Send acknowledgement to the other peer now.
  *
  * @param cls the message context
- * @param tc the scheduler context
  */
 static void
-send_ack (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+send_ack (void *cls)
 {
   struct MessageContext *mc = cls;
   struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
   struct FragmentAcknowledgement fa;
 
-  mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+  mc->ack_task = NULL;
   fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
   fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
   fa.fragment_id = htonl (mc->fragment_id);
   fa.bits = GNUNET_htonll (mc->bits);
   GNUNET_STATISTICS_update (mc->dc->stats,
-                            _("# acknowledgements sent for fragment"), 1,
+                            _("# acknowledgements sent for fragment"),
+                            1,
                             GNUNET_NO);
-  dc->ackp (dc->cls, mc->fragment_id, &fa.header);
+  mc->last_duplicate = GNUNET_NO; /* clear flag */
+  dc->ackp (dc->cls,
+            mc->fragment_id,
+            &fa.header);
 }
 
 
 /**
  * This function is from the GNU Scientific Library, linear/fit.c,
- * (C) 2000 Brian Gough
+ * Copyright (C) 2000 Brian Gough
  */
 static void
 gsl_fit_mul (const double *x, const size_t xstride, const double *y,
@@ -388,10 +397,10 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
   GNUNET_assert (NULL != old);
   GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
   dc->list_size--;
-  if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
+  if (NULL != old->ack_task)
   {
     GNUNET_SCHEDULER_cancel (old->ack_task);
-    old->ack_task = GNUNET_SCHEDULER_NO_TASK;
+    old->ack_task = NULL;
   }
   GNUNET_free (old);
 }
@@ -402,7 +411,9 @@ discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
  *
  * @param dc the context
  * @param msg the message that was received
- * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
+ * @return #GNUNET_OK on success,
+ *         #GNUNET_NO if this was a duplicate,
+ *         #GNUNET_SYSERR on error
  */
 int
 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
@@ -453,13 +464,16 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
-  GNUNET_STATISTICS_update (dc->stats, _("# fragments received"), 1, GNUNET_NO);
+  GNUNET_STATISTICS_update (dc->stats,
+                            _("# fragments received"),
+                            1,
+                            GNUNET_NO);
   num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
   last = 0;
   for (mc = dc->head; NULL != mc; mc = mc->next)
     if (mc->fragment_id > fid)
       last++;
-  
+
   mc = dc->head;
   while ((NULL != mc) && (fid != mc->fragment_id))
     mc = mc->next;
@@ -492,19 +506,21 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
     if (n == 64)
       mc->bits = UINT64_MAX;    /* set all 64 bit */
     else
-      mc->bits = (1LL << n) - 1;        /* set lowest 'bits' bit */
+      mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
     if (dc->list_size >= dc->num_msgs)
       discard_oldest_mc (dc);
-    GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc);
+    GNUNET_CONTAINER_DLL_insert (dc->head,
+                                 dc->tail,
+                                 mc);
     dc->list_size++;
   }
 
   /* copy data to 'mc' */
-  if (0 != (mc->bits & (1LL << bit)))
+  if (0 != (mc->bits & (1LLU << bit)))
   {
-    mc->bits -= 1LL << bit;
+    mc->bits -= 1LLU << bit;
     mbuf = (char *) &mc[1];
-    memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
+    GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
             ntohs (msg->size) - sizeof (struct FragmentHeader));
     mc->last_update = now;
     if (bit < mc->last_bit)
@@ -518,43 +534,56 @@ GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
   else
   {
     duplicate = GNUNET_YES;
-    GNUNET_STATISTICS_update (dc->stats, _("# duplicate fragments received"), 1,
+    GNUNET_STATISTICS_update (dc->stats,
+                              _("# duplicate fragments received"),
+                              1,
                               GNUNET_NO);
   }
 
-  /* count number of missing fragments */
+  /* count number of missing fragments after the current one */
   bc = 0;
-  for (b = 0; b < 64; b++)
-    if (0 != (mc->bits & (1LL << b)))
+  for (b = bit; b < 64; b++)
+    if (0 != (mc->bits & (1LLU << b)))
       bc++;
+    else
+      bc = 0;
 
   /* notify about complete message */
-  if ((duplicate == GNUNET_NO) && (0 == mc->bits))
+  if ( (GNUNET_NO == duplicate) &&
+       (0 == mc->bits) )
   {
-    GNUNET_STATISTICS_update (dc->stats, _("# messages defragmented"), 1,
+    GNUNET_STATISTICS_update (dc->stats,
+                              _("# messages defragmented"),
+                              1,
                               GNUNET_NO);
     /* message complete, notify! */
     dc->proc (dc->cls, mc->msg);
   }
   /* send ACK */
   if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
-  { 
+  {
     dc->latency = estimate_latency (mc);
   }
-  delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1);
+  delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
+                                                    bc + 1);
   if ( (last + fid == num_fragments) ||
-       (0 == mc->bits) || 
-       (GNUNET_YES == duplicate))     
+       (0 == mc->bits) ||
+       (GNUNET_YES == duplicate) )
   {
     /* message complete or duplicate or last missing fragment in
        linear sequence; ACK now! */
     delay = GNUNET_TIME_UNIT_ZERO;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+  if (NULL != mc->ack_task)
     GNUNET_SCHEDULER_cancel (mc->ack_task);
-  mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc);
-  if (duplicate == GNUNET_YES)
+  mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                               &send_ack,
+                                               mc);
+  if (GNUNET_YES == duplicate)
+  {
+    mc->last_duplicate = GNUNET_YES;
     return GNUNET_NO;
+  }
   return GNUNET_YES;
 }