calc update
[oweals/gnunet.git] / src / transport / gnunet-service-transport.c
index 239ba9ca6dc3605e71f71a2bfe2db4932c205b88..d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b 100644 (file)
 #include "plugin_transport.h"
 #include "transport.h"
 
+/**
+ * Should we do some additional checks (to validate behavior
+ * of clients)?
+ */
+#define EXTRA_CHECKS GNUNET_YES
+
 /**
  * How many messages can we have pending for a given client process
  * before we start to drop incoming messages?  We typically should
@@ -164,24 +170,20 @@ struct ForeignAddressList
   struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * Is this plugin currently connected?  The first time we transmit
-   * or send data to a peer via a particular plugin, we set this to
-   * GNUNET_YES.  If we later get an error (disconnect notification or
-   * transmission failure), we set it back to GNUNET_NO.  Each time
-   * the value is set to GNUNET_YES, we increment the
-   * "connect_attempts" counter.  If that one reaches a particular
-   * threshold, we consider the address to not be working properly at
-   * this time and remove it from the eligible list.
+   * Are we currently connected via this address?  The first time we
+   * successfully transmit or receive data to a peer via a particular
+   * address, we set this to GNUNET_YES.  If we later get an error
+   * (disconnect notification, transmission failure, timeout), we set
+   * it back to GNUNET_NO.  
    */
   int connected;
 
   /**
-   * Is this plugin ready to transmit to the specific target?
-   * GNUNET_NO if not.  Initially, all plugins are marked ready.  If a
-   * transmission is in progress, "transmit_ready" is set to
-   * GNUNET_NO.
+   * Is this plugin currently busy transmitting to the specific target?
+   * GNUNET_NO if not (initial, default state is GNUNET_NO).   Internal
+   * messages do not count as 'in transmit'.
    */
-  int transmit_ready;
+  int in_transmit;
 
   /**
    * Has this address been validated yet?
@@ -189,10 +191,18 @@ struct ForeignAddressList
   int validated;
 
   /**
-   * How often have we tried to connect using this plugin?
+   * How often have we tried to connect using this plugin?  Used to
+   * discriminate against addresses that do not work well.
+   * FIXME: not yet used, but should be!
    */
   unsigned int connect_attempts;
 
+  /**
+   * DV distance to this peer (1 if no DV is used). 
+   * FIXME: need to set this from transport plugins!
+   */
+  uint32_t distance;
+
 };
 
 
@@ -280,20 +290,25 @@ struct TransportPlugin
 
 };
 
-struct NeighborList;
+struct NeighbourList;
 
 /**
- * For each neighbor we keep a list of messages
- * that we still want to transmit to the neighbor.
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
  */
 struct MessageQueue
 {
 
   /**
-   * This is a linked list.
+   * This is a doubly linked list.
    */
   struct MessageQueue *next;
 
+  /**
+   * This is a doubly linked list.
+   */
+  struct MessageQueue *prev;
+
   /**
    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
    * stuck together in memory.  Allocated at the end of this struct.
@@ -318,9 +333,9 @@ struct MessageQueue
   struct ForeignAddressList *specific_address;
 
   /**
-   * Peer ID of the Neighbor this entry belongs to.
+   * Peer ID of the Neighbour this entry belongs to.
    */
-  struct GNUNET_PeerIdentity neighbor_id;
+  struct GNUNET_PeerIdentity neighbour_id;
 
   /**
    * Plugin that we used for the transmission.
@@ -328,6 +343,11 @@ struct MessageQueue
    */
   struct TransportPlugin *plugin;
 
+  /**
+   * At what time should we fail?
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
   /**
    * Internal message of the transport system that should not be
    * included in the usual SEND-SEND_OK transmission confirmation
@@ -346,7 +366,7 @@ struct MessageQueue
 
 
 /**
- * For a given Neighbor, which plugins are available
+ * For a given Neighbour, which plugins are available
  * to talk to this peer and what are their costs?
  */
 struct ReadyList
@@ -368,25 +388,19 @@ struct ReadyList
    */
   struct ForeignAddressList *addresses;
 
-  /**
-   * Is the plugin represented by this entry currently connected to
-   * the respective peer?
-   */
-  int connected;
-
 };
 
 
 /**
- * Entry in linked list of all of our current neighbors.
+ * Entry in linked list of all of our current neighbours.
  */
-struct NeighborList
+struct NeighbourList
 {
 
   /**
    * This is a linked list.
    */
-  struct NeighborList *next;
+  struct NeighbourList *next;
 
   /**
    * Which of our transports is connected to this peer
@@ -395,13 +409,19 @@ struct NeighborList
   struct ReadyList *plugins;
 
   /**
-   * List of messages we would like to send to this peer;
+   * Head of list of messages we would like to send to this peer;
    * must contain at most one message per client.
    */
-  struct MessageQueue *messages;
+  struct MessageQueue *messages_head;
+
+  /**
+   * Tail of list of messages we would like to send to this peer; must
+   * contain at most one message per client.
+   */
+  struct MessageQueue *messages_tail;
 
   /**
-   * Identity of this neighbor.
+   * Identity of this neighbour.
    */
   struct GNUNET_PeerIdentity id;
 
@@ -411,6 +431,12 @@ struct NeighborList
    */
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
+  /**
+   * ID of task scheduled to run when we should retry transmitting
+   * the head of the message queue.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
   /**
    * How long until we should consider this peer dead
    * (if we don't receive another message in the
@@ -428,10 +454,18 @@ struct NeighborList
    * this particular peer.  This latency may have been calculated
    * over multiple transports.  This value reflects how long it took
    * us to receive a response when SENDING via this particular
-   * transport/neighbor/address combination!
-   * FIXME: why is this NBO?
+   * transport/neighbour/address combination!
+   *
+   * FIXME: we need to periodically send PINGs to update this
+   * latency (at least more often than the current "huge" (11h?)
+   * update interval).
+   */
+  struct GNUNET_TIME_Relative latency;
+
+  /**
+   * DV distance to this peer (1 if no DV is used). 
    */
-  struct GNUNET_TIME_RelativeNBO latency;
+  uint32_t distance;
 
   /**
    * How many bytes have we received since the "last_quota_update"
@@ -440,7 +474,7 @@ struct NeighborList
   uint64_t last_received;
 
   /**
-   * Global quota for inbound traffic for the neighbor in bytes/ms.
+   * Global quota for inbound traffic for the neighbour in bytes/ms.
    */
   uint32_t quota_in;
 
@@ -453,9 +487,8 @@ struct NeighborList
   unsigned int quota_violation_count;
 
   /**
-   * Have we seen an ACK from this neighbor in the past?
-   * (used to make up a fake ACK for clients connecting after
-   * the neighbor connected to us).
+   * Have we seen an PONG from this neighbour in the past (and
+   * not had a disconnect since)?
    */
   int received_pong;
 
@@ -540,17 +573,22 @@ struct TransportPongMessage
 
 };
 
+
 /**
- * Linked list of messages to be transmitted to
- * the client.  Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client.  Each
+ * entry is followed by the actual message.
  */
 struct ClientMessageQueueEntry
 {
   /**
-   * This is a linked list.
+   * This is a doubly-linked list.
    */
   struct ClientMessageQueueEntry *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ClientMessageQueueEntry *prev;
 };
 
 
@@ -582,6 +620,11 @@ struct TransportClient
    */
   struct ClientMessageQueueEntry *message_queue_tail;
 
+  /**
+   * Current transmit request handle.
+   */ 
+  struct GNUNET_CONNECTION_TransmitHandle *th;
+
   /**
    * Is a call to "transmit_send_continuation" pending?  If so, we
    * must not free this struct (even if the corresponding client
@@ -686,7 +729,7 @@ struct CheckHelloValidatedContext
 static struct GNUNET_HELLO_Message *our_hello;
 
 /**
- * "version" of "our_hello".  Used to see if a given neighbor has
+ * "version" of "our_hello".  Used to see if a given neighbour has
  * already been sent the latest version of our HELLO message.
  */
 static unsigned int our_hello_version;
@@ -732,12 +775,12 @@ static struct TransportPlugin *plugins;
 static struct GNUNET_SERVER_Handle *server;
 
 /**
- * All known neighbors and their HELLOs.
+ * All known neighbours and their HELLOs.
  */
-static struct NeighborList *neighbors;
+static struct NeighbourList *neighbours;
 
 /**
- * Number of neighbors we'd like to have.
+ * Number of neighbours we'd like to have.
  */
 static uint32_t max_connect_per_transport;
 
@@ -759,43 +802,42 @@ static struct CheckHelloValidatedContext *chvc_tail;
 static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
 
 
-
 /**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
  * has disconnected.  We may either need to do nothing (other plugins
  * still up), or trigger a full disconnect and clean up.  This
  * function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
  * gone.
  *
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
  * @param check should we just check if all plugins
  *        disconnected or must we ask all plugins to
  *        disconnect?
  */
-static void disconnect_neighbor (struct NeighborList *n, int check);
+static void disconnect_neighbour (struct NeighbourList *n, int check);
 
 /**
- * Check the ready list for the given neighbor and if a plugin is
+ * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
  *
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
  */
-static void try_transmission_to_peer (struct NeighborList *neighbor);
+static void try_transmission_to_peer (struct NeighbourList *neighbour);
 
 
 /**
- * Find an entry in the neighbor list for a particular peer.
+ * Find an entry in the neighbour list for a particular peer.
  * if sender_address is not specified (NULL) then return the
  * first matching entry.  If sender_address is specified, then
  * make sure that the address and address_len also matches.
  *
  * @return NULL if not found.
  */
-static struct NeighborList *
-find_neighbor (const struct GNUNET_PeerIdentity *key)
+static struct NeighbourList *
+find_neighbour (const struct GNUNET_PeerIdentity *key)
 {
-  struct NeighborList *head = neighbors;
+  struct NeighbourList *head = neighbours;
 
   while ((head != NULL) &&
         (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
@@ -820,20 +862,27 @@ find_transport (const char *short_name)
 
 
 /**
- * Update the quota values for the given neighbor now.
+ * Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
  */
 static void
-update_quota (struct NeighborList *n)
+update_quota (struct NeighbourList *n,
+             int force)
 {
-  struct GNUNET_TIME_Relative delta;
+  struct GNUNET_TIME_Absolute now;
+  unsigned long long delta;
   uint64_t allowed;
   uint64_t remaining;
 
-  delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  if (delta.value < MIN_QUOTA_REFRESH_TIME)
-    return;                     /* not enough time passed for doing quota update */
-  allowed = delta.value * n->quota_in;
-
+  now = GNUNET_TIME_absolute_get ();
+  delta = now.value - n->last_quota_update.value;
+  allowed = n->quota_in * delta;
+  if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+       (!force) &&
+       (allowed < 32 * 1024) )
+    return;                     /* too early, not enough data */
   if (n->last_received < allowed)
     {
       remaining = allowed - n->last_received;
@@ -844,7 +893,7 @@ update_quota (struct NeighborList *n)
       if (remaining > MAX_BANDWIDTH_CARRY)
         remaining = MAX_BANDWIDTH_CARRY;
       n->last_received = 0;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       n->last_quota_update.value -= remaining;
       if (n->quota_violation_count > 0)
         n->quota_violation_count--;
@@ -852,10 +901,10 @@ update_quota (struct NeighborList *n)
   else
     {
       n->last_received -= allowed;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
+      n->last_quota_update = now;
       if (n->last_received > allowed)
         {
-          /* more than twice the allowed rate! */
+          /* much more than the allowed rate! */
           n->quota_violation_count += 10;
         }
     }
@@ -880,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   uint16_t msize;
   size_t tsize;
   const struct GNUNET_MessageHeader *msg;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
   char *cbuf;
 
+  client->th = NULL;
   if (buf == NULL)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -890,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
       /* fatal error with client, free message queue! */
       while (NULL != (q = client->message_queue_head))
         {
-          client->message_queue_head = q->next;
+         GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                      client->message_queue_tail,
+                                      q);
           GNUNET_free (q);
         }
-      client->message_queue_tail = NULL;
       client->message_count = 0;
       return 0;
     }
@@ -910,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
                   "Transmitting message of type %u to client.\n",
                   ntohs (msg->type));
 #endif
-      client->message_queue_head = q->next;
-      if (q->next == NULL)
-        client->message_queue_tail = NULL;
+      GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+                                  client->message_queue_tail,
+                                  q);
       memcpy (&cbuf[tsize], msg, msize);
       tsize += msize;
       GNUNET_free (q);
@@ -921,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf)
   if (NULL != q)
     {
       GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
   return tsize;
 }
@@ -948,7 +998,6 @@ transmit_to_client (struct TransportClient *client,
 {
   struct ClientMessageQueueEntry *q;
   uint16_t msize;
-  struct GNUNET_CONNECTION_TransmitHandle *th;
 
   if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
     {
@@ -959,34 +1008,51 @@ transmit_to_client (struct TransportClient *client,
       /* TODO: call to statistics... */
       return;
     }
-  client->message_count++;
   msize = ntohs (msg->size);
   GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
   q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
   memcpy (&q[1], msg, msize);
-  /* append to message queue */
-  if (client->message_queue_tail == NULL)
-    {
-      client->message_queue_tail = q;
-    }
-  else
+  GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+                                    client->message_queue_tail,
+                                    client->message_queue_tail,
+                                    q);                                     
+  client->message_count++;
+  if (client->th == NULL)
     {
-      client->message_queue_tail->next = q;
-      client->message_queue_tail = q;
-    }
-  if (client->message_queue_head == NULL)
-    {
-      client->message_queue_head = q;
-      th = GNUNET_SERVER_notify_transmit_ready (client->client,
-                                                msize,
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                &transmit_to_client_callback,
-                                                client);
-      GNUNET_assert (th != NULL);
+      client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+                                                       msize,
+                                                       GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client_callback,
+                                                       client);
+      GNUNET_assert (client->th != NULL);
     }
 }
 
 
+/**
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbour.
+ *
+ * @param client who to notify
+ * @param n neighbour to notify about
+ * @param result status code for the transmission request
+ */
+static void
+transmit_send_ok (struct TransportClient *client,
+                 struct NeighbourList *n,
+                 int result)
+{
+  struct SendOkMessage send_ok_msg;
+
+  send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+  send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+  send_ok_msg.success = htonl (result);
+  send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+  send_ok_msg.peer = n->id;
+  transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); 
+}
+
+
 /**
  * Function called by the GNUNET_TRANSPORT_TransmitFunction
  * upon "completion" of a send request.  This tells the API
@@ -1007,76 +1073,48 @@ transmit_send_continuation (void *cls,
                             int result)
 {
   struct MessageQueue *mq = cls;
-  /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
-  struct SendOkMessage send_ok_msg;
-  struct NeighborList *n;
-
-  GNUNET_assert (mq != NULL);
-  n = find_neighbor(&mq->neighbor_id);
-  if (n == NULL) /* Neighbor must have been removed asynchronously! */
-    return;
-
-  /* Otherwise, let's make sure we've got the right peer */
-  GNUNET_assert (0 ==
-                 memcmp (&n->id, target,
-                         sizeof (struct GNUNET_PeerIdentity)));
+  struct NeighbourList *n;
 
-  if (result == GNUNET_OK)
-    {
-      mq->specific_address->timeout =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  else
+  n = find_neighbour(&mq->neighbour_id);
+  GNUNET_assert (n != NULL);
+  if (mq->specific_address != NULL)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Transmission to peer `%s' failed, marking connection as down.\n",
-                  GNUNET_i2s (target));
-      mq->specific_address->connected = GNUNET_NO;
-    }
-  if (!mq->internal_msg)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Setting transmit_ready on transport!\n");
-#endif
-      mq->specific_address->transmit_ready = GNUNET_YES;
+      if (result == GNUNET_OK)    
+       {
+         mq->specific_address->timeout =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+         mq->specific_address->connected = GNUNET_YES;
+       }    
+      else
+       {
+         mq->specific_address->connected = GNUNET_NO;
+       }    
+      if (! mq->internal_msg) 
+       mq->specific_address->in_transmit = GNUNET_NO;
     }
-
   if (mq->client != NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Notifying client %p about transmission to peer `%4s'.\n",
-                  mq->client, GNUNET_i2s (target));
-      send_ok_msg.header.size = htons (sizeof (send_ok_msg));
-      send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
-      send_ok_msg.success = htonl (result);
-      send_ok_msg.peer = n->id;
-      transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
-    }
+    transmit_send_ok (mq->client, n, result);
   GNUNET_free (mq);
-  /* one plugin just became ready again, try transmitting
-     another message (if available) */
-  if (result == GNUNET_OK)
-    try_transmission_to_peer (n);
-  else
-    disconnect_neighbor (n, GNUNET_YES);
+  try_transmission_to_peer (n);
+  if (result != GNUNET_OK)
+    disconnect_neighbour (n, GNUNET_YES);    
 }
 
 
 /**
  * Find an address in any of the available transports for
- * the given neighbor that would be good for message
+ * the given neighbour that would be good for message
  * transmission.  This is essentially the transport selection
  * routine.
  *
- * @param neighbor for whom to select an address
+ * @param neighbour for whom to select an address
  * @return selected address, NULL if we have none
  */
 struct ForeignAddressList *
-find_ready_address(struct NeighborList *neighbor)
+find_ready_address(struct NeighbourList *neighbour)
 {
-  struct ReadyList *head = neighbor->plugins;
+  struct ReadyList *head = neighbour->plugins;
   struct ForeignAddressList *addresses;
   struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
   struct ForeignAddressList *best_address;
@@ -1093,7 +1131,7 @@ find_ready_address(struct NeighborList *neighbor)
 #if DEBUG_TRANSPORT
               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                           "Marking long-time inactive connection to `%4s' as down.\n",
-                          GNUNET_i2s (&neighbor->id));
+                          GNUNET_i2s (&neighbour->id));
 #endif
               addresses->connected = GNUNET_NO;
             }
@@ -1106,7 +1144,7 @@ find_ready_address(struct NeighborList *neighbor)
           if ( ( (best_address == NULL) || 
                 (addresses->connected == GNUNET_YES) ||
                 (best_address->connected == GNUNET_NO) ) &&
-              (addresses->transmit_ready == GNUNET_YES) &&
+              (addresses->in_transmit == GNUNET_NO) &&
               ( (best_address == NULL) || 
                 (addresses->latency.value < best_address->latency.value)) )
            best_address = addresses;            
@@ -1130,51 +1168,95 @@ find_ready_address(struct NeighborList *neighbor)
 
 
 /**
- * Check the ready list for the given neighbor and if a plugin is
+ * We should re-try transmitting to the given peer,
+ * hopefully we've learned something in the meantime.
+ */
+static void
+retry_transmission_task (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct NeighbourList *n = cls;
+
+  n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+  try_transmission_to_peer (n);
+}
+
+
+/**
+ * Check the ready list for the given neighbour and if a plugin is
  * ready for transmission (and if we have a message), do so!
  *
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
  */
 static void
-try_transmission_to_peer (struct NeighborList *neighbor)
+try_transmission_to_peer (struct NeighbourList *neighbour)
 {
-  struct GNUNET_TIME_Relative min_latency;
   struct ReadyList *rl;
   struct MessageQueue *mq;
+  struct GNUNET_TIME_Relative timeout;
 
-  if (neighbor->messages == NULL)
+  if (neighbour->messages_head == NULL)
     return;                     /* nothing to do */
-  min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
   rl = NULL;
-  mq = neighbor->messages;
+  mq = neighbour->messages_head;
+  /* FIXME: support bi-directional use of TCP */
   if (mq->specific_address == NULL)
-    mq->specific_address = find_ready_address(neighbor); 
+    mq->specific_address = find_ready_address(neighbour); 
   if (mq->specific_address == NULL)
     {
+      timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+      if (timeout.value == 0)
+       {
+#if DEBUG_TRANSPORT
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "No destination address available to transmit message of size %u to peer `%4s'\n",
+                     mq->message_buf_size,
+                     GNUNET_i2s (&mq->neighbour_id));
+#endif
+         if (mq->client != NULL)
+           transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+         GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      mq);
+         GNUNET_free (mq);
+         return;               /* nobody ready */ 
+       }
+      if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK)
+       GNUNET_SCHEDULER_cancel (sched,
+                                neighbour->retry_task);
+      neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched,
+                                                           timeout,
+                                                           &retry_transmission_task,
+                                                           neighbour);
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "No destination address available to transmit message of size %u to peer `%4s'\n",
+                 "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
                  mq->message_buf_size,
-                 GNUNET_i2s (&mq->neighbor_id));
+                 GNUNET_i2s (&mq->neighbour_id),
+                 timeout.value);
 #endif
-      return;                   /* nobody ready */
+      return;    
     }
+  GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+                              neighbour->messages_tail,
+                              mq);
+  if (mq->specific_address->connected == GNUNET_NO)
+    mq->specific_address->connect_attempts++;
   rl = mq->specific_address->ready_list;
-  neighbor->messages = mq->next;
   mq->plugin = rl->plugin;
   if (!mq->internal_msg)
-    mq->specific_address->transmit_ready = GNUNET_NO;
+    mq->specific_address->in_transmit = GNUNET_YES;
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
               mq->message_buf_size,
-              GNUNET_i2s (&neighbor->id), 
+              GNUNET_i2s (&neighbour->id), 
              GNUNET_a2s (mq->specific_address->addr,
                          mq->specific_address->addrlen),
              rl->plugin->short_name);
 #endif
   rl->plugin->api->send (rl->plugin->api->cls,
-                        &mq->neighbor_id,
+                        &mq->neighbour_id,
                         mq->message_buf,
                         mq->message_buf_size,
                         mq->priority,
@@ -1192,26 +1274,29 @@ try_transmission_to_peer (struct NeighborList *neighbor)
  * @param client source of the transmission request (can be NULL)
  * @param peer_address ForeignAddressList where we should send this message
  * @param priority how important is the message
+ * @param timeout how long do we have to transmit?
  * @param message_buf message(s) to send GNUNET_MessageHeader(s)
  * @param message_buf_size total size of all messages in message_buf
- * @param is_internal is this an internal message
- * @param neighbor handle to the neighbor for transmission
+ * @param is_internal is this an internal message; these are pre-pended and
+ *                    also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
  */
 static void
 transmit_to_peer (struct TransportClient *client,
                   struct ForeignAddressList *peer_address,
                   unsigned int priority,
+                 struct GNUNET_TIME_Relative timeout,
                   const char *message_buf,
                   size_t message_buf_size,
-                  int is_internal, struct NeighborList *neighbor)
+                  int is_internal, struct NeighbourList *neighbour)
 {
   struct MessageQueue *mq;
-  struct MessageQueue *mqe;
 
+#if EXTRA_CHECKS
   if (client != NULL)
     {
       /* check for duplicate submission */
-      mq = neighbor->messages;
+      mq = neighbour->messages_head;
       while (NULL != mq)
         {
           if (mq->client == client)
@@ -1224,41 +1309,27 @@ transmit_to_peer (struct TransportClient *client,
           mq = mq->next;
         }
     }
+#endif
   mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
   mq->specific_address = peer_address;
   mq->client = client;
   memcpy (&mq[1], message_buf, message_buf_size);
   mq->message_buf = (const char*) &mq[1];
   mq->message_buf_size = message_buf_size;
-  memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
+  memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
   mq->internal_msg = is_internal;
   mq->priority = priority;
-
-  if (is_internal)
-    {
-      /* append at head */
-      mq->next = neighbor->messages;
-      neighbor->messages = mq;
-    }
+  mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  if (is_internal)    
+    GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+                                neighbour->messages_tail,
+                                mq);
   else
-    {
-      /* find tail */
-      mqe = neighbor->messages;
-      if (mqe != NULL)
-       while (mqe->next != NULL)
-         mqe = mqe->next;
-      if (mqe == NULL)
-       {
-         /* new head */
-         neighbor->messages = mq;
-       }
-      else
-       {
-         /* append */
-         mqe->next = mq;
-       }
-    }
-  try_transmission_to_peer (neighbor);
+    GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+                                      neighbour->messages_tail,
+                                      neighbour->messages_tail,
+                                      mq);
+  try_transmission_to_peer (neighbour);
 }
 
 
@@ -1310,7 +1381,7 @@ refresh_hello ()
 {
   struct GNUNET_HELLO_Message *hello;
   struct TransportClient *cpos;
-  struct NeighborList *npos;
+  struct NeighbourList *npos;
   struct GeneratorContext gc;
 
   gc.plug_pos = plugins;
@@ -1334,15 +1405,16 @@ refresh_hello ()
   our_hello = hello;
   our_hello_version++;
   GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
-  npos = neighbors;
+  npos = neighbours;
   while (npos != NULL)
     {
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Transmitting updated `%s' to neighbor `%4s'\n",
+                  "Transmitting updated `%s' to neighbour `%4s'\n",
                   "HELLO", GNUNET_i2s (&npos->id));
 #endif
       transmit_to_peer (NULL, NULL, 0,
+                       HELLO_ADDRESS_EXPIRATION,
                         (const char *) our_hello, 
                        GNUNET_HELLO_size(our_hello),
                         GNUNET_NO, npos);
@@ -1492,14 +1564,20 @@ plugin_env_notify_address (void *cls,
  */
 static void
 notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
-                        struct GNUNET_TIME_Relative latency)
+                        struct GNUNET_TIME_Relative latency,
+                       uint32_t distance)
 {
   struct ConnectInfoMessage cim;
   struct TransportClient *cpos;
 
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Notifying clients about connection from `%s'\n",
+             GNUNET_i2s (peer));
+#endif
   cim.header.size = htons (sizeof (struct ConnectInfoMessage));
   cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-  cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+  cim.distance = htonl (distance);
   cim.latency = GNUNET_TIME_relative_hton (latency);
   memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
   cpos = clients;
@@ -1520,6 +1598,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
   struct DisconnectInfoMessage dim;
   struct TransportClient *cpos;
 
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Notifying clients about lost connection to `%s'\n",
+             GNUNET_i2s (peer));
+#endif
   dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
   dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
   dim.reserved = htonl (0);
@@ -1537,14 +1620,14 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
  * Find a ForeignAddressList entry for the given neighbour
  * that matches the given address and transport.
  *
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
  * @param tname name of the transport plugin
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if no such entry exists
  */
 static struct ForeignAddressList *
-find_peer_address(struct NeighborList *neighbor,
+find_peer_address(struct NeighbourList *neighbour,
                  const char *tname,
                  const char *addr,
                  size_t addrlen)
@@ -1552,7 +1635,7 @@ find_peer_address(struct NeighborList *neighbor,
   struct ReadyList *head;
   struct ForeignAddressList *address_head;
 
-  head = neighbor->plugins;
+  head = neighbour->plugins;
   while (head != NULL)
     {
       if (0 == strcmp (tname, head->plugin->short_name))
@@ -1572,17 +1655,17 @@ find_peer_address(struct NeighborList *neighbor,
 
 
 /**
- * Get the peer address struct for the given neighbor and
+ * Get the peer address struct for the given neighbour and
  * address.  If it doesn't yet exist, create it.
  *
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
  * @param tname name of the transport plugin
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if we do not have a transport plugin for 'tname'
  */
 static struct ForeignAddressList *
-add_peer_address(struct NeighborList *neighbor,
+add_peer_address(struct NeighbourList *neighbour,
                 const char *tname,
                 const char *addr, 
                 size_t addrlen)
@@ -1590,10 +1673,10 @@ add_peer_address(struct NeighborList *neighbor,
   struct ReadyList *head;
   struct ForeignAddressList *ret;
 
-  ret = find_peer_address (neighbor, tname, addr, addrlen);
+  ret = find_peer_address (neighbour, tname, addr, addrlen);
   if (ret != NULL)
     return ret;
-  head = neighbor->plugins;
+  head = neighbour->plugins;
   while (head != NULL)
     {
       if (0 == strcmp (tname, head->plugin->short_name))
@@ -1609,7 +1692,7 @@ add_peer_address(struct NeighborList *neighbor,
   ret->expires = GNUNET_TIME_relative_to_absolute
     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   ret->latency = GNUNET_TIME_relative_get_forever();
-  ret->transmit_ready = GNUNET_YES;
+  ret->distance = -1;
   ret->timeout = GNUNET_TIME_relative_to_absolute
     (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 
   ret->ready_list = head;
@@ -1689,7 +1772,7 @@ check_pending_validation (void *cls,
   unsigned int challenge = ntohl(pong->challenge);
   struct GNUNET_HELLO_Message *hello;
   struct GNUNET_PeerIdentity target;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct ForeignAddressList *fal;
 
   if (ve->challenge != challenge)
@@ -1716,7 +1799,7 @@ check_pending_validation (void *cls,
                            &target, 
                            hello);
   GNUNET_free (hello);
-  n = find_neighbor (&target);
+  n = find_neighbour (&target);
   if (n != NULL)
     {
       fal = add_peer_address (n, ve->transport_name, 
@@ -1725,6 +1808,23 @@ check_pending_validation (void *cls,
       fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
       fal->validated = GNUNET_YES;
       fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+      if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+       n->latency = fal->latency;
+      else
+       n->latency.value = (fal->latency.value + n->latency.value) / 2;
+      n->distance = fal->distance;
+      if (GNUNET_NO == n->received_pong)
+       {
+         notify_clients_connect (&target, n->latency, n->distance);
+         n->received_pong = GNUNET_YES;
+       }
+      if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+       {
+         GNUNET_SCHEDULER_cancel (sched,
+                                  n->retry_task);
+         n->retry_task = GNUNET_SCHEDULER_NO_TASK;     
+         try_transmission_to_peer (n);
+       }
     }
 
   /* clean up validation entry */
@@ -1749,11 +1849,11 @@ check_pending_validation (void *cls,
  * (otherwise we may be seeing a MiM attack).
  *
  * @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
  * @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- *         by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ *         by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
  */
 static void
 handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
@@ -1789,6 +1889,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
 #endif
       return;
     }
+  
 #if 0
   /* FIXME: add given address to potential pool of our addresses
      (for voting) */
@@ -1802,39 +1903,39 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
 
 
 static void
-neighbor_timeout_task (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+neighbour_timeout_task (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct NeighborList *n = cls;
+  struct NeighbourList *n = cls;
 
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-              "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id));
+              "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
 #endif
   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  disconnect_neighbor (n, GNUNET_NO);
+  disconnect_neighbour (n, GNUNET_NO);
 }
 
 
 /**
- * Create a fresh entry in our neighbor list for the given peer.
- * Will try to transmit our current HELLO to the new neighbor.  Also
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour.  Also
  * notifies our clients about the new "connection".
  *
  * @param peer the peer for which we create the entry
- * @return the new neighbor list entry
+ * @return the new neighbour list entry
  */
-static struct NeighborList *
-setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
+static struct NeighbourList *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
 {
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct TransportPlugin *tp;
   struct ReadyList *rl;
 
   GNUNET_assert (our_hello != NULL);
-  n = GNUNET_malloc (sizeof (struct NeighborList));
-  n->next = neighbors;
-  neighbors = n;
+  n = GNUNET_malloc (sizeof (struct NeighbourList));
+  n->next = neighbours;
+  neighbours = n;
   n->id = *peer;
   n->last_quota_update = GNUNET_TIME_absolute_get ();
   n->peer_timeout =
@@ -1854,13 +1955,15 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
         }
       tp = tp->next;
     }
+  n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+  n->distance = -1;
   n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
                                                   GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                                  &neighbor_timeout_task, n);
+                                                  &neighbour_timeout_task, n);
   transmit_to_peer (NULL, NULL, 0,
+                   HELLO_ADDRESS_EXPIRATION,
                     (const char *) our_hello, GNUNET_HELLO_size(our_hello),
                     GNUNET_NO, n);
-  notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
   return n;
 }
 
@@ -1970,7 +2073,7 @@ run_validation (void *cls,
   struct GNUNET_PeerIdentity id;
   struct TransportPlugin *tp;
   struct ValidationEntry *va;
-  struct NeighborList *neighbor;
+  struct NeighbourList *neighbour;
   struct ForeignAddressList *peer_address;
   struct TransportPingMessage ping;
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
@@ -2034,10 +2137,10 @@ run_validation (void *cls,
                                     &id.hashPubKey,
                                     va,
                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  neighbor = find_neighbor(&id);  
-  if (neighbor == NULL)
-    neighbor = setup_new_neighbor(&id);
-  peer_address = add_peer_address(neighbor, tname, addr, addrlen);    
+  neighbour = find_neighbour(&id);  
+  if (neighbour == NULL)
+    neighbour = setup_new_neighbour(&id);
+  peer_address = add_peer_address(neighbour, tname, addr, addrlen);    
   GNUNET_assert(peer_address != NULL);
   hello_size = GNUNET_HELLO_size(our_hello);
   tsize = sizeof(struct TransportPingMessage) + hello_size;
@@ -2059,10 +2162,11 @@ run_validation (void *cls,
              "HELLO", hello_size,
              "PING", sizeof (struct TransportPingMessage));
 #endif
-  transmit_to_peer(NULL, peer_address, 
-                  GNUNET_SCHEDULER_PRIORITY_DEFAULT,
-                  message_buf, tsize, 
-                  GNUNET_YES, neighbor);
+  transmit_to_peer (NULL, peer_address, 
+                   GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+                   HELLO_VERIFICATION_TIMEOUT,
+                   message_buf, tsize, 
+                   GNUNET_YES, neighbour);
   GNUNET_free(message_buf);
   return GNUNET_OK;
 }
@@ -2072,7 +2176,7 @@ run_validation (void *cls,
  * Add the given address to the list of foreign addresses
  * available for the given peer (check for duplicates).
  *
- * @param cls the respective 'struct NeighborList' to update
+ * @param cls the respective 'struct NeighbourList' to update
  * @param tname name of the transport
  * @param expiration expiration time
  * @param addr the address
@@ -2085,7 +2189,7 @@ add_to_foreign_address_list (void *cls,
                             struct GNUNET_TIME_Absolute expiration,
                             const void *addr, size_t addrlen)
 {
-  struct NeighborList *n = cls;
+  struct NeighbourList *n = cls;
   struct ForeignAddressList *fal;
 
   fal = find_peer_address (n, tname, addr, addrlen);
@@ -2093,13 +2197,16 @@ add_to_foreign_address_list (void *cls,
     {
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data.\n",
+                 "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
                  GNUNET_a2s (addr, addrlen),
                  tname,
-                 GNUNET_i2s (&n->id));
+                 GNUNET_i2s (&n->id),
+                 expiration.value);
 #endif
       fal = add_peer_address (n, tname, addr, addrlen);
     }
+  if (fal == NULL)
+    return GNUNET_OK;
   fal->expires = GNUNET_TIME_absolute_max (expiration,
                                           fal->expires);
   fal->validated = GNUNET_YES;
@@ -2113,7 +2220,7 @@ add_to_foreign_address_list (void *cls,
  *
  * @param cls closure
  * @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
  * @param trust amount of trust we have in the peer (not used)
  */
 static void
@@ -2126,7 +2233,7 @@ check_hello_validated (void *cls,
   struct GNUNET_HELLO_Message *plain_hello;
   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
   struct GNUNET_PeerIdentity target;
-  struct NeighborList *n;
+  struct NeighbourList *n;
 
   if (peer == NULL)
     {
@@ -2164,12 +2271,15 @@ check_hello_validated (void *cls,
   if (h == NULL)
     return;
   chvc->hello_known = GNUNET_YES;
-  n = find_neighbor (peer);
+  n = find_neighbour (peer);
   if (n != NULL)
-    GNUNET_HELLO_iterate_addresses (h,
-                                   GNUNET_NO,
-                                   &add_to_foreign_address_list,
-                                   n);
+    {
+      GNUNET_HELLO_iterate_addresses (h,
+                                     GNUNET_NO,
+                                     &add_to_foreign_address_list,
+                                     n);
+      try_transmission_to_peer (n);
+    }
   GNUNET_HELLO_iterate_new_addresses (chvc->hello,
                                      h,
                                      GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
@@ -2244,41 +2354,28 @@ process_hello (struct TransportPlugin *plugin,
 
 
 /**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
  * has disconnected.  We may either need to do nothing (other plugins
  * still up), or trigger a full disconnect and clean up.  This
  * function updates our state and does the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
  * gone.
  *
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
  * @param check should we just check if all plugins
  *        disconnected or must we ask all plugins to
  *        disconnect?
  */
 static void
-disconnect_neighbor (struct NeighborList *current_handle, int check)
+disconnect_neighbour (struct NeighbourList *n, int check)
 {
   struct ReadyList *rpos;
-  struct NeighborList *npos;
-  struct NeighborList *nprev;
-  struct NeighborList *n;
+  struct NeighbourList *npos;
+  struct NeighbourList *nprev;
   struct MessageQueue *mq;
   struct ForeignAddressList *peer_addresses;
   struct ForeignAddressList *peer_pos;
 
-  if (neighbors == NULL)
-    return; /* We don't have any neighbors, so client has an already removed handle! */
-
-  npos = neighbors;
-  while ((npos != NULL) && (current_handle != npos))
-    npos = npos->next;
-
-  if (npos == NULL)
-    return; /* Couldn't find neighbor in existing list, must have been already removed! */
-  else
-    n = npos;
-
   if (GNUNET_YES == check)
     {
       rpos = n->plugins;
@@ -2294,14 +2391,14 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
           rpos = rpos->next;
         }
     }
-
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-              "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+              "Disconnecting from `%4s'\n",
+             GNUNET_i2s (&n->id));
 #endif
-  /* remove n from neighbors list */
+  /* remove n from neighbours list */
   nprev = NULL;
-  npos = neighbors;
+  npos = neighbours;
   while ((npos != NULL) && (npos != n))
     {
       nprev = npos;
@@ -2309,19 +2406,19 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
     }
   GNUNET_assert (npos != NULL);
   if (nprev == NULL)
-    neighbors = n->next;
+    neighbours = n->next;
   else
     nprev->next = n->next;
 
   /* notify all clients about disconnect */
-  notify_clients_disconnect (&n->id);
+  if (GNUNET_YES == n->received_pong)
+    notify_clients_disconnect (&n->id);
 
   /* clean up all plugins, cancel connections and pending transmissions */
   while (NULL != (rpos = n->plugins))
     {
       n->plugins = rpos->next;
-      if (GNUNET_YES == rpos->connected)
-        rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+      rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
 
       while (rpos->addresses != NULL)
         {
@@ -2333,16 +2430,26 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
     }
 
   /* free all messages on the queue */
-  while (NULL != (mq = n->messages))
+  while (NULL != (mq = n->messages_head))
     {
-      n->messages = mq->next;
-      GNUNET_assert (0 == memcmp(&mq->neighbor_id, 
+      GNUNET_CONTAINER_DLL_remove (n->messages_head,
+                                  n->messages_tail,
+                                  mq);
+      GNUNET_assert (0 == memcmp(&mq->neighbour_id, 
                                 &n->id,
                                 sizeof(struct GNUNET_PeerIdentity)));
       GNUNET_free (mq);
     }
   if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+    {
+      GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+      n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (sched, n->retry_task);
+      n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+    }
   /* finally, free n itself */
   GNUNET_free (n);
 }
@@ -2366,7 +2473,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
   struct TransportPingMessage *ping;
   struct TransportPongMessage *pong;
   uint16_t msize;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   struct ReadyList *rl;
   struct ForeignAddressList *fal;
 
@@ -2412,9 +2519,9 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
                  GNUNET_CRYPTO_rsa_sign (my_private_key,
                                          &pong->purpose, &pong->signature));
 
-  n = find_neighbor(peer);
+  n = find_neighbour(peer);
   if (n == NULL)
-    n = setup_new_neighbor(peer);
+    n = setup_new_neighbour(peer);
   /* broadcast 'PONG' to all available addresses */
   rl = n->plugins;
   while (rl != NULL)
@@ -2424,6 +2531,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
        {
          transmit_to_peer(NULL, fal,
                           TRANSPORT_PONG_PRIORITY, 
+                          HELLO_VERIFICATION_TIMEOUT,
                           (const char *)pong, 
                           ntohs(pong->header.size), 
                           GNUNET_YES, 
@@ -2437,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
 }
 
 
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+  struct GNUNET_TIME_Relative ret;
+  struct GNUNET_TIME_Absolute now;
+  uint64_t del;
+  uint64_t avail;
+  uint64_t excess;
+
+  now = GNUNET_TIME_absolute_get ();
+  del = now.value - n->last_quota_update.value;
+  if (del > MAX_BANDWIDTH_CARRY)
+    {
+      update_quota (n, GNUNET_YES);
+      del = now.value - n->last_quota_update.value;
+      GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+    }
+  if (n->quota_in == 0)
+    n->quota_in = 1;      /* avoid divison by zero */
+  avail = del * n->quota_in;
+  if (avail > n->last_received)
+    return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
+  excess = n->last_received - avail;
+  ret.value = excess / n->quota_in;
+  if (ret.value > 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+               (unsigned long long) excess,
+               (unsigned long long) n->quota_in,
+               (unsigned long long) ret.value);
+  return ret;
+}
+
+
 /**
  * Function called by the plugin for each received message.
  * Update data volumes, possibly notify plugins about
@@ -2444,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
  * and generally forward to our receive callback.
  *
  * @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- *        (opaque to transport service)
- * @param sender_address_len the length of the sender address
  * @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- *         for future receive calls for messages from this
- *         particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     unsigned int distance, const char *sender_address,
@@ -2467,108 +2614,91 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
   struct InboundMessage *im;
   struct ForeignAddressList *peer_address;
   uint16_t msize;
-  struct NeighborList *n;
+  struct NeighbourList *n;
 
-  n = find_neighbor (peer);
+  n = find_neighbour (peer);
   if (n == NULL)
-    {
-      if (message == NULL)
-        return;                 /* disconnect of peer already marked down */
-      n = setup_new_neighbor (peer);
-
-    }
+    n = setup_new_neighbour (peer);    
+  update_quota (n, GNUNET_NO);
   service_context = n->plugins;
   while ((service_context != NULL) && (plugin != service_context->plugin))
     service_context = service_context->next;
   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
-  if (message == NULL)
+  if (message != NULL)
     {
+      peer_address = add_peer_address(n, 
+                                     plugin->short_name,
+                                     sender_address, 
+                                     sender_address_len);  
+      if (peer_address != NULL)
+       {
+         peer_address->distance = distance;
+         if (peer_address->connected == GNUNET_NO)
+           {
+             peer_address->connected = GNUNET_YES;
+             peer_address->connect_attempts++;
+           }
+         peer_address->timeout
+           =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+       }
+      /* update traffic received amount ... */
+      msize = ntohs (message->size);
+      n->distance = distance;
+      n->peer_timeout =
+       GNUNET_TIME_relative_to_absolute
+       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+      GNUNET_SCHEDULER_cancel (sched,
+                              n->timeout_task);
+      n->timeout_task =
+       GNUNET_SCHEDULER_add_delayed (sched,
+                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                     &neighbour_timeout_task, n);
+      if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+       {
+         /* dropping message due to frequent inbound volume violations! */
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+                     GNUNET_ERROR_TYPE_BULK,
+                     _
+                     ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), 
+                     n->quota_violation_count);
+         return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+       }
+      switch (ntohs (message->type))
+       {
+       case GNUNET_MESSAGE_TYPE_HELLO:
+         process_hello (plugin, message);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+         handle_ping(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+         handle_pong(plugin, message, peer, sender_address, sender_address_len);
+         break;
+       default:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Receive failed from `%4s', triggering disconnect\n",
-                  GNUNET_i2s (&n->id));
-#endif
-      /* TODO: call stats */
-      if (service_context != NULL)
-        service_context->connected = GNUNET_NO;
-      disconnect_neighbor (n, GNUNET_YES);
-      return;
-    }
-  peer_address = add_peer_address(n, 
-                                 plugin->short_name,
-                                 sender_address, 
-                                 sender_address_len);
-  if (service_context != NULL)
-    {
-      if (service_context->connected == GNUNET_NO)
-        {
-          service_context->connected = GNUNET_YES;
-          /* FIXME: What to do here?  Should we use these as well, 
-            to specify some Address in the AddressList should be
-            available? */
-          peer_address->transmit_ready = GNUNET_YES;
-          peer_address->connect_attempts++;
-        }
-      peer_address->timeout
-        =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  /* update traffic received amount ... */
-  msize = ntohs (message->size);
-  n->last_received += msize;
-  GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  n->timeout_task =
-    GNUNET_SCHEDULER_add_delayed (sched,
-                                  GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                  &neighbor_timeout_task, n);
-  update_quota (n);
-  if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
-    {
-      /* dropping message due to frequent inbound volume violations! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
-                  GNUNET_ERROR_TYPE_BULK,
-                  _
-                  ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
-      /* TODO: call stats */
-      return;
-    }
-  switch (ntohs (message->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      process_hello (plugin, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-      handle_ping(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-      handle_pong(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    default:
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u from `%4s', sending to all clients.\n",
-                  ntohs (message->type), GNUNET_i2s (peer));
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Received message of type %u from `%4s', sending to all clients.\n",
+                     ntohs (message->type), GNUNET_i2s (peer));
 #endif
-      /* transmit message to all clients */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
-      im->header.size = htons (sizeof (struct InboundMessage) + msize);
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = n->latency;
-      im->peer = *peer;
-      memcpy (&im[1], message, msize);
-
-      cpos = clients;
-      while (cpos != NULL)
-        {
-          transmit_to_client (cpos, &im->header, GNUNET_YES);
-          cpos = cpos->next;
-        }
-      GNUNET_free (im);
-    }
+         /* transmit message to all clients */
+         im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+         im->header.size = htons (sizeof (struct InboundMessage) + msize);
+         im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+         im->latency = GNUNET_TIME_relative_hton (n->latency);
+         im->peer = *peer;
+         memcpy (&im[1], message, msize);
+         cpos = clients;
+         while (cpos != NULL)
+           {
+             transmit_to_client (cpos, &im->header, GNUNET_YES);
+             cpos = cpos->next;
+           }
+         GNUNET_free (im);
+       }
+    }  
+  return calculate_throttle_delay (n);
 }
 
 
@@ -2587,9 +2717,7 @@ handle_start (void *cls,
 {
   struct TransportClient *c;
   struct ConnectInfoMessage cim;
-  struct NeighborList *n;
-  struct InboundMessage *im;
-  struct GNUNET_MessageHeader *ack;
+  struct NeighbourList *n;
 
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2623,33 +2751,18 @@ handle_start (void *cls,
       /* tell new client about all existing connections */
       cim.header.size = htons (sizeof (struct ConnectInfoMessage));
       cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
-      cim.quota_out =
-        htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
-      cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) +
-                          sizeof (struct GNUNET_MessageHeader));
-      im->header.size = htons (sizeof (struct InboundMessage) +
-                               sizeof (struct GNUNET_MessageHeader));
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);  /* FIXME? */
-      ack = (struct GNUNET_MessageHeader *) &im[1];
-      ack->size = htons (sizeof (struct GNUNET_MessageHeader));
-      ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
-      for (n = neighbors; n != NULL; n = n->next)
-        {
-          cim.id = n->id;
-          transmit_to_client (c, &cim.header, GNUNET_NO);
-          if (n->received_pong)
-            {
-              im->peer = n->id;
-              transmit_to_client (c, &im->header, GNUNET_NO);
+      n = neighbours; 
+      while (n != NULL)
+       {
+         if (GNUNET_YES == n->received_pong)
+           {
+             cim.id = n->id;
+             cim.latency = GNUNET_TIME_relative_hton (n->latency);
+             cim.distance = htonl (n->distance);
+             transmit_to_client (c, &cim.header, GNUNET_NO);
             }
+           n = n->next;
         }
-      GNUNET_free (im);
-    }
-  else
-    {
-      fprintf(stderr, "Our hello is NULL!\n");
     }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -2669,10 +2782,6 @@ handle_hello (void *cls,
 {
   int ret;
 
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client\n", "HELLO");
-#endif
   ret = process_hello (NULL, message);
   GNUNET_SERVER_receive_done (client, ret);
 }
@@ -2691,7 +2800,7 @@ handle_send (void *cls,
              const struct GNUNET_MessageHeader *message)
 {
   struct TransportClient *tc;
-  struct NeighborList *n;
+  struct NeighbourList *n;
   const struct OutboundMessage *obm;
   const struct GNUNET_MessageHeader *obmm;
   uint16_t size;
@@ -2719,9 +2828,9 @@ handle_send (void *cls,
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
-  n = find_neighbor (&obm->peer);
+  n = find_neighbour (&obm->peer);
   if (n == NULL)
-    n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */
+    n = setup_new_neighbour (&obm->peer);
   tc = clients;
   while ((tc != NULL) && (tc->client != client))
     tc = tc->next;
@@ -2732,7 +2841,9 @@ handle_send (void *cls,
               ntohs (obmm->size),
               ntohs (obmm->type), GNUNET_i2s (&obm->peer));
 #endif
-  transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm, 
+  transmit_to_peer (tc, NULL, ntohl (obm->priority), 
+                   GNUNET_TIME_relative_ntoh (obm->timeout),
+                   (char *)obmm, 
                    ntohs (obmm->size), GNUNET_NO, n);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -2752,62 +2863,25 @@ handle_set_quota (void *cls,
 {
   const struct QuotaSetMessage *qsm =
     (const struct QuotaSetMessage *) message;
-  struct NeighborList *n;
-  struct TransportPlugin *p;
-  struct ReadyList *rl;
+  struct NeighbourList *n;
+  uint32_t qin;
 
-  n = find_neighbor (&qsm->peer);
+  n = find_neighbour (&qsm->peer);
   if (n == NULL)
     {
       GNUNET_SERVER_receive_done (client, GNUNET_OK);
       return;
     }
-
+  qin = ntohl (qsm->quota_in);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
-              "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+              "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
 #endif
-
-  update_quota (n);
-  if (n->quota_in < ntohl (qsm->quota_in))
+  update_quota (n, GNUNET_YES);
+  if (n->quota_in < qin)
     n->last_quota_update = GNUNET_TIME_absolute_get ();
-  n->quota_in = ntohl (qsm->quota_in);
-  rl = n->plugins;
-  while (rl != NULL)
-    {
-      p = rl->plugin;
-      p->api->set_receive_quota (p->api->cls,
-                                 &qsm->peer, ntohl (qsm->quota_in));
-      rl = rl->next;
-    }
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
-                    struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *message)
-{
-  const struct TryConnectMessage *tcm;
-  struct NeighborList *neighbor;
-  tcm = (const struct TryConnectMessage *) message;
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received `%s' request from client %p asking to connect to `%4s'\n",
-              "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
-  neighbor = find_neighbor(&tcm->peer);
-  if (neighbor == NULL)
-    setup_new_neighbor (&tcm->peer);
+  n->quota_in = qin;
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -2905,9 +2979,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
    GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
   {&handle_set_quota, NULL,
    GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
-  {&handle_try_connect, NULL,
-   GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
-   sizeof (struct TryConnectMessage)},
   {&handle_address_lookup, NULL,
    GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
    0},
@@ -2927,8 +2998,6 @@ create_environment (struct TransportPlugin *plug)
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
-  plug->env.default_quota_in =
-    (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
   plug->env.max_connections = max_connect_per_transport;
 }
 
@@ -2996,10 +3065,12 @@ client_disconnect_notification (void *cls,
     return;
   while (NULL != (mqe = pos->message_queue_head))
     {
-      pos->message_queue_head = mqe->next;
+      GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+                                  pos->message_queue_tail,
+                                  mqe);
+      pos->message_count--;
       GNUNET_free (mqe);
     }
-  pos->message_queue_head = NULL;
   if (prev == NULL)
     clients = pos->next;
   else
@@ -3009,6 +3080,12 @@ client_disconnect_notification (void *cls,
       pos->client = NULL;
       return;
     }
+  if (pos->th != NULL)
+    {
+      GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+      pos->th = NULL;
+    }
+  GNUNET_break (0 == pos->message_count);
   GNUNET_free (pos);
 }
 
@@ -3049,6 +3126,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct OwnAddressList *al;
   struct CheckHelloValidatedContext *chvc;
 
+  while (neighbours != NULL)
+    disconnect_neighbour (neighbours, GNUNET_NO);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transport service is unloading plugins...\n");