Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / util / scheduler.c
1 /*
2       This file is part of GNUnet
3       Copyright (C) 2009-2016 GNUnet e.V.
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18       Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file util/scheduler.c
23  * @brief schedule computations using continuation passing style
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "disk.h"
29
30 #define LOG(kind,...) GNUNET_log_from (kind, "util-scheduler", __VA_ARGS__)
31
32 #define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util-scheduler", syscall)
33
34
35 #if HAVE_EXECINFO_H
36 #include "execinfo.h"
37
38 /**
39  * Use lsof to generate file descriptor reports on select error?
40  * (turn off for stable releases).
41  */
42 #define USE_LSOF GNUNET_NO
43
44 /**
45  * Obtain trace information for all scheduler calls that schedule tasks.
46  */
47 #define EXECINFO GNUNET_NO
48
49 /**
50  * Check each file descriptor before adding
51  */
52 #define DEBUG_FDS GNUNET_NO
53
54 /**
55  * Depth of the traces collected via EXECINFO.
56  */
57 #define MAX_TRACE_DEPTH 50
58 #endif
59
60 /**
61  * Should we figure out which tasks are delayed for a while
62  * before they are run? (Consider using in combination with EXECINFO).
63  */
64 #define PROFILE_DELAYS GNUNET_NO
65
66 /**
67  * Task that were in the queue for longer than this are reported if
68  * PROFILE_DELAYS is active.
69  */
70 #define DELAY_THRESHOLD GNUNET_TIME_UNIT_SECONDS
71
72
73 /**
74  * Entry in list of pending tasks.
75  */
76 struct GNUNET_SCHEDULER_Task
77 {
78   /**
79    * This is a linked list.
80    */
81   struct GNUNET_SCHEDULER_Task *next;
82
83   /**
84    * This is a linked list.
85    */
86   struct GNUNET_SCHEDULER_Task *prev;
87
88   /**
89    * Function to run when ready.
90    */
91   GNUNET_SCHEDULER_TaskCallback callback;
92
93   /**
94    * Closure for the @e callback.
95    */
96   void *callback_cls;
97
98   /**
99    * Set of file descriptors this task is waiting
100    * for for reading.  Once ready, this is updated
101    * to reflect the set of file descriptors ready
102    * for operation.
103    */
104   struct GNUNET_NETWORK_FDSet *read_set;
105
106   /**
107    * Set of file descriptors this task is waiting for for writing.
108    * Once ready, this is updated to reflect the set of file
109    * descriptors ready for operation.
110    */
111   struct GNUNET_NETWORK_FDSet *write_set;
112
113   /**
114    * Absolute timeout value for the task, or
115    * #GNUNET_TIME_UNIT_FOREVER_ABS for "no timeout".
116    */
117   struct GNUNET_TIME_Absolute timeout;
118
119 #if PROFILE_DELAYS
120   /**
121    * When was the task scheduled?
122    */
123   struct GNUNET_TIME_Absolute start_time;
124 #endif
125
126   /**
127    * Why is the task ready?  Set after task is added to ready queue.
128    * Initially set to zero.  All reasons that have already been
129    * satisfied (i.e.  read or write ready) will be set over time.
130    */
131   enum GNUNET_SCHEDULER_Reason reason;
132
133   /**
134    * Task priority.
135    */
136   enum GNUNET_SCHEDULER_Priority priority;
137
138   /**
139    * Set if we only wait for reading from a single FD, otherwise -1.
140    */
141   int read_fd;
142
143   /**
144    * Set if we only wait for writing to a single FD, otherwise -1.
145    */
146   int write_fd;
147
148   /**
149    * Should the existence of this task in the queue be counted as
150    * reason to not shutdown the scheduler?
151    */
152   int lifeness;
153
154   /**
155    * Is this task run on shutdown?
156    */
157   int on_shutdown;
158
159   /**
160    * Is this task in the ready list?
161    */
162   int in_ready_list;
163
164 #if EXECINFO
165   /**
166    * Array of strings which make up a backtrace from the point when this
167    * task was scheduled (essentially, who scheduled the task?)
168    */
169   char **backtrace_strings;
170
171   /**
172    * Size of the backtrace_strings array
173    */
174   int num_backtrace_strings;
175 #endif
176
177
178 };
179
180
181 /**
182  * Head of list of tasks waiting for an event.
183  */
184 static struct GNUNET_SCHEDULER_Task *pending_head;
185
186 /**
187  * Tail of list of tasks waiting for an event.
188  */
189 static struct GNUNET_SCHEDULER_Task *pending_tail;
190
191 /**
192  * Head of list of tasks waiting for shutdown.
193  */
194 static struct GNUNET_SCHEDULER_Task *shutdown_head;
195
196 /**
197  * Tail of list of tasks waiting for shutdown.
198  */
199 static struct GNUNET_SCHEDULER_Task *shutdown_tail;
200
201 /**
202  * List of tasks waiting ONLY for a timeout event.
203  * Sorted by timeout (earliest first).  Used so that
204  * we do not traverse the list of these tasks when
205  * building select sets (we just look at the head
206  * to determine the respective timeout ONCE).
207  */
208 static struct GNUNET_SCHEDULER_Task *pending_timeout_head;
209
210 /**
211  * List of tasks waiting ONLY for a timeout event.
212  * Sorted by timeout (earliest first).  Used so that
213  * we do not traverse the list of these tasks when
214  * building select sets (we just look at the head
215  * to determine the respective timeout ONCE).
216  */
217 static struct GNUNET_SCHEDULER_Task *pending_timeout_tail;
218
219 /**
220  * Last inserted task waiting ONLY for a timeout event.
221  * Used to (heuristically) speed up insertion.
222  */
223 static struct GNUNET_SCHEDULER_Task *pending_timeout_last;
224
225 /**
226  * ID of the task that is running right now.
227  */
228 static struct GNUNET_SCHEDULER_Task *active_task;
229
230 /**
231  * Head of list of tasks ready to run right now, grouped by importance.
232  */
233 static struct GNUNET_SCHEDULER_Task *ready_head[GNUNET_SCHEDULER_PRIORITY_COUNT];
234
235 /**
236  * Tail of list of tasks ready to run right now, grouped by importance.
237  */
238 static struct GNUNET_SCHEDULER_Task *ready_tail[GNUNET_SCHEDULER_PRIORITY_COUNT];
239
240 /**
241  * Number of tasks on the ready list.
242  */
243 static unsigned int ready_count;
244
245 /**
246  * How many tasks have we run so far?
247  */
248 static unsigned long long tasks_run;
249
250 /**
251  * Priority of the task running right now.  Only
252  * valid while a task is running.
253  */
254 static enum GNUNET_SCHEDULER_Priority current_priority;
255
256 /**
257  * Priority of the highest task added in the current select
258  * iteration.
259  */
260 static enum GNUNET_SCHEDULER_Priority max_priority_added;
261
262 /**
263  * Value of the 'lifeness' flag for the current task.
264  */
265 static int current_lifeness;
266
267 /**
268  * Function to use as a select() in the scheduler.
269  * If NULL, we use GNUNET_NETWORK_socket_select().
270  */
271 static GNUNET_SCHEDULER_select scheduler_select;
272
273 /**
274  * Task context of the current task.
275  */
276 static struct GNUNET_SCHEDULER_TaskContext tc;
277
278 /**
279  * Closure for #scheduler_select.
280  */
281 static void *scheduler_select_cls;
282
283
284 /**
285  * Sets the select function to use in the scheduler (scheduler_select).
286  *
287  * @param new_select new select function to use
288  * @param new_select_cls closure for @a new_select
289  * @return previously used select function, NULL for default
290  */
291 void
292 GNUNET_SCHEDULER_set_select (GNUNET_SCHEDULER_select new_select,
293                              void *new_select_cls)
294 {
295   scheduler_select = new_select;
296   scheduler_select_cls = new_select_cls;
297 }
298
299
300 /**
301  * Check that the given priority is legal (and return it).
302  *
303  * @param p priority value to check
304  * @return p on success, 0 on error
305  */
306 static enum GNUNET_SCHEDULER_Priority
307 check_priority (enum GNUNET_SCHEDULER_Priority p)
308 {
309   if ((p >= 0) && (p < GNUNET_SCHEDULER_PRIORITY_COUNT))
310     return p;
311   GNUNET_assert (0);
312   return 0;                     /* make compiler happy */
313 }
314
315
316 /**
317  * Update all sets and timeout for select.
318  *
319  * @param rs read-set, set to all FDs we would like to read (updated)
320  * @param ws write-set, set to all FDs we would like to write (updated)
321  * @param timeout next timeout (updated)
322  */
323 static void
324 update_sets (struct GNUNET_NETWORK_FDSet *rs,
325              struct GNUNET_NETWORK_FDSet *ws,
326              struct GNUNET_TIME_Relative *timeout)
327 {
328   struct GNUNET_SCHEDULER_Task *pos;
329   struct GNUNET_TIME_Absolute now;
330   struct GNUNET_TIME_Relative to;
331
332   now = GNUNET_TIME_absolute_get ();
333   pos = pending_timeout_head;
334   if (NULL != pos)
335   {
336     to = GNUNET_TIME_absolute_get_difference (now, pos->timeout);
337     if (timeout->rel_value_us > to.rel_value_us)
338       *timeout = to;
339     if (0 != pos->reason)
340       *timeout = GNUNET_TIME_UNIT_ZERO;
341   }
342   for (pos = pending_head; NULL != pos; pos = pos->next)
343   {
344     if (pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us)
345     {
346       to = GNUNET_TIME_absolute_get_difference (now, pos->timeout);
347       if (timeout->rel_value_us > to.rel_value_us)
348         *timeout = to;
349     }
350     if (-1 != pos->read_fd)
351       GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd);
352     if (-1 != pos->write_fd)
353       GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd);
354     if (NULL != pos->read_set)
355       GNUNET_NETWORK_fdset_add (rs, pos->read_set);
356     if (NULL != pos->write_set)
357       GNUNET_NETWORK_fdset_add (ws, pos->write_set);
358     if (0 != pos->reason)
359       *timeout = GNUNET_TIME_UNIT_ZERO;
360   }
361 }
362
363
364 /**
365  * Check if the ready set overlaps with the set we want to have ready.
366  * If so, update the want set (set all FDs that are ready).  If not,
367  * return #GNUNET_NO.
368  *
369  * @param ready set that is ready
370  * @param want set that we want to be ready
371  * @return #GNUNET_YES if there was some overlap
372  */
373 static int
374 set_overlaps (const struct GNUNET_NETWORK_FDSet *ready,
375               struct GNUNET_NETWORK_FDSet *want)
376 {
377   if ((NULL == want) || (NULL == ready))
378     return GNUNET_NO;
379   if (GNUNET_NETWORK_fdset_overlap (ready, want))
380   {
381     /* copy all over (yes, there maybe unrelated bits,
382      * but this should not hurt well-written clients) */
383     GNUNET_NETWORK_fdset_copy (want, ready);
384     return GNUNET_YES;
385   }
386   return GNUNET_NO;
387 }
388
389
390 /**
391  * Check if the given task is eligible to run now.
392  * Also set the reason why it is eligible.
393  *
394  * @param task task to check if it is ready
395  * @param now the current time
396  * @param rs set of FDs ready for reading
397  * @param ws set of FDs ready for writing
398  * @return #GNUNET_YES if we can run it, #GNUNET_NO if not.
399  */
400 static int
401 is_ready (struct GNUNET_SCHEDULER_Task *task,
402           struct GNUNET_TIME_Absolute now,
403           const struct GNUNET_NETWORK_FDSet *rs,
404           const struct GNUNET_NETWORK_FDSet *ws)
405 {
406   enum GNUNET_SCHEDULER_Reason reason;
407
408   reason = task->reason;
409   if (now.abs_value_us >= task->timeout.abs_value_us)
410     reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
411   if ((0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
412       (((task->read_fd != -1) &&
413         (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd))) ||
414        (set_overlaps (rs, task->read_set))))
415     reason |= GNUNET_SCHEDULER_REASON_READ_READY;
416   if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
417       (((task->write_fd != -1) &&
418         (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd)))
419        || (set_overlaps (ws, task->write_set))))
420     reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
421   if (0 == reason)
422     return GNUNET_NO;           /* not ready */
423   reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
424   task->reason = reason;
425   return GNUNET_YES;
426 }
427
428
429 /**
430  * Put a task that is ready for execution into the ready queue.
431  *
432  * @param task task ready for execution
433  */
434 static void
435 queue_ready_task (struct GNUNET_SCHEDULER_Task *task)
436 {
437   enum GNUNET_SCHEDULER_Priority p = check_priority (task->priority);
438
439   GNUNET_CONTAINER_DLL_insert (ready_head[p],
440                                ready_tail[p],
441                                task);
442   task->in_ready_list = GNUNET_YES;
443   ready_count++;
444 }
445
446
447 /**
448  * Check which tasks are ready and move them
449  * to the respective ready queue.
450  *
451  * @param rs FDs ready for reading
452  * @param ws FDs ready for writing
453  */
454 static void
455 check_ready (const struct GNUNET_NETWORK_FDSet *rs,
456              const struct GNUNET_NETWORK_FDSet *ws)
457 {
458   struct GNUNET_SCHEDULER_Task *pos;
459   struct GNUNET_SCHEDULER_Task *next;
460   struct GNUNET_TIME_Absolute now;
461
462   now = GNUNET_TIME_absolute_get ();
463   while (NULL != (pos = pending_timeout_head))
464   {
465     if (now.abs_value_us >= pos->timeout.abs_value_us)
466       pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
467     if (0 == pos->reason)
468       break;
469     GNUNET_CONTAINER_DLL_remove (pending_timeout_head,
470                                  pending_timeout_tail,
471                                  pos);
472     if (pending_timeout_last == pos)
473       pending_timeout_last = NULL;
474     queue_ready_task (pos);
475   }
476   pos = pending_head;
477   while (NULL != pos)
478   {
479     next = pos->next;
480     if (GNUNET_YES == is_ready (pos, now, rs, ws))
481     {
482       GNUNET_CONTAINER_DLL_remove (pending_head,
483                                    pending_tail,
484                                    pos);
485       queue_ready_task (pos);
486     }
487     pos = next;
488   }
489 }
490
491
492 /**
493  * Request the shutdown of a scheduler.  Marks all tasks
494  * awaiting shutdown as ready. Note that tasks
495  * scheduled with #GNUNET_SCHEDULER_add_shutdown() AFTER this call
496  * will be delayed until the next shutdown signal.
497  */
498 void
499 GNUNET_SCHEDULER_shutdown ()
500 {
501   struct GNUNET_SCHEDULER_Task *pos;
502
503   while (NULL != (pos = shutdown_head))
504   {
505     GNUNET_CONTAINER_DLL_remove (shutdown_head,
506                                  shutdown_tail,
507                                  pos);
508     pos->reason |= GNUNET_SCHEDULER_REASON_SHUTDOWN;
509     queue_ready_task (pos);
510   }
511 }
512
513
514 /**
515  * Destroy a task (release associated resources)
516  *
517  * @param t task to destroy
518  */
519 static void
520 destroy_task (struct GNUNET_SCHEDULER_Task *t)
521 {
522   if (NULL != t->read_set)
523     GNUNET_NETWORK_fdset_destroy (t->read_set);
524   if (NULL != t->write_set)
525     GNUNET_NETWORK_fdset_destroy (t->write_set);
526 #if EXECINFO
527   GNUNET_free (t->backtrace_strings);
528 #endif
529   GNUNET_free (t);
530 }
531
532
533 /**
534  * Output stack trace of task @a t.
535  *
536  * @param t task to dump stack trace of
537  */
538 static void
539 dump_backtrace (struct GNUNET_SCHEDULER_Task *t)
540 {
541 #if EXECINFO
542   unsigned int i;
543
544   for (i = 0; i < t->num_backtrace_strings; i++)
545     LOG (GNUNET_ERROR_TYPE_DEBUG,
546          "Task %p trace %u: %s\n",
547          t,
548          i,
549          t->backtrace_strings[i]);
550 #endif
551 }
552
553
554 /**
555  * Run at least one task in the highest-priority queue that is not
556  * empty.  Keep running tasks until we are either no longer running
557  * "URGENT" tasks or until we have at least one "pending" task (which
558  * may become ready, hence we should select on it).  Naturally, if
559  * there are no more ready tasks, we also return.
560  *
561  * @param rs FDs ready for reading
562  * @param ws FDs ready for writing
563  */
564 static void
565 run_ready (struct GNUNET_NETWORK_FDSet *rs,
566            struct GNUNET_NETWORK_FDSet *ws)
567 {
568   enum GNUNET_SCHEDULER_Priority p;
569   struct GNUNET_SCHEDULER_Task *pos;
570
571   max_priority_added = GNUNET_SCHEDULER_PRIORITY_KEEP;
572   do
573   {
574     if (0 == ready_count)
575       return;
576     GNUNET_assert (NULL == ready_head[GNUNET_SCHEDULER_PRIORITY_KEEP]);
577     /* yes, p>0 is correct, 0 is "KEEP" which should
578      * always be an empty queue (see assertion)! */
579     for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--)
580     {
581       pos = ready_head[p];
582       if (NULL != pos)
583         break;
584     }
585     GNUNET_assert (NULL != pos);        /* ready_count wrong? */
586     GNUNET_CONTAINER_DLL_remove (ready_head[p],
587                                  ready_tail[p],
588                                  pos);
589     ready_count--;
590     current_priority = pos->priority;
591     current_lifeness = pos->lifeness;
592     active_task = pos;
593 #if PROFILE_DELAYS
594     if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us >
595         DELAY_THRESHOLD.rel_value_us)
596     {
597       LOG (GNUNET_ERROR_TYPE_DEBUG,
598            "Task %p took %s to be scheduled\n",
599            pos,
600            GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time),
601                                                    GNUNET_YES));
602     }
603 #endif
604     tc.reason = pos->reason;
605     tc.read_ready = (NULL == pos->read_set) ? rs : pos->read_set;
606     if ((-1 != pos->read_fd) &&
607         (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY)))
608       GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd);
609     tc.write_ready = (NULL == pos->write_set) ? ws : pos->write_set;
610     if ((-1 != pos->write_fd) &&
611         (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)))
612       GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd);
613     if ((0 != (tc.reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) &&
614         (-1 != pos->write_fd) &&
615         (!GNUNET_NETWORK_fdset_test_native (ws, pos->write_fd)))
616       GNUNET_assert (0);          // added to ready in previous select loop!
617     LOG (GNUNET_ERROR_TYPE_DEBUG,
618          "Running task: %p\n",
619          pos);
620     pos->callback (pos->callback_cls);
621     dump_backtrace (pos);
622     active_task = NULL;
623     destroy_task (pos);
624     tasks_run++;
625   }
626   while ((NULL == pending_head) || (p >= max_priority_added));
627 }
628
629
630 /**
631  * Pipe used to communicate shutdown via signal.
632  */
633 static struct GNUNET_DISK_PipeHandle *shutdown_pipe_handle;
634
635 /**
636  * Process ID of this process at the time we installed the various
637  * signal handlers.
638  */
639 static pid_t my_pid;
640
641 /**
642  * Signal handler called for SIGPIPE.
643  */
644 #ifndef MINGW
645 static void
646 sighandler_pipe ()
647 {
648   return;
649 }
650 #endif
651
652
653 /**
654  * Wait for a short time.
655  * Sleeps for @a ms ms (as that should be long enough for virtually all
656  * modern systems to context switch and allow another process to do
657  * some 'real' work).
658  *
659  * @param ms how many ms to wait
660  */
661 static void
662 short_wait (unsigned int ms)
663 {
664   struct GNUNET_TIME_Relative timeout;
665
666   timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms);
667   (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout);
668 }
669
670
671 /**
672  * Signal handler called for signals that should cause us to shutdown.
673  */
674 static void
675 sighandler_shutdown ()
676 {
677   static char c;
678   int old_errno = errno;        /* backup errno */
679
680   if (getpid () != my_pid)
681     exit (1);                   /* we have fork'ed since the signal handler was created,
682                                  * ignore the signal, see https://gnunet.org/vfork discussion */
683   GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
684                           (shutdown_pipe_handle, GNUNET_DISK_PIPE_END_WRITE),
685                           &c, sizeof (c));
686   errno = old_errno;
687 }
688
689
690 /**
691  * Check if the system is still alive. Trigger shutdown if we
692  * have tasks, but none of them give us lifeness.
693  *
694  * @return #GNUNET_OK to continue the main loop,
695  *         #GNUNET_NO to exit
696  */
697 static int
698 check_lifeness ()
699 {
700   struct GNUNET_SCHEDULER_Task *t;
701
702   if (ready_count > 0)
703     return GNUNET_OK;
704   for (t = pending_head; NULL != t; t = t->next)
705     if (t->lifeness == GNUNET_YES)
706       return GNUNET_OK;
707   for (t = shutdown_head; NULL != t; t = t->next)
708     if (t->lifeness == GNUNET_YES)
709       return GNUNET_OK;
710   for (t = pending_timeout_head; NULL != t; t = t->next)
711     if (t->lifeness == GNUNET_YES)
712       return GNUNET_OK;
713   if (NULL != shutdown_head)
714   {
715     GNUNET_SCHEDULER_shutdown ();
716     return GNUNET_OK;
717   }
718   return GNUNET_NO;
719 }
720
721
722 /**
723  * Initialize and run scheduler.  This function will return when all
724  * tasks have completed.  On systems with signals, receiving a SIGTERM
725  * (and other similar signals) will cause #GNUNET_SCHEDULER_shutdown()
726  * to be run after the active task is complete.  As a result, SIGTERM
727  * causes all active tasks to be scheduled with reason
728  * #GNUNET_SCHEDULER_REASON_SHUTDOWN.  (However, tasks added
729  * afterwards will execute normally!). Note that any particular signal
730  * will only shut down one scheduler; applications should always only
731  * create a single scheduler.
732  *
733  * @param task task to run immediately
734  * @param task_cls closure of @a task
735  */
736 void
737 GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
738                       void *task_cls)
739 {
740   struct GNUNET_NETWORK_FDSet *rs;
741   struct GNUNET_NETWORK_FDSet *ws;
742   struct GNUNET_TIME_Relative timeout;
743   int ret;
744   struct GNUNET_SIGNAL_Context *shc_int;
745   struct GNUNET_SIGNAL_Context *shc_term;
746 #if (SIGTERM != GNUNET_TERM_SIG)
747   struct GNUNET_SIGNAL_Context *shc_gterm;
748 #endif
749
750 #ifndef MINGW
751   struct GNUNET_SIGNAL_Context *shc_quit;
752   struct GNUNET_SIGNAL_Context *shc_hup;
753   struct GNUNET_SIGNAL_Context *shc_pipe;
754 #endif
755   unsigned long long last_tr;
756   unsigned int busy_wait_warning;
757   const struct GNUNET_DISK_FileHandle *pr;
758   char c;
759
760   GNUNET_assert (NULL == active_task);
761   rs = GNUNET_NETWORK_fdset_create ();
762   ws = GNUNET_NETWORK_fdset_create ();
763   GNUNET_assert (NULL == shutdown_pipe_handle);
764   shutdown_pipe_handle = GNUNET_DISK_pipe (GNUNET_NO,
765                                            GNUNET_NO,
766                                            GNUNET_NO,
767                                            GNUNET_NO);
768   GNUNET_assert (NULL != shutdown_pipe_handle);
769   pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle,
770                                 GNUNET_DISK_PIPE_END_READ);
771   GNUNET_assert (NULL != pr);
772   my_pid = getpid ();
773   LOG (GNUNET_ERROR_TYPE_DEBUG,
774        "Registering signal handlers\n");
775   shc_int = GNUNET_SIGNAL_handler_install (SIGINT,
776                                            &sighandler_shutdown);
777   shc_term = GNUNET_SIGNAL_handler_install (SIGTERM,
778                                             &sighandler_shutdown);
779 #if (SIGTERM != GNUNET_TERM_SIG)
780   shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG,
781                                              &sighandler_shutdown);
782 #endif
783 #ifndef MINGW
784   shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE,
785                                             &sighandler_pipe);
786   shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT,
787                                             &sighandler_shutdown);
788   shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP,
789                                            &sighandler_shutdown);
790 #endif
791   current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT;
792   current_lifeness = GNUNET_YES;
793   GNUNET_SCHEDULER_add_with_reason_and_priority (task,
794                                                  task_cls,
795                                                  GNUNET_SCHEDULER_REASON_STARTUP,
796                                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
797   active_task = (void *) (long) -1;     /* force passing of sanity check */
798   GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO,
799                                           &GNUNET_OS_install_parent_control_handler,
800                                           NULL);
801   active_task = NULL;
802   last_tr = 0;
803   busy_wait_warning = 0;
804   while (GNUNET_OK == check_lifeness ())
805   {
806     GNUNET_NETWORK_fdset_zero (rs);
807     GNUNET_NETWORK_fdset_zero (ws);
808     timeout = GNUNET_TIME_UNIT_FOREVER_REL;
809     update_sets (rs, ws, &timeout);
810     GNUNET_NETWORK_fdset_handle_set (rs, pr);
811     if (ready_count > 0)
812     {
813       /* no blocking, more work already ready! */
814       timeout = GNUNET_TIME_UNIT_ZERO;
815     }
816     if (NULL == scheduler_select)
817       ret = GNUNET_NETWORK_socket_select (rs,
818                                           ws,
819                                           NULL,
820                                           timeout);
821     else
822       ret = scheduler_select (scheduler_select_cls,
823                               rs,
824                               ws,
825                               NULL,
826                               timeout);
827     if (ret == GNUNET_SYSERR)
828     {
829       if (errno == EINTR)
830         continue;
831
832       LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select");
833 #ifndef MINGW
834 #if USE_LSOF
835       char lsof[512];
836
837       snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ());
838       (void) close (1);
839       (void) dup2 (2, 1);
840       if (0 != system (lsof))
841         LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
842                       "system");
843 #endif
844 #endif
845 #if DEBUG_FDS
846       struct GNUNET_SCHEDULER_Task *t;
847
848       for (t = pending_head; NULL != t; t = t->next)
849       {
850         if (-1 != t->read_fd)
851         {
852           int flags = fcntl (t->read_fd, F_GETFD);
853           if ((flags == -1) && (errno == EBADF))
854             {
855               LOG (GNUNET_ERROR_TYPE_ERROR,
856                    "Got invalid file descriptor %d!\n",
857                    t->read_fd);
858               dump_backtrace (t);
859             }
860         }
861         if (-1 != t->write_fd)
862           {
863             int flags = fcntl (t->write_fd, F_GETFD);
864             if ((flags == -1) && (errno == EBADF))
865               {
866                 LOG (GNUNET_ERROR_TYPE_ERROR,
867                      "Got invalid file descriptor %d!\n",
868                      t->write_fd);
869                 dump_backtrace (t);
870               }
871           }
872       }
873 #endif
874       GNUNET_assert (0);
875       break;
876     }
877
878     if ( (0 == ret) &&
879          (0 == timeout.rel_value_us) &&
880          (busy_wait_warning > 16) )
881     {
882       LOG (GNUNET_ERROR_TYPE_WARNING,
883            "Looks like we're busy waiting...\n");
884       short_wait (100);                /* mitigate */
885     }
886     check_ready (rs, ws);
887     run_ready (rs, ws);
888     if (GNUNET_NETWORK_fdset_handle_isset (rs, pr))
889     {
890       /* consume the signal */
891       GNUNET_DISK_file_read (pr, &c, sizeof (c));
892       /* mark all active tasks as ready due to shutdown */
893       GNUNET_SCHEDULER_shutdown ();
894     }
895     if (last_tr == tasks_run)
896     {
897       short_wait (1);
898       busy_wait_warning++;
899     }
900     else
901     {
902       last_tr = tasks_run;
903       busy_wait_warning = 0;
904     }
905   }
906   GNUNET_SIGNAL_handler_uninstall (shc_int);
907   GNUNET_SIGNAL_handler_uninstall (shc_term);
908 #if (SIGTERM != GNUNET_TERM_SIG)
909   GNUNET_SIGNAL_handler_uninstall (shc_gterm);
910 #endif
911 #ifndef MINGW
912   GNUNET_SIGNAL_handler_uninstall (shc_pipe);
913   GNUNET_SIGNAL_handler_uninstall (shc_quit);
914   GNUNET_SIGNAL_handler_uninstall (shc_hup);
915 #endif
916   GNUNET_DISK_pipe_close (shutdown_pipe_handle);
917   shutdown_pipe_handle = NULL;
918   GNUNET_NETWORK_fdset_destroy (rs);
919   GNUNET_NETWORK_fdset_destroy (ws);
920 }
921
922
923 /**
924  * Obtain the task context, giving the reason why the current task was
925  * started.
926  *
927  * @return current tasks' scheduler context
928  */
929 const struct GNUNET_SCHEDULER_TaskContext *
930 GNUNET_SCHEDULER_get_task_context ()
931 {
932   GNUNET_assert (NULL != active_task);
933   return &tc;
934 }
935
936
937 /**
938  * Get information about the current load of this scheduler.  Use this
939  * function to determine if an elective task should be added or simply
940  * dropped (if the decision should be made based on the number of
941  * tasks ready to run).
942  *
943  * @param p priority level to look at
944  * @return number of tasks pending right now
945  */
946 unsigned int
947 GNUNET_SCHEDULER_get_load (enum GNUNET_SCHEDULER_Priority p)
948 {
949   struct GNUNET_SCHEDULER_Task *pos;
950   unsigned int ret;
951
952   GNUNET_assert (NULL != active_task);
953   if (p == GNUNET_SCHEDULER_PRIORITY_COUNT)
954     return ready_count;
955   if (p == GNUNET_SCHEDULER_PRIORITY_KEEP)
956     p = current_priority;
957   ret = 0;
958   for (pos = ready_head[check_priority (p)]; NULL != pos; pos = pos->next)
959     ret++;
960   return ret;
961 }
962
963
964 /**
965  * Cancel the task with the specified identifier.
966  * The task must not yet have run.
967  *
968  * @param task id of the task to cancel
969  * @return original closure of the task
970  */
971 void *
972 GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task)
973 {
974   enum GNUNET_SCHEDULER_Priority p;
975   void *ret;
976
977   GNUNET_assert ( (NULL != active_task) ||
978                   (GNUNET_NO == task->lifeness) );
979   if (! task->in_ready_list)
980   {
981     if ( (-1 == task->read_fd) &&
982          (-1 == task->write_fd) &&
983          (NULL == task->read_set) &&
984          (NULL == task->write_set) )
985     {
986       if (GNUNET_YES == task->on_shutdown)
987         GNUNET_CONTAINER_DLL_remove (shutdown_head,
988                                      shutdown_tail,
989                                      task);
990       else
991         GNUNET_CONTAINER_DLL_remove (pending_timeout_head,
992                                      pending_timeout_tail,
993                                      task);
994       if (task == pending_timeout_last)
995         pending_timeout_last = NULL;
996     }
997     else
998     {
999       GNUNET_CONTAINER_DLL_remove (pending_head,
1000                                    pending_tail,
1001                                    task);
1002     }
1003   }
1004   else
1005   {
1006     p = check_priority (task->priority);
1007     GNUNET_CONTAINER_DLL_remove (ready_head[p],
1008                                  ready_tail[p],
1009                                  task);
1010     ready_count--;
1011   }
1012   ret = task->callback_cls;
1013   LOG (GNUNET_ERROR_TYPE_DEBUG,
1014        "Canceling task %p\n",
1015        task);
1016   destroy_task (task);
1017   return ret;
1018 }
1019
1020
1021 /**
1022  * Initialize backtrace data for task @a t
1023  *
1024  * @param t task to initialize
1025  */
1026 static void
1027 init_backtrace (struct GNUNET_SCHEDULER_Task *t)
1028 {
1029 #if EXECINFO
1030   void *backtrace_array[MAX_TRACE_DEPTH];
1031
1032   t->num_backtrace_strings
1033     = backtrace (backtrace_array, MAX_TRACE_DEPTH);
1034   t->backtrace_strings =
1035       backtrace_symbols (backtrace_array,
1036                          t->num_backtrace_strings);
1037   dump_backtrace (t);
1038 #endif
1039 }
1040
1041
1042 /**
1043  * Continue the current execution with the given function.  This is
1044  * similar to the other "add" functions except that there is no delay
1045  * and the reason code can be specified.
1046  *
1047  * @param task main function of the task
1048  * @param task_cls closure for @a task
1049  * @param reason reason for task invocation
1050  * @param priority priority to use for the task
1051  */
1052 void
1053 GNUNET_SCHEDULER_add_with_reason_and_priority (GNUNET_SCHEDULER_TaskCallback task,
1054                                                void *task_cls,
1055                                                enum GNUNET_SCHEDULER_Reason reason,
1056                                                enum GNUNET_SCHEDULER_Priority priority)
1057 {
1058   struct GNUNET_SCHEDULER_Task *t;
1059
1060   GNUNET_assert (NULL != task);
1061   GNUNET_assert ((NULL != active_task) ||
1062                  (GNUNET_SCHEDULER_REASON_STARTUP == reason));
1063   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
1064   t->read_fd = -1;
1065   t->write_fd = -1;
1066   t->callback = task;
1067   t->callback_cls = task_cls;
1068 #if PROFILE_DELAYS
1069   t->start_time = GNUNET_TIME_absolute_get ();
1070 #endif
1071   t->reason = reason;
1072   t->priority = priority;
1073   t->lifeness = current_lifeness;
1074   LOG (GNUNET_ERROR_TYPE_DEBUG,
1075        "Adding continuation task %p\n",
1076        t);
1077   init_backtrace (t);
1078   queue_ready_task (t);
1079 }
1080
1081
1082 /**
1083  * Schedule a new task to be run at the specified time.  The task
1084  * will be scheduled for execution at time @a at.
1085  *
1086  * @param at time when the operation should run
1087  * @param priority priority to use for the task
1088  * @param task main function of the task
1089  * @param task_cls closure of @a task
1090  * @return unique task identifier for the job
1091  *         only valid until @a task is started!
1092  */
1093 struct GNUNET_SCHEDULER_Task *
1094 GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at,
1095                                        enum GNUNET_SCHEDULER_Priority priority,
1096                                        GNUNET_SCHEDULER_TaskCallback task,
1097                                        void *task_cls)
1098 {
1099   struct GNUNET_SCHEDULER_Task *t;
1100   struct GNUNET_SCHEDULER_Task *pos;
1101   struct GNUNET_SCHEDULER_Task *prev;
1102
1103   GNUNET_assert (NULL != active_task);
1104   GNUNET_assert (NULL != task);
1105   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
1106   t->callback = task;
1107   t->callback_cls = task_cls;
1108   t->read_fd = -1;
1109   t->write_fd = -1;
1110 #if PROFILE_DELAYS
1111   t->start_time = GNUNET_TIME_absolute_get ();
1112 #endif
1113   t->timeout = at;
1114   t->priority = priority;
1115   t->lifeness = current_lifeness;
1116   /* try tail first (optimization in case we are
1117    * appending to a long list of tasks with timeouts) */
1118   if ( (NULL == pending_timeout_head) ||
1119        (at.abs_value_us < pending_timeout_head->timeout.abs_value_us) )
1120   {
1121     GNUNET_CONTAINER_DLL_insert (pending_timeout_head,
1122                                  pending_timeout_tail,
1123                                  t);
1124   }
1125   else
1126   {
1127     /* first move from heuristic start backwards to before start time */
1128     prev = pending_timeout_last;
1129     while ( (NULL != prev) &&
1130             (prev->timeout.abs_value_us > t->timeout.abs_value_us) )
1131       prev = prev->prev;
1132     /* now, move from heuristic start (or head of list) forward to insertion point */
1133     if (NULL == prev)
1134       pos = pending_timeout_head;
1135     else
1136       pos = prev->next;
1137     while ( (NULL != pos) &&
1138             ( (pos->timeout.abs_value_us <= t->timeout.abs_value_us) ||
1139               (0 != pos->reason) ) )
1140     {
1141       prev = pos;
1142       pos = pos->next;
1143     }
1144     GNUNET_CONTAINER_DLL_insert_after (pending_timeout_head,
1145                                        pending_timeout_tail,
1146                                        prev,
1147                                        t);
1148   }
1149   /* finally, update heuristic insertion point to last insertion... */
1150   pending_timeout_last = t;
1151
1152   LOG (GNUNET_ERROR_TYPE_DEBUG,
1153        "Adding task: %p\n",
1154        t);
1155   init_backtrace (t);
1156   return t;
1157 }
1158
1159
1160 /**
1161  * Schedule a new task to be run with a specified delay.  The task
1162  * will be scheduled for execution once the delay has expired.
1163  *
1164  * @param delay when should this operation time out?
1165  * @param priority priority to use for the task
1166  * @param task main function of the task
1167  * @param task_cls closure of @a task
1168  * @return unique task identifier for the job
1169  *         only valid until @a task is started!
1170  */
1171 struct GNUNET_SCHEDULER_Task *
1172 GNUNET_SCHEDULER_add_delayed_with_priority (struct GNUNET_TIME_Relative delay,
1173                                             enum GNUNET_SCHEDULER_Priority priority,
1174                                             GNUNET_SCHEDULER_TaskCallback task,
1175                                             void *task_cls)
1176 {
1177   return GNUNET_SCHEDULER_add_at_with_priority (GNUNET_TIME_relative_to_absolute (delay),
1178                                                 priority,
1179                                                 task,
1180                                                 task_cls);
1181 }
1182
1183
1184 /**
1185  * Schedule a new task to be run with a specified priority.
1186  *
1187  * @param prio how important is the new task?
1188  * @param task main function of the task
1189  * @param task_cls closure of @a task
1190  * @return unique task identifier for the job
1191  *         only valid until @a task is started!
1192  */
1193 struct GNUNET_SCHEDULER_Task *
1194 GNUNET_SCHEDULER_add_with_priority (enum GNUNET_SCHEDULER_Priority prio,
1195                                     GNUNET_SCHEDULER_TaskCallback task,
1196                                     void *task_cls)
1197 {
1198   return GNUNET_SCHEDULER_add_delayed_with_priority (GNUNET_TIME_UNIT_ZERO,
1199                                                      prio,
1200                                                      task,
1201                                                      task_cls);
1202 }
1203
1204
1205 /**
1206  * Schedule a new task to be run at the specified time.  The task
1207  * will be scheduled for execution once specified time has been
1208  * reached. It will be run with the DEFAULT priority.
1209  *
1210  * @param at time at which this operation should run
1211  * @param task main function of the task
1212  * @param task_cls closure of @a task
1213  * @return unique task identifier for the job
1214  *         only valid until @a task is started!
1215  */
1216 struct GNUNET_SCHEDULER_Task *
1217 GNUNET_SCHEDULER_add_at (struct GNUNET_TIME_Absolute at,
1218                          GNUNET_SCHEDULER_TaskCallback task,
1219                          void *task_cls)
1220 {
1221   return GNUNET_SCHEDULER_add_at_with_priority (at,
1222                                                 GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1223                                                 task,
1224                                                 task_cls);
1225 }
1226
1227
1228 /**
1229  * Schedule a new task to be run with a specified delay.  The task
1230  * will be scheduled for execution once the delay has expired. It
1231  * will be run with the DEFAULT priority.
1232  *
1233  * @param delay when should this operation time out?
1234  * @param task main function of the task
1235  * @param task_cls closure of @a task
1236  * @return unique task identifier for the job
1237  *         only valid until @a task is started!
1238  */
1239 struct GNUNET_SCHEDULER_Task *
1240 GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay,
1241                               GNUNET_SCHEDULER_TaskCallback task,
1242                               void *task_cls)
1243 {
1244   return GNUNET_SCHEDULER_add_delayed_with_priority (delay,
1245                                                      GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1246                                                      task,
1247                                                      task_cls);
1248 }
1249
1250
1251 /**
1252  * Schedule a new task to be run as soon as possible.  Note that this
1253  * does not guarantee that this will be the next task that is being
1254  * run, as other tasks with higher priority (or that are already ready
1255  * to run) might get to run first.  Just as with delays, clients must
1256  * not rely on any particular order of execution between tasks
1257  * scheduled concurrently.
1258  *
1259  * The task will be run with the DEFAULT priority.
1260  *
1261  * @param task main function of the task
1262  * @param task_cls closure of @a task
1263  * @return unique task identifier for the job
1264  *         only valid until @a task is started!
1265  */
1266 struct GNUNET_SCHEDULER_Task *
1267 GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task,
1268                           void *task_cls)
1269 {
1270   return GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_ZERO,
1271                                        task,
1272                                        task_cls);
1273 }
1274
1275
1276 /**
1277  * Schedule a new task to be run on shutdown, that is when a CTRL-C
1278  * signal is received, or when #GNUNET_SCHEDULER_shutdown() is being
1279  * invoked.
1280  *
1281  * @param task main function of the task
1282  * @param task_cls closure of @a task
1283  * @return unique task identifier for the job
1284  *         only valid until @a task is started!
1285  */
1286 struct GNUNET_SCHEDULER_Task *
1287 GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task,
1288                                void *task_cls)
1289 {
1290   struct GNUNET_SCHEDULER_Task *t;
1291
1292   GNUNET_assert (NULL != active_task);
1293   GNUNET_assert (NULL != task);
1294   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
1295   t->callback = task;
1296   t->callback_cls = task_cls;
1297   t->read_fd = -1;
1298   t->write_fd = -1;
1299 #if PROFILE_DELAYS
1300   t->start_time = GNUNET_TIME_absolute_get ();
1301 #endif
1302   t->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1303   t->priority = GNUNET_SCHEDULER_PRIORITY_SHUTDOWN;
1304   t->on_shutdown = GNUNET_YES;
1305   t->lifeness = GNUNET_YES;
1306   GNUNET_CONTAINER_DLL_insert (shutdown_head,
1307                                shutdown_tail,
1308                                t);
1309   LOG (GNUNET_ERROR_TYPE_DEBUG,
1310        "Adding task: %p\n",
1311        t);
1312   init_backtrace (t);
1313   return t;
1314 }
1315
1316
1317 /**
1318  * Schedule a new task to be run as soon as possible with the
1319  * (transitive) ignore-shutdown flag either explicitly set or
1320  * explicitly enabled.  This task (and all tasks created from it,
1321  * other than by another call to this function) will either count or
1322  * not count for the "lifeness" of the process.  This API is only
1323  * useful in a few special cases.
1324  *
1325  * @param lifeness #GNUNET_YES if the task counts for lifeness, #GNUNET_NO if not.
1326  * @param task main function of the task
1327  * @param task_cls closure of @a task
1328  * @return unique task identifier for the job
1329  *         only valid until @a task is started!
1330  */
1331 struct GNUNET_SCHEDULER_Task *
1332 GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness,
1333                                         GNUNET_SCHEDULER_TaskCallback task,
1334                                         void *task_cls)
1335 {
1336   struct GNUNET_SCHEDULER_Task *ret;
1337
1338   ret = GNUNET_SCHEDULER_add_now (task, task_cls);
1339   ret->lifeness = lifeness;
1340   return ret;
1341 }
1342
1343
1344 /**
1345  * Schedule a new task to be run with a specified delay or when any of
1346  * the specified file descriptor sets is ready.  The delay can be used
1347  * as a timeout on the socket(s) being ready.  The task will be
1348  * scheduled for execution once either the delay has expired or any of
1349  * the socket operations is ready.  This is the most general
1350  * function of the "add" family.  Note that the "prerequisite_task"
1351  * must be satisfied in addition to any of the other conditions.  In
1352  * other words, the task will be started when
1353  * <code>
1354  * (prerequisite-run)
1355  * && (delay-ready
1356  *     || any-rs-ready
1357  *     || any-ws-ready)
1358  * </code>
1359  *
1360  * @param delay how long should we wait?
1361  * @param priority priority to use
1362  * @param rfd file descriptor we want to read (can be -1)
1363  * @param wfd file descriptors we want to write (can be -1)
1364  * @param task main function of the task
1365  * @param task_cls closure of @a task
1366  * @return unique task identifier for the job
1367  *         only valid until @a task is started!
1368  */
1369 #ifndef MINGW
1370 static struct GNUNET_SCHEDULER_Task *
1371 add_without_sets (struct GNUNET_TIME_Relative delay,
1372                   enum GNUNET_SCHEDULER_Priority priority,
1373                   int rfd,
1374                   int wfd,
1375                   GNUNET_SCHEDULER_TaskCallback task,
1376                   void *task_cls)
1377 {
1378   struct GNUNET_SCHEDULER_Task *t;
1379
1380   GNUNET_assert (NULL != active_task);
1381   GNUNET_assert (NULL != task);
1382   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
1383   t->callback = task;
1384   t->callback_cls = task_cls;
1385 #if DEBUG_FDS
1386   if (-1 != rfd)
1387   {
1388     int flags = fcntl (rfd, F_GETFD);
1389
1390     if ((flags == -1) && (errno == EBADF))
1391     {
1392       LOG (GNUNET_ERROR_TYPE_ERROR,
1393            "Got invalid file descriptor %d!\n",
1394            rfd);
1395       init_backtrace (t);
1396       GNUNET_assert (0);
1397     }
1398   }
1399   if (-1 != wfd)
1400   {
1401     int flags = fcntl (wfd, F_GETFD);
1402
1403     if (flags == -1 && errno == EBADF)
1404     {
1405       LOG (GNUNET_ERROR_TYPE_ERROR,
1406            "Got invalid file descriptor %d!\n",
1407            wfd);
1408       init_backtrace (t);
1409       GNUNET_assert (0);
1410     }
1411   }
1412 #endif
1413   t->read_fd = rfd;
1414   GNUNET_assert (wfd >= -1);
1415   t->write_fd = wfd;
1416 #if PROFILE_DELAYS
1417   t->start_time = GNUNET_TIME_absolute_get ();
1418 #endif
1419   t->timeout = GNUNET_TIME_relative_to_absolute (delay);
1420   t->priority = check_priority ((priority == GNUNET_SCHEDULER_PRIORITY_KEEP) ? current_priority : priority);
1421   t->lifeness = current_lifeness;
1422   GNUNET_CONTAINER_DLL_insert (pending_head,
1423                                pending_tail,
1424                                t);
1425   max_priority_added = GNUNET_MAX (max_priority_added,
1426                                    t->priority);
1427   LOG (GNUNET_ERROR_TYPE_DEBUG,
1428        "Adding task %p\n",
1429        t);
1430   init_backtrace (t);
1431   return t;
1432 }
1433 #endif
1434
1435
1436 /**
1437  * Schedule a new task to be run with a specified delay or when the
1438  * specified file descriptor is ready for reading.  The delay can be
1439  * used as a timeout on the socket being ready.  The task will be
1440  * scheduled for execution once either the delay has expired or the
1441  * socket operation is ready.  It will be run with the DEFAULT priority.
1442  *
1443  * @param delay when should this operation time out?
1444  * @param rfd read file-descriptor
1445  * @param task main function of the task
1446  * @param task_cls closure of @a task
1447  * @return unique task identifier for the job
1448  *         only valid until @a task is started!
1449  */
1450 struct GNUNET_SCHEDULER_Task *
1451 GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay,
1452                                struct GNUNET_NETWORK_Handle *rfd,
1453                                GNUNET_SCHEDULER_TaskCallback task,
1454                                void *task_cls)
1455 {
1456   return GNUNET_SCHEDULER_add_read_net_with_priority (delay,
1457                                                       GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1458                                                       rfd, task, task_cls);
1459 }
1460
1461
1462 /**
1463  * Schedule a new task to be run with a specified priority and to be
1464  * run after the specified delay or when the specified file descriptor
1465  * is ready for reading.  The delay can be used as a timeout on the
1466  * socket being ready.  The task will be scheduled for execution once
1467  * either the delay has expired or the socket operation is ready.  It
1468  * will be run with the DEFAULT priority.
1469  *
1470  * @param delay when should this operation time out?
1471  * @param priority priority to use for the task
1472  * @param rfd read file-descriptor
1473  * @param task main function of the task
1474  * @param task_cls closure of @a task
1475  * @return unique task identifier for the job
1476  *         only valid until @a task is started!
1477  */
1478 struct GNUNET_SCHEDULER_Task *
1479 GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay,
1480                                              enum GNUNET_SCHEDULER_Priority priority,
1481                                              struct GNUNET_NETWORK_Handle *rfd,
1482                                              GNUNET_SCHEDULER_TaskCallback task,
1483                                              void *task_cls)
1484 {
1485   return GNUNET_SCHEDULER_add_net_with_priority (delay, priority,
1486                                                  rfd,
1487                                                  GNUNET_YES,
1488                                                  GNUNET_NO,
1489                                                  task, task_cls);
1490 }
1491
1492
1493 /**
1494  * Schedule a new task to be run with a specified delay or when the
1495  * specified file descriptor is ready for writing.  The delay can be
1496  * used as a timeout on the socket being ready.  The task will be
1497  * scheduled for execution once either the delay has expired or the
1498  * socket operation is ready.  It will be run with the priority of
1499  * the calling task.
1500  *
1501  * @param delay when should this operation time out?
1502  * @param wfd write file-descriptor
1503  * @param task main function of the task
1504  * @param task_cls closure of @a task
1505  * @return unique task identifier for the job
1506  *         only valid until @a task is started!
1507  */
1508 struct GNUNET_SCHEDULER_Task *
1509 GNUNET_SCHEDULER_add_write_net (struct GNUNET_TIME_Relative delay,
1510                                 struct GNUNET_NETWORK_Handle *wfd,
1511                                 GNUNET_SCHEDULER_TaskCallback task,
1512                                 void *task_cls)
1513 {
1514   return GNUNET_SCHEDULER_add_net_with_priority (delay,
1515                                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1516                                                  wfd,
1517                                                  GNUNET_NO, GNUNET_YES,
1518                                                  task, task_cls);
1519 }
1520
1521 /**
1522  * Schedule a new task to be run with a specified delay or when the
1523  * specified file descriptor is ready.  The delay can be
1524  * used as a timeout on the socket being ready.  The task will be
1525  * scheduled for execution once either the delay has expired or the
1526  * socket operation is ready.
1527  *
1528  * @param delay when should this operation time out?
1529  * @param priority priority of the task
1530  * @param fd file-descriptor
1531  * @param on_read whether to poll the file-descriptor for readability
1532  * @param on_write whether to poll the file-descriptor for writability
1533  * @param task main function of the task
1534  * @param task_cls closure of task
1535  * @return unique task identifier for the job
1536  *         only valid until "task" is started!
1537  */
1538 struct GNUNET_SCHEDULER_Task *
1539 GNUNET_SCHEDULER_add_net_with_priority  (struct GNUNET_TIME_Relative delay,
1540                                          enum GNUNET_SCHEDULER_Priority priority,
1541                                          struct GNUNET_NETWORK_Handle *fd,
1542                                          int on_read,
1543                                          int on_write,
1544                                          GNUNET_SCHEDULER_TaskCallback task,
1545                                          void *task_cls)
1546 {
1547 #if MINGW
1548   struct GNUNET_NETWORK_FDSet *s;
1549   struct GNUNET_SCHEDULER_Task * ret;
1550
1551   GNUNET_assert (NULL != fd);
1552   s = GNUNET_NETWORK_fdset_create ();
1553   GNUNET_NETWORK_fdset_set (s, fd);
1554   ret = GNUNET_SCHEDULER_add_select (
1555       priority, delay,
1556       on_read  ? s : NULL,
1557       on_write ? s : NULL,
1558       task, task_cls);
1559   GNUNET_NETWORK_fdset_destroy (s);
1560   return ret;
1561 #else
1562   GNUNET_assert (GNUNET_NETWORK_get_fd (fd) >= 0);
1563   return add_without_sets (delay, priority,
1564                            on_read  ? GNUNET_NETWORK_get_fd (fd) : -1,
1565                            on_write ? GNUNET_NETWORK_get_fd (fd) : -1,
1566                            task, task_cls);
1567 #endif
1568 }
1569
1570
1571 /**
1572  * Schedule a new task to be run with a specified delay or when the
1573  * specified file descriptor is ready for reading.  The delay can be
1574  * used as a timeout on the socket being ready.  The task will be
1575  * scheduled for execution once either the delay has expired or the
1576  * socket operation is ready. It will be run with the DEFAULT priority.
1577  *
1578  * @param delay when should this operation time out?
1579  * @param rfd read file-descriptor
1580  * @param task main function of the task
1581  * @param task_cls closure of @a task
1582  * @return unique task identifier for the job
1583  *         only valid until @a task is started!
1584  */
1585 struct GNUNET_SCHEDULER_Task *
1586 GNUNET_SCHEDULER_add_read_file (struct GNUNET_TIME_Relative delay,
1587                                 const struct GNUNET_DISK_FileHandle *rfd,
1588                                 GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
1589 {
1590   return GNUNET_SCHEDULER_add_file_with_priority (
1591       delay, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1592       rfd, GNUNET_YES, GNUNET_NO,
1593       task, task_cls);
1594 }
1595
1596
1597 /**
1598  * Schedule a new task to be run with a specified delay or when the
1599  * specified file descriptor is ready for writing.  The delay can be
1600  * used as a timeout on the socket being ready.  The task will be
1601  * scheduled for execution once either the delay has expired or the
1602  * socket operation is ready. It will be run with the DEFAULT priority.
1603  *
1604  * @param delay when should this operation time out?
1605  * @param wfd write file-descriptor
1606  * @param task main function of the task
1607  * @param task_cls closure of @a task
1608  * @return unique task identifier for the job
1609  *         only valid until @a task is started!
1610  */
1611 struct GNUNET_SCHEDULER_Task *
1612 GNUNET_SCHEDULER_add_write_file (struct GNUNET_TIME_Relative delay,
1613                                  const struct GNUNET_DISK_FileHandle *wfd,
1614                                  GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
1615 {
1616   return GNUNET_SCHEDULER_add_file_with_priority (
1617       delay, GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1618       wfd, GNUNET_NO, GNUNET_YES,
1619       task, task_cls);
1620 }
1621
1622
1623 /**
1624  * Schedule a new task to be run with a specified delay or when the
1625  * specified file descriptor is ready.  The delay can be
1626  * used as a timeout on the socket being ready.  The task will be
1627  * scheduled for execution once either the delay has expired or the
1628  * socket operation is ready.
1629  *
1630  * @param delay when should this operation time out?
1631  * @param priority priority of the task
1632  * @param fd file-descriptor
1633  * @param on_read whether to poll the file-descriptor for readability
1634  * @param on_write whether to poll the file-descriptor for writability
1635  * @param task main function of the task
1636  * @param task_cls closure of @a task
1637  * @return unique task identifier for the job
1638  *         only valid until @a task is started!
1639  */
1640 struct GNUNET_SCHEDULER_Task *
1641 GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay,
1642                                          enum GNUNET_SCHEDULER_Priority priority,
1643                                          const struct GNUNET_DISK_FileHandle *fd,
1644                                          int on_read, int on_write,
1645                                          GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
1646 {
1647 #if MINGW
1648   struct GNUNET_NETWORK_FDSet *s;
1649   struct GNUNET_SCHEDULER_Task * ret;
1650
1651   GNUNET_assert (NULL != fd);
1652   s = GNUNET_NETWORK_fdset_create ();
1653   GNUNET_NETWORK_fdset_handle_set (s, fd);
1654   ret = GNUNET_SCHEDULER_add_select (
1655       priority, delay,
1656       on_read  ? s : NULL,
1657       on_write ? s : NULL,
1658       task, task_cls);
1659   GNUNET_NETWORK_fdset_destroy (s);
1660   return ret;
1661 #else
1662   int real_fd;
1663
1664   GNUNET_DISK_internal_file_handle_ (fd, &real_fd, sizeof (int));
1665   GNUNET_assert (real_fd >= 0);
1666   return add_without_sets (
1667       delay, priority,
1668       on_read  ? real_fd : -1,
1669       on_write ? real_fd : -1,
1670       task, task_cls);
1671 #endif
1672 }
1673
1674
1675 /**
1676  * Schedule a new task to be run with a specified delay or when any of
1677  * the specified file descriptor sets is ready.  The delay can be used
1678  * as a timeout on the socket(s) being ready.  The task will be
1679  * scheduled for execution once either the delay has expired or any of
1680  * the socket operations is ready.  This is the most general
1681  * function of the "add" family.  Note that the "prerequisite_task"
1682  * must be satisfied in addition to any of the other conditions.  In
1683  * other words, the task will be started when
1684  * <code>
1685  * (prerequisite-run)
1686  * && (delay-ready
1687  *     || any-rs-ready
1688  *     || any-ws-ready) )
1689  * </code>
1690  *
1691  * @param prio how important is this task?
1692  * @param delay how long should we wait?
1693  * @param rs set of file descriptors we want to read (can be NULL)
1694  * @param ws set of file descriptors we want to write (can be NULL)
1695  * @param task main function of the task
1696  * @param task_cls closure of @a task
1697  * @return unique task identifier for the job
1698  *         only valid until @a task is started!
1699  */
1700 struct GNUNET_SCHEDULER_Task *
1701 GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio,
1702                              struct GNUNET_TIME_Relative delay,
1703                              const struct GNUNET_NETWORK_FDSet *rs,
1704                              const struct GNUNET_NETWORK_FDSet *ws,
1705                              GNUNET_SCHEDULER_TaskCallback task,
1706                              void *task_cls)
1707 {
1708   struct GNUNET_SCHEDULER_Task *t;
1709
1710   if ( (NULL == rs) &&
1711        (NULL == ws) )
1712     return GNUNET_SCHEDULER_add_delayed_with_priority (delay,
1713                                                        prio,
1714                                                        task,
1715                                                        task_cls);
1716   GNUNET_assert (NULL != active_task);
1717   GNUNET_assert (NULL != task);
1718   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
1719   t->callback = task;
1720   t->callback_cls = task_cls;
1721   t->read_fd = -1;
1722   t->write_fd = -1;
1723   if (NULL != rs)
1724   {
1725     t->read_set = GNUNET_NETWORK_fdset_create ();
1726     GNUNET_NETWORK_fdset_copy (t->read_set, rs);
1727   }
1728   if (NULL != ws)
1729   {
1730     t->write_set = GNUNET_NETWORK_fdset_create ();
1731     GNUNET_NETWORK_fdset_copy (t->write_set, ws);
1732   }
1733 #if PROFILE_DELAYS
1734   t->start_time = GNUNET_TIME_absolute_get ();
1735 #endif
1736   t->timeout = GNUNET_TIME_relative_to_absolute (delay);
1737   t->priority =
1738       check_priority ((prio ==
1739                        GNUNET_SCHEDULER_PRIORITY_KEEP) ? current_priority :
1740                       prio);
1741   t->lifeness = current_lifeness;
1742   GNUNET_CONTAINER_DLL_insert (pending_head,
1743                                pending_tail,
1744                                t);
1745   max_priority_added = GNUNET_MAX (max_priority_added, t->priority);
1746   LOG (GNUNET_ERROR_TYPE_DEBUG,
1747        "Adding task %p\n",
1748        t);
1749   init_backtrace (t);
1750   return t;
1751 }
1752
1753 /* end of scheduler.c */