struct GNUNET_MessageHeader header;
/**
- * Desired content type.
+ * Desired content type. (actually an enum GNUNET_BLOCK_Type)
*/
uint32_t type GNUNET_PACKED;
};
+/**
+ * Message to the datastore service asking about zero
+ * anonymity content.
+ */
+struct GetZeroAnonymityMessage
+{
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Desired content type (actually an enum GNUNET_BLOCK_Type)
+ */
+ uint32_t type GNUNET_PACKED;
+
+};
+
+
/**
* Message to the datastore service requesting an update
* to the priority or expiration for some content.
uint32_t size GNUNET_PACKED;
/**
- * Type of the item (NBO), zero for remove.
+ * Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
*/
uint32_t type GNUNET_PACKED;
}
+/**
+ * Get a zero-anonymity value from the datastore.
+ *
+ * @param h handle to the datastore
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param type allowed type for the operation
+ * @param iter function to call on a random value; it
+ * will be called once with a value (if available)
+ * and always once with a value of NULL.
+ * @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
+ */
+struct GNUNET_DATASTORE_QueueEntry *
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_BLOCK_Type type,
+ GNUNET_DATASTORE_Iterator iter,
+ void *iter_cls)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct GetZeroAnonymityMessage *m;
+ union QueueContext qc;
+
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asked to get zero-anonymity entry in %llu ms\n",
+ (unsigned long long) timeout.value);
+#endif
+ qc.rc.iter = iter;
+ qc.rc.iter_cls = iter_cls;
+ qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
+ queue_priority, max_queue_size, timeout,
+ &process_result_message, &qc);
+ if (qe == NULL)
+ return NULL;
+ m = (struct GetZeroAnonymityMessage*) &qe[1];
+ m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
+ m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
+ m->type = htonl ((uint32_t) type);
+ process_queue (h);
+ return qe;
+}
+
+
/**
* Iterate over the results for a particular key
GNUNET_NO);
GNUNET_SERVER_client_keep (client);
plugin->api->iter_migration_order (plugin->api->cls,
- 0,
+ GNUNET_BLOCK_TYPE_ANY,
&transmit_item,
client);
}
+/**
+ * Handle GET_ZERO_ANONYMITY-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ */
+static void
+handle_get_zero_anonymity (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
+ enum GNUNET_BLOCK_Type type;
+
+ type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Processing `%s' request\n",
+ "GET_ZERO_ANONYMITY");
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# GET ZERO ANONYMITY requests received"),
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVER_client_keep (client);
+ plugin->api->iter_zero_anonymity (plugin->api->cls,
+ type,
+ &transmit_item,
+ client);
+}
+
/**
* Context for the 'remove_callback'.
plugin->api->get (plugin->api->cls,
&dm->key,
&vhash,
- ntohl(dm->type),
+ (enum GNUNET_BLOCK_Type) ntohl(dm->type),
&remove_callback,
rc);
}
{&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 },
{&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM,
sizeof(struct GNUNET_MessageHeader) },
+ {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
+ sizeof(struct GetZeroAnonymityMessage) },
{&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 },
{&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
sizeof(struct GNUNET_MessageHeader) },
*/
#define DHT_FORWARD_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
-/**
- * FIXME: document.
- */
-#define DEFAULT_DHT_REPUBLISH_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 60)
-
/**
* FIXME: document.
*/
malicious_put_frequency = DEFAULT_MALICIOUS_PUT_FREQUENCY;
}
- dht_republish_frequency = DEFAULT_DHT_REPUBLISH_FREQUENCY;
+ dht_republish_frequency = GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY;
if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number(cfg, "DHT", "REPLICATION_FREQUENCY", &temp_config_num))
{
dht_republish_frequency = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, temp_config_num);
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
*/
#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+/**
+ * How often do we at most PUT content into the DHT?
+ */
+#define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
/**
* Inverse of the probability that we will submit the same query
* to the same peer again. If the same peer already got the query
*/
static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+/**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+/**
+ * Type we will request for the next DHT PUT round from the datastore.
+ */
+static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+
/**
* Where do we store trust information?
*/
*/
static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+/**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
/**
* What is the maximum frequency at which we are allowed to
* poll the datastore for migration content?
*/
static int active_migration;
+/**
+ * How many entires with zero anonymity do we currently estimate
+ * to have in the database?
+ */
+static unsigned int zero_anonymity_count_estimate;
+
/**
* Typical priorities we're seeing from other peers right now. Since
* most priorities will be zero, this value is the weighted average of
const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* If the migration task is not currently running, consider
* (re)scheduling it with the appropriate delay.
}
+/**
+ * If the DHT PUT gathering task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_dht_put_gathering (void *cls)
+{
+ struct GNUNET_TIME_Relative delay;
+
+ if (dsh == NULL)
+ return;
+ if (dht_qe != NULL)
+ return;
+ if (dht_task != GNUNET_SCHEDULER_NO_TASK)
+ return;
+ if (zero_anonymity_count_estimate > 0)
+ {
+ delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+ zero_anonymity_count_estimate);
+ delay = GNUNET_TIME_relative_min (delay,
+ MAX_DHT_PUT_FREQ);
+ }
+ else
+ {
+ /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+ (hopefully) appear */
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+ }
+ dht_task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &gather_dht_put_blocks,
+ cls);
+}
+
+
/**
* Process content offered for migration.
*
}
+/**
+ * Function called upon completion of the DHT PUT operation.
+ */
+static void
+dht_put_continuation (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Store content in DHT.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_dht_put_content (void *cls,
+ const GNUNET_HashCode * key,
+ size_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ static unsigned int counter;
+ static GNUNET_HashCode last_vhash;
+ static GNUNET_HashCode vhash;
+
+ if (key == NULL)
+ {
+ dht_qe = NULL;
+ consider_dht_put_gathering (cls);
+ return;
+ }
+ /* slightly funky code to estimate the total number of values with zero
+ anonymity from the maximum observed length of a monotonically increasing
+ sequence of hashes over the contents */
+ GNUNET_CRYPTO_hash (data, size, &vhash);
+ if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
+ {
+ if (zero_anonymity_count_estimate > 0)
+ zero_anonymity_count_estimate /= 2;
+ counter = 0;
+ }
+ last_vhash = vhash;
+ if (counter < 31)
+ counter++;
+ if (zero_anonymity_count_estimate < (1 << counter))
+ zero_anonymity_count_estimate = (1 << counter);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Retrieved block `%s' of type %u for DHT PUT\n",
+ GNUNET_h2s (key),
+ type);
+#endif
+ GNUNET_DHT_put (dht_handle,
+ key,
+ GNUNET_DHT_RO_NONE,
+ type,
+ size,
+ data,
+ expiration,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &dht_put_continuation,
+ cls);
+}
+
+
/**
* Task that is run periodically to obtain blocks for content
* migration
mig_task = GNUNET_SCHEDULER_NO_TASK;
if (dsh != NULL)
{
- mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_migration_content, NULL);
GNUNET_assert (mig_qe != NULL);
}
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ *
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ dht_task = GNUNET_SCHEDULER_NO_TASK;
+ if (dsh != NULL)
+ {
+ if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+ dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+ dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ dht_put_type++,
+ &process_dht_put_content, NULL);
+ GNUNET_assert (dht_qe != NULL);
+ }
+}
+
+
/**
* We're done with a particular message list entry.
* Free all associated resources.
GNUNET_DATASTORE_cancel (mig_qe);
mig_qe = NULL;
}
+ if (dht_qe != NULL)
+ {
+ GNUNET_DATASTORE_cancel (dht_qe);
+ dht_qe = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != mig_task)
{
GNUNET_SCHEDULER_cancel (sched, mig_task);
mig_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (GNUNET_SCHEDULER_NO_TASK != dht_task)
+ {
+ GNUNET_SCHEDULER_cancel (sched, dht_task);
+ dht_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (client_list != NULL)
handle_client_disconnect (NULL,
client_list->client);
_("Content migration is enabled, will start to gather data\n"));
consider_migration_gathering ();
}
+ consider_dht_put_gathering (NULL);
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[TESTING]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
[testing]
WEAKRANDOM = YES
+
+
+[dhtcache]
+QUOTA=65536
+
+DATABASE=sqlite
GNUNET_BLOCK_TYPE_FS_IBLOCK = 2,
/**
- * Type of a block representing a keyword search result.
+ * Type of a block representing a keyword search result. Note that
+ * the values for KBLOCK, SBLOCK and NBLOCK must be consecutive.
*/
GNUNET_BLOCK_TYPE_FS_KBLOCK = 3,
GNUNET_BLOCK_TYPE_FS_SBLOCK = 4,
/**
- * Type of a block representing a block to be encoded on demand from disk.
- * Should never appear on the network directly.
+ * Type of a block that is used to advertise a namespace.
*/
- GNUNET_BLOCK_TYPE_FS_ONDEMAND = 5,
+ GNUNET_BLOCK_TYPE_FS_NBLOCK = 5,
/**
- * Type of a block that is used to advertise a namespace.
+ * Type of a block representing a block to be encoded on demand from disk.
+ * Should never appear on the network directly.
*/
- GNUNET_BLOCK_TYPE_FS_NBLOCK = 6,
+ GNUNET_BLOCK_TYPE_FS_ONDEMAND = 6,
/**
* Type of a block that contains a HELLO for a peer (for
void *iter_cls);
+/**
+ * Get a zero-anonymity value from the datastore.
+ *
+ * @param h handle to the datastore
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param type allowed type for the operation
+ * @param iter function to call on a random value; it
+ * will be called once with a value (if available)
+ * and always once with a value of NULL.
+ * @param iter_cls closure for iter
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel; note that even if NULL is returned, the callback will be invoked
+ * (or rather, will already have been invoked)
+ */
+struct GNUNET_DATASTORE_QueueEntry *
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_BLOCK_Type type,
+ GNUNET_DATASTORE_Iterator iter,
+ void *iter_cls);
+
+
/**
* Cancel a datastore operation. The final callback from the
* operation must not have been done yet.
#endif
+/**
+ * FIXME: document.
+ */
+#define GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 60)
+
/**
* K-value that must be used for the bloom filter 'GET'
* queries.
*/
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM 98
+/**
+ * Message sent by datastore client to get random data.
+ */
+#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY 99
+
/**
* Message sent by datastore to client providing requested data
* (in response to GET or GET_RANDOM request).
*/
-#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA 99
+#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA 100
/**
* Message sent by datastore to client signaling end of matching data.
* This message will also be sent for "GET_RANDOM", even though
* "GET_RANDOM" returns at most one data item.
*/
-#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END 100
+#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END 101
/**
* Message sent by datastore client to remove data.
*/
-#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE 101
+#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE 102
/**
* Message sent by datastore client to drop the database.
*/
-#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP 102
+#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP 103
/**