*/
struct GNUNET_MQ_Handle *mq;
+ /**
+ * Tokenizer we use for processing incoming data.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
/**
* Task that warns about missing calls to
* #GNUNET_SERVICE_client_continue().
*/
struct GNUNET_SCHEDULER_Task *warn_task;
+ /**
+ * Task that receives data from the client to
+ * pass it to the handlers.
+ */
+ struct GNUNET_SCHEDULER_Task *recv_task;
+
+ /**
+ * Task that transmit data to the client.
+ */
+ struct GNUNET_SCHEDULER_Task *send_task;
+
/**
* User context value, value returned from
* the connect callback.
}
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+service_mq_send (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ // FIXME 1: setup "client->send_task" for transmission.
+ // FIXME 2: I seriously hope we do not need to make a copy of `msg`!
+ // OPTIMIZATION: ideally, we'd like the ability to peak at the rest of
+ // the queue and transmit more than one message if possible.
+}
+
+
+/**
+ * Implements the destruction of a message queue. Implementations
+ * must not free @a mq, but should take care of @a impl_state.
+ * Not sure there is anything to do here! (FIXME!)
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+service_mq_destroy (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ // FIXME
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+service_mq_cancel (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ // FIXME: semantics? What to do!?
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+service_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ // FIXME!
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer for a client.
+ *
+ * Do not call #GNUNET_SERVER_mst_destroy() from within
+ * the scope of this callback.
+ *
+ * @param cls closure with the `struct GNUNET_SERVICE_Client *`
+ * @param client closure with the `struct GNUNET_SERVICE_Client *`
+ * @param message the actual message
+ * @return #GNUNET_OK on success (always)
+ */
+static int
+service_client_mst_cb (void *cls,
+ void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ GNUNET_MQ_inject_message (client->mq,
+ message);
+ return GNUNET_OK;
+}
+
+
+/**
+ * A client sent us data. Receive and process it. If we are done,
+ * reschedule this task.
+ *
+ * @param cls the `struct GNUNET_SERVICE_Client` that sent us data.
+ */
+static void
+service_client_recv (void *cls)
+{
+ struct GNUNET_SERVICE_Client *client = cls;
+
+ // FIXME: read into buffer, pass to MST, then client->mq inject!
+ // FIXME: revise MST API to avoid the memcpy!
+ // i.e.: GNUNET_MST_read (client->sock);
+}
+
+
/**
* We have successfully accepted a connection from a client. Now
* setup the client (with the scheduler) and tell the application.
client);
client->sh = sh;
client->sock = csock;
- client->mq = NULL; // FIXME!
+ client->mq = GNUNET_MQ_queue_for_callbacks (&service_mq_send,
+ &service_mq_destroy,
+ &service_mq_cancel,
+ client,
+ sh->handlers,
+ &service_mq_error_handler,
+ client);
+ client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb,
+ client);
client->user_context = sh->connect_cb (sh->cb_cls,
client,
client->mq);
+ GNUNET_MQ_set_handlers_closure (client->mq,
+ client->user_context);
+ client->recv_task = GNUNET_SCHEDULER_add_read (client->sock,
+ &service_client_recv,
+ client);
}
GNUNET_SCHEDULER_cancel (c->warn_task);
c->warn_task = NULL;
}
+ if (NULL != c->recv_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->recv_task);
+ c->recv_task = NULL;
+ }
+ if (NULL != c->send_task)
+ {
+ GNUNET_SCHEDULER_cancel (c->send_task);
+ c->send_task = NULL;
+ }
+ GNUNET_SERVER_mst_destroy (c->mst);
GNUNET_MQ_destroy (c->mq);
if (GNUNET_NO == c->persist)
{