calc update
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
index 4c94e293965fa0fb30c006435766bdb4721cc1e0..d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b 100644 (file)
@@ -197,6 +197,12 @@ struct ForeignAddressList
    */
   unsigned int connect_attempts;
 
+  /**
+   * DV distance to this peer (1 if no DV is used). 
+   * FIXME: need to set this from transport plugins!
+   */
+  uint32_t distance;
+
 };
 
 
@@ -284,11 +290,11 @@ struct TransportPlugin
 
 };
 
-struct NeighborList;
+struct NeighbourList;
 
 /**
- * For each neighbor we keep a list of messages
- * that we still want to transmit to the neighbor.
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
  */
 struct MessageQueue
 {
@@ -327,9 +333,9 @@ struct MessageQueue
   struct ForeignAddressList *specific_address;
 
   /**
-   * Peer ID of the Neighbor this entry belongs to.
+   * Peer ID of the Neighbour this entry belongs to.
    */
-  struct GNUNET_PeerIdentity neighbor_id;
+  struct GNUNET_PeerIdentity neighbour_id;
 
   /**
    * Plugin that we used for the transmission.
@@ -337,6 +343,11 @@ struct MessageQueue
    */
   struct TransportPlugin *plugin;
 
+  /**
+   * At what time should we fail?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
   /**
    * Internal message of the transport system that should not be
    * included in the usual SEND-SEND_OK transmission confirmation
@@ -355,7 +366,7 @@ struct MessageQueue
 
 
 /**
- * For a given Neighbor, which plugins are available
+ * For a given Neighbour, which plugins are available
  * to talk to this peer and what are their costs?
  */
 struct ReadyList
@@ -381,15 +392,15 @@ struct ReadyList
 
 
 /**
- * Entry in linked list of all of our current neighbors.
+ * Entry in linked list of all of our current neighbours.
  */
-struct NeighborList
+struct NeighbourList
 {
 
   /**
    * This is a linked list.
    */
-  struct NeighborList *next;
+  struct NeighbourList *next;
 
   /**
    * Which of our transports is connected to this peer
@@ -410,7 +421,7 @@ struct NeighborList
   struct MessageQueue *messages_tail;
 
   /**
-   * Identity of this neighbor.
+   * Identity of this neighbour.
    */
   struct GNUNET_PeerIdentity id;
 
@@ -420,6 +431,12 @@ struct NeighborList
    */
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
+  /**
+   * ID of task scheduled to run when we should retry transmitting
+   * the head of the message queue.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
   /**
    * How long until we should consider this peer dead
    * (if we don't receive another message in the
@@ -437,7 +454,7 @@ struct NeighborList
    * this particular peer.  This latency may have been calculated
    * over multiple transports.  This value reflects how long it took
    * us to receive a response when SENDING via this particular
-   * transport/neighbor/address combination!
+   * transport/neighbour/address combination!
    *
    * FIXME: we need to periodically send PINGs to update this
    * latency (at least more often than the current "huge" (11h?)
@@ -445,6 +462,11 @@ struct NeighborList
    */
   struct GNUNET_TIME_Relative latency;
 
+  /**
+   * DV distance to this peer (1 if no DV is used). 
+   */
+  uint32_t distance;
+
   /**
    * How many bytes have we received since the "last_quota_update"
    * timestamp?
@@ -452,7 +474,7 @@ struct NeighborList
   uint64_t last_received;
 
   /**
-   * Global quota for inbound traffic for the neighbor in bytes/ms.
+   * Global quota for inbound traffic for the neighbour in bytes/ms.
    */
   uint32_t quota_in;
 
@@ -465,9 +487,8 @@ struct NeighborList
   unsigned int quota_violation_count;
 
   /**
-   * Have we seen an ACK from this neighbor in the past?
-   * (used to make up a fake ACK for clients connecting after
-   * the neighbor connected to us).
+   * Have we seen an PONG from this neighbour in the past (and
+   * not had a disconnect since)?
    */
   int received_pong;
 
@@ -552,17 +573,22 @@ struct TransportPongMessage
 
 };
 
+
 /**
- * Linked list of messages to be transmitted to
- * the client.  Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client.  Each
+ * entry is followed by the actual message.
  */
 struct ClientMessageQueueEntry
 {
   /**
-   * This is a linked list.
+   * This is a doubly-linked list.
    */
   struct ClientMessageQueueEntry *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ClientMessageQueueEntry *prev;
 };
 
 
@@ -594,6 +620,11 @@ struct TransportClient
    */
   struct ClientMessageQueueEntry *message_queue_tail;
 
+  /**
+   * Current transmit request handle.
+   */ 
+  struct GNUNET_CONNECTION_TransmitHandle *th;
+
   /**
    * Is a call to "transmit_send_continuation" pending?  If so, we
    * must not free this struct (even if the corresponding client
@@ -698,7 +729,7 @@ struct CheckHelloValidatedContext
 static struct GNUNET_HELLO_Message *our_hello;
 
 /**
- * "version" of "our_hello".  Used to see if a given neighbor has
+ * "version" of "our_hello".  Used to see if a given neighbour has
  * already been sent the latest version of our HELLO message.
  */
 static unsigned int our_hello_version;
@@ -744,12 +775,12 @@ static struct TransportPlugin *plugins;
 static struct GNUNET_SERVER_Handle *server;
 
 /**
- * All known neighbors and their HELLOs.
+ * All known neighbours and their HELLOs.
  */
-static struct NeighborList *neighbors;
+static struct NeighbourList *neighbours;
 
 /**
- * Number of neighbors we'd like to have.
+ * Number of neighbours we'd like to have.
  */
 static uint32_t max_connect_per_transport;
 
@@ -771,43 +802,42 @@ static struct CheckHelloValidatedContext *chvc_tail;
 static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
 
 
-
 /**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
  * has disconnected.  We may either need to do nothing (other plugins
  * still up), or trigger a full disconnect and clean up.  This
  * function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
  * gone.
  *
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
  * @param check should we just check if all plugins
  *        disconnected or must we ask all plugins to
  *        disconnect?
  */
-static void disconnect_neighbor (struct NeighborList *n, int check);
+static void disconnect_neighbour (struct NeighbourList *n, int check);
 
 /**
- * Check the ready list for the given neighbor and if a plugin is
+ * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
  *
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
  */
-static void try_transmission_to_peer (struct NeighborList *neighbor);
+static void try_transmission_to_peer (struct NeighbourList *neighbour);
 
 
 /**
- * Find an entry in the neighbor list for a particular peer.
+ * Find an entry in the neighbour list for a particular peer.
  * if sender_address is not specified (NULL) then return the
  * first matching entry.  If sender_address is specified, then
  * make sure that the address and address_len also matches.
  *
  * @return NULL if not found.
  */
-static struct NeighborList *
-find_neighbor (const struct GNUNET_PeerIdentity *key)
+static struct NeighbourList *
+find_neighbour (const struct GNUNET_PeerIdentity *key)
 {
-  struct NeighborList *head = neighbors;
+  struct NeighbourList *head = neighbours;
 
   while ((head != NULL) &&
         (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
@@ -832,20 +862,27 @@ find_transport (const char *short_name)
 
 
 /**
- * Update the quota values for the given neighbor now.
+ * Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
  */
 static void
-update_quota (struct NeighborList *n)
+update_quota (struct NeighbourList *n,
+             int force)
 {
-  struct GNUNET_TIME_Relative delta;
+  struct GNUNET_TIME_Absolute now;
+  unsigned long long delta;
   uint64_t allowed;
   uint64_t remaining;
 
-  delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  if (delta.value < MIN_QUOTA_REFRESH_TIME)
-    return;                     /* not enough time passed for doing quota update */
-  allowed = delta.value * n->quota_in;
-
+  now = GNUNET_TIME_absolute_get ();
+  delta = now.value - n->last_quota_update.value;
+  allowed = n->quota_in * delta;
+  if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+       (!force) &&
+       (allowed < 32 * 1024) )
+    return;                     /* too early, not enough data */
   if (n->last_received < allowed)
     {
       remaining = allowed - n->last_received;
@@ -856,7 +893,7 @@ update_quota (struct NeighborList *n)
       if (remaining > MAX_BANDWIDTH_CARRY)
         remaining = MAX_BANDWIDTH_CARRY;
       n->last_received = 0;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       n->last_quota_update.value -= remaining;
       if (n->quota_violation_count > 0)
         n->quota_violation_count--;
@@ -864,10 +901,10 @@ update_quota (struct NeighborList *n)
   else
     {
       n->last_received -= allowed;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       if (n->last_received > allowed)
         {
-          /* more than twice the allowed rate! */
+          /* much more than the allowed rate! */
           n->quota_violation_count += 10;
         }
     }
@@ -892,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   uint16_t msize;
   size_t tsize;
   const struct GNUNET_MessageHeader *msg;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
   char *cbuf;
 
+  client->th = NULL;
   if (buf == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -902,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
       /* fatal error with client, free message queue! */
       while (NULL != (q = client->message_queue_head))
         {
-          client->message_queue_head = q->next;
+         GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                      client->message_queue_tail,
+                                      q);
           GNUNET_free (q);
         }
-      client->message_queue_tail = NULL;
       client->message_count = 0;
       return 0;
     }
@@ -922,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
                   "Transmitting message of type %u to client.\n",
                   ntohs (msg->type));
 #endif
-      client->message_queue_head = q->next;
-      if (q->next == NULL)
-        client->message_queue_tail = NULL;
+      GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                  client->message_queue_tail,
+                                  q);
       memcpy (&cbuf[tsize], msg, msize);
       tsize += msize;
       GNUNET_free (q);
@@ -933,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   if (NULL != q)
     {
       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
   return tsize;
 }
@@ -960,7 +998,6 @@ transmit_to_client (struct TransportClient *client,
 {
   struct ClientMessageQueueEntry *q;
   uint16_t msize;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
 
   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
     {
@@ -971,34 +1008,51 @@ transmit_to_client (struct TransportClient *client,
       /* TODO: call to statistics... */
       return;
     }
-  client->message_count++;
   msize = ntohs (msg->size);
   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
   memcpy (&q[1], msg, msize);
-  /* append to message queue */
-  if (client->message_queue_tail == NULL)
-    {
-      client->message_queue_tail = q;
-    }
-  else
+  GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+                                    client->message_queue_tail,
+                                    client->message_queue_tail,
+                                    q);                                     
+  client->message_count++;
+  if (client->th == NULL)
     {
-      client->message_queue_tail->next = q;
-      client->message_queue_tail = q;
-    }
-  if (client->message_queue_head == NULL)
-    {
-      client->message_queue_head = q;
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
 }
 
 
+/**
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbour.
+ *
+ * @param client who to notify
+ * @param n neighbour to notify about
+ * @param result status code for the transmission request
+ */
+static void
+transmit_send_ok (struct TransportClient *client,
+                 struct NeighbourList *n,
+                 int result)
+{
+  struct SendOkMessage send_ok_msg;
+
+  send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+  send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+  send_ok_msg.success = htonl (result);
+  send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+  send_ok_msg.peer = n->id;
+  transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); 
+}
+
+
 /**
  * Function called by the GNUNET_TRANSPORT_TransmitFunction
  * upon "completion" of a send request.  This tells the API
@@ -1019,76 +1073,48 @@ transmit_send_continuation (void *cls,
                             int result)
 {
   struct MessageQueue *mq = cls;
-  /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
-  struct SendOkMessage send_ok_msg;
-  struct NeighborList *n;
+  struct NeighbourList *n;
 
-  GNUNET_assert (mq != NULL);
-  n = find_neighbor(&mq->neighbor_id);
-  if (n == NULL) /* Neighbor must have been removed asynchronously! */
-    return;
-
-  /* Otherwise, let's make sure we've got the right peer */
-  GNUNET_assert (0 ==
-                 memcmp (&n->id, target,
-                         sizeof (struct GNUNET_PeerIdentity)));
-
-  if (result == GNUNET_OK)
+  n = find_neighbour(&mq->neighbour_id);
+  GNUNET_assert (n != NULL);
+  if (mq->specific_address != NULL)
     {
-      if (mq->specific_address != NULL)
+      if (result == GNUNET_OK)    
        {
          mq->specific_address->timeout =
            GNUNET_TIME_relative_to_absolute
            (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
          mq->specific_address->connected = GNUNET_YES;
-       }
-    }
-  else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Transmission to peer `%s' failed, marking connection as down.\n",
-                  GNUNET_i2s (target));
-      if (mq->specific_address != NULL)
-       mq->specific_address->connected = GNUNET_NO;
+       }    
+      else
+       {
+         mq->specific_address->connected = GNUNET_NO;
+       }    
+      if (! mq->internal_msg) 
+       mq->specific_address->in_transmit = GNUNET_NO;
     }
-  if ( (! mq->internal_msg) &&
-       (mq->specific_address != NULL) )
-    mq->specific_address->in_transmit = GNUNET_NO;
-
   if (mq->client != NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Notifying client %p about transmission to peer `%4s'.\n",
-                  mq->client, GNUNET_i2s (target));
-      send_ok_msg.header.size = htons (sizeof (send_ok_msg));
-      send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-      send_ok_msg.success = htonl (result);
-      send_ok_msg.peer = n->id;
-      transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
-    }
+    transmit_send_ok (mq->client, n, result);
   GNUNET_free (mq);
-  /* one plugin just became ready again, try transmitting
-     another message (if available) */
-  if (result == GNUNET_OK)
-    try_transmission_to_peer (n);
-  else
-    disconnect_neighbor (n, GNUNET_YES);
+  try_transmission_to_peer (n);
+  if (result != GNUNET_OK)
+    disconnect_neighbour (n, GNUNET_YES);    
 }
 
 
 /**
  * Find an address in any of the available transports for
- * the given neighbor that would be good for message
+ * the given neighbour that would be good for message
  * transmission.  This is essentially the transport selection
  * routine.
  *
- * @param neighbor for whom to select an address
+ * @param neighbour for whom to select an address
  * @return selected address, NULL if we have none
  */
 struct ForeignAddressList *
-find_ready_address(struct NeighborList *neighbor)
+find_ready_address(struct NeighbourList *neighbour)
 {
-  struct ReadyList *head = neighbor->plugins;
+  struct ReadyList *head = neighbour->plugins;
   struct ForeignAddressList *addresses;
   struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
   struct ForeignAddressList *best_address;
@@ -1105,7 +1131,7 @@ find_ready_address(struct NeighborList *neighbor)
 #if DEBUG_TRANSPORT
               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                           "Marking long-time inactive connection to `%4s' as down.\n",
-                          GNUNET_i2s (&neighbor->id));
+                          GNUNET_i2s (&neighbour->id));
 #endif
               addresses->connected = GNUNET_NO;
             }
@@ -1142,41 +1168,81 @@ find_ready_address(struct NeighborList *neighbor)
 
 
 /**
- * Check the ready list for the given neighbor and if a plugin is
+ * We should re-try transmitting to the given peer,
+ * hopefully we've learned something in the meantime.
+ */
+static void
+retry_transmission_task (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct NeighbourList *n = cls;
+
+  n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+  try_transmission_to_peer (n);
+}
+
+
+/**
+ * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
  *
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
  */
 static void
-try_transmission_to_peer (struct NeighborList *neighbor)
+try_transmission_to_peer (struct NeighbourList *neighbour)
 {
-  struct GNUNET_TIME_Relative min_latency;
   struct ReadyList *rl;
   struct MessageQueue *mq;
+  struct GNUNET_TIME_Relative timeout;
 
-  if (neighbor->messages_head == NULL)
+  if (neighbour->messages_head == NULL)
     return;                     /* nothing to do */
-  min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
   rl = NULL;
-  mq = neighbor->messages_head;
+  mq = neighbour->messages_head;
+  /* FIXME: support bi-directional use of TCP */
   if (mq->specific_address == NULL)
-    mq->specific_address = find_ready_address(neighbor); 
+    mq->specific_address = find_ready_address(neighbour); 
   if (mq->specific_address == NULL)
     {
+      timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+      if (timeout.value == 0)
+       {
+#if DEBUG_TRANSPORT
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "No destination address available to transmit message of size %u to peer `%4s'\n",
+                     mq->message_buf_size,
+                     GNUNET_i2s (&mq->neighbour_id));
+#endif
+         if (mq->client != NULL)
+           transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+         GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      mq);
+         GNUNET_free (mq);
+         return;               /* nobody ready */ 
+       }
+      if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched,
+                                neighbour->retry_task);
+      neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                           timeout,
+                                                           &retry_transmission_task,
+                                                           neighbour);
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "No destination address available to transmit message of size %u to peer `%4s'\n",
+                 "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
                  mq->message_buf_size,
-                 GNUNET_i2s (&mq->neighbor_id));
+                 GNUNET_i2s (&mq->neighbour_id),
+                 timeout.value);
 #endif
-      return;                   /* nobody ready */
+      return;    
     }
+  GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                              neighbour->messages_tail,
+                              mq);
   if (mq->specific_address->connected == GNUNET_NO)
     mq->specific_address->connect_attempts++;
   rl = mq->specific_address->ready_list;
-  GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
-                              neighbor->messages_tail,
-                              mq);
   mq->plugin = rl->plugin;
   if (!mq->internal_msg)
     mq->specific_address->in_transmit = GNUNET_YES;
@@ -1184,13 +1250,13 @@ try_transmission_to_peer (struct NeighborList *neighbor)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
               mq->message_buf_size,
-              GNUNET_i2s (&neighbor->id), 
+              GNUNET_i2s (&neighbour->id), 
              GNUNET_a2s (mq->specific_address->addr,
                          mq->specific_address->addrlen),
              rl->plugin->short_name);
 #endif
   rl->plugin->api->send (rl->plugin->api->cls,
-                        &mq->neighbor_id,
+                        &mq->neighbour_id,
                         mq->message_buf,
                         mq->message_buf_size,
                         mq->priority,
@@ -1208,19 +1274,21 @@ try_transmission_to_peer (struct NeighborList *neighbor)
  * @param client source of the transmission request (can be NULL)
  * @param peer_address ForeignAddressList where we should send this message
  * @param priority how important is the message
+ * @param timeout how long do we have to transmit?
  * @param message_buf message(s) to send GNUNET_MessageHeader(s)
  * @param message_buf_size total size of all messages in message_buf
  * @param is_internal is this an internal message; these are pre-pended and
  *                    also do not count for plugins being "ready" to transmit
- * @param neighbor handle to the neighbor for transmission
+ * @param neighbour handle to the neighbour for transmission
  */
 static void
 transmit_to_peer (struct TransportClient *client,
                   struct ForeignAddressList *peer_address,
                   unsigned int priority,
+                 struct GNUNET_TIME_Relative timeout,
                   const char *message_buf,
                   size_t message_buf_size,
-                  int is_internal, struct NeighborList *neighbor)
+                  int is_internal, struct NeighbourList *neighbour)
 {
   struct MessageQueue *mq;
 
@@ -1228,7 +1296,7 @@ transmit_to_peer (struct TransportClient *client,
   if (client != NULL)
     {
       /* check for duplicate submission */
-      mq = neighbor->messages_head;
+      mq = neighbour->messages_head;
       while (NULL != mq)
         {
           if (mq->client == client)
@@ -1248,20 +1316,20 @@ transmit_to_peer (struct TransportClient *client,
   memcpy (&mq[1], message_buf, message_buf_size);
   mq->message_buf = (const char*) &mq[1];
   mq->message_buf_size = message_buf_size;
-  memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
+  memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
   mq->internal_msg = is_internal;
   mq->priority = priority;
-
+  mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
   if (is_internal)    
-    GNUNET_CONTAINER_DLL_insert (neighbor->messages_head,
-                                neighbor->messages_tail,
+    GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+                                neighbour->messages_tail,
                                 mq);
   else
-    GNUNET_CONTAINER_DLL_insert_after (neighbor->messages_head,
-                                      neighbor->messages_tail,
-                                      neighbor->messages_tail,
+    GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      neighbour->messages_tail,
                                       mq);
-  try_transmission_to_peer (neighbor);
+  try_transmission_to_peer (neighbour);
 }
 
 
@@ -1313,7 +1381,7 @@ refresh_hello ()
 {
   struct GNUNET_HELLO_Message *hello;
   struct TransportClient *cpos;
-  struct NeighborList *npos;
+  struct NeighbourList *npos;
   struct GeneratorContext gc;
 
   gc.plug_pos = plugins;
@@ -1337,15 +1405,16 @@ refresh_hello ()
   our_hello = hello;
   our_hello_version++;
   GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
-  npos = neighbors;
+  npos = neighbours;
   while (npos != NULL)
     {
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Transmitting updated `%s' to neighbor `%4s'\n",
+                  "Transmitting updated `%s' to neighbour `%4s'\n",
                   "HELLO", GNUNET_i2s (&npos->id));
 #endif
       transmit_to_peer (NULL, NULL, 0,
+                       HELLO_ADDRESS_EXPIRATION,
                         (const char *) our_hello, 
                        GNUNET_HELLO_size(our_hello),
                         GNUNET_NO, npos);
@@ -1495,14 +1564,20 @@ plugin_env_notify_address (void *cls,
  */
 static void
 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
-                        struct GNUNET_TIME_Relative latency)
+                        struct GNUNET_TIME_Relative latency,
+                       uint32_t distance)
 {
   struct ConnectInfoMessage cim;
   struct TransportClient *cpos;
 
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Notifying clients about connection from `%s'\n",
+             GNUNET_i2s (peer));
+#endif
   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-  cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+  cim.distance = htonl (distance);
   cim.latency = GNUNET_TIME_relative_hton (latency);
   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
   cpos = clients;
@@ -1523,6 +1598,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
   struct DisconnectInfoMessage dim;
   struct TransportClient *cpos;
 
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Notifying clients about lost connection to `%s'\n",
+             GNUNET_i2s (peer));
+#endif
   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
   dim.reserved = htonl (0);
@@ -1540,14 +1620,14 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
  * Find a ForeignAddressList entry for the given neighbour
  * that matches the given address and transport.
  *
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
  * @param tname name of the transport plugin
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if no such entry exists
  */
 static struct ForeignAddressList *
-find_peer_address(struct NeighborList *neighbor,
+find_peer_address(struct NeighbourList *neighbour,
                  const char *tname,
                  const char *addr,
                  size_t addrlen)
@@ -1555,7 +1635,7 @@ find_peer_address(struct NeighborList *neighbor,
   struct ReadyList *head;
   struct ForeignAddressList *address_head;
 
-  head = neighbor->plugins;
+  head = neighbour->plugins;
   while (head != NULL)
     {
       if (0 == strcmp (tname, head->plugin->short_name))
@@ -1575,17 +1655,17 @@ find_peer_address(struct NeighborList *neighbor,
 
 
 /**
- * Get the peer address struct for the given neighbor and
+ * Get the peer address struct for the given neighbour and
  * address.  If it doesn't yet exist, create it.
  *
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
  * @param tname name of the transport plugin
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if we do not have a transport plugin for 'tname'
  */
 static struct ForeignAddressList *
-add_peer_address(struct NeighborList *neighbor,
+add_peer_address(struct NeighbourList *neighbour,
                 const char *tname,
                 const char *addr, 
                 size_t addrlen)
@@ -1593,10 +1673,10 @@ add_peer_address(struct NeighborList *neighbor,
   struct ReadyList *head;
   struct ForeignAddressList *ret;
 
-  ret = find_peer_address (neighbor, tname, addr, addrlen);
+  ret = find_peer_address (neighbour, tname, addr, addrlen);
   if (ret != NULL)
     return ret;
-  head = neighbor->plugins;
+  head = neighbour->plugins;
   while (head != NULL)
     {
       if (0 == strcmp (tname, head->plugin->short_name))
@@ -1612,6 +1692,7 @@ add_peer_address(struct NeighborList *neighbor,
   ret->expires = GNUNET_TIME_relative_to_absolute
     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   ret->latency = GNUNET_TIME_relative_get_forever();
+  ret->distance = -1;
   ret->timeout = GNUNET_TIME_relative_to_absolute
     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 
   ret->ready_list = head;
@@ -1691,7 +1772,7 @@ check_pending_validation (void *cls,
   unsigned int challenge = ntohl(pong->challenge);
   struct GNUNET_HELLO_Message *hello;
   struct GNUNET_PeerIdentity target;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct ForeignAddressList *fal;
 
   if (ve->challenge != challenge)
@@ -1718,7 +1799,7 @@ check_pending_validation (void *cls,
                            &target, 
                            hello);
   GNUNET_free (hello);
-  n = find_neighbor (&target);
+  n = find_neighbour (&target);
   if (n != NULL)
     {
       fal = add_peer_address (n, ve->transport_name, 
@@ -1731,6 +1812,19 @@ check_pending_validation (void *cls,
        n->latency = fal->latency;
       else
        n->latency.value = (fal->latency.value + n->latency.value) / 2;
+      n->distance = fal->distance;
+      if (GNUNET_NO == n->received_pong)
+       {
+         notify_clients_connect (&target, n->latency, n->distance);
+         n->received_pong = GNUNET_YES;
+       }
+      if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+       {
+         GNUNET_SCHEDULER_cancel (sched,
+                                  n->retry_task);
+         n->retry_task = GNUNET_SCHEDULER_NO_TASK;     
+         try_transmission_to_peer (n);
+       }
     }
 
   /* clean up validation entry */
@@ -1755,11 +1849,11 @@ check_pending_validation (void *cls,
  * (otherwise we may be seeing a MiM attack).
  *
  * @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
  * @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- *         by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ *         by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
  */
 static void
 handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
@@ -1795,6 +1889,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
 #endif
       return;
     }
+  
 #if 0
   /* FIXME: add given address to potential pool of our addresses
      (for voting) */
@@ -1808,39 +1903,39 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
 
 
 static void
-neighbor_timeout_task (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+neighbour_timeout_task (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct NeighborList *n = cls;
+  struct NeighbourList *n = cls;
 
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-              "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id));
+              "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
 #endif
   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  disconnect_neighbor (n, GNUNET_NO);
+  disconnect_neighbour (n, GNUNET_NO);
 }
 
 
 /**
- * Create a fresh entry in our neighbor list for the given peer.
- * Will try to transmit our current HELLO to the new neighbor.  Also
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour.  Also
  * notifies our clients about the new "connection".
  *
  * @param peer the peer for which we create the entry
- * @return the new neighbor list entry
+ * @return the new neighbour list entry
  */
-static struct NeighborList *
-setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
+static struct NeighbourList *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
 {
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct TransportPlugin *tp;
   struct ReadyList *rl;
 
   GNUNET_assert (our_hello != NULL);
-  n = GNUNET_malloc (sizeof (struct NeighborList));
-  n->next = neighbors;
-  neighbors = n;
+  n = GNUNET_malloc (sizeof (struct NeighbourList));
+  n->next = neighbours;
+  neighbours = n;
   n->id = *peer;
   n->last_quota_update = GNUNET_TIME_absolute_get ();
   n->peer_timeout =
@@ -1861,13 +1956,14 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
       tp = tp->next;
     }
   n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+  n->distance = -1;
   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                                  &neighbor_timeout_task, n);
+                                                  &neighbour_timeout_task, n);
   transmit_to_peer (NULL, NULL, 0,
+                   HELLO_ADDRESS_EXPIRATION,
                     (const char *) our_hello, GNUNET_HELLO_size(our_hello),
                     GNUNET_NO, n);
-  notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
   return n;
 }
 
@@ -1977,7 +2073,7 @@ run_validation (void *cls,
   struct GNUNET_PeerIdentity id;
   struct TransportPlugin *tp;
   struct ValidationEntry *va;
-  struct NeighborList *neighbor;
+  struct NeighbourList *neighbour;
   struct ForeignAddressList *peer_address;
   struct TransportPingMessage ping;
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
@@ -2041,10 +2137,10 @@ run_validation (void *cls,
                                     &id.hashPubKey,
                                     va,
                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  neighbor = find_neighbor(&id);  
-  if (neighbor == NULL)
-    neighbor = setup_new_neighbor(&id);
-  peer_address = add_peer_address(neighbor, tname, addr, addrlen);    
+  neighbour = find_neighbour(&id);  
+  if (neighbour == NULL)
+    neighbour = setup_new_neighbour(&id);
+  peer_address = add_peer_address(neighbour, tname, addr, addrlen);    
   GNUNET_assert(peer_address != NULL);
   hello_size = GNUNET_HELLO_size(our_hello);
   tsize = sizeof(struct TransportPingMessage) + hello_size;
@@ -2068,8 +2164,9 @@ run_validation (void *cls,
 #endif
   transmit_to_peer (NULL, peer_address, 
                    GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+                   HELLO_VERIFICATION_TIMEOUT,
                    message_buf, tsize, 
-                   GNUNET_YES, neighbor);
+                   GNUNET_YES, neighbour);
   GNUNET_free(message_buf);
   return GNUNET_OK;
 }
@@ -2079,7 +2176,7 @@ run_validation (void *cls,
  * Add the given address to the list of foreign addresses
  * available for the given peer (check for duplicates).
  *
- * @param cls the respective 'struct NeighborList' to update
+ * @param cls the respective 'struct NeighbourList' to update
  * @param tname name of the transport
  * @param expiration expiration time
  * @param addr the address
@@ -2092,7 +2189,7 @@ add_to_foreign_address_list (void *cls,
                             struct GNUNET_TIME_Absolute expiration,
                             const void *addr, size_t addrlen)
 {
-  struct NeighborList *n = cls;
+  struct NeighbourList *n = cls;
   struct ForeignAddressList *fal;
 
   fal = find_peer_address (n, tname, addr, addrlen);
@@ -2100,10 +2197,11 @@ add_to_foreign_address_list (void *cls,
     {
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data.\n",
+                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
                  GNUNET_a2s (addr, addrlen),
                  tname,
-                 GNUNET_i2s (&n->id));
+                 GNUNET_i2s (&n->id),
+                 expiration.value);
 #endif
       fal = add_peer_address (n, tname, addr, addrlen);
     }
@@ -2122,7 +2220,7 @@ add_to_foreign_address_list (void *cls,
  *
  * @param cls closure
  * @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
  * @param trust amount of trust we have in the peer (not used)
  */
 static void
@@ -2135,7 +2233,7 @@ check_hello_validated (void *cls,
   struct GNUNET_HELLO_Message *plain_hello;
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
   struct GNUNET_PeerIdentity target;
-  struct NeighborList *n;
+  struct NeighbourList *n;
 
   if (peer == NULL)
     {
@@ -2173,12 +2271,15 @@ check_hello_validated (void *cls,
   if (h == NULL)
     return;
   chvc->hello_known = GNUNET_YES;
-  n = find_neighbor (peer);
+  n = find_neighbour (peer);
   if (n != NULL)
-    GNUNET_HELLO_iterate_addresses (h,
-                                   GNUNET_NO,
-                                   &add_to_foreign_address_list,
-                                   n);
+    {
+      GNUNET_HELLO_iterate_addresses (h,
+                                     GNUNET_NO,
+                                     &add_to_foreign_address_list,
+                                     n);
+      try_transmission_to_peer (n);
+    }
   GNUNET_HELLO_iterate_new_addresses (chvc->hello,
                                      h,
                                      GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
@@ -2253,41 +2354,28 @@ process_hello (struct TransportPlugin *plugin,
 
 
 /**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
  * has disconnected.  We may either need to do nothing (other plugins
  * still up), or trigger a full disconnect and clean up.  This
  * function updates our state and does the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
  * gone.
  *
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
  * @param check should we just check if all plugins
  *        disconnected or must we ask all plugins to
  *        disconnect?
  */
 static void
-disconnect_neighbor (struct NeighborList *current_handle, int check)
+disconnect_neighbour (struct NeighbourList *n, int check)
 {
   struct ReadyList *rpos;
-  struct NeighborList *npos;
-  struct NeighborList *nprev;
-  struct NeighborList *n;
+  struct NeighbourList *npos;
+  struct NeighbourList *nprev;
   struct MessageQueue *mq;
   struct ForeignAddressList *peer_addresses;
   struct ForeignAddressList *peer_pos;
 
-  if (neighbors == NULL)
-    return; /* We don't have any neighbors, so client has an already removed handle! */
-
-  npos = neighbors;
-  while ((npos != NULL) && (current_handle != npos))
-    npos = npos->next;
-
-  if (npos == NULL)
-    return; /* Couldn't find neighbor in existing list, must have been already removed! */
-  else
-    n = npos;
-
   if (GNUNET_YES == check)
     {
       rpos = n->plugins;
@@ -2303,14 +2391,14 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
           rpos = rpos->next;
         }
     }
-
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-              "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+              "Disconnecting from `%4s'\n",
+             GNUNET_i2s (&n->id));
 #endif
-  /* remove n from neighbors list */
+  /* remove n from neighbours list */
   nprev = NULL;
-  npos = neighbors;
+  npos = neighbours;
   while ((npos != NULL) && (npos != n))
     {
       nprev = npos;
@@ -2318,12 +2406,13 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
     }
   GNUNET_assert (npos != NULL);
   if (nprev == NULL)
-    neighbors = n->next;
+    neighbours = n->next;
   else
     nprev->next = n->next;
 
   /* notify all clients about disconnect */
-  notify_clients_disconnect (&n->id);
+  if (GNUNET_YES == n->received_pong)
+    notify_clients_disconnect (&n->id);
 
   /* clean up all plugins, cancel connections and pending transmissions */
   while (NULL != (rpos = n->plugins))
@@ -2346,13 +2435,21 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
       GNUNET_CONTAINER_DLL_remove (n->messages_head,
                                   n->messages_tail,
                                   mq);
-      GNUNET_assert (0 == memcmp(&mq->neighbor_id, 
+      GNUNET_assert (0 == memcmp(&mq->neighbour_id, 
                                 &n->id,
                                 sizeof(struct GNUNET_PeerIdentity)));
       GNUNET_free (mq);
     }
   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+    {
+      GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+      n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, n->retry_task);
+      n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   /* finally, free n itself */
   GNUNET_free (n);
 }
@@ -2376,7 +2473,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
   struct TransportPingMessage *ping;
   struct TransportPongMessage *pong;
   uint16_t msize;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct ReadyList *rl;
   struct ForeignAddressList *fal;
 
@@ -2422,9 +2519,9 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
                  GNUNET_CRYPTO_rsa_sign (my_private_key,
                                          &pong->purpose, &pong->signature));
 
-  n = find_neighbor(peer);
+  n = find_neighbour(peer);
   if (n == NULL)
-    n = setup_new_neighbor(peer);
+    n = setup_new_neighbour(peer);
   /* broadcast 'PONG' to all available addresses */
   rl = n->plugins;
   while (rl != NULL)
@@ -2434,6 +2531,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
        {
          transmit_to_peer(NULL, fal,
                           TRANSPORT_PONG_PRIORITY, 
+                          HELLO_VERIFICATION_TIMEOUT,
                           (const char *)pong, 
                           ntohs(pong->header.size), 
                           GNUNET_YES, 
@@ -2447,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
 }
 
 
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+  struct GNUNET_TIME_Relative ret;
+  struct GNUNET_TIME_Absolute now;
+  uint64_t del;
+  uint64_t avail;
+  uint64_t excess;
+
+  now = GNUNET_TIME_absolute_get ();
+  del = now.value - n->last_quota_update.value;
+  if (del > MAX_BANDWIDTH_CARRY)
+    {
+      update_quota (n, GNUNET_YES);
+      del = now.value - n->last_quota_update.value;
+      GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+    }
+  if (n->quota_in == 0)
+    n->quota_in = 1;      /* avoid divison by zero */
+  avail = del * n->quota_in;
+  if (avail > n->last_received)
+    return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
+  excess = n->last_received - avail;
+  ret.value = excess / n->quota_in;
+  if (ret.value > 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+               (unsigned long long) excess,
+               (unsigned long long) n->quota_in,
+               (unsigned long long) ret.value);
+  return ret;
+}
+
+
 /**
  * Function called by the plugin for each received message.
  * Update data volumes, possibly notify plugins about
@@ -2454,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
  * and generally forward to our receive callback.
  *
  * @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- *        (opaque to transport service)
- * @param sender_address_len the length of the sender address
  * @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- *         for future receive calls for messages from this
- *         particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     unsigned int distance, const char *sender_address,
@@ -2477,101 +2614,91 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
   struct InboundMessage *im;
   struct ForeignAddressList *peer_address;
   uint16_t msize;
-  struct NeighborList *n;
+  struct NeighbourList *n;
 
-  n = find_neighbor (peer);
+  n = find_neighbour (peer);
   if (n == NULL)
-    {
-      if (message == NULL)
-        return;                 /* disconnect of peer already marked down */
-      n = setup_new_neighbor (peer);
-    }
+    n = setup_new_neighbour (peer);    
+  update_quota (n, GNUNET_NO);
   service_context = n->plugins;
   while ((service_context != NULL) && (plugin != service_context->plugin))
     service_context = service_context->next;
   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
-  if (message == NULL)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Receive failed from `%4s', triggering disconnect\n",
-                  GNUNET_i2s (&n->id));
-#endif
-      /* TODO: call stats */
-      disconnect_neighbor (n, GNUNET_YES);
-      return;
-    }
-  peer_address = add_peer_address(n, 
-                                 plugin->short_name,
-                                 sender_address, 
-                                 sender_address_len);
-  if (peer_address != NULL)
+  if (message != NULL)
     {
-      if (peer_address->connected == GNUNET_NO)
-        {
-         peer_address->connected = GNUNET_YES;
-          peer_address->connect_attempts++;
-        }
-      peer_address->timeout
-        =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  /* update traffic received amount ... */
-  msize = ntohs (message->size);
-  n->last_received += msize;
-  GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  n->timeout_task =
-    GNUNET_SCHEDULER_add_delayed (sched,
-                                  GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                  &neighbor_timeout_task, n);
-  update_quota (n);
-  if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
-    {
-      /* dropping message due to frequent inbound volume violations! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
-                  GNUNET_ERROR_TYPE_BULK,
-                  _
-                  ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
-      /* TODO: call stats */
-      return;
-    }
-  switch (ntohs (message->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      process_hello (plugin, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-      handle_ping(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-      handle_pong(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    default:
+      peer_address = add_peer_address(n, 
+                                     plugin->short_name,
+                                     sender_address, 
+                                     sender_address_len);  
+      if (peer_address != NULL)
+       {
+         peer_address->distance = distance;
+         if (peer_address->connected == GNUNET_NO)
+           {
+             peer_address->connected = GNUNET_YES;
+             peer_address->connect_attempts++;
+           }
+         peer_address->timeout
+           =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+       }
+      /* update traffic received amount ... */
+      msize = ntohs (message->size);
+      n->distance = distance;
+      n->peer_timeout =
+       GNUNET_TIME_relative_to_absolute
+       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+      GNUNET_SCHEDULER_cancel (sched,
+                              n->timeout_task);
+      n->timeout_task =
+       GNUNET_SCHEDULER_add_delayed (sched,
+                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                     &neighbour_timeout_task, n);
+      if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+       {
+         /* dropping message due to frequent inbound volume violations! */
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+                     GNUNET_ERROR_TYPE_BULK,
+                     _
+                     ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), 
+                     n->quota_violation_count);
+         return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+       }
+      switch (ntohs (message->type))
+       {
+       case GNUNET_MESSAGE_TYPE_HELLO:
+         process_hello (plugin, message);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+         handle_ping(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+         handle_pong(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       default:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u from `%4s', sending to all clients.\n",
-                  ntohs (message->type), GNUNET_i2s (peer));
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Received message of type %u from `%4s', sending to all clients.\n",
+                     ntohs (message->type), GNUNET_i2s (peer));
 #endif
-      /* transmit message to all clients */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
-      im->header.size = htons (sizeof (struct InboundMessage) + msize);
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (n->latency);
-      im->peer = *peer;
-      memcpy (&im[1], message, msize);
-
-      cpos = clients;
-      while (cpos != NULL)
-        {
-          transmit_to_client (cpos, &im->header, GNUNET_YES);
-          cpos = cpos->next;
-        }
-      GNUNET_free (im);
-    }
+         /* transmit message to all clients */
+         im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+         im->header.size = htons (sizeof (struct InboundMessage) + msize);
+         im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+         im->latency = GNUNET_TIME_relative_hton (n->latency);
+         im->peer = *peer;
+         memcpy (&im[1], message, msize);
+         cpos = clients;
+         while (cpos != NULL)
+           {
+             transmit_to_client (cpos, &im->header, GNUNET_YES);
+             cpos = cpos->next;
+           }
+         GNUNET_free (im);
+       }
+    }  
+  return calculate_throttle_delay (n);
 }
 
 
@@ -2590,9 +2717,7 @@ handle_start (void *cls,
 {
   struct TransportClient *c;
   struct ConnectInfoMessage cim;
-  struct NeighborList *n;
-  struct InboundMessage *im;
-  struct GNUNET_MessageHeader *ack;
+  struct NeighbourList *n;
 
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2626,33 +2751,18 @@ handle_start (void *cls,
       /* tell new client about all existing connections */
       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-      cim.quota_out =
-        htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
-      cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) +
-                          sizeof (struct GNUNET_MessageHeader));
-      im->header.size = htons (sizeof (struct InboundMessage) +
-                               sizeof (struct GNUNET_MessageHeader));
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
-      ack = (struct GNUNET_MessageHeader *) &im[1];
-      ack->size = htons (sizeof (struct GNUNET_MessageHeader));
-      ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
-      for (n = neighbors; n != NULL; n = n->next)
-        {
-          cim.id = n->id;
-          transmit_to_client (c, &cim.header, GNUNET_NO);
-          if (n->received_pong)
-            {
-              im->peer = n->id;
-              transmit_to_client (c, &im->header, GNUNET_NO);
+      n = neighbours; 
+      while (n != NULL)
+       {
+         if (GNUNET_YES == n->received_pong)
+           {
+             cim.id = n->id;
+             cim.latency = GNUNET_TIME_relative_hton (n->latency);
+             cim.distance = htonl (n->distance);
+             transmit_to_client (c, &cim.header, GNUNET_NO);
             }
+           n = n->next;
         }
-      GNUNET_free (im);
-    }
-  else
-    {
-      fprintf(stderr, "Our hello is NULL!\n");
     }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -2672,10 +2782,6 @@ handle_hello (void *cls,
 {
   int ret;
 
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client\n", "HELLO");
-#endif
   ret = process_hello (NULL, message);
   GNUNET_SERVER_receive_done (client, ret);
 }
@@ -2694,7 +2800,7 @@ handle_send (void *cls,
              const struct GNUNET_MessageHeader *message)
 {
   struct TransportClient *tc;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   const struct OutboundMessage *obm;
   const struct GNUNET_MessageHeader *obmm;
   uint16_t size;
@@ -2722,9 +2828,9 @@ handle_send (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
-  n = find_neighbor (&obm->peer);
+  n = find_neighbour (&obm->peer);
   if (n == NULL)
-    n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */
+    n = setup_new_neighbour (&obm->peer);
   tc = clients;
   while ((tc != NULL) && (tc->client != client))
     tc = tc->next;
@@ -2735,7 +2841,9 @@ handle_send (void *cls,
               ntohs (obmm->size),
               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
 #endif
-  transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm, 
+  transmit_to_peer (tc, NULL, ntohl (obm->priority), 
+                   GNUNET_TIME_relative_ntoh (obm->timeout),
+                   (char *)obmm, 
                    ntohs (obmm->size), GNUNET_NO, n);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -2755,62 +2863,25 @@ handle_set_quota (void *cls,
 {
   const struct QuotaSetMessage *qsm =
     (const struct QuotaSetMessage *) message;
-  struct NeighborList *n;
-  struct TransportPlugin *p;
-  struct ReadyList *rl;
+  struct NeighbourList *n;
+  uint32_t qin;
 
-  n = find_neighbor (&qsm->peer);
+  n = find_neighbour (&qsm->peer);
   if (n == NULL)
     {
       GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
-
+  qin = ntohl (qsm->quota_in);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
-              "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+              "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
 #endif
-
-  update_quota (n);
-  if (n->quota_in < ntohl (qsm->quota_in))
+  update_quota (n, GNUNET_YES);
+  if (n->quota_in < qin)
     n->last_quota_update = GNUNET_TIME_absolute_get ();
-  n->quota_in = ntohl (qsm->quota_in);
-  rl = n->plugins;
-  while (rl != NULL)
-    {
-      p = rl->plugin;
-      p->api->set_receive_quota (p->api->cls,
-                                 &qsm->peer, ntohl (qsm->quota_in));
-      rl = rl->next;
-    }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
-                    struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *message)
-{
-  const struct TryConnectMessage *tcm;
-  struct NeighborList *neighbor;
-  tcm = (const struct TryConnectMessage *) message;
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client %p asking to connect to `%4s'\n",
-              "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
-  neighbor = find_neighbor(&tcm->peer);
-  if (neighbor == NULL)
-    setup_new_neighbor (&tcm->peer);
+  n->quota_in = qin;
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -2908,9 +2979,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
   {&handle_set_quota, NULL,
    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
-  {&handle_try_connect, NULL,
-   GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
-   sizeof (struct TryConnectMessage)},
   {&handle_address_lookup, NULL,
    GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
    0},
@@ -2930,8 +2998,6 @@ create_environment (struct TransportPlugin *plug)
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
-  plug->env.default_quota_in =
-    (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
   plug->env.max_connections = max_connect_per_transport;
 }
 
@@ -2999,10 +3065,12 @@ client_disconnect_notification (void *cls,
     return;
   while (NULL != (mqe = pos->message_queue_head))
     {
-      pos->message_queue_head = mqe->next;
+      GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+                                  pos->message_queue_tail,
+                                  mqe);
+      pos->message_count--;
       GNUNET_free (mqe);
     }
-  pos->message_queue_head = NULL;
   if (prev == NULL)
     clients = pos->next;
   else
@@ -3012,6 +3080,12 @@ client_disconnect_notification (void *cls,
       pos->client = NULL;
       return;
     }
+  if (pos->th != NULL)
+    {
+      GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+      pos->th = NULL;
+    }
+  GNUNET_break (0 == pos->message_count);
   GNUNET_free (pos);
 }
 
@@ -3052,6 +3126,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct OwnAddressList *al;
   struct CheckHelloValidatedContext *chvc;
 
+  while (neighbours != NULL)
+    disconnect_neighbour (neighbours, GNUNET_NO);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transport service is unloading plugins...\n");