dht api fixes, it works again (for me)
authorNathan S. Evans <evans@in.tum.de>
Mon, 19 Apr 2010 15:16:38 +0000 (15:16 +0000)
committerNathan S. Evans <evans@in.tum.de>
Mon, 19 Apr 2010 15:16:38 +0000 (15:16 +0000)
src/dht/Makefile.am
src/dht/dht.h
src/dht/dht_api.c
src/dht/gnunet-dht-get-peer.c
src/dht/gnunet-service-dht.c
src/dht/test_dht_api.c

index 1a50800b6b1cd4f0fe0b8adb505878d9d57c2a88..1677cf4659cc56e3a3119231bffdc000e8bf884c 100644 (file)
@@ -68,6 +68,7 @@ test_dht_api_SOURCES = \
  test_dht_api.c
 test_dht_api_LDADD = \
  $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/hello/libgnunethello.la \
  $(top_builddir)/src/dht/libgnunetdht.la    
 
 EXTRA_DIST = \
index 2bafc369491562620455012ebbd3ac77a0b688a2..93ac9fa69c4b57ec960d079bcfb307c1c037a8fd 100644 (file)
@@ -34,12 +34,13 @@ typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
                                                    * msg);
 
 /**
- * FIXME.
+ * Message which indicates the DHT should cancel outstanding
+ * requests and discard any state.
  */
 struct GNUNET_DHT_StopMessage
 {
   /**
-   * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE
+   * Type: GNUNET_MESSAGE_TYPE_DHT_STOP
    */
   struct GNUNET_MessageHeader header;
 
@@ -57,7 +58,8 @@ struct GNUNET_DHT_StopMessage
 
 
 /**
- * Generic DHT message, wrapper for other message types
+ * Generic DHT message, indicates that a route request
+ * should be issued.
  */
 struct GNUNET_DHT_RouteMessage
 {
@@ -77,7 +79,8 @@ struct GNUNET_DHT_RouteMessage
   GNUNET_HashCode key;
 
   /**
-   * Unique ID identifying this request
+   * Unique ID identifying this request, if 0 then
+   * the client will not expect a response
    */
   uint64_t unique_id GNUNET_PACKED;
 
@@ -86,12 +89,6 @@ struct GNUNET_DHT_RouteMessage
    */
   uint32_t desired_replication_level GNUNET_PACKED;
 
-  /**
-   * Is this message uniquely identified?  If so it will
-   * be fire and forget, if not we will wait for a receipt
-   * from the service.
-   */
-  uint32_t unique GNUNET_PACKED;
 
   /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */
 
index 5675cef50873f2b98506ca78e8f8e1cd8184d5dd..fda836d691ab6df0d31fc1c4b44b1dc599d2b411 100644 (file)
  * @author Christian Grothoff
  * @author Nathan Evans
  *
+ * TODO: retransmission of pending requests maybe happens now, at least
+ *       the code is in place to do so.  Need to add checks when api calls
+ *       happen to check if retransmission is in progress, and if so set
+ *       the single pending message for transmission once the list of
+ *       retries are done.
  */
 
 #include "platform.h"
@@ -67,16 +72,23 @@ struct PendingMessage
   void *cont_cls;
 
   /**
-   * Whether or not to await verification the message
-   * was received by the service
+   * Unique ID for this request
    */
-  size_t is_unique;
+  uint64_t unique_id;
+
+};
 
+struct PendingMessageList
+{
   /**
-   * Unique ID for this request
+   * This is a singly linked list.
    */
-  uint64_t unique_id;
+  struct PendingMessageList *next;
 
+  /**
+   * The pending message.
+   */
+  struct PendingMessage *message;
 };
 
 struct GNUNET_DHT_GetContext
@@ -108,8 +120,7 @@ struct GNUNET_DHT_FindPeerContext
 };
 
 /**
- * Handle to control a unique operation (one that is
- * expected to return results)
+ * Handle to a route request
  */
 struct GNUNET_DHT_RouteHandle
 {
@@ -138,37 +149,16 @@ struct GNUNET_DHT_RouteHandle
    * Main handle to this DHT api
    */
   struct GNUNET_DHT_Handle *dht_handle;
-};
 
-/**
- * Handle for a non unique request, holds callback
- * which needs to be called before we allow other
- * messages to be processed and sent to the DHT service
- */
-struct GNUNET_DHT_NonUniqueHandle
-{
   /**
-   * Key that this get request is for
+   * The actual message sent for this request,
+   * used for retransmitting requests on service
+   * failure/reconnect.  Freed on route_stop.
    */
-  GNUNET_HashCode key;
-
-  /**
-   * Type of data get request was for
-   */
-  uint32_t type;
-
-  /**
-   * Continuation to call on service
-   * confirmation of message receipt.
-   */
-  GNUNET_SCHEDULER_Task cont;
-
-  /**
-   * Send continuation cls
-   */
-  void *cont_cls;
+  struct GNUNET_DHT_RouteMessage *message;
 };
 
+
 /**
  * Handle to control a get operation.
  */
@@ -185,6 +175,7 @@ struct GNUNET_DHT_GetHandle
   struct GNUNET_DHT_GetContext get_context;
 };
 
+
 /**
  * Handle to control a find peer operation.
  */
@@ -202,6 +193,27 @@ struct GNUNET_DHT_FindPeerHandle
 };
 
 
+enum DHT_Retransmit_Stage
+{
+  /**
+   * The API is not retransmitting anything at this time.
+   */
+  DHT_NOT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and nothing has been single
+   * queued for sending.
+   */
+  DHT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and a single message has been
+   * queued for transmission once finished.
+   */
+  DHT_RETRANSMITTING_MESSAGE_QUEUED
+};
+
+
 /**
  * Connection to the DHT service.
  */
@@ -242,16 +254,27 @@ struct GNUNET_DHT_Handle
   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
 
   /**
-   * Non unique handle.  If set don't schedule another non
-   * unique request.
+   * Generator for unique ids.
    */
-  struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
+  uint64_t uid_gen;
 
   /**
-   * Generator for unique ids.
+   * Are we currently retransmitting requests?  If so queue a _single_
+   * new request when received.
    */
-  uint64_t uid_gen;
+  enum DHT_Retransmit_Stage retransmit_stage;
 
+  /**
+   * Linked list of retranmissions, to be used in the event
+   * of a dht service disconnect/reconnect.
+   */
+  struct PendingMessageList *retransmissions;
+
+  /**
+   * A single pending message allowed to be scheduled
+   * during retransmission phase.
+   */
+  struct PendingMessage *retransmission_buffer;
 };
 
 
@@ -269,6 +292,253 @@ hash_from_uid (uint64_t uid,
   *((uint64_t*)hash) = uid;
 }
 
+/**
+ * Iterator callback to retransmit each outstanding request
+ * because the connection to the DHT service went down (and
+ * came back).
+ *
+ *
+ */
+static int retransmit_iterator (void *cls,
+                                const GNUNET_HashCode * key,
+                                void *value)
+{
+  struct GNUNET_DHT_RouteHandle *route_handle = value;
+  struct PendingMessageList *pending_message_list;
+
+  pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
+  pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
+  pending_message_list->message->msg = &route_handle->message->header;
+  pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
+  pending_message_list->message->cont = NULL;
+  pending_message_list->message->cont_cls = NULL;
+  pending_message_list->message->unique_id = route_handle->uid;
+  /* Add the new pending message to the front of the retransmission list */
+  pending_message_list->next = route_handle->dht_handle->retransmissions;
+
+  return GNUNET_OK;
+}
+
+/**
+ * Try to (re)connect to the dht service.
+ *
+ * @return GNUNET_YES on success, GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_DHT_Handle *handle)
+{
+  if (handle->client != NULL)
+    return GNUNET_OK;
+  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
+  if (handle->client != NULL)
+    return GNUNET_YES;
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              _("Failed to connect to the dht service!\n"));
+#endif
+  return GNUNET_NO;
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  GNUNET_HashCode uid_hash;
+  hash_from_uid (pos->unique_id, &uid_hash);
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos != NULL);
+
+  if (pos->cont != NULL)
+    {
+      if (code == GNUNET_SYSERR)
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
+      else
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+
+  if (pos->unique_id != 0)
+    GNUNET_free(pos->msg);
+  GNUNET_free (pos);
+  handle->current = NULL;
+}
+
+/**
+ * Transmit the next pending message, called by notify_transmit_ready
+ */
+static size_t
+transmit_pending (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_DHT_Handle *handle = cls;
+  size_t tsize;
+
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': In transmit_pending\n", "DHT API");
+#endif
+  if (buf == NULL)
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': In transmit_pending buf is NULL\n", "DHT API");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return 0;
+    }
+
+  handle->th = NULL;
+
+  if (handle->current != NULL)
+    {
+      tsize = ntohs (handle->current->msg->size);
+      if (size >= tsize)
+        {
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Sending message size %d\n", "DHT API", tsize);
+#endif
+          memcpy (buf, handle->current->msg, tsize);
+          finish (handle, GNUNET_OK);
+          return tsize;
+        }
+      else
+        {
+          return 0;
+        }
+    }
+  /* Have no pending request */
+  return 0;
+}
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_message (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ * Forward declaration.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
+
+/* Forward declaration */
+static size_t
+transmit_pending_retransmission (void *cls, size_t size, void *buf);
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending_retransmission,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  struct PendingMessageList *pending_list;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos == handle->retransmissions->message);
+  pending_list = handle->retransmissions;
+  handle->retransmissions = handle->retransmissions->next;
+  GNUNET_free (pending_list);
+
+  if (handle->retransmissions == NULL)
+    {
+      handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
+    }
+
+  if (handle->retransmissions != NULL)
+    {
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
+    }
+  else if (handle->retransmission_buffer != NULL)
+    {
+      handle->current = handle->retransmission_buffer;
+      process_pending_message(handle);
+    }
+}
 
 /**
  * Handler for messages received from the DHT service
@@ -286,8 +556,6 @@ service_message_handler (void *cls,
   uint64_t uid;
   GNUNET_HashCode uid_hash;
   size_t enc_size;
-  /* TODO: find out message type, handle callbacks for different types of messages.
-   * Should be a non unique acknowledgment, or unique result. */
 
   if (msg == NULL)
     {
@@ -300,8 +568,11 @@ service_message_handler (void *cls,
       handle->client = GNUNET_CLIENT_connect (handle->sched, 
                                              "dht", 
                                              handle->cfg);
-      /* FIXME: re-transmit *all* of our GET requests AND re-start
-        receiving responses! */
+
+      handle->retransmit_stage = DHT_RETRANSMITTING;
+      GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle);
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
       return;
     }
 
@@ -341,37 +612,6 @@ service_message_handler (void *cls,
 
         break;
       }
-      /* FIXME: we don't want these anymore, call continuation once message is sent. */
-      /*
-    case GNUNET_MESSAGE_TYPE_DHT_STOP:
-      {
-        stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
-        uid = GNUNET_ntohll (stop_msg->unique_id);
-#if DEBUG_DHT_API
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "`%s': Received response to message (uid %llu), current uid %llu\n",
-                    "DHT API", uid, handle->current->unique_id);
-#endif
-        if (handle->current->unique_id == uid)
-          {
-#if DEBUG_DHT_API
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                        "`%s': Have pending confirmation for this message!\n",
-                        "DHT API", uid);
-#endif
-            if (handle->current->cont != NULL)
-              GNUNET_SCHEDULER_add_continuation (handle->sched,
-                                                 handle->current->cont,
-                                                 handle->current->cont_cls,
-                                                 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-
-            GNUNET_free (handle->current->msg);
-            GNUNET_free (handle->current);
-            handle->current = NULL;
-          }
-        break;
-      }
-      */
     default:
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -407,6 +647,7 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
   handle->cfg = cfg;
   handle->sched = sched;
   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
+  handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
   if (handle->client == NULL)
     {
       GNUNET_free (handle);
@@ -451,40 +692,10 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
       handle->client = NULL;
     }
-  /* Either assert that outstanding_requests is empty */
-  /* FIXME: handle->outstanding_requests not freed! */
-  GNUNET_free (handle);
-}
-
-
-/**
- * Send complete (or failed), call continuation if we have one.
- */
-static void
-finish (struct GNUNET_DHT_Handle *handle, int code)
-{
-  struct PendingMessage *pos = handle->current;
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
-#endif
-  GNUNET_assert (pos != NULL);
-
-
-  if (pos->cont != NULL)
-    {
-      if (code == GNUNET_SYSERR)
-        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                           pos->cont_cls,
-                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
-      else
-        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                           pos->cont_cls,
-                                           GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-    }
 
-  GNUNET_free (pos->msg);
-  GNUNET_free (pos);
-  handle->current = NULL;
+  GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
+  GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
+  GNUNET_free (handle);
 }
 
 
@@ -492,7 +703,7 @@ finish (struct GNUNET_DHT_Handle *handle, int code)
  * Transmit the next pending message, called by notify_transmit_ready
  */
 static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_pending_retransmission (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DHT_Handle *handle = cls;
   size_t tsize;
@@ -507,8 +718,7 @@ transmit_pending (void *cls, size_t size, void *buf)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
 #endif
-      /* FIXME: free associated resources or summat */
-      finish (handle, GNUNET_SYSERR);
+      finish_retransmission (handle, GNUNET_SYSERR);
       return 0;
     }
 
@@ -524,7 +734,7 @@ transmit_pending (void *cls, size_t size, void *buf)
                       "`%s': Sending message size %d\n", "DHT API", tsize);
 #endif
           memcpy (buf, handle->current->msg, tsize);
-          finish (handle, GNUNET_OK);
+          finish_retransmission (handle, GNUNET_OK);
           return tsize;
         }
       else
@@ -537,66 +747,6 @@ transmit_pending (void *cls, size_t size, void *buf)
 }
 
 
-/**
- * Try to (re)connect to the dht service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
- */
-static int
-try_connect (struct GNUNET_DHT_Handle *handle)
-{
-  if (handle->client != NULL)
-    return GNUNET_OK;
-  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
-  if (handle->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dht service!\n"));
-#endif
-  return GNUNET_NO;
-}
-
-
-/**
- * Try to send messages from list of messages to send
- */
-static void
-process_pending_message (struct GNUNET_DHT_Handle *handle)
-{
-
-  if (handle->current == NULL)
-    return;                     /* action already pending */
-  if (GNUNET_YES != try_connect (handle))
-    {
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-
-  if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                         ntohs (handle->
-                                                                current->msg->
-                                                                size),
-                                                         handle->current->
-                                                         timeout, GNUNET_YES,
-                                                         &transmit_pending,
-                                                         handle)))
-    {
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dht service.\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Scheduled sending message of size %d to service\n",
-              "DHT API", ntohs (handle->current->msg->size));
-#endif
-}
-
 /**
  * Iterator called on each result obtained from a generic route
  * operation
@@ -633,20 +783,31 @@ void
 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
+  struct GNUNET_MessageHeader *hello;
+  size_t hello_size;
 
-#if DEBUG_DHT_API
+  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
+    {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Find peer iterator called.\n");
-#endif
-  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_HELLO)
-    return;
+                  "Received wrong type of response to a find peer request...\n");
+      return;
+    }
+
 
   GNUNET_assert (ntohs (reply->size) >=
                  sizeof (struct GNUNET_MessageHeader));
+  hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
+  hello = (struct GNUNET_MessageHeader *)&reply[1];
 
+  if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
+      return;
+    }
   find_peer_handle->find_peer_context.proc (find_peer_handle->
                                             find_peer_context.proc_cls,
-                                            (struct GNUNET_HELLO_Message *)reply);
+                                            (struct GNUNET_HELLO_Message *)hello);
 }
 
 /**
@@ -685,36 +846,38 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
   struct GNUNET_DHT_RouteHandle *route_handle;
   struct PendingMessage *pending;
   struct GNUNET_DHT_RouteMessage *message;
-  size_t expects_response;
   uint16_t msize;
   GNUNET_HashCode uid_key;
-  uint64_t uid;
 
   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
       GNUNET_break (0);
       return NULL;
     }
-  expects_response = GNUNET_YES;
-  if (iter == NULL)
-    expects_response = GNUNET_NO;
-  uid = handle->uid_gen++;
-  if (expects_response)
+
+  route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
+  memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
+  route_handle->iter = iter;
+  route_handle->iter_cls = iter_cls;
+  route_handle->dht_handle = handle;
+  if (iter != NULL)
     {
-      route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
-      memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
-      route_handle->iter = iter;
-      route_handle->iter_cls = iter_cls;
-      route_handle->dht_handle = handle;
-      route_handle->uid = uid;
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Unique ID is %llu\n", "DHT API", uid);
-#endif
+      route_handle->uid = handle->uid_gen++;
+      hash_from_uid (route_handle->uid, &uid_key);
       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
                                          &uid_key, route_handle,
                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
+  else
+    {
+      route_handle->uid = 0;
+    }
+
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
+#endif
+
   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
@@ -722,18 +885,25 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
   message->options = htonl (options);
   message->desired_replication_level = htonl (options);
-  message->unique = htonl (expects_response);
-  message->unique_id = GNUNET_htonll (uid);
+  message->unique_id = GNUNET_htonll (route_handle->uid);
   memcpy (&message[1], enc, ntohs (enc->size));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = &message->header;
   pending->timeout = timeout;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->unique_id = uid;
-  GNUNET_assert (handle->current == NULL);
-  handle->current = pending;
-  process_pending_message (handle);
+  pending->unique_id = route_handle->uid;
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING))
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+  route_handle->message = message;
   return route_handle;
 }
 
@@ -762,9 +932,9 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_GetHandle *get_handle;
-  struct GNUNET_DHT_GetMessage *get_msg;
+  struct GNUNET_DHT_GetMessage get_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
     return NULL;
 
   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
@@ -777,14 +947,14 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
               GNUNET_h2s (key));
 #endif
 
-  get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
-  get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->type = htons (type);
+  get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+  get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
+  get_msg.type = htons (type);
 
   get_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
+    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
                             &get_reply_iterator, get_handle, cont, cont_cls);
+
   return get_handle;
 }
 
@@ -821,10 +991,23 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
   pending->timeout = DEFAULT_DHT_TIMEOUT;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->unique_id = route_handle->uid;
-  GNUNET_assert (route_handle->dht_handle->current == NULL);
-  route_handle->dht_handle->current = pending;
-  process_pending_message (route_handle->dht_handle);
+  pending->unique_id = 0; /* When finished is called, free pending->msg */
+
+  if (route_handle->dht_handle->current == NULL)
+    {
+      route_handle->dht_handle->current = pending;
+      process_pending_message (route_handle->dht_handle);
+    }
+  else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
+    {
+      route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+      route_handle->dht_handle->retransmission_buffer = pending;
+    }
+  else
+    {
+      GNUNET_break(0);
+    }
+
   hash_from_uid (route_handle->uid, &uid_key);
   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
                 (route_handle->dht_handle->outstanding_requests, &uid_key,
@@ -843,6 +1026,17 @@ void
 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((get_handle->route_handle->dht_handle->current != NULL) &&
+      (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending get request with key %s, uid %llu\n",
@@ -880,9 +1074,9 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
                             void *cont_cls)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
-  struct GNUNET_MessageHeader *find_peer_msg;
+  struct GNUNET_MessageHeader find_peer_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
     return NULL;
 
   find_peer_handle =
@@ -896,11 +1090,10 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
               "FIND PEER", GNUNET_h2s (key));
 #endif
 
-  find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
-  find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader));
-  find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
+  find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
   find_peer_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, options, find_peer_msg,
+    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
                             timeout, &find_peer_reply_iterator,
                             find_peer_handle, cont, cont_cls);
   return find_peer_handle;
@@ -917,6 +1110,17 @@ void
 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
+      (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
@@ -958,12 +1162,16 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_PutMessage *put_msg;
+  struct GNUNET_DHT_RouteHandle *put_route;
   size_t msize;
 
-  if (handle->current != NULL)
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
     {
-      GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
-                                         GNUNET_SCHEDULER_REASON_TIMEOUT);
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
       return;
     }
 
@@ -982,8 +1190,19 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
   memcpy (&put_msg[1], data, size);
 
-  GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
-                          NULL, cont, cont_cls);
+  put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
+                                      NULL, cont, cont_cls);
+
+  if (put_route == NULL) /* Route start failed! */
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+    }
+  else
+    GNUNET_free(put_route);
 
   GNUNET_free (put_msg);
 }
index 3b7f7a4e8e4b5000461a389c6bbfafc10a876455..e3f6116cd3177c7925c3c348748b0c8321003e39 100644 (file)
@@ -101,17 +101,18 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * operation
  *
  * @param cls closure (NULL)
- * @param peer the peer we learned about
- * @param reply the response message, should be a HELLO
+ * @param hello the response message, a HELLO
  */
 void find_peer_processor (void *cls,
-                          const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *reply)
+                          const struct GNUNET_HELLO_Message *hello)
 {
-  result_count++;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(peer), result_count);
-
+  struct GNUNET_PeerIdentity peer;
+  if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer))
+    {
+      result_count++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(&peer), result_count);
+    }
 }
 
 
@@ -191,8 +192,14 @@ run (void *cls,
   if (verbose)
     fprintf (stderr, "Issuing FIND PEER request for %s!\n", query_key);
 
-  find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, timeout, 0, NULL, &key,
-                        &find_peer_processor, NULL, &message_sent_cont, NULL);
+  find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle,
+                                                 timeout,
+                                                 0,
+                                                 &key,
+                                                 &find_peer_processor,
+                                                 NULL,
+                                                 &message_sent_cont,
+                                                 NULL);
 
 }
 
index 8907c364218b15f5959d52e4f0ddfdefe3646186..27c431bd5a5fd9509b1ab799c190ce3677450c31 100644 (file)
@@ -166,7 +166,7 @@ struct DHT_MessageContext
   /**
    * The key this request was about
    */
-  GNUNET_HashCode *key;
+  const GNUNET_HashCode *key;
 
   /**
    * The unique identifier of this request
@@ -240,9 +240,6 @@ send_generic_reply (void *cls, size_t size, void *buf)
   if (buf == NULL)             
     {
       /* client disconnected */
-#if DEBUG_DHT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
-#endif
       return 0;
     }
   off = 0;
@@ -256,10 +253,6 @@ send_generic_reply (void *cls, size_t size, void *buf)
       GNUNET_free (reply);
       off += msize;
     }
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
-#endif
   process_pending_messages (client);
   return off;
 }
@@ -284,7 +277,7 @@ add_pending_message (struct ClientList *client,
 
 
 /**
- * Called when a reply needs to be sent to a client, either as
+ * Called when a reply needs to be sent to a client, as
  * a result it found to a GET or FIND PEER request.
  *
  * @param client the client to send the reply to
@@ -296,7 +289,7 @@ send_reply_to_client (struct ClientList *client,
                       const struct GNUNET_MessageHeader *message,
                       unsigned long long uid)
 {
-  struct GNUNET_DHT_Message *reply;
+  struct GNUNET_DHT_RouteResultMessage *reply;
   struct PendingMessage *pending_message;
   uint16_t msize;
   size_t tsize;
@@ -305,21 +298,21 @@ send_reply_to_client (struct ClientList *client,
               "`%s': Sending reply to client.\n", "DHT");
 #endif
   msize = ntohs (message->size);
-  tsize = sizeof (struct GNUNET_DHT_Message) + msize;
+  tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
-      GNUNET_BREAK_op (0);
+      GNUNET_break_op (0);
       return;
     }
-  reply = GNUNET_malloc (tsize);
+
+  pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
+  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+  reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT);
   reply->header.size = htons (tsize);
-  if (uid != 0)
-    reply->unique = htonl (GNUNET_YES); // ????
   reply->unique_id = GNUNET_htonll (uid);
   memcpy (&reply[1], message, msize);
-  pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline
-  pending_message->msg = &reply->header;
+
   add_pending_message (client, pending_message);
 }
 
@@ -354,7 +347,6 @@ datacache_get_iterator (void *cls,
   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
   get_result->header.size =
     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
-  get_result->data_size = htons (size);
   get_result->expiration = exp;
   memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
   get_result->type = htons (type);
@@ -383,12 +375,13 @@ handle_dht_get (void *cls,
   unsigned int results;
   struct DatacacheGetContext datacache_get_context;
 
-  if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
+  get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
+  if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
     {
       GNUNET_break (0);
       return;
     }
-  get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
+
   get_type = ntohs (get_msg->type);
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -422,7 +415,7 @@ handle_dht_find_peer (void *cls,
                      const struct GNUNET_MessageHeader *find_msg,
                       struct DHT_MessageContext *message_context)
 {
-  struct GNUNET_DHT_FindPeerResultMessage *find_peer_result;
+  struct GNUNET_MessageHeader *find_peer_result;
   size_t hello_size;
   size_t tsize;
 
@@ -430,8 +423,8 @@ handle_dht_find_peer (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
               "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
-              ntohs (find_msg->header.size),
-              sizeof (struct GNUNET_DHT_FindPeerMessage));
+              ntohs (find_msg->size),
+              sizeof (struct GNUNET_MessageHeader));
 #endif
   if (my_hello == NULL)
   {
@@ -444,13 +437,18 @@ handle_dht_find_peer (void *cls,
   }
   /* Simplistic find_peer functionality, always return our hello */
   hello_size = ntohs(my_hello->size);
-  tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage);
+  tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
   // check tsize < MAX
   find_peer_result = GNUNET_malloc (tsize);
-  find_peer_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
-  find_peer_result->header.size = htons (tsize);
-  memcpy (&find_peer_result[1], &my_hello, hello_size);
-  send_reply_to_client(message_context->client, &find_peer_result->header, message_context->unique_id);
+  find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
+  find_peer_result->size = htons (tsize);
+  memcpy (&find_peer_result[1], my_hello, hello_size);
+#if DEBUG_DHT_HELLO
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "`%s': Sending hello size %d to client.\n",
+                "DHT", hello_size);
+#endif
+  send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
   GNUNET_free(find_peer_result);
 }
 
@@ -471,16 +469,12 @@ handle_dht_put (void *cls,
   size_t put_type;
   size_t data_size;
 
-  GNUNET_assert (ntohs (msg->header.size) >=
+  GNUNET_assert (ntohs (msg->size) >=
                  sizeof (struct GNUNET_DHT_PutMessage));
   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
-  put_type = ntohl (put_msg->type);
+  put_type = ntohs (put_msg->type);
   data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
 #if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': %s msg total size is %d, data size %d, struct size %d\n",
-              "DHT", "PUT", ntohs (put_msg->header.size), data_size,
-              sizeof (struct GNUNET_DHT_PutMessage));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, message type %d, key %s\n",
               "DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
@@ -488,7 +482,11 @@ handle_dht_put (void *cls,
   if (datacache != NULL)
     GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
                           (char *) &put_msg[1], put_type,
-                          put_msg->expiration);
+                          GNUNET_TIME_absolute_ntoh(put_msg->expiration));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "`%s': %s request received locally, but have no datacache!\n",
+                "DHT", "PUT");
 }
 
 
@@ -519,41 +517,6 @@ find_active_client (struct GNUNET_SERVER_Client *client)
   return ret;
 }
 
-/**
- * Construct a message receipt confirmation for a particular uid.
- * Receipt confirmations are used for any requests that don't expect
- * a reply otherwise (i.e. put requests, stop requests).
- *
- * @param client the handle for the client
- * @param uid the unique identifier of this message
- */
-static void
-send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
-                                  uint64_t uid)
-{
-  struct GNUNET_DHT_StopMessage *confirm_message;
-  struct ClientList *active_client;
-  struct PendingMessage *pending_message;
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Sending receipt confirmation for uid %llu\n", "DHT",
-              uid);
-#endif
-  confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage));
-  confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
-  confirm_message->header.size =
-    htons (sizeof (struct GNUNET_DHT_StopMessage));
-  confirm_message->unique_id = GNUNET_htonll (uid);
-
-  active_client = find_active_client (client);
-  pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
-  pending_message->msg = &confirm_message->header;
-
-  add_pending_message (active_client, pending_message);
-
-}
-
 /**
  * Handler for any generic DHT messages, calls the appropriate handler
  * depending on message type, sends confirmation if responses aren't otherwise
@@ -567,9 +530,10 @@ static void
 handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
                           const struct GNUNET_MessageHeader *message)
 {
-  const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message *) message;
+  const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
   const struct GNUNET_MessageHeader *enc_msg;
   struct DHT_MessageContext *message_context;
+  int handle_locally;
   size_t enc_type;
 
   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
@@ -590,28 +554,37 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
   message_context->replication = ntohl (dht_msg->desired_replication_level);
   message_context->msg_options = ntohl (dht_msg->options);
 
+  /* TODO: Steps to be added by students */
   /* FIXME: Implement *remote* DHT operations here (forward request) */
+  /* Implement generic route function and call here. */
   /* FIXME: *IF* handling should be local, then do this: */
-  switch (enc_type)
+  /* 1. find if this peer is closest based on whatever metric the DHT uses
+   * 2. if this peer is closest _OR_ the message options indicate it should
+   *    be processed everywhere _AND_ we want it processed everywhere, then
+   *    handle it locally.
+   */
+  handle_locally = GNUNET_YES;
+  if (handle_locally == GNUNET_YES)
     {
-    case GNUNET_MESSAGE_TYPE_DHT_GET:
-      handle_dht_get (cls, enc_msg,
-                      message_context);
-      break;
-    case GNUNET_MESSAGE_TYPE_DHT_PUT:
-      handle_dht_put (cls, enc_msg,
-                      message_context);
-      send_client_receipt_confirmation (client,
-                                        GNUNET_ntohll (dht_msg->unique_id));
-      break;
-    case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
-      handle_dht_find_peer (cls,
-                            enc_msg,
-                            message_context);
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "`%s': Message type (%d) not handled\n", "DHT", enc_type);
+      switch (enc_type)
+        {
+        case GNUNET_MESSAGE_TYPE_DHT_GET:
+          handle_dht_get (cls, enc_msg,
+                          message_context);
+          break;
+        case GNUNET_MESSAGE_TYPE_DHT_PUT:
+          handle_dht_put (cls, enc_msg,
+                          message_context);
+          break;
+        case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+          handle_dht_find_peer (cls,
+                                enc_msg,
+                                message_context);
+          break;
+        default:
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      "`%s': Message type (%d) not handled\n", "DHT", enc_type);
+        }
     }
   GNUNET_free (message_context);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -620,14 +593,14 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
 
 /**
  * Handler for any generic DHT stop messages, calls the appropriate handler
- * depending on message type, sends confirmation by default (stop messages
- * do not otherwise expect replies)
+ * depending on message type (if processed locally)
  *
  * @param cls closure for the service
  * @param client the client we received this message from
  * @param message the actual message received
  *
- * TODO: add demultiplexing for stop message types.
+ * TODO: once message are remembered by unique id, add code to
+ *       forget them here
  */
 static void
 handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
@@ -635,13 +608,17 @@ handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
 {
   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
     (const struct GNUNET_DHT_StopMessage *) message;
-
+  uint64_t uid;
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, uid %llu\n", "DHT",
               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
 #endif
-  /* TODO: actually stop... */
+
+  uid = GNUNET_ntohll(dht_stop_msg->unique_id);
+  /* TODO: actually stop... free associated resources for the request
+   * lookup request by uid and remove state. */
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -767,14 +744,14 @@ core_init (void *cls,
 
 
 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
-  {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+  {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
   {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
   {NULL, NULL, 0, 0}
 };
 
 
 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-  {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0},
+  {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0},
   {NULL, 0, 0}
 };
@@ -803,7 +780,6 @@ run (void *cls,
                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                  NULL,  /* FIXME: anything we want to pass around? */
                                  &core_init,    /* Call core_init once connected */
-                                 NULL,  /* Don't care about pre-connects */
                                  NULL,  /* Don't care about connects */
                                  NULL,  /* Don't care about disconnects */
                                  NULL,  /* Don't want notified about all incoming messages */
index f99e842697c01cbc945cbf96f9a9b607df73144a..10db5f45a6bfaaf16319e7fb931adefc36066095 100644 (file)
@@ -32,6 +32,7 @@
 #include "gnunet_program_lib.h"
 #include "gnunet_scheduler_lib.h"
 #include "gnunet_dht_service.h"
+#include "gnunet_hello_lib.h"
 
 #define VERBOSE GNUNET_NO
 
@@ -90,6 +91,8 @@ struct PeerContext
 
 static struct PeerContext p1;
 
+struct RetryContext retry_context;
+
 static struct GNUNET_SCHEDULER_Handle *sched;
 
 static int ok;
@@ -146,7 +149,10 @@ end_badly ()
 #if VERBOSE
   fprintf (stderr, "Ending on an unhappy note.\n");
 #endif
-
+  if (retry_context.peer_ctx->find_peer_handle != NULL)
+    GNUNET_DHT_find_peer_stop(retry_context.peer_ctx->find_peer_handle, NULL, NULL);
+  if (retry_context.retry_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(sched, retry_context.retry_task);
   GNUNET_DHT_disconnect (p1.dht_handle);
 
   ok = 1;
@@ -186,21 +192,41 @@ test_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * @param reply response
  */
 void test_find_peer_processor (void *cls,
-                          const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *reply)
+                               const struct GNUNET_HELLO_Message *hello)
 {
   struct RetryContext *retry_ctx = cls;
+  struct GNUNET_PeerIdentity peer;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(peer));
+  if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(&peer));
 
-  if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task);
+      if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK)
+        {
+          GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task);
+          retry_ctx->retry_task = GNUNET_SCHEDULER_NO_TASK;
+        }
+
+      GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1,
+                                         GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "received find peer request, but hello_get_id failed!\n");
+    }
 
-  GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1,
-                                     GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
+/**
+ * Retry the find_peer task on timeout. (Forward declaration)
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now?)
+ */
+void
+retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 /**
  * Retry the find_peer task on timeout.
@@ -219,9 +245,9 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "test_find_peer timed out, retrying!\n");
-
+      retry_ctx->next_timeout = GNUNET_TIME_relative_multiply(retry_ctx->next_timeout, 2);
       retry_ctx->peer_ctx->find_peer_handle =
-          GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash,
+          GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, &hash,
                                       &test_find_peer_processor, retry_ctx, NULL, NULL);
     }
   else
@@ -235,14 +261,14 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if (retry_ctx->peer_ctx->find_peer_handle == NULL)
     GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
   else
-    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer, retry_ctx);
+    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx);
 }
 
 /**
  * Retry the find_peer task on timeout.
  *
  * @param cls closure
- * @param tc context information (why was this task triggered now)
+ * @param tc context information (why was this task triggered now?)
  */
 void
 retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -270,24 +296,22 @@ test_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct PeerContext *peer = cls;
   GNUNET_HashCode hash;
   memset (&hash, 42, sizeof (GNUNET_HashCode));
-  struct RetryContext *retry_ctx;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n");
   GNUNET_assert (peer->dht_handle != NULL);
 
-  retry_ctx = GNUNET_malloc(sizeof(struct RetryContext));
-  retry_ctx->real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT);
-  retry_ctx->next_timeout = BASE_TIMEOUT;
-  retry_ctx->peer_ctx = peer;
+  retry_context.real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT);
+  retry_context.next_timeout = BASE_TIMEOUT;
+  retry_context.peer_ctx = peer;
 
   peer->find_peer_handle =
-    GNUNET_DHT_find_peer_start (peer->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash,
-                                &test_find_peer_processor, retry_ctx, NULL, NULL);
+    GNUNET_DHT_find_peer_start (peer->dht_handle, retry_context.next_timeout, 0, &hash,
+                                &test_find_peer_processor, &retry_context, NULL, NULL);
 
   if (peer->find_peer_handle == NULL)
     GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
   else
-    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx);
+    retry_context.retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_context.next_timeout, &retry_find_peer_stop, &retry_context);
 }
 
 /**