tasks in the pending queue must be be checked for reached timeouts, too; allow multip...
authorlurchi <lurchi@strangeplace.net>
Wed, 23 Aug 2017 13:44:52 +0000 (15:44 +0200)
committerlurchi <lurchi@strangeplace.net>
Wed, 23 Aug 2017 13:44:52 +0000 (15:44 +0200)
src/util/scheduler.c

index 71a3bd9563ba3865b0f15b2b729126fff7ae8c6e..084ca43df7132fc1fc7242637ec3bac7af525800 100644 (file)
@@ -120,7 +120,7 @@ struct GNUNET_SCHEDULER_Task
   /**
    * Information about which FDs are ready for this task (and why).
    */
-  const struct GNUNET_SCHEDULER_FdInfo *fds;
+  struct GNUNET_SCHEDULER_FdInfo *fds;
 
   /**
    * Storage location used for @e fds if we want to avoid
@@ -229,6 +229,10 @@ struct Scheduled
   struct GNUNET_SCHEDULER_Task *task;
 
   struct GNUNET_SCHEDULER_FdInfo *fdi;
+
+  enum GNUNET_SCHEDULER_EventType et;
+
+  int is_ready;
 };
 
 
@@ -237,13 +241,9 @@ struct Scheduled
  */
 struct DriverContext
 {
-  struct Scheduled *scheduled_in_head;
-
-  struct Scheduled *scheduled_in_tail;
+  struct Scheduled *scheduled_head;
 
-  struct Scheduled *scheduled_out_head;
-
-  struct Scheduled *scheduled_out_tail;
+  struct Scheduled *scheduled_tail;
 
   struct GNUNET_TIME_Relative timeout;
 };
@@ -480,6 +480,27 @@ GNUNET_SCHEDULER_shutdown ()
 }
 
 
+/**
+ * Output stack trace of task @a t.
+ *
+ * @param t task to dump stack trace of
+ */
+static void
+dump_backtrace (struct GNUNET_SCHEDULER_Task *t)
+{
+#if EXECINFO
+  unsigned int i;
+
+  for (i = 0; i < t->num_backtrace_strings; i++)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+   "Task %p trace %u: %s\n",
+   t,
+   i,
+   t->backtrace_strings[i]);
+#endif
+}
+
+
 /**
  * Destroy a task (release associated resources)
  *
@@ -523,27 +544,6 @@ destroy_task (struct GNUNET_SCHEDULER_Task *t)
 }
 
 
-/**
- * Output stack trace of task @a t.
- *
- * @param t task to dump stack trace of
- */
-static void
-dump_backtrace (struct GNUNET_SCHEDULER_Task *t)
-{
-#if EXECINFO
-  unsigned int i;
-
-  for (i = 0; i < t->num_backtrace_strings; i++)
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-   "Task %p trace %u: %s\n",
-   t,
-   i,
-   t->backtrace_strings[i]);
-#endif
-}
-
-
 /**
  * Pipe used to communicate shutdown via signal.
  */
@@ -645,6 +645,34 @@ GNUNET_SCHEDULER_check_lifeness ()
 }
 
 
+void
+shutdown_if_no_lifeness ()
+{
+  struct GNUNET_SCHEDULER_Task *t;
+
+  if (ready_count > 0)
+    return;
+  for (t = pending_head; NULL != t; t = t->next)
+    if (GNUNET_YES == t->lifeness)
+      return;
+  for (t = shutdown_head; NULL != t; t = t->next)
+    if (GNUNET_YES == t->lifeness)
+      return;
+  for (t = pending_timeout_head; NULL != t; t = t->next)
+    if (GNUNET_YES == t->lifeness)
+      return;
+  /* No lifeness! Cancel all pending tasks the driver knows about and shutdown */
+  t = pending_head;
+  while (NULL != t)
+  {
+    struct GNUNET_SCHEDULER_Task *next = t->next;
+    GNUNET_SCHEDULER_cancel (t);
+    t = next;
+  }
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
 /**
  * Initialize and run scheduler.  This function will return when all
  * tasks have completed.  On systems with signals, receiving a SIGTERM
@@ -664,10 +692,8 @@ GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task,
                       void *task_cls)
 {
   struct GNUNET_SCHEDULER_Driver *driver;
-  struct DriverContext context = {.scheduled_in_head = NULL,
-                                  .scheduled_in_tail = NULL,
-                                  .scheduled_out_head = NULL,
-                                  .scheduled_out_tail = NULL,
+  struct DriverContext context = {.scheduled_head = NULL,
+                                  .scheduled_tail = NULL,
                                   .timeout = GNUNET_TIME_UNIT_FOREVER_REL};
   
   driver = GNUNET_SCHEDULER_driver_select ();
@@ -731,6 +757,9 @@ init_fd_info (struct GNUNET_SCHEDULER_Task *t,
               const struct GNUNET_DISK_FileHandle *const *write_fh,
               unsigned int write_fh_len)
 {
+  // FIXME: if we have exactly two network handles / exactly two file handles
+  // and they are equal, we can make one FdInfo with both
+  // GNUNET_SCHEDULER_ET_IN and GNUNET_SCHEDULER_ET_OUT set.
   struct GNUNET_SCHEDULER_FdInfo *fdi;
 
   t->fds_len = read_nh_len + write_nh_len + read_fh_len + write_fh_len;
@@ -818,20 +847,39 @@ init_fd_info (struct GNUNET_SCHEDULER_Task *t,
 }
 
 
-void scheduler_multi_function_call(struct GNUNET_SCHEDULER_Task *t, int (*driver_func)())
+/**
+ * calls the given function @a func on each FdInfo related to @a t.
+ * Optionally updates the event type field in each FdInfo after calling
+ * @a func.
+ *
+ * @param t the task
+ * @param driver_func the function to call with each FdInfo contained in
+ *                    in @a t
+ * @param if_not_ready only call @a driver_func on FdInfos that are not
+ *                     ready
+ * @param et the event type to be set in each FdInfo after calling
+ *           @a driver_func on it, or -1 if no updating not desired.
+ */
+void scheduler_multi_function_call (struct GNUNET_SCHEDULER_Task *t,
+                                    int (*driver_func)(),
+                                    int if_not_ready,
+                                    enum GNUNET_SCHEDULER_EventType et)
 {
+  struct GNUNET_SCHEDULER_FdInfo *fdi;
   int success = GNUNET_YES;
-  if (t->fds_len > 1)
+
+  for (int i = 0; i != t->fds_len; ++i)
   {
-    for (int i = 0; i < t->fds_len;i++)
+    fdi = &t->fds[i];
+    if ((GNUNET_NO == if_not_ready) || (GNUNET_SCHEDULER_ET_NONE == fdi->et))
     {
-      success = driver_func (scheduler_driver->cls, t , t->fds+i) && success;
+      success = driver_func (scheduler_driver->cls, t, fdi) && success;
+      if (et != -1)
+      {
+        fdi->et = et;
+      }
     }
   }
-  else
-  {
-    success = driver_func (scheduler_driver->cls, t , t->fds);
-  }
   if (GNUNET_YES != success)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -841,7 +889,7 @@ void scheduler_multi_function_call(struct GNUNET_SCHEDULER_Task *t, int (*driver
 
 
 void
-shutdown_task (void *cls)
+shutdown_cb (void *cls)
 {
   char c;
   const struct GNUNET_DISK_FileHandle *pr;
@@ -900,7 +948,7 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task)
       GNUNET_CONTAINER_DLL_remove (pending_head,
                                    pending_tail,
                                    task);
-      scheduler_multi_function_call(task, scheduler_driver->del);
+      scheduler_multi_function_call(task, scheduler_driver->del, GNUNET_NO, -1);
     }
   }
   else
@@ -911,9 +959,6 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task)
                                  task);
     ready_count--;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Canceling task %p\n",
-       task);
   ret = task->callback_cls;
   destroy_task (task);
   return ret;
@@ -1307,11 +1352,10 @@ add_without_sets (struct GNUNET_TIME_Relative delay,
                   void *task_cls)
 {
   struct GNUNET_SCHEDULER_Task *t;
-  
+
   GNUNET_assert (NULL != active_task);
   GNUNET_assert (NULL != task);
   t = GNUNET_new (struct GNUNET_SCHEDULER_Task);
-
   init_fd_info (t,
                 &read_nh,
                 read_nh ? 1 : 0,
@@ -1338,7 +1382,7 @@ add_without_sets (struct GNUNET_TIME_Relative delay,
   GNUNET_CONTAINER_DLL_insert (pending_head,
                                pending_tail,
                                t);
-  scheduler_multi_function_call (t, scheduler_driver->add);
+  scheduler_multi_function_call (t, scheduler_driver->add, GNUNET_NO, GNUNET_SCHEDULER_ET_NONE);
   max_priority_added = GNUNET_MAX (max_priority_added,
                                    t->priority);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1785,7 +1829,7 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio,
   GNUNET_CONTAINER_DLL_insert (pending_head,
                                pending_tail,
                                t);
-  scheduler_multi_function_call (t, scheduler_driver->add);
+  scheduler_multi_function_call (t, scheduler_driver->add, GNUNET_NO, GNUNET_SCHEDULER_ET_NONE);
   max_priority_added = GNUNET_MAX (max_priority_added,
            t->priority);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1825,10 +1869,6 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task,
     reason |= GNUNET_SCHEDULER_REASON_WRITE_READY;
   reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE;
   task->reason = reason;
-  if (task->fds_len > 1)
-  {
-    GNUNET_array_append (task->ready_fds, task->ready_fds_len, *fdi); 
-  }
   if (GNUNET_NO == task->in_ready_list)
   {
     GNUNET_CONTAINER_DLL_remove (pending_head,
@@ -1858,11 +1898,9 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task,
 int
 GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh)
 {
-  // FIXME: call check_lifeness here!
   enum GNUNET_SCHEDULER_Priority p;
   struct GNUNET_SCHEDULER_Task *pos;
   struct GNUNET_TIME_Absolute now;
-  int i;
 
   /* check for tasks that reached the timeout! */
   now = GNUNET_TIME_absolute_get ();
@@ -1879,9 +1917,27 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh)
       pending_timeout_last = NULL;
     queue_ready_task (pos);
   }
+  pos = pending_head;
+  while (NULL != pos)
+  {
+    struct GNUNET_SCHEDULER_Task *next = pos->next;
+    if (now.abs_value_us >= pos->timeout.abs_value_us)
+    {
+      pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT;
+      GNUNET_CONTAINER_DLL_remove (pending_head,
+                                   pending_tail,
+                                   pos);
+      queue_ready_task (pos);
+    }
+    pos = next;
+  }
 
   if (0 == ready_count)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "no tasks run!\n");
     return GNUNET_NO;
+  }
 
   /* find out which task priority level we are going to
      process this time */
@@ -1921,41 +1977,36 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh)
     tc.reason = pos->reason;
     GNUNET_NETWORK_fdset_zero (sh->rs);
     GNUNET_NETWORK_fdset_zero (sh->ws);
-    tc.fds_len = pos->ready_fds_len;
-    tc.fds = pos->ready_fds;
-    for (i = 0; i != pos->ready_fds_len; ++i)
+    // FIXME: do we have to remove FdInfos from fds if they are not ready?
+    tc.fds_len = pos->fds_len;
+    tc.fds = pos->fds;
+    for (int i = 0; i != pos->fds_len; ++i)
     {
-      struct GNUNET_SCHEDULER_FdInfo *fdi = pos->ready_fds + i;
-      if (GNUNET_SCHEDULER_ET_IN == fdi->et)
+      struct GNUNET_SCHEDULER_FdInfo *fdi = &pos->fds[i];
+      if (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et))
       {
         GNUNET_NETWORK_fdset_set_native (sh->rs,
                                          fdi->sock);
       }
-      else if (GNUNET_SCHEDULER_ET_OUT == fdi->et)
+      if (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et))
       {
         GNUNET_NETWORK_fdset_set_native (sh->ws,
                                          fdi->sock);
       }
     }
-    if ( (-1 != pos->read_fd) &&
-         (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY)) )
-      GNUNET_NETWORK_fdset_set_native (sh->rs,
-                                       pos->read_fd);
-    if ( (-1 != pos->write_fd) &&
-         (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) )
-      GNUNET_NETWORK_fdset_set_native (sh->ws,
-                                       pos->write_fd);
     tc.read_ready = sh->rs;
     tc.write_ready = sh->ws;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Running task %p\n",
          pos);
     pos->callback (pos->callback_cls);
+    scheduler_multi_function_call (pos, scheduler_driver->del, GNUNET_YES, -1);
     active_task = NULL;
     dump_backtrace (pos);
     destroy_task (pos);
     tasks_run++;
   }
+  shutdown_if_no_lifeness ();
   if (0 == ready_count)
   {
     scheduler_driver->set_wakeup (scheduler_driver->cls,
@@ -2047,7 +2098,7 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver,
                             NULL);
   GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
                                   pr,
-                                  &shutdown_task,
+                                  &shutdown_cb,
                                   NULL);
   current_lifeness = GNUNET_YES;
   GNUNET_SCHEDULER_add_with_reason_and_priority (task,
@@ -2093,8 +2144,10 @@ select_add (void *cls,
   GNUNET_assert (NULL != context);
   GNUNET_assert (NULL != task);
   GNUNET_assert (NULL != fdi);
+  GNUNET_assert (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et) ||
+                 0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et));
 
-  if (!((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (0 > fdi->sock))
+  if (!((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (fdi->sock < 0))
   {
     /* exactly one out of {fd, hf} must be != NULL and the OS handle must be valid */
     return GNUNET_SYSERR;
@@ -2103,29 +2156,31 @@ select_add (void *cls,
   struct Scheduled *scheduled = GNUNET_new (struct Scheduled);
   scheduled->task = task;
   scheduled->fdi = fdi;
-
-  switch (fdi->et)
-  {
-  case GNUNET_SCHEDULER_ET_IN:
-  {
-    GNUNET_CONTAINER_DLL_insert (context->scheduled_in_head,
-                                 context->scheduled_in_tail,
-                                 scheduled);
-    break;
-  }
-  case GNUNET_SCHEDULER_ET_OUT:
-  {
-    GNUNET_CONTAINER_DLL_insert (context->scheduled_out_head,
-                                 context->scheduled_out_tail,
-                                 scheduled);
-    break;
-  }
-  default:
-  {
-    // FIXME: other event types not implemented yet
-    GNUNET_assert (0);
-  }
-  }
+  scheduled->et = fdi->et;
+
+  GNUNET_CONTAINER_DLL_insert (context->scheduled_head,
+                               context->scheduled_tail,
+                               scheduled);
+  //if (0 != (GNUNET_SCHEDULER_ET_IN & scheduled->et))
+  //{
+  //  GNUNET_CONTAINER_DLL_insert (context->scheduled_in_head,
+  //                               context->scheduled_in_tail,
+  //                               scheduled);
+  //}
+  //if (0 != (GNUNET_SCHEDULER_ET_OUT & scheduled->et))
+  //{
+  //  GNUNET_CONTAINER_DLL_insert (context->scheduled_out_head,
+  //                           context->scheduled_out_tail,
+  //                           scheduled);
+  //}
+  //if (0 != (GNUNET_SCHEDULER_ET_HUP & scheduled->et) ||
+  //    0 != (GNUNET_SCHEDULER_ET_ERR & scheduled->et) ||
+  //    0 != (GNUNET_SCHEDULER_ET_PRI & scheduled->et) ||
+  //    0 != (GNUNET_SCHEDULER_ET_NVAL & scheduled->et))
+  //{
+  //  // FIXME: other event types not implemented yet
+  //  GNUNET_assert (0);
+  //}
   return GNUNET_OK;
 }
 
@@ -2135,47 +2190,24 @@ select_del (void *cls,
             struct GNUNET_SCHEDULER_Task *task,
             struct GNUNET_SCHEDULER_FdInfo *fdi)
 {
-  struct DriverContext *context = cls;
-  GNUNET_assert (NULL != context);
-
-  int ret = GNUNET_SYSERR;
+  struct DriverContext *context;
   struct Scheduled *pos;
-  // FIXME: are multiple ORed event types allowed?
-  switch (fdi->et)
-  {
-  case GNUNET_SCHEDULER_ET_IN:
-  {
-    for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next)
-    {
-      if (pos->task == task)
-      {
-        GNUNET_CONTAINER_DLL_remove (context->scheduled_in_head,
-                                     context->scheduled_in_tail,
-                                     pos);
-        ret = GNUNET_OK;
-      }
-    }
-    break; 
-  }
-  case GNUNET_SCHEDULER_ET_OUT:
+  int ret;
+
+  GNUNET_assert (NULL != cls);
+
+  context = cls;
+  ret = GNUNET_SYSERR;
+  for (pos = context->scheduled_head; NULL != pos; pos = pos->next)
   {
-    for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next)
+    if (pos->task == task && pos->fdi == fdi)
     {
-      if (pos->task == task)
-      {
-        GNUNET_CONTAINER_DLL_remove (context->scheduled_out_head,
-                                     context->scheduled_out_tail,
-                                     pos);
-        ret = GNUNET_OK;
-      }
+      GNUNET_CONTAINER_DLL_remove (context->scheduled_head,
+                                   context->scheduled_tail,
+                                   pos);
+      ret = GNUNET_OK;
+      break;
     }
-    break;
-  }
-  default:
-  {
-    // FIXME: other event types not implemented yet
-    GNUNET_assert (0);
-  }
   }
   return ret;
 }
@@ -2189,6 +2221,7 @@ select_loop (void *cls,
   struct GNUNET_NETWORK_FDSet *ws;
   struct DriverContext *context;
   int select_result;
+  int tasks_ready;
   unsigned long long last_tr;
   unsigned int busy_wait_warning;
   
@@ -2196,11 +2229,10 @@ select_loop (void *cls,
   GNUNET_assert (NULL != context);
   rs = GNUNET_NETWORK_fdset_create ();
   ws = GNUNET_NETWORK_fdset_create ();
+  tasks_ready = GNUNET_NO;
   last_tr = 0;
   busy_wait_warning = 0;
-  // FIXME: remove check_lifeness, instead the condition should be:
-  // pending_in_head != NULL || pending_out_head != NULL || tasks_ready
-  while (GNUNET_YES == GNUNET_SCHEDULER_check_lifeness ())
+  while (NULL != context->scheduled_head || GNUNET_YES == tasks_ready)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "[%p] timeout = %s\n",
@@ -2210,13 +2242,16 @@ select_loop (void *cls,
     GNUNET_NETWORK_fdset_zero (rs);
     GNUNET_NETWORK_fdset_zero (ws);
     struct Scheduled *pos;
-    for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next)
-    {
-      GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock);
-    }
-    for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next)
+    for (pos = context->scheduled_head; NULL != pos; pos = pos->next)
     {
-      GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock);
+      if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et))
+      {
+        GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock);
+      }
+      if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et))
+      {
+        GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock);
+      }
     }
     if (NULL == scheduler_select)
     {
@@ -2235,6 +2270,8 @@ select_loop (void *cls,
     }
     if (select_result == GNUNET_SYSERR)
     {
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "select_result = GNUNET_SYSERR\n");
       if (errno == EINTR)
         continue;
 
@@ -2293,27 +2330,30 @@ select_loop (void *cls,
       //GNUNET_assert (0);
       short_wait (100);                /* mitigate */
     }
-    for (pos = context->scheduled_in_head; NULL != pos; pos = pos->next)
+    for (pos = context->scheduled_head; NULL != pos; pos = pos->next)
     {
-      if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, pos->fdi->sock))
+      int is_ready = GNUNET_NO;
+      if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et) &&
+          GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, pos->fdi->sock))
       {
-        GNUNET_CONTAINER_DLL_remove (context->scheduled_in_head,
-                                     context->scheduled_in_tail,
-                                     pos);
-        GNUNET_SCHEDULER_task_ready (pos->task, pos->fdi);
+        pos->fdi->et |= GNUNET_SCHEDULER_ET_IN;
+        is_ready = GNUNET_YES;
       }
-    }
-    for (pos = context->scheduled_out_head; NULL != pos; pos = pos->next)
-    {
-      if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, pos->fdi->sock))
+      if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et) &&
+          GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, pos->fdi->sock))
+      {
+        pos->fdi->et |= GNUNET_SCHEDULER_ET_OUT;
+        is_ready = GNUNET_YES;
+      }
+      if (GNUNET_YES == is_ready)
       {
-        GNUNET_CONTAINER_DLL_remove (context->scheduled_out_head,
-                                     context->scheduled_out_tail,
+        GNUNET_CONTAINER_DLL_remove (context->scheduled_head,
+                                     context->scheduled_tail,
                                      pos);
         GNUNET_SCHEDULER_task_ready (pos->task, pos->fdi);
       }
     }
-    int tasks_ready = GNUNET_SCHEDULER_run_from_driver (sh);
+    tasks_ready = GNUNET_SCHEDULER_run_from_driver (sh);
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "[%p] tasks_ready: %d\n",
          sh,