changes to scheduler
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_scheduler.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_scheduler.c
23  * @brief experimentation daemon: execute experiments
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
/**
 * Lifecycle of a scheduled experiment.
 *
 * Every experiment starts out as NOT_RUNNING when it is added.  When the
 * scheduler runs it, it tries to send a request to the remote peer: if core
 * is busy and cannot transmit, the experiment moves to BUSY and is retried
 * after a delay; if the request can be sent, it moves to REQUESTED and waits
 * for the remote peer's acknowledgement.
 *
 * Once the acknowledgement arrives the experiment is STARTED, and when the
 * scheduler decides it has finished it becomes STOPPED.
 */
enum ExperimentState
{
  NOT_RUNNING = 0, /* added, waiting to be executed */
  BUSY = 1,        /* request could not be sent, core is busy */
  REQUESTED = 2,   /* request sent, waiting for the remote peer's ACK */
  STARTED = 3,     /* experiment is running */
  STOPPED = 4      /* experiment is done */
};
59
60 struct ScheduledExperiment {
61         struct ScheduledExperiment *next;
62         struct ScheduledExperiment *prev;
63
64         struct Experiment *e;
65         struct Node *n;
66         int state;
67         int outbound;
68         GNUNET_SCHEDULER_TaskIdentifier task;
69 };
70
/* Inbound experiments (initiated by a remote peer): waiting and running.
 * NOTE(review): these have external linkage; consider making them static. */
struct ScheduledExperiment *waiting_in_head, *waiting_in_tail;
struct ScheduledExperiment *running_in_head, *running_in_tail;

/* Outbound experiments (initiated by us): waiting and running. */
struct ScheduledExperiment *waiting_out_head, *waiting_out_tail;
struct ScheduledExperiment *running_out_head, *running_out_tail;

/* Counters mirrored into the GED_stats statistics. */
static unsigned int experiments_scheduled; /* "# experiments scheduled" */
static unsigned int experiments_running;   /* "# experiments running" */
static unsigned int experiments_requested; /* "# experiments requested" */
87
88
89 static struct ScheduledExperiment *
90 find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment *tail,
91                                                                  struct Node *n, struct Experiment *e, int outbound)
92 {
93         struct ScheduledExperiment *cur;
94         for (cur = head; NULL != cur; cur = cur->next)
95         {
96                 if ((cur->n == n) && (cur->e == e) && (cur->outbound == outbound)) /* Node and experiment are equal */
97                         break;
98         }
99         return cur;
100 }
101
102 static void
103 request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
104 {
105         struct ScheduledExperiment *se = cls;
106         se->task = GNUNET_SCHEDULER_NO_TASK;
107
108         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to request for experiment `%s'\n",
109                         GNUNET_i2s (&se->n->id), se->e->name);
110
111         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
112         GNUNET_free (se);
113
114         /* Remove experiment */
115         GNUNET_assert (experiments_requested > 0);
116         experiments_requested --;
117         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
118 }
119
120 static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
121 {
122         struct ScheduledExperiment *se = cls;
123         struct GNUNET_TIME_Relative start;
124         struct GNUNET_TIME_Relative end;
125         struct GNUNET_TIME_Relative backoff;
126
127         se->task = GNUNET_SCHEDULER_NO_TASK;
128
129         if (GNUNET_NO == GED_nodes_rts (se->n))
130         {
131                 se->state = BUSY;
132                 backoff = GNUNET_TIME_UNIT_SECONDS;
133                 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
134                 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
135                                 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
136                 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
137                 return;
138         }
139         else if (BUSY == se->state)
140                 se->state = NOT_RUNNING;
141
142         switch (se->state) {
143                 case NOT_RUNNING:
144                         /* Send START_ACK message */
145                         //GED_nodes_request_start (se->n, se->e);
146                         se->state = REQUESTED;
147                         /* Schedule to run */
148                         start = GNUNET_TIME_absolute_get_remaining(se->e->start);
149                         if (0 == start.rel_value)
150                                         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
151                         else
152                                         se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
153                         break;
154                 case REQUESTED:
155                         /* Already requested */
156                         se->state = STARTED;
157                 case STARTED:
158                         /* Experiment is running */
159                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
160                                         GNUNET_i2s (&se->n->id), se->e->name);
161
162                         /* do work here */
163
164                         /* Reschedule */
165                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
166                         if (0 == end.rel_value)
167                         {
168                                 se->state = STOPPED;
169                                 return; /* End of experiment is reached */
170                         }
171                         /* Reschedule */
172                         se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_inbound, se);
173                         break;
174                 case STOPPED:
175                         /* Experiment expired */
176                         break;
177                 default:
178                         break;
179         }
180
181 }
182
183 static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
184 {
185         struct ScheduledExperiment *se = cls;
186         struct GNUNET_TIME_Relative end;
187         struct GNUNET_TIME_Relative backoff;
188
189         se->task = GNUNET_SCHEDULER_NO_TASK;
190
191         if (GNUNET_NO == GED_nodes_rts (se->n))
192         {
193                 /* Cannot send to peer, core is busy */
194                 se->state = BUSY;
195                 backoff = GNUNET_TIME_UNIT_SECONDS;
196                 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
197                 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
198                                 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
199                 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
200                 return;
201         }
202         else if (BUSY == se->state)
203                         se->state = NOT_RUNNING; /* Not busy anymore, can send */
204
205         switch (se->state) {
206                 case NOT_RUNNING:
207                         /* Send START message */
208                         GED_nodes_request_start (se->n, se->e);
209                         se->state = REQUESTED;
210                         se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
211
212                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n",
213                                         GNUNET_i2s (&se->n->id), se->e->name);
214                         experiments_requested ++;
215                         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
216                         break;
217                 case REQUESTED:
218                         /* Expecting START_ACK */
219                         GNUNET_break (0);
220                         break;
221                 case STARTED:
222                         /* Experiment is running */
223                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n",
224                                         GNUNET_i2s (&se->n->id), se->e->name);
225
226                         /* do work here */
227
228                         /* Reschedule */
229                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
230                         if (0 == end.rel_value)
231                         {
232                                 se->state = STOPPED;
233                                 return; /* End of experiment is reached */
234                         }
235                         /* Reschedule */
236                 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_outbound, se);
237                         break;
238                 case STOPPED:
239                         /* Experiment expired */
240                         break;
241                 default:
242                         break;
243         }
244 }
245
246
247 /**
248  * Handle a START message from a remote node
249  *
250  * @param n the node
251  * @param e the experiment
252  */
253 void
254 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
255 {
256         struct ScheduledExperiment *se;
257
258         if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
259                  (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
260         {
261                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s message from peer %s for experiment `%s'\n"),
262                                 "START", GNUNET_i2s (&n->id), e->name);
263                 GNUNET_break_op (0);
264                 return;
265         }
266
267         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
268                         "START", GNUNET_i2s (&n->id), e->name);
269
270         GED_scheduler_add (n, e, GNUNET_NO);
271 }
272
273 /**
274  * Handle a START_ACL message from a remote node
275  *
276  * @param n the node
277  * @param e the experiment
278  */
279 void
280 GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
281 {
282         struct ScheduledExperiment *se;
283
284         if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
285         {
286                 GNUNET_break (0);
287                 return;
288         }
289
290         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for requested experiment `%s'\n"),
291                         "START_ACK", GNUNET_i2s (&n->id), e->name);
292
293         if (GNUNET_SCHEDULER_NO_TASK != se->task)
294                 GNUNET_SCHEDULER_cancel (se->task);
295         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
296 }
297
298
299 /**
300  * Handle a STOP message from a remote node
301  *
302  * @param n the node
303  * @param e the experiment
304  */
305 void
306 GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
307 {
308         struct ScheduledExperiment *se;
309
310         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
311                         "STOP", GNUNET_i2s (&n->id), e->name);
312
313         if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
314         {
315                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for waiting experiment `%s'\n"),
316                                 "STOP", GNUNET_i2s (&n->id), e->name);
317         }
318
319         if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
320         {
321                 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for running experiment `%s'\n"),
322                                 "STOP", GNUNET_i2s (&n->id), e->name);
323         }
324
325 }
326
327 /**
328  * Add a new experiment for a node
329  *
330  * @param n the node
331  * @param e the experiment
332  * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
333  */
334 void
335 GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
336 {
337         struct ScheduledExperiment *se;
338         struct GNUNET_TIME_Relative start;
339         struct GNUNET_TIME_Relative end;
340
341         GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
342
343         start = GNUNET_TIME_absolute_get_remaining(e->start);
344         end = GNUNET_TIME_absolute_get_remaining(e->stop);
345         if (0 == end.rel_value)
346                         return; /* End of experiment is reached */
347
348         /* Add additional checks here if required */
349         se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
350         se->state = NOT_RUNNING;
351         se->outbound = outbound;
352         se->e = e;
353         se->n = n;
354
355         if (GNUNET_YES == outbound)
356         {
357                 if (0 == start.rel_value)
358                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
359                 else
360                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
361                 GNUNET_CONTAINER_DLL_insert (waiting_out_head, waiting_out_tail, se);
362         }
363         else
364         {
365                 if (0 == start.rel_value)
366                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
367                 else
368                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
369                 GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
370         }
371
372         GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node to be scheduled\n",
373                         (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
374         experiments_scheduled ++;
375         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
376
377 }
378
379 /**
380  * Start the scheduler component
381  */
382 void
383 GED_scheduler_start ()
384 {
385         experiments_requested = 0;
386         experiments_scheduled = 0;
387 }
388
389
390 /**
391  * Stop the scheduler component
392  */
393 void
394 GED_scheduler_stop ()
395 {
396         struct ScheduledExperiment *cur;
397         struct ScheduledExperiment *next;
398
399         next = waiting_in_head;
400         while (NULL != (cur = next))
401         {
402                         next = cur->next;
403                         GNUNET_CONTAINER_DLL_remove (waiting_in_head, waiting_in_tail, cur);
404                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
405                         {
406                                         GNUNET_SCHEDULER_cancel (cur->task);
407                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
408                         }
409                         GNUNET_free (cur);
410                         GNUNET_assert (experiments_scheduled > 0);
411                         experiments_scheduled --;
412                         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
413         }
414
415         next = running_in_head;
416         while (NULL != (cur = next))
417         {
418                         next = cur->next;
419                         GNUNET_CONTAINER_DLL_remove (running_in_head, running_in_tail, cur);
420                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
421                         {
422                                         GNUNET_SCHEDULER_cancel (cur->task);
423                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
424                         }
425                         GNUNET_free (cur);
426                         GNUNET_assert (experiments_running > 0);
427                         experiments_running --;
428                         GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
429         }
430 }
431
432 /* end of gnunet-daemon-experimentation_scheduler.c */