*/
#define SUPPORT_DELAYS GNUNET_NO
-/**
- * Maximum number of outgoing messages we queue per peer.
- */
-#define MAX_QUEUE_PER_PEER 16
-
/**
* Size for the hash map for DHT requests from the FS
* service. Should be about the number of concurrent
* getting a reply (only calculated over the requests for
* which we actually got a reply). Calculated
* as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+ *
+ * FIXME: actually, this is currently the delay between us originally
+ * receiving (not forwarding!) a request and us receiving a reply from
+ * this peer (regardless of when we transmitted this request to this peer!)
*/
struct GNUNET_TIME_Relative avg_delay;
*/
struct GNUNET_TIME_Absolute last_migration_block;
+ /**
+ * Transmission times for the last MAX_QUEUE_PER_PEER
+ * requests for this peer. Used as a ring buffer, current
+ * offset is stored in 'last_request_times_off'. If the
+ * oldest entry is more recent than the 'avg_delay', we should
+ * not send any more requests right now.
+ */
+ struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
+
/**
* Handle for an active request for transmission to this
* peer, or NULL.
*/
unsigned int last_client_replies_woff;
+ /**
+ * Current offset into 'last_request_times' ring buffer.
+ */
+ unsigned int last_request_times_off;
+
};
};
+/**
+ * Information about a peer that we have forwarded this
+ * request to already.
+ */
+struct UsedTargetEntry
+{
+ /**
+ * What was the last time we have transmitted this request to this
+ * peer?
+ */
+ struct GNUNET_TIME_Absolute last_request_time;
+
+ /**
+ * How often have we transmitted this request to this peer?
+ */
+ unsigned int num_requests;
+
+ /**
+ * PID of the target peer.
+ */
+ GNUNET_PEER_Id pid;
+
+};
+
+
+
+
+
/**
* Doubly-linked list of messages we are performing
* due to a pending request.
* (Interned) Peer identifiers of peers that have already
* received our query for this content.
*/
- GNUNET_PEER_Id *used_pids;
+ struct UsedTargetEntry *used_targets;
/**
* Our entry in the queue (non-NULL while we wait for our
uint32_t anonymity_level;
/**
- * How many entries in "used_pids" are actually valid?
+ * How many entries in "used_targets" are actually valid?
*/
- unsigned int used_pids_off;
+ unsigned int used_targets_off;
/**
- * How long is the "used_pids" array?
+ * How long is the "used_targets" array?
*/
- unsigned int used_pids_size;
+ unsigned int used_targets_size;
/**
* Number of results found for this request.
destroy_pending_request (struct PendingRequest *pr)
{
struct GNUNET_PeerIdentity pid;
+ unsigned int i;
if (pr->hnode != NULL)
{
while (NULL != pr->pending_head)
destroy_pending_message_list_entry (pr->pending_head);
GNUNET_PEER_change_rc (pr->target_pid, -1);
- if (pr->used_pids != NULL)
+ if (pr->used_targets != NULL)
{
- GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
- GNUNET_free (pr->used_pids);
- pr->used_pids_off = 0;
- pr->used_pids_size = 0;
- pr->used_pids = NULL;
+ for (i=0;i<pr->used_targets_off;i++)
+ GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
+ GNUNET_free (pr->used_targets);
+ pr->used_targets_off = 0;
+ pr->used_targets_size = 0;
+ pr->used_targets = NULL;
}
GNUNET_free (pr);
}
pm);
cp->pending_requests++;
if (cp->pending_requests > MAX_QUEUE_PER_PEER)
- destroy_pending_message (cp->pending_messages_tail, 0);
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches discarded (queue length bound)"),
+ 1,
+ GNUNET_NO);
+ destroy_pending_message (cp->pending_messages_tail, 0);
+ }
GNUNET_PEER_resolve (cp->pid, &pid);
if (NULL != cp->cth)
{
GNUNET_PEER_Id tpid)
{
struct PendingRequest *pr = cls;
+ unsigned int i;
GNUNET_STATISTICS_update (stats,
gettext_noop ("# queries scheduled for forwarding"),
pr);
return;
}
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted query `%s'\n",
+ GNUNET_h2s (&pr->query));
+#endif
GNUNET_STATISTICS_update (stats,
gettext_noop ("# queries forwarded"),
1,
GNUNET_NO);
- GNUNET_PEER_change_rc (tpid, 1);
- if (pr->used_pids_off == pr->used_pids_size)
- GNUNET_array_grow (pr->used_pids,
- pr->used_pids_size,
- pr->used_pids_size * 2 + 2);
- pr->used_pids[pr->used_pids_off++] = tpid;
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == tpid)
+ break; /* found match! */
+ if (i == pr->used_targets_off)
+ {
+ /* need to create new entry */
+ if (pr->used_targets_off == pr->used_targets_size)
+ GNUNET_array_grow (pr->used_targets,
+ pr->used_targets_size,
+ pr->used_targets_size * 2 + 2);
+ GNUNET_PEER_change_rc (tpid, 1);
+ pr->used_targets[pr->used_targets_off].pid = tpid;
+ pr->used_targets[pr->used_targets_off].num_requests = 0;
+ i = pr->used_targets_off++;
+ }
+ pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
+ pr->used_targets[i].num_requests++;
if (pr->task == GNUNET_SCHEDULER_NO_TASK)
pr->task = GNUNET_SCHEDULER_add_delayed (sched,
get_processing_delay (),
unsigned int k;
int no_route;
uint32_t bm;
+ unsigned int i;
pr->irc = NULL;
if (peer == NULL)
pr);
return;
}
- // (3) transmit, update ttl/priority
+ /* (3) transmit, update ttl/priority */
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&peer->hashPubKey);
if (cp == NULL)
gettext_noop ("# queries scheduled for forwarding"),
1,
GNUNET_NO);
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == cp->pid)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# queries retransmitted to same target"),
+ 1,
+ GNUNET_NO);
+ break;
+ }
+
/* build message and insert message into priority queue */
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
pr->bf_size);
pm->cont = &transmit_query_continuation;
pm->cont_cls = pr;
+ cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
add_to_pending_messages_for_peer (cp, pm, pr);
}
struct PeerSelectionContext *psc = cls;
struct ConnectedPeer *cp = value;
struct PendingRequest *pr = psc->pr;
+ struct GNUNET_TIME_Relative delay;
double score;
unsigned int i;
unsigned int pc;
}
/* 2) check if we have already (recently) forwarded to this peer */
+ /* 2a) this particular request */
pc = 0;
- for (i=0;i<pr->used_pids_off;i++)
- if (pr->used_pids[i] == cp->pid)
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == cp->pid)
{
- pc++;
+ pc = pr->used_targets[i].num_requests;
+ GNUNET_assert (pc > 0);
if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- RETRY_PROBABILITY_INV))
+ RETRY_PROBABILITY_INV * pc))
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"NOT re-trying query that was previously transmitted %u times\n",
- (unsigned int) pr->used_pids_off);
+ (unsigned int) pc);
#endif
return GNUNET_YES; /* skip */
}
+ break;
}
#if DEBUG_FS
if (0 < pc)
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Re-trying query that was previously transmitted %u times to this peer\n",
- (unsigned int) pc);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Re-trying query that was previously transmitted %u times to this peer\n",
+ (unsigned int) pc);
+ }
#endif
+ /* 2b) many other requests to this peer */
+ delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
+ if (delay.value <= cp->avg_delay.value)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "NOT sending query since we send %u others to this peer in the last %llums\n",
+ MAX_QUEUE_PER_PEER,
+ cp->avg_delay.value);
+#endif
+ return GNUNET_YES; /* skip */
+ }
+
/* 3) calculate how much we'd like to forward to this peer,
starting with a random value that is strong enough
to at least give any peer a chance sometimes
struct GNUNET_TIME_Relative art_delay;
#endif
size_t msize;
+ unsigned int i;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_NO);
if (prq->sender != NULL)
{
- cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
- prq->sender->avg_delay.value
- = (prq->sender->avg_delay.value *
- (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
- prq->sender->avg_priority
- = (prq->sender->avg_priority *
- (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == prq->sender->pid)
+ break;
+ if (i < pr->used_targets_off)
+ {
+ cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
+ prq->sender->avg_delay.value
+ = (prq->sender->avg_delay.value *
+ (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
+ prq->sender->avg_priority
+ = (prq->sender->avg_priority *
+ (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ }
if (pr->cp != NULL)
{
GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
return GNUNET_SYSERR;
}
gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for `%s'\n",
+ GNUNET_h2s (&gm->query));
+#endif
type = ntohl (gm->type);
bm = ntohl (gm->hash_bitmap);
bits = 0;
BLOOMFILTER_K);
pr->bf_size = bfsize;
}
-
cdc.have = NULL;
cdc.pr = pr;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
(pr->priority + 1));
if (GNUNET_YES != pr->forward_only)
- pr->qe = GNUNET_DATASTORE_get (dsh,
- &gm->query,
- type,
- pr->priority + 1,
- MAX_DATASTORE_QUEUE,
- timeout,
- &process_local_reply,
- pr);
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Handing request for `%s' to datastore\n",
+ GNUNET_h2s (&gm->query));
+#endif
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &gm->query,
+ type,
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
+ timeout,
+ &process_local_reply,
+ pr);
+ if (NULL == pr->qe)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped by datastore (queue length limit)"),
+ 1,
+ GNUNET_NO);
+ }
+ }
else
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests forwarded due to high load"),
- 1,
- GNUNET_NO);
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests forwarded due to high load"),
+ 1,
+ GNUNET_NO);
+ }
/* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
switch (pr->type)
#include "fs_test_lib.h"
#include "gnunet_testing_lib.h"
-#define VERBOSE GNUNET_NO
+#define VERBOSE GNUNET_YES
/**
* File-size we use for testing.
*/
-#define FILESIZE (1024 * 1024 * 10)
+#define FILESIZE (1024 * 1024 * 1)
/**
* How long until we give up on transmitting the message?
*/
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 300)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 3)
#define NUM_DAEMONS 2
{ "fs", "# requests done for free (low load)"},
{ "fs", "# P2P searches received"},
{ "fs", "# replies received for local clients"},
+ { "fs", "# P2P searches discarded (queue length bound)"},
+ { "fs", "# requests dropped due to high load"},
+ { "fs", "# requests dropped by datastore (queue length limit)"},
+ { "fs", "# queries retransmitted to same target"},
{ "fs", "cummulative artificial delay introduced (ms)"},
{ "core", "# bytes decrypted"},
{ "core", "# bytes encrypted"},
return GNUNET_OK;
}
+
/**
* Function that gathers stats from all daemons.
*/
stat_run (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
* Function called when GET operation on stats is done.
*/
GNUNET_SCHEDULER_add_now (sched, &stat_run, sm);
}
+
/**
* Function that gathers stats from all daemons.
*/
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Timeout during download, shutting down with error\n");
ok = 1;
GNUNET_SCHEDULER_add_now (sched, &do_stop, NULL);
GNUNET_FS_TEST_daemons_stop (sched,
NUM_DAEMONS,
daemons);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Timeout during upload attempt, shutting down with error\n");
ok = 1;
return;
GNUNET_FS_TEST_daemons_stop (sched,
NUM_DAEMONS,
daemons);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Error trying to connect: %s\n",
emsg);
ok = 1;