struct GSF_PendingRequest *pr;
/**
- * Earliest time we'd be happy to transmit this request.
+ * Earliest time we'd be happy to (re)transmit this request.
*/
struct GNUNET_TIME_Absolute earliest_transmission;
/**
- * Priority for this request for this target.
+ * When was the last time we transmitted this request to this peer? 0 for never.
*/
- uint32_t priority;
+ struct GNUNET_TIME_Absolute last_transmission;
+
+ /**
+ * Current priority for this request for this target.
+ */
+ uint64_t priority;
+
+ /**
+ * How often did we transmit this request to this peer?
+ */
+ unsigned int transmission_counter;
};
struct PeerPlan
{
/**
- * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority.
+ * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
*/
- struct GNUNET_CONTAINER_Heap *heap;
+ struct GNUNET_CONTAINER_Heap *priority_heap;
+
+ /**
+ * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
+ */
+ struct GNUNET_CONTAINER_Heap *delay_heap;
/**
* Current transmission request handle.
plan (struct PeerPlan *pp,
struct GSF_RequestPlan *rp)
{
- GNUNET_CONTAINER_HeapCostType weight;
struct GSF_PendingRequestData *prd;
prd = GSF_pending_request_get_data_ (rp->pr);
- weight = 0; // FIXME: calculate real weight!
// FIXME: calculate 'rp->earliest_transmission'!
// fIXME: claculate 'rp->priority'!
- rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap,
- rp,
- weight);
+
+ if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
+ else
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+ rp,
+ rp->earliest_transmission.abs_value);
}
pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
return 0;
}
- rp = GNUNET_CONTAINER_heap_peek (pp->heap);
+ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
if (msize > buf_size)
{
return 0;
}
/* remove from root, add again elsewhere... */
- GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
rp->hn = NULL;
+ rp->last_transmission = GNUNET_TIME_absolute_get ();
+ rp->transmission_counter++;
plan (pp, rp);
return msize;
}
struct GSF_RequestPlan *rp;
struct GSF_PendingRequestData *prd;
size_t msize;
- struct GNUNET_TIME_Relative delay;
pp->task = GNUNET_SCHEDULER_NO_TASK;
- if (NULL == pp->heap)
- return;
- if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
- return;
GNUNET_assert (NULL == pp->pth);
- rp = GNUNET_CONTAINER_heap_peek (pp->heap);
- prd = GSF_pending_request_get_data_ (rp->pr);
- delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
- if (delay.rel_value > 0)
+ /* move ready requests to priority queue */
+ while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
+ (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
+ {
+ rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
+ GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
+ if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
+ break;
+ }
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
{
- pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+ /* priority heap (still) empty, check for delay... */
+ rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
+ if (NULL == rp)
+ return; /* both queues empty */
+ pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
&schedule_peer_transmission,
pp);
return;
}
+ /* process from priority heap */
+ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+ prd = GSF_pending_request_get_data_ (rp->pr);
msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
pp->pth = GSF_peer_transmit_ (pp->cp,
GNUNET_YES,
if (NULL == pp)
{
pp = GNUNET_malloc (sizeof (struct PeerPlan));
- pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
+ pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
GNUNET_CONTAINER_multihashmap_put (plans,
&id.hashPubKey,
pp,
prd->rp_tail,
rp);
plan (pp, rp);
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
+ {
+ /* no request that should be done immediately, figure out delay */
+ if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
+ return; /* did not change delay heap top, no need to do anything */
+ GNUNET_assert (NULL == pp->pth);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
+ &schedule_peer_transmission,
+ pp);
+ return;
+ }
+
if (pp->pth != NULL)
{
- if (rp != GNUNET_CONTAINER_heap_peek (pp->heap))
- return;
+ if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
+ return; /* did not change priority heap top, no need to do anyhing */
GSF_peer_transmit_cancel_ (pp->pth);
pp->pth = NULL;
}
GSF_peer_transmit_cancel_ (pp->pth);
if (GNUNET_SCHEDULER_NO_TASK != pp->task)
GNUNET_SCHEDULER_cancel (pp->task);
- while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap)))
+ while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
+ {
+ prd = GSF_pending_request_get_data_ (rp->pr);
+ GNUNET_CONTAINER_DLL_remove (prd->rp_head,
+ prd->rp_tail,
+ rp);
+ GNUNET_free (rp);
+ }
+ GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
+ while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
{
prd = GSF_pending_request_get_data_ (rp->pr);
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
rp);
GNUNET_free (rp);
}
- GNUNET_CONTAINER_heap_destroy (pp->heap);
+ GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
GNUNET_free (pp);
}