/*
This file is part of GNUnet.
- (C) 2011 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2011 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
*/
#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * How many replicas do we try to create per PUT?
+ */
+#define DEFAULT_PUT_REPLICATION 5
+
/**
* Context for each zero-anonymity iterator.
*/
enum GNUNET_BLOCK_Type dht_put_type;
+ /**
+ * Handle to PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *dht_put;
+
/**
* ID of task that collects blocks for DHT PUTs.
*/
- GNUNET_SCHEDULER_TaskIdentifier dht_task;
+ struct GNUNET_SCHEDULER_Task * dht_task;
/**
* How many entires with zero anonymity of our type do we currently
uint64_t zero_anonymity_count_estimate;
/**
- * Current offset when iterating the database.
+ * Count of results received from the database.
*/
- uint64_t current_offset;
+ uint64_t result_count;
+
+ /**
+ * Next UID to request when iterating the database.
+ */
+ uint64_t next_uid;
};
* of block that we're putting into the DHT).
*/
static struct PutOperator operators[] = {
- {NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0},
- {NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0},
- {NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0},
+ {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0},
{NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
};
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
* @param tc scheduler context (unused)
*/
-static void gather_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext
- *tc);
+static void
+gather_dht_put_blocks (void *cls);
/**
- * Task that is run periodically to obtain blocks for DHT PUTs.
- *
- * @param cls type of blocks to gather
- * @param tc scheduler context (unused)
+ * Calculate when to run the next PUT operation and schedule it.
+ *
+ * @param po put operator to schedule
*/
static void
-delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+schedule_next_put (struct PutOperator *po)
{
- struct PutOperator *po = cls;
struct GNUNET_TIME_Relative delay;
- po->dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
if (po->zero_anonymity_count_estimate > 0)
{
delay =
}
+/**
+ * Continuation called after DHT PUT operation has finished.
+ *
+ * @param cls type of blocks to gather
+ * @param success GNUNET_OK if the PUT was transmitted,
+ * GNUNET_NO on timeout,
+ * GNUNET_SYSERR on disconnect from service
+ * after the PUT message was transmitted
+ * (so we don't know if it was received or not)
+ */
+static void
+delay_dht_put_blocks (void *cls, int success)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_put = NULL;
+ schedule_next_put (po);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ */
+static void
+delay_dht_put_task (void *cls)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_task = NULL;
+ schedule_next_put (po);
+}
+
+
/**
* Store content in DHT.
*
* @param type type of the content
* @param priority priority of the content
* @param anonymity anonymity-level for the content
+ * @param replication replication-level for the content
* @param expiration expiration time for the content
* @param uid unique identifier for the datum;
* maybe 0 if no unique identifier is available
*/
static void
-process_dht_put_content (void *cls, const GNUNET_HashCode * key, size_t size,
- const void *data, enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+process_dht_put_content (void *cls,
+ const struct GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ uint32_t replication,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
{
struct PutOperator *po = cls;
po->dht_qe = NULL;
if (key == NULL)
{
- po->zero_anonymity_count_estimate = po->current_offset - 1;
- po->current_offset = 0;
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
+ po->zero_anonymity_count_estimate = po->result_count;
+ po->result_count = 0;
+ po->next_uid = 0;
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
return;
}
+ po->result_count++;
+ po->next_uid = uid + 1;
po->zero_anonymity_count_estimate =
- GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
-#if DEBUG_FS
+ GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
type);
-#endif
- GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION, GNUNET_DHT_RO_NONE,
- type, size, data, expiration, GNUNET_TIME_UNIT_FOREVER_REL,
- &delay_dht_put_blocks, po);
+ po->dht_put = GNUNET_DHT_put (GSF_dht,
+ key,
+ DEFAULT_PUT_REPLICATION,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ type,
+ size,
+ data,
+ expiration,
+ &delay_dht_put_blocks,
+ po);
}
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
- * @param tc scheduler context (unused)
*/
static void
-gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+gather_dht_put_blocks (void *cls)
{
struct PutOperator *po = cls;
- po->dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
+ po->dht_task = NULL;
po->dht_qe =
- GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
+ GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
+ po->next_uid,
+ 0,
UINT_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
po->dht_put_type,
- &process_dht_put_content, po);
+ &process_dht_put_content,
+ po);
if (NULL == po->dht_qe)
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
}
i = 0;
while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
{
- if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
+ if (NULL != po->dht_task)
{
GNUNET_SCHEDULER_cancel (po->dht_task);
- po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ po->dht_task = NULL;
+ }
+ if (NULL != po->dht_put)
+ {
+ GNUNET_DHT_put_cancel (po->dht_put);
+ po->dht_put = NULL;
}
if (NULL != po->dht_qe)
{