#include "gnunet-service-fs_indexing.h"
#include "fs.h"
-#define DEBUG_FS GNUNET_NO
+#define DEBUG_FS GNUNET_YES
/**
* Maximum number of outgoing messages we queue per peer.
- * FIXME: set to a tiny value for testing; make configurable.
+ * FIXME: make configurable?
*/
-#define MAX_QUEUE_PER_PEER 2
+#define MAX_QUEUE_PER_PEER 16
/**
* Inverse of the probability that we will submit the same query
* to the same peer again. If the same peer already got the query
* repeatedly recently, the probability is multiplied by the inverse
- * of this number each time.
+ * of this number each time. Note that we only try about every TTL_DECREMENT/2
+ * plus MAX_CORK_DELAY (so roughly every 3.5s).
*/
-#define RETRY_PROBABILITY_INV 8
+#define RETRY_PROBABILITY_INV 3
/**
* What is the maximum delay for a P2P FS message (in our interaction
/**
* Maximum number of requests (from other peers) that we're
* willing to have pending at any given point in time.
- * FIXME: set from configuration (and 32 is a tiny value for testing only).
+ * FIXME: set from configuration.
*/
-static uint64_t max_pending_requests = 32;
+static uint64_t max_pending_requests = (32 * 1024);
/**
*/
uint32_t type;
+ /**
+ * Remove this request after transmission of the current response.
+ */
+ int do_remove;
+
};
if (pos == NULL)
return; /* no requests pending for this client */
while (NULL != (rcl = pos->rl_head))
- destroy_pending_request (rcl->req);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Destroying pending request `%s' on disconnect\n",
+ GNUNET_h2s (&rcl->req->query));
+ destroy_pending_request (rcl->req);
+ }
if (prev == NULL)
client_list = pos->next;
else
struct GNUNET_PeerIdentity pid;
struct PendingMessage *pm;
size_t msize;
-
+ struct PendingRequest *pr;
+
cp->cth = NULL;
if (NULL == buf)
{
memcpy (&cbuf[msize], &pm[1], pm->msize);
msize += pm->msize;
size -= pm->msize;
+ pr = pm->pml->req;
destroy_pending_message (pm, cp->pid);
}
if (NULL != pm)
struct PendingRequest *pr = psc->pr;
double score;
unsigned int i;
+ unsigned int pc;
/* 1) check if we have already (recently) forwarded to this peer */
+ pc = 0;
for (i=0;i<pr->used_pids_off;i++)
- if ( (pr->used_pids[i] == cp->pid) &&
- (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- RETRY_PROBABILITY_INV)) )
- return GNUNET_YES; /* skip */
+ if (pr->used_pids[i] == cp->pid)
+ {
+ pc++;
+ if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ RETRY_PROBABILITY_INV))
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "NOT re-trying query that was previously transmitted %u times\n",
+ (unsigned int) pr->used_pids_off);
+#endif
+ return GNUNET_YES; /* skip */
+ }
+ }
+#if DEBUG_FS
+ if (0 < pc)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Re-trying query that was previously transmitted %u times to this peer\n",
+ (unsigned int) pc);
+#endif
// 2) calculate how much we'd like to forward to this peer
score = 42; // FIXME!
// FIXME: also need API to gather data on responsiveness
struct PendingRequest *pr = cls;
struct PeerSelectionContext psc;
struct ConnectedPeer *cp;
+ struct GNUNET_TIME_Relative delay;
pr->task = GNUNET_SCHEDULER_NO_TASK;
if (pr->irc != NULL)
- return; /* already pending */
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Forwarding of query `%s' not attempted due to pending local lookup!\n",
+ GNUNET_h2s (&pr->query));
+#endif
+ return; /* already pending */
+ }
/* (1) select target */
psc.pr = pr;
psc.target_score = DBL_MIN;
&psc);
if (psc.target_score == DBL_MIN)
{
-#if DEBUG_FS
+ delay = get_processing_delay ();
+#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No peer selected for forwarding of query `%s'!\n",
- GNUNET_h2s (&pr->query));
+ "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
+ GNUNET_h2s (&pr->query),
+ delay.value);
#endif
pr->task = GNUNET_SCHEDULER_add_delayed (sched,
- get_processing_delay (),
+ delay,
&forward_request_task,
pr);
return; /* nobody selected */
GNUNET_HashCode chash;
GNUNET_HashCode mhash;
size_t msize;
- int do_remove;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
gettext_noop ("# replies received and matched"),
1,
GNUNET_NO);
- do_remove = GNUNET_NO;
GNUNET_CRYPTO_hash (prq->data,
prq->size,
&chash);
GNUNET_FS_drq_get_cancel (pr->drq);
pr->drq = NULL;
}
- do_remove = GNUNET_YES;
+ pr->do_remove = GNUNET_YES;
+ if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ pr->task);
+ pr->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+ key,
+ pr));
break;
case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
if (pr->namespace == NULL)
cl);
}
GNUNET_break (cl->th != NULL);
+ if (pr->do_remove)
+ destroy_pending_request (pr);
}
else
{
memcpy (&pm[1], prq->data, prq->size);
add_to_pending_messages_for_peer (cp, reply, pr);
}
- if (GNUNET_YES == do_remove)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Removing request `%s' from request map (has been satisfied)\n",
- GNUNET_h2s (key));
-#endif
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (query_request_map,
- key,
- pr));
- // FIXME: request somehow does not fully disappear; how to fix?
- // destroy_pending_request (pr); (not like this!)
- }
// FIXME: implement hot-path routing statistics keeping!
return GNUNET_YES;
}
prq.type = type;
prq.priority = priority;
process_reply (&prq, key, pr);
-
- if ( ( (pr->client_request_list == NULL) &&
- ( (GNUNET_YES == test_load_too_high()) ||
- (pr->results_found > 5 + 2 * pr->priority) ) ) ||
- (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) )
+
+ if ( (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) ||
+ (type == GNUNET_DATASTORE_BLOCKTYPE_IBLOCK) )
+ {
+ GNUNET_FS_drq_get_next (GNUNET_NO);
+ return;
+ }
+ if ( (pr->client_request_list == NULL) &&
+ ( (GNUNET_YES == test_load_too_high()) ||
+ (pr->results_found > 5 + 2 * pr->priority) ) )
{
#if DEBUG_FS > 2
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Unique reply found or load too high, done with request\n");
+ "Load too high, done with request\n");
#endif
- if (type != GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# processing result set cut short due to load"),
- 1,
- GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# processing result set cut short due to load"),
+ 1,
+ GNUNET_NO);
GNUNET_FS_drq_get_next (GNUNET_NO);
return;
}
return GNUNET_OK;
}
-#if DEBUG_FS
+#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for `%s' of type %u from peer `%4s' with flags %u\n",
GNUNET_h2s (&gm->query),