#define TIMEOUT_REL TIME_REL_SECS(1)
-/**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
- /**
- * The message to be sent
- */
- struct GNUNET_MessageHeader *msg;
-
- /**
- * next pointer for DLL
- */
- struct MessageQueue *next;
-
- /**
- * prev pointer for DLL
- */
- struct MessageQueue *prev;
-};
-
-
/**
* Context data for forwarded Operation
*/
* @param cls the serach context
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
-opc_search_iterator (void *cls, uint32_t key, void *value)
+opc_search_iterator (void *cls,
+ uint32_t key,
+ void *value)
{
struct SearchContext *sc = cls;
struct OperationContext *opc = value;
}
+
+/**
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message is well-formed.
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ const char *emsg;
+ uint16_t msg_size;
+
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ return GNUNET_OK;
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ if ('\0' != emsg[msg_size - 1])
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for #GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * controller (testbed service)
+ *
+ * @param cls the controller handler
+ * @param msg message received
+ */
+static void
+handle_add_host_confirm (void *cls,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
+ struct GNUNET_TESTBED_HostRegistrationHandle *rh = c->rh;
+ const char *emsg;
+ uint16_t msg_size;
+
+ if (NULL == rh)
+ return;
+ if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id))
+ {
+ LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n",
+ GNUNET_TESTBED_host_get_id_ (rh->host),
+ ntohl (msg->host_id));
+ return;
+ }
+ c->rh = NULL;
+ msg_size = ntohs (msg->header.size) - sizeof (*msg);
+ if (0 == msg_size)
+ {
+ LOG_DEBUG ("Host %u successfully registered\n",
+ ntohl (msg->host_id));
+ GNUNET_TESTBED_mark_host_registered_at_ (rh->host,
+ c);
+ rh->cc (rh->cc_cls,
+ NULL);
+ GNUNET_free (rh);
+ return;
+ }
+ /* We have an error message */
+ emsg = (const char *) &msg[1];
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ _("Adding host %u failed with error: %s\n"),
+ ntohl (msg->host_id),
+ emsg);
+ rh->cc (rh->cc_cls,
+ emsg);
+ GNUNET_free (rh);
+}
+
+
/**
* Handler for forwarded operations
*
}
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static int
+check_peer_config (void *cls,
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION message from
* controller (testbed service)
*/
static void
handle_peer_config (void *cls,
- const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+ const struct GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
{
struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
}
if (OP_FORWARDED == opc->type)
{
- handle_forwarded_operation_msg (c, opc,
- (const struct GNUNET_MessageHeader *) msg);
+ handle_forwarded_operation_msg (c,
+ opc,
+ &msg->header);
return;
}
data = opc->data;
}
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_op_fail_event (void *cls,
+ const struct GNUNET_TESTBED_OperationFailureEventMessage *msg)
+{
+ /* we accept anything as a valid error message */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT message from
* controller (testbed service)
}
+
+/**
+ * Validate #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_INFORMATION message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ */
+static int
+check_slave_config (void *cls,
+ const struct GNUNET_TESTBED_SlaveConfiguration *msg)
+{
+ /* anything goes? */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION message from controller
* (testbed service)
}
+/**
+ * Check #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
+ * (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+static int
+check_link_controllers_result (void *cls,
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
+{
+ /* actual check to be implemented */
+ return GNUNET_OK;
+}
+
+
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT message from controller
* (testbed service)
*/
static void
handle_link_controllers_result (void *cls,
- const struct
- GNUNET_TESTBED_ControllerLinkResponse *msg)
+ const struct GNUNET_TESTBED_ControllerLinkResponse *msg)
{
struct GNUNET_TESTBED_Controller *c = cls;
struct OperationContext *opc;
* down signalling an error (message malformed)
*/
static int
-check_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
- const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
+check_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
uint16_t msize;
uint16_t name_len;
/**
* Handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages
*
- * @param c the controller handle to determine the connection this message
+ * @param cls the controller handle to determine the connection this message
* belongs to
* @param msg the barrier status message
*/
static void
-handle_barrier_status_ (struct GNUNET_TESTBED_Controller *c,
- const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
+handle_barrier_status (void *cls,
+ const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
{
+ struct GNUNET_TESTBED_Controller *c = cls;
struct GNUNET_TESTBED_Barrier *barrier;
char *emsg;
const char *name;
}
-/**
- * Handler for messages from controller (testbed service)
- *
- * @param cls the controller handler
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_TESTBED_Controller *c = cls;
- int status;
- uint16_t msize;
-
- c->in_receive = GNUNET_NO;
- /* FIXME: Add checks for message integrity */
- if (NULL == msg)
- {
- LOG_DEBUG ("Receive timed out or connection to service dropped\n");
- return;
- }
- msize = ntohs (msg->size);
- switch (ntohs (msg->type))
- {
- case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
- status =
- GNUNET_TESTBED_host_handle_addhostconfirm_
- (c, (const struct GNUNET_TESTBED_HostConfirmedMessage*) msg);
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage));
- handle_opsuccess (c,
- (const struct
- GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
- GNUNET_assert (msize >=
- sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
- handle_op_fail_event (c,
- (const struct
- GNUNET_TESTBED_OperationFailureEventMessage *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
- GNUNET_assert (msize ==
- sizeof (struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage));
- handle_peer_create_success (c,
- (const struct
- GNUNET_TESTBED_PeerCreateSuccessEventMessage
- *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
- GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
- handle_peer_event (c,
- (const struct GNUNET_TESTBED_PeerEventMessage *)
- msg);
-
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION:
- GNUNET_assert (msize >=
- sizeof (struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage));
- handle_peer_config (c,
- (const struct
- GNUNET_TESTBED_PeerConfigurationInformationMessage
- *) msg);
- status = GNUNET_YES;
-
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
- GNUNET_assert (msize ==
- sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
- handle_peer_conevent (c,
- (const struct
- GNUNET_TESTBED_ConnectionEventMessage *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
- GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
- handle_slave_config (c,
- (const struct GNUNET_TESTBED_SlaveConfiguration *)
- msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT:
- handle_link_controllers_result (c,
- (const struct
- GNUNET_TESTBED_ControllerLinkResponse
- *) msg);
- status = GNUNET_YES;
- break;
- case GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS:
- status = check_barrier_status_ (c,
- (const struct GNUNET_TESTBED_BarrierStatusMsg *) msg);
- if (GNUNET_OK == status)
- handle_barrier_status_ (c,
- (const struct
- GNUNET_TESTBED_BarrierStatusMsg *)
- msg);
- break;
- default:
- GNUNET_assert (0);
- }
- if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
- {
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-}
-
-
-/**
- * Function called to notify a client about the connection begin ready to queue
- * more data. "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
-{
- struct GNUNET_TESTBED_Controller *c = cls;
- struct MessageQueue *mq_entry;
-
- c->th = NULL;
- mq_entry = c->mq_head;
- GNUNET_assert (NULL != mq_entry);
- if ((0 == size) && (NULL == buf)) /* Timeout */
- {
- LOG_DEBUG ("Message sending timed out -- retrying\n");
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- return 0;
- }
- GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
- size = ntohs (mq_entry->msg->size);
- memcpy (buf, mq_entry->msg, size);
- LOG_DEBUG ("Message of type: %u and size: %u sent\n",
- ntohs (mq_entry->msg->type), size);
- GNUNET_free (mq_entry->msg);
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
- GNUNET_free (mq_entry);
- mq_entry = c->mq_head;
- if (NULL != mq_entry)
- c->th =
- GNUNET_CLIENT_notify_transmit_ready (c->client,
- ntohs (mq_entry->msg->size),
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify, c);
- if (GNUNET_NO == c->in_receive)
- {
- c->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (c->client, &message_handler, c,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return size;
-}
-
-
/**
* Queues a message in send queue for sending to the service
*
* @param controller the handle to the controller
* @param msg the message to queue
+ * @deprecated
*/
void
GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
struct GNUNET_MessageHeader *msg)
{
- struct MessageQueue *mq_entry;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_MessageHeader *m2;
uint16_t type;
uint16_t size;
size = ntohs (msg->size);
GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
(GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
- mq_entry = GNUNET_new (struct MessageQueue);
- mq_entry->msg = msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u, size %u for sending\n", type,
- ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (controller->mq_head, controller->mq_tail,
- mq_entry);
- if (NULL == controller->th)
- controller->th =
- GNUNET_CLIENT_notify_transmit_ready (controller->client, size,
- TIMEOUT_REL, GNUNET_YES,
- &transmit_ready_notify,
- controller);
+ env = GNUNET_MQ_msg_extra (m2,
+ size - sizeof (*m2),
+ type);
+ memcpy (m2, msg, size);
+ GNUNET_free (msg);
+ GNUNET_MQ_send (controller->mq,
+ env);
}
}
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure, a `struct GNUNET_TESTBED_Controller *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ /* struct GNUNET_TESTBED_Controller *c = cls; */
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Encountered MQ error: %d\n",
+ error);
+ /* now what? */
+ GNUNET_SCHEDULER_shutdown (); /* seems most reasonable */
+}
+
+
/**
* Start a controller process using the given configuration at the
* given host.
GNUNET_TESTBED_ControllerCallback cc,
void *cc_cls)
{
- struct GNUNET_TESTBED_Controller *controller;
+ GNUNET_MQ_hd_var_size (add_host_confirm,
+ GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS,
+ struct GNUNET_TESTBED_HostConfirmedMessage);
+ GNUNET_MQ_hd_fixed_size (peer_conevent,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT,
+ struct GNUNET_TESTBED_ConnectionEventMessage);
+ GNUNET_MQ_hd_fixed_size (opsuccess,
+ GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS,
+ struct GNUNET_TESTBED_GenericOperationSuccessEventMessage);
+ GNUNET_MQ_hd_var_size (op_fail_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT,
+ struct GNUNET_TESTBED_OperationFailureEventMessage);
+ GNUNET_MQ_hd_fixed_size (peer_create_success,
+ GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS,
+ struct GNUNET_TESTBED_PeerCreateSuccessEventMessage);
+ GNUNET_MQ_hd_fixed_size (peer_event,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT,
+ struct GNUNET_TESTBED_PeerEventMessage);
+ GNUNET_MQ_hd_var_size (peer_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_PEER_INFORMATION,
+ struct GNUNET_TESTBED_PeerConfigurationInformationMessage);
+ GNUNET_MQ_hd_var_size (slave_config,
+ GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION,
+ struct GNUNET_TESTBED_SlaveConfiguration);
+ GNUNET_MQ_hd_var_size (link_controllers_result,
+ GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
+ struct GNUNET_TESTBED_ControllerLinkResponse);
+ GNUNET_MQ_hd_var_size (barrier_status,
+ GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS,
+ const struct GNUNET_TESTBED_BarrierStatusMsg);
+ struct GNUNET_TESTBED_Controller *controller
+ = GNUNET_new (struct GNUNET_TESTBED_Controller);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_add_host_confirm_handler (controller),
+ make_peer_conevent_handler (controller),
+ make_opsuccess_handler (controller),
+ make_op_fail_event_handler (controller),
+ make_peer_create_success_handler (controller),
+ make_peer_event_handler (controller),
+ make_peer_config_handler (controller),
+ make_slave_config_handler (controller),
+ make_link_controllers_result_handler (controller),
+ make_barrier_status_handler (controller),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_TESTBED_InitMessage *msg;
const struct GNUNET_CONFIGURATION_Handle *cfg;
const char *controller_hostname;
&max_parallel_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_service_connections))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
if (GNUNET_OK !=
&max_parallel_topology_config_operations))
{
GNUNET_break (0);
+ GNUNET_free (controller);
return NULL;
}
- controller = GNUNET_new (struct GNUNET_TESTBED_Controller);
controller->cc = cc;
controller->cc_cls = cc_cls;
controller->event_mask = event_mask;
controller->cfg = GNUNET_CONFIGURATION_dup (cfg);
- controller->client = GNUNET_CLIENT_connect ("testbed", controller->cfg);
- if (NULL == controller->client)
+ controller->mq = GNUNET_CLIENT_connecT (controller->cfg,
+ "testbed",
+ handlers,
+ &mq_error_handler,
+ controller);
+ if (NULL == controller->mq)
{
+ GNUNET_break (0);
GNUNET_TESTBED_controller_disconnect (controller);
return NULL;
}
* @param cls closure
* @param key current key code
* @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- * iterate,
- * GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
*/
static int
opc_free_iterator (void *cls, uint32_t key, void *value)
void
GNUNET_TESTBED_controller_disconnect (struct GNUNET_TESTBED_Controller *c)
{
- struct MessageQueue *mq_entry;
-
- if (NULL != c->th)
- GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
- /* Clear the message queue */
- while (NULL != (mq_entry = c->mq_head))
+ if (NULL != c->mq)
{
- GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail,
- mq_entry);
- GNUNET_free (mq_entry->msg);
- GNUNET_free (mq_entry);
+ GNUNET_MQ_destroy (c->mq);
+ c->mq = NULL;
}
- if (NULL != c->client)
- GNUNET_CLIENT_disconnect (c->client);
if (NULL != c->host)
GNUNET_TESTBED_deregister_host_at_ (c->host, c);
GNUNET_CONFIGURATION_destroy (c->cfg);
* @return the size of the xconfig
*/
size_t
-GNUNET_TESTBED_compress_config_ (const char *config, size_t size,
+GNUNET_TESTBED_compress_config_ (const char *config,
+ size_t size,
char **xconfig)
{
size_t xsize;
* #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS,
* #GNUNET_MESSAGE_TYPE_TESTBED_LINK_CONTROLLERS_RESULT,
*
+ * FIXME: This API is incredibly ugly.
+ *
* @param msg the message containing compressed configuration
* @return handle to the parsed configuration; NULL upon error while parsing the message
*/
}
-
-
-
/* end of testbed_api.c */