* @author Christian Grothoff
*/
#include "platform.h"
+#include "gnunet-service-fs.h"
#include "gnunet-service-fs_cp.h"
#include "gnunet-service-fs_pe.h"
#include "gnunet-service-fs_pr.h"
struct GSF_PendingRequest *pr;
/**
- * Earliest time we'd be happy to transmit this request.
+ * Earliest time we'd be happy to (re)transmit this request.
*/
struct GNUNET_TIME_Absolute earliest_transmission;
/**
- * Priority for this request for this target.
+ * When was the last time we transmitted this request to this peer? 0 for never.
*/
- uint32_t priority;
+ struct GNUNET_TIME_Absolute last_transmission;
+
+ /**
+ * Current priority for this request for this target.
+ */
+ uint64_t priority;
+
+ /**
+ * How often did we transmit this request to this peer?
+ */
+ unsigned int transmission_counter;
};
struct PeerPlan
{
/**
- * Heap with pending queries (struct GSF_RequestPlan), smaller weights mean higher priority.
+ * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
*/
- struct GNUNET_CONTAINER_Heap *heap;
+ struct GNUNET_CONTAINER_Heap *priority_heap;
+
+ /**
+ * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
+ */
+ struct GNUNET_CONTAINER_Heap *delay_heap;
/**
* Current transmission request handle.
*/
static struct GNUNET_CONTAINER_MultiHashMap *plans;
+/**
+ * Sum of all transmission counters (equals total delay for all plan entries).
+ */
+static unsigned long long total_delay;
+
+/**
+ * Number of plan entries.
+ */
+static unsigned long long plan_count;
+
+
+/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' for transmission
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
* Insert the given request plan into the heap with the appropriate weight.
plan (struct PeerPlan *pp,
struct GSF_RequestPlan *rp)
{
- GNUNET_CONTAINER_HeapCostType weight;
struct GSF_PendingRequestData *prd;
+ struct GNUNET_TIME_Relative delay;
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# average retransmission delay (ms)"),
+ total_delay * 1000LL / plan_count,
+ GNUNET_NO);
prd = GSF_pending_request_get_data_ (rp->pr);
- weight = 0; // FIXME: calculate real weight!
// FIXME: calculate 'rp->earliest_transmission'!
- // fIXME: claculate 'rp->priority'!
- rp->hn = GNUNET_CONTAINER_heap_insert (pp->heap,
- rp,
- weight);
+ // FIXME: claculate 'rp->priority'!
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ rp->transmission_counter);
+ rp->earliest_transmission
+ = GNUNET_TIME_relative_to_absolute (delay);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Earliest (re)transmission for `%s' in %us\n",
+ GNUNET_h2s (&prd->query),
+ rp->transmission_counter);
+#endif
+
+ GNUNET_assert (rp->hn == NULL);
+ if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
+ else
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
+ rp,
+ rp->earliest_transmission.abs_value);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
}
-/**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the 'struct GSF_ConnectedPeer' for transmission
- * @param tc scheduler context
- */
-static void
-schedule_peer_transmission (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
/**
* Function called to get a message for transmission.
*
struct GSF_RequestPlan *rp;
size_t msize;
+ pp->pth = NULL;
if (NULL == buf)
{
/* failed, try again... */
pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
return 0;
}
- rp = GNUNET_CONTAINER_heap_peek (pp->heap);
+ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+ if (NULL == rp)
+ {
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
msize = GSF_pending_request_get_message_ (rp->pr, buf_size, buf);
if (msize > buf_size)
{
return 0;
}
/* remove from root, add again elsewhere... */
- GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
rp->hn = NULL;
+ rp->last_transmission = GNUNET_TIME_absolute_get ();
+ rp->transmission_counter++;
+ total_delay++;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing plan %p executed %u times, planning retransmission\n",
+ rp,
+ rp->transmission_counter);
+#endif
plan (pp, rp);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# queries messages sent to other peers"),
+ 1,
+ GNUNET_NO);
return msize;
}
{
struct PeerPlan *pp = cls;
struct GSF_RequestPlan *rp;
- struct GSF_PendingRequestData *prd;
size_t msize;
- struct GNUNET_TIME_Relative delay;
pp->task = GNUNET_SCHEDULER_NO_TASK;
- if (NULL == pp->heap)
- return;
- if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
- return;
- GNUNET_assert (NULL == pp->pth);
- rp = GNUNET_CONTAINER_heap_peek (pp->heap);
- prd = GSF_pending_request_get_data_ (rp->pr);
- delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
- if (delay.rel_value > 0)
+ if (pp->pth != NULL)
{
- pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
+ /* move ready requests to priority queue */
+ while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
+ (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
+ {
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
+ }
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
+ {
+ /* priority heap (still) empty, check for delay... */
+ rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
+ if (NULL == rp)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No active requests for plan %p.\n",
+ pp);
+#endif
+ return; /* both queues empty */
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sleeping for %llu ms before retrying requests on plan %p.\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
+ pp);
+#endif
+ pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
&schedule_peer_transmission,
pp);
return;
}
- msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
+ /* process from priority heap */
+ rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+#if DEBUG_FS > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing query plan %p\n",
+ rp);
+#endif
+ GNUNET_assert (NULL != rp);
+ msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
pp->pth = GSF_peer_transmit_ (pp->cp,
GNUNET_YES,
rp->priority,
* @param pr request with the entry
*/
void
-GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
+GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
struct GSF_PendingRequest *pr)
{
struct GNUNET_PeerIdentity id;
struct PeerPlan *pp;
struct GSF_PendingRequestData *prd;
struct GSF_RequestPlan *rp;
-
+
+ GNUNET_assert (NULL != cp);
GSF_connected_peer_get_identity_ (cp, &id);
pp = GNUNET_CONTAINER_multihashmap_get (plans,
&id.hashPubKey);
if (NULL == pp)
{
pp = GNUNET_malloc (sizeof (struct PeerPlan));
- pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
+ pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp->cp = cp;
GNUNET_CONTAINER_multihashmap_put (plans,
&id.hashPubKey,
pp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
prd = GSF_pending_request_get_data_ (pr);
+ plan_count++;
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ 1,
+ GNUNET_NO);
rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Planning transmission of query `%s' to peer `%s' (%p)\n",
+ GNUNET_h2s (&prd->query),
+ GNUNET_i2s (&id),
+ rp);
+#endif
rp->pr = pr;
GNUNET_CONTAINER_DLL_insert (prd->rp_head,
prd->rp_tail,
rp);
plan (pp, rp);
- if (pp->pth != NULL)
- {
- if (rp != GNUNET_CONTAINER_heap_peek (pp->heap))
- return;
- GSF_peer_transmit_cancel_ (pp->pth);
- pp->pth = NULL;
- }
- if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
- pp);
}
GSF_connected_peer_get_identity_ (cp, &id);
pp = GNUNET_CONTAINER_multihashmap_get (plans,
&id.hashPubKey);
+ if (NULL == pp)
+ return; /* nothing was ever planned for this peer */
GNUNET_CONTAINER_multihashmap_remove (plans,
&id.hashPubKey,
pp);
if (NULL != pp->pth)
GSF_peer_transmit_cancel_ (pp->pth);
if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->heap)))
+ {
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
+ {
+ prd = GSF_pending_request_get_data_ (rp->pr);
+ GNUNET_CONTAINER_DLL_remove (prd->rp_head,
+ prd->rp_tail,
+ rp);
+ plan_count--;
+ GNUNET_free (rp);
+ }
+ GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
+ while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
{
prd = GSF_pending_request_get_data_ (rp->pr);
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
prd->rp_tail,
rp);
+ plan_count--;
GNUNET_free (rp);
}
- GNUNET_CONTAINER_heap_destroy (pp->heap);
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
+
+ GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
GNUNET_free (pp);
}
* @param pr request that is done
*/
void
-GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
+GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
{
struct GSF_RequestPlan *rp;
struct GSF_PendingRequestData *prd;
+ prd = GSF_pending_request_get_data_ (pr);
while (NULL != (rp = prd->rp_head))
{
- prd = GSF_pending_request_get_data_ (rp->pr);
GNUNET_CONTAINER_heap_remove_node (rp->hn);
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
prd->rp_tail,
rp);
+ plan_count--;
GNUNET_free (rp);
}
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
}