/*
This file is part of GNUnet.
- (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
+ (C) 2009-2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_pe.h"
#include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_mesh.h"
+
+
+/**
+ * Desired replication level for GETs.
+ */
+#define DHT_GET_REPLICATION 5
/**
* Maximum size of the datastore queue for P2P operations. Needs to
*/
#define INSANE_STATISTICS GNUNET_NO
+/**
+ * If obtaining a block via mesh fails, how often do we retry it before
+ * giving up for good (and sticking to non-anonymous transfer)?
+ */
+#define MESH_RETRY_MAX 3
+
+
/**
* An active request.
*/
*/
struct GNUNET_DHT_GetHandle *gh;
+ /**
+ * Mesh request handle for this request (or NULL for none).
+ */
+ struct GSF_MeshRequest *mesh_request;
+
/**
* Function to call upon completion of the local get
* request, or NULL for none.
*/
uint64_t first_uid;
+ /**
+ * How often have we retried this request via 'mesh'?
+ * (used to bound overall retries).
+ */
+ unsigned int mesh_retry_count;
+
/**
* Number of valid entries in the 'replies_seen' array.
*/
static int active_to_migration;
-/**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
/**
* Heap with the request that will expire next at the top. Contains
* pointers of type "struct PendingRequest*"; these will *also* be
* @param options request options
* @param type type of the block that is being requested
* @param query key for the lookup
- * @param namespace namespace to lookup, NULL for no namespace
* @param target preferred target for the request, NULL for none
* @param bf_data raw data for bloom filter for known replies, can be NULL
* @param bf_size number of bytes in bf_data
struct GSF_PendingRequest *
GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
enum GNUNET_BLOCK_Type type,
- const struct GNUNET_HashCode * query,
- const struct GNUNET_HashCode * namespace,
+ const struct GNUNET_HashCode *query,
const struct GNUNET_PeerIdentity *target,
const char *bf_data, size_t bf_size,
uint32_t mingle, uint32_t anonymity_level,
uint32_t priority, int32_t ttl,
GNUNET_PEER_Id sender_pid,
GNUNET_PEER_Id origin_pid,
- const struct GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
GSF_PendingRequestReplyHandler rh, void *rh_cls)
{
GNUNET_NO);
#endif
extra = 0;
- if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
- extra += sizeof (struct GNUNET_HashCode);
if (NULL != target)
extra += sizeof (struct GNUNET_PeerIdentity);
pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
pr->public_data.query = *query;
eptr = (struct GNUNET_HashCode *) &pr[1];
- if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
- {
- GNUNET_assert (NULL != namespace);
- pr->public_data.namespace = eptr;
- memcpy (eptr, namespace, sizeof (struct GNUNET_HashCode));
- eptr++;
- }
if (NULL != target)
{
pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
{
pr->hnode =
GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr,
- pr->public_data.ttl.abs_value);
+ pr->public_data.ttl.abs_value_us);
/* make sure we don't track too many requests */
while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
max_pending_requests)
if ((pra->public_data.type != prb->public_data.type) ||
(0 !=
memcmp (&pra->public_data.query, &prb->public_data.query,
- sizeof (struct GNUNET_HashCode))) ||
- ((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
- (0 !=
- memcmp (pra->public_data.namespace,
- prb->public_data.namespace,
- sizeof (struct GNUNET_HashCode)))))
+ sizeof (struct GNUNET_HashCode))))
return GNUNET_NO;
return GNUNET_OK;
}
bm |= GET_MESSAGE_BIT_RETURN_TO;
k++;
}
- if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
- {
- bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
- k++;
- }
if (NULL != pr->public_data.target)
{
bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
pr->public_data.respect_offered += prio;
gm->priority = htonl (prio);
now = GNUNET_TIME_absolute_get ();
- ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
- gm->ttl = htonl (ttl / 1000);
+ ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
+ gm->ttl = htonl (ttl / 1000LL / 1000LL);
gm->filter_mutator = htonl (pr->mingle);
gm->hash_bitmap = htonl (bm);
gm->query = pr->public_data.query;
if (!do_route)
GNUNET_PEER_resolve (pr->sender_pid,
(struct GNUNET_PeerIdentity *) &ext[k++]);
- if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
- memcpy (&ext[k++], pr->public_data.namespace, sizeof (struct GNUNET_HashCode));
if (NULL != pr->public_data.target)
memcpy (&ext[k++],
pr->public_data.target,
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->mesh_request)
+ {
+ GSF_mesh_query_cancel (pr->mesh_request);
+ pr->mesh_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->mesh_request)
+ {
+ GSF_mesh_query_cancel (pr->mesh_request);
+ pr->mesh_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
GNUNET_NO);
prq->eval =
GNUNET_BLOCK_evaluate (GSF_block_ctx, prq->type, key, &pr->bf, pr->mingle,
- pr->public_data.namespace,
- (prq->type ==
- GNUNET_BLOCK_TYPE_FS_SBLOCK) ?
- sizeof (struct GNUNET_HashCode) : 0, prq->data,
+ NULL, 0, prq->data,
prq->size);
switch (prq->eval)
{
update_request_performance_data (prq, pr);
GNUNET_LOAD_update (GSF_rt_entry_lifetime,
GNUNET_TIME_absolute_get_duration (pr->
- public_data.start_time).rel_value);
+ public_data.start_time).rel_value_us);
if (GNUNET_YES !=
GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
prq->sender,
&last_transmission))
- last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
+ last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
/* pass on to other peers / local clients */
pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
last_transmission, prq->type, prq->data, prq->size);
if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
prq->sender,
&last_transmission))
- last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
+ last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
pr->rh (pr->rh_cls, prq->eval, pr,
prq->anonymity_level, prq->expiration,
last_transmission, prq->type, prq->data, prq->size);
if (GNUNET_SYSERR != success)
{
GNUNET_LOAD_update (datastore_put_load,
- GNUNET_TIME_absolute_get_duration (pmc->start).rel_value);
+ GNUNET_TIME_absolute_get_duration (pmc->start).rel_value_us);
}
else
{
/* on queue failure / timeout, increase the put load dramatically */
GNUNET_LOAD_update (datastore_put_load,
- GNUNET_TIME_UNIT_MINUTES.rel_value);
+ GNUNET_TIME_UNIT_MINUTES.rel_value_us);
}
}
cp = GSF_peer_get_ (&pmc->origin);
if (NULL != cp)
{
ppd = GSF_get_peer_performance_data_ (cp);
- ppd->migration_delay.rel_value /= 2;
+ ppd->migration_delay.rel_value_us /= 2;
}
GNUNET_free (pmc);
return;
(NULL != cp) )
{
ppd = GSF_get_peer_performance_data_ (cp);
- if (min_expiration.abs_value > 0)
+ if (min_expiration.abs_value_us > 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking to stop migration for %s because datastore is full\n",
ppd->migration_delay);
ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
ppd->migration_delay);
- mig_pause.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
- ppd->migration_delay.rel_value);
+ mig_pause.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ ppd->migration_delay.rel_value_us);
ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Replicated content already exists locally, asking to stop migration for %s\n",
}
xquery = NULL;
xquery_size = 0;
- if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
- {
- xquery = buf;
- memcpy (buf, pr->public_data.namespace, sizeof (struct GNUNET_HashCode));
- xquery_size = sizeof (struct GNUNET_HashCode);
- }
if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
{
GNUNET_assert (0 != pr->sender_pid);
pr->gh =
GNUNET_DHT_get_start (GSF_dht,
pr->public_data.type, &pr->public_data.query,
- 5 /* DEFAULT_GET_REPLICATION */ ,
+ DHT_GET_REPLICATION,
GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
xquery, xquery_size, &handle_dht_reply, pr);
if ( (NULL != pr->gh) &&
}
+/**
+ * Function called with a reply from the mesh.
+ *
+ * @param cls the pending request struct
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+static void
+mesh_reply_proc (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute expiration,
+ size_t data_size,
+ const void *data)
+{
+ struct GSF_PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
+ struct GNUNET_HashCode query;
+
+ pr->mesh_request = NULL;
+ if (GNUNET_BLOCK_TYPE_ANY == type)
+ {
+ GNUNET_break (NULL == data);
+ GNUNET_break (0 == data_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Error retrieiving block via mesh\n");
+ pr->mesh_retry_count++;
+ if (pr->mesh_retry_count >= MESH_RETRY_MAX)
+ return; /* give up on mesh */
+ /* retry -- without delay, as this is non-anonymous
+ and mesh/mesh connect will take some time anyway */
+ pr->mesh_request = GSF_mesh_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &mesh_reply_proc,
+ pr);
+ return;
+ }
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ data, data_size, &query))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to derive key for block of type %d\n",
+ (int) type);
+ GNUNET_break_op (0);
+ return;
+ }
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Replies received from MESH"), 1,
+ GNUNET_NO);
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = expiration;
+ /* do not allow migrated content to live longer than 1 year */
+ prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+ prq.expiration);
+ prq.size = data_size;
+ prq.type = type;
+ process_reply (&prq, &query, pr);
+}
+
+
+/**
+ * Consider downloading via mesh (if possible)
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_mesh_lookup_ (struct GSF_PendingRequest *pr)
+{
+ if (0 != pr->public_data.anonymity_level)
+ return;
+ if (0 == pr->public_data.target)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cannot do mesh-based download, target peer not known\n");
+ return;
+ }
+ if (NULL != pr->mesh_request)
+ return;
+ pr->mesh_request = GSF_mesh_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &mesh_reply_proc,
+ pr);
+}
+
+
/**
* Task that issues a warning if the datastore lookup takes too long.
*
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
GSF_LocalLookupContinuation cont, void *cont_cls)
{
GNUNET_assert (NULL == pr->gh);
+ GNUNET_assert (NULL == pr->mesh_request);
GNUNET_assert (NULL == pr->llc_cont);
pr->llc_cont = cont;
pr->llc_cont_cls = cont_cls;
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
(GNUNET_CRYPTO_QUALITY_WEAK,
(unsigned int) (60000 * putl * putl)));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking to stop migration for %llu ms because of load %f and events %d/%d\n",
- (unsigned long long) block_time.rel_value,
+ "Asking to stop migration for %s because of load %f and events %d/%d\n",
+ GNUNET_STRINGS_relative_time_to_string (block_time,
+ GNUNET_YES),
putl,
active_to_migration,
(GNUNET_NO == prq.request_found));
void
GSF_pending_request_init_ ()
{
- unsigned long long dqs;
-
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
"MAX_PENDING_REQUESTS",
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
"fs", "MAX_PENDING_REQUESTS");
}
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
- &dqs))
- {
- GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
- "fs", "DATASTORE_QUEUE_SIZE");
- dqs = 1024;
- }
- datastore_queue_size = (unsigned int) dqs;
-
active_to_migration =
GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);