From 6342157c3068d482f7f8c02b7229e80eba2c94dc Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 26 Oct 2015 00:53:32 +0000 Subject: [PATCH] potential fix for #4024 --- src/fs/gnunet-service-fs.c | 3 ++ src/fs/gnunet-service-fs_pe.c | 82 +++++++++++++++++++++++++---------- src/fs/gnunet-service-fs_pr.c | 42 +++++++++++++----- src/fs/gnunet-service-fs_pr.h | 73 +++++++++++++++++++------------ 4 files changed, 139 insertions(+), 61 deletions(-) diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index b4f336a6c..667e4f8ba 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -591,6 +591,9 @@ consider_peer_for_forwarding (void *cls, struct GSF_ConnectedPeer *cp = cls; struct GNUNET_PeerIdentity pid; + if (GNUNET_YES != + GSF_pending_request_test_active_ (pr)) + return GNUNET_YES; /* request is not actually active, skip! */ GSF_connected_peer_get_identity_ (cp, &pid); if (GNUNET_YES != GSF_pending_request_test_target_ (pr, &pid)) diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index 9743d1cb2..1c4065f70 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -371,6 +371,8 @@ get_latest (const struct GSF_RequestPlan *rp) rprd = GSF_pending_request_get_data_ (ret); for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE) { + GNUNET_break (GNUNET_YES == + GSF_pending_request_test_active_ (bi->pr)); prd = GSF_pending_request_get_data_ (bi->pr); if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us) { @@ -433,7 +435,8 @@ transmit_message_callback (void *cls, return 0; } /* remove from root, add again elsewhere... */ - GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); + GNUNET_assert (rp == + GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); rp->hn = NULL; rp->last_transmission = GNUNET_TIME_absolute_get (); rp->transmission_counter++; @@ -443,8 +446,8 @@ transmit_message_callback (void *cls, rp, rp->transmission_counter); plan (pp, rp); GNUNET_STATISTICS_update (GSF_stats, - gettext_noop - ("# query messages sent to other peers"), 1, + gettext_noop ("# query messages sent to other peers"), + 1, GNUNET_NO); return msize; } @@ -477,7 +480,9 @@ schedule_peer_transmission (void *cls, (rp->earliest_transmission).rel_value_us)) { GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)); - rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); + rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, + rp, + rp->priority); } if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) { @@ -512,7 +517,9 @@ schedule_peer_transmission (void *cls, #endif /* process from priority heap */ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing query plan %p\n", + rp); GNUNET_assert (NULL != rp); msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); pp->pth = @@ -565,6 +572,8 @@ merge_pr (void *cls, struct GSF_PendingRequestPlanBijection *bi; struct GSF_PendingRequest *latest; + GNUNET_break (GNUNET_YES == + GSF_pending_request_test_active_ (mpr->pr)); if (GNUNET_OK != GSF_pending_request_is_compatible_ (mpr->pr, rp->pe_head->pr)) @@ -585,7 +594,8 @@ merge_pr (void *cls, mpr->merged = GNUNET_YES; #if INSANE_STATISTICS GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# requests merged"), 1, + gettext_noop ("# requests merged"), + 1, GNUNET_NO); #endif latest = get_latest (rp); @@ -595,7 +605,8 @@ merge_pr (void *cls, #if INSANE_STATISTICS GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"), - 1, GNUNET_NO); + 1, + GNUNET_NO); #endif rp->transmission_counter = 0; /* reset */ } @@ -620,6 +631,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct GSF_PendingRequestPlanBijection *bi; struct MergeContext mpc; + GNUNET_assert (GNUNET_YES == + GSF_pending_request_test_active_ (pr)); GNUNET_assert (NULL != cp); id = GSF_connected_peer_get_identity2_ (cp); pp = GNUNET_CONTAINER_multipeermap_get (plans, id); @@ -715,12 +728,19 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) { GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (pp->plan_map, - get_rp_key (rp), rp)); + get_rp_key (rp), + rp)); while (NULL != (bi = rp->pe_head)) { - GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi); + GNUNET_CONTAINER_MDLL_remove (PE, + rp->pe_head, + rp->pe_tail, + bi); prd = GSF_pending_request_get_data_ (bi->pr); - GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi); + GNUNET_CONTAINER_MDLL_remove (PR, + prd->pr_head, + prd->pr_tail, + bi); GNUNET_free (bi); } plan_count--; @@ -731,19 +751,28 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) { GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (pp->plan_map, - get_rp_key (rp), rp)); + get_rp_key (rp), + rp)); while (NULL != (bi = rp->pe_head)) { prd = GSF_pending_request_get_data_ (bi->pr); - GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi); - GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi); + GNUNET_CONTAINER_MDLL_remove (PE, + rp->pe_head, + rp->pe_tail, + bi); + GNUNET_CONTAINER_MDLL_remove (PR, + prd->pr_head, + prd->pr_tail, + bi); GNUNET_free (bi); } plan_count--; GNUNET_free (rp); } - GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"), - plan_count, GNUNET_NO); + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop ("# query plan entries"), + plan_count, + GNUNET_NO); GNUNET_CONTAINER_heap_destroy (pp->delay_heap); GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map); GNUNET_free (pp); @@ -798,23 +827,31 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) while (NULL != (bi = prd->pr_head)) { rp = bi->rp; - GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi); - GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi); + GNUNET_CONTAINER_MDLL_remove (PR, + prd->pr_head, + prd->pr_tail, + bi); + GNUNET_CONTAINER_MDLL_remove (PE, + rp->pe_head, + rp->pe_tail, + bi); + GNUNET_assert (bi->pr == pr); if (NULL == rp->pe_head) { GNUNET_CONTAINER_heap_remove_node (rp->hn); plan_count--; GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map, - &GSF_pending_request_get_data_ - (bi->pr)->query, + &prd->query, rp)); GNUNET_free (rp); } GNUNET_free (bi); } - GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"), - plan_count, GNUNET_NO); + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop ("# query plan entries"), + plan_count, + GNUNET_NO); } @@ -824,7 +861,8 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) void GSF_plan_init () { - plans = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); + plans = GNUNET_CONTAINER_multipeermap_create (256, + GNUNET_YES); } diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 21b00e80e..fe3f939f9 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -628,7 +628,9 @@ clean_request (void *cls, if (NULL != (cont = pr->llc_cont)) { pr->llc_cont = NULL; - cont (pr->llc_cont_cls, pr, pr->local_result); + cont (pr->llc_cont_cls, + pr, + pr->local_result); } GSF_plan_notify_request_done_ (pr); GNUNET_free_non_null (pr->replies_seen); @@ -666,7 +668,8 @@ clean_request (void *cls, &pr->public_data.query, pr)); GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# Pending requests active"), -1, + gettext_noop ("# Pending requests active"), + -1, GNUNET_NO); GNUNET_free (pr); return GNUNET_YES; @@ -687,7 +690,7 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, if (NULL == pr_map) return; /* already cleaned up! */ - if (GNUNET_YES != full_cleanup) + if (GNUNET_NO == full_cleanup) { /* make request inactive (we're no longer interested in more results), * but do NOT remove from our data-structures, we still need it there @@ -1234,10 +1237,10 @@ cadet_reply_proc (void *cls, /* retry -- without delay, as this is non-anonymous and cadet/cadet connect will take some time anyway */ pr->cadet_request = GSF_cadet_query (pr->public_data.target, - &pr->public_data.query, - pr->public_data.type, - &cadet_reply_proc, - pr); + &pr->public_data.query, + pr->public_data.type, + &cadet_reply_proc, + pr); return; } if (GNUNET_YES != @@ -1286,10 +1289,10 @@ GSF_cadet_lookup_ (struct GSF_PendingRequest *pr) if (NULL != pr->cadet_request) return; pr->cadet_request = GSF_cadet_query (pr->public_data.target, - &pr->public_data.query, - pr->public_data.type, - &cadet_reply_proc, - pr); + &pr->public_data.query, + pr->public_data.type, + &cadet_reply_proc, + pr); } @@ -1839,6 +1842,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, } +/** + * Check if the given request is still active. + * + * @param pr pending request + * @return #GNUNET_YES if the request is still active + */ +int +GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr) +{ + return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO; +} + + /** * Setup the subsystem. */ @@ -1868,7 +1884,9 @@ GSF_pending_request_init_ () void GSF_pending_request_done_ () { - GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL); + GNUNET_CONTAINER_multihashmap_iterate (pr_map, + &clean_request, + NULL); GNUNET_CONTAINER_multihashmap_destroy (pr_map); pr_map = NULL; GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 547595b67..0be91fec2 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -222,16 +222,20 @@ typedef void struct GSF_PendingRequest * GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, enum GNUNET_BLOCK_Type type, - const struct GNUNET_HashCode * query, + const struct GNUNET_HashCode *query, const struct GNUNET_PeerIdentity *target, - const char *bf_data, size_t bf_size, - uint32_t mingle, uint32_t anonymity_level, - uint32_t priority, int32_t ttl, + const char *bf_data, + size_t bf_size, + uint32_t mingle, + uint32_t anonymity_level, + uint32_t priority, + int32_t ttl, GNUNET_PEER_Id sender_pid, GNUNET_PEER_Id origin_pid, - const struct GNUNET_HashCode * replies_seen, + const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, - GSF_PendingRequestReplyHandler rh, void *rh_cls); + GSF_PendingRequestReplyHandler rh, + void *rh_cls); /** @@ -240,11 +244,11 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, * * @param pr request to update * @param replies_seen hash codes of replies that we've seen - * @param replies_seen_count size of the replies_seen array + * @param replies_seen_count size of the @a replies_seen array */ void GSF_pending_request_update_ (struct GSF_PendingRequest *pr, - const struct GNUNET_HashCode * replies_seen, + const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count); @@ -258,6 +262,16 @@ struct GSF_PendingRequestData * GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr); +/** + * Check if the given request is still active. + * + * @param pr pending request + * @return #GNUNET_YES if the request is still active + */ +int +GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr); + + /** * Test if two pending requests are compatible (would generate * the same query modulo filters and should thus be processed @@ -265,7 +279,7 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr); * * @param pra a pending request * @param prb another pending request - * @return GNUNET_OK if the requests are compatible + * @return #GNUNET_OK if the requests are compatible */ int GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, @@ -277,13 +291,14 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, * transmission to other peers (or at least determine its size). * * @param pr request to generate the message for - * @param buf_size number of bytes available in buf + * @param buf_size number of bytes available in @a buf * @param buf where to copy the message (can be NULL) - * @return number of bytes needed (if buf_size too small) or used + * @return number of bytes needed (if @a buf_size too small) or used */ size_t GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, - size_t buf_size, void *buf); + size_t buf_size, + void *buf); /** @@ -293,7 +308,8 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, * @param full_cleanup fully purge the request */ void -GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup); +GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, + int full_cleanup); /** @@ -303,11 +319,12 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup); * @param cls closure * @param key query for the request * @param pr handle to the pending request - * @return GNUNET_YES to continue to iterate + * @return #GNUNET_YES to continue to iterate */ -typedef int (*GSF_PendingRequestIterator) (void *cls, - const struct GNUNET_HashCode * key, - struct GSF_PendingRequest * pr); +typedef int +(*GSF_PendingRequestIterator) (void *cls, + const struct GNUNET_HashCode *key, + struct GSF_PendingRequest *pr); /** @@ -317,7 +334,8 @@ typedef int (*GSF_PendingRequestIterator) (void *cls, * @param cls closure for it */ void -GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls); +GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, + void *cls); /** @@ -329,8 +347,8 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls); * @param cp the other peer involved (sender or receiver, NULL * for loopback messages where we are both sender and receiver) * @param message the actual message - * @return GNUNET_OK if the message was well-formed, - * GNUNET_SYSERR if the message was malformed (close connection, + * @return #GNUNET_OK if the message was well-formed, + * #GNUNET_SYSERR if the message was malformed (close connection, * do not cache under any circumstances) */ int @@ -364,10 +382,10 @@ GSF_cadet_lookup_ (struct GSF_PendingRequest *pr); * @param pr the pending request we were processing * @param result final datastore lookup result */ -typedef void (*GSF_LocalLookupContinuation) (void *cls, - struct GSF_PendingRequest * pr, - enum GNUNET_BLOCK_EvaluationResult - result); +typedef void +(*GSF_LocalLookupContinuation) (void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result); /** @@ -375,11 +393,12 @@ typedef void (*GSF_LocalLookupContinuation) (void *cls, * * @param pr the pending request to process * @param cont function to call at the end - * @param cont_cls closure for cont + * @param cont_cls closure for @a cont */ void GSF_local_lookup_ (struct GSF_PendingRequest *pr, - GSF_LocalLookupContinuation cont, void *cont_cls); + GSF_LocalLookupContinuation cont, + void *cont_cls); /** @@ -387,7 +406,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, * * @param pr request * @param target - * @return GNUNET_YES if this request could be forwarded to the given peer + * @return #GNUNET_YES if this request could be forwarded to the given peer */ int GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr, -- 2.25.1