-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_queued (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
- const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
- size_t msg_size;
-
- GNUNET_assert (NULL != buf);
- msg_size = ntohs (msg->size);
- GNUNET_assert (size >= msg_size);
- memcpy (buf, msg, msg_size);
- state->th = NULL;
-
- GNUNET_MQ_impl_send_continue (mq);
-
- return msg_size;
-}
-
-
-static void
-server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct ServerClientSocketState *state = impl_state;
-
- if (NULL != state->th)
- {
- GNUNET_SERVER_notify_transmit_ready_cancel (state->th);
- state->th = NULL;
- }
-
- GNUNET_assert (NULL != mq);
- GNUNET_assert (NULL != state);
- GNUNET_SERVER_client_drop (state->client);
- GNUNET_free (state);
-}
-
-
-static void
-server_client_send_impl (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
-{
- struct ServerClientSocketState *state = impl_state;
-
- GNUNET_assert (NULL != mq);
- state->th = GNUNET_SERVER_notify_transmit_ready (state->client,
- ntohs (msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_queued, mq);
-}
-
-
-struct GNUNET_MQ_Handle *
-GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
-{
- struct GNUNET_MQ_Handle *mq;
- struct ServerClientSocketState *scss;
-
- mq = GNUNET_new (struct GNUNET_MQ_Handle);
- scss = GNUNET_new (struct ServerClientSocketState);
- mq->impl_state = scss;
- scss->client = client;
- GNUNET_SERVER_client_keep (client);
- mq->send_impl = &server_client_send_impl;
- mq->destroy_impl = &server_client_destroy_impl;
- return mq;
-}
-
-
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void
-handle_client_message (void *cls,
- const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct ClientConnectionState *state;
-
- state = mq->impl_state;
- if (NULL == msg)
- {
- GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
- return;
- }
- GNUNET_CLIENT_receive (state->connection,
- &handle_client_message,
- mq,
- GNUNET_TIME_UNIT_FOREVER_REL);
- GNUNET_MQ_inject_message (mq, msg);
-}
-
-
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in @a buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-connection_client_transmit_queued (void *cls,
- size_t size,
- void *buf)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- const struct GNUNET_MessageHeader *msg;
- struct ClientConnectionState *state = mq->impl_state;
- size_t msg_size;
-
- GNUNET_assert (NULL != mq);
- state->th = NULL;
- msg = GNUNET_MQ_impl_current (mq);
-
- if (NULL == buf)
- {
- GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
- return 0;
- }
-
- if ( (GNUNET_YES == state->receive_requested) &&
- (GNUNET_NO == state->receive_active) )
- {
- state->receive_active = GNUNET_YES;
- GNUNET_CLIENT_receive (state->connection,
- &handle_client_message,
- mq,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-
- msg_size = ntohs (msg->size);
- GNUNET_assert (size >= msg_size);
- memcpy (buf, msg, msg_size);
- state->th = NULL;
-
- GNUNET_MQ_impl_send_continue (mq);
-
- return msg_size;
-}
-
-
-static void
-connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- if (NULL != state->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
- state->th = NULL;
- }
- GNUNET_CLIENT_disconnect (state->connection);
- GNUNET_free (impl_state);
-}
-
-
-static void
-connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- GNUNET_assert (NULL != state);
- GNUNET_assert (NULL == state->th);
- state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection,
- ntohs (msg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &connection_client_transmit_queued,
- mq);
- GNUNET_assert (NULL != state->th);
-}
-
-
-static void
-connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
-{
- struct ClientConnectionState *state = impl_state;
-
- GNUNET_assert (NULL != state->th);
- GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
- state->th = NULL;
-}
-
-
-struct GNUNET_MQ_Handle *
-GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
-{
- struct GNUNET_MQ_Handle *mq;
- struct ClientConnectionState *state;
- unsigned int i;
-
- mq = GNUNET_new (struct GNUNET_MQ_Handle);
- if (NULL != handlers)
- {
- for (i=0;NULL != handlers[i].cb; i++) ;
- mq->handlers = GNUNET_new_array (i,
- struct GNUNET_MQ_MessageHandler);
- memcpy (mq->handlers,
- handlers,
- i * sizeof (struct GNUNET_MQ_MessageHandler));
- }
- mq->error_handler = error_handler;
- mq->error_handler_cls = error_handler_cls;
- state = GNUNET_new (struct ClientConnectionState);
- state->connection = connection;
- mq->impl_state = state;
- mq->send_impl = &connection_client_send_impl;
- mq->destroy_impl = &connection_client_destroy_impl;
- mq->cancel_impl = &connection_client_cancel_impl;
- if (NULL != handlers)
- state->receive_requested = GNUNET_YES;
-
- return mq;
-}
-
-