*/
#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * How many replicas do we try to create per PUT?
+ */
+#define DEFAULT_PUT_REPLICATION 5
+
/**
* Context for each zero-anonymity iterator.
*/
enum GNUNET_BLOCK_Type dht_put_type;
+ /**
+ * Handle to PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *dht_put;
+
/**
* ID of task that collects blocks for DHT PUTs.
*/
GNUNET_SCHEDULER_TaskIdentifier dht_task;
-
+
/**
* How many entires with zero anonymity of our type do we currently
* estimate to have in the database?
* ANY-terminated list of our operators (one per type
* of block that we're putting into the DHT).
*/
-static struct PutOperator operators[] =
- {
- { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 },
- { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 },
- { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 },
- { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
- };
+static struct PutOperator operators[] = {
+ {NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0},
+ {NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0},
+ {NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0},
+ {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
+};
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
* @param tc scheduler context (unused)
*/
static void
gather_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Calculate when to run the next PUT operation and schedule it.
+ *
+ * @param po put operator to schedule
+ */
+static void
+schedule_next_put (struct PutOperator *po)
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (po->zero_anonymity_count_estimate > 0)
+ {
+ delay =
+ GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+ po->zero_anonymity_count_estimate);
+ delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
+ }
+ else
+ {
+ /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+ * (hopefully) appear */
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+ }
+ po->dht_task =
+ GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
+}
+
+
+/**
+ * Continuation called after DHT PUT operation has finished.
+ *
+ * @param cls type of blocks to gather
+ * @param success GNUNET_OK if the PUT was transmitted,
+ * GNUNET_NO on timeout,
+ * GNUNET_SYSERR on disconnect from service
+ * after the PUT message was transmitted
+ * (so we don't know if it was received or not)
+ */
+static void
+delay_dht_put_blocks (void *cls, int success)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_put = NULL;
+ schedule_next_put (po);
+}
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
- * @param tc scheduler context (unused)
+ * @param tc scheduler context
*/
static void
-delay_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PutOperator *po = cls;
- struct GNUNET_TIME_Relative delay;
po->dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
- if (po->zero_anonymity_count_estimate > 0)
- {
- delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
- po->zero_anonymity_count_estimate);
- delay = GNUNET_TIME_relative_min (delay,
- MAX_DHT_PUT_FREQ);
- }
- else
- {
- /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
- (hopefully) appear */
- delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
- }
- po->dht_task = GNUNET_SCHEDULER_add_delayed (delay,
- &gather_dht_put_blocks,
- po);
+ schedule_next_put (po);
}
* maybe 0 if no unique identifier is available
*/
static void
-process_dht_put_content (void *cls,
- const GNUNET_HashCode * key,
- size_t size,
- const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute
- expiration, uint64_t uid)
-{
+process_dht_put_content (void *cls, const struct GNUNET_HashCode * key, size_t size,
+ const void *data, enum GNUNET_BLOCK_Type type,
+ uint32_t priority, uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration, uint64_t uid)
+{
struct PutOperator *po = cls;
po->dht_qe = NULL;
if (key == NULL)
- {
- po->zero_anonymity_count_estimate = po->current_offset - 1;
- po->current_offset = 0;
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
- po);
- return;
- }
- po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset,
- po->zero_anonymity_count_estimate);
-#if DEBUG_FS
+ {
+ po->zero_anonymity_count_estimate = po->current_offset - 1;
+ po->current_offset = 0;
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
+ return;
+ }
+ po->zero_anonymity_count_estimate =
+ GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Retrieved block `%s' of type %u for DHT PUT\n",
- GNUNET_h2s (key),
- type);
-#endif
- GNUNET_DHT_put (GSF_dht,
- key,
- DEFAULT_PUT_REPLICATION,
- GNUNET_DHT_RO_NONE,
- type,
- size,
- data,
- expiration,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &delay_dht_put_blocks,
- po);
+ "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
+ type);
+ po->dht_put = GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
+ expiration, GNUNET_TIME_UNIT_FOREVER_REL,
+ &delay_dht_put_blocks, po);
}
/**
* Task that is run periodically to obtain blocks for DHT PUTs.
- *
+ *
* @param cls type of blocks to gather
* @param tc scheduler context (unused)
*/
static void
-gather_dht_put_blocks (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PutOperator *po = cls;
po->dht_task = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
- po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
- po->current_offset++,
- 0, UINT_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- po->dht_put_type,
- &process_dht_put_content, po);
+ po->dht_qe =
+ GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
+ UINT_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ po->dht_put_type,
+ &process_dht_put_content, po);
if (NULL == po->dht_qe)
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
- po);
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
}
i = 0;
while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
- {
- operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
- i++;
- }
+ {
+ operators[i].dht_task =
+ GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
+ i++;
+ }
}
i = 0;
while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+ {
+ if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
+ {
+ GNUNET_SCHEDULER_cancel (po->dht_task);
+ po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != po->dht_put)
+ {
+ GNUNET_DHT_put_cancel (po->dht_put);
+ po->dht_put = NULL;
+ }
+ if (NULL != po->dht_qe)
{
- if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
- {
- GNUNET_SCHEDULER_cancel (po->dht_task);
- po->dht_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != po->dht_qe)
- {
- GNUNET_DATASTORE_cancel (po->dht_qe);
- po->dht_qe = NULL;
- }
- i++;
+ GNUNET_DATASTORE_cancel (po->dht_qe);
+ po->dht_qe = NULL;
}
+ i++;
+ }
}
/* end of gnunet-service-fs_put.c */