#define TIMEOUT_REL TIME_REL_SECS(1)
-/**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
- /**
- * The message to be sent
- */
- struct GNUNET_MessageHeader *msg;
-
- /**
- * next pointer for DLL
- */
- struct MessageQueue *next;
-
- /**
- * prev pointer for DLL
- */
- struct MessageQueue *prev;
-};
-
-
/**
* Context data for forwarded Operation
*/
/**
* The callback to call when reply is available
*/
- GNUNET_CLIENT_MessageHandler cc;
+ GNUNET_MQ_MessageCallback cc;
/**
* The closure for the above callback
* @param cls the serach context
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
-opc_search_iterator (void *cls, uint32_t key, void *value)
+opc_search_iterator (void *cls,
+ uint32_t key,
+ void *value)
{
struct SearchContext *sc = cls;
struct OperationContext *opc = value;
}
+
+/**
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed.
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ const char *emsg;
+ uint16_t msg_size;
+
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ return GNUNET_OK;
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ if ('\0' != emsg[msg_size - 1])
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * controller (testbed service)
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ */
+static void
+handle_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
+ struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh;
+ const char *emsg;
+ uint16_t msg_size;
+
+ if (NULL == rh)
+ return;
+ if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id))
+ {
+ LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n",
+ GNUNET_TESTBED_host_get_id_ (rh->host),
+ ntohl (msg->host_id));
+ return;
+ }
+ c->rh = NULL;
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ {
+ LOG_DEBUG ("Host %u successfully registered\n",
+ ntohl (msg->host_id));
+ GNUNET_TESTBED_mark_host_registered_at_ (rh->host,
+ c);
+ rh->cc (rh->cc_cls,
+ NULL);
+ GNUNET_free (rh);
+ return;
+ }
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Adding host %u failed with error: %s\n"),
+ ntohl (msg->host_id),
+ emsg);
+ rh->cc (rh->cc_cls,
+ emsg);
+ GNUNET_free (rh);
+}
+
+
/**
* Handler for forwarded operations
*
}
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static int
+check_peer_config (void *cls,
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
* controller (testbed service)
*/
static void
handle_peer_config (void *cls,
- const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
{
struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
}
if (OP_FORWARDED == opc->type)
{
- handle_forwarded_operation_msg (c, opc,
- (const struct GNUNET_MessageHeader *) msg);
+ handle_forwarded_operation_msg (c,
+ opc,
+ &msg->header);
return;
}
data = opc->data;
{
case GNUNET_TESTBED_PIT_IDENTITY:
pinfo->result.id = GNUNET_new (struct GNUNET_PeerIdentity);
- (void) memcpy (pinfo->result.id,
+ GNUNET_memcpy (pinfo->result.id,
&msg->peer_identity,
sizeof (struct GNUNET_PeerIdentity));
break;
}
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_op_fail_event (void *cls,
+ const struct GNUNET_TESTBED_OperationFailureEventMessage *msg)
+{
+ /* we accept anything as a valid error message */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
* controller (testbed service)
}
+
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static int
+check_slave_config (void *cls,
+ const struct GNUNET_TESTBED_SlaveConfiguration *msg)
+{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller
* (testbed service)
}
+/**
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
+ * (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+static int
+check_link_controllers_result (void *cls,
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
+{
+ /* actual check to be implemented */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
* (testbed service)
*/
static void
handle_link_controllers_result (void *cls,
- const struct
- GNUNET_TESTBED_ControllerLinkResponse *msg)
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
{
struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
emsg = GNUNET_malloc (ntohs (msg->header.size)
- sizeof (struct
GNUNET_TESTBED_ControllerLinkResponse) + 1);
- memcpy (emsg, &msg[1], ntohs (msg->header.size)
- - sizeof (struct
- GNUNET_TESTBED_ControllerLinkResponse));
+ GNUNET_memcpy (emsg,
+ &msg[1],
+ ntohs (msg->header.size)- sizeof (struct GNUNET_TESTBED_ControllerLinkResponse));
event.details.operation_finished.emsg = emsg;
}
else
* down signalling an error (message malformed)
*/
static int
-check_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
- const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
+check_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
uint16_t msize;
uint16_t name_len;
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages
*
- * @param c the controller handle to determine the connection this message
+ * @param cls the controller handle to determine the connection this message
* belongs to
* @param msg the barrier status message
*/
static void
-handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
- const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
+handle_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct GNUNET_TESTBED_Barrier *barrier;
char *emsg;
const char *name;
emsg = NULL;
barrier = NULL;
msize = ntohs (msg->header.size);
+ if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
name = msg->data;
name_len = ntohs (msg->name_len);
+ if (name_len >= //name_len is strlen(barrier_name)
+ (msize - ((sizeof msg->header) + sizeof (msg->status)) ) )
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ if ('\0' != name[name_len])
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
LOG_DEBUG ("Received BARRIER_STATUS msg\n");
status = ntohs (msg->status);
if (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status)
{
status = -1;
- emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len
- + 1);
- emsg = GNUNET_malloc (emsg_len + 1);
- memcpy (emsg,
- msg->data + name_len + 1,
- emsg_len);
+ //unlike name_len, emsg_len includes the trailing zero
+ emsg_len = msize - (sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
+ + (name_len + 1));
+ if (0 == emsg_len)
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ if ('\0' != (msg->data[(name_len + 1) + (emsg_len - 1)]))
+ {
+ GNUNET_break_op (0);
+ goto cleanup;
+ }
+ emsg = GNUNET_malloc (emsg_len);
+ GNUNET_memcpy (emsg,
+ msg->data + name_len + 1,
+ emsg_len);
}
if (NULL == c->barrier_map)
{
GNUNET_assert (NULL != barrier->cb);
if ((GNUNET_YES == barrier->echo) &&
(GNUNET_TESTBED_BARRIERSTATUS_CROSSED == status))
- GNUNET_TESTBED_queue_message_ (c, GNUNET_copy_message (&msg->header));
- barrier->cb (barrier->cls, name, barrier, status, emsg);
+ GNUNET_TESTBED_queue_message_ (c,
+ GNUNET_copy_message (&msg->header));
+ barrier->cb (barrier->cls,
+ name,
+ barrier,
+ status,
+ emsg);
if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED == status)
return; /* just initialised; skip cleanup */
cleanup:
GNUNET_free_non_null (emsg);
- if (NULL != barrier)
+ /**
+ * Do not remove the barrier if we did not echo the status back; this is
+ * required at the chained testbed controller setup to ensure the only the
+ * test-driver echos the status and the controller hierarchy properly
+ * propagates the status.
+ */
+ if ((NULL != barrier) && (GNUNET_YES == barrier->echo))
GNUNET_TESTBED_barrier_remove_ (barrier);
}
-/**
- * Handler for messages from controller (testbed service)
- *
- * @param cls the controller handler
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_TESTBED_Controller *c = cls;
- int status;
- uint16_t msize;
-
- c->in_receive = GNUNET_NO;
- /* FIXME: Add checks for message integrity */
- if (NULL == msg)
- {
- LOG_DEBUG ("Receive timed out or connection to service dropped\n");
- return;
- }
- msize = ntohs (msg->size);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
- status =
- GNUNET_TESTBED_host_handle_addhostconfirm_
- (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage));
- handle_opsuccess (c,
- (const struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
- handle_op_fail_event (c,
- (const struct
- GNUNET_TESTBED_OperationFailureEventMessage *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage));
- handle_peer_create_success (c,
- (const struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage
- *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
- GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
- handle_peer_event (c,
- (const struct GNUNET_TESTBED_PeerEventMessage *)
- msg);
-
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION:
- GNUNET_assert (msize >=
- sizeof (struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage));
- handle_peer_config (c,
- (const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage
- *) msg);
- status = GNUNET_YES;
-
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
- GNUNET_assert (msize ==
- sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
- handle_peer_conevent (c,
- (const struct
- GNUNET_TESTBED_ConnectionEventMessage *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
- GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
- handle_slave_config (c,
- (const struct GNUNET_TESTBED_SlaveConfiguration *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
- handle_link_controllers_result (c,
- (const struct
- GNUNET_TESTBED_ControllerLinkResponse
- *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS:
- status = check_barrier_status_ (c,
- (const struct GNUNET_TESTBED_BarrierStatusMsg *) msg);
- if (GNUNET_OK == status)
- handle_barrier_status_ (c,
- (const struct
- GNUNET_TESTBED_BarrierStatusMsg *)
- msg);
- break;
- default:
- GNUNET_assert (0);
- }
- if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
- {
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-}
-
-
-/**
- * Function called to notify a client about the connection begin ready to queue
- * more data. "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
-{
- struct GNUNET_TESTBED_Controller *c = cls;
- struct MessageQueue *mq_entry;
-
- c->th = NULL;
- mq_entry = c->mq_head;
- GNUNET_assert (NULL != mq_entry);
- if ((0 == size) && (NULL == buf)) /* Timeout */
- {
- LOG_DEBUG ("Message sending timed out -- retrying\n");
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- return 0;
- }
- GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
- size = ntohs (mq_entry->msg->size);
- memcpy (buf, mq_entry->msg, size);
- LOG_DEBUG ("Message of type: %u and size: %u sent\n",
- ntohs (mq_entry->msg->type), size);
- GNUNET_free (mq_entry->msg);
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
- GNUNET_free (mq_entry);
- mq_entry = c->mq_head;
- if (NULL != mq_entry)
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- if (GNUNET_NO == c->in_receive)
- {
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return size;
-}
-
-
/**
* Queues a message in send queue for sending to the service
*
GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
struct GNUNET_MessageHeader *msg)
{
- struct MessageQueue *mq_entry;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *m2;
uint16_t type;
uint16_t size;
size = ntohs (msg->size);
GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
(GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
- mq_entry = GNUNET_new (struct MessageQueue);
- mq_entry->msg = msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u, size %u for sending\n", type,
- ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail,
- mq_entry);
- if (NULL == controller->th)
- controller->th =
- GNUNET_CLIENT_notify_transmit_ready (controller->client, size,
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify,
- controller);
+ env = GNUNET_MQ_msg_extra (m2,
+ size - sizeof (*m2),
+ type);
+ GNUNET_memcpy (m2, msg, size);
+ GNUNET_free (msg);
+ GNUNET_MQ_send (controller->mq,
+ env);
}
* operation
*/
struct OperationContext *
-GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller
- *controller, uint64_t operation_id,
+GNUNET_TESTBED_forward_operation_msg_ (struct GNUNET_TESTBED_Controller *controller,
+ uint64_t operation_id,
const struct GNUNET_MessageHeader *msg,
- GNUNET_CLIENT_MessageHandler cc,
+ GNUNET_MQ_MessageCallback cc,
void *cc_cls)
{
struct OperationContext *opc;
struct ForwardedOperationData *data;
- struct GNUNET_MessageHeader *dup_msg;
- uint16_t msize;
-
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *m2;
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ env = GNUNET_MQ_msg_extra (m2,
+ size - sizeof (*m2),
+ type);
+ GNUNET_memcpy (m2,
+ msg,
+ size);
+ GNUNET_MQ_send (controller->mq,
+ env);
data = GNUNET_new (struct ForwardedOperationData);
data->cc = cc;
data->cc_cls = cc_cls;
opc->type = OP_FORWARDED;
opc->data = data;
opc->id = operation_id;
- msize = ntohs (msg->size);
- dup_msg = GNUNET_malloc (msize);
- (void) memcpy (dup_msg, msg, msize);
- GNUNET_TESTBED_queue_message_ (opc->c, dup_msg);
- GNUNET_TESTBED_insert_opc_ (controller, opc);
+ GNUNET_TESTBED_insert_opc_ (controller,
+ opc);
return opc;
}
void
GNUNET_TESTBED_forward_operation_msg_cancel_ (struct OperationContext *opc)
{
- GNUNET_TESTBED_remove_opc_ (opc->c, opc);
+ GNUNET_TESTBED_remove_opc_ (opc->c,
+ opc);
GNUNET_free (opc->data);
GNUNET_free (opc);
}
}
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, a `struct GNUNET_TESTBED_Controller *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ /* struct GNUNET_TESTBED_Controller *c = cls; */
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Encountered MQ error: %d\n",
+ error);
+ /* now what? */
+ GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */
+}
+
+
/**
* Start a controller process using the given configuration at the
* given host.
GNUNET_TESTBED_ControllerCallback cc,
void *cc_cls)
{
- struct GNUNET_TESTBED_Controller *controller;
+ struct GNUNET_TESTBED_Controller *controller
+ = GNUNET_new (struct GNUNET_TESTBED_Controller);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (add_host_confirm,
+ GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS,
+ struct GNUNET_TESTBED_HostConfirmedMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_conevent,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT,
+ struct GNUNET_TESTBED_ConnectionEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (opsuccess,
+ GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS,
+ struct GNUNET_TESTBED_GenericOperationSuccessEventMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (op_fail_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT,
+ struct GNUNET_TESTBED_OperationFailureEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_create_success,
+ GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS,
+ struct GNUNET_TESTBED_PeerCreateSuccessEventMessage,
+ controller),
+ GNUNET_MQ_hd_fixed_size (peer_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT,
+ struct GNUNET_TESTBED_PeerEventMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (peer_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
+ struct GNUNET_TESTBED_PeerConfigurationInformationMessage,
+ controller),
+ GNUNET_MQ_hd_var_size (slave_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
+ struct GNUNET_TESTBED_SlaveConfiguration,
+ controller),
+ GNUNET_MQ_hd_var_size (link_controllers_result,
+ GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
+ struct GNUNET_TESTBED_ControllerLinkResponse,
+ controller),
+ GNUNET_MQ_hd_var_size (barrier_status,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS,
+ struct GNUNET_TESTBED_BarrierStatusMsg,
+ controller),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_TESTBED_InitMessage *msg;
+ struct GNUNET_MQ_Envelope *env;
const struct GNUNET_CONFIGURATION_Handle *cfg;
const char *controller_hostname;
unsigned long long max_parallel_operations;
unsigned long long max_parallel_service_connections;
unsigned long long max_parallel_topology_config_operations;
+ size_t slen;
GNUNET_assert (NULL != (cfg = GNUNET_TESTBED_host_get_cfg_ (host)));
if (GNUNET_OK !=
&max_parallel_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_service_connections))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_topology_config_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
- controller = GNUNET_new (struct GNUNET_TESTBED_Controller);
controller->cc = cc;
controller->cc_cls = cc_cls;
controller->event_mask = event_mask;
controller->cfg = GNUNET_CONFIGURATION_dup (cfg);
- controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg);
- if (NULL == controller->client)
+ controller->mq = GNUNET_CLIENT_connect (controller->cfg,
+ "testbed",
+ handlers,
+ &mq_error_handler,
+ controller);
+ if (NULL == controller->mq)
{
+ GNUNET_break (0);
GNUNET_TESTBED_controller_disconnect (controller);
return NULL;
}
controller_hostname = GNUNET_TESTBED_host_get_hostname (host);
if (NULL == controller_hostname)
controller_hostname = "127.0.0.1";
- msg =
- GNUNET_malloc (sizeof (struct GNUNET_TESTBED_InitMessage) +
- strlen (controller_hostname) + 1);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_INIT);
- msg->header.size =
- htons (sizeof (struct GNUNET_TESTBED_InitMessage) +
- strlen (controller_hostname) + 1);
+ slen = strlen (controller_hostname) + 1;
+ env = GNUNET_MQ_msg_extra (msg,
+ slen,
+ GNUNET_MESSAGE_TYPE_TESTBED_INIT);
msg->host_id = htonl (GNUNET_TESTBED_host_get_id_ (host));
msg->event_mask = GNUNET_htonll (controller->event_mask);
- strcpy ((char *) &msg[1], controller_hostname);
- GNUNET_TESTBED_queue_message_ (controller,
- (struct GNUNET_MessageHeader *) msg);
+ GNUNET_memcpy (&msg[1],
+ controller_hostname,
+ slen);
+ GNUNET_MQ_send (controller->mq,
+ env);
return controller;
}
* @param cls closure
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
opc_free_iterator (void *cls, uint32_t key, void *value)
void
GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c)
{
- struct MessageQueue *mq_entry;
-
- if (NULL != c->th)
- GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
- /* Clear the message queue */
- while (NULL != (mq_entry = c->mq_head))
+ if (NULL != c->mq)
{
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail,
- mq_entry);
- GNUNET_free (mq_entry->msg);
- GNUNET_free (mq_entry);
+ GNUNET_MQ_destroy (c->mq);
+ c->mq = NULL;
}
- if (NULL != c->client)
- GNUNET_CLIENT_disconnect (c->client);
if (NULL != c->host)
GNUNET_TESTBED_deregister_host_at_ (c->host, c);
GNUNET_CONFIGURATION_destroy (c->cfg);
* @return the size of the xconfig
*/
size_t
-GNUNET_TESTBED_compress_config_ (const char *config, size_t size,
+GNUNET_TESTBED_compress_config_ (const char *config,
+ size_t size,
char **xconfig)
{
size_t xsize;
* #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
* #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
*
+ * FIXME: This API is incredibly ugly.
+ *
* @param msg the message containing compressed configuration
* @return handle to the parsed configuration; NULL upon error while parsing the message
*/
opstart_shutdown_peers (void *cls)
{
struct OperationContext *opc = cls;
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_TESTBED_ShutdownPeersMessage *msg;
opc->state = OPC_STATE_STARTED;
- msg = GNUNET_new (struct GNUNET_TESTBED_ShutdownPeersMessage);
- msg->header.size =
- htons (sizeof (struct GNUNET_TESTBED_ShutdownPeersMessage));
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_TESTBED_SHUTDOWN_PEERS);
msg->operation_id = GNUNET_htonll (opc->id);
- GNUNET_TESTBED_insert_opc_ (opc->c, opc);
- GNUNET_TESTBED_queue_message_ (opc->c, &msg->header);
+ GNUNET_TESTBED_insert_opc_ (opc->c,
+ opc);
+ GNUNET_MQ_send (opc->c->mq,
+ env);
}
int echo)
{
struct GNUNET_TESTBED_BarrierInit *msg;
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_TESTBED_Barrier *barrier;
struct GNUNET_HashCode key;
size_t name_len;
- uint16_t msize;
GNUNET_assert (quorum <= 100);
GNUNET_assert (NULL != cb);
barrier->cb = cb;
barrier->cls = cls;
barrier->echo = echo;
- (void) memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode));
+ GNUNET_memcpy (&barrier->key, &key, sizeof (struct GNUNET_HashCode));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (controller->barrier_map,
&barrier->key,
barrier,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
- msize = name_len + sizeof (struct GNUNET_TESTBED_BarrierInit);
- msg = GNUNET_malloc (msize);
- msg->header.size = htons (msize);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT);
+
+ env = GNUNET_MQ_msg_extra (msg,
+ name_len,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT);
msg->quorum = (uint8_t) quorum;
- (void) memcpy (msg->name, barrier->name, name_len);
- GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header);
+ GNUNET_memcpy (msg->name,
+ barrier->name,
+ name_len);
+ GNUNET_MQ_send (barrier->c->mq,
+ env);
return barrier;
}
void
GNUNET_TESTBED_barrier_cancel (struct GNUNET_TESTBED_Barrier *barrier)
{
+ struct GNUNET_MQ_Envelope *env;
struct GNUNET_TESTBED_BarrierCancel *msg;
- uint16_t msize;
-
- msize = sizeof (struct GNUNET_TESTBED_BarrierCancel) + strlen (barrier->name);
- msg = GNUNET_malloc (msize);
- msg->header.size = htons (msize);
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL);
- (void) memcpy (msg->name, barrier->name, strlen (barrier->name));
- GNUNET_TESTBED_queue_message_ (barrier->c, &msg->header);
+ size_t slen;
+
+ slen = strlen (barrier->name);
+ env = GNUNET_MQ_msg_extra (msg,
+ slen,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL);
+ GNUNET_memcpy (msg->name,
+ barrier->name,
+ slen);
+ GNUNET_MQ_send (barrier->c->mq,
+ env);
GNUNET_TESTBED_barrier_remove_ (barrier);
}
-
-
-
/* end of testbed_api.c */