- fix 2699
[oweals/gnunet.git] / src / dv / dv_api.c
index 182f9ab72dee4e5e267f8ef4d4ac3467c94203a2..d3cdb0fb7ae01c352f896658a95701b6e04ad66f 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
 #include "gnunet_time_lib.h"
 #include "gnunet_dv_service.h"
 #include "dv.h"
+#include "gnunet_transport_plugin.h"
 
+#define LOG(kind,...) GNUNET_log_from (kind, "dv-api",__VA_ARGS__)
 
+/**
+ * Store ready to send messages
+ */
 struct PendingMessages
 {
   /**
@@ -57,17 +62,11 @@ struct PendingMessages
 
 };
 
-
-
 /**
  * Handle for the service.
  */
 struct GNUNET_DV_Handle
 {
-  /**
-   * Our scheduler.
-   */
-  struct GNUNET_SCHEDULER_Handle *sched;
 
   /**
    * Configuration to use.
@@ -94,11 +93,6 @@ struct GNUNET_DV_Handle
    */
   struct PendingMessages *current;
 
-  /**
-   * Kill off the connection and any pending messages.
-   */
-  int do_destroy;
-
   /**
    * Handler for messages we receive from the DV service
    */
@@ -109,12 +103,21 @@ struct GNUNET_DV_Handle
    */
   void *receive_cls;
 
+  /**
+   * Current unique ID
+   */
+  uint32_t uid_gen;
+
+  /**
+   * Hashmap containing outstanding send requests awaiting confirmation.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *send_callbacks;
+
 };
 
 
 struct StartContext
 {
-
   /**
    * Start message
    */
@@ -126,10 +129,52 @@ struct StartContext
   struct GNUNET_DV_Handle *handle;
 };
 
+struct SendCallbackContext
+{
+  /**
+   * The continuation to call once a message is confirmed sent (or failed)
+   */
+  GNUNET_TRANSPORT_TransmitContinuation cont;
+
+  /**
+   * Closure to call with send continuation.
+   */
+  void *cont_cls;
+
+  /**
+   * Target of the message.
+   */
+  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Payload size in bytes
+   */
+  size_t payload_size;
+
+  /**
+   * DV message size
+   */
+  size_t msg_size;
+};
+
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint32_t uid, struct GNUNET_HashCode * hash)
+{
+  memset (hash, 0, sizeof (struct GNUNET_HashCode));
+  *((uint32_t *) hash) = uid;
+}
 
 /**
  * Try to (re)connect to the dv service.
  *
+ * @param ret handle to the (disconnected) dv service
+ *
  * @return GNUNET_YES on success, GNUNET_NO on failure.
  */
 static int
@@ -137,32 +182,45 @@ try_connect (struct GNUNET_DV_Handle *ret)
 {
   if (ret->client != NULL)
     return GNUNET_OK;
-  ret->client = GNUNET_CLIENT_connect (ret->sched, "dv", ret->cfg);
+  ret->client = GNUNET_CLIENT_connect ("dv", ret->cfg);
   if (ret->client != NULL)
     return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dv service!\n"));
+#if DEBUG_DV_MESSAGES
+  LOG (GNUNET_ERROR_TYPE_DEBUG, _("Failed to connect to the dv service!\n"));
 #endif
   return GNUNET_NO;
 }
 
-static void process_pending_message(struct GNUNET_DV_Handle *handle);
+static void
+process_pending_message (struct GNUNET_DV_Handle *handle);
 
 /**
  * Send complete, schedule next
+ *
+ * @param handle handle to the dv service
+ * @param code return code for send (unused)
  */
 static void
 finish (struct GNUNET_DV_Handle *handle, int code)
 {
   struct PendingMessages *pos = handle->current;
+
   handle->current = NULL;
   process_pending_message (handle);
 
+  GNUNET_free (pos->msg);
   GNUNET_free (pos);
 }
 
-
+/**
+ * Notification that we can send data
+ *
+ * @param cls handle to the dv service (struct GNUNET_DV_Handle)
+ * @param size how many bytes can we send
+ * @param buf where to copy the message to send
+ *
+ * @return how many bytes we copied to buf
+ */
 static size_t
 transmit_pending (void *cls, size_t size, void *buf)
 {
@@ -172,31 +230,34 @@ transmit_pending (void *cls, size_t size, void *buf)
 
 #if DEBUG_DV
   if (handle->current != NULL)
-    GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending called with message type %d\n", ntohs(handle->current->msg->header.type));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "DV API: Transmit pending called with message type %d\n",
+         ntohs (handle->current->msg->header.type));
 #endif
 
   if (buf == NULL)
-    {
+  {
 #if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "DV API: Transmit pending FAILED!\n\n\n");
 #endif
-      finish(handle, GNUNET_SYSERR);
-      return 0;
-    }
+    finish (handle, GNUNET_SYSERR);
+    return 0;
+  }
   handle->th = NULL;
 
   ret = 0;
 
   if (handle->current != NULL)
   {
-    tsize = ntohs(handle->current->msg->header.size);
+    tsize = ntohs (handle->current->msg->header.size);
     if (size >= tsize)
     {
-      memcpy(buf, handle->current->msg, tsize);
+      memcpy (buf, handle->current->msg, tsize);
 #if DEBUG_DV
-      GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV API: Copied %d bytes into buffer!\n\n\n", tsize);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "DV API: Copied %d bytes into buffer!\n\n\n", tsize);
 #endif
-      finish(handle, GNUNET_OK);
+      finish (handle, GNUNET_OK);
       return tsize;
     }
 
@@ -207,45 +268,45 @@ transmit_pending (void *cls, size_t size, void *buf)
 
 /**
  * Try to send messages from list of messages to send
+ *
+ * @param handle handle to the distance vector service
  */
-static void process_pending_message(struct GNUNET_DV_Handle *handle)
+static void
+process_pending_message (struct GNUNET_DV_Handle *handle)
 {
 
   if (handle->current != NULL)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (handle))
-    {
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
+  {
+    finish (handle, GNUNET_SYSERR);
+    return;
+  }
 
   /* schedule next action */
   handle->current = handle->pending_list;
   if (NULL == handle->current)
-    {
-      if (handle->do_destroy)
-        {
-          handle->do_destroy = GNUNET_NO;
-          //GNUNET_DV_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
-        }
-      return;
-    }
+  {
+    return;
+  }
   handle->pending_list = handle->pending_list->next;
   handle->current->next = NULL;
 
   if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                    ntohs(handle->current->msg->msgbuf_size),
-                                                    handle->current->msg->timeout,
-                                                    GNUNET_YES,
-                                                    &transmit_pending, handle)))
-    {
+      (handle->th =
+       GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                            ntohs (handle->current->msg->
+                                                   header.size),
+                                            handle->current->msg->timeout,
+                                            GNUNET_YES, &transmit_pending,
+                                            handle)))
+  {
 #if DEBUG_DV
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dv service.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to transmit request to dv service.\n");
 #endif
-      finish (handle, GNUNET_SYSERR);
-    }
+    finish (handle, GNUNET_SYSERR);
+  }
 }
 
 /**
@@ -254,88 +315,133 @@ static void process_pending_message(struct GNUNET_DV_Handle *handle)
  * @param handle handle to the specified DV api
  * @param msg the message to add to the list
  */
-static void add_pending(struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg)
+static void
+add_pending (struct GNUNET_DV_Handle *handle, struct GNUNET_DV_SendMessage *msg)
 {
   struct PendingMessages *new_message;
   struct PendingMessages *pos;
   struct PendingMessages *last;
 
-  new_message = GNUNET_malloc(sizeof(struct PendingMessages));
+  new_message = GNUNET_malloc (sizeof (struct PendingMessages));
   new_message->msg = msg;
 
   if (handle->pending_list != NULL)
+  {
+    pos = handle->pending_list;
+    while (pos != NULL)
     {
-      pos = handle->pending_list;
-      while(pos != NULL)
-        {
-          last = pos;
-          pos = pos->next;
-        }
-      new_message->next = last->next; /* Should always be null */
-      last->next = new_message;
+      last = pos;
+      pos = pos->next;
     }
+    last->next = new_message;
+  }
   else
-    {
-      new_message->next = handle->pending_list; /* Will always be null */
-      handle->pending_list = new_message;
-    }
+  {
+    handle->pending_list = new_message;
+  }
 
-  process_pending_message(handle);
+  process_pending_message (handle);
 }
 
-
-void handle_message_receipt (void *cls,
-                             const struct GNUNET_MessageHeader * msg)
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
+ */
+void
+handle_message_receipt (void *cls, const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_DV_Handle *handle = cls;
   struct GNUNET_DV_MessageReceived *received_msg;
+  struct GNUNET_DV_SendResultMessage *send_result_msg;
   size_t packed_msg_len;
   size_t sender_address_len;
   char *sender_address;
   char *packed_msg;
   char *packed_msg_start;
+  struct GNUNET_HashCode uidhash;
+  struct SendCallbackContext *send_ctx;
 
   if (msg == NULL)
   {
-    return; /* Connection closed? */
+#if DEBUG_DV_MESSAGES
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: connection closed\n");
+#endif
+    return;                     /* Connection closed? */
   }
 
-  GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE);
-
-  if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived))
-    return;
+  GNUNET_assert ((ntohs (msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE)
+                 || (ntohs (msg->type) ==
+                     GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT));
 
-  received_msg = (struct GNUNET_DV_MessageReceived *)msg;
-  packed_msg_len = ntohs(received_msg->msg_len);
-  sender_address_len = ntohs(received_msg->sender_address_len);
+  switch (ntohs (msg->type))
+  {
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE:
+    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_MessageReceived))
+      return;
 
-  GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len));
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "dv api receives message, size checks out!\n");
+    received_msg = (struct GNUNET_DV_MessageReceived *) msg;
+    packed_msg_len = ntohl (received_msg->msg_len);
+    sender_address_len =
+        ntohs (msg->size) - packed_msg_len -
+        sizeof (struct GNUNET_DV_MessageReceived);
+    GNUNET_assert (sender_address_len > 0);
+    sender_address = GNUNET_malloc (sender_address_len);
+    memcpy (sender_address, &received_msg[1], sender_address_len);
+    packed_msg_start = (char *) &received_msg[1];
+    packed_msg = GNUNET_malloc (packed_msg_len);
+    memcpy (packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
+
+#if DEBUG_DV_MESSAGES
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "DV_API receive: packed message type: %d or %d\n",
+         ntohs (((struct GNUNET_MessageHeader *) packed_msg)->type),
+         ((struct GNUNET_MessageHeader *) packed_msg)->type);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "DV_API receive: message sender reported as %s\n",
+         GNUNET_i2s (&received_msg->sender));
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "DV_API receive: distance is %u\n",
+         ntohl (received_msg->distance));
 #endif
-  sender_address = GNUNET_malloc(sender_address_len);
-  memcpy(sender_address, &received_msg[1], sender_address_len);
-  packed_msg_start = (char *)&received_msg[1];
-  packed_msg = GNUNET_malloc(packed_msg_len);
-  memcpy(packed_msg, &packed_msg_start[sender_address_len], packed_msg_len);
 
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "packed message type: %d or %d\n", ntohs(((struct GNUNET_MessageHeader *)packed_msg)->type), ((struct GNUNET_MessageHeader *)packed_msg)->type);
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "message sender reported as %s\n", GNUNET_i2s(&received_msg->sender));
-#endif
-  handle->receive_handler(handle->receive_cls,
-                          &received_msg->sender,
-                          packed_msg,
-                          packed_msg_len,
-                          ntohl(received_msg->distance),
-                          sender_address,
-                          sender_address_len);
-
-  GNUNET_free(sender_address);
-
-  GNUNET_CLIENT_receive (handle->client,
-                         &handle_message_receipt,
-                         handle, GNUNET_TIME_UNIT_FOREVER_REL);
+    handle->receive_handler (handle->receive_cls, &received_msg->sender,
+                             packed_msg, packed_msg_len,
+                             ntohl (received_msg->distance), sender_address,
+                             sender_address_len);
+
+    GNUNET_free (sender_address);
+    break;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND_RESULT:
+    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_SendResultMessage))
+      return;
+
+    send_result_msg = (struct GNUNET_DV_SendResultMessage *) msg;
+    hash_from_uid (ntohl (send_result_msg->uid), &uidhash);
+    send_ctx =
+        GNUNET_CONTAINER_multihashmap_get (handle->send_callbacks, &uidhash);
+
+    if ((send_ctx != NULL) && (send_ctx->cont != NULL))
+    {
+      if (ntohl (send_result_msg->result) == 0)
+      {
+        send_ctx->cont (send_ctx->cont_cls, &send_ctx->target, GNUNET_OK,
+                        send_ctx->payload_size, send_ctx->msg_size);
+      }
+      else
+      {
+        send_ctx->cont (send_ctx->cont_cls, &send_ctx->target, GNUNET_SYSERR,
+                        send_ctx->payload_size, 0);
+      }
+    }
+    GNUNET_free_non_null (send_ctx);
+    break;
+  default:
+    break;
+  }
+  GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle,
+                         GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
 /**
@@ -350,43 +456,69 @@ void handle_message_receipt (void *cls,
  * @param timeout how long can this message be delayed (pass through to core)
  * @param addr the address of this peer (internally known to DV)
  * @param addrlen the length of the peer address
+ * @param cont continuation to call once the message has been sent (or failed)
+ * @param cont_cls closure for continuation
  *
  */
-int GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
-                    const struct GNUNET_PeerIdentity *target,
-                    const char *msgbuf,
-                    size_t msgbuf_size,
-                    unsigned int priority,
-                    struct GNUNET_TIME_Relative timeout,
-                    const void *addr,
-                    size_t addrlen)
+int
+GNUNET_DV_send (struct GNUNET_DV_Handle *dv_handle,
+                const struct GNUNET_PeerIdentity *target, const char *msgbuf,
+                size_t msgbuf_size, unsigned int priority,
+                struct GNUNET_TIME_Relative timeout, const void *addr,
+                size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont,
+                void *cont_cls)
 {
   struct GNUNET_DV_SendMessage *msg;
+  struct SendCallbackContext *send_ctx;
   char *end_of_message;
-  /* FIXME: Copy message to end of thingy, can't just allocate dummy! */
-#if DEBUG_DV
-  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "DV SEND called with message of size %d, address size %d, total size to send is %d\n", msgbuf_size, addrlen, sizeof(struct GNUNET_DV_SendMessage) + msgbuf_size + addrlen);
+  struct GNUNET_HashCode uidhash;
+  int msize;
+
+#if DEBUG_DV_MESSAGES
+  dv_handle->uid_gen =
+      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
+#else
+  dv_handle->uid_gen++;
 #endif
-  msg = GNUNET_malloc(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
-  msg->header.size = htons(sizeof(struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size);
-  msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
-  memcpy(&msg->target, target, sizeof(struct GNUNET_PeerIdentity));
-  msg->msgbuf_size = htons(msgbuf_size);
-  msg->priority = htonl(priority);
+
+  msize = sizeof (struct GNUNET_DV_SendMessage) + addrlen + msgbuf_size;
+  msg = GNUNET_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_SEND);
+  memcpy (&msg->target, target, sizeof (struct GNUNET_PeerIdentity));
+  msg->priority = htonl (priority);
   msg->timeout = timeout;
-  msg->addrlen = htons(addrlen);
-  memcpy(&msg[1], addr, addrlen);
-  end_of_message = (char *)&msg[1];
+  msg->addrlen = htonl (addrlen);
+  msg->uid = htonl (dv_handle->uid_gen);
+  memcpy (&msg[1], addr, addrlen);
+  end_of_message = (char *) &msg[1];
   end_of_message = &end_of_message[addrlen];
-  memcpy(end_of_message, msgbuf, msgbuf_size);
-  add_pending(dv_handle, msg);
+  memcpy (end_of_message, msgbuf, msgbuf_size);
+  add_pending (dv_handle, msg);
+  send_ctx = GNUNET_malloc (sizeof (struct SendCallbackContext));
+  send_ctx->payload_size = msgbuf_size;
+  send_ctx->msg_size = msize;
+  send_ctx->cont = cont;
+  send_ctx->cont_cls = cont_cls;
+  memcpy (&send_ctx->target, target, sizeof (struct GNUNET_PeerIdentity));
+  hash_from_uid (dv_handle->uid_gen, &uidhash);
+  GNUNET_CONTAINER_multihashmap_put (dv_handle->send_callbacks, &uidhash,
+                                     send_ctx,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
 
   return GNUNET_OK;
 }
 
-/* Forward declaration */
-void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle);
-
+/**
+ * Callback to transmit a start message to
+ * the DV service, once we can send
+ *
+ * @param cls struct StartContext
+ * @param size how much can we send
+ * @param buf where to copy the message
+ *
+ * @return number of bytes copied to buf
+ */
 static size_t
 transmit_start (void *cls, size_t size, void *buf)
 {
@@ -394,18 +526,27 @@ transmit_start (void *cls, size_t size, void *buf)
   struct GNUNET_DV_Handle *handle = start_context->handle;
   size_t tsize;
 
+#if DEBUG_DV
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "DV API: sending start request to service\n");
+#endif
   if (buf == NULL)
-    {
-      GNUNET_free(start_context->message);
-      GNUNET_free(start_context);
-      GNUNET_DV_disconnect(handle);
-      return 0;
-    }
+  {
+    GNUNET_free (start_context->message);
+    GNUNET_free (start_context);
+    GNUNET_DV_disconnect (handle);
+    return 0;
+  }
 
-  tsize = ntohs(start_context->message->size);
+  tsize = ntohs (start_context->message->size);
   if (size >= tsize)
   {
-    memcpy(buf, start_context->message, tsize);
+    memcpy (buf, start_context->message, tsize);
+    GNUNET_free (start_context->message);
+    GNUNET_free (start_context);
+    GNUNET_CLIENT_receive (handle->client, &handle_message_receipt, handle,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+
+
     return tsize;
   }
 
@@ -415,7 +556,6 @@ transmit_start (void *cls, size_t size, void *buf)
 /**
  * Connect to the DV service
  *
- * @param sched the scheduler to use
  * @param cfg the configuration to use
  * @param receive_handler method call when on receipt from the service
  * @param receive_handler_cls closure for receive_handler
@@ -423,48 +563,45 @@ transmit_start (void *cls, size_t size, void *buf)
  * @return handle to the DV service
  */
 struct GNUNET_DV_Handle *
-GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched,
-                  const struct GNUNET_CONFIGURATION_Handle *cfg,
-                  GNUNET_DV_MessageReceivedHandler receive_handler,
-                  void *receive_handler_cls)
+GNUNET_DV_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                   GNUNET_DV_MessageReceivedHandler receive_handler,
+                   void *receive_handler_cls)
 {
   struct GNUNET_DV_Handle *handle;
   struct GNUNET_MessageHeader *start_message;
   struct StartContext *start_context;
-  handle = GNUNET_malloc(sizeof(struct GNUNET_DV_Handle));
+
+  handle = GNUNET_malloc (sizeof (struct GNUNET_DV_Handle));
 
   handle->cfg = cfg;
-  handle->sched = sched;
   handle->pending_list = NULL;
   handle->current = NULL;
-  handle->do_destroy = GNUNET_NO;
   handle->th = NULL;
-  handle->client = GNUNET_CLIENT_connect(sched, "dv", cfg);
+  handle->client = GNUNET_CLIENT_connect ("dv", cfg);
   handle->receive_handler = receive_handler;
   handle->receive_cls = receive_handler_cls;
 
   if (handle->client == NULL)
-    {
-      GNUNET_free(handle);
-      return NULL;
-    }
+  {
+    GNUNET_free (handle);
+    return NULL;
+  }
 
-  start_message = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
-  start_message->size = htons(sizeof(struct GNUNET_MessageHeader));
-  start_message->type = htons(GNUNET_MESSAGE_TYPE_DV_START);
+  start_message = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
+  start_message->size = htons (sizeof (struct GNUNET_MessageHeader));
+  start_message->type = htons (GNUNET_MESSAGE_TYPE_DV_START);
 
-  start_context = GNUNET_malloc(sizeof(struct StartContext));
+  start_context = GNUNET_malloc (sizeof (struct StartContext));
   start_context->handle = handle;
   start_context->message = start_message;
   GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                       sizeof(struct GNUNET_MessageHeader),
-                                       GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 60),
-                                       GNUNET_YES,
-                                       &transmit_start, start_context);
+                                       sizeof (struct GNUNET_MessageHeader),
+                                       GNUNET_TIME_relative_multiply
+                                       (GNUNET_TIME_UNIT_SECONDS, 60),
+                                       GNUNET_YES, &transmit_start,
+                                       start_context);
 
-  GNUNET_CLIENT_receive (handle->client,
-                         &handle_message_receipt,
-                         handle, GNUNET_TIME_UNIT_FOREVER_REL);
+  handle->send_callbacks = GNUNET_CONTAINER_multihashmap_create (100, GNUNET_NO);
 
   return handle;
 }
@@ -474,29 +611,30 @@ GNUNET_DV_connect (struct GNUNET_SCHEDULER_Handle *sched,
  *
  * @param handle the current handle to the service to disconnect
  */
-void GNUNET_DV_disconnect(struct GNUNET_DV_Handle *handle)
+void
+GNUNET_DV_disconnect (struct GNUNET_DV_Handle *handle)
 {
   struct PendingMessages *pos;
 
-  GNUNET_assert(handle != NULL);
+  GNUNET_assert (handle != NULL);
 
-  if (handle->th != NULL) /* We have a live transmit request in the Aether */
-    {
-      GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
-      handle->th = NULL;
-    }
-  if (handle->current != NULL) /* We are trying to send something now, clean it up */
-    GNUNET_free(handle->current);
-  while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
-    {
-      handle->pending_list = pos->next;
-      GNUNET_free(pos);
-    }
-  if (handle->client != NULL) /* Finally, disconnect from the service */
-    {
-      GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
-      handle->client = NULL;
-    }
+  if (handle->th != NULL)       /* We have a live transmit request in the Aether */
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
+    handle->th = NULL;
+  }
+  if (handle->current != NULL)  /* We are trying to send something now, clean it up */
+    GNUNET_free (handle->current);
+  while (NULL != (pos = handle->pending_list))  /* Remove all pending sends from the list */
+  {
+    handle->pending_list = pos->next;
+    GNUNET_free (pos);
+  }
+  if (handle->client != NULL)   /* Finally, disconnect from the service */
+  {
+    GNUNET_CLIENT_disconnect (handle->client);
+    handle->client = NULL;
+  }
 
   GNUNET_free (handle);
 }