2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file experimentation/gnunet-daemon-experimentation_scheduler.c
23 * @brief experimentation daemon: execute experiments
24 * @author Christian Grothoff
25 * @author Matthias Wachs
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
35 * An experiment is added during startup as not running NOT_RUNNING
37 * The scheduler then decides to schedule it and sends a request to the
38 * remote peer, if core cannot send since it is busy we wait for some time
39 * and change state to BUSY, if we can send we change to REQUESTED and wait
40 * for remote peers ACK.
42 * When we receive an ACK we change to STARTED and when scheduler decides that
43 * the experiment is finished we change to STOPPED.
48 /* Experiment is added and waiting to be executed */
50 /* Cannot send request to remote peer, core is busy*/
52 /* We requested experiment and wait for remote peer to ACK */
54 /* Experiment is running */
56 /* Experiment is done */
60 struct ScheduledExperiment {
61 struct ScheduledExperiment *next;
62 struct ScheduledExperiment *prev;
68 GNUNET_SCHEDULER_TaskIdentifier task;
71 struct ScheduledExperiment *waiting_in_head;
72 struct ScheduledExperiment *waiting_in_tail;
74 struct ScheduledExperiment *running_in_head;
75 struct ScheduledExperiment *running_in_tail;
77 struct ScheduledExperiment *waiting_out_head;
78 struct ScheduledExperiment *waiting_out_tail;
80 struct ScheduledExperiment *running_out_head;
81 struct ScheduledExperiment *running_out_tail;
84 static unsigned int experiments_scheduled;
85 static unsigned int experiments_running;
86 static unsigned int experiments_requested;
89 static struct ScheduledExperiment *
90 find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment *tail,
91 struct Node *n, struct Experiment *e, int outbound)
93 struct ScheduledExperiment *cur;
94 for (cur = head; NULL != cur; cur = cur->next)
96 if ((cur->n == n) && (cur->e == e) && (cur->outbound == outbound)) /* Node and experiment are equal */
103 request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
105 struct ScheduledExperiment *se = cls;
106 se->task = GNUNET_SCHEDULER_NO_TASK;
108 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
109 GNUNET_i2s (&se->n->id), se->e->name);
111 GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
114 /* Remove experiment */
115 GNUNET_assert (experiments_requested > 0);
116 experiments_requested --;
117 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
120 static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
122 struct ScheduledExperiment *se = cls;
123 struct GNUNET_TIME_Relative start;
124 struct GNUNET_TIME_Relative end;
125 struct GNUNET_TIME_Relative backoff;
127 se->task = GNUNET_SCHEDULER_NO_TASK;
129 if (GNUNET_NO == GED_nodes_rts (se->n))
132 backoff = GNUNET_TIME_UNIT_SECONDS;
133 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
134 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
135 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
136 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
139 else if (BUSY == se->state)
140 se->state = NOT_RUNNING;
144 /* Send START_ACK message */
145 //GED_nodes_request_start (se->n, se->e);
146 se->state = REQUESTED;
147 /* Schedule to run */
148 start = GNUNET_TIME_absolute_get_remaining(se->e->start);
149 if (0 == start.rel_value)
150 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
152 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
155 /* Already requested */
158 /* Experiment is running */
159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
160 GNUNET_i2s (&se->n->id), se->e->name);
165 end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
166 if (0 == end.rel_value)
169 return; /* End of experiment is reached */
172 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_inbound, se);
175 /* Experiment expired */
183 static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
185 struct ScheduledExperiment *se = cls;
186 struct GNUNET_TIME_Relative end;
187 struct GNUNET_TIME_Relative backoff;
189 se->task = GNUNET_SCHEDULER_NO_TASK;
191 if (GNUNET_NO == GED_nodes_rts (se->n))
193 /* Cannot send to peer, core is busy */
195 backoff = GNUNET_TIME_UNIT_SECONDS;
196 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
197 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
198 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
199 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
202 else if (BUSY == se->state)
203 se->state = NOT_RUNNING; /* Not busy anymore, can send */
207 /* Send START message */
208 GED_nodes_request_start (se->n, se->e);
209 se->state = REQUESTED;
210 se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
212 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n",
213 GNUNET_i2s (&se->n->id), se->e->name);
214 experiments_requested ++;
215 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
218 /* Expecting START_ACK */
222 /* Experiment is running */
223 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
224 GNUNET_i2s (&se->n->id), se->e->name);
229 end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
230 if (0 == end.rel_value)
233 return; /* End of experiment is reached */
236 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_outbound, se);
239 /* Experiment expired */
248 * Handle a START message from a remote node
251 * @param e the experiment
254 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
256 struct ScheduledExperiment *se;
258 if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
259 (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
261 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
262 "START", GNUNET_i2s (&n->id), e->name);
267 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
268 "START", GNUNET_i2s (&n->id), e->name);
270 GED_scheduler_add (n, e, GNUNET_NO);
274 * Handle a START_ACL message from a remote node
277 * @param e the experiment
280 GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
282 struct ScheduledExperiment *se;
284 if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
290 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
291 "START_ACK", GNUNET_i2s (&n->id), e->name);
293 if (GNUNET_SCHEDULER_NO_TASK != se->task)
294 GNUNET_SCHEDULER_cancel (se->task);
295 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
300 * Handle a STOP message from a remote node
303 * @param e the experiment
306 GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
308 struct ScheduledExperiment *se;
310 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
311 "STOP", GNUNET_i2s (&n->id), e->name);
313 if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
315 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
316 "STOP", GNUNET_i2s (&n->id), e->name);
319 if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
321 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
322 "STOP", GNUNET_i2s (&n->id), e->name);
328 * Add a new experiment for a node
331 * @param e the experiment
332 * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
335 GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
337 struct ScheduledExperiment *se;
338 struct GNUNET_TIME_Relative start;
339 struct GNUNET_TIME_Relative end;
341 GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
343 start = GNUNET_TIME_absolute_get_remaining(e->start);
344 end = GNUNET_TIME_absolute_get_remaining(e->stop);
345 if (0 == end.rel_value)
346 return; /* End of experiment is reached */
348 /* Add additional checks here if required */
349 se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
350 se->state = NOT_RUNNING;
351 se->outbound = outbound;
355 if (GNUNET_YES == outbound)
357 if (0 == start.rel_value)
358 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
360 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
361 GNUNET_CONTAINER_DLL_insert (waiting_out_head, waiting_out_tail, se);
365 if (0 == start.rel_value)
366 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
368 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
369 GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
372 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
373 (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
374 experiments_scheduled ++;
375 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
380 * Start the scheduler component
383 GED_scheduler_start ()
385 experiments_requested = 0;
386 experiments_scheduled = 0;
391 * Stop the scheduler component
394 GED_scheduler_stop ()
396 struct ScheduledExperiment *cur;
397 struct ScheduledExperiment *next;
399 next = waiting_in_head;
400 while (NULL != (cur = next))
403 GNUNET_CONTAINER_DLL_remove (waiting_in_head, waiting_in_tail, cur);
404 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
406 GNUNET_SCHEDULER_cancel (cur->task);
407 cur->task = GNUNET_SCHEDULER_NO_TASK;
410 GNUNET_assert (experiments_scheduled > 0);
411 experiments_scheduled --;
412 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
415 next = running_in_head;
416 while (NULL != (cur = next))
419 GNUNET_CONTAINER_DLL_remove (running_in_head, running_in_tail, cur);
420 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
422 GNUNET_SCHEDULER_cancel (cur->task);
423 cur->task = GNUNET_SCHEDULER_NO_TASK;
426 GNUNET_assert (experiments_running > 0);
427 experiments_running --;
428 GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
432 /* end of gnunet-daemon-experimentation_scheduler.c */