extending bandwidth tracker api to support notifications
[oweals/gnunet.git] / src / util / bandwidth.c
index 099c7d08090b1a1ceb560e209798e0fc0b5072dc..f056c1aeeb42b792634379c6c3fb9e4aa02e2055 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
 /*
      This file is part of GNUnet.
-     (C) 2010 Christian Grothoff (and other contributing authors)
+     (C) 2010, 2013 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
 
 /**
  * @file util/bandwidth.c
 
 /**
  * @file util/bandwidth.c
- * @brief functions related to bandwidth (unit) 
+ * @brief functions related to bandwidth (unit)
  * @author Christian Grothoff
  */
 #include "platform.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
-#include "gnunet_bandwidth_lib.h"
-#include "gnunet_server_lib.h"
+#include "gnunet_util_lib.h"
+
+
+#define LOG(kind,...) GNUNET_log_from (kind, "util-bandwidth", __VA_ARGS__)
 
 /**
  * Create a new bandwidth value.
 
 /**
  * Create a new bandwidth value.
@@ -38,6 +40,8 @@ GNUNET_BANDWIDTH_value_init (uint32_t bytes_per_second)
 {
   struct GNUNET_BANDWIDTH_Value32NBO ret;
 
 {
   struct GNUNET_BANDWIDTH_Value32NBO ret;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Initializing bandwidth of %u Bps\n",
+       (unsigned int) bytes_per_second);
   ret.value__ = htonl (bytes_per_second);
   return ret;
 }
   ret.value__ = htonl (bytes_per_second);
   return ret;
 }
@@ -52,10 +56,11 @@ GNUNET_BANDWIDTH_value_init (uint32_t bytes_per_second)
  */
 struct GNUNET_BANDWIDTH_Value32NBO
 GNUNET_BANDWIDTH_value_min (struct GNUNET_BANDWIDTH_Value32NBO b1,
  */
 struct GNUNET_BANDWIDTH_Value32NBO
 GNUNET_BANDWIDTH_value_min (struct GNUNET_BANDWIDTH_Value32NBO b1,
-                           struct GNUNET_BANDWIDTH_Value32NBO b2)
+                            struct GNUNET_BANDWIDTH_Value32NBO b2)
 {
 {
-  return GNUNET_BANDWIDTH_value_init (GNUNET_MIN (ntohl (b1.value__),
-                                                 ntohl (b2.value__)));
+  return
+      GNUNET_BANDWIDTH_value_init (GNUNET_MIN
+                                   (ntohl (b1.value__), ntohl (b2.value__)));
 }
 
 
 }
 
 
@@ -67,14 +72,20 @@ GNUNET_BANDWIDTH_value_min (struct GNUNET_BANDWIDTH_Value32NBO b1,
  * @param deadline when is the deadline
  * @return number of bytes available at bps until deadline
  */
  * @param deadline when is the deadline
  * @return number of bytes available at bps until deadline
  */
-uint64_t 
-GNUNET_BANDWIDTH_value_get_available_until (struct GNUNET_BANDWIDTH_Value32NBO bps,
-                                           struct GNUNET_TIME_Relative deadline)
+uint64_t
+GNUNET_BANDWIDTH_value_get_available_until (struct GNUNET_BANDWIDTH_Value32NBO
+                                            bps,
+                                            struct GNUNET_TIME_Relative
+                                            deadline)
 {
   uint64_t b;
 
   b = ntohl (bps.value__);
 {
   uint64_t b;
 
   b = ntohl (bps.value__);
-  return b * deadline.value / 1000LL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Bandwidth has %llu bytes available until deadline in %s\n",
+       (unsigned long long) ((b * deadline.rel_value_us + 500000LL) / 1000000LL),
+       GNUNET_STRINGS_relative_time_to_string (deadline, GNUNET_YES));
+  return (b * deadline.rel_value_us + 500000LL) / 1000000LL;
 }
 
 
 }
 
 
@@ -88,19 +99,28 @@ GNUNET_BANDWIDTH_value_get_available_until (struct GNUNET_BANDWIDTH_Value32NBO b
  */
 struct GNUNET_TIME_Relative
 GNUNET_BANDWIDTH_value_get_delay_for (struct GNUNET_BANDWIDTH_Value32NBO bps,
  */
 struct GNUNET_TIME_Relative
 GNUNET_BANDWIDTH_value_get_delay_for (struct GNUNET_BANDWIDTH_Value32NBO bps,
-                                     uint64_t size)
+                                      uint64_t size)
 {
   uint64_t b;
   struct GNUNET_TIME_Relative ret;
 
   b = ntohl (bps.value__);
 {
   uint64_t b;
   struct GNUNET_TIME_Relative ret;
 
   b = ntohl (bps.value__);
-  if (b == 0)
+  if (0 == b)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Bandwidth suggests delay of infinity (zero bandwidth)\n");
     return GNUNET_TIME_UNIT_FOREVER_REL;
     return GNUNET_TIME_UNIT_FOREVER_REL;
-  ret.value = size * 1000LL / b;
+  }
+  ret.rel_value_us = size * 1000LL * 1000LL / b;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Bandwidth suggests delay of %s for %llu bytes of traffic\n",
+       GNUNET_STRINGS_relative_time_to_string (ret, GNUNET_YES),
+       (unsigned long long) size);
   return ret;
 }
 
 
   return ret;
 }
 
 
+
 /**
  * Initialize bandwidth tracker.  Note that in addition to the
  * 'max_carry_s' limit, we also always allow at least
 /**
  * Initialize bandwidth tracker.  Note that in addition to the
  * 'max_carry_s' limit, we also always allow at least
@@ -111,19 +131,28 @@ GNUNET_BANDWIDTH_value_get_delay_for (struct GNUNET_BANDWIDTH_Value32NBO bps,
  * bytes).
  *
  * @param av tracker to initialize
  * bytes).
  *
  * @param av tracker to initialize
+ * @param update_cb callback to notify a client about the tracker being updated
+ * @param update_cb_cls cls for the callback
  * @param bytes_per_second_limit initial limit to assume
  * @param max_carry_s maximum number of seconds unused bandwidth
  *        may accumulate before it expires
  */
 void
 GNUNET_BANDWIDTH_tracker_init (struct GNUNET_BANDWIDTH_Tracker *av,
  * @param bytes_per_second_limit initial limit to assume
  * @param max_carry_s maximum number of seconds unused bandwidth
  *        may accumulate before it expires
  */
 void
 GNUNET_BANDWIDTH_tracker_init (struct GNUNET_BANDWIDTH_Tracker *av,
-                              struct GNUNET_BANDWIDTH_Value32NBO bytes_per_second_limit,
-                              uint32_t max_carry_s)
+                               GNUNET_BANDWIDTH_tracker_update_cb update_cb,
+                               void *update_cb_cls,
+                               struct GNUNET_BANDWIDTH_Value32NBO
+                               bytes_per_second_limit, uint32_t max_carry_s)
 {
 {
+  av->update_cb = update_cb;
+  av->update_cb_cls = update_cb_cls;
   av->consumption_since_last_update__ = 0;
   av->last_update__ = GNUNET_TIME_absolute_get ();
   av->available_bytes_per_s__ = ntohl (bytes_per_second_limit.value__);
   av->max_carry_s__ = max_carry_s;
   av->consumption_since_last_update__ = 0;
   av->last_update__ = GNUNET_TIME_absolute_get ();
   av->available_bytes_per_s__ = ntohl (bytes_per_second_limit.value__);
   av->max_carry_s__ = max_carry_s;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tracker %p initialized with %u Bps and max carry %u\n", av,
+       (unsigned int) av->available_bytes_per_s__, (unsigned int) max_carry_s);
 }
 
 
 }
 
 
@@ -137,43 +166,35 @@ static void
 update_tracker (struct GNUNET_BANDWIDTH_Tracker *av)
 {
   struct GNUNET_TIME_Absolute now;
 update_tracker (struct GNUNET_BANDWIDTH_Tracker *av)
 {
   struct GNUNET_TIME_Absolute now;
-  uint64_t avail_per_ms;
+  struct GNUNET_TIME_Relative delta;
   uint64_t delta_time;
   uint64_t delta_avail;
   uint64_t left_bytes;
   uint64_t delta_time;
   uint64_t delta_avail;
   uint64_t left_bytes;
-  uint64_t left_time_ms;
+  uint64_t max_carry;
 
   now = GNUNET_TIME_absolute_get ();
 
   now = GNUNET_TIME_absolute_get ();
-  delta_time = now.value - av->last_update__.value;
-  delta_avail = (delta_time * ((unsigned long long) av->available_bytes_per_s__)) / 1000LL;
-  if (av->consumption_since_last_update__ >= delta_avail)
-    {
-      av->consumption_since_last_update__ -= delta_avail;
-      av->last_update__ = now;
-    }
-  else
-    {
-      left_bytes = delta_avail - av->consumption_since_last_update__;
-      avail_per_ms = ((unsigned long long) av->available_bytes_per_s__) / 1000LL;
-      if (avail_per_ms > 0)
-       left_time_ms = left_bytes / avail_per_ms;
-      else
-       left_time_ms = 0;
-      if (left_time_ms > ((unsigned long long) av->max_carry_s__) * 1000LL)
-       {
-         /* need to limit accumulation of unused bandwidth */
-         left_time_ms = ((unsigned long long) av->max_carry_s__) * 1000LL;
-         if (left_time_ms * avail_per_ms < GNUNET_SERVER_MAX_MESSAGE_SIZE)
-           {
-             /* need to still allow GNUNET_SERVER_MAX_MESSAGE_SIZE accumulation */
-             if (left_bytes > GNUNET_SERVER_MAX_MESSAGE_SIZE)
-               left_bytes = GNUNET_SERVER_MAX_MESSAGE_SIZE;
-             left_time_ms = left_bytes / avail_per_ms;
-           }
-       }
-      av->consumption_since_last_update__ = 0;
-      av->last_update__.value = now.value - left_time_ms;
-    }
+  delta_time = now.abs_value_us - av->last_update__.abs_value_us;
+  delta_avail =
+      (delta_time * ((unsigned long long) av->available_bytes_per_s__) +
+       500000LL) / 1000000LL;
+  av->consumption_since_last_update__ -= delta_avail;
+  av->last_update__ = now;
+  if (av->consumption_since_last_update__ < 0)
+  {
+    left_bytes = -av->consumption_since_last_update__;
+    max_carry = av->available_bytes_per_s__ * av->max_carry_s__;
+    if (max_carry < GNUNET_SERVER_MAX_MESSAGE_SIZE)
+      max_carry = GNUNET_SERVER_MAX_MESSAGE_SIZE;
+    if (max_carry > left_bytes)
+      av->consumption_since_last_update__ = -left_bytes;
+    else
+      av->consumption_since_last_update__ = -max_carry;
+  }
+  delta.rel_value_us = delta_time;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tracker %p updated, have %u Bps, last update was %s ago\n", av,
+       (unsigned int) av->available_bytes_per_s__,
+       GNUNET_STRINGS_relative_time_to_string (delta, GNUNET_YES));
 }
 
 
 }
 
 
@@ -190,28 +211,34 @@ update_tracker (struct GNUNET_BANDWIDTH_Tracker *av)
  */
 int
 GNUNET_BANDWIDTH_tracker_consume (struct GNUNET_BANDWIDTH_Tracker *av,
  */
 int
 GNUNET_BANDWIDTH_tracker_consume (struct GNUNET_BANDWIDTH_Tracker *av,
-                                 ssize_t size)
+                                  ssize_t size)
 {
 {
-  uint64_t nc;
+  int64_t nc;
 
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Tracker %p consumes %d bytes\n", av,
+       (int) size);
   if (size > 0)
   if (size > 0)
+  {
+    nc = av->consumption_since_last_update__ + size;
+    if (nc < av->consumption_since_last_update__)
     {
     {
-      nc = av->consumption_since_last_update__ + size;
-      if (nc < av->consumption_since_last_update__) 
-       {
-         GNUNET_break (0);
-         return GNUNET_SYSERR;
-       }
-      av->consumption_since_last_update__ = nc;
-      update_tracker (av);
-      if (av->consumption_since_last_update__ > 0)
-       return GNUNET_YES;
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
     }
     }
-  else
+    av->consumption_since_last_update__ = nc;
+    update_tracker (av);
+    if (av->consumption_since_last_update__ > 0)
     {
     {
-      av->last_update__.value -= (size * av->available_bytes_per_s__) / 1000LL;
-      update_tracker (av);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Tracker %p consumption %llu bytes above limit\n", av,
+           (unsigned long long) av->consumption_since_last_update__);
+      return GNUNET_YES;
     }
     }
+  }
+  else
+  {
+    av->consumption_since_last_update__ += size;
+  }
   return GNUNET_NO;
 }
 
   return GNUNET_NO;
 }
 
@@ -223,28 +250,37 @@ GNUNET_BANDWIDTH_tracker_consume (struct GNUNET_BANDWIDTH_Tracker *av,
  *
  * @param av tracker to query
  * @param size number of bytes we would like to consume
  *
  * @param av tracker to query
  * @param size number of bytes we would like to consume
- * @return time to wait for consumption to be OK
+ * @return time in ms to wait for consumption to be OK
  */
 struct GNUNET_TIME_Relative
 GNUNET_BANDWIDTH_tracker_get_delay (struct GNUNET_BANDWIDTH_Tracker *av,
  */
 struct GNUNET_TIME_Relative
 GNUNET_BANDWIDTH_tracker_get_delay (struct GNUNET_BANDWIDTH_Tracker *av,
-                                   size_t size)
+                                    size_t size)
 {
   struct GNUNET_TIME_Relative ret;
 {
   struct GNUNET_TIME_Relative ret;
-  struct GNUNET_TIME_Absolute now;
-  uint64_t delta_avail;
-  uint64_t delta_time;
-  uint64_t bytes_needed;
+  int64_t bytes_needed;
 
   if (av->available_bytes_per_s__ == 0)
 
   if (av->available_bytes_per_s__ == 0)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Tracker %p delay is infinity\n", av);
     return GNUNET_TIME_UNIT_FOREVER_REL;
     return GNUNET_TIME_UNIT_FOREVER_REL;
+  }
   update_tracker (av);
   update_tracker (av);
-  now = GNUNET_TIME_absolute_get ();
-  delta_time = now.value - av->last_update__.value;
-  delta_avail = (delta_time * ((unsigned long long) av->available_bytes_per_s__)) / 1000LL;
-  if (delta_avail >= size)
+  bytes_needed = size + av->consumption_since_last_update__;
+  if (bytes_needed <= 0)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Tracker %p delay for %u bytes is zero\n", av,
+         (unsigned int) size);
     return GNUNET_TIME_UNIT_ZERO;
     return GNUNET_TIME_UNIT_ZERO;
-  bytes_needed = size - delta_avail;
-  ret.value = 1000LL * bytes_needed / (unsigned long long) av->available_bytes_per_s__;
+  }
+  ret.rel_value_us =
+      (1000LL * 1000LL * bytes_needed) /
+      (unsigned long long) av->available_bytes_per_s__;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tracker %p delay for %u bytes is %s\n",
+       av, (unsigned int) size,
+       GNUNET_STRINGS_relative_time_to_string (ret, GNUNET_YES));
   return ret;
 }
 
   return ret;
 }
 
@@ -256,18 +292,23 @@ GNUNET_BANDWIDTH_tracker_get_delay (struct GNUNET_BANDWIDTH_Tracker *av,
  * @param av tracker to query
  * @return number of bytes available for consumption right now
  */
  * @param av tracker to query
  * @return number of bytes available for consumption right now
  */
-int64_t 
-GNUNET_BANDWIDTH_tracker_get_available (struct GNUNET_BANDWIDTH_Tracker *av)
+int64_t
+GNUNET_BANDWIDTH_tracker_get_available (struct GNUNET_BANDWIDTH_Tracker * av)
 {
   struct GNUNET_BANDWIDTH_Value32NBO bps;
   uint64_t avail;
 {
   struct GNUNET_BANDWIDTH_Value32NBO bps;
   uint64_t avail;
-  uint64_t used;
+  int64_t used;
 
   update_tracker (av);
   bps = GNUNET_BANDWIDTH_value_init (av->available_bytes_per_s__);
 
   update_tracker (av);
   bps = GNUNET_BANDWIDTH_value_init (av->available_bytes_per_s__);
-  avail = GNUNET_BANDWIDTH_value_get_available_until (bps,
-                                                     GNUNET_TIME_absolute_get_duration (av->last_update__));
+  avail =
+      GNUNET_BANDWIDTH_value_get_available_until (bps,
+                                                  GNUNET_TIME_absolute_get_duration
+                                                  (av->last_update__));
   used = av->consumption_since_last_update__;
   used = av->consumption_since_last_update__;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tracker %p available bandwidth is %lld bytes\n", av,
+       (long long) (int64_t) (avail - used));
   return (int64_t) (avail - used);
 }
 
   return (int64_t) (avail - used);
 }
 
@@ -280,17 +321,23 @@ GNUNET_BANDWIDTH_tracker_get_available (struct GNUNET_BANDWIDTH_Tracker *av)
  */
 void
 GNUNET_BANDWIDTH_tracker_update_quota (struct GNUNET_BANDWIDTH_Tracker *av,
  */
 void
 GNUNET_BANDWIDTH_tracker_update_quota (struct GNUNET_BANDWIDTH_Tracker *av,
-                                      struct GNUNET_BANDWIDTH_Value32NBO bytes_per_second_limit)
+                                       struct GNUNET_BANDWIDTH_Value32NBO
+                                       bytes_per_second_limit)
 {
   uint32_t old_limit;
   uint32_t new_limit;
 
   new_limit = ntohl (bytes_per_second_limit.value__);
 {
   uint32_t old_limit;
   uint32_t new_limit;
 
   new_limit = ntohl (bytes_per_second_limit.value__);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tracker %p bandwidth changed to %u Bps\n", av,
+       (unsigned int) new_limit);
   update_tracker (av);
   old_limit = av->available_bytes_per_s__;
   av->available_bytes_per_s__ = new_limit;
   update_tracker (av);
   old_limit = av->available_bytes_per_s__;
   av->available_bytes_per_s__ = new_limit;
+  if (NULL != av->update_cb)
+    av->update_cb (av->update_cb_cls);
   if (old_limit > new_limit)
   if (old_limit > new_limit)
-    update_tracker (av); /* maximum excess might be less now */
+    update_tracker (av);        /* maximum excess might be less now */
 }
 
 
 }