2 This file is part of GNUnet
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file util/scheduler/scheduler.c
23 * @brief schedule computations using continuation passing style
24 * @author Christian Grothoff
27 #include "gnunet_common.h"
28 #include "gnunet_scheduler_lib.h"
29 #include "gnunet_signal_lib.h"
30 #include "gnunet_time_lib.h"
33 * Linked list of pending tasks.
38 * This is a linked list.
43 * Function to run when ready.
45 GNUNET_SCHEDULER_Task callback;
48 * Closure for the callback.
53 * Set of file descriptors this task is waiting
54 * for for reading. Once ready, this is updated
55 * to reflect the set of file descriptors ready
61 * Set of file descriptors this task is waiting
62 * for for writing. Once ready, this is updated
63 * to reflect the set of file descriptors ready
69 * Unique task identifier.
71 GNUNET_SCHEDULER_TaskIdentifier id;
74 * Identifier of a prerequisite task.
76 GNUNET_SCHEDULER_TaskIdentifier prereq_id;
79 * Absolute timeout value for the task, or
80 * GNUNET_TIME_UNIT_FOREVER_ABS for "no timeout".
82 struct GNUNET_TIME_Absolute timeout;
85 * Why is the task ready? Set after task is added to ready queue.
86 * Initially set to zero. All reasons that have already been
87 * satisfied (i.e. read or write ready) will be set over time.
89 enum GNUNET_SCHEDULER_Reason reason;
94 enum GNUNET_SCHEDULER_Priority priority;
97 * highest-numbered file descriptor in read_set or write_set plus one
102 * Should this task be run on shutdown?
110 * Handle for the scheduling service.
112 struct GNUNET_SCHEDULER_Handle
116 * List of tasks waiting for an event.
118 struct Task *pending;
121 * List of tasks ready to run right now,
122 * grouped by importance.
124 struct Task *ready[GNUNET_SCHEDULER_PRIORITY_COUNT];
127 * Identity of the last task queued. Incremented for each task to
128 * generate a unique task ID (it is virtually impossible to start
129 * more than 2^64 tasks during the lifetime of a process).
131 GNUNET_SCHEDULER_TaskIdentifier last_id;
134 * Highest number so that all tasks with smaller identifiers
135 * have already completed. Also the lowest number of a task
136 * still waiting to be executed.
138 GNUNET_SCHEDULER_TaskIdentifier lowest_pending_id;
141 * GNUNET_NO if we are running normally,
142 * GNUNET_YES if we are in shutdown mode.
147 * Number of tasks on the ready list.
149 unsigned int ready_count;
152 * Priority of the task running right now. Only
153 * valid while a task is running.
155 enum GNUNET_SCHEDULER_Priority current_priority;
161 * Check that the given priority is legal (and return it).
163 static enum GNUNET_SCHEDULER_Priority
164 check_priority (enum GNUNET_SCHEDULER_Priority p)
166 if ((p >= 0) && (p < GNUNET_SCHEDULER_PRIORITY_COUNT))
169 return 0; /* make compiler happy */
174 * Update the timeout value so that it is smaller than min.
177 update_timeout (struct timeval *tv, struct GNUNET_TIME_Relative min)
179 if (((tv->tv_sec * 1000) + (tv->tv_usec / 1000)) > min.value)
181 tv->tv_sec = min.value / 1000;
182 tv->tv_usec = (min.value - tv->tv_sec * 1000) * 1000;
/**
 * Set the given file descriptor bit in the given set and update max
 * to the maximum of the existing max and fd+1.
 *
 * @param fd file descriptor to add to the set
 * @param max in/out: highest descriptor seen so far plus one
 * @param set descriptor set to modify
 */
static void
set_fd (int fd, int *max, fd_set * set)
{
  if (*max <= fd)
    *max = fd + 1;
  FD_SET (fd, set);
}
201 * Is a task with this identifier still pending? Also updates
202 * "lowest_pending_id" as a side-effect (for faster checks in the
203 * future), but only if the return value is "GNUNET_NO" (and
204 * the "lowest_pending_id" check failed).
206 * @return GNUNET_YES if so, GNUNET_NO if not
209 is_pending (struct GNUNET_SCHEDULER_Handle *sched,
210 GNUNET_SCHEDULER_TaskIdentifier id)
213 enum GNUNET_SCHEDULER_Priority p;
214 GNUNET_SCHEDULER_TaskIdentifier min;
216 if (id < sched->lowest_pending_id)
218 min = -1; /* maximum value */
219 pos = sched->pending;
228 for (p = 0; p < GNUNET_SCHEDULER_PRIORITY_COUNT; p++)
230 pos = sched->ready[p];
240 sched->lowest_pending_id = min;
246 * Update all sets and timeout for select.
249 update_sets (struct GNUNET_SCHEDULER_Handle *sched,
250 int *max, fd_set * rs, fd_set * ws, struct timeval *tv)
255 pos = sched->pending;
258 if ((pos->prereq_id != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) &&
259 (GNUNET_YES == is_pending (sched, pos->prereq_id)))
265 if (pos->timeout.value != GNUNET_TIME_UNIT_FOREVER_ABS.value)
267 GNUNET_TIME_absolute_get_remaining (pos->timeout));
268 for (i = 0; i < pos->nfds; i++)
270 if (FD_ISSET (i, &pos->read_set))
272 if (FD_ISSET (i, &pos->write_set))
281 * Check if the ready set overlaps with the set we want to have ready.
282 * If so, update the want set (set all FDs that are ready). If not,
285 * @param maxfd highest FD that needs to be checked.
286 * @return GNUNET_YES if there was some overlap
289 set_overlaps (const fd_set * ready, fd_set * want, int maxfd)
293 for (i = 0; i < maxfd; i++)
294 if (FD_ISSET (i, want) && FD_ISSET (i, ready))
296 /* copy all over (yes, there maybe unrelated bits,
297 but this should not hurt well-written clients) */
298 memcpy (want, ready, sizeof (fd_set));
306 * Check if the given task is eligible to run now.
307 * Also set the reason why it is eligible.
309 * @return GNUNET_YES if we can run it, GNUNET_NO if not.
312 is_ready (struct GNUNET_SCHEDULER_Handle *sched,
314 struct GNUNET_TIME_Absolute now,
315 const fd_set * rs, const fd_set * ws)
317 if ((GNUNET_NO == task->run_on_shutdown) && (GNUNET_YES == sched->shutdown))
319 if ((GNUNET_YES == task->run_on_shutdown) &&
320 (GNUNET_YES == sched->shutdown))
321 task->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
322 if (now.value >= task->timeout.value)
323 task->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
324 if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
325 (rs != NULL) && (set_overlaps (rs, &task->read_set, task->nfds)))
326 task->reason |= GNUNET_SCHEDULER_REASON_READ_READY;
327 if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
328 (ws != NULL) && (set_overlaps (ws, &task->write_set, task->nfds)))
329 task->reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
330 if (task->reason == 0)
331 return GNUNET_NO; /* not ready */
332 if (task->prereq_id != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
334 if (GNUNET_YES == is_pending (sched, task->prereq_id))
335 return GNUNET_NO; /* prereq waiting */
336 task->reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
343 * Put a task that is ready for execution into the ready queue.
346 queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, struct Task *task)
348 task->next = handle->ready[check_priority (task->priority)];
349 handle->ready[check_priority (task->priority)] = task;
350 handle->ready_count++;
355 * Check which tasks are ready and move them
356 * to the respective ready queue.
359 check_ready (struct GNUNET_SCHEDULER_Handle *handle,
360 const fd_set * rs, const fd_set * ws)
365 struct GNUNET_TIME_Absolute now;
367 now = GNUNET_TIME_absolute_get ();
369 pos = handle->pending;
373 if (GNUNET_YES == is_ready (handle, pos, now, rs, ws))
376 handle->pending = next;
379 queue_ready_task (handle, pos);
390 * Run at least one task in the highest-priority queue that is not
391 * empty. Keep running tasks until we are either no longer running
392 * "URGENT" tasks or until we have at least one "pending" task (which
393 * may become ready, hence we should select on it). Naturally, if
394 * there are no more ready tasks, we also return.
397 run_ready (struct GNUNET_SCHEDULER_Handle *sched)
399 enum GNUNET_SCHEDULER_Priority p;
401 struct GNUNET_SCHEDULER_TaskContext tc;
405 if (sched->ready_count == 0)
407 GNUNET_assert (sched->ready[GNUNET_SCHEDULER_PRIORITY_KEEP] == NULL);
408 /* yes, p>0 is correct, 0 is "KEEP" which should
409 always be an empty queue (see assertion)! */
410 for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--)
412 pos = sched->ready[p];
416 GNUNET_assert (pos != NULL); /* ready_count wrong? */
417 sched->ready[p] = pos->next;
418 sched->ready_count--;
419 sched->current_priority = p;
420 GNUNET_assert (pos->priority == p);
422 tc.reason = pos->reason;
423 tc.read_ready = &pos->read_set;
424 tc.write_ready = &pos->write_set;
425 pos->callback (pos->callback_cls, &tc);
428 while ((sched->pending == NULL) || (p == GNUNET_SCHEDULER_PRIORITY_URGENT));
433 * Have we (ever) received a SIGINT/TERM/QUIT/HUP?
435 static volatile int sig_shutdown;
439 * Signal handler called for signals that should cause us to shutdown.
442 sighandler_shutdown ()
449 * Initialize a scheduler using this thread. This function will
450 * return when either a shutdown was initiated (via signal) and all
451 * tasks marked to "run_on_shutdown" have been completed or when all
452 * tasks in general have been completed.
454 * @param task task to run immediately
455 * @param cls closure of task
458 GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_Task task, void *cls)
460 struct GNUNET_SCHEDULER_Handle sched;
466 struct GNUNET_SIGNAL_Context *shc_int;
467 struct GNUNET_SIGNAL_Context *shc_term;
468 struct GNUNET_SIGNAL_Context *shc_quit;
469 struct GNUNET_SIGNAL_Context *shc_hup;
474 shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown);
475 shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, &sighandler_shutdown);
476 shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, &sighandler_shutdown);
477 shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, &sighandler_shutdown);
479 memset (&sched, 0, sizeof (sched));
480 sched.current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT;
481 GNUNET_SCHEDULER_add_continuation (&sched,
484 cls, GNUNET_SCHEDULER_REASON_STARTUP);
485 while ((GNUNET_NO == sched.shutdown) &&
487 ((sched.pending != NULL) || (sched.ready_count > 0)))
492 tv.tv_sec = 0x7FFFFFFF;
494 if (sched.ready_count > 0)
496 /* no blocking, more work already ready! */
500 update_sets (&sched, &max, &rs, &ws, &tv);
501 ret = SELECT (max, &rs, &ws, NULL, &tv);
506 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "select");
509 check_ready (&sched, &rs, &ws);
513 sched.shutdown = GNUNET_YES;
514 GNUNET_SIGNAL_handler_uninstall (shc_int);
515 GNUNET_SIGNAL_handler_uninstall (shc_term);
516 GNUNET_SIGNAL_handler_uninstall (shc_quit);
517 GNUNET_SIGNAL_handler_uninstall (shc_hup);
521 check_ready (&sched, NULL, NULL);
523 while (sched.ready_count > 0);
524 while (NULL != (tpos = sched.pending))
526 sched.pending = tpos->next;
533 * Request the shutdown of a scheduler. This function can be used to
534 * stop a scheduling thread when created with the
535 * "GNUNET_SCHEDULER_init_thread" function or from within the signal
536 * handler for signals causing shutdowns.
539 GNUNET_SCHEDULER_shutdown (struct GNUNET_SCHEDULER_Handle *sched)
541 sched->shutdown = GNUNET_YES;
546 * Get information about the current load of this scheduler. Use this
547 * function to determine if an elective task should be added or simply
548 * dropped (if the decision should be made based on the number of
549 * tasks ready to run).
551 * @param sched scheduler to query
552 * @return number of tasks pending right now
555 GNUNET_SCHEDULER_get_load (struct GNUNET_SCHEDULER_Handle *sched,
556 enum GNUNET_SCHEDULER_Priority p)
561 if (p == GNUNET_SCHEDULER_PRIORITY_COUNT)
562 return sched->ready_count;
563 if (p == GNUNET_SCHEDULER_PRIORITY_KEEP)
564 p = sched->current_priority;
566 pos = sched->ready[p];
577 * Cancel the task with the specified identifier.
578 * The task must not yet have run.
580 * @param sched scheduler to use
581 * @param task id of the task to cancel
584 GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched,
585 GNUNET_SCHEDULER_TaskIdentifier task)
589 enum GNUNET_SCHEDULER_Priority p;
605 GNUNET_assert (p < GNUNET_SCHEDULER_PRIORITY_COUNT);
612 sched->ready_count--;
622 sched->pending = t->next;
624 sched->ready[p] = t->next;
627 prev->next = t->next;
628 ret = t->callback_cls;
635 * Continue the current execution with the given function. This is
636 * similar to the other "add" functions except that there is no delay
637 * and the reason code can be specified.
639 * @param sched scheduler to use
640 * @param main main function of the task
641 * @param cls closure of task
642 * @param reason reason for task invocation
645 GNUNET_SCHEDULER_add_continuation (struct GNUNET_SCHEDULER_Handle *sched,
647 GNUNET_SCHEDULER_Task main,
649 enum GNUNET_SCHEDULER_Reason reason)
653 task = GNUNET_malloc (sizeof (struct Task));
654 task->callback = main;
655 task->callback_cls = cls;
656 task->id = ++sched->last_id;
657 task->reason = reason;
658 task->priority = sched->current_priority;
659 task->run_on_shutdown = run_on_shutdown;
660 queue_ready_task (sched, task);
665 * Schedule a new task to be run after the specified
666 * prerequisite task has completed.
668 * @param sched scheduler to use
669 * @param run_on_shutdown run on shutdown?
670 * @param prio how important is this task?
671 * @param prerequisite_task run this task after the task with the given
672 * task identifier completes (and any of our other
673 * conditions, such as delay, read or write-readyness
674 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
675 * on completion of other tasks.
676 * @param main main function of the task
677 * @param cls closure of task
678 * @return unique task identifier for the job
679 * only valid until "main" is started!
681 GNUNET_SCHEDULER_TaskIdentifier
682 GNUNET_SCHEDULER_add_after (struct GNUNET_SCHEDULER_Handle *sched,
684 enum GNUNET_SCHEDULER_Priority prio,
685 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
686 GNUNET_SCHEDULER_Task main, void *cls)
688 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
690 GNUNET_TIME_UNIT_ZERO,
691 0, NULL, NULL, main, cls);
696 * Schedule a new task to be run with a specified delay. The task
697 * will be scheduled for execution once the delay has expired and the
698 * prerequisite task has completed.
700 * @param sched scheduler to use
701 * @param run_on_shutdown run on shutdown? You can use this
702 * argument to run a function only during shutdown
703 * by setting delay to -1. Set this
704 * argument to GNUNET_NO to skip this task if
705 * the user requested process termination.
706 * @param prio how important is this task?
707 * @param prerequisite_task run this task after the task with the given
708 * task identifier completes (and any of our other
709 * conditions, such as delay, read or write-readyness
710 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
711 * on completion of other tasks.
712 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
713 * @param main main function of the task
714 * @param cls closure of task
715 * @return unique task identifier for the job
716 * only valid until "main" is started!
718 GNUNET_SCHEDULER_TaskIdentifier
719 GNUNET_SCHEDULER_add_delayed (struct GNUNET_SCHEDULER_Handle * sched,
721 enum GNUNET_SCHEDULER_Priority prio,
722 GNUNET_SCHEDULER_TaskIdentifier
724 struct GNUNET_TIME_Relative delay,
725 GNUNET_SCHEDULER_Task main, void *cls)
727 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
728 prerequisite_task, delay,
729 0, NULL, NULL, main, cls);
734 * Schedule a new task to be run with a specified delay or when the
735 * specified file descriptor is ready for reading. The delay can be
736 * used as a timeout on the socket being ready. The task will be
737 * scheduled for execution once either the delay has expired or the
738 * socket operation is ready.
740 * @param sched scheduler to use
741 * @param run_on_shutdown run on shutdown? Set this
742 * argument to GNUNET_NO to skip this task if
743 * the user requested process termination.
744 * @param prio how important is this task?
745 * @param prerequisite_task run this task after the task with the given
746 * task identifier completes (and any of our other
747 * conditions, such as delay, read or write-readyness
748 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
749 * on completion of other tasks.
750 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
751 * @param rfd read file-descriptor
752 * @param main main function of the task
753 * @param cls closure of task
754 * @return unique task identifier for the job
755 * only valid until "main" is started!
757 GNUNET_SCHEDULER_TaskIdentifier
758 GNUNET_SCHEDULER_add_read (struct GNUNET_SCHEDULER_Handle * sched,
760 enum GNUNET_SCHEDULER_Priority prio,
761 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
762 struct GNUNET_TIME_Relative delay,
763 int rfd, GNUNET_SCHEDULER_Task main, void *cls)
767 GNUNET_assert (rfd >= 0);
770 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
771 prerequisite_task, delay,
772 rfd + 1, &rs, NULL, main, cls);
777 * Schedule a new task to be run with a specified delay or when the
778 * specified file descriptor is ready for writing. The delay can be
779 * used as a timeout on the socket being ready. The task will be
780 * scheduled for execution once either the delay has expired or the
781 * socket operation is ready.
783 * @param sched scheduler to use
784 * @param run_on_shutdown run on shutdown? Set this
785 * argument to GNUNET_NO to skip this task if
786 * the user requested process termination.
787 * @param prio how important is this task?
788 * @param prerequisite_task run this task after the task with the given
789 * task identifier completes (and any of our other
790 * conditions, such as delay, read or write-readyness
791 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
792 * on completion of other tasks.
793 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
794 * @param wfd write file-descriptor
795 * @param main main function of the task
796 * @param cls closure of task
797 * @return unique task identifier for the job
798 * only valid until "main" is started!
800 GNUNET_SCHEDULER_TaskIdentifier
801 GNUNET_SCHEDULER_add_write (struct GNUNET_SCHEDULER_Handle * sched,
803 enum GNUNET_SCHEDULER_Priority prio,
804 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
805 struct GNUNET_TIME_Relative delay,
806 int wfd, GNUNET_SCHEDULER_Task main, void *cls)
810 GNUNET_assert (wfd >= 0);
813 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
814 prerequisite_task, delay,
815 wfd + 1, NULL, &ws, main, cls);
820 * Schedule a new task to be run with a specified delay or when any of
821 * the specified file descriptor sets is ready. The delay can be used
822 * as a timeout on the socket(s) being ready. The task will be
823 * scheduled for execution once either the delay has expired or any of
824 * the socket operations is ready. This is the most general
825 * function of the "add" family. Note that the "prerequisite_task"
826 * must be satisfied in addition to any of the other conditions. In
827 * other words, the task will be started when
833 * || (shutdown-active && run-on-shutdown) )
836 * @param sched scheduler to use
837 * @param run_on_shutdown run on shutdown? Set this
838 * argument to GNUNET_NO to skip this task if
839 * the user requested process termination.
840 * @param prio how important is this task?
841 * @param prerequisite_task run this task after the task with the given
842 * task identifier completes (and any of our other
843 * conditions, such as delay, read or write-readyness
844 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
845 * on completion of other tasks.
846 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
847 * @param nfds highest-numbered file descriptor in any of the two sets plus one
848 * @param rs set of file descriptors we want to read (can be NULL)
849 * @param ws set of file descriptors we want to write (can be NULL)
850 * @param main main function of the task
851 * @param cls closure of task
852 * @return unique task identifier for the job
853 * only valid until "main" is started!
855 GNUNET_SCHEDULER_TaskIdentifier
856 GNUNET_SCHEDULER_add_select (struct GNUNET_SCHEDULER_Handle * sched,
858 enum GNUNET_SCHEDULER_Priority prio,
859 GNUNET_SCHEDULER_TaskIdentifier
861 struct GNUNET_TIME_Relative delay,
862 int nfds, const fd_set * rs, const fd_set * ws,
863 GNUNET_SCHEDULER_Task main, void *cls)
867 task = GNUNET_malloc (sizeof (struct Task));
868 task->callback = main;
869 task->callback_cls = cls;
870 if ((rs != NULL) && (nfds > 0))
871 memcpy (&task->read_set, rs, sizeof (fd_set));
872 if ((ws != NULL) && (nfds > 0))
873 memcpy (&task->write_set, ws, sizeof (fd_set));
874 task->id = ++sched->last_id;
875 task->prereq_id = prerequisite_task;
876 task->timeout = GNUNET_TIME_relative_to_absolute (delay);
878 check_priority ((prio ==
879 GNUNET_SCHEDULER_PRIORITY_KEEP) ? sched->current_priority
882 task->run_on_shutdown = run_on_shutdown;
883 task->next = sched->pending;
884 sched->pending = task;
888 /* end of scheduler.c */