WiP
[oweals/gnunet.git] / src / dht / dht_api_get_put.c
index b1b80b01e238ac0306d18ad231f4626a44054fbc..5b5baa531874bd82a2faa1af222f7112029e8d9f 100644 (file)
@@ -39,6 +39,8 @@
  *
  * @param handle handle to DHT service
  * @param key the key to store under
+ * @param desired_replication_level estimate of how many
+ *                nearest peers this request should reach
  * @param options routing options for this message
  * @param type type of the value
  * @param size number of bytes in data; must be less than 64k
@@ -52,6 +54,7 @@
 void
 GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 const GNUNET_HashCode * key,
+                uint32_t desired_replication_level,
                enum GNUNET_DHT_RouteOption options,
                 enum GNUNET_BLOCK_Type type,
                 size_t size,
@@ -72,17 +75,17 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
   put_msg = (struct GNUNET_DHT_PutMessage*) buf;
   put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
   put_msg->header.size = htons (sizeof (struct GNUNET_DHT_PutMessage) + size);
-  put_msg->type = htons (type);
+  put_msg->type = htonl ((uint32_t)type);
   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
   memcpy (&put_msg[1], data, size);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Starting route for %u byte `%s' message\n",
+             "Starting route for %u byte `%s' message of type %u \n",
              (unsigned int) (sizeof (struct GNUNET_DHT_PutMessage) + size),
-             "PUT");
+             "PUT", type);
   GNUNET_break (NULL ==
                GNUNET_DHT_route_start (handle, 
                                        key, 
-                                       DEFAULT_PUT_REPLICATION, options,
+                                       desired_replication_level, options,
                                        &put_msg->header, 
                                        timeout, 
                                        NULL, NULL,
@@ -121,18 +124,25 @@ struct GNUNET_DHT_GetHandle
  *
  * @param cls the 'struct GNUNET_DHT_GetHandle'
  * @param key key that was used
+ * @param outgoing_path path of the message from this peer
+ *                      to the target
  * @param reply response
  */
 static void
 get_reply_iterator (void *cls, 
                    const GNUNET_HashCode *key,
+                   const struct GNUNET_PeerIdentity * const *outgoing_path,
                    const struct GNUNET_MessageHeader *reply)
 {
   struct GNUNET_DHT_GetHandle *get_handle = cls;
   const struct GNUNET_DHT_GetResultMessage *result;
-  const struct GNUNET_PeerIdentity *const*get_path;
-  const struct GNUNET_PeerIdentity *const*put_path;
+  const struct GNUNET_PeerIdentity **put_path;
   size_t payload;
+  char *path_offset;
+  const struct GNUNET_PeerIdentity *pos;
+  unsigned int i;
+  uint16_t put_path_length;
+  uint16_t data_size;
 
   if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
     {
@@ -143,18 +153,36 @@ get_reply_iterator (void *cls,
   GNUNET_assert (ntohs (reply->size) >=
                  sizeof (struct GNUNET_DHT_GetResultMessage));
   result = (const struct GNUNET_DHT_GetResultMessage *) reply;
-  payload = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
-  get_path = NULL; // FIXME: parse path info!
-  put_path = NULL; // FIXME: parse path info!
 
+  put_path = NULL;
+  put_path_length = ntohs(result->put_path_length);
+  if (put_path_length > 0)
+    {
+      data_size = ntohs(result->header.size) - (put_path_length * sizeof(struct GNUNET_PeerIdentity)) - sizeof(struct GNUNET_DHT_GetResultMessage);
+      path_offset = (char *)&result[1];
+      //GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "In get_reply_iterator, result->header.size is %d, put_path_length %d, offset is %d, data_size is %d\n", ntohs(result->header.size), put_path_length, ntohs(result->header.size) - (put_path_length * sizeof(struct GNUNET_PeerIdentity)), data_size);
+      path_offset += data_size;
+      pos = (const struct GNUNET_PeerIdentity *)path_offset;
+      //GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Found put peer %s\n", GNUNET_i2s((const struct GNUNET_PeerIdentity *)path_offset));
+      put_path = GNUNET_malloc ((put_path_length + 1) * sizeof (struct GNUNET_PeerIdentity*));
+      for (i = 0; i < put_path_length; i++)
+        {
+          put_path[i] = pos;
+          pos++;
+        }
+      put_path[put_path_length] = NULL;
+    }
+
+  payload = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
   get_handle->iter (get_handle->iter_cls,
                    GNUNET_TIME_absolute_ntoh (result->expiration),
                    key,
-                   get_path,
+                   outgoing_path,
                    put_path,
                    ntohs (result->type), 
                    payload,
                    &result[1]);
+  GNUNET_free_non_null(put_path);
 }
 
 
@@ -167,10 +195,12 @@ get_reply_iterator (void *cls,
  * @param timeout how long to wait for transmission of this request to the service
  * @param type expected type of the response object
  * @param key the key to look up
+ * @param desired_replication_level estimate of how many
+                  nearest peers this request should reach
  * @param options routing options for this message
  * @param bf bloom filter associated with query (can be NULL)
  * @param bf_mutator mutation value for bf
- * @param xquery extrended query data (can be NULL, depending on type)
+ * @param xquery extended query data (can be NULL, depending on type)
  * @param xquery_size number of bytes in xquery
  * @param iter function to call on each result
  * @param iter_cls closure for iter
@@ -182,6 +212,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       struct GNUNET_TIME_Relative timeout,
                       enum GNUNET_BLOCK_Type type,
                       const GNUNET_HashCode * key,
+                      uint32_t desired_replication_level,
                      enum GNUNET_DHT_RouteOption options,
                      const struct GNUNET_CONTAINER_BloomFilter *bf,
                      int32_t bf_mutator,
@@ -191,25 +222,49 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
                       void *iter_cls)
 {
   struct GNUNET_DHT_GetHandle *get_handle;
-  struct GNUNET_DHT_GetMessage get_msg;
-
-  /* FIXME: transmit bf, mutator, xquery & xquery_size as well... */
+  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
+  struct GNUNET_DHT_GetMessage *get_msg;
+  size_t bf_size;
+    
+  bf_size = GNUNET_CONTAINER_bloomfilter_get_size (bf);
+  if ( (sizeof (buf) <= 
+       sizeof (struct GNUNET_DHT_GetMessage) + xquery_size + bf_size) ||
+       (sizeof (buf) <= bf_size))
+    {
+      GNUNET_break (0);
+      return NULL;
+    } 
   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
   get_handle->iter = iter;
   get_handle->iter_cls = iter_cls;
-  get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
-  get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg.type = htons (type);
+  get_msg = (struct GNUNET_DHT_GetMessage*) buf;
+  get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+  get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage) + 
+                               xquery_size + 
+                               bf_size);
+  get_msg->type = htonl ((uint32_t) type);
+  get_msg->bf_mutator = bf_mutator;
+  get_msg->xquery_size = htons ((uint16_t) xquery_size);
+  get_msg->bf_size = htons (bf_size);
+  if (xquery != NULL)
+    memcpy (&buf[sizeof(struct GNUNET_DHT_GetMessage)],
+           xquery,
+           xquery_size);
+  else
+    GNUNET_assert (xquery_size == 0);
+  (void) GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
+                                                   &buf[sizeof(struct GNUNET_DHT_GetMessage) + xquery_size],
+                                                   bf_size);  
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Starting route for %u byte `%s' message\n",
-             (unsigned int) sizeof (struct GNUNET_DHT_GetMessage),
+             (unsigned int) (sizeof (struct GNUNET_DHT_GetMessage) + xquery_size + bf_size) ,
              "GET");
   get_handle->route_handle =
     GNUNET_DHT_route_start (handle,
                            key, 
-                           DEFAULT_GET_REPLICATION,
+                           desired_replication_level,
                            options,
-                           &get_msg.header, 
+                           &get_msg->header, 
                            timeout,
                             &get_reply_iterator, get_handle,
                            NULL, NULL);
@@ -222,6 +277,9 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
  * Stop async DHT-get.
  *
  * @param get_handle handle to the GET operation to stop
+ *
+ * On return get_handle will no longer be valid, caller
+ * must not use again!!!
  */
 void
 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)