move bit distance function into util
[oweals/gnunet.git] / src / dht / dht_api.c
index 7dd3305ebc17b416befc7b8a19ab6511b37b2daa..15faba6c97dec4631e3f80a53d8ca89538b94fbe 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
  * @author Christian Grothoff
  * @author Nathan Evans
  *
+ * TODO: retransmission of pending requests maybe happens now, at least
+ *       the code is in place to do so.  Need to add checks when api calls
+ *       happen to check if retransmission is in progress, and if so set
+ *       the single pending message for transmission once the list of
+ *       retries are done.
  */
 
 #include "platform.h"
@@ -41,8 +46,6 @@
 
 #define DEBUG_DHT_API GNUNET_NO
 
-#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
-
 struct PendingMessage
 {
   /**
@@ -67,18 +70,32 @@ struct PendingMessage
   void *cont_cls;
 
   /**
-   * Whether or not to await verification the message
-   * was received by the service
+   * Unique ID for this request
    */
-  size_t is_unique;
+  uint64_t unique_id;
 
   /**
-   * Unique ID for this request
+   * Free the saved message once sent, set
+   * to GNUNET_YES for messages that don't
+   * receive responses!
    */
-  uint64_t unique_id;
+  int free_on_send;
 
 };
 
+struct PendingMessageList
+{
+  /**
+   * This is a singly linked list.
+   */
+  struct PendingMessageList *next;
+
+  /**
+   * The pending message.
+   */
+  struct PendingMessage *message;
+};
+
 struct GNUNET_DHT_GetContext
 {
   /**
@@ -108,8 +125,7 @@ struct GNUNET_DHT_FindPeerContext
 };
 
 /**
- * Handle to control a unique operation (one that is
- * expected to return results)
+ * Handle to a route request
  */
 struct GNUNET_DHT_RouteHandle
 {
@@ -138,37 +154,16 @@ struct GNUNET_DHT_RouteHandle
    * Main handle to this DHT api
    */
   struct GNUNET_DHT_Handle *dht_handle;
-};
 
-/**
- * Handle for a non unique request, holds callback
- * which needs to be called before we allow other
- * messages to be processed and sent to the DHT service
- */
-struct GNUNET_DHT_NonUniqueHandle
-{
   /**
-   * Key that this get request is for
-   */
-  GNUNET_HashCode key;
-
-  /**
-   * Type of data get request was for
-   */
-  uint32_t type;
-
-  /**
-   * Continuation to call on service
-   * confirmation of message receipt.
+   * The actual message sent for this request,
+   * used for retransmitting requests on service
+   * failure/reconnect.  Freed on route_stop.
    */
-  GNUNET_SCHEDULER_Task cont;
-
-  /**
-   * Send continuation cls
-   */
-  void *cont_cls;
+  struct GNUNET_DHT_RouteMessage *message;
 };
 
+
 /**
  * Handle to control a get operation.
  */
@@ -185,6 +180,7 @@ struct GNUNET_DHT_GetHandle
   struct GNUNET_DHT_GetContext get_context;
 };
 
+
 /**
  * Handle to control a find peer operation.
  */
@@ -196,12 +192,33 @@ struct GNUNET_DHT_FindPeerHandle
   struct GNUNET_DHT_RouteHandle *route_handle;
 
     /**
-     * The context of the get request
+     * The context of the find peer request
      */
   struct GNUNET_DHT_FindPeerContext find_peer_context;
 };
 
 
+enum DHT_Retransmit_Stage
+{
+  /**
+   * The API is not retransmitting anything at this time.
+   */
+  DHT_NOT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and nothing has been single
+   * queued for sending.
+   */
+  DHT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and a single message has been
+   * queued for transmission once finished.
+   */
+  DHT_RETRANSMITTING_MESSAGE_QUEUED
+};
+
+
 /**
  * Connection to the DHT service.
  */
@@ -242,16 +259,27 @@ struct GNUNET_DHT_Handle
   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
 
   /**
-   * Non unique handle.  If set don't schedule another non
-   * unique request.
+   * Generator for unique ids.
    */
-  struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
+  uint64_t uid_gen;
 
   /**
-   * Generator for unique ids.
+   * Are we currently retransmitting requests?  If so queue a _single_
+   * new request when received.
    */
-  uint64_t uid_gen;
+  enum DHT_Retransmit_Stage retransmit_stage;
+
+  /**
+   * Linked list of retranmissions, to be used in the event
+   * of a dht service disconnect/reconnect.
+   */
+  struct PendingMessageList *retransmissions;
 
+  /**
+   * A single pending message allowed to be scheduled
+   * during retransmission phase.
+   */
+  struct PendingMessage *retransmission_buffer;
 };
 
 
@@ -269,6 +297,257 @@ hash_from_uid (uint64_t uid,
   *((uint64_t*)hash) = uid;
 }
 
+#if RETRANSMIT
+/**
+ * Iterator callback to retransmit each outstanding request
+ * because the connection to the DHT service went down (and
+ * came back).
+ *
+ *
+ */
+static int retransmit_iterator (void *cls,
+                                const GNUNET_HashCode * key,
+                                void *value)
+{
+  struct GNUNET_DHT_RouteHandle *route_handle = value;
+  struct PendingMessageList *pending_message_list;
+
+  pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
+  pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
+  pending_message_list->message->msg = &route_handle->message->header;
+  pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
+  pending_message_list->message->cont = NULL;
+  pending_message_list->message->cont_cls = NULL;
+  pending_message_list->message->unique_id = route_handle->uid;
+  /* Add the new pending message to the front of the retransmission list */
+  pending_message_list->next = route_handle->dht_handle->retransmissions;
+  route_handle->dht_handle->retransmissions = pending_message_list;
+
+  return GNUNET_OK;
+}
+#endif
+
+/**
+ * Try to (re)connect to the dht service.
+ *
+ * @return GNUNET_YES on success, GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_DHT_Handle *handle)
+{
+  if (handle->client != NULL)
+    return GNUNET_OK;
+  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
+  if (handle->client != NULL)
+    return GNUNET_YES;
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              _("Failed to connect to the dht service!\n"));
+#endif
+  return GNUNET_NO;
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  GNUNET_HashCode uid_hash;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos != NULL);
+  hash_from_uid (pos->unique_id, &uid_hash);
+  if (pos->cont != NULL)
+    {
+      if (code == GNUNET_SYSERR)
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
+      else
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+
+  GNUNET_assert(handle->th == NULL);
+  if (pos->free_on_send == GNUNET_YES)
+    GNUNET_free(pos->msg);
+  GNUNET_free (pos);
+  handle->current = NULL;
+}
+
+/**
+ * Transmit the next pending message, called by notify_transmit_ready
+ */
+static size_t
+transmit_pending (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_DHT_Handle *handle = cls;
+  size_t tsize;
+
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': In transmit_pending\n", "DHT API");
+#endif
+  handle->th = NULL;
+
+  if (buf == NULL)
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': In transmit_pending buf is NULL\n", "DHT API");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return 0;
+    }
+
+  if (handle->current != NULL)
+    {
+      tsize = ntohs (handle->current->msg->size);
+      if (size >= tsize)
+        {
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Sending message size %d\n", "DHT API", tsize);
+#endif
+          memcpy (buf, handle->current->msg, tsize);
+          finish (handle, GNUNET_OK);
+          return tsize;
+        }
+      else
+        {
+          return 0;
+        }
+    }
+  /* Have no pending request */
+  return 0;
+}
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_message (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      handle->th = NULL;
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ * Forward declaration.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
+
+/* Forward declaration */
+static size_t
+transmit_pending_retransmission (void *cls, size_t size, void *buf);
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending_retransmission,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  struct PendingMessageList *pending_list;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos == handle->retransmissions->message);
+  pending_list = handle->retransmissions;
+  handle->retransmissions = handle->retransmissions->next;
+  GNUNET_free (pending_list);
+
+  if (handle->retransmissions == NULL)
+    {
+      handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
+    }
+
+  if (handle->retransmissions != NULL)
+    {
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
+    }
+  else if (handle->retransmission_buffer != NULL)
+    {
+      handle->current = handle->retransmission_buffer;
+      process_pending_message(handle);
+    }
+}
 
 /**
  * Handler for messages received from the DHT service
@@ -277,18 +556,15 @@ hash_from_uid (uint64_t uid,
  */
 void
 service_message_handler (void *cls,
-                        const struct GNUNET_MessageHeader *msg)
+                         const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_DHT_Handle *handle = cls;
-  struct GNUNET_DHT_Message *dht_msg;
-  struct GNUNET_DHT_StopMessage *stop_msg;
+  struct GNUNET_DHT_RouteResultMessage *dht_msg;
   struct GNUNET_MessageHeader *enc_msg;
   struct GNUNET_DHT_RouteHandle *route_handle;
   uint64_t uid;
   GNUNET_HashCode uid_hash;
   size_t enc_size;
-  /* TODO: find out message type, handle callbacks for different types of messages.
-   * Should be a non unique acknowledgment, or unique result. */
 
   if (msg == NULL)
     {
@@ -297,78 +573,60 @@ service_message_handler (void *cls,
                   "`%s': Received NULL from server, connection down!\n",
                   "DHT API");
 #endif
-      GNUNET_CLIENT_disconnect (handle->client);
+      GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
       handle->client = GNUNET_CLIENT_connect (handle->sched, 
-                                             "dht", 
-                                             handle->cfg);
-      /* FIXME: re-transmit *all* of our GET requests AND re-start
-        receiving responses! */
+                                              "dht",
+                                              handle->cfg);
+      if (handle->current != NULL)
+        {
+          handle->th = NULL;
+          finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
+        }
+#if RETRANSMIT
+      if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
+        {
+          handle->retransmit_stage = DHT_RETRANSMITTING;
+          handle->current = handle->retransmissions->message;
+          process_pending_retransmissions(handle);
+        }
+#endif
       return;
     }
 
   switch (ntohs (msg->type))
     {
-    case GNUNET_MESSAGE_TYPE_DHT:
+    case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
       {
-        dht_msg = (struct GNUNET_DHT_Message *) msg;
+        dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
         uid = GNUNET_ntohll (dht_msg->unique_id);
 #if DEBUG_DHT_API
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "`%s': Received response to message (uid %llu)\n",
                     "DHT API", uid);
 #endif
-        if (ntohl (dht_msg->unique))
-          {
-            hash_from_uid (uid, &uid_hash);
-            route_handle =
-              GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
-                                                 &uid_hash);
-            if (route_handle == NULL)   /* We have no recollection of this request */
-              {
-#if DEBUG_DHT_API
-                GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                            "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
-                            "DHT API", uid);
-#endif
-              }
-            else
-              {
-                enc_size =
-                  ntohs (dht_msg->header.size) -
-                  sizeof (struct GNUNET_DHT_Message);
-                GNUNET_assert (enc_size > 0);
-                enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
-                route_handle->iter (route_handle->iter_cls, enc_msg);
-              }
-          }
-        break;
-      }
-    case GNUNET_MESSAGE_TYPE_DHT_STOP:
-      {
-        stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
-        uid = GNUNET_ntohll (stop_msg->unique_id);
-#if DEBUG_DHT_API
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "`%s': Received response to message (uid %llu), current uid %llu\n",
-                    "DHT API", uid, handle->current->unique_id);
-#endif
-        if (handle->current->unique_id == uid)
+
+        hash_from_uid (uid, &uid_hash);
+        route_handle =
+          GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
+                                             &uid_hash);
+        if (route_handle == NULL)   /* We have no recollection of this request */
           {
 #if DEBUG_DHT_API
             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                        "`%s': Have pending confirmation for this message!\n",
+                        "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
                         "DHT API", uid);
 #endif
-            if (handle->current->cont != NULL)
-              GNUNET_SCHEDULER_add_continuation (handle->sched,
-                                                 handle->current->cont,
-                                                 handle->current->cont_cls,
-                                                 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-
-            GNUNET_free (handle->current->msg);
-            GNUNET_free (handle->current);
-            handle->current = NULL;
           }
+        else
+          {
+            enc_size =
+              ntohs (dht_msg->header.size) -
+              sizeof (struct GNUNET_DHT_RouteResultMessage);
+            GNUNET_assert (enc_size > 0);
+            enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
+            route_handle->iter (route_handle->iter_cls, enc_msg);
+          }
+
         break;
       }
     default:
@@ -406,6 +664,7 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
   handle->cfg = cfg;
   handle->sched = sched;
   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
+  handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
   if (handle->client == NULL)
     {
       GNUNET_free (handle);
@@ -437,7 +696,7 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
 #endif
   GNUNET_assert (handle != NULL);
-  if (handle->th != NULL)       /* We have a live transmit request in the Aether */
+  if (handle->th != NULL)       /* We have a live transmit request */
     {
       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
       handle->th = NULL;
@@ -450,44 +709,10 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
       handle->client = NULL;
     }
-  /* Either assert that outstanding_requests is empty */
-  /* FIXME: handle->outstanding_requests not freed! */
-  GNUNET_free (handle);
-}
-
-
-/**
- * Send complete (or failed), schedule next (or don't)
- */
-static void
-finish (struct GNUNET_DHT_Handle *handle, int code)
-{
-  /* TODO: if code is not GNUNET_OK, do something! */
-  struct PendingMessage *pos = handle->current;
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
-#endif
-  GNUNET_assert (pos != NULL);
 
-  if (pos->is_unique)
-    {
-      if (pos->cont != NULL)
-        {
-          if (code == GNUNET_SYSERR)
-            GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                               pos->cont_cls,
-                                               GNUNET_SCHEDULER_REASON_TIMEOUT);
-          else
-            GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                               pos->cont_cls,
-                                               GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-        }
-
-      GNUNET_free (pos->msg);
-      handle->current = NULL;
-      GNUNET_free (pos);
-    }
-  /* Otherwise we need to wait for a response to this message! */
+  GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
+  GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
+  GNUNET_free (handle);
 }
 
 
@@ -495,7 +720,7 @@ finish (struct GNUNET_DHT_Handle *handle, int code)
  * Transmit the next pending message, called by notify_transmit_ready
  */
 static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_pending_retransmission (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DHT_Handle *handle = cls;
   size_t tsize;
@@ -510,8 +735,7 @@ transmit_pending (void *cls, size_t size, void *buf)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
 #endif
-      /* FIXME: free associated resources or summat */
-      finish (handle, GNUNET_SYSERR);
+      finish_retransmission (handle, GNUNET_SYSERR);
       return 0;
     }
 
@@ -527,7 +751,7 @@ transmit_pending (void *cls, size_t size, void *buf)
                       "`%s': Sending message size %d\n", "DHT API", tsize);
 #endif
           memcpy (buf, handle->current->msg, tsize);
-          finish (handle, GNUNET_OK);
+          finish_retransmission (handle, GNUNET_OK);
           return tsize;
         }
       else
@@ -540,66 +764,6 @@ transmit_pending (void *cls, size_t size, void *buf)
 }
 
 
-/**
- * Try to (re)connect to the dht service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
- */
-static int
-try_connect (struct GNUNET_DHT_Handle *handle)
-{
-  if (handle->client != NULL)
-    return GNUNET_OK;
-  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
-  if (handle->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dht service!\n"));
-#endif
-  return GNUNET_NO;
-}
-
-
-/**
- * Try to send messages from list of messages to send
- */
-static void
-process_pending_message (struct GNUNET_DHT_Handle *handle)
-{
-
-  if (handle->current == NULL)
-    return;                     /* action already pending */
-  if (GNUNET_YES != try_connect (handle))
-    {
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-
-  if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                         ntohs (handle->
-                                                                current->msg->
-                                                                size),
-                                                         handle->current->
-                                                         timeout, GNUNET_YES,
-                                                         &transmit_pending,
-                                                         handle)))
-    {
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dht service.\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Scheduled sending message of size %d to service\n",
-              "DHT API", ntohs (handle->current->msg->size));
-#endif
-}
-
 /**
  * Iterator called on each result obtained from a generic route
  * operation
@@ -618,13 +782,12 @@ get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
   GNUNET_assert (ntohs (reply->size) >=
                  sizeof (struct GNUNET_DHT_GetResultMessage));
   result = (struct GNUNET_DHT_GetResultMessage *) reply;
-  data_size = ntohs (result->data_size);
-  GNUNET_assert (ntohs (reply->size) ==
-                 sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
+  data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
+
   result_data = (char *) &result[1];    /* Set data pointer to end of message */
 
   get_handle->get_context.iter (get_handle->get_context.iter_cls,
-                                result->expiration, &result->key,
+                                GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
                                 ntohs (result->type), data_size, result_data);
 }
 
@@ -637,36 +800,223 @@ void
 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
-  struct GNUNET_DHT_FindPeerResultMessage *result;
-  size_t data_size;
-  struct GNUNET_MessageHeader *result_data;
+  struct GNUNET_MessageHeader *hello;
 
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Find peer iterator called.\n");
-#endif
   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
-    return;
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Received wrong type of response to a find peer request...\n");
+      return;
+    }
+
 
   GNUNET_assert (ntohs (reply->size) >=
-                 sizeof (struct GNUNET_DHT_FindPeerResultMessage));
-  result = (struct GNUNET_DHT_FindPeerResultMessage *) reply;
-  data_size = ntohs (result->data_size);
-  GNUNET_assert (ntohs (reply->size) ==
-                 sizeof (struct GNUNET_DHT_FindPeerResultMessage) + data_size);
-
-  if (data_size > 0)
-    result_data = (struct GNUNET_MessageHeader *) &result[1];   /* Set data pointer to end of message */
-  else
-    result_data = NULL;
+                 sizeof (struct GNUNET_MessageHeader));
+  hello = (struct GNUNET_MessageHeader *)&reply[1];
 
+  if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
+      return;
+    }
   find_peer_handle->find_peer_context.proc (find_peer_handle->
                                             find_peer_context.proc_cls,
-                                            &result->peer, result_data);
+                                            (struct GNUNET_HELLO_Message *)hello);
+}
+
+/**
+ * Send a message to the DHT telling it to start issuing random GET
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peers unique identifier as key.  This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself.  However, for testing and perhaps more
+ * close control over the DHT, this can be explicitly managed.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to start issuing random PUT
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to start dropping
+ * all requests received.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
+  msg->variable = htons(0);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
 }
 
+
 /**
- * Perform an asynchronous FIND_PEER operation on the DHT.
+ * Initiate a generic DHT route operation.
  *
  * @param handle handle to the DHT service
  * @param key the key to look up
@@ -700,57 +1050,67 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
 {
   struct GNUNET_DHT_RouteHandle *route_handle;
   struct PendingMessage *pending;
-  struct GNUNET_DHT_Message *message;
-  size_t expects_response;
+  struct GNUNET_DHT_RouteMessage *message;
   uint16_t msize;
   GNUNET_HashCode uid_key;
-  uint64_t uid;
 
-  if (sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return NULL;
+
+  if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
       GNUNET_break (0);
       return NULL;
     }
-  expects_response = GNUNET_YES;
-  if (iter == NULL)
-    expects_response = GNUNET_NO;
-  uid = handle->uid_gen++;
-  if (expects_response)
-    {
-      route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
-      memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
-      route_handle->iter = iter;
-      route_handle->iter_cls = iter_cls;
-      route_handle->dht_handle = handle;
-      route_handle->uid = uid;
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Unique ID is %llu\n", "DHT API", uid);
-#endif
+
+  route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
+  memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
+  route_handle->iter = iter;
+  route_handle->iter_cls = iter_cls;
+  route_handle->dht_handle = handle;
+  route_handle->uid = handle->uid_gen++;
+  if (iter != NULL)
+    {
+      hash_from_uid (route_handle->uid, &uid_key);
       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
                                          &uid_key, route_handle,
                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
-  msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
+
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
+#endif
+
+  msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
   message->options = htonl (options);
-  message->desired_replication_level = htonl (options);
-  message->unique = htonl (expects_response);
-  message->unique_id = GNUNET_htonll (uid);
+  message->desired_replication_level = htonl (desired_replication_level);
+  message->unique_id = GNUNET_htonll (route_handle->uid);
   memcpy (&message[1], enc, ntohs (enc->size));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = &message->header;
   pending->timeout = timeout;
+  if (iter == NULL)
+    pending->free_on_send = GNUNET_YES;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->expects_response = expects_response;
-  pending->unique_id = uid;
-  GNUNET_assert (handle->current == NULL);
-  handle->current = pending;
-  process_pending_message (handle);
+  pending->unique_id = route_handle->uid;
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  route_handle->message = message;
   return route_handle;
 }
 
@@ -772,16 +1132,16 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
 struct GNUNET_DHT_GetHandle *
 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       struct GNUNET_TIME_Relative timeout,
-                      uint32_t type,
+                      enum GNUNET_BLOCK_Type type,
                       const GNUNET_HashCode * key,
                       GNUNET_DHT_GetIterator iter,
                       void *iter_cls,
                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_GetHandle *get_handle;
-  struct GNUNET_DHT_GetMessage *get_msg;
+  struct GNUNET_DHT_GetMessage get_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
     return NULL;
 
   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
@@ -794,14 +1154,14 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
               GNUNET_h2s (key));
 #endif
 
-  get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
-  get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->type = htons (type);
+  get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+  get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
+  get_msg.type = htons (type);
 
   get_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
+    GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
                             &get_reply_iterator, get_handle, cont, cont_cls);
+
   return get_handle;
 }
 
@@ -825,27 +1185,45 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
   msize = sizeof (struct GNUNET_DHT_StopMessage);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
               route_handle->uid);
 #endif
   message->unique_id = GNUNET_htonll (route_handle->uid);
-  GNUNET_assert (route_handle->dht_handle->current == NULL);
+  memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = (struct GNUNET_MessageHeader *) message;
-  pending->timeout = DEFAULT_DHT_TIMEOUT;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->unique_id = route_handle->uid;
-  GNUNET_assert (route_handle->dht_handle->current == NULL);
-  route_handle->dht_handle->current = pending;
-  process_pending_message (route_handle->dht_handle);
+  pending->free_on_send = GNUNET_YES;
+  pending->unique_id = 0; /* When finished is called, free pending->msg */
+
+  if (route_handle->dht_handle->current == NULL)
+    {
+      route_handle->dht_handle->current = pending;
+      process_pending_message (route_handle->dht_handle);
+    }
+  else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
+    {
+      route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+      route_handle->dht_handle->retransmission_buffer = pending;
+    }
+  else
+    {
+      GNUNET_free(pending);
+      GNUNET_break(0);
+    }
+
   hash_from_uid (route_handle->uid, &uid_key);
   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
                 (route_handle->dht_handle->outstanding_requests, &uid_key,
                  route_handle) == GNUNET_YES);
+
+  GNUNET_free(route_handle->message);
+  GNUNET_free(route_handle);
 }
 
 
@@ -860,6 +1238,17 @@ void
 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((get_handle->route_handle->dht_handle->current != NULL) &&
+      (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending get request with key %s, uid %llu\n",
@@ -878,7 +1267,6 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
  * @param timeout timeout for this request to be sent to the
  *        service
  * @param options routing options for this message
- * @param message a message to inject at found peers (may be null)
  * @param key the key to look up
  * @param proc function to call on each result
  * @param proc_cls closure for proc
@@ -891,24 +1279,18 @@ struct GNUNET_DHT_FindPeerHandle *
 GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
                             struct GNUNET_TIME_Relative timeout,
                             enum GNUNET_DHT_RouteOption options,
-                            struct GNUNET_MessageHeader *message,
                             const GNUNET_HashCode * key,
                             GNUNET_DHT_FindPeerProcessor proc,
                             void *proc_cls,
-                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
+                            GNUNET_SCHEDULER_Task cont,
+                            void *cont_cls)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
-  struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
-  size_t msize;
+  struct GNUNET_DHT_FindPeerMessage find_peer_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
     return NULL;
 
-  if (message != NULL)
-    msize = ntohs (message->size);
-  else
-    msize = 0;
-
   find_peer_handle =
     GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
   find_peer_handle->find_peer_context.proc = proc;
@@ -920,20 +1302,10 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
               "FIND PEER", GNUNET_h2s (key));
 #endif
 
-  find_peer_msg =
-    GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize);
-  find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
-  find_peer_msg->header.size =
-    htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
-  find_peer_msg->msg_len = msize;
-
-  if (message != NULL)
-    {
-      memcpy (&find_peer_msg[1], message, msize);
-    }
-
+  find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
   find_peer_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header,
+    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
                             timeout, &find_peer_reply_iterator,
                             find_peer_handle, cont, cont_cls);
   return find_peer_handle;
@@ -950,6 +1322,17 @@ void
 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
+      (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
@@ -983,7 +1366,7 @@ GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
 void
 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 const GNUNET_HashCode * key,
-                uint32_t type,
+                enum GNUNET_BLOCK_Type type,
                 uint32_t size,
                 const char *data,
                 struct GNUNET_TIME_Absolute exp,
@@ -991,12 +1374,17 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_PutMessage *put_msg;
+  struct GNUNET_DHT_RouteHandle *put_route;
   size_t msize;
 
-  if (handle->current != NULL)
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
     {
-      GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
-                                         GNUNET_SCHEDULER_REASON_TIMEOUT);
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
       return;
     }
 
@@ -1012,11 +1400,27 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
   put_msg->header.size = htons (msize);
   put_msg->type = htons (type);
   put_msg->data_size = htons (size);
-  put_msg->expiration = exp;
+  put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
   memcpy (&put_msg[1], data, size);
 
-  GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
-                          NULL, cont, cont_cls);
+  put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
+                                      NULL, cont, cont_cls);
+
+  if (put_route == NULL) /* Route start failed! */
+    {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+    }
+  else
+    {
+      GNUNET_free(put_route);
+    }
 
   GNUNET_free (put_msg);
 }
+
+/* end of dht_api.c */