RPS API: Fix whitespaces
[oweals/gnunet.git] / src / rps / test_rps.c
index 542919425da19ca90984fe8603233794f3157e12..63a6007aed3724184714fc4433123dbd41c72acb 100644 (file)
@@ -2,20 +2,18 @@
      This file is part of GNUnet.
      Copyright (C) 2009, 2012 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 /**
  * @file rps/test_rps.c
  */
 static uint32_t num_peers;
 
+/**
+ * How long do we run the test?
+ * In seconds.
+ */
+static uint32_t timeout_s;
+
 /**
  * How long do we run the test?
  */
@@ -185,6 +189,11 @@ struct RPSPeer
    */
   struct GNUNET_RPS_Handle *rps_handle;
 
+  /**
+   * Handle to stream requests
+   */
+  struct GNUNET_RPS_StreamRequestHandle *rps_srh;
+
   /**
    * ID of the peer.
    */
@@ -368,6 +377,11 @@ static unsigned int view_sizes;
  */
 static int ok;
 
+/**
+ * Identifier for the churn task that runs periodically
+ */
+static struct GNUNET_SCHEDULER_Task *post_test_task;
+
 /**
  * Identifier for the churn task that runs periodically
  */
@@ -555,6 +569,11 @@ struct SingleTestRun
   uint32_t stat_collect_flags;
 } cur_test_run;
 
+/**
+ * Did we finish the test?
+ */
+static int post_test;
+
 /**
  * Are we shutting down?
  */
@@ -658,6 +677,13 @@ ids_to_file (char *file_name,
   }
 } */
 
+/**
+ * Task run on timeout to collect statistics and potentially shut down.
+ */
+static void
+post_test_op (void *cls);
+
+
 /**
  * Test the success of a single test
  */
@@ -718,6 +744,8 @@ static int check_statistics_collect_completed_single_peer (
   }
   return GNUNET_YES;
 }
+
+
 /**
  * @brief Checks if all peers already received their statistics value from the
  * statistics service.
@@ -744,6 +772,7 @@ static int check_statistics_collect_completed ()
   return GNUNET_YES;
 }
 
+
 /**
  * Task run on timeout to shut everything down.
  */
@@ -751,10 +780,16 @@ static void
 shutdown_op (void *cls)
 {
   unsigned int i;
+  (void) cls;
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Shutdown task scheduled, going down.\n");
   in_shutdown = GNUNET_YES;
+  if (NULL != post_test_task)
+  {
+    GNUNET_SCHEDULER_cancel (post_test_task);
+    post_test_op (NULL);
+  }
   if (NULL != churn_task)
   {
     GNUNET_SCHEDULER_cancel (churn_task);
@@ -762,13 +797,48 @@ shutdown_op (void *cls)
   }
   for (i = 0; i < num_peers; i++)
   {
+    if (NULL != rps_peers[i].rps_handle)
+    {
+      GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
+    }
     if (NULL != rps_peers[i].op)
+    {
       GNUNET_TESTBED_operation_done (rps_peers[i].op);
+    }
+  }
+}
+
+
+/**
+ * Task run on timeout to collect statistics and potentially shut down.
+ */
+static void
+post_test_op (void *cls)
+{
+  unsigned int i;
+  (void) cls;
+
+  post_test_task = NULL;
+  post_test = GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Post test task scheduled, going down.\n");
+  if (NULL != churn_task)
+  {
+    GNUNET_SCHEDULER_cancel (churn_task);
+    churn_task = NULL;
+  }
+  for (i = 0; i < num_peers; i++)
+  {
     if (NULL != cur_test_run.post_test)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n", i);
       cur_test_run.post_test (&rps_peers[i]);
     }
+    if (NULL != rps_peers[i].op)
+    {
+      GNUNET_TESTBED_operation_done (rps_peers[i].op);
+      rps_peers[i].op = NULL;
+    }
   }
   /* If we do not collect statistics, shut down directly */
   if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
@@ -789,6 +859,13 @@ seed_peers (void *cls)
   unsigned int amount;
   unsigned int i;
 
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
+  {
+    return;
+  }
+
+  GNUNET_assert (NULL != peer->rps_handle);
+
   // TODO if malicious don't seed mal peers
   amount = round (.5 * num_peers);
 
@@ -846,8 +923,9 @@ info_cb (void *cb_cls,
          const char *emsg)
 {
   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
+  (void) op;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -901,7 +979,9 @@ rps_connect_complete_cb (void *cls,
   struct RPSPeer *rps_peer = cls;
   struct GNUNET_RPS_Handle *rps = ca_result;
 
-  if (GNUNET_YES == in_shutdown)
+  GNUNET_assert (NULL != ca_result);
+
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -944,9 +1024,11 @@ rps_connect_adapter (void *cls,
   struct GNUNET_RPS_Handle *h;
 
   h = GNUNET_RPS_connect (cfg);
+  GNUNET_assert (NULL != h);
 
   if (NULL != cur_test_run.pre_test)
     cur_test_run.pre_test (cls, h);
+  GNUNET_assert (NULL != h);
 
   return h;
 }
@@ -1007,6 +1089,9 @@ stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
 {
   //struct GNUNET_STATISTICS_Handle *sh = ca_result;
   //struct RPSPeer *peer = (struct RPSPeer *) cls;
+  (void) cls;
+  (void) op;
+  (void) ca_result;
 
   if (NULL != emsg)
   {
@@ -1035,6 +1120,12 @@ rps_disconnect_adapter (void *cls,
 {
   struct RPSPeer *peer = cls;
   struct GNUNET_RPS_Handle *h = op_result;
+
+  if (NULL != peer->rps_srh)
+  {
+    GNUNET_RPS_stream_cancel (peer->rps_srh);
+    peer->rps_srh = NULL;
+  }
   GNUNET_assert (NULL != peer);
   GNUNET_RPS_disconnect (h);
   peer->rps_handle = NULL;
@@ -1105,10 +1196,10 @@ default_reply_handle (void *cls,
   if (0 == evaluate () && HAVE_QUICK_QUIT == cur_test_run.have_quick_quit)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
-    GNUNET_assert (NULL != shutdown_task);
-    GNUNET_SCHEDULER_cancel (shutdown_task);
-    shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL);
-    GNUNET_assert (NULL!= shutdown_task);
+    GNUNET_assert (NULL != post_test_task);
+    GNUNET_SCHEDULER_cancel (post_test_task);
+    post_test_task = GNUNET_SCHEDULER_add_now (&post_test_op, NULL);
+    GNUNET_assert (NULL!= post_test_task);
   }
 }
 
@@ -1122,7 +1213,7 @@ request_peers (void *cls)
   struct RPSPeer *rps_peer;
   struct PendingReply *pending_rep;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
     return;
   rps_peer = pending_req->rps_peer;
   GNUNET_assert (1 <= rps_peer->num_pending_reqs);
@@ -1185,7 +1276,7 @@ cancel_request_cb (void *cls)
   struct RPSPeer *rps_peer = cls;
   struct PendingReply *pending_rep;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
     return;
   pending_rep = rps_peer->pending_rep_head;
   GNUNET_assert (1 <= rps_peer->num_pending_reps);
@@ -1290,7 +1381,7 @@ mal_cb (struct RPSPeer *rps_peer)
 {
   uint32_t num_mal_peers;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1317,7 +1408,7 @@ mal_cb (struct RPSPeer *rps_peer)
 static void
 single_req_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1331,7 +1422,7 @@ single_req_cb (struct RPSPeer *rps_peer)
 static void
 delay_req_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1345,7 +1436,7 @@ delay_req_cb (struct RPSPeer *rps_peer)
 static void
 seed_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1361,7 +1452,7 @@ seed_cb (struct RPSPeer *rps_peer)
 static void
 seed_big_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1378,6 +1469,7 @@ seed_big_cb (struct RPSPeer *rps_peer)
 static void
 single_peer_seed_cb (struct RPSPeer *rps_peer)
 {
+  (void) rps_peer;
   // TODO
 }
 
@@ -1387,7 +1479,7 @@ single_peer_seed_cb (struct RPSPeer *rps_peer)
 static void
 seed_req_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1406,7 +1498,7 @@ seed_req_cb (struct RPSPeer *rps_peer)
 static void
 req_cancel_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1439,7 +1531,7 @@ churn (void *cls);
 static void
 churn_test_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1461,6 +1553,63 @@ churn_test_cb (struct RPSPeer *rps_peer)
   schedule_missing_requests (rps_peer);
 }
 
+/***********************************
+ * SUB
+***********************************/
+
+static void
+got_stream_peer_cb (void *cls,
+                    uint64_t num_peers,
+                    const struct GNUNET_PeerIdentity *peers)
+{
+  const struct RPSPeer *rps_peer = cls;
+
+  for (uint64_t i = 0; i < num_peers; i++)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Peer %" PRIu32 " received [%s] from stream.\n",
+                rps_peer->index,
+                GNUNET_i2s (&peers[i]));
+    if (0 != rps_peer->index &&
+        0 == memcmp (&peers[i],
+                     &rps_peers[0].peer_id,
+                     sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub\n");
+      ok = 1;
+    }
+    else if (0 == rps_peer->index &&
+             0 != memcmp (&peers[i],
+                          &rps_peers[0].peer_id,
+                          sizeof (struct GNUNET_PeerIdentity)))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub (lonely)\n");
+      ok = 1;
+    }
+  }
+}
+
+
+static void
+sub_post (struct RPSPeer *rps_peer)
+{
+  if (0 != rps_peer->index) GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
+  else GNUNET_RPS_sub_stop (rps_peer->rps_handle, "lonely");
+}
+
+
+static void
+sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
+{
+  (void) rps_peer;
+
+  if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test");
+  else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */
+  rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
+                                                 &got_stream_peer_cb,
+                                                 rps_peer);
+}
+
 /***********************************
  * PROFILER
 ***********************************/
@@ -1477,10 +1626,11 @@ churn_cb (void *cls,
           struct GNUNET_TESTBED_Operation *op,
           const char *emsg)
 {
+  (void) op;
   // FIXME
   struct OpListEntry *entry = cls;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1607,6 +1757,7 @@ manage_service_wrapper (unsigned int i, unsigned int j,
 static void
 churn (void *cls)
 {
+  (void) cls;
   unsigned int i;
   unsigned int j;
   double portion_online;
@@ -1615,7 +1766,7 @@ churn (void *cls)
   double portion_go_online;
   double portion_go_offline;
 
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1731,7 +1882,7 @@ profiler_reply_handle (void *cls,
 static void
 profiler_cb (struct RPSPeer *rps_peer)
 {
-  if (GNUNET_YES == in_shutdown)
+  if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
     return;
   }
@@ -1769,6 +1920,8 @@ profiler_cb (struct RPSPeer *rps_peer)
 int
 file_name_cb (void *cls, const char *filename)
 {
+  (void) cls;
+
   if (NULL != strstr (filename, "sampler_el"))
   {
     struct RPS_SamplerElement *s_elem;
@@ -2438,6 +2591,8 @@ stat_iterator (void *cls,
                uint64_t value,
                int is_persistent)
 {
+  (void) subsystem;
+  (void) is_persistent;
   const struct STATcls *stat_cls = (const struct STATcls *) cls;
   struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
@@ -2572,6 +2727,9 @@ run (void *cls,
      unsigned int links_succeeded,
      unsigned int links_failed)
 {
+  (void) cls;
+  (void) h;
+  (void) links_failed;
   unsigned int i;
   struct OpListEntry *entry;
 
@@ -2644,6 +2802,9 @@ run (void *cls,
 
   if (NULL != churn_task)
     GNUNET_SCHEDULER_cancel (churn_task);
+  post_test_task = GNUNET_SCHEDULER_add_delayed (timeout, &post_test_op, NULL);
+  timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+      (timeout_s * 1.2) + 0.1 * num_peers);
   shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
 }
 
@@ -2659,6 +2820,7 @@ int
 main (int argc, char *argv[])
 {
   int ret_value;
+  (void) argc;
 
   /* Defaults for tests */
   num_peers = 5;
@@ -2673,13 +2835,14 @@ main (int argc, char *argv[])
   cur_test_run.stat_collect_flags = 0;
   cur_test_run.have_collect_view = NO_COLLECT_VIEW;
   churn_task = NULL;
-  timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
+  timeout_s = 30;
 
   if (strstr (argv[0], "malicious") != NULL)
   {
     cur_test_run.pre_test = mal_pre;
     cur_test_run.main_test = mal_cb;
     cur_test_run.init_peer = mal_init_peer;
+    timeout_s = 40;
 
     if (strstr (argv[0], "_1") != NULL)
     {
@@ -2725,7 +2888,7 @@ main (int argc, char *argv[])
     cur_test_run.main_test = seed_big_cb;
     cur_test_run.eval_cb = no_eval;
     cur_test_run.have_churn = HAVE_NO_CHURN;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
+    timeout_s = 10;
   }
 
   else if (strstr (argv[0], "_single_peer_seed") != NULL)
@@ -2761,7 +2924,7 @@ main (int argc, char *argv[])
     cur_test_run.main_test = req_cancel_cb;
     cur_test_run.eval_cb = no_eval;
     cur_test_run.have_churn = HAVE_NO_CHURN;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
+    timeout_s = 10;
   }
 
   else if (strstr (argv[0], "_churn") != NULL)
@@ -2775,14 +2938,29 @@ main (int argc, char *argv[])
     cur_test_run.eval_cb = default_eval_cb;
     cur_test_run.have_churn = HAVE_NO_CHURN;
     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
+    timeout_s = 40;
+  }
+
+  else if (strstr (argv[0], "_sub") != NULL)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
+    cur_test_run.name = "test-rps-sub";
+    num_peers = 5;
+    //cur_test_run.init_peer = &default_init_peer;
+    cur_test_run.pre_test = &sub_pre;
+    cur_test_run.main_test = &single_req_cb;
+    //cur_test_run.reply_handle = default_reply_handle;
+    cur_test_run.post_test = &sub_post;
+    //cur_test_run.eval_cb = default_eval_cb;
+    cur_test_run.have_churn = HAVE_NO_CHURN;
+    cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
   }
 
   else if (strstr (argv[0], "profiler") != NULL)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
     cur_test_run.name = "test-rps-profiler";
-    num_peers = 10;
+    num_peers = 16;
     mal_type = 3;
     cur_test_run.init_peer = profiler_init_peer;
     //cur_test_run.pre_test = mal_pre;
@@ -2814,12 +2992,13 @@ main (int argc, char *argv[])
                                       STAT_TYPE_RECV_PULL_REQ |
                                       STAT_TYPE_RECV_PULL_REP;
     cur_test_run.have_collect_view = COLLECT_VIEW;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300);
+    timeout_s = 150;
 
     /* 'Clean' directory */
     (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
     GNUNET_DISK_directory_create ("/tmp/rps/");
   }
+  timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, timeout_s);
 
   rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
   peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
@@ -2847,6 +3026,7 @@ main (int argc, char *argv[])
   }
 
   ret_value = cur_test_run.eval_cb();
+
   if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
   {
     GNUNET_array_grow (rps_peers->cur_view,
@@ -2859,4 +3039,5 @@ main (int argc, char *argv[])
   return ret_value;
 }
 
+
 /* end of test_rps.c */