X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Fgnunet-service-testbed_barriers.c;h=894bf1056a43a26c252de8bb6b35329ad3e4b61e;hb=2ca4b8a208f6f910c819bbaa6785a25ae2955501;hp=079096d86c1699e5dcc2bbe85dd14902cf0bfb86;hpb=5e1efe185cf484018f53dd33d64e546ac042fdee;p=oweals%2Fgnunet.git diff --git a/src/testbed/gnunet-service-testbed_barriers.c b/src/testbed/gnunet-service-testbed_barriers.c index 079096d86..894bf1056 100644 --- a/src/testbed/gnunet-service-testbed_barriers.c +++ b/src/testbed/gnunet-service-testbed_barriers.c @@ -25,6 +25,8 @@ */ #include "gnunet-service-testbed.h" +#include "gnunet-service-testbed_barriers.h" + /** * timeout for outgoing message transmissions in seconds @@ -33,6 +35,13 @@ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s) +/** + * Test to see if local peers have reached the required quorum of a barrier + */ +#define LOCAL_QUORUM_REACHED(barrier) \ + ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100)) + + /** * Barrier */ @@ -60,6 +69,7 @@ struct MessageQueue struct GNUNET_MessageHeader *msg; }; + /** * Context to be associated with each client */ @@ -102,6 +112,38 @@ struct ClientCtx }; +/** + * Wrapper around Barrier handle + */ +struct WBarrier +{ + /** + * DLL next pointer + */ + struct WBarrier *next; + + /** + * DLL prev pointer + */ + struct WBarrier *prev; + + /** + * The local barrier associated with the creation of this wrapper + */ + struct Barrier *barrier; + + /** + * The barrier handle from API + */ + struct GNUNET_TESTBED_Barrier *hbarrier; + + /** + * Has this barrier been crossed? + */ + uint8_t reached; +}; + + /** * Barrier */ @@ -112,6 +154,11 @@ struct Barrier */ struct GNUNET_HashCode hash; + /** + * The client handle to the master controller + */ + struct GNUNET_SERVER_Client *client; + /** * The name of the barrier */ @@ -127,6 +174,41 @@ struct Barrier */ struct ClientCtx *tail; + /** + * DLL head for the list of barrier handles + */ + struct WBarrier *whead; + + /** + * DLL tail for the list of barrier handles + */ + struct WBarrier *wtail; + + /** + * Identifier for the timeout task + */ + GNUNET_SCHEDULER_TaskIdentifier tout_task; + + /** + * The status of this barrier + */ + enum GNUNET_TESTBED_BarrierStatus status; + + /** + * Number of barriers wrapped in the above DLL + */ + unsigned int num_wbarriers; + + /** + * Number of wrapped barriers reached so far + */ + unsigned int num_wbarriers_reached; + + /** + * Number of wrapped barrier initialised so far + */ + unsigned int num_wbarriers_inited; + /** * Number of peers which have reached this barrier */ @@ -142,10 +224,6 @@ struct Barrier */ uint8_t quorum; - /** - * Was there a timeout while propagating initialisation - */ - uint8_t timedout; }; @@ -210,7 +288,6 @@ transmit_ready_cb (void *cls, size_t size, void *buf) * * @param ctx the context associated with the client * @param msg the message to queue. Will be consumed - * @param suspended is the client suspended at the time of calling queue_message */ static void queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg) @@ -228,7 +305,29 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg) } -#if 0 +/** + * Function to cleanup client context data structure + * + * @param ctx the client context data structure + */ +static void +cleanup_clientctx (struct ClientCtx *ctx) +{ + struct MessageQueue *mq; + + GNUNET_SERVER_client_drop (ctx->client); + if (NULL != ctx->tx) + GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx); + if (NULL != (mq = ctx->mq_head)) + { + GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq); + GNUNET_free (mq->msg); + GNUNET_free (mq); + } + GNUNET_free (ctx); +} + + /** * Function to remove a barrier from the barrier map and cleanup resources * occupied by a barrier @@ -238,37 +337,91 @@ queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg) static void remove_barrier (struct Barrier *barrier) { + struct ClientCtx *ctx; + GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map, &barrier->hash, barrier)); + while (NULL != (ctx = barrier->head)) + { + GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx); + cleanup_clientctx (ctx); + } GNUNET_free (barrier->name); + GNUNET_SERVER_client_drop (barrier->client); GNUNET_free (barrier); } /** - * Function called upon timeout while waiting for a response from the - * subcontrollers to barrier init message + * Cancels all subcontroller barrier handles * - * @param - * @return + * @param barrier the local barrier */ static void -fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +cancel_wrappers (struct Barrier *barrier) { - struct ForwardedOperationContext *foctx = cls; - struct Barrier *barrier = foctx->cls; - - barrier->nslaves--; - barrier->timedout = GNUNET_YES; - if (0 == barrier->nslaves) + struct WBarrier *wrapper; + + while (NULL != (wrapper = barrier->whead)) { - GST_send_operation_fail_msg (foctx->client, foctx->operation_id, - "Timeout while contacting a slave controller"); - remove_barrier (barrier); + GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier); + GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); + GNUNET_free (wrapper); } } -#endif + + +/** + * Send a status message about a barrier to the given client + * + * @param client the client to send the message to + * @param name the barrier name + * @param status the status of the barrier + * @param emsg the error message; should be non-NULL for + * status=BARRIER_STATUS_ERROR + */ +static void +send_client_status_msg (struct GNUNET_SERVER_Client *client, + const char *name, + enum GNUNET_TESTBED_BarrierStatus status, + const char *emsg) +{ + struct GNUNET_TESTBED_BarrierStatusMsg *msg; + size_t name_len; + uint16_t msize; + + GNUNET_assert ((NULL == emsg) || (BARRIER_STATUS_ERROR == status)); + name_len = strlen (name) + 1; + msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + + name_len + + (NULL == emsg) ? 0 : strlen (emsg) + 1; + msg = GNUNET_malloc (msize); + msg->status = htons (status); + msg->name_len = htons ((uint16_t) name_len); + (void) memcpy (msg->data, name, name_len); + if (NULL != emsg) + (void) memcpy (msg->data + name_len, emsg, strlen (emsg) + 1); + GST_queue_message (client, &msg->header); +} + + +/** + * Sends a barrier failed message + * + * @param barrier the corresponding barrier + * @param emsg the error message; should be non-NULL for + * status=BARRIER_STATUS_ERROR + */ +static void +send_barrier_status_msg (struct Barrier *barrier, const char *emsg) +{ + GNUNET_assert (0 != barrier->status); + send_client_status_msg (barrier->client, barrier->name, + barrier->status, emsg); +} + + /** * Task for sending barrier crossed notifications to waiting client @@ -281,13 +434,13 @@ notify_task_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Barrier *barrier = cls; struct ClientCtx *client_ctx; - struct GNUNET_TESTBED_BarrierStatus *msg; + struct GNUNET_TESTBED_BarrierStatusMsg *msg; struct GNUNET_MessageHeader *dup_msg; uint16_t name_len; uint16_t msize; name_len = strlen (barrier->name) + 1; - msize = sizeof (struct GNUNET_TESTBED_BarrierStatus) + name_len; + msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len; msg = GNUNET_malloc (msize); msg->header.size = htons (msize); msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS); @@ -365,7 +518,7 @@ handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client, client_ctx->barrier = barrier; GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx); barrier->nreached++; - if ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100)) + if (LOCAL_QUORUM_REACHED (barrier)) notify_task_cb (barrier, NULL); } GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -384,16 +537,12 @@ static void disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client) { struct ClientCtx *client_ctx; - struct Barrier *barrier; client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx); if (NULL == client_ctx) - return; - barrier = client_ctx->barrier; - GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx); - if (NULL != client_ctx->tx) - GNUNET_SERVER_notify_transmit_ready_cancel (client_ctx->tx); - + return; /* We only set user context for locally + connected clients */ + cleanup_clientctx (client_ctx); } @@ -424,11 +573,102 @@ GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg) void GST_barriers_stop () { + GNUNET_assert (NULL != barrier_map); + GNUNET_CONTAINER_multihashmap_destroy (barrier_map); GNUNET_assert (NULL != ctx); GNUNET_SERVICE_stop (ctx); } +/** + * Functions of this type are to be given as callback argument to + * GNUNET_TESTBED_barrier_init(). The callback will be called when status + * information is available for the barrier. + * + * @param cls the closure given to GNUNET_TESTBED_barrier_init() + * @param name the name of the barrier + * @param b_ the barrier handle + * @param status status of the barrier; GNUNET_OK if the barrier is crossed; + * GNUNET_SYSERR upon error + * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the + * error messsage + */ +static void +wbarrier_status_cb (void *cls, const char *name, + struct GNUNET_TESTBED_Barrier *b_, + enum GNUNET_TESTBED_BarrierStatus status, + const char *emsg) +{ + struct WBarrier *wrapper = cls; + struct Barrier *barrier = wrapper->barrier; + + GNUNET_assert (b_ == wrapper->hbarrier); + wrapper->hbarrier = NULL; + GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper); + GNUNET_free (wrapper); + switch (status) + { + case BARRIER_STATUS_ERROR: + LOG (GNUNET_ERROR_TYPE_ERROR, + "Initialising barrier `%s' failed at a sub-controller: %s\n", + barrier->name, (NULL != emsg) ? emsg : "NULL"); + cancel_wrappers (barrier); + if (NULL == emsg) + emsg = "Initialisation failed at a sub-controller"; + barrier->status = BARRIER_STATUS_ERROR; + send_barrier_status_msg (barrier, emsg); + return; + case BARRIER_STATUS_CROSSED: + if (BARRIER_STATUS_INITIALISED != barrier->status) + { + GNUNET_break_op (0); + return; + } + barrier->num_wbarriers_reached++; + if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) + && (LOCAL_QUORUM_REACHED (barrier))) + { + barrier->status = BARRIER_STATUS_CROSSED; + send_barrier_status_msg (barrier, NULL); + } + return; + case BARRIER_STATUS_INITIALISED: + if (0 != barrier->status) + { + GNUNET_break_op (0); + return; + } + barrier->num_wbarriers_inited++; + if (barrier->num_wbarriers_inited == barrier->num_wbarriers) + { + barrier->status = BARRIER_STATUS_INITIALISED; + send_barrier_status_msg (barrier, NULL); + } + return; + } +} + + +/** + * Function called upon timeout while waiting for a response from the + * subcontrollers to barrier init message + * + * @param cls barrier + * @param tc scheduler task context + */ +static void +fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Barrier *barrier = cls; + + cancel_wrappers (barrier); + barrier->status = BARRIER_STATUS_ERROR; + send_barrier_status_msg (barrier, + "Timedout while propagating barrier initialisation\n"); + remove_barrier (barrier); +} + + /** * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This * message should always come from a parent controller or the testbed API if we @@ -446,12 +686,12 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { const struct GNUNET_TESTBED_BarrierInit *msg; - const char *name; + char *name; struct Barrier *barrier; struct Slave *slave; + struct WBarrier *wrapper; struct GNUNET_HashCode hash; size_t name_len; - uint64_t op_id; unsigned int cnt; uint16_t msize; @@ -475,22 +715,25 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, return; } msg = (const struct GNUNET_TESTBED_BarrierInit *) message; - op_id = GNUNET_ntohll (msg->op_id); - name = msg->name; name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit); + name = GNUNET_malloc (name_len + 1); + (void) memcpy (name, msg->name, name_len); GNUNET_CRYPTO_hash (name, name_len, &hash); if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) { - GST_send_operation_fail_msg (client, op_id, "Barrier already initialised"); + + send_client_status_msg (client, name, BARRIER_STATUS_ERROR, + "A barrier with the same name already exists"); + GNUNET_free (name); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } barrier = GNUNET_malloc (sizeof (struct Barrier)); (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode)); barrier->quorum = msg->quorum; - barrier->name = GNUNET_malloc (name_len + 1); - barrier->name[name_len] = '\0'; - (void) memcpy (barrier->name, name, name_len); + barrier->name = name; + barrier->client = client; + GNUNET_SERVER_client_keep (client); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (barrier_map, &barrier->hash, @@ -506,7 +749,85 @@ GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client, { GNUNET_break (0);/* May happen when we are connecting to the controller */ continue; - } - GNUNET_break (0); /* FIXME */ + } + wrapper = GNUNET_malloc (sizeof (struct WBarrier)); + wrapper->barrier = barrier; + GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper); + wrapper->hbarrier = GNUNET_TESTBED_barrier_init (slave->controller, + barrier->name, + barrier->quorum, + &wbarrier_status_cb, + wrapper); + } + if (NULL == barrier->whead) /* No further propagation */ + { + barrier->status = BARRIER_STATUS_INITIALISED; + send_barrier_status_msg (barrier, NULL); + }else + barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30), + &fwd_tout_barrier_init, + barrier); +} + + +/** + * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This + * message should always come from a parent controller or the testbed API if we + * are the root controller. + * + * This handler is queued in the main service and will handle the messages sent + * either from the testbed driver or from a high level controller + * + * @param cls NULL + * @param client identification of the client + * @param message the actual message + */ +void +GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_TESTBED_BarrierCancel *msg; + char *name; + struct Barrier *barrier; + struct GNUNET_HashCode hash; + size_t name_len; + uint16_t msize; + + if (NULL == GST_context) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + if (client != GST_context->client) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; } + msize = ntohs (message->size); + if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel)) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + msg = (const struct GNUNET_TESTBED_BarrierCancel *) message; + name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel); + name = GNUNET_malloc (name_len + 1); + (void) memcpy (name, msg->name, name_len); + GNUNET_CRYPTO_hash (name, name_len, &hash); + if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash)) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash); + GNUNET_assert (NULL != barrier); + cancel_wrappers (barrier); + remove_barrier (barrier); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } + +/* end of gnunet-service-testbed_barriers.c */