From 6a1cd49f753ee946599266d0265afdd9ba20e68e Mon Sep 17 00:00:00 2001 From: lurchi Date: Tue, 8 Aug 2017 18:35:53 +0200 Subject: [PATCH] separate the select driver's fd sets from the driver-internal fdsets --- src/include/gnunet_scheduler_lib.h | 2 +- src/util/scheduler.c | 849 ++++++++++++++--------------- 2 files changed, 407 insertions(+), 444 deletions(-) diff --git a/src/include/gnunet_scheduler_lib.h b/src/include/gnunet_scheduler_lib.h index 837a23ba5..68a5ac534 100644 --- a/src/include/gnunet_scheduler_lib.h +++ b/src/include/gnunet_scheduler_lib.h @@ -359,7 +359,7 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select (void); diff --git a/src/util/scheduler.c b/src/util/scheduler.c index 42309c199..7cd42dcea 100644 --- a/src/util/scheduler.c +++ b/src/util/scheduler.c @@ -89,12 +89,6 @@ struct GNUNET_SCHEDULER_Handle * @deprecated */ struct GNUNET_NETWORK_FDSet *ws; - - /** - * Driver we used for the event loop. - */ - const struct GNUNET_SCHEDULER_Driver *driver; - }; @@ -123,11 +117,6 @@ struct GNUNET_SCHEDULER_Task */ void *callback_cls; - /** - * Handle to the scheduler's state. - */ - const struct GNUNET_SCHEDULER_Handle *sh; - /** * Set of file descriptors this task is waiting * for for reading. Once ready, this is updated @@ -224,9 +213,38 @@ struct GNUNET_SCHEDULER_Task int num_backtrace_strings; #endif +}; + + +struct Scheduled +{ + struct Scheduled *prev; + + struct Scheduled *next; + + struct GNUNET_SCHEDULER_Task *task; + struct GNUNET_SCHEDULER_FdInfo *fdi; }; + +/** + * Driver context used by GNUNET_SCHEDULER_run + */ +struct DriverContext +{ + struct Scheduled *scheduled_in_head; + + struct Scheduled *scheduled_in_tail; + + struct Scheduled *scheduled_out_head; + + struct Scheduled *scheduled_out_tail; + + struct GNUNET_TIME_Relative timeout; +}; + + /** * The driver used for the event loop. Will be handed over to * the scheduler in #GNUNET_SCHEDULER_run_from_driver(), peristed @@ -338,6 +356,11 @@ static struct GNUNET_SCHEDULER_TaskContext tc; */ static void *scheduler_select_cls; +/** + * Scheduler handle used for the driver functions + */ +static struct GNUNET_SCHEDULER_Handle sh; + /** * Sets the select function to use in the scheduler (scheduler_select). @@ -372,127 +395,44 @@ check_priority (enum GNUNET_SCHEDULER_Priority p) /** - * Update all sets and timeout for select. - * - * @param rs read-set, set to all FDs we would like to read (updated) - * @param ws write-set, set to all FDs we would like to write (updated) - * @param timeout next timeout (updated) + * chooses the nearest timeout from all pending tasks, to be used + * to tell the driver the next wakeup time (using its set_wakeup + * callback) */ -void getNextPendingTimeout(struct GNUNET_TIME_Relative *timeout) +struct GNUNET_TIME_Absolute +get_timeout () { - struct GNUNET_SCHEDULER_Task *pos; struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative to; + struct GNUNET_TIME_Absolute timeout; - now = GNUNET_TIME_absolute_get (); pos = pending_timeout_head; + now = GNUNET_TIME_absolute_get (); + timeout = GNUNET_TIME_UNIT_FOREVER_ABS; if (NULL != pos) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; + { + timeout = now; + } + else + { + timeout = pos->timeout; + } } -} - -static void -update_sets (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws, - struct GNUNET_TIME_Relative *timeout) -{ - struct GNUNET_SCHEDULER_Task *pos; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative to; - - now = GNUNET_TIME_absolute_get (); - - getNextPendingTimeout(timeout); for (pos = pending_head; NULL != pos; pos = pos->next) { - if (pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) + if (0 != pos->reason) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; + timeout = now; + } + else if ((pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) && + (timeout.abs_value_us > pos->timeout.abs_value_us)) + { + timeout = pos->timeout; } - if (-1 != pos->read_fd) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - if (-1 != pos->write_fd) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if (NULL != pos->read_set) - GNUNET_NETWORK_fdset_add (rs, pos->read_set); - if (NULL != pos->write_set) - GNUNET_NETWORK_fdset_add (ws, pos->write_set); - if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; - } -} - - -/** - * Check if the ready set overlaps with the set we want to have ready. - * If so, update the want set (set all FDs that are ready). If not, - * return #GNUNET_NO. - * - * @param ready set that is ready - * @param want set that we want to be ready - * @return #GNUNET_YES if there was some overlap - */ -static int -set_overlaps (const struct GNUNET_NETWORK_FDSet *ready, - struct GNUNET_NETWORK_FDSet *want) -{ - if ((NULL == want) || (NULL == ready)) - return GNUNET_NO; - if (GNUNET_NETWORK_fdset_overlap (ready, want)) - { - /* copy all over (yes, there maybe unrelated bits, - * but this should not hurt well-written clients) */ - GNUNET_NETWORK_fdset_copy (want, ready); - return GNUNET_YES; } - return GNUNET_NO; -} - - -/** - * Check if the given task is eligible to run now. - * Also set the reason why it is eligible. - * - * @param task task to check if it is ready - * @param now the current time - * @param rs set of FDs ready for reading - * @param ws set of FDs ready for writing - * @return #GNUNET_YES if we can run it, #GNUNET_NO if not. - */ -static int -is_ready (struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_TIME_Absolute now, - const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - enum GNUNET_SCHEDULER_Reason reason; - - reason = task->reason; - if (now.abs_value_us >= task->timeout.abs_value_us) - reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - (((task->read_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd))) || - (set_overlaps (rs, task->read_set)))) - reason |= GNUNET_SCHEDULER_REASON_READ_READY; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (((task->write_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd))) - || (set_overlaps (ws, task->write_set)))) - reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; - if (0 == reason) - return GNUNET_NO; /* not ready */ - reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; - task->reason = reason; - return GNUNET_YES; + return timeout; } @@ -514,53 +454,6 @@ queue_ready_task (struct GNUNET_SCHEDULER_Task *task) } -/** - * Check which tasks are ready and move them - * to the respective ready queue. - * - * @param rs FDs ready for reading - * @param ws FDs ready for writing - */ -static void -check_ready (const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - struct GNUNET_SCHEDULER_Task *pos; - struct GNUNET_SCHEDULER_Task *next; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - while (NULL != (pos = pending_timeout_head)) - { - if (now.abs_value_us >= pos->timeout.abs_value_us) - pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if (0 == pos->reason) - break; - scheduler_driver->set_wakeup (scheduler_driver->cls, - pending_timeout_head->timeout); - GNUNET_CONTAINER_DLL_remove (pending_timeout_head, - pending_timeout_tail, - pos); - if (pending_timeout_last == pos) - pending_timeout_last = NULL; - queue_ready_task (pos); - } - pos = pending_head; - while (NULL != pos) - { - next = pos->next; - if (GNUNET_YES == is_ready (pos, now, rs, ws)) - { - GNUNET_CONTAINER_DLL_remove (pending_head, - pending_tail, - pos); - queue_ready_task (pos); - } - pos = next; - } -} - - /** * Request the shutdown of a scheduler. Marks all tasks * awaiting shutdown as ready. Note that tasks @@ -614,7 +507,7 @@ dump_backtrace (struct GNUNET_SCHEDULER_Task *t) unsigned int i; for (i = 0; i < t->num_backtrace_strings; i++) - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Task %p trace %u: %s\n", t, i, @@ -623,82 +516,6 @@ dump_backtrace (struct GNUNET_SCHEDULER_Task *t) } -/** - * Run at least one task in the highest-priority queue that is not - * empty. Keep running tasks until we are either no longer running - * "URGENT" tasks or until we have at least one "pending" task (which - * may become ready, hence we should select on it). Naturally, if - * there are no more ready tasks, we also return. - * - * @param rs FDs ready for reading - * @param ws FDs ready for writing - */ -static void -run_ready (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws) -{ - enum GNUNET_SCHEDULER_Priority p; - struct GNUNET_SCHEDULER_Task *pos; - - max_priority_added = GNUNET_SCHEDULER_PRIORITY_KEEP; - do - { - if (0 == ready_count) - return; - GNUNET_assert (NULL == ready_head[GNUNET_SCHEDULER_PRIORITY_KEEP]); - /* yes, p>0 is correct, 0 is "KEEP" which should - * always be an empty queue (see assertion)! */ - for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--) - { - pos = ready_head[p]; - if (NULL != pos) - break; - } - GNUNET_assert (NULL != pos); /* ready_count wrong? */ - GNUNET_CONTAINER_DLL_remove (ready_head[p], - ready_tail[p], - pos); - ready_count--; - current_priority = pos->priority; - current_lifeness = pos->lifeness; - active_task = pos; -#if PROFILE_DELAYS - if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > - DELAY_THRESHOLD.rel_value_us) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p took %s to be scheduled\n", - pos, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), - GNUNET_YES)); - } -#endif - tc.reason = pos->reason; - tc.read_ready = (NULL == pos->read_set) ? rs : pos->read_set; - if ((-1 != pos->read_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY))) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - tc.write_ready = (NULL == pos->write_set) ? ws : pos->write_set; - if ((-1 != pos->write_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if ((0 != (tc.reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (-1 != pos->write_fd) && - (!GNUNET_NETWORK_fdset_test_native (ws, pos->write_fd))) - GNUNET_assert (0); // added to ready in previous select loop! - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", - pos); - pos->callback (pos->callback_cls); - dump_backtrace (pos); - active_task = NULL; - destroy_task (pos); - tasks_run++; - } - while ((NULL == pending_head) || (p >= max_priority_added)); -} - - /** * Pipe used to communicate shutdown via signal. */ @@ -766,152 +583,31 @@ sighandler_shutdown () * @return #GNUNET_OK to continue the main loop, * #GNUNET_NO to exit */ -static int -check_lifeness () -{ - struct GNUNET_SCHEDULER_Task *t; - - if (ready_count > 0) - return GNUNET_OK; - for (t = pending_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; - for (t = shutdown_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; - for (t = pending_timeout_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; - if (NULL != shutdown_head) - { - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - return GNUNET_NO; -} - +//static int +//check_lifeness () +//{ +// struct GNUNET_SCHEDULER_Task *t; +// +// if (ready_count > 0) +// return GNUNET_OK; +// for (t = pending_head; NULL != t; t = t->next) +// if (t->lifeness == GNUNET_YES) +// return GNUNET_OK; +// for (t = shutdown_head; NULL != t; t = t->next) +// if (t->lifeness == GNUNET_YES) +// return GNUNET_OK; +// for (t = pending_timeout_head; NULL != t; t = t->next) +// if (t->lifeness == GNUNET_YES) +// return GNUNET_OK; +// if (NULL != shutdown_head) +// { +// GNUNET_SCHEDULER_shutdown (); +// return GNUNET_OK; +// } +// return GNUNET_NO; +//} -int while_live(struct GNUNET_NETWORK_FDSet *rs, struct GNUNET_NETWORK_FDSet *ws) -{ - int ret; - unsigned int busy_wait_warning; - unsigned long long last_tr; - const struct GNUNET_DISK_FileHandle *pr; - char c; - struct GNUNET_TIME_Relative timeout; - - busy_wait_warning = 0; - pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, - GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); - last_tr = 0; - - while (GNUNET_OK == check_lifeness ()) - { - GNUNET_NETWORK_fdset_zero (rs); - GNUNET_NETWORK_fdset_zero (ws); - timeout = GNUNET_TIME_UNIT_FOREVER_REL; - update_sets (rs, ws, &timeout); - GNUNET_NETWORK_fdset_handle_set (rs, pr); - if (ready_count > 0) - { - /* no blocking, more work already ready! */ - timeout = GNUNET_TIME_UNIT_ZERO; - } - if (NULL == scheduler_select) - ret = GNUNET_NETWORK_socket_select (rs, - ws, - NULL, - timeout); - else - ret = scheduler_select (scheduler_select_cls, - rs, - ws, - NULL, - timeout); - if (ret == GNUNET_SYSERR) - { - if (errno == EINTR) - continue; - - LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); -#ifndef MINGW -#if USE_LSOF - char lsof[512]; - - snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); - (void) close (1); - (void) dup2 (2, 1); - if (0 != system (lsof)) - LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, - "system"); -#endif -#endif -#if DEBUG_FDS - struct GNUNET_SCHEDULER_Task *t; - - for (t = pending_head; NULL != t; t = t->next) - { - if (-1 != t->read_fd) - { - int flags = fcntl (t->read_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->read_fd); - dump_backtrace (t); - } - } - if (-1 != t->write_fd) - { - int flags = fcntl (t->write_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->write_fd); - dump_backtrace (t); - } - } - } -#endif - GNUNET_assert (0); - break; - } - - if ( (0 == ret) && - (0 == timeout.rel_value_us) && - (busy_wait_warning > 16) ) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Looks like we're busy waiting...\n"); - short_wait (100); /* mitigate */ - } - check_ready (rs, ws); - run_ready (rs, ws); - if (GNUNET_NETWORK_fdset_handle_isset (rs, pr)) - { - /* consume the signal */ - GNUNET_DISK_file_read (pr, &c, sizeof (c)); - /* mark all active tasks as ready due to shutdown */ - GNUNET_SCHEDULER_shutdown (); - } - if (last_tr == tasks_run) - { - short_wait (1); - busy_wait_warning++; - } - else - { - last_tr = tasks_run; - busy_wait_warning = 0; - } - } - return ret; -} - /** * Initialize and run scheduler. This function will return when all * tasks have completed. On systems with signals, receiving a SIGTERM @@ -930,9 +626,19 @@ void GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { - - GNUNET_SCHEDULER_run_with_driver(GNUNET_SCHEDULER_driver_select (), task, task_cls); - + struct GNUNET_SCHEDULER_Driver *driver; + struct DriverContext context = {.scheduled_in_head = NULL, + .scheduled_in_tail = NULL, + .scheduled_out_head = NULL, + .scheduled_out_tail = NULL, + .timeout = GNUNET_TIME_UNIT_FOREVER_REL}; + + driver = GNUNET_SCHEDULER_driver_select (); + driver->cls = &context; + + GNUNET_SCHEDULER_run_with_driver (driver, task, task_cls); + + GNUNET_free (driver); } @@ -1010,39 +716,63 @@ initFdInfo(struct GNUNET_SCHEDULER_Task *t, { struct GNUNET_SCHEDULER_FdInfo read_fdi = { .fd = read_nh, .et = GNUNET_SCHEDULER_ET_IN, .sock = GNUNET_NETWORK_get_fd (read_nh)}; t->fdx = read_fdi; + t->fds = &t->fdx; } else if (NULL != write_nh) { struct GNUNET_SCHEDULER_FdInfo write_fdi = { .fd = write_nh, .et = GNUNET_SCHEDULER_ET_OUT, .sock = GNUNET_NETWORK_get_fd (write_nh)}; t->fdx = write_fdi; + t->fds = &t->fdx; } else if (NULL != read_fh) { struct GNUNET_SCHEDULER_FdInfo read_fdi = { .fh = read_fh, .et = GNUNET_SCHEDULER_ET_IN, .sock = read_fh->fd}; t->fdx = read_fdi; + t->fds = &t->fdx; } else if (NULL != write_fh) { struct GNUNET_SCHEDULER_FdInfo write_fdi = { .fh = write_fh, .et = GNUNET_SCHEDULER_ET_OUT, .sock = write_fh->fd}; t->fdx = write_fdi; + t->fds = &t->fdx; } } int scheduler_multi_function_call(struct GNUNET_SCHEDULER_Task *t, int (*driver_func)()) { - if (t->fds_len > 1){ + if (t->fds_len > 1) + { int success = GNUNET_YES; - for (int i = 0; i < t->fds_len;i++){ - success = driver_func(scheduler_driver->cls, t , t->fds+i) && success; + for (int i = 0; i < t->fds_len;i++) + { + success = driver_func (scheduler_driver->cls, t , t->fds+i) && success; } return success; - }else{ - return driver_func(scheduler_driver->cls, t , t->fds); + } + else + { + return driver_func (scheduler_driver->cls, t , t->fds); } } +void +shutdown_task (void *cls) +{ + char c; + const struct GNUNET_DISK_FileHandle *pr; + + pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, + GNUNET_DISK_PIPE_END_READ); + GNUNET_assert (! GNUNET_DISK_handle_invalid (pr)); + /* consume the signal */ + GNUNET_DISK_file_read (pr, &c, sizeof (c)); + /* mark all active tasks as ready due to shutdown */ + GNUNET_SCHEDULER_shutdown (); +} + + /** * Cancel the task with the specified identifier. * The task must not yet have run. @@ -1078,7 +808,7 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) pending_timeout_last = NULL; else scheduler_driver->set_wakeup (scheduler_driver->cls, - pending_timeout_head->timeout); + get_timeout ()); } if (task == pending_timeout_last) pending_timeout_last = NULL; @@ -1099,10 +829,10 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) task); ready_count--; } - ret = task->callback_cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Canceling task %p\n", task); + ret = task->callback_cls; destroy_task (task); return ret; } @@ -1161,7 +891,7 @@ GNUNET_SCHEDULER_add_with_reason_and_priority (GNUNET_SCHEDULER_TaskCallback tas t->reason = reason; t->priority = priority; t->lifeness = current_lifeness; - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Adding continuation task %p\n", t); init_backtrace (t); @@ -1211,7 +941,8 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, GNUNET_CONTAINER_DLL_insert (pending_timeout_head, pending_timeout_tail, t); - scheduler_driver->set_wakeup(scheduler_driver->cls,pending_timeout_head->timeout); + scheduler_driver->set_wakeup (scheduler_driver->cls, + at); } else { @@ -1236,11 +967,13 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, pending_timeout_tail, prev, t); + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout()); } /* finally, update heuristic insertion point to last insertion... */ pending_timeout_last = t; - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Adding task: %p\n", t); init_backtrace (t); @@ -1356,11 +1089,11 @@ GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { return GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_ZERO, - task, - task_cls); + task, + task_cls); } @@ -1376,7 +1109,7 @@ GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { struct GNUNET_SCHEDULER_Task *t; @@ -1395,10 +1128,10 @@ GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, t->on_shutdown = GNUNET_YES; t->lifeness = GNUNET_YES; GNUNET_CONTAINER_DLL_insert (shutdown_head, - shutdown_tail, - t); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task: %p\n", + shutdown_tail, + t); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Adding shutdown task: %p\n", t); init_backtrace (t); return t; @@ -1465,8 +1198,6 @@ add_without_sets (struct GNUNET_TIME_Relative delay, const struct GNUNET_NETWORK_Handle *write_nh, const struct GNUNET_DISK_FileHandle *read_fh, const struct GNUNET_DISK_FileHandle *write_fh, - //int rfd, - //int wfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { @@ -1517,6 +1248,8 @@ add_without_sets (struct GNUNET_TIME_Relative delay, pending_tail, t); scheduler_multi_function_call(t, scheduler_driver->add); + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); max_priority_added = GNUNET_MAX (max_priority_added, t->priority); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1758,7 +1491,7 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, return ret; #else GNUNET_assert (on_read || on_write); - GNUNET_assert(fd->fd >= 0); + GNUNET_assert (fd->fd >= 0); return add_without_sets (delay, priority, NULL, NULL, @@ -1840,9 +1573,11 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, pending_tail, t); scheduler_multi_function_call(t, scheduler_driver->add); + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); max_priority_added = GNUNET_MAX (max_priority_added, t->priority); - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Adding task %p\n", t); init_backtrace (t); @@ -1862,11 +1597,15 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, */ void GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, - enum GNUNET_SCHEDULER_EventType et) + enum GNUNET_SCHEDULER_EventType et) { enum GNUNET_SCHEDULER_Reason reason; struct GNUNET_TIME_Absolute now; + LOG (GNUNET_ERROR_TYPE_WARNING, + "task ready: %p\n", + task); + now = GNUNET_TIME_absolute_get (); reason = task->reason; if (now.abs_value_us >= task->timeout.abs_value_us) @@ -1882,6 +1621,9 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, task->fds = &task->fdx; task->fdx.et = et; task->fds_len = 1; + GNUNET_CONTAINER_DLL_remove (pending_head, + pending_tail, + task); queue_ready_task (task); } @@ -1905,6 +1647,7 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, int GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) { + // FIXME: we have to check lifeness here! enum GNUNET_SCHEDULER_Priority p; struct GNUNET_SCHEDULER_Task *pos; struct GNUNET_TIME_Absolute now; @@ -1920,7 +1663,6 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) GNUNET_CONTAINER_DLL_remove (pending_timeout_head, pending_timeout_tail, pos); - scheduler_driver->set_wakeup(scheduler_driver->cls,pending_timeout_head->timeout); if (pending_timeout_last == pos) pending_timeout_last = NULL; queue_ready_task (pos); @@ -1957,7 +1699,7 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > DELAY_THRESHOLD.rel_value_us) { - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Task %p took %s to be scheduled\n", pos, GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), @@ -1979,8 +1721,8 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) GNUNET_NETWORK_fdset_set_native (sh->ws, pos->write_fd); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", + LOG (GNUNET_ERROR_TYPE_WARNING, + "Running task from driver: %p\n", pos); pos->callback (pos->callback_cls); active_task = NULL; @@ -1988,6 +1730,8 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) destroy_task (pos); tasks_run++; } + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); if (0 == ready_count) return GNUNET_NO; return GNUNET_OK; @@ -2012,11 +1756,10 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) */ int GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, - GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + GNUNET_SCHEDULER_TaskCallback task, + void *task_cls) { int ret; - struct GNUNET_SCHEDULER_Handle sh; struct GNUNET_SIGNAL_Context *shc_int; struct GNUNET_SIGNAL_Context *shc_term; #if (SIGTERM != GNUNET_TERM_SIG) @@ -2029,7 +1772,6 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, #endif struct GNUNET_SCHEDULER_Task tsk; const struct GNUNET_DISK_FileHandle *pr; - scheduler_driver = driver; /* general set-up */ GNUNET_assert (NULL == active_task); @@ -2041,11 +1783,11 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, GNUNET_assert (NULL != shutdown_pipe_handle); pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); my_pid = getpid (); + scheduler_driver = driver; /* install signal handlers */ - LOG (GNUNET_ERROR_TYPE_DEBUG, + LOG (GNUNET_ERROR_TYPE_WARNING, "Registering signal handlers\n"); shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown); @@ -2071,7 +1813,6 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, 0, sizeof (tsk)); active_task = &tsk; - tsk.sh = &sh; GNUNET_SCHEDULER_add_with_reason_and_priority (task, task_cls, GNUNET_SCHEDULER_REASON_STARTUP, @@ -2079,17 +1820,19 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO, &GNUNET_OS_install_parent_control_handler, NULL); + GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, + pr, + &shutdown_task, + NULL); active_task = NULL; - driver->set_wakeup (driver->cls, - GNUNET_TIME_absolute_get ()); - /* begin main event loop */ sh.rs = GNUNET_NETWORK_fdset_create (); sh.ws = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_handle_set (sh.rs, pr); - sh.driver = driver; + //GNUNET_NETWORK_fdset_handle_set (sh.rs, pr); ret = driver->loop (driver->cls, &sh); + LOG (GNUNET_ERROR_TYPE_WARNING, + "loop finished!"); GNUNET_NETWORK_fdset_destroy (sh.rs); GNUNET_NETWORK_fdset_destroy (sh.ws); @@ -2109,29 +1852,249 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, return ret; } + int -select_add(void *cls, - struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_SCHEDULER_FdInfo *fdi) +select_add (void *cls, + struct GNUNET_SCHEDULER_Task *task, + struct GNUNET_SCHEDULER_FdInfo *fdi) { + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + + if (!((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (0 >= fdi->sock)) + { + /* exactly one out of {fd, hf} must be != NULL and the OS handle must be valid */ + return GNUNET_SYSERR; + } + + struct Scheduled *scheduled = GNUNET_new (struct Scheduled); + scheduled->task = task; + scheduled->fdi = fdi; + + switch (fdi->et) + { + case GNUNET_SCHEDULER_ET_IN: + { + GNUNET_CONTAINER_DLL_insert (context->scheduled_in_head, + context->scheduled_in_tail, + scheduled); + break; + } + case GNUNET_SCHEDULER_ET_OUT: + { + GNUNET_CONTAINER_DLL_insert (context->scheduled_out_head, + context->scheduled_out_tail, + scheduled); + break; + } + default: + { + // FIXME: other event types not implemented yet + GNUNET_assert (0); + } + } return GNUNET_OK; } int -select_del(void *cls, - struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_SCHEDULER_FdInfo *fdi) +select_del (void *cls, + struct GNUNET_SCHEDULER_Task *task, + struct GNUNET_SCHEDULER_FdInfo *fdi) { - return GNUNET_OK; + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + + int ret = GNUNET_SYSERR; + struct Scheduled *pos; + switch (fdi->et) + { + case GNUNET_SCHEDULER_ET_IN: + { + for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next) + { + if (pos->task == task) + { + GNUNET_CONTAINER_DLL_remove (context->scheduled_in_head, + context->scheduled_in_tail, + pos); + ret = GNUNET_OK; + } + } + break; + } + case GNUNET_SCHEDULER_ET_OUT: + { + for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next) + { + if (pos->task == task) + { + GNUNET_CONTAINER_DLL_remove (context->scheduled_out_head, + context->scheduled_out_tail, + pos); + ret = GNUNET_OK; + } + } + break; + } + default: + { + // FIXME: other event types not implemented yet + GNUNET_assert (0); + } + } + return ret; } +//int +//select_loop_condition (const struct DriverContext *context) +//{ +// struct GNUNET_TIME_absolute_ +//} + + int -select_loop(void *cls, - struct GNUNET_SCHEDULER_Handle *sh) +select_loop (void *cls, + struct GNUNET_SCHEDULER_Handle *sh) { - return while_live(sh->rs, sh->ws); + struct GNUNET_NETWORK_FDSet *rs; + struct GNUNET_NETWORK_FDSet *ws; + struct DriverContext *context; + int select_result; + unsigned long long last_tr; + unsigned int busy_wait_warning; + + context = cls; + GNUNET_assert (NULL != context); + rs = GNUNET_NETWORK_fdset_create (); + ws = GNUNET_NETWORK_fdset_create (); + last_tr = 0; + busy_wait_warning = 0; + while ((NULL != context->scheduled_in_head) || + (NULL != context->scheduled_out_head)) + { + GNUNET_NETWORK_fdset_zero (rs); + GNUNET_NETWORK_fdset_zero (ws); + struct Scheduled *pos; + for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next) + { + GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock); + } + for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next) + { + GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock); + } + if (ready_count > 0) + { + /* no blocking, more work already ready! */ + context->timeout = GNUNET_TIME_UNIT_ZERO; + } + if (NULL == scheduler_select) + { + select_result = GNUNET_NETWORK_socket_select (rs, + ws, + NULL, + context->timeout); + } + else + { + select_result = scheduler_select (scheduler_select_cls, + rs, + ws, + NULL, + context->timeout); + } + if (select_result == GNUNET_SYSERR) + { + if (errno == EINTR) + continue; + + LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); +#ifndef MINGW +#if USE_LSOF + char lsof[512]; + + snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); + (void) close (1); + (void) dup2 (2, 1); + if (0 != system (lsof)) + LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, + "system"); +#endif +#endif +#if DEBUG_FDS + struct GNUNET_SCHEDULER_Task *t; + for (t = pending_head; NULL != t; t = t->next) + { + if (-1 != t->read_fd) + { + int flags = fcntl (t->read_fd, F_GETFD); + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + t->read_fd); + dump_backtrace (t); + } + } + if (-1 != t->write_fd) + { + int flags = fcntl (t->write_fd, F_GETFD); + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + t->write_fd); + dump_backtrace (t); + } + } + } +#endif + GNUNET_assert (0); + return GNUNET_SYSERR; + } + if ( (0 == select_result) && + (0 == context->timeout.rel_value_us) && + (busy_wait_warning > 16) ) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Looks like we're busy waiting...\n"); + short_wait (100); /* mitigate */ + } + for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next) + { + if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, pos->fdi->sock)) + { + GNUNET_SCHEDULER_task_ready (pos->task, GNUNET_SCHEDULER_ET_IN); + GNUNET_CONTAINER_DLL_remove (context->scheduled_in_head, + context->scheduled_in_tail, + pos); + } + } + for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next) + { + if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, pos->fdi->sock)) + { + GNUNET_SCHEDULER_task_ready (pos->task, GNUNET_SCHEDULER_ET_OUT); + GNUNET_CONTAINER_DLL_remove (context->scheduled_out_head, + context->scheduled_out_tail, + pos); + } + } + GNUNET_SCHEDULER_run_from_driver (sh); + if (last_tr == tasks_run) + { + short_wait (1); + busy_wait_warning++; + } + else + { + last_tr = tasks_run; + busy_wait_warning = 0; + } + } + return GNUNET_OK; } @@ -2139,7 +2102,10 @@ void select_set_wakeup(void *cls, struct GNUNET_TIME_Absolute dt) { - + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + + context->timeout = GNUNET_TIME_absolute_get_remaining (dt); } @@ -2148,12 +2114,10 @@ select_set_wakeup(void *cls, * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select () { - struct GNUNET_SCHEDULER_Driver *select_driver; - select_driver = GNUNET_new (struct GNUNET_SCHEDULER_Driver); select_driver->loop = &select_loop; @@ -2161,7 +2125,6 @@ GNUNET_SCHEDULER_driver_select () select_driver->del = &select_del; select_driver->set_wakeup = &select_set_wakeup; - return select_driver; } -- 2.25.1