- sync wait period with deamon
[oweals/gnunet.git] / src / regex / gnunet-regex-profiler.c
index 626198de03fc6f9e93c7e54c750c0809bd929463..83b8ec50f0d36e0222abde11f545f27d3d53fab8 100644 (file)
 #include "gnunet_dht_service.h"
 #include "gnunet_testbed_service.h"
 
-#define FIND_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 90)
-#define SEARCHES_IN_PARALLEL 2
-#define ANNOUNCE_DELAY GNUNET_TIME_relative_multiply(\
-                                                GNUNET_TIME_UNIT_MILLISECONDS,\
-                                                300)
-#define SEARCH_DELAY GNUNET_TIME_relative_multiply(\
-                                                GNUNET_TIME_UNIT_MILLISECONDS,\
-                                                200)
+#define FIND_TIMEOUT \
+        GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 90)
+#define SEARCHES_IN_PARALLEL 100
 
 /**
  * DLL of operations
@@ -146,7 +141,7 @@ struct RegexPeer
   char *policy_file;
 
   /**
-   * Peers search string.
+   * Peer's search string.
    */
   const char *search_str;
 
@@ -351,6 +346,11 @@ static unsigned int peer_cnt;
  */
 static unsigned int peers_found;
 
+/**
+ * Index of peer to start next announce/search..
+ */
+static unsigned int next_search;
+
 /**
  * Search task identifier
  */
@@ -364,7 +364,7 @@ static GNUNET_SCHEDULER_TaskIdentifier search_timeout_task;
 /**
  * Search timeout in seconds.
  */
-static struct GNUNET_TIME_Relative search_timeout = { 60000 };
+static struct GNUNET_TIME_Relative search_timeout_time = { 60000 };
 
 /**
  * How long do we wait before starting the search?
@@ -393,6 +393,11 @@ static unsigned int max_path_compression;
  */
 static char * regex_prefix;
 
+/**
+ * What's the maximum regex reannounce period.
+ */
+static struct GNUNET_TIME_Relative reannounce_period_max;
+
 
 /******************************************************************************/
 /******************************  DECLARATIONS  ********************************/
@@ -460,6 +465,16 @@ static void
 do_collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+/**
+ * Start announcing the next regex in the DHT.
+ *
+ * @param cls Index of the next peer in the peers array.
+ * @param tc TaskContext.
+ */
+static void
+announce_next_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /******************************************************************************/
 /********************************  SHUTDOWN  **********************************/
 /******************************************************************************/
@@ -556,7 +571,7 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if (NULL != cfg)
     GNUNET_CONFIGURATION_destroy (cfg);
 
-  GNUNET_SCHEDULER_shutdown ();        /* Stop scheduler to shutdown testbed run */
+  GNUNET_SCHEDULER_shutdown (); /* Stop scheduler to shutdown testbed run */
 }
 
 
@@ -698,13 +713,13 @@ stats_cb (void *cls,
   {
     peer->stats_op_handle =
       GNUNET_TESTBED_service_connect (NULL,
-                                     peer->peer_handle,
-                                     "statistics",
-                                     &stats_connect_cb,
-                                     peer,
-                                     &stats_ca,
-                                     &stats_da,
-                                     peer);
+                                      peer->peer_handle,
+                                      "statistics",
+                                      &stats_connect_cb,
+                                      peer,
+                                      &stats_ca,
+                                      &stats_da,
+                                      peer);
   }
 }
 
@@ -739,12 +754,12 @@ stats_connect_cb (void *cls,
   peer->stats_handle = ca_result;
 
   if (NULL == GNUNET_STATISTICS_get (peer->stats_handle, NULL, NULL,
-                                    GNUNET_TIME_UNIT_FOREVER_REL,
-                                    &stats_cb,
-                                    &stats_iterator, peer))
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     &stats_cb,
+                                     &stats_iterator, peer))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-               "Could not get statistics of peer %u!\n", peer->id);
+                "Could not get statistics of peer %u!\n", peer->id);
   }
 }
 
@@ -765,13 +780,13 @@ do_collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   peer->stats_op_handle =
     GNUNET_TESTBED_service_connect (NULL,
-                                   peer->peer_handle,
-                                   "statistics",
-                                   &stats_connect_cb,
-                                   peer,
-                                   &stats_ca,
-                                   &stats_da,
-                                   peer);
+                                    peer->peer_handle,
+                                    "statistics",
+                                    &stats_connect_cb,
+                                    peer,
+                                    &stats_ca,
+                                    &stats_da,
+                                    peer);
 }
 
 
@@ -787,7 +802,7 @@ do_collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * @param tc TaskContext.
  */
 static void
-find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+find_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -828,7 +843,7 @@ regex_found_handler (void *cls,
   {
     GNUNET_SCHEDULER_cancel (peer->timeout);
     peer->timeout = GNUNET_SCHEDULER_NO_TASK;
-    GNUNET_SCHEDULER_add_delayed (SEARCH_DELAY, &find_next_string, NULL);
+    GNUNET_SCHEDULER_add_now (&announce_next_regex, NULL);
   }
 
   if (NULL == id)
@@ -899,48 +914,26 @@ regex_found_handler (void *cls,
  * @param tc the task context
  */
 static void
-do_connect_by_string_timeout (void *cls,
+search_timeout (void *cls,
                               const struct GNUNET_SCHEDULER_TaskContext * tc)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Finding matches to all strings did not succeed after %s.\n",
-              GNUNET_STRINGS_relative_time_to_string (search_timeout, GNUNET_NO));
+              GNUNET_STRINGS_relative_time_to_string (search_timeout_time,
+                                                      GNUNET_NO));
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Found %i of %i strings\n", peers_found, num_search_strings);
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Search timed out after %s."
               "Collecting stats and shutting down.\n", 
-              GNUNET_STRINGS_relative_time_to_string (search_timeout,
+              GNUNET_STRINGS_relative_time_to_string (search_timeout_time,
                                                       GNUNET_NO));
 
   GNUNET_SCHEDULER_add_now (&do_collect_stats, NULL);
 }
 
 
-/**
- * Connect by string task that is run to search for a string in the
- * NFA. It first connects to the mesh service and when a connection is
- * established it starts to search for the string.
- *
- * @param cls NULL
- * @param tc the task context
- */
-static void
-do_connect_by_string (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext * tc)
-{
-  unsigned int i;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting string search.\n");
-
-  search_timeout_task = GNUNET_SCHEDULER_add_delayed (search_timeout,
-                                                      &do_connect_by_string_timeout, NULL);
-  for (i = 0; i < SEARCHES_IN_PARALLEL; i++)
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &find_next_string, NULL);
-}
-
-
 /**
  * Search timed out. It might still complete in the future,
  * but we should start another one.
@@ -961,24 +954,23 @@ find_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
               "Searching for string \"%s\" on peer %d timed out. Starting new search.\n",
               p->search_str,
               p->id);
-  GNUNET_SCHEDULER_add_now (&find_next_string, NULL);
+  GNUNET_SCHEDULER_add_now (&announce_next_regex, NULL);
 }
 
 
 /**
- * Start searching for the next string in the DHT.
+ * Start searching for a string in the DHT.
  *
  * @param cls Index of the next peer in the peers array.
  * @param tc TaskContext.
  */
 static void
-find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+find_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) ||
       peer_cnt >= num_search_strings)
     return;
 
-  parallel_searches++;
   peers[peer_cnt].search_str = search_strings[peer_cnt];
   peers[peer_cnt].search_str_matched = GNUNET_NO;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1004,17 +996,6 @@ find_next_string (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 }
 
 
-
-/**
- * Start announcing the next regex in the DHT.
- *
- * @param cls Index of the next peer in the peers array.
- * @param tc TaskContext.
- */
-static void
-announce_next_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
 /**
  * ARM connect adapter. Opens a connection to the ARM service.
  *
@@ -1028,8 +1009,7 @@ arm_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct RegexPeer *peer = cls;
 
-  peer->arm_handle = GNUNET_ARM_alloc (cfg);
-  GNUNET_ARM_connect (peer->arm_handle, NULL, NULL);
+  peer->arm_handle = GNUNET_ARM_connect (cfg, NULL, NULL);
 
   return peer->arm_handle;
 }
@@ -1050,7 +1030,7 @@ arm_da (void *cls, void *op_result)
 
   if (NULL != peer->arm_handle)
   {
-    GNUNET_ARM_disconnect (peer->arm_handle);
+    GNUNET_ARM_disconnect_and_free (peer->arm_handle);
     peer->arm_handle = NULL;
   }
 }
@@ -1072,14 +1052,22 @@ arm_op_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   peer->op_handle = NULL;
 }
 
+
+/**
+ * Callback called when arm has started the daemon we asked for.
+ * 
+ * @param cls           Closure ().
+ * @param arm           Arm handle.
+ * @param rs            Status of the request.
+ * @param service       Service we asked to start (deamon).
+ * @param result        Result of the request.
+ */
 static void
 arm_start_cb (void *cls, struct GNUNET_ARM_Handle *arm,
     enum GNUNET_ARM_RequestStatus rs, const char *service,
     enum GNUNET_ARM_Result result)
 {
   struct RegexPeer *peer = (struct RegexPeer *) cls;
-  static unsigned int arm_peer_cnt;
-  unsigned int next_p;
 
   if (rs != GNUNET_ARM_REQUEST_SENT_OK)
   {
@@ -1101,23 +1089,37 @@ arm_start_cb (void *cls, struct GNUNET_ARM_Handle *arm,
     case GNUNET_ARM_RESULT_STARTING:
       GNUNET_SCHEDULER_add_now (&arm_op_done, peer);
 
-      if (arm_peer_cnt < (num_peers - 1))
       {
-        next_p = (++arm_peer_cnt % num_peers);
-        GNUNET_SCHEDULER_add_delayed (ANNOUNCE_DELAY,
-                                      &announce_next_regex,
-                                      (void *) (long) next_p);
+        long search_peer;
+        unsigned int i;
+        unsigned int me;
+
+        me = peer - peers;
+
+        /* Find a peer to look for a string matching the regex announced */
+        search_peer = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                num_peers);
+        for (i = 0; peers[search_peer].search_str != NULL; i++)
+        {
+          search_peer = (search_peer + 1) % num_peers;
+          if (i > num_peers)
+            GNUNET_abort (); /* we ran out of peers, must be a bug */
+        }
+        peers[search_peer].search_str = search_strings[me];
+        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
+                                        reannounce_period_max,
+                                        2),
+                                      &find_string,
+                                      (void *) search_peer);
       }
-      else
+      if (next_search >= num_peers &&
+          GNUNET_SCHEDULER_NO_TASK == search_timeout_task)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                    "All daemons started."
-                    " Waiting %s to start string searches\n",
-                    GNUNET_STRINGS_relative_time_to_string (search_delay,
-                                                            GNUNET_NO));
-        GNUNET_SCHEDULER_add_delayed (search_delay,
-                                      do_connect_by_string, 
-                                      NULL);
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "All daemons started.\n");
+        /* FIXME start GLOBAL timeout to abort experiment */
+        search_timeout_task = GNUNET_SCHEDULER_add_delayed (search_timeout_time,
+                                                            &search_timeout,
+                                                            NULL);
       }
       break;
 
@@ -1150,6 +1152,7 @@ arm_connect_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
   }
 
   GNUNET_assert (NULL != peer->arm_handle);
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "[]%p - ()%p\n", peer->op_handle, op);
   GNUNET_assert (peer->op_handle == op);
   GNUNET_assert (peer->arm_handle == ca_result);
 
@@ -1170,48 +1173,47 @@ arm_connect_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
 static void
 do_announce (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting announce.\n");
+  unsigned int i;
 
-    /* First connect to arm service, then announce. Next
-       announce will be in arm_connect_cb */
-    peers[0].op_handle =
-      GNUNET_TESTBED_service_connect (NULL,
-                                      peers[0].peer_handle,
-                                      "arm",
-                                      &arm_connect_cb,
-                                      &peers[0],
-                                      &arm_ca,
-                                      &arm_da,
-                                      &peers[0]);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting announce.\n");
 
+  for (i = 0; i < SEARCHES_IN_PARALLEL; i++)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "  scheduling announce %u\n",
+                i);
+    (void) GNUNET_SCHEDULER_add_now (&announce_next_regex, NULL);
+  }
 }
 
 
 /**
  * Start announcing the next regex in the DHT.
  *
- * @param cls Index of the next peer in the peers array.
+ * @param cls Closure (unused).
  * @param tc TaskContext.
  */
 static void
 announce_next_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  long next_p = (long) cls;
-
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) ||
+            next_search >= num_peers)
     return;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting daemon %ld\n", next_p);
-
-  peers[next_p].op_handle =
+  /* First connect to arm service, then announce. Next
+   * a nnounce will be in arm_connect_cb */
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting daemon %u\n", next_search);
+  peers[next_search].op_handle =
     GNUNET_TESTBED_service_connect (NULL,
-                                    peers[next_p].peer_handle,
+                                    peers[next_search].peer_handle,
                                     "arm",
                                     &arm_connect_cb,
-                                    &peers[next_p],
+                                    &peers[next_search],
                                     &arm_ca,
                                     &arm_da,
-                                    &peers[next_p]);
+                                    &peers[next_search]);
+  next_search++;
+  parallel_searches++;
 }
 
 /**
@@ -1474,9 +1476,9 @@ policy_filename_cb (void *cls, const char *filename)
 
   peer->policy_file = GNUNET_strdup (filename);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Creating peer %i on host %s for policy file %s\n",
-              peer->id,
-              GNUNET_TESTBED_host_get_hostname (peer->host_handle),
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Creating peer %i on host %s for policy file %s\n",
+              peer->id, GNUNET_TESTBED_host_get_hostname (peer->host_handle),
               filename);
 
   /* Set configuration options specific for this peer
@@ -1484,7 +1486,8 @@ policy_filename_cb (void *cls, const char *filename)
   peer->cfg = GNUNET_CONFIGURATION_dup (cfg);
   GNUNET_CONFIGURATION_set_value_number (peer->cfg, "REGEXPROFILER",
                                          "MAX_PATH_COMPRESSION",
-                                         (unsigned long long)max_path_compression);
+                                         (unsigned long long)
+                                         max_path_compression);
   GNUNET_CONFIGURATION_set_value_string (peer->cfg, "REGEXPROFILER",
                                          "POLICY_FILE", filename);
 
@@ -1527,10 +1530,10 @@ controller_event_cb (void *cls,
         static unsigned int slaves_started;
         unsigned int peer_cnt;
 
-        dll_op = event->details.operation_finished.op_cls;
+        dll_op = event->op_cls;
         GNUNET_CONTAINER_DLL_remove (dll_op_head, dll_op_tail, dll_op);
         GNUNET_free (dll_op);
-        op = event->details.operation_finished.operation;
+        op = event->op;
         if (NULL != event->details.operation_finished.emsg)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1635,7 +1638,7 @@ controller_event_cb (void *cls,
      {
        prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                   "\n%u links established in %s\n",
+                   "%u links established in %s\n",
                    num_links,
                    GNUNET_STRINGS_relative_time_to_string (prof_time,
                                                            GNUNET_NO));
@@ -2009,7 +2012,7 @@ run (void *cls, char *const *args, const char *cfgfile,
 
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_string (config, "REGEXPROFILER", "REGEX_PREFIX",
-                                            &regex_prefix))
+                                             &regex_prefix))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 _("Configuration option \"regex_prefix\" missing. Exiting\n"));
@@ -2060,10 +2063,20 @@ run (void *cls, char *const *args, const char *cfgfile,
     shutdown_task = GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
     return;
   }
+  cfg = GNUNET_CONFIGURATION_dup (config);
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_time (cfg, "REGEXPROFILER",
+                                           "REANNOUNCE_PERIOD_MAX",
+                                           &reannounce_period_max))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
+                "reannounce_period_max not given. Using 10 minutes.\n");
+    reannounce_period_max =
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 10);
+  }
   unsigned int i;
   for (i = 0; i < num_search_strings; i++)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "search string: %s\n", search_strings[i]);
-  cfg = GNUNET_CONFIGURATION_dup (config);
   abort_task =
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                     (GNUNET_TIME_UNIT_SECONDS, 5), &do_abort,
@@ -2090,7 +2103,8 @@ main (int argc, char *const *argv)
       GNUNET_YES, &GNUNET_GETOPT_set_uint, &linking_factor },
     {'t', "matching-timeout", "TIMEOUT",
       gettext_noop ("wait TIMEOUT before considering a string match as failed"),
-      GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &search_timeout },
+      GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &search_timeout_time
+        },
     {'s', "search-delay", "DELAY",
       gettext_noop ("wait DELAY before starting string search"),
       GNUNET_YES, &GNUNET_GETOPT_set_relative_time, &search_delay },