more stats
[oweals/gnunet.git] / src / core / gnunet-service-core.c
index 5996a81bfb55d8bbd1ebd2a39745939f3f5d15c1..8bf6f0798914d92a6ab7a2b5abd7028ef514f344 100644 (file)
  * @brief high-level P2P messaging
  * @author Christian Grothoff
  *
- * POST-TESTING:
- * - revisit API (which arguments are used, needed)?
- * - add code to re-transmit key if first attempt failed
- *   + timeout on connect / key exchange, etc.
- *   + timeout for automatic re-try, etc.
- * - add code to give up re-transmission of key if many attempts fail
- * - add code to send PINGs if we are about to time-out otherwise
- * ? add heuristic to do another send_key in "handle_set_key"
- *   in case previous attempt failed / didn't work / persist
- *   (but don't do it always to avoid storm of SET_KEY's going
- *   back and forth!) --- alternatively, add "status" field
- *   of the other peer to the set key message, that way we'd
- *   know for sure!
+ * Considerations for later:
  * - check that hostkey used by transport (for HELLOs) is the
  *   same as the hostkey that we are using!
- * - topology management:
- *   + bootstrapping (transport offer hello, plugins)
- *   + internal neighbour selection
- *   + update bandwidth usage statistics
- *   + bandwidth allocation (transport set quota)
+ * - add code to send PINGs if we are about to time-out otherwise
  * - optimize lookup (many O(n) list traversals
  *   could ideally be changed to O(1) hash map lookups)
  */
 #include "platform.h"
+#include "gnunet_constants.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
+#include "gnunet_statistics_service.h"
 #include "gnunet_transport_service.h"
 #include "core.h"
 
 
+#define DEBUG_HANDSHAKE GNUNET_NO
+
+#define DEBUG_CORE_QUOTA GNUNET_NO
+
 /**
  * Receive and send buffer windows grow over time.  For
  * how long can 'unused' bandwidth accumulate before we
- * need to cap it?  (specified in ms).
+ * need to cap it?  (specified in seconds).
  */
-#define MAX_WINDOW_TIME (5 * 60 * 1000)
-
+#define MAX_WINDOW_TIME_S (5 * 60)
 
 /**
- * Amount of bytes per minute (in/out) to assume initially
- * (before either peer has communicated any particular
- * preference).  Should be rather low.
+ * How many messages do we queue up at most for optional
+ * notifications to a client?  (this can cause notifications
+ * about outgoing messages to be dropped).
  */
-#define DEFAULT_BPM_IN_OUT 2048
+#define MAX_NOTIFY_QUEUE 16
 
+/**
+ * Minimum bandwidth (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BW_IN_OUT make no
+ * sense.
+ */
+#define MIN_BANDWIDTH_PER_PEER GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT
 
 /**
  * After how much time past the "official" expiration time do
  */
 #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
 
+/**
+ * How long do we delay messages to get larger packet sizes (CORKing)?
+ */
+#define MAX_CORK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
 
 /**
  * What is the maximum delay for a SET_KEY message?
  */
-#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
-
+#define MAX_SET_KEY_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * What how long do we wait for SET_KEY confirmation initially?
  */
-#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
-
+#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 3)
 
 /**
  * What is the maximum delay for a PING message?
  */
-#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
-
+#define MAX_PING_DELAY GNUNET_TIME_relative_multiply (MAX_SET_KEY_DELAY, 2)
 
 /**
  * What is the maximum delay for a PONG message?
  */
-#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
+#define MAX_PONG_DELAY GNUNET_TIME_relative_multiply (MAX_PING_DELAY, 2)
 
+/**
+ * What is the minimum frequency for a PING message?
+ */
+#define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
+/**
+ * How often do we recalculate bandwidth quotas?
+ */
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
  * What is the priority for a SET_KEY message?
  */
 #define SET_KEY_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PING message?
  */
 #define PING_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PONG message?
  */
 #define PONG_PRIORITY 0xFFFFFF
 
-
 /**
- * How many messages do we queue per peer at most?
+ * How many messages do we queue per peer at most?  Must be at
+ * least two.
  */
 #define MAX_PEER_QUEUE_SIZE 16
 
-
 /**
  * How many non-mandatory messages do we queue per client at most?
  */
 #define MAX_CLIENT_QUEUE_SIZE 32
 
-
 /**
  * What is the maximum age of a message for us to consider
  * processing it?  Note that this looks at the timestamp used
  */
 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
 
-
 /**
  * What is the maximum size for encrypted messages?  Note that this
  * number imposes a clear limit on the maximum size of any message.
@@ -177,7 +177,8 @@ enum PeerStateMachine
  * Number of bytes (at the beginning) of "struct EncryptedMessage"
  * that are NOT encrypted.
  */
-#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
+#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t))
+
 
 /**
  * Encapsulation for encrypted messages exchanged between
@@ -191,15 +192,15 @@ struct EncryptedMessage
   struct GNUNET_MessageHeader header;
 
   /**
-   * Always zero.
+   * Random value used for IV generation.  ENCRYPTED_HEADER_SIZE must
+   * be set to the offset of the *next* field.
    */
-  uint32_t reserved GNUNET_PACKED;
+  uint32_t iv_seed GNUNET_PACKED;
 
   /**
-   * Hash of the plaintext, used to verify message integrity;
-   * ALSO used as the IV for the symmetric cipher!  Everything
-   * after this hash will be encrypted.  ENCRYPTED_HEADER_SIZE
-   * must be set to the offset of the next field.
+   * Hash of the plaintext (starting at 'sequence_number'), used to
+   * verify message integrity.  Everything after this hash (including
+   * this hash itself) will be encrypted.  
    */
   GNUNET_HashCode plaintext_hash;
 
@@ -211,11 +212,10 @@ struct EncryptedMessage
   uint32_t sequence_number GNUNET_PACKED;
 
   /**
-   * Desired bandwidth (how much we should send to this
-   * peer / how much is the sender willing to receive),
-   * in bytes per minute.
+   * Desired bandwidth (how much we should send to this peer / how
+   * much is the sender willing to receive)?
    */
-  uint32_t inbound_bpm_limit GNUNET_PACKED;
+  struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
 
   /**
    * Timestamp.  Used to prevent reply of ancient messages
@@ -225,6 +225,7 @@ struct EncryptedMessage
 
 };
 
+
 /**
  * We're sending an (encrypted) PING to the other peer to check if he
  * can decrypt.  The other peer should respond with a PONG with the
@@ -233,7 +234,7 @@ struct EncryptedMessage
 struct PingMessage
 {
   /**
-   * Message type is either CORE_PING or CORE_PONG.
+   * Message type is CORE_PING.
    */
   struct GNUNET_MessageHeader header;
 
@@ -250,6 +251,43 @@ struct PingMessage
 };
 
 
+
+/**
+ * Response to a PING.  Includes data from the original PING
+ * plus initial bandwidth quota information.
+ */
+struct PongMessage
+{
+  /**
+   * Message type is CORE_PONG.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Random number proochosen to make reply harder.  Must be
+   * first field after header (this is where we start to encrypt!).
+   */
+  uint32_t challenge GNUNET_PACKED;
+
+  /**
+   * Must be zero.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * Desired bandwidth (how much we should send to this
+   * peer / how much is the sender willing to receive).
+   */
+  struct GNUNET_BANDWIDTH_Value32NBO inbound_bw_limit;
+
+  /**
+   * Intended target of the PING, used primarily to check
+   * that decryption actually worked.
+   */
+  struct GNUNET_PeerIdentity target;
+};
+
+
 /**
  * Message transmitted to set (or update) a session key.
  */
@@ -303,15 +341,26 @@ struct MessageEntry
 {
 
   /**
-   * We keep messages in a linked list (for now).
+   * We keep messages in a doubly linked list.
    */
   struct MessageEntry *next;
 
+  /**
+   * We keep messages in a doubly linked list.
+   */
+  struct MessageEntry *prev;
+
   /**
    * By when are we supposed to transmit this message?
    */
   struct GNUNET_TIME_Absolute deadline;
 
+  /**
+   * By when are we supposed to transmit this message (after
+   * giving slack)?
+   */
+  struct GNUNET_TIME_Absolute slack_deadline;
+
   /**
    * How important is this message to us?
    */
@@ -328,7 +377,14 @@ struct MessageEntry
    * Was this message selected for transmission in the
    * current round? GNUNET_YES or GNUNET_NO.
    */
-  int16_t do_transmit;
+  int8_t do_transmit;
+
+  /**
+   * Did we give this message some slack (delayed sending) previously
+   * (and hence should not give it any more slack)? GNUNET_YES or
+   * GNUNET_NO.
+   */
+  int8_t got_slack;
 
 };
 
@@ -375,6 +431,25 @@ struct Neighbour
    */
   struct PingMessage *pending_ping;
 
+  /**
+   * We received a PONG message before we got the "public_key"
+   * (or the SET_KEY).  We keep it here until we have a key
+   * to decrypt it.  NULL if no PONG is pending.
+   */
+  struct PongMessage *pending_pong;
+
+  /**
+   * Non-NULL if we are currently looking up HELLOs for this peer.
+   * for this peer.
+   */
+  struct GNUNET_PEERINFO_IteratorContext *pitr;
+
+  /**
+   * SetKeyMessage to transmit, NULL if we are not currently trying
+   * to send one.
+   */
+  struct SetKeyMessage *skm;
+
   /**
    * Identity of the neighbour.
    */
@@ -402,6 +477,21 @@ struct Neighbour
    */
   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
 
+  /**
+   * ID of task used for updating bandwidth quota for this neighbour.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+
+  /**
+   * ID of task used for sending keep-alive pings.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
+
+  /**
+   * ID of task used for cleaning up dead neighbour entries.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier dead_clean_task;
+
   /**
    * At what time did we generate our encryption key?
    */
@@ -430,42 +520,24 @@ struct Neighbour
   struct GNUNET_TIME_Relative last_latency;
 
   /**
-   * At what frequency are we currently re-trying SET KEY messages?
+   * At what frequency are we currently re-trying SET_KEY messages?
    */
   struct GNUNET_TIME_Relative set_key_retry_frequency;
 
   /**
-   * Time of our last update to the "available_send_window".
-   */
-  struct GNUNET_TIME_Absolute last_asw_update;
-
-  /**
-   * Time of our last update to the "available_recv_window".
-   */
-  struct GNUNET_TIME_Absolute last_arw_update;
-
-  /**
-   * Number of bytes that we are eligible to transmit to this
-   * peer at this point.  Incremented every minute by max_out_bpm,
-   * bounded by max_bpm (no back-log larger than MAX_BUF_FACT minutes,
-   * bandwidth-hogs are sampled at a frequency of about 78s!);
-   * may get negative if we have VERY high priority content.
+   * Tracking bandwidth for sending to this peer.
    */
-  long long available_send_window;
+  struct GNUNET_BANDWIDTH_Tracker available_send_window;
 
   /**
-   * How much downstream capacity of this peer has been reserved for
-   * our traffic?  (Our clients can request that a certain amount of
-   * bandwidth is available for replies to them; this value is used to
-   * make sure that this reserved amount of bandwidth is actually
-   * available).
+   * Tracking bandwidth for receiving from this peer.
    */
-  long long available_recv_window;
+  struct GNUNET_BANDWIDTH_Tracker available_recv_window;
 
   /**
    * How valueable were the messages of this peer recently?
    */
-  double current_preference;
+  unsigned long long current_preference;
 
   /**
    * Bit map indicating which of the 32 sequence numbers before the last
@@ -474,11 +546,6 @@ struct Neighbour
    */
   unsigned int last_packets_bitmap;
 
-  /**
-   * Number of messages in the message queue for this peer.
-   */
-  unsigned int message_queue_size;
-
   /**
    * last sequence number received on this connection (highest)
    */
@@ -492,61 +559,46 @@ struct Neighbour
   /**
    * Available bandwidth in for this peer (current target).
    */
-  uint32_t bpm_in;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_in;    
 
   /**
    * Available bandwidth out for this peer (current target).
    */
-  uint32_t bpm_out;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out;  
 
   /**
-   * Internal bandwidth limit set for this peer (initially
-   * typcially set to "-1").  "bpm_out" is MAX of
-   * "bpm_out_internal_limit" and "bpm_out_external_limit".
+   * Internal bandwidth limit set for this peer (initially typically
+   * set to "-1").  Actual "bw_out" is MIN of
+   * "bpm_out_internal_limit" and "bw_out_external_limit".
    */
-  uint32_t bpm_out_internal_limit;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit;
 
   /**
    * External bandwidth limit set for this peer by the
-   * peer that we are communicating with.  "bpm_out" is MAX of
-   * "bpm_out_internal_limit" and "bpm_out_external_limit".
+   * peer that we are communicating with.  "bw_out" is MIN of
+   * "bw_out_internal_limit" and "bw_out_external_limit".
    */
-  uint32_t bpm_out_external_limit;
+  struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit;
 
   /**
-   * What was our PING challenge number?
+   * What was our PING challenge number (for this peer)?
    */
   uint32_t ping_challenge;
 
   /**
-   * What is our connection status?
-   */
-  enum PeerStateMachine status;
-
-};
-
-
-/**
- * Events are messages for clients.  The struct
- * itself is followed by the actual message.
- */
-struct Event
-{
-  /**
-   * This is a linked list.
+   * What was the last distance to this peer as reported by the transports?
    */
-  struct Event *next;
+  uint32_t last_distance;
 
   /**
-   * Size of the message.
+   * What is our connection status?
    */
-  size_t size;
+  enum PeerStateMachine status;
 
   /**
-   * Could this event be dropped if this queue
-   * is getting too large? (NOT YET USED!)
-   */
-  int can_drop;
+   * Are we currently connected to this neighbour?
+   */ 
+  int is_connected;
 
 };
 
@@ -566,29 +618,12 @@ struct Client
    */
   struct GNUNET_SERVER_Client *client_handle;
 
-  /**
-   * Linked list of messages we still need to deliver to
-   * this client.
-   */
-  struct Event *event_head;
-
-  /**
-   * Tail of the linked list of events.
-   */
-  struct Event *event_tail;
-
-  /**
-   * Current transmit handle, NULL if no transmission request
-   * is pending.
-   */
-  struct GNUNET_NETWORK_TransmitHandle *th;
-
   /**
    * Array of the types of messages this peer cares
    * about (with "tcnt" entries).  Allocated as part
    * of this client struct, do not free!
    */
-  uint16_t *types;
+  const uint16_t *types;
 
   /**
    * Options for messages this client cares about,
@@ -628,7 +663,7 @@ struct GNUNET_SCHEDULER_Handle *sched;
 /**
  * Our configuration.
  */
-struct GNUNET_CONFIGURATION_Handle *cfg;
+const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
  * Our server.
@@ -640,39 +675,75 @@ static struct GNUNET_SERVER_Handle *server;
  */
 static struct GNUNET_TRANSPORT_Handle *transport;
 
+/**
+ * Linked list of our clients.
+ */
+static struct Client *clients;
+
+/**
+ * Context for notifications we need to send to our clients.
+ */
+static struct GNUNET_SERVER_NotificationContext *notifier;
+
 /**
  * We keep neighbours in a linked list (for now).
  */
 static struct Neighbour *neighbours;
 
 /**
- * Linked list of our clients.
+ * For creating statistics.
  */
-static struct Client *clients;
+static struct GNUNET_STATISTICS_Handle *stats;
+
+/**
+ * Sum of all preferences among all neighbours.
+ */
+static unsigned long long preference_sum;
+
+/**
+ * Total number of neighbours we have.
+ */
+static unsigned int neighbour_count;
+
+/**
+ * How much inbound bandwidth are we supposed to be using per second?
+ * FIXME: this value is not used!
+ */
+static unsigned long long bandwidth_target_in_bps;
+
+/**
+ * How much outbound bandwidth are we supposed to be using per second?
+ */
+static unsigned long long bandwidth_target_out_bps;
+
 
 
 /**
- * Recalculate the number of bytes we expect to
- * receive or transmit in a given window.
+ * A preference value for a neighbour was update.  Update
+ * the preference sum accordingly.
  *
- * @param window pointer to the byte counter (updated)
- * @param ts pointer to the timestamp (updated)
- * @param bpm number of bytes per minute that should
- *        be added to the window.
+ * @param inc how much was a preference value increased?
  */
 static void
-update_window (long long *window,
-               struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
+update_preference_sum (unsigned long long inc)
 {
-  struct GNUNET_TIME_Relative since;
-
-  since = GNUNET_TIME_absolute_get_duration (*ts);
-  if (since.value < 60 * 1000)
-    return;                     /* not even a minute has passed */
-  *ts = GNUNET_TIME_absolute_get ();
-  *window += (bpm * since.value) / 60 / 1000;
-  if (*window > MAX_WINDOW_TIME * bpm)
-    *window = MAX_WINDOW_TIME * bpm;
+  struct Neighbour *n;
+  unsigned long long os;
+
+  os = preference_sum;
+  preference_sum += inc;
+  if (preference_sum >= os)
+    return; /* done! */
+  /* overflow! compensate by cutting all values in half! */
+  preference_sum = 0;
+  n = neighbours;
+  while (n != NULL)
+    {
+      n->current_preference /= 2;
+      preference_sum += n->current_preference;
+      n = n->next;
+    }    
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), preference_sum, GNUNET_NO);
 }
 
 
@@ -697,112 +768,9 @@ find_neighbour (const struct GNUNET_PeerIdentity *peer)
 }
 
 
-/**
- * Find the entry for the given client.
- *
- * @param client handle for the client
- * @return NULL if we are not connected, otherwise the
- *         client's struct.
- */
-static struct Client *
-find_client (const struct GNUNET_SERVER_Client *client)
-{
-  struct Client *ret;
-
-  ret = clients;
-  while ((ret != NULL) && (client != ret->client_handle))
-    ret = ret->next;
-  return ret;
-}
-
-
-/**
- * If necessary, initiate a request with the server to
- * transmit messages from the queue of the given client.
- * @param client who to transfer messages to
- */
-static void request_transmit (struct Client *client);
-
-
-/**
- * Client is ready to receive data, provide it.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-do_client_transmit (void *cls, size_t size, void *buf)
-{
-  struct Client *client = cls;
-  struct Event *e;
-  char *tgt;
-  size_t ret;
-
-  client->th = NULL;
-#if DEBUG_CORE_CLIENT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client ready to receive %u bytes.\n", size);
-#endif
-  if (buf == NULL)
-    {
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Failed to transmit data to client (disconnect)?\n");
-#endif
-      return 0;                 /* we'll surely get a disconnect soon... */
-    }
-  tgt = buf;
-  ret = 0;
-  while ((NULL != (e = client->event_head)) && (e->size <= size))
-    {
-      memcpy (&tgt[ret], &e[1], e->size);
-      size -= e->size;
-      ret += e->size;
-      client->event_head = e->next;
-      GNUNET_free (e);
-    }
-  GNUNET_assert (ret > 0);
-  if (client->event_head == NULL)
-    client->event_tail = NULL;
-#if DEBUG_CORE_CLIENT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting %u bytes to client\n", ret);
-#endif
-  request_transmit (client);
-  return ret;
-}
-
-
-/**
- * If necessary, initiate a request with the server to
- * transmit messages from the queue of the given client.
- * @param client who to transfer messages to
- */
-static void
-request_transmit (struct Client *client)
-{
-
-  if (NULL != client->th)
-    return;                     /* already pending */
-  if (NULL == client->event_head)
-    return;                     /* no more events pending */
-#if DEBUG_CORE_CLIENT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking server to transmit %u bytes to client\n",
-              client->event_head->size);
-#endif
-  client->th
-    = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
-                                           client->event_head->size,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           &do_client_transmit, client);
-}
-
-
 /**
  * Send a message to one of our clients.
+ *
  * @param client target for the message
  * @param msg message to transmit
  * @param can_drop could this message be dropped if the
@@ -810,61 +778,48 @@ request_transmit (struct Client *client)
  */
 static void
 send_to_client (struct Client *client,
-                const struct GNUNET_MessageHeader *msg, int can_drop)
+                const struct GNUNET_MessageHeader *msg, 
+               int can_drop)
 {
-  struct Event *e;
-  unsigned int queue_size;
-  uint16_t msize;
-
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Preparing to send message of type %u to client.\n",
               ntohs (msg->type));
-#endif
-  queue_size = 0;
-  e = client->event_head;
-  while (e != NULL)
-    {
-      queue_size++;
-      e = e->next;
-    }
-  if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
-       (can_drop == GNUNET_YES) )
-    {
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                 "Too many messages in queue for the client, dropping the new message.\n");
-#endif
-      return;
-    }
-
-  msize = ntohs (msg->size);
-  e = GNUNET_malloc (sizeof (struct Event) + msize);
-  /* append */
-  if (client->event_tail != NULL)
-    client->event_tail->next = e;
-  else
-    client->event_head = e;
-  client->event_tail = e;
-  e->can_drop = can_drop;
-  e->size = msize;
-  memcpy (&e[1], msg, msize);
-  request_transmit (client);
+#endif  
+  GNUNET_SERVER_notification_context_unicast (notifier,
+                                             client->client_handle,
+                                             msg,
+                                             can_drop);
 }
 
 
 /**
- * Send a message to all of our current clients.
+ * Send a message to all of our current clients that have
+ * the right options set.
+ * 
+ * @param msg message to multicast
+ * @param can_drop can this message be discarded if the queue is too long
+ * @param options mask to use 
  */
 static void
-send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
+send_to_all_clients (const struct GNUNET_MessageHeader *msg, 
+                    int can_drop,
+                    int options)
 {
   struct Client *c;
 
   c = clients;
   while (c != NULL)
     {
-      send_to_client (c, msg, can_drop);
+      if (0 != (c->options & options))
+       {
+#if DEBUG_CORE_CLIENT
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Sending message of type %u to client.\n",
+                     ntohs (msg->type));
+#endif
+         send_to_client (c, msg, can_drop);
+       }
       c = c->next;
     }
 }
@@ -883,8 +838,10 @@ handle_client_init (void *cls,
   struct Client *c;
   uint16_t msize;
   const uint16_t *types;
+  uint16_t *wtypes;
   struct Neighbour *n;
   struct ConnectNotifyMessage cnm;
+  unsigned int i;
 
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -910,6 +867,7 @@ handle_client_init (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+  GNUNET_SERVER_notification_context_add (notifier, client);
   im = (const struct InitMessage *) message;
   types = (const uint16_t *) &im[1];
   msize -= sizeof (struct InitMessage);
@@ -917,10 +875,18 @@ handle_client_init (void *cls,
   c->client_handle = client;
   c->next = clients;
   clients = c;
-  memcpy (&c[1], types, msize);
-  c->types = (uint16_t *) & c[1];
-  c->options = ntohl (im->options);
   c->tcnt = msize / sizeof (uint16_t);
+  c->types = (const uint16_t *) &c[1];
+  wtypes = (uint16_t *) &c[1];
+  for (i=0;i<c->tcnt;i++)
+    wtypes[i] = ntohs (types[i]);
+  c->options = ntohl (im->options);
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Client %p is interested in %u message types\n",
+             c,
+             c->tcnt);
+#endif
   /* send init reply message */
   irm.header.size = htons (sizeof (struct InitReplyMessage));
   irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
@@ -933,21 +899,27 @@ handle_client_init (void *cls,
               "Sending `%s' message to client.\n", "INIT_REPLY");
 #endif
   send_to_client (c, &irm.header, GNUNET_NO);
-  /* notify new client about existing neighbours */
-  cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
-  cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  n = neighbours;
-  while (n != NULL)
+  if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
     {
+      /* notify new client about existing neighbours */
+      cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+      cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+      n = neighbours;
+      while (n != NULL)
+       {
+         if (n->status == PEER_STATE_KEY_CONFIRMED)
+           {
 #if DEBUG_CORE_CLIENT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
 #endif
-      cnm.bpm_available = htonl (n->bpm_out);
-      cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
-      cnm.peer = n->peer;
-      send_to_client (c, &cnm.header, GNUNET_NO);
-      n = n->next;
+             cnm.distance = htonl (n->last_distance);
+             cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
+             cnm.peer = n->peer;
+             send_to_client (c, &cnm.header, GNUNET_NO);
+           }
+         n = n->next;
+       }
     }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -964,11 +936,13 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
 {
   struct Client *pos;
   struct Client *prev;
-  struct Event *e;
 
+  if (client == NULL)
+    return;
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client has disconnected from core service.\n");
+              "Client %p has disconnected from core service.\n",
+             client);
 #endif
   prev = NULL;
   pos = clients;
@@ -980,13 +954,6 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
             clients = pos->next;
           else
             prev->next = pos->next;
-          if (pos->th != NULL)
-            GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
-         while (NULL != (e = pos->event_head))
-           {
-             pos->event_head = e->next;
-             GNUNET_free (e);
-           }
           GNUNET_free (pos);
           return;
         }
@@ -998,80 +965,342 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
 
 
 /**
- * Handle REQUEST_CONFIGURE request.
+ * Handle REQUEST_INFO request.
  */
 static void
-handle_client_request_configure (void *cls,
-                                 struct GNUNET_SERVER_Client *client,
-                                 const struct GNUNET_MessageHeader *message)
+handle_client_request_info (void *cls,
+                           struct GNUNET_SERVER_Client *client,
+                           const struct GNUNET_MessageHeader *message)
 {
-  const struct RequestConfigureMessage *rcm;
+  const struct RequestInfoMessage *rcm;
   struct Neighbour *n;
   struct ConfigurationInfoMessage cim;
-  struct Client *c;
-  int reserv;
+  int32_t want_reserv;
+  int32_t got_reserv;
+  unsigned long long old_preference;
+  struct GNUNET_SERVER_TransmitContext *tc;
 
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core service receives `%s' request.\n", "CONFIGURE");
+              "Core service receives `%s' request.\n", "REQUEST_INFO");
 #endif
-  rcm = (const struct RequestConfigureMessage *) message;
+  rcm = (const struct RequestInfoMessage *) message;
   n = find_neighbour (&rcm->peer);
   memset (&cim, 0, sizeof (cim));
-  if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
+  if (n != NULL) 
     {
-      n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
-      n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
-                               n->bpm_out_external_limit);
-      reserv = ntohl (rcm->reserve_inbound);
-      if (reserv < 0)
+      want_reserv = ntohl (rcm->reserve_inbound);
+      if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
+       {
+         n->bw_out_internal_limit = rcm->limit_outbound;
+         n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
+                                                 n->bw_out_external_limit);
+         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
+                                                n->bw_out);
+         GNUNET_TRANSPORT_set_quota (transport,
+                                     &n->peer,
+                                     n->bw_in,
+                                     n->bw_out,
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     NULL, NULL); 
+       }
+      if (want_reserv < 0)
         {
-          n->available_recv_window += reserv;
+         got_reserv = want_reserv;
         }
-      else if (reserv > 0)
+      else if (want_reserv > 0)
         {
-          update_window (&n->available_recv_window,
-                         &n->last_arw_update, n->bpm_in);
-          if (n->available_recv_window < reserv)
-            reserv = n->available_recv_window;
-          n->available_recv_window -= reserv;
+         if (GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
+                                                 want_reserv).value == 0)
+           got_reserv = want_reserv;
+         else
+            got_reserv = 0; /* all or nothing */
         }
-      n->current_preference += rcm->preference_change;
-      if (n->current_preference < 0)
-        n->current_preference = 0;
-      cim.reserved_amount = htonl (reserv);
-      cim.bpm_in = htonl (n->bpm_in);
-      cim.bpm_out = htonl (n->bpm_out);
-      cim.latency = GNUNET_TIME_relative_hton (n->last_latency);
+      else
+       got_reserv = 0;
+      GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window,
+                                       got_reserv);
+      old_preference = n->current_preference;
+      n->current_preference += GNUNET_ntohll(rcm->preference_change);
+      if (old_preference > n->current_preference) 
+       {
+         /* overflow; cap at maximum value */
+         n->current_preference = (unsigned long long) -1;
+       }
+      update_preference_sum (n->current_preference - old_preference);
+#if DEBUG_CORE_QUOTA
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes\n",
+                 (int) want_reserv,
+                 GNUNET_i2s (&rcm->peer),
+                 (int) got_reserv);
+#endif
+      cim.reserved_amount = htonl (got_reserv);
+      cim.bw_in = n->bw_in;
+      cim.bw_out = n->bw_out;
       cim.preference = n->current_preference;
     }
   cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
   cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
   cim.peer = rcm->peer;
-  c = find_client (client);
-  if (c == NULL)
-    {
-      GNUNET_break (0);
-      return;
-    }
+
 #if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
 #endif
-  send_to_client (c, &cim.header, GNUNET_NO);
+  tc = GNUNET_SERVER_transmit_context_create (client);
+  GNUNET_SERVER_transmit_context_append_message (tc, &cim.header);
+  GNUNET_SERVER_transmit_context_run (tc,
+                                     GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
 
 /**
- * Check if we have encrypted messages for the specified neighbour
- * pending, and if so, check with the transport about sending them
- * out.
- *
+ * Free the given entry for the neighbour (it has
+ * already been removed from the list at this point).
+ *
+ * @param n neighbour to free
+ */
+static void
+free_neighbour (struct Neighbour *n)
+{
+  struct MessageEntry *m;
+
+  if (n->pitr != NULL)
+    {
+      GNUNET_PEERINFO_iterate_cancel (n->pitr);
+      n->pitr = NULL;
+    }
+  if (n->skm != NULL)
+    {
+      GNUNET_free (n->skm);
+      n->skm = NULL;
+    }
+  while (NULL != (m = n->messages))
+    {
+      n->messages = m->next;
+      GNUNET_free (m);
+    }
+  while (NULL != (m = n->encrypted_head))
+    {
+      GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                                  n->encrypted_tail,
+                                  m);
+      GNUNET_free (m);
+    }
+  if (NULL != n->th)
+    {
+      GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+      n->th = NULL;
+    }
+  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
+  if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+  if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
+  if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
+  if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)    
+      GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+  if (n->status == PEER_STATE_KEY_CONFIRMED)
+    GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), -1, GNUNET_NO);
+  GNUNET_free_non_null (n->public_key);
+  GNUNET_free_non_null (n->pending_ping);
+  GNUNET_free_non_null (n->pending_pong);
+  GNUNET_free (n);
+}
+
+
+/**
+ * Check if we have encrypted messages for the specified neighbour
+ * pending, and if so, check with the transport about sending them
+ * out.
+ *
  * @param n neighbour to check.
  */
 static void process_encrypted_neighbour_queue (struct Neighbour *n);
 
 
+/**
+ * Encrypt size bytes from in and write the result to out.  Use the
+ * key for outbound traffic of the given neighbour.
+ *
+ * @param n neighbour we are sending to
+ * @param iv initialization vector to use
+ * @param in ciphertext
+ * @param out plaintext
+ * @param size size of in/out
+ * @return GNUNET_OK on success
+ */
+static int
+do_encrypt (struct Neighbour *n,
+            const GNUNET_HashCode * iv,
+            const void *in, void *out, size_t size)
+{
+  if (size != (uint16_t) size)
+    {
+      GNUNET_break (0);
+      return GNUNET_NO;
+    }
+  GNUNET_assert (size ==
+                 GNUNET_CRYPTO_aes_encrypt (in,
+                                            (uint16_t) size,
+                                            &n->encrypt_key,
+                                            (const struct
+                                             GNUNET_CRYPTO_AesInitializationVector
+                                             *) iv, out));
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes encrypted"), size, GNUNET_NO);
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Encrypted %u bytes for `%4s' using key %u\n", size,
+              GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
+#endif
+  return GNUNET_OK;
+}
+
+
+/**
+ * Consider freeing the given neighbour since we may not need
+ * to keep it around anymore.
+ *
+ * @param n neighbour to consider discarding
+ */
+static void
+consider_free_neighbour (struct Neighbour *n);
+
+
+/**
+ * Task triggered when a neighbour entry is about to time out 
+ * (and we should prevent this by sending a PING).
+ *
+ * @param cls the 'struct Neighbour'
+ * @param tc scheduler context (not used)
+ */
+static void
+send_keep_alive (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+  struct GNUNET_TIME_Relative retry;
+  struct GNUNET_TIME_Relative left;
+  struct MessageEntry *me;
+  struct PingMessage pp;
+  struct PingMessage *pm;
+
+  n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
+  /* send PING */
+  me = GNUNET_malloc (sizeof (struct MessageEntry) +
+                      sizeof (struct PingMessage));
+  me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
+  me->priority = PING_PRIORITY;
+  me->size = sizeof (struct PingMessage);
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
+  pm = (struct PingMessage *) &me[1];
+  pm->header.size = htons (sizeof (struct PingMessage));
+  pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
+  pp.challenge = htonl (n->ping_challenge);
+  pp.target = n->peer;
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Encrypting `%s' and `%s' messages for `%4s'.\n",
+              "SET_KEY", "PING", GNUNET_i2s (&n->peer));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
+              "PING",
+              GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
+#endif
+  do_encrypt (n,
+              &n->peer.hashPubKey,
+              &pp.challenge,
+              &pm->challenge,
+              sizeof (struct PingMessage) -
+              sizeof (struct GNUNET_MessageHeader));
+  process_encrypted_neighbour_queue (n);
+  /* reschedule PING job */
+  left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
+                                                                      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
+  retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
+                                   MIN_PING_FREQUENCY);
+  n->keep_alive_task 
+    = GNUNET_SCHEDULER_add_delayed (sched, 
+                                   retry,
+                                   &send_keep_alive,
+                                   n);
+
+}
+
+
+/**
+ * Task triggered when a neighbour entry might have gotten stale.
+ *
+ * @param cls the 'struct Neighbour'
+ * @param tc scheduler context (not used)
+ */
+static void
+consider_free_task (void *cls,
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+
+  n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK;
+  consider_free_neighbour (n);
+}
+
+
+/**
+ * Consider freeing the given neighbour since we may not need
+ * to keep it around anymore.
+ *
+ * @param n neighbour to consider discarding
+ */
+static void
+consider_free_neighbour (struct Neighbour *n)
+{ 
+  struct Neighbour *pos;
+  struct Neighbour *prev;
+  struct GNUNET_TIME_Relative left;
+
+  if ( (n->th != NULL) ||
+       (n->pitr != NULL) ||
+       (n->status == PEER_STATE_KEY_CONFIRMED) ||
+       (GNUNET_YES == n->is_connected) )
+    return; /* no chance */
+  
+  left = GNUNET_TIME_absolute_get_remaining (GNUNET_TIME_absolute_add (n->last_activity,
+                                                                      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT));
+  if (left.value > 0)
+    {
+      if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched, n->dead_clean_task);
+      n->dead_clean_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                        left,
+                                                        &consider_free_task,
+                                                        n);
+      return;
+    }
+  /* actually free the neighbour... */
+  prev = NULL;
+  pos = neighbours;
+  while (pos != n)
+    {
+      prev = pos;
+      pos = pos->next;
+    }
+  if (prev == NULL)
+    neighbours = n->next;
+  else
+    prev->next = n->next;
+  GNUNET_assert (neighbour_count > 0);
+  neighbour_count--;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+  free_neighbour (n);
+}
+
+
 /**
  * Function called when the transport service is ready to
  * receive an encrypted message for the respective peer
@@ -1091,9 +1320,9 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
 
   n->th = NULL;
   GNUNET_assert (NULL != (m = n->encrypted_head));
-  n->encrypted_head = m->next;
-  if (m->next == NULL)
-    n->encrypted_tail = NULL;
+  GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                              n->encrypted_tail,
+                              m);
   ret = 0;
   cbuf = buf;
   if (buf != NULL)
@@ -1101,22 +1330,27 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
       GNUNET_assert (size >= m->size);
       memcpy (cbuf, &m[1], m->size);
       ret = m->size;
-      process_encrypted_neighbour_queue (n);
+      GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window,
+                                       m->size);
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   ret, GNUNET_i2s (&n->peer));
 #endif
+      process_encrypted_neighbour_queue (n);
     }
   else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Transmission for message of type %u and size %u failed\n",
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Transmission of message of type %u and size %u failed\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   m->size);
+#endif
     }
   GNUNET_free (m);
+  consider_free_neighbour (n);
   return ret;
 }
 
@@ -1145,7 +1379,8 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
  
   if (n->th != NULL)
     return;  /* request already pending */
-  if (n->encrypted_head == NULL)
+  m = n->encrypted_head;
+  if (m == NULL)
     {
       /* encrypted queue empty, try plaintext instead */
       process_plaintext_neighbour_queue (n);
@@ -1154,28 +1389,27 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
-              n->encrypted_head->size,
+              m->size,
               GNUNET_i2s (&n->peer),
-              GNUNET_TIME_absolute_get_remaining (n->
-                                                  encrypted_head->deadline).
+              GNUNET_TIME_absolute_get_remaining (m->deadline).
               value);
 #endif
   n->th =
     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
-                                            n->encrypted_head->size,
+                                            m->size,
+                                           m->priority,
                                             GNUNET_TIME_absolute_get_remaining
-                                            (n->encrypted_head->deadline),
+                                            (m->deadline),
                                             &notify_encrypted_transmit_ready,
                                             n);
   if (n->th == NULL)
     {
-      /* message request too large (oops) */
+      /* message request too large or duplicate request */
       GNUNET_break (0);
       /* discard encrypted message */
-      GNUNET_assert (NULL != (m = n->encrypted_head));
-      n->encrypted_head = m->next;
-      if (m->next == NULL)
-       n->encrypted_tail = NULL;
+      GNUNET_CONTAINER_DLL_remove (n->encrypted_head,
+                                  n->encrypted_tail,
+                                  m);
       GNUNET_free (m);
       process_encrypted_neighbour_queue (n);
     }
@@ -1211,16 +1445,17 @@ do_decrypt (struct Neighbour *n,
       return GNUNET_SYSERR;
     }
   if (size !=
-      GNUNET_CRYPTO_aes_decrypt (&n->decrypt_key,
-                                 in,
+      GNUNET_CRYPTO_aes_decrypt (in,
                                  (uint16_t) size,
-                                 (const struct
+                                 &n->decrypt_key,
+                                (const struct
                                   GNUNET_CRYPTO_AesInitializationVector *) iv,
                                  out))
     {
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes decrypted"), size, GNUNET_NO);
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from `%4s' using key %u\n",
@@ -1230,43 +1465,6 @@ do_decrypt (struct Neighbour *n,
 }
 
 
-/**
- * Encrypt size bytes from in and write the result to out.  Use the
- * key for outbound traffic of the given neighbour.
- *
- * @param n neighbour we are sending to
- * @param iv initialization vector to use
- * @param in ciphertext
- * @param out plaintext
- * @param size size of in/out
- * @return GNUNET_OK on success
- */
-static int
-do_encrypt (struct Neighbour *n,
-            const GNUNET_HashCode * iv,
-            const void *in, void *out, size_t size)
-{
-  if (size != (uint16_t) size)
-    {
-      GNUNET_break (0);
-      return GNUNET_NO;
-    }
-  GNUNET_assert (size ==
-                 GNUNET_CRYPTO_aes_encrypt (in,
-                                            (uint16_t) size,
-                                            &n->encrypt_key,
-                                            (const struct
-                                             GNUNET_CRYPTO_AesInitializationVector
-                                             *) iv, out));
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypted %u bytes for `%4s' using key %u\n", size,
-              GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
-#endif
-  return GNUNET_OK;
-}
-
-
 /**
  * Select messages for transmission.  This heuristic uses a combination
  * of earliest deadline first (EDF) scheduling (with bounded horizon)
@@ -1294,10 +1492,12 @@ select_messages (struct Neighbour *n,
   unsigned int min_prio;
   struct GNUNET_TIME_Absolute t;
   struct GNUNET_TIME_Absolute now;
-  uint64_t delta;
+  struct GNUNET_TIME_Relative delta;
   uint64_t avail;
-  unsigned long long slack;     /* how long could we wait before missing deadlines? */
+  struct GNUNET_TIME_Relative slack;     /* how long could we wait before missing deadlines? */
   size_t off;
+  uint64_t tsize;
+  unsigned int queue_size;
   int discard_low_prio;
 
   GNUNET_assert (NULL != n->messages);
@@ -1307,28 +1507,36 @@ select_messages (struct Neighbour *n,
   /* should we remove the entry with the lowest
      priority from consideration for scheduling at the
      end of the loop? */
+  queue_size = 0;
+  tsize = 0;
+  pos = n->messages;
+  while (pos != NULL)
+    {
+      queue_size++;
+      tsize += pos->size;
+      pos = pos->next;
+    }
   discard_low_prio = GNUNET_YES;
   while (GNUNET_YES == discard_low_prio)
     {
       min = NULL;
       min_prio = -1;
       discard_low_prio = GNUNET_NO;
-      /* number of bytes available for transmission at time "t" */
-      avail = n->available_send_window;
-      t = n->last_asw_update;
-      /* how many bytes have we (hyptothetically) scheduled so far */
+      /* calculate number of bytes available for transmission at time "t" */
+      avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
+      t = now;
+      /* how many bytes have we (hypothetically) scheduled so far */
       off = 0;
       /* maximum time we can wait before transmitting anything
          and still make all of our deadlines */
-      slack = -1;
-
+      slack = MAX_CORK_DELAY;
       pos = n->messages;
       /* note that we use "*2" here because we want to look
          a bit further into the future; much more makes no
          sense since new message might be scheduled in the
          meantime... */
       while ((pos != NULL) && (off < size * 2))
-        {
+        {         
           if (pos->do_transmit == GNUNET_YES)
             {
               /* already removed from consideration */
@@ -1337,14 +1545,17 @@ select_messages (struct Neighbour *n,
             }
           if (discard_low_prio == GNUNET_NO)
             {
-              delta = pos->deadline.value;
-              if (delta < t.value)
-                delta = 0;
-              else
-                delta = t.value - delta;
-              avail += delta * n->bpm_out / 1000 / 60;
+             delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
+             if (delta.value > 0)
+               {
+                 // FIXME: HUH? Check!
+                 t = pos->deadline;
+                 avail += GNUNET_BANDWIDTH_value_get_available_until (n->bw_out,
+                                                                      delta);
+               }
               if (avail < pos->size)
                 {
+                 // FIXME: HUH? Check!
                   discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
                 }
               else
@@ -1353,16 +1564,33 @@ select_messages (struct Neighbour *n,
                   /* update slack, considering both its absolute deadline
                      and relative deadlines caused by other messages
                      with their respective load */
-                  slack = GNUNET_MIN (slack, avail / n->bpm_out);
-                  if (pos->deadline.value < now.value)
-                    slack = 0;
+                  slack = GNUNET_TIME_relative_min (slack,
+                                                   GNUNET_BANDWIDTH_value_get_delay_for (n->bw_out,
+                                                                                         avail));
+                  if (pos->deadline.value <= now.value) 
+                   {
+                     /* now or never */
+                     slack = GNUNET_TIME_UNIT_ZERO;
+                   }
+                 else if (GNUNET_YES == pos->got_slack)
+                   {
+                     /* should be soon now! */
+                     slack = GNUNET_TIME_relative_min (slack,
+                                                       GNUNET_TIME_absolute_get_remaining (pos->slack_deadline));
+                   }
                   else
-                    slack =
-                      GNUNET_MIN (slack, pos->deadline.value - now.value);
+                   {
+                     slack =
+                       GNUNET_TIME_relative_min (slack, 
+                                                 GNUNET_TIME_absolute_get_difference (now, pos->deadline));
+                     pos->got_slack = GNUNET_YES;
+                     pos->slack_deadline = GNUNET_TIME_absolute_min (pos->deadline,
+                                                                     GNUNET_TIME_relative_to_absolute (MAX_CORK_DELAY));
+                   }
                 }
             }
           off += pos->size;
-          t.value = GNUNET_MAX (pos->deadline.value, t.value);
+          t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check!
           if (pos->priority <= min_prio)
             {
               /* update min for discard */
@@ -1381,18 +1609,28 @@ select_messages (struct Neighbour *n,
     }
   /* guard against sending "tiny" messages with large headers without
      urgent deadlines */
-  if ( (slack > 1000) && (size > 4 * off) )
-    {
-      /* less than 25% of message would be filled with
-         deadlines still being met if we delay by one
-         second or more; so just wait for more data */
-      retry_time->value = slack / 2;
+  if ( (slack.value > 0) && 
+       (size > 4 * off) &&
+       (queue_size <= MAX_PEER_QUEUE_SIZE - 2) )
+    {
+      /* less than 25% of message would be filled with deadlines still
+         being met if we delay by one second or more; so just wait for
+         more data; but do not wait longer than 1s (since we don't want
+        to delay messages for a really long time either). */
+      *retry_time = MAX_CORK_DELAY;
       /* reset do_transmit values for next time */
       while (pos != last)
         {
-          pos->do_transmit = GNUNET_NO;
+          pos->do_transmit = GNUNET_NO;          
           pos = pos->next;
         }
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
+                 (unsigned long long) slack.value,
+                 (unsigned int) off,
+                 (unsigned int) size);
+#endif
       return 0;
     }
   /* select marked messages (up to size) for transmission */
@@ -1412,8 +1650,10 @@ select_messages (struct Neighbour *n,
     }
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
-              off, GNUNET_i2s (&n->peer));
+              "Selected %u/%u bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
+              off, tsize,
+             queue_size, MAX_PEER_QUEUE_SIZE,
+             GNUNET_i2s (&n->peer));
 #endif
   return off;
 }
@@ -1439,22 +1679,31 @@ batch_message (struct Neighbour *n,
                struct GNUNET_TIME_Relative *retry_time,
                unsigned int *priority)
 {
+  char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+  struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage*) ntmb;
   struct MessageEntry *pos;
   struct MessageEntry *prev;
   struct MessageEntry *next;
   size_t ret;
-
+  
   ret = 0;
   *priority = 0;
   *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
   if (0 == select_messages (n, size, retry_time))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "No messages selected, will try again in %llu ms\n",
                   retry_time->value);
+#endif
       return 0;
     }
+  ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
+  ntm->distance = htonl (n->last_distance);
+  ntm->latency = GNUNET_TIME_relative_hton (n->last_latency);
+  ntm->peer = n->peer;
+  
   pos = n->messages;
   prev = NULL;
   while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
@@ -1463,10 +1712,48 @@ batch_message (struct Neighbour *n,
       if (GNUNET_YES == pos->do_transmit)
         {
           GNUNET_assert (pos->size <= size);
+         /* do notifications */
+         /* FIXME: track if we have *any* client that wants
+            full notifications and only do this if that is
+            actually true */
+         if (pos->size < GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
+           {
+             memcpy (&ntm[1], &pos[1], pos->size);
+             ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
+                                       sizeof (struct GNUNET_MessageHeader));
+             send_to_all_clients (&ntm->header,
+                                  GNUNET_YES,
+                                  GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
+           }
+         else
+           {
+             /* message too large for 'full' notifications, we do at
+                least the 'hdr' type */
+             memcpy (&ntm[1],
+                     &pos[1],
+                     sizeof (struct GNUNET_MessageHeader));
+           }
+         ntm->header.size = htons (sizeof (struct NotifyTrafficMessage) + 
+                                   pos->size);
+         send_to_all_clients (&ntm->header,
+                              GNUNET_YES,
+                              GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);   
+#if DEBUG_HANDSHAKE
+         fprintf (stderr,
+                  "Encrypting message of type %u\n",
+                  ntohs(((struct GNUNET_MessageHeader*)&pos[1])->type));
+#endif
+         /* copy for encrypted transmission */
           memcpy (&buf[ret], &pos[1], pos->size);
           ret += pos->size;
           size -= pos->size;
           *priority += pos->priority;
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Adding plaintext message of size %u with deadline %llu ms to batch\n",
+                     pos->size,
+                     GNUNET_TIME_absolute_get_remaining (pos->deadline).value);
+#endif
           deadline->value = GNUNET_MIN (deadline->value, pos->deadline.value);
           GNUNET_free (pos);
           if (prev == NULL)
@@ -1480,6 +1767,11 @@ batch_message (struct Neighbour *n,
         }
       pos = next;
     }
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Deadline for message batch is %llu ms\n",
+             GNUNET_TIME_absolute_get_remaining (*deadline).value);
+#endif
   return ret;
 }
 
@@ -1538,7 +1830,7 @@ retry_plaintext_processing (void *cls,
 {
   struct Neighbour *n = cls;
 
-  n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+  n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
   process_plaintext_neighbour_queue (n);
 }
 
@@ -1550,6 +1842,26 @@ retry_plaintext_processing (void *cls,
  */
 static void send_key (struct Neighbour *n);
 
+/**
+ * Task that will retry "send_key" if our previous attempt failed
+ * to yield a PONG.
+ */
+static void
+set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrying key transmission to `%4s'\n",
+             GNUNET_i2s (&n->peer));
+#endif
+  n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
+  n->set_key_retry_frequency =
+    GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
+  send_key (n);
+}
+
 
 /**
  * Check if we have plaintext messages for the specified neighbour
@@ -1570,11 +1882,12 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   unsigned int priority;
   struct GNUNET_TIME_Absolute deadline;
   struct GNUNET_TIME_Relative retry_time;
+  GNUNET_HashCode iv;
 
-  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
     {
       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
-      n->retry_plaintext_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+      n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
     }
   switch (n->status)
     {
@@ -1587,8 +1900,11 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
 #endif
       return;
     case PEER_STATE_KEY_SENT:
-      GNUNET_assert (n->retry_set_key_task !=
-                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+      if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
+       n->retry_set_key_task
+         = GNUNET_SCHEDULER_add_delayed (sched,
+                                         n->set_key_retry_frequency,
+                                         &set_key_retry_task, n);    
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
@@ -1596,8 +1912,11 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
 #endif
       return;
     case PEER_STATE_KEY_RECEIVED:
-      GNUNET_assert (n->retry_set_key_task !=
-                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+      if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)        
+       n->retry_set_key_task
+         = GNUNET_SCHEDULER_add_delayed (sched,
+                                         n->set_key_retry_frequency,
+                                         &set_key_retry_task, n);        
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
@@ -1646,16 +1965,19 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
       /* no messages selected for sending, try again later... */
       n->retry_plaintext_task =
         GNUNET_SCHEDULER_add_delayed (sched,
-                                      GNUNET_NO,
-                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
-                                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
                                       retry_time,
                                       &retry_plaintext_processing, n);
       return;
     }
-
+#if DEBUG_CORE_QUOTA
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending %u b/s as new limit to peer `%4s'\n",
+             (unsigned int) ntohl (n->bw_in.value__),
+             GNUNET_i2s (&n->peer));
+#endif
+  ph->iv_seed = htonl (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1));
   ph->sequence_number = htonl (++n->last_sequence_number_sent);
-  ph->inbound_bpm_limit = htonl (n->bpm_in);
+  ph->inbound_bw_limit = n->bw_in;
   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
 
   /* setup encryption message header */
@@ -1666,79 +1988,104 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   em = (struct EncryptedMessage *) &me[1];
   em->header.size = htons (used);
   em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
-  em->reserved = htonl (0);
+  em->iv_seed = ph->iv_seed;
   esize = used - ENCRYPTED_HEADER_SIZE;
-  GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
+  GNUNET_CRYPTO_hash (&ph->sequence_number,
+                     esize - sizeof (GNUNET_HashCode), 
+                     &ph->plaintext_hash);
+  GNUNET_CRYPTO_hash (&ph->iv_seed, sizeof (uint32_t), &iv);
   /* encrypt */
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
+              "Encrypting %u bytes of plaintext messages for `%4s' for transmission in %llums.\n",
              esize,
-             GNUNET_i2s(&n->peer));
+             GNUNET_i2s(&n->peer),
+             (unsigned long long) GNUNET_TIME_absolute_get_remaining (deadline).value);
 #endif
   GNUNET_assert (GNUNET_OK ==
                  do_encrypt (n,
-                             &em->plaintext_hash,
-                             &ph->sequence_number,
-                             &em->sequence_number, esize));
+                             &iv,
+                             &ph->plaintext_hash,
+                             &em->plaintext_hash, esize));
   /* append to transmission list */
-  if (n->encrypted_tail == NULL)
-    n->encrypted_head = me;
-  else
-    n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   process_encrypted_neighbour_queue (n);
 }
 
 
 /**
- * Handle CORE_SEND request.
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
  */
 static void
-handle_client_send (void *cls,
-                    struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *message);
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
- * Function called to notify us that we either succeeded
- * or failed to connect (at the transport level) to another
- * peer.  We should either free the message we were asked
- * to transmit or re-try adding it to the queue.
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
+ */
+static void
+schedule_quota_update (struct Neighbour *n)
+{
+  GNUNET_assert (n->quota_update_task ==
+                GNUNET_SCHEDULER_NO_TASK);
+  n->quota_update_task
+    = GNUNET_SCHEDULER_add_delayed (sched,
+                                   QUOTA_UPDATE_FREQUENCY,
+                                   &neighbour_quota_update,
+                                   n);
+}
+
+
+/**
+ * Initialize a new 'struct Neighbour'.
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param pid ID of the new neighbour
+ * @return handle for the new neighbour
  */
-static size_t
-send_connect_continuation (void *cls, size_t size, void *buf)
+static struct Neighbour *
+create_neighbour (const struct GNUNET_PeerIdentity *pid)
 {
-  struct SendMessage *sm = cls;
+  struct Neighbour *n;
+  struct GNUNET_TIME_Absolute now;
 
-  if (buf == NULL)
-    {
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
-                  GNUNET_i2s (&sm->peer));
-#endif
-      GNUNET_free (sm);
-      return 0;
-    }
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Connection to peer `%4s' succeeded, retrying original transmission request\n",
-              GNUNET_i2s (&sm->peer));
-#endif
-  handle_client_send (NULL, NULL, &sm->header);
-  GNUNET_free (sm);
-  return 0;
+  n = GNUNET_malloc (sizeof (struct Neighbour));
+  n->next = neighbours;
+  neighbours = n;
+  neighbour_count++;
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+  n->peer = *pid;
+  GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
+  now = GNUNET_TIME_absolute_get ();
+  n->encrypt_key_created = now;
+  n->last_activity = now;
+  n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
+  n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+  n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+  n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init ((uint32_t) - 1);
+  n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
+  n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                (uint32_t) - 1);
+  neighbour_quota_update (n, NULL);
+  return n;
 }
 
 
 /**
  * Handle CORE_SEND request.
+ *
+ * @param cls unused
+ * @param client the client issuing the request
+ * @param message the "struct SendMessage"
  */
 static void
 handle_client_send (void *cls,
@@ -1746,8 +2093,6 @@ handle_client_send (void *cls,
                     const struct GNUNET_MessageHeader *message)
 {
   const struct SendMessage *sm;
-  struct SendMessage *smc;
-  const struct GNUNET_MessageHeader *mh;
   struct Neighbour *n;
   struct MessageEntry *prev;
   struct MessageEntry *pos;
@@ -1769,51 +2114,9 @@ handle_client_send (void *cls,
     }
   sm = (const struct SendMessage *) message;
   msize -= sizeof (struct SendMessage);
-  mh = (const struct GNUNET_MessageHeader *) &sm[1];
-  if (msize != ntohs (mh->size))
-    {
-      GNUNET_break (0);
-      if (client != NULL)
-        GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-      return;
-    }
   n = find_neighbour (&sm->peer);
   if (n == NULL)
-    {
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
-                 "SEND",
-                  GNUNET_i2s (&sm->peer),
-                 GNUNET_TIME_absolute_get_remaining
-                 (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
-#endif
-      msize += sizeof (struct SendMessage);
-      /* ask transport to connect to the peer */
-      smc = GNUNET_malloc (msize);
-      memcpy (smc, sm, msize);
-      if (NULL ==
-         GNUNET_TRANSPORT_notify_transmit_ready (transport,
-                                                 &sm->peer,
-                                                 0,
-                                                 GNUNET_TIME_absolute_get_remaining
-                                                 (GNUNET_TIME_absolute_ntoh
-                                                  (sm->deadline)),
-                                                 &send_connect_continuation,
-                                                 smc))
-       {
-         /* transport has already a request pending for this peer! */
-#if DEBUG_CORE
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     "Dropped second message destined for `%4s' since connection is still down.\n",
-                     GNUNET_i2s(&sm->peer));
-#endif
-         GNUNET_free (smc);
-       }
-      if (client != NULL)
-        GNUNET_SERVER_receive_done (client, GNUNET_OK);
-      return;
-    }
+    n = create_neighbour (&sm->peer);
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
@@ -1824,6 +2127,8 @@ handle_client_send (void *cls,
   /* bound queue size */
   discard_expired_messages (n);
   min_prio = (unsigned int) -1;
+  min_prio_entry = NULL;
+  min_prio_prev = NULL;
   queue_size = 0;
   prev = NULL;
   pos = n->messages;
@@ -1847,7 +2152,11 @@ handle_client_send (void *cls,
          /* discard new entry */
 #if DEBUG_CORE
          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Queue full, discarding new request\n");
+                     "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
+                     queue_size,
+                     MAX_PEER_QUEUE_SIZE,
+                     msize,
+                     ntohs (message->type));
 #endif
          if (client != NULL)
            GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1864,12 +2173,18 @@ handle_client_send (void *cls,
        min_prio_prev->next = min_prio_entry->next;      
       GNUNET_free (min_prio_entry);    
     }
-  
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Adding transmission request for `%4s' of size %u to queue\n",
+             GNUNET_i2s (&sm->peer),
+             msize);
+#endif  
   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
   e->priority = ntohl (sm->priority);
   e->size = msize;
-  memcpy (&e[1], mh, msize);
+  memcpy (&e[1], &sm[1], msize);
 
   /* insert, keep list sorted by deadline */
   prev = NULL;
@@ -1892,6 +2207,81 @@ handle_client_send (void *cls,
 }
 
 
+/**
+ * Function called when the transport service is ready to
+ * receive a message.  Only resets 'n->th' to NULL.
+ *
+ * @param cls neighbour to use message from
+ * @param size number of bytes we can transmit
+ * @param buf where to copy the message
+ * @return number of bytes transmitted
+ */
+static size_t
+notify_transport_connect_done (void *cls, size_t size, void *buf)
+{
+  struct Neighbour *n = cls;
+
+  n->th = NULL;
+  if (buf == NULL)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 _("Failed to connect to `%4s': transport failed to connect\n"),
+                 GNUNET_i2s (&n->peer));
+      return 0;
+    }
+  send_key (n);
+  return 0;
+}
+
+
+/**
+ * Handle CORE_REQUEST_CONNECT request.
+ *
+ * @param cls unused
+ * @param client the client issuing the request
+ * @param message the "struct ConnectMessage"
+ */
+static void
+handle_client_request_connect (void *cls,
+                              struct GNUNET_SERVER_Client *client,
+                              const struct GNUNET_MessageHeader *message)
+{
+  const struct ConnectMessage *cm = (const struct ConnectMessage*) message;
+  struct Neighbour *n;
+  struct GNUNET_TIME_Relative timeout;
+
+  if (0 == memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break (0);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  n = find_neighbour (&cm->peer);
+  if (n == NULL)
+    n = create_neighbour (&cm->peer);
+  if ( (n->is_connected) ||
+       (n->th != NULL) )
+    return; /* already connected, or at least trying */
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# connection requests received"), 1, GNUNET_NO);
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Core received `%s' request for `%4s', will try to establish connection\n",
+             "REQUEST_CONNECT",
+             GNUNET_i2s (&cm->peer));
+#endif
+  timeout = GNUNET_TIME_relative_ntoh (cm->timeout);
+  /* ask transport to connect to the peer */
+  n->th = GNUNET_TRANSPORT_notify_transmit_ready (transport,
+                                                 &cm->peer,
+                                                 sizeof (struct GNUNET_MessageHeader), 0,
+                                                 timeout,
+                                                 &notify_transport_connect_done,
+                                                 n);
+  GNUNET_break (NULL != n->th);
+}
+
+
 /**
  * List of handlers for the messages understood by this
  * service.
@@ -1899,22 +2289,24 @@ handle_client_send (void *cls,
 static struct GNUNET_SERVER_MessageHandler handlers[] = {
   {&handle_client_init, NULL,
    GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
-  {&handle_client_request_configure, NULL,
-   GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONFIGURE,
-   sizeof (struct RequestConfigureMessage)},
+  {&handle_client_request_info, NULL,
+   GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
+   sizeof (struct RequestInfoMessage)},
   {&handle_client_send, NULL,
    GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
+  {&handle_client_request_connect, NULL,
+   GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
+   sizeof (struct ConnectMessage)},
   {NULL, NULL, 0, 0}
 };
 
 
 /**
- * PEERINFO is giving us a HELLO for a peer.  Add the
- * public key to the neighbour's struct and retry
- * send_key.  Or, if we did not get a HELLO, just do
- * nothing.
+ * PEERINFO is giving us a HELLO for a peer.  Add the public key to
+ * the neighbour's struct and retry send_key.  Or, if we did not get a
+ * HELLO, just do nothing.
  *
- * @param cls NULL
+ * @param cls the 'struct Neighbour' to retry sending the key for
  * @param peer the peer for which this is the HELLO
  * @param hello HELLO message of that peer
  * @param trust amount of trust we currently have in that peer
@@ -1925,15 +2317,53 @@ process_hello_retry_send_key (void *cls,
                               const struct GNUNET_HELLO_Message *hello,
                               uint32_t trust)
 {
-  struct Neighbour *n;
+  struct Neighbour *n = cls;
 
   if (peer == NULL)
-    return;
-  n = find_neighbour (peer);
-  if (n == NULL)
-    return;
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Entered `process_hello_retry_send_key' and `peer' is NULL!\n");
+#endif
+      n->pitr = NULL;
+      if (n->public_key != NULL)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# SETKEY messages deferred (need public key)"), 
+                                   -1, 
+                                   GNUNET_NO);
+         send_key (n);
+       }
+      else
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# Delayed connecting due to lack of public key"),
+                                   1,
+                                   GNUNET_NO);      
+         if (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task)
+           n->retry_set_key_task
+             = GNUNET_SCHEDULER_add_delayed (sched,
+                                             n->set_key_retry_frequency,
+                                             &set_key_retry_task, n);
+       }
+      return;
+    }
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Entered `process_hello_retry_send_key' for peer `%4s'\n",
+              GNUNET_i2s (peer));
+#endif
   if (n->public_key != NULL)
-    return;
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "already have public key for peer %s!! (so why are we here?)\n",
+              GNUNET_i2s (peer));
+#endif
+      return;
+    }
+
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received new `%s' message for `%4s', initiating key exchange.\n",
@@ -1944,28 +2374,18 @@ process_hello_retry_send_key (void *cls,
     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
     {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# Error extracting public key from HELLO"),
+                               1,
+                               GNUNET_NO);      
       GNUNET_free (n->public_key);
       n->public_key = NULL;
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "GNUNET_HELLO_get_key returned awfully\n");
+#endif
       return;
     }
-  send_key (n);
-}
-
-
-/**
- * Task that will retry "send_key" if our previous attempt failed
- * to yield a PONG.
- */
-static void
-set_key_retry_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Neighbour *n = cls;
-
-  GNUNET_assert (n->status != PEER_STATE_KEY_CONFIRMED);
-  n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
-  n->set_key_retry_frequency =
-    GNUNET_TIME_relative_multiply (n->set_key_retry_frequency, 2);
-  send_key (n);
 }
 
 
@@ -1982,6 +2402,17 @@ send_key (struct Neighbour *n)
   struct PingMessage pp;
   struct PingMessage *pm;
 
+  if ( (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK) ||
+       (n->pitr != NULL) )
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Key exchange in progress with `%4s'.\n",
+                  GNUNET_i2s (&n->peer));
+#endif
+      return; /* already in progress */
+    }
+
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asked to perform key exchange with `%4s'.\n",
@@ -1992,15 +2423,16 @@ send_key (struct Neighbour *n)
       /* lookup n's public key, then try again */
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Lacking public key for `%4s', trying to obtain one.\n",
+                  "Lacking public key for `%4s', trying to obtain one (send_key).\n",
                   GNUNET_i2s (&n->peer));
 #endif
-      GNUNET_PEERINFO_for_all (cfg,
-                               sched,
-                               &n->peer,
-                               0,
-                               GNUNET_TIME_UNIT_MINUTES,
-                               &process_hello_retry_send_key, NULL);
+      GNUNET_assert (n->pitr == NULL);
+      n->pitr = GNUNET_PEERINFO_iterate (cfg,
+                                        sched,
+                                        &n->peer,
+                                        0,
+                                        GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 20),
+                                        &process_hello_retry_send_key, n);
       return;
     }
   /* first, set key message */
@@ -2009,11 +2441,10 @@ send_key (struct Neighbour *n)
   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_SET_KEY_DELAY);
   me->priority = SET_KEY_PRIORITY;
   me->size = sizeof (struct SetKeyMessage);
-  if (n->encrypted_head == NULL)
-    n->encrypted_head = me;
-  else
-    n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   sm = (struct SetKeyMessage *) &me[1];
   sm->header.size = htons (sizeof (struct SetKeyMessage));
   sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SET_KEY);
@@ -2043,8 +2474,10 @@ send_key (struct Neighbour *n)
   me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
   me->priority = PING_PRIORITY;
   me->size = sizeof (struct PingMessage);
-  n->encrypted_tail->next = me;
-  n->encrypted_tail = me;
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
   pm = (struct PingMessage *) &me[1];
   pm->header.size = htons (sizeof (struct PingMessage));
   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
@@ -2076,22 +2509,25 @@ send_key (struct Neighbour *n)
     case PEER_STATE_KEY_RECEIVED:
       break;
     case PEER_STATE_KEY_CONFIRMED:
-      GNUNET_break (0);
       break;
     default:
       GNUNET_break (0);
       break;
     }
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Have %llu ms left for `%s' transmission.\n",
+             (unsigned long long) GNUNET_TIME_absolute_get_remaining (me->deadline).value,
+             "SET_KEY");
+#endif
   /* trigger queue processing */
   process_encrypted_neighbour_queue (n);
-  if (n->status != PEER_STATE_KEY_CONFIRMED)
+  if ( (n->status != PEER_STATE_KEY_CONFIRMED) &&
+       (GNUNET_SCHEDULER_NO_TASK == n->retry_set_key_task) )
     n->retry_set_key_task
       = GNUNET_SCHEDULER_add_delayed (sched,
-                                      GNUNET_NO,
-                                      GNUNET_SCHEDULER_PRIORITY_KEEP,
-                                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
-                                      n->set_key_retry_frequency,
-                                      &set_key_retry_task, n);
+                                     n->set_key_retry_frequency,
+                                     &set_key_retry_task, n);    
 }
 
 
@@ -2122,18 +2558,14 @@ process_hello_retry_handle_set_key (void *cls,
                                     const struct GNUNET_HELLO_Message *hello,
                                     uint32_t trust)
 {
-  struct SetKeyMessage *sm = cls;
-  struct Neighbour *n;
+  struct Neighbour *n = cls;
+  struct SetKeyMessage *sm = n->skm;
 
   if (peer == NULL)
     {
       GNUNET_free (sm);
-      return;
-    }
-  n = find_neighbour (peer);
-  if (n == NULL)
-    {
-      GNUNET_break (0);
+      n->skm = NULL;
+      n->pitr = NULL;
       return;
     }
   if (n->public_key != NULL)
@@ -2167,7 +2599,8 @@ static void
 handle_ping (struct Neighbour *n, const struct PingMessage *m)
 {
   struct PingMessage t;
-  struct PingMessage *tp;
+  struct PongMessage tx;
+  struct PongMessage *tp;
   struct MessageEntry *me;
 
 #if DEBUG_CORE
@@ -2193,40 +2626,156 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
               "Target of `%s' request is `%4s'.\n",
               "PING", GNUNET_i2s (&t.target));
 #endif
-  if (0 != memcmp (&t.target,
-                   &my_identity, sizeof (struct GNUNET_PeerIdentity)))
-    {
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages decrypted"), 1, GNUNET_NO);
+  if (0 != memcmp (&t.target,
+                   &my_identity, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+  me = GNUNET_malloc (sizeof (struct MessageEntry) +
+                      sizeof (struct PongMessage));
+  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head,
+                                    n->encrypted_tail,
+                                    n->encrypted_tail,
+                                    me);
+  me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
+  me->priority = PONG_PRIORITY;
+  me->size = sizeof (struct PongMessage);
+  tx.reserved = htonl (0);
+  tx.inbound_bw_limit = n->bw_in;
+  tx.challenge = t.challenge;
+  tx.target = t.target;
+  tp = (struct PongMessage *) &me[1];
+  tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
+  tp->header.size = htons (sizeof (struct PongMessage));
+  do_encrypt (n,
+              &my_identity.hashPubKey,
+              &tx.challenge,
+              &tp->challenge,
+              sizeof (struct PongMessage) -
+              sizeof (struct GNUNET_MessageHeader));
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Encrypting `%s' with challenge %u using key %u\n", "PONG",
+              ntohl (t.challenge), n->encrypt_key.crc32);
+#endif
+  /* trigger queue processing */
+  process_encrypted_neighbour_queue (n);
+}
+
+
+/**
+ * We received a PONG message.  Validate and update our status.
+ *
+ * @param n sender of the PONG
+ * @param m the encrypted PONG message itself
+ */
+static void
+handle_pong (struct Neighbour *n, 
+            const struct PongMessage *m)
+{
+  struct PongMessage t;
+  struct ConnectNotifyMessage cnm;
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Core service receives `%s' request from `%4s'.\n",
+              "PONG", GNUNET_i2s (&n->peer));
+#endif
+  if (GNUNET_OK !=
+      do_decrypt (n,
+                  &n->peer.hashPubKey,
+                  &m->challenge,
+                  &t.challenge,
+                  sizeof (struct PongMessage) -
+                  sizeof (struct GNUNET_MessageHeader)))
+    return;
+  if (0 != ntohl (t.reserved))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
+              "PONG",
+              GNUNET_i2s (&t.target),
+              ntohl (t.challenge), n->decrypt_key.crc32);
+#endif
+  if ((0 != memcmp (&t.target,
+                    &n->peer,
+                    sizeof (struct GNUNET_PeerIdentity))) ||
+      (n->ping_challenge != ntohl (t.challenge)))
+    {
+      /* PONG malformed */
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Received malformed `%s' wanted sender `%4s' with challenge %u\n",
+                  "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Received malformed `%s' received from `%4s' with challenge %u\n",
+                  "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
+#endif
       GNUNET_break_op (0);
       return;
     }
-  me = GNUNET_malloc (sizeof (struct MessageEntry) +
-                      sizeof (struct PingMessage));
-  if (n->encrypted_tail != NULL)
-    n->encrypted_tail->next = me;
-  else
+  switch (n->status)
     {
-      n->encrypted_tail = me;
-      n->encrypted_head = me;
-    }
-  me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PONG_DELAY);
-  me->priority = PONG_PRIORITY;
-  me->size = sizeof (struct PingMessage);
-  tp = (struct PingMessage *) &me[1];
-  tp->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
-  tp->header.size = htons (sizeof (struct PingMessage));
-  do_encrypt (n,
-              &my_identity.hashPubKey,
-              &t.challenge,
-              &tp->challenge,
-              sizeof (struct PingMessage) -
-              sizeof (struct GNUNET_MessageHeader));
+    case PEER_STATE_DOWN:
+      GNUNET_break (0);         /* should be impossible */
+      return;
+    case PEER_STATE_KEY_SENT:
+      GNUNET_break (0);         /* should be impossible, how did we decrypt? */
+      return;
+    case PEER_STATE_KEY_RECEIVED:
+      n->status = PEER_STATE_KEY_CONFIRMED;
+      if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
+       {
+         n->bw_out_external_limit = t.inbound_bw_limit;
+         n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                                 n->bw_out_internal_limit);
+         GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                                n->bw_out);       
+         GNUNET_TRANSPORT_set_quota (transport,
+                                     &n->peer,
+                                     n->bw_in,
+                                     n->bw_out,
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     NULL, NULL); 
+       }
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypting `%s' with challenge %u using key %u\n", "PONG",
-              ntohl (t.challenge), n->encrypt_key.crc32);
-#endif
-  /* trigger queue processing */
-  process_encrypted_neighbour_queue (n);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Confirmed key via `%s' message for peer `%4s'\n",
+                  "PONG", GNUNET_i2s (&n->peer));
+#endif      
+      if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_TASK)
+        {
+          GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+          n->retry_set_key_task = GNUNET_SCHEDULER_NO_TASK;
+        }      
+      cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+      cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+      cnm.distance = htonl (n->last_distance);
+      cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
+      cnm.peer = n->peer;
+      send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_CONNECT);
+      process_encrypted_neighbour_queue (n);
+      /* fall-through! */
+    case PEER_STATE_KEY_CONFIRMED:
+      n->last_activity = GNUNET_TIME_absolute_get ();
+      if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+      n->keep_alive_task 
+       = GNUNET_SCHEDULER_add_delayed (sched, 
+                                       GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                       &send_keep_alive,
+                                       n);
+      break;
+    default:
+      GNUNET_break (0);
+      break;
+    }
 }
 
 
@@ -2244,6 +2793,7 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
   struct GNUNET_TIME_Absolute t;
   struct GNUNET_CRYPTO_AesSessionKey k;
   struct PingMessage *ping;
+  struct PongMessage *pong;
   enum PeerStateMachine sender_status;
 
 #if DEBUG_CORE
@@ -2253,19 +2803,41 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
 #endif
   if (n->public_key == NULL)
     {
+      if (n->pitr != NULL)
+       {
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Ignoring `%s' message due to lack of public key for peer (still trying to obtain one).\n",
+                     "SET_KEY");
+#endif
+         return;
+       }
 #if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Lacking public key for peer, trying to obtain one.\n");
+                  "Lacking public key for peer, trying to obtain one (handle_set_key).\n");
 #endif
       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
       /* lookup n's public key, then try again */
-      GNUNET_PEERINFO_for_all (cfg,
-                               sched,
-                               &n->peer,
-                               0,
-                               GNUNET_TIME_UNIT_MINUTES,
-                               &process_hello_retry_handle_set_key, m_cpy);
+      GNUNET_assert (n->skm == NULL);
+      n->skm = m_cpy;
+      n->pitr = GNUNET_PEERINFO_iterate (cfg,
+                                        sched,
+                                        &n->peer,
+                                        0,
+                                        GNUNET_TIME_UNIT_MINUTES,
+                                        &process_hello_retry_handle_set_key, n);
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# SETKEY messages deferred (need public key)"), 1, GNUNET_NO);
+      return;
+    }
+  if (0 != memcmp (&m->target,
+                  &my_identity,
+                  sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 _("Received `%s' message that was for `%s', not for me.  Ignoring.\n"),
+                 "SET_KEY",
+                 GNUNET_i2s (&m->target));
       return;
     }
   if ((ntohl (m->purpose.size) !=
@@ -2293,8 +2865,9 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       return;
     }
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
-#endif
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+             "Decrypting key material.\n");
+#endif  
   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
                                   &m->encrypted_key,
                                   &k,
@@ -2343,7 +2916,7 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
     case PEER_STATE_KEY_CONFIRMED:
       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
           (sender_status != PEER_STATE_KEY_CONFIRMED))
-        {
+        {        
 #if DEBUG_CORE
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
@@ -2363,81 +2936,12 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       handle_ping (n, ping);
       GNUNET_free (ping);
     }
-}
-
-
-/**
- * We received a PONG message.  Validate and update
- * our status.
- *
- * @param n sender of the PONG
- * @param m the encrypted PONG message itself
- */
-static void
-handle_pong (struct Neighbour *n, const struct PingMessage *m)
-{
-  struct PingMessage t;
-
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core service receives `%s' request from `%4s'.\n",
-              "PONG", GNUNET_i2s (&n->peer));
-#endif
-  if (GNUNET_OK !=
-      do_decrypt (n,
-                  &n->peer.hashPubKey,
-                  &m->challenge,
-                  &t.challenge,
-                  sizeof (struct PingMessage) -
-                  sizeof (struct GNUNET_MessageHeader)))
-    return;
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
-              "PONG",
-              GNUNET_i2s (&t.target),
-              ntohl (t.challenge), n->decrypt_key.crc32);
-#endif
-  if ((0 != memcmp (&t.target,
-                    &n->peer,
-                    sizeof (struct GNUNET_PeerIdentity))) ||
-      (n->ping_challenge != ntohl (t.challenge)))
-    {
-      /* PONG malformed */
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
-                  "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received malfromed `%s' received from `%4s' with challenge %u\n",
-                  "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
-#endif
-      GNUNET_break_op (0);
-      return;
-    }
-  switch (n->status)
+  if (n->pending_pong != NULL)
     {
-    case PEER_STATE_DOWN:
-      GNUNET_break (0);         /* should be impossible */
-      return;
-    case PEER_STATE_KEY_SENT:
-      GNUNET_break (0);         /* should be impossible, how did we decrypt? */
-      return;
-    case PEER_STATE_KEY_RECEIVED:
-      n->status = PEER_STATE_KEY_CONFIRMED;
-      if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
-          n->retry_set_key_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
-        }
-      process_encrypted_neighbour_queue (n);
-      break;
-    case PEER_STATE_KEY_CONFIRMED:
-      /* duplicate PONG? */
-      break;
-    default:
-      GNUNET_break (0);
-      break;
+      pong = n->pending_pong;
+      n->pending_pong = NULL;
+      handle_pong (n, pong);
+      GNUNET_free (pong);
     }
 }
 
@@ -2467,7 +2971,8 @@ send_p2p_message_to_client (struct Neighbour *sender,
   ntm = (struct NotifyTrafficMessage *) buf;
   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
-  ntm->reserved = htonl (0);
+  ntm->distance = htonl (sender->last_distance);
+  ntm->latency = GNUNET_TIME_relative_hton (sender->last_latency);
   ntm->peer = sender->peer;
   memcpy (&ntm[1], m, msize);
   send_to_client (client, &ntm->header, GNUNET_YES);
@@ -2489,13 +2994,21 @@ deliver_message (struct Neighbour *sender,
   uint16_t type;
   unsigned int tpos;
   int deliver_full;
+  int dropped;
 
   type = ntohs (m->type);
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received encapsulated message of type %u from `%4s'\n",
+             type,
+             GNUNET_i2s (&sender->peer));
+#endif
+  dropped = GNUNET_YES;
   cpos = clients;
   while (cpos != NULL)
     {
       deliver_full = GNUNET_NO;
-      if (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)
+      if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
         deliver_full = GNUNET_YES;
       else
         {
@@ -2508,12 +3021,27 @@ deliver_message (struct Neighbour *sender,
             }
         }
       if (GNUNET_YES == deliver_full)
-        send_p2p_message_to_client (sender, cpos, m, msize);
+       {
+         send_p2p_message_to_client (sender, cpos, m, msize);
+         dropped = GNUNET_NO;
+       }
       else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
-        send_p2p_message_to_client (sender, cpos, m,
-                                    sizeof (struct GNUNET_MessageHeader));
+       {
+         send_p2p_message_to_client (sender, cpos, m,
+                                     sizeof (struct GNUNET_MessageHeader));
+       }
       cpos = cpos->next;
     }
+  if (dropped == GNUNET_YES)
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Message of type %u from `%4s' not delivered to any client.\n",
+                 type,
+                 GNUNET_i2s (&sender->peer));
+#endif
+      /* FIXME: stats... */
+    }
 }
 
 
@@ -2604,25 +3132,30 @@ handle_encrypted_message (struct Neighbour *n,
   size_t off;
   uint32_t snum;
   struct GNUNET_TIME_Absolute t;
+  GNUNET_HashCode iv;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
-#endif
+#endif  
+  GNUNET_CRYPTO_hash (&m->iv_seed, sizeof (uint32_t), &iv);
   /* decrypt */
   if (GNUNET_OK !=
       do_decrypt (n,
+                  &iv,
                   &m->plaintext_hash,
-                  &m->sequence_number,
-                  &buf[ENCRYPTED_HEADER_SIZE], size - ENCRYPTED_HEADER_SIZE))
+                  &buf[ENCRYPTED_HEADER_SIZE], 
+                 size - ENCRYPTED_HEADER_SIZE))
     return;
   pt = (struct EncryptedMessage *) buf;
 
   /* validate hash */
   GNUNET_CRYPTO_hash (&pt->sequence_number,
-                      size - ENCRYPTED_HEADER_SIZE, &ph);
-  if (0 != memcmp (&ph, &m->plaintext_hash, sizeof (GNUNET_HashCode)))
+                      size - ENCRYPTED_HEADER_SIZE - sizeof (GNUNET_HashCode), &ph);
+  if (0 != memcmp (&ph, 
+                  &pt->plaintext_hash, 
+                  sizeof (GNUNET_HashCode)))
     {
       /* checksum failed */
       GNUNET_break_op (0);
@@ -2677,10 +3210,34 @@ handle_encrypted_message (struct Neighbour *n,
     }
 
   /* process decrypted message(s) */
-  n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
-  n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
-                           n->bpm_out_internal_limit);
+  if (n->bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
+    {
+#if DEBUG_CORE_SET_QUOTA
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Received %u b/s as new inbound limit for peer `%4s'\n",
+                 (unsigned int) ntohl (pt->inbound_bw_limit.value__),
+                 GNUNET_i2s (&n->peer));
+#endif
+      n->bw_out_external_limit = pt->inbound_bw_limit;
+      n->bw_out = GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                             n->bw_out_internal_limit);
+      GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                            n->bw_out);
+      GNUNET_TRANSPORT_set_quota (transport,
+                                 &n->peer,
+                                 n->bw_in,
+                                 n->bw_out,
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 NULL, NULL); 
+    }
   n->last_activity = GNUNET_TIME_absolute_get ();
+  if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+  n->keep_alive_task 
+    = GNUNET_SCHEDULER_add_delayed (sched, 
+                                   GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                   &send_keep_alive,
+                                   n);
   off = sizeof (struct EncryptedMessage);
   deliver_messages (n, buf, size, off);
 }
@@ -2690,16 +3247,18 @@ handle_encrypted_message (struct Neighbour *n,
  * Function called by the transport for each received message.
  *
  * @param cls closure
- * @param latency estimated latency for communicating with the
- *             given peer
  * @param peer (claimed) identity of the other peer
  * @param message the message
+ * @param latency estimated latency for communicating with the
+ *             given peer (round-trip)
+ * @param distance in overlay hops, as given by transport plugin
  */
 static void
 handle_transport_receive (void *cls,
-                          struct GNUNET_TIME_Relative latency,
                           const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *message)
+                          const struct GNUNET_MessageHeader *message,
+                          struct GNUNET_TIME_Relative latency,
+                         unsigned int distance)
 {
   struct Neighbour *n;
   struct GNUNET_TIME_Absolute now;
@@ -2714,14 +3273,20 @@ handle_transport_receive (void *cls,
 #endif
   n = find_neighbour (peer);
   if (n == NULL)
-    {
-      GNUNET_break (0);
-      return;
-    }
+    n = create_neighbour (peer);
+  if (n == NULL)
+    return;   
   n->last_latency = latency;
-  up = n->status == PEER_STATE_KEY_CONFIRMED;
+  n->last_distance = distance;
+  up = (n->status == PEER_STATE_KEY_CONFIRMED);
   type = ntohs (message->type);
   size = ntohs (message->size);
+#if DEBUG_HANDSHAKE
+  fprintf (stderr,
+          "Received message of type %u from `%4s'\n",
+          type,
+          GNUNET_i2s (peer));
+#endif
   switch (type)
     {
     case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
@@ -2730,6 +3295,7 @@ handle_transport_receive (void *cls,
           GNUNET_break_op (0);
           return;
         }
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"), 1, GNUNET_NO);
       handle_set_key (n, (const struct SetKeyMessage *) message);
       break;
     case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
@@ -2743,6 +3309,13 @@ handle_transport_receive (void *cls,
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
           GNUNET_break_op (0);
+         /* blacklist briefly (?); might help recover (?) */
+         GNUNET_TRANSPORT_blacklist (sched, cfg,
+                                     &n->peer, 
+                                     GNUNET_TIME_UNIT_SECONDS,
+                                     GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                                    5),
+                                     NULL, NULL);
           return;
         }
       handle_encrypted_message (n, (const struct EncryptedMessage *) message);
@@ -2753,6 +3326,7 @@ handle_transport_receive (void *cls,
           GNUNET_break_op (0);
           return;
         }
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# ping messages received"), 1, GNUNET_NO);
       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
@@ -2769,20 +3343,26 @@ handle_transport_receive (void *cls,
       handle_ping (n, (const struct PingMessage *) message);
       break;
     case GNUNET_MESSAGE_TYPE_CORE_PONG:
-      if (size != sizeof (struct PingMessage))
+      if (size != sizeof (struct PongMessage))
         {
           GNUNET_break_op (0);
           return;
         }
-      if ((n->status != PEER_STATE_KEY_SENT) &&
-          (n->status != PEER_STATE_KEY_RECEIVED) &&
-          (n->status != PEER_STATE_KEY_CONFIRMED))
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# pong messages received"), 1, GNUNET_NO);
+      if ( (n->status != PEER_STATE_KEY_RECEIVED) &&
+          (n->status != PEER_STATE_KEY_CONFIRMED) )
         {
-          /* could not decrypt pong, oops! */
-          GNUNET_break_op (0);
+#if DEBUG_CORE
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
+                      "PONG", GNUNET_i2s (&n->peer));
+#endif
+          GNUNET_free_non_null (n->pending_pong);
+          n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
+          memcpy (n->pending_pong, message, sizeof (struct PongMessage));
           return;
         }
-      handle_pong (n, (const struct PingMessage *) message);
+      handle_pong (n, (const struct PongMessage *) message);
       break;
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -2794,11 +3374,99 @@ handle_transport_receive (void *cls,
       now = GNUNET_TIME_absolute_get ();
       n->last_activity = now;
       if (!up)
-        n->time_established = now;
+       {
+         GNUNET_STATISTICS_update (stats, gettext_noop ("# peers connected"), 1, GNUNET_NO);
+         n->time_established = now;
+       }
+      if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched, n->keep_alive_task);
+      n->keep_alive_task 
+       = GNUNET_SCHEDULER_add_delayed (sched, 
+                                       GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2),
+                                       &send_keep_alive,
+                                       n);
     }
 }
 
 
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+  struct GNUNET_BANDWIDTH_Value32NBO q_in;
+  double pref_rel;
+  double share;
+  unsigned long long distributable;
+  uint64_t need_per_peer;
+  uint64_t need_per_second;
+  
+  n->quota_update_task = GNUNET_SCHEDULER_NO_TASK;
+  /* calculate relative preference among all neighbours;
+     divides by a bit more to avoid division by zero AND to
+     account for possibility of new neighbours joining any time 
+     AND to convert to double... */
+  if (preference_sum == 0)
+    {
+      pref_rel = 1.0 / (double) neighbour_count;
+    }
+  else
+    {
+      pref_rel = n->current_preference / preference_sum;
+    }
+  need_per_peer = GNUNET_BANDWIDTH_value_get_available_until (MIN_BANDWIDTH_PER_PEER,
+                                                             GNUNET_TIME_UNIT_SECONDS);  
+  need_per_second = need_per_peer * neighbour_count;
+  distributable = 0;
+  if (bandwidth_target_out_bps > need_per_second)
+    distributable = bandwidth_target_out_bps - need_per_second;
+  share = distributable * pref_rel;
+  if (share + need_per_peer > ( (uint32_t)-1))
+    q_in = GNUNET_BANDWIDTH_value_init ((uint32_t) -1);
+  else
+    q_in = GNUNET_BANDWIDTH_value_init (need_per_peer + (uint32_t) share);
+  /* check if we want to disconnect for good due to inactivity */
+  if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
+       (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Forcing disconnect of `%4s' due to inactivity (?).\n",
+                 GNUNET_i2s (&n->peer));
+#endif
+      q_in = GNUNET_BANDWIDTH_value_init (0); /* force disconnect */
+    }
+#if DEBUG_CORE_QUOTA
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Current quota for `%4s' is %u/%llu b/s in (old: %u b/s) / %u out (%u internal)\n",
+             GNUNET_i2s (&n->peer),
+             (unsigned int) ntohl (q_in.value__),
+             bandwidth_target_out_bps,
+             (unsigned int) ntohl (n->bw_in.value__),
+             (unsigned int) ntohl (n->bw_out.value__),
+             (unsigned int) ntohl (n->bw_out_internal_limit.value__));
+#endif
+  if (n->bw_in.value__ != q_in.value__) 
+    {
+      n->bw_in = q_in;
+      GNUNET_TRANSPORT_set_quota (transport,
+                                 &n->peer,
+                                 n->bw_in,
+                                 n->bw_out,
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 NULL, NULL);
+    }
+  schedule_quota_update (n);
+}
+
+
 /**
  * Function called by transport to notify us that
  * a peer connected to us (on the network level).
@@ -2806,83 +3474,58 @@ handle_transport_receive (void *cls,
  * @param cls closure
  * @param peer the peer that connected
  * @param latency current latency of the connection
+ * @param distance in overlay hops, as given by transport plugin
  */
 static void
 handle_transport_notify_connect (void *cls,
                                  const struct GNUNET_PeerIdentity *peer,
-                                 struct GNUNET_TIME_Relative latency)
+                                 struct GNUNET_TIME_Relative latency,
+                                unsigned int distance)
 {
   struct Neighbour *n;
-  struct GNUNET_TIME_Absolute now;
   struct ConnectNotifyMessage cnm;
 
   n = find_neighbour (peer);
   if (n != NULL)
     {
-      /* duplicate connect notification!? */
-      GNUNET_break (0);
-      return;
+      if (n->is_connected)
+       {
+         /* duplicate connect notification!? */
+         GNUNET_break (0);
+         return;
+       }
     }
-  now = GNUNET_TIME_absolute_get ();
-  n = GNUNET_malloc (sizeof (struct Neighbour));
-  n->next = neighbours;
-  neighbours = n;
-  n->peer = *peer;
+  else
+    {
+      n = create_neighbour (peer);
+    }
+  n->is_connected = GNUNET_YES;      
   n->last_latency = latency;
-  GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
-  n->encrypt_key_created = now;
-  n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
-  n->last_asw_update = now;
-  n->last_arw_update = now;
-  n->bpm_in = DEFAULT_BPM_IN_OUT;
-  n->bpm_out = DEFAULT_BPM_IN_OUT;
-  n->bpm_out_internal_limit = (uint32_t) - 1;
-  n->bpm_out_external_limit = DEFAULT_BPM_IN_OUT;
-  n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                                (uint32_t) - 1);
+  n->last_distance = distance;
+  GNUNET_BANDWIDTH_tracker_init (&n->available_send_window,
+                                n->bw_out,
+                                MAX_WINDOW_TIME_S);
+  GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window,
+                                n->bw_in,
+                                MAX_WINDOW_TIME_S);  
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received connection from `%4s'.\n",
               GNUNET_i2s (&n->peer));
 #endif
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
-  cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  cnm.bpm_available = htonl (DEFAULT_BPM_IN_OUT);
+  cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_PRE_CONNECT);
+  cnm.distance = htonl (n->last_distance);
+  cnm.latency = GNUNET_TIME_relative_hton (n->last_latency);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (now);
-  send_to_all_clients (&cnm.header, GNUNET_YES);
-}
-
-
-/**
- * Free the given entry for the neighbour (it has
- * already been removed from the list at this point).
- * @param n neighbour to free
- */
-static void
-free_neighbour (struct Neighbour *n)
-{
-  struct MessageEntry *m;
-
-  while (NULL != (m = n->messages))
-    {
-      n->messages = m->next;
-      GNUNET_free (m);
-    }
-  while (NULL != (m = n->encrypted_head))
-    {
-      n->encrypted_head = m->next;
-      GNUNET_free (m);
-    }
-  if (NULL != n->th)
-    GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
-  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
-    GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
-  if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
-    GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
-  GNUNET_free_non_null (n->public_key);
-  GNUNET_free_non_null (n->pending_ping);
-  GNUNET_free (n);
+  send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_PRE_CONNECT);
+  GNUNET_TRANSPORT_set_quota (transport,
+                             &n->peer,
+                             n->bw_in,
+                             n->bw_out,
+                             GNUNET_TIME_UNIT_FOREVER_REL,
+                             NULL, NULL);
+  send_key (n); 
 }
 
 
@@ -2897,38 +3540,25 @@ static void
 handle_transport_notify_disconnect (void *cls,
                                     const struct GNUNET_PeerIdentity *peer)
 {
-  struct ConnectNotifyMessage cnm;
+  struct DisconnectNotifyMessage cnm;
   struct Neighbour *n;
-  struct Neighbour *p;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
 #endif
-  p = NULL;
-  n = neighbours;
-  while ((n != NULL) &&
-         (0 != memcmp (&n->peer, peer, sizeof (struct GNUNET_PeerIdentity))))
-    {
-      p = n;
-      n = n->next;
-    }
+  n = find_neighbour (peer);
   if (n == NULL)
     {
       GNUNET_break (0);
       return;
     }
-  if (p == NULL)
-    neighbours = n->next;
-  else
-    p->next = n->next;
-  cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
+  GNUNET_break (n->is_connected);
+  cnm.header.size = htons (sizeof (struct DisconnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
-  cnm.bpm_available = htonl (0);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
-  send_to_all_clients (&cnm.header, GNUNET_YES);
-  free_neighbour (n);
+  send_to_all_clients (&cnm.header, GNUNET_YES, GNUNET_CORE_OPTION_SEND_DISCONNECT);
+  n->is_connected = GNUNET_NO;
 }
 
 
@@ -2952,10 +3582,19 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   while (NULL != (n = neighbours))
     {
       neighbours = n->next;
+      GNUNET_assert (neighbour_count > 0);
+      neighbour_count--;
       free_neighbour (n);
     }
+  GNUNET_STATISTICS_set (stats, gettext_noop ("# active neighbours"), neighbour_count, GNUNET_NO);
+  GNUNET_SERVER_notification_context_destroy (notifier);
+  notifier = NULL;
   while (NULL != (c = clients))
     handle_client_disconnect (NULL, c->client_handle);
+  if (my_private_key != NULL)
+    GNUNET_CRYPTO_rsa_key_free (my_private_key);
+  if (stats != NULL)
+    GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
 }
 
 
@@ -2970,35 +3609,25 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static void
 run (void *cls,
      struct GNUNET_SCHEDULER_Handle *s,
-     struct GNUNET_SERVER_Handle *serv, struct GNUNET_CONFIGURATION_Handle *c)
+     struct GNUNET_SERVER_Handle *serv,
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
-#if 0
-  unsigned long long qin;
-  unsigned long long qout;
-  unsigned long long tneigh;
-#endif
   char *keyfile;
 
   sched = s;
-  cfg = c;
+  cfg = c;  
   /* parse configuration */
   if (
-#if 0
-       (GNUNET_OK !=
-        GNUNET_CONFIGURATION_get_value_number (c,
-                                               "CORE",
-                                               "XX",
-                                               &qin)) ||
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
-                                               "YY",
-                                               &qout)) ||
+                                               "TOTAL_QUOTA_IN",
+                                               &bandwidth_target_in_bps)) ||
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
-                                               "ZZ_LIMIT", &tneigh)) ||
-#endif
+                                               "TOTAL_QUOTA_OUT",
+                                               &bandwidth_target_out_bps)) ||
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_filename (c,
                                                  "GNUNETD",
@@ -3024,6 +3653,8 @@ run (void *cls,
                       sizeof (my_public_key), &my_identity.hashPubKey);
   /* setup notification */
   server = serv;
+  notifier = GNUNET_SERVER_notification_context_create (server, 
+                                                       MAX_NOTIFY_QUEUE);
   GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
   /* setup transport connection */
   transport = GNUNET_TRANSPORT_connect (sched,
@@ -3033,10 +3664,8 @@ run (void *cls,
                                         &handle_transport_notify_connect,
                                         &handle_transport_notify_disconnect);
   GNUNET_assert (NULL != transport);
+  stats = GNUNET_STATISTICS_create (sched, "core", cfg);
   GNUNET_SCHEDULER_add_delayed (sched,
-                                GNUNET_YES,
-                                GNUNET_SCHEDULER_PRIORITY_IDLE,
-                                GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &cleaning_task, NULL);
   /* process client requests */
@@ -3046,18 +3675,6 @@ run (void *cls,
 }
 
 
-/**
- * Function called during shutdown.  Clean up our state.
- */
-static void
-cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
-{
-
-
-  if (my_private_key != NULL)
-    GNUNET_CRYPTO_rsa_key_free (my_private_key);
-}
-
 
 /**
  * The main function for the transport service.
@@ -3072,7 +3689,9 @@ main (int argc, char *const *argv)
   return (GNUNET_OK ==
           GNUNET_SERVICE_run (argc,
                               argv,
-                              "core", &run, NULL, &cleanup, NULL)) ? 0 : 1;
+                              "core",
+                             GNUNET_SERVICE_OPTION_NONE,
+                             &run, NULL)) ? 0 : 1;
 }
 
 /* end of gnunet-service-core.c */