removing dead code / unused variables
[oweals/gnunet.git] / src / core / gnunet-service-core.c
index e9e076bcc6e114d4a1def24aaf9297e34f1db2f1..b07c427442c2fe530a4fe216f047542f66d20b91 100644 (file)
  * @brief high-level P2P messaging
  * @author Christian Grothoff
  *
- * TODO:
- * TESTING:
- * - write test for basic core functions:
- *   + connect to peer
- *   + transmit (encrypted) message [with handshake]
- *   + receive (encrypted) message, forward plaintext to clients
  * POST-TESTING:
- * - revisit API (which arguments are used, needed)?
- * - add code to bound queue size when handling client's SEND message
- * - add code to bound message queue size when passing messages to clients
- * - add code to discard_expired_messages
- * - add code to re-transmit key if first attempt failed
- *   + timeout on connect / key exchange, etc.
- *   + timeout for automatic re-try, etc.
- * - add code to give up re-transmission of key if many attempts fail
- * - add code to send PINGs if we are about to time-out otherwise
- * ? add heuristic to do another send_key in "handle_set_key"
- *   in case previous attempt failed / didn't work / persist
- *   (but don't do it always to avoid storm of SET_KEY's going
- *   back and forth!) --- alternatively, add "status" field
- *   of the other peer to the set key message, that way we'd
- *   know for sure!
- * - check that hostkey used by transport (for HELLOs) is the
- *   same as the hostkey that we are using!
- * - free list of clients on exit
  * - topology management:
  *   + bootstrapping (transport offer hello, plugins)
  *   + internal neighbour selection
- *   + update bandwidth usage statistics
- *   + bandwidth allocation (transport set quota)
+ *
+ * Considerations for later:
+ * - check that hostkey used by transport (for HELLOs) is the
+ *   same as the hostkey that we are using!
+ * - add code to send PINGs if we are about to time-out otherwise
  * - optimize lookup (many O(n) list traversals
  *   could ideally be changed to O(1) hash map lookups)
  */
 #include "platform.h"
+#include "gnunet_constants.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_peerinfo_service.h"
  */
 #define MAX_WINDOW_TIME (5 * 60 * 1000)
 
+/**
+ * Minimum of bytes per minute (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
+ * sense.
+ */
+#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
 
 /**
- * Amount of bytes per minute (in/out) to assume initially
- * (before either peer has communicated any particular
- * preference).  Should be rather low.
+ * What is the smallest change (in number of bytes per minute)
+ * that we consider significant enough to bother triggering?
  */
-#define DEFAULT_BPM_IN_OUT 2048
+#define MIN_BPM_CHANGE 32
 
+/**
+ * After how much time past the "official" expiration time do
+ * we discard messages?  Should not be zero since we may 
+ * intentionally defer transmission until close to the deadline
+ * and then may be slightly past the deadline due to inaccuracy
+ * in sleep and our own CPU consumption.
+ */
+#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
 
 /**
  * What is the maximum delay for a SET_KEY message?
  */
 #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
 
-
 /**
  * What how long do we wait for SET_KEY confirmation initially?
  */
 #define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
 
-
 /**
  * What is the maximum delay for a PING message?
  */
 #define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
 
-
 /**
  * What is the maximum delay for a PONG message?
  */
 #define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
 
+/**
+ * How often do we recalculate bandwidth quotas?
+ */
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
 
 /**
  * What is the priority for a SET_KEY message?
  */
 #define SET_KEY_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PING message?
  */
 #define PING_PRIORITY 0xFFFFFF
 
-
 /**
  * What is the priority for a PONG message?
  */
 #define PONG_PRIORITY 0xFFFFFF
 
+/**
+ * How many messages do we queue per peer at most?
+ */
+#define MAX_PEER_QUEUE_SIZE 16
+
+/**
+ * How many non-mandatory messages do we queue per client at most?
+ */
+#define MAX_CLIENT_QUEUE_SIZE 32
 
 /**
  * What is the maximum age of a message for us to consider
  */
 #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
 
-
 /**
  * What is the maximum size for encrypted messages?  Note that this
  * number imposes a clear limit on the maximum size of any message.
@@ -167,6 +167,7 @@ enum PeerStateMachine
  */
 #define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
 
+
 /**
  * Encapsulation for encrypted messages exchanged between
  * peers.  Followed by the actual encrypted data.
@@ -390,6 +391,11 @@ struct Neighbour
    */
   GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
 
+  /**
+   * ID of task used for updating bandwidth quota for this neighbour.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+
   /**
    * At what time did we generate our encryption key?
    */
@@ -418,7 +424,7 @@ struct Neighbour
   struct GNUNET_TIME_Relative last_latency;
 
   /**
-   * At what frequency are we currently re-trying SET KEY messages?
+   * At what frequency are we currently re-trying SET_KEY messages?
    */
   struct GNUNET_TIME_Relative set_key_retry_frequency;
 
@@ -439,7 +445,7 @@ struct Neighbour
    * bandwidth-hogs are sampled at a frequency of about 78s!);
    * may get negative if we have VERY high priority content.
    */
-  long long available_send_window;
+  long long available_send_window; 
 
   /**
    * How much downstream capacity of this peer has been reserved for
@@ -448,12 +454,12 @@ struct Neighbour
    * make sure that this reserved amount of bandwidth is actually
    * available).
    */
-  long long available_recv_window;
+  long long available_recv_window; 
 
   /**
    * How valueable were the messages of this peer recently?
    */
-  double current_preference;
+  unsigned long long current_preference;
 
   /**
    * Bit map indicating which of the 32 sequence numbers before the last
@@ -489,7 +495,7 @@ struct Neighbour
 
   /**
    * Internal bandwidth limit set for this peer (initially
-   * typcially set to "-1").  "bpm_out" is MAX of
+   * typically set to "-1").  "bpm_out" is MAX of
    * "bpm_out_internal_limit" and "bpm_out_external_limit".
    */
   uint32_t bpm_out_internal_limit;
@@ -502,7 +508,7 @@ struct Neighbour
   uint32_t bpm_out_external_limit;
 
   /**
-   * What was our PING challenge number?
+   * What was our PING challenge number (for this peer)?
    */
   uint32_t ping_challenge;
 
@@ -628,34 +634,86 @@ static struct GNUNET_SERVER_Handle *server;
  */
 static struct GNUNET_TRANSPORT_Handle *transport;
 
+/**
+ * Linked list of our clients.
+ */
+static struct Client *clients;
+
 /**
  * We keep neighbours in a linked list (for now).
  */
 static struct Neighbour *neighbours;
 
 /**
- * Linked list of our clients.
+ * Sum of all preferences among all neighbours.
  */
-static struct Client *clients;
+static unsigned long long preference_sum;
+
+/**
+ * Total number of neighbours we have.
+ */
+static unsigned int neighbour_count;
+
+/**
+ * How much inbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_in;
+
+/**
+ * How much outbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_out;
+
+
+
+/**
+ * A preference value for a neighbour was update.  Update
+ * the preference sum accordingly.
+ *
+ * @param inc how much was a preference value increased?
+ */
+static void
+update_preference_sum (unsigned long long inc)
+{
+  struct Neighbour *n;
+  unsigned long long os;
+
+  os = preference_sum;
+  preference_sum += inc;
+  if (preference_sum >= os)
+    return; /* done! */
+  /* overflow! compensate by cutting all values in half! */
+  preference_sum = 0;
+  n = neighbours;
+  while (n != NULL)
+    {
+      n->current_preference /= 2;
+      preference_sum += n->current_preference;
+      n = n->next;
+    }    
+}
 
 
 /**
  * Recalculate the number of bytes we expect to
  * receive or transmit in a given window.
  *
+ * @param force force an update now (even if not much time has passed)
  * @param window pointer to the byte counter (updated)
  * @param ts pointer to the timestamp (updated)
  * @param bpm number of bytes per minute that should
  *        be added to the window.
  */
 static void
-update_window (long long *window,
+update_window (int force,
+              long long *window,
                struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
 {
   struct GNUNET_TIME_Relative since;
 
   since = GNUNET_TIME_absolute_get_duration (*ts);
-  if (since.value < 60 * 1000)
+  if ( (force == GNUNET_NO) &&
+       (since.value < 60 * 1000) )
     return;                     /* not even a minute has passed */
   *ts = GNUNET_TIME_absolute_get ();
   *window += (bpm * since.value) / 60 / 1000;
@@ -714,6 +772,7 @@ static void request_transmit (struct Client *client);
 
 /**
  * Client is ready to receive data, provide it.
+ *
  * @param cls closure
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
@@ -728,12 +787,16 @@ do_client_transmit (void *cls, size_t size, void *buf)
   size_t ret;
 
   client->th = NULL;
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client ready to receive %u bytes.\n", size);
+#endif
   if (buf == NULL)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Failed to transmit data to client (disconnect)?\n");
+#endif
       return 0;                 /* we'll surely get a disconnect soon... */
     }
   tgt = buf;
@@ -749,8 +812,10 @@ do_client_transmit (void *cls, size_t size, void *buf)
   GNUNET_assert (ret > 0);
   if (client->event_head == NULL)
     client->event_tail = NULL;
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transmitting %u bytes to client\n", ret);
+#endif
   request_transmit (client);
   return ret;
 }
@@ -769,9 +834,11 @@ request_transmit (struct Client *client)
     return;                     /* already pending */
   if (NULL == client->event_head)
     return;                     /* no more events pending */
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asking server to transmit %u bytes to client\n",
               client->event_head->size);
+#endif
   client->th
     = GNUNET_SERVER_notify_transmit_ready (client->client_handle,
                                            client->event_head->size,
@@ -792,11 +859,31 @@ send_to_client (struct Client *client,
                 const struct GNUNET_MessageHeader *msg, int can_drop)
 {
   struct Event *e;
+  unsigned int queue_size;
   uint16_t msize;
 
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Preparing to send message of type %u to client.\n",
               ntohs (msg->type));
+#endif
+  queue_size = 0;
+  e = client->event_head;
+  while (e != NULL)
+    {
+      queue_size++;
+      e = e->next;
+    }
+  if ( (queue_size >= MAX_CLIENT_QUEUE_SIZE) &&
+       (can_drop == GNUNET_YES) )
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Too many messages in queue for the client, dropping the new message.\n");
+#endif
+      return;
+    }
+
   msize = ntohs (msg->size);
   e = GNUNET_malloc (sizeof (struct Event) + msize);
   /* append */
@@ -834,7 +921,6 @@ send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop)
  */
 static void
 handle_client_init (void *cls,
-                    struct GNUNET_SERVER_Handle *server,
                     struct GNUNET_SERVER_Client *client,
                     const struct GNUNET_MessageHeader *message)
 {
@@ -846,9 +932,11 @@ handle_client_init (void *cls,
   struct Neighbour *n;
   struct ConnectNotifyMessage cnm;
 
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client connecting to core service with `%s' message\n",
               "INIT");
+#endif
   /* check that we don't have an entry already */
   c = clients;
   while (c != NULL)
@@ -886,8 +974,10 @@ handle_client_init (void *cls,
   memcpy (&irm.publicKey,
           &my_public_key,
           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending `%s' message to client.\n", "INIT_REPLY");
+#endif
   send_to_client (c, &irm.header, GNUNET_NO);
   /* notify new client about existing neighbours */
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
@@ -895,10 +985,11 @@ handle_client_init (void *cls,
   n = neighbours;
   while (n != NULL)
     {
+#if DEBUG_CORE_CLIENT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Sending `%s' message to client.\n", "NOTIFY_CONNECT");
-      cnm.bpm_available = htonl (n->bpm_out);
-      cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
+#endif
+      cnm.reserved = htonl (0);
       cnm.peer = n->peer;
       send_to_client (c, &cnm.header, GNUNET_NO);
       n = n->next;
@@ -918,9 +1009,12 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
 {
   struct Client *pos;
   struct Client *prev;
+  struct Event *e;
 
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Client has disconnected from core service.\n");
+#endif
   prev = NULL;
   pos = clients;
   while (pos != NULL)
@@ -933,6 +1027,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
             prev->next = pos->next;
           if (pos->th != NULL)
             GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
+         while (NULL != (e = pos->event_head))
+           {
+             pos->event_head = e->next;
+             GNUNET_free (e);
+           }
           GNUNET_free (pos);
           return;
         }
@@ -948,7 +1047,6 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
  */
 static void
 handle_client_request_configure (void *cls,
-                                 struct GNUNET_SERVER_Handle *server,
                                  struct GNUNET_SERVER_Client *client,
                                  const struct GNUNET_MessageHeader *message)
 {
@@ -957,14 +1055,21 @@ handle_client_request_configure (void *cls,
   struct ConfigurationInfoMessage cim;
   struct Client *c;
   int reserv;
+  unsigned long long old_preference;
 
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request.\n", "CONFIGURE");
+#endif
   rcm = (const struct RequestConfigureMessage *) message;
   n = find_neighbour (&rcm->peer);
   memset (&cim, 0, sizeof (cim));
   if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
     {
+      update_window (GNUNET_YES,
+                    &n->available_send_window,
+                    &n->last_asw_update,
+                    n->bpm_out);
       n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
       n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
                                n->bpm_out_external_limit);
@@ -975,15 +1080,21 @@ handle_client_request_configure (void *cls,
         }
       else if (reserv > 0)
         {
-          update_window (&n->available_recv_window,
+          update_window (GNUNET_NO,
+                        &n->available_recv_window,
                          &n->last_arw_update, n->bpm_in);
           if (n->available_recv_window < reserv)
             reserv = n->available_recv_window;
           n->available_recv_window -= reserv;
         }
-      n->current_preference += rcm->preference_change;
-      if (n->current_preference < 0)
-        n->current_preference = 0;
+      old_preference = n->current_preference;
+      n->current_preference += GNUNET_ntohll(rcm->preference_change);
+      if (old_preference > n->current_preference) 
+       {
+         /* overflow; cap at maximum value */
+         n->current_preference = (unsigned long long) -1;
+       }
+      update_preference_sum (n->current_preference - old_preference);
       cim.reserved_amount = htonl (reserv);
       cim.bpm_in = htonl (n->bpm_in);
       cim.bpm_out = htonl (n->bpm_out);
@@ -999,8 +1110,10 @@ handle_client_request_configure (void *cls,
       GNUNET_break (0);
       return;
     }
+#if DEBUG_CORE_CLIENT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending `%s' message to client.\n", "CONFIGURATION_INFO");
+#endif
   send_to_client (c, &cim.header, GNUNET_NO);
 }
 
@@ -1033,9 +1146,6 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
   char *cbuf;
 
   n->th = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transport ready to receive %u bytes for `%4s'\n",
-              size, GNUNET_i2s (&n->peer));
   GNUNET_assert (NULL != (m = n->encrypted_head));
   n->encrypted_head = m->next;
   if (m->next == NULL)
@@ -1047,15 +1157,18 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
       GNUNET_assert (size >= m->size);
       memcpy (cbuf, &m[1], m->size);
       ret = m->size;
+      n->available_send_window -= m->size;
       process_encrypted_neighbour_queue (n);
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Copied message of type %u and size %u into transport buffer for `%4s'\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   ret, GNUNET_i2s (&n->peer));
+#endif
     }
   else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Transmission for message of type %u and size %u failed\n",
                   ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
                   m->size);
@@ -1085,28 +1198,29 @@ static void process_plaintext_neighbour_queue (struct Neighbour *n);
 static void
 process_encrypted_neighbour_queue (struct Neighbour *n)
 {
+  struct MessageEntry *m;
   if (n->th != NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Asked to process encrypted queue, but request already pending.\n");
-      return;                   /* already pending */
-    }
+    return;  /* request already pending */
   if (n->encrypted_head == NULL)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Encrypted queue empty, trying plaintext queue instead.\n");
+      /* encrypted queue empty, try plaintext instead */
       process_plaintext_neighbour_queue (n);
       return;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
               n->encrypted_head->size,
               GNUNET_i2s (&n->peer),
-              GNUNET_TIME_absolute_get_remaining (n->encrypted_head->
-                                                  deadline).value);
+              GNUNET_TIME_absolute_get_remaining (n->
+                                                  encrypted_head->deadline).
+              value);
+#endif
   n->th =
     GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
                                             n->encrypted_head->size,
+                                           n->encrypted_head->priority,
                                             GNUNET_TIME_absolute_get_remaining
                                             (n->encrypted_head->deadline),
                                             &notify_encrypted_transmit_ready,
@@ -1115,7 +1229,13 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
     {
       /* message request too large (oops) */
       GNUNET_break (0);
-      /* FIXME: handle error somehow! */
+      /* discard encrypted message */
+      GNUNET_assert (NULL != (m = n->encrypted_head));
+      n->encrypted_head = m->next;
+      if (m->next == NULL)
+       n->encrypted_tail = NULL;
+      GNUNET_free (m);
+      process_encrypted_neighbour_queue (n);
     }
 }
 
@@ -1159,9 +1279,11 @@ do_decrypt (struct Neighbour *n,
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from `%4s' using key %u\n",
               size, GNUNET_i2s (&n->peer), n->decrypt_key.crc32);
+#endif
   return GNUNET_OK;
 }
 
@@ -1194,9 +1316,11 @@ do_encrypt (struct Neighbour *n,
                                             (const struct
                                              GNUNET_CRYPTO_AesInitializationVector
                                              *) iv, out));
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Encrypted %u bytes for `%4s' using key %u\n", size,
               GNUNET_i2s (&n->peer), n->encrypt_key.crc32);
+#endif
   return GNUNET_OK;
 }
 
@@ -1247,10 +1371,14 @@ select_messages (struct Neighbour *n,
       min = NULL;
       min_prio = -1;
       discard_low_prio = GNUNET_NO;
-      /* number of bytes available for transmission at time "t" */
+      /* calculate number of bytes available for transmission at time "t" */
+      update_window (GNUNET_NO,
+                    &n->available_send_window,
+                    &n->last_asw_update,
+                    n->bpm_out);
       avail = n->available_send_window;
       t = n->last_asw_update;
-      /* how many bytes have we (hyptothetically) scheduled so far */
+      /* how many bytes have we (hypothetically) scheduled so far */
       off = 0;
       /* maximum time we can wait before transmitting anything
          and still make all of our deadlines */
@@ -1307,6 +1435,7 @@ select_messages (struct Neighbour *n,
         }
       if (discard_low_prio)
         {
+          GNUNET_assert (min != NULL);
           /* remove lowest-priority entry from consideration */
           min->do_transmit = GNUNET_YES;        /* means: discard (for now) */
         }
@@ -1314,7 +1443,7 @@ select_messages (struct Neighbour *n,
     }
   /* guard against sending "tiny" messages with large headers without
      urgent deadlines */
-  if ((slack > 1000) && (size > 4 * off))
+  if ( (slack > 1000) && (size > 4 * off) )
     {
       /* less than 25% of message would be filled with
          deadlines still being met if we delay by one
@@ -1343,9 +1472,11 @@ select_messages (struct Neighbour *n,
         pos->do_transmit = GNUNET_NO;   /* mark for not transmitting! */
       pos = pos->next;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Selected %u bytes of plaintext messages for transmission to `%4s'.\n",
               off, GNUNET_i2s (&n->peer));
+#endif
   return off;
 }
 
@@ -1381,7 +1512,7 @@ batch_message (struct Neighbour *n,
   *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
   if (0 == select_messages (n, size, retry_time))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "No messages selected, will try again in %llu ms\n",
                   retry_time->value);
       return 0;
@@ -1424,7 +1555,36 @@ batch_message (struct Neighbour *n,
 static void
 discard_expired_messages (struct Neighbour *n)
 {
-  /* FIXME */
+  struct MessageEntry *prev;
+  struct MessageEntry *next;
+  struct MessageEntry *pos;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative delta;
+
+  now = GNUNET_TIME_absolute_get ();
+  prev = NULL;
+  pos = n->messages;
+  while (pos != NULL) 
+    {
+      next = pos->next;
+      delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
+      if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
+       {
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                     "Message is %llu ms past due, discarding.\n",
+                     delta.value);
+#endif
+         if (prev == NULL)
+           n->messages = next;
+         else
+           prev->next = next;
+         GNUNET_free (pos);
+       }
+      else
+       prev = pos;
+      pos = next;
+    }
 }
 
 
@@ -1473,9 +1633,6 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   struct GNUNET_TIME_Absolute deadline;
   struct GNUNET_TIME_Relative retry_time;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Processing plaintext message queue for `%4s', scheduling messages.\n",
-              GNUNET_i2s (&n->peer));
   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
     {
       GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
@@ -1485,36 +1642,51 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
     {
     case PEER_STATE_DOWN:
       send_key (n);
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Not yet connected, deferring processing of plaintext messages.\n");
+                  "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       return;
     case PEER_STATE_KEY_SENT:
       GNUNET_assert (n->retry_set_key_task !=
                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Not yet connected, deferring processing of plaintext messages.\n");
+                  "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       return;
     case PEER_STATE_KEY_RECEIVED:
       GNUNET_assert (n->retry_set_key_task !=
                      GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Not yet connected, deferring processing of plaintext messages.\n");
+                  "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       return;
     case PEER_STATE_KEY_CONFIRMED:
       /* ready to continue */
       break;
     }
+  discard_expired_messages (n);
   if (n->messages == NULL)
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Plaintext message queue is empty.\n");
+                  "Plaintext message queue for `%4s' is empty.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       return;                   /* no pending messages */
     }
-  discard_expired_messages (n);
   if (n->encrypted_head != NULL)
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Encrypted message queue is still full, delaying plaintext processing.\n");
+                  "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       return;                   /* wait for messages already encrypted to be
                                    processed first! */
     }
@@ -1522,15 +1694,17 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
   priority = 0;
   used = sizeof (struct EncryptedMessage);
-
   used += batch_message (n,
                          &pbuf[used],
                          MAX_ENCRYPTED_MESSAGE_SIZE - used,
                          &deadline, &retry_time, &priority);
   if (used == sizeof (struct EncryptedMessage))
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "No messages selected for processing at this time, will try again later.\n");
+                  "No messages selected for transmission to `%4s' at this time, will try again later.\n",
+                 GNUNET_i2s(&n->peer));
+#endif
       /* no messages selected for sending, try again later... */
       n->retry_plaintext_task =
         GNUNET_SCHEDULER_add_delayed (sched,
@@ -1558,8 +1732,12 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
   esize = used - ENCRYPTED_HEADER_SIZE;
   GNUNET_CRYPTO_hash (&ph->sequence_number, esize, &em->plaintext_hash);
   /* encrypt */
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypting plaintext messages for transmission.\n");
+              "Encrypting %u bytes of plaintext messages for `%4s' for transmission.\n",
+             esize,
+             GNUNET_i2s(&n->peer));
+#endif
   GNUNET_assert (GNUNET_OK ==
                  do_encrypt (n,
                              &em->plaintext_hash,
@@ -1580,7 +1758,6 @@ process_plaintext_neighbour_queue (struct Neighbour *n)
  */
 static void
 handle_client_send (void *cls,
-                    struct GNUNET_SERVER_Handle *server,
                     struct GNUNET_SERVER_Client *client,
                     const struct GNUNET_MessageHeader *message);
 
@@ -1603,16 +1780,20 @@ send_connect_continuation (void *cls, size_t size, void *buf)
 
   if (buf == NULL)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Asked to send message to disconnected peer `%4s' and connection failed.  Discarding message.\n",
                   GNUNET_i2s (&sm->peer));
+#endif
       GNUNET_free (sm);
       return 0;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Connection to peer `%4s' succeeded, retrying original send request\n",
+              "Connection to peer `%4s' succeeded, retrying original transmission request\n",
               GNUNET_i2s (&sm->peer));
-  handle_client_send (NULL, NULL, NULL, &sm->header);
+#endif
+  handle_client_send (NULL, NULL, &sm->header);
   GNUNET_free (sm);
   return 0;
 }
@@ -1623,7 +1804,6 @@ send_connect_continuation (void *cls, size_t size, void *buf)
  */
 static void
 handle_client_send (void *cls,
-                    struct GNUNET_SERVER_Handle *server,
                     struct GNUNET_SERVER_Client *client,
                     const struct GNUNET_MessageHeader *message)
 {
@@ -1631,13 +1811,15 @@ handle_client_send (void *cls,
   struct SendMessage *smc;
   const struct GNUNET_MessageHeader *mh;
   struct Neighbour *n;
-  struct MessageEntry *pred;
+  struct MessageEntry *prev;
   struct MessageEntry *pos;
-  struct MessageEntry *e;
+  struct MessageEntry *e; 
+  struct MessageEntry *min_prio_entry;
+  struct MessageEntry *min_prio_prev;
+  unsigned int min_prio;
+  unsigned int queue_size;
   uint16_t msize;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core service receives `%s' request.\n", "SEND");
   msize = ntohs (message->size);
   if (msize <
       sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
@@ -1660,33 +1842,91 @@ handle_client_send (void *cls,
   n = find_neighbour (&sm->peer);
   if (n == NULL)
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Not yet connected to `%4s', will try to establish connection\n",
-                  GNUNET_i2s (&sm->peer));
+                  "Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
+                 "SEND",
+                  GNUNET_i2s (&sm->peer),
+                 GNUNET_TIME_absolute_get_remaining
+                 (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
+#endif
       msize += sizeof (struct SendMessage);
       /* ask transport to connect to the peer */
-      /* FIXME: this code does not handle the
-         case where we get multiple SendMessages before
-         transport responds to this request;
-         => need to track pending requests! */
       smc = GNUNET_malloc (msize);
       memcpy (smc, sm, msize);
-      GNUNET_TRANSPORT_notify_transmit_ready (transport,
-                                              &sm->peer,
-                                              0,
-                                              GNUNET_TIME_absolute_get_remaining
-                                              (GNUNET_TIME_absolute_ntoh
-                                               (sm->deadline)),
-                                              &send_connect_continuation,
-                                              smc);
+      if (NULL ==
+         GNUNET_TRANSPORT_notify_transmit_ready (transport,
+                                                 &sm->peer,
+                                                 0, 0,
+                                                 GNUNET_TIME_absolute_get_remaining
+                                                 (GNUNET_TIME_absolute_ntoh
+                                                  (sm->deadline)),
+                                                 &send_connect_continuation,
+                                                 smc))
+       {
+         /* transport has already a request pending for this peer! */
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     "Dropped second message destined for `%4s' since connection is still down.\n",
+                     GNUNET_i2s(&sm->peer));
+#endif
+         GNUNET_free (smc);
+       }
       if (client != NULL)
         GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core queues %u bytes of plaintext data for transmission to `%4s'.\n",
-              msize, GNUNET_i2s (&sm->peer));
-  /* FIXME: consider bounding queue size */
+              "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
+             "SEND",
+              msize, 
+             GNUNET_i2s (&sm->peer));
+#endif
+  /* bound queue size */
+  discard_expired_messages (n);
+  min_prio = (unsigned int) -1;
+  queue_size = 0;
+  prev = NULL;
+  pos = n->messages;
+  while (pos != NULL) 
+    {
+      if (pos->priority < min_prio)
+       {
+         min_prio_entry = pos;
+         min_prio_prev = prev;
+         min_prio = pos->priority;
+       }
+      queue_size++;
+      prev = pos;
+      pos = pos->next;
+    }
+  if (queue_size >= MAX_PEER_QUEUE_SIZE)
+    {
+      /* queue full */
+      if (ntohl(sm->priority) <= min_prio)
+       {
+         /* discard new entry */
+#if DEBUG_CORE
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Queue full, discarding new request\n");
+#endif
+         if (client != NULL)
+           GNUNET_SERVER_receive_done (client, GNUNET_OK);
+         return;
+       }
+      /* discard "min_prio_entry" */
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Queue full, discarding existing older request\n");
+#endif
+      if (min_prio_prev == NULL)
+       n->messages = min_prio_entry->next;
+      else
+       min_prio_prev->next = min_prio_entry->next;      
+      GNUNET_free (min_prio_entry);    
+    }
+  
   e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
   e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
   e->priority = ntohl (sm->priority);
@@ -1694,17 +1934,17 @@ handle_client_send (void *cls,
   memcpy (&e[1], mh, msize);
 
   /* insert, keep list sorted by deadline */
-  pred = NULL;
+  prev = NULL;
   pos = n->messages;
   while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
     {
-      pred = pos;
+      prev = pos;
       pos = pos->next;
     }
-  if (pred == NULL)
+  if (prev == NULL)
     n->messages = e;
   else
-    pred->next = e;
+    prev->next = e;
   e->next = pos;
 
   /* consider scheduling now */
@@ -1756,9 +1996,12 @@ process_hello_retry_send_key (void *cls,
     return;
   if (n->public_key != NULL)
     return;
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received new HELLO for `%4s', initiating key exchange.\n",
+              "Received new `%s' message for `%4s', initiating key exchange.\n",
+             "HELLO",
               GNUNET_i2s (peer));
+#endif
   n->public_key =
     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
@@ -1801,15 +2044,19 @@ send_key (struct Neighbour *n)
   struct PingMessage pp;
   struct PingMessage *pm;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Asked to perform key exchange with `%4s'.\n",
               GNUNET_i2s (&n->peer));
+#endif
   if (n->public_key == NULL)
     {
       /* lookup n's public key, then try again */
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Lacking public key for `%4s', trying to obtain one.\n",
                   GNUNET_i2s (&n->peer));
+#endif
       GNUNET_PEERINFO_for_all (cfg,
                                sched,
                                &n->peer,
@@ -1865,6 +2112,7 @@ send_key (struct Neighbour *n)
   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
   pp.challenge = htonl (n->ping_challenge);
   pp.target = n->peer;
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Encrypting `%s' and `%s' messages for `%4s'.\n",
               "SET_KEY", "PING", GNUNET_i2s (&n->peer));
@@ -1872,6 +2120,7 @@ send_key (struct Neighbour *n)
               "Sending `%s' to `%4s' with challenge %u encrypted using key %u\n",
               "PING",
               GNUNET_i2s (&n->peer), n->ping_challenge, n->encrypt_key.crc32);
+#endif
   do_encrypt (n,
               &n->peer.hashPubKey,
               &pp.challenge,
@@ -1896,8 +2145,6 @@ send_key (struct Neighbour *n)
       break;
     }
   /* trigger queue processing */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Triggering processing of encrypted message queue.\n");
   process_encrypted_neighbour_queue (n);
   if (n->status != PEER_STATE_KEY_CONFIRMED)
     n->retry_set_key_task
@@ -1945,9 +2192,6 @@ process_hello_retry_handle_set_key (void *cls,
       GNUNET_free (sm);
       return;
     }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Looking for peer `%4s' to handle `%s' message.\n",
-              GNUNET_i2s (peer), "SET_KEY");
   n = find_neighbour (peer);
   if (n == NULL)
     {
@@ -1960,13 +2204,16 @@ process_hello_retry_handle_set_key (void *cls,
     GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
   if (GNUNET_OK != GNUNET_HELLO_get_key (hello, n->public_key))
     {
+      GNUNET_break_op (0);
       GNUNET_free (n->public_key);
       n->public_key = NULL;
       return;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received `%s' for `%4s', continuing processing of `%s' message.\n",
               "HELLO", GNUNET_i2s (peer), "SET_KEY");
+#endif
   handle_set_key (n, sm);
 }
 
@@ -1985,9 +2232,11 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
   struct PingMessage *tp;
   struct MessageEntry *me;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "PING", GNUNET_i2s (&n->peer));
+#endif
   if (GNUNET_OK !=
       do_decrypt (n,
                   &my_identity.hashPubKey,
@@ -1996,6 +2245,7 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
                   sizeof (struct PingMessage) -
                   sizeof (struct GNUNET_MessageHeader)))
     return;
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted `%s' to `%4s' with challenge %u decrypted using key %u\n",
               "PING",
@@ -2004,6 +2254,7 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Target of `%s' request is `%4s'.\n",
               "PING", GNUNET_i2s (&t.target));
+#endif
   if (0 != memcmp (&t.target,
                    &my_identity, sizeof (struct GNUNET_PeerIdentity)))
     {
@@ -2031,9 +2282,11 @@ handle_ping (struct Neighbour *n, const struct PingMessage *m)
               &tp->challenge,
               sizeof (struct PingMessage) -
               sizeof (struct GNUNET_MessageHeader));
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Encrypting `%s' with challenge %u using key %u\n", "PONG",
               ntohl (t.challenge), n->encrypt_key.crc32);
+#endif
   /* trigger queue processing */
   process_encrypted_neighbour_queue (n);
 }
@@ -2055,13 +2308,17 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
   struct PingMessage *ping;
   enum PeerStateMachine sender_status;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "SET_KEY", GNUNET_i2s (&n->peer));
+#endif
   if (n->public_key == NULL)
     {
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Lacking public key for peer, trying to obtain one.\n");
+#endif
       m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
       memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
       /* lookup n's public key, then try again */
@@ -2097,7 +2354,9 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       GNUNET_break_op (0);
       return;
     }
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Decrypting key material.\n");
+#endif
   if ((GNUNET_CRYPTO_rsa_decrypt (my_private_key,
                                   &m->encrypted_key,
                                   &k,
@@ -2123,8 +2382,10 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
     {
     case PEER_STATE_DOWN:
       n->status = PEER_STATE_KEY_RECEIVED;
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Responding to `%s' with my own key.\n", "SET_KEY");
+#endif
       send_key (n);
       break;
     case PEER_STATE_KEY_SENT:
@@ -2133,9 +2394,11 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
           (sender_status != PEER_STATE_KEY_CONFIRMED))
         {
+#if DEBUG_CORE
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Responding to `%s' with my own key (other peer has status %u).\n",
                       "SET_KEY", sender_status);
+#endif
           send_key (n);
         }
       break;
@@ -2143,9 +2406,11 @@ handle_set_key (struct Neighbour *n, const struct SetKeyMessage *m)
       if ((sender_status != PEER_STATE_KEY_RECEIVED) &&
           (sender_status != PEER_STATE_KEY_CONFIRMED))
         {
+#if DEBUG_CORE
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Responding to `%s' with my own key (other peer has status %u), I was already fully up.\n",
                       "SET_KEY", sender_status);
+#endif
           send_key (n);
         }
       break;
@@ -2175,9 +2440,11 @@ handle_pong (struct Neighbour *n, const struct PingMessage *m)
 {
   struct PingMessage t;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "PONG", GNUNET_i2s (&n->peer));
+#endif
   if (GNUNET_OK !=
       do_decrypt (n,
                   &n->peer.hashPubKey,
@@ -2186,23 +2453,27 @@ handle_pong (struct Neighbour *n, const struct PingMessage *m)
                   sizeof (struct PingMessage) -
                   sizeof (struct GNUNET_MessageHeader)))
     return;
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted `%s' from `%4s' with challenge %u using key %u\n",
               "PONG",
               GNUNET_i2s (&t.target),
               ntohl (t.challenge), n->decrypt_key.crc32);
+#endif
   if ((0 != memcmp (&t.target,
                     &n->peer,
                     sizeof (struct GNUNET_PeerIdentity))) ||
       (n->ping_challenge != ntohl (t.challenge)))
     {
       /* PONG malformed */
+#if DEBUG_CORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Received malfromed `%s' wanted sender `%4s' with challenge %u\n",
                   "PONG", GNUNET_i2s (&n->peer), n->ping_challenge);
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Received malfromed `%s' received from `%4s' with challenge %u\n",
                   "PONG", GNUNET_i2s (&t.target), ntohl (t.challenge));
+#endif
       GNUNET_break_op (0);
       return;
     }
@@ -2249,9 +2520,12 @@ send_p2p_message_to_client (struct Neighbour *sender,
   char buf[msize + sizeof (struct NotifyTrafficMessage)];
   struct NotifyTrafficMessage *ntm;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core service passes P2P message of type %u to client.\n",
+              "Core service passes message from `%4s' of type %u to client.\n",
+             GNUNET_i2s(&sender->peer),
               ntohs (((const struct GNUNET_MessageHeader *) m)->type));
+#endif
   ntm = (struct NotifyTrafficMessage *) buf;
   ntm->header.size = htons (msize + sizeof (struct NotifyTrafficMessage));
   ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
@@ -2278,8 +2552,6 @@ deliver_message (struct Neighbour *sender,
   unsigned int tpos;
   int deliver_full;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Passing decrypted P2P message to interested clients.\n");
   type = ntohs (m->type);
   cpos = clients;
   while (cpos != NULL)
@@ -2342,9 +2614,6 @@ deliver_messages (struct Neighbour *sender,
   uint16_t msize;
   int need_align;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Delivering %u bytes of plaintext to interested clients.\n",
-              buffer_size);
   while (offset + sizeof (struct GNUNET_MessageHeader) <= buffer_size)
     {
       if (0 != offset % sizeof (uint16_t))
@@ -2398,9 +2667,11 @@ handle_encrypted_message (struct Neighbour *n,
   uint32_t snum;
   struct GNUNET_TIME_Absolute t;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives `%s' request from `%4s'.\n",
               "ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
+#endif  
   /* decrypt */
   if (GNUNET_OK !=
       do_decrypt (n,
@@ -2424,7 +2695,7 @@ handle_encrypted_message (struct Neighbour *n,
   snum = ntohl (pt->sequence_number);
   if (n->last_sequence_number_received == snum)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Received duplicate message, ignoring.\n");
       /* duplicate, ignore */
       return;
@@ -2432,7 +2703,7 @@ handle_encrypted_message (struct Neighbour *n,
   if ((n->last_sequence_number_received > snum) &&
       (n->last_sequence_number_received - snum > 32))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Received ancient out of sequence message, ignoring.\n");
       /* ancient out of sequence, ignore */
       return;
@@ -2443,7 +2714,7 @@ handle_encrypted_message (struct Neighbour *n,
         1 << (n->last_sequence_number_received - snum - 1);
       if ((n->last_packets_bitmap & rotbit) != 0)
         {
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                       "Received duplicate message, ignoring.\n");
           /* duplicate, ignore */
           return;
@@ -2468,6 +2739,10 @@ handle_encrypted_message (struct Neighbour *n,
     }
 
   /* process decrypted message(s) */
+  update_window (GNUNET_YES,
+                &n->available_send_window,
+                &n->last_asw_update,
+                n->bpm_out);
   n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
   n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
                            n->bpm_out_internal_limit);
@@ -2498,9 +2773,11 @@ handle_transport_receive (void *cls,
   uint16_t type;
   uint16_t size;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received message of type %u from `%4s', demultiplexing.\n",
               ntohs (message->type), GNUNET_i2s (peer));
+#endif
   n = find_neighbour (peer);
   if (n == NULL)
     {
@@ -2545,9 +2822,11 @@ handle_transport_receive (void *cls,
       if ((n->status != PEER_STATE_KEY_RECEIVED) &&
           (n->status != PEER_STATE_KEY_CONFIRMED))
         {
+#if DEBUG_CORE
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
                       "PING", GNUNET_i2s (&n->peer));
+#endif
           GNUNET_free_non_null (n->pending_ping);
           n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
           memcpy (n->pending_ping, message, sizeof (struct PingMessage));
@@ -2586,6 +2865,87 @@ handle_transport_receive (void *cls,
 }
 
 
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
+ */
+static void
+schedule_quota_update (struct Neighbour *n)
+{
+  GNUNET_assert (n->quota_update_task ==
+                GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+  n->quota_update_task
+    = GNUNET_SCHEDULER_add_delayed (sched,
+                                   GNUNET_NO,
+                                   GNUNET_SCHEDULER_PRIORITY_IDLE,
+                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+                                   QUOTA_UPDATE_FREQUENCY,
+                                   &neighbour_quota_update,
+                                   n);
+}
+
+
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ * 
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+                       const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+  uint32_t q_in;
+  double pref_rel;
+  double share;
+  unsigned long long distributable;
+  
+  n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+  /* calculate relative preference among all neighbours;
+     divides by a bit more to avoid division by zero AND to
+     account for possibility of new neighbours joining any time 
+     AND to convert to double... */
+  pref_rel = n->current_preference / (1.0 + preference_sum);
+  share = 0;
+  distributable = 0;
+  if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
+    distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+  share = distributable * pref_rel;
+  q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+  /* check if we want to disconnect for good due to inactivity */
+  if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
+       (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
+    q_in = 0; /* force disconnect */
+  if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
+       (n->bpm_in - MIN_BPM_CHANGE > q_in) ) 
+    {
+      n->bpm_in = q_in;
+      GNUNET_TRANSPORT_set_quota (transport,
+                                 &n->peer,
+                                 n->bpm_in, 
+                                 n->bpm_out,
+                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                 NULL, NULL);
+    }
+  schedule_quota_update (n);
+}
+
+
 /**
  * Function called by transport to notify us that
  * a peer connected to us (on the network level).
@@ -2603,8 +2963,6 @@ handle_transport_notify_connect (void *cls,
   struct GNUNET_TIME_Absolute now;
   struct ConnectNotifyMessage cnm;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received connection from `%4s'.\n", GNUNET_i2s (peer));
   n = find_neighbour (peer);
   if (n != NULL)
     {
@@ -2616,6 +2974,7 @@ handle_transport_notify_connect (void *cls,
   n = GNUNET_malloc (sizeof (struct Neighbour));
   n->next = neighbours;
   neighbours = n;
+  neighbour_count++;
   n->peer = *peer;
   n->last_latency = latency;
   GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
@@ -2623,20 +2982,22 @@ handle_transport_notify_connect (void *cls,
   n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
   n->last_asw_update = now;
   n->last_arw_update = now;
-  n->bpm_in = DEFAULT_BPM_IN_OUT;
-  n->bpm_out = DEFAULT_BPM_IN_OUT;
+  n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+  n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
   n->bpm_out_internal_limit = (uint32_t) - 1;
-  n->bpm_out_external_limit = DEFAULT_BPM_IN_OUT;
+  n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
   n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                                 (uint32_t) - 1);
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Created entry for new neighbour `%4s'.\n",
+              "Received connection from `%4s'.\n",
               GNUNET_i2s (&n->peer));
+#endif
+  schedule_quota_update (n);
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  cnm.bpm_available = htonl (DEFAULT_BPM_IN_OUT);
+  cnm.reserved = htonl (0);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (now);
   send_to_all_clients (&cnm.header, GNUNET_YES);
 }
 
@@ -2644,6 +3005,7 @@ handle_transport_notify_connect (void *cls,
 /**
  * Free the given entry for the neighbour (it has
  * already been removed from the list at this point).
+ *
  * @param n neighbour to free
  */
 static void
@@ -2667,6 +3029,8 @@ free_neighbour (struct Neighbour *n)
     GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
   if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
     GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+  if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+    GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
   GNUNET_free_non_null (n->public_key);
   GNUNET_free_non_null (n->pending_ping);
   GNUNET_free (n);
@@ -2688,8 +3052,10 @@ handle_transport_notify_disconnect (void *cls,
   struct Neighbour *n;
   struct Neighbour *p;
 
+#if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Peer `%4s' disconnected from us.\n", GNUNET_i2s (peer));
+#endif
   p = NULL;
   n = neighbours;
   while ((n != NULL) &&
@@ -2707,11 +3073,12 @@ handle_transport_notify_disconnect (void *cls,
     neighbours = n->next;
   else
     p->next = n->next;
+  GNUNET_assert (neighbour_count > 0);
+  neighbour_count--;
   cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
   cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
-  cnm.bpm_available = htonl (0);
+  cnm.reserved = htonl (0);
   cnm.peer = *peer;
-  cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
   send_to_all_clients (&cnm.header, GNUNET_YES);
   free_neighbour (n);
 }
@@ -2724,10 +3091,25 @@ handle_transport_notify_disconnect (void *cls,
 static void
 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service shutting down.\n");
+  struct Neighbour *n;
+  struct Client *c;
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Core service shutting down.\n");
+#endif
   GNUNET_assert (transport != NULL);
   GNUNET_TRANSPORT_disconnect (transport);
   transport = NULL;
+  while (NULL != (n = neighbours))
+    {
+      neighbours = n->next;
+      GNUNET_assert (neighbour_count > 0);
+      neighbour_count--;
+      free_neighbour (n);
+    }
+  while (NULL != (c = clients))
+    handle_client_disconnect (NULL, c->client_handle);
 }
 
 
@@ -2755,12 +3137,17 @@ run (void *cls,
   cfg = c;
   /* parse configuration */
   if (
-#if 0
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
-                                               "XX",
-                                               &qin)) ||
+                                               "TOTAL_QUOTA_IN",
+                                               &bandwidth_target_in)) ||
+       (GNUNET_OK !=
+        GNUNET_CONFIGURATION_get_value_number (c,
+                                               "CORE",
+                                               "TOTAL_QUOTA_OUT",
+                                               &bandwidth_target_out)) ||
+#if 0
        (GNUNET_OK !=
         GNUNET_CONFIGURATION_get_value_number (c,
                                                "CORE",
@@ -2824,19 +3211,8 @@ run (void *cls,
 static void
 cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct Neighbour *n;
-
   if (my_private_key != NULL)
     GNUNET_CRYPTO_rsa_key_free (my_private_key);
-  while (NULL != (n = neighbours))
-    {
-      neighbours = n->next;
-      free_neighbour (n);
-    }
-  /*
-     FIXME:
-     - free clients
-   */
 }