5a10987ac510052c7d9d148cb868d52cb24b55b2
[oweals/gnunet.git] / src / util / scheduler.c
1 /*
2       This file is part of GNUnet
3       (C) 2009 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 2, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file util/scheduler.c
23  * @brief schedule computations using continuation passing style
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_common.h"
28 #include "gnunet_os_lib.h"
29 #include "gnunet_scheduler_lib.h"
30 #include "gnunet_signal_lib.h"
31 #include "gnunet_time_lib.h"
32 #ifdef LINUX
33 #include "execinfo.h"
34
35 /**
36  * Use lsof to generate file descriptor reports on select error?
37  * (turn off for stable releases).
38  */
39 #define USE_LSOF GNUNET_YES
40
41 /**
42  * Obtain trace information for all scheduler calls that schedule tasks.
43  */
44 #define EXECINFO GNUNET_NO
45
46 /**
47  * Depth of the traces collected via EXECINFO.
48  */
49 #define MAX_TRACE_DEPTH 50
50 #endif
51
52 #define DEBUG_TASKS GNUNET_NO
53
54 /**
55  * Should we figure out which tasks are delayed for a while
56  * before they are run? (Consider using in combination with EXECINFO).
57  */
58 #define PROFILE_DELAYS GNUNET_NO
59
60 /**
61  * Task that were in the queue for longer than this are reported if
62  * PROFILE_DELAYS is active.
63  */
64 #define DELAY_THRESHOLD GNUNET_TIME_UNIT_SECONDS
65
66 /**
67  * Linked list of pending tasks.
68  */
69 struct Task
70 {
71   /**
72    * This is a linked list.
73    */
74   struct Task *next;
75
76   /**
77    * Function to run when ready.
78    */
79   GNUNET_SCHEDULER_Task callback;
80
81   /**
82    * Closure for the callback.
83    */
84   void *callback_cls;
85
86   /**
87    * Set of file descriptors this task is waiting
88    * for for reading.  Once ready, this is updated
89    * to reflect the set of file descriptors ready
90    * for operation.
91    */
92   struct GNUNET_NETWORK_FDSet *read_set;
93
94   /**
95    * Set of file descriptors this task is waiting for for writing.
96    * Once ready, this is updated to reflect the set of file
97    * descriptors ready for operation.
98    */
99   struct GNUNET_NETWORK_FDSet *write_set;
100
101   /**
102    * Unique task identifier.
103    */
104   GNUNET_SCHEDULER_TaskIdentifier id;
105
106   /**
107    * Identifier of a prerequisite task.
108    */
109   GNUNET_SCHEDULER_TaskIdentifier prereq_id;
110
111   /**
112    * Absolute timeout value for the task, or
113    * GNUNET_TIME_UNIT_FOREVER_ABS for "no timeout".
114    */
115   struct GNUNET_TIME_Absolute timeout;
116
117 #if PROFILE_DELAYS
118   /**
119    * When was the task scheduled?
120    */
121   struct GNUNET_TIME_Absolute start_time;
122 #endif
123
124   /**
125    * Why is the task ready?  Set after task is added to ready queue.
126    * Initially set to zero.  All reasons that have already been
127    * satisfied (i.e.  read or write ready) will be set over time.
128    */
129   enum GNUNET_SCHEDULER_Reason reason;
130
131   /**
132    * Task priority.
133    */
134   enum GNUNET_SCHEDULER_Priority priority;
135
136 #if EXECINFO
137   /**
138    * Array of strings which make up a backtrace from the point when this
139    * task was scheduled (essentially, who scheduled the task?)
140    */
141   char **backtrace_strings;
142
143   /**
144    * Size of the backtrace_strings array
145    */
146   int num_backtrace_strings;
147 #endif
148
149 };
150
151
152 /**
153  * Handle for the scheduling service.
154  */
155 struct GNUNET_SCHEDULER_Handle
156 {
157
158   /**
159    * List of tasks waiting for an event.
160    */
161   struct Task *pending;
162
163   /**
164    * ID of the task that is running right now.
165    */
166   struct Task *active_task;
167
168   /**
169    * List of tasks ready to run right now,
170    * grouped by importance.
171    */
172   struct Task *ready[GNUNET_SCHEDULER_PRIORITY_COUNT];
173
174   /**
175    * Identity of the last task queued.  Incremented for each task to
176    * generate a unique task ID (it is virtually impossible to start
177    * more than 2^64 tasks during the lifetime of a process).
178    */
179   GNUNET_SCHEDULER_TaskIdentifier last_id;
180
181   /**
182    * Highest number so that all tasks with smaller identifiers
183    * have already completed.  Also the lowest number of a task
184    * still waiting to be executed.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier lowest_pending_id;
187
188   /**
189    * Number of tasks on the ready list.
190    */
191   unsigned int ready_count;
192
193   /**
194    * How many tasks have we run so far?
195    */
196   unsigned long long tasks_run;
197
198   /**
199    * Priority of the task running right now.  Only
200    * valid while a task is running.
201    */
202   enum GNUNET_SCHEDULER_Priority current_priority;
203
204   /**
205    * How 'nice' are we right now?
206    */
207   int nice_level;
208
209 };
210
211
212 /**
213  * Check that the given priority is legal (and return it).
214  *
215  * @param p priority value to check
216  * @return p on success, 0 on error
217  */
218 static enum GNUNET_SCHEDULER_Priority
219 check_priority (enum GNUNET_SCHEDULER_Priority p)
220 {
221   if ((p >= 0) && (p < GNUNET_SCHEDULER_PRIORITY_COUNT))
222     return p;
223   GNUNET_assert (0);
224   return 0;                     /* make compiler happy */
225 }
226
227
228 /**
229  * Is a task with this identifier still pending?  Also updates
230  * "lowest_pending_id" as a side-effect (for faster checks in the
231  * future), but only if the return value is "GNUNET_NO" (and
232  * the "lowest_pending_id" check failed).
233  *
234  * @param sched the scheduler
235  * @param id which task are we checking for
236  * @return GNUNET_YES if so, GNUNET_NO if not
237  */
238 static int
239 is_pending (struct GNUNET_SCHEDULER_Handle *sched,
240             GNUNET_SCHEDULER_TaskIdentifier id)
241 {
242   struct Task *pos;
243   enum GNUNET_SCHEDULER_Priority p;
244   GNUNET_SCHEDULER_TaskIdentifier min;
245
246   if (id < sched->lowest_pending_id)
247     return GNUNET_NO;
248   min = -1;                     /* maximum value */
249   pos = sched->pending;
250   while (pos != NULL)
251     {
252       if (pos->id == id)
253         return GNUNET_YES;
254       if (pos->id < min)
255         min = pos->id;
256       pos = pos->next;
257     }
258   for (p = 0; p < GNUNET_SCHEDULER_PRIORITY_COUNT; p++)
259     {
260       pos = sched->ready[p];
261       while (pos != NULL)
262         {
263           if (pos->id == id)
264             return GNUNET_YES;
265           if (pos->id < min)
266             min = pos->id;
267           pos = pos->next;
268         }
269     }
270   sched->lowest_pending_id = min;
271   return GNUNET_NO;
272 }
273
274
275 /**
276  * Update all sets and timeout for select.
277  *
278  * @param sched the scheduler
279  * @param rs read-set, set to all FDs we would like to read (updated)
280  * @param ws write-set, set to all FDs we would like to write (updated)
281  * @param timeout next timeout (updated)
282  */
283 static void
284 update_sets (struct GNUNET_SCHEDULER_Handle *sched,
285              struct GNUNET_NETWORK_FDSet *rs,
286              struct GNUNET_NETWORK_FDSet *ws,
287              struct GNUNET_TIME_Relative *timeout)
288 {
289   struct Task *pos;
290
291   pos = sched->pending;
292   while (pos != NULL)
293     {
294       if ((pos->prereq_id != GNUNET_SCHEDULER_NO_TASK) &&
295           (GNUNET_YES == is_pending (sched, pos->prereq_id)))
296         {
297           pos = pos->next;
298           continue;
299         }
300
301       if (pos->timeout.value != GNUNET_TIME_UNIT_FOREVER_ABS.value)
302         {
303           struct GNUNET_TIME_Relative to;
304
305           to = GNUNET_TIME_absolute_get_remaining (pos->timeout);
306           if (timeout->value > to.value)
307             *timeout = to;
308         }
309       /* FIXME: this is a very expensive (9% of runtime for some
310          benchmarks!) way to merge the bit sets; specializing
311          the common case where we only have one bit in the pos's
312          set should improve performance dramatically! */
313       if (pos->read_set != NULL)
314         GNUNET_NETWORK_fdset_add (rs, pos->read_set);
315       if (pos->write_set != NULL)
316         GNUNET_NETWORK_fdset_add (ws, pos->write_set);
317       if (pos->reason != 0)
318         *timeout = GNUNET_TIME_UNIT_ZERO;
319       pos = pos->next;
320     }
321 }
322
323
324 /**
325  * Check if the ready set overlaps with the set we want to have ready.
326  * If so, update the want set (set all FDs that are ready).  If not,
327  * return GNUNET_NO.
328  *
329  * @param ready set that is ready
330  * @param want set that we want to be ready
331  * @return GNUNET_YES if there was some overlap
332  */
333 static int
334 set_overlaps (const struct GNUNET_NETWORK_FDSet *ready,
335               struct GNUNET_NETWORK_FDSet *want)
336 {
337   if (NULL == want)
338     return GNUNET_NO;
339   /* FIXME: this is a very expensive (10% of runtime for some
340      benchmarks!) way to merge the bit sets; specializing
341      the common case where we only have one bit in the pos's
342      set should improve performance dramatically! */
343   if (GNUNET_NETWORK_fdset_overlap (ready, want))
344     {
345       /* copy all over (yes, there maybe unrelated bits,
346          but this should not hurt well-written clients) */
347       GNUNET_NETWORK_fdset_copy (want, ready);
348       return GNUNET_YES;
349     }
350   return GNUNET_NO;
351 }
352
353
354 /**
355  * Check if the given task is eligible to run now.
356  * Also set the reason why it is eligible.
357  *
358  * @param sched the scheduler
359  * @param task task to check if it is ready
360  * @param now the current time
361  * @param rs set of FDs ready for reading
362  * @param ws set of FDs ready for writing
363  * @return GNUNET_YES if we can run it, GNUNET_NO if not.
364  */
365 static int
366 is_ready (struct GNUNET_SCHEDULER_Handle *sched,
367           struct Task *task,
368           struct GNUNET_TIME_Absolute now,
369           const struct GNUNET_NETWORK_FDSet *rs,
370           const struct GNUNET_NETWORK_FDSet *ws)
371 {
372   if (now.value >= task->timeout.value)
373     task->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
374   if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
375       (rs != NULL) && (set_overlaps (rs, task->read_set)))
376     task->reason |= GNUNET_SCHEDULER_REASON_READ_READY;
377   if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
378       (ws != NULL) && (set_overlaps (ws, task->write_set)))
379     task->reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
380   if (task->reason == 0)
381     return GNUNET_NO;           /* not ready */
382   if (task->prereq_id != GNUNET_SCHEDULER_NO_TASK)
383     {
384       if (GNUNET_YES == is_pending (sched, task->prereq_id))
385         return GNUNET_NO;       /* prereq waiting */
386       task->reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
387     }
388   return GNUNET_YES;
389 }
390
391
392 /**
393  * Put a task that is ready for execution into the ready queue.
394  *
395  * @param handle the scheduler
396  * @param task task ready for execution
397  */
398 static void
399 queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, struct Task *task)
400 {
401   enum GNUNET_SCHEDULER_Priority p = task->priority;
402   if (0 != (task->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
403     p = GNUNET_SCHEDULER_PRIORITY_SHUTDOWN;
404   task->next = handle->ready[check_priority (p)];
405   handle->ready[check_priority (p)] = task;
406   handle->ready_count++;
407 }
408
409
410 /**
411  * Check which tasks are ready and move them
412  * to the respective ready queue.
413  *
414  * @param handle the scheduler
415  * @param rs FDs ready for reading
416  * @param ws FDs ready for writing
417  */
418 static void
419 check_ready (struct GNUNET_SCHEDULER_Handle *handle,
420              const struct GNUNET_NETWORK_FDSet *rs,
421              const struct GNUNET_NETWORK_FDSet *ws)
422 {
423   struct Task *pos;
424   struct Task *prev;
425   struct Task *next;
426   struct GNUNET_TIME_Absolute now;
427
428   now = GNUNET_TIME_absolute_get ();
429   prev = NULL;
430   pos = handle->pending;
431   while (pos != NULL)
432     {
433 #if DEBUG_TASKS
434       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
435                   "Checking readiness of task: %llu / %p\n",
436                   pos->id, pos->callback_cls);
437 #endif
438       next = pos->next;
439       if (GNUNET_YES == is_ready (handle, pos, now, rs, ws))
440         {
441           if (prev == NULL)
442             handle->pending = next;
443           else
444             prev->next = next;
445           queue_ready_task (handle, pos);
446           pos = next;
447           continue;
448         }
449       prev = pos;
450       pos = next;
451     }
452 }
453
454
455 /**
456  * Request the shutdown of a scheduler.  Marks all currently
457  * pending tasks as ready because of shutdown.  This will
458  * cause all tasks to run (as soon as possible, respecting
459  * priorities and prerequisite tasks).  Note that tasks
460  * scheduled AFTER this call may still be delayed arbitrarily.
461  *
462  * @param sched the scheduler
463  */
464 void
465 GNUNET_SCHEDULER_shutdown (struct GNUNET_SCHEDULER_Handle *sched)
466 {
467   struct Task *pos;
468   int i;
469
470   pos = sched->pending;
471   while (pos != NULL)
472     {
473       pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
474       /* we don't move the task into the ready queue yet; check_ready
475          will do that later, possibly adding additional
476          readiness-factors */
477       pos = pos->next;
478     }
479   for (i=0;i<GNUNET_SCHEDULER_PRIORITY_COUNT;i++)
480     {
481       pos = sched->ready[i];
482       while (pos != NULL)
483         {
484           pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
485           /* we don't move the task into the ready queue yet; check_ready
486              will do that later, possibly adding additional
487              readiness-factors */
488           pos = pos->next;
489         }
490     }  
491 }
492
493
494 /**
495  * Destroy a task (release associated resources)
496  *
497  * @param t task to destroy
498  */
499 static void
500 destroy_task (struct Task *t)
501 {
502   if (NULL != t->read_set)
503     GNUNET_NETWORK_fdset_destroy (t->read_set);
504   if (NULL != t->write_set)
505     GNUNET_NETWORK_fdset_destroy (t->write_set);
506 #if EXECINFO
507   GNUNET_free (t->backtrace_strings);
508 #endif
509   GNUNET_free (t);
510 }
511
512
513 /**
514  * Run at least one task in the highest-priority queue that is not
515  * empty.  Keep running tasks until we are either no longer running
516  * "URGENT" tasks or until we have at least one "pending" task (which
517  * may become ready, hence we should select on it).  Naturally, if
518  * there are no more ready tasks, we also return.  
519  *
520  * @param sched the scheduler
521  */
522 static void
523 run_ready (struct GNUNET_SCHEDULER_Handle *sched)
524 {
525   enum GNUNET_SCHEDULER_Priority p;
526   struct Task *pos;
527   struct GNUNET_SCHEDULER_TaskContext tc;
528
529   do
530     {
531       if (sched->ready_count == 0)
532         return;
533       GNUNET_assert (sched->ready[GNUNET_SCHEDULER_PRIORITY_KEEP] == NULL);
534       /* yes, p>0 is correct, 0 is "KEEP" which should
535          always be an empty queue (see assertion)! */
536       for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--)
537         {
538           pos = sched->ready[p];
539           if (pos != NULL)
540             break;
541         }
542       GNUNET_assert (pos != NULL);      /* ready_count wrong? */
543       sched->ready[p] = pos->next;
544       sched->ready_count--;
545       if (sched->current_priority != pos->priority)
546         {
547           sched->current_priority = pos->priority;
548           (void) GNUNET_OS_set_process_priority (0, pos->priority);
549         }
550       sched->active_task = pos;
551 #if PROFILE_DELAYS
552       if (GNUNET_TIME_absolute_get_duration (pos->start_time).value >
553           DELAY_THRESHOLD.value)
554         {
555           GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
556                       "Task %u took %llums to be scheduled\n",
557                       pos->id,
558                       (unsigned long long) GNUNET_TIME_absolute_get_duration (pos->start_time).value);
559         }
560 #endif
561       tc.sched = sched;
562       tc.reason = pos->reason;
563       tc.read_ready = pos->read_set;
564       tc.write_ready = pos->write_set;
565 #if DEBUG_TASKS
566       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567                   "Running task: %llu / %p\n", pos->id, pos->callback_cls);
568 #endif
569       pos->callback (pos->callback_cls, &tc);
570 #if EXECINFO
571       int i;
572       for (i=0;i<pos->num_backtrace_strings;i++)
573         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
574                     "Task %u trace %d: %s\n",
575                     pos->id,
576                     i,
577                     pos->backtrace_strings[i]);
578 #endif
579       sched->active_task = NULL;
580       destroy_task (pos);
581       sched->tasks_run++;
582     }
583   while ((sched->pending == NULL) || (p == GNUNET_SCHEDULER_PRIORITY_URGENT));
584 }
585
586 /**
587  * Pipe used to communicate shutdown via signal.
588  */
589 static struct GNUNET_DISK_PipeHandle *sigpipe;
590
591
592 /**
593  * Signal handler called for signals that should cause us to shutdown.
594  */
595 static void
596 sighandler_shutdown ()
597 {
598   static char c;
599
600   GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
601                           (sigpipe, GNUNET_DISK_PIPE_END_WRITE), &c,
602                           sizeof (c));
603 }
604
605
606 /**
607  * Initialize and run scheduler.  This function will return when all
608  * tasks have completed.  On systems with signals, receiving a SIGTERM
609  * (and other similar signals) will cause "GNUNET_SCHEDULER_shutdown"
610  * to be run after the active task is complete.  As a result, SIGTERM
611  * causes all active tasks to be scheduled with reason
612  * "GNUNET_SCHEDULER_REASON_SHUTDOWN".  (However, tasks added
613  * afterwards will execute normally!). Note that any particular signal
614  * will only shut down one scheduler; applications should always only
615  * create a single scheduler.
616  *
617  * @param task task to run immediately
618  * @param task_cls closure of task
619  */
620 void
621 GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_Task task, void *task_cls)
622 {
623   struct GNUNET_SCHEDULER_Handle sched;
624   struct GNUNET_NETWORK_FDSet *rs;
625   struct GNUNET_NETWORK_FDSet *ws;
626   struct GNUNET_TIME_Relative timeout;
627   int ret;
628   struct GNUNET_SIGNAL_Context *shc_int;
629   struct GNUNET_SIGNAL_Context *shc_term;
630   struct GNUNET_SIGNAL_Context *shc_quit;
631   struct GNUNET_SIGNAL_Context *shc_hup;
632   unsigned long long last_tr;
633   unsigned int busy_wait_warning;
634   const struct GNUNET_DISK_FileHandle *pr;
635   char c;
636
637   rs = GNUNET_NETWORK_fdset_create ();
638   ws = GNUNET_NETWORK_fdset_create ();
639   GNUNET_assert (sigpipe == NULL);
640   sigpipe = GNUNET_DISK_pipe (GNUNET_NO);
641   GNUNET_assert (sigpipe != NULL);
642   pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
643   GNUNET_assert (pr != NULL);
644   shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown);
645   shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, &sighandler_shutdown);
646 #ifndef MINGW
647   shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, &sighandler_shutdown);
648   shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, &sighandler_shutdown);
649 #endif
650   memset (&sched, 0, sizeof (sched));
651   sched.current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT;
652   GNUNET_SCHEDULER_add_continuation (&sched,
653                                      task,
654                                      task_cls,
655                                      GNUNET_SCHEDULER_REASON_STARTUP);
656   last_tr = 0;
657   busy_wait_warning = 0;
658   while ((sched.pending != NULL) || (sched.ready_count > 0))
659     {
660       GNUNET_NETWORK_fdset_zero (rs);
661       GNUNET_NETWORK_fdset_zero (ws);
662       timeout = GNUNET_TIME_UNIT_FOREVER_REL;
663       update_sets (&sched, rs, ws, &timeout);
664       GNUNET_NETWORK_fdset_handle_set (rs, pr);
665       if (sched.ready_count > 0)
666         {
667           /* no blocking, more work already ready! */
668           timeout = GNUNET_TIME_UNIT_ZERO;
669         }
670       ret = GNUNET_NETWORK_socket_select (rs, ws, NULL, timeout);
671       if (ret == GNUNET_SYSERR)
672         {
673           if (errno == EINTR)
674             continue;
675
676           GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "select");
677 #ifndef MINGW
678 #if USE_LSOF
679           char lsof[512];
680           snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid());
681           close (1);
682           dup2 (2, 1);
683           system (lsof);                  
684 #endif
685 #endif
686           abort ();
687           break;
688         }
689       if (GNUNET_NETWORK_fdset_handle_isset (rs, pr))
690         {
691           /* consume the signal */
692           GNUNET_DISK_file_read (pr, &c, sizeof (c));
693           /* mark all active tasks as ready due to shutdown */
694           GNUNET_SCHEDULER_shutdown (&sched);
695         }
696       if (last_tr == sched.tasks_run)
697         {
698           busy_wait_warning++;
699         }
700       else
701         {
702           last_tr = sched.tasks_run;
703           busy_wait_warning = 0;
704         }
705       if ((ret == 0) && (timeout.value == 0) && (busy_wait_warning > 16))
706         {
707           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
708                       _("Looks like we're busy waiting...\n"));
709           sleep (1);            /* mitigate */
710         }
711       check_ready (&sched, rs, ws);
712       run_ready (&sched);
713     }
714   GNUNET_SIGNAL_handler_uninstall (shc_int);
715   GNUNET_SIGNAL_handler_uninstall (shc_term);
716 #ifndef MINGW
717   GNUNET_SIGNAL_handler_uninstall (shc_quit);
718   GNUNET_SIGNAL_handler_uninstall (shc_hup);
719 #endif
720   GNUNET_DISK_pipe_close (sigpipe);
721   sigpipe = NULL;
722   GNUNET_NETWORK_fdset_destroy (rs);
723   GNUNET_NETWORK_fdset_destroy (ws);
724 }
725
726
727 /**
728  * Obtain the reason code for why the current task was
729  * started.  Will return the same value as 
730  * the GNUNET_SCHEDULER_TaskContext's reason field.
731  *
732  * @param sched scheduler to query
733  * @return reason(s) why the current task is run
734  */
735 enum GNUNET_SCHEDULER_Reason
736 GNUNET_SCHEDULER_get_reason (struct GNUNET_SCHEDULER_Handle *sched)
737 {
738   return sched->active_task->reason;
739 }
740
741
742 /**
743  * Get information about the current load of this scheduler.  Use this
744  * function to determine if an elective task should be added or simply
745  * dropped (if the decision should be made based on the number of
746  * tasks ready to run).
747  *
748  * @param sched scheduler to query
749  * @param p priority level to look at
750  * @return number of tasks pending right now
751  */
752 unsigned int
753 GNUNET_SCHEDULER_get_load (struct GNUNET_SCHEDULER_Handle *sched,
754                            enum GNUNET_SCHEDULER_Priority p)
755 {
756   struct Task *pos;
757   unsigned int ret;
758
759   if (p == GNUNET_SCHEDULER_PRIORITY_COUNT)
760     return sched->ready_count;
761   if (p == GNUNET_SCHEDULER_PRIORITY_KEEP)
762     p = sched->current_priority;
763   ret = 0;
764   pos = sched->ready[p];
765   while (pos != NULL)
766     {
767       pos = pos->next;
768       ret++;
769     }
770   return ret;
771 }
772
773
774 /**
775  * Cancel the task with the specified identifier.
776  * The task must not yet have run.
777  *
778  * @param sched scheduler to use
779  * @param task id of the task to cancel
780  * @return original closure of the task
781  */
782 void *
783 GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched,
784                          GNUNET_SCHEDULER_TaskIdentifier task)
785 {
786   struct Task *t;
787   struct Task *prev;
788   enum GNUNET_SCHEDULER_Priority p;
789   void *ret;
790 #if EXECINFO
791   int i;
792 #endif
793   prev = NULL;
794   t = sched->pending;
795   while (t != NULL)
796     {
797       if (t->id == task)
798         break;
799       prev = t;
800       t = t->next;
801     }
802   p = 0;
803   while (t == NULL)
804     {
805       p++;
806       GNUNET_assert (p < GNUNET_SCHEDULER_PRIORITY_COUNT);
807       prev = NULL;
808       t = sched->ready[p];
809       while (t != NULL)
810         {
811           if (t->id == task)
812             {
813               sched->ready_count--;
814               break;
815             }
816           prev = t;
817           t = t->next;
818         }
819     }
820   if (prev == NULL)
821     {
822       if (p == 0)
823         sched->pending = t->next;
824       else
825         sched->ready[p] = t->next;
826     }
827   else
828     prev->next = t->next;
829   ret = t->callback_cls;
830 #if DEBUG_TASKS
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Canceling task: %llu / %p\n", task, t->callback_cls);
833 #endif
834   destroy_task (t);
835   return ret;
836 }
837
838
839 /**
840  * Continue the current execution with the given function.  This is
841  * similar to the other "add" functions except that there is no delay
842  * and the reason code can be specified.
843  *
844  * @param sched scheduler to use
845  * @param task main function of the task
846  * @param task_cls closure for 'main'
847  * @param reason reason for task invocation
848  */
849 void
850 GNUNET_SCHEDULER_add_continuation (struct GNUNET_SCHEDULER_Handle *sched,
851                                    GNUNET_SCHEDULER_Task task,
852                                    void *task_cls,
853                                    enum GNUNET_SCHEDULER_Reason reason)
854 {
855   struct Task *t;
856 #if EXECINFO
857   void *backtrace_array[50];
858 #endif
859   t = GNUNET_malloc (sizeof (struct Task));
860 #if EXECINFO
861   t->num_backtrace_strings = backtrace(backtrace_array, 50);
862   t->backtrace_strings = backtrace_symbols(backtrace_array, t->num_backtrace_strings);
863 #endif
864   t->callback = task;
865   t->callback_cls = task_cls;
866   t->id = ++sched->last_id;
867 #if PROFILE_DELAYS
868   t->start_time = GNUNET_TIME_absolute_get ();
869 #endif
870   t->reason = reason;
871   t->priority = sched->current_priority;
872 #if DEBUG_TASKS
873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874               "Adding continuation task: %llu / %p\n",
875               t->id, t->callback_cls);
876 #endif
877   queue_ready_task (sched, t);
878 }
879
880
881
882 /**
883  * Schedule a new task to be run after the specified prerequisite task
884  * has completed. It will be run with the priority of the calling
885  * task.
886  *
887  * @param sched scheduler to use
888  * @param prerequisite_task run this task after the task with the given
889  *        task identifier completes (and any of our other
890  *        conditions, such as delay, read or write-readiness
891  *        are satisfied).  Use  GNUNET_SCHEDULER_NO_TASK to not have any dependency
892  *        on completion of other tasks (this will cause the task to run as
893  *        soon as possible).
894  * @param task main function of the task
895  * @param task_cls closure of task
896  * @return unique task identifier for the job
897  *         only valid until "task" is started!
898  */
899 GNUNET_SCHEDULER_TaskIdentifier
900 GNUNET_SCHEDULER_add_after (struct GNUNET_SCHEDULER_Handle *sched,
901                             GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
902                             GNUNET_SCHEDULER_Task task, void *task_cls)
903 {
904   return GNUNET_SCHEDULER_add_select (sched,
905                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
906                                       prerequisite_task,
907                                       GNUNET_TIME_UNIT_ZERO,
908                                       NULL, NULL, task, task_cls);
909 }
910
911
912 /**
913  * Schedule a new task to be run with a specified priority.
914  *
915  * @param sched scheduler to use
916  * @param prio how important is the new task?
917  * @param task main function of the task
918  * @param task_cls closure of task
919  * @return unique task identifier for the job
920  *         only valid until "task" is started!
921  */
922 GNUNET_SCHEDULER_TaskIdentifier
923 GNUNET_SCHEDULER_add_with_priority (struct GNUNET_SCHEDULER_Handle * sched,
924                                     enum GNUNET_SCHEDULER_Priority prio,
925                                     GNUNET_SCHEDULER_Task task,
926                                     void *task_cls)
927 {
928   return GNUNET_SCHEDULER_add_select (sched,
929                                       prio,
930                                       GNUNET_SCHEDULER_NO_TASK,
931                                       GNUNET_TIME_UNIT_ZERO,
932                                       NULL, NULL, task, task_cls);
933 }
934
935
936
937 /**
938  * Schedule a new task to be run with a specified delay.  The task
939  * will be scheduled for execution once the delay has expired. It
940  * will be run with the priority of the calling task.
941  *
942  * @param sched scheduler to use
943  * @param delay when should this operation time out? Use 
944  *        GNUNET_TIME_UNIT_FOREVER_REL for "on shutdown"
945  * @param task main function of the task
946  * @param task_cls closure of task
947  * @return unique task identifier for the job
948  *         only valid until "task" is started!
949  */
950 GNUNET_SCHEDULER_TaskIdentifier
951 GNUNET_SCHEDULER_add_delayed (struct GNUNET_SCHEDULER_Handle * sched,
952                               struct GNUNET_TIME_Relative delay,
953                               GNUNET_SCHEDULER_Task task, void *task_cls)
954 {
955   return GNUNET_SCHEDULER_add_select (sched,
956                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
957                                       GNUNET_SCHEDULER_NO_TASK, delay,
958                                       NULL, NULL, task, task_cls);
959 }
960
961
962
963 /**
964  * Schedule a new task to be run as soon as possible. The task
965  * will be run with the priority of the calling task.
966  *
967  * @param sched scheduler to use
968  * @param task main function of the task
969  * @param task_cls closure of task
970  * @return unique task identifier for the job
971  *         only valid until "task" is started!
972  */
973 GNUNET_SCHEDULER_TaskIdentifier
974 GNUNET_SCHEDULER_add_now (struct GNUNET_SCHEDULER_Handle *sched,
975                           GNUNET_SCHEDULER_Task task,
976                           void *task_cls)
977 {
978   return GNUNET_SCHEDULER_add_select (sched,
979                                       GNUNET_SCHEDULER_PRIORITY_KEEP,
980                                       GNUNET_SCHEDULER_NO_TASK,
981                                       GNUNET_TIME_UNIT_ZERO,
982                                       NULL, NULL, task, task_cls);
983 }
984
985
986
987 /**
988  * Schedule a new task to be run with a specified delay or when the
989  * specified file descriptor is ready for reading.  The delay can be
990  * used as a timeout on the socket being ready.  The task will be
991  * scheduled for execution once either the delay has expired or the
992  * socket operation is ready.  It will be run with the priority of
993  * the calling task.
994  *
995  * @param sched scheduler to use
996  * @param delay when should this operation time out? Use 
997  *        GNUNET_TIME_UNIT_FOREVER_REL for "on shutdown"
998  * @param rfd read file-descriptor
999  * @param task main function of the task
1000  * @param task_cls closure of task
1001  * @return unique task identifier for the job
1002  *         only valid until "task" is started!
1003  */
1004 GNUNET_SCHEDULER_TaskIdentifier
1005 GNUNET_SCHEDULER_add_read_net (struct GNUNET_SCHEDULER_Handle * sched,
1006                                struct GNUNET_TIME_Relative delay,
1007                                struct GNUNET_NETWORK_Handle * rfd,
1008                                GNUNET_SCHEDULER_Task task, void *task_cls)
1009 {
1010   struct GNUNET_NETWORK_FDSet *rs;
1011   GNUNET_SCHEDULER_TaskIdentifier ret;
1012
1013   GNUNET_assert (rfd != NULL);
1014   rs = GNUNET_NETWORK_fdset_create ();
1015   GNUNET_NETWORK_fdset_set (rs, rfd);
1016   ret = GNUNET_SCHEDULER_add_select (sched,
1017                                      GNUNET_SCHEDULER_PRIORITY_KEEP,
1018                                      GNUNET_SCHEDULER_NO_TASK,
1019                                      delay, rs, NULL, task, task_cls);
1020   GNUNET_NETWORK_fdset_destroy (rs);
1021   return ret;
1022 }
1023
1024
1025 /**
1026  * Schedule a new task to be run with a specified delay or when the
1027  * specified file descriptor is ready for writing.  The delay can be
1028  * used as a timeout on the socket being ready.  The task will be
1029  * scheduled for execution once either the delay has expired or the
1030  * socket operation is ready.  It will be run with the priority of
1031  * the calling task.
1032  *
1033  * @param sched scheduler to use
1034  * @param delay when should this operation time out? Use 
1035  *        GNUNET_TIME_UNIT_FOREVER_REL for "on shutdown"
1036  * @param wfd write file-descriptor
1037  * @param task main function of the task
1038  * @param task_cls closure of task
1039  * @return unique task identifier for the job
1040  *         only valid until "task" is started!
1041  */
1042 GNUNET_SCHEDULER_TaskIdentifier
1043 GNUNET_SCHEDULER_add_write_net (struct GNUNET_SCHEDULER_Handle * sched,
1044                                 struct GNUNET_TIME_Relative delay,
1045                                 struct GNUNET_NETWORK_Handle * wfd,
1046                                 GNUNET_SCHEDULER_Task task, void *task_cls)
1047 {
1048   struct GNUNET_NETWORK_FDSet *ws;
1049   GNUNET_SCHEDULER_TaskIdentifier ret;
1050
1051   GNUNET_assert (wfd != NULL);
1052   ws = GNUNET_NETWORK_fdset_create ();
1053   GNUNET_NETWORK_fdset_set (ws, wfd);
1054   ret = GNUNET_SCHEDULER_add_select (sched,
1055                                      GNUNET_SCHEDULER_PRIORITY_KEEP,
1056                                      GNUNET_SCHEDULER_NO_TASK, delay,
1057                                      NULL, ws, task, task_cls);
1058   GNUNET_NETWORK_fdset_destroy (ws);
1059   return ret;
1060 }
1061
1062
1063 /**
1064  * Schedule a new task to be run with a specified delay or when the
1065  * specified file descriptor is ready for reading.  The delay can be
1066  * used as a timeout on the socket being ready.  The task will be
1067  * scheduled for execution once either the delay has expired or the
1068  * socket operation is ready. It will be run with the priority of
1069  * the calling task.
1070  *
1071  * @param sched scheduler to use
1072  * @param delay when should this operation time out? Use 
1073  *        GNUNET_TIME_UNIT_FOREVER_REL for "on shutdown"
1074  * @param rfd read file-descriptor
1075  * @param task main function of the task
1076  * @param task_cls closure of task
1077  * @return unique task identifier for the job
1078  *         only valid until "task" is started!
1079  */
1080 GNUNET_SCHEDULER_TaskIdentifier
1081 GNUNET_SCHEDULER_add_read_file (struct GNUNET_SCHEDULER_Handle * sched,
1082                                 struct GNUNET_TIME_Relative delay,
1083                                 const struct GNUNET_DISK_FileHandle * rfd,
1084                                 GNUNET_SCHEDULER_Task task, void *task_cls)
1085 {
1086   struct GNUNET_NETWORK_FDSet *rs;
1087   GNUNET_SCHEDULER_TaskIdentifier ret;
1088
1089   GNUNET_assert (rfd != NULL);
1090   rs = GNUNET_NETWORK_fdset_create ();
1091   GNUNET_NETWORK_fdset_handle_set (rs, rfd);
1092   ret = GNUNET_SCHEDULER_add_select (sched,
1093                                      GNUNET_SCHEDULER_PRIORITY_KEEP,
1094                                      GNUNET_SCHEDULER_NO_TASK, delay,
1095                                      rs, NULL, task, task_cls);
1096   GNUNET_NETWORK_fdset_destroy (rs);
1097   return ret;
1098 }
1099
1100
1101 /**
1102  * Schedule a new task to be run with a specified delay or when the
1103  * specified file descriptor is ready for writing.  The delay can be
1104  * used as a timeout on the socket being ready.  The task will be
1105  * scheduled for execution once either the delay has expired or the
1106  * socket operation is ready. It will be run with the priority of
1107  * the calling task.
1108  *
1109  * @param sched scheduler to use
1110  * @param delay when should this operation time out? Use 
1111  *        GNUNET_TIME_UNIT_FOREVER_REL for "on shutdown"
1112  * @param wfd write file-descriptor
1113  * @param task main function of the task
1114  * @param task_cls closure of task
1115  * @return unique task identifier for the job
1116  *         only valid until "task" is started!
1117  */
1118 GNUNET_SCHEDULER_TaskIdentifier
1119 GNUNET_SCHEDULER_add_write_file (struct GNUNET_SCHEDULER_Handle * sched,
1120                                  struct GNUNET_TIME_Relative delay,
1121                                  const struct GNUNET_DISK_FileHandle * wfd,
1122                                  GNUNET_SCHEDULER_Task task, void *task_cls)
1123 {
1124   struct GNUNET_NETWORK_FDSet *ws;
1125   GNUNET_SCHEDULER_TaskIdentifier ret;
1126
1127   GNUNET_assert (wfd != NULL);
1128   ws = GNUNET_NETWORK_fdset_create ();
1129   GNUNET_NETWORK_fdset_handle_set (ws, wfd);
1130   ret = GNUNET_SCHEDULER_add_select (sched,
1131                                      GNUNET_SCHEDULER_PRIORITY_KEEP,
1132                                      GNUNET_SCHEDULER_NO_TASK,
1133                                      delay, NULL, ws, task, task_cls);
1134   GNUNET_NETWORK_fdset_destroy (ws);
1135   return ret;
1136 }
1137
1138
1139
1140 /**
1141  * Schedule a new task to be run with a specified delay or when any of
1142  * the specified file descriptor sets is ready.  The delay can be used
1143  * as a timeout on the socket(s) being ready.  The task will be
1144  * scheduled for execution once either the delay has expired or any of
1145  * the socket operations is ready.  This is the most general
1146  * function of the "add" family.  Note that the "prerequisite_task"
1147  * must be satisfied in addition to any of the other conditions.  In
1148  * other words, the task will be started when
1149  * <code>
1150  * (prerequisite-run)
1151  * && (delay-ready
1152  *     || any-rs-ready
1153  *     || any-ws-ready
1154  *     || (shutdown-active && run-on-shutdown) )
1155  * </code>
1156  *
1157  * @param sched scheduler to use
1158  * @param prio how important is this task?
1159  * @param prerequisite_task run this task after the task with the given
1160  *        task identifier completes (and any of our other
1161  *        conditions, such as delay, read or write-readiness
1162  *        are satisfied).  Use GNUNET_SCHEDULER_NO_TASK to not have any dependency
1163  *        on completion of other tasks.
1164  * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever",
1165  *        which means that the task will only be run after we receive SIGTERM
1166  * @param rs set of file descriptors we want to read (can be NULL)
1167  * @param ws set of file descriptors we want to write (can be NULL)
1168  * @param task main function of the task
1169  * @param task_cls closure of task
1170  * @return unique task identifier for the job
1171  *         only valid until "task" is started!
1172  */
1173 GNUNET_SCHEDULER_TaskIdentifier
1174 GNUNET_SCHEDULER_add_select (struct GNUNET_SCHEDULER_Handle * sched,
1175                              enum GNUNET_SCHEDULER_Priority prio,
1176                              GNUNET_SCHEDULER_TaskIdentifier
1177                              prerequisite_task,
1178                              struct GNUNET_TIME_Relative delay,
1179                              const struct GNUNET_NETWORK_FDSet * rs,
1180                              const struct GNUNET_NETWORK_FDSet * ws,
1181                              GNUNET_SCHEDULER_Task task, void *task_cls)
1182 {
1183   struct Task *t;
1184 #if EXECINFO
1185   void *backtrace_array[MAX_TRACE_DEPTH];
1186 #endif
1187
1188   GNUNET_assert (NULL != task);
1189   t = GNUNET_malloc (sizeof (struct Task));
1190   t->callback = task;
1191   t->callback_cls = task_cls;
1192 #if EXECINFO
1193   t->num_backtrace_strings = backtrace(backtrace_array, MAX_TRACE_DEPTH);
1194   t->backtrace_strings = backtrace_symbols(backtrace_array, t->num_backtrace_strings);
1195 #endif
1196   if (rs != NULL)
1197     {
1198       t->read_set = GNUNET_NETWORK_fdset_create ();
1199       GNUNET_NETWORK_fdset_copy (t->read_set, rs);
1200     }
1201   if (ws != NULL)
1202     {
1203       t->write_set = GNUNET_NETWORK_fdset_create ();
1204       GNUNET_NETWORK_fdset_copy (t->write_set, ws);
1205     }
1206   t->id = ++sched->last_id;
1207 #if PROFILE_DELAYS
1208   t->start_time = GNUNET_TIME_absolute_get ();
1209 #endif
1210   t->prereq_id = prerequisite_task;
1211   t->timeout = GNUNET_TIME_relative_to_absolute (delay);
1212   t->priority =
1213     check_priority ((prio ==
1214                      GNUNET_SCHEDULER_PRIORITY_KEEP) ? sched->current_priority
1215                     : prio);
1216   t->next = sched->pending;
1217   sched->pending = t;
1218 #if DEBUG_TASKS
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220               "Adding task: %llu / %p\n", t->id, t->callback_cls);
1221 #endif
1222 #if EXECINFO
1223   int i;
1224
1225   for (i=0;i<t->num_backtrace_strings;i++)
1226       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                   "Task %u trace %d: %s\n",
1228                   t->id,
1229                   i,
1230                   t->backtrace_strings[i]);
1231 #endif
1232   return t->id;
1233 }
1234
1235 /* end of scheduler.c */