stats
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
index 3fbbc8804f87cb8165f0f7c41eed3c44d1b2bb2d..6899ef5fbc00aff0579ecbfe9b9e7dcd0d702af0 100644 (file)
@@ -79,7 +79,7 @@
  * How often must a peer violate bandwidth quotas before we start
  * to simply drop its messages?
  */
-#define QUOTA_VIOLATION_DROP_THRESHOLD 100
+#define QUOTA_VIOLATION_DROP_THRESHOLD 10
 
 /**
  * How long until a HELLO verification attempt should time out?
  */
 #define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
 
+/**
+ * How often will we re-validate for latency information
+ */
+#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
 /**
  * Priority to use for PONG messages.
  */
@@ -144,6 +149,12 @@ struct ForeignAddressList
    */
   struct GNUNET_TIME_Absolute expires;
 
+  /**
+   * Task used to re-validate addresses, updates latencies and
+   * verifies liveness.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier revalidate_task;
+
   /**
    * Length of addr.
    */
@@ -445,9 +456,9 @@ struct NeighbourList
   struct GNUNET_TIME_Absolute peer_timeout;
 
   /**
-   * At what time did we reset last_received last?
+   * Tracker for inbound bandwidth.
    */
-  struct GNUNET_TIME_Absolute last_quota_update;
+  struct GNUNET_BANDWIDTH_Tracker in_tracker;
 
   /**
    * The latency we have seen for this particular address for
@@ -462,22 +473,6 @@ struct NeighbourList
    */
   struct GNUNET_TIME_Relative latency;
 
-  /**
-   * DV distance to this peer (1 if no DV is used). 
-   */
-  uint32_t distance;
-
-  /**
-   * How many bytes have we received since the "last_quota_update"
-   * timestamp?
-   */
-  uint64_t last_received;
-
-  /**
-   * Global quota for inbound traffic for the neighbour in bytes/ms.
-   */
-  uint32_t quota_in;
-
   /**
    * How often has the other peer (recently) violated the
    * inbound traffic limit?  Incremented by 10 per violation,
@@ -486,6 +481,11 @@ struct NeighbourList
    */
   unsigned int quota_violation_count;
 
+  /**
+   * DV distance to this peer (1 if no DV is used). 
+   */
+  uint32_t distance;
+
   /**
    * Have we seen an PONG from this neighbour in the past (and
    * not had a disconnect since)?
@@ -496,9 +496,7 @@ struct NeighbourList
 
 /**
  * Message used to ask a peer to validate receipt (to check an address
- * from a HELLO).  Followed by the address used.  Note that the
- * recipients response does not affirm that he has this address,
- * only that he got the challenge message.
+ * from a HELLO).  
  */
 struct TransportPingMessage
 {
@@ -722,6 +720,32 @@ struct CheckHelloValidatedContext
 
 };
 
+/**
+ * Struct for keeping information about addresses to validate
+ * so that we can re-use for sending around ping's and receiving
+ * pongs periodically to keep connections alive and also better
+ * estimate latency of connections.
+ *
+ */
+struct PeriodicValidationContext
+{
+
+  /**
+   * The address we are keeping alive
+   */
+  struct ForeignAddressList *foreign_address;
+
+  /**
+   * The name of the transport
+   */
+  char *transport;
+
+  /**
+   * Public Key of the peer to re-validate
+   */
+  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+};
 
 /**
  * Our HELLO message.
@@ -794,13 +818,17 @@ static struct CheckHelloValidatedContext *chvc_head;
  */
 static struct CheckHelloValidatedContext *chvc_tail;
 
-
 /**
  * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses
  * of the given peer that we are currently validating).
  */
 static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
 
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
 
 /**
  * The peer specified by the given neighbour has timed-out or a plugin
@@ -831,7 +859,9 @@ static void try_transmission_to_peer (struct NeighbourList *neighbour);
  * if sender_address is not specified (NULL) then return the
  * first matching entry.  If sender_address is specified, then
  * make sure that the address and address_len also matches.
- *
+ * 
+ * FIXME: This description does not fit the function.
+ *  
  * @return NULL if not found.
  */
 static struct NeighbourList *
@@ -861,49 +891,6 @@ find_transport (const char *short_name)
 }
 
 
-/**
- * Update the quota values for the given neighbour now.
- */
-static void
-update_quota (struct NeighbourList *n)
-{
-  struct GNUNET_TIME_Relative delta;
-  uint64_t allowed;
-  uint64_t remaining;
-
-  delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  if (delta.value < MIN_QUOTA_REFRESH_TIME)
-    return;                     /* not enough time passed for doing quota update */
-  allowed = delta.value * n->quota_in;
-
-  if (n->last_received < allowed)
-    {
-      remaining = allowed - n->last_received;
-      if (n->quota_in > 0)
-        remaining /= n->quota_in;
-      else
-        remaining = 0;
-      if (remaining > MAX_BANDWIDTH_CARRY)
-        remaining = MAX_BANDWIDTH_CARRY;
-      n->last_received = 0;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
-      n->last_quota_update.value -= remaining;
-      if (n->quota_violation_count > 0)
-        n->quota_violation_count--;
-    }
-  else
-    {
-      n->last_received -= allowed;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
-      if (n->last_received > allowed)
-        {
-          /* more than twice the allowed rate! */
-          n->quota_violation_count += 10;
-        }
-    }
-}
-
-
 /**
  * Function called to notify a client about the socket being ready to
  * queue more data.  "buf" will be NULL and "size" zero if the socket
@@ -932,6 +919,10 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
       /* fatal error with client, free message queue! */
       while (NULL != (q = client->message_queue_head))
         {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bytes discarded (could not transmit to client)"),
+                                   ntohs (((const struct GNUNET_MessageHeader*)&q[1])->size),
+                                   GNUNET_NO);      
          GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
                                       client->message_queue_tail,
                                       q);
@@ -1068,6 +1059,20 @@ transmit_send_continuation (void *cls,
   struct MessageQueue *mq = cls;
   struct NeighbourList *n;
 
+  if (result == GNUNET_OK)
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes successfully transmitted by plugins"),
+                               mq->message_buf_size,
+                               GNUNET_NO);      
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes with transmission failure by plugins"),
+                               mq->message_buf_size,
+                               GNUNET_NO);      
+    }  
   n = find_neighbour(&mq->neighbour_id);
   GNUNET_assert (n != NULL);
   if (mq->specific_address != NULL)
@@ -1090,8 +1095,6 @@ transmit_send_continuation (void *cls,
     transmit_send_ok (mq->client, n, result);
   GNUNET_free (mq);
   try_transmission_to_peer (n);
-  if (result != GNUNET_OK)
-    disconnect_neighbour (n, GNUNET_YES);    
 }
 
 
@@ -1184,14 +1187,12 @@ retry_transmission_task (void *cls,
 static void
 try_transmission_to_peer (struct NeighbourList *neighbour)
 {
-  struct GNUNET_TIME_Relative min_latency;
   struct ReadyList *rl;
   struct MessageQueue *mq;
   struct GNUNET_TIME_Relative timeout;
 
   if (neighbour->messages_head == NULL)
     return;                     /* nothing to do */
-  min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
   rl = NULL;
   mq = neighbour->messages_head;
   /* FIXME: support bi-directional use of TCP */
@@ -1208,6 +1209,14 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
                      mq->message_buf_size,
                      GNUNET_i2s (&mq->neighbour_id));
 #endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bytes in message queue for other peers"),
+                                   -mq->message_buf_size,
+                                   GNUNET_NO);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bytes discarded (no destination address available)"),
+                                   mq->message_buf_size,
+                                   GNUNET_NO);      
          if (mq->client != NULL)
            transmit_send_ok (mq->client, neighbour, GNUNET_NO);
          GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
@@ -1230,6 +1239,8 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
                  GNUNET_i2s (&mq->neighbour_id),
                  timeout.value);
 #endif
+      /* FIXME: might want to trigger peerinfo lookup here
+        (unless that's already pending...) */
       return;    
     }
   GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
@@ -1250,6 +1261,14 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
                          mq->specific_address->addrlen),
              rl->plugin->short_name);
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes in message queue for other peers"),
+                           -mq->message_buf_size,
+                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes transmitted to other peers"),
+                           mq->message_buf_size,
+                           GNUNET_NO);
   rl->plugin->api->send (rl->plugin->api->cls,
                         &mq->neighbour_id,
                         mq->message_buf,
@@ -1297,7 +1316,7 @@ transmit_to_peer (struct TransportClient *client,
           if (mq->client == client)
             {
               /* client transmitted to same peer twice
-                 before getting SendOk! */
+                 before getting SEND_OK! */
               GNUNET_break (0);
               return;
             }
@@ -1305,6 +1324,10 @@ transmit_to_peer (struct TransportClient *client,
         }
     }
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# bytes in message queue for other peers"),
+                           message_buf_size,
+                           GNUNET_NO);
   mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
   mq->specific_address = peer_address;
   mq->client = client;
@@ -1387,6 +1410,10 @@ refresh_hello ()
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
               "Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello));
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# refreshed my HELLO"),
+                           1,
+                           GNUNET_NO);
   cpos = clients;
   while (cpos != NULL)
     {
@@ -1408,6 +1435,10 @@ refresh_hello ()
                   "Transmitting updated `%s' to neighbour `%4s'\n",
                   "HELLO", GNUNET_i2s (&npos->id));
 #endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# transmitted my HELLO to other peers"),
+                               1,
+                               GNUNET_NO);
       transmit_to_peer (NULL, NULL, 0,
                        HELLO_ADDRESS_EXPIRATION,
                         (const char *) our_hello, 
@@ -1570,6 +1601,10 @@ notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
              "Notifying clients about connection from `%s'\n",
              GNUNET_i2s (peer));
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# peers connected"),
+                           1,
+                           GNUNET_NO);
   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
   cim.distance = htonl (distance);
@@ -1598,6 +1633,10 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
              "Notifying clients about lost connection to `%s'\n",
              GNUNET_i2s (peer));
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# peers connected"),
+                           -1,
+                           GNUNET_NO);
   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
   dim.reserved = htonl (0);
@@ -1745,13 +1784,16 @@ add_validated_address (void *cls,
 }
 
 
+static void send_periodic_ping(void *cls,
+                              const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
- * Iterator over hash map entries.  Checks if the given
- * validation entry is for the same challenge as what
- * is given in the PONG.
+ * Iterator over hash map entries.  Checks if the given validation
+ * entry is for the same challenge as what is given in the PONG.
  *
  * @param cls the 'struct TransportPongMessage*'
- * @param key peer identity 
+ * @param key peer identity
  * @param value value in the hash map ('struct ValidationEntry')
  * @return GNUNET_YES if we should continue to
  *         iterate (mismatch), GNUNET_NO if not (entry matched)
@@ -1769,10 +1811,11 @@ check_pending_validation (void *cls,
   struct GNUNET_PeerIdentity target;
   struct NeighbourList *n;
   struct ForeignAddressList *fal;
+  struct PeriodicValidationContext *periodic_validation_context;
 
   if (ve->challenge != challenge)
     return GNUNET_YES;
-  
+
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n",
@@ -1781,6 +1824,10 @@ check_pending_validation (void *cls,
                          ve->addrlen),
              ve->transport_name);
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# address validation successes"),
+                           1,
+                           GNUNET_NO);
   /* create the updated HELLO */
   GNUNET_CRYPTO_hash (&ve->publicKey,
                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
@@ -1791,18 +1838,35 @@ check_pending_validation (void *cls,
                               &add_validated_address,
                               &avac);
   GNUNET_PEERINFO_add_peer (cfg, sched,
-                           &target, 
+                           &target,
                            hello);
   GNUNET_free (hello);
   n = find_neighbour (&target);
   if (n != NULL)
     {
-      fal = add_peer_address (n, ve->transport_name, 
+      fal = add_peer_address (n,
+                             ve->transport_name,
                              ve->addr,
                              ve->addrlen);
+      GNUNET_assert (fal != NULL);
       fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
       fal->validated = GNUNET_YES;
       fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+      periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext));
+      periodic_validation_context->foreign_address = fal;
+      periodic_validation_context->transport = strdup(ve->transport_name);
+      memcpy(&periodic_validation_context->publicKey, 
+            &ve->publicKey, 
+            sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+      /* FIXME: this causes all of the revalidation PINGs for the same HELLO
+        to be transmitted in bulk, which is not nice; also,
+        triggering these HERE means that revalidations do NOT happen AT ALL
+        for HELLOs a previous instance of this process validated (since
+        there is no "initial" validation PING => no revalidation => BUG! */
+      fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched, 
+                                                         TRANSPORT_DEFAULT_REVALIDATION, 
+                                                         &send_periodic_ping, 
+                                                         periodic_validation_context);
       if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
        n->latency = fal->latency;
       else
@@ -1817,7 +1881,7 @@ check_pending_validation (void *cls,
        {
          GNUNET_SCHEDULER_cancel (sched,
                                   n->retry_task);
-         n->retry_task = GNUNET_SCHEDULER_NO_TASK;     
+         n->retry_task = GNUNET_SCHEDULER_NO_TASK;
          try_transmission_to_peer (n);
        }
     }
@@ -1844,11 +1908,11 @@ check_pending_validation (void *cls,
  * (otherwise we may be seeing a MiM attack).
  *
  * @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
  * @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- *         by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ *         by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
  */
 static void
 handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
@@ -1863,7 +1927,11 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
              "Receiving `%s' message from `%4s'.\n", "PONG",
              GNUNET_i2s (peer));
 #endif
-  if (GNUNET_SYSERR != 
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# PONG messages received"),
+                           1,
+                           GNUNET_NO);
+  if (GNUNET_SYSERR !=
       GNUNET_CONTAINER_multihashmap_get_multiple (validation_map,
                                                  &peer->hashPubKey,
                                                  &check_pending_validation,
@@ -1884,15 +1952,15 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
 #endif
       return;
     }
-  
+
 #if 0
   /* FIXME: add given address to potential pool of our addresses
      (for voting) */
   GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
              _("Another peer saw us using the address `%s' via `%s'.\n"),
              GNUNET_a2s ((const struct sockaddr *) &pong[1],
-                         ntohs(pong->addrlen)), 
-             va->transport_name);  
+                         ntohs(pong->addrlen)),
+             va->transport_name);
 #endif
 }
 
@@ -1907,6 +1975,10 @@ neighbour_timeout_task (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
               "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
 #endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# disconnects due to timeout"),
+                           1,
+                           GNUNET_NO);
   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
   disconnect_neighbour (n, GNUNET_NO);
 }
@@ -1932,11 +2004,12 @@ setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
   n->next = neighbours;
   neighbours = n;
   n->id = *peer;
-  n->last_quota_update = GNUNET_TIME_absolute_get ();
   n->peer_timeout =
     GNUNET_TIME_relative_to_absolute
     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
+  GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+                                GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+                                MAX_BANDWIDTH_CARRY_S);
   tp = plugins;
   while (tp != NULL)
     {
@@ -2034,6 +2107,10 @@ timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *
   struct ValidationEntry *va = cls;
   struct GNUNET_PeerIdentity pid;
 
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# address validation timeouts"),
+                           1,
+                           GNUNET_NO);
   GNUNET_CRYPTO_hash (&va->publicKey,
                      sizeof (struct
                              GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
@@ -2046,6 +2123,163 @@ timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *
 }
 
 
+/**
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct PeriodicValidationContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
+ */
+static int
+rerun_validation (void *cls,
+                const char *tname,
+                struct GNUNET_TIME_Absolute expiration,
+                const void *addr, size_t addrlen)
+{
+  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey = cls;
+  struct GNUNET_PeerIdentity id;
+  struct TransportPlugin *tp;
+  struct ValidationEntry *va;
+  struct NeighbourList *neighbour;
+  struct ForeignAddressList *peer_address;
+  struct TransportPingMessage ping;
+  /*struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;*/
+  struct CheckAddressExistsClosure caec;
+  char * message_buf;
+  uint16_t hello_size;
+  size_t tsize;
+
+  tp = find_transport (tname);
+  if (tp == NULL)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO |
+                  GNUNET_ERROR_TYPE_BULK,
+                  _
+                  ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
+                  tname);
+      return GNUNET_OK;
+    }
+
+  GNUNET_CRYPTO_hash (publicKey,
+                      sizeof (struct
+                              GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+                      &id.hashPubKey);
+  caec.addr = addr;
+  caec.addrlen = addrlen;
+  caec.tname = tname;
+  caec.exists = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+                                         &check_address_exists,
+                                         &caec);
+  if (caec.exists == GNUNET_YES)
+    {
+      /* During validation attempts we will likely trigger the other
+         peer trying to validate our address which in turn will cause
+         it to send us its HELLO, so we expect to hit this case rather
+         frequently.  Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Some validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+                  GNUNET_a2s (addr, addrlen),
+                  tname,
+                  GNUNET_i2s (&id));
+#endif
+      return GNUNET_OK;
+    }
+  va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
+  va->transport_name = GNUNET_strdup (tname);
+  va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                            (unsigned int) -1);
+  va->send_time = GNUNET_TIME_absolute_get();
+  va->addr = (const void*) &va[1];
+  memcpy (&va[1], addr, addrlen);
+  va->addrlen = addrlen;
+  memcpy(&va->publicKey, publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+  va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                   HELLO_VERIFICATION_TIMEOUT,
+                                                   &timeout_hello_validation,
+                                                   va);
+  GNUNET_CONTAINER_multihashmap_put (validation_map,
+                                     &id.hashPubKey,
+                                     va,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  neighbour = find_neighbour(&id);
+  if (neighbour == NULL)
+    neighbour = setup_new_neighbour(&id);
+  peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+  GNUNET_assert(peer_address != NULL);
+  hello_size = GNUNET_HELLO_size(our_hello);
+  tsize = sizeof(struct TransportPingMessage) + hello_size;
+  message_buf = GNUNET_malloc(tsize);
+  ping.challenge = htonl(va->challenge);
+  ping.header.size = htons(sizeof(struct TransportPingMessage));
+  ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+  memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
+  memcpy(message_buf, our_hello, hello_size);
+  memcpy(&message_buf[hello_size],
+         &ping,
+         sizeof(struct TransportPingMessage));
+#if DEBUG_TRANSPORT_REVALIDATION
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Performing re-validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+              GNUNET_a2s (addr, addrlen),
+              tname,
+              GNUNET_i2s (&id),
+              "HELLO", hello_size,
+              "PING", sizeof (struct TransportPingMessage));
+#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# PING messages sent for re-validation"),
+                           1,
+                           GNUNET_NO);
+  transmit_to_peer (NULL, peer_address,
+                    GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+                    HELLO_VERIFICATION_TIMEOUT,
+                    message_buf, tsize,
+                    GNUNET_YES, neighbour);
+  GNUNET_free(message_buf);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Send periodic ping messages to a give foreign address.
+ *
+ * cls closure, can be safely cast to ForeignAddressList
+ * tc task context
+ *
+ * FIXME: Since a _billion_ pongs are sent for every ping,
+ * maybe this should be a special message type or something
+ * that gets discarded on the other side instead of initiating
+ * a flood.
+ */
+static void 
+send_periodic_ping (void *cls, 
+                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PeriodicValidationContext *periodic_validation_context = cls;
+
+  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+    {
+      GNUNET_free(periodic_validation_context->transport);
+      GNUNET_free(periodic_validation_context);
+      return; /* We have been shutdown, don't do anything! */
+    }
+  rerun_validation(&periodic_validation_context->publicKey,
+                  periodic_validation_context->transport, 
+                  periodic_validation_context->foreign_address->expires,
+                  periodic_validation_context->foreign_address->addr, 
+                  periodic_validation_context->foreign_address->addrlen);
+  GNUNET_free(periodic_validation_context->transport);
+  GNUNET_free(periodic_validation_context);
+}
+
+
 /**
  * Check if the given address is already being validated; if not,
  * append the given address to the list of entries that are being be
@@ -2077,6 +2311,10 @@ run_validation (void *cls,
   uint16_t hello_size;
   size_t tsize;
 
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# peer addresses scheduled for validation"),
+                           1,
+                           GNUNET_NO);      
   tp = find_transport (tname);
   if (tp == NULL)
     {
@@ -2085,6 +2323,10 @@ run_validation (void *cls,
                   _
                   ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
                   tname);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# peer addresses not validated (no applicable transport plugin available)"),
+                               1,
+                               GNUNET_NO);      
       return GNUNET_OK;
     }
   GNUNET_HELLO_get_key (chvc->hello, &pk);
@@ -2108,12 +2350,16 @@ run_validation (void *cls,
 #if DEBUG_TRANSPORT > 1
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
-                 GNUNET_a2s (addr, addrlen), 
-                 tname, 
+                 GNUNET_a2s (addr, addrlen),
+                 tname,
                  GNUNET_i2s (&id));
 #endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# peer addresses not validated (in progress)"),
+                               1,
+                               GNUNET_NO);      
       return GNUNET_OK;
-    } 
+    }
   va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
   va->transport_name = GNUNET_strdup (tname);
   va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
@@ -2127,15 +2373,15 @@ run_validation (void *cls,
   va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
                                                   HELLO_VERIFICATION_TIMEOUT,
                                                   &timeout_hello_validation,
-                                                  va);  
+                                                  va);
   GNUNET_CONTAINER_multihashmap_put (validation_map,
                                     &id.hashPubKey,
                                     va,
                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  neighbour = find_neighbour(&id);  
+  neighbour = find_neighbour(&id);
   if (neighbour == NULL)
     neighbour = setup_new_neighbour(&id);
-  peer_address = add_peer_address(neighbour, tname, addr, addrlen);    
+  peer_address = add_peer_address(neighbour, tname, addr, addrlen);
   GNUNET_assert(peer_address != NULL);
   hello_size = GNUNET_HELLO_size(our_hello);
   tsize = sizeof(struct TransportPingMessage) + hello_size;
@@ -2145,22 +2391,26 @@ run_validation (void *cls,
   ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
   memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
   memcpy(message_buf, our_hello, hello_size);
-  memcpy(&message_buf[hello_size], 
-        &ping, 
+  memcpy(&message_buf[hello_size],
+        &ping,
         sizeof(struct TransportPingMessage));
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
-              GNUNET_a2s (addr, addrlen), 
-             tname, 
+              GNUNET_a2s (addr, addrlen),
+             tname,
              GNUNET_i2s (&id),
              "HELLO", hello_size,
              "PING", sizeof (struct TransportPingMessage));
 #endif
-  transmit_to_peer (NULL, peer_address, 
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# PING messages sent for initial validation"),
+                           1,
+                           GNUNET_NO);      
+  transmit_to_peer (NULL, peer_address,
                    GNUNET_SCHEDULER_PRIORITY_DEFAULT,
                    HELLO_VERIFICATION_TIMEOUT,
-                   message_buf, tsize, 
+                   message_buf, tsize,
                    GNUNET_YES, neighbour);
   GNUNET_free(message_buf);
   return GNUNET_OK;
@@ -2186,7 +2436,9 @@ add_to_foreign_address_list (void *cls,
 {
   struct NeighbourList *n = cls;
   struct ForeignAddressList *fal;
+  int try;
 
+  try = GNUNET_NO;
   fal = find_peer_address (n, tname, addr, addrlen);
   if (fal == NULL)
     {
@@ -2199,12 +2451,15 @@ add_to_foreign_address_list (void *cls,
                  expiration.value);
 #endif
       fal = add_peer_address (n, tname, addr, addrlen);
+      try = GNUNET_YES;
     }
   if (fal == NULL)
     return GNUNET_OK;
   fal->expires = GNUNET_TIME_absolute_max (expiration,
                                           fal->expires);
-  fal->validated = GNUNET_YES;
+  fal->validated = GNUNET_YES;  
+  if (try == GNUNET_YES)
+    try_transmission_to_peer (n);
   return GNUNET_OK;
 }
 
@@ -2215,7 +2470,7 @@ add_to_foreign_address_list (void *cls,
  *
  * @param cls closure
  * @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
  * @param trust amount of trust we have in the peer (not used)
  */
 static void
@@ -2255,16 +2510,33 @@ check_hello_validated (void *cls,
                      "HELLO",
                      GNUNET_i2s (&target));
 #endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# new HELLOs requiring full validation"),
+                                   1,
+                                   GNUNET_NO);      
          GNUNET_HELLO_iterate_addresses (chvc->hello,
                                          GNUNET_NO, 
                                          &run_validation, 
                                          chvc);
        }
+      else
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# duplicate HELLO (peer known)"),
+                                   1,
+                                   GNUNET_NO);      
+       }
       GNUNET_free (chvc);
       return;
-    }
+    } 
   if (h == NULL)
     return;
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Peerinfo had `%s' message for peer `%4s', validating only new addresses.\n",
+             "HELLO",
+             GNUNET_i2s (peer));
+#endif
   chvc->hello_known = GNUNET_YES;
   n = find_neighbour (peer);
   if (n != NULL)
@@ -2310,7 +2582,10 @@ process_hello (struct TransportPlugin *plugin,
   if (GNUNET_SCHEDULER_get_load (sched,
                                 GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD)
     {
-      /* TODO: call to stats? */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# HELLOs ignored due to high load"),
+                               1,
+                               GNUNET_NO);      
       return GNUNET_OK;
     }
   hello = (const struct GNUNET_HELLO_Message *) message;
@@ -2322,14 +2597,23 @@ process_hello (struct TransportPlugin *plugin,
   GNUNET_CRYPTO_hash (&publicKey,
                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
                       &target.hashPubKey);
-#if DEBUG_TRANSPORT
+  if (0 == memcmp (&my_identity,
+                  &target,
+                  sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# HELLOs ignored for validation (is my own HELLO)"),
+                               1,
+                               GNUNET_NO);      
+      return GNUNET_OK;      
+    }
+#if DEBUG_TRANSPORT > 1
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Processing `%s' message for `%4s' of size %u\n",
               "HELLO", 
              GNUNET_i2s (&target), 
              GNUNET_HELLO_size(hello));
 #endif
-
   chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
   chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1];
   memcpy (&chvc[1], hello, hsize);
@@ -2427,6 +2711,14 @@ disconnect_neighbour (struct NeighbourList *n, int check)
   /* free all messages on the queue */
   while (NULL != (mq = n->messages_head))
     {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes in message queue for other peers"),
+                               -mq->message_buf_size,
+                               GNUNET_NO);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes discarded due to disconnect"),
+                               mq->message_buf_size,
+                               GNUNET_NO);
       GNUNET_CONTAINER_DLL_remove (n->messages_head,
                                   n->messages_tail,
                                   mq);
@@ -2467,13 +2759,11 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
   struct TransportPlugin *plugin = cls;
   struct TransportPingMessage *ping;
   struct TransportPongMessage *pong;
-  uint16_t msize;
   struct NeighbourList *n;
   struct ReadyList *rl;
   struct ForeignAddressList *fal;
 
-  msize = ntohs (message->size);
-  if (msize < sizeof (struct TransportPingMessage))
+  if (ntohs (message->size) != sizeof (struct TransportPingMessage))
     {
       GNUNET_break_op (0);
       return GNUNET_SYSERR;
@@ -2495,7 +2785,10 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
              GNUNET_a2s ((const struct sockaddr *)sender_address, 
                          sender_address_len));
 #endif
-  msize -= sizeof (struct TransportPingMessage);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# PING messages received"),
+                           1,
+                           GNUNET_NO);
   pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len);
   pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len);
   pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG);
@@ -2547,18 +2840,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
  * and generally forward to our receive callback.
  *
  * @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- *        (opaque to transport service)
- * @param sender_address_len the length of the sender address
  * @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- *         for future receive calls for messages from this
- *         particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     unsigned int distance, const char *sender_address,
@@ -2571,102 +2862,125 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
   struct ForeignAddressList *peer_address;
   uint16_t msize;
   struct NeighbourList *n;
+  struct GNUNET_TIME_Relative ret;
 
   n = find_neighbour (peer);
   if (n == NULL)
-    {
-      if (message == NULL)
-        return;                 /* disconnect of peer already marked down */
-      n = setup_new_neighbour (peer);
-    }
+    n = setup_new_neighbour (peer);    
   service_context = n->plugins;
   while ((service_context != NULL) && (plugin != service_context->plugin))
     service_context = service_context->next;
   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
-  if (message == NULL)
+  if (message != NULL)
     {
+      peer_address = add_peer_address(n, 
+                                     plugin->short_name,
+                                     sender_address, 
+                                     sender_address_len);  
+      if (peer_address != NULL)
+       {
+         peer_address->distance = distance;
+         if (peer_address->connected == GNUNET_NO)
+           {
+             peer_address->connected = GNUNET_YES;
+             peer_address->connect_attempts++;
+           }
+         peer_address->timeout
+           =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+       }
+      /* update traffic received amount ... */
+      msize = ntohs (message->size);      
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# bytes received from other peers"),
+                               msize,
+                               GNUNET_NO);
+      n->distance = distance;
+      n->peer_timeout =
+       GNUNET_TIME_relative_to_absolute
+       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+      GNUNET_SCHEDULER_cancel (sched,
+                              n->timeout_task);
+      n->timeout_task =
+       GNUNET_SCHEDULER_add_delayed (sched,
+                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                     &neighbour_timeout_task, n);
+      if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+       {
+         /* dropping message due to frequent inbound volume violations! */
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+                     GNUNET_ERROR_TYPE_BULK,
+                     _
+                     ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"), 
+                     n->in_tracker.available_bytes_per_s__,
+                     n->quota_violation_count);
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# bandwidth quota violations by other peers"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+       }
+      switch (ntohs (message->type))
+       {
+       case GNUNET_MESSAGE_TYPE_HELLO:
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# HELLO messages received from other peers"),
+                                   1,
+                                   GNUNET_NO);
+         process_hello (plugin, message);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+         handle_ping(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+         handle_pong(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       default:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Receive failed from `%4s', triggering disconnect\n",
-                  GNUNET_i2s (&n->id));
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Received message of type %u from `%4s', sending to all clients.\n",
+                     ntohs (message->type), GNUNET_i2s (peer));
 #endif
-      /* TODO: call stats */
-      disconnect_neighbour (n, GNUNET_YES);
-      return;
-    }
-  peer_address = add_peer_address(n, 
-                                 plugin->short_name,
-                                 sender_address, 
-                                 sender_address_len);  
-  if (peer_address != NULL)
+         if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+                                                             msize))
+           n->quota_violation_count++;
+         else 
+           n->quota_violation_count = 0; /* back within limits */
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# payload received from other peers"),
+                                   msize,
+                                   GNUNET_NO);
+         /* transmit message to all clients */
+         im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+         im->header.size = htons (sizeof (struct InboundMessage) + msize);
+         im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+         im->latency = GNUNET_TIME_relative_hton (n->latency);
+         im->peer = *peer;
+         memcpy (&im[1], message, msize);
+         cpos = clients;
+         while (cpos != NULL)
+           {
+             transmit_to_client (cpos, &im->header, GNUNET_YES);
+             cpos = cpos->next;
+           }
+         GNUNET_free (im);
+       }
+    }  
+  ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
+  if (ret.value > 0)
     {
-      peer_address->distance = distance;
-      if (peer_address->connected == GNUNET_NO)
-        {
-         peer_address->connected = GNUNET_YES;
-          peer_address->connect_attempts++;
-        }
-      peer_address->timeout
-        =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  /* update traffic received amount ... */
-  msize = ntohs (message->size);
-  n->last_received += msize;
-  n->distance = distance;
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  GNUNET_SCHEDULER_cancel (sched,
-                          n->timeout_task);
-  n->timeout_task =
-    GNUNET_SCHEDULER_add_delayed (sched,
-                                  GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                  &neighbour_timeout_task, n);
-  update_quota (n);
-  if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
-    {
-      /* dropping message due to frequent inbound volume violations! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
-                  GNUNET_ERROR_TYPE_BULK,
-                  _
-                  ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
-      /* TODO: call stats */
-      return;
-    }
-  switch (ntohs (message->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      process_hello (plugin, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-      handle_ping(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-      handle_pong(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    default:
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u from `%4s', sending to all clients.\n",
-                  ntohs (message->type), GNUNET_i2s (peer));
-#endif
-      /* transmit message to all clients */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
-      im->header.size = htons (sizeof (struct InboundMessage) + msize);
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (n->latency);
-      im->peer = *peer;
-      memcpy (&im[1], message, msize);
-      cpos = clients;
-      while (cpos != NULL)
-        {
-          transmit_to_client (cpos, &im->header, GNUNET_YES);
-          cpos = cpos->next;
-        }
-      GNUNET_free (im);
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n",
+                 (unsigned long long) n->in_tracker.consumption_since_last_update__,
+                 (unsigned int) n->in_tracker.available_bytes_per_s__,
+                 (unsigned long long) ret.value);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# ms throttling suggested"),
+                               (int64_t) ret.value,
+                               GNUNET_NO);      
     }
+  return ret;
 }
 
 
@@ -2750,10 +3064,10 @@ handle_hello (void *cls,
 {
   int ret;
 
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client\n", "HELLO");
-#endif
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# HELLOs received from clients"),
+                           1,
+                           GNUNET_NO);      
   ret = process_hello (NULL, message);
   GNUNET_SERVER_receive_done (client, ret);
 }
@@ -2786,6 +3100,10 @@ handle_send (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# payload received for other peers"),
+                           size,
+                           GNUNET_NO);      
   obm = (const struct OutboundMessage *) message;
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2836,34 +3154,33 @@ handle_set_quota (void *cls,
   const struct QuotaSetMessage *qsm =
     (const struct QuotaSetMessage *) message;
   struct NeighbourList *n;
-  struct TransportPlugin *p;
-  struct ReadyList *rl;
-
+  
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# SET QUOTA messages received"),
+                           1,
+                           GNUNET_NO);      
   n = find_neighbour (&qsm->peer);
   if (n == NULL)
     {
       GNUNET_SERVER_receive_done (client, GNUNET_OK);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
+                               1,
+                               GNUNET_NO);      
       return;
     }
-
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
-              "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+              "SET_QUOTA", 
+             (unsigned int) ntohl (qsm->quota.value__),
+             (unsigned int) n->in_tracker.available_bytes_per_s__,
+             GNUNET_i2s (&qsm->peer));
 #endif
-
-  update_quota (n);
-  if (n->quota_in < ntohl (qsm->quota_in))
-    n->last_quota_update = GNUNET_TIME_absolute_get ();
-  n->quota_in = ntohl (qsm->quota_in);
-  rl = n->plugins;
-  while (rl != NULL)
-    {
-      p = rl->plugin;
-      p->api->set_receive_quota (p->api->cls,
-                                 &qsm->peer, ntohl (qsm->quota_in));
-      rl = rl->next;
-    }
+  GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker,
+                                        qsm->quota);
+  if (0 == ntohl (qsm->quota.value__)) 
+    disconnect_neighbour (n, GNUNET_NO);    
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -2980,9 +3297,8 @@ create_environment (struct TransportPlugin *plug)
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
-  plug->env.default_quota_in =
-    (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
   plug->env.max_connections = max_connect_per_transport;
+  plug->env.stats = stats;
 }
 
 
@@ -3146,6 +3462,12 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
                                         &abort_validation,
                                         NULL);
   GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+  validation_map = NULL;
+  if (stats != NULL)
+    {
+      GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+      stats = NULL;
+    }
 }
 
 
@@ -3171,6 +3493,7 @@ run (void *cls,
 
   sched = s;
   cfg = c;
+  stats = GNUNET_STATISTICS_create (sched, "transport", cfg);
   validation_map = GNUNET_CONTAINER_multihashmap_create (64);
   /* parse configuration */
   if ((GNUNET_OK !=
@@ -3187,6 +3510,13 @@ run (void *cls,
                   _
                   ("Transport service is lacking key configuration settings.  Exiting.\n"));
       GNUNET_SCHEDULER_shutdown (s);
+      if (stats != NULL)
+       {
+         GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+         stats = NULL;
+       }
+      GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+      validation_map = NULL;
       return;
     }
   max_connect_per_transport = (uint32_t) tneigh;
@@ -3198,6 +3528,13 @@ run (void *cls,
                   _
                   ("Transport service could not access hostkey.  Exiting.\n"));
       GNUNET_SCHEDULER_shutdown (s);
+      if (stats != NULL)
+       {
+         GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+         stats = NULL;
+       }
+      GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+      validation_map = NULL;
       return;
     }
   GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);