changing time measurement from milliseconds to microseconds
[oweals/gnunet.git] / src / experimentation / gnunet-daemon-experimentation_scheduler.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file experimentation/gnunet-daemon-experimentation_scheduler.c
23  * @brief experimentation daemon: execute experiments
24  * @author Christian Grothoff
25  * @author Matthias Wachs
26  */
27 #include "platform.h"
28 #include "gnunet_getopt_lib.h"
29 #include "gnunet_util_lib.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet-daemon-experimentation.h"
33
/**
 * States a scheduled experiment passes through.
 *
 * An experiment is added during startup in state NOT_RUNNING.
 *
 * Once the scheduler decides to run it, a request is sent to the remote
 * peer.  If core is busy and cannot send, we wait for some time and the
 * state becomes BUSY; if the request could be sent, the state becomes
 * REQUESTED and we wait for the remote peer's ACK.
 *
 * On receipt of the ACK the state becomes STARTED, and once the scheduler
 * decides that the experiment is finished the state becomes STOPPED.
 */
enum ExperimentState
{
  /** Experiment was added and is waiting to be executed */
  NOT_RUNNING,

  /** Request could not be sent to the remote peer, core is busy */
  BUSY,

  /** Experiment was requested, waiting for the remote peer to ACK */
  REQUESTED,

  /** Experiment is running */
  STARTED,

  /** Experiment is done */
  STOPPED
};
59
60 struct ScheduledExperiment {
61         struct ScheduledExperiment *next;
62         struct ScheduledExperiment *prev;
63
64         struct Experiment *e;
65         struct Node *n;
66         int state;
67         int outbound;
68         GNUNET_SCHEDULER_TaskIdentifier task;
69 };
70
/* Inbound experiments added by GED_scheduler_add (head and tail of DLL) */
struct ScheduledExperiment *waiting_in_head;
struct ScheduledExperiment *waiting_in_tail;

/* Running inbound experiments (head and tail of DLL) */
struct ScheduledExperiment *running_in_head;
struct ScheduledExperiment *running_in_tail;

/* Outbound experiments not yet acknowledged by the peer (head and tail of DLL) */
struct ScheduledExperiment *waiting_out_head;
struct ScheduledExperiment *waiting_out_tail;

/* Outbound experiments acknowledged by the peer and running (head and tail of DLL) */
struct ScheduledExperiment *running_out_head;
struct ScheduledExperiment *running_out_tail;


/* Counter published as statistic "# experiments scheduled" */
static unsigned int experiments_scheduled;
/* Counter published as statistic "# experiments outbound running" */
static unsigned int experiments_outbound_running;
/* Counter published as statistic "# experiments inbound running" */
static unsigned int experiments_inbound_running;
/* Counter published as statistic "# experiments requested" */
static unsigned int experiments_requested;
88
89
90 static struct ScheduledExperiment *
91 find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment *tail,
92                                                                  struct Node *n, struct Experiment *e, int outbound)
93 {
94         struct ScheduledExperiment *cur;
95         for (cur = head; NULL != cur; cur = cur->next)
96         {
97                 if ((cur->n == n) && (cur->e == e) && (cur->outbound == outbound)) /* Node and experiment are equal */
98                         break;
99         }
100         return cur;
101 }
102
103 static void
104 request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
105 {
106         struct ScheduledExperiment *se = cls;
107         se->task = GNUNET_SCHEDULER_NO_TASK;
108
109         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Peer `%s' did not respond to request for experiment `%s'\n"),
110                         GNUNET_i2s (&se->n->id), se->e->name);
111
112         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
113         GNUNET_free (se);
114
115         /* Remove experiment */
116         GNUNET_assert (experiments_requested > 0);
117         experiments_requested --;
118         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
119 }
120
121 static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
122 {
123         struct ScheduledExperiment *se = cls;
124         struct GNUNET_TIME_Relative start;
125         struct GNUNET_TIME_Relative end;
126
127         se->task = GNUNET_SCHEDULER_NO_TASK;
128
129         switch (se->state) {
130                 case NOT_RUNNING:
131                         /* Send START_ACK message */
132                         GED_nodes_send_start_ack (se->n, se->e);
133                         se->state = REQUESTED;
134                         /* Schedule to run */
135                         start = GNUNET_TIME_absolute_get_remaining(se->e->start);
136                         if (0 == start.rel_value_us)
137                                         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
138                         else
139                                         se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
140                         break;
141                 case REQUESTED:
142                         experiments_inbound_running ++;
143                         GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
144                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting inbound experiment `%s' with peer `%s'\n"),
145                                         se->e->name, GNUNET_i2s (&se->n->id));
146                         se->state = STARTED;
147                         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
148                         break;
149                 case STARTED:
150                         /* Experiment is running */
151                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
152                                         "inbound", GNUNET_i2s (&se->n->id), se->e->name);
153
154                         /* do work here */
155
156                         /* Reschedule */
157                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
158                         if (0 == end.rel_value_us)
159                         {
160                                 se->state = STOPPED;
161                                 return; /* End of experiment is reached */
162                         }
163                         /* Reschedule */
164                         se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_inbound, se);
165                         break;
166                 case STOPPED:
167                         /* Experiment expired */
168                         break;
169                 default:
170                         break;
171         }
172
173 }
174
175 static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
176 {
177         struct ScheduledExperiment *se = cls;
178         struct GNUNET_TIME_Relative end;
179
180         se->task = GNUNET_SCHEDULER_NO_TASK;
181
182         switch (se->state) {
183                 case NOT_RUNNING:
184                         /* Send START message */
185                         GED_nodes_send_start (se->n, se->e);
186                         se->state = REQUESTED;
187                         se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
188                         experiments_requested ++;
189                         GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
190                         break;
191                 case REQUESTED:
192                         /* Expecting START_ACK */
193                         GNUNET_break (0);
194                         break;
195                 case STARTED:
196                         /* Experiment is running */
197                         GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
198                                         "outbound", GNUNET_i2s (&se->n->id), se->e->name);
199
200                         /* do work here */
201
202                         /* Reschedule */
203                         end = GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, se->e->frequency));
204                         if (0 == end.rel_value_us)
205                         {
206                                 se->state = STOPPED;
207                                 return; /* End of experiment is reached */
208                         }
209                         /* Reschedule */
210                 se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, &run_experiment_outbound, se);
211                         break;
212                 case STOPPED:
213                         /* Experiment expired */
214                         break;
215                 default:
216                         break;
217         }
218 }
219
220
221 /**
222  * Handle a START message from a remote node
223  *
224  * @param n the node
225  * @param e the experiment
226  */
227 void
228 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
229 {
230         struct ScheduledExperiment *se;
231
232         if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) ||
233                  (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO))))
234         {
235                 GNUNET_break_op (0);
236                 return;
237         }
238
239         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for experiment `%s'\n",
240                         "START", GNUNET_i2s (&n->id), e->name);
241
242         GED_scheduler_add (n, e, GNUNET_NO);
243 }
244
245 /**
246  * Handle a START_ACK message from a remote node
247  *
248  * @param n the node
249  * @param e the experiment
250  */
251 void
252 GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
253 {
254         struct ScheduledExperiment *se;
255
256         if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES)))
257         {
258                 GNUNET_break (0);
259                 return;
260         }
261
262         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for requested experiment `%s'\n",
263                         "START_ACK", GNUNET_i2s (&n->id), e->name);
264
265         if (GNUNET_SCHEDULER_NO_TASK != se->task)
266         {
267                 GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */
268                 se->task = GNUNET_SCHEDULER_NO_TASK;
269         }
270
271         /* Remove from waiting list, add to running list */
272         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
273         GNUNET_CONTAINER_DLL_insert (running_out_head, running_out_tail, se);
274
275         /* Change state and schedule to run */
276         experiments_outbound_running ++;
277         GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
278         GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Starting outbound experiment `%s' with peer `%s'\n"),
279                         e->name, GNUNET_i2s (&n->id));
280         se->state = STARTED;
281         se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
282 }
283
284
285 /**
286  * Handle a STOP message from a remote node
287  *
288  * @param n the node
289  * @param e the experiment
290  */
291 void
292 GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
293 {
294         struct ScheduledExperiment *se;
295
296         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _("Received %s message from peer %s for experiment `%s'\n"),
297                         "STOP", GNUNET_i2s (&n->id), e->name);
298
299         if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO)))
300         {
301                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for waiting experiment `%s'\n",
302                                 "STOP", GNUNET_i2s (&n->id), e->name);
303         }
304
305         if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, e, GNUNET_NO)))
306         {
307                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %s message from peer %s for running experiment `%s'\n",
308                                 "STOP", GNUNET_i2s (&n->id), e->name);
309         }
310
311 }
312
313 /**
314  * Add a new experiment for a node
315  *
316  * @param n the node
317  * @param e the experiment
318  * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
319  */
320 void
321 GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
322 {
323         struct ScheduledExperiment *se;
324         struct GNUNET_TIME_Relative start;
325         struct GNUNET_TIME_Relative end;
326
327         GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
328
329         start = GNUNET_TIME_absolute_get_remaining(e->start);
330         end = GNUNET_TIME_absolute_get_remaining(e->stop);
331         if (0 == end.rel_value_us)
332                         return; /* End of experiment is reached */
333
334         /* Add additional checks here if required */
335         se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
336         se->state = NOT_RUNNING;
337         se->outbound = outbound;
338         se->e = e;
339         se->n = n;
340
341         if (GNUNET_YES == outbound)
342         {
343           if (0 == start.rel_value_us)
344                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
345                 else
346                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_outbound, se);
347                 GNUNET_CONTAINER_DLL_insert (waiting_out_head, waiting_out_tail, se);
348         }
349         else
350         {
351                 if (0 == start.rel_value_us)
352                                 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_inbound, se);
353                 else
354                                 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
355                 GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, se);
356         }
357
358         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Added %s experiment `%s' for node to be scheduled\n",
359                         (GNUNET_YES == outbound) ? "outbound" : "inbound", e->name, GNUNET_i2s(&se->n->id));
360         experiments_scheduled ++;
361         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
362
363 }
364
365 /**
366  * Start the scheduler component
367  */
368 void
369 GED_scheduler_start ()
370 {
371         experiments_requested = 0;
372         experiments_scheduled = 0;
373 }
374
375
376 /**
377  * Stop the scheduler component
378  */
379 void
380 GED_scheduler_stop ()
381 {
382         struct ScheduledExperiment *cur;
383         struct ScheduledExperiment *next;
384
385         next = waiting_in_head;
386         while (NULL != (cur = next))
387         {
388                         next = cur->next;
389                         GNUNET_CONTAINER_DLL_remove (waiting_in_head, waiting_in_tail, cur);
390                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
391                         {
392                                         GNUNET_SCHEDULER_cancel (cur->task);
393                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
394                         }
395                         GNUNET_free (cur);
396                         GNUNET_assert (experiments_scheduled > 0);
397                         experiments_scheduled --;
398                         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
399         }
400
401         next = running_in_head;
402         while (NULL != (cur = next))
403         {
404                         next = cur->next;
405                         GNUNET_CONTAINER_DLL_remove (running_in_head, running_in_tail, cur);
406                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
407                         {
408                                         GNUNET_SCHEDULER_cancel (cur->task);
409                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
410                         }
411                         GNUNET_free (cur);
412                         GNUNET_assert (experiments_outbound_running > 0);
413                         experiments_inbound_running --;
414                         GNUNET_STATISTICS_set (GED_stats, "# experiments inbound running", experiments_inbound_running, GNUNET_NO);
415         }
416
417         next = waiting_out_head;
418         while (NULL != (cur = next))
419         {
420                         next = cur->next;
421                         GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur);
422                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
423                         {
424                                         GNUNET_SCHEDULER_cancel (cur->task);
425                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
426                         }
427                         GNUNET_free (cur);
428                         GNUNET_assert (experiments_scheduled > 0);
429                         experiments_scheduled --;
430                         GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
431         }
432
433         next = running_out_head;
434         while (NULL != (cur = next))
435         {
436                         next = cur->next;
437                         GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur);
438                         if (GNUNET_SCHEDULER_NO_TASK != cur->task)
439                         {
440                                         GNUNET_SCHEDULER_cancel (cur->task);
441                                         cur->task = GNUNET_SCHEDULER_NO_TASK;
442                         }
443                         GNUNET_free (cur);
444                         GNUNET_assert (experiments_outbound_running > 0);
445                         experiments_outbound_running --;
446                         GNUNET_STATISTICS_set (GED_stats, "# experiments outbound running", experiments_outbound_running, GNUNET_NO);
447         }
448 }
449
450 /* end of gnunet-daemon-experimentation_scheduler.c */