continue to fix extract result
[oweals/gnunet.git] / src / fragmentation / fragmentation.c
index bf749181590028406e5a75b3569f893cccaa1d8c..3a55502e71b03adfcde7e71778b48c1b8a8af73a 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2009-2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009-2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 /**
  * @file src/fragmentation/fragmentation.c
@@ -80,7 +80,7 @@ struct GNUNET_FRAGMENT_Context
   GNUNET_FRAGMENT_MessageProcessor proc;
 
   /**
-   * Closure for 'proc'.
+   * Closure for @e proc.
    */
   void *proc_cls;
 
@@ -90,7 +90,7 @@ struct GNUNET_FRAGMENT_Context
   uint64_t acks;
 
   /**
-   * Bitfield with all possible bits for 'acks' (used to mask the
+   * Bitfield with all possible bits for @e acks (used to mask the
    * ack we get back).
    */
   uint64_t acks_mask;
@@ -98,7 +98,7 @@ struct GNUNET_FRAGMENT_Context
   /**
    * Task performing work for the fragmenter.
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
+  struct GNUNET_SCHEDULER_Task *task;
 
   /**
    * Our fragmentation ID. (chosen at random)
@@ -121,12 +121,12 @@ struct GNUNET_FRAGMENT_Context
   unsigned int num_transmissions;
 
   /**
-   * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
+   * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
    */
   int8_t proc_busy;
 
   /**
-   * GNUNET_YES if we are waiting for an ACK.
+   * #GNUNET_YES if we are waiting for an ACK.
    */
   int8_t wack;
 
@@ -138,14 +138,38 @@ struct GNUNET_FRAGMENT_Context
 };
 
 
+/**
+ * Convert an ACK message to a printable format suitable for logging.
+ *
+ * @param ack message to print
+ * @return ack in human-readable format
+ */
+const char *
+GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
+{
+  static char buf[128];
+  const struct FragmentAcknowledgement *fa;
+
+  if (sizeof (struct FragmentAcknowledgement) !=
+      htons (ack->size))
+    return "<malformed ack>";
+  fa = (const struct FragmentAcknowledgement *) ack;
+  GNUNET_snprintf (buf,
+                   sizeof (buf),
+                   "%u-%llX",
+                   ntohl (fa->fragment_id),
+                   GNUNET_ntohll (fa->bits));
+  return buf;
+}
+
+
 /**
  * Transmit the next fragment to the other peer.
  *
- * @param cls the 'struct GNUNET_FRAGMENT_Context'
- * @param tc scheduler context
+ * @param cls the `struct GNUNET_FRAGMENT_Context`
  */
 static void
-transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_next (void *cls)
 {
   struct GNUNET_FRAGMENT_Context *fc = cls;
   char msg[fc->mtu];
@@ -157,7 +181,7 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   size_t fsize;
   int wrap;
 
-  fc->task = GNUNET_SCHEDULER_NO_TASK;
+  fc->task = NULL;
   GNUNET_assert (GNUNET_NO == fc->proc_busy);
   if (0 == fc->acks)
     return;                     /* all done */
@@ -177,12 +201,19 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   else
     fsize = fc->mtu;
   if (NULL != fc->tracker)
-    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
   else
     delay = GNUNET_TIME_UNIT_ZERO;
   if (delay.rel_value_us > 0)
   {
-    fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Fragmentation logic delays transmission of next fragment by %s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES));
+    fc->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                             &transmit_next,
+                                             fc);
     return;
   }
   fc->next_transmission = (fc->next_transmission + 1) % 64;
@@ -205,10 +236,14 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
           fsize - sizeof (struct FragmentHeader));
   if (NULL != fc->tracker)
     GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
-  GNUNET_STATISTICS_update (fc->stats, _("# fragments transmitted"), 1,
+  GNUNET_STATISTICS_update (fc->stats,
+                            _("# fragments transmitted"),
+                            1,
                             GNUNET_NO);
   if (0 != fc->last_round.abs_value_us)
-    GNUNET_STATISTICS_update (fc->stats, _("# fragments retransmitted"), 1,
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments retransmitted"),
+                              1,
                               GNUNET_NO);
 
   /* select next message to calculate delay */
@@ -219,12 +254,13 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   else
     fsize = fc->mtu;
   if (NULL != fc->tracker)
-    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                fsize);
   else
     delay = GNUNET_TIME_UNIT_ZERO;
   delay = GNUNET_TIME_relative_max (delay,
                                    GNUNET_TIME_relative_multiply (fc->msg_delay,
-                                                                  (1 << fc->num_rounds)));
+                                                                  (1ULL << fc->num_rounds)));
   if (wrap)
   {
     /* full round transmitted wait 2x delay for ACK before going again */
@@ -234,23 +270,26 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
     fc->wack = GNUNET_YES;
     fc->last_round = GNUNET_TIME_absolute_get ();
-    GNUNET_STATISTICS_update (fc->stats, _("# fragments wrap arounds"), 1,
+    GNUNET_STATISTICS_update (fc->stats,
+                              _("# fragments wrap arounds"),
+                              1,
                               GNUNET_NO);
   }
   fc->proc_busy = GNUNET_YES;
   fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
   fc->num_transmissions++;
-  fc->proc (fc->proc_cls, &fh->header);
+  fc->proc (fc->proc_cls,
+            &fh->header);
 }
 
 
 /**
  * Create a fragmentation context for the given message.
- * Fragments the message into fragments of size "mtu" or
- * less.  Calls 'proc' on each un-acknowledged fragment,
- * using both the expected 'delay' between messages and
- * acknowledgements and the given 'tracker' to guide the
- * frequency of calls to 'proc'.
+ * Fragments the message into fragments of size @a mtu or
+ * less.  Calls @a proc on each un-acknowledged fragment,
+ * using both the expected @a msg_delay between messages and
+ * acknowledgements and the given @a tracker to guide the
+ * frequency of calls to @a proc.
  *
  * @param stats statistics context
  * @param mtu the maximum message size for each fragment
@@ -261,7 +300,7 @@ transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  *              and ACK based on previous messages
  * @param msg the message to fragment
  * @param proc function to call for each fragment to transmit
- * @param proc_cls closure for proc
+ * @param proc_cls closure for @a proc
  * @return the fragmentation context
  */
 struct GNUNET_FRAGMENT_Context *
@@ -278,10 +317,14 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
   size_t size;
   uint64_t bits;
 
-  GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                            _("# messages fragmented"),
+                            1,
+                            GNUNET_NO);
   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
   size = ntohs (msg->size);
-  GNUNET_STATISTICS_update (stats, _("# total size of fragmented messages"),
+  GNUNET_STATISTICS_update (stats,
+                            _("# total size of fragmented messages"),
                             size, GNUNET_NO);
   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
@@ -294,7 +337,8 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
   fc->proc = proc;
   fc->proc_cls = proc_cls;
   fc->fragment_id =
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                UINT32_MAX);
   memcpy (&fc[1], msg, size);
   bits =
       (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
@@ -323,7 +367,7 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
 {
   GNUNET_assert (fc->proc_busy == GNUNET_YES);
   fc->proc_busy = GNUNET_NO;
-  GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_assert (fc->task == NULL);
   fc->task =
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
                                     (fc->delay_until), &transmit_next, fc);
@@ -336,10 +380,10 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
  *
  * @param fc fragmentation context
  * @param msg acknowledgement message we received
- * @return GNUNET_OK if this ack completes the work of the 'fc'
+ * @return #GNUNET_OK if this ack completes the work of the 'fc'
  *                   (all fragments have been received);
- *         GNUNET_NO if more messages are pending
- *         GNUNET_SYSERR if this ack is not valid for this fc
+ *         #GNUNET_NO if more messages are pending
+ *         #GNUNET_SYSERR if this ack is not valid for this fc
  */
 int
 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
@@ -369,16 +413,15 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
     ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
     fc->ack_delay.rel_value_us =
         (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
-    fc->num_transmissions = 0;
     /* calculate ratio msg sent vs. msg acked */
     ack_cnt = 0;
     snd_cnt = 0;
     for (i=0;i<64;i++)
     {
-      if (1 == (fc->acks_mask & (1 << i)))
+      if (1 == (fc->acks_mask & (1ULL << i)))
       {
        snd_cnt++;
-       if (0 == (abits & (1 << i)))
+       if (0 == (abits & (1ULL << i)))
          ack_cnt++;
       }
     }
@@ -391,18 +434,22 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
     else if (snd_cnt > ack_cnt)
     {
       /* some loss, slow down proportionally */
-      fprintf (stderr, "Prop loss\n");
       fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
     }
-    else if (100 < fc->msg_delay.rel_value_us)
+    else if (snd_cnt == ack_cnt)
     {
-      fc->msg_delay.rel_value_us -= 100; /* try a bit faster */
+      fc->msg_delay.rel_value_us =
+        (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
     }
+    fc->num_transmissions = 0;
     fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
                                              GNUNET_TIME_UNIT_SECONDS);
+    fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
+                                             GNUNET_TIME_UNIT_SECONDS);
   }
   GNUNET_STATISTICS_update (fc->stats,
-                            _("# fragment acknowledgements received"), 1,
+                            _("# fragment acknowledgements received"),
+                            1,
                             GNUNET_NO);
   if (abits != (fc->acks & abits))
   {
@@ -415,7 +462,7 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
   if (0 != fc->acks)
   {
     /* more to transmit, do so right now (if tracker permits...) */
-    if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+    if (fc->task != NULL)
     {
       /* schedule next transmission now, no point in waiting... */
       GNUNET_SCHEDULER_cancel (fc->task);
@@ -432,12 +479,13 @@ GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
 
   /* all done */
   GNUNET_STATISTICS_update (fc->stats,
-                            _("# fragmentation transmissions completed"), 1,
+                            _("# fragmentation transmissions completed"),
+                            1,
                             GNUNET_NO);
-  if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+  if (NULL != fc->task)
   {
     GNUNET_SCHEDULER_cancel (fc->task);
-    fc->task = GNUNET_SCHEDULER_NO_TASK;
+    fc->task = NULL;
   }
   return GNUNET_OK;
 }
@@ -458,7 +506,7 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
                                 struct GNUNET_TIME_Relative *msg_delay,
                                 struct GNUNET_TIME_Relative *ack_delay)
 {
-  if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+  if (fc->task != NULL)
     GNUNET_SCHEDULER_cancel (fc->task);
   if (NULL != ack_delay)
     *ack_delay = fc->ack_delay;