*/
static struct GNUNET_CONTAINER_MultiHashMap *plans;
+/**
+ * Sum of all transmission counters (equals total delay for all plan entries).
+ */
+static unsigned long long total_delay;
+
+/**
+ * Number of plan entries.
+ */
+static unsigned long long plan_count;
+
+
+/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' for transmission
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
* Insert the given request plan into the heap with the appropriate weight.
struct GSF_RequestPlan *rp)
{
struct GSF_PendingRequestData *prd;
+ struct GNUNET_TIME_Relative delay;
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# average retransmission delay (ms)"),
+ total_delay * 1000LL / plan_count,
+ GNUNET_NO);
prd = GSF_pending_request_get_data_ (rp->pr);
// FIXME: calculate 'rp->earliest_transmission'!
- // fIXME: claculate 'rp->priority'!
-
+ // FIXME: claculate 'rp->priority'!
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ rp->transmission_counter);
+ rp->earliest_transmission
+ = GNUNET_TIME_relative_to_absolute (delay);
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Earliest (re)transmission for `%s' in %us\n",
+ GNUNET_h2s (&prd->query),
+ rp->transmission_counter);
+#endif
+
+ GNUNET_assert (rp->hn == NULL);
if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
rp,
rp->hn = GNUNET_CONTAINER_heap_insert (pp->delay_heap,
rp,
rp->earliest_transmission.abs_value);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
}
-/**
- * Figure out when and how to transmit to the given peer.
- *
- * @param cls the 'struct GSF_ConnectedPeer' for transmission
- * @param tc scheduler context
- */
-static void
-schedule_peer_transmission (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
/**
* Function called to get a message for transmission.
*
rp->hn = NULL;
rp->last_transmission = GNUNET_TIME_absolute_get ();
rp->transmission_counter++;
+ total_delay++;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing plan %p executed %u times, planning retransmission\n",
+ rp,
+ rp->transmission_counter);
+#endif
plan (pp, rp);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# queries messages sent to other peers"),
+ 1,
+ GNUNET_NO);
return msize;
}
{
struct PeerPlan *pp = cls;
struct GSF_RequestPlan *rp;
- struct GSF_PendingRequestData *prd;
size_t msize;
pp->task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_assert (NULL == pp->pth);
+ if (pp->pth != NULL)
+ {
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
/* move ready requests to priority queue */
while ( (NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
(GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) )
{
- rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap);
- GNUNET_CONTAINER_heap_insert (pp->priority_heap,
- rp,
- rp->priority);
- if (NULL == (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap)))
- break;
+ GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
+ rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
+ rp,
+ rp->priority);
}
if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
{
/* priority heap (still) empty, check for delay... */
rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
if (NULL == rp)
- return; /* both queues empty */
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No active requests for plan %p.\n",
+ pp);
+#endif
+ return; /* both queues empty */
+ }
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sleeping for %llu ms before retrying requests on plan %p.\n",
+ (unsigned long long) GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value,
+ pp);
+#endif
pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
&schedule_peer_transmission,
pp);
}
/* process from priority heap */
rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
+#if DEBUG_FS > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Executing query plan %p\n",
+ rp);
+#endif
GNUNET_assert (NULL != rp);
- prd = GSF_pending_request_get_data_ (rp->pr);
- msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
+ msize = GSF_pending_request_get_message_ (rp->pr, 0, NULL);
pp->pth = GSF_peer_transmit_ (pp->cp,
GNUNET_YES,
rp->priority,
* @param pr request with the entry
*/
void
-GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
+GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
struct GSF_PendingRequest *pr)
{
struct GNUNET_PeerIdentity id;
struct PeerPlan *pp;
struct GSF_PendingRequestData *prd;
struct GSF_RequestPlan *rp;
-
+
+ GNUNET_assert (NULL != cp);
GSF_connected_peer_get_identity_ (cp, &id);
pp = GNUNET_CONTAINER_multihashmap_get (plans,
&id.hashPubKey);
pp = GNUNET_malloc (sizeof (struct PeerPlan));
pp->priority_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
pp->delay_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp->cp = cp;
GNUNET_CONTAINER_multihashmap_put (plans,
&id.hashPubKey,
pp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
prd = GSF_pending_request_get_data_ (pr);
+ plan_count++;
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ 1,
+ GNUNET_NO);
rp = GNUNET_malloc (sizeof (struct GSF_RequestPlan));
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Planning transmission of query `%s' to peer `%s' (%p)\n",
+ GNUNET_h2s (&prd->query),
+ GNUNET_i2s (&id),
+ rp);
+#endif
rp->pr = pr;
GNUNET_CONTAINER_DLL_insert (prd->rp_head,
prd->rp_tail,
rp);
plan (pp, rp);
- if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
- {
- /* no request that should be done immediately, figure out delay */
- if (rp != GNUNET_CONTAINER_heap_peek (pp->delay_heap))
- return; /* did not change delay heap top, no need to do anything */
- GNUNET_assert (NULL == pp->pth);
- if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission),
- &schedule_peer_transmission,
- pp);
- return;
- }
-
- if (pp->pth != NULL)
- {
- if (rp != GNUNET_CONTAINER_heap_peek (pp->priority_heap))
- return; /* did not change priority heap top, no need to do anyhing */
- GSF_peer_transmit_cancel_ (pp->pth);
- pp->pth = NULL;
- }
- if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
- pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
- pp);
}
if (NULL != pp->pth)
GSF_peer_transmit_cancel_ (pp->pth);
if (GNUNET_SCHEDULER_NO_TASK != pp->task)
- GNUNET_SCHEDULER_cancel (pp->task);
+ {
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
{
prd = GSF_pending_request_get_data_ (rp->pr);
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
prd->rp_tail,
rp);
+ plan_count--;
GNUNET_free (rp);
}
GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
prd->rp_tail,
rp);
+ plan_count--;
GNUNET_free (rp);
}
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
+
GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
GNUNET_free (pp);
}
GNUNET_CONTAINER_DLL_remove (prd->rp_head,
prd->rp_tail,
rp);
+ plan_count--;
GNUNET_free (rp);
}
+ GNUNET_STATISTICS_set (GSF_stats,
+ gettext_noop ("# query plan entries"),
+ plan_count,
+ GNUNET_NO);
}