- give out run handle through which master controller's handle can be retrieved
[oweals/gnunet.git] / src / testbed / gnunet-service-testbed_barriers.c
index 079096d86c1699e5dcc2bbe85dd14902cf0bfb86..894bf1056a43a26c252de8bb6b35329ad3e4b61e 100644 (file)
@@ -25,6 +25,8 @@
  */
 
 #include "gnunet-service-testbed.h"
+#include "gnunet-service-testbed_barriers.h"
+
 
 /**
  * timeout for outgoing message transmissions in seconds
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
 
 
+/**
+ * Test to see if local peers have reached the required quorum of a barrier
+ */
+#define LOCAL_QUORUM_REACHED(barrier)           \
+  ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
+
+
 /**
  * Barrier
  */
@@ -60,6 +69,7 @@ struct MessageQueue
   struct GNUNET_MessageHeader *msg;
 };
 
+
 /**
  * Context to be associated with each client
  */
@@ -102,6 +112,38 @@ struct ClientCtx
 };
 
 
+/**
+ * Wrapper around Barrier handle
+ */
+struct WBarrier
+{
+  /**
+   * DLL next pointer
+   */
+  struct WBarrier *next;
+
+  /**
+   * DLL prev pointer
+   */
+  struct WBarrier *prev;
+
+  /**
+   * The local barrier associated with the creation of this wrapper
+   */
+  struct Barrier *barrier;
+
+  /**
+   * The barrier handle from API
+   */
+  struct GNUNET_TESTBED_Barrier *hbarrier;
+
+  /**
+   * Has this barrier been crossed?
+   */
+  uint8_t reached;
+};
+
+
 /**
  * Barrier
  */
@@ -112,6 +154,11 @@ struct Barrier
    */
   struct GNUNET_HashCode hash;
 
+  /**
+   * The client handle to the master controller
+   */
+  struct GNUNET_SERVER_Client *client;
+
   /**
    * The name of the barrier
    */
@@ -127,6 +174,41 @@ struct Barrier
    */
   struct ClientCtx *tail;
 
+  /**
+   * DLL head for the list of barrier handles
+   */
+  struct WBarrier *whead;
+
+  /**
+   * DLL tail for the list of barrier handles
+   */
+  struct WBarrier *wtail;
+
+  /**
+   * Identifier for the timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier tout_task;
+  
+  /**
+   * The status of this barrier
+   */
+  enum GNUNET_TESTBED_BarrierStatus status;
+  
+  /**
+   * Number of barriers wrapped in the above DLL
+   */
+  unsigned int num_wbarriers;
+
+  /**
+   * Number of wrapped barriers reached so far
+   */
+  unsigned int num_wbarriers_reached;
+
+  /**
+   * Number of wrapped barrier initialised so far
+   */
+  unsigned int num_wbarriers_inited;
+
   /**
    * Number of peers which have reached this barrier
    */
@@ -142,10 +224,6 @@ struct Barrier
    */
   uint8_t quorum;
   
-  /**
-   * Was there a timeout while propagating initialisation
-   */
-  uint8_t timedout;
 };
 
 
@@ -210,7 +288,6 @@ transmit_ready_cb (void *cls, size_t size, void *buf)
  *
  * @param ctx the context associated with the client
  * @param msg the message to queue.  Will be consumed
- * @param suspended is the client suspended at the time of calling queue_message
  */
 static void
 queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
@@ -228,7 +305,29 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
 }
 
 
-#if 0
+/**
+ * Function to cleanup client context data structure
+ *
+ * @param ctx the client context data structure
+ */
+static void
+cleanup_clientctx (struct ClientCtx *ctx)
+{
+  struct MessageQueue *mq;
+  
+  GNUNET_SERVER_client_drop (ctx->client);
+  if (NULL != ctx->tx)
+    GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
+  if (NULL != (mq = ctx->mq_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
+    GNUNET_free (mq->msg);
+    GNUNET_free (mq);
+  }
+  GNUNET_free (ctx);
+}
+
+
 /**
  * Function to remove a barrier from the barrier map and cleanup resources
  * occupied by a barrier
@@ -238,37 +337,91 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
 static void
 remove_barrier (struct Barrier *barrier)
 {
+  struct ClientCtx *ctx;
+  
   GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
                                                                     &barrier->hash,
                                                                     barrier));
+  while (NULL != (ctx = barrier->head))
+  {
+    GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
+    cleanup_clientctx (ctx);
+  }
   GNUNET_free (barrier->name);
+  GNUNET_SERVER_client_drop (barrier->client);
   GNUNET_free (barrier);
 }
 
 
 /**
- * Function called upon timeout while waiting for a response from the
- * subcontrollers to barrier init message
+ * Cancels all subcontroller barrier handles
  *
- * @param 
- * @return 
+ * @param barrier the local barrier
  */
 static void
-fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+cancel_wrappers (struct Barrier *barrier)
 {
-  struct ForwardedOperationContext *foctx = cls;
-  struct Barrier *barrier = foctx->cls;
-  
-  barrier->nslaves--;
-  barrier->timedout = GNUNET_YES;
-  if (0 == barrier->nslaves)
+  struct WBarrier *wrapper;
+
+  while (NULL != (wrapper = barrier->whead))
   {
-    GST_send_operation_fail_msg (foctx->client, foctx->operation_id,
-                                 "Timeout while contacting a slave controller");
-    remove_barrier (barrier);
+    GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
+    GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
+    GNUNET_free (wrapper);
   }
 }
-#endif
+
+
+/**
+ * Send a status message about a barrier to the given client
+ *
+ * @param client the client to send the message to
+ * @param name the barrier name
+ * @param status the status of the barrier
+ * @param emsg the error message; should be non-NULL for
+ *   status=BARRIER_STATUS_ERROR 
+ */
+static void
+send_client_status_msg (struct GNUNET_SERVER_Client *client,
+                        const char *name,
+                        enum GNUNET_TESTBED_BarrierStatus status,
+                        const char *emsg)
+{
+  struct GNUNET_TESTBED_BarrierStatusMsg *msg;
+  size_t name_len;
+  uint16_t msize;
+
+  GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
+  name_len = strlen (name) + 1;
+  msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
+      + name_len
+      + (NULL == emsg) ? 0 : strlen (emsg) + 1;
+  msg = GNUNET_malloc (msize);
+  msg->status = htons (status);
+  msg->name_len = htons ((uint16_t) name_len);
+  (void) memcpy (msg->data, name, name_len);
+  if (NULL != emsg)
+    (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
+  GST_queue_message (client, &msg->header);
+}
+
+
+/**
+ * Sends a barrier failed message
+ *
+ * @param barrier the corresponding barrier
+ * @param emsg the error message; should be non-NULL for
+ *   status=BARRIER_STATUS_ERROR 
+ */
+static void
+send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
+{
+  GNUNET_assert (0 != barrier->status);
+  send_client_status_msg (barrier->client, barrier->name,
+                          barrier->status, emsg);
+}
+
+
 
 /**
  * Task for sending barrier crossed notifications to waiting client
@@ -281,13 +434,13 @@ notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Barrier *barrier = cls;
   struct ClientCtx *client_ctx;
-  struct GNUNET_TESTBED_BarrierStatus *msg;
+  struct GNUNET_TESTBED_BarrierStatusMsg *msg;
   struct GNUNET_MessageHeader *dup_msg;
   uint16_t name_len;
   uint16_t msize;
 
   name_len = strlen (barrier->name) + 1;
-  msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len;  
+  msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;  
   msg = GNUNET_malloc (msize);
   msg->header.size = htons (msize);
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
@@ -365,7 +518,7 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
     client_ctx->barrier = barrier;
     GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
     barrier->nreached++;
-    if ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
+    if (LOCAL_QUORUM_REACHED (barrier))
       notify_task_cb (barrier, NULL);
   }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -384,16 +537,12 @@ static void
 disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
 {
   struct ClientCtx *client_ctx;
-  struct Barrier *barrier;
   
   client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
   if (NULL == client_ctx)
-    return;
-  barrier = client_ctx->barrier;
-  GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
-  if (NULL != client_ctx->tx)
-    GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
-  
+    return;                     /* We only set user context for locally
+                                   connected clients */
+  cleanup_clientctx (client_ctx);
 }
 
 
@@ -424,11 +573,102 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
 void
 GST_barriers_stop ()
 {
+  GNUNET_assert (NULL != barrier_map);
+  GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
   GNUNET_assert (NULL != ctx);
   GNUNET_SERVICE_stop (ctx);
 }
 
 
+/**
+ * Functions of this type are to be given as callback argument to
+ * GNUNET_TESTBED_barrier_init().  The callback will be called when status
+ * information is available for the barrier.
+ *
+ * @param cls the closure given to GNUNET_TESTBED_barrier_init()
+ * @param name the name of the barrier
+ * @param b_ the barrier handle
+ * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
+ *   GNUNET_SYSERR upon error
+ * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
+ *   error messsage
+ */
+static void 
+wbarrier_status_cb (void *cls, const char *name,
+                    struct GNUNET_TESTBED_Barrier *b_,
+                    enum GNUNET_TESTBED_BarrierStatus status,
+                    const char *emsg)
+{
+  struct WBarrier *wrapper = cls;
+  struct Barrier *barrier = wrapper->barrier;
+
+  GNUNET_assert (b_ == wrapper->hbarrier);
+  wrapper->hbarrier = NULL;
+  GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
+  GNUNET_free (wrapper);
+  switch (status)
+  {
+  case BARRIER_STATUS_ERROR:
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Initialising barrier `%s' failed at a sub-controller: %s\n",
+         barrier->name, (NULL != emsg) ? emsg : "NULL");
+    cancel_wrappers (barrier);
+    if (NULL == emsg)
+      emsg = "Initialisation failed at a sub-controller";
+    barrier->status = BARRIER_STATUS_ERROR;
+    send_barrier_status_msg (barrier, emsg);
+    return;
+  case BARRIER_STATUS_CROSSED:
+    if (BARRIER_STATUS_INITIALISED != barrier->status)
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    barrier->num_wbarriers_reached++;
+    if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
+        && (LOCAL_QUORUM_REACHED (barrier)))
+    {
+      barrier->status = BARRIER_STATUS_CROSSED;
+      send_barrier_status_msg (barrier, NULL);
+    }
+    return;
+  case BARRIER_STATUS_INITIALISED:
+    if (0 != barrier->status)
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    barrier->num_wbarriers_inited++;
+    if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
+    {
+      barrier->status = BARRIER_STATUS_INITIALISED;
+      send_barrier_status_msg (barrier, NULL);
+    }
+    return;
+  }
+}
+
+
+/**
+ * Function called upon timeout while waiting for a response from the
+ * subcontrollers to barrier init message
+ *
+ * @param cls barrier
+ * @param tc scheduler task context
+ */
+static void
+fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Barrier *barrier = cls;
+  
+  cancel_wrappers (barrier);
+  barrier->status = BARRIER_STATUS_ERROR;
+  send_barrier_status_msg (barrier,
+                           "Timedout while propagating barrier initialisation\n");
+  remove_barrier (barrier);
+}
+
+
 /**
  * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.  This
  * message should always come from a parent controller or the testbed API if we
@@ -446,12 +686,12 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
                          const struct GNUNET_MessageHeader *message)
 {
   const struct GNUNET_TESTBED_BarrierInit *msg;
-  const char *name;
+  char *name;
   struct Barrier *barrier;
   struct Slave *slave;
+  struct WBarrier *wrapper;
   struct GNUNET_HashCode hash;
   size_t name_len;
-  uint64_t op_id;
   unsigned int cnt;
   uint16_t msize;
   
@@ -475,22 +715,25 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
     return;
   }
   msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
-  op_id = GNUNET_ntohll (msg->op_id);
-  name = msg->name;
   name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
+  name = GNUNET_malloc (name_len + 1);
+  (void) memcpy (name, msg->name, name_len);
   GNUNET_CRYPTO_hash (name, name_len, &hash);
   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
   {
-    GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
+  
+    send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
+                            "A barrier with the same name already exists");
+    GNUNET_free (name);
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
     return;
   }
   barrier = GNUNET_malloc (sizeof (struct Barrier));
   (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
   barrier->quorum = msg->quorum;
-  barrier->name = GNUNET_malloc (name_len + 1);
-  barrier->name[name_len] = '\0';
-  (void) memcpy (barrier->name, name, name_len);
+  barrier->name = name;
+  barrier->client = client;
+  GNUNET_SERVER_client_keep (client);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (barrier_map,
                                                     &barrier->hash,
@@ -506,7 +749,85 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
     {
       GNUNET_break (0);/* May happen when we are connecting to the controller */
       continue;
-    }    
-    GNUNET_break (0);           /* FIXME */
+    }
+    wrapper = GNUNET_malloc (sizeof (struct WBarrier));
+    wrapper->barrier = barrier;
+    GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
+    wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
+                                                     barrier->name,
+                                                     barrier->quorum,
+                                                     &wbarrier_status_cb,
+                                                     wrapper);    
+  }
+  if (NULL == barrier->whead)   /* No further propagation */
+  {
+    barrier->status = BARRIER_STATUS_INITIALISED;
+    send_barrier_status_msg (barrier, NULL);
+  }else
+    barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
+                                                       &fwd_tout_barrier_init,
+                                                       barrier);
+}
+
+
+/**
+ * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.  This
+ * message should always come from a parent controller or the testbed API if we
+ * are the root controller.
+ *
+ * This handler is queued in the main service and will handle the messages sent
+ * either from the testbed driver or from a high level controller
+ *
+ * @param cls NULL
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void
+GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
+                           const struct GNUNET_MessageHeader *message)
+{
+  const struct GNUNET_TESTBED_BarrierCancel *msg;
+  char *name;
+  struct Barrier *barrier;
+  struct GNUNET_HashCode hash;
+  size_t name_len;
+  uint16_t msize;
+
+  if (NULL == GST_context)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }  
+  if (client != GST_context->client)
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
   }
+  msize = ntohs (message->size);
+  if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
+  name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
+  name = GNUNET_malloc (name_len + 1);
+  (void) memcpy (name, msg->name, name_len);
+  GNUNET_CRYPTO_hash (name, name_len, &hash);
+  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
+  {
+    GNUNET_break_op (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
+  GNUNET_assert (NULL != barrier);
+  cancel_wrappers (barrier);
+  remove_barrier (barrier);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);  
 }
+
+/* end of gnunet-service-testbed_barriers.c */