From ca98a9146439dca8bc6349ada18e7cbe8ecae63c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 14 Mar 2011 19:04:15 +0000 Subject: [PATCH] adding delay heap --- src/fs/gnunet-service-fs_pe.c | 109 +++++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 27 deletions(-) diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index af20c4bf3..1c73c8dda 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -59,14 +59,24 @@ struct GSF_RequestPlan struct GSF_PendingRequest *pr; /** - * Earliest time we'd be happy to transmit this request. + * Earliest time we'd be happy to (re)transmit this request. */ struct GNUNET_TIME_Absolute earliest_transmission; /** - * Priority for this request for this target. + * When was the last time we transmitted this request to this peer? 0 for never. */ - uint32_t priority; + struct GNUNET_TIME_Absolute last_transmission; + + /** + * Current priority for this request for this target. + */ + uint64_t priority; + + /** + * How often did we transmit this request to this peer? + */ + unsigned int transmission_counter; }; @@ -77,9 +87,14 @@ struct GSF_RequestPlan struct PeerPlan { /** - * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority. + * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority. */ - struct GNUNET_CONTAINER_Heap *heap; + struct GNUNET_CONTAINER_Heap *priority_heap; + + /** + * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first. + */ + struct GNUNET_CONTAINER_Heap *delay_heap; /** * Current transmission request handle. @@ -114,16 +129,20 @@ static void plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp) { - GNUNET_CONTAINER_HeapCostType weight; struct GSF_PendingRequestData *prd; prd = GSF_pending_request_get_data_ (rp->pr); - weight = 0; // FIXME: calculate real weight! // FIXME: calculate 'rp->earliest_transmission'! // fIXME: claculate 'rp->priority'! - rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap, - rp, - weight); + + if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) + rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, + rp, + rp->priority); + else + rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap, + rp, + rp->earliest_transmission.abs_value); } @@ -161,7 +180,7 @@ transmit_message_callback (void *cls, pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); return 0; } - rp = GNUNET_CONTAINER_heap_peek (pp->heap); + rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf); if (msize > buf_size) { @@ -170,8 +189,10 @@ transmit_message_callback (void *cls, return 0; } /* remove from root, add again elsewhere... */ - GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap)); + GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); rp->hn = NULL; + rp->last_transmission = GNUNET_TIME_absolute_get (); + rp->transmission_counter++; plan (pp, rp); return msize; } @@ -191,24 +212,34 @@ schedule_peer_transmission (void *cls, struct GSF_RequestPlan *rp; struct GSF_PendingRequestData *prd; size_t msize; - struct GNUNET_TIME_Relative delay; pp->task = GNUNET_SCHEDULER_NO_TASK; - if (NULL == pp->heap) - return; - if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap)) - return; GNUNET_assert (NULL == pp->pth); - rp = GNUNET_CONTAINER_heap_peek (pp->heap); - prd = GSF_pending_request_get_data_ (rp->pr); - delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission); - if (delay.rel_value > 0) + /* move ready requests to priority queue */ + while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && + (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) ) + { + rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap); + GNUNET_CONTAINER_heap_insert (pp->priority_heap, + rp, + rp->priority); + if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) + break; + } + if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) { - pp->task = GNUNET_SCHEDULER_add_delayed (delay, + /* priority heap (still) empty, check for delay... */ + rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap); + if (NULL == rp) + return; /* both queues empty */ + pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), &schedule_peer_transmission, pp); return; } + /* process from priority heap */ + rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); + prd = GSF_pending_request_get_data_ (rp->pr); msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL); pp->pth = GSF_peer_transmit_ (pp->cp, GNUNET_YES, @@ -242,7 +273,8 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, if (NULL == pp) { pp = GNUNET_malloc (sizeof (struct PeerPlan)); - pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX); + pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); GNUNET_CONTAINER_multihashmap_put (plans, &id.hashPubKey, pp, @@ -255,10 +287,24 @@ GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, prd->rp_tail, rp); plan (pp, rp); + if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap)) + { + /* no request that should be done immediately, figure out delay */ + if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap)) + return; /* did not change delay heap top, no need to do anything */ + GNUNET_assert (NULL == pp->pth); + if (GNUNET_SCHEDULER_NO_TASK != pp->task) + GNUNET_SCHEDULER_cancel (pp->task); + pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission), + &schedule_peer_transmission, + pp); + return; + } + if (pp->pth != NULL) { - if (rp != GNUNET_CONTAINER_heap_peek (pp->heap)) - return; + if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap)) + return; /* did not change priority heap top, no need to do anyhing */ GSF_peer_transmit_cancel_ (pp->pth); pp->pth = NULL; } @@ -293,7 +339,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) GSF_peer_transmit_cancel_ (pp->pth); if (GNUNET_SCHEDULER_NO_TASK != pp->task) GNUNET_SCHEDULER_cancel (pp->task); - while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap))) + while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) + { + prd = GSF_pending_request_get_data_ (rp->pr); + GNUNET_CONTAINER_DLL_remove (prd->rp_head, + prd->rp_tail, + rp); + GNUNET_free (rp); + } + GNUNET_CONTAINER_heap_destroy (pp->priority_heap); + while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap))) { prd = GSF_pending_request_get_data_ (rp->pr); GNUNET_CONTAINER_DLL_remove (prd->rp_head, @@ -301,7 +356,7 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) rp); GNUNET_free (rp); } - GNUNET_CONTAINER_heap_destroy (pp->heap); + GNUNET_CONTAINER_heap_destroy (pp->delay_heap); GNUNET_free (pp); } -- 2.25.1