/*
This file is part of GNUnet.
- (C) 2011 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2011 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
*/
#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
-
/**
- * Request to datastore for DHT PUTs (or NULL).
+ * How many replicas do we try to create per PUT?
*/
-static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+#define DEFAULT_PUT_REPLICATION 5
-/**
- * Type we will request for the next DHT PUT round from the datastore.
- */
-static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
/**
- * ID of task that collects blocks for DHT PUTs.
+ * Context for each zero-anonymity iterator.
*/
-static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+struct PutOperator
+{
+
+ /**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+ struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+ /**
+ * Type we request from the datastore.
+ */
+ enum GNUNET_BLOCK_Type dht_put_type;
+
+ /**
+ * Handle to PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *dht_put;
+
+ /**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+ struct GNUNET_SCHEDULER_Task * dht_task;
+
+ /**
+ * How many entires with zero anonymity of our type do we currently
+ * estimate to have in the database?
+ */
+ uint64_t zero_anonymity_count_estimate;
+
+ /**
+ * Current offset when iterating the database.
+ */
+ uint64_t current_offset;
+};
+
/**
- * How many entires with zero anonymity do we currently estimate
- * to have in the database?
+ * ANY-terminated list of our operators (one per type
+ * of block that we're putting into the DHT).
*/
-static unsigned int zero_anonymity_count_estimate;
+static struct PutOperator operators[] = {
+ {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0},
+ {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
+};
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
* @param tc scheduler context (unused)
*/
static void
-gather_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
+gather_dht_put_blocks (void *cls);
/**
- * If the DHT PUT gathering task is not currently running, consider
- * (re)scheduling it with the appropriate delay.
+ * Calculate when to run the next PUT operation and schedule it.
+ *
+ * @param po put operator to schedule
*/
static void
-consider_dht_put_gathering (void *cls)
+schedule_next_put (struct PutOperator *po)
{
struct GNUNET_TIME_Relative delay;
- if (GSF_dsh == NULL)
- return;
- if (dht_qe != NULL)
- return;
- if (dht_task != GNUNET_SCHEDULER_NO_TASK)
- return;
- if (zero_anonymity_count_estimate > 0)
- {
- delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
- zero_anonymity_count_estimate);
- delay = GNUNET_TIME_relative_min (delay,
- MAX_DHT_PUT_FREQ);
- }
+ if (po->zero_anonymity_count_estimate > 0)
+ {
+ delay =
+ GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+ po->zero_anonymity_count_estimate);
+ delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
+ }
else
- {
- /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
- (hopefully) appear */
- delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
- }
- dht_task = GNUNET_SCHEDULER_add_delayed (delay,
- &gather_dht_put_blocks,
- cls);
+ {
+ /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+ * (hopefully) appear */
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+ }
+ po->dht_task =
+ GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
}
/**
- * Function called upon completion of the DHT PUT operation.
+ * Continuation called after DHT PUT operation has finished.
+ *
+ * @param cls type of blocks to gather
+ * @param success GNUNET_OK if the PUT was transmitted,
+ * GNUNET_NO on timeout,
+ * GNUNET_SYSERR on disconnect from service
+ * after the PUT message was transmitted
+ * (so we don't know if it was received or not)
*/
static void
-dht_put_continuation (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+delay_dht_put_blocks (void *cls, int success)
{
- GNUNET_DATASTORE_get_next (GSF_dsh);
+ struct PutOperator *po = cls;
+
+ po->dht_put = NULL;
+ schedule_next_put (po);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ */
+static void
+delay_dht_put_task (void *cls)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_task = NULL;
+ schedule_next_put (po);
}
*/
static void
process_dht_put_content (void *cls,
- const GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * key,
size_t size,
- const void *data,
+ const void *data,
enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid)
-{
- static unsigned int counter;
- static GNUNET_HashCode last_vhash;
- static GNUNET_HashCode vhash;
+ uint32_t priority, uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+{
+ struct PutOperator *po = cls;
+ po->dht_qe = NULL;
if (key == NULL)
- {
- dht_qe = NULL;
- consider_dht_put_gathering (cls);
- return;
- }
- /* slightly funky code to estimate the total number of values with zero
- anonymity from the maximum observed length of a monotonically increasing
- sequence of hashes over the contents */
- GNUNET_CRYPTO_hash (data, size, &vhash);
- if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
- {
- if (zero_anonymity_count_estimate > 0)
- zero_anonymity_count_estimate /= 2;
- counter = 0;
- }
- last_vhash = vhash;
- if (counter < 31)
- counter++;
- if (zero_anonymity_count_estimate < (1 << counter))
- zero_anonymity_count_estimate = (1 << counter);
-#if DEBUG_FS
+ {
+ po->zero_anonymity_count_estimate = po->current_offset - 1;
+ po->current_offset = 0;
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
+ return;
+ }
+ po->zero_anonymity_count_estimate =
+ GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Retrieved block `%s' of type %u for DHT PUT\n",
- GNUNET_h2s (key),
- type);
-#endif
- GNUNET_DHT_put (GSF_dht,
- key,
- DEFAULT_PUT_REPLICATION,
- GNUNET_DHT_RO_NONE,
- type,
- size,
- data,
- expiration,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &dht_put_continuation,
- cls);
+ "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
+ type);
+ po->dht_put = GNUNET_DHT_put (GSF_dht,
+ key,
+ DEFAULT_PUT_REPLICATION,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ type,
+ size,
+ data,
+ expiration,
+ &delay_dht_put_blocks, po);
}
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
- * @param tc scheduler context (unused)
*/
static void
-gather_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+gather_dht_put_blocks (void *cls)
{
- dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (GSF_dsh == NULL)
- return;
- if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
- dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
- dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
- 0, UINT_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- dht_put_type++,
- &process_dht_put_content, NULL);
- GNUNET_assert (dht_qe != NULL);
+ struct PutOperator *po = cls;
+
+ po->dht_task = NULL;
+ po->dht_qe =
+ GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
+ UINT_MAX,
+ po->dht_put_type,
+ &process_dht_put_content, po);
+ if (NULL == po->dht_qe)
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
}
void
GSF_put_init_ ()
{
- dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL);
+ unsigned int i;
+
+ i = 0;
+ while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+ {
+ operators[i].dht_task =
+ GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
+ i++;
+ }
}
void
GSF_put_done_ ()
{
- if (GNUNET_SCHEDULER_NO_TASK != dht_task)
+ struct PutOperator *po;
+ unsigned int i;
+
+ i = 0;
+ while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+ {
+ if (NULL != po->dht_task)
+ {
+ GNUNET_SCHEDULER_cancel (po->dht_task);
+ po->dht_task = NULL;
+ }
+ if (NULL != po->dht_put)
{
- GNUNET_SCHEDULER_cancel (dht_task);
- dht_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_DHT_put_cancel (po->dht_put);
+ po->dht_put = NULL;
}
- if (NULL != dht_qe)
+ if (NULL != po->dht_qe)
{
- GNUNET_DATASTORE_cancel (dht_qe);
- dht_qe = NULL;
+ GNUNET_DATASTORE_cancel (po->dht_qe);
+ po->dht_qe = NULL;
}
+ i++;
+ }
}
/* end of gnunet-service-fs_put.c */