*/
#define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+/**
+ * At what frequency should our datastore load decrease
+ * automatically (since if we don't use it, clearly the
+ * load must be going down).
+ */
+#define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
+
/* ****************************** globals ****************************** */
*/
static struct GNUNET_CORE_Handle *core;
+/**
+ * Datastore 'GET' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
/**
* Identity of this peer.
*/
}
+
+/**
+ * We've just now completed a datastore request. Update our
+ * datastore load calculations.
+ *
+ * @param start time when the datastore request was issued
+ */
+void
+GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start)
+{
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (start);
+ GNUNET_LOAD_update (datastore_get_load,
+ delay.rel_value);
+}
+
+
+/**
+ * Test if the DATABASE (GET) load on this peer is too high
+ * to even consider processing the query at
+ * all.
+ *
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ * GNUNET_NO to process normally (load normal)
+ * GNUNET_SYSERR to process for free (load low)
+ */
+int
+GSF_test_get_load_too_high_ (uint32_t priority)
+{
+ double ld;
+
+ ld = GNUNET_LOAD_get_load (datastore_get_load);
+ if (ld < 1)
+ return GNUNET_SYSERR;
+ if (ld <= priority)
+ return GNUNET_NO;
+ return GNUNET_YES;
+}
+
+
+
+
+
/**
* Handle P2P "PUT" message.
*
enum GNUNET_BLOCK_EvaluationResult result)
{
struct GNUNET_SERVER_Client *client = cls;
+ struct GSF_PendingRequestData *prd;
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
- return; /* we're done... */
+ return; /* we're done, 'pr' was already destroyed... */
+ prd = GSF_pending_request_get_data_ (pr);
+ if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) )
+ {
+ GSF_pending_request_cancel_ (pr);
+ return;
+ }
GSF_dht_lookup_ (pr);
consider_forwarding (NULL, pr, result);
}
GNUNET_SCHEDULER_cancel (cover_age_task);
cover_age_task = GNUNET_SCHEDULER_NO_TASK;
}
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
}
cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
&age_cover_counters,
NULL);
+ datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task,
NULL);
#include "platform.h"
#include "gnunet_load_lib.h"
#include "gnunet-service-fs_cp.h"
+#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_pr.h"
*/
struct GNUNET_DHT_GetHandle *gh;
+ /**
+ * Function to call upon completion of the local get
+ * request, or NULL for none.
+ */
+ GSF_LocalLookupContinuation llc_cont;
+
+ /**
+ * Closure for llc_cont.
+ */
+ void *llc_cont_cls;
+
+ /**
+ * Last result from the local datastore lookup evaluation.
+ */
+ enum GNUNET_BLOCK_EvaluationResult local_result;
+
/**
* Identity of the peer that we should use for the 'sender'
* (recipient of the response) when forwarding (0 for none).
prq->data, prq->size,
GNUNET_NO);
/* destroy request, we're done */
+ prq->finished = GNUNET_YES;
GSF_pending_request_cancel_ (pr);
return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
pr);
}
-
/**
* We're processing (local) results for a search request
* from another peer. Pass applicable results to the
* peer and if we are done either clean up (operation
* complete) or forward to other peers (more results possible).
*
- * @param cls our closure (struct LocalGetContext)
+ * @param cls our closure (struct PendingRequest)
* @param key key for the content
* @param size number of bytes in data
* @param data content stored
struct GNUNET_TIME_Absolute expiration,
uint64_t uid)
{
-#if FIXME
- struct PendingRequest *pr = cls;
+ struct GSF_PendingRequest *pr = cls;
+ GSF_LocalLookupContinuation cont;
+
struct ProcessReplyClosure prq;
- struct CheckDuplicateRequestClosure cdrc;
GNUNET_HashCode query;
unsigned int old_rf;
if (NULL == key)
{
-#if DEBUG_FS > 1
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Done processing local replies, forwarding request to other peers.\n");
-#endif
pr->qe = NULL;
- if (pr->client_request_list != NULL)
- {
- GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
- GNUNET_YES);
- /* Figure out if this is a duplicate request and possibly
- merge 'struct PendingRequest' entries */
- cdrc.have = NULL;
- cdrc.pr = pr;
- GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
- &pr->query,
- &check_duplicate_request_client,
- &cdrc);
- if (cdrc.have != NULL)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received request for block `%s' twice from client, will only request once.\n",
- GNUNET_h2s (&pr->query));
-#endif
-
- destroy_pending_request (pr);
- return;
- }
- }
- if (pr->local_only == GNUNET_YES)
+ if (NULL != (cont = pr->llc_cont))
{
- destroy_pending_request (pr);
- return;
+ pr->llc_cont = NULL;
+ cont (pr->llc_cont_cls,
+ pr,
+ pr->local_result);
}
- /* no more results */
- if (pr->task == GNUNET_SCHEDULER_NO_TASK)
- pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
- pr);
return;
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found ONDEMAND block, performing on-demand encoding\n");
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# on-demand blocks matched requests"),
1,
GNUNET_NO);
anonymity, expiration, uid,
&process_local_reply,
pr))
- if (pr->qe != NULL)
{
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ if (pr->qe != NULL)
+ GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
}
return;
}
- old_rf = pr->results_found;
+ old_rf = pr->public_data.results_found;
memset (&prq, 0, sizeof (prq));
prq.data = data;
prq.expiration = expiration;
prq.size = size;
if (GNUNET_OK !=
- GNUNET_BLOCK_get_key (block_ctx,
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
type,
data,
size,
&query))
{
GNUNET_break (0);
- GNUNET_DATASTORE_remove (dsh,
+ GNUNET_DATASTORE_remove (GSF_dsh,
key,
size, data,
-1, -1,
GNUNET_TIME_UNIT_FOREVER_REL,
NULL, NULL);
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+ GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
return;
}
prq.type = type;
prq.request_found = GNUNET_NO;
prq.anonymity_level = anonymity;
if ( (old_rf == 0) &&
- (pr->results_found == 0) )
- update_datastore_delays (pr->start_time);
+ (pr->public_data.results_found == 0) )
+ GSF_update_datastore_delay_ (pr->public_data.start_time);
process_reply (&prq, key, pr);
if (prq.finished == GNUNET_YES)
return;
+ pr->local_result = prq.eval;
if (pr->qe == NULL)
return; /* done here */
if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
{
- pr->local_only = GNUNET_YES; /* do not forward */
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO);
return;
}
- if ( (pr->client_request_list == NULL) &&
- ( (GNUNET_YES == test_get_load_too_high (0)) ||
- (pr->results_found > 5 + 2 * pr->priority) ) )
+ if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
+ ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
+ (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) )
{
#if DEBUG_FS > 2
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Load too high, done with request\n");
#endif
- GNUNET_STATISTICS_update (stats,
+ GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# processing result set cut short due to load"),
1,
GNUNET_NO);
- GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+ GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO);
return;
}
- GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
-#endif
+ GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
}
GSF_LocalLookupContinuation cont,
void *cont_cls)
{
- // FIXME: fix process_local_reply / cont!
GNUNET_assert (NULL == pr->gh);
+ GNUNET_assert (NULL == pr->llc_cont);
+ pr->llc_cont = cont;
+ pr->llc_cont_cls = cont_cls;
pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
&pr->public_data.query,
pr->public_data.type,