};
+/**
+ * Handle for DHT PUT activity triggered from the namestore monitor.
+ */
+struct MonitorActivity
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct MonitorActivity *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct MonitorActivity *prev;
+
+ /**
+ * Handle for the DHT PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *ph;
+};
+
+
/**
* Our handle to the DHT
*/
*/
static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter;
+/**
+ * Handle to monitor namestore changes to instant propagation.
+ */
+static struct GNUNET_NAMESTORE_ZoneMonitor *zmon;
+
/**
* Our notification context.
*/
*/
static struct ClientLookupHandle *clh_tail;
+/**
+ * Head of monitor activities; kept in a DLL.
+ */
+static struct MonitorActivity *ma_head;
+
+/**
+ * Tail of monitor activities; kept in a DLL.
+ */
+static struct MonitorActivity *ma_tail;
+
/**
* Useful for zone update for DHT put
*/
*/
static int v4_enabled;
+/**
+ * Did we finish the initial iteration over the namestore?
+ * (while we do the initial iteration, we do not generate
+ * DHT PUTs as there might be WAY too many of those).
+ * TODO: expand namestore monitor API with a way to
+ * suppress this initial iteration.
+ */
+static int sync_finished;
+
/**
* Handle to the statistics service
*/
* @param tc unused
*/
static void
-shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct ClientLookupHandle *clh;
+ struct MonitorActivity *ma;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Shutting down!\n");
GNS_interceptor_done ();
GNS_resolver_done ();
GNS_shorten_done ();
+ while (NULL != (ma = ma_head))
+ {
+ GNUNET_DHT_put_cancel (ma->ph);
+ GNUNET_CONTAINER_DLL_remove (ma_head,
+ ma_tail,
+ ma);
+ GNUNET_free (ma);
+ }
if (NULL != statistics)
{
GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
GNUNET_NAMESTORE_zone_iteration_stop (namestore_iter);
namestore_iter = NULL;
}
+ if (NULL != zmon)
+ {
+ GNUNET_NAMESTORE_zone_monitor_stop (zmon);
+ zmon = NULL;
+ }
if (NULL != namestore_handle)
{
GNUNET_NAMESTORE_disconnect (namestore_handle);
/**
* Continuation called from DHT once the PUT operation is done.
*
- * @param cls closure, NULL
+ * @param cls closure, NULL if called from regular iteration,
+ * `struct MonitorActivity` if called from #handle_monitor_event.
* @param success #GNUNET_OK on success
*/
static void
dht_put_continuation (void *cls,
int success)
{
+ struct MonitorActivity *ma = cls;
struct GNUNET_TIME_Relative next_put_interval;
- active_put = NULL;
num_public_records++;
- if ( (num_public_records > last_num_public_records) &&
- (GNUNET_NO == first_zone_iteration) )
+ if (NULL == ma)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Last record count was lower than current record count. Reducing interval.\n");
- put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
- num_public_records);
- next_put_interval = GNUNET_TIME_relative_divide (put_interval,
- LATE_ITERATION_SPEEDUP_FACTOR);
+ active_put = NULL;
+ if ( (num_public_records > last_num_public_records) &&
+ (GNUNET_NO == first_zone_iteration) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Last record count was lower than current record count. Reducing interval.\n");
+ put_interval = GNUNET_TIME_relative_divide (zone_publish_time_window,
+ num_public_records);
+ next_put_interval = GNUNET_TIME_relative_divide (put_interval,
+ LATE_ITERATION_SPEEDUP_FACTOR);
+ }
+ else
+ next_put_interval = put_interval;
+
+ GNUNET_STATISTICS_set (statistics,
+ "Current zone iteration interval (ms)",
+ next_put_interval.rel_value_us / 1000LL,
+ GNUNET_NO);
+ zone_publish_task = GNUNET_SCHEDULER_add_delayed (next_put_interval,
+ &publish_zone_dht_next,
+ NULL);
}
else
- next_put_interval = put_interval;
-
- GNUNET_STATISTICS_set (statistics,
- "Current zone iteration interval (ms)",
- next_put_interval.rel_value_us / 1000LL,
- GNUNET_NO);
- zone_publish_task = GNUNET_SCHEDULER_add_delayed (next_put_interval,
- &publish_zone_dht_next,
- NULL);
+ {
+ GNUNET_CONTAINER_DLL_remove (ma_head,
+ ma_tail,
+ ma);
+ GNUNET_free (ma);
+ }
+}
+
+
+/**
+ * Convert namestore records from the internal format to that
+ * suitable for publication (removes private records, converts
+ * to absolute expiration time).
+ *
+ * @param rd input records
+ * @param rd_count size of the @a rd and @a rd_public arrays
+ * @param rd_public where to write the converted records
+ * @return number of records written to @a rd_public
+ */
+static unsigned int
+convert_records_for_export (const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_GNSRECORD_Data *rd_public)
+{
+ struct GNUNET_TIME_Absolute now;
+ unsigned int rd_public_count;
+ unsigned int i;
+
+ rd_public_count = 0;
+ now = GNUNET_TIME_absolute_get ();
+ for (i=0;i<rd_count;i++)
+ if (0 == (rd[i].flags & (GNUNET_GNSRECORD_RF_PRIVATE |
+ GNUNET_GNSRECORD_RF_PENDING)))
+ {
+ rd_public[rd_public_count] = rd[i];
+ if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
+ {
+ /* GNUNET_GNSRECORD_block_create will convert to absolute time;
+ we just need to adjust our iteration frequency */
+ min_relative_record_time.rel_value_us =
+ GNUNET_MIN (rd_public[rd_public_count].expiration_time,
+ min_relative_record_time.rel_value_us);
+ }
+ else if (rd_public[rd_public_count].expiration_time < now.abs_value_us)
+ {
+ /* record already expired, skip it */
+ continue;
+ }
+ rd_public_count++;
+ }
+ return rd_public_count;
+}
+
+
+/**
+ * Store GNS records in the DHT.
+ *
+ * @param key key of the zone
+ * @param label label to store under
+ * @param rd_public public record data
+ * @param rd_public_count number of records in @a rd_public
+ * @param pc_arg closure argument to pass to the #dht_put_continuation
+ * @return DHT PUT handle, NULL on error
+ */
+static struct GNUNET_DHT_PutHandle *
+perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd_public,
+ unsigned int rd_public_count,
+ void *pc_arg)
+{
+ struct GNUNET_GNSRECORD_Block *block;
+ struct GNUNET_HashCode query;
+ struct GNUNET_TIME_Absolute expire;
+ size_t block_size;
+ struct GNUNET_DHT_PutHandle *ret;
+
+ expire = GNUNET_GNSRECORD_record_get_expiration_time (rd_public_count,
+ rd_public);
+ block = GNUNET_GNSRECORD_block_create (key,
+ expire,
+ label,
+ rd_public,
+ rd_public_count);
+ block_size = ntohl (block->purpose.size)
+ + sizeof (struct GNUNET_CRYPTO_EcdsaSignature)
+ + sizeof (struct GNUNET_CRYPTO_EcdsaPublicKey);
+ GNUNET_GNSRECORD_query_from_private_key (key,
+ label,
+ &query);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing record in DHT with expiration `%s'\n",
+ GNUNET_STRINGS_absolute_time_to_string (expire));
+ ret = GNUNET_DHT_put (dht_handle, &query,
+ DHT_GNS_REPLICATION_LEVEL,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
+ block_size,
+ block,
+ expire,
+ DHT_OPERATION_TIMEOUT,
+ &dht_put_continuation,
+ pc_arg);
+ GNUNET_free (block);
+ return ret;
}
unsigned int rd_count,
const struct GNUNET_GNSRECORD_Data *rd)
{
- struct GNUNET_GNSRECORD_Block *block;
- struct GNUNET_HashCode query;
- struct GNUNET_TIME_Absolute expire;
- struct GNUNET_TIME_Absolute now;
- size_t block_size;
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
unsigned int rd_public_count;
- unsigned int i;
if (NULL == name)
{
return;
}
- /* filter out records that are not public, and convert to
- absolute expiration time. */
- rd_public_count = 0;
- now = GNUNET_TIME_absolute_get ();
- for (i=0;i<rd_count;i++)
- if (0 == (rd[i].flags & (GNUNET_GNSRECORD_RF_PRIVATE |
- GNUNET_GNSRECORD_RF_PENDING)))
- {
- rd_public[rd_public_count] = rd[i];
- if (0 != (rd[i].flags & GNUNET_GNSRECORD_RF_RELATIVE_EXPIRATION))
- {
- /* GNUNET_GNSRECORD_block_create will convert to absolute time;
- we just need to adjust our iteration frequency */
- min_relative_record_time.rel_value_us =
- GNUNET_MIN (rd_public[rd_public_count].expiration_time,
- min_relative_record_time.rel_value_us);
- }
- else if (rd_public[rd_public_count].expiration_time < now.abs_value_us)
- {
- /* record already expired, skip it */
- continue;
- }
- rd_public_count++;
- }
+ rd_public_count = convert_records_for_export (rd,
+ rd_count,
+ rd_public);
/* We got a set of records to publish */
if (0 == rd_public_count)
NULL);
return;
}
- expire = GNUNET_GNSRECORD_record_get_expiration_time (rd_public_count,
- rd_public);
- block = GNUNET_GNSRECORD_block_create (key,
- expire,
- name,
- rd_public,
- rd_public_count);
- block_size = ntohl (block->purpose.size)
- + sizeof (struct GNUNET_CRYPTO_EcdsaSignature)
- + sizeof (struct GNUNET_CRYPTO_EcdsaPublicKey);
- GNUNET_GNSRECORD_query_from_private_key (key,
- name,
- &query);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing record in DHT with expiration `%s'\n",
- GNUNET_STRINGS_absolute_time_to_string (expire));
- active_put = GNUNET_DHT_put (dht_handle, &query,
- DHT_GNS_REPLICATION_LEVEL,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
- GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
- block_size,
- block,
- expire,
- DHT_OPERATION_TIMEOUT,
- &dht_put_continuation,
- NULL);
+
+ active_put = perform_dht_put (key,
+ name,
+ rd_public,
+ rd_public_count,
+ NULL);
if (NULL == active_put)
{
GNUNET_break (0);
dht_put_continuation (NULL, GNUNET_NO);
}
- GNUNET_free (block);
}
}
+/**
+ * Process a record that was stored in the namestore
+ * (invoked by the monitor).
+ *
+ * @param cls closure, NULL
+ * @param zone private key of the zone; NULL on disconnect
+ * @param label label of the records; NULL on disconnect
+ * @param rd_count number of entries in @a rd array, 0 if label was deleted
+ * @param rd array of records with data to store
+ */
+static void
+handle_monitor_event (void *cls,
+ const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone,
+ const char *label,
+ unsigned int rd_count,
+ const struct GNUNET_GNSRECORD_Data *rd)
+{
+ struct GNUNET_GNSRECORD_Data rd_public[rd_count];
+ unsigned int rd_public_count;
+ struct MonitorActivity *ma;
+
+ if (GNUNET_YES != sync_finished)
+ return; /* do not do DHT PUTs on initial sync, as that may
+ create far too many PUTs on startup */
+ /* filter out records that are not public, and convert to
+ absolute expiration time. */
+ rd_public_count = convert_records_for_export (rd, rd_count,
+ rd_public);
+ if (0 == rd_public_count)
+ return; /* nothing to do */
+ ma = GNUNET_new (struct MonitorActivity);
+ ma->ph = perform_dht_put (zone, label,
+ rd, rd_count,
+ ma);
+ if (NULL == ma->ph)
+ {
+ /* PUT failed, do not remember operation */
+ GNUNET_free (ma);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_insert (ma_head,
+ ma_tail,
+ ma);
+}
+
+
/* END DHT ZONE PROPAGATION */
}
+/**
+ * The zone monitor is now in SYNC with the current state of the
+ * name store. Start to perform periodic iterations.
+ *
+ * @param cls NULL
+ */
+static void
+monitor_sync_event (void *cls)
+{
+ sync_finished = GNUNET_YES;
+ zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
+ NULL);
+}
+
+
/**
* Process GNS requests.
*
{
if (GNUNET_OK !=
GNUNET_CRYPTO_ecdsa_public_key_from_string (dns_root_name,
- strlen (dns_root_name),
- &dns_root))
+ strlen (dns_root_name),
+ &dns_root))
{
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
"gns", "DNS_ROOT",
GNUNET_SERVER_add_handlers (server, handlers);
statistics = GNUNET_STATISTICS_create ("gns", c);
nc = GNUNET_SERVER_notification_context_create (server, 1);
- zone_publish_task = GNUNET_SCHEDULER_add_now (&publish_zone_dht_start,
- NULL);
+ zmon = GNUNET_NAMESTORE_zone_monitor_start (c,
+ NULL,
+ &handle_monitor_event,
+ &monitor_sync_event,
+ NULL);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task, NULL);
}