/**
* @file testbed/gnunet-service-testbed_barriers.c
* @brief barrier handling at the testbed controller
- * @author Sree Harsha Totakura <sreeharsha@totakura.in>
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
*/
#include "gnunet-service-testbed.h"
#include "gnunet-service-testbed_barriers.h"
+#include "testbed_api_barriers.h"
/**
/**
* The client handle to the master controller
*/
- struct GNUNET_SERVER_Client *client;
+ struct GNUNET_SERVER_Client *mc;
/**
* The name of the barrier
* Identifier for the timeout task
*/
GNUNET_SCHEDULER_TaskIdentifier tout_task;
-
+
/**
* The status of this barrier
*/
enum GNUNET_TESTBED_BarrierStatus status;
-
+
/**
* Number of barriers wrapped in the above DLL
*/
* Quorum percentage to be reached
*/
uint8_t quorum;
-
+
};
* @param buf where the callee should write the message
* @return number of bytes written to buf
*/
-static size_t
+static size_t
transmit_ready_cb (void *cls, size_t size, void *buf)
{
struct ClientCtx *ctx = cls;
{
GNUNET_assert (NULL != ctx->client);
GNUNET_SERVER_client_drop (ctx->client);
- ctx->client = NULL;
+ ctx->client = NULL;
return 0;
}
mq = ctx->mq_head;
{
struct MessageQueue *mq;
struct GNUNET_SERVER_Client *client = ctx->client;
-
+
mq = GNUNET_malloc (sizeof (struct MessageQueue));
mq->msg = msg;
+ LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
+ ntohs (msg->type), ntohs (msg->size));
GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
if (NULL == ctx->tx)
ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
cleanup_clientctx (struct ClientCtx *ctx)
{
struct MessageQueue *mq;
-
- GNUNET_SERVER_client_drop (ctx->client);
+
+ if (NULL != ctx->client)
+ {
+ GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
+ GNUNET_SERVER_client_drop (ctx->client);
+ }
if (NULL != ctx->tx)
GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
if (NULL != (mq = ctx->mq_head))
remove_barrier (struct Barrier *barrier)
{
struct ClientCtx *ctx;
-
+
GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
&barrier->hash,
barrier));
cleanup_clientctx (ctx);
}
GNUNET_free (barrier->name);
- GNUNET_SERVER_client_drop (barrier->client);
+ GNUNET_SERVER_client_drop (barrier->mc);
GNUNET_free (barrier);
}
* @param name the barrier name
* @param status the status of the barrier
* @param emsg the error message; should be non-NULL for
- * status=BARRIER_STATUS_ERROR
+ * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
*/
static void
send_client_status_msg (struct GNUNET_SERVER_Client *client,
size_t name_len;
uint16_t msize;
- GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status));
+ GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
name_len = strlen (name);
msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
+ (name_len + 1)
*
* @param barrier the corresponding barrier
* @param emsg the error message; should be non-NULL for
- * status=BARRIER_STATUS_ERROR
+ * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
*/
static void
send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
{
GNUNET_assert (0 != barrier->status);
- send_client_status_msg (barrier->client, barrier->name,
- barrier->status, emsg);
-}
-
-
-
-/**
- * Task for sending barrier crossed notifications to waiting client
- *
- * @param cls the barrier which is crossed
- * @param tc scheduler task context
- */
-static void
-notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct Barrier *barrier = cls;
- struct ClientCtx *client_ctx;
- struct GNUNET_TESTBED_BarrierStatusMsg *msg;
- struct GNUNET_MessageHeader *dup_msg;
- uint16_t name_len;
- uint16_t msize;
-
- name_len = strlen (barrier->name) + 1;
- msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len;
- msg = GNUNET_malloc (msize);
- msg->header.size = htons (msize);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
- msg->status = htons (BARRIER_STATUS_CROSSED);
- msg->name_len = htons (name_len);
- (void) memcpy (msg->data, barrier->name, name_len);
- msg->data[name_len] = '\0';
- while (NULL != (client_ctx = barrier->head))
- {
- dup_msg = GNUNET_copy_message (&msg->header);
- queue_message (client_ctx, dup_msg);
- GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
- }
+ send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
}
struct GNUNET_HashCode key;
size_t name_len;
uint16_t msize;
-
+
msize = ntohs (message->size);
if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
{
(void) memcpy (name, msg->name, name_len);
LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
GNUNET_CRYPTO_hash (name, name_len, &key);
+ GNUNET_free (name);
if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
{
GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- GNUNET_free (name);
return;
}
client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
GNUNET_SERVER_client_keep (client);
client_ctx->barrier = barrier;
GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
+ GNUNET_SERVER_client_set_user_context (client, client_ctx);
}
barrier->nreached++;
if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
&& (LOCAL_QUORUM_REACHED (barrier)))
{
- barrier->status = BARRIER_STATUS_CROSSED;
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
send_barrier_status_msg (barrier, NULL);
- notify_task_cb (barrier, NULL);
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
{
struct ClientCtx *client_ctx;
-
+
if (NULL == client)
return;
client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
if (NULL == client_ctx)
- return; /* We only set user context for locally
- connected clients */
+ return;
cleanup_clientctx (client_ctx);
}
GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
srv = GNUNET_SERVICE_get_server (ctx);
GNUNET_SERVER_add_handlers (srv, message_handlers);
- GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
+ GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
+}
+
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+barrier_destroy_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Barrier *barrier = value;
+
+ GNUNET_assert (NULL != barrier);
+ cancel_wrappers (barrier);
+ remove_barrier (barrier);
+ return GNUNET_YES;
}
GST_barriers_destroy ()
{
GNUNET_assert (NULL != barrier_map);
+ GNUNET_assert (GNUNET_SYSERR !=
+ GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
+ &barrier_destroy_iterator,
+ NULL));
GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
GNUNET_assert (NULL != ctx);
GNUNET_SERVICE_stop (ctx);
* @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
* error messsage
*/
-static void
+static void
wbarrier_status_cb (void *cls, const char *name,
struct GNUNET_TESTBED_Barrier *b_,
enum GNUNET_TESTBED_BarrierStatus status,
GNUNET_free (wrapper);
switch (status)
{
- case BARRIER_STATUS_ERROR:
+ case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
LOG (GNUNET_ERROR_TYPE_ERROR,
"Initialising barrier `%s' failed at a sub-controller: %s\n",
barrier->name, (NULL != emsg) ? emsg : "NULL");
cancel_wrappers (barrier);
if (NULL == emsg)
emsg = "Initialisation failed at a sub-controller";
- barrier->status = BARRIER_STATUS_ERROR;
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
send_barrier_status_msg (barrier, emsg);
return;
- case BARRIER_STATUS_CROSSED:
- if (BARRIER_STATUS_INITIALISED != barrier->status)
+ case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
+ if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
{
GNUNET_break_op (0);
return;
if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
&& (LOCAL_QUORUM_REACHED (barrier)))
{
- barrier->status = BARRIER_STATUS_CROSSED;
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
send_barrier_status_msg (barrier, NULL);
}
return;
- case BARRIER_STATUS_INITIALISED:
+ case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
if (0 != barrier->status)
{
GNUNET_break_op (0);
barrier->num_wbarriers_inited++;
if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
{
- barrier->status = BARRIER_STATUS_INITIALISED;
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
send_barrier_status_msg (barrier, NULL);
}
return;
fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Barrier *barrier = cls;
-
+
cancel_wrappers (barrier);
- barrier->status = BARRIER_STATUS_ERROR;
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
send_barrier_status_msg (barrier,
"Timedout while propagating barrier initialisation\n");
remove_barrier (barrier);
size_t name_len;
unsigned int cnt;
uint16_t msize;
-
+
if (NULL == GST_context)
{
GNUNET_break_op (0);
LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
{
-
- send_client_status_msg (client, name, BARRIER_STATUS_ERROR,
+
+ send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
"A barrier with the same name already exists");
GNUNET_free (name);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
(void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
barrier->quorum = msg->quorum;
barrier->name = name;
- barrier->client = client;
+ barrier->mc = client;
GNUNET_SERVER_client_keep (client);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (barrier_map,
wrapper = GNUNET_malloc (sizeof (struct WBarrier));
wrapper->barrier = barrier;
GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
- wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller,
- barrier->name,
- barrier->quorum,
- &wbarrier_status_cb,
- wrapper);
+ wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
+ barrier->name,
+ barrier->quorum,
+ &wbarrier_status_cb,
+ wrapper,
+ GNUNET_NO);
}
if (NULL == barrier->whead) /* No further propagation */
{
- barrier->status = BARRIER_STATUS_INITIALISED;
- LOG_DEBUG ("Sending BARRIER_STATUS_INITIALISED for barrier `%s'\n",
+ barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
+ LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
barrier->name);
send_barrier_status_msg (barrier, NULL);
}else
GNUNET_break_op (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
- }
+ }
if (client != GST_context->client)
{
GNUNET_break_op (0);
GNUNET_assert (NULL != barrier);
cancel_wrappers (barrier);
remove_barrier (barrier);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
+ * This handler is queued in the main service and will handle the messages sent
+ * either from the testbed driver or from a high level controller
+ *
+ * @param cls NULL
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void
+GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
+ struct Barrier *barrier;
+ struct ClientCtx *client_ctx;
+ const char *name;
+ struct GNUNET_HashCode key;
+ enum GNUNET_TESTBED_BarrierStatus status;
+ uint16_t msize;
+ uint16_t name_len;
+
+ if (NULL == GST_context)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ if (client != GST_context->client)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ msize = ntohs (message->size);
+ if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
+ status = ntohs (msg->status);
+ if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
+ {
+ GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
+ status message this way */
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ name = msg->data;
+ name_len = ntohs (msg->name_len);
+ if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ if ('\0' != name[name_len])
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_CRYPTO_hash (name, name_len, &key);
+ barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
+ if (NULL == barrier)
+ {
+ GNUNET_break_op (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ while (NULL != (client_ctx = barrier->head)) /* Notify peers */
+ {
+ queue_message (client_ctx, GNUNET_copy_message (message));
+ GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
+ }
}
/* end of gnunet-service-testbed_barriers.c */