struct GSF_ConnectedPeer *cp = cls;
struct GNUNET_PeerIdentity pid;
+ if (GNUNET_YES !=
+ GSF_pending_request_test_active_ (pr))
+ return GNUNET_YES; /* request is not actually active, skip! */
GSF_connected_peer_get_identity_ (cp, &pid);
if (GNUNET_YES !=
GSF_pending_request_test_target_ (pr, &pid))
rprd = GSF_pending_request_get_data_ (ret);
for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
{
+ GNUNET_break (GNUNET_YES ==
+ GSF_pending_request_test_active_ (bi->pr));
prd = GSF_pending_request_get_data_ (bi->pr);
if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
{
return 0;
}
/* remove from root, add again elsewhere... */
- GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
+ GNUNET_assert (rp ==
+ GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
rp->hn = NULL;
rp->last_transmission = GNUNET_TIME_absolute_get ();
rp->transmission_counter++;
rp, rp->transmission_counter);
plan (pp, rp);
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop
- ("# query messages sent to other peers"), 1,
+ gettext_noop ("# query messages sent to other peers"),
+ 1,
GNUNET_NO);
return msize;
}
(rp->earliest_transmission).rel_value_us))
{
GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
- rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
}
if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
{
#endif
/* process from priority heap */
rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing query plan %p\n",
+ rp);
GNUNET_assert (NULL != rp);
msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
pp->pth =
struct GSF_PendingRequestPlanBijection *bi;
struct GSF_PendingRequest *latest;
+ GNUNET_break (GNUNET_YES ==
+ GSF_pending_request_test_active_ (mpr->pr));
if (GNUNET_OK !=
GSF_pending_request_is_compatible_ (mpr->pr,
rp->pe_head->pr))
mpr->merged = GNUNET_YES;
#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# requests merged"), 1,
+ gettext_noop ("# requests merged"),
+ 1,
GNUNET_NO);
#endif
latest = get_latest (rp);
#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# requests refreshed"),
- 1, GNUNET_NO);
+ 1,
+ GNUNET_NO);
#endif
rp->transmission_counter = 0; /* reset */
}
struct GSF_PendingRequestPlanBijection *bi;
struct MergeContext mpc;
+ GNUNET_assert (GNUNET_YES ==
+ GSF_pending_request_test_active_ (pr));
GNUNET_assert (NULL != cp);
id = GSF_connected_peer_get_identity2_ (cp);
pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
{
GNUNET_break (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
- get_rp_key (rp), rp));
+ get_rp_key (rp),
+ rp));
while (NULL != (bi = rp->pe_head))
{
- GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
+ GNUNET_CONTAINER_MDLL_remove (PE,
+ rp->pe_head,
+ rp->pe_tail,
+ bi);
prd = GSF_pending_request_get_data_ (bi->pr);
- GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
+ GNUNET_CONTAINER_MDLL_remove (PR,
+ prd->pr_head,
+ prd->pr_tail,
+ bi);
GNUNET_free (bi);
}
plan_count--;
{
GNUNET_break (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
- get_rp_key (rp), rp));
+ get_rp_key (rp),
+ rp));
while (NULL != (bi = rp->pe_head))
{
prd = GSF_pending_request_get_data_ (bi->pr);
- GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
- GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
+ GNUNET_CONTAINER_MDLL_remove (PE,
+ rp->pe_head,
+ rp->pe_tail,
+ bi);
+ GNUNET_CONTAINER_MDLL_remove (PR,
+ prd->pr_head,
+ prd->pr_tail,
+ bi);
GNUNET_free (bi);
}
plan_count--;
GNUNET_free (rp);
}
- GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
- plan_count, GNUNET_NO);
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
GNUNET_free (pp);
while (NULL != (bi = prd->pr_head))
{
rp = bi->rp;
- GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
- GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
+ GNUNET_CONTAINER_MDLL_remove (PR,
+ prd->pr_head,
+ prd->pr_tail,
+ bi);
+ GNUNET_CONTAINER_MDLL_remove (PE,
+ rp->pe_head,
+ rp->pe_tail,
+ bi);
+ GNUNET_assert (bi->pr == pr);
if (NULL == rp->pe_head)
{
GNUNET_CONTAINER_heap_remove_node (rp->hn);
plan_count--;
GNUNET_break (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
- &GSF_pending_request_get_data_
- (bi->pr)->query,
+ &prd->query,
rp));
GNUNET_free (rp);
}
GNUNET_free (bi);
}
- GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
- plan_count, GNUNET_NO);
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
}
void
GSF_plan_init ()
{
- plans = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
+ plans = GNUNET_CONTAINER_multipeermap_create (256,
+ GNUNET_YES);
}
if (NULL != (cont = pr->llc_cont))
{
pr->llc_cont = NULL;
- cont (pr->llc_cont_cls, pr, pr->local_result);
+ cont (pr->llc_cont_cls,
+ pr,
+ pr->local_result);
}
GSF_plan_notify_request_done_ (pr);
GNUNET_free_non_null (pr->replies_seen);
&pr->public_data.query,
pr));
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Pending requests active"), -1,
+ gettext_noop ("# Pending requests active"),
+ -1,
GNUNET_NO);
GNUNET_free (pr);
return GNUNET_YES;
if (NULL == pr_map)
return; /* already cleaned up! */
- if (GNUNET_YES != full_cleanup)
+ if (GNUNET_NO == full_cleanup)
{
/* make request inactive (we're no longer interested in more results),
* but do NOT remove from our data-structures, we still need it there
/* retry -- without delay, as this is non-anonymous
and cadet/cadet connect will take some time anyway */
pr->cadet_request = GSF_cadet_query (pr->public_data.target,
- &pr->public_data.query,
- pr->public_data.type,
- &cadet_reply_proc,
- pr);
+ &pr->public_data.query,
+ pr->public_data.type,
+ &cadet_reply_proc,
+ pr);
return;
}
if (GNUNET_YES !=
if (NULL != pr->cadet_request)
return;
pr->cadet_request = GSF_cadet_query (pr->public_data.target,
- &pr->public_data.query,
- pr->public_data.type,
- &cadet_reply_proc,
- pr);
+ &pr->public_data.query,
+ pr->public_data.type,
+ &cadet_reply_proc,
+ pr);
}
}
+/**
+ * Check if the given request is still active.
+ *
+ * @param pr pending request
+ * @return #GNUNET_YES if the request is still active
+ */
+int
+GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr)
+{
+ return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
+}
+
+
/**
* Setup the subsystem.
*/
void
GSF_pending_request_done_ ()
{
- GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
+ GNUNET_CONTAINER_multihashmap_iterate (pr_map,
+ &clean_request,
+ NULL);
GNUNET_CONTAINER_multihashmap_destroy (pr_map);
pr_map = NULL;
GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
struct GSF_PendingRequest *
GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
enum GNUNET_BLOCK_Type type,
- const struct GNUNET_HashCode * query,
+ const struct GNUNET_HashCode *query,
const struct GNUNET_PeerIdentity *target,
- const char *bf_data, size_t bf_size,
- uint32_t mingle, uint32_t anonymity_level,
- uint32_t priority, int32_t ttl,
+ const char *bf_data,
+ size_t bf_size,
+ uint32_t mingle,
+ uint32_t anonymity_level,
+ uint32_t priority,
+ int32_t ttl,
GNUNET_PEER_Id sender_pid,
GNUNET_PEER_Id origin_pid,
- const struct GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
- GSF_PendingRequestReplyHandler rh, void *rh_cls);
+ GSF_PendingRequestReplyHandler rh,
+ void *rh_cls);
/**
*
* @param pr request to update
* @param replies_seen hash codes of replies that we've seen
- * @param replies_seen_count size of the replies_seen array
+ * @param replies_seen_count size of the @a replies_seen array
*/
void
GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
- const struct GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count);
GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
+/**
+ * Check if the given request is still active.
+ *
+ * @param pr pending request
+ * @return #GNUNET_YES if the request is still active
+ */
+int
+GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr);
+
+
/**
* Test if two pending requests are compatible (would generate
* the same query modulo filters and should thus be processed
*
* @param pra a pending request
* @param prb another pending request
- * @return GNUNET_OK if the requests are compatible
+ * @return #GNUNET_OK if the requests are compatible
*/
int
GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
* transmission to other peers (or at least determine its size).
*
* @param pr request to generate the message for
- * @param buf_size number of bytes available in buf
+ * @param buf_size number of bytes available in @a buf
* @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if buf_size too small) or used
+ * @return number of bytes needed (if @a buf_size too small) or used
*/
size_t
GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
- size_t buf_size, void *buf);
+ size_t buf_size,
+ void *buf);
/**
* @param full_cleanup fully purge the request
*/
void
-GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup);
+GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
+ int full_cleanup);
/**
* @param cls closure
* @param key query for the request
* @param pr handle to the pending request
- * @return GNUNET_YES to continue to iterate
+ * @return #GNUNET_YES to continue to iterate
*/
-typedef int (*GSF_PendingRequestIterator) (void *cls,
- const struct GNUNET_HashCode * key,
- struct GSF_PendingRequest * pr);
+typedef int
+(*GSF_PendingRequestIterator) (void *cls,
+ const struct GNUNET_HashCode *key,
+ struct GSF_PendingRequest *pr);
/**
* @param cls closure for it
*/
void
-GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls);
+GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
+ void *cls);
/**
* @param cp the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
* @param message the actual message
- * @return GNUNET_OK if the message was well-formed,
- * GNUNET_SYSERR if the message was malformed (close connection,
+ * @return #GNUNET_OK if the message was well-formed,
+ * #GNUNET_SYSERR if the message was malformed (close connection,
* do not cache under any circumstances)
*/
int
* @param pr the pending request we were processing
* @param result final datastore lookup result
*/
-typedef void (*GSF_LocalLookupContinuation) (void *cls,
- struct GSF_PendingRequest * pr,
- enum GNUNET_BLOCK_EvaluationResult
- result);
+typedef void
+(*GSF_LocalLookupContinuation) (void *cls,
+ struct GSF_PendingRequest *pr,
+ enum GNUNET_BLOCK_EvaluationResult result);
/**
*
* @param pr the pending request to process
* @param cont function to call at the end
- * @param cont_cls closure for cont
+ * @param cont_cls closure for @a cont
*/
void
GSF_local_lookup_ (struct GSF_PendingRequest *pr,
- GSF_LocalLookupContinuation cont, void *cont_cls);
+ GSF_LocalLookupContinuation cont,
+ void *cont_cls);
/**
*
* @param pr request
* @param target
- * @return GNUNET_YES if this request could be forwarded to the given peer
+ * @return #GNUNET_YES if this request could be forwarded to the given peer
*/
int
GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,