From: Christian Grothoff Date: Mon, 29 Aug 2016 13:03:20 +0000 (+0000) Subject: -starting with new service MQ logic X-Git-Tag: initial-import-from-subversion-38251~310 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=3fb5bcead0f0acb3d0a6ec96a12c4207b227211b;p=oweals%2Fgnunet.git -starting with new service MQ logic --- diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 89a25983c..8c9bc4b5b 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c @@ -49,12 +49,12 @@ struct GNUNET_SERVER_MessageStreamTokenizer GNUNET_SERVER_MessageTokenizerCallback cb; /** - * Closure for cb. + * Closure for @e cb. */ void *cb_cls; /** - * Size of the buffer (starting at 'hdr'). + * Size of the buffer (starting at @e hdr). */ size_t curr_buf; diff --git a/src/util/service_new.c b/src/util/service_new.c index b716a73e0..fc47289f6 100644 --- a/src/util/service_new.c +++ b/src/util/service_new.c @@ -212,12 +212,28 @@ struct GNUNET_SERVICE_Client */ struct GNUNET_MQ_Handle *mq; + /** + * Tokenizer we use for processing incoming data. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + /** * Task that warns about missing calls to * #GNUNET_SERVICE_client_continue(). */ struct GNUNET_SCHEDULER_Task *warn_task; + /** + * Task that receives data from the client to + * pass it to the handlers. + */ + struct GNUNET_SCHEDULER_Task *recv_task; + + /** + * Task that transmit data to the client. + */ + struct GNUNET_SCHEDULER_Task *send_task; + /** * User context value, value returned from * the connect callback. @@ -408,6 +424,123 @@ GNUNET_SERVICE_suspend (struct GNUNET_SERVICE_Handle *sh) } +/** + * Signature of functions implementing the sending functionality of a + * message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +service_mq_send (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct GNUNET_SERVICE_Client *client = cls; + + // FIXME 1: setup "client->send_task" for transmission. + // FIXME 2: I seriously hope we do not need to make a copy of `msg`! + // OPTIMIZATION: ideally, we'd like the ability to peak at the rest of + // the queue and transmit more than one message if possible. +} + + +/** + * Implements the destruction of a message queue. Implementations + * must not free @a mq, but should take care of @a impl_state. + * Not sure there is anything to do here! (FIXME!) + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation + */ +static void +service_mq_destroy (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct GNUNET_SERVICE_Client *client = cls; + + // FIXME +} + + +/** + * Implementation function that cancels the currently sent message. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ +static void +service_mq_cancel (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct GNUNET_SERVICE_Client *client = cls; + + // FIXME: semantics? What to do!? +} + + +/** + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure + * @param error error code + */ +static void +service_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SERVICE_Client *client = cls; + + // FIXME! +} + + +/** + * Functions with this signature are called whenever a + * complete message is received by the tokenizer for a client. + * + * Do not call #GNUNET_SERVER_mst_destroy() from within + * the scope of this callback. + * + * @param cls closure with the `struct GNUNET_SERVICE_Client *` + * @param client closure with the `struct GNUNET_SERVICE_Client *` + * @param message the actual message + * @return #GNUNET_OK on success (always) + */ +static int +service_client_mst_cb (void *cls, + void *client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_SERVICE_Client *client = cls; + + GNUNET_MQ_inject_message (client->mq, + message); + return GNUNET_OK; +} + + +/** + * A client sent us data. Receive and process it. If we are done, + * reschedule this task. + * + * @param cls the `struct GNUNET_SERVICE_Client` that sent us data. + */ +static void +service_client_recv (void *cls) +{ + struct GNUNET_SERVICE_Client *client = cls; + + // FIXME: read into buffer, pass to MST, then client->mq inject! + // FIXME: revise MST API to avoid the memcpy! + // i.e.: GNUNET_MST_read (client->sock); +} + + /** * We have successfully accepted a connection from a client. Now * setup the client (with the scheduler) and tell the application. @@ -427,10 +560,23 @@ start_client (struct GNUNET_SERVICE_Handle *sh, client); client->sh = sh; client->sock = csock; - client->mq = NULL; // FIXME! + client->mq = GNUNET_MQ_queue_for_callbacks (&service_mq_send, + &service_mq_destroy, + &service_mq_cancel, + client, + sh->handlers, + &service_mq_error_handler, + client); + client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb, + client); client->user_context = sh->connect_cb (sh->cb_cls, client, client->mq); + GNUNET_MQ_set_handlers_closure (client->mq, + client->user_context); + client->recv_task = GNUNET_SCHEDULER_add_read (client->sock, + &service_client_recv, + client); } @@ -595,6 +741,17 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c) GNUNET_SCHEDULER_cancel (c->warn_task); c->warn_task = NULL; } + if (NULL != c->recv_task) + { + GNUNET_SCHEDULER_cancel (c->recv_task); + c->recv_task = NULL; + } + if (NULL != c->send_task) + { + GNUNET_SCHEDULER_cancel (c->send_task); + c->send_task = NULL; + } + GNUNET_SERVER_mst_destroy (c->mst); GNUNET_MQ_destroy (c->mq); if (GNUNET_NO == c->persist) {