-options to play with
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
index 349232dafb7ae58873175d70b52a4bee5caa7ad8..76e04f57cf5684b60caecc564e69ca2d423ab0cd 100644 (file)
 #include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_pe.h"
 #include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_stream.h"
+
+
+/**
+ * Desired replication level for GETs.
+ */
+#define DHT_GET_REPLICATION 5
 
 /**
  * Maximum size of the datastore queue for P2P operations.  Needs to
  */
 #define INSANE_STATISTICS GNUNET_NO
 
+/**
+ * If obtaining a block via stream fails, how often do we retry it before
+ * giving up for good (and sticking to non-anonymous transfer)?
+ */
+#define STREAM_RETRY_MAX 3
+
+
 /**
  * An active request.
  */
@@ -101,6 +115,11 @@ struct GSF_PendingRequest
    */
   struct GNUNET_DHT_GetHandle *gh;
 
+  /**
+   * Stream request handle for this request (or NULL for none).
+   */
+  struct GSF_StreamRequest *stream_request;
+
   /**
    * Function to call upon completion of the local get
    * request, or NULL for none.
@@ -153,6 +172,12 @@ struct GSF_PendingRequest
    */
   uint64_t first_uid;
 
+  /**
+   * How often have we retried this request via 'stream'?
+   * (used to bound overall retries).
+   */
+  unsigned int stream_retry_count;
+
   /**
    * Number of valid entries in the 'replies_seen' array.
    */
@@ -195,12 +220,6 @@ static struct GNUNET_LOAD_Value *datastore_put_load;
 static int active_to_migration;
 
 
-/**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
 /**
  * Heap with the request that will expire next at the top.  Contains
  * pointers of type "struct PendingRequest*"; these will *also* be
@@ -268,15 +287,15 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
 struct GSF_PendingRequest *
 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                              enum GNUNET_BLOCK_Type type,
-                             const struct GNUNET_HashCode * query,
-                             const struct GNUNET_HashCode * namespace,
+                             const struct GNUNET_HashCode *query,
+                             const struct GNUNET_HashCode *namespace,
                              const struct GNUNET_PeerIdentity *target,
                              const char *bf_data, size_t bf_size,
                              uint32_t mingle, uint32_t anonymity_level,
                              uint32_t priority, int32_t ttl,
                              GNUNET_PEER_Id sender_pid,
                              GNUNET_PEER_Id origin_pid,
-                             const struct GNUNET_HashCode * replies_seen,
+                             const struct GNUNET_HashCode *replies_seen,
                              unsigned int replies_seen_count,
                              GSF_PendingRequestReplyHandler rh, void *rh_cls)
 {
@@ -630,6 +649,11 @@ clean_request (void *cls, const struct GNUNET_HashCode * key, void *value)
     GNUNET_DHT_get_stop (pr->gh);
     pr->gh = NULL;
   }
+  if (NULL != pr->stream_request)
+  {
+    GSF_stream_query_cancel (pr->stream_request);
+    pr->stream_request = NULL;
+  }
   if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
   {
     GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -682,6 +706,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
       GNUNET_DHT_get_stop (pr->gh);
       pr->gh = NULL;
     }
+    if (NULL != pr->stream_request)
+    {
+      GSF_stream_query_cancel (pr->stream_request);
+      pr->stream_request = NULL;
+    }
     if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
     {
       GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -1127,7 +1156,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
   pr->gh =
       GNUNET_DHT_get_start (GSF_dht, 
                             pr->public_data.type, &pr->public_data.query,
-                            5 /* DEFAULT_GET_REPLICATION */ ,
+                            DHT_GET_REPLICATION,
                             GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
                             xquery, xquery_size, &handle_dht_reply, pr);
   if ( (NULL != pr->gh) && 
@@ -1138,6 +1167,97 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
 }
 
 
+/**
+ * Function called with a reply from the stream.
+ * 
+ * @param cls the pending request struct
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+static void
+stream_reply_proc (void *cls,
+                  enum GNUNET_BLOCK_Type type,
+                  struct GNUNET_TIME_Absolute expiration,
+                  size_t data_size,
+                  const void *data)
+{
+  struct GSF_PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+  struct GNUNET_HashCode query;
+
+  pr->stream_request = NULL;
+  if (GNUNET_BLOCK_TYPE_ANY == type)
+  {
+    GNUNET_break (NULL == data);
+    GNUNET_break (0 == data_size);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Error retrieiving block via stream\n");
+    pr->stream_retry_count++;
+    if (pr->stream_retry_count >= STREAM_RETRY_MAX)
+      return; /* give up on stream */
+    /* retry -- without delay, as this is non-anonymous
+       and mesh/stream connect will take some time anyway */
+    pr->stream_request = GSF_stream_query (pr->public_data.target,
+                                          &pr->public_data.query,
+                                          pr->public_data.type,
+                                          &stream_reply_proc,
+                                          pr);
+    return;
+  }
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                           type,
+                           data, data_size, &query))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Failed to derive key for block of type %d\n",
+               (int) type);
+    GNUNET_break_op (0);
+    return;
+  }
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# Replies received from STREAM"), 1,
+                            GNUNET_NO);
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = expiration;
+  /* do not allow migrated content to live longer than 1 year */
+  prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+                                            prq.expiration);
+  prq.size = data_size;
+  prq.type = type;
+  process_reply (&prq, &query, pr);
+}
+
+
+/**
+ * Consider downloading via stream (if possible)
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
+{
+  if (0 != pr->public_data.anonymity_level)
+    return;
+  if (0 == pr->public_data.target)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Cannot do stream-based download, target peer not known\n");
+    return;
+  }
+  if (NULL != pr->stream_request)
+    return;
+  pr->stream_request = GSF_stream_query (pr->public_data.target,
+                                        &pr->public_data.query,
+                                        pr->public_data.type,
+                                        &stream_reply_proc,
+                                        pr);
+}
+
+
 /**
  * Task that issues a warning if the datastore lookup takes too long.
  *
@@ -1307,7 +1427,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1347,7 +1467,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1405,7 +1525,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1462,6 +1582,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
                    GSF_LocalLookupContinuation cont, void *cont_cls)
 {
   GNUNET_assert (NULL == pr->gh);
+  GNUNET_assert (NULL == pr->stream_request);
   GNUNET_assert (NULL == pr->llc_cont);
   pr->llc_cont = cont;
   pr->llc_cont_cls = cont_cls;
@@ -1487,7 +1608,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1639,8 +1760,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
 void
 GSF_pending_request_init_ ()
 {
-  unsigned long long dqs;
-
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
                                              "MAX_PENDING_REQUESTS",
@@ -1649,16 +1768,6 @@ GSF_pending_request_init_ ()
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
                               "fs", "MAX_PENDING_REQUESTS");
   }
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
-                                           &dqs))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
-                              "fs", "DATASTORE_QUEUE_SIZE");
-    dqs = 1024;
-  }
-  datastore_queue_size = (unsigned int) dqs;
-
   active_to_migration =
       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);