*/
#include "gnunet-service-testbed.h"
+#include "gnunet-service-testbed_barriers.h"
+
/**
* timeout for outgoing message transmissions in seconds
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
+/**
+ * Test to see if local peers have reached the required quorum of a barrier
+ */
+#define LOCAL_QUORUM_REACHED(barrier) \
+ ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
+
+
/**
* Barrier
*/
struct GNUNET_MessageHeader *msg;
};
+
/**
* Context to be associated with each client
*/
};
+/**
+ * Wrapper around Barrier handle
+ */
+struct WBarrier
+{
+ /**
+ * DLL next pointer
+ */
+ struct WBarrier *next;
+
+ /**
+ * DLL prev pointer
+ */
+ struct WBarrier *prev;
+
+ /**
+ * The local barrier associated with the creation of this wrapper
+ */
+ struct Barrier *barrier;
+
+ /**
+ * The barrier handle from API
+ */
+ struct GNUNET_TESTBED_Barrier *hbarrier;
+
+ /**
+ * Has this barrier been crossed?
+ */
+ uint8_t reached;
+};
+
+
/**
* Barrier
*/
*/
struct GNUNET_HashCode hash;
+ /**
+ * The client handle to the master controller
+ */
+ struct GNUNET_SERVER_Client *client;
+
/**
* The name of the barrier
*/
*/
struct ClientCtx *tail;
+ /**
+ * DLL head for the list of barrier handles
+ */
+ struct WBarrier *whead;
+
+ /**
+ * DLL tail for the list of barrier handles
+ */
+ struct WBarrier *wtail;
+
+ /**
+ * Identifier for the timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier tout_task;
+
+ /**
+ * The status of this barrier
+ */
+ enum GNUNET_TESTBED_BarrierStatus status;
+
+ /**
+ * Number of barriers wrapped in the above DLL
+ */
+ unsigned int num_wbarriers;
+
+ /**
+ * Number of wrapped barriers reached so far
+ */
+ unsigned int num_wbarriers_reached;
+
+ /**
+ * Number of wrapped barrier initialised so far
+ */
+ unsigned int num_wbarriers_inited;
+
/**
* Number of peers which have reached this barrier
*/
*/
uint8_t quorum;
- /**
- * Was there a timeout while propagating initialisation
- */
- uint8_t timedout;
};
*
* @param ctx the context associated with the client
* @param msg the message to queue. Will be consumed
- * @param suspended is the client suspended at the time of calling queue_message
*/
static void
queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
}
-#if 0
+/**
+ * Function to cleanup client context data structure
+ *
+ * @param ctx the client context data structure
+ */
+static void
+cleanup_clientctx (struct ClientCtx *ctx)
+{
+ struct MessageQueue *mq;
+
+ GNUNET_SERVER_client_drop (ctx->client);
+ if (NULL != ctx->tx)
+ GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
+ if (NULL != (mq = ctx->mq_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
+ GNUNET_free (mq->msg);
+ GNUNET_free (mq);
+ }
+ GNUNET_free (ctx);
+}
+
+
/**
* Function to remove a barrier from the barrier map and cleanup resources
* occupied by a barrier
static void
remove_barrier (struct Barrier *barrier)
{
+ struct ClientCtx *ctx;
+
GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
&barrier->hash,
barrier));
+ while (NULL != (ctx = barrier->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
+ cleanup_clientctx (ctx);
+ }
GNUNET_free (barrier->name);
+ GNUNET_SERVER_client_drop (barrier->client);
GNUNET_free (barrier);
}
/**
- * Function called upon timeout while waiting for a response from the
- * subcontrollers to barrier init message
+ * Cancels all subcontroller barrier handles
*
- * @param
- * @return
+ * @param barrier the local barrier
*/
static void
-fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+cancel_wrappers (struct Barrier *barrier)
{
- struct ForwardedOperationContext *foctx = cls;
- struct Barrier *barrier = foctx->cls;
-
- barrier->nslaves--;
- barrier->timedout = GNUNET_YES;
- if (0 == barrier->nslaves)
+ struct WBarrier *wrapper;
+
+ while (NULL != (wrapper = barrier->whead))
{
- GST_send_operation_fail_msg (foctx->client, foctx->operation_id,
- "Timeout while contacting a slave controller");
- remove_barrier (barrier);
+ GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
+ GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
+ GNUNET_free (wrapper);
}
}
-#endif
+
+
+/**
+ * Send a status message about a barrier to the given client
+ *
+ * @param client the client to send the message to
+ * @param name the barrier name
+ * @param status the status of the barrier
+ * @param emsg the error message; should be non-NULL for
+ * status=BARRIER_STATUS_ERROR
+ */
+static void
+send_client_status_msg (struct GNUNET_SERVER_Client *client,
+ const char *name,
+ enum GNUNET_TESTBED_BarrierStatus status,
+ const char *emsg)
+{
+ struct GNUNET_TESTBED_BarrierStatusMsg *msg;
+ size_t name_len;
+ uint16_t msize;
+
+ GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
+ name_len = strlen (name) + 1;
+ msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
+ + name_len
+ + (NULL == emsg) ? 0 : strlen (emsg) + 1;
+ msg = GNUNET_malloc (msize);
+ msg->status = htons (status);
+ msg->name_len = htons ((uint16_t) name_len);
+ (void) memcpy (msg->data, name, name_len);
+ if (NULL != emsg)
+ (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1);
+ GST_queue_message (client, &msg->header);
+}
+
+
+/**
+ * Sends a barrier failed message
+ *
+ * @param barrier the corresponding barrier
+ * @param emsg the error message; should be non-NULL for
+ * status=BARRIER_STATUS_ERROR
+ */
+static void
+send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
+{
+ GNUNET_assert (0 != barrier->status);
+ send_client_status_msg (barrier->client, barrier->name,
+ barrier->status, emsg);
+}
+
+
/**
* Task for sending barrier crossed notifications to waiting client
{
struct Barrier *barrier = cls;
struct ClientCtx *client_ctx;
- struct GNUNET_TESTBED_BarrierStatus *msg;
+ struct GNUNET_TESTBED_BarrierStatusMsg *msg;
struct GNUNET_MessageHeader *dup_msg;
uint16_t name_len;
uint16_t msize;
name_len = strlen (barrier->name) + 1;
- msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len;
+ msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;
msg = GNUNET_malloc (msize);
msg->header.size = htons (msize);
msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
client_ctx->barrier = barrier;
GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
barrier->nreached++;
- if ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
+ if (LOCAL_QUORUM_REACHED (barrier))
notify_task_cb (barrier, NULL);
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
{
struct ClientCtx *client_ctx;
- struct Barrier *barrier;
client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
if (NULL == client_ctx)
- return;
- barrier = client_ctx->barrier;
- GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
- if (NULL != client_ctx->tx)
- GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx);
-
+ return; /* We only set user context for locally
+ connected clients */
+ cleanup_clientctx (client_ctx);
}
void
GST_barriers_stop ()
{
+ GNUNET_assert (NULL != barrier_map);
+ GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
GNUNET_assert (NULL != ctx);
GNUNET_SERVICE_stop (ctx);
}
+/**
+ * Functions of this type are to be given as callback argument to
+ * GNUNET_TESTBED_barrier_init(). The callback will be called when status
+ * information is available for the barrier.
+ *
+ * @param cls the closure given to GNUNET_TESTBED_barrier_init()
+ * @param name the name of the barrier
+ * @param b_ the barrier handle
+ * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
+ * GNUNET_SYSERR upon error
+ * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
+ * error messsage
+ */
+static void
+wbarrier_status_cb (void *cls, const char *name,
+ struct GNUNET_TESTBED_Barrier *b_,
+ enum GNUNET_TESTBED_BarrierStatus status,
+ const char *emsg)
+{
+ struct WBarrier *wrapper = cls;
+ struct Barrier *barrier = wrapper->barrier;
+
+ GNUNET_assert (b_ == wrapper->hbarrier);
+ wrapper->hbarrier = NULL;
+ GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
+ GNUNET_free (wrapper);
+ switch (status)
+ {
+ case BARRIER_STATUS_ERROR:
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Initialising barrier `%s' failed at a sub-controller: %s\n",
+ barrier->name, (NULL != emsg) ? emsg : "NULL");
+ cancel_wrappers (barrier);
+ if (NULL == emsg)
+ emsg = "Initialisation failed at a sub-controller";
+ barrier->status = BARRIER_STATUS_ERROR;
+ send_barrier_status_msg (barrier, emsg);
+ return;
+ case BARRIER_STATUS_CROSSED:
+ if (BARRIER_STATUS_INITIALISED != barrier->status)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ barrier->num_wbarriers_reached++;
+ if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
+ && (LOCAL_QUORUM_REACHED (barrier)))
+ {
+ barrier->status = BARRIER_STATUS_CROSSED;
+ send_barrier_status_msg (barrier, NULL);
+ }
+ return;
+ case BARRIER_STATUS_INITIALISED:
+ if (0 != barrier->status)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ barrier->num_wbarriers_inited++;
+ if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
+ {
+ barrier->status = BARRIER_STATUS_INITIALISED;
+ send_barrier_status_msg (barrier, NULL);
+ }
+ return;
+ }
+}
+
+
+/**
+ * Function called upon timeout while waiting for a response from the
+ * subcontrollers to barrier init message
+ *
+ * @param cls barrier
+ * @param tc scheduler task context
+ */
+static void
+fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Barrier *barrier = cls;
+
+ cancel_wrappers (barrier);
+ barrier->status = BARRIER_STATUS_ERROR;
+ send_barrier_status_msg (barrier,
+ "Timedout while propagating barrier initialisation\n");
+ remove_barrier (barrier);
+}
+
+
/**
* Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
* message should always come from a parent controller or the testbed API if we
const struct GNUNET_MessageHeader *message)
{
const struct GNUNET_TESTBED_BarrierInit *msg;
- const char *name;
+ char *name;
struct Barrier *barrier;
struct Slave *slave;
+ struct WBarrier *wrapper;
struct GNUNET_HashCode hash;
size_t name_len;
- uint64_t op_id;
unsigned int cnt;
uint16_t msize;
return;
}
msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
- op_id = GNUNET_ntohll (msg->op_id);
- name = msg->name;
name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
+ name = GNUNET_malloc (name_len + 1);
+ (void) memcpy (name, msg->name, name_len);
GNUNET_CRYPTO_hash (name, name_len, &hash);
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
{
- GST_send_operation_fail_msg (client, op_id, "Barrier already initialised");
+
+ send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
+ "A barrier with the same name already exists");
+ GNUNET_free (name);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
barrier = GNUNET_malloc (sizeof (struct Barrier));
(void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
barrier->quorum = msg->quorum;
- barrier->name = GNUNET_malloc (name_len + 1);
- barrier->name[name_len] = '\0';
- (void) memcpy (barrier->name, name, name_len);
+ barrier->name = name;
+ barrier->client = client;
+ GNUNET_SERVER_client_keep (client);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (barrier_map,
&barrier->hash,
{
GNUNET_break (0);/* May happen when we are connecting to the controller */
continue;
- }
- GNUNET_break (0); /* FIXME */
+ }
+ wrapper = GNUNET_malloc (sizeof (struct WBarrier));
+ wrapper->barrier = barrier;
+ GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
+ wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
+ barrier->name,
+ barrier->quorum,
+ &wbarrier_status_cb,
+ wrapper);
+ }
+ if (NULL == barrier->whead) /* No further propagation */
+ {
+ barrier->status = BARRIER_STATUS_INITIALISED;
+ send_barrier_status_msg (barrier, NULL);
+ }else
+ barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
+ &fwd_tout_barrier_init,
+ barrier);
+}
+
+
+/**
+ * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
+ * message should always come from a parent controller or the testbed API if we
+ * are the root controller.
+ *
+ * This handler is queued in the main service and will handle the messages sent
+ * either from the testbed driver or from a high level controller
+ *
+ * @param cls NULL
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void
+GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct GNUNET_TESTBED_BarrierCancel *msg;
+ char *name;
+ struct Barrier *barrier;
+ struct GNUNET_HashCode hash;
+ size_t name_len;
+ uint16_t msize;
+
+ if (NULL == GST_context)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ if (client != GST_context->client)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
}
+ msize = ntohs (message->size);
+ if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
+ name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
+ name = GNUNET_malloc (name_len + 1);
+ (void) memcpy (name, msg->name, name_len);
+ GNUNET_CRYPTO_hash (name, name_len, &hash);
+ if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
+ GNUNET_assert (NULL != barrier);
+ cancel_wrappers (barrier);
+ remove_barrier (barrier);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+
+/* end of gnunet-service-testbed_barriers.c */