adding Ludo's gnunet-download-manager.scm back to SVN HEAD
[oweals/gnunet.git] / src / core / core_api.c
index 52a36342c92030346d12697b75b760e61419c7bc..66df134fb1acaf7c5ace5f77f1f02c79a954bf14 100644 (file)
 #include "gnunet_core_service.h"
 #include "core.h"
 
+#define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
 
 /**
- * Context for the core service connection.
+ * Information we track for each peer.
  */
-struct GNUNET_CORE_Handle
+struct PeerRecord
+{
+
+  /**
+   * We generally do NOT keep peer records in a DLL; this
+   * DLL is only used IF this peer's 'pending_head' message
+   * is ready for transmission.
+   */
+  struct PeerRecord *prev;
+
+  /**
+   * We generally do NOT keep peer records in a DLL; this
+   * DLL is only used IF this peer's 'pending_head' message
+   * is ready for transmission.
+   */
+  struct PeerRecord *next;
+
+  /**
+   * Peer the record is about.
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * Corresponding core handle.
+   */
+  struct GNUNET_CORE_Handle *ch;
+
+  /**
+   * Head of doubly-linked list of pending requests.
+   * Requests are sorted by deadline *except* for HEAD,
+   * which is only modified upon transmission to core.
+   */
+  struct GNUNET_CORE_TransmitHandle *pending_head;
+
+  /**
+   * Tail of doubly-linked list of pending requests.
+   */
+  struct GNUNET_CORE_TransmitHandle *pending_tail;
+
+  /**
+   * ID of timeout task for the 'pending_head' handle
+   * which is the one with the smallest timeout.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
+   * ID of task to run 'next_request_transmission'.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier ntr_task;
+
+  /**
+   * Current size of the queue of pending requests.
+   */
+  unsigned int queue_size;
+
+  /**
+   * SendMessageRequest ID generator for this peer.
+   */
+  uint16_t smr_id_gen;
+
+};
+
+
+/**
+ * Type of function called upon completion.
+ *
+ * @param cls closure
+ * @param success GNUNET_OK on success (which for request_connect
+ *        ONLY means that we transmitted the connect request to CORE,
+ *        it does not mean that we are actually now connected!);
+ *        GNUNET_NO on timeout,
+ *        GNUNET_SYSERR if core was shut down
+ */
+typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success);
+
+
+/**
+ * Entry in a doubly-linked list of control messages to be transmitted
+ * to the core service.  Control messages include traffic allocation,
+ * connection requests and of course our initial 'init' request.
+ *
+ * The actual message is allocated at the end of this struct.
+ */
+struct ControlMessage
 {
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ControlMessage *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct ControlMessage *prev;
+
+  /**
+   * Function to run after transmission failed/succeeded.
+   */
+  GNUNET_CORE_ControlContinuation cont;
 
   /**
-   * Our scheduler.
+   * Closure for 'cont'.
    */
-  struct GNUNET_SCHEDULER_Handle *sched;
+  void *cont_cls;
+
+  /**
+   * Transmit handle (if one is associated with this ControlMessage), or NULL.
+   */
+  struct GNUNET_CORE_TransmitHandle *th;
+};
+
+
+
+/**
+ * Context for the core service connection.
+ */
+struct GNUNET_CORE_Handle
+{
 
   /**
    * Configuration we're using.
@@ -66,11 +178,6 @@ struct GNUNET_CORE_Handle
    */
   GNUNET_CORE_DisconnectEventHandler disconnects;
 
-  /**
-   * Function to call whenever we're notified about a peer changing status.
-   */  
-  GNUNET_CORE_PeerStatusEventHandler status_events;
-  
   /**
    * Function to call whenever we receive an inbound message.
    */
@@ -87,9 +194,9 @@ struct GNUNET_CORE_Handle
   const struct GNUNET_CORE_MessageHandler *handlers;
 
   /**
-   * Our connection to the service for notifications.
+   * Our connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client_notifications;
+  struct GNUNET_CLIENT_Connection *client;
 
   /**
    * Handle for our current transmission request.
@@ -99,39 +206,51 @@ struct GNUNET_CORE_Handle
   /**
    * Head of doubly-linked list of pending requests.
    */
-  struct GNUNET_CORE_TransmitHandle *pending_head;
+  struct ControlMessage *control_pending_head;
 
   /**
    * Tail of doubly-linked list of pending requests.
    */
-  struct GNUNET_CORE_TransmitHandle *pending_tail;
+  struct ControlMessage *control_pending_tail;
 
   /**
-   * Currently submitted request (or NULL)
+   * Head of doubly-linked list of peers that are core-approved
+   * to send their next message.
    */
-  struct GNUNET_CORE_TransmitHandle *submitted;
+  struct PeerRecord *ready_peer_head;
 
   /**
-   * Currently submitted request based on solicitation (or NULL)
+   * Tail of doubly-linked list of peers that are core-approved
+   * to send their next message.
    */
-  struct GNUNET_CORE_TransmitHandle *solicit_transmit_req;
+  struct PeerRecord *ready_peer_tail;
 
   /**
-   * Buffer where we store a message for transmission in response
-   * to a traffic solicitation (or NULL).
+   * Hash map listing all of the peers that we are currently
+   * connected to.
    */
-  char *solicit_buffer;
+  struct GNUNET_CONTAINER_MultiHashMap *peers;
 
   /**
-   * How long to wait until we time out the connection attempt?
+   * Identity of this peer.
    */
-  struct GNUNET_TIME_Absolute startup_timeout;
+  struct GNUNET_PeerIdentity me;
 
   /**
    * ID of reconnect task (if any).
    */
   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
 
+  /**
+   * Current delay we use for re-trying to connect to core.
+   */
+  struct GNUNET_TIME_Relative retry_backoff;
+
+  /**
+   * Number of messages we are allowed to queue per target.
+   */
+  unsigned int queue_size;
+
   /**
    * Number of entries in the handlers array.
    */
@@ -154,6 +273,7 @@ struct GNUNET_CORE_Handle
    * requests?
    */
   int currently_down;
+
 };
 
 
@@ -174,9 +294,15 @@ struct GNUNET_CORE_TransmitHandle
   struct GNUNET_CORE_TransmitHandle *prev;
 
   /**
-   * Corresponding core handle.
+   * Corresponding peer record.
    */
-  struct GNUNET_CORE_Handle *ch;
+  struct PeerRecord *peer;
+
+  /**
+   * Corresponding SEND_REQUEST message.  Only non-NULL
+   * while SEND_REQUEST message is pending.
+   */
+  struct ControlMessage *cm;
 
   /**
    * Function that will be called to get the actual request
@@ -192,90 +318,240 @@ struct GNUNET_CORE_TransmitHandle
   void *get_message_cls;
 
   /**
-   * If this entry is for a transmission request, pointer
-   * to the notify callback; otherwise NULL.
+   * Timeout for this handle.
    */
-  GNUNET_CONNECTION_TransmitReadyNotify notify;
+  struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * Closure for notify.
+   * How important is this message?
    */
-  void *notify_cls;
+  uint32_t priority;
 
   /**
-   * Peer the request is about.
+   * Size of this request.
    */
-  struct GNUNET_PeerIdentity peer;
+  uint16_t msize;
 
   /**
-   * Timeout for this handle.
+   * Send message request ID for this request.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  uint16_t smr_id;
 
   /**
-   * ID of timeout task.
+   * Is corking allowed?
    */
-  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+  int cork;
 
-  /**
-   * How important is this message?
-   */
-  uint32_t priority;
+};
 
-  /**
-   * Size of this request.
-   */
-  uint16_t msize;
+
+/**
+ * Our current client connection went down.  Clean it up
+ * and try to reconnect!
+ *
+ * @param h our handle to the core service
+ */
+static void
+reconnect (struct GNUNET_CORE_Handle *h);
 
 
-};
+/**
+ * Task schedule to try to re-connect to core.
+ *
+ * @param cls the 'struct GNUNET_CORE_Handle'
+ * @param tc task context
+ */
+static void
+reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_CORE_Handle *h = cls;
+
+  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+#if DEBUG_CORE
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
+#endif
+  reconnect (h);
+}
+
+
+/**
+ * Notify clients about disconnect and free
+ * the entry for connected peer.
+ *
+ * @param cls the 'struct GNUNET_CORE_Handle*'
+ * @param key the peer identity (not used)
+ * @param value the 'struct PeerRecord' to free.
+ * @return GNUNET_YES (continue)
+ */
+static int
+disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key,
+                                void *value)
+{
+  struct GNUNET_CORE_Handle *h = cls;
+  struct GNUNET_CORE_TransmitHandle *th;
+  struct PeerRecord *pr = value;
+
+  if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (pr->timeout_task);
+    pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (pr->ntr_task);
+    pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
+    GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
+  if (h->disconnects != NULL)
+    h->disconnects (h->cls, &pr->peer);
+  /* all requests should have been cancelled, clean up anyway, just in case */
+  GNUNET_break (pr->queue_size == 0);
+  while (NULL != (th = pr->pending_head))
+  {
+    GNUNET_break (0);
+    GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
+    pr->queue_size--;
+    if (th->cm != NULL)
+      th->cm->th = NULL;
+    GNUNET_free (th);
+  }
+  /* done with 'voluntary' cleanups, now on to normal freeing */
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr));
+  GNUNET_assert (pr->pending_head == NULL);
+  GNUNET_assert (pr->pending_tail == NULL);
+  GNUNET_assert (pr->ch == h);
+  GNUNET_assert (pr->queue_size == 0);
+  GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_free (pr);
+  return GNUNET_YES;
+}
 
 
+/**
+ * Close down any existing connection to the CORE service and
+ * try re-establishing it later.
+ *
+ * @param h our handle
+ */
 static void
-reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+reconnect_later (struct GNUNET_CORE_Handle *h)
+{
+  struct ControlMessage *cm;
+  struct PeerRecord *pr;
+
+  GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+  if (NULL != h->cth)
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
+    h->cth = NULL;
+  }
+  if (h->client != NULL)
+  {
+    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+    h->client = NULL;
+  }
+  h->currently_down = GNUNET_YES;
+  GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+  h->reconnect_task =
+      GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_task, h);
+  while (NULL != (cm = h->control_pending_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
+                                 h->control_pending_tail, cm);
+    if (cm->th != NULL)
+      cm->th->cm = NULL;
+    if (cm->cont != NULL)
+      cm->cont (cm->cont_cls, GNUNET_NO);
+    GNUNET_free (cm);
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (h->peers,
+                                         &disconnect_and_free_peer_entry, h);
+  while (NULL != (pr = h->ready_peer_head))
+    GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
+  GNUNET_assert (h->control_pending_head == NULL);
+  h->retry_backoff =
+      GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff);
+  h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2);
+}
 
 
 /**
- * Function called when we are ready to transmit our
- * "START" message (or when this operation timed out).
+ * Check the list of pending requests, send the next
+ * one to the core.
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param h core handle
+ * @param ignore_currently_down transmit message even if not initialized?
  */
-static size_t transmit_start (void *cls, size_t size, void *buf);
+static void
+trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down);
 
 
 /**
- * Our current client connection went down.  Clean it up
- * and try to reconnect!
+ * The given request hit its timeout.  Remove from the
+ * doubly-linked list and call the respective continuation.
  *
- * @param h our handle to the core service
+ * @param cls the transmit handle of the request that timed out
+ * @param tc context, can be NULL (!)
  */
 static void
-reconnect (struct GNUNET_CORE_Handle *h)
+transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Send a control message to the peer asking for transmission
+ * of the message in the given peer record.
+ *
+ * @param pr peer to request transmission to
+ */
+static void
+request_next_transmission (struct PeerRecord *pr)
 {
+  struct GNUNET_CORE_Handle *h = pr->ch;
+  struct ControlMessage *cm;
+  struct SendMessageRequest *smr;
+  struct GNUNET_CORE_TransmitHandle *th;
+
+  if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (pr->timeout_task);
+    pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL == (th = pr->pending_head))
+  {
+    trigger_next_request (h, GNUNET_NO);
+    return;
+  }
+  if (th->cm != NULL)
+    return;                     /* already done */
+  GNUNET_assert (pr->prev == NULL);
+  GNUNET_assert (pr->next == NULL);
+  pr->timeout_task =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
+                                    (th->timeout), &transmission_timeout, pr);
+  cm = GNUNET_malloc (sizeof (struct ControlMessage) +
+                      sizeof (struct SendMessageRequest));
+  th->cm = cm;
+  cm->th = th;
+  smr = (struct SendMessageRequest *) &cm[1];
+  smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
+  smr->header.size = htons (sizeof (struct SendMessageRequest));
+  smr->priority = htonl (th->priority);
+  smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
+  smr->peer = pr->peer;
+  smr->queue_size = htonl (pr->queue_size);
+  smr->size = htons (th->msize);
+  smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
+  GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
+                                    h->control_pending_tail, cm);
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Reconnecting to CORE service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Adding SEND REQUEST for peer `%s' to message queue\n",
+       GNUNET_i2s (&pr->peer));
 #endif
-  if (h->client_notifications != NULL)
-    GNUNET_CLIENT_disconnect (h->client_notifications, GNUNET_NO);
-  h->currently_down = GNUNET_YES;
-  h->client_notifications = GNUNET_CLIENT_connect (h->sched, "core", h->cfg);
-  if (h->client_notifications == NULL)
-    h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                                     GNUNET_TIME_UNIT_SECONDS,
-                                                     &reconnect_task,
-                                                     h);
-  else
-    h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications,
-                                                 sizeof (struct InitMessage) +
-                                                 sizeof (uint16_t) * h->hcnt,
-                                                 GNUNET_TIME_UNIT_SECONDS,
-                                                 GNUNET_NO,
-                                                 &transmit_start, h);
+  trigger_next_request (h, GNUNET_NO);
 }
 
 
@@ -287,94 +563,204 @@ reconnect (struct GNUNET_CORE_Handle *h)
  * @param tc context, can be NULL (!)
  */
 static void
-timeout_request (void *cls, 
-                const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_CORE_TransmitHandle *th = cls;
+  struct PeerRecord *pr = cls;
+  struct GNUNET_CORE_Handle *h = pr->ch;
+  struct GNUNET_CORE_TransmitHandle *th;
 
-  GNUNET_CONTAINER_DLL_remove (th->ch->pending_head,
-                               th->ch->pending_tail,
-                               th);
-  th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  th = pr->pending_head;
+  GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
+  pr->queue_size--;
+  if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
+  {
+    /* the request that was 'approved' by core was
+     * canceled before it could be transmitted; remove
+     * us from the 'ready' list */
+    GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
+  }
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Signalling timeout of request for transmission to CORE service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Signalling timeout of request for transmission to CORE service\n");
 #endif
+  request_next_transmission (pr);
   GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
+  GNUNET_free (th);
 }
 
 
 /**
- * Function called when we are ready to transmit a request from our
- * request list (or when this operation timed out).
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * Transmit the next message to the core service.
  */
 static size_t
-request_start (void *cls, size_t size, void *buf)
+transmit_message (void *cls, size_t size, void *buf)
 {
   struct GNUNET_CORE_Handle *h = cls;
+  struct ControlMessage *cm;
   struct GNUNET_CORE_TransmitHandle *th;
+  struct PeerRecord *pr;
+  struct SendMessage *sm;
+  const struct GNUNET_MessageHeader *hdr;
+  uint16_t msize;
   size_t ret;
 
-  h->cth = NULL;  
-  th = h->pending_head;
-  if (th == NULL)
-    return 0;
+  GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+  h->cth = NULL;
   if (buf == NULL)
+  {
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Transmission failed, initiating reconnect\n");
+#endif
+    reconnect_later (h);
+    return 0;
+  }
+  /* first check for control messages */
+  if (NULL != (cm = h->control_pending_head))
+  {
+    hdr = (const struct GNUNET_MessageHeader *) &cm[1];
+    msize = ntohs (hdr->size);
+    if (size < msize)
     {
-      if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
-        GNUNET_SCHEDULER_cancel(h->sched, th->timeout_task);
-      timeout_request (th, NULL);
+      trigger_next_request (h, GNUNET_NO);
       return 0;
     }
-  GNUNET_CONTAINER_DLL_remove (h->pending_head,
-                              h->pending_tail,
-                              th);
-  GNUNET_assert (h->submitted == NULL);
-  h->submitted = th;
-  GNUNET_assert (size >= th->msize);
-  ret = th->get_message (th->get_message_cls, size, buf);
-  GNUNET_assert (ret <= size);
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u bytes to core\n",
-             ret);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Transmitting control message with %u bytes of type %u to core.\n",
+         (unsigned int) msize, (unsigned int) ntohs (hdr->type));
 #endif
-  return ret;
+    memcpy (buf, hdr, msize);
+    GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
+                                 h->control_pending_tail, cm);
+    if (cm->th != NULL)
+      cm->th->cm = NULL;
+    if (NULL != cm->cont)
+      cm->cont (cm->cont_cls, GNUNET_OK);
+    GNUNET_free (cm);
+    trigger_next_request (h, GNUNET_NO);
+    return msize;
+  }
+  /* now check for 'ready' P2P messages */
+  if (NULL != (pr = h->ready_peer_head))
+  {
+    GNUNET_assert (pr->pending_head != NULL);
+    th = pr->pending_head;
+    if (size < th->msize + sizeof (struct SendMessage))
+    {
+      trigger_next_request (h, GNUNET_NO);
+      return 0;
+    }
+    GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
+    GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
+    pr->queue_size--;
+    if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (pr->timeout_task);
+      pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Transmitting SEND request to `%s' with %u bytes.\n",
+         GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
+#endif
+    sm = (struct SendMessage *) buf;
+    sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
+    sm->priority = htonl (th->priority);
+    sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
+    sm->peer = pr->peer;
+    sm->cork = htonl ((uint32_t) th->cork);
+    sm->reserved = htonl (0);
+    ret =
+        th->get_message (th->get_message_cls,
+                         size - sizeof (struct SendMessage), &sm[1]);
+
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Transmitting SEND request to `%s' yielded %u bytes.\n",
+         GNUNET_i2s (&pr->peer), ret);
+#endif
+    GNUNET_free (th);
+    if (0 == ret)
+    {
+#if DEBUG_CORE
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Size of clients message to peer %s is 0!\n",
+           GNUNET_i2s (&pr->peer));
+#endif
+      /* client decided to send nothing! */
+      request_next_transmission (pr);
+      return 0;
+    }
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Produced SEND message to core with %u bytes payload\n",
+         (unsigned int) ret);
+#endif
+    GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
+    if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      request_next_transmission (pr);
+      return 0;
+    }
+    ret += sizeof (struct SendMessage);
+    sm->header.size = htons (ret);
+    GNUNET_assert (ret <= size);
+    request_next_transmission (pr);
+    return ret;
+  }
+  return 0;
 }
 
 
 /**
  * Check the list of pending requests, send the next
  * one to the core.
+ *
+ * @param h core handle
+ * @param ignore_currently_down transmit message even if not initialized?
  */
 static void
-trigger_next_request (struct GNUNET_CORE_Handle *h)
+trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
 {
-  struct GNUNET_CORE_TransmitHandle *th;
+  uint16_t msize;
 
-  if (h->currently_down)
-    {
+  if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
+  {
 #if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "In trigger_next_request, connection currently down...\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Core connection down, not processing queue\n");
 #endif
-      return;                     /* connection temporarily down */
-    }
-  if (NULL == (th = h->pending_head))
-    return;                     /* no requests pending */
-  GNUNET_assert (NULL == h->cth);
-  h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications,
-                                               th->msize,
-                                               GNUNET_TIME_absolute_get_remaining
-                                               (th->timeout), 
-                                               GNUNET_NO,
-                                               &request_start,
-                                               h);
+    return;
+  }
+  if (NULL != h->cth)
+  {
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
+#endif
+    return;
+  }
+  if (h->control_pending_head != NULL)
+    msize =
+        ntohs (((struct GNUNET_MessageHeader *) &h->
+                control_pending_head[1])->size);
+  else if (h->ready_peer_head != NULL)
+    msize =
+        h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
+  else
+  {
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Request queue empty, not processing queue\n");
+#endif
+    return;                     /* no pending message */
+  }
+  h->cth =
+      GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           GNUNET_NO, &transmit_message, h);
 }
 
 
@@ -388,351 +774,439 @@ static void
 main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_CORE_Handle *h = cls;
-  unsigned int hpos;
+  const struct InitReplyMessage *m;
   const struct ConnectNotifyMessage *cnm;
   const struct DisconnectNotifyMessage *dnm;
   const struct NotifyTrafficMessage *ntm;
   const struct GNUNET_MessageHeader *em;
-  const struct PeerStatusNotifyMessage *psnm;
+  const struct SendMessageReady *smr;
+  const struct GNUNET_CORE_MessageHandler *mh;
+  const struct GNUNET_ATS_Information *ats;
+  GNUNET_CORE_StartupCallback init;
+  struct PeerRecord *pr;
+  struct GNUNET_CORE_TransmitHandle *th;
+  unsigned int hpos;
+  int trigger;
   uint16_t msize;
   uint16_t et;
-  const struct GNUNET_CORE_MessageHandler *mh;
+  uint32_t ats_count;
 
   if (msg == NULL)
+  {
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         _
+         ("Client was disconnected from core service, trying to reconnect.\n"));
+    reconnect_later (h);
+    return;
+  }
+  msize = ntohs (msg->size);
+#if DEBUG_CORE > 2
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Processing message of type %u and size %u from core service\n",
+       ntohs (msg->type), msize);
+#endif
+  switch (ntohs (msg->type))
+  {
+  case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
+    if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  _
-                  ("Client was disconnected from core service, trying to reconnect.\n"));
-      reconnect (h);
+      GNUNET_break (0);
+      reconnect_later (h);
       return;
     }
-  msize = ntohs (msg->size);
+    m = (const struct InitReplyMessage *) msg;
+    GNUNET_break (0 == ntohl (m->reserved));
+    /* start our message processing loop */
+    if (GNUNET_YES == h->currently_down)
+    {
+      h->currently_down = GNUNET_NO;
+      trigger_next_request (h, GNUNET_NO);
+    }
+    h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+    h->me = m->my_identity;
+    if (NULL != (init = h->init))
+    {
+      /* mark so we don't call init on reconnect */
+      h->init = NULL;
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Processing message of type %u and size %u from core service\n",
-              ntohs (msg->type), msize);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
+           GNUNET_i2s (&h->me));
 #endif
-  switch (ntohs (msg->type))
+      init (h->cls, h, &h->me);
+    }
+    else
     {
-    case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
-      if (NULL == h->connects)
-        {
-          GNUNET_break (0);
-          break;
-        }
-      if (msize != sizeof (struct ConnectNotifyMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      cnm = (const struct ConnectNotifyMessage *) msg;
-      h->connects (h->cls,
-                  &cnm->peer,
-                  GNUNET_TIME_relative_ntoh (cnm->latency),
-                  ntohl (cnm->distance));
-      break;
-    case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
-      if (NULL == h->disconnects)
-        {
-          GNUNET_break (0);
-          break;
-        }
-      if (msize != sizeof (struct DisconnectNotifyMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      dnm = (const struct DisconnectNotifyMessage *) msg;
-      h->disconnects (h->cls,
-                     &dnm->peer);
-      break;
-    case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
-      if (NULL == h->status_events)
-        {
-          GNUNET_break (0);
-          break;
-        }
-      if (msize != sizeof (struct PeerStatusNotifyMessage))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      psnm = (const struct PeerStatusNotifyMessage *) msg;
-      h->status_events (h->cls,
-                       &psnm->peer,
-                       GNUNET_TIME_relative_ntoh (psnm->latency),
-                       ntohl (psnm->distance),
-                       psnm->bandwidth_in,
-                       psnm->bandwidth_out,
-                       GNUNET_TIME_absolute_ntoh (psnm->timeout));
-      break;
-    case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
-      if (msize <
-          sizeof (struct NotifyTrafficMessage) +
-          sizeof (struct GNUNET_MessageHeader))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      ntm = (const struct NotifyTrafficMessage *) msg;
-      em = (const struct GNUNET_MessageHeader *) &ntm[1];
 #if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u and size %u from peer `%4s'\n",
-                  ntohs (em->type), 
-                 ntohs (em->size),
-                 GNUNET_i2s (&ntm->peer));
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Successfully reconnected to core service.\n");
 #endif
-      if ((GNUNET_NO == h->inbound_hdr_only) &&
-          (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      et = ntohs (em->type);
-      for (hpos = 0; hpos < h->hcnt; hpos++)
-        {
-          mh = &h->handlers[hpos];
-          if (mh->type != et)
-            continue;
-          if ((mh->expected_size != ntohs (em->size)) &&
-              (mh->expected_size != 0))
-            {
-              GNUNET_break (0);
-              continue;
-            }
-          if (GNUNET_OK !=
-              h->handlers[hpos].callback (h->cls, &ntm->peer, em,
-                                         GNUNET_TIME_relative_ntoh (ntm->latency),
-                                         ntohl (ntm->distance)))
-            {
-              /* error in processing, disconnect ! */
-              reconnect (h);
-              return;
-            }
-        }
-      if (NULL != h->inbound_notify)
-        h->inbound_notify (h->cls, &ntm->peer, em,
-                          GNUNET_TIME_relative_ntoh (ntm->latency),
-                          ntohl (ntm->distance));
-      break;
-    case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
-      if (msize <
-          sizeof (struct NotifyTrafficMessage) +
-          sizeof (struct GNUNET_MessageHeader))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      ntm = (const struct NotifyTrafficMessage *) msg;
-      em = (const struct GNUNET_MessageHeader *) &ntm[1];
-      if ((GNUNET_NO == h->outbound_hdr_only) &&
-          (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
-        {
-          GNUNET_break (0);
-          break;
-        }
-      if (NULL == h->outbound_notify)
-        {
-          GNUNET_break (0);
-          break;
-        }
-      h->outbound_notify (h->cls, &ntm->peer, em,
-                         GNUNET_TIME_relative_ntoh (ntm->latency),
-                         ntohl (ntm->distance));
-      break;
-    default:
+    }
+    /* fake 'connect to self' */
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
+    GNUNET_assert (pr == NULL);
+    pr = GNUNET_malloc (sizeof (struct PeerRecord));
+    pr->peer = h->me;
+    pr->ch = h;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multihashmap_put (h->peers,
+                                                      &h->me.hashPubKey, pr,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+    if (NULL != h->connects)
+      h->connects (h->cls, &h->me, NULL, 0);
+    break;
+  case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
+    if (msize < sizeof (struct ConnectNotifyMessage))
+    {
       GNUNET_break (0);
-      break;
+      reconnect_later (h);
+      return;
     }
-  GNUNET_CLIENT_receive (h->client_notifications,
-                         &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
-}
-
-
-/**
- * Function called when we are ready to transmit our
- * "START" message (or when this operation timed out).
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t transmit_start (void *cls, size_t size, void *buf);
-
-
-/**
- * Function called on the first message received from
- * the service (contains our public key, etc.).
- * Should trigger calling the init callback
- * and then start our regular message processing.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-init_reply_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct GNUNET_CORE_Handle *h = cls;
-  const struct InitReplyMessage *m;
-  GNUNET_CORE_StartupCallback init;
-  struct GNUNET_PeerIdentity my_identity;
-
-  if ((msg == NULL) ||
-      (ntohs (msg->size) != sizeof (struct InitReplyMessage)) ||
-      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY))
-    {
-      if (msg != NULL)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                     _
-                     ("Error connecting to core service (failed to receive `%s' message, got message of type %u and size %u).\n"),
-                     "INIT_REPLY",
-                     ntohs (msg->type),
-                     ntohs (msg->size));
-         GNUNET_break (0);
-       }
-      else
-       {
+    cnm = (const struct ConnectNotifyMessage *) msg;
+    ats_count = ntohl (cnm->ats_count);
+    if (msize !=
+        sizeof (struct ConnectNotifyMessage) +
+        ats_count * sizeof (struct GNUNET_ATS_Information))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received notification about connection from `%s'.\n",
+         GNUNET_i2s (&cnm->peer));
+#endif
+    if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      /* connect to self!? */
+      GNUNET_break (0);
+      return;
+    }
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
+    if (pr != NULL)
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    pr = GNUNET_malloc (sizeof (struct PeerRecord));
+    pr->peer = cnm->peer;
+    pr->ch = h;
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multihashmap_put (h->peers,
+                                                      &cnm->peer.hashPubKey, pr,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+    ats = (const struct GNUNET_ATS_Information *) &cnm[1];
+    if (NULL != h->connects)
+      h->connects (h->cls, &cnm->peer, ats, ats_count);
+    break;
+  case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
+    if (msize != sizeof (struct DisconnectNotifyMessage))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    dnm = (const struct DisconnectNotifyMessage *) msg;
+    if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      /* connection to self!? */
+      GNUNET_break (0);
+      return;
+    }
+    GNUNET_break (0 == ntohl (dnm->reserved));
 #if DEBUG_CORE
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     _("Failed to connect to core service, will retry.\n"));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received notification about disconnect from `%s'.\n",
+         GNUNET_i2s (&dnm->peer));
 #endif
-       }
-      transmit_start (h, 0, NULL);
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
+    if (pr == NULL)
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
       return;
     }
-  m = (const struct InitReplyMessage *) msg;
-  /* start our message processing loop */
+    trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
+               (h->ready_peer_head == pr));
+    disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
+    if (trigger)
+      trigger_next_request (h, GNUNET_NO);
+    break;
+  case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
+    if (msize < sizeof (struct NotifyTrafficMessage))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    ntm = (const struct NotifyTrafficMessage *) msg;
+
+    ats_count = ntohl (ntm->ats_count);
+    if ((msize <
+         sizeof (struct NotifyTrafficMessage) +
+         ats_count * sizeof (struct GNUNET_ATS_Information) +
+         sizeof (struct GNUNET_MessageHeader)) ||
+        (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully connected to core service, starting processing loop.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received message of type %u and size %u from peer `%4s'\n",
+         ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
 #endif
-  h->currently_down = GNUNET_NO;
-  trigger_next_request (h);
-  GNUNET_CLIENT_receive (h->client_notifications,
-                         &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
-  if (NULL != (init = h->init))
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
+    if (pr == NULL)
     {
-      /* mark so we don't call init on reconnect */
-      h->init = NULL;
-      GNUNET_CRYPTO_hash (&m->publicKey,
-                          sizeof (struct
-                                  GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
-                          &my_identity.hashPubKey);
-      init (h->cls, h, &my_identity, &m->publicKey);
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    if ((GNUNET_NO == h->inbound_hdr_only) &&
+        (msize !=
+         ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
+         +ats_count * sizeof (struct GNUNET_ATS_Information)))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    et = ntohs (em->type);
+    for (hpos = 0; hpos < h->hcnt; hpos++)
+    {
+      mh = &h->handlers[hpos];
+      if (mh->type != et)
+        continue;
+      if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                    "Unexpected message size %u for message of type %u from peer `%4s'\n",
+                    htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
+        GNUNET_break_op (0);
+        continue;
+      }
+      if (GNUNET_OK !=
+          h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats,
+                                      ats_count))
+      {
+        /* error in processing, do not process other messages! */
+        break;
+      }
+    }
+    if (NULL != h->inbound_notify)
+      h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
+    break;
+  case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
+    if (msize < sizeof (struct NotifyTrafficMessage))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
     }
+    ntm = (const struct NotifyTrafficMessage *) msg;
+    if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      /* self-change!? */
+      GNUNET_break (0);
+      return;
+    }
+    ats_count = ntohl (ntm->ats_count);
+    if ((msize <
+         sizeof (struct NotifyTrafficMessage) +
+         ats_count * sizeof (struct GNUNET_ATS_Information) +
+         sizeof (struct GNUNET_MessageHeader)) ||
+        (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
+    if (pr == NULL)
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received notification about transmission to `%s'.\n",
+         GNUNET_i2s (&ntm->peer));
+#endif
+    if ((GNUNET_NO == h->outbound_hdr_only) &&
+        (msize !=
+         ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
+         ats_count * sizeof (struct GNUNET_ATS_Information)))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    if (NULL == h->outbound_notify)
+    {
+      GNUNET_break (0);
+      break;
+    }
+    h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
+    break;
+  case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
+    if (msize != sizeof (struct SendMessageReady))
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    smr = (const struct SendMessageReady *) msg;
+    pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
+    if (pr == NULL)
+    {
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Received notification about transmission readiness to `%s'.\n",
+         GNUNET_i2s (&smr->peer));
+#endif
+    if (pr->pending_head == NULL)
+    {
+      /* request must have been cancelled between the original request
+       * and the response from core, ignore core's readiness */
+      break;
+    }
+
+    th = pr->pending_head;
+    if (ntohs (smr->smr_id) != th->smr_id)
+    {
+      /* READY message is for expired or cancelled message,
+       * ignore! (we should have already sent another request) */
+      break;
+    }
+    if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
+    {
+      /* we should not already be on the ready list... */
+      GNUNET_break (0);
+      reconnect_later (h);
+      return;
+    }
+    GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr);
+    trigger_next_request (h, GNUNET_NO);
+    break;
+  default:
+    reconnect_later (h);
+    return;
+  }
+  GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
 
+/**
+ * Task executed once we are done transmitting the INIT message.
+ * Starts our 'receive' loop.
+ *
+ * @param cls the 'struct GNUNET_CORE_Handle'
+ * @param success were we successful
+ */
 static void
-reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+init_done_task (void *cls, int success)
 {
   struct GNUNET_CORE_Handle *h = cls;
-  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  reconnect (h);
+
+  if (success == GNUNET_SYSERR)
+    return;                     /* shutdown */
+  if (success == GNUNET_NO)
+  {
+#if DEBUG_CORE
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to exchange INIT with core, retrying\n");
+#endif
+    if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
+      reconnect_later (h);
+    return;
+  }
+  GNUNET_CLIENT_receive (h->client, &main_notify_handler, h,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
 
 /**
- * Function called when we are ready to transmit our
- * "START" message (or when this operation timed out).
+ * Our current client connection went down.  Clean it up
+ * and try to reconnect!
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param h our handle to the core service
  */
-static size_t
-transmit_start (void *cls, size_t size, void *buf)
+static void
+reconnect (struct GNUNET_CORE_Handle *h)
 {
-  struct GNUNET_CORE_Handle *h = cls;
+  struct ControlMessage *cm;
   struct InitMessage *init;
-  uint16_t *ts;
-  uint16_t msize;
   uint32_t opt;
+  uint16_t msize;
+  uint16_t *ts;
   unsigned int hpos;
-  struct GNUNET_TIME_Relative delay;
 
-  h->cth = NULL;
-  if (size == 0)
-    {
-      if ((h->init == NULL) ||
-          (GNUNET_TIME_absolute_get ().value < h->startup_timeout.value))
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                      _("Failed to connect to core service, retrying.\n"));
-          delay = GNUNET_TIME_absolute_get_remaining (h->startup_timeout);
-          if ((h->init == NULL) || (delay.value > 1000))
-            delay = GNUNET_TIME_UNIT_SECONDS;
-          if (h->init == NULL)
-            h->startup_timeout =
-              GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES);
-          h->reconnect_task =
-            GNUNET_SCHEDULER_add_delayed (h->sched, 
-                                          delay, &reconnect_task, h);
-          return 0;
-        }
-      /* timeout on initial connect */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  _("Failed to connect to core service, giving up.\n"));
-      h->init (h->cls, NULL, NULL, NULL);
-      GNUNET_CORE_disconnect (h);
-      return 0;
-    }
+#if DEBUG_CORE
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
+#endif
+  GNUNET_assert (h->client == NULL);
+  GNUNET_assert (h->currently_down == GNUNET_YES);
+  h->client = GNUNET_CLIENT_connect ("core", h->cfg);
+  if (h->client == NULL)
+  {
+    reconnect_later (h);
+    return;
+  }
   msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
-  GNUNET_assert (size >= msize);
-  init = buf;
+  cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize);
+  cm->cont = &init_done_task;
+  cm->cont_cls = h;
+  init = (struct InitMessage *) &cm[1];
   init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
   init->header.size = htons (msize);
-  opt = GNUNET_CORE_OPTION_NOTHING;
-  if (h->connects != NULL)
-    opt |= GNUNET_CORE_OPTION_SEND_CONNECT;
-  if (h->disconnects != NULL)
-    opt |= GNUNET_CORE_OPTION_SEND_DISCONNECT;
-  if (h->status_events != NULL)
-    opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
+  opt = 0;
   if (h->inbound_notify != NULL)
-    {
-      if (h->inbound_hdr_only)
-        opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
-      else
-        opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
-    }
+  {
+    if (h->inbound_hdr_only)
+      opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND;
+    else
+      opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND;
+  }
   if (h->outbound_notify != NULL)
-    {
-      if (h->outbound_hdr_only)
-        opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
-      else
-        opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
-    }
+  {
+    if (h->outbound_hdr_only)
+      opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND;
+    else
+      opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
+  }
   init->options = htonl (opt);
   ts = (uint16_t *) & init[1];
   for (hpos = 0; hpos < h->hcnt; hpos++)
     ts[hpos] = htons (h->handlers[hpos].type);
-  GNUNET_CLIENT_receive (h->client_notifications,
-                         &init_reply_handler,
-                         h,
-                         GNUNET_TIME_absolute_get_remaining
-                         (h->startup_timeout));
-  return sizeof (struct InitMessage) + h->hcnt * sizeof (uint16_t);
+  GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail,
+                               cm);
+  trigger_next_request (h, GNUNET_YES);
 }
 
 
+
 /**
  * Connect to the core service.  Note that the connection may
  * complete (or fail) asynchronously.
  *
- * @param sched scheduler to use
  * @param cfg configuration to use
- * @param timeout after how long should we give up trying to connect to the core service?
+ * @param queue_size size of the per-peer message queue
  * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
  * @param init callback to call on timeout or once we have successfully
  *        connected to the core service; note that timeout is only meaningful if init is not NULL
  * @param connects function to call on peer connect, can be NULL
  * @param disconnects function to call on peer disconnect / timeout, can be NULL
- * @param status_events function to call on changes to peer connection status, can be NULL
  * @param inbound_notify function to call for all inbound messages, can be NULL
  * @param inbound_hdr_only set to GNUNET_YES if inbound_notify will only read the
  *                GNUNET_MessageHeader and hence we do not need to give it the full message;
@@ -746,14 +1220,11 @@ transmit_start (void *cls, size_t size, void *buf)
  *                NULL on error (in this case, init is never called)
  */
 struct GNUNET_CORE_Handle *
-GNUNET_CORE_connect (struct GNUNET_SCHEDULER_Handle *sched,
-                     const struct GNUNET_CONFIGURATION_Handle *cfg,
-                     struct GNUNET_TIME_Relative timeout,
-                     void *cls,
+GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                     unsigned int queue_size, void *cls,
                      GNUNET_CORE_StartupCallback init,
                      GNUNET_CORE_ConnectEventHandler connects,
                      GNUNET_CORE_DisconnectEventHandler disconnects,
-                    GNUNET_CORE_PeerStatusEventHandler status_events,
                      GNUNET_CORE_MessageCallback inbound_notify,
                      int inbound_hdr_only,
                      GNUNET_CORE_MessageCallback outbound_notify,
@@ -763,154 +1234,110 @@ GNUNET_CORE_connect (struct GNUNET_SCHEDULER_Handle *sched,
   struct GNUNET_CORE_Handle *h;
 
   h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
-  h->sched = sched;
   h->cfg = cfg;
+  h->queue_size = queue_size;
   h->cls = cls;
   h->init = init;
   h->connects = connects;
   h->disconnects = disconnects;
-  h->status_events = status_events;
   h->inbound_notify = inbound_notify;
   h->outbound_notify = outbound_notify;
   h->inbound_hdr_only = inbound_hdr_only;
   h->outbound_hdr_only = outbound_hdr_only;
   h->handlers = handlers;
-  h->client_notifications = GNUNET_CLIENT_connect (sched, "core", cfg);
-  if (h->client_notifications == NULL)
-    {
-      GNUNET_free (h);
-      return NULL;
-    }
-  h->startup_timeout = GNUNET_TIME_relative_to_absolute (timeout);
   h->hcnt = 0;
-  while (handlers[h->hcnt].callback != NULL)
-    h->hcnt++;
+  h->currently_down = GNUNET_YES;
+  h->peers = GNUNET_CONTAINER_multihashmap_create (128);
+  h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  if (NULL != handlers)
+    while (handlers[h->hcnt].callback != NULL)
+      h->hcnt++;
   GNUNET_assert (h->hcnt <
                  (GNUNET_SERVER_MAX_MESSAGE_SIZE -
                   sizeof (struct InitMessage)) / sizeof (uint16_t));
 #if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Trying to connect to core service in next %llu ms.\n",
-              timeout.value);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
 #endif
-  h->cth =
-    GNUNET_CLIENT_notify_transmit_ready (h->client_notifications,
-                                         sizeof (struct InitMessage) +
-                                         sizeof (uint16_t) * h->hcnt, timeout,
-                                        GNUNET_YES,
-                                         &transmit_start, h);
+  reconnect (h);
   return h;
 }
 
 
 /**
- * Disconnect from the core service.
+ * Disconnect from the core service.  This function can only
+ * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
+ * requests have been explicitly canceled.
  *
  * @param handle connection to core to disconnect
  */
 void
 GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
 {
+  struct ControlMessage *cm;
+
+#if DEBUG_CORE
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
+#endif
   if (handle->cth != NULL)
+  {
     GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
-  if (handle->solicit_transmit_req != NULL)
-    GNUNET_CORE_notify_transmit_ready_cancel (handle->solicit_transmit_req);
+    handle->cth = NULL;
+  }
+  while (NULL != (cm = handle->control_pending_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
+                                 handle->control_pending_tail, cm);
+    if (cm->th != NULL)
+      cm->th->cm = NULL;
+    if (cm->cont != NULL)
+      cm->cont (cm->cont_cls, GNUNET_SYSERR);
+    GNUNET_free (cm);
+  }
+  if (handle->client != NULL)
+  {
+    GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
+    handle->client = NULL;
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
+                                         &disconnect_and_free_peer_entry,
+                                         handle);
   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
-  if (handle->client_notifications != NULL)
-    GNUNET_CLIENT_disconnect (handle->client_notifications, GNUNET_NO);
-  GNUNET_break (handle->pending_head == NULL);
-  GNUNET_free_non_null (handle->solicit_buffer);
+  {
+    GNUNET_SCHEDULER_cancel (handle->reconnect_task);
+    handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
+  handle->peers = NULL;
+  GNUNET_break (handle->ready_peer_head == NULL);
   GNUNET_free (handle);
 }
 
 
 /**
- * Build the message requesting data transmission.
+ * Task that calls 'request_next_transmission'.
+ *
+ * @param cls the 'struct PeerRecord*'
+ * @param tc scheduler context
  */
-static size_t
-produce_send (void *cls, size_t size, void *buf)
+static void
+run_request_next_transmission (void *cls,
+                               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_CORE_TransmitHandle *th = cls;
-  struct GNUNET_CORE_Handle *h;
-  struct SendMessage *sm;
-  size_t dt;
-  GNUNET_CONNECTION_TransmitReadyNotify notify;
-  void *notify_cls;
+  struct PeerRecord *pr = cls;
 
-  h = th->ch;
-  if (buf == NULL)
-    {
-      /* timeout or error */
-#if DEBUG_CORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "P2P transmission request for `%4s' timed out.\n",
-                 GNUNET_i2s(&th->peer));
-#endif
-      GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-      GNUNET_CORE_notify_transmit_ready_cancel (th);
-      if ((h->pending_head == th) && (h->cth != NULL)) /* Request hasn't been canceled yet! */
-        {
-          GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
-          h->cth = NULL;
-          trigger_next_request (h);
-        }
-      /* Otherwise this request timed out, but another is actually queued for sending, so don't try to send another! */
-      return 0;
-    }
-  sm = (struct SendMessage *) buf;
-  sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
-  sm->priority = htonl (th->priority);
-  sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
-  sm->peer = th->peer;
-  notify = th->notify;
-  notify_cls = th->notify_cls;
-  GNUNET_CORE_notify_transmit_ready_cancel (th);
-  trigger_next_request (h);
-  size = GNUNET_MIN (size,
-                    GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
-  GNUNET_assert (size >= sizeof (struct SendMessage));
-  dt = notify (notify_cls, size - sizeof (struct SendMessage), &sm[1]);
-  if (0 == dt)
-    {
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Size of clients message to peer %s is 0!\n",
-              GNUNET_i2s(&th->peer));
-#endif
-      /* client decided to send nothing! */
-      return 0;
-    }
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Produced SEND message to core with %u bytes payload\n",
-             dt);
-#endif
-  GNUNET_assert (dt >= sizeof (struct GNUNET_MessageHeader));
-  if (dt + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
-    {
-      GNUNET_break (0);
-      return 0;
-    }
-#if DEBUG_CORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Preparing for P2P transmission of %u bytes to `%4s'.\n",
-             dt,
-             GNUNET_i2s(&th->peer));
-#endif
-  sm->header.size = htons (dt + sizeof (struct SendMessage));
-  GNUNET_assert (dt + sizeof (struct SendMessage) <= size);
-  return dt + sizeof (struct SendMessage);
+  pr->ntr_task = GNUNET_SCHEDULER_NO_TASK;
+  request_next_transmission (pr);
 }
 
 
 /**
  * Ask the core to call "notify" once it is ready to transmit the
- * given number of bytes to the specified "target".  If we are not yet
- * connected to the specified peer, a call to this function will cause
- * us to try to establish a connection.
+ * given number of bytes to the specified "target".    Must only be
+ * called after a connection to the respective peer has been
+ * established (and the client has been informed about this).
  *
  * @param handle connection to core service
+ * @param cork is corking allowed for this transmission?
  * @param priority how important is the message?
  * @param maxdelay how long can the message wait?
  * @param target who should receive the message,
@@ -923,39 +1350,105 @@ produce_send (void *cls, size_t size, void *buf)
  *         memory); if NULL is returned, "notify" will NOT be called.
  */
 struct GNUNET_CORE_TransmitHandle *
-GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
-                                   unsigned int priority,
+GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
+                                   uint32_t priority,
                                    struct GNUNET_TIME_Relative maxdelay,
                                    const struct GNUNET_PeerIdentity *target,
                                    size_t notify_size,
                                    GNUNET_CONNECTION_TransmitReadyNotify notify,
                                    void *notify_cls)
 {
+  struct PeerRecord *pr;
   struct GNUNET_CORE_TransmitHandle *th;
-
+  struct GNUNET_CORE_TransmitHandle *pos;
+  struct GNUNET_CORE_TransmitHandle *prev;
+  struct GNUNET_CORE_TransmitHandle *minp;
+
+  pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey);
+  if (NULL == pr)
+  {
+    /* attempt to send to peer that is not connected */
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Attempting to send to peer `%s' from peer `%s', but not connected!\n",
+         GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey));
+    GNUNET_break (0);
+    return NULL;
+  }
   GNUNET_assert (notify_size + sizeof (struct SendMessage) <
                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
   th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
-  th->ch = handle;
-  GNUNET_CONTAINER_DLL_insert_after (handle->pending_head,
-                                    handle->pending_tail,
-                                    handle->pending_tail,
-                                    th);
-  th->get_message = &produce_send;
-  th->get_message_cls = th;
-  th->notify = notify;
-  th->notify_cls = notify_cls;
-  th->peer = *target;
+  th->peer = pr;
+  GNUNET_assert (NULL != notify);
+  th->get_message = notify;
+  th->get_message_cls = notify_cls;
   th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
-  th->timeout_task = GNUNET_SCHEDULER_add_delayed (handle->sched,
-                                                   maxdelay,
-                                                   &timeout_request, th);
   th->priority = priority;
-  th->msize = sizeof (struct SendMessage) + notify_size;
+  th->msize = notify_size;
+  th->cork = cork;
+  /* bound queue size */
+  if (pr->queue_size == handle->queue_size)
+  {
+    /* find lowest-priority entry, but skip the head of the list */
+    minp = pr->pending_head->next;
+    prev = minp;
+    while (prev != NULL)
+    {
+      if (prev->priority < minp->priority)
+        minp = prev;
+      prev = prev->next;
+    }
+    if (minp == NULL)
+    {
+      GNUNET_break (handle->queue_size != 0);
+      GNUNET_break (pr->queue_size == 1);
+      GNUNET_free (th);
+#if DEBUG_CORE
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Dropping transmission request: cannot drop queue head and limit is one\n");
+#endif
+      return NULL;
+    }
+    if (priority <= minp->priority)
+    {
+#if DEBUG_CORE
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Dropping transmission request: priority too low\n");
+#endif
+      GNUNET_free (th);
+      return NULL;              /* priority too low */
+    }
+    GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp);
+    pr->queue_size--;
+    GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL));
+    GNUNET_free (minp);
+  }
+
+  /* Order entries by deadline, but SKIP 'HEAD' (as we may have transmitted
+   * that request already or might even already be approved to transmit that
+   * message to core) */
+  pos = pr->pending_head;
+  if (pos != NULL)
+    pos = pos->next;            /* skip head */
+
+  /* insertion sort */
+  prev = pos;
+  while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
+  {
+    prev = pos;
+    pos = pos->next;
+  }
+  GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev,
+                                     th);
+  pr->queue_size++;
   /* was the request queue previously empty? */
-  if ( (handle->pending_head == th) &&
-       (handle->cth == NULL) )
-    trigger_next_request (handle);
+#if DEBUG_CORE
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
+#endif
+  if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
+      (pr->next == NULL) && (pr->prev == NULL) &&
+      (handle->ready_peer_head != pr))
+    pr->ntr_task =
+        GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr);
   return th;
 }
 
@@ -966,20 +1459,34 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
  * @param th handle that was returned by "notify_transmit_ready".
  */
 void
-GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
-                                          *th)
+GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
 {
-  struct GNUNET_CORE_Handle *h = th->ch;
-  
-  if (h->submitted == th)
-    h->submitted = NULL;
-  else    
-    GNUNET_CONTAINER_DLL_remove (h->pending_head,
-                                h->pending_tail,
-                                th);    
-  if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (h->sched, th->timeout_task);
+  struct PeerRecord *pr = th->peer;
+  struct GNUNET_CORE_Handle *h = pr->ch;
+  int was_head;
+
+  was_head = (pr->pending_head == th);
+  GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
+  pr->queue_size--;
+  if (th->cm != NULL)
+  {
+    /* we're currently in the control queue, remove */
+    GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
+                                 h->control_pending_tail, th->cm);
+    GNUNET_free (th->cm);
+  }
   GNUNET_free (th);
+  if (was_head)
+  {
+    if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
+    {
+      /* the request that was 'approved' by core was
+       * canceled before it could be transmitted; remove
+       * us from the 'ready' list */
+      GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
+    }
+    request_next_transmission (pr);
+  }
 }