continue to fix extract result
[oweals/gnunet.git] / src / dv / dv_api.c
index 182f9ab72dee4e5e267f8ef4d4ac3467c94203a2..e3ba995c3bfce277420bd54f9fb519bbe46490eb 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009--2013 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
  * @author Nathan Evans
  */
 #include "platform.h"
-#include "gnunet_bandwidth_lib.h"
-#include "gnunet_client_lib.h"
-#include "gnunet_constants.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_arm_service.h"
-#include "gnunet_hello_lib.h"
-#include "gnunet_protocols.h"
-#include "gnunet_server_lib.h"
-#include "gnunet_time_lib.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_dv_service.h"
+#include "gnunet_protocols.h"
 #include "dv.h"
+#include "gnunet_transport_plugin.h"
+
+#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
+
+
+/**
+ * Information we track for each peer.
+ */
+struct ConnectedPeer;
 
 
-struct PendingMessages
+/**
+ * Handle for a send operation.
+ */
+struct GNUNET_DV_TransmitHandle
 {
   /**
-   * Linked list of pending messages
+   * Kept in a DLL.
    */
-  struct PendingMessages *next;
+  struct GNUNET_DV_TransmitHandle *next;
 
   /**
-   * Message that is pending
+   * Kept in a DLL.
    */
-  struct GNUNET_DV_SendMessage *msg;
+  struct GNUNET_DV_TransmitHandle *prev;
 
   /**
-   * Timeout for this message
+   * Handle to the service.
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct GNUNET_DV_ServiceHandle *sh;
 
-};
+  /**
+   * Function to call upon completion.
+   */
+  GNUNET_DV_MessageSentCallback cb;
+
+  /**
+   * Closure for @a cb.
+   */
+  void *cb_cls;
+
+  /**
+   * The actual message (allocated at the end of this struct).
+   */
+  const struct GNUNET_MessageHeader *msg;
+
+  /**
+   * Destination for the message.
+   */
+  struct ConnectedPeer *target;
+
+  /**
+   * UID of our message, if any.
+   */
+  uint32_t uid;
 
+};
 
 
 /**
- * Handle for the service.
+ * Information we track for each peer.
  */
-struct GNUNET_DV_Handle
+struct ConnectedPeer
 {
+
   /**
-   * Our scheduler.
+   * Identity of the peer.
    */
-  struct GNUNET_SCHEDULER_Handle *sched;
+  struct GNUNET_PeerIdentity pid;
 
   /**
-   * Configuration to use.
+   * Head of DLL of transmission handles where we need
+   * to invoke a continuation when we are informed about
+   * successful transmission.  The respective request
+   * has already been sent to the DV service.
    */
-  const struct GNUNET_CONFIGURATION_Handle *cfg;
+  struct GNUNET_DV_TransmitHandle *head;
 
   /**
-   * Socket (if available).
+   * Tail of DLL of transmission handles where we need
+   * to invoke a continuation when we are informed about
+   * successful transmission.  The respective request
+   * has already been sent to the DV service.
+   */
+  struct GNUNET_DV_TransmitHandle *tail;
+
+};
+
+
+/**
+ * Handle to the DV service.
+ */
+struct GNUNET_DV_ServiceHandle
+{
+
+  /**
+   * Connection to DV service.
    */
   struct GNUNET_CLIENT_Connection *client;
 
   /**
-   * Currently pending transmission request.
+   * Active request for transmission to DV service.
    */
   struct GNUNET_CLIENT_TransmitHandle *th;
 
   /**
-   * List of the currently pending messages for the DV service.
+   * Our configuration.
    */
-  struct PendingMessages *pending_list;
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Message we are currently sending.
+   * Closure for the callbacks.
    */
-  struct PendingMessages *current;
+  void *cls;
 
   /**
-   * Kill off the connection and any pending messages.
+   * Function to call on connect events.
    */
-  int do_destroy;
+  GNUNET_DV_ConnectCallback connect_cb;
 
   /**
-   * Handler for messages we receive from the DV service
+   * Function to call on distance change events.
    */
-  GNUNET_DV_MessageReceivedHandler receive_handler;
+  GNUNET_DV_DistanceChangedCallback distance_cb;
 
   /**
-   * Closure for the receive handler
+   * Function to call on disconnect events.
    */
-  void *receive_cls;
+  GNUNET_DV_DisconnectCallback disconnect_cb;
 
-};
+  /**
+   * Function to call on receiving messages events.
+   */
+  GNUNET_DV_MessageReceivedCallback message_cb;
 
+  /**
+   * Head of messages to transmit.
+   */
+  struct GNUNET_DV_TransmitHandle *th_head;
 
-struct StartContext
-{
+  /**
+   * Tail of messages to transmit.
+   */
+  struct GNUNET_DV_TransmitHandle *th_tail;
 
   /**
-   * Start message
+   * Information tracked per connected peer.  Maps peer
+   * identities to `struct ConnectedPeer` entries.
    */
-  struct GNUNET_MessageHeader *message;
+  struct GNUNET_CONTAINER_MultiPeerMap *peers;
 
   /**
-   * Handle to service, in case of timeout
+   * Current unique ID
    */
-  struct GNUNET_DV_Handle *handle;
+  uint32_t uid_gen;
+
 };
 
 
 /**
- * Try to (re)connect to the dv service.
+ * Disconnect and then reconnect to the DV service.
  *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * @param sh service handle
  */
-static int
-try_connect (struct GNUNET_DV_Handle *ret)
-{
-  if (ret->client != NULL)
-    return GNUNET_OK;
-  ret->client = GNUNET_CLIENT_connect (ret->sched, "dv", ret->cfg);
-  if (ret->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dv service!\n"));
-#endif
-  return GNUNET_NO;
-}
+static void
+reconnect (struct GNUNET_DV_ServiceHandle *sh);
 
-static void process_pending_message(struct GNUNET_DV_Handle *handle);
 
 /**
- * Send complete, schedule next
+ * Start sending messages from our queue to the service.
+ *
+ * @param sh service handle
  */
 static void
-finish (struct GNUNET_DV_Handle *handle, int code)
-{
-  struct PendingMessages *pos = handle->current;
-  handle->current = NULL;
-  process_pending_message (handle);
-
-  GNUNET_free (pos);
-}
+start_transmit (struct GNUNET_DV_ServiceHandle *sh);
 
 
+/**
+ * Gives a message from our queue to the DV service.
+ *
+ * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`)
+ * @param size how many bytes can we send
+ * @param buf where to copy the message to send
+ * @return how many bytes we copied to @a buf
+ */
 static size_t
 transmit_pending (void *cls, size_t size, void *buf)
 {
-  struct GNUNET_DV_Handle *handle = cls;
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  char *cbuf = buf;
+  struct GNUNET_DV_TransmitHandle *th;
   size_t ret;
   size_t tsize;
 
-#if DEBUG_DV
-  if (handle->current != NULL)
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type));
-#endif
-
-  if (buf == NULL)
-    {
-#if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n");
-#endif
-      finish(handle, GNUNET_SYSERR);
-      return 0;
-    }
-  handle->th = NULL;
-
+  sh->th = NULL;
+  if (NULL == buf)
+  {
+    reconnect (sh);
+    return 0;
+  }
   ret = 0;
-
-  if (handle->current != NULL)
+  while ( (NULL != (th = sh->th_head)) &&
+         (size - ret >= (tsize = ntohs (th->msg->size)) ))
   {
-    tsize = ntohs(handle->current->msg->header.size);
-    if (size >= tsize)
+    GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                sh->th_tail,
+                                th);
+    memcpy (&cbuf[ret], th->msg, tsize);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Passing %u bytes of type %u to DV service\n",
+         tsize,
+         ntohs (th->msg->type));
+    th->msg = NULL;
+    ret += tsize;
+    if (NULL != th->cb)
     {
-      memcpy(buf, handle->current->msg, tsize);
-#if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize);
-#endif
-      finish(handle, GNUNET_OK);
-      return tsize;
+      GNUNET_CONTAINER_DLL_insert_tail (th->target->head,
+                                        th->target->tail,
+                                        th);
+    }
+    else
+    {
+      GNUNET_free (th);
     }
-
   }
-
+  if (NULL != sh->th_head)
+    start_transmit (sh);
   return ret;
 }
 
+
 /**
- * Try to send messages from list of messages to send
+ * Start sending messages from our queue to the service.
+ *
+ * @param sh service handle
  */
-static void process_pending_message(struct GNUNET_DV_Handle *handle)
+static void
+start_transmit (struct GNUNET_DV_ServiceHandle *sh)
 {
+  if (NULL != sh->th)
+    return;
+  if (NULL == sh->th_head)
+    return;
+  sh->th =
+    GNUNET_CLIENT_notify_transmit_ready (sh->client,
+                                        ntohs (sh->th_head->msg->size),
+                                        GNUNET_TIME_UNIT_FOREVER_REL,
+                                        GNUNET_NO,
+                                        &transmit_pending, sh);
+}
 
-  if (handle->current != NULL)
-    return;                     /* action already pending */
-  if (GNUNET_YES != try_connect (handle))
+
+/**
+ * We got disconnected from the service and thus all of the
+ * pending send callbacks will never be confirmed.  Clean up.
+ *
+ * @param cls the 'struct GNUNET_DV_ServiceHandle'
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+cleanup_send_cb (void *cls,
+                const struct GNUNET_PeerIdentity *key,
+                void *value)
+{
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct ConnectedPeer *peer = value;
+  struct GNUNET_DV_TransmitHandle *th;
+
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+                                                      key,
+                                                      peer));
+  sh->disconnect_cb (sh->cls,
+                     key);
+  while (NULL != (th = peer->head))
+  {
+    GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
+    th->cb (th->cb_cls, GNUNET_SYSERR);
+    GNUNET_free (th);
+  }
+  GNUNET_free (peer);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
+ */
+static void
+handle_message_receipt (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  const struct GNUNET_DV_ConnectMessage *cm;
+  const struct GNUNET_DV_DistanceUpdateMessage *dum;
+  const struct GNUNET_DV_DisconnectMessage *dm;
+  const struct GNUNET_DV_ReceivedMessage *rm;
+  const struct GNUNET_MessageHeader *payload;
+  const struct GNUNET_DV_AckMessage *ack;
+  struct GNUNET_DV_TransmitHandle *th;
+  struct GNUNET_DV_TransmitHandle *tn;
+  struct ConnectedPeer *peer;
+
+  if (NULL == msg)
+  {
+    /* Connection closed */
+    reconnect (sh);
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received message of type %u with %u bytes from DV service\n",
+       (unsigned int) ntohs (msg->type),
+       (unsigned int) ntohs (msg->size));
+  switch (ntohs (msg->type))
+  {
+  case GNUNET_MESSAGE_TYPE_DV_CONNECT:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
     {
-      finish (handle, GNUNET_SYSERR);
+      GNUNET_break (0);
+      reconnect (sh);
       return;
     }
-
-  /* schedule next action */
-  handle->current = handle->pending_list;
-  if (NULL == handle->current)
+    cm = (const struct GNUNET_DV_ConnectMessage *) msg;
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &cm->peer);
+    if (NULL != peer)
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    peer = GNUNET_new (struct ConnectedPeer);
+    peer->pid = cm->peer;
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (sh->peers,
+                                                      &peer->pid,
+                                                      peer,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    sh->connect_cb (sh->cls,
+                   &cm->peer,
+                   ntohl (cm->distance),
+                    (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
+    sh->distance_cb (sh->cls,
+                    &dum->peer,
+                    ntohl (dum->distance),
+                     (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &dm->peer);
+    if (NULL == peer)
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    tn = sh->th_head;
+    while (NULL != (th = tn))
+    {
+      tn = th->next;
+      if (peer == th->target)
+      {
+        GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                     sh->th_tail,
+                                     th);
+        th->cb (th->cb_cls, GNUNET_SYSERR);
+        GNUNET_free (th);
+      }
+    }
+    cleanup_send_cb (sh, &dm->peer, peer);
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_RECV:
+    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof (struct GNUNET_MessageHeader))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
+    payload = (const struct GNUNET_MessageHeader *) &rm[1];
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    if (NULL ==
+        GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                           &rm->sender))
+    {
+      GNUNET_break (0);
+      reconnect (sh);
+      return;
+    }
+    sh->message_cb (sh->cls,
+                   &rm->sender,
+                   ntohl (rm->distance),
+                   payload);
+    break;
+  case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+  case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
+    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
     {
-      if (handle->do_destroy)
-        {
-          handle->do_destroy = GNUNET_NO;
-          //GNUNET_DV_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
-        }
+      GNUNET_break (0);
+      reconnect (sh);
       return;
     }
-  handle->pending_list = handle->pending_list->next;
-  handle->current->next = NULL;
-
-  if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                    ntohs(handle->current->msg->msgbuf_size),
-                                                    handle->current->msg->timeout,
-                                                    GNUNET_YES,
-                                                    &transmit_pending, handle)))
+    ack = (const struct GNUNET_DV_AckMessage *) msg;
+    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                              &ack->target);
+    if (NULL == peer)
+      break; /* this happens, just ignore */
+    for (th = peer->head; NULL != th; th = th->next)
     {
-#if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dv service.\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
+      if (th->uid != ntohl (ack->uid))
+        continue;
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Matched ACK for message to peer %s\n",
+           GNUNET_i2s (&ack->target));
+      GNUNET_CONTAINER_DLL_remove (peer->head,
+                                   peer->tail,
+                                   th);
+      th->cb (th->cb_cls,
+              (ntohs (ack->header.type) == GNUNET_MESSAGE_TYPE_DV_SEND_ACK)
+              ? GNUNET_OK
+              : GNUNET_SYSERR);
+      GNUNET_free (th);
+      break;
     }
+    break;
+  default:
+    reconnect (sh);
+    break;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received message, continuing receive loop for %p\n",
+       sh->client);
+  GNUNET_CLIENT_receive (sh->client,
+                        &handle_message_receipt, sh,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
+
 /**
- * Add a pending message to the linked list
+ * Transmit the start message to the DV service.
  *
- * @param handle handle to the specified DV api
- * @param msg the message to add to the list
+ * @param cls the `struct GNUNET_DV_ServiceHandle *`
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes written to buf
  */
-static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg)
+static size_t
+transmit_start (void *cls,
+               size_t size,
+               void *buf)
 {
-  struct PendingMessages *new_message;
-  struct PendingMessages *pos;
-  struct PendingMessages *last;
-
-  new_message = GNUNET_malloc(sizeof(struct PendingMessages));
-  new_message->msg = msg;
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct GNUNET_MessageHeader start_message;
 
-  if (handle->pending_list != NULL)
-    {
-      pos = handle->pending_list;
-      while(pos != NULL)
-        {
-          last = pos;
-          pos = pos->next;
-        }
-      new_message->next = last->next; /* Should always be null */
-      last->next = new_message;
-    }
-  else
-    {
-      new_message->next = handle->pending_list; /* Will always be null */
-      handle->pending_list = new_message;
-    }
-
-  process_pending_message(handle);
+  sh->th = NULL;
+  if (NULL == buf)
+  {
+    GNUNET_break (0);
+    reconnect (sh);
+    return 0;
+  }
+  GNUNET_assert (size >= sizeof (start_message));
+  start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
+  start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
+  memcpy (buf, &start_message, sizeof (start_message));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Transmitting START request, starting receive loop for %p\n",
+       sh->client);
+  GNUNET_CLIENT_receive (sh->client,
+                        &handle_message_receipt, sh,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
+  start_transmit (sh);
+  return sizeof (start_message);
 }
 
 
-void handle_message_receipt (void *cls,
-                             const struct GNUNET_MessageHeader * msg)
+/**
+ * Disconnect and then reconnect to the DV service.
+ *
+ * @param sh service handle
+ */
+static void
+reconnect (struct GNUNET_DV_ServiceHandle *sh)
 {
-  struct GNUNET_DV_Handle *handle = cls;
-  struct GNUNET_DV_MessageReceived *received_msg;
-  size_t packed_msg_len;
-  size_t sender_address_len;
-  char *sender_address;
-  char *packed_msg;
-  char *packed_msg_start;
-
-  if (msg == NULL)
+  if (NULL != sh->th)
   {
-    return; /* Connection closed? */
+    GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
+    sh->th = NULL;
   }
-
-  GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
-
-  if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Disconnecting from DV service at %p\n",
+       sh->client);
+  if (NULL != sh->client)
+  {
+    GNUNET_CLIENT_disconnect (sh->client);
+    sh->client = NULL;
+  }
+  GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+                                        &cleanup_send_cb,
+                                        sh);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Connecting to DV service\n");
+  sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
+  if (NULL == sh->client)
+  {
+    GNUNET_break (0);
     return;
-
-  received_msg = (struct GNUNET_DV_MessageReceived *)msg;
-  packed_msg_len = ntohs(received_msg->msg_len);
-  sender_address_len = ntohs(received_msg->sender_address_len);
-
-  GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "dv api receives message, size checks out!\n");
-#endif
-  sender_address = GNUNET_malloc(sender_address_len);
-  memcpy(sender_address, &received_msg[1], sender_address_len);
-  packed_msg_start = (char *)&received_msg[1];
-  packed_msg = GNUNET_malloc(packed_msg_len);
-  memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
-
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
-#endif
-  handle->receive_handler(handle->receive_cls,
-                          &received_msg->sender,
-                          packed_msg,
-                          packed_msg_len,
-                          ntohl(received_msg->distance),
-                          sender_address,
-                          sender_address_len);
-
-  GNUNET_free(sender_address);
-
-  GNUNET_CLIENT_receive (handle->client,
-                         &handle_message_receipt,
-                         handle, GNUNET_TIME_UNIT_FOREVER_REL);
+  }
+  sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
+                                               sizeof (struct GNUNET_MessageHeader),
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               GNUNET_YES,
+                                               &transmit_start,
+                                               sh);
 }
 
+
 /**
- * Send a message from the plugin to the DV service indicating that
- * a message should be sent via DV to some peer.
- *
- * @param dv_handle the handle to the DV api
- * @param target the final target of the message
- * @param msgbuf the msg(s) to send
- * @param msgbuf_size the size of msgbuf
- * @param priority priority to pass on to core when sending the message
- * @param timeout how long can this message be delayed (pass through to core)
- * @param addr the address of this peer (internally known to DV)
- * @param addrlen the length of the peer address
+ * Connect to the DV service.
  *
+ * @param cfg configuration
+ * @param cls closure for callbacks
+ * @param connect_cb function to call on connects
+ * @param distance_cb function to call if distances change
+ * @param disconnect_cb function to call on disconnects
+ * @param message_cb function to call if we receive messages
+ * @return handle to access the service
  */
-int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
-                    const struct GNUNET_PeerIdentity *target,
-                    const char *msgbuf,
-                    size_t msgbuf_size,
-                    unsigned int priority,
-                    struct GNUNET_TIME_Relative timeout,
-                    const void *addr,
-                    size_t addrlen)
+struct GNUNET_DV_ServiceHandle *
+GNUNET_DV_service_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                          void *cls,
+                          GNUNET_DV_ConnectCallback connect_cb,
+                          GNUNET_DV_DistanceChangedCallback distance_cb,
+                          GNUNET_DV_DisconnectCallback disconnect_cb,
+                          GNUNET_DV_MessageReceivedCallback message_cb)
 {
-  struct GNUNET_DV_SendMessage *msg;
-  char *end_of_message;
-  /* FIXME: Copy message to end of thingy, can't just allocate dummy! */
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV SEND called with message of size %d, address size %d, total size to send is %d\n", msgbuf_size, addrlen, sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
-#endif
-  msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
-  msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
-  msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
-  memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity));
-  msg->msgbuf_size = htons(msgbuf_size);
-  msg->priority = htonl(priority);
-  msg->timeout = timeout;
-  msg->addrlen = htons(addrlen);
-  memcpy(&msg[1], addr, addrlen);
-  end_of_message = (char *)&msg[1];
-  end_of_message = &end_of_message[addrlen];
-  memcpy(end_of_message, msgbuf, msgbuf_size);
-  add_pending(dv_handle, msg);
-
-  return GNUNET_OK;
+  struct GNUNET_DV_ServiceHandle *sh;
+
+  sh = GNUNET_new (struct GNUNET_DV_ServiceHandle);
+  sh->cfg = cfg;
+  sh->cls = cls;
+  sh->connect_cb = connect_cb;
+  sh->distance_cb = distance_cb;
+  sh->disconnect_cb = disconnect_cb;
+  sh->message_cb = message_cb;
+  sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
+  reconnect (sh);
+  return sh;
 }
 
-/* Forward declaration */
-void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle);
 
-static size_t
-transmit_start (void *cls, size_t size, void *buf)
+/**
+ * Disconnect from DV service.
+ *
+ * @param sh service handle
+ */
+void
+GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
 {
-  struct StartContext *start_context = cls;
-  struct GNUNET_DV_Handle *handle = start_context->handle;
-  size_t tsize;
-
-  if (buf == NULL)
-    {
-      GNUNET_free(start_context->message);
-      GNUNET_free(start_context);
-      GNUNET_DV_disconnect(handle);
-      return 0;
-    }
+  struct GNUNET_DV_TransmitHandle *pos;
 
-  tsize = ntohs(start_context->message->size);
-  if (size >= tsize)
+  if (NULL == sh)
+    return;
+  if (NULL != sh->th)
   {
-    memcpy(buf, start_context->message, tsize);
-    return tsize;
+    GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
+    sh->th = NULL;
   }
-
-  return 0;
+  while (NULL != (pos = sh->th_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                sh->th_tail,
+                                pos);
+    GNUNET_free (pos);
+  }
+  if (NULL != sh->client)
+  {
+    GNUNET_CLIENT_disconnect (sh->client);
+    sh->client = NULL;
+  }
+  GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
+                                        &cleanup_send_cb,
+                                        sh);
+  GNUNET_CONTAINER_multipeermap_destroy (sh->peers);
+  GNUNET_free (sh);
 }
 
+
 /**
- * Connect to the DV service
- *
- * @param sched the scheduler to use
- * @param cfg the configuration to use
- * @param receive_handler method call when on receipt from the service
- * @param receive_handler_cls closure for receive_handler
+ * Send a message via DV service.
  *
- * @return handle to the DV service
+ * @param sh service handle
+ * @param target intended recpient
+ * @param msg message payload
+ * @param cb function to invoke when done
+ * @param cb_cls closure for @a cb
+ * @return handle to cancel the operation
  */
-struct GNUNET_DV_Handle *
-GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched,
-                  const struct GNUNET_CONFIGURATION_Handle *cfg,
-                  GNUNET_DV_MessageReceivedHandler receive_handler,
-                  void *receive_handler_cls)
+struct GNUNET_DV_TransmitHandle *
+GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
+               const struct GNUNET_PeerIdentity *target,
+               const struct GNUNET_MessageHeader *msg,
+               GNUNET_DV_MessageSentCallback cb,
+               void *cb_cls)
 {
-  struct GNUNET_DV_Handle *handle;
-  struct GNUNET_MessageHeader *start_message;
-  struct StartContext *start_context;
-  handle = GNUNET_malloc(sizeof(struct GNUNET_DV_Handle));
-
-  handle->cfg = cfg;
-  handle->sched = sched;
-  handle->pending_list = NULL;
-  handle->current = NULL;
-  handle->do_destroy = GNUNET_NO;
-  handle->th = NULL;
-  handle->client = GNUNET_CLIENT_connect(sched, "dv", cfg);
-  handle->receive_handler = receive_handler;
-  handle->receive_cls = receive_handler_cls;
-
-  if (handle->client == NULL)
-    {
-      GNUNET_free(handle);
-      return NULL;
-    }
-
-  start_message = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
-  start_message->size = htons(sizeof(struct GNUNET_MessageHeader));
-  start_message->type = htons(GNUNET_MESSAGE_TYPE_DV_START);
-
-  start_context = GNUNET_malloc(sizeof(struct StartContext));
-  start_context->handle = handle;
-  start_context->message = start_message;
-  GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                       sizeof(struct GNUNET_MessageHeader),
-                                       GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60),
-                                       GNUNET_YES,
-                                       &transmit_start, start_context);
+  struct GNUNET_DV_TransmitHandle *th;
+  struct GNUNET_DV_SendMessage *sm;
+  struct ConnectedPeer *peer;
 
-  GNUNET_CLIENT_receive (handle->client,
-                         &handle_message_receipt,
-                         handle, GNUNET_TIME_UNIT_FOREVER_REL);
-
-  return handle;
+  if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to send %u bytes of type %u to %s via %p\n",
+       (unsigned int) ntohs (msg->size),
+       (unsigned int) ntohs (msg->type),
+       GNUNET_i2s (target),
+       sh->client);
+  peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                            target);
+  if (NULL == peer)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+  th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
+                     sizeof (struct GNUNET_DV_SendMessage) +
+                     ntohs (msg->size));
+  th->sh = sh;
+  th->target = peer;
+  th->cb = cb;
+  th->cb_cls = cb_cls;
+  th->msg = (const struct GNUNET_MessageHeader *) &th[1];
+  sm = (struct GNUNET_DV_SendMessage *) &th[1];
+  sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
+  sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
+                          ntohs (msg->size));
+  if (0 == sh->uid_gen)
+    sh->uid_gen = 1;
+  th->uid = sh->uid_gen;
+  sm->uid = htonl (sh->uid_gen++);
+  /* use memcpy here as 'target' may not be sufficiently aligned */
+  memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&sm[1], msg, ntohs (msg->size));
+  GNUNET_CONTAINER_DLL_insert_tail (sh->th_head,
+                                    sh->th_tail,
+                                    th);
+  start_transmit (sh);
+  return th;
 }
 
+
 /**
- * Disconnect from the DV service
+ * Abort send operation (naturally, the message may have
+ * already been transmitted; this only stops the 'cb'
+ * from being called again).
  *
- * @param handle the current handle to the service to disconnect
+ * @param th send operation to cancel
  */
-void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle)
+void
+GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
 {
-  struct PendingMessages *pos;
-
-  GNUNET_assert(handle != NULL);
+  struct GNUNET_DV_ServiceHandle *sh = th->sh;
 
-  if (handle->th != NULL) /* We have a live transmit request in the Aether */
-    {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
-      handle->th = NULL;
-    }
-  if (handle->current != NULL) /* We are trying to send something now, clean it up */
-    GNUNET_free(handle->current);
-  while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
-    {
-      handle->pending_list = pos->next;
-      GNUNET_free(pos);
-    }
-  if (handle->client != NULL) /* Finally, disconnect from the service */
-    {
-      GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
-      handle->client = NULL;
-    }
-
-  GNUNET_free (handle);
+  if (NULL == th->msg)
+    GNUNET_CONTAINER_DLL_remove (th->target->head,
+                                th->target->tail,
+                                th);
+  else
+    GNUNET_CONTAINER_DLL_remove (sh->th_head,
+                                sh->th_tail,
+                                th);
+  GNUNET_free (th);
 }
 
+
 /* end of dv_api.c */