*/
#define NS_BLOCK_SIZE 100
+/**
+ * How many pending DHT operations do we allow at most?
+ */
+#define DHT_QUEUE_LIMIT 1000
+
/**
* The initial interval in milliseconds btween puts in
* a zone iteration
* Handle for the DHT PUT operation.
*/
struct GNUNET_DHT_PutHandle *ph;
+
+ /**
+ * When was this PUT initiated?
+ */
+ struct GNUNET_TIME_Absolute start_date;
};
*/
static struct DhtPutActivity *it_tail;
+/**
+ * Number of entries in the DHT queue.
+ */
+static unsigned int dht_queue_length;
+
/**
* Useful for zone update for DHT put
*/
ma);
GNUNET_free (ma);
}
+ while (NULL != (ma = it_head))
+ {
+ GNUNET_DHT_put_cancel (ma->ph);
+ GNUNET_CONTAINER_DLL_remove (it_head,
+ it_tail,
+ ma);
+ dht_queue_length--;
+ GNUNET_free (ma);
+ }
if (NULL != statistics)
{
GNUNET_STATISTICS_destroy (statistics,
if (0 != ns_iteration_left)
return; /* current NAMESTORE iteration not yet done */
- if (NULL != it_head)
- return; /* waiting on DHT */
delay = GNUNET_TIME_relative_subtract (next_put_interval,
sub_delta);
/* We delay *once* per #NS_BLOCK_SIZE, so we need to multiply the
sub_delta = next_put_interval;
}
}
+ GNUNET_STATISTICS_set (statistics,
+ "# size of the DHT queue",
+ dht_queue_length,
+ GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"% speed increase needed for target velocity",
pct,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PUT complete (%s)\n",
(GNUNET_OK == success) ? "success" : "failure");
+ dht_queue_length--;
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
ma);
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity ();
}
- check_zone_dht_next ();
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting DHT PUT\n");
ma = GNUNET_new (struct DhtPutActivity);
+ ma->start_date = GNUNET_TIME_absolute_get ();
ma->ph = perform_dht_put (key,
label,
rd_public,
check_zone_dht_next ();
return;
}
- GNUNET_CONTAINER_DLL_insert (it_head,
- it_tail,
- ma);
+ dht_queue_length++;
+ GNUNET_CONTAINER_DLL_insert_tail (it_head,
+ it_tail,
+ ma);
+ if (dht_queue_length > DHT_QUEUE_LIMIT)
+ {
+ ma = it_head;
+ GNUNET_CONTAINER_DLL_remove (it_head,
+ it_tail,
+ ma);
+ GNUNET_DHT_put_cancel (ma->ph);
+ dht_queue_length--;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "DHT PUT unconfirmed after %s, aborting PUT\n",
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (ma->start_date),
+ GNUNET_YES));
+ GNUNET_free (ma);
+ }
}
NULL,
&zone_iteration_finished,
NULL);
+ GNUNET_assert (NULL != namestore_iter);
}
if (0 == rd_public_count)
return; /* nothing to do */
ma = GNUNET_new (struct DhtPutActivity);
+ ma->start_date = GNUNET_TIME_absolute_get ();
ma->ph = perform_dht_put (zone,
label,
rd,