set continued flag on client side -- towards fixing 1786:
[oweals/gnunet.git] / src / fs / gnunet-service-fs_put.c
index 5fd2ce81c4a551e266aee9a54b5f10cfa271dc9d..07d32ef31bd6d9d540249c5680576fefb447a695 100644 (file)
 
 
 /**
- * Request to datastore for DHT PUTs (or NULL).
+ * Context for each zero-anonymity iterator.
  */
-static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+struct PutOperator
+{
 
-/**
- * Type we will request for the next DHT PUT round from the datastore.
- */
-static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+  /**
+   * Request to datastore for DHT PUTs (or NULL).
+   */
+  struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+  /**
+   * Type we request from the datastore.
+   */
+  enum GNUNET_BLOCK_Type dht_put_type;
+
+  /**
+   * ID of task that collects blocks for DHT PUTs.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
+  /**
+   * How many entires with zero anonymity of our type do we currently
+   * estimate to have in the database?
+   */
+  uint64_t zero_anonymity_count_estimate;
+
+  /**
+   * Current offset when iterating the database.
+   */
+  uint64_t current_offset;
+};
 
-/**
- * ID of task that collects blocks for DHT PUTs.
- */
-static GNUNET_SCHEDULER_TaskIdentifier dht_task;
 
 /**
- * How many entires with zero anonymity do we currently estimate
- * to have in the database?
+ * ANY-terminated list of our operators (one per type
+ * of block that we're putting into the DHT).
  */
-static unsigned int zero_anonymity_count_estimate;
+static struct PutOperator operators[] = {
+  {NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0},
+  {NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0},
+  {NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0},
+  {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
+};
 
 
 /**
  * Task that is run periodically to obtain blocks for DHT PUTs.
- * 
+ *
  * @param cls type of blocks to gather
  * @param tc scheduler context (unused)
  */
 static void
 gather_dht_put_blocks (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc);
-
+                       const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
- * If the DHT PUT gathering task is not currently running, consider
- * (re)scheduling it with the appropriate delay.
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
  */
 static void
-consider_dht_put_gathering (void *cls)
+delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct PutOperator *po = cls;
   struct GNUNET_TIME_Relative delay;
 
-  if (GSF_dsh == NULL)
+  po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if (dht_qe != NULL)
-    return;
-  if (dht_task != GNUNET_SCHEDULER_NO_TASK)
-    return;
-  if (zero_anonymity_count_estimate > 0)
-    {
-      delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
-                                          zero_anonymity_count_estimate);
-      delay = GNUNET_TIME_relative_min (delay,
-                                       MAX_DHT_PUT_FREQ);
-    }
+  if (po->zero_anonymity_count_estimate > 0)
+  {
+    delay =
+        GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+                                     po->zero_anonymity_count_estimate);
+    delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
+  }
   else
-    {
-      /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
-        (hopefully) appear */
-      delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
-    }
-  dht_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                          &gather_dht_put_blocks,
-                                          cls);
-}
-
-
-/**
- * Function called upon completion of the DHT PUT operation.
- */
-static void
-dht_put_continuation (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  GNUNET_DATASTORE_get_next (GSF_dsh);
+  {
+    /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+     * (hopefully) appear */
+    delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+  }
+  po->dht_task =
+      GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
 }
 
 
@@ -128,82 +139,57 @@ dht_put_continuation (void *cls,
  *        maybe 0 if no unique identifier is available
  */
 static void
-process_dht_put_content (void *cls,
-                        const GNUNET_HashCode * key,
-                        size_t size,
-                        const void *data,
-                        enum GNUNET_BLOCK_Type type,
-                        uint32_t priority,
-                        uint32_t anonymity,
-                        struct GNUNET_TIME_Absolute
-                        expiration, uint64_t uid)
-{ 
-  static unsigned int counter;
-  static GNUNET_HashCode last_vhash;
-  static GNUNET_HashCode vhash;
+process_dht_put_content (void *cls, const GNUNET_HashCode * key, size_t size,
+                         const void *data, enum GNUNET_BLOCK_Type type,
+                         uint32_t priority, uint32_t anonymity,
+                         struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+{
+  struct PutOperator *po = cls;
 
+  po->dht_qe = NULL;
   if (key == NULL)
-    {
-      dht_qe = NULL;
-      consider_dht_put_gathering (cls);
-      return;
-    }
-  /* slightly funky code to estimate the total number of values with zero
-     anonymity from the maximum observed length of a monotonically increasing 
-     sequence of hashes over the contents */
-  GNUNET_CRYPTO_hash (data, size, &vhash);
-  if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
-    {
-      if (zero_anonymity_count_estimate > 0)
-       zero_anonymity_count_estimate /= 2;
-      counter = 0;
-    }
-  last_vhash = vhash;
-  if (counter < 31)
-    counter++;
-  if (zero_anonymity_count_estimate < (1 << counter))
-    zero_anonymity_count_estimate = (1 << counter);
+  {
+    po->zero_anonymity_count_estimate = po->current_offset - 1;
+    po->current_offset = 0;
+    po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
+    return;
+  }
+  po->zero_anonymity_count_estimate =
+      GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Retrieved block `%s' of type %u for DHT PUT\n",
-             GNUNET_h2s (key),
-             type);
+              "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
+              type);
 #endif
-  GNUNET_DHT_put (GSF_dht,
-                 key,
-                 DEFAULT_PUT_REPLICATION,
-                 GNUNET_DHT_RO_NONE,
-                 type,
-                 size,
-                 data,
-                 expiration,
-                 GNUNET_TIME_UNIT_FOREVER_REL,
-                 &dht_put_continuation,
-                 cls);
+  GNUNET_DHT_put (GSF_dht, key, 5 /* DEFAULT_PUT_REPLICATION */ ,
+                  GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
+                  expiration, GNUNET_TIME_UNIT_FOREVER_REL,
+                  &delay_dht_put_blocks, po);
 }
 
 
 /**
  * Task that is run periodically to obtain blocks for DHT PUTs.
- * 
+ *
  * @param cls type of blocks to gather
  * @param tc scheduler context (unused)
  */
 static void
-gather_dht_put_blocks (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  dht_task = GNUNET_SCHEDULER_NO_TASK;
-  if (GSF_dsh == NULL)
+  struct PutOperator *po = cls;
+
+  po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
-    dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
-  dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, 
-                                               0, UINT_MAX,
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               dht_put_type++,
-                                               &process_dht_put_content, NULL);
-  GNUNET_assert (dht_qe != NULL);
+  po->dht_qe =
+      GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
+                                           UINT_MAX,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           po->dht_put_type,
+                                           &process_dht_put_content, po);
+  if (NULL == po->dht_qe)
+    po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
 }
 
 
@@ -213,7 +199,15 @@ gather_dht_put_blocks (void *cls,
 void
 GSF_put_init_ ()
 {
-  dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL);
+  unsigned int i;
+
+  i = 0;
+  while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+  {
+    operators[i].dht_task =
+        GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
+    i++;
+  }
 }
 
 
@@ -223,16 +217,24 @@ GSF_put_init_ ()
 void
 GSF_put_done_ ()
 {
-  if (GNUNET_SCHEDULER_NO_TASK != dht_task)
+  struct PutOperator *po;
+  unsigned int i;
+
+  i = 0;
+  while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+  {
+    if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
     {
-      GNUNET_SCHEDULER_cancel (dht_task);
-      dht_task = GNUNET_SCHEDULER_NO_TASK;
+      GNUNET_SCHEDULER_cancel (po->dht_task);
+      po->dht_task = GNUNET_SCHEDULER_NO_TASK;
     }
-  if (NULL != dht_qe)
+    if (NULL != po->dht_qe)
     {
-      GNUNET_DATASTORE_cancel (dht_qe);
-      dht_qe = NULL;
+      GNUNET_DATASTORE_cancel (po->dht_qe);
+      po->dht_qe = NULL;
     }
+    i++;
+  }
 }
 
 /* end of gnunet-service-fs_put.c */