more stats
[oweals/gnunet.git] / src / core / gnunet-service-core.c
index ff71eddd39018b82eea597cc253f65555dd54bb8..8bf6f0798914d92a6ab7a2b5abd7028ef514f344 100644 (file)
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
 #include "gnunet_transport_service.h"
 #include "core.h"
 
 
 #define DEBUG_HANDSHAKE GNUNET_NO
 
+#define DEBUG_CORE_QUOTA GNUNET_NO
+
 /**
  * Receive and send buffer windows grow over time.  For
  * how long can 'unused' bandwidth accumulate before we
- * need to cap it?  (specified in ms).
+ * need to cap it?  (specified in seconds).
  */
-#define MAX_WINDOW_TIME (5 * 60 * 1000)
+#define MAX_WINDOW_TIME_S (5 * 60)
 
 /**
  * How many messages do we queue up at most for optional
 #define MAX_NOTIFY_QUEUE 16
 
 /**
- * Minimum of bytes per minute (out) to assign to any connected peer.
- * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
+ * Minimum bandwidth (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
  * sense.
  */
-#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
-
-/**
- * What is the smallest change (in number of bytes per minute)
- * that we consider significant enough to bother triggering?
- */
-#define MIN_BPM_CHANGE 32
+#define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
 
 /**
  * After how much time past the "official" expiration time do
  */
 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
 
+/**
+ * How long do we delay messages to get larger packet sizes (CORKing)?
+ */
+#define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
 /**
  * What is the maximum delay for a SET_KEY message?
  */
-#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * What how long do we wait for SET_KEY confirmation initially?
  */
-#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
+#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
 
 /**
  * What is the maximum delay for a PING message?
  */
-#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
 
 /**
  * What is the maximum delay for a PONG message?
  */
-#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
+
+/**
+ * What is the minimum frequency for a PING message?
+ */
+#define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * How often do we recalculate bandwidth quotas?
  */
-#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * What is the priority for a SET_KEY message?
 #define PONG_PRIORITY 0xFFFFFF
 
 /**
- * How many messages do we queue per peer at most?
+ * How many messages do we queue per peer at most?  Must be at
+ * least two.
  */
 #define MAX_PEER_QUEUE_SIZE 16
 
@@ -169,7 +177,7 @@ enum PeerStateMachine
  * Number of bytes (at the beginning) of "struct EncryptedMessage"
  * that are NOT encrypted.
  */
-#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
+#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
 
 
 /**
@@ -184,15 +192,15 @@ struct EncryptedMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Always zero.
+   * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
+   * be set to the offset of the *next* field.
    */
-  uint32_t reserved GNUNET_PACKED;
+  uint32_t iv_seed GNUNET_PACKED;
 
   /**
-   * Hash of the plaintext, used to verify message integrity;
-   * ALSO used as the IV for the symmetric cipher!  Everything
-   * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
-   * must be set to the offset of the next field.
+   * Hash of the plaintext (starting at 'sequence_number'), used to
+   * verify message integrity.  Everything after this hash (including
+   * this hash itself) will be encrypted.  
    */
   GNUNET_HashCode plaintext_hash;
 
@@ -204,11 +212,10 @@ struct EncryptedMessage
   uint32_t sequence_number GNUNET_PACKED;
 
   /**
-   * Desired bandwidth (how much we should send to this
-   * peer / how much is the sender willing to receive),
-   * in bytes per minute.
+   * Desired bandwidth (how much we should send to this peer / how
+   * much is the sender willing to receive)?
    */
-  uint32_t inbound_bpm_limit GNUNET_PACKED;
+  struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
 
   /**
    * Timestamp.  Used to prevent reply of ancient messages
@@ -218,6 +225,7 @@ struct EncryptedMessage
 
 };
 
+
 /**
  * We're sending an (encrypted) PING to the other peer to check if he
  * can decrypt.  The other peer should respond with a PONG with the
@@ -226,7 +234,7 @@ struct EncryptedMessage
 struct PingMessage
 {
   /**
-   * Message type is either CORE_PING or CORE_PONG.
+   * Message type is CORE_PING.
    */
   struct GNUNET_MessageHeader header;
 
@@ -243,6 +251,43 @@ struct PingMessage
 };
 
 
+
+/**
+ * Response to a PING.  Includes data from the original PING
+ * plus initial bandwidth quota information.
+ */
+struct PongMessage
+{
+  /**
+   * Message type is CORE_PONG.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Random number proochosen to make reply harder.  Must be
+   * first field after header (this is where we start to encrypt!).
+   */
+  uint32_t challenge GNUNET_PACKED;
+
+  /**
+   * Must be zero.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * Desired bandwidth (how much we should send to this
+   * peer / how much is the sender willing to receive).
+   */
+  struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
+
+  /**
+   * Intended target of the PING, used primarily to check
+   * that decryption actually worked.
+   */
+  struct GNUNET_PeerIdentity target;
+};
+
+
 /**
  * Message transmitted to set (or update) a session key.
  */
@@ -296,15 +341,26 @@ struct MessageEntry
 {
 
   /**
-   * We keep messages in a linked list (for now).
+   * We keep messages in a doubly linked list.
    */
   struct MessageEntry *next;
 
+  /**
+   * We keep messages in a doubly linked list.
+   */
+  struct MessageEntry *prev;
+
   /**
    * By when are we supposed to transmit this message?
    */
   struct GNUNET_TIME_Absolute deadline;
 
+  /**
+   * By when are we supposed to transmit this message (after
+   * giving slack)?
+   */
+  struct GNUNET_TIME_Absolute slack_deadline;
+
   /**
    * How important is this message to us?
    */
@@ -380,7 +436,7 @@ struct Neighbour
    * (or the SET_KEY).  We keep it here until we have a key
    * to decrypt it.  NULL if no PONG is pending.
    */
-  struct PingMessage *pending_pong;
+  struct PongMessage *pending_pong;
 
   /**
    * Non-NULL if we are currently looking up HELLOs for this peer.
@@ -426,6 +482,11 @@ struct Neighbour
    */
   GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
 
+  /**
+   * ID of task used for sending keep-alive pings.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
+
   /**
    * ID of task used for cleaning up dead neighbour entries.
    */
@@ -464,32 +525,14 @@ struct Neighbour
   struct GNUNET_TIME_Relative set_key_retry_frequency;
 
   /**
-   * Time of our last update to the "available_send_window".
-   */
-  struct GNUNET_TIME_Absolute last_asw_update;
-
-  /**
-   * Time of our last update to the "available_recv_window".
+   * Tracking bandwidth for sending to this peer.
    */
-  struct GNUNET_TIME_Absolute last_arw_update;
+  struct GNUNET_BANDWIDTH_Tracker available_send_window;
 
   /**
-   * Number of bytes that we are eligible to transmit to this
-   * peer at this point.  Incremented every minute by max_out_bpm,
-   * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
-   * bandwidth-hogs are sampled at a frequency of about 78s!);
-   * may get negative if we have VERY high priority content.
+   * Tracking bandwidth for receiving from this peer.
    */
-  long long available_send_window; 
-
-  /**
-   * How much downstream capacity of this peer has been reserved for
-   * our traffic?  (Our clients can request that a certain amount of
-   * bandwidth is available for replies to them; this value is used to
-   * make sure that this reserved amount of bandwidth is actually
-   * available).
-   */
-  long long available_recv_window; 
+  struct GNUNET_BANDWIDTH_Tracker available_recv_window;
 
   /**
    * How valueable were the messages of this peer recently?
@@ -503,11 +546,6 @@ struct Neighbour
    */
   unsigned int last_packets_bitmap;
 
-  /**
-   * Number of messages in the message queue for this peer.
-   */
-  unsigned int message_queue_size;
-
   /**
    * last sequence number received on this connection (highest)
    */
@@ -521,26 +559,26 @@ struct Neighbour
   /**
    * Available bandwidth in for this peer (current target).
    */
-  uint32_t bpm_in;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
 
   /**
    * Available bandwidth out for this peer (current target).
    */
-  uint32_t bpm_out;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
 
   /**
-   * Internal bandwidth limit set for this peer (initially
-   * typically set to "-1").  "bpm_out" is MAX of
-   * "bpm_out_internal_limit" and "bpm_out_external_limit".
+   * Internal bandwidth limit set for this peer (initially typically
+   * set to "-1").  Actual "bw_out" is MIN of
+   * "bpm_out_internal_limit" and "bw_out_external_limit".
    */
-  uint32_t bpm_out_internal_limit;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
 
   /**
    * External bandwidth limit set for this peer by the
-   * peer that we are communicating with.  "bpm_out" is MAX of
-   * "bpm_out_internal_limit" and "bpm_out_external_limit".
+   * peer that we are communicating with.  "bw_out" is MIN of
+   * "bw_out_internal_limit" and "bw_out_external_limit".
    */
-  uint32_t bpm_out_external_limit;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
 
   /**
    * What was our PING challenge number (for this peer)?
@@ -561,6 +599,7 @@ struct Neighbour
    * Are we currently connected to this neighbour?
    */ 
   int is_connected;
+
 };
 
 
@@ -584,7 +623,7 @@ struct Client
    * about (with "tcnt" entries).  Allocated as part
    * of this client struct, do not free!
    */
-  uint16_t *types;
+  const uint16_t *types;
 
   /**
    * Options for messages this client cares about,
@@ -651,6 +690,11 @@ static struct GNUNET_SERVER_NotificationContext *notifier;
  */
 static struct Neighbour *neighbours;
 
+/**
+ * For creating statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
 /**
  * Sum of all preferences among all neighbours.
  */
@@ -662,14 +706,15 @@ static unsigned long long preference_sum;
 static unsigned int neighbour_count;
 
 /**
- * How much inbound bandwidth are we supposed to be using?
+ * How much inbound bandwidth are we supposed to be using per second?
+ * FIXME: this value is not used!
  */
-static unsigned long long bandwidth_target_in;
+static unsigned long long bandwidth_target_in_bps;
 
 /**
- * How much outbound bandwidth are we supposed to be using?
+ * How much outbound bandwidth are we supposed to be using per second?
  */
-static unsigned long long bandwidth_target_out;
+static unsigned long long bandwidth_target_out_bps;
 
 
 
@@ -698,34 +743,7 @@ update_preference_sum (unsigned long long inc)
       preference_sum += n->current_preference;
       n = n->next;
     }    
-}
-
-
-/**
- * Recalculate the number of bytes we expect to
- * receive or transmit in a given window.
- *
- * @param force force an update now (even if not much time has passed)
- * @param window pointer to the byte counter (updated)
- * @param ts pointer to the timestamp (updated)
- * @param bpm number of bytes per minute that should
- *        be added to the window.
- */
-static void
-update_window (int force,
-              long long *window,
-               struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
-{
-  struct GNUNET_TIME_Relative since;
-
-  since = GNUNET_TIME_absolute_get_duration (*ts);
-  if ( (force == GNUNET_NO) &&
-       (since.value < 60 * 1000) )
-    return;                     /* not even a minute has passed */
-  *ts = GNUNET_TIME_absolute_get ();
-  *window += (bpm * since.value) / 60 / 1000;
-  if (*window > MAX_WINDOW_TIME * bpm)
-    *window = MAX_WINDOW_TIME * bpm;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), preference_sum, GNUNET_NO);
 }
 
 
@@ -794,7 +812,14 @@ send_to_all_clients (const struct GNUNET_MessageHeader *msg,
   while (c != NULL)
     {
       if (0 != (c->options & options))
-       send_to_client (c, msg, can_drop);
+       {
+#if DEBUG_CORE_CLIENT
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Sending message of type %u to client.\n",
+                     ntohs (msg->type));
+#endif
+         send_to_client (c, msg, can_drop);
+       }
       c = c->next;
     }
 }
@@ -813,8 +838,10 @@ handle_client_init (void *cls,
   struct Client *c;
   uint16_t msize;
   const uint16_t *types;
+  uint16_t *wtypes;
   struct Neighbour *n;
   struct ConnectNotifyMessage cnm;
+  unsigned int i;
 
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -848,10 +875,18 @@ handle_client_init (void *cls,
   c->client_handle = client;
   c->next = clients;
   clients = c;
-  memcpy (&c[1], types, msize);
-  c->types = (uint16_t *) & c[1];
-  c->options = ntohl (im->options);
   c->tcnt = msize / sizeof (uint16_t);
+  c->types = (const uint16_t *) &c[1];
+  wtypes = (uint16_t *) &c[1];
+  for (i=0;i<c->tcnt;i++)
+    wtypes[i] = ntohs (types[i]);
+  c->options = ntohl (im->options);
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Client %p is interested in %u message types\n",
+             c,
+             c->tcnt);
+#endif
   /* send init reply message */
   irm.header.size = htons (sizeof (struct InitReplyMessage));
   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
@@ -864,24 +899,27 @@ handle_client_init (void *cls,
               "Sending `%s' message to client.\n", "INIT_REPLY");
 #endif
   send_to_client (c, &irm.header, GNUNET_NO);
-  /* notify new client about existing neighbours */
-  cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
-  cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  n = neighbours;
-  while (n != NULL)
+  if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
     {
-      if (n->status == PEER_STATE_KEY_CONFIRMED)
+      /* notify new client about existing neighbours */
+      cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+      cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+      n = neighbours;
+      while (n != NULL)
        {
+         if (n->status == PEER_STATE_KEY_CONFIRMED)
+           {
 #if DEBUG_CORE_CLIENT
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
 #endif
-         cnm.distance = htonl (n->last_distance);
-         cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
-         cnm.peer = n->peer;
-         send_to_client (c, &cnm.header, GNUNET_NO);
+             cnm.distance = htonl (n->last_distance);
+             cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
+             cnm.peer = n->peer;
+             send_to_client (c, &cnm.header, GNUNET_NO);
+           }
+         n = n->next;
        }
-      n = n->next;
     }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -903,7 +941,8 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
     return;
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client has disconnected from core service.\n");
+              "Client %p has disconnected from core service.\n",
+             client);
 #endif
   prev = NULL;
   pos = clients;
@@ -936,7 +975,8 @@ handle_client_request_info (void *cls,
   const struct RequestInfoMessage *rcm;
   struct Neighbour *n;
   struct ConfigurationInfoMessage cim;
-  int reserv;
+  int32_t want_reserv;
+  int32_t got_reserv;
   unsigned long long old_preference;
   struct GNUNET_SERVER_TransmitContext *tc;
 
@@ -949,27 +989,37 @@ handle_client_request_info (void *cls,
   memset (&cim, 0, sizeof (cim));
   if (n != NULL) 
     {
-      update_window (GNUNET_YES,
-                    &n->available_send_window,
-                    &n->last_asw_update,
-                    n->bpm_out);
-      n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
-      n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
-                               n->bpm_out_external_limit);
-      reserv = ntohl (rcm->reserve_inbound);
-      if (reserv < 0)
+      want_reserv = ntohl (rcm->reserve_inbound);
+      if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
+       {
+         n->bw_out_internal_limit = rcm->limit_outbound;
+         n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
+                                                 n->bw_out_external_limit);
+         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
+                                                n->bw_out);
+         GNUNET_TRANSPORT_set_quota (transport,
+                                     &n->peer,
+                                     n->bw_in,
+                                     n->bw_out,
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     NULL, NULL); 
+       }
+      if (want_reserv < 0)
         {
-          n->available_recv_window += reserv;
+         got_reserv = want_reserv;
         }
-      else if (reserv > 0)
+      else if (want_reserv > 0)
         {
-          update_window (GNUNET_NO,
-                        &n->available_recv_window,
-                         &n->last_arw_update, n->bpm_in);
-          if (n->available_recv_window < reserv)
-            reserv = n->available_recv_window;
-          n->available_recv_window -= reserv;
+         if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
+                                                 want_reserv).value == 0)
+           got_reserv = want_reserv;
+         else
+            got_reserv = 0; /* all or nothing */
         }
+      else
+       got_reserv = 0;
+      GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
+                                       got_reserv);
       old_preference = n->current_preference;
       n->current_preference += GNUNET_ntohll(rcm->preference_change);
       if (old_preference > n->current_preference) 
@@ -978,9 +1028,16 @@ handle_client_request_info (void *cls,
          n->current_preference = (unsigned long long) -1;
        }
       update_preference_sum (n->current_preference - old_preference);
-      cim.reserved_amount = htonl (reserv);
-      cim.bpm_in = htonl (n->bpm_in);
-      cim.bpm_out = htonl (n->bpm_out);
+#if DEBUG_CORE_QUOTA
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
+                 (int) want_reserv,
+                 GNUNET_i2s (&rcm->peer),
+                 (int) got_reserv);
+#endif
+      cim.reserved_amount = htonl (got_reserv);
+      cim.bw_in = n->bw_in;
+      cim.bw_out = n->bw_out;
       cim.preference = n->current_preference;
     }
   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
@@ -1026,11 +1083,16 @@ free_neighbour (struct Neighbour *n)
     }
   while (NULL != (m = n->encrypted_head))
     {
-      n->encrypted_head = m->next;
+      GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                                  n->encrypted_tail,
+                                  m);
       GNUNET_free (m);
     }
   if (NULL != n->th)
-    GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+    {
+      GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+      n->th = NULL;
+    }
   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1039,6 +1101,10 @@ free_neighbour (struct Neighbour *n)
     GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
+  if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)    
+      GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+  if (n->status == PEER_STATE_KEY_CONFIRMED)
+    GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), -1, GNUNET_NO);
   GNUNET_free_non_null (n->public_key);
   GNUNET_free_non_null (n->pending_ping);
   GNUNET_free_non_null (n->pending_pong);
@@ -1046,6 +1112,54 @@ free_neighbour (struct Neighbour *n)
 }
 
 
+/**
+ * Check if we have encrypted messages for the specified neighbour
+ * pending, and if so, check with the transport about sending them
+ * out.
+ *
+ * @param n neighbour to check.
+ */
+static void process_encrypted_neighbour_queue (struct Neighbour *n);
+
+
+/**
+ * Encrypt size bytes from in and write the result to out.  Use the
+ * key for outbound traffic of the given neighbour.
+ *
+ * @param n neighbour we are sending to
+ * @param iv initialization vector to use
+ * @param in ciphertext
+ * @param out plaintext
+ * @param size size of in/out
+ * @return GNUNET_OK on success
+ */
+static int
+do_encrypt (struct Neighbour *n,
+            const GNUNET_HashCode * iv,
+            const void *in, void *out, size_t size)
+{
+  if (size != (uint16_t) size)
+    {
+      GNUNET_break (0);
+      return GNUNET_NO;
+    }
+  GNUNET_assert (size ==
+                 GNUNET_CRYPTO_aes_encrypt (in,
+                                            (uint16_t) size,
+                                            &n->encrypt_key,
+                                            (const struct
+                                             GNUNET_CRYPTO_AesInitializationVector
+                                             *) iv, out));
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes encrypted"), size, GNUNET_NO);
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Encrypted %u bytes for `%4s' using key %u\n", size,
+              GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
+#endif
+  return GNUNET_OK;
+}
+
+
 /**
  * Consider freeing the given neighbour since we may not need
  * to keep it around anymore.
@@ -1056,6 +1170,70 @@ static void
 consider_free_neighbour (struct Neighbour *n);
 
 
+/**
+ * Task triggered when a neighbour entry is about to time out 
+ * (and we should prevent this by sending a PING).
+ *
+ * @param cls the 'struct Neighbour'
+ * @param tc scheduler context (not used)
+ */
+static void
+send_keep_alive (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+  struct GNUNET_TIME_Relative retry;
+  struct GNUNET_TIME_Relative left;
+  struct MessageEntry *me;
+  struct PingMessage pp;
+  struct PingMessage *pm;
+
+  n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
+  /* send PING */
+  me = GNUNET_malloc (sizeof (struct MessageEntry) +
+                      sizeof (struct PingMessage));
+  me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
+  me->priority = PING_PRIORITY;
+  me->size = sizeof (struct PingMessage);
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
+  pm = (struct PingMessage *) &me[1];
+  pm->header.size = htons (sizeof (struct PingMessage));
+  pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
+  pp.challenge = htonl (n->ping_challenge);
+  pp.target = n->peer;
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Encrypting `%s' and `%s' messages for `%4s'.\n",
+              "SET_KEY", "PING", GNUNET_i2s (&n->peer));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
+              "PING",
+              GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
+#endif
+  do_encrypt (n,
+              &n->peer.hashPubKey,
+              &pp.challenge,
+              &pm->challenge,
+              sizeof (struct PingMessage) -
+              sizeof (struct GNUNET_MessageHeader));
+  process_encrypted_neighbour_queue (n);
+  /* reschedule PING job */
+  left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
+                                                                      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
+  retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
+                                   MIN_PING_FREQUENCY);
+  n->keep_alive_task 
+    = GNUNET_SCHEDULER_add_delayed (sched, 
+                                   retry,
+                                   &send_keep_alive,
+                                   n);
+
+}
+
+
 /**
  * Task triggered when a neighbour entry might have gotten stale.
  *
@@ -1067,6 +1245,7 @@ consider_free_task (void *cls,
                    const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Neighbour *n = cls;
+
   n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
   consider_free_neighbour (n);
 }
@@ -1092,7 +1271,7 @@ consider_free_neighbour (struct Neighbour *n)
     return; /* no chance */
   
   left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
-                                                                      MAX_PONG_DELAY));
+                                                                      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
   if (left.value > 0)
     {
       if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1117,20 +1296,11 @@ consider_free_neighbour (struct Neighbour *n)
     prev->next = n->next;
   GNUNET_assert (neighbour_count > 0);
   neighbour_count--;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
   free_neighbour (n);
 }
 
 
-/**
- * Check if we have encrypted messages for the specified neighbour
- * pending, and if so, check with the transport about sending them
- * out.
- *
- * @param n neighbour to check.
- */
-static void process_encrypted_neighbour_queue (struct Neighbour *n);
-
-
 /**
  * Function called when the transport service is ready to
  * receive an encrypted message for the respective peer
@@ -1150,9 +1320,9 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
 
   n->th = NULL;
   GNUNET_assert (NULL != (m = n->encrypted_head));
-  n->encrypted_head = m->next;
-  if (m->next == NULL)
-    n->encrypted_tail = NULL;
+  GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                              n->encrypted_tail,
+                              m);
   ret = 0;
   cbuf = buf;
   if (buf != NULL)
@@ -1160,22 +1330,24 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
       GNUNET_assert (size >= m->size);
       memcpy (cbuf, &m[1], m->size);
       ret = m->size;
-      n->available_send_window -= m->size;
-      process_encrypted_neighbour_queue (n);
-
+      GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
+                                       m->size);
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   ret, GNUNET_i2s (&n->peer));
 #endif
+      process_encrypted_neighbour_queue (n);
     }
   else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Transmission of message of type %u and size %u failed\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   m->size);
+#endif
     }
   GNUNET_free (m);
   consider_free_neighbour (n);
@@ -1207,7 +1379,8 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
  
   if (n->th != NULL)
     return;  /* request already pending */
-  if (n->encrypted_head == NULL)
+  m = n->encrypted_head;
+  if (m == NULL)
     {
       /* encrypted queue empty, try plaintext instead */
       process_plaintext_neighbour_queue (n);
@@ -1216,18 +1389,17 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
-              n->encrypted_head->size,
+              m->size,
               GNUNET_i2s (&n->peer),
-              GNUNET_TIME_absolute_get_remaining (n->
-                                                  encrypted_head->deadline).
+              GNUNET_TIME_absolute_get_remaining (m->deadline).
               value);
 #endif
   n->th =
     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
-                                            n->encrypted_head->size,
-                                           n->encrypted_head->priority,
+                                            m->size,
+                                           m->priority,
                                             GNUNET_TIME_absolute_get_remaining
-                                            (n->encrypted_head->deadline),
+                                            (m->deadline),
                                             &notify_encrypted_transmit_ready,
                                             n);
   if (n->th == NULL)
@@ -1235,10 +1407,9 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
       /* message request too large or duplicate request */
       GNUNET_break (0);
       /* discard encrypted message */
-      GNUNET_assert (NULL != (m = n->encrypted_head));
-      n->encrypted_head = m->next;
-      if (m->next == NULL)
-       n->encrypted_tail = NULL;
+      GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                                  n->encrypted_tail,
+                                  m);
       GNUNET_free (m);
       process_encrypted_neighbour_queue (n);
     }
@@ -1284,6 +1455,7 @@ do_decrypt (struct Neighbour *n,
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes decrypted"), size, GNUNET_NO);
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from `%4s' using key %u\n",
@@ -1293,43 +1465,6 @@ do_decrypt (struct Neighbour *n,
 }
 
 
-/**
- * Encrypt size bytes from in and write the result to out.  Use the
- * key for outbound traffic of the given neighbour.
- *
- * @param n neighbour we are sending to
- * @param iv initialization vector to use
- * @param in ciphertext
- * @param out plaintext
- * @param size size of in/out
- * @return GNUNET_OK on success
- */
-static int
-do_encrypt (struct Neighbour *n,
-            const GNUNET_HashCode * iv,
-            const void *in, void *out, size_t size)
-{
-  if (size != (uint16_t) size)
-    {
-      GNUNET_break (0);
-      return GNUNET_NO;
-    }
-  GNUNET_assert (size ==
-                 GNUNET_CRYPTO_aes_encrypt (in,
-                                            (uint16_t) size,
-                                            &n->encrypt_key,
-                                            (const struct
-                                             GNUNET_CRYPTO_AesInitializationVector
-                                             *) iv, out));
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypted %u bytes for `%4s' using key %u\n", size,
-              GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
-#endif
-  return GNUNET_OK;
-}
-
-
 /**
  * Select messages for transmission.  This heuristic uses a combination
  * of earliest deadline first (EDF) scheduling (with bounded horizon)
@@ -1357,10 +1492,12 @@ select_messages (struct Neighbour *n,
   unsigned int min_prio;
   struct GNUNET_TIME_Absolute t;
   struct GNUNET_TIME_Absolute now;
-  uint64_t delta;
+  struct GNUNET_TIME_Relative delta;
   uint64_t avail;
-  unsigned long long slack;     /* how long could we wait before missing deadlines? */
+  struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
   size_t off;
+  uint64_t tsize;
+  unsigned int queue_size;
   int discard_low_prio;
 
   GNUNET_assert (NULL != n->messages);
@@ -1370,6 +1507,15 @@ select_messages (struct Neighbour *n,
   /* should we remove the entry with the lowest
      priority from consideration for scheduling at the
      end of the loop? */
+  queue_size = 0;
+  tsize = 0;
+  pos = n->messages;
+  while (pos != NULL)
+    {
+      queue_size++;
+      tsize += pos->size;
+      pos = pos->next;
+    }
   discard_low_prio = GNUNET_YES;
   while (GNUNET_YES == discard_low_prio)
     {
@@ -1377,25 +1523,20 @@ select_messages (struct Neighbour *n,
       min_prio = -1;
       discard_low_prio = GNUNET_NO;
       /* calculate number of bytes available for transmission at time "t" */
-      update_window (GNUNET_NO,
-                    &n->available_send_window,
-                    &n->last_asw_update,
-                    n->bpm_out);
-      avail = n->available_send_window;
-      t = n->last_asw_update;
+      avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
+      t = now;
       /* how many bytes have we (hypothetically) scheduled so far */
       off = 0;
       /* maximum time we can wait before transmitting anything
          and still make all of our deadlines */
-      slack = -1;
-
+      slack = MAX_CORK_DELAY;
       pos = n->messages;
       /* note that we use "*2" here because we want to look
          a bit further into the future; much more makes no
          sense since new message might be scheduled in the
          meantime... */
       while ((pos != NULL) && (off < size * 2))
-        {
+        {         
           if (pos->do_transmit == GNUNET_YES)
             {
               /* already removed from consideration */
@@ -1404,14 +1545,17 @@ select_messages (struct Neighbour *n,
             }
           if (discard_low_prio == GNUNET_NO)
             {
-              delta = pos->deadline.value;
-              if (delta < t.value)
-                delta = 0;
-              else
-                delta = t.value - delta;
-              avail += delta * n->bpm_out / 1000 / 60;
+             delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
+             if (delta.value > 0)
+               {
+                 // FIXME: HUH? Check!
+                 t = pos->deadline;
+                 avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
+                                                                      delta);
+               }
               if (avail < pos->size)
                 {
+                 // FIXME: HUH? Check!
                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
                 }
               else
@@ -1420,23 +1564,33 @@ select_messages (struct Neighbour *n,
                   /* update slack, considering both its absolute deadline
                      and relative deadlines caused by other messages
                      with their respective load */
-                  slack = GNUNET_MIN (slack, avail / n->bpm_out);
-                  if ( (pos->deadline.value < now.value) ||
-                      (GNUNET_YES == pos->got_slack) )                
+                  slack = GNUNET_TIME_relative_min (slack,
+                                                   GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
+                                                                                         avail));
+                  if (pos->deadline.value <= now.value) 
                    {
-                     slack = 0;
+                     /* now or never */
+                     slack = GNUNET_TIME_UNIT_ZERO;
+                   }
+                 else if (GNUNET_YES == pos->got_slack)
+                   {
+                     /* should be soon now! */
+                     slack = GNUNET_TIME_relative_min (slack,
+                                                       GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
                    }
                   else
                    {
                      slack =
-                       GNUNET_MIN (slack, pos->deadline.value - now.value);
+                       GNUNET_TIME_relative_min (slack, 
+                                                 GNUNET_TIME_absolute_get_difference (now, pos->deadline));
                      pos->got_slack = GNUNET_YES;
+                     pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
+                                                                     GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
                    }
                 }
             }
-
           off += pos->size;
-          t.value = GNUNET_MAX (pos->deadline.value, t.value);
+          t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
           if (pos->priority <= min_prio)
             {
               /* update min for discard */
@@ -1455,13 +1609,15 @@ select_messages (struct Neighbour *n,
     }
   /* guard against sending "tiny" messages with large headers without
      urgent deadlines */
-  if ( (slack > 1000) && (size > 4 * off) )
+  if ( (slack.value > 0) && 
+       (size > 4 * off) &&
+       (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
     {
       /* less than 25% of message would be filled with deadlines still
          being met if we delay by one second or more; so just wait for
          more data; but do not wait longer than 1s (since we don't want
         to delay messages for a really long time either). */
-      retry_time->value = 1000;
+      *retry_time = MAX_CORK_DELAY;
       /* reset do_transmit values for next time */
       while (pos != last)
         {
@@ -1470,7 +1626,10 @@ select_messages (struct Neighbour *n,
         }
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Deferring transmission for 1s due to underfull message buffer size\n");
+                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
+                 (unsigned long long) slack.value,
+                 (unsigned int) off,
+                 (unsigned int) size);
 #endif
       return 0;
     }
@@ -1491,8 +1650,10 @@ select_messages (struct Neighbour *n,
     }
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
-              off, GNUNET_i2s (&n->peer));
+              "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
+              off, tsize,
+             queue_size, MAX_PEER_QUEUE_SIZE,
+             GNUNET_i2s (&n->peer));
 #endif
   return off;
 }
@@ -1531,9 +1692,11 @@ batch_message (struct Neighbour *n,
   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
   if (0 == select_messages (n, size, retry_time))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "No messages selected, will try again in %llu ms\n",
                   retry_time->value);
+#endif
       return 0;
     }
   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
@@ -1587,7 +1750,8 @@ batch_message (struct Neighbour *n,
           *priority += pos->priority;
 #if DEBUG_CORE
          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Adding plaintext message with deadline %llu ms to batch\n",
+                     "Adding plaintext message of size %u with deadline %llu ms to batch\n",
+                     pos->size,
                      GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
 #endif
           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
@@ -1687,6 +1851,11 @@ set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Neighbour *n = cls;
 
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrying key transmission to `%4s'\n",
+             GNUNET_i2s (&n->peer));
+#endif
   n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
   n->set_key_retry_frequency =
     GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
@@ -1713,6 +1882,7 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   unsigned int priority;
   struct GNUNET_TIME_Absolute deadline;
   struct GNUNET_TIME_Relative retry_time;
+  GNUNET_HashCode iv;
 
   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
     {
@@ -1799,8 +1969,15 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
                                       &retry_plaintext_processing, n);
       return;
     }
+#if DEBUG_CORE_QUOTA
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending %u b/s as new limit to peer `%4s'\n",
+             (unsigned int) ntohl (n->bw_in.value__),
+             GNUNET_i2s (&n->peer));
+#endif
+  ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
   ph->sequence_number = htonl (++n->last_sequence_number_sent);
-  ph->inbound_bpm_limit = htonl (n->bpm_in);
+  ph->inbound_bw_limit = n->bw_in;
   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
 
   /* setup encryption message header */
@@ -1811,9 +1988,12 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   em = (struct EncryptedMessage *) &me[1];
   em->header.size = htons (used);
   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
-  em->reserved = htonl (0);
+  em->iv_seed = ph->iv_seed;
   esize = used - ENCRYPTED_HEADER_SIZE;
-  GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
+  GNUNET_CRYPTO_hash (&ph->sequence_number,
+                     esize - sizeof (GNUNET_HashCode), 
+                     &ph->plaintext_hash);
+  GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
   /* encrypt */
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1824,15 +2004,14 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
 #endif
   GNUNET_assert (GNUNET_OK ==
                  do_encrypt (n,
-                             &em->plaintext_hash,
-                             &ph->sequence_number,
-                             &em->sequence_number, esize));
+                             &iv,
+                             &ph->plaintext_hash,
+                             &em->plaintext_hash, esize));
   /* append to transmission list */
-  if (n->encrypted_tail == NULL)
-    n->encrypted_head = me;
-  else
-    n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   process_encrypted_neighbour_queue (n);
 }
 
@@ -1883,26 +2062,24 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid)
   n->next = neighbours;
   neighbours = n;
   neighbour_count++;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
   n->peer = *pid;
   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
   now = GNUNET_TIME_absolute_get ();
   n->encrypt_key_created = now;
   n->last_activity = now;
   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
-  n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
-  n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
-  n->bpm_out_internal_limit = (uint32_t) - 1;
-  n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+  n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+  n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+  n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
+  n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                 (uint32_t) - 1);
-  schedule_quota_update (n);
+  neighbour_quota_update (n, NULL);
   return n;
 }
 
 
-
-
-
 /**
  * Handle CORE_SEND request.
  *
@@ -1916,7 +2093,6 @@ handle_client_send (void *cls,
                     const struct GNUNET_MessageHeader *message)
 {
   const struct SendMessage *sm;
-  const struct GNUNET_MessageHeader *mh;
   struct Neighbour *n;
   struct MessageEntry *prev;
   struct MessageEntry *pos;
@@ -1938,14 +2114,6 @@ handle_client_send (void *cls,
     }
   sm = (const struct SendMessage *) message;
   msize -= sizeof (struct SendMessage);
-  mh = (const struct GNUNET_MessageHeader *) &sm[1];
-  if (msize != ntohs (mh->size))
-    {
-      GNUNET_break (0);
-      if (client != NULL)
-        GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-      return;
-    }
   n = find_neighbour (&sm->peer);
   if (n == NULL)
     n = create_neighbour (&sm->peer);
@@ -1984,7 +2152,11 @@ handle_client_send (void *cls,
          /* discard new entry */
 #if DEBUG_CORE
          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Queue full, discarding new request\n");
+                     "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
+                     queue_size,
+                     MAX_PEER_QUEUE_SIZE,
+                     msize,
+                     ntohs (message->type));
 #endif
          if (client != NULL)
            GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -2004,14 +2176,15 @@ handle_client_send (void *cls,
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Adding transmission request for `%4s' to queue\n",
-             GNUNET_i2s (&sm->peer));
+             "Adding transmission request for `%4s' of size %u to queue\n",
+             GNUNET_i2s (&sm->peer),
+             msize);
 #endif  
   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
   e->priority = ntohl (sm->priority);
   e->size = msize;
-  memcpy (&e[1], mh, msize);
+  memcpy (&e[1], &sm[1], msize);
 
   /* insert, keep list sorted by deadline */
   prev = NULL;
@@ -2034,6 +2207,33 @@ handle_client_send (void *cls,
 }
 
 
+/**
+ * Function called when the transport service is ready to
+ * receive a message.  Only resets 'n->th' to NULL.
+ *
+ * @param cls neighbour to use message from
+ * @param size number of bytes we can transmit
+ * @param buf where to copy the message
+ * @return number of bytes transmitted
+ */
+static size_t
+notify_transport_connect_done (void *cls, size_t size, void *buf)
+{
+  struct Neighbour *n = cls;
+
+  n->th = NULL;
+  if (buf == NULL)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Failed to connect to `%4s': transport failed to connect\n"),
+                 GNUNET_i2s (&n->peer));
+      return 0;
+    }
+  send_key (n);
+  return 0;
+}
+
+
 /**
  * Handle CORE_REQUEST_CONNECT request.
  *
@@ -2048,7 +2248,14 @@ handle_client_request_connect (void *cls,
 {
   const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
   struct Neighbour *n;
+  struct GNUNET_TIME_Relative timeout;
 
+  if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   n = find_neighbour (&cm->peer);
   if (n == NULL)
@@ -2056,20 +2263,22 @@ handle_client_request_connect (void *cls,
   if ( (n->is_connected) ||
        (n->th != NULL) )
     return; /* already connected, or at least trying */
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# connection requests received"), 1, GNUNET_NO);
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Core received `%s' request for `%4s', will try to establish connection\n",
              "REQUEST_CONNECT",
              GNUNET_i2s (&cm->peer));
 #endif
+  timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
   /* ask transport to connect to the peer */
-  /* FIXME: timeout zero OK? */
   n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
                                                  &cm->peer,
-                                                 0, 0,
-                                                 GNUNET_TIME_UNIT_ZERO,
-                                                 NULL,
-                                                 NULL);
+                                                 sizeof (struct GNUNET_MessageHeader), 0,
+                                                 timeout,
+                                                 &notify_transport_connect_done,
+                                                 n);
+  GNUNET_break (NULL != n->th);
 }
 
 
@@ -2119,10 +2328,18 @@ process_hello_retry_send_key (void *cls,
       n->pitr = NULL;
       if (n->public_key != NULL)
        {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# SETKEY messages deferred (need public key)"), 
+                                   -1, 
+                                   GNUNET_NO);
          send_key (n);
        }
       else
        {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# Delayed connecting due to lack of public key"),
+                                   1,
+                                   GNUNET_NO);      
          if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
            n->retry_set_key_task
              = GNUNET_SCHEDULER_add_delayed (sched,
@@ -2157,6 +2374,10 @@ process_hello_retry_send_key (void *cls,
     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
     {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# Error extracting public key from HELLO"),
+                               1,
+                               GNUNET_NO);      
       GNUNET_free (n->public_key);
       n->public_key = NULL;
 #if DEBUG_CORE
@@ -2220,11 +2441,10 @@ send_key (struct Neighbour *n)
   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
   me->priority = SET_KEY_PRIORITY;
   me->size = sizeof (struct SetKeyMessage);
-  if (n->encrypted_head == NULL)
-    n->encrypted_head = me;
-  else
-    n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   sm = (struct SetKeyMessage *) &me[1];
   sm->header.size = htons (sizeof (struct SetKeyMessage));
   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
@@ -2254,8 +2474,10 @@ send_key (struct Neighbour *n)
   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
   me->priority = PING_PRIORITY;
   me->size = sizeof (struct PingMessage);
-  n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   pm = (struct PingMessage *) &me[1];
   pm->header.size = htons (sizeof (struct PingMessage));
   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
@@ -2377,7 +2599,8 @@ static void
 handle_ping (struct Neighbour *n, const struct PingMessage *m)
 {
   struct PingMessage t;
-  struct PingMessage *tp;
+  struct PongMessage tx;
+  struct PongMessage *tp;
   struct MessageEntry *me;
 
 #if DEBUG_CORE
@@ -2403,6 +2626,7 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
               "Target of `%s' request is `%4s'.\n",
               "PING", GNUNET_i2s (&t.target));
 #endif
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages decrypted"), 1, GNUNET_NO);
   if (0 != memcmp (&t.target,
                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
     {
@@ -2410,25 +2634,26 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
       return;
     }
   me = GNUNET_malloc (sizeof (struct MessageEntry) +
-                      sizeof (struct PingMessage));
-  if (n->encrypted_tail != NULL)
-    n->encrypted_tail->next = me;
-  else
-    {
-      n->encrypted_tail = me;
-      n->encrypted_head = me;
-    }
+                      sizeof (struct PongMessage));
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
   me->priority = PONG_PRIORITY;
-  me->size = sizeof (struct PingMessage);
-  tp = (struct PingMessage *) &me[1];
+  me->size = sizeof (struct PongMessage);
+  tx.reserved = htonl (0);
+  tx.inbound_bw_limit = n->bw_in;
+  tx.challenge = t.challenge;
+  tx.target = t.target;
+  tp = (struct PongMessage *) &me[1];
   tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
-  tp->header.size = htons (sizeof (struct PingMessage));
+  tp->header.size = htons (sizeof (struct PongMessage));
   do_encrypt (n,
               &my_identity.hashPubKey,
-              &t.challenge,
+              &tx.challenge,
               &tp->challenge,
-              sizeof (struct PingMessage) -
+              sizeof (struct PongMessage) -
               sizeof (struct GNUNET_MessageHeader));
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2447,9 +2672,10 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
  * @param m the encrypted PONG message itself
  */
 static void
-handle_pong (struct Neighbour *n, const struct PingMessage *m)
+handle_pong (struct Neighbour *n, 
+            const struct PongMessage *m)
 {
-  struct PingMessage t;
+  struct PongMessage t;
   struct ConnectNotifyMessage cnm;
 
 #if DEBUG_CORE
@@ -2462,9 +2688,14 @@ handle_pong (struct Neighbour *n, const struct PingMessage *m)
                   &n->peer.hashPubKey,
                   &m->challenge,
                   &t.challenge,
-                  sizeof (struct PingMessage) -
+                  sizeof (struct PongMessage) -
                   sizeof (struct GNUNET_MessageHeader)))
     return;
+  if (0 != ntohl (t.reserved))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
@@ -2499,11 +2730,25 @@ handle_pong (struct Neighbour *n, const struct PingMessage *m)
       return;
     case PEER_STATE_KEY_RECEIVED:
       n->status = PEER_STATE_KEY_CONFIRMED;
+      if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
+       {
+         n->bw_out_external_limit = t.inbound_bw_limit;
+         n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                                 n->bw_out_internal_limit);
+         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                                n->bw_out);       
+         GNUNET_TRANSPORT_set_quota (transport,
+                                     &n->peer,
+                                     n->bw_in,
+                                     n->bw_out,
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     NULL, NULL); 
+       }
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Confirmed key via `%s' message for peer `%4s'\n",
                   "PONG", GNUNET_i2s (&n->peer));
-#endif
+#endif      
       if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
         {
           GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
@@ -2516,9 +2761,16 @@ handle_pong (struct Neighbour *n, const struct PingMessage *m)
       cnm.peer = n->peer;
       send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
       process_encrypted_neighbour_queue (n);
-      break;
+      /* fall-through! */
     case PEER_STATE_KEY_CONFIRMED:
-      /* duplicate PONG? */
+      n->last_activity = GNUNET_TIME_absolute_get ();
+      if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+      n->keep_alive_task 
+       = GNUNET_SCHEDULER_add_delayed (sched, 
+                                       GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                       &send_keep_alive,
+                                       n);
       break;
     default:
       GNUNET_break (0);
@@ -2541,7 +2793,7 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
   struct GNUNET_TIME_Absolute t;
   struct GNUNET_CRYPTO_AesSessionKey k;
   struct PingMessage *ping;
-  struct PingMessage *pong;
+  struct PongMessage *pong;
   enum PeerStateMachine sender_status;
 
 #if DEBUG_CORE
@@ -2575,6 +2827,7 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
                                         0,
                                         GNUNET_TIME_UNIT_MINUTES,
                                         &process_hello_retry_handle_set_key, n);
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# SETKEY messages deferred (need public key)"), 1, GNUNET_NO);
       return;
     }
   if (0 != memcmp (&m->target,
@@ -2582,8 +2835,9 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
                   sizeof (struct GNUNET_PeerIdentity)))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Received `%s' message that was not for me.  Ignoring.\n"),
-                 "SET_KEY");
+                 _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
+                 "SET_KEY",
+                 GNUNET_i2s (&m->target));
       return;
     }
   if ((ntohl (m->purpose.size) !=
@@ -2611,7 +2865,8 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       return;
     }
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+             "Decrypting key material.\n");
 #endif  
   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
                                   &m->encrypted_key,
@@ -2739,14 +2994,16 @@ deliver_message (struct Neighbour *sender,
   uint16_t type;
   unsigned int tpos;
   int deliver_full;
+  int dropped;
 
   type = ntohs (m->type);
-#if DEBUG_HANDSHAKE
-  fprintf (stderr,
-          "Received encapsulated message of type %u from `%4s'\n",
-          type,
-          GNUNET_i2s (&sender->peer));
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received encapsulated message of type %u from `%4s'\n",
+             type,
+             GNUNET_i2s (&sender->peer));
 #endif
+  dropped = GNUNET_YES;
   cpos = clients;
   while (cpos != NULL)
     {
@@ -2764,12 +3021,27 @@ deliver_message (struct Neighbour *sender,
             }
         }
       if (GNUNET_YES == deliver_full)
-        send_p2p_message_to_client (sender, cpos, m, msize);
+       {
+         send_p2p_message_to_client (sender, cpos, m, msize);
+         dropped = GNUNET_NO;
+       }
       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
-        send_p2p_message_to_client (sender, cpos, m,
-                                    sizeof (struct GNUNET_MessageHeader));
+       {
+         send_p2p_message_to_client (sender, cpos, m,
+                                     sizeof (struct GNUNET_MessageHeader));
+       }
       cpos = cpos->next;
     }
+  if (dropped == GNUNET_YES)
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Message of type %u from `%4s' not delivered to any client.\n",
+                 type,
+                 GNUNET_i2s (&sender->peer));
+#endif
+      /* FIXME: stats... */
+    }
 }
 
 
@@ -2860,25 +3132,30 @@ handle_encrypted_message (struct Neighbour *n,
   size_t off;
   uint32_t snum;
   struct GNUNET_TIME_Absolute t;
+  GNUNET_HashCode iv;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
 #endif  
+  GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
   /* decrypt */
   if (GNUNET_OK !=
       do_decrypt (n,
+                  &iv,
                   &m->plaintext_hash,
-                  &m->sequence_number,
-                  &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
+                  &buf[ENCRYPTED_HEADER_SIZE], 
+                 size - ENCRYPTED_HEADER_SIZE))
     return;
   pt = (struct EncryptedMessage *) buf;
 
   /* validate hash */
   GNUNET_CRYPTO_hash (&pt->sequence_number,
-                      size - ENCRYPTED_HEADER_SIZE, &ph);
-  if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
+                      size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
+  if (0 != memcmp (&ph, 
+                  &pt->plaintext_hash, 
+                  sizeof (GNUNET_HashCode)))
     {
       /* checksum failed */
       GNUNET_break_op (0);
@@ -2933,14 +3210,34 @@ handle_encrypted_message (struct Neighbour *n,
     }
 
   /* process decrypted message(s) */
-  update_window (GNUNET_YES,
-                &n->available_send_window,
-                &n->last_asw_update,
-                n->bpm_out);
-  n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
-  n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
-                           n->bpm_out_internal_limit);
+  if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
+    {
+#if DEBUG_CORE_SET_QUOTA
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Received %u b/s as new inbound limit for peer `%4s'\n",
+                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
+                 GNUNET_i2s (&n->peer));
+#endif
+      n->bw_out_external_limit = pt->inbound_bw_limit;
+      n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                             n->bw_out_internal_limit);
+      GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                            n->bw_out);
+      GNUNET_TRANSPORT_set_quota (transport,
+                                 &n->peer,
+                                 n->bw_in,
+                                 n->bw_out,
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 NULL, NULL); 
+    }
   n->last_activity = GNUNET_TIME_absolute_get ();
+  if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+  n->keep_alive_task 
+    = GNUNET_SCHEDULER_add_delayed (sched, 
+                                   GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                   &send_keep_alive,
+                                   n);
   off = sizeof (struct EncryptedMessage);
   deliver_messages (n, buf, size, off);
 }
@@ -2976,11 +3273,9 @@ handle_transport_receive (void *cls,
 #endif
   n = find_neighbour (peer);
   if (n == NULL)
-    {
-      GNUNET_break (0);
-      return;
-    }
-  GNUNET_break (n->is_connected);
+    n = create_neighbour (peer);
+  if (n == NULL)
+    return;   
   n->last_latency = latency;
   n->last_distance = distance;
   up = (n->status == PEER_STATE_KEY_CONFIRMED);
@@ -3000,6 +3295,7 @@ handle_transport_receive (void *cls,
           GNUNET_break_op (0);
           return;
         }
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"), 1, GNUNET_NO);
       handle_set_key (n, (const struct SetKeyMessage *) message);
       break;
     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
@@ -3013,6 +3309,13 @@ handle_transport_receive (void *cls,
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
           GNUNET_break_op (0);
+         /* blacklist briefly (?); might help recover (?) */
+         GNUNET_TRANSPORT_blacklist (sched, cfg,
+                                     &n->peer, 
+                                     GNUNET_TIME_UNIT_SECONDS,
+                                     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                                    5),
+                                     NULL, NULL);
           return;
         }
       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
@@ -3023,6 +3326,7 @@ handle_transport_receive (void *cls,
           GNUNET_break_op (0);
           return;
         }
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages received"), 1, GNUNET_NO);
       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
@@ -3039,11 +3343,12 @@ handle_transport_receive (void *cls,
       handle_ping (n, (const struct PingMessage *) message);
       break;
     case GNUNET_MESSAGE_TYPE_CORE_PONG:
-      if (size != sizeof (struct PingMessage))
+      if (size != sizeof (struct PongMessage))
         {
           GNUNET_break_op (0);
           return;
         }
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# pong messages received"), 1, GNUNET_NO);
       if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
           (n->status != PEER_STATE_KEY_CONFIRMED) )
         {
@@ -3053,11 +3358,11 @@ handle_transport_receive (void *cls,
                       "PONG", GNUNET_i2s (&n->peer));
 #endif
           GNUNET_free_non_null (n->pending_pong);
-          n->pending_pong = GNUNET_malloc (sizeof (struct PingMessage));
-          memcpy (n->pending_pong, message, sizeof (struct PingMessage));
+          n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
+          memcpy (n->pending_pong, message, sizeof (struct PongMessage));
           return;
         }
-      handle_pong (n, (const struct PingMessage *) message);
+      handle_pong (n, (const struct PongMessage *) message);
       break;
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -3069,7 +3374,17 @@ handle_transport_receive (void *cls,
       now = GNUNET_TIME_absolute_get ();
       n->last_activity = now;
       if (!up)
-        n->time_established = now;
+       {
+         GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), 1, GNUNET_NO);
+         n->time_established = now;
+       }
+      if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+      n->keep_alive_task 
+       = GNUNET_SCHEDULER_add_delayed (sched, 
+                                       GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                       &send_keep_alive,
+                                       n);
     }
 }
 
@@ -3086,41 +3401,65 @@ neighbour_quota_update (void *cls,
                        const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Neighbour *n = cls;
-  uint32_t q_in;
+  struct GNUNET_BANDWIDTH_Value32NBO q_in;
   double pref_rel;
   double share;
   unsigned long long distributable;
+  uint64_t need_per_peer;
+  uint64_t need_per_second;
   
   n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
   /* calculate relative preference among all neighbours;
      divides by a bit more to avoid division by zero AND to
      account for possibility of new neighbours joining any time 
      AND to convert to double... */
-  pref_rel = n->current_preference / (1.0 + preference_sum);
+  if (preference_sum == 0)
+    {
+      pref_rel = 1.0 / (double) neighbour_count;
+    }
+  else
+    {
+      pref_rel = n->current_preference / preference_sum;
+    }
+  need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
+                                                             GNUNET_TIME_UNIT_SECONDS);  
+  need_per_second = need_per_peer * neighbour_count;
   distributable = 0;
-  if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
-    distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+  if (bandwidth_target_out_bps > need_per_second)
+    distributable = bandwidth_target_out_bps - need_per_second;
   share = distributable * pref_rel;
-  q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+  if (share + need_per_peer > ( (uint32_t)-1))
+    q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
+  else
+    q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
   /* check if we want to disconnect for good due to inactivity */
   if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
        (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
     {
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Forcing disconnect of `%4s' due to inactivity (?).\n",
-              GNUNET_i2s (&n->peer));
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Forcing disconnect of `%4s' due to inactivity (?).\n",
+                 GNUNET_i2s (&n->peer));
 #endif
-      q_in = 0; /* force disconnect */
+      q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
     }
-  if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
-       (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
+#if DEBUG_CORE_QUOTA
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
+             GNUNET_i2s (&n->peer),
+             (unsigned int) ntohl (q_in.value__),
+             bandwidth_target_out_bps,
+             (unsigned int) ntohl (n->bw_in.value__),
+             (unsigned int) ntohl (n->bw_out.value__),
+             (unsigned int) ntohl (n->bw_out_internal_limit.value__));
+#endif
+  if (n->bw_in.value__ != q_in.value__) 
     {
-      n->bpm_in = q_in;
+      n->bw_in = q_in;
       GNUNET_TRANSPORT_set_quota (transport,
                                  &n->peer,
-                                 n->bpm_in, 
-                                 n->bpm_out,
+                                 n->bw_in,
+                                 n->bw_out,
                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                  NULL, NULL);
     }
@@ -3144,7 +3483,6 @@ handle_transport_notify_connect (void *cls,
                                 unsigned int distance)
 {
   struct Neighbour *n;
-  struct GNUNET_TIME_Absolute now;
   struct ConnectNotifyMessage cnm;
 
   n = find_neighbour (peer);
@@ -3161,12 +3499,15 @@ handle_transport_notify_connect (void *cls,
     {
       n = create_neighbour (peer);
     }
-  now = GNUNET_TIME_absolute_get ();
   n->is_connected = GNUNET_YES;      
   n->last_latency = latency;
   n->last_distance = distance;
-  n->last_asw_update = now;
-  n->last_arw_update = now;
+  GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
+                                n->bw_out,
+                                MAX_WINDOW_TIME_S);
+  GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
+                                n->bw_in,
+                                MAX_WINDOW_TIME_S);  
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received connection from `%4s'.\n",
@@ -3178,7 +3519,13 @@ handle_transport_notify_connect (void *cls,
   cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
   cnm.peer = *peer;
   send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
-  send_key (n);
+  GNUNET_TRANSPORT_set_quota (transport,
+                             &n->peer,
+                             n->bw_in,
+                             n->bw_out,
+                             GNUNET_TIME_UNIT_FOREVER_REL,
+                             NULL, NULL);
+  send_key (n); 
 }
 
 
@@ -3201,6 +3548,11 @@ handle_transport_notify_disconnect (void *cls,
               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
 #endif
   n = find_neighbour (peer);
+  if (n == NULL)
+    {
+      GNUNET_break (0);
+      return;
+    }
   GNUNET_break (n->is_connected);
   cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
@@ -3234,12 +3586,15 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       neighbour_count--;
       free_neighbour (n);
     }
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
   GNUNET_SERVER_notification_context_destroy (notifier);
   notifier = NULL;
   while (NULL != (c = clients))
     handle_client_disconnect (NULL, c->client_handle);
   if (my_private_key != NULL)
     GNUNET_CRYPTO_rsa_key_free (my_private_key);
+  if (stats != NULL)
+    GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
 }
 
 
@@ -3257,11 +3612,6 @@ run (void *cls,
      struct GNUNET_SERVER_Handle *serv,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
-#if 0
-  unsigned long long qin;
-  unsigned long long qout;
-  unsigned long long tneigh;
-#endif
   char *keyfile;
 
   sched = s;
@@ -3272,12 +3622,12 @@ run (void *cls,
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
                                                "TOTAL_QUOTA_IN",
-                                               &bandwidth_target_in)) ||
+                                               &bandwidth_target_in_bps)) ||
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
                                                "TOTAL_QUOTA_OUT",
-                                               &bandwidth_target_out)) ||
+                                               &bandwidth_target_out_bps)) ||
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_filename (c,
                                                  "GNUNETD",
@@ -3314,6 +3664,7 @@ run (void *cls,
                                         &handle_transport_notify_connect,
                                         &handle_transport_notify_disconnect);
   GNUNET_assert (NULL != transport);
+  stats = GNUNET_STATISTICS_create (sched, "core", cfg);
   GNUNET_SCHEDULER_add_delayed (sched,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &cleaning_task, NULL);