/*
This file is part of GNUnet
- Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2008--2013 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @author Christian Grothoff
* @author Sree Harsha Totakura
*/
-
-
#include "platform.h"
#include "gnunet_testbed_service.h"
#include "gnunet_core_service.h"
#define TIMEOUT_REL TIME_REL_SECS(1)
-/**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
- /**
- * The message to be sent
- */
- struct GNUNET_MessageHeader *msg;
-
- /**
- * next pointer for DLL
- */
- struct MessageQueue *next;
-
- /**
- * prev pointer for DLL
- */
- struct MessageQueue *prev;
-};
-
-
/**
* Context data for forwarded Operation
*/
/**
* The callback to call when reply is available
*/
- GNUNET_CLIENT_MessageHandler cc;
+ GNUNET_MQ_MessageCallback cc;
/**
* The closure for the above callback
while (NULL != entry)
{
entry2 = entry->next;
- GNUNET_CONTAINER_DLL_remove (exop_head, exop_tail, entry);
+ GNUNET_CONTAINER_DLL_remove (exop_head,
+ exop_tail,
+ entry);
GNUNET_free (entry);
entry = entry2;
}
* @param cls the serach context
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
-opc_search_iterator (void *cls, uint32_t key, void *value)
+opc_search_iterator (void *cls,
+ uint32_t key,
+ void *value)
{
struct SearchContext *sc = cls;
struct OperationContext *opc = value;
}
+
+/**
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed.
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ const char *emsg;
+ uint16_t msg_size;
+
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ return GNUNET_OK;
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ if ('\0' != emsg[msg_size - 1])
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * controller (testbed service)
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ */
+static void
+handle_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
+ struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh;
+ const char *emsg;
+ uint16_t msg_size;
+
+ if (NULL == rh)
+ return;
+ if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id))
+ {
+ LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n",
+ GNUNET_TESTBED_host_get_id_ (rh->host),
+ ntohl (msg->host_id));
+ return;
+ }
+ c->rh = NULL;
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ {
+ LOG_DEBUG ("Host %u successfully registered\n",
+ ntohl (msg->host_id));
+ GNUNET_TESTBED_mark_host_registered_at_ (rh->host,
+ c);
+ rh->cc (rh->cc_cls,
+ NULL);
+ GNUNET_free (rh);
+ return;
+ }
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Adding host %u failed with error: %s\n"),
+ ntohl (msg->host_id),
+ emsg);
+ rh->cc (rh->cc_cls,
+ emsg);
+ GNUNET_free (rh);
+}
+
+
/**
* Handler for forwarded operations
*
* @param msg the message
*/
static void
-handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c,
+handle_forwarded_operation_msg (void *cls,
struct OperationContext *opc,
const struct GNUNET_MessageHeader *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct ForwardedOperationData *fo_data;
fo_data = opc->data;
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS message from
* controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
-static int
-handle_opsuccess (struct GNUNET_TESTBED_Controller *c,
- const struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg)
+static void
+handle_opsuccess (void *cls,
+ const struct GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
GNUNET_TESTBED_OperationCompletionCallback op_comp_cb;
void *op_comp_cb_cls;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
event.op = opc->op;
switch (opc->type)
{
case OP_FORWARDED:
- {
- handle_forwarded_operation_msg (c, opc,
- (const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
- }
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return;
+ }
break;
case OP_PEER_DESTROY:
{
if (NULL != c->cc)
c->cc (c->cc_cls, &event);
if (GNUNET_NO == exop_check (event.op))
- return GNUNET_YES;
+ return;
}
else
LOG_DEBUG ("Not calling callback\n");
op_comp_cb (op_comp_cb_cls, event.op, NULL);
/* You could have marked the operation as done by now */
GNUNET_break (GNUNET_NO == exop_check (event.op));
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS message from
* controller (testbed service)
*
* @param c the controller handle
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
-static int
-handle_peer_create_success (struct GNUNET_TESTBED_Controller *c,
- const struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg)
+static void
+handle_peer_create_success (void *cls,
+ const struct GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
struct PeerCreateData *data;
struct GNUNET_TESTBED_Peer *peer;
struct GNUNET_TESTBED_Operation *op;
GNUNET_TESTBED_PeerCreateCallback cb;
- void *cls;
+ void *cb_cls;
uint64_t op_id;
GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) ==
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
handle_forwarded_operation_msg (c, opc,
(const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ return;
}
GNUNET_assert (OP_PEER_CREATE == opc->type);
GNUNET_assert (NULL != opc->data);
peer->state = TESTBED_PS_CREATED;
GNUNET_TESTBED_peer_register_ (peer);
cb = data->cb;
- cls = data->cls;
+ cb_cls = data->cls;
op = opc->op;
GNUNET_free (opc->data);
GNUNET_TESTBED_remove_opc_ (opc->c, opc);
opc->state = OPC_STATE_FINISHED;
exop_insert (op);
if (NULL != cb)
- cb (cls, peer, NULL);
+ cb (cb_cls, peer, NULL);
/* You could have marked the operation as done by now */
GNUNET_break (GNUNET_NO == exop_check (op));
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT message from
* controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
-static int
-handle_peer_event (struct GNUNET_TESTBED_Controller *c,
+static void
+handle_peer_event (void *cls,
const struct GNUNET_TESTBED_PeerEventMessage *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
struct GNUNET_TESTBED_Peer *peer;
struct PeerEventData *data;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
handle_forwarded_operation_msg (c, opc,
(const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ return;
}
GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type));
data = opc->data;
if (NULL != c->cc)
c->cc (c->cc_cls, &event);
if (GNUNET_NO == exop_check (event.op))
- return GNUNET_YES;
+ return;
}
if (NULL != pcc)
pcc (pcc_cls, NULL);
/* You could have marked the operation as done by now */
GNUNET_break (GNUNET_NO == exop_check (event.op));
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT message from
* controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
-static int
-handle_peer_conevent (struct GNUNET_TESTBED_Controller *c,
+static void
+handle_peer_conevent (void *cls,
const struct GNUNET_TESTBED_ConnectionEventMessage *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
struct OverlayConnectData *data;
GNUNET_TESTBED_OperationCompletionCallback cb;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
handle_forwarded_operation_msg (c, opc,
(const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ return;
}
GNUNET_assert (OP_OVERLAY_CONNECT == opc->type);
GNUNET_assert (NULL != (data = opc->data));
if (NULL != c->cc)
c->cc (c->cc_cls, &event);
if (GNUNET_NO == exop_check (event.op))
- return GNUNET_YES;
+ return;
}
if (NULL != cb)
cb (cb_cls, opc->op, NULL);
/* You could have marked the operation as done by now */
GNUNET_break (GNUNET_NO == exop_check (event.op));
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
* controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
static int
-handle_peer_config (struct GNUNET_TESTBED_Controller *c,
- const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+check_peer_config (void *cls,
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static void
+handle_peer_config (void *cls,
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
struct GNUNET_TESTBED_Peer *peer;
struct PeerInfoData *data;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
- handle_forwarded_operation_msg (c, opc,
- (const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ handle_forwarded_operation_msg (c,
+ opc,
+ &msg->header);
+ return;
}
data = opc->data;
GNUNET_assert (NULL != data);
{
case GNUNET_TESTBED_PIT_IDENTITY:
pinfo->result.id = GNUNET_new (struct GNUNET_PeerIdentity);
- (void) memcpy (pinfo->result.id, &msg->peer_identity,
+ GNUNET_memcpy (pinfo->result.id,
+ &msg->peer_identity,
sizeof (struct GNUNET_PeerIdentity));
break;
case GNUNET_TESTBED_PIT_CONFIGURATION:
/* We dont check whether the operation is marked as done here as the
operation contains data (cfg/identify) which will be freed at a later point
*/
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
* controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
+ * @return #GNUNET_OK if message is well-formed
*/
static int
-handle_op_fail_event (struct GNUNET_TESTBED_Controller *c,
- const struct GNUNET_TESTBED_OperationFailureEventMessage
- *msg)
+check_op_fail_event (void *cls,
+ const struct GNUNET_TESTBED_OperationFailureEventMessage *msg)
{
+ /* we accept anything as a valid error message */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static void
+handle_op_fail_event (void *cls,
+ const struct GNUNET_TESTBED_OperationFailureEventMessage *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
const char *emsg;
uint64_t op_id;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
handle_forwarded_operation_msg (c, opc,
(const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ return;
}
GNUNET_TESTBED_remove_opc_ (opc->c, opc);
opc->state = OPC_STATE_FINISHED;
if (NULL != data->cb)
data->cb (data->cb_cls, opc->op, NULL, emsg);
GNUNET_free (data);
- return GNUNET_YES; /* We do not call controller callback for peer info */
+ return; /* We do not call controller callback for peer info */
}
event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
event.op = opc->op;
exop_insert (event.op);
c->cc (c->cc_cls, &event);
if (GNUNET_NO == exop_check (event.op))
- return GNUNET_YES;
+ return;
}
switch (opc->type)
{
case OP_PEER_CREATE:
- {
- struct PeerCreateData *data;
+ {
+ struct PeerCreateData *data;
- data = opc->data;
- GNUNET_free (data->peer);
- if (NULL != data->cb)
- data->cb (data->cls, NULL, emsg);
- GNUNET_free (data);
- }
+ data = opc->data;
+ GNUNET_free (data->peer);
+ if (NULL != data->cb)
+ data->cb (data->cls, NULL, emsg);
+ GNUNET_free (data);
+ }
break;
case OP_PEER_START:
case OP_PEER_STOP:
- {
- struct PeerEventData *data;
+ {
+ struct PeerEventData *data;
- data = opc->data;
- if (NULL != data->pcc)
- data->pcc (data->pcc_cls, emsg);
- GNUNET_free (data);
- }
+ data = opc->data;
+ if (NULL != data->pcc)
+ data->pcc (data->pcc_cls, emsg);
+ GNUNET_free (data);
+ }
break;
case OP_PEER_DESTROY:
break;
case OP_PEER_INFO:
GNUNET_assert (0);
case OP_OVERLAY_CONNECT:
- {
- struct OverlayConnectData *data;
+ {
+ struct OverlayConnectData *data;
- data = opc->data;
- GNUNET_TESTBED_operation_mark_failed (opc->op);
- if (NULL != data->cb)
- data->cb (data->cb_cls, opc->op, emsg);
- }
+ data = opc->data;
+ GNUNET_TESTBED_operation_mark_failed (opc->op);
+ if (NULL != data->cb)
+ data->cb (data->cb_cls, opc->op, emsg);
+ }
break;
case OP_FORWARDED:
GNUNET_assert (0);
case OP_LINK_CONTROLLERS: /* No secondary callback */
break;
case OP_SHUTDOWN_PEERS:
- {
- struct ShutdownPeersData *data;
+ {
+ struct ShutdownPeersData *data;
- data = opc->data;
- GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */
- opc->data = NULL;
- }
+ data = opc->data;
+ GNUNET_free (data); /* FIXME: Decide whether we call data->op_cb */
+ opc->data = NULL;
+ }
break;
case OP_MANAGE_SERVICE:
- {
- struct ManageServiceData *data = opc->data;
+ {
+ struct ManageServiceData *data = opc->data;
GNUNET_TESTBED_OperationCompletionCallback cb;
- void *cb_cls;
-
- GNUNET_assert (NULL != data);
- cb = data->cb;
- cb_cls = data->cb_cls;
- GNUNET_free (data);
- opc->data = NULL;
- exop_insert (event.op);
- if (NULL != cb)
- cb (cb_cls, opc->op, emsg);
- /* You could have marked the operation as done by now */
- GNUNET_break (GNUNET_NO == exop_check (event.op));
- }
+ void *cb_cls;
+
+ GNUNET_assert (NULL != data);
+ cb = data->cb;
+ cb_cls = data->cb_cls;
+ GNUNET_free (data);
+ opc->data = NULL;
+ exop_insert (event.op);
+ if (NULL != cb)
+ cb (cb_cls, opc->op, emsg);
+ /* You could have marked the operation as done by now */
+ GNUNET_break (GNUNET_NO == exop_check (event.op));
+ }
break;
default:
GNUNET_break (0);
}
- return GNUNET_YES;
}
}
+
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
- * (testbed service)
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from
+ * controller (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
*/
static int
-handle_slave_config (struct GNUNET_TESTBED_Controller *c,
+check_slave_config (void *cls,
+ const struct GNUNET_TESTBED_SlaveConfiguration *msg)
+{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller
+ * (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static void
+handle_slave_config (void *cls,
const struct GNUNET_TESTBED_SlaveConfiguration *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
uint64_t op_id;
uint64_t mask;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_GET_SLAVE_CONFIG != opc->type)
{
GNUNET_break (0);
- return GNUNET_YES;
+ return;
}
opc->state = OPC_STATE_FINISHED;
GNUNET_TESTBED_remove_opc_ (opc->c, opc);
event.details.operation_finished.emsg = NULL;
c->cc (c->cc_cls, &event);
}
- return GNUNET_YES;
}
/**
- * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
* (testbed service)
*
* @param c the controller handler
* @param msg message received
- * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
- * not
+ * @return #GNUNET_OK if @a msg is well-formed
*/
static int
-handle_link_controllers_result (struct GNUNET_TESTBED_Controller *c,
- const struct
- GNUNET_TESTBED_ControllerLinkResponse *msg)
+check_link_controllers_result (void *cls,
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
+{
+ /* actual check to be implemented */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
+ * (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static void
+handle_link_controllers_result (void *cls,
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
struct ControllerLinkData *data;
struct GNUNET_CONFIGURATION_Handle *cfg;
if (NULL == (opc = find_opc (c, op_id)))
{
LOG_DEBUG ("Operation not found\n");
- return GNUNET_YES;
+ return;
}
if (OP_FORWARDED == opc->type)
{
handle_forwarded_operation_msg (c, opc,
(const struct GNUNET_MessageHeader *) msg);
- return GNUNET_YES;
+ return;
}
if (OP_LINK_CONTROLLERS != opc->type)
{
GNUNET_break (0);
- return GNUNET_YES;
+ return;
}
GNUNET_assert (NULL != (data = opc->data));
host = GNUNET_TESTBED_host_lookup_by_id_ (data->host_id);
emsg = GNUNET_malloc (ntohs (msg->header.size)
- sizeof (struct
GNUNET_TESTBED_ControllerLinkResponse) + 1);
- memcpy (emsg, &msg[1], ntohs (msg->header.size)
- - sizeof (struct
- GNUNET_TESTBED_ControllerLinkResponse));
+ GNUNET_memcpy (emsg,
+ &msg[1],
+ ntohs (msg->header.size)- sizeof (struct GNUNET_TESTBED_ControllerLinkResponse));
event.details.operation_finished.emsg = emsg;
}
else
if (NULL != cfg)
GNUNET_CONFIGURATION_destroy (cfg);
GNUNET_free_non_null (emsg);
- return GNUNET_YES;
}
/**
- * Handler for messages from controller (testbed service)
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS message.
*
- * @param cls the controller handler
- * @param msg message received, NULL on timeout or fatal error
+ * @param cls the controller handle to determine the connection this message
+ * belongs to
+ * @param msg the barrier status message
+ * @return #GNUNET_OK if the message is valid; #GNUNET_SYSERR to tear it
+ * down signalling an error (message malformed)
*/
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+static int
+check_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
- struct GNUNET_TESTBED_Controller *c = cls;
- int status;
uint16_t msize;
+ uint16_t name_len;
+ int status;
+ const char *name;
+ size_t emsg_len;
- c->in_receive = GNUNET_NO;
- /* FIXME: Add checks for message integrity */
- if (NULL == msg)
+ msize = ntohs (msg->header.size);
+ name = msg->data;
+ name_len = ntohs (msg->name_len);
+
+ if (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1 > msize)
{
- LOG_DEBUG ("Receive timed out or connection to service dropped\n");
- return;
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- msize = ntohs (msg->size);
- switch (ntohs (msg->type))
+ if ('\0' != name[name_len])
{
- case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
- status =
- GNUNET_TESTBED_host_handle_addhostconfirm_
- (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage));
- status =
- handle_opsuccess (c,
- (const struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
- msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
- status =
- handle_op_fail_event (c,
- (const struct
- GNUNET_TESTBED_OperationFailureEventMessage *)
- msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage));
- status =
- handle_peer_create_success (c,
- (const struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage
- *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
- GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
- status =
- handle_peer_event (c,
- (const struct GNUNET_TESTBED_PeerEventMessage *)
- msg);
-
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION:
- GNUNET_assert (msize >=
- sizeof (struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage));
- status =
- handle_peer_config (c,
- (const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage
- *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
- GNUNET_assert (msize ==
- sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
- status =
- handle_peer_conevent (c,
- (const struct
- GNUNET_TESTBED_ConnectionEventMessage *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
- GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
- status =
- handle_slave_config (c,
- (const struct GNUNET_TESTBED_SlaveConfiguration *)
- msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
- status =
- handle_link_controllers_result (c,
- (const struct
- GNUNET_TESTBED_ControllerLinkResponse
- *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS:
- status =
- GNUNET_TESTBED_handle_barrier_status_ (c,
- (const struct
- GNUNET_TESTBED_BarrierStatusMsg *)
- msg);
- break;
- default:
- GNUNET_assert (0);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
+ status = ntohs (msg->status);
+ if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)
{
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len
+ + 1); /* +1!? */
+ if (0 == emsg_len)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
}
+ return GNUNET_OK;
}
/**
- * Function called to notify a client about the connection begin ready to queue
- * more data. "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls the controller handle to determine the connection this message
+ * belongs to
+ * @param msg the barrier status message
*/
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
+static void
+handle_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
struct GNUNET_TESTBED_Controller *c = cls;
- struct MessageQueue *mq_entry;
-
- c->th = NULL;
- mq_entry = c->mq_head;
- GNUNET_assert (NULL != mq_entry);
- if ((0 == size) && (NULL == buf)) /* Timeout */
- {
- LOG_DEBUG ("Message sending timed out -- retrying\n");
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- return 0;
- }
- GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
- size = ntohs (mq_entry->msg->size);
- memcpy (buf, mq_entry->msg, size);
- LOG_DEBUG ("Message of type: %u and size: %u sent\n",
- ntohs (mq_entry->msg->type), size);
- GNUNET_free (mq_entry->msg);
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
- GNUNET_free (mq_entry);
- mq_entry = c->mq_head;
- if (NULL != mq_entry)
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- if (GNUNET_NO == c->in_receive)
- {
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return size;
+ struct GNUNET_TESTBED_Barrier *barrier;
+ char *emsg;
+ const char *name;
+ struct GNUNET_HashCode key;
+ size_t emsg_len;
+ int status;
+ uint16_t msize;
+ uint16_t name_len;
+
+ emsg = NULL;
+ barrier = NULL;
+ msize = ntohs (msg->header.size);
+ if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ name = msg->data;
+ name_len = ntohs (msg->name_len);
+ if (name_len >= //name_len is strlen(barrier_name)
+ (msize - ((sizeof msg->header) + sizeof (msg->status)) ) )
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ if ('\0' != name[name_len])
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ LOG_DEBUG ("Received BARRIER_STATUS msg\n");
+ status = ntohs (msg->status);
+ if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)
+ {
+ status = -1;
+ //unlike name_len, emsg_len includes the trailing zero
+ emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
+ + (name_len + 1));
+ if (0 == emsg_len)
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ if ('\0' != (msg->data[(name_len + 1) + (emsg_len - 1)]))
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ emsg = GNUNET_malloc (emsg_len);
+ GNUNET_memcpy (emsg,
+ msg->data + name_len + 1,
+ emsg_len);
+ }
+ if (NULL == c->barrier_map)
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ GNUNET_CRYPTO_hash (name, name_len, &key);
+ barrier = GNUNET_CONTAINER_multihashmap_get (c->barrier_map, &key);
+ if (NULL == barrier)
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ GNUNET_assert (NULL != barrier->cb);
+ if ((GNUNET_YES == barrier->echo) &&
+ (GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status))
+ GNUNET_TESTBED_queue_message_ (c,
+ GNUNET_copy_message (&msg->header));
+ barrier->cb (barrier->cls,
+ name,
+ barrier,
+ status,
+ emsg);
+ if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status)
+ return; /* just initialised; skip cleanup */
+
+ cleanup:
+ GNUNET_free_non_null (emsg);
+ /**
+ * Do not remove the barrier if we did not echo the status back; this is
+ * required at the chained testbed controller setup to ensure the only the
+ * test-driver echos the status and the controller hierarchy properly
+ * propagates the status.
+ */
+ if ((NULL != barrier) && (GNUNET_YES == barrier->echo))
+ GNUNET_TESTBED_barrier_remove_ (barrier);
}
GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
struct GNUNET_MessageHeader *msg)
{
- struct MessageQueue *mq_entry;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *m2;
uint16_t type;
uint16_t size;
size = ntohs (msg->size);
GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
(GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
- mq_entry = GNUNET_new (struct MessageQueue);
- mq_entry->msg = msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u, size %u for sending\n", type,
- ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail,
- mq_entry);
- if (NULL == controller->th)
- controller->th =
- GNUNET_CLIENT_notify_transmit_ready (controller->client, size,
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify,
- controller);
+ env = GNUNET_MQ_msg_extra (m2,
+ size - sizeof (*m2),
+ type);
+ GNUNET_memcpy (m2, msg, size);
+ GNUNET_free (msg);
+ GNUNET_MQ_send (controller->mq,
+ env);
}
* operation
*/
struct OperationContext *
-GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
- *controller, uint64_t operation_id,
+GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller,
+ uint64_t operation_id,
const struct GNUNET_MessageHeader *msg,
- GNUNET_CLIENT_MessageHandler cc,
+ GNUNET_MQ_MessageCallback cc,
void *cc_cls)
{
struct OperationContext *opc;
struct ForwardedOperationData *data;
- struct GNUNET_MessageHeader *dup_msg;
- uint16_t msize;
-
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *m2;
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ env = GNUNET_MQ_msg_extra (m2,
+ size - sizeof (*m2),
+ type);
+ GNUNET_memcpy (m2,
+ msg,
+ size);
+ GNUNET_MQ_send (controller->mq,
+ env);
data = GNUNET_new (struct ForwardedOperationData);
data->cc = cc;
data->cc_cls = cc_cls;
opc->type = OP_FORWARDED;
opc->data = data;
opc->id = operation_id;
- msize = ntohs (msg->size);
- dup_msg = GNUNET_malloc (msize);
- (void) memcpy (dup_msg, msg, msize);
- GNUNET_TESTBED_queue_message_ (opc->c, dup_msg);
- GNUNET_TESTBED_insert_opc_ (controller, opc);
+ GNUNET_TESTBED_insert_opc_ (controller,
+ opc);
return opc;
}
void
GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc)
{
- GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+ GNUNET_TESTBED_remove_opc_ (opc->c,
+ opc);
GNUNET_free (opc->data);
GNUNET_free (opc);
}
}
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, a `struct GNUNET_TESTBED_Controller *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ /* struct GNUNET_TESTBED_Controller *c = cls; */
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Encountered MQ error: %d\n",
+ error);
+ /* now what? */
+ GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */
+}
+
+
/**
* Start a controller process using the given configuration at the
* given host.
GNUNET_TESTBED_ControllerCallback cc,
void *cc_cls)
{
- struct GNUNET_TESTBED_Controller *controller;
+ struct GNUNET_TESTBED_Controller *controller
+ = GNUNET_new (struct GNUNET_TESTBED_Controller);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (add_host_confirm,
+ GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS,
+ struct GNUNET_TESTBED_HostConfirmedMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_conevent,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT,
+ struct GNUNET_TESTBED_ConnectionEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (opsuccess,
+ GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS,
+ struct GNUNET_TESTBED_GenericOperationSuccessEventMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (op_fail_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT,
+ struct GNUNET_TESTBED_OperationFailureEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_create_success,
+ GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS,
+ struct GNUNET_TESTBED_PeerCreateSuccessEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT,
+ struct GNUNET_TESTBED_PeerEventMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (peer_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
+ struct GNUNET_TESTBED_PeerConfigurationInformationMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (slave_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
+ struct GNUNET_TESTBED_SlaveConfiguration,
+ controller),
+ GNUNET_MQ_hd_var_size (link_controllers_result,
+ GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
+ struct GNUNET_TESTBED_ControllerLinkResponse,
+ controller),
+ GNUNET_MQ_hd_var_size (barrier_status,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS,
+ struct GNUNET_TESTBED_BarrierStatusMsg,
+ controller),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_TESTBED_InitMessage *msg;
+ struct GNUNET_MQ_Envelope *env;
const struct GNUNET_CONFIGURATION_Handle *cfg;
const char *controller_hostname;
unsigned long long max_parallel_operations;
unsigned long long max_parallel_service_connections;
unsigned long long max_parallel_topology_config_operations;
+ size_t slen;
GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host)));
if (GNUNET_OK !=
&max_parallel_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_service_connections))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_topology_config_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
- controller = GNUNET_new (struct GNUNET_TESTBED_Controller);
controller->cc = cc;
controller->cc_cls = cc_cls;
controller->event_mask = event_mask;
controller->cfg = GNUNET_CONFIGURATION_dup (cfg);
- controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg);
- if (NULL == controller->client)
+ controller->mq = GNUNET_CLIENT_connect (controller->cfg,
+ "testbed",
+ handlers,
+ &mq_error_handler,
+ controller);
+ if (NULL == controller->mq)
{
+ GNUNET_break (0);
GNUNET_TESTBED_controller_disconnect (controller);
return NULL;
}
controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
if (NULL == controller_hostname)
controller_hostname = "127.0.0.1";
- msg =
- GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) +
- strlen (controller_hostname) + 1);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT);
- msg->header.size =
- htons (sizeof (struct GNUNET_TESTBED_InitMessage) +
- strlen (controller_hostname) + 1);
+ slen = strlen (controller_hostname) + 1;
+ env = GNUNET_MQ_msg_extra (msg,
+ slen,
+ GNUNET_MESSAGE_TYPE_TESTBED_INIT);
msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host));
msg->event_mask = GNUNET_htonll (controller->event_mask);
- strcpy ((char *) &msg[1], controller_hostname);
- GNUNET_TESTBED_queue_message_ (controller,
- (struct GNUNET_MessageHeader *) msg);
+ GNUNET_memcpy (&msg[1],
+ controller_hostname,
+ slen);
+ GNUNET_MQ_send (controller->mq,
+ env);
return controller;
}
* @param cls closure
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
opc_free_iterator (void *cls, uint32_t key, void *value)
* @param c handle to controller to stop
*/
void
-GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller
- *c)
+GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c)
{
- struct MessageQueue *mq_entry;
-
- if (NULL != c->th)
- GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
- /* Clear the message queue */
- while (NULL != (mq_entry = c->mq_head))
+ if (NULL != c->mq)
{
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail,
- mq_entry);
- GNUNET_free (mq_entry->msg);
- GNUNET_free (mq_entry);
+ GNUNET_MQ_destroy (c->mq);
+ c->mq = NULL;
}
- if (NULL != c->client)
- GNUNET_CLIENT_disconnect (c->client);
if (NULL != c->host)
GNUNET_TESTBED_deregister_host_at_ (c->host, c);
GNUNET_CONFIGURATION_destroy (c->cfg);
* @return the size of the xconfig
*/
size_t
-GNUNET_TESTBED_compress_config_ (const char *config, size_t size,
+GNUNET_TESTBED_compress_config_ (const char *config,
+ size_t size,
char **xconfig)
{
size_t xsize;
/**
* Generates configuration by uncompressing configuration in given message. The
* given message should be of the following types:
- * GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
- * GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
- * GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST,
- * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
- * GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
+ * #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
+ * #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
+ * #GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST,
+ * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
+ * #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
+ *
+ * FIXME: This API is incredibly ugly.
*
* @param msg the message containing compressed configuration
* @return handle to the parsed configuration; NULL upon error while parsing the message
}
cfg = GNUNET_CONFIGURATION_create ();
if (GNUNET_OK !=
- GNUNET_CONFIGURATION_deserialize (cfg, (const char *) data,
+ GNUNET_CONFIGURATION_deserialize (cfg,
+ (const char *) data,
(size_t) data_len,
- GNUNET_NO))
+ NULL))
{
GNUNET_free (data);
GNUNET_break_op (0); /* De-serialization failure */
opstart_shutdown_peers (void *cls)
{
struct OperationContext *opc = cls;
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
opc->state = OPC_STATE_STARTED;
- msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage);
- msg->header.size =
- htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
msg->operation_id = GNUNET_htonll (opc->id);
- GNUNET_TESTBED_insert_opc_ (opc->c, opc);
- GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+ GNUNET_TESTBED_insert_opc_ (opc->c,
+ opc);
+ GNUNET_MQ_send (opc->c->mq,
+ env);
}
return peer->unique_id;
}
+
+/**
+ * Remove a barrier and it was the last one in the barrier hash map, destroy the
+ * hash map
+ *
+ * @param barrier the barrier to remove
+ */
+void
+GNUNET_TESTBED_barrier_remove_ (struct GNUNET_TESTBED_Barrier *barrier)
+{
+ struct GNUNET_TESTBED_Controller *c = barrier->c;
+
+ GNUNET_assert (NULL != c->barrier_map); /* No barriers present */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (c->barrier_map,
+ &barrier->key,
+ barrier));
+ GNUNET_free (barrier->name);
+ GNUNET_free (barrier);
+ if (0 == GNUNET_CONTAINER_multihashmap_size (c->barrier_map))
+ {
+ GNUNET_CONTAINER_multihashmap_destroy (c->barrier_map);
+ c->barrier_map = NULL;
+ }
+}
+
+
+/**
+ * Initialise a barrier and call the given callback when the required percentage
+ * of peers (quorum) reach the barrier OR upon error.
+ *
+ * @param controller the handle to the controller
+ * @param name identification name of the barrier
+ * @param quorum the percentage of peers that is required to reach the barrier.
+ * Peers signal reaching a barrier by calling
+ * GNUNET_TESTBED_barrier_reached().
+ * @param cb the callback to call when the barrier is reached or upon error.
+ * Cannot be NULL.
+ * @param cls closure for the above callback
+ * @param echo GNUNET_YES to echo the barrier crossed status message back to the
+ * controller
+ * @return barrier handle; NULL upon error
+ */
+struct GNUNET_TESTBED_Barrier *
+GNUNET_TESTBED_barrier_init_ (struct GNUNET_TESTBED_Controller *controller,
+ const char *name,
+ unsigned int quorum,
+ GNUNET_TESTBED_barrier_status_cb cb, void *cls,
+ int echo)
+{
+ struct GNUNET_TESTBED_BarrierInit *msg;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TESTBED_Barrier *barrier;
+ struct GNUNET_HashCode key;
+ size_t name_len;
+
+ GNUNET_assert (quorum <= 100);
+ GNUNET_assert (NULL != cb);
+ name_len = strlen (name);
+ GNUNET_assert (0 < name_len);
+ GNUNET_CRYPTO_hash (name, name_len, &key);
+ if (NULL == controller->barrier_map)
+ controller->barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_contains (controller->barrier_map,
+ &key))
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
+ LOG_DEBUG ("Initialising barrier `%s'\n", name);
+ barrier = GNUNET_new (struct GNUNET_TESTBED_Barrier);
+ barrier->c = controller;
+ barrier->name = GNUNET_strdup (name);
+ barrier->cb = cb;
+ barrier->cls = cls;
+ barrier->echo = echo;
+ GNUNET_memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode));
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (controller->barrier_map,
+ &barrier->key,
+ barrier,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+ env = GNUNET_MQ_msg_extra (msg,
+ name_len,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT);
+ msg->quorum = (uint8_t) quorum;
+ GNUNET_memcpy (msg->name,
+ barrier->name,
+ name_len);
+ GNUNET_MQ_send (barrier->c->mq,
+ env);
+ return barrier;
+}
+
+
+/**
+ * Initialise a barrier and call the given callback when the required percentage
+ * of peers (quorum) reach the barrier OR upon error.
+ *
+ * @param controller the handle to the controller
+ * @param name identification name of the barrier
+ * @param quorum the percentage of peers that is required to reach the barrier.
+ * Peers signal reaching a barrier by calling
+ * GNUNET_TESTBED_barrier_reached().
+ * @param cb the callback to call when the barrier is reached or upon error.
+ * Cannot be NULL.
+ * @param cls closure for the above callback
+ * @return barrier handle; NULL upon error
+ */
+struct GNUNET_TESTBED_Barrier *
+GNUNET_TESTBED_barrier_init (struct GNUNET_TESTBED_Controller *controller,
+ const char *name,
+ unsigned int quorum,
+ GNUNET_TESTBED_barrier_status_cb cb, void *cls)
+{
+ return GNUNET_TESTBED_barrier_init_ (controller,
+ name, quorum, cb, cls, GNUNET_YES);
+}
+
+
+/**
+ * Cancel a barrier.
+ *
+ * @param barrier the barrier handle
+ */
+void
+GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TESTBED_BarrierCancel *msg;
+ size_t slen;
+
+ slen = strlen (barrier->name);
+ env = GNUNET_MQ_msg_extra (msg,
+ slen,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL);
+ GNUNET_memcpy (msg->name,
+ barrier->name,
+ slen);
+ GNUNET_MQ_send (barrier->c->mq,
+ env);
+ GNUNET_TESTBED_barrier_remove_ (barrier);
+}
+
+
/* end of testbed_api.c */