2 This file is part of GNUnet
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file util/scheduler/scheduler.c
23 * @brief schedule computations using continuation passing style
24 * @author Christian Grothoff
27 #include "gnunet_common.h"
28 #include "gnunet_scheduler_lib.h"
29 #include "gnunet_signal_lib.h"
30 #include "gnunet_time_lib.h"
33 * Linked list of pending tasks.
38 * This is a linked list.
43 * Function to run when ready.
45 GNUNET_SCHEDULER_Task callback;
48 * Closure for the callback.
53 * Set of file descriptors this task is waiting
54 * for reading. Once ready, this is updated
55 * to reflect the set of file descriptors ready
61 * Set of file descriptors this task is waiting
62 * for writing. Once ready, this is updated
63 * to reflect the set of file descriptors ready
69 * Unique task identifier.
71 GNUNET_SCHEDULER_TaskIdentifier id;
74 * Identifier of a prerequisite task.
76 GNUNET_SCHEDULER_TaskIdentifier prereq_id;
79 * Absolute timeout value for the task, or
80 * GNUNET_TIME_UNIT_FOREVER_ABS for "no timeout".
82 struct GNUNET_TIME_Absolute timeout;
85 * Why is the task ready? Set after task is added to ready queue.
86 * Initially set to zero. All reasons that have already been
87 * satisfied (i.e. read or write ready) will be set over time.
89 enum GNUNET_SCHEDULER_Reason reason;
94 enum GNUNET_SCHEDULER_Priority priority;
97 * highest-numbered file descriptor in read_set or write_set plus one
102 * Should this task be run on shutdown?
110 * Handle for the scheduling service.
112 struct GNUNET_SCHEDULER_Handle
116 * List of tasks waiting for an event.
118 struct Task *pending;
121 * List of tasks ready to run right now,
122 * grouped by importance.
124 struct Task *ready[GNUNET_SCHEDULER_PRIORITY_COUNT];
127 * Identity of the last task queued. Incremented for each task to
128 * generate a unique task ID (it is virtually impossible to start
129 * more than 2^64 tasks during the lifetime of a process).
131 GNUNET_SCHEDULER_TaskIdentifier last_id;
134 * Highest number so that all tasks with smaller identifiers
135 * have already completed. Also the lowest number of a task
136 * still waiting to be executed.
138 GNUNET_SCHEDULER_TaskIdentifier lowest_pending_id;
141 * GNUNET_NO if we are running normally,
142 * GNUNET_YES if we are in shutdown mode.
147 * Number of tasks on the ready list.
149 unsigned int ready_count;
152 * Priority of the task running right now. Only
153 * valid while a task is running.
155 enum GNUNET_SCHEDULER_Priority current_priority;
161 * Check that the given priority is legal (and return it).
163 static enum GNUNET_SCHEDULER_Priority
164 check_priority (enum GNUNET_SCHEDULER_Priority p)
166 if ((p >= 0) && (p < GNUNET_SCHEDULER_PRIORITY_COUNT))
169 return 0; /* make compiler happy */
174 * Update the timeout value so that it is no larger than min.
177 update_timeout (struct timeval *tv, struct GNUNET_TIME_Relative min)
179 if (((tv->tv_sec * 1000) + (tv->tv_usec / 1000)) > min.value)
181 tv->tv_sec = min.value / 1000;
182 tv->tv_usec = (min.value - tv->tv_sec * 1000) * 1000;
188 * Set the given file descriptor bit in the given set and update max
189 * to the maximum of the existing max and fd+1.
192 set_fd (int fd, int *max, fd_set * set)
201 * Is a task with this identifier still pending? Also updates
202 * "lowest_pending_id" as a side-effect (for faster checks in the
203 * future), but only if the return value is "GNUNET_NO" (and
204 * the "lowest_pending_id" check failed).
206 * @return GNUNET_YES if so, GNUNET_NO if not
209 is_pending (struct GNUNET_SCHEDULER_Handle *sched,
210 GNUNET_SCHEDULER_TaskIdentifier id)
213 enum GNUNET_SCHEDULER_Priority p;
214 GNUNET_SCHEDULER_TaskIdentifier min;
216 if (id < sched->lowest_pending_id)
218 min = -1; /* maximum value */
219 pos = sched->pending;
228 for (p = 0; p < GNUNET_SCHEDULER_PRIORITY_COUNT; p++)
230 pos = sched->ready[p];
240 sched->lowest_pending_id = min;
246 * Update all sets and timeout for select.
249 update_sets (struct GNUNET_SCHEDULER_Handle *sched,
250 int *max, fd_set * rs, fd_set * ws, struct timeval *tv)
255 pos = sched->pending;
258 if ((pos->prereq_id != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK) &&
259 (GNUNET_YES == is_pending (sched, pos->prereq_id)))
265 if (pos->timeout.value != GNUNET_TIME_UNIT_FOREVER_ABS.value)
267 GNUNET_TIME_absolute_get_remaining (pos->timeout));
268 for (i = 0; i < pos->nfds; i++)
270 if (FD_ISSET (i, &pos->read_set))
272 if (FD_ISSET (i, &pos->write_set))
281 * Check if the ready set overlaps with the set we want to have ready.
282 * If so, update the want set (set all FDs that are ready). If not,
285 * @param maxfd highest FD that needs to be checked, plus one.
286 * @return GNUNET_YES if there was some overlap
289 set_overlaps (const fd_set * ready, fd_set * want, int maxfd)
293 for (i = 0; i < maxfd; i++)
294 if (FD_ISSET (i, want) && FD_ISSET (i, ready))
296 /* copy all over (yes, there may be unrelated bits,
297 but this should not hurt well-written clients) */
298 memcpy (want, ready, sizeof (fd_set));
306 * Check if the given task is eligible to run now.
307 * Also set the reason why it is eligible.
309 * @return GNUNET_YES if we can run it, GNUNET_NO if not.
312 is_ready (struct GNUNET_SCHEDULER_Handle *sched,
314 struct GNUNET_TIME_Absolute now,
315 const fd_set * rs, const fd_set * ws)
317 if ((GNUNET_NO == task->run_on_shutdown) && (GNUNET_YES == sched->shutdown))
319 if ((GNUNET_YES == task->run_on_shutdown) &&
320 (GNUNET_YES == sched->shutdown))
321 task->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
322 if (now.value >= task->timeout.value)
323 task->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
324 if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
325 (rs != NULL) && (set_overlaps (rs, &task->read_set, task->nfds)))
326 task->reason |= GNUNET_SCHEDULER_REASON_READ_READY;
327 if ((0 == (task->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
328 (ws != NULL) && (set_overlaps (ws, &task->write_set, task->nfds)))
329 task->reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
330 if (task->reason == 0)
331 return GNUNET_NO; /* not ready */
332 if (task->prereq_id != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
334 if (GNUNET_YES == is_pending (sched, task->prereq_id))
335 return GNUNET_NO; /* prereq waiting */
336 task->reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
343 * Put a task that is ready for execution into the ready queue.
346 queue_ready_task (struct GNUNET_SCHEDULER_Handle *handle, struct Task *task)
348 task->next = handle->ready[check_priority (task->priority)];
349 handle->ready[check_priority (task->priority)] = task;
350 handle->ready_count++;
355 * Check which tasks are ready and move them
356 * to the respective ready queue.
359 check_ready (struct GNUNET_SCHEDULER_Handle *handle,
360 const fd_set * rs, const fd_set * ws)
365 struct GNUNET_TIME_Absolute now;
367 now = GNUNET_TIME_absolute_get ();
369 pos = handle->pending;
373 if (GNUNET_YES == is_ready (handle, pos, now, rs, ws))
376 handle->pending = next;
379 queue_ready_task (handle, pos);
390 * Run at least one task in the highest-priority queue that is not
391 * empty. Keep running tasks until we are no longer running
392 * "URGENT" tasks and we have at least one "pending" task (which
393 * may become ready, hence we should select on it). Naturally, if
394 * there are no more ready tasks, we also return.
397 run_ready (struct GNUNET_SCHEDULER_Handle *sched)
399 enum GNUNET_SCHEDULER_Priority p;
401 struct GNUNET_SCHEDULER_TaskContext tc;
405 if (sched->ready_count == 0)
407 GNUNET_assert (sched->ready[GNUNET_SCHEDULER_PRIORITY_KEEP] == NULL);
408 /* yes, p>0 is correct, 0 is "KEEP" which should
409 always be an empty queue (see assertion)! */
410 for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--)
412 pos = sched->ready[p];
416 GNUNET_assert (pos != NULL); /* ready_count wrong? */
417 sched->ready[p] = pos->next;
418 sched->ready_count--;
419 sched->current_priority = p;
420 GNUNET_assert (pos->priority == p);
422 tc.reason = pos->reason;
423 tc.read_ready = &pos->read_set;
424 tc.write_ready = &pos->write_set;
425 pos->callback (pos->callback_cls, &tc);
428 while ((sched->pending == NULL) || (p == GNUNET_SCHEDULER_PRIORITY_URGENT));
433 * Have we (ever) received a SIGINT/TERM/QUIT/HUP?
435 static volatile int sig_shutdown;
439 * Signal handler called for signals that should cause us to shutdown.
442 sighandler_shutdown ()
449 * Initialize a scheduler using this thread. This function will
450 * return when either a shutdown was initiated (via signal) and all
451 * tasks marked to "run_on_shutdown" have been completed or when all
452 * tasks in general have been completed.
454 * @param task task to run immediately
455 * @param cls closure of task
458 GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_Task task, void *cls)
460 struct GNUNET_SCHEDULER_Handle sched;
466 struct GNUNET_SIGNAL_Context *shc_int;
467 struct GNUNET_SIGNAL_Context *shc_term;
468 struct GNUNET_SIGNAL_Context *shc_quit;
469 struct GNUNET_SIGNAL_Context *shc_hup;
473 shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown);
474 shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, &sighandler_shutdown);
475 shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, &sighandler_shutdown);
476 shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, &sighandler_shutdown);
477 memset (&sched, 0, sizeof (sched));
478 sched.current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT;
479 GNUNET_SCHEDULER_add_continuation (&sched,
482 cls, GNUNET_SCHEDULER_REASON_STARTUP);
483 while ((GNUNET_NO == sched.shutdown) &&
485 ((sched.pending != NULL) || (sched.ready_count > 0)))
490 tv.tv_sec = 0x7FFFFFFF;
492 if (sched.ready_count > 0)
494 /* no blocking, more work already ready! */
498 update_sets (&sched, &max, &rs, &ws, &tv);
499 ret = SELECT (max, &rs, &ws, NULL, &tv);
504 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "select");
507 check_ready (&sched, &rs, &ws);
511 sched.shutdown = GNUNET_YES;
512 GNUNET_SIGNAL_handler_uninstall (shc_int);
513 GNUNET_SIGNAL_handler_uninstall (shc_term);
514 GNUNET_SIGNAL_handler_uninstall (shc_quit);
515 GNUNET_SIGNAL_handler_uninstall (shc_hup);
519 check_ready (&sched, NULL, NULL);
521 while (sched.ready_count > 0);
522 while (NULL != (tpos = sched.pending))
524 sched.pending = tpos->next;
531 * Request the shutdown of a scheduler. This function can be used to
532 * stop a scheduling thread when created with the
533 * "GNUNET_SCHEDULER_init_thread" function or from within the signal
534 * handler for signals causing shutdowns.
537 GNUNET_SCHEDULER_shutdown (struct GNUNET_SCHEDULER_Handle *sched)
539 sched->shutdown = GNUNET_YES;
544 * Get information about the current load of this scheduler. Use this
545 * function to determine if an elective task should be added or simply
546 * dropped (if the decision should be made based on the number of
547 * tasks ready to run).
549 * @param sched scheduler to query
550 * @return number of tasks ready to run right now
553 GNUNET_SCHEDULER_get_load (struct GNUNET_SCHEDULER_Handle *sched,
554 enum GNUNET_SCHEDULER_Priority p)
559 if (p == GNUNET_SCHEDULER_PRIORITY_COUNT)
560 return sched->ready_count;
561 if (p == GNUNET_SCHEDULER_PRIORITY_KEEP)
562 p = sched->current_priority;
564 pos = sched->ready[p];
575 * Cancel the task with the specified identifier.
576 * The task must not yet have run.
578 * @param sched scheduler to use
579 * @param task id of the task to cancel
582 GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Handle *sched,
583 GNUNET_SCHEDULER_TaskIdentifier task)
587 enum GNUNET_SCHEDULER_Priority p;
603 GNUNET_assert (p < GNUNET_SCHEDULER_PRIORITY_COUNT);
610 sched->ready_count--;
620 sched->pending = t->next;
622 sched->ready[p] = t->next;
625 prev->next = t->next;
626 ret = t->callback_cls;
633 * Continue the current execution with the given function. This is
634 * similar to the other "add" functions except that there is no delay
635 * and the reason code can be specified.
637 * @param sched scheduler to use
638 * @param main main function of the task
639 * @param cls closure of task
640 * @param reason reason for task invocation
643 GNUNET_SCHEDULER_add_continuation (struct GNUNET_SCHEDULER_Handle *sched,
645 GNUNET_SCHEDULER_Task main,
647 enum GNUNET_SCHEDULER_Reason reason)
651 task = GNUNET_malloc (sizeof (struct Task));
652 task->callback = main;
653 task->callback_cls = cls;
654 task->id = ++sched->last_id;
655 task->reason = reason;
656 task->priority = sched->current_priority;
657 task->run_on_shutdown = run_on_shutdown;
658 queue_ready_task (sched, task);
663 * Schedule a new task to be run after the specified
664 * prerequisite task has completed.
666 * @param sched scheduler to use
667 * @param run_on_shutdown run on shutdown?
668 * @param prio how important is this task?
669 * @param prerequisite_task run this task after the task with the given
670 * task identifier completes (and any of our other
671 * conditions, such as delay, read or write-readiness
672 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
673 * on completion of other tasks.
674 * @param main main function of the task
675 * @param cls closure of task
676 * @return unique task identifier for the job
677 * only valid until "main" is started!
679 GNUNET_SCHEDULER_TaskIdentifier
680 GNUNET_SCHEDULER_add_after (struct GNUNET_SCHEDULER_Handle *sched,
682 enum GNUNET_SCHEDULER_Priority prio,
683 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
684 GNUNET_SCHEDULER_Task main, void *cls)
686 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
688 GNUNET_TIME_UNIT_ZERO,
689 0, NULL, NULL, main, cls);
694 * Schedule a new task to be run with a specified delay. The task
695 * will be scheduled for execution once the delay has expired and the
696 * prerequisite task has completed.
698 * @param sched scheduler to use
699 * @param run_on_shutdown run on shutdown? You can use this
700 * argument to run a function only during shutdown
701 * by setting delay to GNUNET_TIME_UNIT_FOREVER_REL. Set this
702 * argument to GNUNET_NO to skip this task if
703 * the user requested process termination.
704 * @param prio how important is this task?
705 * @param prerequisite_task run this task after the task with the given
706 * task identifier completes (and any of our other
707 * conditions, such as delay, read or write-readiness
708 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
709 * on completion of other tasks.
710 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
711 * @param main main function of the task
712 * @param cls closure of task
713 * @return unique task identifier for the job
714 * only valid until "main" is started!
716 GNUNET_SCHEDULER_TaskIdentifier
717 GNUNET_SCHEDULER_add_delayed (struct GNUNET_SCHEDULER_Handle * sched,
719 enum GNUNET_SCHEDULER_Priority prio,
720 GNUNET_SCHEDULER_TaskIdentifier
722 struct GNUNET_TIME_Relative delay,
723 GNUNET_SCHEDULER_Task main, void *cls)
725 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
726 prerequisite_task, delay,
727 0, NULL, NULL, main, cls);
732 * Schedule a new task to be run with a specified delay or when the
733 * specified file descriptor is ready for reading. The delay can be
734 * used as a timeout on the socket being ready. The task will be
735 * scheduled for execution once either the delay has expired or the
736 * socket operation is ready.
738 * @param sched scheduler to use
739 * @param run_on_shutdown run on shutdown? Set this
740 * argument to GNUNET_NO to skip this task if
741 * the user requested process termination.
742 * @param prio how important is this task?
743 * @param prerequisite_task run this task after the task with the given
744 * task identifier completes (and any of our other
745 * conditions, such as delay, read or write-readiness
746 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
747 * on completion of other tasks.
748 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
749 * @param rfd read file-descriptor
750 * @param main main function of the task
751 * @param cls closure of task
752 * @return unique task identifier for the job
753 * only valid until "main" is started!
755 GNUNET_SCHEDULER_TaskIdentifier
756 GNUNET_SCHEDULER_add_read (struct GNUNET_SCHEDULER_Handle * sched,
758 enum GNUNET_SCHEDULER_Priority prio,
759 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
760 struct GNUNET_TIME_Relative delay,
761 int rfd, GNUNET_SCHEDULER_Task main, void *cls)
765 GNUNET_assert (rfd >= 0);
768 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
769 prerequisite_task, delay,
770 rfd + 1, &rs, NULL, main, cls);
775 * Schedule a new task to be run with a specified delay or when the
776 * specified file descriptor is ready for writing. The delay can be
777 * used as a timeout on the socket being ready. The task will be
778 * scheduled for execution once either the delay has expired or the
779 * socket operation is ready.
781 * @param sched scheduler to use
782 * @param run_on_shutdown run on shutdown? Set this
783 * argument to GNUNET_NO to skip this task if
784 * the user requested process termination.
785 * @param prio how important is this task?
786 * @param prerequisite_task run this task after the task with the given
787 * task identifier completes (and any of our other
788 * conditions, such as delay, read or write-readiness
789 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
790 * on completion of other tasks.
791 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
792 * @param wfd write file-descriptor
793 * @param main main function of the task
794 * @param cls closure of task
795 * @return unique task identifier for the job
796 * only valid until "main" is started!
798 GNUNET_SCHEDULER_TaskIdentifier
799 GNUNET_SCHEDULER_add_write (struct GNUNET_SCHEDULER_Handle * sched,
801 enum GNUNET_SCHEDULER_Priority prio,
802 GNUNET_SCHEDULER_TaskIdentifier prerequisite_task,
803 struct GNUNET_TIME_Relative delay,
804 int wfd, GNUNET_SCHEDULER_Task main, void *cls)
808 GNUNET_assert (wfd >= 0);
811 return GNUNET_SCHEDULER_add_select (sched, run_on_shutdown, prio,
812 prerequisite_task, delay,
813 wfd + 1, NULL, &ws, main, cls);
818 * Schedule a new task to be run with a specified delay or when any of
819 * the specified file descriptor sets is ready. The delay can be used
820 * as a timeout on the socket(s) being ready. The task will be
821 * scheduled for execution once either the delay has expired or any of
822 * the socket operations is ready. This is the most general
823 * function of the "add" family. Note that the "prerequisite_task"
824 * must be satisfied in addition to any of the other conditions. In
825 * other words, the task will be started when
831 * || (shutdown-active && run-on-shutdown) )
834 * @param sched scheduler to use
835 * @param run_on_shutdown run on shutdown? Set this
836 * argument to GNUNET_NO to skip this task if
837 * the user requested process termination.
838 * @param prio how important is this task?
839 * @param prerequisite_task run this task after the task with the given
840 * task identifier completes (and any of our other
841 * conditions, such as delay, read or write-readiness
842 * are satisfied). Use GNUNET_SCHEDULER_NO_PREREQUISITE_TASK to not have any dependency
843 * on completion of other tasks.
844 * @param delay how long should we wait? Use GNUNET_TIME_UNIT_FOREVER_REL for "forever"
845 * @param nfds highest-numbered file descriptor in any of the two sets plus one
846 * @param rs set of file descriptors we want to read (can be NULL)
847 * @param ws set of file descriptors we want to write (can be NULL)
848 * @param main main function of the task
849 * @param cls closure of task
850 * @return unique task identifier for the job
851 * only valid until "main" is started!
853 GNUNET_SCHEDULER_TaskIdentifier
854 GNUNET_SCHEDULER_add_select (struct GNUNET_SCHEDULER_Handle * sched,
856 enum GNUNET_SCHEDULER_Priority prio,
857 GNUNET_SCHEDULER_TaskIdentifier
859 struct GNUNET_TIME_Relative delay,
860 int nfds, const fd_set * rs, const fd_set * ws,
861 GNUNET_SCHEDULER_Task main, void *cls)
865 task = GNUNET_malloc (sizeof (struct Task));
866 task->callback = main;
867 task->callback_cls = cls;
868 if ((rs != NULL) && (nfds > 0))
869 memcpy (&task->read_set, rs, sizeof (fd_set));
870 if ((ws != NULL) && (nfds > 0))
871 memcpy (&task->write_set, ws, sizeof (fd_set));
872 task->id = ++sched->last_id;
873 task->prereq_id = prerequisite_task;
874 task->timeout = GNUNET_TIME_relative_to_absolute (delay);
876 check_priority ((prio ==
877 GNUNET_SCHEDULER_PRIORITY_KEEP) ? sched->current_priority
880 task->run_on_shutdown = run_on_shutdown;
881 task->next = sched->pending;
882 sched->pending = task;
886 /* end of scheduler.c */