X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Fgnunet-service-testbed_barriers.c;h=4450ddf777c2d59afd01f22e993fdda364be51af;hb=dd1927b960c7cea13733e061a11142274652ba27;hp=71fd8ebceaf63e7a99949bc7e208480de5e47389;hpb=d4922969a54f83b3c2d7503a9bda2708700869c8;p=oweals%2Fgnunet.git diff --git a/src/testbed/gnunet-service-testbed_barriers.c b/src/testbed/gnunet-service-testbed_barriers.c index 71fd8ebce..4450ddf77 100644 --- a/src/testbed/gnunet-service-testbed_barriers.c +++ b/src/testbed/gnunet-service-testbed_barriers.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2008--2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,18 +14,19 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file testbed/gnunet-service-testbed_barriers.c * @brief barrier handling at the testbed controller - * @author Sree Harsha Totakura + * @author Sree Harsha Totakura */ #include "gnunet-service-testbed.h" #include "gnunet-service-testbed_barriers.h" +#include "testbed_api_barriers.h" /** @@ -42,6 +43,17 @@ ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100)) +#ifdef LOG +#undef LOG +#endif + +/** + * Logging shorthand + */ +#define LOG(kind,...) \ + GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__) + + /** * Barrier */ @@ -157,7 +169,7 @@ struct Barrier /** * The client handle to the master controller */ - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVER_Client *mc; /** * The name of the barrier @@ -187,13 +199,13 @@ struct Barrier /** * Identifier for the timeout task */ - GNUNET_SCHEDULER_TaskIdentifier tout_task; - + struct GNUNET_SCHEDULER_Task * tout_task; + /** * The status of this barrier */ enum GNUNET_TESTBED_BarrierStatus status; - + /** * Number of barriers wrapped in the above DLL */ @@ -223,7 +235,7 @@ struct Barrier * Quorum percentage to be reached */ uint8_t quorum; - + }; @@ -249,7 +261,7 @@ static struct GNUNET_SERVICE_Context *ctx; * @param buf where the callee should write the message * @return number of bytes written to buf */ -static size_t +static size_t transmit_ready_cb (void *cls, size_t size, void *buf) { struct ClientCtx *ctx = cls; @@ -259,12 +271,11 @@ transmit_ready_cb (void *cls, size_t size, void *buf) size_t wrote; ctx->tx = NULL; - wrote = 0; if ((0 == size) || (NULL == buf)) { GNUNET_assert (NULL != ctx->client); GNUNET_SERVER_client_drop (ctx->client); - ctx->client = NULL; + ctx->client = NULL; return 0; } mq = ctx->mq_head; @@ -294,9 +305,11 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg) { struct MessageQueue *mq; struct GNUNET_SERVER_Client *client = ctx->client; - - mq = GNUNET_malloc (sizeof (struct MessageQueue)); + + mq = GNUNET_new (struct MessageQueue); mq->msg = msg; + LOG_DEBUG ("Queueing message of type %u, size %u for sending\n", + ntohs (msg->type), ntohs (msg->size)); GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq); if (NULL == ctx->tx) ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size), @@ -314,8 +327,12 @@ static void cleanup_clientctx (struct ClientCtx *ctx) { struct MessageQueue *mq; - - GNUNET_SERVER_client_drop (ctx->client); + + if (NULL != ctx->client) + { + GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0); + GNUNET_SERVER_client_drop (ctx->client); + } if (NULL != ctx->tx) GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx); if (NULL != (mq = ctx->mq_head)) @@ -338,7 +355,7 @@ static void remove_barrier (struct Barrier *barrier) { struct ClientCtx *ctx; - + GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map, &barrier->hash, barrier)); @@ -348,7 +365,7 @@ remove_barrier (struct Barrier *barrier) cleanup_clientctx (ctx); } GNUNET_free (barrier->name); - GNUNET_SERVER_client_drop (barrier->client); + GNUNET_SERVER_client_drop (barrier->mc); GNUNET_free (barrier); } @@ -379,7 +396,7 @@ cancel_wrappers (struct Barrier *barrier) * @param name the barrier name * @param status the status of the barrier * @param emsg the error message; should be non-NULL for - * status=BARRIER_STATUS_ERROR + * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR */ static void send_client_status_msg (struct GNUNET_SERVER_Client *client, @@ -391,17 +408,19 @@ send_client_status_msg (struct GNUNET_SERVER_Client *client, size_t name_len; uint16_t msize; - GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status)); - name_len = strlen (name) + 1; + GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)); + name_len = strlen (name); msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) - + name_len - + (NULL == emsg) ? 0 : strlen (emsg) + 1; + + (name_len + 1) + + ((NULL == emsg) ? 0 : (strlen (emsg) + 1)); msg = GNUNET_malloc (msize); + msg->header.size = htons (msize); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS); msg->status = htons (status); msg->name_len = htons ((uint16_t) name_len); (void) memcpy (msg->data, name, name_len); if (NULL != emsg) - (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1); + (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg)); GST_queue_message (client, &msg->header); } @@ -411,51 +430,13 @@ send_client_status_msg (struct GNUNET_SERVER_Client *client, * * @param barrier the corresponding barrier * @param emsg the error message; should be non-NULL for - * status=BARRIER_STATUS_ERROR + * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR */ static void send_barrier_status_msg (struct Barrier *barrier, const char *emsg) { GNUNET_assert (0 != barrier->status); - send_client_status_msg (barrier->client, barrier->name, - barrier->status, emsg); -} - - - -/** - * Task for sending barrier crossed notifications to waiting client - * - * @param cls the barrier which is crossed - * @param tc scheduler task context - */ -static void -notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Barrier *barrier = cls; - struct ClientCtx *client_ctx; - struct GNUNET_TESTBED_BarrierStatusMsg *msg; - struct GNUNET_MessageHeader *dup_msg; - uint16_t name_len; - uint16_t msize; - - name_len = strlen (barrier->name) + 1; - msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len; - msg = GNUNET_malloc (msize); - msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS); - msg->status = 0; - msg->name_len = htons (name_len); - (void) memcpy (msg->data, barrier->name, name_len); - msg->data[name_len] = '\0'; - while (NULL != (client_ctx = barrier->head)) - { - dup_msg = GNUNET_copy_message (&msg->header); - queue_message (client_ctx, dup_msg); - GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx); - GNUNET_SERVER_client_set_user_context_ (client_ctx->client, NULL, 0); - GNUNET_free (client_ctx); - } + send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg); } @@ -482,7 +463,7 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_HashCode key; size_t name_len; uint16_t msize; - + msize = ntohs (message->size); if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait)) { @@ -501,25 +482,31 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client, name = GNUNET_malloc (name_len + 1); name[name_len] = '\0'; (void) memcpy (name, msg->name, name_len); - GNUNET_CRYPTO_hash (name, name_len - 1, &key); + LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name); + GNUNET_CRYPTO_hash (name, name_len, &key); + GNUNET_free (name); if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key))) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - GNUNET_free (name); return; } client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx); if (NULL == client_ctx) { - client_ctx = GNUNET_malloc (sizeof (struct ClientCtx)); + client_ctx = GNUNET_new (struct ClientCtx); client_ctx->client = client; GNUNET_SERVER_client_keep (client); client_ctx->barrier = barrier; GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx); - barrier->nreached++; - if (LOCAL_QUORUM_REACHED (barrier)) - notify_task_cb (barrier, NULL); + GNUNET_SERVER_client_set_user_context (client, client_ctx); + } + barrier->nreached++; + if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) + && (LOCAL_QUORUM_REACHED (barrier))) + { + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED; + send_barrier_status_msg (barrier, NULL); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -537,17 +524,20 @@ static void disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client) { struct ClientCtx *client_ctx; - + + if (NULL == client) + return; client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx); if (NULL == client_ctx) - return; /* We only set user context for locally - connected clients */ + return; cleanup_clientctx (client_ctx); } /** * Function to initialise barrriers component + * + * @param cfg the configuration to use for initialisation */ void GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) @@ -563,7 +553,31 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN); srv = GNUNET_SERVICE_get_server (ctx); GNUNET_SERVER_add_handlers (srv, message_handlers); - GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL); + GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL); +} + + +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static int +barrier_destroy_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Barrier *barrier = value; + + GNUNET_assert (NULL != barrier); + cancel_wrappers (barrier); + remove_barrier (barrier); + return GNUNET_YES; } @@ -571,9 +585,13 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) * Function to stop the barrier service */ void -GST_barriers_stop () +GST_barriers_destroy () { GNUNET_assert (NULL != barrier_map); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap_iterate (barrier_map, + &barrier_destroy_iterator, + NULL)); GNUNET_CONTAINER_multihashmap_destroy (barrier_map); GNUNET_assert (NULL != ctx); GNUNET_SERVICE_stop (ctx); @@ -587,13 +605,13 @@ GST_barriers_stop () * * @param cls the closure given to GNUNET_TESTBED_barrier_init() * @param name the name of the barrier - * @param barrier the barrier handle + * @param b_ the barrier handle * @param status status of the barrier; GNUNET_OK if the barrier is crossed; * GNUNET_SYSERR upon error * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the * error messsage */ -static void +static void wbarrier_status_cb (void *cls, const char *name, struct GNUNET_TESTBED_Barrier *b_, enum GNUNET_TESTBED_BarrierStatus status, @@ -608,18 +626,18 @@ wbarrier_status_cb (void *cls, const char *name, GNUNET_free (wrapper); switch (status) { - case BARRIER_STATUS_ERROR: + case GNUNET_TESTBED_BARRIERSTATUS_ERROR: LOG (GNUNET_ERROR_TYPE_ERROR, "Initialising barrier `%s' failed at a sub-controller: %s\n", barrier->name, (NULL != emsg) ? emsg : "NULL"); cancel_wrappers (barrier); if (NULL == emsg) emsg = "Initialisation failed at a sub-controller"; - barrier->status = BARRIER_STATUS_ERROR; + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR; send_barrier_status_msg (barrier, emsg); return; - case BARRIER_STATUS_CROSSED: - if (BARRIER_STATUS_INITIALISED != barrier->status) + case GNUNET_TESTBED_BARRIERSTATUS_CROSSED: + if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status) { GNUNET_break_op (0); return; @@ -628,11 +646,11 @@ wbarrier_status_cb (void *cls, const char *name, if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) && (LOCAL_QUORUM_REACHED (barrier))) { - barrier->status = BARRIER_STATUS_CROSSED; + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED; send_barrier_status_msg (barrier, NULL); } return; - case BARRIER_STATUS_INITIALISED: + case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED: if (0 != barrier->status) { GNUNET_break_op (0); @@ -641,7 +659,7 @@ wbarrier_status_cb (void *cls, const char *name, barrier->num_wbarriers_inited++; if (barrier->num_wbarriers_inited == barrier->num_wbarriers) { - barrier->status = BARRIER_STATUS_INITIALISED; + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED; send_barrier_status_msg (barrier, NULL); } return; @@ -653,16 +671,16 @@ wbarrier_status_cb (void *cls, const char *name, * Function called upon timeout while waiting for a response from the * subcontrollers to barrier init message * - * @param - * @return + * @param cls barrier + * @param tc scheduler task context */ static void fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Barrier *barrier = cls; - + cancel_wrappers (barrier); - barrier->status = BARRIER_STATUS_ERROR; + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR; send_barrier_status_msg (barrier, "Timedout while propagating barrier initialisation\n"); remove_barrier (barrier); @@ -694,7 +712,7 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, size_t name_len; unsigned int cnt; uint16_t msize; - + if (NULL == GST_context) { GNUNET_break_op (0); @@ -719,20 +737,21 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, name = GNUNET_malloc (name_len + 1); (void) memcpy (name, msg->name, name_len); GNUNET_CRYPTO_hash (name, name_len, &hash); + LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name); if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) { - - send_client_status_msg (client, name, BARRIER_STATUS_ERROR, + + send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR, "A barrier with the same name already exists"); GNUNET_free (name); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - barrier = GNUNET_malloc (sizeof (struct Barrier)); + barrier = GNUNET_new (struct Barrier); (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode)); barrier->quorum = msg->quorum; barrier->name = name; - barrier->client = client; + barrier->mc = client; GNUNET_SERVER_client_keep (client); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (barrier_map, @@ -750,18 +769,21 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_break (0);/* May happen when we are connecting to the controller */ continue; } - wrapper = GNUNET_malloc (sizeof (struct WBarrier)); + wrapper = GNUNET_new (struct WBarrier); wrapper->barrier = barrier; GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper); - wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller, - barrier->name, - barrier->quorum, - &wbarrier_status_cb, - wrapper); + wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller, + barrier->name, + barrier->quorum, + &wbarrier_status_cb, + wrapper, + GNUNET_NO); } if (NULL == barrier->whead) /* No further propagation */ { - barrier->status = BARRIER_STATUS_INITIALISED; + barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED; + LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n", + barrier->name); send_barrier_status_msg (barrier, NULL); }else barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30), @@ -798,7 +820,7 @@ GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_break_op (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; - } + } if (client != GST_context->client) { GNUNET_break_op (0); @@ -827,7 +849,88 @@ GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_assert (NULL != barrier); cancel_wrappers (barrier); remove_barrier (barrier); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages. + * This handler is queued in the main service and will handle the messages sent + * either from the testbed driver or from a high level controller + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_BarrierStatusMsg *msg; + struct Barrier *barrier; + struct ClientCtx *client_ctx; + const char *name; + struct GNUNET_HashCode key; + enum GNUNET_TESTBED_BarrierStatus status; + uint16_t msize; + uint16_t name_len; + + if (NULL == GST_context) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if (client != GST_context->client) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + msize = ntohs (message->size); + if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message; + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status) + { + GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED + status message this way */ + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + name = msg->data; + name_len = ntohs (msg->name_len); + if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if ('\0' != name[name_len]) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_CRYPTO_hash (name, name_len, &key); + barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key); + if (NULL == barrier) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); + while (NULL != (client_ctx = barrier->head)) /* Notify peers */ + { + queue_message (client_ctx, GNUNET_copy_message (message)); + GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx); + } } /* end of gnunet-service-testbed_barriers.c */