- remove adjust
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_scheduler.c
index de5c806140e8e60630e018acbd995272103d47dc..c13434e9be88c4bda8bffd31d7e7ad2602da39c0 100644 (file)
@@ -25,7 +25,6 @@
  * @author Matthias Wachs
  */
 #include "platform.h"
-#include "gnunet_getopt_lib.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_core_service.h"
 #include "gnunet_statistics_service.h"
@@ -82,7 +81,8 @@ struct ScheduledExperiment *running_out_tail;
 
 
 static unsigned int experiments_scheduled;
-static unsigned int experiments_running;
+static unsigned int experiments_outbound_running;
+static unsigned int experiments_inbound_running;
 static unsigned int experiments_requested;
 
 
@@ -105,7 +105,7 @@ request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
        struct ScheduledExperiment *se = cls;
        se->task = GNUNET_SCHEDULER_NO_TASK;
 
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
+       GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Peer `%s' did not respond to request for experiment `%s'\n"),
                        GNUNET_i2s (&se->n->id), se->e->name);
 
        GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
@@ -122,48 +122,39 @@ static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_Task
        struct ScheduledExperiment *se = cls;
        struct GNUNET_TIME_Relative start;
        struct GNUNET_TIME_Relative end;
-       struct GNUNET_TIME_Relative backoff;
 
        se->task = GNUNET_SCHEDULER_NO_TASK;
 
-       if (GNUNET_NO == GED_nodes_rts (se->n))
-       {
-               se->state = BUSY;
-               backoff = GNUNET_TIME_UNIT_SECONDS;
-               backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
-                               GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
-               se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
-               return;
-       }
-       else if (BUSY == se->state)
-               se->state = NOT_RUNNING;
-
        switch (se->state) {
                case NOT_RUNNING:
                        /* Send START_ACK message */
-                       //GED_nodes_request_start (se->n, se->e);
+                       GED_nodes_send_start_ack (se->n, se->e);
                        se->state = REQUESTED;
                        /* Schedule to run */
                        start = GNUNET_TIME_absolute_get_remaining(se->e->start);
-                       if (0 == start.rel_value)
+                       if (0 == start.rel_value_us)
                                        se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
                        else
                                        se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
                        break;
                case REQUESTED:
-                       /* Already requested */
+                       experiments_inbound_running ++;
+                       GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
+                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting inbound experiment `%s' with peer `%s'\n"),
+                                       se->e->name, GNUNET_i2s (&se->n->id));
                        se->state = STARTED;
+                       se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
+                       break;
                case STARTED:
                        /* Experiment is running */
-                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
-                                       GNUNET_i2s (&se->n->id), se->e->name);
+                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
+                                       "inbound", GNUNET_i2s (&se->n->id), se->e->name);
 
                        /* do work here */
 
                        /* Reschedule */
                        end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
-                       if (0 == end.rel_value)
+                       if (0 == end.rel_value_us)
                        {
                                se->state = STOPPED;
                                return; /* End of experiment is reached */
@@ -184,33 +175,15 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas
 {
        struct ScheduledExperiment *se = cls;
        struct GNUNET_TIME_Relative end;
-       struct GNUNET_TIME_Relative backoff;
 
        se->task = GNUNET_SCHEDULER_NO_TASK;
 
-       if (GNUNET_NO == GED_nodes_rts (se->n))
-       {
-               /* Cannot send to peer, core is busy */
-               se->state = BUSY;
-               backoff = GNUNET_TIME_UNIT_SECONDS;
-               backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
-               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
-                               GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
-               se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
-               return;
-       }
-       else if (BUSY == se->state)
-                       se->state = NOT_RUNNING; /* Not busy anymore, can send */
-
        switch (se->state) {
                case NOT_RUNNING:
                        /* Send START message */
-                       GED_nodes_request_start (se->n, se->e);
+                       GED_nodes_send_start (se->n, se->e);
                        se->state = REQUESTED;
                        se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
-
-                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n",
-                                       GNUNET_i2s (&se->n->id), se->e->name);
                        experiments_requested ++;
                        GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
                        break;
@@ -220,14 +193,14 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas
                        break;
                case STARTED:
                        /* Experiment is running */
-                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
-                                       GNUNET_i2s (&se->n->id), se->e->name);
+                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
+                                       "outbound", GNUNET_i2s (&se->n->id), se->e->name);
 
                        /* do work here */
 
                        /* Reschedule */
                        end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
-                       if (0 == end.rel_value)
+                       if (0 == end.rel_value_us)
                        {
                                se->state = STOPPED;
                                return; /* End of experiment is reached */
@@ -253,25 +226,22 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas
 void
 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
 {
-       struct ScheduledExperiment *se;
-
-       if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
-                (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
-       {
-               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
-                               "START", GNUNET_i2s (&n->id), e->name);
-               GNUNET_break_op (0);
-               return;
-       }
-
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
-                       "START", GNUNET_i2s (&n->id), e->name);
-
-       GED_scheduler_add (n, e, GNUNET_NO);
+  if ((NULL != find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)) ||
+      (NULL != find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received %s message from peer %s for experiment `%s'\n",
+             "START", GNUNET_i2s (&n->id), e->name);
+  GED_scheduler_add (n, e, GNUNET_NO);
 }
 
+
 /**
- * Handle a START_ACL message from a remote node
+ * Handle a START_ACK message from a remote node
  *
  * @param n the node
  * @param e the experiment
@@ -281,17 +251,31 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
 {
        struct ScheduledExperiment *se;
 
-       if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
+       if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES)))
        {
                GNUNET_break (0);
                return;
        }
 
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for requested experiment `%s'\n",
                        "START_ACK", GNUNET_i2s (&n->id), e->name);
 
        if (GNUNET_SCHEDULER_NO_TASK != se->task)
-               GNUNET_SCHEDULER_cancel (se->task);
+       {
+               GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */
+               se->task = GNUNET_SCHEDULER_NO_TASK;
+       }
+
+       /* Remove from waiting list, add to running list */
+       GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
+       GNUNET_CONTAINER_DLL_insert (running_out_head, running_out_tail, se);
+
+       /* Change state and schedule to run */
+       experiments_outbound_running ++;
+       GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
+       GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting outbound experiment `%s' with peer `%s'\n"),
+                       e->name, GNUNET_i2s (&n->id));
+       se->state = STARTED;
        se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
 }
 
@@ -307,18 +291,18 @@ GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
 {
        struct ScheduledExperiment *se;
 
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _("Received %s message from peer %s for experiment `%s'\n"),
                        "STOP", GNUNET_i2s (&n->id), e->name);
 
        if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
        {
-               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
+               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for waiting experiment `%s'\n",
                                "STOP", GNUNET_i2s (&n->id), e->name);
        }
 
        if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
        {
-               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
+               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for running experiment `%s'\n",
                                "STOP", GNUNET_i2s (&n->id), e->name);
        }
 
@@ -342,11 +326,11 @@ GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
 
        start = GNUNET_TIME_absolute_get_remaining(e->start);
        end = GNUNET_TIME_absolute_get_remaining(e->stop);
-       if (0 == end.rel_value)
+       if (0 == end.rel_value_us)
                        return; /* End of experiment is reached */
 
        /* Add additional checks here if required */
-       se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
+       se = GNUNET_new (struct ScheduledExperiment);
        se->state = NOT_RUNNING;
        se->outbound = outbound;
        se->e = e;
@@ -354,7 +338,7 @@ GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
 
        if (GNUNET_YES == outbound)
        {
-               if (0 == start.rel_value)
+         if (0 == start.rel_value_us)
                                se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
                else
                                se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
@@ -362,14 +346,14 @@ GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
        }
        else
        {
-               if (0 == start.rel_value)
+               if (0 == start.rel_value_us)
                                se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
                else
                                se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
                GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
        }
 
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Added %s experiment `%s' for node to be scheduled\n",
                        (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
        experiments_scheduled ++;
        GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
@@ -423,9 +407,41 @@ GED_scheduler_stop ()
                                        cur->task = GNUNET_SCHEDULER_NO_TASK;
                        }
                        GNUNET_free (cur);
-                       GNUNET_assert (experiments_running > 0);
-                       experiments_running --;
-                       GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
+                       GNUNET_assert (experiments_outbound_running > 0);
+                       experiments_inbound_running --;
+                       GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
+       }
+
+       next = waiting_out_head;
+       while (NULL != (cur = next))
+       {
+                       next = cur->next;
+                       GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur);
+                       if (GNUNET_SCHEDULER_NO_TASK != cur->task)
+                       {
+                                       GNUNET_SCHEDULER_cancel (cur->task);
+                                       cur->task = GNUNET_SCHEDULER_NO_TASK;
+                       }
+                       GNUNET_free (cur);
+                       GNUNET_assert (experiments_scheduled > 0);
+                       experiments_scheduled --;
+                       GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
+       }
+
+       next = running_out_head;
+       while (NULL != (cur = next))
+       {
+                       next = cur->next;
+                       GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur);
+                       if (GNUNET_SCHEDULER_NO_TASK != cur->task)
+                       {
+                                       GNUNET_SCHEDULER_cancel (cur->task);
+                                       cur->task = GNUNET_SCHEDULER_NO_TASK;
+                       }
+                       GNUNET_free (cur);
+                       GNUNET_assert (experiments_outbound_running > 0);
+                       experiments_outbound_running --;
+                       GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
        }
 }