X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api.c;h=a643aacdd4ab34ad9565a83ef2d239ba652741f9;hb=4e2504a967ba09643c6dd7e3b9ce400e30adcb3d;hp=47c4e6aa3771d73c3eb7dd9d6381abb31043a680;hpb=c01848497e2a6ce6e8a00dd2fde8717120b1a39e;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index 47c4e6aa3..a643aacdd 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1,21 +1,19 @@ /* This file is part of GNUnet - (C) 2008--2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -26,8 +24,6 @@ * @author Christian Grothoff * @author Sree Harsha Totakura */ - - #include "platform.h" #include "gnunet_testbed_service.h" #include "gnunet_core_service.h" @@ -68,28 +64,6 @@ #define TIMEOUT_REL TIME_REL_SECS(1) -/** - * The message queue for sending messages to the controller service - */ -struct MessageQueue -{ - /** - * The message to be sent - */ - struct GNUNET_MessageHeader *msg; - - /** - * next pointer for DLL - */ - struct MessageQueue *next; - - /** - * prev pointer for DLL - */ - struct MessageQueue *prev; -}; - - /** * Context data for forwarded Operation */ @@ -99,7 +73,7 @@ struct ForwardedOperationData /** * The callback to call when reply is available */ - GNUNET_CLIENT_MessageHandler cc; + GNUNET_MQ_MessageCallback cc; /** * The closure for the above callback @@ -241,7 +215,9 @@ exop_check (const struct GNUNET_TESTBED_Operation *const op) while (NULL != entry) { entry2 = entry->next; - GNUNET_CONTAINER_DLL_remove (exop_head, exop_tail, entry); + GNUNET_CONTAINER_DLL_remove (exop_head, + exop_tail, + entry); GNUNET_free (entry); entry = entry2; } @@ -272,12 +248,13 @@ struct SearchContext * @param cls the serach context * @param key current key code * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ static int -opc_search_iterator (void *cls, uint32_t key, void *value) +opc_search_iterator (void *cls, + uint32_t key, + void *value) { struct SearchContext *sc = cls; struct OperationContext *opc = value; @@ -358,6 +335,85 @@ GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c, } + +/** + * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed. + * + * @param cls the controller handler + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +{ + const char *emsg; + uint16_t msg_size; + + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) + return GNUNET_OK; + /* We have an error message */ + emsg = (const char *) &msg[1]; + if ('\0' != emsg[msg_size - 1]) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * controller (testbed service) + * + * @param cls the controller handler + * @param msg message received + */ +static void +handle_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +{ + struct GNUNET_TESTBED_Controller *c = cls; + struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh; + const char *emsg; + uint16_t msg_size; + + if (NULL == rh) + return; + if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id)) + { + LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n", + GNUNET_TESTBED_host_get_id_ (rh->host), + ntohl (msg->host_id)); + return; + } + c->rh = NULL; + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) + { + LOG_DEBUG ("Host %u successfully registered\n", + ntohl (msg->host_id)); + GNUNET_TESTBED_mark_host_registered_at_ (rh->host, + c); + rh->cc (rh->cc_cls, + NULL); + GNUNET_free (rh); + return; + } + /* We have an error message */ + emsg = (const char *) &msg[1]; + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Adding host %u failed with error: %s\n"), + ntohl (msg->host_id), + emsg); + rh->cc (rh->cc_cls, + emsg); + GNUNET_free (rh); +} + + /** * Handler for forwarded operations * @@ -366,10 +422,11 @@ GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c, * @param msg the message */ static void -handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c, +handle_forwarded_operation_msg (void *cls, struct OperationContext *opc, const struct GNUNET_MessageHeader *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct ForwardedOperationData *fo_data; fo_data = opc->data; @@ -382,19 +439,17 @@ handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c, /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_opsuccess (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) +static void +handle_opsuccess (void *cls, + const struct GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; GNUNET_TESTBED_OperationCompletionCallback op_comp_cb; void *op_comp_cb_cls; @@ -406,7 +461,7 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; event.op = opc->op; @@ -418,11 +473,11 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, switch (opc->type) { case OP_FORWARDED: - { - handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; - } + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return; + } break; case OP_PEER_DESTROY: { @@ -471,7 +526,7 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, if (NULL != c->cc) c->cc (c->cc_cls, &event); if (GNUNET_NO == exop_check (event.op)) - return GNUNET_YES; + return; } else LOG_DEBUG ("Not calling callback\n"); @@ -479,30 +534,27 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, op_comp_cb (op_comp_cb_cls, event.op, NULL); /* You could have marked the operation as done by now */ GNUNET_break (GNUNET_NO == exop_check (event.op)); - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS message from * controller (testbed service) * * @param c the controller handle * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) +static void +handle_peer_create_success (void *cls, + const struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct PeerCreateData *data; struct GNUNET_TESTBED_Peer *peer; struct GNUNET_TESTBED_Operation *op; GNUNET_TESTBED_PeerCreateCallback cb; - void *cls; + void *cb_cls; uint64_t op_id; GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) == @@ -511,13 +563,13 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert (OP_PEER_CREATE == opc->type); GNUNET_assert (NULL != opc->data); @@ -528,33 +580,31 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, peer->state = TESTBED_PS_CREATED; GNUNET_TESTBED_peer_register_ (peer); cb = data->cb; - cls = data->cls; + cb_cls = data->cls; op = opc->op; GNUNET_free (opc->data); GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; exop_insert (op); if (NULL != cb) - cb (cls, peer, NULL); + cb (cb_cls, peer, NULL); /* You could have marked the operation as done by now */ GNUNET_break (GNUNET_NO == exop_check (op)); - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_event (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_event (void *cls, const struct GNUNET_TESTBED_PeerEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct GNUNET_TESTBED_Peer *peer; struct PeerEventData *data; @@ -570,13 +620,13 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type)); data = opc->data; @@ -613,29 +663,27 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, if (NULL != c->cc) c->cc (c->cc_cls, &event); if (GNUNET_NO == exop_check (event.op)) - return GNUNET_YES; + return; } if (NULL != pcc) pcc (pcc_cls, NULL); /* You could have marked the operation as done by now */ GNUNET_break (GNUNET_NO == exop_check (event.op)); - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_conevent (void *cls, const struct GNUNET_TESTBED_ConnectionEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct OverlayConnectData *data; GNUNET_TESTBED_OperationCompletionCallback cb; @@ -648,13 +696,13 @@ handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert (OP_OVERLAY_CONNECT == opc->type); GNUNET_assert (NULL != (data = opc->data)); @@ -688,30 +736,43 @@ handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, if (NULL != c->cc) c->cc (c->cc_cls, &event); if (GNUNET_NO == exop_check (event.op)) - return GNUNET_YES; + return; } if (NULL != cb) cb (cb_cls, opc->op, NULL); /* You could have marked the operation as done by now */ GNUNET_break (GNUNET_NO == exop_check (event.op)); - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_peer_config (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +check_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) { + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +{ + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct GNUNET_TESTBED_Peer *peer; struct PeerInfoData *data; @@ -724,13 +785,14 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { - handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + handle_forwarded_operation_msg (c, + opc, + &msg->header); + return; } data = opc->data; GNUNET_assert (NULL != data); @@ -748,7 +810,8 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, { case GNUNET_TESTBED_PIT_IDENTITY: pinfo->result.id = GNUNET_new (struct GNUNET_PeerIdentity); - (void) memcpy (pinfo->result.id, &msg->peer_identity, + GNUNET_memcpy (pinfo->result.id, + &msg->peer_identity, sizeof (struct GNUNET_PeerIdentity)); break; case GNUNET_TESTBED_PIT_CONFIGURATION: @@ -766,24 +829,38 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, /* We dont check whether the operation is marked as done here as the operation contains data (cfg/identify) which will be freed at a later point */ - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not + * @return #GNUNET_OK if message is well-formed */ static int -handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_OperationFailureEventMessage - *msg) +check_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) { + /* we accept anything as a valid error message */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) +{ + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; const char *emsg; uint64_t op_id; @@ -794,13 +871,13 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; @@ -815,7 +892,7 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, if (NULL != data->cb) data->cb (data->cb_cls, opc->op, NULL, emsg); GNUNET_free (data); - return GNUNET_YES; /* We do not call controller callback for peer info */ + return; /* We do not call controller callback for peer info */ } event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; event.op = opc->op; @@ -828,81 +905,80 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, exop_insert (event.op); c->cc (c->cc_cls, &event); if (GNUNET_NO == exop_check (event.op)) - return GNUNET_YES; + return; } switch (opc->type) { case OP_PEER_CREATE: - { - struct PeerCreateData *data; + { + struct PeerCreateData *data; - data = opc->data; - GNUNET_free (data->peer); - if (NULL != data->cb) - data->cb (data->cls, NULL, emsg); - GNUNET_free (data); - } + data = opc->data; + GNUNET_free (data->peer); + if (NULL != data->cb) + data->cb (data->cls, NULL, emsg); + GNUNET_free (data); + } break; case OP_PEER_START: case OP_PEER_STOP: - { - struct PeerEventData *data; + { + struct PeerEventData *data; - data = opc->data; - if (NULL != data->pcc) - data->pcc (data->pcc_cls, emsg); - GNUNET_free (data); - } + data = opc->data; + if (NULL != data->pcc) + data->pcc (data->pcc_cls, emsg); + GNUNET_free (data); + } break; case OP_PEER_DESTROY: break; case OP_PEER_INFO: GNUNET_assert (0); case OP_OVERLAY_CONNECT: - { - struct OverlayConnectData *data; + { + struct OverlayConnectData *data; - data = opc->data; - GNUNET_TESTBED_operation_mark_failed (opc->op); - if (NULL != data->cb) - data->cb (data->cb_cls, opc->op, emsg); - } + data = opc->data; + GNUNET_TESTBED_operation_mark_failed (opc->op); + if (NULL != data->cb) + data->cb (data->cb_cls, opc->op, emsg); + } break; case OP_FORWARDED: GNUNET_assert (0); case OP_LINK_CONTROLLERS: /* No secondary callback */ break; case OP_SHUTDOWN_PEERS: - { - struct ShutdownPeersData *data; + { + struct ShutdownPeersData *data; - data = opc->data; - GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */ - opc->data = NULL; - } + data = opc->data; + GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */ + opc->data = NULL; + } break; case OP_MANAGE_SERVICE: - { - struct ManageServiceData *data = opc->data; + { + struct ManageServiceData *data = opc->data; GNUNET_TESTBED_OperationCompletionCallback cb; - void *cb_cls; - - GNUNET_assert (NULL != data); - cb = data->cb; - cb_cls = data->cb_cls; - GNUNET_free (data); - opc->data = NULL; - exop_insert (event.op); - if (NULL != cb) - cb (cb_cls, opc->op, emsg); - /* You could have marked the operation as done by now */ - GNUNET_break (GNUNET_NO == exop_check (event.op)); - } + void *cb_cls; + + GNUNET_assert (NULL != data); + cb = data->cb; + cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + exop_insert (event.op); + if (NULL != cb) + cb (cb_cls, opc->op, emsg); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); + } break; default: GNUNET_break (0); } - return GNUNET_YES; } @@ -930,19 +1006,35 @@ GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id) } + /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller - * (testbed service) + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from + * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_slave_config (struct GNUNET_TESTBED_Controller *c, +check_slave_config (void *cls, + const struct GNUNET_TESTBED_SlaveConfiguration *msg) +{ + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_slave_config (void *cls, const struct GNUNET_TESTBED_SlaveConfiguration *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; uint64_t op_id; uint64_t mask; @@ -952,12 +1044,12 @@ handle_slave_config (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_GET_SLAVE_CONFIG != opc->type) { GNUNET_break (0); - return GNUNET_YES; + return; } opc->state = OPC_STATE_FINISHED; GNUNET_TESTBED_remove_opc_ (opc->c, opc); @@ -973,24 +1065,38 @@ handle_slave_config (struct GNUNET_TESTBED_Controller *c, event.details.operation_finished.emsg = NULL; c->cc (c->cc_cls, &event); } - return GNUNET_YES; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller + * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller * (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not + * @return #GNUNET_OK if @a msg is well-formed */ static int -handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_ControllerLinkResponse *msg) +check_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) +{ + /* actual check to be implemented */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct ControllerLinkData *data; struct GNUNET_CONFIGURATION_Handle *cfg; @@ -1003,18 +1109,18 @@ handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } if (OP_LINK_CONTROLLERS != opc->type) { GNUNET_break (0); - return GNUNET_YES; + return; } GNUNET_assert (NULL != (data = opc->data)); host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id); @@ -1035,9 +1141,9 @@ handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c, emsg = GNUNET_malloc (ntohs (msg->header.size) - sizeof (struct GNUNET_TESTBED_ControllerLinkResponse) + 1); - memcpy (emsg, &msg[1], ntohs (msg->header.size) - - sizeof (struct - GNUNET_TESTBED_ControllerLinkResponse)); + GNUNET_memcpy (emsg, + &msg[1], + ntohs (msg->header.size)- sizeof (struct GNUNET_TESTBED_ControllerLinkResponse)); event.details.operation_finished.emsg = emsg; } else @@ -1059,179 +1165,157 @@ handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c, if (NULL != cfg) GNUNET_CONFIGURATION_destroy (cfg); GNUNET_free_non_null (emsg); - return GNUNET_YES; } /** - * Handler for messages from controller (testbed service) + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS message. * - * @param cls the controller handler - * @param msg message received, NULL on timeout or fatal error + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message + * @return #GNUNET_OK if the message is valid; #GNUNET_SYSERR to tear it + * down signalling an error (message malformed) */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +static int +check_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { - struct GNUNET_TESTBED_Controller *c = cls; - int status; uint16_t msize; + uint16_t name_len; + int status; + const char *name; + size_t emsg_len; - c->in_receive = GNUNET_NO; - /* FIXME: Add checks for message integrity */ - if (NULL == msg) + msize = ntohs (msg->header.size); + name = msg->data; + name_len = ntohs (msg->name_len); + + if (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize) { - LOG_DEBUG ("Receive timed out or connection to service dropped\n"); - return; + GNUNET_break_op (0); + return GNUNET_SYSERR; } - msize = ntohs (msg->size); - switch (ntohs (msg->type)) + if ('\0' != name[name_len]) { - case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)); - status = - GNUNET_TESTBED_host_handle_addhostconfirm_ - (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage)); - status = - handle_opsuccess (c, - (const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage)); - status = - handle_op_fail_event (c, - (const struct - GNUNET_TESTBED_OperationFailureEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage)); - status = - handle_peer_create_success (c, - (const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT: - GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - status = - handle_peer_event (c, - (const struct GNUNET_TESTBED_PeerEventMessage *) - msg); - - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION: - GNUNET_assert (msize >= - sizeof (struct - GNUNET_TESTBED_PeerConfigurationInformationMessage)); - status = - handle_peer_config (c, - (const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT: - GNUNET_assert (msize == - sizeof (struct GNUNET_TESTBED_ConnectionEventMessage)); - status = - handle_peer_conevent (c, - (const struct - GNUNET_TESTBED_ConnectionEventMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION: - GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration)); - status = - handle_slave_config (c, - (const struct GNUNET_TESTBED_SlaveConfiguration *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT: - status = - handle_link_controllers_result (c, - (const struct - GNUNET_TESTBED_ControllerLinkResponse - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS: - status = - GNUNET_TESTBED_handle_barrier_status_ (c, - (const struct - GNUNET_TESTBED_BarrierStatusMsg *) - msg); - break; - default: - GNUNET_assert (0); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive)) + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + + 1); /* +1!? */ + if (0 == emsg_len) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } /** - * Function called to notify a client about the connection begin ready to queue - * more data. "buf" will be NULL and "size" zero if the connection was closed - * for writing in the meantime. + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message */ -static size_t -transmit_ready_notify (void *cls, size_t size, void *buf) +static void +handle_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { struct GNUNET_TESTBED_Controller *c = cls; - struct MessageQueue *mq_entry; - - c->th = NULL; - mq_entry = c->mq_head; - GNUNET_assert (NULL != mq_entry); - if ((0 == size) && (NULL == buf)) /* Timeout */ - { - LOG_DEBUG ("Message sending timed out -- retrying\n"); - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - return 0; - } - GNUNET_assert (ntohs (mq_entry->msg->size) <= size); - size = ntohs (mq_entry->msg->size); - memcpy (buf, mq_entry->msg, size); - LOG_DEBUG ("Message of type: %u and size: %u sent\n", - ntohs (mq_entry->msg->type), size); - GNUNET_free (mq_entry->msg); - GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry); - GNUNET_free (mq_entry); - mq_entry = c->mq_head; - if (NULL != mq_entry) - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - if (GNUNET_NO == c->in_receive) - { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return size; + struct GNUNET_TESTBED_Barrier *barrier; + char *emsg; + const char *name; + struct GNUNET_HashCode key; + size_t emsg_len; + int status; + uint16_t msize; + uint16_t name_len; + + emsg = NULL; + barrier = NULL; + msize = ntohs (msg->header.size); + if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)) + { + GNUNET_break_op (0); + goto cleanup; + } + name = msg->data; + name_len = ntohs (msg->name_len); + if (name_len >= //name_len is strlen(barrier_name) + (msize - ((sizeof msg->header) + sizeof (msg->status)) ) ) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != name[name_len]) + { + GNUNET_break_op (0); + goto cleanup; + } + LOG_DEBUG ("Received BARRIER_STATUS msg\n"); + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) + { + status = -1; + //unlike name_len, emsg_len includes the trailing zero + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + + (name_len + 1)); + if (0 == emsg_len) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != (msg->data[(name_len + 1) + (emsg_len - 1)])) + { + GNUNET_break_op (0); + goto cleanup; + } + emsg = GNUNET_malloc (emsg_len); + GNUNET_memcpy (emsg, + msg->data + name_len + 1, + emsg_len); + } + if (NULL == c->barrier_map) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_CRYPTO_hash (name, name_len, &key); + barrier = GNUNET_CONTAINER_multihashmap_get (c->barrier_map, &key); + if (NULL == barrier) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_assert (NULL != barrier->cb); + if ((GNUNET_YES == barrier->echo) && + (GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status)) + GNUNET_TESTBED_queue_message_ (c, + GNUNET_copy_message (&msg->header)); + barrier->cb (barrier->cls, + name, + barrier, + status, + emsg); + if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status) + return; /* just initialised; skip cleanup */ + + cleanup: + GNUNET_free_non_null (emsg); + /** + * Do not remove the barrier if we did not echo the status back; this is + * required at the chained testbed controller setup to ensure the only the + * test-driver echos the status and the controller hierarchy properly + * propagates the status. + */ + if ((NULL != barrier) && (GNUNET_YES == barrier->echo)) + GNUNET_TESTBED_barrier_remove_ (barrier); } @@ -1245,7 +1329,8 @@ void GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, struct GNUNET_MessageHeader *msg) { - struct MessageQueue *mq_entry; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; uint16_t type; uint16_t size; @@ -1253,19 +1338,13 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, size = ntohs (msg->size); GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) && (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type)); - mq_entry = GNUNET_new (struct MessageQueue); - mq_entry->msg = msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u, size %u for sending\n", type, - ntohs (msg->size)); - GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail, - mq_entry); - if (NULL == controller->th) - controller->th = - GNUNET_CLIENT_notify_transmit_ready (controller->client, size, - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, - controller); + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, msg, size); + GNUNET_free (msg); + GNUNET_MQ_send (controller->mq, + env); } @@ -1284,17 +1363,27 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, * operation */ struct OperationContext * -GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller - *controller, uint64_t operation_id, +GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller, + uint64_t operation_id, const struct GNUNET_MessageHeader *msg, - GNUNET_CLIENT_MessageHandler cc, + GNUNET_MQ_MessageCallback cc, void *cc_cls) { struct OperationContext *opc; struct ForwardedOperationData *data; - struct GNUNET_MessageHeader *dup_msg; - uint16_t msize; - + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, + msg, + size); + GNUNET_MQ_send (controller->mq, + env); data = GNUNET_new (struct ForwardedOperationData); data->cc = cc; data->cc_cls = cc_cls; @@ -1303,11 +1392,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller opc->type = OP_FORWARDED; opc->data = data; opc->id = operation_id; - msize = ntohs (msg->size); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); - GNUNET_TESTBED_queue_message_ (opc->c, dup_msg); - GNUNET_TESTBED_insert_opc_ (controller, opc); + GNUNET_TESTBED_insert_opc_ (controller, + opc); return opc; } @@ -1321,7 +1407,8 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller void GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc) { - GNUNET_TESTBED_remove_opc_ (opc->c, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, + opc); GNUNET_free (opc->data); GNUNET_free (opc); } @@ -1428,6 +1515,28 @@ oprelease_get_slave_config (void *cls) } +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure, a `struct GNUNET_TESTBED_Controller *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + /* struct GNUNET_TESTBED_Controller *c = cls; */ + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Encountered MQ error: %d\n", + error); + /* now what? */ + GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */ +} + + /** * Start a controller process using the given configuration at the * given host. @@ -1449,13 +1558,59 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, GNUNET_TESTBED_ControllerCallback cc, void *cc_cls) { - struct GNUNET_TESTBED_Controller *controller; + struct GNUNET_TESTBED_Controller *controller + = GNUNET_new (struct GNUNET_TESTBED_Controller); + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (add_host_confirm, + GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS, + struct GNUNET_TESTBED_HostConfirmedMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_conevent, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT, + struct GNUNET_TESTBED_ConnectionEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (opsuccess, + GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS, + struct GNUNET_TESTBED_GenericOperationSuccessEventMessage, + controller), + GNUNET_MQ_hd_var_size (op_fail_event, + GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT, + struct GNUNET_TESTBED_OperationFailureEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_create_success, + GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS, + struct GNUNET_TESTBED_PeerCreateSuccessEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_event, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT, + struct GNUNET_TESTBED_PeerEventMessage, + controller), + GNUNET_MQ_hd_var_size (peer_config, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + struct GNUNET_TESTBED_PeerConfigurationInformationMessage, + controller), + GNUNET_MQ_hd_var_size (slave_config, + GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + struct GNUNET_TESTBED_SlaveConfiguration, + controller), + GNUNET_MQ_hd_var_size (link_controllers_result, + GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + struct GNUNET_TESTBED_ControllerLinkResponse, + controller), + GNUNET_MQ_hd_var_size (barrier_status, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS, + struct GNUNET_TESTBED_BarrierStatusMsg, + controller), + GNUNET_MQ_handler_end () + }; struct GNUNET_TESTBED_InitMessage *msg; + struct GNUNET_MQ_Envelope *env; const struct GNUNET_CONFIGURATION_Handle *cfg; const char *controller_hostname; unsigned long long max_parallel_operations; unsigned long long max_parallel_service_connections; unsigned long long max_parallel_topology_config_operations; + size_t slen; GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host))); if (GNUNET_OK != @@ -1464,6 +1619,7 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, &max_parallel_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1472,6 +1628,7 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, &max_parallel_service_connections)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1480,16 +1637,21 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, &max_parallel_topology_config_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } - controller = GNUNET_new (struct GNUNET_TESTBED_Controller); controller->cc = cc; controller->cc_cls = cc_cls; controller->event_mask = event_mask; controller->cfg = GNUNET_CONFIGURATION_dup (cfg); - controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg); - if (NULL == controller->client) + controller->mq = GNUNET_CLIENT_connect (controller->cfg, + "testbed", + handlers, + &mq_error_handler, + controller); + if (NULL == controller->mq) { + GNUNET_break (0); GNUNET_TESTBED_controller_disconnect (controller); return NULL; } @@ -1509,18 +1671,17 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, controller_hostname = GNUNET_TESTBED_host_get_hostname (host); if (NULL == controller_hostname) controller_hostname = "127.0.0.1"; - msg = - GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); + slen = strlen (controller_hostname) + 1; + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_INIT); msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); msg->event_mask = GNUNET_htonll (controller->event_mask); - strcpy ((char *) &msg[1], controller_hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); + GNUNET_memcpy (&msg[1], + controller_hostname, + slen); + GNUNET_MQ_send (controller->mq, + env); return controller; } @@ -1531,9 +1692,8 @@ GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, * @param cls closure * @param key current key code * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ static int opc_free_iterator (void *cls, uint32_t key, void *value) @@ -1558,23 +1718,13 @@ opc_free_iterator (void *cls, uint32_t key, void *value) * @param c handle to controller to stop */ void -GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller - *c) +GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c) { - struct MessageQueue *mq_entry; - - if (NULL != c->th) - GNUNET_CLIENT_notify_transmit_ready_cancel (c->th); - /* Clear the message queue */ - while (NULL != (mq_entry = c->mq_head)) + if (NULL != c->mq) { - GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, - mq_entry); - GNUNET_free (mq_entry->msg); - GNUNET_free (mq_entry); + GNUNET_MQ_destroy (c->mq); + c->mq = NULL; } - if (NULL != c->client) - GNUNET_CLIENT_disconnect (c->client); if (NULL != c->host) GNUNET_TESTBED_deregister_host_at_ (c->host, c); GNUNET_CONFIGURATION_destroy (c->cfg); @@ -1606,7 +1756,8 @@ GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller * @return the size of the xconfig */ size_t -GNUNET_TESTBED_compress_config_ (const char *config, size_t size, +GNUNET_TESTBED_compress_config_ (const char *config, + size_t size, char **xconfig) { size_t xsize; @@ -1899,11 +2050,13 @@ GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation) /** * Generates configuration by uncompressing configuration in given message. The * given message should be of the following types: - * GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, - * GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, - * GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST, - * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS, - * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + * #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + * + * FIXME: This API is incredibly ugly. * * @param msg the message containing compressed configuration * @return handle to the parsed configuration; NULL upon error while parsing the message @@ -2003,9 +2156,10 @@ GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) } cfg = GNUNET_CONFIGURATION_create (); if (GNUNET_OK != - GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data, + GNUNET_CONFIGURATION_deserialize (cfg, + (const char *) data, (size_t) data_len, - GNUNET_NO)) + NULL)) { GNUNET_free (data); GNUNET_break_op (0); /* De-serialization failure */ @@ -2073,16 +2227,17 @@ static void opstart_shutdown_peers (void *cls) { struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; struct GNUNET_TESTBED_ShutdownPeersMessage *msg; opc->state = OPC_STATE_STARTED; - msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); msg->operation_id = GNUNET_htonll (opc->id); - GNUNET_TESTBED_insert_opc_ (opc->c, opc); - GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); } @@ -2170,4 +2325,151 @@ GNUNET_TESTBED_get_index (const struct GNUNET_TESTBED_Peer *peer) return peer->unique_id; } + +/** + * Remove a barrier and it was the last one in the barrier hash map, destroy the + * hash map + * + * @param barrier the barrier to remove + */ +void +GNUNET_TESTBED_barrier_remove_ (struct GNUNET_TESTBED_Barrier *barrier) +{ + struct GNUNET_TESTBED_Controller *c = barrier->c; + + GNUNET_assert (NULL != c->barrier_map); /* No barriers present */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (c->barrier_map, + &barrier->key, + barrier)); + GNUNET_free (barrier->name); + GNUNET_free (barrier); + if (0 == GNUNET_CONTAINER_multihashmap_size (c->barrier_map)) + { + GNUNET_CONTAINER_multihashmap_destroy (c->barrier_map); + c->barrier_map = NULL; + } +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @param echo GNUNET_YES to echo the barrier crossed status message back to the + * controller + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls, + int echo) +{ + struct GNUNET_TESTBED_BarrierInit *msg; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_Barrier *barrier; + struct GNUNET_HashCode key; + size_t name_len; + + GNUNET_assert (quorum <= 100); + GNUNET_assert (NULL != cb); + name_len = strlen (name); + GNUNET_assert (0 < name_len); + GNUNET_CRYPTO_hash (name, name_len, &key); + if (NULL == controller->barrier_map) + controller->barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES); + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (controller->barrier_map, + &key)) + { + GNUNET_break (0); + return NULL; + } + LOG_DEBUG ("Initialising barrier `%s'\n", name); + barrier = GNUNET_new (struct GNUNET_TESTBED_Barrier); + barrier->c = controller; + barrier->name = GNUNET_strdup (name); + barrier->cb = cb; + barrier->cls = cls; + barrier->echo = echo; + GNUNET_memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (controller->barrier_map, + &barrier->key, + barrier, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + env = GNUNET_MQ_msg_extra (msg, + name_len, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); + msg->quorum = (uint8_t) quorum; + GNUNET_memcpy (msg->name, + barrier->name, + name_len); + GNUNET_MQ_send (barrier->c->mq, + env); + return barrier; +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls) +{ + return GNUNET_TESTBED_barrier_init_ (controller, + name, quorum, cb, cls, GNUNET_YES); +} + + +/** + * Cancel a barrier. + * + * @param barrier the barrier handle + */ +void +GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_BarrierCancel *msg; + size_t slen; + + slen = strlen (barrier->name); + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); + GNUNET_memcpy (msg->name, + barrier->name, + slen); + GNUNET_MQ_send (barrier->c->mq, + env); + GNUNET_TESTBED_barrier_remove_ (barrier); +} + + /* end of testbed_api.c */