move bit distance function into util
[oweals/gnunet.git] / src / dht / dht_api.c
index c4229a3444726bbe34b5c82e92fee3caf2547c5a..15faba6c97dec4631e3f80a53d8ca89538b94fbe 100644 (file)
@@ -4,7 +4,7 @@
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
@@ -74,6 +74,13 @@ struct PendingMessage
    */
   uint64_t unique_id;
 
+  /**
+   * Free the saved message once sent, set
+   * to GNUNET_YES for messages that don't
+   * receive responses!
+   */
+  int free_on_send;
+
 };
 
 struct PendingMessageList
@@ -290,6 +297,7 @@ hash_from_uid (uint64_t uid,
   *((uint64_t*)hash) = uid;
 }
 
+#if RETRANSMIT
 /**
  * Iterator callback to retransmit each outstanding request
  * because the connection to the DHT service went down (and
@@ -317,6 +325,7 @@ static int retransmit_iterator (void *cls,
 
   return GNUNET_OK;
 }
+#endif
 
 /**
  * Try to (re)connect to the dht service.
@@ -346,12 +355,11 @@ finish (struct GNUNET_DHT_Handle *handle, int code)
 {
   struct PendingMessage *pos = handle->current;
   GNUNET_HashCode uid_hash;
-  hash_from_uid (pos->unique_id, &uid_hash);
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
 #endif
   GNUNET_assert (pos != NULL);
-
+  hash_from_uid (pos->unique_id, &uid_hash);
   if (pos->cont != NULL)
     {
       if (code == GNUNET_SYSERR)
@@ -364,7 +372,8 @@ finish (struct GNUNET_DHT_Handle *handle, int code)
                                            GNUNET_SCHEDULER_REASON_PREREQ_DONE);
     }
 
-  if (pos->unique_id == 0)
+  GNUNET_assert(handle->th == NULL);
+  if (pos->free_on_send == GNUNET_YES)
     GNUNET_free(pos->msg);
   GNUNET_free (pos);
   handle->current = NULL;
@@ -383,6 +392,8 @@ transmit_pending (void *cls, size_t size, void *buf)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': In transmit_pending\n", "DHT API");
 #endif
+  handle->th = NULL;
+
   if (buf == NULL)
     {
 #if DEBUG_DHT_API
@@ -393,8 +404,6 @@ transmit_pending (void *cls, size_t size, void *buf)
       return 0;
     }
 
-  handle->th = NULL;
-
   if (handle->current != NULL)
     {
       tsize = ntohs (handle->current->msg->size);
@@ -428,6 +437,7 @@ process_pending_message (struct GNUNET_DHT_Handle *handle)
     return;                     /* action already pending */
   if (GNUNET_YES != try_connect (handle))
     {
+      handle->th = NULL;
       finish (handle, GNUNET_SYSERR);
       return;
     }
@@ -569,20 +579,23 @@ service_message_handler (void *cls,
                                               handle->cfg);
       if (handle->current != NULL)
         {
+          handle->th = NULL;
           finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
         }
-      if (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0)
+#if RETRANSMIT
+      if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
         {
           handle->retransmit_stage = DHT_RETRANSMITTING;
           handle->current = handle->retransmissions->message;
           process_pending_retransmissions(handle);
         }
+#endif
       return;
     }
 
   switch (ntohs (msg->type))
     {
-    case GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT:
+    case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT:
       {
         dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg;
         uid = GNUNET_ntohll (dht_msg->unique_id);
@@ -683,7 +696,7 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
 #endif
   GNUNET_assert (handle != NULL);
-  if (handle->th != NULL)       /* We have a live transmit request in the Aether */
+  if (handle->th != NULL)       /* We have a live transmit request */
     {
       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
       handle->th = NULL;
@@ -774,7 +787,7 @@ get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
   result_data = (char *) &result[1];    /* Set data pointer to end of message */
 
   get_handle->get_context.iter (get_handle->get_context.iter_cls,
-                                result->expiration, &result->key,
+                                GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
                                 ntohs (result->type), data_size, result_data);
 }
 
@@ -788,7 +801,6 @@ find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
   struct GNUNET_MessageHeader *hello;
-  size_t hello_size;
 
   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
     {
@@ -800,7 +812,6 @@ find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 
   GNUNET_assert (ntohs (reply->size) >=
                  sizeof (struct GNUNET_MessageHeader));
-  hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
   hello = (struct GNUNET_MessageHeader *)&reply[1];
 
   if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
@@ -815,7 +826,197 @@ find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 }
 
 /**
- * Perform an asynchronous FIND_PEER operation on the DHT.
+ * Send a message to the DHT telling it to start issuing random GET
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to issue a single find
+ * peer request using the peers unique identifier as key.  This
+ * is used to fill the routing table, and is normally controlled
+ * by the DHT itself.  However, for testing and perhaps more
+ * close control over the DHT, this can be explicitly managed.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
+                           GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to start issuing random PUT
+ * requests every 'frequency' milliseconds.
+ *
+ * @param handle handle to the DHT service
+ * @param frequency delay (in milliseconds) between sending malicious messages
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
+  msg->variable = htons(frequency);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+/**
+ * Send a message to the DHT telling it to start dropping
+ * all requests received.
+ *
+ * @param handle handle to the DHT service
+ * @param cont continuation to call once the message is sent
+ * @param cont_cls closure for continuation
+ *
+ * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
+ */
+int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+{
+  struct GNUNET_DHT_ControlMessage *msg;
+  struct PendingMessage *pending;
+
+  if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
+    return GNUNET_NO;
+
+  msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
+  msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
+  msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
+  msg->variable = htons(0);
+
+  pending = GNUNET_malloc (sizeof (struct PendingMessage));
+  pending->msg = &msg->header;
+  pending->timeout = GNUNET_TIME_relative_get_forever();
+  pending->free_on_send = GNUNET_YES;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->unique_id = 0;
+
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Initiate a generic DHT route operation.
  *
  * @param handle handle to the DHT service
  * @param key the key to look up
@@ -867,18 +1068,14 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
   route_handle->iter = iter;
   route_handle->iter_cls = iter_cls;
   route_handle->dht_handle = handle;
+  route_handle->uid = handle->uid_gen++;
   if (iter != NULL)
     {
-      route_handle->uid = handle->uid_gen++;
       hash_from_uid (route_handle->uid, &uid_key);
       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
                                          &uid_key, route_handle,
                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
-  else
-    {
-      route_handle->uid = 0;
-    }
 
 #if DEBUG_DHT_API
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -888,15 +1085,17 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
   message->options = htonl (options);
-  message->desired_replication_level = htonl (options);
+  message->desired_replication_level = htonl (desired_replication_level);
   message->unique_id = GNUNET_htonll (route_handle->uid);
   memcpy (&message[1], enc, ntohs (enc->size));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = &message->header;
   pending->timeout = timeout;
+  if (iter == NULL)
+    pending->free_on_send = GNUNET_YES;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
   pending->unique_id = route_handle->uid;
@@ -905,7 +1104,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
       handle->current = pending;
       process_pending_message (handle);
     }
-  else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING))
+  else
   {
     handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
     handle->retransmission_buffer = pending;
@@ -933,7 +1132,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
 struct GNUNET_DHT_GetHandle *
 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       struct GNUNET_TIME_Relative timeout,
-                      uint32_t type,
+                      enum GNUNET_BLOCK_Type type,
                       const GNUNET_HashCode * key,
                       GNUNET_DHT_GetIterator iter,
                       void *iter_cls,
@@ -960,7 +1159,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
   get_msg.type = htons (type);
 
   get_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
+    GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
                             &get_reply_iterator, get_handle, cont, cont_cls);
 
   return get_handle;
@@ -986,18 +1185,20 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
   msize = sizeof (struct GNUNET_DHT_StopMessage);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Remove outstanding request for uid %llu\n", "DHT API",
               route_handle->uid);
 #endif
   message->unique_id = GNUNET_htonll (route_handle->uid);
+  memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = (struct GNUNET_MessageHeader *) message;
   pending->timeout = GNUNET_TIME_relative_get_forever();
   pending->cont = cont;
   pending->cont_cls = cont_cls;
+  pending->free_on_send = GNUNET_YES;
   pending->unique_id = 0; /* When finished is called, free pending->msg */
 
   if (route_handle->dht_handle->current == NULL)
@@ -1005,13 +1206,14 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
       route_handle->dht_handle->current = pending;
       process_pending_message (route_handle->dht_handle);
     }
-  else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
+  else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)
     {
       route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
       route_handle->dht_handle->retransmission_buffer = pending;
     }
   else
     {
+      GNUNET_free(pending);
       GNUNET_break(0);
     }
 
@@ -1084,7 +1286,7 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
                             void *cont_cls)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
-  struct GNUNET_MessageHeader find_peer_msg;
+  struct GNUNET_DHT_FindPeerMessage find_peer_msg;
 
   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
     return NULL;
@@ -1100,10 +1302,10 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
               "FIND PEER", GNUNET_h2s (key));
 #endif
 
-  find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
-  find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
+  find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
   find_peer_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
+    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
                             timeout, &find_peer_reply_iterator,
                             find_peer_handle, cont, cont_cls);
   return find_peer_handle;
@@ -1164,7 +1366,7 @@ GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
 void
 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 const GNUNET_HashCode * key,
-                uint32_t type,
+                enum GNUNET_BLOCK_Type type,
                 uint32_t size,
                 const char *data,
                 struct GNUNET_TIME_Absolute exp,
@@ -1177,6 +1379,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
 
   if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
     {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n");
       if (cont != NULL)
         {
           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
@@ -1200,11 +1403,12 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
   memcpy (&put_msg[1], data, size);
 
-  put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
+  put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
                                       NULL, cont, cont_cls);
 
   if (put_route == NULL) /* Route start failed! */
     {
+      GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
       if (cont != NULL)
         {
           GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
@@ -1212,7 +1416,11 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
         }
     }
   else
-    GNUNET_free(put_route);
+    {
+      GNUNET_free(put_route);
+    }
 
   GNUNET_free (put_msg);
 }
+
+/* end of dht_api.c */