removing dead code / unused variables
[oweals/gnunet.git] / src / core / gnunet-service-core.c
index f7f5a5fe751c1e5fd95ba649af0edaecfddb464d..b07c427442c2fe530a4fe216f047542f66d20b91 100644 (file)
  * @brief high-level P2P messaging
  * @author Christian Grothoff
  *
- * TODO:
  * POST-TESTING:
- * - revisit API (which arguments are used, needed)?
- * - add code to bound message queue size when passing messages to clients
- * - add code to re-transmit key if first attempt failed
- *   + timeout on connect / key exchange, etc.
- *   + timeout for automatic re-try, etc.
- * - add code to give up re-transmission of key if many attempts fail
- * - add code to send PINGs if we are about to time-out otherwise
- * ? add heuristic to do another send_key in "handle_set_key"
- *   in case previous attempt failed / didn't work / persist
- *   (but don't do it always to avoid storm of SET_KEY's going
- *   back and forth!) --- alternatively, add "status" field
- *   of the other peer to the set key message, that way we'd
- *   know for sure!
- * - check that hostkey used by transport (for HELLOs) is the
- *   same as the hostkey that we are using!
- * - free list of clients on exit
  * - topology management:
  *   + bootstrapping (transport offer hello, plugins)
  *   + internal neighbour selection
- *   + update bandwidth usage statistics
- *   + bandwidth allocation (transport set quota)
+ *
+ * Considerations for later:
+ * - check that hostkey used by transport (for HELLOs) is the
+ *   same as the hostkey that we are using!
+ * - add code to send PINGs if we are about to time-out otherwise
  * - optimize lookup (many O(n) list traversals
  *   could ideally be changed to O(1) hash map lookups)
  */
 #include "platform.h"
+#include "gnunet_constants.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
  */
 #define MAX_WINDOW_TIME (5 * 60 * 1000)
 
-
 /**
- * Amount of bytes per minute (in/out) to assume initially
- * (before either peer has communicated any particular
- * preference).  Should be rather low.
+ * Minimum of bytes per minute (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
+ * sense.
  */
-#define DEFAULT_BPM_IN_OUT 2048
+#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
 
+/**
+ * What is the smallest change (in number of bytes per minute)
+ * that we consider significant enough to bother triggering?
+ */
+#define MIN_BPM_CHANGE 32
 
 /**
  * After how much time past the "official" expiration time do
  */
 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
 
-
 /**
  * What is the maximum delay for a SET_KEY message?
  */
 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
 
-
 /**
  * What how long do we wait for SET_KEY confirmation initially?
  */
 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
 
-
 /**
  * What is the maximum delay for a PING message?
  */
 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
 
-
 /**
  * What is the maximum delay for a PONG message?
  */
 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
 
+/**
+ * How often do we recalculate bandwidth quotas?
+ */
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
 
 /**
  * What is the priority for a SET_KEY message?
  */
 #define SET_KEY_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PING message?
  */
 #define PING_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PONG message?
  */
 #define PONG_PRIORITY 0xFFFFFF
 
-
 /**
  * How many messages do we queue per peer at most?
  */
 #define MAX_PEER_QUEUE_SIZE 16
 
+/**
+ * How many non-mandatory messages do we queue per client at most?
+ */
+#define MAX_CLIENT_QUEUE_SIZE 32
 
 /**
  * What is the maximum age of a message for us to consider
  */
 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
 
-
 /**
  * What is the maximum size for encrypted messages?  Note that this
  * number imposes a clear limit on the maximum size of any message.
@@ -176,6 +167,7 @@ enum PeerStateMachine
  */
 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
 
+
 /**
  * Encapsulation for encrypted messages exchanged between
  * peers.  Followed by the actual encrypted data.
@@ -399,6 +391,11 @@ struct Neighbour
    */
   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
 
+  /**
+   * ID of task used for updating bandwidth quota for this neighbour.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+
   /**
    * At what time did we generate our encryption key?
    */
@@ -427,7 +424,7 @@ struct Neighbour
   struct GNUNET_TIME_Relative last_latency;
 
   /**
-   * At what frequency are we currently re-trying SET KEY messages?
+   * At what frequency are we currently re-trying SET_KEY messages?
    */
   struct GNUNET_TIME_Relative set_key_retry_frequency;
 
@@ -448,7 +445,7 @@ struct Neighbour
    * bandwidth-hogs are sampled at a frequency of about 78s!);
    * may get negative if we have VERY high priority content.
    */
-  long long available_send_window;
+  long long available_send_window; 
 
   /**
    * How much downstream capacity of this peer has been reserved for
@@ -457,12 +454,12 @@ struct Neighbour
    * make sure that this reserved amount of bandwidth is actually
    * available).
    */
-  long long available_recv_window;
+  long long available_recv_window; 
 
   /**
    * How valueable were the messages of this peer recently?
    */
-  double current_preference;
+  unsigned long long current_preference;
 
   /**
    * Bit map indicating which of the 32 sequence numbers before the last
@@ -498,7 +495,7 @@ struct Neighbour
 
   /**
    * Internal bandwidth limit set for this peer (initially
-   * typcially set to "-1").  "bpm_out" is MAX of
+   * typically set to "-1").  "bpm_out" is MAX of
    * "bpm_out_internal_limit" and "bpm_out_external_limit".
    */
   uint32_t bpm_out_internal_limit;
@@ -511,7 +508,7 @@ struct Neighbour
   uint32_t bpm_out_external_limit;
 
   /**
-   * What was our PING challenge number?
+   * What was our PING challenge number (for this peer)?
    */
   uint32_t ping_challenge;
 
@@ -637,34 +634,86 @@ static struct GNUNET_SERVER_Handle *server;
  */
 static struct GNUNET_TRANSPORT_Handle *transport;
 
+/**
+ * Linked list of our clients.
+ */
+static struct Client *clients;
+
 /**
  * We keep neighbours in a linked list (for now).
  */
 static struct Neighbour *neighbours;
 
 /**
- * Linked list of our clients.
+ * Sum of all preferences among all neighbours.
  */
-static struct Client *clients;
+static unsigned long long preference_sum;
+
+/**
+ * Total number of neighbours we have.
+ */
+static unsigned int neighbour_count;
+
+/**
+ * How much inbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_in;
+
+/**
+ * How much outbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_out;
+
+
+
+/**
+ * A preference value for a neighbour was update.  Update
+ * the preference sum accordingly.
+ *
+ * @param inc how much was a preference value increased?
+ */
+static void
+update_preference_sum (unsigned long long inc)
+{
+  struct Neighbour *n;
+  unsigned long long os;
+
+  os = preference_sum;
+  preference_sum += inc;
+  if (preference_sum >= os)
+    return; /* done! */
+  /* overflow! compensate by cutting all values in half! */
+  preference_sum = 0;
+  n = neighbours;
+  while (n != NULL)
+    {
+      n->current_preference /= 2;
+      preference_sum += n->current_preference;
+      n = n->next;
+    }    
+}
 
 
 /**
  * Recalculate the number of bytes we expect to
  * receive or transmit in a given window.
  *
+ * @param force force an update now (even if not much time has passed)
  * @param window pointer to the byte counter (updated)
  * @param ts pointer to the timestamp (updated)
  * @param bpm number of bytes per minute that should
  *        be added to the window.
  */
 static void
-update_window (long long *window,
+update_window (int force,
+              long long *window,
                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
 {
   struct GNUNET_TIME_Relative since;
 
   since = GNUNET_TIME_absolute_get_duration (*ts);
-  if (since.value < 60 * 1000)
+  if ( (force == GNUNET_NO) &&
+       (since.value < 60 * 1000) )
     return;                     /* not even a minute has passed */
   *ts = GNUNET_TIME_absolute_get ();
   *window += (bpm * since.value) / 60 / 1000;
@@ -723,6 +772,7 @@ static void request_transmit (struct Client *client);
 
 /**
  * Client is ready to receive data, provide it.
+ *
  * @param cls closure
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
@@ -809,6 +859,7 @@ send_to_client (struct Client *client,
                 const struct GNUNET_MessageHeader *msg, int can_drop)
 {
   struct Event *e;
+  unsigned int queue_size;
   uint16_t msize;
 
 #if DEBUG_CORE_CLIENT
@@ -816,6 +867,23 @@ send_to_client (struct Client *client,
               "Preparing to send message of type %u to client.\n",
               ntohs (msg->type));
 #endif
+  queue_size = 0;
+  e = client->event_head;
+  while (e != NULL)
+    {
+      queue_size++;
+      e = e->next;
+    }
+  if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
+       (can_drop == GNUNET_YES) )
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Too many messages in queue for the client, dropping the new message.\n");
+#endif
+      return;
+    }
+
   msize = ntohs (msg->size);
   e = GNUNET_malloc (sizeof (struct Event) + msize);
   /* append */
@@ -921,8 +989,7 @@ handle_client_init (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
 #endif
-      cnm.bpm_available = htonl (n->bpm_out);
-      cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
+      cnm.reserved = htonl (0);
       cnm.peer = n->peer;
       send_to_client (c, &cnm.header, GNUNET_NO);
       n = n->next;
@@ -988,6 +1055,7 @@ handle_client_request_configure (void *cls,
   struct ConfigurationInfoMessage cim;
   struct Client *c;
   int reserv;
+  unsigned long long old_preference;
 
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -998,6 +1066,10 @@ handle_client_request_configure (void *cls,
   memset (&cim, 0, sizeof (cim));
   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
     {
+      update_window (GNUNET_YES,
+                    &n->available_send_window,
+                    &n->last_asw_update,
+                    n->bpm_out);
       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
                                n->bpm_out_external_limit);
@@ -1008,15 +1080,21 @@ handle_client_request_configure (void *cls,
         }
       else if (reserv > 0)
         {
-          update_window (&n->available_recv_window,
+          update_window (GNUNET_NO,
+                        &n->available_recv_window,
                          &n->last_arw_update, n->bpm_in);
           if (n->available_recv_window < reserv)
             reserv = n->available_recv_window;
           n->available_recv_window -= reserv;
         }
-      n->current_preference += rcm->preference_change;
-      if (n->current_preference < 0)
-        n->current_preference = 0;
+      old_preference = n->current_preference;
+      n->current_preference += GNUNET_ntohll(rcm->preference_change);
+      if (old_preference > n->current_preference) 
+       {
+         /* overflow; cap at maximum value */
+         n->current_preference = (unsigned long long) -1;
+       }
+      update_preference_sum (n->current_preference - old_preference);
       cim.reserved_amount = htonl (reserv);
       cim.bpm_in = htonl (n->bpm_in);
       cim.bpm_out = htonl (n->bpm_out);
@@ -1079,6 +1157,7 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
       GNUNET_assert (size >= m->size);
       memcpy (cbuf, &m[1], m->size);
       ret = m->size;
+      n->available_send_window -= m->size;
       process_encrypted_neighbour_queue (n);
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1141,6 +1220,7 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
   n->th =
     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
                                             n->encrypted_head->size,
+                                           n->encrypted_head->priority,
                                             GNUNET_TIME_absolute_get_remaining
                                             (n->encrypted_head->deadline),
                                             &notify_encrypted_transmit_ready,
@@ -1291,10 +1371,14 @@ select_messages (struct Neighbour *n,
       min = NULL;
       min_prio = -1;
       discard_low_prio = GNUNET_NO;
-      /* number of bytes available for transmission at time "t" */
+      /* calculate number of bytes available for transmission at time "t" */
+      update_window (GNUNET_NO,
+                    &n->available_send_window,
+                    &n->last_asw_update,
+                    n->bpm_out);
       avail = n->available_send_window;
       t = n->last_asw_update;
-      /* how many bytes have we (hyptothetically) scheduled so far */
+      /* how many bytes have we (hypothetically) scheduled so far */
       off = 0;
       /* maximum time we can wait before transmitting anything
          and still make all of our deadlines */
@@ -1359,7 +1443,7 @@ select_messages (struct Neighbour *n,
     }
   /* guard against sending "tiny" messages with large headers without
      urgent deadlines */
-  if ((slack > 1000) && (size > 4 * off))
+  if ( (slack > 1000) && (size > 4 * off) )
     {
       /* less than 25% of message would be filled with
          deadlines still being met if we delay by one
@@ -1474,16 +1558,23 @@ discard_expired_messages (struct Neighbour *n)
   struct MessageEntry *prev;
   struct MessageEntry *next;
   struct MessageEntry *pos;
-  struct GNUNET_TIME_Absolute cutoff;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative delta;
 
-  cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME);
+  now = GNUNET_TIME_absolute_get ();
   prev = NULL;
   pos = n->messages;
   while (pos != NULL) 
     {
       next = pos->next;
-      if (pos->deadline.value < cutoff.value)
+      delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
+      if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
        {
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                     "Message is %llu ms past due, discarding.\n",
+                     delta.value);
+#endif
          if (prev == NULL)
            n->messages = next;
          else
@@ -1579,6 +1670,7 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
       /* ready to continue */
       break;
     }
+  discard_expired_messages (n);
   if (n->messages == NULL)
     {
 #if DEBUG_CORE
@@ -1588,7 +1680,6 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
 #endif
       return;                   /* no pending messages */
     }
-  discard_expired_messages (n);
   if (n->encrypted_head != NULL)
     {
 #if DEBUG_CORE
@@ -1603,7 +1694,6 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
   priority = 0;
   used = sizeof (struct EncryptedMessage);
-
   used += batch_message (n,
                          &pbuf[used],
                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
@@ -1757,7 +1847,8 @@ handle_client_send (void *cls,
                   "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
                  "SEND",
                   GNUNET_i2s (&sm->peer),
-                 sm->deadline.value);
+                 GNUNET_TIME_absolute_get_remaining
+                 (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
 #endif
       msize += sizeof (struct SendMessage);
       /* ask transport to connect to the peer */
@@ -1766,7 +1857,7 @@ handle_client_send (void *cls,
       if (NULL ==
          GNUNET_TRANSPORT_notify_transmit_ready (transport,
                                                  &sm->peer,
-                                                 0,
+                                                 0, 0,
                                                  GNUNET_TIME_absolute_get_remaining
                                                  (GNUNET_TIME_absolute_ntoh
                                                   (sm->deadline)),
@@ -2224,8 +2315,10 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
 #endif
   if (n->public_key == NULL)
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Lacking public key for peer, trying to obtain one.\n");
+#endif
       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
       /* lookup n's public key, then try again */
@@ -2578,7 +2671,7 @@ handle_encrypted_message (struct Neighbour *n,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
-#endif
+#endif  
   /* decrypt */
   if (GNUNET_OK !=
       do_decrypt (n,
@@ -2646,6 +2739,10 @@ handle_encrypted_message (struct Neighbour *n,
     }
 
   /* process decrypted message(s) */
+  update_window (GNUNET_YES,
+                &n->available_send_window,
+                &n->last_asw_update,
+                n->bpm_out);
   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
                            n->bpm_out_internal_limit);
@@ -2768,6 +2865,87 @@ handle_transport_receive (void *cls,
 }
 
 
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
+ */
+static void
+schedule_quota_update (struct Neighbour *n)
+{
+  GNUNET_assert (n->quota_update_task ==
+                GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+  n->quota_update_task
+    = GNUNET_SCHEDULER_add_delayed (sched,
+                                   GNUNET_NO,
+                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+                                   QUOTA_UPDATE_FREQUENCY,
+                                   &neighbour_quota_update,
+                                   n);
+}
+
+
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+  uint32_t q_in;
+  double pref_rel;
+  double share;
+  unsigned long long distributable;
+  
+  n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+  /* calculate relative preference among all neighbours;
+     divides by a bit more to avoid division by zero AND to
+     account for possibility of new neighbours joining any time 
+     AND to convert to double... */
+  pref_rel = n->current_preference / (1.0 + preference_sum);
+  share = 0;
+  distributable = 0;
+  if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
+    distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+  share = distributable * pref_rel;
+  q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+  /* check if we want to disconnect for good due to inactivity */
+  if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
+       (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
+    q_in = 0; /* force disconnect */
+  if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
+       (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
+    {
+      n->bpm_in = q_in;
+      GNUNET_TRANSPORT_set_quota (transport,
+                                 &n->peer,
+                                 n->bpm_in, 
+                                 n->bpm_out,
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 NULL, NULL);
+    }
+  schedule_quota_update (n);
+}
+
+
 /**
  * Function called by transport to notify us that
  * a peer connected to us (on the network level).
@@ -2796,6 +2974,7 @@ handle_transport_notify_connect (void *cls,
   n = GNUNET_malloc (sizeof (struct Neighbour));
   n->next = neighbours;
   neighbours = n;
+  neighbour_count++;
   n->peer = *peer;
   n->last_latency = latency;
   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
@@ -2803,10 +2982,10 @@ handle_transport_notify_connect (void *cls,
   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
   n->last_asw_update = now;
   n->last_arw_update = now;
-  n->bpm_in = DEFAULT_BPM_IN_OUT;
-  n->bpm_out = DEFAULT_BPM_IN_OUT;
+  n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+  n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
   n->bpm_out_internal_limit = (uint32_t) - 1;
-  n->bpm_out_external_limit = DEFAULT_BPM_IN_OUT;
+  n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                 (uint32_t) - 1);
 #if DEBUG_CORE
@@ -2814,11 +2993,11 @@ handle_transport_notify_connect (void *cls,
               "Received connection from `%4s'.\n",
               GNUNET_i2s (&n->peer));
 #endif
+  schedule_quota_update (n);
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  cnm.bpm_available = htonl (DEFAULT_BPM_IN_OUT);
+  cnm.reserved = htonl (0);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (now);
   send_to_all_clients (&cnm.header, GNUNET_YES);
 }
 
@@ -2826,6 +3005,7 @@ handle_transport_notify_connect (void *cls,
 /**
  * Free the given entry for the neighbour (it has
  * already been removed from the list at this point).
+ *
  * @param n neighbour to free
  */
 static void
@@ -2849,6 +3029,8 @@ free_neighbour (struct Neighbour *n)
     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+  if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
   GNUNET_free_non_null (n->public_key);
   GNUNET_free_non_null (n->pending_ping);
   GNUNET_free (n);
@@ -2891,11 +3073,12 @@ handle_transport_notify_disconnect (void *cls,
     neighbours = n->next;
   else
     p->next = n->next;
+  GNUNET_assert (neighbour_count > 0);
+  neighbour_count--;
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
-  cnm.bpm_available = htonl (0);
+  cnm.reserved = htonl (0);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
   send_to_all_clients (&cnm.header, GNUNET_YES);
   free_neighbour (n);
 }
@@ -2921,6 +3104,8 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   while (NULL != (n = neighbours))
     {
       neighbours = n->next;
+      GNUNET_assert (neighbour_count > 0);
+      neighbour_count--;
       free_neighbour (n);
     }
   while (NULL != (c = clients))
@@ -2952,12 +3137,17 @@ run (void *cls,
   cfg = c;
   /* parse configuration */
   if (
-#if 0
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
-                                               "XX",
-                                               &qin)) ||
+                                               "TOTAL_QUOTA_IN",
+                                               &bandwidth_target_in)) ||
+       (GNUNET_OK !=
+        GNUNET_CONFIGURATION_get_value_number (c,
+                                               "CORE",
+                                               "TOTAL_QUOTA_OUT",
+                                               &bandwidth_target_out)) ||
+#if 0
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
@@ -3021,8 +3211,6 @@ run (void *cls,
 static void
 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-
-
   if (my_private_key != NULL)
     GNUNET_CRYPTO_rsa_key_free (my_private_key);
 }