WIP commit of scalar product 2.0. It is unfinished and does not yet pass tests. This...
[oweals/gnunet.git] / src / testbed / gnunet_testbed_mpi_spawn.c
index c147d460ba9458369c26791bcbc6a167c1905c63..41d8054711d460b28f978d725264dcf68eb10277 100644 (file)
@@ -1,13 +1,13 @@
 #include "platform.h"
 #include "gnunet_util_lib.h"
-#include "gnunet_resolver_service.h"
-#include <mpi.h>
+#include "gnunet_testbed_service.h"
+
 
 /**
  * Generic logging shorthand
  */
-#define LOG(kind,...)                                           \
-  GNUNET_log_from (kind, "gnunet-mpi-test", __VA_ARGS__)
+#define LOG(kind,...)                           \
+  GNUNET_log (kind, __VA_ARGS__)
 
 /**
  * Debug logging shorthand
   LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
 
 /**
- * Timeout for resolving IPs
+ * Global result
  */
-#define RESOLVE_TIMEOUT                         \
-  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+static int ret;
 
 /**
- * Log an error message at log-level 'level' that indicates
- * a failure of the command 'cmd' with the message given
- * by gcry_strerror(rc).
+ * The child process we spawn
  */
-#define LOG_GAI(level, cmd, rc) do { LOG(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, gai_strerror(rc)); } while(0)
+static struct GNUNET_OS_Process *child;
 
 /**
- * Global result
+ * The arguments including the binary to spawn
  */
-static int ret;
+static char **argv2;
 
 /**
- * The array of hostnames
+ * Pipe used to communicate shutdown via signal.
  */
-static char **hostnames;
+static struct GNUNET_DISK_PipeHandle *sigpipe;
 
 /**
- * The array of host's addresses
+ * Filename of the unique file
  */
-static char **hostaddrs;
+static char *fn;
 
 /**
- * The resolution request handles; one per each hostname resolution
+ * Handle to the unique file
  */
-struct GNUNET_RESOLVER_RequestHandle **rhs;
+static int fh;
 
 /**
- * Number of hosts in the hostname array
+ * The return code of the binary
  */
-static unsigned int nhosts;
+static unsigned long child_exit_code;
 
 /**
- * Number of addresses in the hostaddr array
+ * The process status of the child
  */
-static unsigned int nhostaddrs;
+static enum GNUNET_OS_ProcessStatusType child_status;
 
 /**
- * Did we connect to the resolver service
+ * The shutdown task
  */
-static unsigned int resolver_connected;
+static GNUNET_SCHEDULER_TaskIdentifier shutdown_task_id;
 
 /**
- * Task for resolving ips
+ * Task to kill the child
  */
-static GNUNET_SCHEDULER_TaskIdentifier resolve_task_id;
+static GNUNET_SCHEDULER_TaskIdentifier terminate_task_id;
 
+/**
+ * Task to kill the child
+ */
+static GNUNET_SCHEDULER_TaskIdentifier child_death_task_id;
 
 /**
- * Resolves the hostnames array
- *
- * @param cls NULL
- * @param tc the scheduler task context
+ * The shutdown task
  */
 static void
-resolve_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct addrinfo hint;
-  const struct sockaddr_in *in_addr; 
-  struct addrinfo *res;
-  char *hostip;
-  unsigned int host;
-  unsigned int rc;
-
-  resolve_task_id = GNUNET_SCHEDULER_NO_TASK;
-  hint.ai_family = AF_INET;    /* IPv4 */
-  hint.ai_socktype = 0;
-  hint.ai_protocol = 0;
-  hint.ai_addrlen = 0;
-  hint.ai_addr = NULL;
-  hint.ai_canonname = NULL;
-  hint.ai_next = NULL;
-  hint.ai_flags = AI_NUMERICSERV;
-  for (host = 0; host < nhosts; host++)
+  shutdown_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != child_exit_code)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Child exited with error code: %lu\n",
+         child_exit_code);
+    ret = 128 + (int) child_exit_code;
+  }
+  if (0 != fh)
+  {
+    close (fh);
+  }
+  if ((NULL != fn) && (0 != unlink (fn)))
+  {
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "open");
+    ret = GNUNET_SYSERR;
+  }
+}
+
+
+static void
+terminate_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  static int hard_kill;
+
+  GNUNET_assert (NULL != child);
+  terminate_task_id =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &terminate_task, NULL);
+  if (0 != hard_kill)
   {
-    res = NULL;
-    LOG_DEBUG ("Resolving: %s host\n", hostnames[host]);
-    if (0 != (rc = getaddrinfo (hostnames[host], "22", &hint, &res)))
+    switch (hard_kill)
     {
-      LOG_GAI (GNUNET_ERROR_TYPE_ERROR, "getaddrinfo", rc);
-      ret = GNUNET_SYSERR;
+    case 1:
+    case 2:
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+           "%d more interrupts needed to send SIGKILL to the child\n",
+           3 - hard_kill);
+      hard_kill++;
+      return;
+    case 3:
+      GNUNET_break (0 == GNUNET_OS_process_kill (child, SIGKILL));
       return;
     }
-    GNUNET_assert (NULL != res);
-    GNUNET_assert (NULL != res->ai_addr);
-    GNUNET_assert (sizeof (struct sockaddr_in) == res->ai_addrlen);
-    in_addr = (const struct sockaddr_in *) res->ai_addr;
-    hostip = inet_ntoa (in_addr->sin_addr);
-    GNUNET_assert (NULL != hostip);
-    GNUNET_array_append (hostaddrs, nhostaddrs, GNUNET_strdup (hostip));
-    LOG_DEBUG ("%s --> %s\n", hostnames[host], hostaddrs[host]);
-    freeaddrinfo (res);
   }
-  ret = GNUNET_OK;
+  hard_kill++;
+  GNUNET_break (0 == GNUNET_OS_process_kill (child, GNUNET_TERM_SIG));
+  LOG (GNUNET_ERROR_TYPE_INFO, _("Waiting for child to exit.\n"));
 }
 
 
 /**
- * Loads the set of host allocated by the LoadLeveler Job Scheduler.  This
- * function is only available when compiled with support for LoadLeveler and is
- * used for running on the SuperMUC
+ * Task triggered whenever we receive a SIGCHLD (child
+ * process died).
  *
- * @param hostlist set to the hosts found in the file; caller must free this if
- *          number of hosts returned is greater than 0
- * @return number of hosts returned in 'hosts', 0 on error
+ * @param cls closure, NULL if we need to self-restart
+ * @param tc context
  */
-unsigned int
-get_loadleveler_hosts ()
+static void
+child_death_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  const char *hostfile;
-  char *buf;
-  char *hostname;
-  struct addrinfo *ret;
-  struct addrinfo hint;
-  ssize_t rsize;
-  uint64_t size;
-  uint64_t offset;
-  enum {
-    SCAN,
-    SKIP,
-    TRIM,
-    READHOST
-  } pstep;
-  unsigned int host;
+  const struct GNUNET_DISK_FileHandle *pr;
+  char c[16];
 
-  if (NULL == (hostfile = getenv ("MP_SAVEHOSTFILE")))
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  if (GNUNET_SYSERR == GNUNET_DISK_file_size (hostfile, &size, GNUNET_YES,
-                                              GNUNET_YES))
+  pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+  child_death_task_id = GNUNET_SCHEDULER_NO_TASK;
+  if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
   {
-    GNUNET_break (0);
-    return 0;
-  }
-  if (0 == size)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  buf = GNUNET_malloc (size + 1);
-  rsize = GNUNET_DISK_fn_read (hostfile, buf, (size_t) size);
-  if ( (GNUNET_SYSERR == rsize) || ((ssize_t) size != rsize) )
-  {
-    GNUNET_free (buf);
-    GNUNET_break (0);
-    return 0;
-  }
-  size++;
-  offset = 0;
-  pstep = SCAN;
-  hostname = NULL;
-  while (offset < size)
-  {
-    switch (pstep)
-    {
-    case SCAN:
-      if ('!' == buf[offset])
-        pstep = SKIP;
-      else 
-        pstep = TRIM;
-      break;
-    case SKIP:
-      if ('\n' == buf[offset])
-        pstep = SCAN;
-      break;
-    case TRIM:
-      if ('!' == buf[offset])
-      {
-        pstep = SKIP;
-        break;
-      }
-      if ( (' ' == buf[offset]) 
-           || ('\t' == buf[offset])
-           || ('\r' == buf[offset]) )
-        pstep = TRIM;
-      else
-      {
-        pstep = READHOST;
-        hostname = &buf[offset];        
-      }
-      break;
-    case READHOST:
-      if (isspace (buf[offset]))
-      {
-        buf[offset] = '\0';
-        for (host = 0; host < nhosts; host++)
-          if (0 == strcmp (hostnames[host], hostname))
-            break;
-        if (host == nhosts)
-        {
-          LOG_DEBUG ("Adding host: %s\n", hostname);
-          hostname = GNUNET_strdup (hostname);
-          GNUNET_array_append (hostnames, nhosts, hostname);
-        }
-        else
-          LOG_DEBUG ("Not adding host %s as it is already included\n", hostname);
-        hostname = NULL;
-        pstep = SCAN;
-      }
-      break;
-    }
-    offset++;
+    child_death_task_id =
+       GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                       pr, &child_death_task, NULL);
+    return;
   }
-  GNUNET_free_non_null (buf);
-  return nhosts;
+  /* consume the signal */
+  GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
+  LOG_DEBUG ("Child died\n");
+  GNUNET_SCHEDULER_cancel (terminate_task_id);
+  terminate_task_id = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (GNUNET_OK == GNUNET_OS_process_status (child, &child_status,
+                                                        &child_exit_code));
+  GNUNET_OS_process_destroy (child);
+  child = NULL;
+  shutdown_task_id = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+}
+
+
+static void
+destroy_hosts(struct GNUNET_TESTBED_Host **hosts, unsigned int nhosts)
+{
+  unsigned int host;
+
+  GNUNET_assert (NULL != hosts);
+  for (host = 0; host < nhosts; host++)
+    if (NULL != hosts[host])
+      GNUNET_TESTBED_host_destroy (hosts[host]);
+  GNUNET_free (hosts);
+  hosts = NULL;
 }
 
 
 /**
- * Main function that will be run by the scheduler.
+ * The main scheduler run task
  *
- * @param cls closure
- * @param args remaining command-line arguments
- * @param cfgfile name of the configuration file used (for saving, can be NULL!)
- * @param config configuration
+ * @param cls NULL
+ * @param tc scheduler task context
  */
 static void
-run (void *cls, char *const *args, const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *config)
+run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_OS_Process *proc;
-  unsigned long code;
-  enum GNUNET_OS_ProcessStatusType proc_status;
-  int rank;
-  int msg_size;
-  
-  if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
+  struct GNUNET_TESTBED_Host **hosts;
+  const struct GNUNET_CONFIGURATION_Handle *null_cfg;
+  char *tmpdir;
+  char *hostname;
+  size_t hostname_len;
+  unsigned int nhosts;
+
+  null_cfg = GNUNET_CONFIGURATION_create ();
+  nhosts = GNUNET_TESTBED_hosts_load_from_loadleveler (null_cfg, &hosts);
+  if (0 == nhosts)
   {
     GNUNET_break (0);
+    ret = GNUNET_SYSERR;
     return;
   }
-  if (0 != rank)
+  hostname_len = GNUNET_OS_get_hostname_max_length ();
+  hostname = GNUNET_malloc (hostname_len);
+  if (0 != gethostname (hostname, hostname_len))
   {
-    ret = GNUNET_OK;
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Cannot get hostname.  Exiting\n");
+    GNUNET_free (hostname);
+    destroy_hosts (hosts, nhosts);
+    ret = GNUNET_SYSERR;
     return;
   }
-  PRINTF ("Spawning process\n");
-  proc =
-      GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ALL, NULL,
-                                   NULL, args[0], args);
-  if (NULL == proc)
+  if (NULL == strstr (GNUNET_TESTBED_host_get_hostname (hosts[0]), hostname))
   {
-    printf ("Cannot exec\n");
+    LOG_DEBUG ("Exiting as `%s' is not the lowest host\n", hostname);
+    GNUNET_free (hostname);
+    ret = GNUNET_OK;
     return;
   }
-  do
-  {
-    (void) sleep (1);
-    ret = GNUNET_OS_process_status (proc, &proc_status, &code);
-  }
-  while (GNUNET_NO == ret);
-  GNUNET_assert (GNUNET_NO != ret);
-  if (GNUNET_OK == ret)
+  LOG_DEBUG ("Will be executing `%s' on host `%s'\n", argv2[0], hostname);
+  GNUNET_free (hostname);
+  destroy_hosts (hosts, nhosts);
+  tmpdir = getenv ("TMPDIR");
+  if (NULL == tmpdir)
+    tmpdir = getenv ("TMP");
+  if (NULL == tmpdir)
+    tmpdir = getenv ("TEMP");
+  if (NULL == tmpdir)
+    tmpdir = "/tmp";
+  (void) GNUNET_asprintf (&fn, "%s/gnunet-testbed-spawn.lock", tmpdir);
+  /* Open the unique file; we can create it then we can spawn the child process
+     else we exit */
+  fh = open (fn, O_CREAT | O_EXCL | O_CLOEXEC,
+             S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+  if (-1 == fh)
   {
-    if (0 != code)
+    if (EEXIST == errno)
     {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "Child terminated abnormally\n");
-      ret = GNUNET_SYSERR;
-      GNUNET_break (0);
+      LOG_DEBUG ("Lock file already created by other process.  Exiting\n");
+      ret = GNUNET_OK;
       return;
     }
-  }
-  else
-  {
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "open");
     ret = GNUNET_SYSERR;
-    GNUNET_break (0);
     return;
   }
-  if (0 == get_loadleveler_hosts())
+  /* Spawn the new process here */
+  LOG (GNUNET_ERROR_TYPE_INFO, _("Spawning process `%s'\n"), argv2[0]);
+  child = GNUNET_OS_start_process_vap (GNUNET_NO, GNUNET_OS_INHERIT_STD_ALL, NULL,
+                                       NULL, NULL,
+                                       argv2[0], argv2);
+  if (NULL == child)
   {
     GNUNET_break (0);
     ret = GNUNET_SYSERR;
+    shutdown_task_id = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
     return;
   }
-  resolve_task_id = GNUNET_SCHEDULER_add_now (&resolve_task, NULL);
+  ret = GNUNET_OK;
+  terminate_task_id =
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &terminate_task, NULL);
+  child_death_task_id =
+    GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                   GNUNET_DISK_pipe_handle (sigpipe,
+                                                            GNUNET_DISK_PIPE_END_READ),
+                                   &child_death_task, NULL);
+}
+
+
+/**
+ * Signal handler called for SIGCHLD.
+ */
+static void
+sighandler_child_death ()
+{
+  static char c;
+  int old_errno = errno;       /* back-up errno */
+
+  GNUNET_break (1 ==
+               GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
+                                       (sigpipe, GNUNET_DISK_PIPE_END_WRITE),
+                                       &c, sizeof (c)));
+  errno = old_errno;           /* restore errno */
 }
 
 
@@ -307,37 +290,44 @@ run (void *cls, char *const *args, const char *cfgfile,
 int
 main (int argc, char *argv[])
 {
-  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-    GNUNET_GETOPT_OPTION_END
-  };
-  unsigned int host;
-  int rres;
-  
-  ret = GNUNET_SYSERR;
+  struct GNUNET_SIGNAL_Context *shc_chld;
+  unsigned int cnt;
+
+  ret = -1;
   if (argc < 2)
   {
     printf ("Need arguments: gnunet-testbed-mpi-spawn <cmd> <cmd_args>");
     return 1;
   }
-  if (MPI_SUCCESS != MPI_Init (&argc, &argv))
+  if (GNUNET_OK != GNUNET_log_setup ("gnunet-testbed-spawn", NULL, NULL))
   {
     GNUNET_break (0);
     return 1;
   }
-  rres =
-      GNUNET_PROGRAM_run (argc, argv,
-                          "gnunet-testbed-mpi-spawn <cmd> <cmd_args>",
-                          _("Spawns cmd after starting my the MPI run-time"),
-                          options, &run, NULL);
-  (void) MPI_Finalize ();
-  for (host = 0; host < nhosts; host++)
-    GNUNET_free (hostnames[host]);
-  for (host = 0; host < nhostaddrs; host++)
-    GNUNET_free (hostaddrs[host]);
-  GNUNET_free_non_null (hostnames);
-  GNUNET_free_non_null (hostaddrs);
-  if ((GNUNET_OK == rres) && (GNUNET_OK == ret))
-    return 0;
-  printf ("Something went wrong\n");
-  return 1;
+  if (NULL == (sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO,
+                                           GNUNET_NO, GNUNET_NO)))
+  {
+    GNUNET_break (0);
+    ret = GNUNET_SYSERR;
+    return 1;
+  }
+  shc_chld =
+      GNUNET_SIGNAL_handler_install (GNUNET_SIGCHLD, &sighandler_child_death);
+  if (NULL == shc_chld)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Cannot install a signal handler\n");
+    return 1;
+  }
+  argv2 = GNUNET_malloc (sizeof (char *) * argc);
+  for (cnt = 1; cnt < argc; cnt++)
+    argv2[cnt - 1] = argv[cnt];
+  GNUNET_SCHEDULER_run (run, NULL);
+  GNUNET_free (argv2);
+  GNUNET_SIGNAL_handler_uninstall (shc_chld);
+  shc_chld = NULL;
+  GNUNET_DISK_pipe_close (sigpipe);
+  GNUNET_free_non_null (fn);
+  if (GNUNET_OK != ret)
+    return ret;
+  return 0;
 }