*/
#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * How many replicas do we try to create per PUT?
+ */
+#define DEFAULT_PUT_REPLICATION 5
+
/**
* Context for each zero-anonymity iterator.
*/
enum GNUNET_BLOCK_Type dht_put_type;
+ /**
+ * Handle to PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *dht_put;
+
/**
* ID of task that collects blocks for DHT PUTs.
*/
/**
- * Task that is run periodically to obtain blocks for DHT PUTs.
+ * Calculate when to run the next PUT operation and schedule it.
*
- * @param cls type of blocks to gather
- * @param tc scheduler context (unused)
+ * @param po put operator to schedule
*/
static void
-delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+schedule_next_put (struct PutOperator *po)
{
- struct PutOperator *po = cls;
struct GNUNET_TIME_Relative delay;
- po->dht_task = GNUNET_SCHEDULER_NO_TASK;
- if (tc != NULL && 0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
if (po->zero_anonymity_count_estimate > 0)
{
delay =
}
+/**
+ * Continuation called after DHT PUT operation has finished.
+ *
+ * @param cls type of blocks to gather
+ * @param success GNUNET_OK if the PUT was transmitted,
+ * GNUNET_NO on timeout,
+ * GNUNET_SYSERR on disconnect from service
+ * after the PUT message was transmitted
+ * (so we don't know if it was received or not)
+ */
+static void
+delay_dht_put_blocks (void *cls, int success)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_put = NULL;
+ schedule_next_put (po);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context
+ */
+static void
+delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PutOperator *po = cls;
+
+ po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+ schedule_next_put (po);
+}
+
+
/**
* Store content in DHT.
*
* maybe 0 if no unique identifier is available
*/
static void
-process_dht_put_content (void *cls, const GNUNET_HashCode * key, size_t size,
+process_dht_put_content (void *cls, const struct GNUNET_HashCode * key, size_t size,
const void *data, enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity,
struct GNUNET_TIME_Absolute expiration, uint64_t uid)
{
po->zero_anonymity_count_estimate = po->current_offset - 1;
po->current_offset = 0;
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
return;
}
po->zero_anonymity_count_estimate =
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
type);
- GNUNET_DHT_put (GSF_dht, key, 5 /* DEFAULT_PUT_REPLICATION */ ,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
- expiration, GNUNET_TIME_UNIT_FOREVER_REL,
- &delay_dht_put_blocks, po);
+ po->dht_put = GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
+ expiration, GNUNET_TIME_UNIT_FOREVER_REL,
+ &delay_dht_put_blocks, po);
}
po->dht_put_type,
&process_dht_put_content, po);
if (NULL == po->dht_qe)
- po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po);
+ po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
}
GNUNET_SCHEDULER_cancel (po->dht_task);
po->dht_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (NULL != po->dht_put)
+ {
+ GNUNET_DHT_put_cancel (po->dht_put);
+ po->dht_put = NULL;
+ }
if (NULL != po->dht_qe)
{
GNUNET_DATASTORE_cancel (po->dht_qe);