fixes for list queues
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_scheduler.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_scheduler.c
23  * @brief experimentation daemon: execute experiments
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
34 /**
35  * An experiment is added during startup as not running NOT_RUNNING
36  *
37  * The scheduler then decides to schedule it and sends a request to the
38  * remote peer, if core cannot send since it is busy we wait for some time
39  * and change state to BUSY, if we can send we change to REQUESTED and wait
40  * for remote peers ACK.
41  *
42  * When we receive an ACK we change to STARTED and when scheduler decides that
43  * the experiment is finished we change to STOPPED.
44  */
45
46 enum ExperimentState
47 {
48         /* Experiment is added and waiting to be executed */
49         NOT_RUNNING,
50         /* Cannot send request to remote peer, core is busy*/
51         BUSY,
52         /* We requested experiment and wait for remote peer to ACK */
53         REQUESTED,
54         /* Experiment is running */
55         STARTED,
56         /* Experiment is done */
57         STOPPED
58 };
59
60 struct ScheduledExperiment {
61         struct ScheduledExperiment *next;
62         struct ScheduledExperiment *prev;
63
64         struct Experiment *e;
65         struct Node *n;
66         int state;
67         int outbound;
68         GNUNET_SCHEDULER_TaskIdentifier task;
69 };
70
71 struct ScheduledExperiment *waiting_in_head;
72 struct ScheduledExperiment *waiting_in_tail;
73
74 struct ScheduledExperiment *running_in_head;
75 struct ScheduledExperiment *running_in_tail;
76
77 struct ScheduledExperiment *waiting_out_head;
78 struct ScheduledExperiment *waiting_out_tail;
79
80 struct ScheduledExperiment *running_out_head;
81 struct ScheduledExperiment *running_out_tail;
82
83
84 static unsigned int experiments_scheduled;
85 static unsigned int experiments_running;
86 static unsigned int experiments_requested;
87
88
89 static struct ScheduledExperiment *
90 find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment *tail,
91                                                                  struct Node *n, struct Experiment *e, int outbound)
92 {
93         struct ScheduledExperiment *cur;
94         for (cur = head; NULL != cur; cur = cur->next)
95         {
96                 if ((cur->n == n) && (cur->e == e) && (cur->outbound == outbound)) /* Node and experiment are equal */
97                         break;
98         }
99         return cur;
100 }
101
102 static void
103 request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
104 {
105         struct ScheduledExperiment *se = cls;
106         se->task = GNUNET_SCHEDULER_NO_TASK;
107
108         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
109                         GNUNET_i2s (&se->n->id), se->e->name);
110
111         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
112         GNUNET_free (se);
113
114         /* Remove experiment */
115         GNUNET_assert (experiments_requested > 0);
116         experiments_requested --;
117         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
118 }
119
120 static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
121 {
122         struct ScheduledExperiment *se = cls;
123         struct GNUNET_TIME_Relative start;
124         struct GNUNET_TIME_Relative end;
125
126         se->task = GNUNET_SCHEDULER_NO_TASK;
127
128         switch (se->state) {
129                 case NOT_RUNNING:
130                         /* Send START_ACK message */
131                         GED_nodes_send_start_ack (se->n, se->e);
132                         se->state = REQUESTED;
133                         /* Schedule to run */
134                         start = GNUNET_TIME_absolute_get_remaining(se->e->start);
135                         if (0 == start.rel_value)
136                                         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
137                         else
138                                         se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
139                         break;
140                 case REQUESTED:
141                 case STARTED:
142                         se->state = STARTED;
143                         /* Experiment is running */
144                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
145                                         "inbound", GNUNET_i2s (&se->n->id), se->e->name);
146
147                         /* do work here */
148
149                         /* Reschedule */
150                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
151                         if (0 == end.rel_value)
152                         {
153                                 se->state = STOPPED;
154                                 return; /* End of experiment is reached */
155                         }
156                         /* Reschedule */
157                         se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_inbound, se);
158                         break;
159                 case STOPPED:
160                         /* Experiment expired */
161                         break;
162                 default:
163                         break;
164         }
165
166 }
167
168 static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
169 {
170         struct ScheduledExperiment *se = cls;
171         struct GNUNET_TIME_Relative end;
172
173         se->task = GNUNET_SCHEDULER_NO_TASK;
174
175         switch (se->state) {
176                 case NOT_RUNNING:
177                         /* Send START message */
178                         GED_nodes_request_start (se->n, se->e);
179                         se->state = REQUESTED;
180                         se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
181                         experiments_requested ++;
182                         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
183                         break;
184                 case REQUESTED:
185                         /* Expecting START_ACK */
186                         GNUNET_break (0);
187                         break;
188                 case STARTED:
189                         /* Experiment is running */
190                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
191                                         "outbound", GNUNET_i2s (&se->n->id), se->e->name);
192
193                         /* do work here */
194
195                         /* Reschedule */
196                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
197                         if (0 == end.rel_value)
198                         {
199                                 se->state = STOPPED;
200                                 return; /* End of experiment is reached */
201                         }
202                         /* Reschedule */
203                 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_outbound, se);
204                         break;
205                 case STOPPED:
206                         /* Experiment expired */
207                         break;
208                 default:
209                         break;
210         }
211 }
212
213
214 /**
215  * Handle a START message from a remote node
216  *
217  * @param n the node
218  * @param e the experiment
219  */
220 void
221 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
222 {
223         struct ScheduledExperiment *se;
224
225         if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
226                  (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
227         {
228                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
229                                 "START", GNUNET_i2s (&n->id), e->name);
230                 GNUNET_break_op (0);
231                 return;
232         }
233
234         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
235                         "START", GNUNET_i2s (&n->id), e->name);
236
237         GED_scheduler_add (n, e, GNUNET_NO);
238 }
239
240 /**
241  * Handle a START_ACK message from a remote node
242  *
243  * @param n the node
244  * @param e the experiment
245  */
246 void
247 GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
248 {
249         struct ScheduledExperiment *se;
250
251         if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES)))
252         {
253                 GNUNET_break (0);
254                 return;
255         }
256
257         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
258                         "START_ACK", GNUNET_i2s (&n->id), e->name);
259
260         if (GNUNET_SCHEDULER_NO_TASK != se->task)
261         {
262                 GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */
263                 se->task = GNUNET_SCHEDULER_NO_TASK;
264         }
265
266         /* Remove from waiting list, add to running list */
267         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
268         GNUNET_CONTAINER_DLL_insert (running_out_head, running_out_tail, se);
269
270         /* Change state and schedule to run */
271         se->state = STARTED;
272         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
273 }
274
275
276 /**
277  * Handle a STOP message from a remote node
278  *
279  * @param n the node
280  * @param e the experiment
281  */
282 void
283 GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
284 {
285         struct ScheduledExperiment *se;
286
287         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
288                         "STOP", GNUNET_i2s (&n->id), e->name);
289
290         if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
291         {
292                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
293                                 "STOP", GNUNET_i2s (&n->id), e->name);
294         }
295
296         if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
297         {
298                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
299                                 "STOP", GNUNET_i2s (&n->id), e->name);
300         }
301
302 }
303
304 /**
305  * Add a new experiment for a node
306  *
307  * @param n the node
308  * @param e the experiment
309  * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
310  */
311 void
312 GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
313 {
314         struct ScheduledExperiment *se;
315         struct GNUNET_TIME_Relative start;
316         struct GNUNET_TIME_Relative end;
317
318         GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
319
320         start = GNUNET_TIME_absolute_get_remaining(e->start);
321         end = GNUNET_TIME_absolute_get_remaining(e->stop);
322         if (0 == end.rel_value)
323                         return; /* End of experiment is reached */
324
325         /* Add additional checks here if required */
326         se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
327         se->state = NOT_RUNNING;
328         se->outbound = outbound;
329         se->e = e;
330         se->n = n;
331
332         if (GNUNET_YES == outbound)
333         {
334                 if (0 == start.rel_value)
335                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
336                 else
337                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
338                 GNUNET_CONTAINER_DLL_insert (waiting_out_head, waiting_out_tail, se);
339         }
340         else
341         {
342                 if (0 == start.rel_value)
343                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
344                 else
345                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
346                 GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
347         }
348
349         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
350                         (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
351         experiments_scheduled ++;
352         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
353
354 }
355
356 /**
357  * Start the scheduler component
358  */
359 void
360 GED_scheduler_start ()
361 {
362         experiments_requested = 0;
363         experiments_scheduled = 0;
364 }
365
366
367 /**
368  * Stop the scheduler component
369  */
370 void
371 GED_scheduler_stop ()
372 {
373         struct ScheduledExperiment *cur;
374         struct ScheduledExperiment *next;
375
376         next = waiting_in_head;
377         while (NULL != (cur = next))
378         {
379                         next = cur->next;
380                         GNUNET_CONTAINER_DLL_remove (waiting_in_head, waiting_in_tail, cur);
381                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
382                         {
383                                         GNUNET_SCHEDULER_cancel (cur->task);
384                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
385                         }
386                         GNUNET_free (cur);
387                         GNUNET_assert (experiments_scheduled > 0);
388                         experiments_scheduled --;
389                         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
390         }
391
392         next = running_in_head;
393         while (NULL != (cur = next))
394         {
395                         next = cur->next;
396                         GNUNET_CONTAINER_DLL_remove (running_in_head, running_in_tail, cur);
397                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
398                         {
399                                         GNUNET_SCHEDULER_cancel (cur->task);
400                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
401                         }
402                         GNUNET_free (cur);
403                         GNUNET_assert (experiments_running > 0);
404                         experiments_running --;
405                         GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
406         }
407
408         next = waiting_out_head;
409         while (NULL != (cur = next))
410         {
411                         next = cur->next;
412                         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur);
413                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
414                         {
415                                         GNUNET_SCHEDULER_cancel (cur->task);
416                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
417                         }
418                         GNUNET_free (cur);
419                         GNUNET_assert (experiments_scheduled > 0);
420                         experiments_scheduled --;
421                         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
422         }
423
424         next = running_out_head;
425         while (NULL != (cur = next))
426         {
427                         next = cur->next;
428                         GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur);
429                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
430                         {
431                                         GNUNET_SCHEDULER_cancel (cur->task);
432                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
433                         }
434                         GNUNET_free (cur);
435                         GNUNET_assert (experiments_running > 0);
436                         experiments_running --;
437                         GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
438         }
439 }
440
441 /* end of gnunet-daemon-experimentation_scheduler.c */