X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api.c;h=a643aacdd4ab34ad9565a83ef2d239ba652741f9;hb=4e2504a967ba09643c6dd7e3b9ce400e30adcb3d;hp=ad77003fcc501adee2fcc7c9efe24d5fe36e8f07;hpb=277f2ffeed022c1de410776902858d1edeaee910;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index ad77003fc..a643aacdd 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1,21 +1,19 @@ /* This file is part of GNUnet - (C) 2008--2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -26,8 +24,6 @@ * @author Christian Grothoff * @author Sree Harsha Totakura */ - - #include "platform.h" #include "gnunet_testbed_service.h" #include "gnunet_core_service.h" @@ -47,13 +43,13 @@ * Generic logging shorthand */ #define LOG(kind, ...) \ - GNUNET_log_from (kind, "testbed-api", __VA_ARGS__); + GNUNET_log_from (kind, "testbed-api", __VA_ARGS__) /** * Debug logging */ #define LOG_DEBUG(...) \ - LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__); + LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) /** * Relative time seconds shorthand @@ -69,272 +65,352 @@ /** - * Handle for controller process + * Context data for forwarded Operation */ -struct GNUNET_TESTBED_ControllerProc +struct ForwardedOperationData { - /** - * The process handle - */ - struct GNUNET_HELPER_Handle *helper; - - /** - * The arguments used to start the helper - */ - char **helper_argv; - - /** - * The host where the helper is run - */ - struct GNUNET_TESTBED_Host *host; /** - * The controller error callback + * The callback to call when reply is available */ - GNUNET_TESTBED_ControllerStatusCallback cb; + GNUNET_MQ_MessageCallback cc; /** * The closure for the above callback */ - void *cls; - - /** - * The send handle for the helper - */ - struct GNUNET_HELPER_SendHandle *shandle; - - /** - * The message corresponding to send handle - */ - struct GNUNET_MessageHeader *msg; - - /** - * The configuration of the running testbed service - */ - struct GNUNET_CONFIGURATION_Handle *cfg; + void *cc_cls; }; /** - * The message queue for sending messages to the controller service + * Context data for get slave config operations */ -struct MessageQueue +struct GetSlaveConfigData { /** - * The message to be sent - */ - struct GNUNET_MessageHeader *msg; - - /** - * next pointer for DLL + * The id of the slave controller */ - struct MessageQueue *next; + uint32_t slave_id; - /** - * prev pointer for DLL - */ - struct MessageQueue *prev; }; /** - * Structure for a controller link + * Context data for controller link operations */ -struct ControllerLink +struct ControllerLinkData { /** - * The next ptr for DLL + * The controller link message */ - struct ControllerLink *next; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; /** - * The prev ptr for DLL + * The id of the host which is hosting the controller to be linked */ - struct ControllerLink *prev; + uint32_t host_id; - /** - * The host which will be referred in the peer start request. This is the - * host where the peer should be started - */ - struct GNUNET_TESTBED_Host *delegated_host; +}; - /** - * The host which will contacted to delegate the peer start request - */ - struct GNUNET_TESTBED_Host *slave_host; +/** + * Date context for OP_SHUTDOWN_PEERS operations + */ +struct ShutdownPeersData +{ /** - * The configuration to be used to connect to slave host + * The operation completion callback to call */ - const struct GNUNET_CONFIGURATION_Handle *slave_cfg; + GNUNET_TESTBED_OperationCompletionCallback cb; /** - * GNUNET_YES if the slave should be started (and stopped) by us; GNUNET_NO - * if we are just allowed to use the slave via TCP/IP + * The closure for the above callback */ - int is_subordinate; + void *cb_cls; }; /** - * handle for host registration + * An entry in the stack for keeping operations which are about to expire */ -struct GNUNET_TESTBED_HostRegistrationHandle +struct ExpireOperationEntry { /** - * The host being registered - */ - struct GNUNET_TESTBED_Host *host; - - /** - * The controller at which this host is being registered + * DLL head; new entries are to be inserted here */ - struct GNUNET_TESTBED_Controller *c; + struct ExpireOperationEntry *next; /** - * The Registartion completion callback + * DLL tail; entries are deleted from here */ - GNUNET_TESTBED_HostRegistrationCompletion cc; + struct ExpireOperationEntry *prev; /** - * The closure for above callback + * The operation. This will be a dangling pointer when the operation is freed */ - void *cc_cls; + const struct GNUNET_TESTBED_Operation *op; }; /** - * Context data for forwarded Operation + * DLL head for list of operations marked for expiry */ -struct ForwardedOperationData -{ +static struct ExpireOperationEntry *exop_head; - /** - * The callback to call when reply is available - */ - GNUNET_CLIENT_MessageHandler cc; +/** + * DLL tail for list of operation marked for expiry + */ +static struct ExpireOperationEntry *exop_tail; - /** - * The closure for the above callback - */ - void *cc_cls; -}; +/** + * Inserts an operation into the list of operations marked for expiry + * + * @param op the operation to insert + */ +static void +exop_insert (struct GNUNET_TESTBED_Operation *op) +{ + struct ExpireOperationEntry *entry; + + entry = GNUNET_new (struct ExpireOperationEntry); + entry->op = op; + GNUNET_CONTAINER_DLL_insert_tail (exop_head, exop_tail, entry); +} /** - * Context data for get slave config operations + * Checks if an operation is present in the list of operations marked for + * expiry. If the operation is found, it and the tail of operations after it + * are removed from the list. + * + * @param op the operation to check + * @return GNUNET_NO if the operation is not present in the list; GNUNET_YES if + * the operation is found in the list (the operation is then removed + * from the list -- calling this function again with the same + * paramenter will return GNUNET_NO) */ -struct GetSlaveConfigData +static int +exop_check (const struct GNUNET_TESTBED_Operation *const op) { - /** - * The id of the slave controller - */ - uint32_t slave_id; + struct ExpireOperationEntry *entry; + struct ExpireOperationEntry *entry2; + int found; -}; + found = GNUNET_NO; + entry = exop_head; + while (NULL != entry) + { + if (op == entry->op) + { + found = GNUNET_YES; + break; + } + entry = entry->next; + } + if (GNUNET_NO == found) + return GNUNET_NO; + /* Truncate the tail */ + while (NULL != entry) + { + entry2 = entry->next; + GNUNET_CONTAINER_DLL_remove (exop_head, + exop_tail, + entry); + GNUNET_free (entry); + entry = entry2; + } + return GNUNET_YES; +} /** - * Context data for controller link operations + * Context information to be used while searching for operation contexts */ -struct ControllerLinkData +struct SearchContext { /** - * The controller link message + * The result of the search */ - struct GNUNET_TESTBED_ControllerLinkMessage *msg; + struct OperationContext *opc; + /** + * The id of the operation context we are searching for + */ + uint64_t id; }; /** - * This variable is set to the operation that has been last marked as done. It - * is used to verify whether the state associated with an operation is valid - * after the first notify callback is called. Such checks are necessary for - * certain operations where we have 2 notify callbacks. Examples are - * OP_PEER_CREATE, OP_PEER_START/STOP, OP_OVERLAY_CONNECT. + * Search iterator for searching an operation context * - * This variable should ONLY be used to compare; it is a dangling pointer!! + * @param cls the serach context + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ -static const struct GNUNET_TESTBED_Operation *last_finished_operation; +static int +opc_search_iterator (void *cls, + uint32_t key, + void *value) +{ + struct SearchContext *sc = cls; + struct OperationContext *opc = value; + + GNUNET_assert (NULL != opc); + GNUNET_assert (NULL == sc->opc); + if (opc->id != sc->id) + return GNUNET_YES; + sc->opc = opc; + return GNUNET_NO; +} /** * Returns the operation context with the given id if found in the Operation * context queues of the controller * - * @param c the controller whose queues are searched + * @param c the controller whose operation context map is searched * @param id the id which has to be checked * @return the matching operation context; NULL if no match found */ static struct OperationContext * find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id) { - struct OperationContext *opc; + struct SearchContext sc; + + sc.id = id; + sc.opc = NULL; + GNUNET_assert (NULL != c->opc_map); + if (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap32_get_multiple (c->opc_map, (uint32_t) id, + &opc_search_iterator, &sc)) + return NULL; + return sc.opc; +} + + +/** + * Inserts the given operation context into the operation context map of the + * given controller. Creates the operation context map if one does not exist + * for the controller + * + * @param c the controller + * @param opc the operation context to be inserted + */ +void +GNUNET_TESTBED_insert_opc_ (struct GNUNET_TESTBED_Controller *c, + struct OperationContext *opc) +{ + if (NULL == c->opc_map) + c->opc_map = GNUNET_CONTAINER_multihashmap32_create (256); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (c->opc_map, + (uint32_t) opc->id, opc, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} + + +/** + * Removes the given operation context from the operation context map of the + * given controller + * + * @param c the controller + * @param opc the operation context to remove + */ +void +GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c, + struct OperationContext *opc) +{ + GNUNET_assert (NULL != c->opc_map); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (c->opc_map, + (uint32_t) opc->id, + opc)); + if ( (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map)) + && (NULL != c->opcq_empty_cb) ) + c->opcq_empty_cb (c->opcq_empty_cls); +} + + + +/** + * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed. + * + * @param cls the controller handler + * @param msg message received + * @return #GNUNET_OK if message is well-formed + */ +static int +check_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +{ + const char *emsg; + uint16_t msg_size; - for (opc = c->ocq_head; NULL != opc; opc = opc->next) + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) + return GNUNET_OK; + /* We have an error message */ + emsg = (const char *) &msg[1]; + if ('\0' != emsg[msg_size - 1]) { - if (id == opc->id) - return opc; + GNUNET_break (0); + return GNUNET_SYSERR; } - return NULL; + return GNUNET_OK; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from * controller (testbed service) * - * @param c the controller handler + * @param cls the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +static void +handle_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) { - struct GNUNET_TESTBED_HostRegistrationHandle *rh; - char *emsg; + struct GNUNET_TESTBED_Controller *c = cls; + struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh; + const char *emsg; uint16_t msg_size; - rh = c->rh; if (NULL == rh) - { - return GNUNET_OK; - } + return; if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id)) { LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n", - GNUNET_TESTBED_host_get_id_ (rh->host), ntohl (msg->host_id)); - return GNUNET_OK; + GNUNET_TESTBED_host_get_id_ (rh->host), + ntohl (msg->host_id)); + return; } c->rh = NULL; - msg_size = ntohs (msg->header.size); - if (sizeof (struct GNUNET_TESTBED_HostConfirmedMessage) == msg_size) - { - LOG_DEBUG ("Host %u successfully registered\n", ntohl (msg->host_id)); - GNUNET_TESTBED_mark_host_registered_at_ (rh->host, c); - rh->cc (rh->cc_cls, NULL); + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) + { + LOG_DEBUG ("Host %u successfully registered\n", + ntohl (msg->host_id)); + GNUNET_TESTBED_mark_host_registered_at_ (rh->host, + c); + rh->cc (rh->cc_cls, + NULL); GNUNET_free (rh); - return GNUNET_OK; + return; } /* We have an error message */ - emsg = (char *) &msg[1]; - if ('\0' != - emsg[msg_size - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)]) - { - GNUNET_break (0); - GNUNET_free (rh); - return GNUNET_NO; - } - LOG (GNUNET_ERROR_TYPE_ERROR, _("Adding host %u failed with error: %s\n"), - ntohl (msg->host_id), emsg); - rh->cc (rh->cc_cls, emsg); + emsg = (const char *) &msg[1]; + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Adding host %u failed with error: %s\n"), + ntohl (msg->host_id), + emsg); + rh->cc (rh->cc_cls, + emsg); GNUNET_free (rh); - return GNUNET_OK; } @@ -346,36 +422,37 @@ handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c, * @param msg the message */ static void -handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c, +handle_forwarded_operation_msg (void *cls, struct OperationContext *opc, const struct GNUNET_MessageHeader *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct ForwardedOperationData *fo_data; fo_data = opc->data; if (NULL != fo_data->cc) fo_data->cc (fo_data->cc_cls, msg); - GNUNET_CONTAINER_DLL_remove (c->ocq_head, c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (c, opc); GNUNET_free (fo_data); GNUNET_free (opc); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_opsuccess (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) +static void +handle_opsuccess (void *cls, + const struct GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; + GNUNET_TESTBED_OperationCompletionCallback op_comp_cb; + void *op_comp_cb_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; @@ -384,77 +461,100 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; + event.op = opc->op; + event.op_cls = opc->op_cls; event.details.operation_finished.emsg = NULL; event.details.operation_finished.generic = NULL; + op_comp_cb = NULL; + op_comp_cb_cls = NULL; switch (opc->type) { case OP_FORWARDED: - { - handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; - } + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return; + } break; case OP_PEER_DESTROY: { struct GNUNET_TESTBED_Peer *peer; peer = opc->data; + GNUNET_TESTBED_peer_deregister_ (peer); GNUNET_free (peer); opc->data = NULL; //PEERDESTROYDATA } break; - case OP_LINK_CONTROLLERS: + case OP_SHUTDOWN_PEERS: { - struct ControllerLinkData *data; + struct ShutdownPeersData *data; data = opc->data; - GNUNET_assert (NULL != data); + op_comp_cb = data->cb; + op_comp_cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + GNUNET_TESTBED_cleanup_peers_ (); + } + break; + case OP_MANAGE_SERVICE: + { + struct ManageServiceData *data; + + GNUNET_assert (NULL != (data = opc->data)); + op_comp_cb = data->cb; + op_comp_cb_cls = data->cb_cls; GNUNET_free (data); opc->data = NULL; } break; + case OP_PEER_RECONFIGURE: + break; default: GNUNET_assert (0); } - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; + exop_insert (event.op); if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED))) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } else LOG_DEBUG ("Not calling callback\n"); - return GNUNET_YES; + if (NULL != op_comp_cb) + op_comp_cb (op_comp_cb_cls, event.op, NULL); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS message from * controller (testbed service) * * @param c the controller handle * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) +static void +handle_peer_create_success (void *cls, + const struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct PeerCreateData *data; struct GNUNET_TESTBED_Peer *peer; + struct GNUNET_TESTBED_Operation *op; GNUNET_TESTBED_PeerCreateCallback cb; - void *cls; + void *cb_cls; uint64_t op_id; GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) == @@ -463,13 +563,13 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert (OP_PEER_CREATE == opc->type); GNUNET_assert (NULL != opc->data); @@ -477,31 +577,34 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, GNUNET_assert (NULL != data->peer); peer = data->peer; GNUNET_assert (peer->unique_id == ntohl (msg->peer_id)); - peer->state = PS_CREATED; + peer->state = TESTBED_PS_CREATED; + GNUNET_TESTBED_peer_register_ (peer); cb = data->cb; - cls = data->cls; + cb_cls = data->cls; + op = opc->op; GNUNET_free (opc->data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; + exop_insert (op); if (NULL != cb) - cb (cls, peer, NULL); - return GNUNET_YES; + cb (cb_cls, peer, NULL); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_event (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_event (void *cls, const struct GNUNET_TESTBED_PeerEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct GNUNET_TESTBED_Peer *peer; struct PeerEventData *data; @@ -509,6 +612,7 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, void *pcc_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; + uint64_t mask; GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) == ntohs (msg->header.size)); @@ -516,13 +620,13 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type)); data = opc->data; @@ -530,15 +634,17 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, peer = data->peer; GNUNET_assert (NULL != peer); event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + event.op = opc->op; + event.op_cls = opc->op_cls; switch (event.type) { case GNUNET_TESTBED_ET_PEER_START: - peer->state = PS_STARTED; + peer->state = TESTBED_PS_STARTED; event.details.peer_start.host = peer->host; event.details.peer_start.peer = peer; break; case GNUNET_TESTBED_ET_PEER_STOP: - peer->state = PS_STOPPED; + peer->state = TESTBED_PS_STOPPED; event.details.peer_stop.peer = peer; break; default: @@ -547,59 +653,64 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, pcc = data->pcc; pcc_cls = data->pcc_cls; GNUNET_free (data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - if (0 != - ((GNUNET_TESTBED_ET_PEER_START | GNUNET_TESTBED_ET_PEER_STOP) & - c->event_mask)) + exop_insert (event.op); + mask = 1LL << GNUNET_TESTBED_ET_PEER_START; + mask |= 1LL << GNUNET_TESTBED_ET_PEER_STOP; + if (0 != (mask & c->event_mask)) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } if (NULL != pcc) pcc (pcc_cls, NULL); - return GNUNET_YES; + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_conevent (void *cls, const struct GNUNET_TESTBED_ConnectionEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct OverlayConnectData *data; GNUNET_TESTBED_OperationCompletionCallback cb; void *cb_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; + uint64_t mask; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } GNUNET_assert (OP_OVERLAY_CONNECT == opc->type); - data = opc->data; - GNUNET_assert (NULL != data); + GNUNET_assert (NULL != (data = opc->data)); GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) && (ntohl (msg->peer2) == data->p2->unique_id)); event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + event.op = opc->op; + event.op_cls = opc->op_cls; switch (event.type) { case GNUNET_TESTBED_ET_CONNECT: @@ -615,35 +726,53 @@ handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, } cb = data->cb; cb_cls = data->cb_cls; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - if (NULL != cb) - cb (cb_cls, opc->op, NULL); - if (0 != - ((GNUNET_TESTBED_ET_CONNECT | GNUNET_TESTBED_ET_DISCONNECT) & - c->event_mask)) + exop_insert (event.op); + mask = 1LL << GNUNET_TESTBED_ET_CONNECT; + mask |= 1LL << GNUNET_TESTBED_ET_DISCONNECT; + if (0 != (mask & c->event_mask)) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } - return GNUNET_YES; + if (NULL != cb) + cb (cb_cls, opc->op, NULL); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_peer_config (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +check_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +{ + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct GNUNET_TESTBED_Peer *peer; struct PeerInfoData *data; @@ -656,30 +785,33 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { - handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + handle_forwarded_operation_msg (c, + opc, + &msg->header); + return; } data = opc->data; GNUNET_assert (NULL != data); peer = data->peer; GNUNET_assert (NULL != peer); GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id); - pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation)); + pinfo = GNUNET_new (struct GNUNET_TESTBED_PeerInformation); pinfo->pit = data->pit; cb = data->cb; cb_cls = data->cb_cls; + GNUNET_assert (NULL != cb); GNUNET_free (data); opc->data = NULL; switch (pinfo->pit) { case GNUNET_TESTBED_PIT_IDENTITY: - pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); - (void) memcpy (pinfo->result.id, &msg->peer_identity, + pinfo->result.id = GNUNET_new (struct GNUNET_PeerIdentity); + GNUNET_memcpy (pinfo->result.id, + &msg->peer_identity, sizeof (struct GNUNET_PeerIdentity)); break; case GNUNET_TESTBED_PIT_CONFIGURATION: @@ -691,46 +823,63 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, break; } opc->data = pinfo; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - if (NULL != cb) - cb (cb_cls, opc->op, pinfo, NULL); - return GNUNET_YES; + cb (cb_cls, opc->op, pinfo, NULL); + /* We dont check whether the operation is marked as done here as the + operation contains data (cfg/identify) which will be freed at a later point + */ } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not + * @return #GNUNET_OK if message is well-formed */ static int -handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_OperationFailureEventMessage - *msg) +check_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) +{ + /* we accept anything as a valid error message */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; const char *emsg; uint64_t op_id; + uint64_t mask; struct GNUNET_TESTBED_EventInformation event; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + return; } - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; emsg = GNUNET_TESTBED_parse_error_string_ (msg); if (NULL == emsg) @@ -743,66 +892,93 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, if (NULL != data->cb) data->cb (data->cb_cls, opc->op, NULL, emsg); GNUNET_free (data); - return GNUNET_YES; /* We do not call controller callback for peer info */ + return; /* We do not call controller callback for peer info */ } - if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && - (NULL != c->cc)) + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; + event.details.operation_finished.emsg = emsg; + event.details.operation_finished.generic = NULL; + mask = (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED); + if ((0 != (mask & c->event_mask)) && (NULL != c->cc)) { - event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; - event.details.operation_finished.emsg = emsg; - event.details.operation_finished.generic = NULL; + exop_insert (event.op); c->cc (c->cc_cls, &event); - if (event.details.operation_finished.operation == last_finished_operation) - return GNUNET_YES; + if (GNUNET_NO == exop_check (event.op)) + return; } switch (opc->type) { case OP_PEER_CREATE: - { - struct PeerCreateData *data; + { + struct PeerCreateData *data; - data = opc->data; - GNUNET_free (data->peer); - if (NULL != data->cb) - data->cb (data->cls, NULL, emsg); - GNUNET_free (data); - } + data = opc->data; + GNUNET_free (data->peer); + if (NULL != data->cb) + data->cb (data->cls, NULL, emsg); + GNUNET_free (data); + } break; case OP_PEER_START: case OP_PEER_STOP: - { - struct PeerEventData *data; + { + struct PeerEventData *data; - data = opc->data; - if (NULL != data->pcc) - data->pcc (data->pcc_cls, emsg); - GNUNET_free (data); - } + data = opc->data; + if (NULL != data->pcc) + data->pcc (data->pcc_cls, emsg); + GNUNET_free (data); + } break; case OP_PEER_DESTROY: break; case OP_PEER_INFO: GNUNET_assert (0); case OP_OVERLAY_CONNECT: - { - struct OverlayConnectData *data; + { + struct OverlayConnectData *data; - data = opc->data; - data->failed = GNUNET_YES; - if (NULL != data->cb) - data->cb (data->cb_cls, opc->op, emsg); - } + data = opc->data; + GNUNET_TESTBED_operation_mark_failed (opc->op); + if (NULL != data->cb) + data->cb (data->cb_cls, opc->op, emsg); + } break; case OP_FORWARDED: GNUNET_assert (0); case OP_LINK_CONTROLLERS: /* No secondary callback */ break; + case OP_SHUTDOWN_PEERS: + { + struct ShutdownPeersData *data; + + data = opc->data; + GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */ + opc->data = NULL; + } + break; + case OP_MANAGE_SERVICE: + { + struct ManageServiceData *data = opc->data; + GNUNET_TESTBED_OperationCompletionCallback cb; + void *cb_cls; + + GNUNET_assert (NULL != data); + cb = data->cb; + cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + exop_insert (event.op); + if (NULL != cb) + cb (cb_cls, opc->op, emsg); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); + } + break; default: GNUNET_break (0); } - return GNUNET_YES; } @@ -830,210 +1006,316 @@ GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id) } + /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller - * (testbed service) + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from + * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_slave_config (struct GNUNET_TESTBED_Controller *c, +check_slave_config (void *cls, + const struct GNUNET_TESTBED_SlaveConfiguration *msg) +{ + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_slave_config (void *cls, const struct GNUNET_TESTBED_SlaveConfiguration *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; uint64_t op_id; + uint64_t mask; struct GNUNET_TESTBED_EventInformation event; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_GET_SLAVE_CONFIG != opc->type) { GNUNET_break (0); - return GNUNET_YES; + return; } - GNUNET_free (opc->data); - opc->data = NULL; opc->state = OPC_STATE_FINISHED; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); - if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + mask = 1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED; + if ((0 != (mask & c->event_mask)) && (NULL != c->cc)) { opc->data = GNUNET_TESTBED_extract_config_ (&msg->header); event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; event.details.operation_finished.generic = opc->data; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; event.details.operation_finished.emsg = NULL; c->cc (c->cc_cls, &event); } - return GNUNET_YES; } /** - * Handler for messages from controller (testbed service) + * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller + * (testbed service) * - * @param cls the controller handler - * @param msg message received, NULL on timeout or fatal error + * @param c the controller handler + * @param msg message received + * @return #GNUNET_OK if @a msg is well-formed + */ +static int +check_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) +{ + /* actual check to be implemented */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received */ static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +handle_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) { struct GNUNET_TESTBED_Controller *c = cls; - int status; - uint16_t msize; + struct OperationContext *opc; + struct ControllerLinkData *data; + struct GNUNET_CONFIGURATION_Handle *cfg; + struct GNUNET_TESTBED_Host *host; + char *emsg; + uint64_t op_id; + struct GNUNET_TESTBED_EventInformation event; - c->in_receive = GNUNET_NO; - /* FIXME: Add checks for message integrity */ - if (NULL == msg) + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) { - LOG_DEBUG ("Receive timed out or connection to service dropped\n"); + LOG_DEBUG ("Operation not found\n"); return; } - status = GNUNET_OK; - msize = ntohs (msg->size); - switch (ntohs (msg->type)) + if (OP_FORWARDED == opc->type) { - case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)); - status = - handle_addhostconfirm (c, - (const struct GNUNET_TESTBED_HostConfirmedMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage)); - status = - handle_opsuccess (c, - (const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage)); - status = - handle_peer_create_success (c, - (const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT: - GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - status = - handle_peer_event (c, - (const struct GNUNET_TESTBED_PeerEventMessage *) - msg); + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return; + } + if (OP_LINK_CONTROLLERS != opc->type) + { + GNUNET_break (0); + return; + } + GNUNET_assert (NULL != (data = opc->data)); + host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id); + GNUNET_assert (NULL != host); + GNUNET_free (data); + opc->data = NULL; + opc->state = OPC_STATE_FINISHED; + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; + event.details.operation_finished.emsg = NULL; + event.details.operation_finished.generic = NULL; + emsg = NULL; + cfg = NULL; + if (GNUNET_NO == ntohs (msg->success)) + { + emsg = GNUNET_malloc (ntohs (msg->header.size) + - sizeof (struct + GNUNET_TESTBED_ControllerLinkResponse) + 1); + GNUNET_memcpy (emsg, + &msg[1], + ntohs (msg->header.size)- sizeof (struct GNUNET_TESTBED_ControllerLinkResponse)); + event.details.operation_finished.emsg = emsg; + } + else + { + if (0 != ntohs (msg->config_size)) + { + cfg = GNUNET_TESTBED_extract_config_ ((const struct GNUNET_MessageHeader *) msg); + GNUNET_assert (NULL != cfg); + GNUNET_TESTBED_host_replace_cfg_ (host, cfg); + } + } + if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED))) + { + if (NULL != c->cc) + c->cc (c->cc_cls, &event); + } + else + LOG_DEBUG ("Not calling callback\n"); + if (NULL != cfg) + GNUNET_CONFIGURATION_destroy (cfg); + GNUNET_free_non_null (emsg); +} - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION: - GNUNET_assert (msize >= - sizeof (struct - GNUNET_TESTBED_PeerConfigurationInformationMessage)); - status = - handle_peer_config (c, - (const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT: - GNUNET_assert (msize == - sizeof (struct GNUNET_TESTBED_ConnectionEventMessage)); - status = - handle_peer_conevent (c, - (const struct - GNUNET_TESTBED_ConnectionEventMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage)); - status = - handle_op_fail_event (c, - (const struct - GNUNET_TESTBED_OperationFailureEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION: - GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration)); - status = - handle_slave_config (c, - (const struct GNUNET_TESTBED_SlaveConfiguration *) - msg); - break; - default: - GNUNET_assert (0); + +/** + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS message. + * + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message + * @return #GNUNET_OK if the message is valid; #GNUNET_SYSERR to tear it + * down signalling an error (message malformed) + */ +static int +check_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) +{ + uint16_t msize; + uint16_t name_len; + int status; + const char *name; + size_t emsg_len; + + msize = ntohs (msg->header.size); + name = msg->data; + name_len = ntohs (msg->name_len); + + if (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive)) + if ('\0' != name[name_len]) { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_break_op (0); + return GNUNET_SYSERR; } + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) + { + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + + 1); /* +1!? */ + if (0 == emsg_len) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + } + return GNUNET_OK; } /** - * Function called to notify a client about the connection begin ready to queue - * more data. "buf" will be NULL and "size" zero if the connection was closed - * for writing in the meantime. + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message */ -static size_t -transmit_ready_notify (void *cls, size_t size, void *buf) +static void +handle_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { struct GNUNET_TESTBED_Controller *c = cls; - struct MessageQueue *mq_entry; - - c->th = NULL; - mq_entry = c->mq_head; - GNUNET_assert (NULL != mq_entry); - if ((0 == size) && (NULL == buf)) /* Timeout */ - { - LOG_DEBUG ("Message sending timed out -- retrying\n"); - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - return 0; - } - GNUNET_assert (ntohs (mq_entry->msg->size) <= size); - size = ntohs (mq_entry->msg->size); - memcpy (buf, mq_entry->msg, size); - LOG_DEBUG ("Message of type: %u and size: %u sent\n", - ntohs (mq_entry->msg->type), size); - GNUNET_free (mq_entry->msg); - GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry); - GNUNET_free (mq_entry); - mq_entry = c->mq_head; - if (NULL != mq_entry) - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - if (GNUNET_NO == c->in_receive) - { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return size; + struct GNUNET_TESTBED_Barrier *barrier; + char *emsg; + const char *name; + struct GNUNET_HashCode key; + size_t emsg_len; + int status; + uint16_t msize; + uint16_t name_len; + + emsg = NULL; + barrier = NULL; + msize = ntohs (msg->header.size); + if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)) + { + GNUNET_break_op (0); + goto cleanup; + } + name = msg->data; + name_len = ntohs (msg->name_len); + if (name_len >= //name_len is strlen(barrier_name) + (msize - ((sizeof msg->header) + sizeof (msg->status)) ) ) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != name[name_len]) + { + GNUNET_break_op (0); + goto cleanup; + } + LOG_DEBUG ("Received BARRIER_STATUS msg\n"); + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) + { + status = -1; + //unlike name_len, emsg_len includes the trailing zero + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + + (name_len + 1)); + if (0 == emsg_len) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != (msg->data[(name_len + 1) + (emsg_len - 1)])) + { + GNUNET_break_op (0); + goto cleanup; + } + emsg = GNUNET_malloc (emsg_len); + GNUNET_memcpy (emsg, + msg->data + name_len + 1, + emsg_len); + } + if (NULL == c->barrier_map) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_CRYPTO_hash (name, name_len, &key); + barrier = GNUNET_CONTAINER_multihashmap_get (c->barrier_map, &key); + if (NULL == barrier) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_assert (NULL != barrier->cb); + if ((GNUNET_YES == barrier->echo) && + (GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status)) + GNUNET_TESTBED_queue_message_ (c, + GNUNET_copy_message (&msg->header)); + barrier->cb (barrier->cls, + name, + barrier, + status, + emsg); + if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status) + return; /* just initialised; skip cleanup */ + + cleanup: + GNUNET_free_non_null (emsg); + /** + * Do not remove the barrier if we did not echo the status back; this is + * required at the chained testbed controller setup to ensure the only the + * test-driver echos the status and the controller hierarchy properly + * propagates the status. + */ + if ((NULL != barrier) && (GNUNET_YES == barrier->echo)) + GNUNET_TESTBED_barrier_remove_ (barrier); } @@ -1047,7 +1329,8 @@ void GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, struct GNUNET_MessageHeader *msg) { - struct MessageQueue *mq_entry; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; uint16_t type; uint16_t size; @@ -1055,19 +1338,13 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, size = ntohs (msg->size); GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) && (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type)); - mq_entry = GNUNET_malloc (sizeof (struct MessageQueue)); - mq_entry->msg = msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u, size %u for sending\n", type, - ntohs (msg->size)); - GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail, - mq_entry); - if (NULL == controller->th) - controller->th = - GNUNET_CLIENT_notify_transmit_ready (controller->client, size, - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, - controller); + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, msg, size); + GNUNET_free (msg); + GNUNET_MQ_send (controller->mq, + env); } @@ -1086,31 +1363,37 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, * operation */ struct OperationContext * -GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller - *controller, uint64_t operation_id, +GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller, + uint64_t operation_id, const struct GNUNET_MessageHeader *msg, - GNUNET_CLIENT_MessageHandler cc, + GNUNET_MQ_MessageCallback cc, void *cc_cls) { struct OperationContext *opc; struct ForwardedOperationData *data; - struct GNUNET_MessageHeader *dup_msg; - uint16_t msize; - - data = GNUNET_malloc (sizeof (struct ForwardedOperationData)); + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, + msg, + size); + GNUNET_MQ_send (controller->mq, + env); + data = GNUNET_new (struct ForwardedOperationData); data->cc = cc; data->cc_cls = cc_cls; - opc = GNUNET_malloc (sizeof (struct OperationContext)); + opc = GNUNET_new (struct OperationContext); opc->c = controller; opc->type = OP_FORWARDED; opc->data = data; opc->id = operation_id; - msize = ntohs (msg->size); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); - GNUNET_TESTBED_queue_message_ (opc->c, dup_msg); - GNUNET_CONTAINER_DLL_insert_tail (controller->ocq_head, controller->ocq_tail, - opc); + GNUNET_TESTBED_insert_opc_ (controller, + opc); return opc; } @@ -1124,104 +1407,13 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller void GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc) { - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, + opc); GNUNET_free (opc->data); GNUNET_free (opc); } -/** - * Functions with this signature are called whenever a - * complete message is received by the tokenizer. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -helper_mst (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - const struct GNUNET_TESTBED_HelperReply *msg; - const char *hostname; - char *config; - uLongf config_size; - uLongf xconfig_size; - - msg = (const struct GNUNET_TESTBED_HelperReply *) message; - GNUNET_assert (sizeof (struct GNUNET_TESTBED_HelperReply) < - ntohs (msg->header.size)); - GNUNET_assert (GNUNET_MESSAGE_TYPE_TESTBED_HELPER_REPLY == - ntohs (msg->header.type)); - config_size = (uLongf) ntohs (msg->config_size); - xconfig_size = - (uLongf) (ntohs (msg->header.size) - - sizeof (struct GNUNET_TESTBED_HelperReply)); - config = GNUNET_malloc (config_size); - GNUNET_assert (Z_OK == - uncompress ((Bytef *) config, &config_size, - (const Bytef *) &msg[1], xconfig_size)); - GNUNET_assert (NULL == cp->cfg); - cp->cfg = GNUNET_CONFIGURATION_create (); - GNUNET_assert (GNUNET_CONFIGURATION_deserialize - (cp->cfg, config, config_size, GNUNET_NO)); - GNUNET_free (config); - if ((NULL == cp->host) || - (NULL == (hostname = GNUNET_TESTBED_host_get_hostname (cp->host)))) - hostname = "localhost"; - /* Change the hostname so that we can connect to it */ - GNUNET_CONFIGURATION_set_value_string (cp->cfg, "testbed", "hostname", - hostname); - cp->cb (cp->cls, cp->cfg, GNUNET_OK); - return GNUNET_OK; -} - - -/** - * Continuation function from GNUNET_HELPER_send() - * - * @param cls closure - * @param result GNUNET_OK on success, - * GNUNET_NO if helper process died - * GNUNET_SYSERR during GNUNET_HELPER_stop - */ -static void -clear_msg (void *cls, int result) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - - GNUNET_assert (NULL != cp->shandle); - cp->shandle = NULL; - GNUNET_free (cp->msg); -} - - -/** - * Callback that will be called when the helper process dies. This is not called - * when the helper process is stoped using GNUNET_HELPER_stop() - * - * @param cls the closure from GNUNET_HELPER_start() - */ -static void -helper_exp_cb (void *cls) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - GNUNET_TESTBED_ControllerStatusCallback cb; - void *cb_cls; - - cb = cp->cb; - cb_cls = cp->cls; - cp->helper = NULL; - GNUNET_TESTBED_controller_stop (cp); - if (NULL != cb) - cb (cb_cls, NULL, GNUNET_SYSERR); -} - - /** * Function to call to start a link-controllers type operation once all queues * the operation is part of declare that the operation can be activated. @@ -1233,14 +1425,14 @@ opstart_link_controllers (void *cls) { struct OperationContext *opc = cls; struct ControllerLinkData *data; - struct GNUNET_TESTBED_ControllerLinkMessage *msg; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; GNUNET_assert (NULL != opc->data); data = opc->data; msg = data->msg; data->msg = NULL; opc->state = OPC_STATE_STARTED; - GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_insert_opc_ (opc->c, opc); GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); } @@ -1263,7 +1455,7 @@ oprelease_link_controllers (void *cls) GNUNET_free (data->msg); break; case OPC_STATE_STARTED: - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); break; case OPC_STATE_FINISHED: break; @@ -1282,12 +1474,15 @@ static void opstart_get_slave_config (void *cls) { struct OperationContext *opc = cls; - struct GetSlaveConfigData *data; + struct GetSlaveConfigData *data = opc->data; struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg; - data = opc->data; + GNUNET_assert (NULL != data); msg = GNUNET_TESTBED_generate_slavegetconfig_msg_ (opc->id, data->slave_id); - GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_free (opc->data); + data = NULL; + opc->data = NULL; + GNUNET_TESTBED_insert_opc_ (opc->c, opc); GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); opc->state = OPC_STATE_STARTED; } @@ -1309,8 +1504,7 @@ oprelease_get_slave_config (void *cls) GNUNET_free (opc->data); break; case OPC_STATE_STARTED: - GNUNET_free (opc->data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); break; case OPC_STATE_FINISHED: if (NULL != opc->data) @@ -1322,286 +1516,24 @@ oprelease_get_slave_config (void *cls) /** - * Function to copy NULL terminated list of arguments - * - * @param argv the NULL terminated list of arguments. Cannot be NULL. - * @return the copied NULL terminated arguments - */ -static char ** -copy_argv (const char *const *argv) -{ - char **argv_dup; - unsigned int argp; - - GNUNET_assert (NULL != argv); - for (argp = 0; NULL != argv[argp]; argp++) ; - argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1)); - for (argp = 0; NULL != argv[argp]; argp++) - argv_dup[argp] = strdup (argv[argp]); - return argv_dup; -} - - -/** - * Function to join NULL terminated list of arguments + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param argv1 the NULL terminated list of arguments. Cannot be NULL. - * @param argv2 the NULL terminated list of arguments. Cannot be NULL. - * @return the joined NULL terminated arguments - */ -static char ** -join_argv (const char *const *argv1, const char *const *argv2) -{ - char **argvj; - char *argv; - unsigned int carg; - unsigned int cnt; - - carg = 0; - argvj = NULL; - for (cnt = 0; NULL != argv1[cnt]; cnt++) - { - argv = GNUNET_strdup (argv1[cnt]); - GNUNET_array_append (argvj, carg, argv); - } - for (cnt = 0; NULL != argv2[cnt]; cnt++) - { - argv = GNUNET_strdup (argv2[cnt]); - GNUNET_array_append (argvj, carg, argv); - } - GNUNET_array_append (argvj, carg, NULL); - return argvj; -} - - -/** - * Frees the given NULL terminated arguments - * - * @param argv the NULL terminated list of arguments + * @param cls closure, a `struct GNUNET_TESTBED_Controller *` + * @param error error code */ static void -free_argv (char **argv) -{ - unsigned int argp; - - for (argp = 0; NULL != argv[argp]; argp++) - GNUNET_free (argv[argp]); - GNUNET_free (argv); -} - - -/** - * Generates arguments for opening a remote shell. Builds up the arguments - * from the environment variable GNUNET_TESTBED_RSH_CMD. The variable - * should not mention `-p' (port) option and destination address as these will - * be set locally in the function from its parameteres. If the environmental - * variable is not found then it defaults to `ssh -o BatchMode=yes -o - * NoHostAuthenticationForLocalhost=yes' - * - * @param port the destination port number - * @param dst the destination address - * @return NULL terminated list of arguments - */ -static char ** -gen_rsh_args (const char *port, const char *dst) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - static const char *default_ssh_args[] = { - "ssh", - "-o", - "BatchMode=yes", - "-o", - "NoHostAuthenticationForLocalhost=yes", - NULL - }; - char **ssh_args; - char *ssh_cmd; - char *ssh_cmd_cp; - char *arg; - unsigned int cnt; - - ssh_args = NULL; - if (NULL != (ssh_cmd = getenv ("GNUNET_TESTBED_RSH_CMD"))) - { - ssh_cmd = GNUNET_strdup (ssh_cmd); - ssh_cmd_cp = ssh_cmd; - for (cnt = 0; NULL != (arg = strtok (ssh_cmd, " ")); ssh_cmd = NULL) - GNUNET_array_append (ssh_args, cnt, GNUNET_strdup (arg)); - GNUNET_free (ssh_cmd_cp); - } - else - { - ssh_args = copy_argv (default_ssh_args); - cnt = (sizeof (default_ssh_args)) / (sizeof (const char *)); - GNUNET_array_grow (ssh_args, cnt, cnt - 1); - } - GNUNET_array_append (ssh_args, cnt, GNUNET_strdup ("-p")); - GNUNET_array_append (ssh_args, cnt, GNUNET_strdup (port)); - GNUNET_array_append (ssh_args, cnt, GNUNET_strdup (dst)); - GNUNET_array_append (ssh_args, cnt, NULL); - return ssh_args; -} + /* struct GNUNET_TESTBED_Controller *c = cls; */ - -/** - * Generates the arguments needed for executing the given binary in a remote - * shell. Builds the arguments from the environmental variable - * GNUNET_TETSBED_RSH_CMD_SUFFIX. If the environmental variable is not found, - * only the given binary name will be present in the returned arguments - * - * @param helper_binary_path the path of the binary to execute - * @return NULL-terminated args - */ -static char ** -gen_rsh_suffix_args (const char *helper_binary_path) -{ - char **rshell_args; - char *rshell_cmd; - char *rshell_cmd_cp; - char *arg; - unsigned int cnt; - - rshell_args = NULL; - cnt = 0; - if (NULL != (rshell_cmd = getenv ("GNUNET_TESTBED_RSH_CMD_SUFFIX"))) - { - rshell_cmd = GNUNET_strdup (rshell_cmd); - rshell_cmd_cp = rshell_cmd; - for (; NULL != (arg = strtok (rshell_cmd, " ")); rshell_cmd = NULL) - GNUNET_array_append (rshell_args, cnt, GNUNET_strdup (arg)); - GNUNET_free (rshell_cmd_cp); - } - GNUNET_array_append (rshell_args, cnt, GNUNET_strdup (helper_binary_path)); - GNUNET_array_append (rshell_args, cnt, NULL); - return rshell_args; -} - - -/** - * Starts a controller process at the given host - * - * @param trusted_ip the ip address of the controller which will be set as TRUSTED - * HOST(all connections form this ip are permitted by the testbed) when - * starting testbed controller at host. This can either be a single ip - * address or a network address in CIDR notation. - * @param host the host where the controller has to be started; NULL for - * localhost - * @param cfg template configuration to use for the remote controller; the - * remote controller will be started with a slightly modified - * configuration (port numbers, unix domain sockets and service home - * values are changed as per TESTING library on the remote host) - * @param cb function called when the controller is successfully started or - * dies unexpectedly; GNUNET_TESTBED_controller_stop shouldn't be - * called if cb is called with GNUNET_SYSERR as status. Will never be - * called in the same task as 'GNUNET_TESTBED_controller_start' - * (synchronous errors will be signalled by returning NULL). This - * parameter cannot be NULL. - * @param cls closure for above callbacks - * @return the controller process handle, NULL on errors - */ -struct GNUNET_TESTBED_ControllerProc * -GNUNET_TESTBED_controller_start (const char *trusted_ip, - struct GNUNET_TESTBED_Host *host, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GNUNET_TESTBED_ControllerStatusCallback cb, - void *cls) -{ - struct GNUNET_TESTBED_ControllerProc *cp; - struct GNUNET_TESTBED_HelperInit *msg; - const char *hostname; - - static char *const binary_argv[] = { - HELPER_TESTBED_BINARY, NULL - }; - - hostname = NULL; - cp = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_ControllerProc)); - if ((NULL == host) || (0 == GNUNET_TESTBED_host_get_id_ (host))) - { - cp->helper = - GNUNET_HELPER_start (GNUNET_YES, HELPER_TESTBED_BINARY, binary_argv, - &helper_mst, &helper_exp_cb, cp); - } - else - { - char *helper_binary_path; - char **ssh_args; - char **rshell_args; - const char *username; - char *port; - char *dst; - - username = GNUNET_TESTBED_host_get_username_ (host); - hostname = GNUNET_TESTBED_host_get_hostname (host); - GNUNET_asprintf (&port, "%u", GNUNET_TESTBED_host_get_ssh_port_ (host)); - if (NULL == username) - GNUNET_asprintf (&dst, "%s", hostname); - else - GNUNET_asprintf (&dst, "%s@%s", username, hostname); - LOG_DEBUG ("Starting SSH to destination %s\n", dst); - - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, "testbed", - "HELPER_BINARY_PATH", - &helper_binary_path)) - helper_binary_path = - GNUNET_OS_get_libexec_binary_path (HELPER_TESTBED_BINARY); - ssh_args = gen_rsh_args (port, dst); - rshell_args = gen_rsh_suffix_args (helper_binary_path); - cp->helper_argv = - join_argv ((const char **) ssh_args, (const char **) rshell_args); - free_argv (ssh_args); - free_argv (rshell_args); - GNUNET_free (port); - GNUNET_free (dst); - cp->helper = - GNUNET_HELPER_start (GNUNET_NO, "ssh", cp->helper_argv, &helper_mst, - &helper_exp_cb, cp); - GNUNET_free (helper_binary_path); - } - if (NULL == cp->helper) - { - if (NULL != cp->helper_argv) - free_argv (cp->helper_argv); - GNUNET_free (cp); - return NULL; - } - cp->host = host; - cp->cb = cb; - cp->cls = cls; - msg = GNUNET_TESTBED_create_helper_init_msg_ (trusted_ip, hostname, cfg); - cp->msg = &msg->header; - cp->shandle = - GNUNET_HELPER_send (cp->helper, &msg->header, GNUNET_NO, &clear_msg, cp); - if (NULL == cp->shandle) - { - GNUNET_free (msg); - GNUNET_TESTBED_controller_stop (cp); - return NULL; - } - return cp; -} - - -/** - * Stop the controller process (also will terminate all peers and controllers - * dependent on this controller). This function blocks until the testbed has - * been fully terminated (!). The controller status cb from - * GNUNET_TESTBED_controller_start() will not be called. - * - * @param cproc the controller process handle - */ -void -GNUNET_TESTBED_controller_stop (struct GNUNET_TESTBED_ControllerProc *cproc) -{ - if (NULL != cproc->shandle) - GNUNET_HELPER_send_cancel (cproc->shandle); - if (NULL != cproc->helper) - GNUNET_HELPER_soft_stop (cproc->helper); - if (NULL != cproc->cfg) - GNUNET_CONFIGURATION_destroy (cproc->cfg); - if (NULL != cproc->helper_argv) - free_argv (cproc->helper_argv); - GNUNET_free (cproc); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Encountered MQ error: %d\n", + error); + /* now what? */ + GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */ } @@ -1609,10 +1541,9 @@ GNUNET_TESTBED_controller_stop (struct GNUNET_TESTBED_ControllerProc *cproc) * Start a controller process using the given configuration at the * given host. * - * @param cfg configuration to use * @param host host to run the controller on; This should be the same host if * the controller was previously started with - * GNUNET_TESTBED_controller_start; NULL for localhost + * GNUNET_TESTBED_controller_start() * @param event_mask bit mask with set of events to call 'cc' for; * or-ed values of "1LL" shifted by the * respective 'enum GNUNET_TESTBED_EventType' @@ -1622,25 +1553,73 @@ GNUNET_TESTBED_controller_stop (struct GNUNET_TESTBED_ControllerProc *cproc) * @return handle to the controller */ struct GNUNET_TESTBED_Controller * -GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle - *cfg, struct GNUNET_TESTBED_Host *host, +GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, uint64_t event_mask, GNUNET_TESTBED_ControllerCallback cc, void *cc_cls) { - struct GNUNET_TESTBED_Controller *controller; + struct GNUNET_TESTBED_Controller *controller + = GNUNET_new (struct GNUNET_TESTBED_Controller); + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (add_host_confirm, + GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS, + struct GNUNET_TESTBED_HostConfirmedMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_conevent, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT, + struct GNUNET_TESTBED_ConnectionEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (opsuccess, + GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS, + struct GNUNET_TESTBED_GenericOperationSuccessEventMessage, + controller), + GNUNET_MQ_hd_var_size (op_fail_event, + GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT, + struct GNUNET_TESTBED_OperationFailureEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_create_success, + GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS, + struct GNUNET_TESTBED_PeerCreateSuccessEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_event, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT, + struct GNUNET_TESTBED_PeerEventMessage, + controller), + GNUNET_MQ_hd_var_size (peer_config, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + struct GNUNET_TESTBED_PeerConfigurationInformationMessage, + controller), + GNUNET_MQ_hd_var_size (slave_config, + GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + struct GNUNET_TESTBED_SlaveConfiguration, + controller), + GNUNET_MQ_hd_var_size (link_controllers_result, + GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + struct GNUNET_TESTBED_ControllerLinkResponse, + controller), + GNUNET_MQ_hd_var_size (barrier_status, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS, + struct GNUNET_TESTBED_BarrierStatusMsg, + controller), + GNUNET_MQ_handler_end () + }; struct GNUNET_TESTBED_InitMessage *msg; + struct GNUNET_MQ_Envelope *env; + const struct GNUNET_CONFIGURATION_Handle *cfg; const char *controller_hostname; unsigned long long max_parallel_operations; unsigned long long max_parallel_service_connections; unsigned long long max_parallel_topology_config_operations; + size_t slen; + GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host))); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "testbed", "MAX_PARALLEL_OPERATIONS", &max_parallel_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1649,6 +1628,7 @@ GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle &max_parallel_service_connections)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1657,335 +1637,112 @@ GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle &max_parallel_topology_config_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } - controller = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Controller)); controller->cc = cc; controller->cc_cls = cc_cls; controller->event_mask = event_mask; controller->cfg = GNUNET_CONFIGURATION_dup (cfg); - controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg); - if (NULL == controller->client) - { - GNUNET_TESTBED_controller_disconnect (controller); - return NULL; - } - if (NULL == host) - { - host = GNUNET_TESTBED_host_create_by_id_ (0); - if (NULL == host) /* If the above host create fails */ - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Treating NULL host as localhost. Multiple references to localhost " - "may break when localhost freed before calling disconnect \n"); - host = GNUNET_TESTBED_host_lookup_by_id_ (0); - } - else - { - controller->aux_host = GNUNET_YES; - } - } - GNUNET_assert (NULL != host); - GNUNET_TESTBED_mark_host_registered_at_ (host, controller); - controller->host = host; - controller->opq_parallel_operations = - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) - max_parallel_operations); - controller->opq_parallel_service_connections = - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) - max_parallel_service_connections); - controller->opq_parallel_topology_config_operations = - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) - max_parallel_topology_config_operations); - controller_hostname = GNUNET_TESTBED_host_get_hostname (host); - if (NULL == controller_hostname) - controller_hostname = "127.0.0.1"; - msg = - GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); - msg->event_mask = GNUNET_htonll (controller->event_mask); - strcpy ((char *) &msg[1], controller_hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); - return controller; -} - - -/** - * Configure shared services at a controller. Using this function, - * you can specify that certain services (such as "resolver") - * should not be run for each peer but instead be shared - * across N peers on the specified host. This function - * must be called before any peers are created at the host. - * - * @param controller controller to configure - * @param service_name name of the service to share - * @param num_peers number of peers that should share one instance - * of the specified service (1 for no sharing is the default), - * use 0 to disable the service - */ -void -GNUNET_TESTBED_controller_configure_sharing (struct GNUNET_TESTBED_Controller - *controller, - const char *service_name, - uint32_t num_peers) -{ - struct GNUNET_TESTBED_ConfigureSharedServiceMessage *msg; - uint16_t service_name_size; - uint16_t msg_size; - - service_name_size = strlen (service_name) + 1; - msg_size = - sizeof (struct GNUNET_TESTBED_ConfigureSharedServiceMessage) + - service_name_size; - msg = GNUNET_malloc (msg_size); - msg->header.size = htons (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHARE_SERVICE); - msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (controller->host)); - msg->num_peers = htonl (num_peers); - memcpy (&msg[1], service_name, service_name_size); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); - GNUNET_break (0); /* This function is not yet implemented on the - * testbed service */ -} - - -/** - * disconnects from the controller. - * - * @param controller handle to controller to stop - */ -void -GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller - *controller) -{ - struct MessageQueue *mq_entry; - - if (NULL != controller->th) - GNUNET_CLIENT_notify_transmit_ready_cancel (controller->th); - /* Clear the message queue */ - while (NULL != (mq_entry = controller->mq_head)) - { - GNUNET_CONTAINER_DLL_remove (controller->mq_head, controller->mq_tail, - mq_entry); - GNUNET_free (mq_entry->msg); - GNUNET_free (mq_entry); - } - if (NULL != controller->client) - GNUNET_CLIENT_disconnect (controller->client); - GNUNET_CONFIGURATION_destroy (controller->cfg); - if (GNUNET_YES == controller->aux_host) - GNUNET_TESTBED_host_destroy (controller->host); - GNUNET_TESTBED_operation_queue_destroy_ (controller->opq_parallel_operations); - GNUNET_TESTBED_operation_queue_destroy_ - (controller->opq_parallel_service_connections); - GNUNET_TESTBED_operation_queue_destroy_ - (controller->opq_parallel_topology_config_operations); - GNUNET_free (controller); -} - - -/** - * Register a host with the controller - * - * @param controller the controller handle - * @param host the host to register - * @param cc the completion callback to call to inform the status of - * registration. After calling this callback the registration handle - * will be invalid. Cannot be NULL. - * @param cc_cls the closure for the cc - * @return handle to the host registration which can be used to cancel the - * registration - */ -struct GNUNET_TESTBED_HostRegistrationHandle * -GNUNET_TESTBED_register_host (struct GNUNET_TESTBED_Controller *controller, - struct GNUNET_TESTBED_Host *host, - GNUNET_TESTBED_HostRegistrationCompletion cc, - void *cc_cls) -{ - struct GNUNET_TESTBED_HostRegistrationHandle *rh; - struct GNUNET_TESTBED_AddHostMessage *msg; - const char *username; - const char *hostname; - uint16_t msg_size; - uint16_t user_name_length; - - if (NULL != controller->rh) - return NULL; - hostname = GNUNET_TESTBED_host_get_hostname (host); - if (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (host, controller)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Host hostname: %s already registered\n", - (NULL == hostname) ? "localhost" : hostname); - return NULL; - } - rh = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_HostRegistrationHandle)); - rh->host = host; - rh->c = controller; - GNUNET_assert (NULL != cc); - rh->cc = cc; - rh->cc_cls = cc_cls; - controller->rh = rh; - username = GNUNET_TESTBED_host_get_username_ (host); - msg_size = (sizeof (struct GNUNET_TESTBED_AddHostMessage)); - user_name_length = 0; - if (NULL != username) - { - user_name_length = strlen (username) + 1; - msg_size += user_name_length; - } - /* FIXME: what happens when hostname is NULL? localhost */ - GNUNET_assert (NULL != hostname); - msg_size += strlen (hostname) + 1; - msg = GNUNET_malloc (msg_size); - msg->header.size = htons (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST); - msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); - msg->ssh_port = htons (GNUNET_TESTBED_host_get_ssh_port_ (host)); - if (NULL != username) - { - msg->user_name_length = htons (user_name_length - 1); - memcpy (&msg[1], username, user_name_length); - } - else - msg->user_name_length = htons (user_name_length); - strcpy (((void *) &msg[1]) + user_name_length, hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); - return rh; -} - - -/** - * Cancel the pending registration. Note that if the registration message is - * already sent to the service the cancellation has only the effect that the - * registration completion callback for the registration is never called. - * - * @param handle the registration handle to cancel - */ -void -GNUNET_TESTBED_cancel_registration (struct GNUNET_TESTBED_HostRegistrationHandle - *handle) -{ - if (handle != handle->c->rh) + controller->mq = GNUNET_CLIENT_connect (controller->cfg, + "testbed", + handlers, + &mq_error_handler, + controller); + if (NULL == controller->mq) { GNUNET_break (0); - return; + GNUNET_TESTBED_controller_disconnect (controller); + return NULL; } - handle->c->rh = NULL; - GNUNET_free (handle); + GNUNET_TESTBED_mark_host_registered_at_ (host, controller); + controller->host = host; + controller->opq_parallel_operations = + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) max_parallel_operations); + controller->opq_parallel_service_connections = + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) + max_parallel_service_connections); + controller->opq_parallel_topology_config_operations = + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) + max_parallel_topology_config_operations); + controller_hostname = GNUNET_TESTBED_host_get_hostname (host); + if (NULL == controller_hostname) + controller_hostname = "127.0.0.1"; + slen = strlen (controller_hostname) + 1; + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_INIT); + msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); + msg->event_mask = GNUNET_htonll (controller->event_mask); + GNUNET_memcpy (&msg[1], + controller_hostname, + slen); + GNUNET_MQ_send (controller->mq, + env); + return controller; } /** - * Same as the GNUNET_TESTBED_controller_link_2, but with ids for delegated host - * and slave host + * Iterator to free opc map entries * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host_id id of the host to which requests should be delegated - * @param slave_host_id id of the host which is used to run the slave controller - * @param sxcfg serialized and compressed configuration - * @param sxcfg_size the size sxcfg - * @param scfg_size the size of uncompressed serialized configuration - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_2_ (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - uint32_t delegated_host_id, - uint32_t slave_host_id, const char *sxcfg, - size_t sxcfg_size, size_t scfg_size, - int is_subordinate) +static int +opc_free_iterator (void *cls, uint32_t key, void *value) { - struct OperationContext *opc; - struct GNUNET_TESTBED_ControllerLinkMessage *msg; - struct ControllerLinkData *data; - uint16_t msg_size; + struct GNUNET_CONTAINER_MultiHashMap32 *map = cls; + struct OperationContext *opc = value; - msg_size = sxcfg_size + sizeof (struct GNUNET_TESTBED_ControllerLinkMessage); - msg = GNUNET_malloc (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS); - msg->header.size = htons (msg_size); - msg->delegated_host_id = htonl (delegated_host_id); - msg->slave_host_id = htonl (slave_host_id); - msg->config_size = htons ((uint16_t) scfg_size); - msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0; - memcpy (&msg[1], sxcfg, sxcfg_size); - data = GNUNET_malloc (sizeof (struct ControllerLinkData)); - data->msg = msg; - opc = GNUNET_malloc (sizeof (struct OperationContext)); - opc->c = master; - opc->data = data; - opc->type = OP_LINK_CONTROLLERS; - opc->id = GNUNET_TESTBED_get_next_op_id (opc->c); - opc->state = OPC_STATE_INIT; - opc->op_cls = op_cls; - msg->operation_id = GNUNET_htonll (opc->id); - opc->op = - GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers, - &oprelease_link_controllers); - GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations, - opc->op); - GNUNET_TESTBED_operation_begin_wait_ (opc->op); - return opc->op; + GNUNET_assert (NULL != opc); + GNUNET_break (0); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (map, key, value)); + GNUNET_free (opc); + return GNUNET_YES; } /** - * Same as the GNUNET_TESTBED_controller_link, however expects configuration in - * serialized and compressed + * Stop the given controller (also will terminate all peers and + * controllers dependent on this controller). This function + * blocks until the testbed has been fully terminated (!). * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host requests to which host should be delegated; cannot be NULL - * @param slave_host which host is used to run the slave controller; use NULL to - * make the master controller connect to the delegated host - * @param sxcfg serialized and compressed configuration - * @param sxcfg_size the size sxcfg - * @param scfg_size the size of uncompressed serialized configuration - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle + * @param c handle to controller to stop */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_2 (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - struct GNUNET_TESTBED_Host *delegated_host, - struct GNUNET_TESTBED_Host *slave_host, - const char *sxcfg, size_t sxcfg_size, - size_t scfg_size, int is_subordinate) +void +GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c) { - uint32_t delegated_host_id; - uint32_t slave_host_id; - - GNUNET_assert (GNUNET_YES == - GNUNET_TESTBED_is_host_registered_ (delegated_host, master)); - delegated_host_id = GNUNET_TESTBED_host_get_id_ (delegated_host); - slave_host_id = - GNUNET_TESTBED_host_get_id_ ((NULL != - slave_host) ? slave_host : master->host); - if ((NULL != slave_host) && (0 != GNUNET_TESTBED_host_get_id_ (slave_host))) - GNUNET_assert (GNUNET_YES == - GNUNET_TESTBED_is_host_registered_ (slave_host, master)); - - return GNUNET_TESTBED_controller_link_2_ (op_cls, master, delegated_host_id, - slave_host_id, sxcfg, sxcfg_size, - scfg_size, is_subordinate); + if (NULL != c->mq) + { + GNUNET_MQ_destroy (c->mq); + c->mq = NULL; + } + if (NULL != c->host) + GNUNET_TESTBED_deregister_host_at_ (c->host, c); + GNUNET_CONFIGURATION_destroy (c->cfg); + GNUNET_TESTBED_operation_queue_destroy_ (c->opq_parallel_operations); + GNUNET_TESTBED_operation_queue_destroy_ + (c->opq_parallel_service_connections); + GNUNET_TESTBED_operation_queue_destroy_ + (c->opq_parallel_topology_config_operations); + if (NULL != c->opc_map) + { + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap32_iterate (c->opc_map, + &opc_free_iterator, + c->opc_map)); + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map)); + GNUNET_CONTAINER_multihashmap32_destroy (c->opc_map); + } + GNUNET_free (c); } @@ -1999,7 +1756,8 @@ GNUNET_TESTBED_controller_link_2 (void *op_cls, * @return the size of the xconfig */ size_t -GNUNET_TESTBED_compress_config_ (const char *config, size_t size, +GNUNET_TESTBED_compress_config_ (const char *config, + size_t size, char **xconfig) { size_t xsize; @@ -2015,50 +1773,29 @@ GNUNET_TESTBED_compress_config_ (const char *config, size_t size, /** - * Same as the GNUNET_TESTBED_controller_link, but with ids for delegated host - * and slave host + * Function to serialize and compress using zlib a configuration through a + * configuration handle * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host_id id of the host to which requests should be - * delegated; cannot be NULL - * @param slave_host_id id of the host which should connect to controller - * running on delegated host ; use NULL to make the master controller - * connect to the delegated host - * @param slave_cfg configuration to use for the slave controller - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle + * @param cfg the configuration + * @param size the size of configuration when serialize. Will be set on success. + * @param xsize the sizeo of the compressed configuration. Will be set on success. + * @return the serialized and compressed configuration */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_ (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - uint32_t delegated_host_id, - uint32_t slave_host_id, - const struct GNUNET_CONFIGURATION_Handle - *slave_cfg, int is_subordinate) +char * +GNUNET_TESTBED_compress_cfg_ (const struct GNUNET_CONFIGURATION_Handle *cfg, + size_t *size, size_t *xsize) { - struct GNUNET_TESTBED_Operation *op; char *config; - char *cconfig; - size_t cc_size; - size_t config_size; + char *xconfig; + size_t size_; + size_t xsize_; - config = GNUNET_CONFIGURATION_serialize (slave_cfg, &config_size); - cc_size = GNUNET_TESTBED_compress_config_ (config, config_size, &cconfig); + config = GNUNET_CONFIGURATION_serialize (cfg, &size_); + xsize_ = GNUNET_TESTBED_compress_config_ (config, size_, &xconfig); GNUNET_free (config); - /* Configuration doesn't fit in 1 message */ - GNUNET_assert ((UINT16_MAX - - sizeof (struct GNUNET_TESTBED_ControllerLinkMessage)) >= - cc_size); - op = GNUNET_TESTBED_controller_link_2_ (op_cls, master, delegated_host_id, - slave_host_id, (const char *) cconfig, - cc_size, config_size, is_subordinate); - GNUNET_free (cconfig); - return op; + *size = size_; + *xsize = xsize_; + return xconfig; } @@ -2084,7 +1821,6 @@ GNUNET_TESTBED_controller_link_ (void *op_cls, * @param delegated_host requests to which host should be delegated; cannot be NULL * @param slave_host which host is used to run the slave controller; use NULL to * make the master controller connect to the delegated host - * @param slave_cfg configuration to use for the slave controller * @param is_subordinate GNUNET_YES if the controller at delegated_host should * be started by the slave controller; GNUNET_NO if the slave * controller has to connect to the already started delegated @@ -2096,11 +1832,14 @@ GNUNET_TESTBED_controller_link (void *op_cls, struct GNUNET_TESTBED_Controller *master, struct GNUNET_TESTBED_Host *delegated_host, struct GNUNET_TESTBED_Host *slave_host, - const struct GNUNET_CONFIGURATION_Handle - *slave_cfg, int is_subordinate) + int is_subordinate) { + struct OperationContext *opc; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; + struct ControllerLinkData *data; uint32_t slave_host_id; uint32_t delegated_host_id; + uint16_t msg_size; GNUNET_assert (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (delegated_host, master)); @@ -2111,10 +1850,31 @@ GNUNET_TESTBED_controller_link (void *op_cls, if ((NULL != slave_host) && (0 != slave_host_id)) GNUNET_assert (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (slave_host, master)); - return GNUNET_TESTBED_controller_link_ (op_cls, master, delegated_host_id, - slave_host_id, slave_cfg, - is_subordinate); - + msg_size = sizeof (struct GNUNET_TESTBED_ControllerLinkRequest); + msg = GNUNET_malloc (msg_size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS); + msg->header.size = htons (msg_size); + msg->delegated_host_id = htonl (delegated_host_id); + msg->slave_host_id = htonl (slave_host_id); + msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0; + data = GNUNET_new (struct ControllerLinkData); + data->msg = msg; + data->host_id = delegated_host_id; + opc = GNUNET_new (struct OperationContext); + opc->c = master; + opc->data = data; + opc->type = OP_LINK_CONTROLLERS; + opc->id = GNUNET_TESTBED_get_next_op_id (opc->c); + opc->state = OPC_STATE_INIT; + opc->op_cls = op_cls; + msg->operation_id = GNUNET_htonll (opc->id); + opc->op = + GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers, + &oprelease_link_controllers); + GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations, + opc->op); + GNUNET_TESTBED_operation_begin_wait_ (opc->op); + return opc->op; } @@ -2138,9 +1898,9 @@ GNUNET_TESTBED_get_slave_config_ (void *op_cls, struct OperationContext *opc; struct GetSlaveConfigData *data; - data = GNUNET_malloc (sizeof (struct GetSlaveConfigData)); + data = GNUNET_new (struct GetSlaveConfigData); data->slave_id = slave_host_id; - opc = GNUNET_malloc (sizeof (struct OperationContext)); + opc = GNUNET_new (struct OperationContext); opc->state = OPC_STATE_INIT; opc->c = master; opc->id = GNUNET_TESTBED_get_next_op_id (master); @@ -2259,34 +2019,30 @@ GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip, /** - * Cancel a pending operation. Releases all resources - * of the operation and will ensure that no event - * is generated for the operation. Does NOT guarantee - * that the operation will be fully undone (or that + * This function is used to signal that the event information (struct + * GNUNET_TESTBED_EventInformation) from an operation has been fully processed + * i.e. if the event callback is ever called for this operation. If the event + * callback for this operation has not yet been called, calling this function + * cancels the operation, frees its resources and ensures the no event is + * generated with respect to this operation. Note that however cancelling an + * operation does NOT guarantee that the operation will be fully undone (or that * nothing ever happened). * - * @param operation operation to cancel - */ -void -GNUNET_TESTBED_operation_cancel (struct GNUNET_TESTBED_Operation *operation) -{ - GNUNET_TESTBED_operation_done (operation); -} - - -/** - * Signal that the information from an operation has been fully - * processed. This function MUST be called for each event - * of type 'operation_finished' to fully remove the operation - * from the operation queue. After calling this function, the - * 'op_result' becomes invalid (!). + * This function MUST be called for every operation to fully remove the + * operation from the operation queue. After calling this function, if + * operation is completed and its event information is of type + * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!). + + * If the operation is generated from GNUNET_TESTBED_service_connect() then + * calling this function on such as operation calls the disconnect adapter if + * the connect adapter was ever called. * - * @param operation operation to signal completion for + * @param operation operation to signal completion or cancellation */ void GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation) { - last_finished_operation = operation; + (void) exop_check (operation); GNUNET_TESTBED_operation_release_ (operation); } @@ -2294,11 +2050,16 @@ GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation) /** * Generates configuration by uncompressing configuration in given message. The * given message should be of the following types: - * GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG, - * GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG + * #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + * + * FIXME: This API is incredibly ugly. * * @param msg the message containing compressed configuration - * @return handle to the parsed configuration + * @return handle to the parsed configuration; NULL upon error while parsing the message */ struct GNUNET_CONFIGURATION_Handle * GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) @@ -2312,7 +2073,7 @@ GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) switch (ntohs (msg->type)) { - case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION: + case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION: { const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *imsg; @@ -2336,18 +2097,74 @@ GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) sizeof (struct GNUNET_TESTBED_SlaveConfiguration); xdata = (const Bytef *) &imsg[1]; } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST: + { + const struct GNUNET_TESTBED_AddHostMessage *imsg; + uint16_t osize; + + imsg = (const struct GNUNET_TESTBED_AddHostMessage *) msg; + data_len = (uLong) ntohs (imsg->config_size); + osize = sizeof (struct GNUNET_TESTBED_AddHostMessage) + + ntohs (imsg->username_length) + ntohs (imsg->hostname_length); + xdata_len = ntohs (imsg->header.size) - osize; + xdata = (const Bytef *) ((const void *) imsg + osize); + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT: + { + const struct GNUNET_TESTBED_ControllerLinkResponse *imsg; + + imsg = (const struct GNUNET_TESTBED_ControllerLinkResponse *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (const struct GNUNET_TESTBED_ControllerLinkResponse); + xdata = (const Bytef *) &imsg[1]; + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER: + { + const struct GNUNET_TESTBED_PeerCreateMessage *imsg; + + imsg = (const struct GNUNET_TESTBED_PeerCreateMessage *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_PeerCreateMessage); + xdata = (const Bytef *) &imsg[1]; + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER: + { + const struct GNUNET_TESTBED_PeerReconfigureMessage *imsg; + + imsg = (const struct GNUNET_TESTBED_PeerReconfigureMessage *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage); + xdata = (const Bytef *) &imsg[1]; + } break; default: GNUNET_assert (0); } data = GNUNET_malloc (data_len); if (Z_OK != (ret = uncompress (data, &data_len, xdata, xdata_len))) - GNUNET_assert (0); + { + GNUNET_free (data); + GNUNET_break_op (0); /* Un-compression failure */ + return NULL; + } cfg = GNUNET_CONFIGURATION_create (); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data, - (size_t) data_len, - GNUNET_NO)); + if (GNUNET_OK != + GNUNET_CONFIGURATION_deserialize (cfg, + (const char *) data, + (size_t) data_len, + NULL)) + { + GNUNET_free (data); + GNUNET_break_op (0); /* De-serialization failure */ + return NULL; + } GNUNET_free (data); return cfg; } @@ -2401,4 +2218,258 @@ GNUNET_TESTBED_get_next_op_id (struct GNUNET_TESTBED_Controller * controller) } +/** + * Function called when a shutdown peers operation is ready + * + * @param cls the closure from GNUNET_TESTBED_operation_create_() + */ +static void +opstart_shutdown_peers (void *cls) +{ + struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_ShutdownPeersMessage *msg; + + opc->state = OPC_STATE_STARTED; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); + msg->operation_id = GNUNET_htonll (opc->id); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); +} + + +/** + * Callback which will be called when shutdown peers operation is released + * + * @param cls the closure from GNUNET_TESTBED_operation_create_() + */ +static void +oprelease_shutdown_peers (void *cls) +{ + struct OperationContext *opc = cls; + + switch (opc->state) + { + case OPC_STATE_STARTED: + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + /* no break; continue */ + case OPC_STATE_INIT: + GNUNET_free (opc->data); + break; + case OPC_STATE_FINISHED: + break; + } + GNUNET_free (opc); +} + + +/** + * Stops and destroys all peers. Is equivalent of calling + * GNUNET_TESTBED_peer_stop() and GNUNET_TESTBED_peer_destroy() on all peers, + * except that the peer stop event and operation finished event corresponding to + * the respective functions are not generated. This function should be called + * when there are no other pending operations. If there are pending operations, + * it will return NULL + * + * @param c the controller to send this message to + * @param op_cls closure for the operation + * @param cb the callback to call when all peers are stopped and destroyed + * @param cb_cls the closure for the callback + * @return operation handle on success; NULL if any pending operations are + * present + */ +struct GNUNET_TESTBED_Operation * +GNUNET_TESTBED_shutdown_peers (struct GNUNET_TESTBED_Controller *c, + void *op_cls, + GNUNET_TESTBED_OperationCompletionCallback cb, + void *cb_cls) +{ + struct OperationContext *opc; + struct ShutdownPeersData *data; + + if (0 != GNUNET_CONTAINER_multihashmap32_size (c->opc_map)) + return NULL; + data = GNUNET_new (struct ShutdownPeersData); + data->cb = cb; + data->cb_cls = cb_cls; + opc = GNUNET_new (struct OperationContext); + opc->c = c; + opc->op_cls = op_cls; + opc->data = data; + opc->id = GNUNET_TESTBED_get_next_op_id (c); + opc->type = OP_SHUTDOWN_PEERS; + opc->state = OPC_STATE_INIT; + opc->op = GNUNET_TESTBED_operation_create_ (opc, &opstart_shutdown_peers, + &oprelease_shutdown_peers); + GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations, + opc->op); + GNUNET_TESTBED_operation_begin_wait_ (opc->op); + return opc->op; +} + + +/** + * Return the index of the peer inside of the total peer array, + * aka. the peer's "unique ID". + * + * @param peer Peer handle. + * + * @return The peer's unique ID. + */ +uint32_t +GNUNET_TESTBED_get_index (const struct GNUNET_TESTBED_Peer *peer) +{ + return peer->unique_id; +} + + +/** + * Remove a barrier and it was the last one in the barrier hash map, destroy the + * hash map + * + * @param barrier the barrier to remove + */ +void +GNUNET_TESTBED_barrier_remove_ (struct GNUNET_TESTBED_Barrier *barrier) +{ + struct GNUNET_TESTBED_Controller *c = barrier->c; + + GNUNET_assert (NULL != c->barrier_map); /* No barriers present */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (c->barrier_map, + &barrier->key, + barrier)); + GNUNET_free (barrier->name); + GNUNET_free (barrier); + if (0 == GNUNET_CONTAINER_multihashmap_size (c->barrier_map)) + { + GNUNET_CONTAINER_multihashmap_destroy (c->barrier_map); + c->barrier_map = NULL; + } +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @param echo GNUNET_YES to echo the barrier crossed status message back to the + * controller + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls, + int echo) +{ + struct GNUNET_TESTBED_BarrierInit *msg; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_Barrier *barrier; + struct GNUNET_HashCode key; + size_t name_len; + + GNUNET_assert (quorum <= 100); + GNUNET_assert (NULL != cb); + name_len = strlen (name); + GNUNET_assert (0 < name_len); + GNUNET_CRYPTO_hash (name, name_len, &key); + if (NULL == controller->barrier_map) + controller->barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES); + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (controller->barrier_map, + &key)) + { + GNUNET_break (0); + return NULL; + } + LOG_DEBUG ("Initialising barrier `%s'\n", name); + barrier = GNUNET_new (struct GNUNET_TESTBED_Barrier); + barrier->c = controller; + barrier->name = GNUNET_strdup (name); + barrier->cb = cb; + barrier->cls = cls; + barrier->echo = echo; + GNUNET_memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (controller->barrier_map, + &barrier->key, + barrier, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + env = GNUNET_MQ_msg_extra (msg, + name_len, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); + msg->quorum = (uint8_t) quorum; + GNUNET_memcpy (msg->name, + barrier->name, + name_len); + GNUNET_MQ_send (barrier->c->mq, + env); + return barrier; +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls) +{ + return GNUNET_TESTBED_barrier_init_ (controller, + name, quorum, cb, cls, GNUNET_YES); +} + + +/** + * Cancel a barrier. + * + * @param barrier the barrier handle + */ +void +GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_BarrierCancel *msg; + size_t slen; + + slen = strlen (barrier->name); + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); + GNUNET_memcpy (msg->name, + barrier->name, + slen); + GNUNET_MQ_send (barrier->c->mq, + env); + GNUNET_TESTBED_barrier_remove_ (barrier); +} + + /* end of testbed_api.c */