From bf4a9d8364675b34ac18d505e508006e2b773670 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 26 Jul 2011 16:27:05 +0000 Subject: [PATCH] fixing fs --- src/fs/gnunet-service-fs.c | 2 +- src/fs/gnunet-service-fs.h | 6 + src/fs/gnunet-service-fs_cp.c | 4 +- src/fs/gnunet-service-fs_lc.c | 4 +- src/fs/gnunet-service-fs_pe.c | 319 +++++++++++++++++++++++++--------- src/fs/gnunet-service-fs_pr.c | 68 +++++++- src/fs/gnunet-service-fs_pr.h | 22 ++- 7 files changed, 334 insertions(+), 91 deletions(-) diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index a52e06c02..5ea1bb7c1 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -347,7 +347,7 @@ start_p2p_processing (void *cls, return; /* we're done, 'pr' was already destroyed... */ if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) { - GSF_pending_request_cancel_ (pr); + GSF_pending_request_cancel_ (pr, GNUNET_YES); return; } GSF_dht_lookup_ (pr); diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index ed7cd2e7b..bee814318 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -87,6 +87,12 @@ struct GSF_LocalClient; */ struct GSF_RequestPlan; +/** + * DLL of request plans a particular pending request is + * involved with. + */ +struct GSF_RequestPlanReference; + /** * Our connection to the datastore. */ diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index f09516693..ea8a84dfe 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -811,7 +811,7 @@ cancel_pending_request (void *cls, GNUNET_CONTAINER_multihashmap_remove (cp->request_map, &prd->query, peerreq)); - GSF_pending_request_cancel_ (pr); + GSF_pending_request_cancel_ (pr, GNUNET_NO); GNUNET_free (peerreq); return GNUNET_OK; } @@ -1368,7 +1368,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, -1, GNUNET_NO); priority += prd->priority; - GSF_pending_request_cancel_ (pr); + GSF_pending_request_cancel_ (pr, GNUNET_YES); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (cp->request_map, &gm->query, diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index f56ea5116..18f5c10dc 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -202,7 +202,7 @@ client_request_destroy (void *cls, GNUNET_CONTAINER_DLL_remove (lc->cr_head, lc->cr_tail, cr); - GSF_pending_request_cancel_ (cr->pr); + GSF_pending_request_cancel_ (cr->pr, GNUNET_NO); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# client searches active"), - 1, @@ -514,7 +514,7 @@ GSF_client_disconnect_handler_ (void *cls, GNUNET_CONTAINER_DLL_remove (pos->cr_head, pos->cr_tail, cr); - GSF_pending_request_cancel_ (cr->pr); + GSF_pending_request_cancel_ (cr->pr, GNUNET_NO); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# client searches active"), - 1, diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index 83733ef8d..4dd54c88e 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -30,6 +30,72 @@ #include "gnunet-service-fs_pr.h" +/** + * List of GSF_PendingRequests this request plan + * participates with. + */ +struct PendingRequestList; + + +/** + * DLL of request plans a particular pending request is + * involved with. + */ +struct GSF_RequestPlanReference +{ + + /** + * This is a doubly-linked list. + */ + struct GSF_RequestPlanReference *next; + + /** + * This is a doubly-linked list. + */ + struct GSF_RequestPlanReference *prev; + + /** + * Associated request plan. + */ + struct GSF_RequestPlan *rp; + + /** + * Corresponding PendingRequestList. + */ + struct PendingRequestList *prl; +}; + + +/** + * List of GSF_PendingRequests this request plan + * participates with. + */ +struct PendingRequestList +{ + + /** + * This is a doubly-linked list. + */ + struct PendingRequestList *next; + + /** + * This is a doubly-linked list. + */ + struct PendingRequestList *prev; + + /** + * Array of associated pending requests. + */ + struct GSF_PendingRequest *pr; + + /** + * Corresponding GSF_RequestPlanReference. + */ + struct GSF_RequestPlanReference *rpr; + +}; + + /** * Information we keep per request per peer. This is a doubly-linked * list (with head and tail in the 'struct GSF_PendingRequestData') @@ -55,9 +121,14 @@ struct GSF_RequestPlan struct GNUNET_CONTAINER_HeapNode *hn; /** - * Array of associated pending requests. + * Head of list of associated pending requests. */ - struct GSF_PendingRequest **prs; + struct PendingRequestList *prl_head; + + /** + * Tail of list of associated pending requests. + */ + struct PendingRequestList *prl_tail; /** * Earliest time we'd be happy to (re)transmit this request. @@ -69,11 +140,6 @@ struct GSF_RequestPlan */ struct GNUNET_TIME_Absolute last_transmission; - /** - * Number of entries in 'prs'. - */ - unsigned int prs_length; - /** * Current priority for this request for this target. */ @@ -163,7 +229,7 @@ plan (struct PeerPlan *pp, gettext_noop ("# average retransmission delay (ms)"), total_delay * 1000LL / plan_count, GNUNET_NO); - prd = GSF_pending_request_get_data_ (rp->prs[0]); + prd = GSF_pending_request_get_data_ (rp->prl_head->pr); // FIXME: calculate 'rp->priority'! if (rp->transmission_counter < 32) delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, @@ -195,6 +261,32 @@ plan (struct PeerPlan *pp, } +/** + * Get the pending request with the highest TTL from the given plan. + * + * @param rp plan to investigate + * @return pending request with highest TTL + */ +struct GSF_PendingRequest * +get_latest (const struct GSF_RequestPlan *rp) +{ + struct GSF_PendingRequest *ret; + struct PendingRequestList *prl; + + prl = rp->prl_head; + ret = prl->pr; + prl = prl->next; + while (NULL != prl) + { + if (GSF_pending_request_get_data_ (prl->pr)->ttl.abs_value > + GSF_pending_request_get_data_ (ret)->ttl.abs_value) + ret = prl->pr; + prl = prl->next; + } + return ret; +} + + /** * Function called to get a message for transmission. * @@ -225,7 +317,7 @@ transmit_message_callback (void *cls, pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); return 0; } - msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf); + msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf); if (msize > buf_size) { /* buffer to small (message changed), try again */ @@ -314,7 +406,7 @@ schedule_peer_transmission (void *cls, rp); #endif GNUNET_assert (NULL != rp); - msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL); + msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); pp->pth = GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority, @@ -326,6 +418,79 @@ schedule_peer_transmission (void *cls, } +/** + * Closure for 'merge_pr'. + */ +struct MergeContext +{ + + struct GSF_PendingRequest *pr; + + int merged; + +}; + + +/** + * Iterator that checks if an equivalent request is already + * present for this peer. + * + * @param cls closure + * @param node internal node of the heap (ignored) + * @param element request plan stored at the node + * @param cost cost associated with the node (ignored) + * @return GNUNET_YES if we should continue to iterate, + * GNUNET_NO if not (merge success) + */ +static int +merge_pr (void *cls, + struct GNUNET_CONTAINER_HeapNode *node, + void *element, + GNUNET_CONTAINER_HeapCostType cost) +{ + struct MergeContext *mpr = cls; + struct GSF_RequestPlan *rp = element; + struct GSF_PendingRequestData *prd; + struct GSF_RequestPlanReference *rpr; + struct PendingRequestList *prl; + struct GSF_PendingRequest *latest; + + if (GNUNET_OK != + GSF_pending_request_is_compatible_ (mpr->pr, + rp->prl_head->pr)) + return GNUNET_YES; + /* merge new request with existing request plan */ + rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference)); + prl = GNUNET_malloc (sizeof (struct PendingRequestList)); + rpr->rp = rp; + rpr->prl = prl; + prl->rpr = rpr; + prl->pr = mpr->pr; + prd = GSF_pending_request_get_data_ (mpr->pr); + GNUNET_CONTAINER_DLL_insert (prd->rpr_head, + prd->rpr_tail, + rpr); + GNUNET_CONTAINER_DLL_insert (rp->prl_head, + rp->prl_tail, + prl); + mpr->merged = GNUNET_YES; + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# requests merged"), + 1, + GNUNET_NO); + latest = get_latest (rp); + if (GSF_pending_request_get_data_ (latest)->ttl.abs_value < prd->ttl.abs_value) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# requests refreshed"), + 1, + GNUNET_NO); + rp->transmission_counter = 0; /* reset */ + } + return GNUNET_NO; +} + + /** * Create a new query plan entry. * @@ -340,7 +505,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct PeerPlan *pp; struct GSF_PendingRequestData *prd; struct GSF_RequestPlan *rp; - unsigned int i; + struct GSF_RequestPlanReference *rpr; + struct PendingRequestList *prl; + struct MergeContext mpc; size_t msize; GNUNET_assert (NULL != cp); @@ -359,52 +526,39 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } msize = GSF_pending_request_get_message_ (pr, 0, NULL); - prd = GSF_pending_request_get_data_ (pr); - for (rp = prd->rp_head; NULL != rp; rp = rp->next) - { - char mbuf[msize]; - char xbuf[msize]; - - GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf)); - if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) && - (0 == memcmp (xbuf, mbuf, msize)) ) - { - /* add request to existing plan */ - GNUNET_array_append (rp->prs, - rp->prs_length, - pr); - for (i=0;iprs_length;i++) - if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value) - { - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# requests refreshed"), - 1, - GNUNET_NO); - rp->transmission_counter = 0; /* reset */ - break; - } - return; - } - } + mpc.merged = GNUNET_NO; + mpc.pr = pr; + GNUNET_CONTAINER_heap_iterate (pp->priority_heap, &merge_pr, &mpc); + if (mpc.merged != GNUNET_NO) + return; + GNUNET_CONTAINER_heap_iterate (pp->delay_heap, &merge_pr, &mpc); + if (mpc.merged != GNUNET_NO) + return; plan_count++; GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1, GNUNET_NO); - rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); + prd = GSF_pending_request_get_data_ (pr); #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Planning transmission of query `%s' to peer `%s' (%p)\n", + "Planning transmission of query `%s' to peer `%s'\n", GNUNET_h2s (&prd->query), - GNUNET_i2s (&id), - rp); + GNUNET_i2s (&id)); #endif - GNUNET_array_append (rp->prs, - rp->prs_length, - pr); - GNUNET_CONTAINER_DLL_insert (prd->rp_head, - prd->rp_tail, - rp); + rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); + rpr = GNUNET_malloc (sizeof (struct GSF_RequestPlanReference)); + prl = GNUNET_malloc (sizeof (struct PendingRequestList)); + rpr->rp = rp; + rpr->prl = prl; + prl->rpr = rpr; + prl->pr = pr; + GNUNET_CONTAINER_DLL_insert (prd->rpr_head, + prd->rpr_tail, + rpr); + GNUNET_CONTAINER_DLL_insert (rp->prl_head, + rp->prl_tail, + prl); plan (pp, rp); } @@ -422,7 +576,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) struct PeerPlan *pp; struct GSF_RequestPlan *rp; struct GSF_PendingRequestData *prd; - unsigned int i; + struct PendingRequestList *prl; GSF_connected_peer_get_identity_ (cp, &id); pp = GNUNET_CONTAINER_multihashmap_get (plans, @@ -442,29 +596,35 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) } while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) { - for (i=0;iprs_length;i++) + while (NULL != (prl = rp->prl_head)) { - prd = GSF_pending_request_get_data_ (rp->prs[i]); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); + GNUNET_CONTAINER_DLL_remove (rp->prl_head, + rp->prl_tail, + prl); + prd = GSF_pending_request_get_data_ (prl->pr); + GNUNET_CONTAINER_DLL_remove (prd->rpr_head, + prd->rpr_tail, + prl->rpr); + GNUNET_free (prl->rpr); + GNUNET_free (prl); } - plan_count--; - GNUNET_array_grow (rp->prs, rp->prs_length, 0); GNUNET_free (rp); } GNUNET_CONTAINER_heap_destroy (pp->priority_heap); while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) { - for (i=0;iprs_length;i++) + while (NULL != (prl = rp->prl_head)) { - prd = GSF_pending_request_get_data_ (rp->prs[i]); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); + GNUNET_CONTAINER_DLL_remove (rp->prl_head, + rp->prl_tail, + prl); + prd = GSF_pending_request_get_data_ (prl->pr); + GNUNET_CONTAINER_DLL_remove (prd->rpr_head, + prd->rpr_tail, + prl->rpr); + GNUNET_free (prl->rpr); + GNUNET_free (prl); } - plan_count--; - GNUNET_array_grow (rp->prs, rp->prs_length, 0); GNUNET_free (rp); } GNUNET_STATISTICS_set (GSF_stats, @@ -488,28 +648,25 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) { struct GSF_RequestPlan *rp; struct GSF_PendingRequestData *prd; - unsigned int i; + struct GSF_RequestPlanReference *rpr; prd = GSF_pending_request_get_data_ (pr); - while (NULL != (rp = prd->rp_head)) + while (NULL != (rpr = prd->rpr_head)) { - for (i=0;iprs_length;i++) + GNUNET_CONTAINER_DLL_remove (prd->rpr_head, + prd->rpr_tail, + rpr); + rp = rpr->rp; + GNUNET_CONTAINER_DLL_remove (rp->prl_head, + rp->prl_tail, + rpr->prl); + GNUNET_free (rpr->prl); + GNUNET_free (rpr); + if (rp->prl_head == 0) { - if (rp->prs[i] == pr) - { - rp->prs[i] = rp->prs[rp->prs_length - 1]; - GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1); - if (rp->prs_length == 0) - { - GNUNET_CONTAINER_heap_remove_node (rp->hn); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); - plan_count--; - GNUNET_free (rp); - break; - } - } + GNUNET_CONTAINER_heap_remove_node (rp->hn); + plan_count--; + GNUNET_free (rp); } } GNUNET_STATISTICS_set (GSF_stats, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 37865d913..4db3505dd 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -385,7 +385,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, NULL, 0); - GSF_pending_request_cancel_ (dpr); + GSF_pending_request_cancel_ (dpr, GNUNET_YES); } } GNUNET_STATISTICS_update (GSF_stats, @@ -409,6 +409,33 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr) } +/** + * Test if two pending requests are compatible (would generate + * the same query modulo filters and should thus be processed + * jointly). + * + * @param pra a pending request + * @param pra another pending request + * @return GNUNET_OK if the requests are compatible + */ +int +GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, + struct GSF_PendingRequest *prb) +{ + if ( (pra->public_data.type != prb->public_data.type) || + (0 != memcmp (&pra->public_data.query, + &prb->public_data.query, + sizeof (GNUNET_HashCode))) || + ( (pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && + (0 != memcmp (&pra->public_data.namespace, + &prb->public_data.namespace, + sizeof (GNUNET_HashCode))) ) ) + return GNUNET_NO; + return GNUNET_OK; +} + + + /** * Update a given pending request with additional replies * that have been seen. @@ -646,12 +673,47 @@ clean_request (void *cls, * Explicitly cancel a pending request. * * @param pr request to cancel + * @param full_cleanup fully purge the request */ void -GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) +GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, + int full_cleanup) { + GSF_LocalLookupContinuation cont; + if (NULL == pr_map) return; /* already cleaned up! */ + if (GNUNET_YES != full_cleanup) + { + /* make request inactive (we're no longer interested in more results), + but do NOT remove from our data-structures, we still need it there + to prevent the request from looping */ + pr->rh = NULL; + if (NULL != (cont = pr->llc_cont)) + { + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, + pr, + pr->local_result); + } + GSF_plan_notify_request_done_ (pr); + if (NULL != pr->qe) + { + GNUNET_DATASTORE_cancel (pr->qe); + pr->qe = NULL; + } + if (NULL != pr->gh) + { + GNUNET_DHT_get_stop (pr->gh); + pr->gh = NULL; + } + if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) + { + GNUNET_SCHEDULER_cancel (pr->warn_task); + pr->warn_task = GNUNET_SCHEDULER_NO_TASK; + } + return; + } GNUNET_assert (GNUNET_YES == clean_request (NULL, &pr->public_data.query, pr)); } @@ -763,6 +825,8 @@ process_reply (void *cls, struct GSF_PendingRequest *pr = value; GNUNET_HashCode chash; + if (NULL == pr->rh) + return GNUNET_YES; #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Matched result (type %u) for query `%s' with pending request\n", diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 2f828e281..1e71aa7ee 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -94,12 +94,12 @@ struct GSF_PendingRequestData /** * Fields for the plan module to track a DLL with the request. */ - struct GSF_RequestPlan *rp_head; + struct GSF_RequestPlanReference *rpr_head; /** * Fields for the plan module to track a DLL with the request. */ - struct GSF_RequestPlan *rp_tail; + struct GSF_RequestPlanReference *rpr_tail; /** * Current TTL for the request. @@ -241,6 +241,20 @@ struct GSF_PendingRequestData * GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr); +/** + * Test if two pending requests are compatible (would generate + * the same query modulo filters and should thus be processed + * jointly). + * + * @param pra a pending request + * @param pra another pending request + * @return GNUNET_OK if the requests are compatible + */ +int +GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, + struct GSF_PendingRequest *prb); + + /** * Generate the message corresponding to the given pending request for * transmission to other peers (or at least determine its size). @@ -260,9 +274,11 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, * Explicitly cancel a pending request. * * @param pr request to cancel + * @param full_cleanup fully purge the request */ void -GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr); +GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, + int full_cleanup); /** -- 2.25.1