X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api.c;h=a643aacdd4ab34ad9565a83ef2d239ba652741f9;hb=4e2504a967ba09643c6dd7e3b9ce400e30adcb3d;hp=eb98bea4630c502e776b91df16373813da341628;hpb=01e21ba6afe2d16ba45b5a73f2777152fd27ffc3;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index eb98bea46..a643aacdd 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -1,21 +1,19 @@ /* This file is part of GNUnet - (C) 2008--2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . */ /** @@ -26,8 +24,6 @@ * @author Christian Grothoff * @author Sree Harsha Totakura */ - - #include "platform.h" #include "gnunet_testbed_service.h" #include "gnunet_core_service.h" @@ -41,18 +37,19 @@ #include "testbed_api_hosts.h" #include "testbed_api_peers.h" #include "testbed_api_operations.h" +#include "testbed_api_sd.h" /** * Generic logging shorthand */ #define LOG(kind, ...) \ - GNUNET_log_from (kind, "testbed-api", __VA_ARGS__); + GNUNET_log_from (kind, "testbed-api", __VA_ARGS__) /** * Debug logging */ #define LOG_DEBUG(...) \ - LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__); + LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) /** * Relative time seconds shorthand @@ -68,421 +65,352 @@ /** - * Handle for controller process + * Context data for forwarded Operation */ -struct GNUNET_TESTBED_ControllerProc +struct ForwardedOperationData { - /** - * The process handle - */ - struct GNUNET_HELPER_Handle *helper; - - /** - * The arguments used to start the helper - */ - char **helper_argv; - - /** - * The host where the helper is run - */ - struct GNUNET_TESTBED_Host *host; /** - * The controller error callback + * The callback to call when reply is available */ - GNUNET_TESTBED_ControllerStatusCallback cb; + GNUNET_MQ_MessageCallback cc; /** * The closure for the above callback */ - void *cls; - - /** - * The send handle for the helper - */ - struct GNUNET_HELPER_SendHandle *shandle; - - /** - * The message corresponding to send handle - */ - struct GNUNET_MessageHeader *msg; - - /** - * The configuration of the running testbed service - */ - struct GNUNET_CONFIGURATION_Handle *cfg; + void *cc_cls; }; /** - * The message queue for sending messages to the controller service + * Context data for get slave config operations */ -struct MessageQueue +struct GetSlaveConfigData { /** - * The message to be sent - */ - struct GNUNET_MessageHeader *msg; - - /** - * next pointer for DLL + * The id of the slave controller */ - struct MessageQueue *next; + uint32_t slave_id; - /** - * prev pointer for DLL - */ - struct MessageQueue *prev; }; /** - * Structure for a controller link + * Context data for controller link operations */ -struct ControllerLink +struct ControllerLinkData { /** - * The next ptr for DLL + * The controller link message */ - struct ControllerLink *next; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; /** - * The prev ptr for DLL + * The id of the host which is hosting the controller to be linked */ - struct ControllerLink *prev; + uint32_t host_id; - /** - * The host which will be referred in the peer start request. This is the - * host where the peer should be started - */ - struct GNUNET_TESTBED_Host *delegated_host; +}; - /** - * The host which will contacted to delegate the peer start request - */ - struct GNUNET_TESTBED_Host *slave_host; +/** + * Date context for OP_SHUTDOWN_PEERS operations + */ +struct ShutdownPeersData +{ /** - * The configuration to be used to connect to slave host + * The operation completion callback to call */ - const struct GNUNET_CONFIGURATION_Handle *slave_cfg; + GNUNET_TESTBED_OperationCompletionCallback cb; /** - * GNUNET_YES if the slave should be started (and stopped) by us; GNUNET_NO - * if we are just allowed to use the slave via TCP/IP + * The closure for the above callback */ - int is_subordinate; + void *cb_cls; }; /** - * handle for host registration + * An entry in the stack for keeping operations which are about to expire */ -struct GNUNET_TESTBED_HostRegistrationHandle +struct ExpireOperationEntry { /** - * The host being registered - */ - struct GNUNET_TESTBED_Host *host; - - /** - * The controller at which this host is being registered + * DLL head; new entries are to be inserted here */ - struct GNUNET_TESTBED_Controller *c; + struct ExpireOperationEntry *next; /** - * The Registartion completion callback + * DLL tail; entries are deleted from here */ - GNUNET_TESTBED_HostRegistrationCompletion cc; + struct ExpireOperationEntry *prev; /** - * The closure for above callback + * The operation. This will be a dangling pointer when the operation is freed */ - void *cc_cls; + const struct GNUNET_TESTBED_Operation *op; }; /** - * Context data for forwarded Operation + * DLL head for list of operations marked for expiry */ -struct ForwardedOperationData -{ - - /** - * The callback to call when reply is available - */ - GNUNET_CLIENT_MessageHandler cc; - - /** - * The closure for the above callback - */ - void *cc_cls; +static struct ExpireOperationEntry *exop_head; -}; +/** + * DLL tail for list of operation marked for expiry + */ +static struct ExpireOperationEntry *exop_tail; /** - * Context data for get slave config operations + * Inserts an operation into the list of operations marked for expiry + * + * @param op the operation to insert */ -struct GetSlaveConfigData +static void +exop_insert (struct GNUNET_TESTBED_Operation *op) { - /** - * The id of the slave controller - */ - uint32_t slave_id; + struct ExpireOperationEntry *entry; -}; + entry = GNUNET_new (struct ExpireOperationEntry); + entry->op = op; + GNUNET_CONTAINER_DLL_insert_tail (exop_head, exop_tail, entry); +} /** - * Context data for controller link operations + * Checks if an operation is present in the list of operations marked for + * expiry. If the operation is found, it and the tail of operations after it + * are removed from the list. + * + * @param op the operation to check + * @return GNUNET_NO if the operation is not present in the list; GNUNET_YES if + * the operation is found in the list (the operation is then removed + * from the list -- calling this function again with the same + * paramenter will return GNUNET_NO) */ -struct ControllerLinkData +static int +exop_check (const struct GNUNET_TESTBED_Operation *const op) { - /** - * The controller link message - */ - struct GNUNET_TESTBED_ControllerLinkMessage *msg; + struct ExpireOperationEntry *entry; + struct ExpireOperationEntry *entry2; + int found; -}; + found = GNUNET_NO; + entry = exop_head; + while (NULL != entry) + { + if (op == entry->op) + { + found = GNUNET_YES; + break; + } + entry = entry->next; + } + if (GNUNET_NO == found) + return GNUNET_NO; + /* Truncate the tail */ + while (NULL != entry) + { + entry2 = entry->next; + GNUNET_CONTAINER_DLL_remove (exop_head, + exop_tail, + entry); + GNUNET_free (entry); + entry = entry2; + } + return GNUNET_YES; +} -struct SDEntry +/** + * Context information to be used while searching for operation contexts + */ +struct SearchContext { /** - * DLL next pointer + * The result of the search */ - struct SDEntry *next; + struct OperationContext *opc; /** - * DLL prev pointer - */ - struct SDEntry *prev; - - /** - * The value to store + * The id of the operation context we are searching for */ - unsigned int amount; + uint64_t id; }; -struct SDHandle +/** + * Search iterator for searching an operation context + * + * @param cls the serach context + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +opc_search_iterator (void *cls, + uint32_t key, + void *value) { - /** - * DLL head for storing entries - */ - struct SDEntry *head; - - /** - * DLL tail for storing entries - */ - struct SDEntry *tail; - - /** - * Squared sum of data values - */ - unsigned long long sqsum; - - /** - * Sum of the data values - */ - unsigned long sum; - - /** - * The average of data amounts - */ - float avg; - - /** - * The variance - */ - double vr; + struct SearchContext *sc = cls; + struct OperationContext *opc = value; - /** - * Number of data values; also the length of DLL containing SDEntries - */ - unsigned int cnt; - - /** - * max number of entries we can have in the DLL - */ - unsigned int max_cnt; -}; + GNUNET_assert (NULL != opc); + GNUNET_assert (NULL == sc->opc); + if (opc->id != sc->id) + return GNUNET_YES; + sc->opc = opc; + return GNUNET_NO; +} /** - * FIXME: doc + * Returns the operation context with the given id if found in the Operation + * context queues of the controller * - * @param - * @return + * @param c the controller whose operation context map is searched + * @param id the id which has to be checked + * @return the matching operation context; NULL if no match found */ -static struct SDHandle * -SD_init (unsigned int max_cnt) +static struct OperationContext * +find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id) { - struct SDHandle *h; - - GNUNET_assert (1 < max_cnt); - h = GNUNET_malloc (sizeof (struct SDHandle)); - h->max_cnt = max_cnt; - return h; + struct SearchContext sc; + + sc.id = id; + sc.opc = NULL; + GNUNET_assert (NULL != c->opc_map); + if (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap32_get_multiple (c->opc_map, (uint32_t) id, + &opc_search_iterator, &sc)) + return NULL; + return sc.opc; } /** - * FIXME: doc + * Inserts the given operation context into the operation context map of the + * given controller. Creates the operation context map if one does not exist + * for the controller * - * @param - * @return + * @param c the controller + * @param opc the operation context to be inserted */ -static void -SD_destroy (struct SDHandle *h) -{ - struct SDEntry *entry; - - while (NULL != (entry = h->head)) - { - GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry); - GNUNET_free (entry); - } - GNUNET_free (h); -} - -static void -SD_add_data (struct SDHandle *h, unsigned int amount) +void +GNUNET_TESTBED_insert_opc_ (struct GNUNET_TESTBED_Controller *c, + struct OperationContext *opc) { - struct SDEntry *entry; - double sqavg; - double sqsum_avg; - - entry = NULL; - if (h->cnt == h->max_cnt) - { - entry = h->head; - GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry); - h->sum -= entry->amount; - h->sqsum -= ((unsigned long) entry->amount) * - ((unsigned long) entry->amount); - h->cnt--; - } - GNUNET_assert (h->cnt < h->max_cnt); - if (NULL == entry) - entry = GNUNET_malloc (sizeof (struct SDEntry)); - entry->amount = amount; - GNUNET_CONTAINER_DLL_insert_tail (h->head, h->tail, entry); - h->sum += amount; - h->cnt++; - h->avg = ((float) h->sum) / ((float) h->cnt); - h->sqsum += ((unsigned long) amount) * ((unsigned long) amount); - sqsum_avg = ((double) h->sqsum) / ((double) h->cnt); - sqavg = ((double) h->avg) * ((double) h->avg); - h->vr = sqsum_avg - sqavg; + if (NULL == c->opc_map) + c->opc_map = GNUNET_CONTAINER_multihashmap32_create (256); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (c->opc_map, + (uint32_t) opc->id, opc, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); } /** - * Returns the factor by which the given amount differs from the standard deviation + * Removes the given operation context from the operation context map of the + * given controller * - * @param h the SDhandle - * @param amount the value for which the deviation is returned - * @return the deviation from the average; GNUNET_SYSERR if the deviation cannot - * be calculated; a maximum of 4 is returned for deviations equal to - * or larger than 4 + * @param c the controller + * @param opc the operation context to remove */ -static int -SD_deviation_factor (struct SDHandle *h, unsigned int amount) +void +GNUNET_TESTBED_remove_opc_ (const struct GNUNET_TESTBED_Controller *c, + struct OperationContext *opc) { - double diff; - unsigned int n; - - if (h->cnt < 2) - return GNUNET_SYSERR; - if (((float) amount) > h->avg) - diff = ((float) amount) - h->avg; - else - return 0; //diff = h->avg - ((float) amount); - diff *= diff; - for (n = 1; n < 4; n++) - if (diff < (((double) (n * n)) * h->vr)) - break; - return n; + GNUNET_assert (NULL != c->opc_map); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (c->opc_map, + (uint32_t) opc->id, + opc)); + if ( (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map)) + && (NULL != c->opcq_empty_cb) ) + c->opcq_empty_cb (c->opcq_empty_cls); } + /** - * Returns the operation context with the given id if found in the Operation - * context queues of the controller + * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed. * - * @param c the controller whose queues are searched - * @param id the id which has to be checked - * @return the matching operation context; NULL if no match found + * @param cls the controller handler + * @param msg message received + * @return #GNUNET_OK if message is well-formed */ -static struct OperationContext * -find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id) +static int +check_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) { - struct OperationContext *opc; + const char *emsg; + uint16_t msg_size; - for (opc = c->ocq_head; NULL != opc; opc = opc->next) + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) + return GNUNET_OK; + /* We have an error message */ + emsg = (const char *) &msg[1]; + if ('\0' != emsg[msg_size - 1]) { - if (id == opc->id) - return opc; + GNUNET_break (0); + return GNUNET_SYSERR; } - return NULL; + return GNUNET_OK; } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from * controller (testbed service) * - * @param c the controller handler + * @param cls the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +static void +handle_add_host_confirm (void *cls, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) { - struct GNUNET_TESTBED_HostRegistrationHandle *rh; - char *emsg; + struct GNUNET_TESTBED_Controller *c = cls; + struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh; + const char *emsg; uint16_t msg_size; - rh = c->rh; if (NULL == rh) - { - return GNUNET_OK; - } + return; if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id)) { LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n", - GNUNET_TESTBED_host_get_id_ (rh->host), ntohl (msg->host_id)); - return GNUNET_OK; + GNUNET_TESTBED_host_get_id_ (rh->host), + ntohl (msg->host_id)); + return; } c->rh = NULL; - msg_size = ntohs (msg->header.size); - if (sizeof (struct GNUNET_TESTBED_HostConfirmedMessage) == msg_size) + msg_size = ntohs (msg->header.size) - sizeof (*msg); + if (0 == msg_size) { - LOG_DEBUG ("Host %u successfully registered\n", ntohl (msg->host_id)); - GNUNET_TESTBED_mark_host_registered_at_ (rh->host, c); - rh->cc (rh->cc_cls, NULL); + LOG_DEBUG ("Host %u successfully registered\n", + ntohl (msg->host_id)); + GNUNET_TESTBED_mark_host_registered_at_ (rh->host, + c); + rh->cc (rh->cc_cls, + NULL); GNUNET_free (rh); - return GNUNET_OK; + return; } /* We have an error message */ - emsg = (char *) &msg[1]; - if ('\0' != - emsg[msg_size - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)]) - { - GNUNET_break (0); - GNUNET_free (rh); - return GNUNET_NO; - } - LOG (GNUNET_ERROR_TYPE_ERROR, _("Adding host %u failed with error: %s\n"), - ntohl (msg->host_id), emsg); - rh->cc (rh->cc_cls, emsg); + emsg = (const char *) &msg[1]; + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Adding host %u failed with error: %s\n"), + ntohl (msg->host_id), + emsg); + rh->cc (rh->cc_cls, + emsg); GNUNET_free (rh); - return GNUNET_OK; } @@ -494,36 +422,37 @@ handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c, * @param msg the message */ static void -handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c, - struct OperationContext *opc, - const struct GNUNET_MessageHeader *msg) +handle_forwarded_operation_msg (void *cls, + struct OperationContext *opc, + const struct GNUNET_MessageHeader *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct ForwardedOperationData *fo_data; fo_data = opc->data; if (NULL != fo_data->cc) fo_data->cc (fo_data->cc_cls, msg); - GNUNET_CONTAINER_DLL_remove (c->ocq_head, c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (c, opc); GNUNET_free (fo_data); GNUNET_free (opc); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_opsuccess (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) +static void +handle_opsuccess (void *cls, + const struct GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; + GNUNET_TESTBED_OperationCompletionCallback op_comp_cb; + void *op_comp_cb_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; @@ -532,77 +461,100 @@ handle_opsuccess (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; + event.op = opc->op; + event.op_cls = opc->op_cls; event.details.operation_finished.emsg = NULL; event.details.operation_finished.generic = NULL; + op_comp_cb = NULL; + op_comp_cb_cls = NULL; switch (opc->type) { case OP_FORWARDED: { - handle_forwarded_operation_msg - (c, opc, (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return; } break; case OP_PEER_DESTROY: - { - struct GNUNET_TESTBED_Peer *peer; - - peer = opc->data; - GNUNET_free (peer); - opc->data = NULL; - //PEERDESTROYDATA - } + { + struct GNUNET_TESTBED_Peer *peer; + + peer = opc->data; + GNUNET_TESTBED_peer_deregister_ (peer); + GNUNET_free (peer); + opc->data = NULL; + //PEERDESTROYDATA + } break; - case OP_LINK_CONTROLLERS: - { - struct ControllerLinkData *data; - - data = opc->data; - GNUNET_assert (NULL != data); - GNUNET_free (data); - opc->data = NULL; - } + case OP_SHUTDOWN_PEERS: + { + struct ShutdownPeersData *data; + + data = opc->data; + op_comp_cb = data->cb; + op_comp_cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + GNUNET_TESTBED_cleanup_peers_ (); + } + break; + case OP_MANAGE_SERVICE: + { + struct ManageServiceData *data; + + GNUNET_assert (NULL != (data = opc->data)); + op_comp_cb = data->cb; + op_comp_cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + } + break; + case OP_PEER_RECONFIGURE: break; default: GNUNET_assert (0); - } - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + } + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; + exop_insert (event.op); if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED))) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } else LOG_DEBUG ("Not calling callback\n"); - return GNUNET_YES; + if (NULL != op_comp_cb) + op_comp_cb (op_comp_cb_cls, event.op, NULL); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS message from * controller (testbed service) * * @param c the controller handle * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) +static void +handle_peer_create_success (void *cls, + const struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct PeerCreateData *data; struct GNUNET_TESTBED_Peer *peer; + struct GNUNET_TESTBED_Operation *op; GNUNET_TESTBED_PeerCreateCallback cb; - void *cls; + void *cb_cls; uint64_t op_id; GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) == @@ -611,13 +563,13 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + (const struct GNUNET_MessageHeader *) msg); + return; } GNUNET_assert (OP_PEER_CREATE == opc->type); GNUNET_assert (NULL != opc->data); @@ -625,31 +577,34 @@ handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, GNUNET_assert (NULL != data->peer); peer = data->peer; GNUNET_assert (peer->unique_id == ntohl (msg->peer_id)); - peer->state = PS_CREATED; + peer->state = TESTBED_PS_CREATED; + GNUNET_TESTBED_peer_register_ (peer); cb = data->cb; - cls = data->cls; + cb_cls = data->cls; + op = opc->op; GNUNET_free (opc->data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; + exop_insert (op); if (NULL != cb) - cb (cls, peer, NULL); - return GNUNET_YES; + cb (cb_cls, peer, NULL); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_event (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_event (void *cls, const struct GNUNET_TESTBED_PeerEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct GNUNET_TESTBED_Peer *peer; struct PeerEventData *data; @@ -657,6 +612,7 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, void *pcc_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; + uint64_t mask; GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) == ntohs (msg->header.size)); @@ -664,13 +620,13 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + (const struct GNUNET_MessageHeader *) msg); + return; } GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type)); data = opc->data; @@ -678,15 +634,17 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, peer = data->peer; GNUNET_assert (NULL != peer); event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + event.op = opc->op; + event.op_cls = opc->op_cls; switch (event.type) { case GNUNET_TESTBED_ET_PEER_START: - peer->state = PS_STARTED; + peer->state = TESTBED_PS_STARTED; event.details.peer_start.host = peer->host; event.details.peer_start.peer = peer; break; case GNUNET_TESTBED_ET_PEER_STOP: - peer->state = PS_STOPPED; + peer->state = TESTBED_PS_STOPPED; event.details.peer_stop.peer = peer; break; default: @@ -695,59 +653,64 @@ handle_peer_event (struct GNUNET_TESTBED_Controller *c, pcc = data->pcc; pcc_cls = data->pcc_cls; GNUNET_free (data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - if (0 != - ((GNUNET_TESTBED_ET_PEER_START | GNUNET_TESTBED_ET_PEER_STOP) & - c->event_mask)) + exop_insert (event.op); + mask = 1LL << GNUNET_TESTBED_ET_PEER_START; + mask |= 1LL << GNUNET_TESTBED_ET_PEER_STOP; + if (0 != (mask & c->event_mask)) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } if (NULL != pcc) pcc (pcc_cls, NULL); - return GNUNET_YES; + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ -static int -handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, +static void +handle_peer_conevent (void *cls, const struct GNUNET_TESTBED_ConnectionEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; struct OverlayConnectData *data; GNUNET_TESTBED_OperationCompletionCallback cb; void *cb_cls; struct GNUNET_TESTBED_EventInformation event; uint64_t op_id; + uint64_t mask; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + (const struct GNUNET_MessageHeader *) msg); + return; } GNUNET_assert (OP_OVERLAY_CONNECT == opc->type); - data = opc->data; - GNUNET_assert (NULL != data); + GNUNET_assert (NULL != (data = opc->data)); GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) && (ntohl (msg->peer2) == data->p2->unique_id)); event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + event.op = opc->op; + event.op_cls = opc->op_cls; switch (event.type) { case GNUNET_TESTBED_ET_CONNECT: @@ -763,40 +726,57 @@ handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, } cb = data->cb; cb_cls = data->cb_cls; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - //GNUNET_free (data); - if (0 != - ((GNUNET_TESTBED_ET_CONNECT | GNUNET_TESTBED_ET_DISCONNECT) & - c->event_mask)) + exop_insert (event.op); + mask = 1LL << GNUNET_TESTBED_ET_CONNECT; + mask |= 1LL << GNUNET_TESTBED_ET_DISCONNECT; + if (0 != (mask & c->event_mask)) { if (NULL != c->cc) c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } if (NULL != cb) cb (cb_cls, opc->op, NULL); - return GNUNET_YES; + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_peer_config (struct GNUNET_TESTBED_Controller *c, - const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +check_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) { - struct OperationContext *opc; - struct GNUNET_TESTBED_Peer *peer; - struct PeerInfoData *data; - struct GNUNET_TESTBED_PeerInformation *pinfo; + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_peer_config (void *cls, + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +{ + struct GNUNET_TESTBED_Controller *c = cls; + struct OperationContext *opc; + struct GNUNET_TESTBED_Peer *peer; + struct PeerInfoData *data; + struct GNUNET_TESTBED_PeerInformation *pinfo; GNUNET_TESTBED_PeerInfoCallback cb; void *cb_cls; uint64_t op_id; @@ -805,81 +785,101 @@ handle_peer_config (struct GNUNET_TESTBED_Controller *c, if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { - handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + handle_forwarded_operation_msg (c, + opc, + &msg->header); + return; } data = opc->data; GNUNET_assert (NULL != data); peer = data->peer; GNUNET_assert (NULL != peer); GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id); - pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation)); + pinfo = GNUNET_new (struct GNUNET_TESTBED_PeerInformation); pinfo->pit = data->pit; cb = data->cb; cb_cls = data->cb_cls; + GNUNET_assert (NULL != cb); GNUNET_free (data); opc->data = NULL; switch (pinfo->pit) { case GNUNET_TESTBED_PIT_IDENTITY: - pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); - (void) memcpy (pinfo->result.id, &msg->peer_identity, - sizeof (struct GNUNET_PeerIdentity)); + pinfo->result.id = GNUNET_new (struct GNUNET_PeerIdentity); + GNUNET_memcpy (pinfo->result.id, + &msg->peer_identity, + sizeof (struct GNUNET_PeerIdentity)); break; case GNUNET_TESTBED_PIT_CONFIGURATION: - pinfo->result.cfg = /* Freed in oprelease_peer_getinfo */ - GNUNET_TESTBED_extract_config_ (&msg->header); + pinfo->result.cfg = /* Freed in oprelease_peer_getinfo */ + GNUNET_TESTBED_extract_config_ (&msg->header); break; case GNUNET_TESTBED_PIT_GENERIC: GNUNET_assert (0); /* never reach here */ break; } opc->data = pinfo; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; - if (NULL != cb) - cb (cb_cls, opc->op, pinfo, NULL); - return GNUNET_YES; + cb (cb_cls, opc->op, pinfo, NULL); + /* We dont check whether the operation is marked as done here as the + operation contains data (cfg/identify) which will be freed at a later point + */ } /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not + * @return #GNUNET_OK if message is well-formed */ static int -handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_OperationFailureEventMessage - *msg) +check_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) +{ + /* we accept anything as a valid error message */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_op_fail_event (void *cls, + const struct GNUNET_TESTBED_OperationFailureEventMessage *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; const char *emsg; uint64_t op_id; + uint64_t mask; struct GNUNET_TESTBED_EventInformation event; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_FORWARDED == opc->type) { handle_forwarded_operation_msg (c, opc, - (const struct GNUNET_MessageHeader *) msg); - return GNUNET_YES; + (const struct GNUNET_MessageHeader *) msg); + return; } - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); opc->state = OPC_STATE_FINISHED; emsg = GNUNET_TESTBED_parse_error_string_ (msg); if (NULL == emsg) @@ -887,38 +887,44 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, if (OP_PEER_INFO == opc->type) { struct PeerInfoData *data; + data = opc->data; if (NULL != data->cb) data->cb (data->cb_cls, opc->op, NULL, emsg); GNUNET_free (data); - return GNUNET_YES; /* We do not call controller callback for peer info */ + return; /* We do not call controller callback for peer info */ } - if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && - (NULL != c->cc)) + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; + event.details.operation_finished.emsg = emsg; + event.details.operation_finished.generic = NULL; + mask = (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED); + if ((0 != (mask & c->event_mask)) && (NULL != c->cc)) { - event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; - event.details.operation_finished.emsg = emsg; - event.details.operation_finished.generic = NULL; + exop_insert (event.op); c->cc (c->cc_cls, &event); + if (GNUNET_NO == exop_check (event.op)) + return; } switch (opc->type) { case OP_PEER_CREATE: { - struct PeerCreateData *data; + struct PeerCreateData *data; + data = opc->data; GNUNET_free (data->peer); if (NULL != data->cb) data->cb (data->cls, NULL, emsg); - GNUNET_free (data); + GNUNET_free (data); } break; case OP_PEER_START: case OP_PEER_STOP: { struct PeerEventData *data; + data = opc->data; if (NULL != data->pcc) data->pcc (data->pcc_cls, emsg); @@ -932,20 +938,47 @@ handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, case OP_OVERLAY_CONNECT: { struct OverlayConnectData *data; + data = opc->data; + GNUNET_TESTBED_operation_mark_failed (opc->op); if (NULL != data->cb) data->cb (data->cb_cls, opc->op, emsg); - GNUNET_free (data); } break; case OP_FORWARDED: GNUNET_assert (0); - case OP_LINK_CONTROLLERS: /* No secondary callback */ + case OP_LINK_CONTROLLERS: /* No secondary callback */ + break; + case OP_SHUTDOWN_PEERS: + { + struct ShutdownPeersData *data; + + data = opc->data; + GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */ + opc->data = NULL; + } + break; + case OP_MANAGE_SERVICE: + { + struct ManageServiceData *data = opc->data; + GNUNET_TESTBED_OperationCompletionCallback cb; + void *cb_cls; + + GNUNET_assert (NULL != data); + cb = data->cb; + cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + exop_insert (event.op); + if (NULL != cb) + cb (cb_cls, opc->op, emsg); + /* You could have marked the operation as done by now */ + GNUNET_break (GNUNET_NO == exop_check (event.op)); + } break; default: GNUNET_break (0); - } - return GNUNET_YES; + } } @@ -961,221 +994,328 @@ GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id) { struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg; uint16_t msize; - + msize = sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage); msg = GNUNET_malloc (msize); msg->header.size = htons (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_GETSLAVECONFIG); + msg->header.type = + htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION); msg->operation_id = GNUNET_htonll (op_id); msg->slave_id = htonl (slave_id); return msg; } + /** - * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller - * (testbed service) + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from + * controller (testbed service) * * @param c the controller handler * @param msg message received - * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if - * not */ static int -handle_slave_config (struct GNUNET_TESTBED_Controller *c, - const struct GNUNET_TESTBED_SlaveConfiguration * msg) +check_slave_config (void *cls, + const struct GNUNET_TESTBED_SlaveConfiguration *msg) +{ + /* anything goes? */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received + */ +static void +handle_slave_config (void *cls, + const struct GNUNET_TESTBED_SlaveConfiguration *msg) { + struct GNUNET_TESTBED_Controller *c = cls; struct OperationContext *opc; uint64_t op_id; - struct GNUNET_TESTBED_EventInformation event; + uint64_t mask; + struct GNUNET_TESTBED_EventInformation event; op_id = GNUNET_ntohll (msg->operation_id); if (NULL == (opc = find_opc (c, op_id))) { LOG_DEBUG ("Operation not found\n"); - return GNUNET_YES; + return; } if (OP_GET_SLAVE_CONFIG != opc->type) { GNUNET_break (0); - return GNUNET_YES; + return; } - GNUNET_free (opc->data); - opc->data = NULL; opc->state = OPC_STATE_FINISHED; - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); - if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + mask = 1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED; + if ((0 != (mask & c->event_mask)) && (NULL != c->cc)) { opc->data = GNUNET_TESTBED_extract_config_ (&msg->header); - event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; event.details.operation_finished.generic = opc->data; - event.details.operation_finished.operation = opc->op; - event.details.operation_finished.op_cls = opc->op_cls; event.details.operation_finished.emsg = NULL; c->cc (c->cc_cls, &event); } - return GNUNET_YES; } /** - * Handler for messages from controller (testbed service) + * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller + * (testbed service) * - * @param cls the controller handler - * @param msg message received, NULL on timeout or fatal error + * @param c the controller handler + * @param msg message received + * @return #GNUNET_OK if @a msg is well-formed + */ +static int +check_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) +{ + /* actual check to be implemented */ + return GNUNET_OK; +} + + +/** + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received */ static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +handle_link_controllers_result (void *cls, + const struct GNUNET_TESTBED_ControllerLinkResponse *msg) { struct GNUNET_TESTBED_Controller *c = cls; - int status; - uint16_t msize; + struct OperationContext *opc; + struct ControllerLinkData *data; + struct GNUNET_CONFIGURATION_Handle *cfg; + struct GNUNET_TESTBED_Host *host; + char *emsg; + uint64_t op_id; + struct GNUNET_TESTBED_EventInformation event; - c->in_receive = GNUNET_NO; - /* FIXME: Add checks for message integrity */ - if (NULL == msg) + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) { - LOG_DEBUG ("Receive timed out or connection to service dropped\n"); + LOG_DEBUG ("Operation not found\n"); return; } - status = GNUNET_OK; - msize = ntohs (msg->size); - switch (ntohs (msg->type)) + if (OP_FORWARDED == opc->type) { - case GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)); - status = - handle_addhostconfirm (c, - (const struct GNUNET_TESTBED_HostConfirmedMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_GENERICOPSUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage)); - status = - handle_opsuccess (c, - (const struct - GNUNET_TESTBED_GenericOperationSuccessEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS: - GNUNET_assert (msize == - sizeof (struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage)); - status = - handle_peer_create_success (c, - (const struct - GNUNET_TESTBED_PeerCreateSuccessEventMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT: - GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage)); - status = - handle_peer_event (c, - (const struct GNUNET_TESTBED_PeerEventMessage *) - msg); + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return; + } + if (OP_LINK_CONTROLLERS != opc->type) + { + GNUNET_break (0); + return; + } + GNUNET_assert (NULL != (data = opc->data)); + host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id); + GNUNET_assert (NULL != host); + GNUNET_free (data); + opc->data = NULL; + opc->state = OPC_STATE_FINISHED; + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.op = opc->op; + event.op_cls = opc->op_cls; + event.details.operation_finished.emsg = NULL; + event.details.operation_finished.generic = NULL; + emsg = NULL; + cfg = NULL; + if (GNUNET_NO == ntohs (msg->success)) + { + emsg = GNUNET_malloc (ntohs (msg->header.size) + - sizeof (struct + GNUNET_TESTBED_ControllerLinkResponse) + 1); + GNUNET_memcpy (emsg, + &msg[1], + ntohs (msg->header.size)- sizeof (struct GNUNET_TESTBED_ControllerLinkResponse)); + event.details.operation_finished.emsg = emsg; + } + else + { + if (0 != ntohs (msg->config_size)) + { + cfg = GNUNET_TESTBED_extract_config_ ((const struct GNUNET_MessageHeader *) msg); + GNUNET_assert (NULL != cfg); + GNUNET_TESTBED_host_replace_cfg_ (host, cfg); + } + } + if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED))) + { + if (NULL != c->cc) + c->cc (c->cc_cls, &event); + } + else + LOG_DEBUG ("Not calling callback\n"); + if (NULL != cfg) + GNUNET_CONFIGURATION_destroy (cfg); + GNUNET_free_non_null (emsg); +} - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG: - GNUNET_assert (msize >= - sizeof (struct - GNUNET_TESTBED_PeerConfigurationInformationMessage)); - status = - handle_peer_config (c, - (const struct - GNUNET_TESTBED_PeerConfigurationInformationMessage - *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT: - GNUNET_assert (msize == - sizeof (struct GNUNET_TESTBED_ConnectionEventMessage)); - status = - handle_peer_conevent (c, - (const struct - GNUNET_TESTBED_ConnectionEventMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT: - GNUNET_assert (msize >= - sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage)); - status = - handle_op_fail_event (c, - (const struct - GNUNET_TESTBED_OperationFailureEventMessage *) - msg); - break; - case GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG: - GNUNET_assert (msize > - sizeof (struct GNUNET_TESTBED_SlaveConfiguration)); - status = - handle_slave_config (c, (const struct - GNUNET_TESTBED_SlaveConfiguration *) msg); - break; - default: - GNUNET_assert (0); + +/** + * Validate #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS message. + * + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message + * @return #GNUNET_OK if the message is valid; #GNUNET_SYSERR to tear it + * down signalling an error (message malformed) + */ +static int +check_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) +{ + uint16_t msize; + uint16_t name_len; + int status; + const char *name; + size_t emsg_len; + + msize = ntohs (msg->header.size); + name = msg->data; + name_len = ntohs (msg->name_len); + + if (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ('\0' != name[name_len]) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; } - if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive)) + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + + 1); /* +1!? */ + if (0 == emsg_len) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } /** - * Function called to notify a client about the connection begin ready to queue - * more data. "buf" will be NULL and "size" zero if the connection was closed - * for writing in the meantime. + * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls the controller handle to determine the connection this message + * belongs to + * @param msg the barrier status message */ -static size_t -transmit_ready_notify (void *cls, size_t size, void *buf) +static void +handle_barrier_status (void *cls, + const struct GNUNET_TESTBED_BarrierStatusMsg *msg) { struct GNUNET_TESTBED_Controller *c = cls; - struct MessageQueue *mq_entry; - - c->th = NULL; - mq_entry = c->mq_head; - GNUNET_assert (NULL != mq_entry); - if ((0 == size) && (NULL == buf)) /* Timeout */ - { - LOG_DEBUG ("Message sending timed out -- retrying\n"); - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - return 0; - } - GNUNET_assert (ntohs (mq_entry->msg->size) <= size); - size = ntohs (mq_entry->msg->size); - memcpy (buf, mq_entry->msg, size); - LOG_DEBUG ("Message of type: %u and size: %u sent\n", - ntohs (mq_entry->msg->type), size); - GNUNET_free (mq_entry->msg); - GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry); - GNUNET_free (mq_entry); - mq_entry = c->mq_head; - if (NULL != mq_entry) - c->th = - GNUNET_CLIENT_notify_transmit_ready (c->client, - ntohs (mq_entry->msg->size), - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, c); - if (GNUNET_NO == c->in_receive) - { - c->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (c->client, &message_handler, c, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return size; + struct GNUNET_TESTBED_Barrier *barrier; + char *emsg; + const char *name; + struct GNUNET_HashCode key; + size_t emsg_len; + int status; + uint16_t msize; + uint16_t name_len; + + emsg = NULL; + barrier = NULL; + msize = ntohs (msg->header.size); + if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)) + { + GNUNET_break_op (0); + goto cleanup; + } + name = msg->data; + name_len = ntohs (msg->name_len); + if (name_len >= //name_len is strlen(barrier_name) + (msize - ((sizeof msg->header) + sizeof (msg->status)) ) ) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != name[name_len]) + { + GNUNET_break_op (0); + goto cleanup; + } + LOG_DEBUG ("Received BARRIER_STATUS msg\n"); + status = ntohs (msg->status); + if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) + { + status = -1; + //unlike name_len, emsg_len includes the trailing zero + emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + + (name_len + 1)); + if (0 == emsg_len) + { + GNUNET_break_op (0); + goto cleanup; + } + if ('\0' != (msg->data[(name_len + 1) + (emsg_len - 1)])) + { + GNUNET_break_op (0); + goto cleanup; + } + emsg = GNUNET_malloc (emsg_len); + GNUNET_memcpy (emsg, + msg->data + name_len + 1, + emsg_len); + } + if (NULL == c->barrier_map) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_CRYPTO_hash (name, name_len, &key); + barrier = GNUNET_CONTAINER_multihashmap_get (c->barrier_map, &key); + if (NULL == barrier) + { + GNUNET_break_op (0); + goto cleanup; + } + GNUNET_assert (NULL != barrier->cb); + if ((GNUNET_YES == barrier->echo) && + (GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status)) + GNUNET_TESTBED_queue_message_ (c, + GNUNET_copy_message (&msg->header)); + barrier->cb (barrier->cls, + name, + barrier, + status, + emsg); + if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status) + return; /* just initialised; skip cleanup */ + + cleanup: + GNUNET_free_non_null (emsg); + /** + * Do not remove the barrier if we did not echo the status back; this is + * required at the chained testbed controller setup to ensure the only the + * test-driver echos the status and the controller hierarchy properly + * propagates the status. + */ + if ((NULL != barrier) && (GNUNET_YES == barrier->echo)) + GNUNET_TESTBED_barrier_remove_ (barrier); } @@ -1189,7 +1329,8 @@ void GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, struct GNUNET_MessageHeader *msg) { - struct MessageQueue *mq_entry; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; uint16_t type; uint16_t size; @@ -1197,19 +1338,13 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, size = ntohs (msg->size); GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) && (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type)); - mq_entry = GNUNET_malloc (sizeof (struct MessageQueue)); - mq_entry->msg = msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u, size %u for sending\n", type, - ntohs (msg->size)); - GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail, - mq_entry); - if (NULL == controller->th) - controller->th = - GNUNET_CLIENT_notify_transmit_ready (controller->client, size, - TIMEOUT_REL, GNUNET_YES, - &transmit_ready_notify, - controller); + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, msg, size); + GNUNET_free (msg); + GNUNET_MQ_send (controller->mq, + env); } @@ -1228,31 +1363,37 @@ GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, * operation */ struct OperationContext * -GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller - *controller, uint64_t operation_id, +GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller, + uint64_t operation_id, const struct GNUNET_MessageHeader *msg, - GNUNET_CLIENT_MessageHandler cc, + GNUNET_MQ_MessageCallback cc, void *cc_cls) { struct OperationContext *opc; struct ForwardedOperationData *data; - struct GNUNET_MessageHeader *dup_msg; - uint16_t msize; - - data = GNUNET_malloc (sizeof (struct ForwardedOperationData)); + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *m2; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + env = GNUNET_MQ_msg_extra (m2, + size - sizeof (*m2), + type); + GNUNET_memcpy (m2, + msg, + size); + GNUNET_MQ_send (controller->mq, + env); + data = GNUNET_new (struct ForwardedOperationData); data->cc = cc; data->cc_cls = cc_cls; - opc = GNUNET_malloc (sizeof (struct OperationContext)); + opc = GNUNET_new (struct OperationContext); opc->c = controller; opc->type = OP_FORWARDED; opc->data = data; opc->id = operation_id; - msize = ntohs (msg->size); - dup_msg = GNUNET_malloc (msize); - (void) memcpy (dup_msg, msg, msize); - GNUNET_TESTBED_queue_message_ (opc->c, dup_msg); - GNUNET_CONTAINER_DLL_insert_tail (controller->ocq_head, controller->ocq_tail, - opc); + GNUNET_TESTBED_insert_opc_ (controller, + opc); return opc; } @@ -1266,104 +1407,13 @@ GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller void GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc) { - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, + opc); GNUNET_free (opc->data); GNUNET_free (opc); } -/** - * Functions with this signature are called whenever a - * complete message is received by the tokenizer. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -helper_mst (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - const struct GNUNET_TESTBED_HelperReply *msg; - const char *hostname; - char *config; - uLongf config_size; - uLongf xconfig_size; - - msg = (const struct GNUNET_TESTBED_HelperReply *) message; - GNUNET_assert (sizeof (struct GNUNET_TESTBED_HelperReply) < - ntohs (msg->header.size)); - GNUNET_assert (GNUNET_MESSAGE_TYPE_TESTBED_HELPER_REPLY == - ntohs (msg->header.type)); - config_size = (uLongf) ntohs (msg->config_size); - xconfig_size = - (uLongf) (ntohs (msg->header.size) - - sizeof (struct GNUNET_TESTBED_HelperReply)); - config = GNUNET_malloc (config_size); - GNUNET_assert (Z_OK == - uncompress ((Bytef *) config, &config_size, - (const Bytef *) &msg[1], xconfig_size)); - GNUNET_assert (NULL == cp->cfg); - cp->cfg = GNUNET_CONFIGURATION_create (); - GNUNET_assert (GNUNET_CONFIGURATION_deserialize - (cp->cfg, config, config_size, GNUNET_NO)); - GNUNET_free (config); - if ((NULL == cp->host) || - (NULL == (hostname = GNUNET_TESTBED_host_get_hostname (cp->host)))) - hostname = "localhost"; - /* Change the hostname so that we can connect to it */ - GNUNET_CONFIGURATION_set_value_string (cp->cfg, "testbed", "hostname", - hostname); - cp->cb (cp->cls, cp->cfg, GNUNET_OK); - return GNUNET_OK; -} - - -/** - * Continuation function from GNUNET_HELPER_send() - * - * @param cls closure - * @param result GNUNET_OK on success, - * GNUNET_NO if helper process died - * GNUNET_SYSERR during GNUNET_HELPER_stop - */ -static void -clear_msg (void *cls, int result) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - - GNUNET_assert (NULL != cp->shandle); - cp->shandle = NULL; - GNUNET_free (cp->msg); -} - - -/** - * Callback that will be called when the helper process dies. This is not called - * when the helper process is stoped using GNUNET_HELPER_stop() - * - * @param cls the closure from GNUNET_HELPER_start() - */ -static void -helper_exp_cb (void *cls) -{ - struct GNUNET_TESTBED_ControllerProc *cp = cls; - GNUNET_TESTBED_ControllerStatusCallback cb; - void *cb_cls; - - cb = cp->cb; - cb_cls = cp->cls; - cp->helper = NULL; - GNUNET_TESTBED_controller_stop (cp); - if (NULL != cb) - cb (cb_cls, NULL, GNUNET_SYSERR); -} - - /** * Function to call to start a link-controllers type operation once all queues * the operation is part of declare that the operation can be activated. @@ -1375,14 +1425,14 @@ opstart_link_controllers (void *cls) { struct OperationContext *opc = cls; struct ControllerLinkData *data; - struct GNUNET_TESTBED_ControllerLinkMessage *msg; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; GNUNET_assert (NULL != opc->data); data = opc->data; msg = data->msg; data->msg = NULL; opc->state = OPC_STATE_STARTED; - GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_insert_opc_ (opc->c, opc); GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); } @@ -1405,7 +1455,7 @@ oprelease_link_controllers (void *cls) GNUNET_free (data->msg); break; case OPC_STATE_STARTED: - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); break; case OPC_STATE_FINISHED: break; @@ -1424,12 +1474,15 @@ static void opstart_get_slave_config (void *cls) { struct OperationContext *opc = cls; - struct GetSlaveConfigData *data; + struct GetSlaveConfigData *data = opc->data; struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg; - data = opc->data; + GNUNET_assert (NULL != data); msg = GNUNET_TESTBED_generate_slavegetconfig_msg_ (opc->id, data->slave_id); - GNUNET_CONTAINER_DLL_insert_tail (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_free (opc->data); + data = NULL; + opc->data = NULL; + GNUNET_TESTBED_insert_opc_ (opc->c, opc); GNUNET_TESTBED_queue_message_ (opc->c, &msg->header); opc->state = OPC_STATE_STARTED; } @@ -1451,8 +1504,7 @@ oprelease_get_slave_config (void *cls) GNUNET_free (opc->data); break; case OPC_STATE_STARTED: - GNUNET_free (opc->data); - GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + GNUNET_TESTBED_remove_opc_ (opc->c, opc); break; case OPC_STATE_FINISHED: if (NULL != opc->data) @@ -1464,208 +1516,34 @@ oprelease_get_slave_config (void *cls) /** - * FIXME: doc + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param - * @return + * @param cls closure, a `struct GNUNET_TESTBED_Controller *` + * @param error error code */ static void -GNUNET_TESTBED_set_num_parallel_overlay_connects_ (struct - GNUNET_TESTBED_Controller *c, - unsigned int npoc) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - fprintf (stderr, "%d", npoc); - GNUNET_free_non_null (c->tslots); - c->tslots_filled = 0; - c->num_parallel_connects = npoc; - c->tslots = GNUNET_malloc (npoc * sizeof (struct TimeSlot)); - GNUNET_TESTBED_operation_queue_reset_max_active_ - (c->opq_parallel_overlay_connect_operations, npoc); + /* struct GNUNET_TESTBED_Controller *c = cls; */ + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Encountered MQ error: %d\n", + error); + /* now what? */ + GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */ } /** - * Function to copy NULL terminated list of arguments + * Start a controller process using the given configuration at the + * given host. * - * @param argv the NULL terminated list of arguments. Cannot be NULL. - * @return the copied NULL terminated arguments - */ -static char ** -copy_argv (const char *const *argv) -{ - char **argv_dup; - unsigned int argp; - - GNUNET_assert (NULL != argv); - for (argp = 0; NULL != argv[argp]; argp++); - argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1)); - for (argp = 0; NULL != argv[argp]; argp++) - argv_dup[argp] = strdup (argv[argp]); - return argv_dup; -} - - -/** - * Frees the given NULL terminated arguments - * - * @param argv the NULL terminated list of arguments - */ -static void -free_argv (char **argv) -{ - unsigned int argp; - - for (argp = 0; NULL != argv[argp]; argp++) - GNUNET_free (argv[argp]); - GNUNET_free (argv); -} - - -/** - * Starts a controller process at the given host - * - * @param trusted_ip the ip address of the controller which will be set as TRUSTED - * HOST(all connections form this ip are permitted by the testbed) when - * starting testbed controller at host. This can either be a single ip - * address or a network address in CIDR notation. - * @param controller_ip the ip address of the controller. Will be set as TRUSTED - * host when starting testbed controller at host - * @param host the host where the controller has to be started; NULL for - * localhost - * @param cfg template configuration to use for the remote controller; the - * remote controller will be started with a slightly modified - * configuration (port numbers, unix domain sockets and service home - * values are changed as per TESTING library on the remote host) - * @param cb function called when the controller is successfully started or - * dies unexpectedly; GNUNET_TESTBED_controller_stop shouldn't be - * called if cb is called with GNUNET_SYSERR as status. Will never be - * called in the same task as 'GNUNET_TESTBED_controller_start' - * (synchronous errors will be signalled by returning NULL). This - * parameter cannot be NULL. - * @param cls closure for above callbacks - * @return the controller process handle, NULL on errors - */ -struct GNUNET_TESTBED_ControllerProc * -GNUNET_TESTBED_controller_start (const char *trusted_ip, - struct GNUNET_TESTBED_Host *host, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GNUNET_TESTBED_ControllerStatusCallback cb, - void *cls) -{ - struct GNUNET_TESTBED_ControllerProc *cp; - struct GNUNET_TESTBED_HelperInit *msg; - const char *hostname; - static char *const binary_argv[] = { - HELPER_TESTBED_BINARY, NULL - }; - - hostname = NULL; - cp = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_ControllerProc)); - if ((NULL == host) || (0 == GNUNET_TESTBED_host_get_id_ (host))) - { - cp->helper = - GNUNET_HELPER_start (GNUNET_YES, HELPER_TESTBED_BINARY, binary_argv, - &helper_mst, &helper_exp_cb, cp); - } - else - { - char *helper_binary_path; -#define NUM_REMOTE_ARGS 12 - const char *remote_args[NUM_REMOTE_ARGS]; - const char *username; - char *port; - char *dst; - unsigned int argp; - - username = GNUNET_TESTBED_host_get_username_ (host); - hostname = GNUNET_TESTBED_host_get_hostname (host); - GNUNET_asprintf (&port, "%u", GNUNET_TESTBED_host_get_ssh_port_ (host)); - if (NULL == username) - GNUNET_asprintf (&dst, "%s", hostname); - else - GNUNET_asprintf (&dst, "%s@%s", username, hostname); - LOG_DEBUG ("Starting SSH to destination %s\n", dst); - argp = 0; - remote_args[argp++] = "ssh"; - remote_args[argp++] = "-p"; - remote_args[argp++] = port; - remote_args[argp++] = "-o"; - remote_args[argp++] = "BatchMode=yes"; - remote_args[argp++] = "-o"; - remote_args[argp++] = "NoHostAuthenticationForLocalhost=yes"; - remote_args[argp++] = dst; - if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "testbed", - "HELPER_BINARY_PATH", - &helper_binary_path)) - helper_binary_path = GNUNET_OS_get_libexec_binary_path (HELPER_TESTBED_BINARY); - remote_args[argp++] = "sh"; - remote_args[argp++] = "-lc"; - remote_args[argp++] = helper_binary_path; - remote_args[argp++] = NULL; - GNUNET_assert (NUM_REMOTE_ARGS == argp); - cp->helper_argv = copy_argv (remote_args); - GNUNET_free (port); - GNUNET_free (dst); - cp->helper = - GNUNET_HELPER_start (GNUNET_NO, "ssh", cp->helper_argv, &helper_mst, - &helper_exp_cb, cp); - GNUNET_free (helper_binary_path); - } - if (NULL == cp->helper) - { - if (NULL != cp->helper_argv) - free_argv (cp->helper_argv); - GNUNET_free (cp); - return NULL; - } - cp->host = host; - cp->cb = cb; - cp->cls = cls; - msg = GNUNET_TESTBED_create_helper_init_msg_ (trusted_ip, hostname, cfg); - cp->msg = &msg->header; - cp->shandle = - GNUNET_HELPER_send (cp->helper, &msg->header, GNUNET_NO, &clear_msg, cp); - if (NULL == cp->shandle) - { - GNUNET_free (msg); - GNUNET_TESTBED_controller_stop (cp); - return NULL; - } - return cp; -} - - -/** - * Stop the controller process (also will terminate all peers and controllers - * dependent on this controller). This function blocks until the testbed has - * been fully terminated (!). The controller status cb from - * GNUNET_TESTBED_controller_start() will not be called. - * - * @param cproc the controller process handle - */ -void -GNUNET_TESTBED_controller_stop (struct GNUNET_TESTBED_ControllerProc *cproc) -{ - if (NULL != cproc->shandle) - GNUNET_HELPER_send_cancel (cproc->shandle); - if (NULL != cproc->helper) - GNUNET_HELPER_stop (cproc->helper); - if (NULL != cproc->cfg) - GNUNET_CONFIGURATION_destroy (cproc->cfg); - if (NULL != cproc->helper_argv) - free_argv (cproc->helper_argv); - GNUNET_free (cproc); -} - - -/** - * Start a controller process using the given configuration at the - * given host. - * - * @param cfg configuration to use * @param host host to run the controller on; This should be the same host if * the controller was previously started with - * GNUNET_TESTBED_controller_start; NULL for localhost + * GNUNET_TESTBED_controller_start() * @param event_mask bit mask with set of events to call 'cc' for; * or-ed values of "1LL" shifted by the * respective 'enum GNUNET_TESTBED_EventType' @@ -1675,25 +1553,73 @@ GNUNET_TESTBED_controller_stop (struct GNUNET_TESTBED_ControllerProc *cproc) * @return handle to the controller */ struct GNUNET_TESTBED_Controller * -GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle - *cfg, struct GNUNET_TESTBED_Host *host, +GNUNET_TESTBED_controller_connect (struct GNUNET_TESTBED_Host *host, uint64_t event_mask, GNUNET_TESTBED_ControllerCallback cc, void *cc_cls) { - struct GNUNET_TESTBED_Controller *controller; + struct GNUNET_TESTBED_Controller *controller + = GNUNET_new (struct GNUNET_TESTBED_Controller); + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (add_host_confirm, + GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS, + struct GNUNET_TESTBED_HostConfirmedMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_conevent, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT, + struct GNUNET_TESTBED_ConnectionEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (opsuccess, + GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS, + struct GNUNET_TESTBED_GenericOperationSuccessEventMessage, + controller), + GNUNET_MQ_hd_var_size (op_fail_event, + GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT, + struct GNUNET_TESTBED_OperationFailureEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_create_success, + GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS, + struct GNUNET_TESTBED_PeerCreateSuccessEventMessage, + controller), + GNUNET_MQ_hd_fixed_size (peer_event, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT, + struct GNUNET_TESTBED_PeerEventMessage, + controller), + GNUNET_MQ_hd_var_size (peer_config, + GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + struct GNUNET_TESTBED_PeerConfigurationInformationMessage, + controller), + GNUNET_MQ_hd_var_size (slave_config, + GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + struct GNUNET_TESTBED_SlaveConfiguration, + controller), + GNUNET_MQ_hd_var_size (link_controllers_result, + GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + struct GNUNET_TESTBED_ControllerLinkResponse, + controller), + GNUNET_MQ_hd_var_size (barrier_status, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS, + struct GNUNET_TESTBED_BarrierStatusMsg, + controller), + GNUNET_MQ_handler_end () + }; struct GNUNET_TESTBED_InitMessage *msg; + struct GNUNET_MQ_Envelope *env; + const struct GNUNET_CONFIGURATION_Handle *cfg; const char *controller_hostname; unsigned long long max_parallel_operations; unsigned long long max_parallel_service_connections; unsigned long long max_parallel_topology_config_operations; + size_t slen; + GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host))); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "testbed", "MAX_PARALLEL_OPERATIONS", &max_parallel_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1702,6 +1628,7 @@ GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle &max_parallel_service_connections)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } if (GNUNET_OK != @@ -1710,346 +1637,112 @@ GNUNET_TESTBED_controller_connect (const struct GNUNET_CONFIGURATION_Handle &max_parallel_topology_config_operations)) { GNUNET_break (0); + GNUNET_free (controller); return NULL; } - controller = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Controller)); controller->cc = cc; controller->cc_cls = cc_cls; controller->event_mask = event_mask; controller->cfg = GNUNET_CONFIGURATION_dup (cfg); - controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg); - if (NULL == controller->client) + controller->mq = GNUNET_CLIENT_connect (controller->cfg, + "testbed", + handlers, + &mq_error_handler, + controller); + if (NULL == controller->mq) { + GNUNET_break (0); GNUNET_TESTBED_controller_disconnect (controller); return NULL; } - if (NULL == host) - { - host = GNUNET_TESTBED_host_create_by_id_ (0); - if (NULL == host) /* If the above host create fails */ - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Treating NULL host as localhost. Multiple references to localhost " - "may break when localhost freed before calling disconnect \n"); - host = GNUNET_TESTBED_host_lookup_by_id_ (0); - } - else - { - controller->aux_host = GNUNET_YES; - } - } - GNUNET_assert (NULL != host); GNUNET_TESTBED_mark_host_registered_at_ (host, controller); controller->host = host; controller->opq_parallel_operations = - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) - max_parallel_operations); + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) max_parallel_operations); controller->opq_parallel_service_connections = - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) max_parallel_service_connections); - controller->opq_parallel_topology_config_operations= - GNUNET_TESTBED_operation_queue_create_ ((unsigned int) + controller->opq_parallel_topology_config_operations = + GNUNET_TESTBED_operation_queue_create_ (OPERATION_QUEUE_TYPE_FIXED, + (unsigned int) max_parallel_topology_config_operations); - controller->opq_parallel_overlay_connect_operations= - GNUNET_TESTBED_operation_queue_create_ (0); - GNUNET_TESTBED_set_num_parallel_overlay_connects_ (controller, 1); - controller->poc_sd = SD_init (10); controller_hostname = GNUNET_TESTBED_host_get_hostname (host); if (NULL == controller_hostname) controller_hostname = "127.0.0.1"; - msg = - GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT); - msg->header.size = - htons (sizeof (struct GNUNET_TESTBED_InitMessage) + - strlen (controller_hostname) + 1); + slen = strlen (controller_hostname) + 1; + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_INIT); msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); msg->event_mask = GNUNET_htonll (controller->event_mask); - strcpy ((char *) &msg[1], controller_hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); + GNUNET_memcpy (&msg[1], + controller_hostname, + slen); + GNUNET_MQ_send (controller->mq, + env); return controller; } /** - * Configure shared services at a controller. Using this function, - * you can specify that certain services (such as "resolver") - * should not be run for each peer but instead be shared - * across N peers on the specified host. This function - * must be called before any peers are created at the host. + * Iterator to free opc map entries * - * @param controller controller to configure - * @param service_name name of the service to share - * @param num_peers number of peers that should share one instance - * of the specified service (1 for no sharing is the default), - * use 0 to disable the service + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ -void -GNUNET_TESTBED_controller_configure_sharing (struct GNUNET_TESTBED_Controller - *controller, - const char *service_name, - uint32_t num_peers) +static int +opc_free_iterator (void *cls, uint32_t key, void *value) { - struct GNUNET_TESTBED_ConfigureSharedServiceMessage *msg; - uint16_t service_name_size; - uint16_t msg_size; + struct GNUNET_CONTAINER_MultiHashMap32 *map = cls; + struct OperationContext *opc = value; - service_name_size = strlen (service_name) + 1; - msg_size = - sizeof (struct GNUNET_TESTBED_ConfigureSharedServiceMessage) + - service_name_size; - msg = GNUNET_malloc (msg_size); - msg->header.size = htons (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SERVICESHARE); - msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (controller->host)); - msg->num_peers = htonl (num_peers); - memcpy (&msg[1], service_name, service_name_size); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); - GNUNET_break (0); /* This function is not yet implemented on the - testbed service */ + GNUNET_assert (NULL != opc); + GNUNET_break (0); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (map, key, value)); + GNUNET_free (opc); + return GNUNET_YES; } /** - * disconnects from the controller. + * Stop the given controller (also will terminate all peers and + * controllers dependent on this controller). This function + * blocks until the testbed has been fully terminated (!). * - * @param controller handle to controller to stop + * @param c handle to controller to stop */ void -GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller - *controller) -{ - struct MessageQueue *mq_entry; - - if (NULL != controller->th) - GNUNET_CLIENT_notify_transmit_ready_cancel (controller->th); - /* Clear the message queue */ - while (NULL != (mq_entry = controller->mq_head)) - { - GNUNET_CONTAINER_DLL_remove (controller->mq_head, controller->mq_tail, - mq_entry); - GNUNET_free (mq_entry->msg); - GNUNET_free (mq_entry); - } - if (NULL != controller->client) - GNUNET_CLIENT_disconnect (controller->client); - GNUNET_CONFIGURATION_destroy (controller->cfg); - if (GNUNET_YES == controller->aux_host) - GNUNET_TESTBED_host_destroy (controller->host); - GNUNET_TESTBED_operation_queue_destroy_ (controller->opq_parallel_operations); - GNUNET_TESTBED_operation_queue_destroy_ - (controller->opq_parallel_service_connections); - GNUNET_TESTBED_operation_queue_destroy_ - (controller->opq_parallel_topology_config_operations); - GNUNET_TESTBED_operation_queue_destroy_ - (controller->opq_parallel_overlay_connect_operations); - SD_destroy (controller->poc_sd); - GNUNET_free_non_null (controller->tslots); - GNUNET_free (controller); -} - - -/** - * Register a host with the controller - * - * @param controller the controller handle - * @param host the host to register - * @param cc the completion callback to call to inform the status of - * registration. After calling this callback the registration handle - * will be invalid. Cannot be NULL. - * @param cc_cls the closure for the cc - * @return handle to the host registration which can be used to cancel the - * registration - */ -struct GNUNET_TESTBED_HostRegistrationHandle * -GNUNET_TESTBED_register_host (struct GNUNET_TESTBED_Controller *controller, - struct GNUNET_TESTBED_Host *host, - GNUNET_TESTBED_HostRegistrationCompletion cc, - void *cc_cls) +GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c) { - struct GNUNET_TESTBED_HostRegistrationHandle *rh; - struct GNUNET_TESTBED_AddHostMessage *msg; - const char *username; - const char *hostname; - uint16_t msg_size; - uint16_t user_name_length; - - if (NULL != controller->rh) - return NULL; - hostname = GNUNET_TESTBED_host_get_hostname (host); - if (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (host, controller)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Host hostname: %s already registered\n", - (NULL == hostname) ? "localhost" : hostname); - return NULL; - } - rh = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_HostRegistrationHandle)); - rh->host = host; - rh->c = controller; - GNUNET_assert (NULL != cc); - rh->cc = cc; - rh->cc_cls = cc_cls; - controller->rh = rh; - username = GNUNET_TESTBED_host_get_username_ (host); - msg_size = (sizeof (struct GNUNET_TESTBED_AddHostMessage)); - user_name_length = 0; - if (NULL != username) - { - user_name_length = strlen (username) + 1; - msg_size += user_name_length; - } - /* FIXME: what happens when hostname is NULL? localhost */ - GNUNET_assert (NULL != hostname); - msg_size += strlen (hostname) + 1; - msg = GNUNET_malloc (msg_size); - msg->header.size = htons (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_ADDHOST); - msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host)); - msg->ssh_port = htons (GNUNET_TESTBED_host_get_ssh_port_ (host)); - if (NULL != username) + if (NULL != c->mq) { - msg->user_name_length = htons (user_name_length - 1); - memcpy (&msg[1], username, user_name_length); + GNUNET_MQ_destroy (c->mq); + c->mq = NULL; } - else - msg->user_name_length = htons (user_name_length); - strcpy (((void *) &msg[1]) + user_name_length, hostname); - GNUNET_TESTBED_queue_message_ (controller, - (struct GNUNET_MessageHeader *) msg); - return rh; -} - - -/** - * Cancel the pending registration. Note that if the registration message is - * already sent to the service the cancellation has only the effect that the - * registration completion callback for the registration is never called. - * - * @param handle the registration handle to cancel - */ -void -GNUNET_TESTBED_cancel_registration (struct GNUNET_TESTBED_HostRegistrationHandle - *handle) -{ - if (handle != handle->c->rh) + if (NULL != c->host) + GNUNET_TESTBED_deregister_host_at_ (c->host, c); + GNUNET_CONFIGURATION_destroy (c->cfg); + GNUNET_TESTBED_operation_queue_destroy_ (c->opq_parallel_operations); + GNUNET_TESTBED_operation_queue_destroy_ + (c->opq_parallel_service_connections); + GNUNET_TESTBED_operation_queue_destroy_ + (c->opq_parallel_topology_config_operations); + if (NULL != c->opc_map) { - GNUNET_break (0); - return; + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap32_iterate (c->opc_map, + &opc_free_iterator, + c->opc_map)); + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap32_size (c->opc_map)); + GNUNET_CONTAINER_multihashmap32_destroy (c->opc_map); } - handle->c->rh = NULL; - GNUNET_free (handle); -} - - -/** - * Same as the GNUNET_TESTBED_controller_link_2, but with ids for delegated host - * and slave host - * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host_id id of the host to which requests should be delegated - * @param slave_host_id id of the host which is used to run the slave controller - * @param sxcfg serialized and compressed configuration - * @param sxcfg_size the size sxcfg - * @param scfg_size the size of uncompressed serialized configuration - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle - */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_2_ (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - uint32_t delegated_host_id, - uint32_t slave_host_id, - const char *sxcfg, size_t sxcfg_size, - size_t scfg_size, int is_subordinate) -{ - struct OperationContext *opc; - struct GNUNET_TESTBED_ControllerLinkMessage *msg; - struct ControllerLinkData *data; - uint16_t msg_size; - - msg_size = sxcfg_size + sizeof (struct GNUNET_TESTBED_ControllerLinkMessage); - msg = GNUNET_malloc (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LCONTROLLERS); - msg->header.size = htons (msg_size); - msg->delegated_host_id = htonl (delegated_host_id); - msg->slave_host_id = htonl (slave_host_id); - msg->config_size = htons ((uint16_t) scfg_size); - msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0; - memcpy (&msg[1], sxcfg, sxcfg_size); - data = GNUNET_malloc (sizeof (struct ControllerLinkData)); - data->msg = msg; - opc = GNUNET_malloc (sizeof (struct OperationContext)); - opc->c = master; - opc->data = data; - opc->type = OP_LINK_CONTROLLERS; - opc->id = GNUNET_TESTBED_get_next_op_id (opc->c); - opc->state = OPC_STATE_INIT; - opc->op_cls = op_cls; - msg->operation_id = GNUNET_htonll (opc->id); - opc->op = - GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers, - &oprelease_link_controllers); - GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations, - opc->op); - GNUNET_TESTBED_operation_begin_wait_ (opc->op); - return opc->op; -} - - -/** - * Same as the GNUNET_TESTBED_controller_link, however expects configuration in - * serialized and compressed - * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host requests to which host should be delegated; cannot be NULL - * @param slave_host which host is used to run the slave controller; use NULL to - * make the master controller connect to the delegated host - * @param sxcfg serialized and compressed configuration - * @param sxcfg_size the size sxcfg - * @param scfg_size the size of uncompressed serialized configuration - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle - */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_2 (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - struct GNUNET_TESTBED_Host *delegated_host, - struct GNUNET_TESTBED_Host *slave_host, - const char *sxcfg, size_t sxcfg_size, - size_t scfg_size, int is_subordinate) -{ - uint32_t delegated_host_id; - uint32_t slave_host_id; - - GNUNET_assert (GNUNET_YES == - GNUNET_TESTBED_is_host_registered_ (delegated_host, master)); - delegated_host_id = GNUNET_TESTBED_host_get_id_ (delegated_host); - slave_host_id = - GNUNET_TESTBED_host_get_id_ ((NULL != slave_host) - ? slave_host : master->host); - if ((NULL != slave_host) && (0 != GNUNET_TESTBED_host_get_id_ (slave_host))) - GNUNET_assert (GNUNET_YES == - GNUNET_TESTBED_is_host_registered_ (slave_host, master)); - - return GNUNET_TESTBED_controller_link_2_ (op_cls, - master, - delegated_host_id, - slave_host_id, - sxcfg, sxcfg_size, - scfg_size, is_subordinate); + GNUNET_free (c); } @@ -2063,7 +1756,8 @@ GNUNET_TESTBED_controller_link_2 (void *op_cls, * @return the size of the xconfig */ size_t -GNUNET_TESTBED_compress_config_ (const char *config, size_t size, +GNUNET_TESTBED_compress_config_ (const char *config, + size_t size, char **xconfig) { size_t xsize; @@ -2079,51 +1773,29 @@ GNUNET_TESTBED_compress_config_ (const char *config, size_t size, /** - * Same as the GNUNET_TESTBED_controller_link, but with ids for delegated host - * and slave host + * Function to serialize and compress using zlib a configuration through a + * configuration handle * - * @param op_cls the operation closure for the event which is generated to - * signal success or failure of this operation - * @param master handle to the master controller who creates the association - * @param delegated_host_id id of the host to which requests should be - * delegated; cannot be NULL - * @param slave_host_id id of the host which should connect to controller - * running on delegated host ; use NULL to make the master controller - * connect to the delegated host - * @param slave_cfg configuration to use for the slave controller - * @param is_subordinate GNUNET_YES if the controller at delegated_host should - * be started by the slave controller; GNUNET_NO if the slave - * controller has to connect to the already started delegated - * controller via TCP/IP - * @return the operation handle - */ -struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_controller_link_ (void *op_cls, - struct GNUNET_TESTBED_Controller *master, - uint32_t delegated_host_id, - uint32_t slave_host_id, - const struct GNUNET_CONFIGURATION_Handle - *slave_cfg, - int is_subordinate) + * @param cfg the configuration + * @param size the size of configuration when serialize. Will be set on success. + * @param xsize the sizeo of the compressed configuration. Will be set on success. + * @return the serialized and compressed configuration + */ +char * +GNUNET_TESTBED_compress_cfg_ (const struct GNUNET_CONFIGURATION_Handle *cfg, + size_t *size, size_t *xsize) { - struct GNUNET_TESTBED_Operation *op; char *config; - char *cconfig; - size_t cc_size; - size_t config_size; + char *xconfig; + size_t size_; + size_t xsize_; - config = GNUNET_CONFIGURATION_serialize (slave_cfg, &config_size); - cc_size = GNUNET_TESTBED_compress_config_ (config, config_size, &cconfig); + config = GNUNET_CONFIGURATION_serialize (cfg, &size_); + xsize_ = GNUNET_TESTBED_compress_config_ (config, size_, &xconfig); GNUNET_free (config); - /* Configuration doesn't fit in 1 message */ - GNUNET_assert ((UINT16_MAX - - sizeof (struct GNUNET_TESTBED_ControllerLinkMessage)) >= - cc_size); - op = GNUNET_TESTBED_controller_link_2_ (op_cls, master, delegated_host_id, - slave_host_id, (const char *) cconfig, - cc_size, config_size, is_subordinate); - GNUNET_free (cconfig); - return op; + *size = size_; + *xsize = xsize_; + return xconfig; } @@ -2149,7 +1821,6 @@ GNUNET_TESTBED_controller_link_ (void *op_cls, * @param delegated_host requests to which host should be delegated; cannot be NULL * @param slave_host which host is used to run the slave controller; use NULL to * make the master controller connect to the delegated host - * @param slave_cfg configuration to use for the slave controller * @param is_subordinate GNUNET_YES if the controller at delegated_host should * be started by the slave controller; GNUNET_NO if the slave * controller has to connect to the already started delegated @@ -2158,30 +1829,52 @@ GNUNET_TESTBED_controller_link_ (void *op_cls, */ struct GNUNET_TESTBED_Operation * GNUNET_TESTBED_controller_link (void *op_cls, - struct GNUNET_TESTBED_Controller *master, + struct GNUNET_TESTBED_Controller *master, struct GNUNET_TESTBED_Host *delegated_host, struct GNUNET_TESTBED_Host *slave_host, - const struct GNUNET_CONFIGURATION_Handle - *slave_cfg, int is_subordinate) + int is_subordinate) { + struct OperationContext *opc; + struct GNUNET_TESTBED_ControllerLinkRequest *msg; + struct ControllerLinkData *data; uint32_t slave_host_id; uint32_t delegated_host_id; + uint16_t msg_size; GNUNET_assert (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (delegated_host, master)); - slave_host_id = - GNUNET_TESTBED_host_get_id_ ((NULL != slave_host) - ? slave_host : master->host); + slave_host_id = + GNUNET_TESTBED_host_get_id_ ((NULL != + slave_host) ? slave_host : master->host); delegated_host_id = GNUNET_TESTBED_host_get_id_ (delegated_host); if ((NULL != slave_host) && (0 != slave_host_id)) GNUNET_assert (GNUNET_YES == GNUNET_TESTBED_is_host_registered_ (slave_host, master)); - return GNUNET_TESTBED_controller_link_ (op_cls, master, - delegated_host_id, - slave_host_id, - slave_cfg, - is_subordinate); - + msg_size = sizeof (struct GNUNET_TESTBED_ControllerLinkRequest); + msg = GNUNET_malloc (msg_size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS); + msg->header.size = htons (msg_size); + msg->delegated_host_id = htonl (delegated_host_id); + msg->slave_host_id = htonl (slave_host_id); + msg->is_subordinate = (GNUNET_YES == is_subordinate) ? 1 : 0; + data = GNUNET_new (struct ControllerLinkData); + data->msg = msg; + data->host_id = delegated_host_id; + opc = GNUNET_new (struct OperationContext); + opc->c = master; + opc->data = data; + opc->type = OP_LINK_CONTROLLERS; + opc->id = GNUNET_TESTBED_get_next_op_id (opc->c); + opc->state = OPC_STATE_INIT; + opc->op_cls = op_cls; + msg->operation_id = GNUNET_htonll (opc->id); + opc->op = + GNUNET_TESTBED_operation_create_ (opc, &opstart_link_controllers, + &oprelease_link_controllers); + GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations, + opc->op); + GNUNET_TESTBED_operation_begin_wait_ (opc->op); + return opc->op; } @@ -2201,13 +1894,13 @@ struct GNUNET_TESTBED_Operation * GNUNET_TESTBED_get_slave_config_ (void *op_cls, struct GNUNET_TESTBED_Controller *master, uint32_t slave_host_id) -{ +{ struct OperationContext *opc; struct GetSlaveConfigData *data; - data = GNUNET_malloc (sizeof (struct GetSlaveConfigData)); + data = GNUNET_new (struct GetSlaveConfigData); data->slave_id = slave_host_id; - opc = GNUNET_malloc (sizeof (struct OperationContext)); + opc = GNUNET_new (struct OperationContext); opc->state = OPC_STATE_INIT; opc->c = master; opc->id = GNUNET_TESTBED_get_next_op_id (master); @@ -2218,7 +1911,7 @@ GNUNET_TESTBED_get_slave_config_ (void *op_cls, GNUNET_TESTBED_operation_create_ (opc, &opstart_get_slave_config, &oprelease_get_slave_config); GNUNET_TESTBED_operation_queue_insert_ (master->opq_parallel_operations, - opc->op); + opc->op); GNUNET_TESTBED_operation_begin_wait_ (opc->op); return opc->op; } @@ -2247,7 +1940,8 @@ GNUNET_TESTBED_get_slave_config (void *op_cls, if (GNUNET_NO == GNUNET_TESTBED_is_host_registered_ (slave_host, master)) return NULL; return GNUNET_TESTBED_get_slave_config_ (op_cls, master, - GNUNET_TESTBED_host_get_id_ (slave_host)); + GNUNET_TESTBED_host_get_id_ + (slave_host)); } @@ -2284,7 +1978,7 @@ GNUNET_TESTBED_overlay_write_topology_to_file (struct GNUNET_TESTBED_Controller */ struct GNUNET_TESTBED_HelperInit * GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip, - const char *hostname, + const char *hostname, const struct GNUNET_CONFIGURATION_Handle *cfg) { @@ -2305,12 +1999,12 @@ GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip, trusted_ip_len = strlen (trusted_ip); hostname_len = (NULL == hostname) ? 0 : strlen (hostname); msg_size = - xconfig_size + trusted_ip_len + 1 + sizeof (struct GNUNET_TESTBED_HelperInit); + xconfig_size + trusted_ip_len + 1 + + sizeof (struct GNUNET_TESTBED_HelperInit); msg_size += hostname_len; msg = GNUNET_realloc (xconfig, msg_size); - (void) memmove (((void *) &msg[1]) + trusted_ip_len + 1 + hostname_len, - msg, - xconfig_size); + (void) memmove (((void *) &msg[1]) + trusted_ip_len + 1 + hostname_len, msg, + xconfig_size); msg->header.size = htons (msg_size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_HELPER_INIT); msg->trusted_ip_size = htons (trusted_ip_len); @@ -2318,55 +2012,58 @@ GNUNET_TESTBED_create_helper_init_msg_ (const char *trusted_ip, msg->config_size = htons (config_size); (void) strcpy ((char *) &msg[1], trusted_ip); if (0 != hostname_len) - (void) strncpy (((char *) &msg[1]) + trusted_ip_len + 1, hostname, hostname_len); + (void) strncpy (((char *) &msg[1]) + trusted_ip_len + 1, hostname, + hostname_len); return msg; } /** - * Cancel a pending operation. Releases all resources - * of the operation and will ensure that no event - * is generated for the operation. Does NOT guarantee - * that the operation will be fully undone (or that + * This function is used to signal that the event information (struct + * GNUNET_TESTBED_EventInformation) from an operation has been fully processed + * i.e. if the event callback is ever called for this operation. If the event + * callback for this operation has not yet been called, calling this function + * cancels the operation, frees its resources and ensures the no event is + * generated with respect to this operation. Note that however cancelling an + * operation does NOT guarantee that the operation will be fully undone (or that * nothing ever happened). * - * @param operation operation to cancel - */ -void -GNUNET_TESTBED_operation_cancel (struct GNUNET_TESTBED_Operation *operation) -{ - GNUNET_TESTBED_operation_done (operation); -} - - -/** - * Signal that the information from an operation has been fully - * processed. This function MUST be called for each event - * of type 'operation_finished' to fully remove the operation - * from the operation queue. After calling this function, the - * 'op_result' becomes invalid (!). + * This function MUST be called for every operation to fully remove the + * operation from the operation queue. After calling this function, if + * operation is completed and its event information is of type + * GNUNET_TESTBED_ET_OPERATION_FINISHED, the 'op_result' becomes invalid (!). + + * If the operation is generated from GNUNET_TESTBED_service_connect() then + * calling this function on such as operation calls the disconnect adapter if + * the connect adapter was ever called. * - * @param operation operation to signal completion for + * @param operation operation to signal completion or cancellation */ void GNUNET_TESTBED_operation_done (struct GNUNET_TESTBED_Operation *operation) { - GNUNET_TESTBED_operation_release_ (operation); + (void) exop_check (operation); + GNUNET_TESTBED_operation_release_ (operation); } /** * Generates configuration by uncompressing configuration in given message. The * given message should be of the following types: - * GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG, - * GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG + * #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION, + * #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS, + * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT, + * + * FIXME: This API is incredibly ugly. * * @param msg the message containing compressed configuration - * @return handle to the parsed configuration + * @return handle to the parsed configuration; NULL upon error while parsing the message */ struct GNUNET_CONFIGURATION_Handle * GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) -{ +{ struct GNUNET_CONFIGURATION_Handle *cfg; Bytef *data; const Bytef *xdata; @@ -2376,42 +2073,98 @@ GNUNET_TESTBED_extract_config_ (const struct GNUNET_MessageHeader *msg) switch (ntohs (msg->type)) { - case GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG: + case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION: + { + const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *imsg; + + imsg = + (const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *) msg; + data_len = (uLong) ntohs (imsg->config_size); + xdata_len = + ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage); + xdata = (const Bytef *) &imsg[1]; + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION: + { + const struct GNUNET_TESTBED_SlaveConfiguration *imsg; + + imsg = (const struct GNUNET_TESTBED_SlaveConfiguration *) msg; + data_len = (uLong) ntohs (imsg->config_size); + xdata_len = + ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_SlaveConfiguration); + xdata = (const Bytef *) &imsg[1]; + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST: { - const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *imsg; + const struct GNUNET_TESTBED_AddHostMessage *imsg; + uint16_t osize; - imsg = (const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *) - msg; + imsg = (const struct GNUNET_TESTBED_AddHostMessage *) msg; data_len = (uLong) ntohs (imsg->config_size); - xdata_len = ntohs (imsg->header.size) - - sizeof (struct GNUNET_TESTBED_PeerConfigurationInformationMessage); + osize = sizeof (struct GNUNET_TESTBED_AddHostMessage) + + ntohs (imsg->username_length) + ntohs (imsg->hostname_length); + xdata_len = ntohs (imsg->header.size) - osize; + xdata = (const Bytef *) ((const void *) imsg + osize); + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT: + { + const struct GNUNET_TESTBED_ControllerLinkResponse *imsg; + + imsg = (const struct GNUNET_TESTBED_ControllerLinkResponse *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (const struct GNUNET_TESTBED_ControllerLinkResponse); xdata = (const Bytef *) &imsg[1]; } break; - case GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG: + case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER: { - const struct GNUNET_TESTBED_SlaveConfiguration *imsg; + const struct GNUNET_TESTBED_PeerCreateMessage *imsg; - imsg = (const struct GNUNET_TESTBED_SlaveConfiguration *) msg; - data_len = (uLong) ntohs (imsg->config_size); - xdata_len = ntohs (imsg->header.size) - - sizeof (struct GNUNET_TESTBED_SlaveConfiguration); + imsg = (const struct GNUNET_TESTBED_PeerCreateMessage *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_PeerCreateMessage); + xdata = (const Bytef *) &imsg[1]; + } + break; + case GNUNET_MESSAGE_TYPE_TESTBED_RECONFIGURE_PEER: + { + const struct GNUNET_TESTBED_PeerReconfigureMessage *imsg; + + imsg = (const struct GNUNET_TESTBED_PeerReconfigureMessage *) msg; + data_len = ntohs (imsg->config_size); + xdata_len = ntohs (imsg->header.size) - + sizeof (struct GNUNET_TESTBED_PeerReconfigureMessage); xdata = (const Bytef *) &imsg[1]; } break; default: GNUNET_assert (0); - } + } data = GNUNET_malloc (data_len); - if (Z_OK != - (ret = - uncompress (data, &data_len, xdata, xdata_len))) - GNUNET_assert (0); + if (Z_OK != (ret = uncompress (data, &data_len, xdata, xdata_len))) + { + GNUNET_free (data); + GNUNET_break_op (0); /* Un-compression failure */ + return NULL; + } cfg = GNUNET_CONFIGURATION_create (); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data, - (size_t) data_len, - GNUNET_NO)); + if (GNUNET_OK != + GNUNET_CONFIGURATION_deserialize (cfg, + (const char *) data, + (size_t) data_len, + NULL)) + { + GNUNET_free (data); + GNUNET_break_op (0); /* De-serialization failure */ + return NULL; + } GNUNET_free (data); return cfg; } @@ -2431,7 +2184,7 @@ GNUNET_TESTBED_parse_error_string_ (const struct { uint16_t msize; const char *emsg; - + msize = ntohs (msg->header.size); if (sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage) >= msize) return NULL; @@ -2454,127 +2207,268 @@ GNUNET_TESTBED_parse_error_string_ (const struct * @return the incremented operation id. */ uint64_t -GNUNET_TESTBED_get_next_op_id (struct GNUNET_TESTBED_Controller *controller) +GNUNET_TESTBED_get_next_op_id (struct GNUNET_TESTBED_Controller * controller) { - uint64_t op_id; + uint64_t op_id; op_id = (uint64_t) GNUNET_TESTBED_host_get_id_ (controller->host); - op_id = op_id << 32; + op_id = op_id << 32; op_id |= (uint64_t) controller->operation_counter++; return op_id; } /** - * Returns a timing slot which will be exclusively locked + * Function called when a shutdown peers operation is ready * - * @param c the controller handle - * @return the time slot index in the array of time slots in the controller - * handle + * @param cls the closure from GNUNET_TESTBED_operation_create_() */ -unsigned int -GNUNET_TESTBED_get_tslot_ (struct GNUNET_TESTBED_Controller *c, void *key) +static void +opstart_shutdown_peers (void *cls) { - unsigned int slot; + struct OperationContext *opc = cls; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_ShutdownPeersMessage *msg; - GNUNET_assert (NULL != c->tslots); - GNUNET_assert (NULL != key); - for (slot = 0; slot < c->num_parallel_connects; slot++) - if (NULL == c->tslots[slot].key) - { - c->tslots[slot].key = key; - return slot; - } - GNUNET_assert (0); /* We should always find a free tslot */ + opc->state = OPC_STATE_STARTED; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS); + msg->operation_id = GNUNET_htonll (opc->id); + GNUNET_TESTBED_insert_opc_ (opc->c, + opc); + GNUNET_MQ_send (opc->c->mq, + env); } +/** + * Callback which will be called when shutdown peers operation is released + * + * @param cls the closure from GNUNET_TESTBED_operation_create_() + */ static void -decide_npoc (struct GNUNET_TESTBED_Controller *c) +oprelease_shutdown_peers (void *cls) { - struct GNUNET_TIME_Relative avg; - int sd; - unsigned int slot; + struct OperationContext *opc = cls; - if (c->tslots_filled != c->num_parallel_connects) - return; - avg = GNUNET_TIME_UNIT_ZERO; - for (slot = 0; slot < c->num_parallel_connects; slot++) - avg = GNUNET_TIME_relative_add (avg, c->tslots[slot].time); - avg = GNUNET_TIME_relative_divide (avg, c->num_parallel_connects); - GNUNET_assert (GNUNET_TIME_UNIT_FOREVER_REL.rel_value != avg.rel_value); - sd = SD_deviation_factor (c->poc_sd, (unsigned int) avg.rel_value); - if (GNUNET_SYSERR == sd) - { - SD_add_data (c->poc_sd, (unsigned int) avg.rel_value); - GNUNET_TESTBED_set_num_parallel_overlay_connects_ (c, c->num_parallel_connects); - return; - } - GNUNET_assert (0 <= sd); - if (sd <= 1) - { - SD_add_data (c->poc_sd, (unsigned int) avg.rel_value); - GNUNET_TESTBED_set_num_parallel_overlay_connects_ - (c, c->num_parallel_connects * 2); - return; - } - if (1 == c->num_parallel_connects) + switch (opc->state) { - GNUNET_TESTBED_set_num_parallel_overlay_connects_ (c, 1); - return; + case OPC_STATE_STARTED: + GNUNET_TESTBED_remove_opc_ (opc->c, opc); + /* no break; continue */ + case OPC_STATE_INIT: + GNUNET_free (opc->data); + break; + case OPC_STATE_FINISHED: + break; } - GNUNET_TESTBED_set_num_parallel_overlay_connects_ - (c, c->num_parallel_connects / 2); + GNUNET_free (opc); } -int -GNUNET_TESTBED_release_time_slot_ (struct GNUNET_TESTBED_Controller *c, - unsigned int index, - void *key) +/** + * Stops and destroys all peers. Is equivalent of calling + * GNUNET_TESTBED_peer_stop() and GNUNET_TESTBED_peer_destroy() on all peers, + * except that the peer stop event and operation finished event corresponding to + * the respective functions are not generated. This function should be called + * when there are no other pending operations. If there are pending operations, + * it will return NULL + * + * @param c the controller to send this message to + * @param op_cls closure for the operation + * @param cb the callback to call when all peers are stopped and destroyed + * @param cb_cls the closure for the callback + * @return operation handle on success; NULL if any pending operations are + * present + */ +struct GNUNET_TESTBED_Operation * +GNUNET_TESTBED_shutdown_peers (struct GNUNET_TESTBED_Controller *c, + void *op_cls, + GNUNET_TESTBED_OperationCompletionCallback cb, + void *cb_cls) { - struct TimeSlot *slot; + struct OperationContext *opc; + struct ShutdownPeersData *data; - GNUNET_assert (NULL != key); - if (index >= c->num_parallel_connects) - return GNUNET_NO; - slot = &c->tslots[index]; - if (key != slot->key) - return GNUNET_NO; - slot->key = NULL; - return GNUNET_YES; + if (0 != GNUNET_CONTAINER_multihashmap32_size (c->opc_map)) + return NULL; + data = GNUNET_new (struct ShutdownPeersData); + data->cb = cb; + data->cb_cls = cb_cls; + opc = GNUNET_new (struct OperationContext); + opc->c = c; + opc->op_cls = op_cls; + opc->data = data; + opc->id = GNUNET_TESTBED_get_next_op_id (c); + opc->type = OP_SHUTDOWN_PEERS; + opc->state = OPC_STATE_INIT; + opc->op = GNUNET_TESTBED_operation_create_ (opc, &opstart_shutdown_peers, + &oprelease_shutdown_peers); + GNUNET_TESTBED_operation_queue_insert_ (opc->c->opq_parallel_operations, + opc->op); + GNUNET_TESTBED_operation_begin_wait_ (opc->op); + return opc->op; } /** - * Function to update a time slot + * Return the index of the peer inside of the total peer array, + * aka. the peer's "unique ID". * - * @param c the controller handle - * @param index the index of the time slot to update - * @param time the new time + * @param peer Peer handle. + * + * @return The peer's unique ID. + */ +uint32_t +GNUNET_TESTBED_get_index (const struct GNUNET_TESTBED_Peer *peer) +{ + return peer->unique_id; +} + + +/** + * Remove a barrier and it was the last one in the barrier hash map, destroy the + * hash map + * + * @param barrier the barrier to remove */ void -GNUNET_TESTBED_update_time_slot_ (struct GNUNET_TESTBED_Controller *c, - unsigned int index, - void *key, - struct GNUNET_TIME_Relative time) +GNUNET_TESTBED_barrier_remove_ (struct GNUNET_TESTBED_Barrier *barrier) { - struct GNUNET_TIME_Relative avg; - struct TimeSlot *slot; + struct GNUNET_TESTBED_Controller *c = barrier->c; - if (GNUNET_NO == GNUNET_TESTBED_release_time_slot_ (c, index, key)) - return; - slot = &c->tslots[index]; - if (GNUNET_TIME_UNIT_ZERO.rel_value == slot->time.rel_value) + GNUNET_assert (NULL != c->barrier_map); /* No barriers present */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (c->barrier_map, + &barrier->key, + barrier)); + GNUNET_free (barrier->name); + GNUNET_free (barrier); + if (0 == GNUNET_CONTAINER_multihashmap_size (c->barrier_map)) { - slot->time = time; - c->tslots_filled++; - decide_npoc (c); - return; + GNUNET_CONTAINER_multihashmap_destroy (c->barrier_map); + c->barrier_map = NULL; + } +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @param echo GNUNET_YES to echo the barrier crossed status message back to the + * controller + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls, + int echo) +{ + struct GNUNET_TESTBED_BarrierInit *msg; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_Barrier *barrier; + struct GNUNET_HashCode key; + size_t name_len; + + GNUNET_assert (quorum <= 100); + GNUNET_assert (NULL != cb); + name_len = strlen (name); + GNUNET_assert (0 < name_len); + GNUNET_CRYPTO_hash (name, name_len, &key); + if (NULL == controller->barrier_map) + controller->barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES); + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (controller->barrier_map, + &key)) + { + GNUNET_break (0); + return NULL; } - avg = GNUNET_TIME_relative_add (slot->time, time); - avg = GNUNET_TIME_relative_divide (avg, 2); - slot->time = avg; + LOG_DEBUG ("Initialising barrier `%s'\n", name); + barrier = GNUNET_new (struct GNUNET_TESTBED_Barrier); + barrier->c = controller; + barrier->name = GNUNET_strdup (name); + barrier->cb = cb; + barrier->cls = cls; + barrier->echo = echo; + GNUNET_memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (controller->barrier_map, + &barrier->key, + barrier, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + env = GNUNET_MQ_msg_extra (msg, + name_len, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT); + msg->quorum = (uint8_t) quorum; + GNUNET_memcpy (msg->name, + barrier->name, + name_len); + GNUNET_MQ_send (barrier->c->mq, + env); + return barrier; +} + + +/** + * Initialise a barrier and call the given callback when the required percentage + * of peers (quorum) reach the barrier OR upon error. + * + * @param controller the handle to the controller + * @param name identification name of the barrier + * @param quorum the percentage of peers that is required to reach the barrier. + * Peers signal reaching a barrier by calling + * GNUNET_TESTBED_barrier_reached(). + * @param cb the callback to call when the barrier is reached or upon error. + * Cannot be NULL. + * @param cls closure for the above callback + * @return barrier handle; NULL upon error + */ +struct GNUNET_TESTBED_Barrier * +GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller, + const char *name, + unsigned int quorum, + GNUNET_TESTBED_barrier_status_cb cb, void *cls) +{ + return GNUNET_TESTBED_barrier_init_ (controller, + name, quorum, cb, cls, GNUNET_YES); +} + + +/** + * Cancel a barrier. + * + * @param barrier the barrier handle + */ +void +GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier) +{ + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TESTBED_BarrierCancel *msg; + size_t slen; + + slen = strlen (barrier->name); + env = GNUNET_MQ_msg_extra (msg, + slen, + GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL); + GNUNET_memcpy (msg->name, + barrier->name, + slen); + GNUNET_MQ_send (barrier->c->mq, + env); + GNUNET_TESTBED_barrier_remove_ (barrier); }