move bit distance function into util
[oweals/gnunet.git] / src / dht / dht_api.c
index 9fb77d5d4fab0c014e0314a24d34288949df1f9c..15faba6c97dec4631e3f80a53d8ca89538b94fbe 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @author Christian Grothoff
  * @author Nathan Evans
  *
- * TODO: Only allow a single message until confirmed as received by
- *       the service.  For put messages call continuation as soon as
- *       receipt acknowledged (then remove), for GET or other messages
- *       only call continuation when data received.
- *       Add unique identifier to message types requesting data to be
- *       returned.
+ * TODO: retransmission of pending requests maybe happens now, at least
+ *       the code is in place to do so.  Need to add checks when api calls
+ *       happen to check if retransmission is in progress, and if so set
+ *       the single pending message for transmission once the list of
+ *       retries are done.
  */
+
 #include "platform.h"
 #include "gnunet_bandwidth_lib.h"
 #include "gnunet_client_lib.h"
@@ -44,9 +44,7 @@
 #include "gnunet_dht_service.h"
 #include "dht.h"
 
-#define DEBUG_DHT_API GNUNET_YES
-
-#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+#define DEBUG_DHT_API GNUNET_NO
 
 struct PendingMessage
 {
@@ -72,22 +70,34 @@ struct PendingMessage
   void *cont_cls;
 
   /**
-   * Whether or not to await verification the message
-   * was received by the service
+   * Unique ID for this request
    */
-  size_t is_unique;
+  uint64_t unique_id;
 
   /**
-   * Unique ID for this request
+   * Free the saved message once sent, set
+   * to GNUNET_YES for messages that don't
+   * receive responses!
    */
-  uint64_t unique_id;
+  int free_on_send;
 
 };
 
-struct GNUNET_DHT_GetContext
+struct PendingMessageList
 {
+  /**
+   * This is a singly linked list.
+   */
+  struct PendingMessageList *next;
 
+  /**
+   * The pending message.
+   */
+  struct PendingMessage *message;
+};
 
+struct GNUNET_DHT_GetContext
+{
   /**
    * Iterator to call on data receipt
    */
@@ -100,9 +110,22 @@ struct GNUNET_DHT_GetContext
 
 };
 
+struct GNUNET_DHT_FindPeerContext
+{
+  /**
+   * Iterator to call on data receipt
+   */
+  GNUNET_DHT_FindPeerProcessor proc;
+
+  /**
+   * Closure for the iterator callback
+   */
+  void *proc_cls;
+
+};
+
 /**
- * Handle to control a unique operation (one that is
- * expected to return results)
+ * Handle to a route request
  */
 struct GNUNET_DHT_RouteHandle
 {
@@ -131,35 +154,68 @@ struct GNUNET_DHT_RouteHandle
    * Main handle to this DHT api
    */
   struct GNUNET_DHT_Handle *dht_handle;
+
+  /**
+   * The actual message sent for this request,
+   * used for retransmitting requests on service
+   * failure/reconnect.  Freed on route_stop.
+   */
+  struct GNUNET_DHT_RouteMessage *message;
 };
 
+
 /**
- * Handle for a non unique request, holds callback
- * which needs to be called before we allow other
- * messages to be processed and sent to the DHT service
+ * Handle to control a get operation.
  */
-struct GNUNET_DHT_NonUniqueHandle
+struct GNUNET_DHT_GetHandle
 {
   /**
-   * Key that this get request is for
+   * Handle to the actual route operation for the get
    */
-  GNUNET_HashCode key;
+  struct GNUNET_DHT_RouteHandle *route_handle;
 
   /**
-   * Type of data get request was for
+   * The context of the get request
    */
-  uint32_t type;
+  struct GNUNET_DHT_GetContext get_context;
+};
+
+
+/**
+ * Handle to control a find peer operation.
+ */
+struct GNUNET_DHT_FindPeerHandle
+{
+  /**
+     * Handle to the actual route operation for the request
+     */
+  struct GNUNET_DHT_RouteHandle *route_handle;
 
+    /**
+     * The context of the find peer request
+     */
+  struct GNUNET_DHT_FindPeerContext find_peer_context;
+};
+
+
+enum DHT_Retransmit_Stage
+{
   /**
-   * Continuation to call on service
-   * confirmation of message receipt.
+   * The API is not retransmitting anything at this time.
    */
-  GNUNET_SCHEDULER_Task cont;
+  DHT_NOT_RETRANSMITTING,
 
   /**
-   * Send continuation cls
+   * The API is retransmitting, and nothing has been single
+   * queued for sending.
    */
-  void *cont_cls;
+  DHT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and a single message has been
+   * queued for transmission once finished.
+   */
+  DHT_RETRANSMITTING_MESSAGE_QUEUED
 };
 
 
@@ -203,42 +259,294 @@ struct GNUNET_DHT_Handle
   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
 
   /**
-   * Non unique handle.  If set don't schedule another non
-   * unique request.
+   * Generator for unique ids.
+   */
+  uint64_t uid_gen;
+
+  /**
+   * Are we currently retransmitting requests?  If so queue a _single_
+   * new request when received.
    */
-  struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
+  enum DHT_Retransmit_Stage retransmit_stage;
 
   /**
-   * Kill off the connection and any pending messages.
+   * Linked list of retranmissions, to be used in the event
+   * of a dht service disconnect/reconnect.
    */
-  int do_destroy;
+  struct PendingMessageList *retransmissions;
 
+  /**
+   * A single pending message allowed to be scheduled
+   * during retransmission phase.
+   */
+  struct PendingMessage *retransmission_buffer;
 };
 
-static struct GNUNET_TIME_Relative default_request_timeout;
+
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint64_t uid,
+              GNUNET_HashCode *hash)
+{
+  memset (hash, 0, sizeof(GNUNET_HashCode));
+  *((uint64_t*)hash) = uid;
+}
+
+#if RETRANSMIT
+/**
+ * Iterator callback to retransmit each outstanding request
+ * because the connection to the DHT service went down (and
+ * came back).
+ *
+ *
+ */
+static int retransmit_iterator (void *cls,
+                                const GNUNET_HashCode * key,
+                                void *value)
+{
+  struct GNUNET_DHT_RouteHandle *route_handle = value;
+  struct PendingMessageList *pending_message_list;
+
+  pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
+  pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
+  pending_message_list->message->msg = &route_handle->message->header;
+  pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
+  pending_message_list->message->cont = NULL;
+  pending_message_list->message->cont_cls = NULL;
+  pending_message_list->message->unique_id = route_handle->uid;
+  /* Add the new pending message to the front of the retransmission list */
+  pending_message_list->next = route_handle->dht_handle->retransmissions;
+  route_handle->dht_handle->retransmissions = pending_message_list;
+
+  return GNUNET_OK;
+}
+#endif
+
+/**
+ * Try to (re)connect to the dht service.
+ *
+ * @return GNUNET_YES on success, GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_DHT_Handle *handle)
+{
+  if (handle->client != NULL)
+    return GNUNET_OK;
+  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
+  if (handle->client != NULL)
+    return GNUNET_YES;
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              _("Failed to connect to the dht service!\n"));
+#endif
+  return GNUNET_NO;
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  GNUNET_HashCode uid_hash;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos != NULL);
+  hash_from_uid (pos->unique_id, &uid_hash);
+  if (pos->cont != NULL)
+    {
+      if (code == GNUNET_SYSERR)
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
+      else
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+
+  GNUNET_assert(handle->th == NULL);
+  if (pos->free_on_send == GNUNET_YES)
+    GNUNET_free(pos->msg);
+  GNUNET_free (pos);
+  handle->current = NULL;
+}
+
+/**
+ * Transmit the next pending message, called by notify_transmit_ready
+ */
+static size_t
+transmit_pending (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_DHT_Handle *handle = cls;
+  size_t tsize;
+
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': In transmit_pending\n", "DHT API");
+#endif
+  handle->th = NULL;
+
+  if (buf == NULL)
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': In transmit_pending buf is NULL\n", "DHT API");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return 0;
+    }
+
+  if (handle->current != NULL)
+    {
+      tsize = ntohs (handle->current->msg->size);
+      if (size >= tsize)
+        {
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Sending message size %d\n", "DHT API", tsize);
+#endif
+          memcpy (buf, handle->current->msg, tsize);
+          finish (handle, GNUNET_OK);
+          return tsize;
+        }
+      else
+        {
+          return 0;
+        }
+    }
+  /* Have no pending request */
+  return 0;
+}
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_message (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      handle->th = NULL;
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ * Forward declaration.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
 
 /* Forward declaration */
-static void process_pending_message(struct GNUNET_DHT_Handle *handle);
+static size_t
+transmit_pending_retransmission (void *cls, size_t size, void *buf);
 
-static GNUNET_HashCode * hash_from_uid(uint64_t uid)
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
 {
-  int count;
-  int remaining;
-  GNUNET_HashCode *hash;
-  hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
-  count = 0;
 
-  while (count < sizeof(GNUNET_HashCode))
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
     {
-      remaining = sizeof(GNUNET_HashCode) - count;
-      if (remaining > sizeof(uid))
-        remaining = sizeof(uid);
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending_retransmission,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  struct PendingMessageList *pending_list;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos == handle->retransmissions->message);
+  pending_list = handle->retransmissions;
+  handle->retransmissions = handle->retransmissions->next;
+  GNUNET_free (pending_list);
 
-      memcpy(hash, &uid, remaining);
-      count += remaining;
+  if (handle->retransmissions == NULL)
+    {
+      handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
     }
 
-  return hash;
+  if (handle->retransmissions != NULL)
+    {
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
+    }
+  else if (handle->retransmission_buffer != NULL)
+    {
+      handle->current = handle->retransmission_buffer;
+      process_pending_message(handle);
+    }
 }
 
 /**
@@ -246,88 +554,88 @@ static GNUNET_HashCode * hash_from_uid(uint64_t uid)
  * a demultiplexer which handles numerous message types
  *
  */
-void service_message_handler (void *cls,
-                              const struct GNUNET_MessageHeader *msg)
+void
+service_message_handler (void *cls,
+                         const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_DHT_Handle *handle = cls;
-  struct GNUNET_DHT_Message *dht_msg;
-  struct GNUNET_DHT_StopMessage *stop_msg;
+  struct GNUNET_DHT_RouteResultMessage *dht_msg;
   struct GNUNET_MessageHeader *enc_msg;
   struct GNUNET_DHT_RouteHandle *route_handle;
   uint64_t uid;
-  GNUNET_HashCode *uid_hash;
+  GNUNET_HashCode uid_hash;
   size_t enc_size;
-  /* TODO: find out message type, handle callbacks for different types of messages.
-   * Should be a non unique acknowledgment, or unique result. */
 
   if (msg == NULL)
-  {
+    {
 #if DEBUG_DHT_API
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "`%s': Received NULL from server, connection down?\n", "DHT API");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Received NULL from server, connection down!\n",
+                  "DHT API");
 #endif
-    return;
-  }
+      GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
+      handle->client = GNUNET_CLIENT_connect (handle->sched, 
+                                              "dht",
+                                              handle->cfg);
+      if (handle->current != NULL)
+        {
+          handle->th = NULL;
+          finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
+        }
+#if RETRANSMIT
+      if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
+        {
+          handle->retransmit_stage = DHT_RETRANSMITTING;
+          handle->current = handle->retransmissions->message;
+          process_pending_retransmissions(handle);
+        }
+#endif
+      return;
+    }
 
-  if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
-  {
-    dht_msg = (struct GNUNET_DHT_Message *)msg;
-    uid = GNUNET_ntohll(dht_msg->unique_id);
+  switch (ntohs (msg->type))
+    {
+    case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
+      {
+        dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
+        uid = GNUNET_ntohll (dht_msg->unique_id);
 #if DEBUG_DHT_API
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "`%s': Received response to message (uid %llu)\n",
+                    "DHT API", uid);
 #endif
-    if (ntohs(dht_msg->unique))
-      {
-        uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
-        route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
-        GNUNET_free(uid_hash);
-        if (route_handle == NULL) /* We have no recollection of this request */
+
+        hash_from_uid (uid, &uid_hash);
+        route_handle =
+          GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
+                                             &uid_hash);
+        if (route_handle == NULL)   /* We have no recollection of this request */
           {
 #if DEBUG_DHT_API
-          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
+            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                        "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
+                        "DHT API", uid);
 #endif
           }
         else
           {
-            enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
-            GNUNET_assert(enc_size > 0);
-            enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
-            route_handle->iter(route_handle->iter_cls, enc_msg);
+            enc_size =
+              ntohs (dht_msg->header.size) -
+              sizeof (struct GNUNET_DHT_RouteResultMessage);
+            GNUNET_assert (enc_size > 0);
+            enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
+            route_handle->iter (route_handle->iter_cls, enc_msg);
           }
+
+        break;
       }
-  }
-  else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
-  {
-    stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
-    uid = GNUNET_ntohll(stop_msg->unique_id);
-#if DEBUG_DHT_API
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id);
-#endif
-    if (handle->current->unique_id == uid)
+    default:
       {
-#if DEBUG_DHT_API
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
-#endif
-        if (handle->current->cont != NULL)
-          GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-
-        GNUNET_free(handle->current->msg);
-        GNUNET_free(handle->current);
-        handle->current = NULL;
+        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                    "`%s': Received unknown message type %d\n", "DHT API",
+                    ntohs (msg->type));
       }
-  }
-  else
-  {
-#if DEBUG_DHT_API
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type));
-#endif
-  }
-
+    }
   GNUNET_CLIENT_receive (handle->client,
                          &service_message_handler,
                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
@@ -352,24 +660,18 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
 {
   struct GNUNET_DHT_Handle *handle;
 
-  handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
-
-  default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
+  handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
   handle->cfg = cfg;
   handle->sched = sched;
-
-  handle->current = NULL;
-  handle->do_destroy = GNUNET_NO;
-  handle->th = NULL;
-
-  handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
-  handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
-
+  handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
+  handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
   if (handle->client == NULL)
     {
-      GNUNET_free(handle);
+      GNUNET_free (handle);
       return NULL;
     }
+  handle->outstanding_requests =
+    GNUNET_CONTAINER_multihashmap_create (ht_len);
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Connection to service in progress\n", "DHT API");
@@ -377,7 +679,6 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
   GNUNET_CLIENT_receive (handle->client,
                          &service_message_handler,
                          handle, GNUNET_TIME_UNIT_FOREVER_REL);
-
   return handle;
 }
 
@@ -394,69 +695,39 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
 #endif
-  GNUNET_assert(handle != NULL);
-
-  if (handle->th != NULL) /* We have a live transmit request in the Aether */
+  GNUNET_assert (handle != NULL);
+  if (handle->th != NULL)       /* We have a live transmit request */
     {
       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
       handle->th = NULL;
     }
-  if (handle->current != NULL) /* We are trying to send something now, clean it up */
-    GNUNET_free(handle->current);
+  if (handle->current != NULL)  /* We are trying to send something now, clean it up */
+    GNUNET_free (handle->current);
 
-  if (handle->client != NULL) /* Finally, disconnect from the service */
+  if (handle->client != NULL)   /* Finally, disconnect from the service */
     {
       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
       handle->client = NULL;
     }
 
+  GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
+  GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
   GNUNET_free (handle);
 }
 
 
-/**
- * Send complete (or failed), schedule next (or don't)
- */
-static void
-finish (struct GNUNET_DHT_Handle *handle, int code)
-{
-  /* TODO: if code is not GNUNET_OK, do something! */
-  struct PendingMessage *pos = handle->current;
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Finish called!\n", "DHT API");
-#endif
-  GNUNET_assert(pos != NULL);
-
-  if (pos->is_unique)
-    {
-      if (pos->cont != NULL)
-      {
-        if (code == GNUNET_SYSERR)
-          GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
-        else
-          GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      }
-
-      GNUNET_free(pos->msg);
-      handle->current = NULL;
-      GNUNET_free(pos);
-    }
-  /* Otherwise we need to wait for a response to this message! */
-}
-
 /**
  * Transmit the next pending message, called by notify_transmit_ready
  */
 static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_pending_retransmission (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DHT_Handle *handle = cls;
   size_t tsize;
 
 #if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': In transmit_pending\n", "DHT API");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': In transmit_pending\n", "DHT API");
 #endif
   if (buf == NULL)
     {
@@ -464,109 +735,288 @@ transmit_pending (void *cls, size_t size, void *buf)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
 #endif
-      /* FIXME: free associated resources or summat */
-      finish(handle, GNUNET_SYSERR);
+      finish_retransmission (handle, GNUNET_SYSERR);
       return 0;
     }
 
   handle->th = NULL;
 
   if (handle->current != NULL)
-  {
-    tsize = ntohs(handle->current->msg->size);
-    if (size >= tsize)
     {
+      tsize = ntohs (handle->current->msg->size);
+      if (size >= tsize)
+        {
 #if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Sending message size %d\n", "DHT API", tsize);
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Sending message size %d\n", "DHT API", tsize);
 #endif
-      memcpy(buf, handle->current->msg, tsize);
-      finish(handle, GNUNET_OK);
-      return tsize;
-    }
-    else
-    {
-      return 0;
+          memcpy (buf, handle->current->msg, tsize);
+          finish_retransmission (handle, GNUNET_OK);
+          return tsize;
+        }
+      else
+        {
+          return 0;
+        }
     }
-  }
   /* Have no pending request */
   return 0;
 }
 
 
 /**
- * Try to (re)connect to the dht service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
+ * Iterator called on each result obtained from a generic route
+ * operation
  */
-static int
-try_connect (struct GNUNET_DHT_Handle *handle)
+void
+get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
-  if (handle->client != NULL)
-    return GNUNET_OK;
-  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
-  if (handle->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dht service!\n"));
-#endif
-  return GNUNET_NO;
+  struct GNUNET_DHT_GetHandle *get_handle = cls;
+  struct GNUNET_DHT_GetResultMessage *result;
+  size_t data_size;
+  char *result_data;
+
+  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
+    return;
+
+  GNUNET_assert (ntohs (reply->size) >=
+                 sizeof (struct GNUNET_DHT_GetResultMessage));
+  result = (struct GNUNET_DHT_GetResultMessage *) reply;
+  data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
+
+  result_data = (char *) &result[1];    /* Set data pointer to end of message */
+
+  get_handle->get_context.iter (get_handle->get_context.iter_cls,
+                                GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
+                                ntohs (result->type), data_size, result_data);
 }
 
 
 /**
- * Try to send messages from list of messages to send
+ * Iterator called on each result obtained from a generic route
+ * operation
  */
-static void process_pending_message(struct GNUNET_DHT_Handle *handle)
+void
+find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
+  struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
+  struct GNUNET_MessageHeader *hello;
 
-  if (handle->current == NULL)
-    return;                     /* action already pending */
-  if (GNUNET_YES != try_connect (handle))
+  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
     {
-      finish (handle, GNUNET_SYSERR);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Received wrong type of response to a find peer request...\n");
       return;
     }
 
-  /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
-  if (handle->do_destroy)
+
+  GNUNET_assert (ntohs (reply->size) >=
+                 sizeof (struct GNUNET_MessageHeader));
+  hello = (struct GNUNET_MessageHeader *)&reply[1];
+
+  if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
     {
-      //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
+      return;
     }
+  find_peer_handle->find_peer_context.proc (find_peer_handle->
+                                            find_peer_context.proc_cls,
+                                            (struct GNUNET_HELLO_Message *)hello);
+}
 
+/**
+ * Send a message to the DHT telling it to start issuing random GET
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
 
-  if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                    ntohs(handle->current->msg->size),
-                                                    handle->current->timeout,
-                                                    GNUNET_YES,
-                                                    &transmit_pending, handle)))
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
     {
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dht service.\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
+      handle->current = pending;
+      process_pending_message (handle);
     }
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
-#endif
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
 }
 
 /**
- * Iterator called on each result obtained from a generic route
- * operation
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peers unique identifier as key.  This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself.  However, for testing and perhaps more
+ * close control over the DHT, this can be explicitly managed.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
  */
-void get_reply_iterator (void *cls,
-                         const struct GNUNET_MessageHeader *reply)
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
 
+  return GNUNET_YES;
 }
 
 /**
- * Perform an asynchronous FIND_PEER operation on the DHT.
+ * Send a message to the DHT telling it to start issuing random PUT
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to start dropping
+ * all requests received.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
+  msg->variable = htons(0);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Initiate a generic DHT route operation.
  *
  * @param handle handle to the DHT service
  * @param key the key to look up
@@ -589,93 +1039,81 @@ void get_reply_iterator (void *cls,
  */
 struct GNUNET_DHT_RouteHandle *
 GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
-                        const GNUNET_HashCode *key,
+                        const GNUNET_HashCode * key,
                         unsigned int desired_replication_level,
                         enum GNUNET_DHT_RouteOption options,
                         const struct GNUNET_MessageHeader *enc,
                         struct GNUNET_TIME_Relative timeout,
                         GNUNET_DHT_ReplyProcessor iter,
                         void *iter_cls,
-                        GNUNET_SCHEDULER_Task cont,
-                        void *cont_cls)
+                        GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_RouteHandle *route_handle;
   struct PendingMessage *pending;
-  struct GNUNET_DHT_Message *message;
-  size_t is_unique;
-  size_t msize;
-  GNUNET_HashCode *uid_key;
-  uint64_t uid;
+  struct GNUNET_DHT_RouteMessage *message;
+  uint16_t msize;
+  GNUNET_HashCode uid_key;
 
-  is_unique = GNUNET_YES;
-  if (iter == NULL)
-    is_unique = GNUNET_NO;
-
-  route_handle = NULL;
-  uid_key = NULL;
-
-  do
-  {
-    GNUNET_free_non_null(uid_key);
-    uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
-    uid_key = hash_from_uid(uid);
-  } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES);
-
-  if (is_unique)
-    {
-      route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
-      memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
-      route_handle->iter = iter;
-      route_handle->iter_cls = iter_cls;
-      route_handle->dht_handle = handle;
-      route_handle->uid = uid;
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Unique ID is %llu\n", "DHT API", uid);
-#endif
-      /**
-       * Store based on random identifier!
-       */
-      GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-      msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return NULL;
 
+  if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      return NULL;
     }
-  else
+
+  route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
+  memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
+  route_handle->iter = iter;
+  route_handle->iter_cls = iter_cls;
+  route_handle->dht_handle = handle;
+  route_handle->uid = handle->uid_gen++;
+  if (iter != NULL)
     {
-      msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
+      hash_from_uid (route_handle->uid, &uid_key);
+      GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
+                                         &uid_key, route_handle,
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
 
-  GNUNET_free(uid_key);
-  message = GNUNET_malloc(msize);
-  message->header.size = htons(msize);
-  message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
-  memcpy(&message->key, key, sizeof(GNUNET_HashCode));
-  message->options = htons(options);
-  message->desired_replication_level = htons(options);
-  message->unique = htons(is_unique);
-  message->unique_id = GNUNET_htonll(uid);
-  memcpy(&message[1], enc, ntohs(enc->size));
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
+#endif
 
-  pending = GNUNET_malloc(sizeof(struct PendingMessage));
+  msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
+  message = GNUNET_malloc (msize);
+  message->header.size = htons (msize);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
+  memcpy (&message->key, key, sizeof (GNUNET_HashCode));
+  message->options = htonl (options);
+  message->desired_replication_level = htonl (desired_replication_level);
+  message->unique_id = GNUNET_htonll (route_handle->uid);
+  memcpy (&message[1], enc, ntohs (enc->size));
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = &message->header;
   pending->timeout = timeout;
+  if (iter == NULL)
+    pending->free_on_send = GNUNET_YES;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->is_unique = is_unique;
-  pending->unique_id = uid;
-
-  GNUNET_assert(handle->current == NULL);
-
-  handle->current = pending;
-
-  process_pending_message(handle);
+  pending->unique_id = route_handle->uid;
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
 
+  route_handle->message = message;
   return route_handle;
 }
 
-void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
-
 
 /**
  * Perform an asynchronous GET operation on the DHT identified.
@@ -691,135 +1129,233 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
  *
  * @return handle to stop the async get
  */
-struct GNUNET_DHT_RouteHandle *
+struct GNUNET_DHT_GetHandle *
 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       struct GNUNET_TIME_Relative timeout,
-                      uint32_t type,
+                      enum GNUNET_BLOCK_Type type,
                       const GNUNET_HashCode * key,
                       GNUNET_DHT_GetIterator iter,
                       void *iter_cls,
-                      GNUNET_SCHEDULER_Task cont,
-                      void *cont_cls)
+                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
-  struct GNUNET_DHT_GetContext *get_context;
-  struct GNUNET_DHT_GetMessage *get_msg;
+  struct GNUNET_DHT_GetHandle *get_handle;
+  struct GNUNET_DHT_GetMessage get_msg;
 
-  if (handle->current != NULL) /* Can't send right now, we have a pending message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
     return NULL;
 
-  get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
-  get_context->iter = iter;
-  get_context->iter_cls = iter_cls;
+  get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
+  get_handle->get_context.iter = iter;
+  get_handle->get_context.iter_cls = iter_cls;
 
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
+              "`%s': Inserting pending get request with key %s\n", "DHT API",
+              GNUNET_h2s (key));
 #endif
 
-  get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
-  get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
-  get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
-  get_msg->type = htonl(type);
+  get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+  get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
+  get_msg.type = htons (type);
 
-  return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, cont, cont_cls);
+  get_handle->route_handle =
+    GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
+                            &get_reply_iterator, get_handle, cont, cont_cls);
 
+  return get_handle;
 }
 
 
+/**
+ * Stop a previously issued routing request
+ *
+ * @param route_handle handle to the request to stop
+ * @param cont continuation to call once this message is sent to the service or times out
+ * @param cont_cls closure for the continuation
+ */
 void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
+                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct PendingMessage *pending;
   struct GNUNET_DHT_StopMessage *message;
   size_t msize;
-  GNUNET_HashCode *uid_key;
+  GNUNET_HashCode uid_key;
 
-  msize = sizeof(struct GNUNET_DHT_StopMessage);
-
-  message = GNUNET_malloc(msize);
-  message->header.size = htons(msize);
-  message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
+  msize = sizeof (struct GNUNET_DHT_StopMessage);
+  message = GNUNET_malloc (msize);
+  message->header.size = htons (msize);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
 #if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Remove outstanding request for uid %llu\n", "DHT API",
+              route_handle->uid);
 #endif
-  message->unique_id = GNUNET_htonll(route_handle->uid);
-
-  GNUNET_assert(route_handle->dht_handle->current == NULL);
-
-  pending = GNUNET_malloc(sizeof(struct PendingMessage));
-  pending->msg = (struct GNUNET_MessageHeader *)message;
-  pending->timeout = DEFAULT_DHT_TIMEOUT;
-  pending->cont = NULL;
-  pending->cont_cls = NULL;
-  pending->is_unique = GNUNET_NO;
-  pending->unique_id = route_handle->uid;
+  message->unique_id = GNUNET_htonll (route_handle->uid);
+  memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = (struct GNUNET_MessageHeader *) message;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->free_on_send = GNUNET_YES;
+  pending->unique_id = 0; /* When finished is called, free pending->msg */
 
-  GNUNET_assert(route_handle->dht_handle->current == NULL);
+  if (route_handle->dht_handle->current == NULL)
+    {
+      route_handle->dht_handle->current = pending;
+      process_pending_message (route_handle->dht_handle);
+    }
+  else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
+    {
+      route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+      route_handle->dht_handle->retransmission_buffer = pending;
+    }
+  else
+    {
+      GNUNET_free(pending);
+      GNUNET_break(0);
+    }
 
-  route_handle->dht_handle->current = pending;
+  hash_from_uid (route_handle->uid, &uid_key);
+  GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
+                (route_handle->dht_handle->outstanding_requests, &uid_key,
+                 route_handle) == GNUNET_YES);
 
-  process_pending_message(route_handle->dht_handle);
+  GNUNET_free(route_handle->message);
+  GNUNET_free(route_handle);
+}
 
-  uid_key = hash_from_uid(route_handle->uid);
 
-  if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
+/**
+ * Stop async DHT-get.
+ *
+ * @param get_handle handle to the GET operation to stop
+ * @param cont continuation to call once this message is sent to the service or times out
+ * @param cont_cls closure for the continuation
+ */
+void
+GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
+                     GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  if ((get_handle->route_handle->dht_handle->current != NULL) &&
+      (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
     {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Removing pending get request with key %s, uid %llu\n",
+              "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
+              get_handle->route_handle->uid);
 #endif
-    }
-  GNUNET_free(uid_key);
-  return;
+  GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
+  GNUNET_free (get_handle);
 }
 
 
 /**
- * Stop async DHT-get.
+ * Perform an asynchronous FIND PEER operation on the DHT.
  *
- * @param get_handle handle to the GET operation to stop
+ * @param handle handle to the DHT service
+ * @param timeout timeout for this request to be sent to the
+ *        service
+ * @param options routing options for this message
+ * @param key the key to look up
+ * @param proc function to call on each result
+ * @param proc_cls closure for proc
+ * @param cont continuation to call once message sent
+ * @param cont_cls closure for continuation
+ *
+ * @return handle to stop the async get, NULL on error
  */
-void
-GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle)
+struct GNUNET_DHT_FindPeerHandle *
+GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
+                            struct GNUNET_TIME_Relative timeout,
+                            enum GNUNET_DHT_RouteOption options,
+                            const GNUNET_HashCode * key,
+                            GNUNET_DHT_FindPeerProcessor proc,
+                            void *proc_cls,
+                            GNUNET_SCHEDULER_Task cont,
+                            void *cont_cls)
 {
-#if OLDREMOVE
-  struct GNUNET_DHT_GetMessage *get_msg;
-  struct GNUNET_DHT_Handle *handle;
-  GNUNET_HashCode *uid_key;
-#endif
+  struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
+  struct GNUNET_DHT_FindPeerMessage find_peer_msg;
 
-  GNUNET_DHT_route_stop(get_handle);
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
+    return NULL;
 
-#if OLDREMOVE
-  uid_key = hash_from_uid(get_handle->uid);
-  GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES);
+  find_peer_handle =
+    GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
+  find_peer_handle->find_peer_context.proc = proc;
+  find_peer_handle->find_peer_context.proc_cls = proc_cls;
 
-  if (handle->do_destroy == GNUNET_NO)
-    {
-      get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
-      get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
-      get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
+              "FIND PEER", GNUNET_h2s (key));
+#endif
 
+  find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  find_peer_handle->route_handle =
+    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
+                            timeout, &find_peer_reply_iterator,
+                            find_peer_handle, cont, cont_cls);
+  return find_peer_handle;
+}
 
+/**
+ * Stop async find peer.  Frees associated resources.
+ *
+ * @param find_peer_handle GET operation to stop.
+ * @param cont continuation to call once this message is sent to the service or times out
+ * @param cont_cls closure for the continuation
+ */
+void
+GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
+      (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
     }
-#endif
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->key), get_handle->uid);
+              "`%s': Removing pending `%s' request with key %s, uid %llu\n",
+              "DHT API", "FIND PEER",
+              GNUNET_h2s (&find_peer_handle->route_handle->key),
+              find_peer_handle->route_handle->uid);
 #endif
+  GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
+  GNUNET_free (find_peer_handle);
+
 }
 
 
 /**
  * Perform a PUT operation storing data in the DHT.
  *
- * @param h handle to DHT service
+ * @param handle handle to DHT service
  * @param key the key to store under
  * @param type type of the value
  * @param size number of bytes in data; must be less than 64k
  * @param data the data to store
  * @param exp desired expiration time for the value
+ * @param timeout how long to wait for transmission of this request
  * @param cont continuation to call when done;
  *             reason will be TIMEOUT on error,
  *             reason will be PREREQ_DONE on success
@@ -830,38 +1366,61 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *get_handle)
 void
 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 const GNUNET_HashCode * key,
-                uint32_t type,
+                enum GNUNET_BLOCK_Type type,
                 uint32_t size,
                 const char *data,
                 struct GNUNET_TIME_Absolute exp,
                 struct GNUNET_TIME_Relative timeout,
-                GNUNET_SCHEDULER_Task cont,
-                void *cont_cls)
+                GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_PutMessage *put_msg;
+  struct GNUNET_DHT_RouteHandle *put_route;
   size_t msize;
 
-  if (handle->current != NULL)
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
     {
-      GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
       return;
     }
 
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
+              "`%s': Inserting pending put request with key %s\n", "DHT API",
+              GNUNET_h2s (key));
 #endif
 
-  msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
-  put_msg = GNUNET_malloc(msize);
-  put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
-  put_msg->header.size = htons(msize);
-  put_msg->type = htonl(type);
-  put_msg->data_size = htons(size);
-  put_msg->expiration = exp;
-  memcpy(&put_msg[1], data, size);
+  msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
+  put_msg = GNUNET_malloc (msize);
+  put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
+  put_msg->header.size = htons (msize);
+  put_msg->type = hton(type);
+  put_msg->data_size = htons (size);
+  put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
+  memcpy (&put_msg[1], data, size);
 
-  GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
+  put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
+                                      NULL, cont, cont_cls);
 
-  GNUNET_free(put_msg);
+  if (put_route == NULL) /* Route start failed! */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+    }
+  else
+    {
+      GNUNET_free(put_route);
+    }
+
+  GNUNET_free (put_msg);
 }
+
+/* end of dht_api.c */