stuff
[oweals/gnunet.git] / src / transport / transport_api.c
index c6269e5625f0862808ca5dcf4bf3f84845fec82d..4583cf8760c5c79af22b7c3028789370289f615b 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @file transport/transport_api.c
  * @brief library to access the low-level P2P IO service
  * @author Christian Grothoff
+ *
+ * TODO:
+ * - adjust testcases to use new 'try connect' style (should be easy, breaks API compatibility!)
+ * - adjust core service to use new 'try connect' style (should be MUCH nicer there as well!)
+ * - test test test
  */
 #include "platform.h"
+#include "gnunet_bandwidth_lib.h"
 #include "gnunet_client_lib.h"
+#include "gnunet_constants.h"
+#include "gnunet_container_lib.h"
 #include "gnunet_arm_service.h"
 #include "gnunet_hello_lib.h"
 #include "gnunet_protocols.h"
 #include "transport.h"
 
 /**
- * After how long do we give up on transmitting a HELLO
- * to the service?
- */
-#define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
-
-/**
- * After how long do we automatically retry an unsuccessful
- * CONNECT request?
+ * How large to start with for the hashmap of neighbours.
  */
-#define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750)
+#define STARTING_NEIGHBOURS_SIZE 16
 
-/**
- * How long should ARM wait when starting up the
- * transport service before reporting back?
- */
-#define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
-
-/**
- * How long should ARM wait when stopping the
- * transport service before reporting back?
- */
-#define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
- * Entry in linked list of all of our current neighbours.
+ * Handle for a message that should be transmitted to the service.
+ * Used for both control messages and normal messages.
  */
-struct NeighbourList
+struct GNUNET_TRANSPORT_TransmitHandle
 {
 
   /**
-   * This is a linked list.
+   * We keep all requests in a DLL.
    */
-  struct NeighbourList *next;
+  struct GNUNET_TRANSPORT_TransmitHandle *next;
 
   /**
-   * Active transmit handle, can be NULL.  Used to move
-   * from ready to wait list on disconnect and to block
-   * two transmissions to the same peer from being scheduled
-   * at the same time.
+   * We keep all requests in a DLL.
    */
-  struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
+  struct GNUNET_TRANSPORT_TransmitHandle *prev;
 
   /**
-   * Identity of this neighbour.
+   * Neighbour for this handle, NULL for control messages.
    */
-  struct GNUNET_PeerIdentity id;
+  struct Neighbour *neighbour;
+
+  /**
+   * Function to call when notify_size bytes are available
+   * for transmission.
+   */
+  GNUNET_CONNECTION_TransmitReadyNotify notify;
 
   /**
-   * At what time did we reset last_sent last?
+   * Closure for notify.
    */
-  struct GNUNET_TIME_Absolute last_quota_update;
+  void *notify_cls;
 
   /**
-   * How many bytes have we sent since the "last_quota_update"
-   * timestamp?
+   * Timeout for this request, 0 for control messages.
    */
-  uint64_t last_sent;
+  struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * Quota for outbound traffic to the neighbour in bytes/ms.
+   * Task to trigger request timeout if the request is stalled due to 
+   * congestion.
    */
-  uint32_t quota_out;
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
   /**
-   * Set to GNUNET_YES if we are currently allowed to
-   * transmit a message to the transport service for this
-   * peer, GNUNET_NO otherwise.
+   * How many bytes is our notify callback waiting for?
    */
-  int transmit_ok;
+  size_t notify_size;
 
   /**
-   * Set to GNUNET_YES if we have received an ACK for the
-   * given peer.  Peers that receive our HELLO always respond
-   * with an ACK to let us know that we are successfully
-   * communicating.  Note that a PING can not be used for this
-   * since PINGs are only send if a HELLO address requires
-   * confirmation (and also, PINGs are not passed to the
-   * transport API itself).
+   * How important is this message? Not used for control messages.
    */
-  int received_ack;
+  uint32_t priority;
 
 };
 
 
 /**
- * Linked list of requests from clients for our HELLO
- * that were deferred.
+ * Entry in hash table of all of our current neighbours.
  */
-struct HelloWaitList
+struct Neighbour
 {
-
   /**
-   * This is a linked list.
+   * Overall transport handle.
    */
-  struct HelloWaitList *next;
+  struct GNUNET_TRANSPORT_Handle *h;
 
   /**
-   * Reference back to our transport handle.
+   * Active transmit handle or NULL.
    */
-  struct GNUNET_TRANSPORT_Handle *handle;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
 
   /**
-   * Callback to call once we got our HELLO.
+   * Identity of this neighbour.
    */
-  GNUNET_TRANSPORT_HelloUpdateCallback rec;
+  struct GNUNET_PeerIdentity id;
 
   /**
-   * Closure for rec.
+   * Outbound bandwidh tracker.
    */
-  void *rec_cls;
+  struct GNUNET_BANDWIDTH_Tracker out_tracker;
 
   /**
-   * When to time out (call rec with NULL).
+   * Entry in our readyness heap (which is sorted by 'next_ready'
+   * value).  NULL if there is no pending transmission request for
+   * this neighbour or if we're waiting for 'is_ready' to become
+   * true AFTER the 'out_tracker' suggested that this peer's quota
+   * has been satisfied (so once 'is_ready' goes to GNUNET_YES,
+   * we should immediately go back into the heap).
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct GNUNET_CONTAINER_HeapNode *hn;
 
   /**
-   * Timeout task (used to trigger timeout,
-   * cancel if we get the HELLO in time).
+   * Is this peer currently ready to receive a message?
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
-
+  int is_ready;
 
 };
 
 
 /**
- * Opaque handle for a transmission-ready request.
+ * Linked list of functions to call whenever our HELLO is updated.
  */
-struct GNUNET_TRANSPORT_TransmitHandle
+struct HelloWaitList
 {
 
   /**
-   * We keep the transmit handles that are waiting for
-   * a transport-level connection in a doubly linked list.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *next;
-
-  /**
-   * We keep the transmit handles that are waiting for
-   * a transport-level connection in a doubly linked list.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *prev;
-
-  /**
-   * Handle of the main transport data structure.
-   */
-  struct GNUNET_TRANSPORT_Handle *handle;
-
-  /**
-   * Neighbour for this handle, can be NULL if the service
-   * is not yet connected to the target.
-   */
-  struct NeighbourList *neighbour;
-
-  /**
-   * Which peer is this transmission going to be for?  All
-   * zeros if it is control-traffic to the service.
-   */
-  struct GNUNET_PeerIdentity target;
-
-  /**
-   * Function to call when notify_size bytes are available
-   * for transmission.
-   */
-  GNUNET_CONNECTION_TransmitReadyNotify notify;
-
-  /**
-   * Closure for notify.
-   */
-  void *notify_cls;
-
-  /**
-   * transmit_ready task Id.  The task is used to introduce the
-   * artificial delay that may be required to maintain the bandwidth
-   * limits.  Later, this will be the ID of the "transmit_timeout"
-   * task which is used to signal a timeout if the transmission could
-   * not be done in a timely fashion.
+   * This is a doubly linked list.
    */
-  GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
+  struct HelloWaitList *next;
 
   /**
-   * Timeout for this request.
+   * This is a doubly linked list.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct HelloWaitList *prev;
 
   /**
-   * How many bytes is our notify callback waiting for?
+   * Callback to call once we got our HELLO.
    */
-  size_t notify_size;
+  GNUNET_TRANSPORT_HelloUpdateCallback rec;
 
   /**
-   * How important is this message?
+   * Closure for rec.
    */
-  unsigned int priority;
+  void *rec_cls;
 
 };
 
@@ -260,6 +203,16 @@ struct GNUNET_TRANSPORT_Handle
    */
   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
 
+  /**
+   * Head of DLL of control messages.
+   */
+  struct GNUNET_TRANSPORT_TransmitHandle *control_head;
+
+  /**
+   * Tail of DLL of control messages.
+   */
+  struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
+
   /**
    * The current HELLO message for this peer.  Updated
    * whenever transports change their addresses.
@@ -274,1108 +227,1062 @@ struct GNUNET_TRANSPORT_Handle
   /**
    * Handle to our registration with the client for notification.
    */
-  struct GNUNET_CLIENT_TransmitHandle *network_handle;
+  struct GNUNET_CLIENT_TransmitHandle *cth;
 
   /**
-   * Linked list of transmit handles that are waiting for the
-   * transport to connect to the respective peer.  When we
-   * receive notification that the transport connected to a
-   * peer, we go over this list and check if someone has already
-   * requested a transmission to the new peer; if so, we trigger
-   * the next step.
+   * Linked list of pending requests for our HELLO.
    */
-  struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
+  struct HelloWaitList *hwl_head;
 
   /**
-   * Linked list of transmit handles that are waiting for the
-   * transport to be ready for transmission to the respective
-   * peer.  When we
-   * receive notification that the transport disconnected from
-   * a peer, we go over this list and move the entry back to
-   * the connect_wait list.
+   * Linked list of pending requests for our HELLO.
    */
-  struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
+  struct HelloWaitList *hwl_tail;
 
   /**
-   * Linked list of pending requests for our HELLO.
+   * My configuration.
    */
-  struct HelloWaitList *hwl_head;
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * My scheduler.
+   * Hash map of the current connected neighbours of this peer.
+   * Maps peer identities to 'struct Neighbour' entries.
    */
-  struct GNUNET_SCHEDULER_Handle *sched;
+  struct GNUNET_CONTAINER_MultiHashMap *neighbours;
 
   /**
-   * My configuration.
-   */
-  const struct GNUNET_CONFIGURATION_Handle *cfg;
+   * Heap sorting peers with pending messages by the timestamps that
+   * specify when we could next send a message to the respective peer.
+   * Excludes control messages (which can always go out immediately).
+   * Maps time stamps to 'struct Neighbour' entries.
+   */ 
+  struct GNUNET_CONTAINER_Heap *ready_heap;
 
   /**
-   * Linked list of the current neighbours of this peer.
+   * Peer identity as assumed by this process, or all zeros.
    */
-  struct NeighbourList *neighbours;
+  struct GNUNET_PeerIdentity self;
 
   /**
-   * ID of the task trying to reconnect to the
-   * service.
+   * ID of the task trying to reconnect to the service.
    */
   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
 
+  /**
+   * ID of the task trying to trigger transmission for a peer while
+   * maintaining bandwidth quotas.  In use if there are no control
+   * messages and the smallest entry in the 'ready_heap' has a time
+   * stamp in the future.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier quota_task;
+
   /**
    * Delay until we try to reconnect.
    */
   struct GNUNET_TIME_Relative reconnect_delay;
 
   /**
-   * Do we currently have a transmission pending?
-   * (schedule transmission was called but has not
-   * yet succeeded)?
+   * Should we check that 'self' matches what the service thinks?
+   * (if GNUNET_NO, then 'self' is all zeros!).
    */
-  int transmission_scheduled;
+  int check_self;
 };
 
 
-static struct NeighbourList *
-find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
-                const struct GNUNET_PeerIdentity *peer)
-{
-  struct NeighbourList *pos;
-
-  pos = h->neighbours;
-  while ((pos != NULL) &&
-         (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
-    pos = pos->next;
-  return pos;
-}
-
-
-/**
- * Schedule the task to send one message from the
- * connect_ready list to the service.
- */
-static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
-
-
-/**
- * Transmit message to client...
- */
-static size_t
-transport_notify_ready (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_TRANSPORT_Handle *h = cls;
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-  struct NeighbourList *n;
-  size_t ret;
-  char *cbuf;
-
-  h->network_handle = NULL;
-  h->transmission_scheduled = GNUNET_NO;
-  if (buf == NULL)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Could not transmit to transport service, cancelling pending requests\n");
-#endif
-      th = h->connect_ready_head;
-      if (th->next != NULL)
-        th->next->prev = NULL;
-      h->connect_ready_head = th->next;
-      if (NULL != (n = th->neighbour))
-        {
-          GNUNET_assert (n->transmit_handle == th);
-          n->transmit_handle = NULL;
-        }
-      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-        }
-      if (NULL != th->notify)
-       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-      GNUNET_free (th);
-      if (h->connect_ready_head != NULL)
-        schedule_transmission (h);      /* FIXME: is this ok? */
-      return 0;
-    }
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Ready to transmit %u bytes to transport service\n", size);
-#endif
-  cbuf = buf;
-  ret = 0;
-  h->network_handle = NULL;
-  h->transmission_scheduled = GNUNET_NO;
-  while ((h->connect_ready_head != NULL) &&
-         (h->connect_ready_head->notify_size <= size))
-    {
-      th = h->connect_ready_head;
-      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-        }
-      GNUNET_assert (th->notify_size <= size);
-      if (th->next != NULL)
-        th->next->prev = NULL;
-      h->connect_ready_head = th->next;
-      if (NULL != (n = th->neighbour))
-        {
-          GNUNET_assert (n->transmit_handle == th);
-          n->transmit_handle = NULL;
-        }
-      if (NULL != th->notify)
-       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
-      GNUNET_free (th);
-      if (n != NULL)
-        n->last_sent += ret;
-      size -= ret;
-    }
-  if (h->connect_ready_head != NULL)
-    schedule_transmission (h);
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting %u bytes to transport service\n", ret);
-#endif
-  return ret;
-}
-
-
 /**
- * Schedule the task to send one message from the
- * connect_ready list to the service.
+ * Schedule the task to send one message, either from the control
+ * list or the peer message queues  to the service.
+ *
+ * @param h transport service to schedule a transmission for
  */
 static void
-schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
-{
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-
-  GNUNET_assert (NULL == h->network_handle);
-  if (h->client == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
-      return;                   /* not yet connected */
-    }
-  th = h->connect_ready_head;
-  if (th == NULL)
-    return;                     /* no request pending */
-  if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      /* remove existing time out task, will be integrated
-         with transmit_ready notification! */
-      GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-      th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  h->transmission_scheduled = GNUNET_YES;
-  h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                           th->notify_size,
-                                                           GNUNET_TIME_absolute_get_remaining
-                                                           (th->timeout),
-                                                           GNUNET_NO,
-                                                           &transport_notify_ready,
-                                                           h);
-  GNUNET_assert (NULL != h->network_handle);
-}
+schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
 
 
 /**
- * Insert the given transmit handle in the given sorted
- * doubly linked list based on timeout.
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
  *
- * @param head pointer to the head of the linked list
- * @param th element to insert into the list
+ * @param h transport service to reconnect
  */
 static void
-insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
-                        struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
-  struct GNUNET_TRANSPORT_TransmitHandle *pos;
-  struct GNUNET_TRANSPORT_TransmitHandle *prev;
-
-  pos = *head;
-  prev = NULL;
-  while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
-    {
-      prev = pos;
-      pos = pos->next;
-    }
-  if (prev == NULL)
-    {
-      th->next = *head;
-      if (th->next != NULL)
-        th->next->prev = th;
-      *head = th;
-    }
-  else
-    {
-      th->next = pos;
-      th->prev = prev;
-      prev->next = th;
-      if (pos != NULL)
-        pos->prev = th;
-    }
-}
+disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
 
 
 /**
- * Cancel a pending notify delay task (if pending) and also remove the
- * given transmit handle from whatever list is on.
+ * Get the neighbour list entry for the given peer
  *
- * @param th handle for the transmission request to manipulate
+ * @param h our context
+ * @param peer peer to look up
+ * @return NULL if no such peer entry exists
  */
-static void
-remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
+static struct Neighbour *
+neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
+                const struct GNUNET_PeerIdentity *peer)
 {
-  struct GNUNET_TRANSPORT_Handle *h;
-
-  h = th->handle;
-  if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-      th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  if (th->prev == NULL)
-    {
-      if (th == h->connect_wait_head)
-        h->connect_wait_head = th->next;
-      else
-        h->connect_ready_head = th->next;
-    }
-  else
-    {
-      th->prev->next = th->next;
-    }
-  if (th->next != NULL)
-    th->next->prev = th->prev;
+  return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey);
 }
 
 
 /**
- * Schedule a request to connect to the given
- * neighbour (and if successful, add the specified
- * handle to the wait list).
+ * Add neighbour to our list
  *
- * @param th handle for a request to transmit once we
- *        have connected
- */
-static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th);
-
-
-/**
- * Called when our transmit request timed out before any transport
- * reported success connecting to the desired peer or before the
- * transport was ready to receive.  Signal error and free
- * TransmitHandle.
+ * @return NULL if this API is currently disconnecting from the service
  */
-static void
-peer_transmit_timeout (void *cls,
-                       const struct GNUNET_SCHEDULER_TaskContext *tc)
+static struct Neighbour *
+neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
+               const struct GNUNET_PeerIdentity *pid)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+  struct Neighbour *n;
 
-  th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-  if (th->neighbour != NULL)
-    th->neighbour->transmit_handle = NULL;
 #if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Request for transmission to peer `%s' timed out.\n",
-              GNUNET_i2s (&th->target));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating entry for neighbour `%4s'.\n",
+             GNUNET_i2s (pid));
 #endif
-  remove_from_any_list (th);
-  if (NULL != th->notify)
-    th->notify (th->notify_cls, 0, NULL);
-  GNUNET_free (th);
+  n = GNUNET_malloc (sizeof (struct Neighbour));
+  n->id = *pid;
+  n->h = h;
+  n->is_ready = GNUNET_YES;
+  GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
+                                GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+                                MAX_BANDWIDTH_CARRY_S);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (h->neighbours,
+                                                   &pid->hashPubKey,
+                                                   n,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  return n;
 }
 
 
-
-
 /**
- * Queue control request for transmission to the transport
- * service.
+ * Iterator over hash map entries, for deleting state of a neighbour.
  *
- * @param h handle to the transport service
- * @param size number of bytes to be transmitted
- * @param at_head request must be added to the head of the queue
- *        (otherwise request will be appended)
- * @param timeout how long this transmission can wait (at most)
- * @param notify function to call to get the content
- * @param notify_cls closure for notify
+ * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
+ * @param key peer identity
+ * @param value value in the hash map, the neighbour entry to delete
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
  */
-static void
-schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
-                           size_t size,
-                           int at_head,
-                           struct GNUNET_TIME_Relative timeout,
-                           GNUNET_CONNECTION_TransmitReadyNotify notify,
-                           void *notify_cls)
+static int
+neighbour_delete (void *cls,
+                 const GNUNET_HashCode * key,
+                 void *value)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Control transmit of %u bytes within %llums requested\n",
-              size, (unsigned long long) timeout.value);
-#endif
-  th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
-  th->handle = h;
-  th->notify = notify;
-  th->notify_cls = notify_cls;
-  th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  th->notify_size = size;
-  th->notify_delay_task
-    = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                    timeout, &peer_transmit_timeout, th);
-  if (at_head)
-    {
-      th->next = h->connect_ready_head;
-      h->connect_ready_head = th;
-      if (th->next != NULL)
-        th->next->prev = th;
-    }
-  else
-    {
-      insert_transmit_handle (&h->connect_ready_head, th);
-    }
-  if (GNUNET_NO == h->transmission_scheduled)
-    schedule_transmission (h);
+  struct GNUNET_TRANSPORT_Handle *handle = cls;
+  struct Neighbour *n = value;
+
+  if (NULL != handle->nd_cb)
+    handle->nd_cb (handle->cls,
+                  &n->id);
+  GNUNET_assert (NULL == n->th);
+  GNUNET_assert (NULL == n->hn);
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (handle->neighbours,
+                                                      key,
+                                                      n));
+  GNUNET_free (n);
+  return GNUNET_YES;
 }
 
 
 /**
- * Update the quota values for the given neighbour now.
+ * Function we use for handling incoming messages.
+ *
+ * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
+ * @param msg message received, NULL on timeout or fatal error
  */
 static void
-update_quota (struct NeighbourList *n)
+demultiplexer (void *cls,
+              const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_TIME_Relative delta;
-  uint64_t allowed;
-  uint64_t remaining;
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  const struct DisconnectInfoMessage *dim;
+  const struct ConnectInfoMessage *cim;
+  const struct InboundMessage *im;
+  const struct GNUNET_MessageHeader *imm;
+  const struct SendOkMessage *okm;
+  struct HelloWaitList *hwl;
+  struct HelloWaitList *next_hwl;
+  struct Neighbour *n;
+  struct GNUNET_PeerIdentity me;
+  uint16_t size;
+  uint32_t ats_count;
 
-  delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  allowed = delta.value * n->quota_out;
-  if (n->last_sent < allowed)
+  GNUNET_assert (h->client != NULL);
+  if (msg == NULL)
     {
-      remaining = allowed - n->last_sent;
-      if (n->quota_out > 0)
-        remaining /= n->quota_out;
-      else
-        remaining = 0;
-      if (remaining > MAX_BANDWIDTH_CARRY)
-        remaining = MAX_BANDWIDTH_CARRY;
-      n->last_sent = 0;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
-      n->last_quota_update.value -= remaining;
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                 "Error receiving from transport service, disconnecting temporarily.\n");
+#endif
+      disconnect_and_schedule_reconnect (h);
+      return;
     }
-  else
+  GNUNET_CLIENT_receive (h->client,
+                         &demultiplexer, h, 
+                        GNUNET_TIME_UNIT_FOREVER_REL);
+  size = ntohs (msg->size);
+  switch (ntohs (msg->type))
     {
-      n->last_sent -= allowed;
-      n->last_quota_update = GNUNET_TIME_absolute_get ();
-    }
-}
-
-
-struct SetQuotaContext
-{
-  struct GNUNET_TRANSPORT_Handle *handle;
-
-  struct GNUNET_PeerIdentity target;
-
-  GNUNET_SCHEDULER_Task cont;
-
-  void *cont_cls;
-
-  struct GNUNET_TIME_Absolute timeout;
-
-  uint32_t quota_in;
-};
-
-
-static size_t
-send_set_quota (void *cls, size_t size, void *buf)
-{
-  struct SetQuotaContext *sqc = cls;
-  struct QuotaSetMessage *msg;
+    case GNUNET_MESSAGE_TYPE_HELLO:
+      if (GNUNET_OK !=
+          GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+                               &me))
+        {
+          GNUNET_break (0);
+          break;
+        }
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Receiving (my own) `%s' message, I am `%4s'.\n",
+                  "HELLO", GNUNET_i2s (&me));
+#endif
+      GNUNET_free_non_null (h->my_hello);
+      h->my_hello = NULL;
+      if (size < sizeof (struct GNUNET_MessageHeader))
+        {
+          GNUNET_break (0);
+          break;
+        }
+      h->my_hello = GNUNET_malloc (size);
+      memcpy (h->my_hello, msg, size);
+      hwl = h->hwl_head;
+      while (NULL != hwl)
+        {
+         next_hwl = hwl->next;
+          hwl->rec (hwl->rec_cls,
+                   (const struct GNUNET_MessageHeader *) h->my_hello);
+         hwl = next_hwl;
+        }
+      break;
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
+      if (size < sizeof (struct ConnectInfoMessage))
+        {
+          GNUNET_break (0);
+          break;
+        }
+      cim = (const struct ConnectInfoMessage *) msg;
+      ats_count = ntohl (cim->ats_count);
+      if (size != sizeof (struct ConnectInfoMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information))
+        {
+          GNUNET_break (0);
+          break;
+        }
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Receiving `%s' message for `%4s'.\n",
+                  "CONNECT", GNUNET_i2s (&cim->id));
+#endif
+      n = neighbour_find (h, &cim->id);
+      if (n != NULL)
+       {
+         GNUNET_break (0);
+         break;
+       }
+      n = neighbour_add (h, &cim->id);
+      if (h->nc_cb != NULL)
+       h->nc_cb (h->cls, &n->id,
+                 &cim->ats, ats_count);
+      break;
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
+      if (size != sizeof (struct DisconnectInfoMessage))
+        {
+          GNUNET_break (0);
+          break;
+        }
+      dim = (const struct DisconnectInfoMessage *) msg;
+      GNUNET_break (ntohl (dim->reserved) == 0);
+#if DEBUG_TRANSPORT_DISCONNECT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Receiving `%s' message for `%4s'.\n",
+                  "DISCONNECT",
+                 GNUNET_i2s (&dim->peer));
+#endif
+      n = neighbour_find (h, &dim->peer);
+      if (n == NULL)
+       {
+         GNUNET_break (0);
+         break;
+       }
+      neighbour_delete (h, &dim->peer.hashPubKey, n);
+      break;
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
+      if (size != sizeof (struct SendOkMessage))
+        {
+          GNUNET_break (0);
+          break;
+        }
+      okm = (const struct SendOkMessage *) msg;
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Receiving `%s' message, transmission %s.\n", "SEND_OK",
+                  ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+#endif
+      n = neighbour_find (h, &okm->peer);
+      if (n == NULL)
+       break;  
+      GNUNET_break (GNUNET_NO == n->is_ready);
+      n->is_ready = GNUNET_YES;
+      if ( (n->th != NULL) &&
+          (n->hn == NULL) )
+       {
+         GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->th->timeout_task);
+         GNUNET_SCHEDULER_cancel (n->th->timeout_task);
+         n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+         /* we've been waiting for this (congestion, not quota, 
+            caused delayed transmission) */
+         n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
+                                               n, 0);
+         schedule_transmission (h);
+       }
+      break;
+    case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Receiving `%s' message.\n", "RECV");
+#endif
+      if (size <
+          sizeof (struct InboundMessage) +
+          sizeof (struct GNUNET_MessageHeader))
+        {
+          GNUNET_break (0);
+          break;
+        }
+      im = (const struct InboundMessage *) msg;
+      GNUNET_break (0 == ntohl (im->reserved));
+      ats_count = ntohl(im->ats_count);
+      imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count+1]);
 
-  if (buf == NULL)
-    {
-      GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
-                                         sqc->cont,
-                                         sqc->cont_cls,
-                                         GNUNET_SCHEDULER_REASON_TIMEOUT);
-      GNUNET_free (sqc);
-      return 0;
-    }
+      if (ntohs (imm->size) + sizeof (struct InboundMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) != size)
+        {
+          GNUNET_break (0);
+          break;
+        }
 #if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting `%s' request with respect to `%4s'.\n",
-              "SET_QUOTA", GNUNET_i2s (&sqc->target));
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Received message of type %u from `%4s'.\n",
+                 ntohs (imm->type), GNUNET_i2s (&im->peer));
 #endif
-  GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
-  msg = buf;
-  msg->header.size = htons (sizeof (struct QuotaSetMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
-  msg->quota_in = htonl (sqc->quota_in);
-  memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
-  if (sqc->cont != NULL)
-    GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
-                                       sqc->cont,
-                                       sqc->cont_cls,
-                                       GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-  GNUNET_free (sqc);
-  return sizeof (struct QuotaSetMessage);
+      n = neighbour_find (h, &im->peer);
+      if (n == NULL)
+       {
+         GNUNET_break (0);
+         break;
+       }
+      if (h->rec != NULL)
+       h->rec (h->cls, &im->peer, imm,
+               &im->ats, ats_count);
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  _
+                  ("Received unexpected message of type %u in %s:%u\n"),
+                  ntohs (msg->type), __FILE__, __LINE__);
+      GNUNET_break (0);
+      break;
+    }
 }
 
 
 /**
- * Set the share of incoming bandwidth for the given
- * peer to the specified amount.
+ * A transmission request could not be satisfied because of
+ * network congestion.  Notify the initiator and clean up.
  *
- * @param handle connection to transport service
- * @param target who's bandwidth quota is being changed
- * @param quota_in incoming bandwidth quota in bytes per ms
- * @param quota_out outgoing bandwidth quota in bytes per ms
- * @param timeout how long to wait until signaling failure if
- *        we can not communicate the quota change
- * @param cont continuation to call when done, will be called
- *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
- * @param cont_cls closure for continuation
+ * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle'
+ * @param tc scheduler context
  */
-void
-GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
-                            const struct GNUNET_PeerIdentity *target,
-                            uint32_t quota_in,
-                            uint32_t quota_out,
-                            struct GNUNET_TIME_Relative timeout,
-                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
+static void
+timeout_request_due_to_congestion (void *cls,
+                                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct NeighbourList *n;
-  struct SetQuotaContext *sqc;
+  struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+  struct Neighbour *n = th->neighbour;
 
-  n = find_neighbour (handle, target);
-  if (n != NULL)
-    {
-      update_quota (n);
-      if (n->quota_out < quota_out)
-        n->last_quota_update = GNUNET_TIME_absolute_get ();
-      n->quota_out = quota_out;
-    }
-  sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
-  sqc->handle = handle;
-  sqc->target = *target;
-  sqc->cont = cont;
-  sqc->cont_cls = cont_cls;
-  sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  sqc->quota_in = quota_in;
-  schedule_control_transmit (handle,
-                             sizeof (struct QuotaSetMessage),
-                             GNUNET_NO, timeout, &send_set_quota, sqc);
+  n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (th == n->th);
+  GNUNET_assert (NULL == n->hn);
+  n->th = NULL;
+  th->notify (th->notify_cls, 0, NULL);
+  GNUNET_free (th);
 }
 
 
 /**
- * Obtain the HELLO message for this peer.
+ * Transmit message(s) to service.
  *
- * @param handle connection to transport service
- * @param timeout how long to wait for the HELLO
- * @param rec function to call with the HELLO, sender will be our peer
- *            identity; message and sender will be NULL on timeout
- *            (handshake with transport service pending/failed).
- *             cost estimate will be 0.
- * @param rec_cls closure for rec
+ * @param cls handle to transport
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
  */
-void
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
-                            GNUNET_TRANSPORT_HelloUpdateCallback rec,
-                            void *rec_cls)
+static size_t
+transport_notify_ready (void *cls, size_t size, void *buf)
 {
-  struct HelloWaitList *hwl;
-
-  hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
-  hwl->next = handle->hwl_head;
-  handle->hwl_head = hwl;
-  hwl->handle = handle;
-  hwl->rec = rec;
-  hwl->rec_cls = rec_cls;
-  if (handle->my_hello == NULL)
-    return;    
-  rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
-}
-
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
+  struct Neighbour *n;
+  char *cbuf;
+  struct OutboundMessage obm;
+  size_t ret;
+  size_t nret;
+  size_t mret;
 
+  GNUNET_assert (NULL != h->client);
+  h->cth = NULL;
+  if (NULL == buf)
+    {
+      /* transmission failed */
+      disconnect_and_schedule_reconnect (h);
+      return 0;
+    }
 
-/**
- * Stop receiving updates about changes to our HELLO message.
- *
- * @param handle connection to transport service
- * @param rec function previously registered to be called with the HELLOs
- * @param rec_cls closure for rec
- */
-void
-GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
-                                  GNUNET_TRANSPORT_HelloUpdateCallback rec,
-                                  void *rec_cls)
-{
-  struct HelloWaitList *pos;
-  struct HelloWaitList *prev;
+  cbuf = buf;
+  ret = 0;
+  /* first send control messages */
+  while ( (NULL != (th = h->control_head)) &&
+         (th->notify_size <= size) )
+    {
+      GNUNET_CONTAINER_DLL_remove (h->control_head,
+                                  h->control_tail,
+                                  th);
+      nret = th->notify (th->notify_cls, size, &cbuf[ret]);
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Added %u bytes of control message at %u\n",
+                 nret,
+                 ret);
+#endif
+      GNUNET_free (th);
+      ret += nret;
+      size -= nret;
+    }
 
-  prev = NULL;
-  pos = handle->hwl_head;
-  while (pos != NULL)
+  /* then, if possible and no control messages pending, send data messages */
+  while ( (NULL == h->control_head) &&
+         (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
     {
-      if ( (pos->rec == rec) &&
-          (pos->rec_cls == rec_cls) )
-       break;
-      prev = pos;
-      pos = pos->next;
+       if (GNUNET_YES != n->is_ready)
+       {
+         /* peer not ready, wait for notification! */
+         GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
+         n->hn = NULL;
+         GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->th->timeout_task);
+         n->th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (n->th->timeout),
+                                                             &timeout_request_due_to_congestion,
+                                                             n->th);
+         continue;
+       }
+      th = n->th;
+      if (th->notify_size + sizeof (struct OutboundMessage) > size)
+       break; /* does not fit */
+      if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, th->notify_size).rel_value > 0)
+       break; /* too early */
+      GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
+      n->hn = NULL;
+      n->th = NULL;
+      n->is_ready = GNUNET_NO;
+      GNUNET_assert (size >= sizeof (struct OutboundMessage));
+      mret = th->notify (th->notify_cls,
+                        size - sizeof (struct OutboundMessage),
+                        &cbuf[ret + sizeof (struct OutboundMessage)]);
+      GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
+      if (mret != 0)   
+       {
+         GNUNET_assert (mret + sizeof (struct OutboundMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+         obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
+         obm.header.size = htons (mret + sizeof (struct OutboundMessage));
+         obm.priority = htonl (th->priority);
+         obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout));
+         obm.peer = n->id;
+         memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
+         ret += (mret + sizeof (struct OutboundMessage));
+         size -= (mret + sizeof (struct OutboundMessage));
+         GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret);
+       }
+      GNUNET_free (th);
     }
-  GNUNET_break (pos != NULL);
-  if (pos == NULL)
-    return;
-  if (prev == NULL)
-    handle->hwl_head = pos->next;
-  else
-    prev->next = pos->next;
-  GNUNET_free (pos);
+  /* if there are more pending messages, try to schedule those */
+  schedule_transmission (h);
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transmitting %u bytes to transport service\n", ret);
+#endif
+  return ret;
 }
 
 
-static size_t
-send_hello (void *cls, size_t size, void *buf)
+/**
+ * Schedule the task to send one message, either from the control
+ * list or the peer message queues  to the service.
+ *
+ * @param cls transport service to schedule a transmission for
+ * @param tc scheduler context
+ */
+static void
+schedule_transmission_task (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_MessageHeader *hello = cls;
-  uint16_t msize;
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  size_t size;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
+  struct Neighbour *n;
 
-  if (buf == NULL)
+  h->quota_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (NULL != h->client);
+  /* destroy all requests that have timed out */
+  while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
+         (GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value == 0) )
     {
+      /* notify client that the request could not be satisfied within
+        the given time constraints */
+      th = n->th;
+      n->th = NULL;
+      GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
+      n->hn = NULL;
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Timeout while trying to transmit `%s' request.\n",
-                  "HELLO");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Signalling timeout for transmission to peer %s due to congestion\n",
+                 GNUNET_i2s (&n->id));
 #endif
-      GNUNET_free (hello);
-      return 0;
+      GNUNET_assert (0 == 
+                    th->notify (th->notify_cls, 0, NULL));
+      GNUNET_free (th);      
+    }
+  if (NULL != h->cth)
+    return;
+  if (NULL != h->control_head)
+    {
+      size = h->control_head->notify_size;
     }
+  else
+    {
+      n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
+      if (NULL == n)
+       return; /* no pending messages */
+      size = n->th->notify_size + sizeof (struct OutboundMessage);
+    }  
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting `%s' request.\n", "HELLO");
+             "Calling notify_transmit_ready\n");
 #endif
-  msize = ntohs (hello->size);
-  GNUNET_assert (size >= msize);
-  memcpy (buf, hello, msize);
-  GNUNET_free (hello);
-  return msize;
+  h->cth =
+    GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                        size,
+                                        GNUNET_TIME_UNIT_FOREVER_REL,
+                                        GNUNET_NO,
+                                        &transport_notify_ready,
+                                        h);
+  GNUNET_assert (NULL != h->cth);
 }
 
 
 /**
- * Offer the transport service the HELLO of another peer.  Note that
- * the transport service may just ignore this message if the HELLO is
- * malformed or useless due to our local configuration.
+ * Schedule the task to send one message, either from the control
+ * list or the peer message queues  to the service.
  *
- * @param handle connection to transport service
- * @param hello the hello message
+ * @param h transport service to schedule a transmission for
  */
-void
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
-                              const struct GNUNET_MessageHeader *hello)
+static void
+schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
 {
-  struct GNUNET_MessageHeader *hc;
-  uint16_t size;
+  struct GNUNET_TIME_Relative delay;
+  struct Neighbour *n;
 
-  if (handle->client == NULL)
+  GNUNET_assert (NULL != h->client);
+  if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
     {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  "Not connected to transport service, dropping offered HELLO\n");
-#endif
-      return;
+      GNUNET_SCHEDULER_cancel (h->quota_task);
+      h->quota_task = GNUNET_SCHEDULER_NO_TASK;
     }
-  GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
-  size = ntohs (hello->size);
-  GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
-  hc = GNUNET_malloc (size);
-  memcpy (hc, hello, size);
-  schedule_control_transmit (handle,
-                             size,
-                             GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
+  if (NULL != h->control_head)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))    
+    delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, n->th->notify_size);
+  else
+    return; /* no work to be done */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Scheduling next transmission to service in %llu ms\n",
+             (unsigned long long) delay.rel_value);
+  h->quota_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                               &schedule_transmission_task,
+                                               h);
 }
 
 
 /**
- * Function we use for handling incoming messages.
+ * Queue control request for transmission to the transport
+ * service.
+ *
+ * @param h handle to the transport service
+ * @param size number of bytes to be transmitted
+ * @param notify function to call to get the content
+ * @param notify_cls closure for notify
  */
-static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
-
-
-static size_t
-send_start (void *cls, size_t size, void *buf)
+static void
+schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
+                           size_t size,
+                           GNUNET_CONNECTION_TransmitReadyNotify notify,
+                           void *notify_cls)
 {
-  struct GNUNET_MessageHeader *s = buf;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
 
-  if (buf == NULL)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Timeout while trying to transmit `%s' request.\n",
-                  "START");
-#endif
-      return 0;
-    }
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting `%s' request.\n", "START");
+              "Control transmit of %u bytes requested\n",
+              size);
 #endif
-  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
-  s->size = htons (sizeof (struct GNUNET_MessageHeader));
-  s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
-  return sizeof (struct GNUNET_MessageHeader);
+  th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
+  th->notify = notify;
+  th->notify_cls = notify_cls;
+  th->notify_size = size;
+  GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
+                                   h->control_tail,
+                                   th);
+  schedule_transmission (h);
 }
 
 
 /**
- * We're ready to transmit the request that the transport service
- * should connect to a new peer.  In addition to sending the
- * request, schedule the next phase for the transmission processing
- * that caused the connect request in the first place.
+ * Transmit START message to service.
+ *
+ * @param cls unused
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
  */
 static size_t
-request_connect (void *cls, size_t size, void *buf)
+send_start (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
-  struct TryConnectMessage *tcm;
-  struct GNUNET_TRANSPORT_Handle *h;
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct StartMessage s;
 
-  GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
-  h = th->handle;
   if (buf == NULL)
     {
+      /* Can only be shutdown, just give up */
 #if DEBUG_TRANSPORT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit `%s' request for `%4s' to service.\n",
-                  "TRY_CONNECT", GNUNET_i2s (&th->target));
+                  "Shutdown while trying to transmit `%s' request.\n",
+                  "START");
 #endif
-      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-        }
-      if (NULL != th->notify)
-       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-      GNUNET_free (th);
       return 0;
     }
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting `%s' message for `%4s' (need connection in %llu ms).\n",
-              "TRY_CONNECT", GNUNET_i2s (&th->target),
-              GNUNET_TIME_absolute_get_remaining (th->timeout).value);
+              "Transmitting `%s' request.\n", "START");
 #endif
-  GNUNET_assert (size >= sizeof (struct TryConnectMessage));
-  tcm = buf;
-  tcm->header.size = htons (sizeof (struct TryConnectMessage));
-  tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
-  tcm->reserved = htonl (0);
-  memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
-  th->notify_delay_task
-    = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                    GNUNET_TIME_absolute_get_remaining
-                                    (th->timeout),
-                                    &peer_transmit_timeout, th);
-  insert_transmit_handle (&h->connect_wait_head, th);
-  return sizeof (struct TryConnectMessage);
-}
-
-
-/**
- * Schedule a request to connect to the given
- * neighbour (and if successful, add the specified
- * handle to the wait list).
- *
- * @param th handle for a request to transmit once we
- *        have connected
- */
-static void
-try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
-  GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
-  schedule_control_transmit (th->handle,
-                             sizeof (struct TryConnectMessage),
-                             GNUNET_NO,
-                             GNUNET_TIME_absolute_get_remaining (th->timeout),
-                             &request_connect, th);
+  GNUNET_assert (size >= sizeof (struct StartMessage));
+  s.header.size = htons (sizeof (struct StartMessage));
+  s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+  s.do_check = htonl (h->check_self);
+  s.self = h->self;
+  memcpy (buf, &s, sizeof (struct StartMessage));
+  GNUNET_CLIENT_receive (h->client,
+                         &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
+  return sizeof (struct StartMessage);
 }
 
 
 /**
- * Task for delayed attempts to reconnect to a peer.
+ * Try again to connect to transport service.
  *
- * @param cls must be a transmit handle that determines the peer
- *        to which we will try to connect
- * @param tc scheduler information about why we were triggered (not used)
+ * @param cls the handle to the transport service
+ * @param tc scheduler context
  */
 static void
-try_connect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+reconnect (void *cls,
+          const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+  struct GNUNET_TRANSPORT_Handle *h = cls;
 
-  th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-  try_connect (th);
+  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+    {
+      /* shutdown, just give up */
+      return;
+    }
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Connecting to transport service.\n");
+#endif
+  GNUNET_assert (h->client == NULL);
+  GNUNET_assert (h->control_head == NULL);
+  GNUNET_assert (h->control_tail == NULL);
+  h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
+  GNUNET_assert (h->client != NULL);
+  schedule_control_transmit (h,
+                             sizeof (struct StartMessage),
+                            &send_start, h);
 }
 
 
 /**
- * Remove neighbour from our list.  Will automatically
- * trigger a re-connect attempt if we have messages pending
- * for this peer.
- * 
- * @param h our state
- * @param peer the peer to remove
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param h transport service to reconnect
  */
 static void
-remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
-                  const struct GNUNET_PeerIdentity *peer)
+disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
 {
-  struct NeighbourList *prev;
-  struct NeighbourList *pos;
   struct GNUNET_TRANSPORT_TransmitHandle *th;
 
+  GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+  /* Forget about all neighbours that we used to be connected to */
+  GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 
+                                       &neighbour_delete, 
+                                       h);
+  if (NULL != h->cth)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
+      h->cth = NULL;
+    }
+  if (NULL != h->client)
+    {
+      GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
+      h->client = NULL;
+    }
+  if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (h->quota_task);
+      h->quota_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  while ( (NULL != (th = h->control_head)))
+    {
+      GNUNET_CONTAINER_DLL_remove (h->control_head,
+                                   h->control_tail,
+                                   th);
+      th->notify (th->notify_cls, 0, NULL);
+      GNUNET_free (th);
+    }
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Removing neighbour `%s' from list of connected peers.\n",
-              GNUNET_i2s (peer));
+              "Scheduling task to reconnect to transport service in %llu ms.\n",
+              h->reconnect_delay.rel_value);
 #endif
-  prev = NULL;
-  pos = h->neighbours;
-  while ((pos != NULL) &&
-         (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
-    {
-      prev = pos;
-      pos = pos->next;
-    }
-  if (pos == NULL)
+  h->reconnect_task
+    = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
+                                   &reconnect, h);
+  if (h->reconnect_delay.rel_value == 0)
     {
-      GNUNET_break (0);
-      return;
+      h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
     }
-  if (prev == NULL)
-    h->neighbours = pos->next;
   else
-    prev->next = pos->next;
-  if (NULL != (th = pos->transmit_handle))
     {
-      pos->transmit_handle = NULL;
-      th->neighbour = NULL;
-      remove_from_any_list (th);
-      if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <=
-          CONNECT_RETRY_TIMEOUT.value)
-        {
-          /* signal error */
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                      _
-                      ("Connection with `%4s' failed and timeout was in the past, giving up on message delivery.\n"),
-                      GNUNET_i2s (peer));
-          GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
-          peer_transmit_timeout (th, NULL);
-        }
-      else
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                      _
-                      ("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"),
-                      GNUNET_i2s (peer),
-                      GNUNET_TIME_absolute_get_remaining (th->timeout).value);
-          /* try again in a bit */
-          GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
-          th->notify_delay_task
-            = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                            CONNECT_RETRY_TIMEOUT,
-                                            &try_connect_task, th);
-        }
+      h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
+      h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
+                                                    h->reconnect_delay);
     }
-  if (h->nc_cb != NULL)
-    h->nd_cb (h->cls, peer);
-  GNUNET_free (pos);
 }
 
 
 /**
- * Try again to connect to transport service.
+ * Closure for 'send_set_quota'.
  */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+struct SetQuotaContext
 {
-  struct GNUNET_TRANSPORT_Handle *h = cls;
-  struct GNUNET_TRANSPORT_TransmitHandle *pos;
-  struct NeighbourList *n;
 
-  /* Forget about all neighbours that we used to be connected
-     to */
-  while (NULL != (n = h->neighbours))
-    remove_neighbour (h, &n->id);
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
-#endif
-  GNUNET_assert (h->client == NULL);
-  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
-  GNUNET_assert (h->client != NULL);
-  /* make sure we don't send "START" twice,
-     remove existing entry from queue (if present) */
-  pos = h->connect_ready_head;
-  while (pos != NULL)
-    {
-      if (pos->notify == &send_start)
-        {
-          if (pos->prev == NULL)
-            h->connect_ready_head = pos->next;
-          else
-            pos->prev->next = pos->next;
-          if (pos->next != NULL)
-            pos->next->prev = pos->prev;
-          GNUNET_assert (pos->neighbour == NULL);
-          if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task)
-            {
-              GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
-              pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-            }
-          GNUNET_free (pos);
-          break;
-        }
-      pos = pos->next;
-    }
-  schedule_control_transmit (h,
-                             sizeof (struct GNUNET_MessageHeader),
-                             GNUNET_YES,
-                             GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
-  GNUNET_CLIENT_receive (h->client,
-                         &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
-}
+  /**
+   * Identity of the peer impacted by the quota change.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Quota to transmit.
+   */
+  struct GNUNET_BANDWIDTH_Value32NBO quota_in;
+};
 
 
 /**
- * Function that will schedule the job that will try
- * to connect us again to the client.
+ * Send SET_QUOTA message to the service.
+ *
+ * @param cls the 'struct SetQuotaContext'
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
  */
-static void
-schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
+static size_t
+send_set_quota (void *cls, size_t size, void *buf)
 {
+  struct SetQuotaContext *sqc = cls;
+  struct QuotaSetMessage msg;
+
+  if (buf == NULL)
+    {
+      GNUNET_free (sqc);
+      return 0;
+    }
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Scheduling task to reconnect to transport service in %llu ms.\n",
-              h->reconnect_delay.value);
+              "Transmitting `%s' request with respect to `%4s'.\n",
+              "SET_QUOTA",
+             GNUNET_i2s (&sqc->target));
 #endif
-  GNUNET_assert (h->client == NULL);
-  GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-  h->reconnect_task
-    = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                    h->reconnect_delay, &reconnect, h);
-  h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
+  GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
+  msg.header.size = htons (sizeof (struct QuotaSetMessage));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
+  msg.quota = sqc->quota_in;
+  msg.peer = sqc->target;
+  memcpy (buf, &msg, sizeof (msg));
+  GNUNET_free (sqc);
+  return sizeof (struct QuotaSetMessage);
 }
 
 
 /**
- * We are connected to the respective peer, check the
- * bandwidth limits and schedule the transmission.
+ * Set the share of incoming bandwidth for the given
+ * peer to the specified amount.
+ *
+ * @param handle connection to transport service
+ * @param target who's bandwidth quota is being changed
+ * @param quota_in incoming bandwidth quota in bytes per ms
+ * @param quota_out outgoing bandwidth quota in bytes per ms
  */
-static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
+void
+GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
+                            const struct GNUNET_PeerIdentity *target,
+                            struct GNUNET_BANDWIDTH_Value32NBO quota_in,
+                            struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+{
+  struct Neighbour *n;
+  struct SetQuotaContext *sqc;
+   
+  n = neighbour_find (handle, target);
+  if (NULL == n)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "Quota changed to %u for peer `%s', but I have no such neighbour!\n",
+                 (unsigned int) ntohl (quota_out.value__),
+                 GNUNET_i2s (target));
+      return;
+    }
+  GNUNET_assert (NULL != handle->client);
+#if DEBUG_TRANSPORT
+  if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Quota changed from %u to %u for peer `%s'\n",
+               (unsigned int) n->out_tracker.available_bytes_per_s__,
+               (unsigned int) ntohl (quota_out.value__),
+               GNUNET_i2s (target));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Quota remains at %u for peer `%s'\n",
+               (unsigned int) n->out_tracker.available_bytes_per_s__,
+               GNUNET_i2s (target));
+#endif
+  GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+                                        quota_out);
+  sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
+  sqc->target = *target;
+  sqc->quota_in = quota_in;
+  schedule_control_transmit (handle,
+                             sizeof (struct QuotaSetMessage),
+                             &send_set_quota, sqc);
+}
 
 
 /**
- * Function called by the scheduler when the timeout
- * for bandwidth availablility for the target
- * neighbour is reached.
+ * Send REQUEST_CONNECT message to the service.
+ *
+ * @param cls the 'struct GNUNET_PeerIdentity'
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
  */
-static void
-transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static size_t
+send_try_connect (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+  struct GNUNET_PeerIdentity *pid = cls;
+  struct TransportRequestConnectMessage msg;
 
-  th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-  schedule_request (th);
+  if (buf == NULL)
+    {
+      GNUNET_free (pid);
+      return 0;
+    }
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transmitting `%s' request with respect to `%4s'.\n",
+              "REQUEST_CONNECT",
+             GNUNET_i2s (pid));
+#endif
+  GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
+  msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
+  msg.reserved = htonl (0);
+  msg.peer = *pid;
+  memcpy (buf, &msg, sizeof (msg));
+  GNUNET_free (pid);
+  return sizeof (struct TransportRequestConnectMessage);
 }
 
 
 /**
- * Remove the given transmit handle from the wait list.  Does NOT free
- * it.
+ * Ask the transport service to establish a connection to 
+ * the given peer.
+ *
+ * @param handle connection to transport service
+ * @param target who we should try to connect to
  */
-static void
-remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
+void
+GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
+                             const struct GNUNET_PeerIdentity *target)
 {
-  if (th->prev == NULL)
-    th->handle->connect_wait_head = th->next;
-  else
-    th->prev->next = th->next;
-  if (th->next != NULL)
-    th->next->prev = th->prev;
+  struct GNUNET_PeerIdentity *pid;
+
+  if (NULL == handle->client)
+    return;
+  pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
+  *pid = *target;
+  schedule_control_transmit (handle,
+                             sizeof (struct TransportRequestConnectMessage),
+                             &send_try_connect, pid);
 }
 
 
 /**
- * We are connected to the respective peer, check the
- * bandwidth limits and schedule the transmission.
+ * Send HELLO message to the service.
+ *
+ * @param cls the HELLO message to send
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
  */
-static void
-schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
+static size_t
+send_hello (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_TRANSPORT_Handle *h;
-  struct GNUNET_TIME_Relative duration;
-  struct NeighbourList *n;
-  uint64_t available;
+  struct GNUNET_MessageHeader *msg = cls;
+  uint16_t ssize;
 
-  h = th->handle;
-  n = th->neighbour;
-  if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
-      th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  /* check outgoing quota */
-  duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-  if (duration.value > MIN_QUOTA_REFRESH_TIME)
-    {
-      update_quota (n);
-      duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
-    }
-  available = duration.value * n->quota_out;
-  if (available < n->last_sent + th->notify_size)
-    {
-      /* calculate how much bandwidth we'd still need to
-         accumulate and based on that how long we'll have
-         to wait... */
-      available = n->last_sent + th->notify_size - available;
-      duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                                available / n->quota_out);
-      if (th->timeout.value <
-          GNUNET_TIME_relative_to_absolute (duration).value)
-        {
-          /* signal timeout! */
-#if DEBUG_TRANSPORT
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
-                      duration.value, GNUNET_i2s (&th->target));
-#endif
-          remove_from_wait_list (th);
-         if (NULL != th->notify)
-           GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-          GNUNET_free (th);
-          return;
-        }
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
-                  GNUNET_i2s (&th->target), duration.value);
-#endif
-      th->notify_delay_task
-        = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                        duration, &transmit_ready, th);
-      return;
-    }
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Bandwidth available for transmission to `%4s'\n",
-              GNUNET_i2s (&n->id));
-#endif
-  if (GNUNET_NO == n->transmit_ok)
+  if (buf == NULL)
     {
-      /* we may be ready, but transport service is not;
-         wait for SendOkMessage or timeout */
-#if DEBUG_TRANSPORT
+#if DEBUG_TRANSPORT_TIMEOUT
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Need to wait for transport service `%s' message\n",
-                  "SEND_OK");
+                  "Timeout while trying to transmit `%s' request.\n",
+                  "HELLO");
 #endif
-      th->notify_delay_task
-        = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                        GNUNET_TIME_absolute_get_remaining
-                                        (th->timeout), &peer_transmit_timeout,
-                                        th);
-      return;
+      GNUNET_free (msg);
+      return 0;
     }
-  n->transmit_ok = GNUNET_NO;
-  remove_from_wait_list (th);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Moving message for `%4s' to ready list\n",
-              GNUNET_i2s (&n->id));
+              "Transmitting `%s' request.\n", "HELLO");
 #endif
-  insert_transmit_handle (&h->connect_ready_head, th);
-  if (GNUNET_NO == h->transmission_scheduled)
-    schedule_transmission (h);
+  ssize = ntohs (msg->size);
+  GNUNET_assert (size >= ssize);
+  memcpy (buf, msg, ssize);
+  GNUNET_free (msg);
+  return ssize;
 }
 
 
 /**
- * Add neighbour to our list
+ * Offer the transport service the HELLO of another peer.  Note that
+ * the transport service may just ignore this message if the HELLO is
+ * malformed or useless due to our local configuration.
+ *
+ * @param handle connection to transport service
+ * @param hello the hello message
+ * @param cont continuation to call when HELLO has been sent
+ * @param cls closure for continuation
+ *
  */
-static void
-add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
-               uint32_t quota_out,
-               struct GNUNET_TIME_Relative latency,
-               uint16_t distance,
-               const struct GNUNET_PeerIdentity *pid)
+void
+GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
+                              const struct GNUNET_MessageHeader *hello,
+                              GNUNET_SCHEDULER_Task cont,
+                              void *cls)
 {
-  struct NeighbourList *n;
-  struct GNUNET_TRANSPORT_TransmitHandle *prev;
-  struct GNUNET_TRANSPORT_TransmitHandle *pos;
-  struct GNUNET_TRANSPORT_TransmitHandle *next;
+  uint16_t size;
+  struct GNUNET_PeerIdentity peer;
+  struct GNUNET_MessageHeader *msg;
 
-  /* check for duplicates */
-  if (NULL != find_neighbour (h, pid))
+  if (NULL == handle->client)
+    return;
+  GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
+  size = ntohs (hello->size);
+  GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
+  if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) hello,
+                                       &peer))
     {
       GNUNET_break (0);
       return;
     }
+  msg = GNUNET_malloc(size);
+  memcpy (msg, hello, size);
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
+             "Offering `%s' message of `%4s' to transport for validation.\n",
+             "HELLO",
+             GNUNET_i2s (&peer));
 #endif
-  n = GNUNET_malloc (sizeof (struct NeighbourList));
-  n->id = *pid;
-  n->last_quota_update = GNUNET_TIME_absolute_get ();
-  n->quota_out = quota_out;
-  n->next = h->neighbours;
-  n->transmit_ok = GNUNET_YES;
-  h->neighbours = n;
-  if (h->nc_cb != NULL)
-    h->nc_cb (h->cls, &n->id, latency, distance);
-  prev = NULL;
-  pos = h->connect_wait_head;
+  schedule_control_transmit (handle,
+                             size,
+                             &send_hello, msg);
+}
+
+
+/**
+ * Obtain the HELLO message for this peer.
+ *
+ * @param handle connection to transport service
+ * @param rec function to call with the HELLO, sender will be our peer
+ *            identity; message and sender will be NULL on timeout
+ *            (handshake with transport service pending/failed).
+ *             cost estimate will be 0.
+ * @param rec_cls closure for rec
+ */
+void
+GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
+                            GNUNET_TRANSPORT_HelloUpdateCallback rec,
+                            void *rec_cls)
+{
+  struct HelloWaitList *hwl;
+
+  hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
+  hwl->rec = rec;
+  hwl->rec_cls = rec_cls;
+  GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
+                              handle->hwl_tail,
+                              hwl);
+  if (handle->my_hello == NULL)
+    return;
+  rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
+}
+
+
+/**
+ * Stop receiving updates about changes to our HELLO message.
+ *
+ * @param handle connection to transport service
+ * @param rec function previously registered to be called with the HELLOs
+ * @param rec_cls closure for rec
+ */
+void
+GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
+                                  GNUNET_TRANSPORT_HelloUpdateCallback rec,
+                                  void *rec_cls)
+{
+  struct HelloWaitList *pos;
+
+  pos = handle->hwl_head;
   while (pos != NULL)
     {
-      next = pos->next;
-      if (0 == memcmp (pid,
-                       &pos->target, sizeof (struct GNUNET_PeerIdentity)))
-        {
-          pos->neighbour = n;
-          GNUNET_assert (NULL == n->transmit_handle);
-          n->transmit_handle = pos;
-          if (prev == NULL)
-            h->connect_wait_head = next;
-          else
-            prev->next = next;
-//          if (GNUNET_YES == n->received_ack)
-//            {
-#if DEBUG_TRANSPORT
-              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                          "Found pending request for `%4s' will trigger it now.\n",
-                          GNUNET_i2s (&pos->target));
-#endif
-              if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-                {
-                  GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
-                  pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-                }
-              schedule_request (pos);
-//            }
-
-          break;
-        }
-      prev = pos;
-      pos = next;
+      if ( (pos->rec == rec) &&
+          (pos->rec_cls == rec_cls) )
+       break;
+      pos = pos->next;
     }
+  GNUNET_break (pos != NULL);
+  if (pos == NULL)
+    return;
+  GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
+                              handle->hwl_tail,
+                              pos);
+  GNUNET_free (pos);
 }
 
 
@@ -1383,17 +1290,17 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
  * Connect to the transport service.  Note that the connection may
  * complete (or fail) asynchronously.
  *
-
- * @param sched scheduler to use
  * @param cfg configuration to use
+ * @param self our own identity (API should check that it matches
+ *             the identity found by transport), or NULL (no check)
  * @param cls closure for the callbacks
  * @param rec receive function to call
  * @param nc function to call on connect events
  * @param nd function to call on disconnect events
  */
 struct GNUNET_TRANSPORT_Handle *
-GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
-                          const struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                         const struct GNUNET_PeerIdentity *self,
                           void *cls,
                           GNUNET_TRANSPORT_ReceiveCallback rec,
                           GNUNET_TRANSPORT_NotifyConnect nc,
@@ -1401,402 +1308,63 @@ GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
 {
   struct GNUNET_TRANSPORT_Handle *ret;
 
-  GNUNET_ARM_start_services (cfg, sched, "peerinfo", "transport", NULL);
   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
-  ret->sched = sched;
+  if (self != NULL)
+    {
+      ret->self = *self;
+      ret->check_self = GNUNET_YES;
+    }
   ret->cfg = cfg;
   ret->cls = cls;
   ret->rec = rec;
   ret->nc_cb = nc;
   ret->nd_cb = nd;
   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  schedule_reconnect (ret);
+  ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE);
+  ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
   return ret;
 }
 
 
 /**
  * Disconnect from the transport service.
+ *
+ * @param handle handle to the service as returned from GNUNET_TRANSPORT_connect
  */
 void
 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-  struct NeighbourList *n;
-  struct HelloWaitList *hwl;
-  struct GNUNET_CLIENT_Connection *client;
-
 #if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
+             "Transport disconnect called!\n");
 #endif
-  while (NULL != (th = handle->connect_ready_head))
-    {
-      handle->connect_ready_head = th->next;
-      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
-          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-        }
-      if (NULL != th->notify)
-       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-      GNUNET_free (th);
-    }
-  while (NULL != (th = handle->connect_wait_head))
-    {
-      handle->connect_wait_head = th->next;
-      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-        {
-          GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
-          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-        }
-      if (NULL != th->notify)
-       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-      GNUNET_free (th);
-    }
-  while (NULL != (n = handle->neighbours))
-    {
-      handle->neighbours = n->next;
-      if (NULL != (th = n->transmit_handle))
-        {
-          if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-            {
-              GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
-              th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-            }
-         if (NULL != th->notify)
-           GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));        
-          GNUNET_free (th);
-        }
-      GNUNET_free (n);
-    }
-  while (NULL != (hwl = handle->hwl_head))
-    {
-      handle->hwl_head = hwl->next;
-      GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  _
-                  ("Disconnect while notification for `%s' still registered.\n"),
-                  "HELLO");
-      if (hwl->rec != NULL)
-        hwl->rec (hwl->rec_cls, NULL);
-      GNUNET_free (hwl);
-    }
+  /* this disconnects all neighbours... */
+  if (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
+    disconnect_and_schedule_reconnect (handle);
+  /* and now we stop trying to connect again... */
   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
     {
-      GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
+      GNUNET_SCHEDULER_cancel (handle->reconnect_task);
       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+    }  
+  GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours);
+  handle->neighbours = NULL;
+  if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (handle->quota_task);
+      handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
     }
   GNUNET_free_non_null (handle->my_hello);
   handle->my_hello = NULL;
-  GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport",
-                            "peerinfo", NULL);
-  if (NULL != handle->network_handle)
-    {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (handle->network_handle);
-      handle->network_handle = NULL;
-    }
-  if (NULL != (client = handle->client))
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Disconnecting from transport service for good.\n");
-#endif
-      handle->client = NULL;
-      GNUNET_CLIENT_disconnect (client);
-    }
+  GNUNET_assert (handle->hwl_head == NULL);
+  GNUNET_assert (handle->hwl_tail == NULL);
+  GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
+  handle->ready_heap = NULL;
   GNUNET_free (handle);
 }
 
 
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_TRANSPORT_Handle *h = cls;
-  const struct DisconnectInfoMessage *dim;
-  const struct ConnectInfoMessage *cim;
-  const struct InboundMessage *im;
-  const struct GNUNET_MessageHeader *imm;
-  const struct SendOkMessage *okm;
-  struct HelloWaitList *hwl;
-  struct HelloWaitList *next_hwl;
-  struct NeighbourList *n;
-  struct GNUNET_PeerIdentity me;
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-  uint16_t size;
-
-  if ((msg == NULL) || (h->client == NULL))
-    {
-      if (h->client != NULL)
-        {
-#if DEBUG_TRANSPORT
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                      "Error receiving from transport service, disconnecting temporarily.\n");
-#endif
-          if (h->network_handle != NULL)
-            {
-              GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
-              h->network_handle = NULL;
-              h->transmission_scheduled = GNUNET_NO;
-              th = h->connect_ready_head;
-              /* add timeout again, we cancelled the transmit_ready task! */
-              GNUNET_assert (th->notify_delay_task ==
-                             GNUNET_SCHEDULER_NO_TASK);
-              th->notify_delay_task =
-                GNUNET_SCHEDULER_add_delayed (h->sched,
-                                              GNUNET_TIME_absolute_get_remaining
-                                              (th->timeout),
-                                              &peer_transmit_timeout, th);
-            }
-          GNUNET_CLIENT_disconnect (h->client);
-          h->client = NULL;
-          schedule_reconnect (h);
-        }
-      else
-        {
-          /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
-             finish clean up work! */
-          GNUNET_free (h);
-        }
-      return;
-    }
-  GNUNET_CLIENT_receive (h->client,
-                         &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
-  size = ntohs (msg->size);
-  switch (ntohs (msg->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      if (GNUNET_OK !=
-          GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
-                               &me))
-        {
-          GNUNET_break (0);
-          break;
-        }
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Receiving (my own) `%s' message, I am `%4s'.\n",
-                  "HELLO", GNUNET_i2s (&me));
-#endif
-      GNUNET_free_non_null (h->my_hello);
-      h->my_hello = NULL;
-      if (size < sizeof (struct GNUNET_MessageHeader))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      h->my_hello = GNUNET_malloc (size);
-      memcpy (h->my_hello, msg, size);
-      hwl = h->hwl_head;
-      while (NULL != hwl)
-        {
-         next_hwl = hwl->next;
-          hwl->rec (hwl->rec_cls,
-                   (const struct GNUNET_MessageHeader *) h->my_hello);
-         hwl = next_hwl;
-        }
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
-      if (size != sizeof (struct ConnectInfoMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      cim = (const struct ConnectInfoMessage *) msg;
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Receiving `%s' message for `%4s'.\n",
-                  "CONNECT", GNUNET_i2s (&cim->id));
-#endif
-      add_neighbour (h,
-                     ntohl (cim->quota_out),
-                     GNUNET_TIME_relative_ntoh (cim->latency), ntohs(cim->distance), &cim->id);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
-      if (size != sizeof (struct DisconnectInfoMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      dim = (const struct DisconnectInfoMessage *) msg;
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Receiving `%s' message for `%4s'.\n",
-                  "DISCONNECT", GNUNET_i2s (&dim->peer));
-#endif
-      remove_neighbour (h, &dim->peer);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
-      if (size != sizeof (struct SendOkMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      okm = (const struct SendOkMessage *) msg;
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Receiving `%s' message, transmission %s.\n", "SEND_OK",
-                  ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
-#endif
-      n = find_neighbour (h, &okm->peer);
-      GNUNET_assert (n != NULL);
-      n->transmit_ok = GNUNET_YES;
-      if (n->transmit_handle != NULL)
-        {
-#if DEBUG_TRANSPORT
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Processing pending message for `%4s'\n",
-                      GNUNET_i2s (&n->id));
-#endif
-          GNUNET_SCHEDULER_cancel (h->sched,
-                                   n->transmit_handle->notify_delay_task);
-          n->transmit_handle->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-          GNUNET_assert (GNUNET_YES == n->received_ack);
-          schedule_request (n->transmit_handle);
-        }
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Receiving `%s' message.\n", "RECV");
-#endif
-      if (size <
-          sizeof (struct InboundMessage) +
-          sizeof (struct GNUNET_MessageHeader))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      im = (const struct InboundMessage *) msg;
-      imm = (const struct GNUNET_MessageHeader *) &im[1];
-      if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
-        {
-          GNUNET_break (0);
-          break;
-        }
-      switch (ntohs (imm->type))
-        {
-        case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
-#if DEBUG_TRANSPORT
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Receiving `%s' message from `%4s'.\n",
-                      "ACK", GNUNET_i2s (&im->peer));
-#endif
-          n = find_neighbour (h, &im->peer);
-          if (n == NULL)
-            {
-              GNUNET_break (0);
-              break;
-            }
-          if (n->received_ack == GNUNET_NO)
-            {
-              n->received_ack = GNUNET_YES;
-              if (NULL != n->transmit_handle)
-                {
-#if DEBUG_TRANSPORT
-                  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                              "Peer connected, scheduling delayed message for delivery now.\n");
-#endif
-                  schedule_request (n->transmit_handle);
-                }
-            }
-          break;
-        default:
-#if DEBUG_TRANSPORT
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Received message of type %u from `%4s'.\n",
-                      ntohs (imm->type), GNUNET_i2s (&im->peer));
-#endif
-          if (h->rec != NULL)
-            h->rec (h->cls, &im->peer, imm,
-                    GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
-          break;
-        }
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  _
-                  ("Received unexpected message of type %u in %s:%u\n"),
-                  ntohs (msg->type), __FILE__, __LINE__);
-      GNUNET_break (0);
-      break;
-    }
-}
-
-
-struct ClientTransmitWrapper
-{
-  GNUNET_CONNECTION_TransmitReadyNotify notify;
-  void *notify_cls;
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-};
-
-
-/**
- * Transmit message of a client destined for another
- * peer to the service.
- */
-static size_t
-client_notify_wrapper (void *cls, size_t size, void *buf)
-{
-  struct ClientTransmitWrapper *ctw = cls;
-  struct OutboundMessage *obm;
-  struct GNUNET_MessageHeader *hdr;
-  size_t ret;
-
-  if (size == 0)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Transmission request could not be satisfied.\n");
-#endif
-      if (NULL != ctw->notify)
-       GNUNET_assert (0 == ctw->notify (ctw->notify_cls, 0, NULL));
-      GNUNET_free (ctw);
-      return 0;
-    }
-  GNUNET_assert (size >= sizeof (struct OutboundMessage));
-  obm = buf;
-  if (ctw->notify != NULL)
-    ret = ctw->notify (ctw->notify_cls,
-                      size - sizeof (struct OutboundMessage),
-                      (void *) &obm[1]);
-  else
-    ret = 0;
-  if (ret == 0)
-    {
-      /* Need to reset flag, no SEND means no SEND_OK! */
-      ctw->th->neighbour->transmit_ok = GNUNET_YES;
-      GNUNET_free (ctw);
-      return 0;
-    }
-  GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
-  hdr = (struct GNUNET_MessageHeader *) &obm[1];
-  GNUNET_assert (ntohs (hdr->size) == ret);
-  GNUNET_assert (ret + sizeof (struct OutboundMessage) <
-                 GNUNET_SERVER_MAX_MESSAGE_SIZE);
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmitting `%s' message with data for `%4s'\n",
-              "SEND", GNUNET_i2s (&ctw->th->target));
-#endif
-  ret += sizeof (struct OutboundMessage);
-  obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
-  obm->header.size = htons (ret);
-  obm->priority = htonl (ctw->th->priority);
-  obm->peer = ctw->th->target;
-  GNUNET_free (ctw);
-  return ret;
-}
-
-
-
 /**
  * Check if we could queue a message of the given size for
  * transmission.  The transport service will take both its
@@ -1817,123 +1385,86 @@ client_notify_wrapper (void *cls, size_t size, void *buf)
  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
  */
 struct GNUNET_TRANSPORT_TransmitHandle *
-GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
-                                        *handle,
-                                        const struct GNUNET_PeerIdentity
-                                        *target, size_t size,
-                                        unsigned int priority,
+GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
+                                        const struct GNUNET_PeerIdentity *target,
+                                       size_t size,
+                                        uint32_t priority,
                                         struct GNUNET_TIME_Relative timeout,
-                                        GNUNET_CONNECTION_TransmitReadyNotify
-                                        notify, void *notify_cls)
+                                        GNUNET_CONNECTION_TransmitReadyNotify notify, 
+                                       void *notify_cls)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *pos;
+  struct Neighbour *n;
   struct GNUNET_TRANSPORT_TransmitHandle *th;
-  struct NeighbourList *n;
-  struct ClientTransmitWrapper *ctw;
-
-  if (size + sizeof (struct OutboundMessage) >=
-      GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  struct GNUNET_TIME_Relative delay;
+  
+  n = neighbour_find (handle, target);
+  if (NULL == n)
     {
-      GNUNET_break (0);
+      /* use GNUNET_TRANSPORT_try_connect first, only use this function
+        once a connection has been established */
+      GNUNET_assert (0);
       return NULL;
     }
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
-              size, GNUNET_i2s (target));
-#endif
-  n = find_neighbour (handle, target);
-  if ((n != NULL) && (n->transmit_handle != NULL))
-    return NULL;                /* already have a request pending for this peer! */
-  ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
+  if (NULL != n->th)
+    {
+      /* attempt to send two messages at the same time to the same peer */
+      GNUNET_assert (0);
+      return NULL;
+    }
+  GNUNET_assert (NULL == n->hn);
   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
-  ctw->notify = notify;
-  ctw->notify_cls = notify_cls;
-  ctw->th = th;
-  th->handle = handle;
   th->neighbour = n;
-  th->target = *target;
-  th->notify = &client_notify_wrapper;
-  th->notify_cls = ctw;
+  th->notify = notify;
+  th->notify_cls = notify_cls;
   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  th->notify_size = size + sizeof (struct OutboundMessage);
+  th->notify_size = size;
   th->priority = priority;
-  if (NULL == n)
-    {
-      pos = handle->connect_wait_head;
-      while (pos != NULL)
-        {
-          GNUNET_assert (0 != memcmp (target,
-                                      &pos->target,
-                                      sizeof (struct GNUNET_PeerIdentity)));
-          pos = pos->next;
-        }
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
-#endif
-      try_connect (th);
-      return th;
-    }
-
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmission request queued for transmission to transport service.\n");
-#endif
-  GNUNET_assert (NULL == n->transmit_handle);
-  n->transmit_handle = th;
-  if (GNUNET_YES != n->received_ack)
-    {
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
-                  GNUNET_i2s (target), timeout.value);
-#endif
-      th->notify_delay_task
-        = GNUNET_SCHEDULER_add_delayed (handle->sched,
-                                        timeout, &peer_transmit_timeout, th);
-      return th;
-    }
-
+  n->th = th;
+  /* calculate when our transmission should be ready */
+  delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
+  if (delay.rel_value > timeout.rel_value)
+    delay.rel_value = 0; /* notify immediately (with failure) */
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
-              GNUNET_i2s (target));
+             "Bandwidth tracker allows next transmission to peer %s in %llu ms\n",
+             GNUNET_i2s (target),
+             (unsigned long long) delay.rel_value);
 #endif
-  th->notify_delay_task
-    = GNUNET_SCHEDULER_add_now (handle->sched, &transmit_ready, th);
+  n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
+                                       n, 
+                                       delay.rel_value);
+  schedule_transmission (handle);
   return th;
 }
 
 
 /**
  * Cancel the specified transmission-ready notification.
+ *
+ * @param th handle returned from GNUNET_TRANSPORT_notify_transmit_ready
  */
 void
-GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
-                                               GNUNET_TRANSPORT_TransmitHandle
-                                               *th)
+GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
 {
-  struct GNUNET_TRANSPORT_Handle *h;
+  struct Neighbour *n;
 
-#if DEBUG_TRANSPORT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmission request of %u bytes to `%4s' was cancelled.\n",
-              th->notify_size - sizeof (struct OutboundMessage),
-              GNUNET_i2s (&th->target));
-#endif
-  GNUNET_assert (th->notify == &client_notify_wrapper);
-  remove_from_any_list (th);
-  h = th->handle;
-  if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
+  GNUNET_assert (NULL == th->next);
+  GNUNET_assert (NULL == th->prev);
+  n = th->neighbour;
+  GNUNET_assert (th == n->th);
+  n->th = NULL;
+  if (n->hn != NULL)
     {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
-      h->network_handle = NULL;
-      h->transmission_scheduled = GNUNET_NO;
+      GNUNET_CONTAINER_heap_remove_node (n->hn);
+      n->hn = NULL;
     }
-  GNUNET_free (th->notify_cls);
-  GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_free (th);
+  else
+    {
+      GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != th->timeout_task);
+      GNUNET_SCHEDULER_cancel (th->timeout_task);
+      th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  GNUNET_free (th);                                        
 }