* @author Matthias Wachs
*/
#include "platform.h"
-#include "gnunet_getopt_lib.h"
#include "gnunet_util_lib.h"
#include "gnunet_core_service.h"
#include "gnunet_statistics_service.h"
static unsigned int experiments_scheduled;
-static unsigned int experiments_running;
+static unsigned int experiments_outbound_running;
+static unsigned int experiments_inbound_running;
static unsigned int experiments_requested;
struct ScheduledExperiment *se = cls;
se->task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Peer `%s' did not respond to request for experiment `%s'\n"),
GNUNET_i2s (&se->n->id), se->e->name);
GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
struct ScheduledExperiment *se = cls;
struct GNUNET_TIME_Relative start;
struct GNUNET_TIME_Relative end;
- struct GNUNET_TIME_Relative backoff;
se->task = GNUNET_SCHEDULER_NO_TASK;
- if (GNUNET_NO == GED_nodes_rts (se->n))
- {
- se->state = BUSY;
- backoff = GNUNET_TIME_UNIT_SECONDS;
- backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
- GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
- se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
- return;
- }
- else if (BUSY == se->state)
- se->state = NOT_RUNNING;
-
switch (se->state) {
case NOT_RUNNING:
/* Send START_ACK message */
- //GED_nodes_request_start (se->n, se->e);
+ GED_nodes_send_start_ack (se->n, se->e);
se->state = REQUESTED;
/* Schedule to run */
start = GNUNET_TIME_absolute_get_remaining(se->e->start);
- if (0 == start.rel_value)
+ if (0 == start.rel_value_us)
se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
else
se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
break;
case REQUESTED:
- /* Already requested */
+ experiments_inbound_running ++;
+ GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting inbound experiment `%s' with peer `%s'\n"),
+ se->e->name, GNUNET_i2s (&se->n->id));
se->state = STARTED;
+ se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
+ break;
case STARTED:
/* Experiment is running */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
- GNUNET_i2s (&se->n->id), se->e->name);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
+ "inbound", GNUNET_i2s (&se->n->id), se->e->name);
/* do work here */
/* Reschedule */
end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
- if (0 == end.rel_value)
+ if (0 == end.rel_value_us)
{
se->state = STOPPED;
return; /* End of experiment is reached */
{
struct ScheduledExperiment *se = cls;
struct GNUNET_TIME_Relative end;
- struct GNUNET_TIME_Relative backoff;
se->task = GNUNET_SCHEDULER_NO_TASK;
- if (GNUNET_NO == GED_nodes_rts (se->n))
- {
- /* Cannot send to peer, core is busy */
- se->state = BUSY;
- backoff = GNUNET_TIME_UNIT_SECONDS;
- backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
- GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
- se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
- return;
- }
- else if (BUSY == se->state)
- se->state = NOT_RUNNING; /* Not busy anymore, can send */
-
switch (se->state) {
case NOT_RUNNING:
/* Send START message */
- GED_nodes_request_start (se->n, se->e);
+ GED_nodes_send_start (se->n, se->e);
se->state = REQUESTED;
se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n",
- GNUNET_i2s (&se->n->id), se->e->name);
experiments_requested ++;
GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
break;
break;
case STARTED:
/* Experiment is running */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
- GNUNET_i2s (&se->n->id), se->e->name);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
+ "outbound", GNUNET_i2s (&se->n->id), se->e->name);
/* do work here */
/* Reschedule */
end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
- if (0 == end.rel_value)
+ if (0 == end.rel_value_us)
{
se->state = STOPPED;
return; /* End of experiment is reached */
void
GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
{
- struct ScheduledExperiment *se;
-
- if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
- (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
- "START", GNUNET_i2s (&n->id), e->name);
- GNUNET_break_op (0);
- return;
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
- "START", GNUNET_i2s (&n->id), e->name);
-
- GED_scheduler_add (n, e, GNUNET_NO);
+ if ((NULL != find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)) ||
+ (NULL != find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %s message from peer %s for experiment `%s'\n",
+ "START", GNUNET_i2s (&n->id), e->name);
+ GED_scheduler_add (n, e, GNUNET_NO);
}
+
/**
- * Handle a START_ACL message from a remote node
+ * Handle a START_ACK message from a remote node
*
* @param n the node
* @param e the experiment
{
struct ScheduledExperiment *se;
- if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
+ if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES)))
{
GNUNET_break (0);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for requested experiment `%s'\n",
"START_ACK", GNUNET_i2s (&n->id), e->name);
if (GNUNET_SCHEDULER_NO_TASK != se->task)
- GNUNET_SCHEDULER_cancel (se->task);
+ {
+ GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */
+ se->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ /* Remove from waiting list, add to running list */
+ GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
+ GNUNET_CONTAINER_DLL_insert (running_out_head, running_out_tail, se);
+
+ /* Change state and schedule to run */
+ experiments_outbound_running ++;
+ GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting outbound experiment `%s' with peer `%s'\n"),
+ e->name, GNUNET_i2s (&n->id));
+ se->state = STARTED;
se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
}
{
struct ScheduledExperiment *se;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _("Received %s message from peer %s for experiment `%s'\n"),
"STOP", GNUNET_i2s (&n->id), e->name);
if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for waiting experiment `%s'\n",
"STOP", GNUNET_i2s (&n->id), e->name);
}
if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for running experiment `%s'\n",
"STOP", GNUNET_i2s (&n->id), e->name);
}
start = GNUNET_TIME_absolute_get_remaining(e->start);
end = GNUNET_TIME_absolute_get_remaining(e->stop);
- if (0 == end.rel_value)
+ if (0 == end.rel_value_us)
return; /* End of experiment is reached */
/* Add additional checks here if required */
if (GNUNET_YES == outbound)
{
- if (0 == start.rel_value)
+ if (0 == start.rel_value_us)
se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
else
se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
}
else
{
- if (0 == start.rel_value)
+ if (0 == start.rel_value_us)
se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
else
se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Added %s experiment `%s' for node to be scheduled\n",
(GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
experiments_scheduled ++;
GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
cur->task = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_free (cur);
- GNUNET_assert (experiments_running > 0);
- experiments_running --;
- GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
+ GNUNET_assert (experiments_outbound_running > 0);
+ experiments_inbound_running --;
+ GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
+ }
+
+ next = waiting_out_head;
+ while (NULL != (cur = next))
+ {
+ next = cur->next;
+ GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur);
+ if (GNUNET_SCHEDULER_NO_TASK != cur->task)
+ {
+ GNUNET_SCHEDULER_cancel (cur->task);
+ cur->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (cur);
+ GNUNET_assert (experiments_scheduled > 0);
+ experiments_scheduled --;
+ GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
+ }
+
+ next = running_out_head;
+ while (NULL != (cur = next))
+ {
+ next = cur->next;
+ GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur);
+ if (GNUNET_SCHEDULER_NO_TASK != cur->task)
+ {
+ GNUNET_SCHEDULER_cancel (cur->task);
+ cur->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (cur);
+ GNUNET_assert (experiments_outbound_running > 0);
+ experiments_outbound_running --;
+ GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
}
}