* @param cp responding peer (will be updated)
* @param request_time time at which the original query was transmitted
* @param request_priority priority of the original request
- * @param initiator_client local client on responsible for query (or NULL)
- * @param initiator_peer other peer responsible for query (or NULL)
*/
void
GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
struct GNUNET_TIME_Absolute request_time,
- uint32_t request_priority,
- const struct GSF_LocalClient *initiator_client,
- const struct GSF_ConnectedPeer *initiator_peer)
+ uint32_t request_priority)
{
struct GNUNET_TIME_Relative delay;
- unsigned int i;
delay = GNUNET_TIME_absolute_get_duration (request_time);
cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
- if (NULL != initiator_client)
- {
- cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
- }
- else if (NULL != initiator_peer)
- {
- GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
- cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
- GNUNET_PEER_change_rc (initiator_peer->pid, 1);
- }
- else
- GNUNET_break (0);
+}
+
+
+/**
+ * Report on receiving a reply in response to an initiating client.
+ * Remember that this peer is good for this client.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_client local client on responsible for query
+ */
+void
+GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
+ const struct GSF_LocalClient *initiator_client)
+{
+ cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
+}
+
+
+/**
+ * Report on receiving a reply in response to an initiating peer.
+ * Remember that this peer is good for this initiating peer.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_peer other peer responsible for query
+ */
+void
+GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
+ const struct GSF_ConnectedPeer *initiator_peer)
+{
+ GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
+ cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
+ GNUNET_PEER_change_rc (initiator_peer->pid, 1);
}
* @param cp responding peer (will be updated)
* @param request_time time at which the original query was transmitted
* @param request_priority priority of the original request
- * @param initiator_client local client on responsible for query (or NULL)
- * @param initiator_peer other peer responsible for query (or NULL)
*/
void
GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
struct GNUNET_TIME_Absolute request_time,
- uint32_t request_priority,
- const struct GSF_LocalClient *initiator_client,
- const struct GSF_ConnectedPeer *initiator_peer);
+ uint32_t request_priority);
+
+
+/**
+ * Report on receiving a reply in response to an initiating client.
+ * Remember that this peer is good for this client.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_client local client on responsible for query
+ */
+void
+GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
+ const struct GSF_LocalClient *initiator_client);
+
+
+/**
+ * Report on receiving a reply in response to an initiating peer.
+ * Remember that this peer is good for this initiating peer.
+ *
+ * @param cp responding peer (will be updated)
+ * @param initiator_peer other peer responsible for query
+ */
+void
+GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
+ const struct GSF_ConnectedPeer *initiator_peer);
/**
*/
struct GNUNET_CONTAINER_BloomFilter *bf;
+ /**
+ * Entry for this pending request in the expiration heap, or NULL.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hnode;
+
/**
* Number of valid entries in the 'replies_seen' array.
*/
/**
* Mingle value we currently use for the bf.
*/
- int32_t mingle;
+ uint32_t mingle;
};
return GNUNET_NO; /* size not changed */
if (pr->bf != NULL)
GNUNET_CONTAINER_bloomfilter_free (pr->bf);
- pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT32_MAX);
+ pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
nsize,
BLOOMFILTER_K);
const struct GNUNET_PeerIdentity *target,
const char *bf_data,
size_t bf_size,
- int32_t mingle,
+ uint32_t mingle,
uint32_t anonymity_level,
uint32_t priority,
int32_t ttl,
void *rh_cls)
{
struct GSF_PendingRequest *pr;
-
+ struct GSF_PendingRequest *dpr;
pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
pr->public_data.query = *query;
}
pr->public_data.anonymity_level = anonymity_data;
pr->public_data.priority = priority;
+ pr->public_data.original_priority = priority;
pr->public_data.options = options;
pr->public_data.type = type;
pr->public_data.start_time = GNUNET_TIME_absolute_get ();
query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- // FIXME: if not a local query, we also need to track the
- // total number of external queries we currently have and
- // bound it => need an additional heap!
-
- pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
- pr,
- pr->start_time.abs_value + pr->ttl);
-
-
-
- /* make sure we don't track too many requests */
- if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+ if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
{
- pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
- GNUNET_assert (pr != NULL);
- destroy_pending_request (pr);
+ pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+ pr,
+ pr->ttl.abs_value);
+ /* make sure we don't track too many requests */
+ while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+ {
+ dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ GNUNET_assert (dpr != NULL);
+ if (pr == dpr)
+ break; /* let the request live briefly... */
+ dpr->rh (dpr->rh_cls,
+ dpr,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ NULL, 0,
+ GNUNET_SYSERR);
+ GSF_pending_request_cancel_ (dpr);
+ }
}
-
-
return pr;
}
{
/* we're not the initiator, but the initiator did not give us
any bloom-filter, so we need to create one on-the-fly */
- pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT32_MAX);
+ pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
pr->mingle,
BLOOMFILTER_K);
GNUNET_free_non_null (pr->replies_seen);
if (NULL != pr->bf)
GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+ if (NULL != pr->hnode)
+ GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
+ pr->hnode);
GNUNET_free (pr);
return GNUNET_YES;
}
/**
* Who gave us this reply? NULL for local host (or DHT)
*/
- struct ConnectedPeer *sender;
+ struct GSF_ConnectedPeer *sender;
/**
* When the reply expires.
if (prq->sender == NULL)
return;
- /* FIXME: adapt code to new API... */
- for (i=0;i<pr->used_targets_off;i++)
- if (pr->used_targets[i].pid == prq->sender->pid)
- break;
- if (i < pr->used_targets_off)
- {
- cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
- prq->sender->avg_delay.rel_value
- = (prq->sender->avg_delay.rel_value *
- (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
- prq->sender->avg_priority
- = (prq->sender->avg_priority *
- (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
- }
- if (pr->cp != NULL)
- {
- GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
- [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
- -1);
- GNUNET_PEER_change_rc (pr->cp->pid, 1);
- prq->sender->last_p2p_replies
- [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
- = pr->cp->pid;
- }
- else
- {
- if (NULL != prq->sender->last_client_replies
- [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
- GNUNET_SERVER_client_drop (prq->sender->last_client_replies
- [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
- prq->sender->last_client_replies
- [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
- = pr->client_request_list->client_list->client;
- GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
- }
+ GSF_peer_update_performance_ (prq->sender,
+ pr->start_time,
+ prq->priority);
}
struct PutMessage *pm;
struct ConnectedPeer *cp;
size_t msize;
+ GNUNET_HashCode chash;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
update_request_performance_data (prq, pr);
break;
case GNUNET_BLOCK_EVALUATION_OK_LAST:
+ /* short cut: stop processing early, no BF-update, etc. */
update_request_performance_data (prq, pr);
GNUNET_LOAD_update (rt_entry_lifetime,
GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
prq->type);
return GNUNET_NO;
}
- /* FIXME: adapt code to new API! */
- if (pr->client_request_list != NULL)
- {
- if (pr->replies_seen_size == pr->replies_seen_off)
- GNUNET_array_grow (pr->replies_seen,
- pr->replies_seen_size,
- pr->replies_seen_size * 2 + 4);
- GNUNET_CRYPTO_hash (prq->data,
- prq->size,
- &pr->replies_seen[pr->replies_seen_off++]);
- refresh_bloomfilter (pr);
- }
+ /* update bloomfilter */
+ GNUNET_CRYPTO_hash (prq->data,
+ prq->size,
+ &chash);
+ GSF_pending_request_update_ (pr, &chash, 1);
if (NULL == prq->sender)
{
#if DEBUG_FS
1,
GNUNET_NO);
}
- prq->priority += pr->remaining_priority;
- pr->remaining_priority = 0;
- pr->results_found++;
+ prq->priority += pr->public_data.original_priority;
+ pr->public_data.remaining_priority = 0;
+ pr->public_data.original_priority = 0;
+ pr->public_data.results_found++;
prq->request_found = GNUNET_YES;
- /* finally, pass on to other peers / local clients */
+ /* finally, pass on to other peer / local client */
pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
return GNUNET_YES;
}