-options to play with
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
index cfab966bc77e7b3cbda971a8107e4e26ac2b810f..76e04f57cf5684b60caecc564e69ca2d423ab0cd 100644 (file)
 #include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_pe.h"
 #include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_stream.h"
+
+
+/**
+ * Desired replication level for GETs.
+ */
+#define DHT_GET_REPLICATION 5
 
 /**
  * Maximum size of the datastore queue for P2P operations.  Needs to
  */
 #define MAX_RESULTS (100 * 1024)
 
+/**
+ * Collect an instane number of statistics?  May cause excessive IPC.
+ */
+#define INSANE_STATISTICS GNUNET_NO
+
+/**
+ * If obtaining a block via stream fails, how often do we retry it before
+ * giving up for good (and sticking to non-anonymous transfer)?
+ */
+#define STREAM_RETRY_MAX 3
+
+
 /**
  * An active request.
  */
@@ -96,6 +115,11 @@ struct GSF_PendingRequest
    */
   struct GNUNET_DHT_GetHandle *gh;
 
+  /**
+   * Stream request handle for this request (or NULL for none).
+   */
+  struct GSF_StreamRequest *stream_request;
+
   /**
    * Function to call upon completion of the local get
    * request, or NULL for none.
@@ -148,6 +172,12 @@ struct GSF_PendingRequest
    */
   uint64_t first_uid;
 
+  /**
+   * How often have we retried this request via 'stream'?
+   * (used to bound overall retries).
+   */
+  unsigned int stream_retry_count;
+
   /**
    * Number of valid entries in the 'replies_seen' array.
    */
@@ -190,12 +220,6 @@ static struct GNUNET_LOAD_Value *datastore_put_load;
 static int active_to_migration;
 
 
-/**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
 /**
  * Heap with the request that will expire next at the top.  Contains
  * pointers of type "struct PendingRequest*"; these will *also* be
@@ -263,15 +287,15 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
 struct GSF_PendingRequest *
 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
                              enum GNUNET_BLOCK_Type type,
-                             const struct GNUNET_HashCode * query,
-                             const struct GNUNET_HashCode * namespace,
+                             const struct GNUNET_HashCode *query,
+                             const struct GNUNET_HashCode *namespace,
                              const struct GNUNET_PeerIdentity *target,
                              const char *bf_data, size_t bf_size,
                              uint32_t mingle, uint32_t anonymity_level,
                              uint32_t priority, int32_t ttl,
                              GNUNET_PEER_Id sender_pid,
                              GNUNET_PEER_Id origin_pid,
-                             const struct GNUNET_HashCode * replies_seen,
+                             const struct GNUNET_HashCode *replies_seen,
                              unsigned int replies_seen_count,
                              GSF_PendingRequestReplyHandler rh, void *rh_cls)
 {
@@ -283,9 +307,11 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Creating request handle for `%s' of type %d\n",
               GNUNET_h2s (query), type);
+#if INSANE_STATISTICS
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop ("# Pending requests created"), 1,
                             GNUNET_NO);
+#endif
   extra = 0;
   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
     extra += sizeof (struct GNUNET_HashCode);
@@ -471,6 +497,10 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
       }
     }
   }
+  if (NULL != pr->gh) 
+    GNUNET_DHT_get_filter_known_results (pr->gh,
+                                        replies_seen_count,
+                                        replies_seen);
 }
 
 
@@ -619,6 +649,11 @@ clean_request (void *cls, const struct GNUNET_HashCode * key, void *value)
     GNUNET_DHT_get_stop (pr->gh);
     pr->gh = NULL;
   }
+  if (NULL != pr->stream_request)
+  {
+    GSF_stream_query_cancel (pr->stream_request);
+    pr->stream_request = NULL;
+  }
   if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
   {
     GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -671,6 +706,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
       GNUNET_DHT_get_stop (pr->gh);
       pr->gh = NULL;
     }
+    if (NULL != pr->stream_request)
+    {
+      GSF_stream_query_cancel (pr->stream_request);
+      pr->stream_request = NULL;
+    }
     if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
     {
       GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -812,20 +852,33 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
     GNUNET_LOAD_update (GSF_rt_entry_lifetime,
                         GNUNET_TIME_absolute_get_duration (pr->
                                                            public_data.start_time).rel_value);
-    if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+    if (GNUNET_YES !=
+       GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head, 
+                                                          prq->sender, 
+                                                          &last_transmission))
       last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
     /* pass on to other peers / local clients */
     pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
             last_transmission, prq->type, prq->data, prq->size);
     return GNUNET_YES;
   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+#if INSANE_STATISTICS
     GNUNET_STATISTICS_update (GSF_stats,
                               gettext_noop
                               ("# duplicate replies discarded (bloomfilter)"),
                               1, GNUNET_NO);
+#endif
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Duplicate response, discarding.\n");
     return GNUNET_YES;          /* duplicate */
+  case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop
+                              ("# irrelevant replies discarded"),
+                              1, GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Irrelevant response, ignoring.\n");
+    return GNUNET_YES;
   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
     return GNUNET_YES;          /* wrong namespace */
   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
@@ -861,9 +914,12 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
   pr->public_data.results_found++;
   prq->request_found = GNUNET_YES;
   /* finally, pass on to other peer / local client */
-  if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+  if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
+                                                          prq->sender, 
+                                                          &last_transmission))
     last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
-  pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
+  pr->rh (pr->rh_cls, prq->eval, pr, 
+         prq->anonymity_level, prq->expiration,
           last_transmission, prq->type, prq->data, prq->size);
   return GNUNET_YES;
 }
@@ -1100,10 +1156,105 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
   pr->gh =
       GNUNET_DHT_get_start (GSF_dht, 
                             pr->public_data.type, &pr->public_data.query,
-                            5 /* DEFAULT_GET_REPLICATION */ ,
+                            DHT_GET_REPLICATION,
                             GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
-                            /* FIXME: can no longer pass pr->bf/pr->mingle... */
                             xquery, xquery_size, &handle_dht_reply, pr);
+  if ( (NULL != pr->gh) && 
+       (0 != pr->replies_seen_count) )
+    GNUNET_DHT_get_filter_known_results (pr->gh,
+                                        pr->replies_seen_count,
+                                        pr->replies_seen);
+}
+
+
+/**
+ * Function called with a reply from the stream.
+ * 
+ * @param cls the pending request struct
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+static void
+stream_reply_proc (void *cls,
+                  enum GNUNET_BLOCK_Type type,
+                  struct GNUNET_TIME_Absolute expiration,
+                  size_t data_size,
+                  const void *data)
+{
+  struct GSF_PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+  struct GNUNET_HashCode query;
+
+  pr->stream_request = NULL;
+  if (GNUNET_BLOCK_TYPE_ANY == type)
+  {
+    GNUNET_break (NULL == data);
+    GNUNET_break (0 == data_size);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Error retrieiving block via stream\n");
+    pr->stream_retry_count++;
+    if (pr->stream_retry_count >= STREAM_RETRY_MAX)
+      return; /* give up on stream */
+    /* retry -- without delay, as this is non-anonymous
+       and mesh/stream connect will take some time anyway */
+    pr->stream_request = GSF_stream_query (pr->public_data.target,
+                                          &pr->public_data.query,
+                                          pr->public_data.type,
+                                          &stream_reply_proc,
+                                          pr);
+    return;
+  }
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                           type,
+                           data, data_size, &query))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Failed to derive key for block of type %d\n",
+               (int) type);
+    GNUNET_break_op (0);
+    return;
+  }
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# Replies received from STREAM"), 1,
+                            GNUNET_NO);
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = expiration;
+  /* do not allow migrated content to live longer than 1 year */
+  prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+                                            prq.expiration);
+  prq.size = data_size;
+  prq.type = type;
+  process_reply (&prq, &query, pr);
+}
+
+
+/**
+ * Consider downloading via stream (if possible)
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
+{
+  if (0 != pr->public_data.anonymity_level)
+    return;
+  if (0 == pr->public_data.target)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Cannot do stream-based download, target peer not known\n");
+    return;
+  }
+  if (NULL != pr->stream_request)
+    return;
+  pr->stream_request = GSF_stream_query (pr->public_data.target,
+                                        &pr->public_data.query,
+                                        pr->public_data.type,
+                                        &stream_reply_proc,
+                                        pr);
 }
 
 
@@ -1183,10 +1334,12 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
     pr->qe = NULL;
     if (NULL == key)
     {
+#if INSANE_STATISTICS
       GNUNET_STATISTICS_update (GSF_stats,
                                 gettext_noop
                                 ("# Datastore lookups concluded (no results)"),
                                 1, GNUNET_NO);
+#endif
     }
     if (GNUNET_NO == pr->have_first_uid)
     {
@@ -1218,12 +1371,14 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
                 "No further local responses available.\n");
+#if INSANE_STATISTICS
     if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
         (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK))
       GNUNET_STATISTICS_update (GSF_stats,
                                 gettext_noop
                                 ("# requested DBLOCK or IBLOCK not found"), 1,
                                 GNUNET_NO);
+#endif
     goto check_error_and_continue;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1272,7 +1427,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1312,7 +1467,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1370,7 +1525,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1427,6 +1582,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
                    GSF_LocalLookupContinuation cont, void *cont_cls)
 {
   GNUNET_assert (NULL == pr->gh);
+  GNUNET_assert (NULL == pr->stream_request);
   GNUNET_assert (NULL == pr->llc_cont);
   pr->llc_cont = cont;
   pr->llc_cont_cls = cont_cls;
@@ -1434,9 +1590,11 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
   pr->warn_task =
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
                                     pr);
+#if INSANE_STATISTICS
   GNUNET_STATISTICS_update (GSF_stats,
                             gettext_noop ("# Datastore lookups initiated"), 1,
                             GNUNET_NO);
+#endif
   pr->qe =
       GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
                                 &pr->public_data.query,
@@ -1450,7 +1608,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1602,8 +1760,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
 void
 GSF_pending_request_init_ ()
 {
-  unsigned long long bps;
-
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
                                              "MAX_PENDING_REQUESTS",
@@ -1612,19 +1768,6 @@ GSF_pending_request_init_ ()
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
                               "fs", "MAX_PENDING_REQUESTS");
   }
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "ats", "WAN_QUOTA_OUT",
-                                           &bps))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
-                              "ats", "WAN_QUOTA_OUT");
-    bps = 65536;
-  }
-  /* queue size should be #queries we can have pending and satisfy within
-   * a carry interval: */
-  datastore_queue_size =
-      bps * GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S / DBLOCK_SIZE;
-
   active_to_migration =
       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);