-more datacache integration work
[oweals/gnunet.git] / src / testbed / testbed_api_barriers.c
index 23d34ffe893bebadf8d8d9c553f13f1841c7ef4e..8ac3fdab549e2addbd9e11829b4dc6f9e708a9f6 100644 (file)
@@ -1,6 +1,6 @@
 /*
   This file is part of GNUnet.
-  (C) 2008--2013 Christian Grothoff (and other contributing authors)
+  Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
 
   GNUnet is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published
 /**
  * @file testbed/testbed_api_barriers.c
  * @brief API implementation for testbed barriers
- * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  */
 
 #include "platform.h"
 #include "gnunet_testbed_service.h"
 #include "testbed_api.h"
+#include "testbed_api_barriers.h"
+
+/**
+ * Logging shorthand
+ */
+#define LOG(type, ...)                          \
+  GNUNET_log_from (type, "testbed-api-barriers", __VA_ARGS__);
+
+/**
+ * Debug logging shorthand
+ */
+#define LOG_DEBUG(...)                          \
+  LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__);
 
 /**
  * Handle for barrier
@@ -42,7 +55,7 @@ struct GNUNET_TESTBED_Barrier
    * The controller handle given while initiliasing this barrier
    */
   struct GNUNET_TESTBED_Controller *c;
-  
+
   /**
    * The name of the barrier
    */
@@ -57,7 +70,11 @@ struct GNUNET_TESTBED_Barrier
    * the closure for the above callback
    */
   void *cls;
+
+  /**
+   * Should the barrier crossed status message be echoed back to the controller?
+   */
+  int echo;
 };
 
 
@@ -108,25 +125,30 @@ GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
   struct GNUNET_TESTBED_Barrier *barrier;
   char *emsg;
   const char *name;
-  struct GNUNET_HashCode key;  
+  struct GNUNET_HashCode key;
   size_t emsg_len;
   int status;
   uint16_t msize;
   uint16_t name_len;
-  
+
   emsg = NULL;
   barrier = NULL;
-  msize = ntohs (msg->header.size);  
+  msize = ntohs (msg->header.size);
   name = msg->data;
   name_len = ntohs (msg->name_len);
-  if (  (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize)
-        || ('\0' != name[name_len])  )
+  LOG_DEBUG ("Received BARRIER_STATUS msg\n");
+  if (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if ('\0' != name[name_len])
   {
     GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
   status = ntohs (msg->status);
-  if (BARRIER_STATUS_ERROR == status)
+  if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)
   {
     status = -1;
     emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len
@@ -143,14 +165,22 @@ GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
     (void) memcpy (emsg, msg->data + name_len + 1, emsg_len);
   }
   if (NULL == barrier_map)
+  {
+    GNUNET_break_op (0);
     goto cleanup;
+  }
   GNUNET_CRYPTO_hash (name, name_len, &key);
   barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
   if (NULL == barrier)
+  {
+    GNUNET_break_op (0);
     goto cleanup;
+  }
   GNUNET_assert (NULL != barrier->cb);
+  if ((GNUNET_YES == barrier->echo) && (GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status))
+    GNUNET_TESTBED_queue_message_ (c, GNUNET_copy_message (&msg->header));
   barrier->cb (barrier->cls, name, barrier, status, emsg);
-  if (BARRIER_STATUS_INITIALISED == status)
+  if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status)
     return GNUNET_OK;           /* just initialised; skip cleanup */
 
  cleanup:
@@ -173,20 +203,23 @@ GNUNET_TESTBED_handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
  * @param cb the callback to call when the barrier is reached or upon error.
  *   Cannot be NULL.
  * @param cls closure for the above callback
+ * @param echo GNUNET_YES to echo the barrier crossed status message back to the
+ *   controller
  * @return barrier handle; NULL upon error
  */
 struct GNUNET_TESTBED_Barrier *
-GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
-                             const char *name,
-                             unsigned int quorum,
-                             GNUNET_TESTBED_barrier_status_cb cb, void *cls)
+GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller,
+                              const char *name,
+                              unsigned int quorum,
+                              GNUNET_TESTBED_barrier_status_cb cb, void *cls,
+                              int echo)
 {
   struct GNUNET_TESTBED_BarrierInit *msg;
   struct GNUNET_TESTBED_Barrier *barrier;
   struct GNUNET_HashCode key;
   size_t name_len;
   uint16_t msize;
-  
+
   GNUNET_assert (quorum <= 100);
   GNUNET_assert (NULL != cb);
   name_len = strlen (name);
@@ -200,11 +233,13 @@ GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
     GNUNET_break (0);
     return NULL;
   }
-  barrier = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Barrier));
+  LOG_DEBUG ("Initialising barrier `%s'\n", name);
+  barrier = GNUNET_new (struct GNUNET_TESTBED_Barrier);
   barrier->c = controller;
   barrier->name = GNUNET_strdup (name);
   barrier->cb = cb;
   barrier->cls = cls;
+  barrier->echo = echo;
   (void) memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode));
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (barrier_map, &barrier->key,
@@ -221,6 +256,30 @@ GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
 }
 
 
+/**
+ * Initialise a barrier and call the given callback when the required percentage
+ * of peers (quorum) reach the barrier OR upon error.
+ *
+ * @param controller the handle to the controller
+ * @param name identification name of the barrier
+ * @param quorum the percentage of peers that is required to reach the barrier.
+ *   Peers signal reaching a barrier by calling
+ *   GNUNET_TESTBED_barrier_reached().
+ * @param cb the callback to call when the barrier is reached or upon error.
+ *   Cannot be NULL.
+ * @param cls closure for the above callback
+ * @return barrier handle; NULL upon error
+ */
+struct GNUNET_TESTBED_Barrier *
+GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
+                             const char *name,
+                             unsigned int quorum,
+                             GNUNET_TESTBED_barrier_status_cb cb, void *cls)
+{
+  return GNUNET_TESTBED_barrier_init_ (controller, name, quorum, cb, cls, GNUNET_YES);
+}
+
+
 /**
  * Cancel a barrier.
  *
@@ -241,4 +300,232 @@ GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier)
   barrier_remove (barrier);
 }
 
+
+/**
+ * Barrier wait handle
+ */
+struct GNUNET_TESTBED_BarrierWaitHandle
+{
+  /**
+   * The name of the barrier
+   */
+  char *name;
+
+  /**
+   * Then configuration used for the client connection
+   */
+  struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * The client connection
+   */
+  struct GNUNET_CLIENT_Connection *conn;
+
+  /**
+   * Transmit handle
+   */
+  struct GNUNET_CLIENT_TransmitHandle *tx;
+
+  /**
+   * The message to transmit with tx
+   */
+  struct GNUNET_MessageHeader *msg;
+
+  /**
+   * The barrier wait callback
+   */
+  GNUNET_TESTBED_barrier_wait_cb cb;
+
+  /**
+   * The closure for the above callback
+   */
+  void *cls;
+};
+
+
+/**
+ * Function to destroy barrier wait handle
+ *
+ * @param h the handle to destroy
+ */
+static void
+destroy_handle (struct GNUNET_TESTBED_BarrierWaitHandle *h)
+{
+  GNUNET_free (h->name);
+  if (NULL != h->tx)
+    GNUNET_CLIENT_notify_transmit_ready_cancel (h->tx);
+  if (NULL != h->conn)
+    GNUNET_CLIENT_disconnect (h->conn);
+  if (NULL != h->msg)
+    GNUNET_free (h->msg);
+  GNUNET_CONFIGURATION_destroy (h->cfg);
+  GNUNET_free (h);
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param message received message; NULL on timeout or fatal error
+ */
+static void
+receive_handler (void *cls, const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_TESTBED_BarrierWaitHandle *h = cls;
+  const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
+  uint16_t msize;
+
+  if (NULL == message)
+  {
+    GNUNET_break_op (0);
+    goto fail;
+  }
+  if (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS != ntohs (message->type))
+  {
+    GNUNET_break_op (0);
+    goto fail;
+  }
+  msize = ntohs (message->size);
+  if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
+  {
+    GNUNET_break_op (0);
+    goto fail;
+  }
+  msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
+  switch (ntohs (msg->status))
+  {
+  case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
+    goto fail;
+  case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
+    GNUNET_break (0);           /* FIXME */
+    goto destroy;
+  case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
+    h->cb (h->cls, h->name, GNUNET_OK);
+    goto destroy;
+  default:
+    GNUNET_break_op (0);
+  }
+
+ fail:
+  h->cb (h->cls, h->name, GNUNET_SYSERR);
+
+ destroy:
+  destroy_handle (h);
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_notify (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_TESTBED_BarrierWaitHandle *h = cls;
+  uint16_t msize;
+
+  h->tx = NULL;
+  if ((0 == size) || (NULL == buf))
+  {
+    destroy_handle (h);
+    return 0;
+  }
+  msize = htons (h->msg->size);
+  GNUNET_assert (msize <= size);
+  (void) memcpy (buf, h->msg, msize);
+  GNUNET_free (h->msg);
+  h->msg = NULL;
+  GNUNET_CLIENT_receive (h->conn, &receive_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
+  return msize;
+}
+
+
+/**
+ * Wait for a barrier to be crossed.  This function should be called by the
+ * peers which have been started by the testbed.  If the peer is not started by
+ * testbed this function may return error
+ *
+ * @param name the name of the barrier
+ * @param cb the barrier wait callback
+ * @param cls the closure for the above callback
+ * @return barrier wait handle which can be used to cancel the waiting at
+ *   anytime before the callback is called.  NULL upon error.
+ */
+struct GNUNET_TESTBED_BarrierWaitHandle *
+GNUNET_TESTBED_barrier_wait (const char *name,
+                             GNUNET_TESTBED_barrier_wait_cb cb,
+                             void *cls)
+{
+  struct GNUNET_TESTBED_BarrierWait *msg;
+  struct GNUNET_CONFIGURATION_Handle *cfg;
+  struct GNUNET_TESTBED_BarrierWaitHandle *h;
+  char *cfg_filename;
+  size_t name_len;
+  uint16_t msize;
+
+  GNUNET_assert (NULL != cb);
+  GNUNET_assert (NULL != name);
+  cfg_filename = getenv (ENV_TESTBED_CONFIG);
+  if (NULL == cfg_filename)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Are you running under testbed?\n");
+    return NULL;
+  }
+  cfg = GNUNET_CONFIGURATION_create ();
+  if (GNUNET_OK != GNUNET_CONFIGURATION_load (cfg, cfg_filename))
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Unable to load configuration from file `%s'\n",
+         cfg_filename);
+    GNUNET_CONFIGURATION_destroy (cfg);
+    return NULL;
+  }
+  h = GNUNET_new (struct GNUNET_TESTBED_BarrierWaitHandle);
+  h->name = GNUNET_strdup (name);
+  h->cfg = cfg;
+  h->conn = GNUNET_CLIENT_connect ("testbed-barrier", h->cfg);
+  h->cb = cb;
+  h->cls = cls;
+  if (NULL == h->conn)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Unable to connect to local testbed-barrier service\n");
+    destroy_handle (h);
+    return NULL;
+  }
+  name_len = strlen (name);
+  msize = sizeof (struct GNUNET_TESTBED_BarrierWait) + name_len;
+  msg = GNUNET_malloc (msize);
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT);
+  msg->header.size = htons (msize);
+  (void) memcpy (msg->name, name, name_len);
+  h->msg = &msg->header;
+  h->tx =
+      GNUNET_CLIENT_notify_transmit_ready (h->conn, msize,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           GNUNET_NO,
+                                           &transmit_notify,
+                                           h);
+  return h;
+}
+
+
+/**
+ * Cancel a barrier wait handle
+ *
+ * @param h the barrier wait handle
+ */
+void
+GNUNET_TESTBED_barrier_wait_cancel (struct GNUNET_TESTBED_BarrierWaitHandle *h)
+{
+  destroy_handle (h);
+}
+
 /* end of testbed_api_barriers.c */