From 68b9a6e898f2f2f48006311adfcb7d71055d9c7c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 25 Jul 2011 15:38:48 +0000 Subject: [PATCH] deduplicate requests --- src/fs/gnunet-service-fs_pe.c | 100 +++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 21 deletions(-) diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b52e04712..83733ef8d 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -55,9 +55,9 @@ struct GSF_RequestPlan struct GNUNET_CONTAINER_HeapNode *hn; /** - * Associated pending request. + * Array of associated pending requests. */ - struct GSF_PendingRequest *pr; + struct GSF_PendingRequest **prs; /** * Earliest time we'd be happy to (re)transmit this request. @@ -69,6 +69,11 @@ struct GSF_RequestPlan */ struct GNUNET_TIME_Absolute last_transmission; + /** + * Number of entries in 'prs'. + */ + unsigned int prs_length; + /** * Current priority for this request for this target. */ @@ -158,7 +163,7 @@ plan (struct PeerPlan *pp, gettext_noop ("# average retransmission delay (ms)"), total_delay * 1000LL / plan_count, GNUNET_NO); - prd = GSF_pending_request_get_data_ (rp->pr); + prd = GSF_pending_request_get_data_ (rp->prs[0]); // FIXME: calculate 'rp->priority'! if (rp->transmission_counter < 32) delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, @@ -220,7 +225,7 @@ transmit_message_callback (void *cls, pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); return 0; } - msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); + msize = GSF_pending_request_get_message_ (rp->prs[0], buf_size, buf); if (msize > buf_size) { /* buffer to small (message changed), try again */ @@ -309,7 +314,7 @@ schedule_peer_transmission (void *cls, rp); #endif GNUNET_assert (NULL != rp); - msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); + msize = GSF_pending_request_get_message_ (rp->prs[0], 0, NULL); pp->pth = GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority, @@ -335,6 +340,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, struct PeerPlan *pp; struct GSF_PendingRequestData *prd; struct GSF_RequestPlan *rp; + unsigned int i; + size_t msize; GNUNET_assert (NULL != cp); GSF_connected_peer_get_identity_ (cp, &id); @@ -351,12 +358,39 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, pp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } + msize = GSF_pending_request_get_message_ (pr, 0, NULL); prd = GSF_pending_request_get_data_ (pr); + for (rp = prd->rp_head; NULL != rp; rp = rp->next) + { + char mbuf[msize]; + char xbuf[msize]; + + GNUNET_assert (msize == GSF_pending_request_get_message_ (pr, msize, mbuf)); + if ( (msize == GSF_pending_request_get_message_ (rp->prs[0], msize, xbuf)) && + (0 == memcmp (xbuf, mbuf, msize)) ) + { + /* add request to existing plan */ + GNUNET_array_append (rp->prs, + rp->prs_length, + pr); + for (i=0;iprs_length;i++) + if (GSF_pending_request_get_data_ (rp->prs[0])->ttl.abs_value < prd->ttl.abs_value) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# requests refreshed"), + 1, + GNUNET_NO); + rp->transmission_counter = 0; /* reset */ + break; + } + return; + } + } plan_count++; GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1, - GNUNET_NO); + GNUNET_NO); rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan)); #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -365,7 +399,9 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, GNUNET_i2s (&id), rp); #endif - rp->pr = pr; + GNUNET_array_append (rp->prs, + rp->prs_length, + pr); GNUNET_CONTAINER_DLL_insert (prd->rp_head, prd->rp_tail, rp); @@ -386,6 +422,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) struct PeerPlan *pp; struct GSF_RequestPlan *rp; struct GSF_PendingRequestData *prd; + unsigned int i; GSF_connected_peer_get_identity_ (cp, &id); pp = GNUNET_CONTAINER_multihashmap_get (plans, @@ -405,21 +442,29 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) } while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) { - prd = GSF_pending_request_get_data_ (rp->pr); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); + for (i=0;iprs_length;i++) + { + prd = GSF_pending_request_get_data_ (rp->prs[i]); + GNUNET_CONTAINER_DLL_remove (prd->rp_head, + prd->rp_tail, + rp); + } plan_count--; + GNUNET_array_grow (rp->prs, rp->prs_length, 0); GNUNET_free (rp); } GNUNET_CONTAINER_heap_destroy (pp->priority_heap); while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) { - prd = GSF_pending_request_get_data_ (rp->pr); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); + for (i=0;iprs_length;i++) + { + prd = GSF_pending_request_get_data_ (rp->prs[i]); + GNUNET_CONTAINER_DLL_remove (prd->rp_head, + prd->rp_tail, + rp); + } plan_count--; + GNUNET_array_grow (rp->prs, rp->prs_length, 0); GNUNET_free (rp); } GNUNET_STATISTICS_set (GSF_stats, @@ -443,16 +488,29 @@ GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr) { struct GSF_RequestPlan *rp; struct GSF_PendingRequestData *prd; + unsigned int i; prd = GSF_pending_request_get_data_ (pr); while (NULL != (rp = prd->rp_head)) { - GNUNET_CONTAINER_heap_remove_node (rp->hn); - GNUNET_CONTAINER_DLL_remove (prd->rp_head, - prd->rp_tail, - rp); - plan_count--; - GNUNET_free (rp); + for (i=0;iprs_length;i++) + { + if (rp->prs[i] == pr) + { + rp->prs[i] = rp->prs[rp->prs_length - 1]; + GNUNET_array_grow (rp->prs, rp->prs_length, rp->prs_length-1); + if (rp->prs_length == 0) + { + GNUNET_CONTAINER_heap_remove_node (rp->hn); + GNUNET_CONTAINER_DLL_remove (prd->rp_head, + prd->rp_tail, + rp); + plan_count--; + GNUNET_free (rp); + break; + } + } + } } GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"), -- 2.25.1