-starting with new service MQ logic
authorChristian Grothoff <christian@grothoff.org>
Mon, 29 Aug 2016 13:03:20 +0000 (13:03 +0000)
committerChristian Grothoff <christian@grothoff.org>
Mon, 29 Aug 2016 13:03:20 +0000 (13:03 +0000)
src/util/server_mst.c
src/util/service_new.c

index 89a25983c0fd766fd59939cdd0926460100943e2..8c9bc4b5bc48267fcf7fa37dfab95b57540e0809 100644 (file)
@@ -49,12 +49,12 @@ struct GNUNET_SERVER_MessageStreamTokenizer
   GNUNET_SERVER_MessageTokenizerCallback cb;
 
   /**
-   * Closure for cb.
+   * Closure for @e cb.
    */
   void *cb_cls;
 
   /**
-   * Size of the buffer (starting at 'hdr').
+   * Size of the buffer (starting at @e hdr).
    */
   size_t curr_buf;
 
index b716a73e0919c5a3feb685cb59d520b0815c7f05..fc47289f6f15f93e336b82f584dd87f9f42e266f 100644 (file)
@@ -212,12 +212,28 @@ struct GNUNET_SERVICE_Client
    */
   struct GNUNET_MQ_Handle *mq;
 
+  /**
+   * Tokenizer we use for processing incoming data.
+   */
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
   /**
    * Task that warns about missing calls to
    * #GNUNET_SERVICE_client_continue().
    */
   struct GNUNET_SCHEDULER_Task *warn_task;
 
+  /**
+   * Task that receives data from the client to
+   * pass it to the handlers.
+   */
+  struct GNUNET_SCHEDULER_Task *recv_task;
+
+  /**
+   * Task that transmit data to the client.
+   */
+  struct GNUNET_SCHEDULER_Task *send_task;
+
   /**
    * User context value, value returned from
    * the connect callback.
@@ -408,6 +424,123 @@ GNUNET_SERVICE_suspend (struct GNUNET_SERVICE_Handle *sh)
 }
 
 
+/**
+ * Signature of functions implementing the sending functionality of a
+ * message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+service_mq_send (struct GNUNET_MQ_Handle *mq,
+                 const struct GNUNET_MessageHeader *msg,
+                 void *impl_state)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  // FIXME 1: setup "client->send_task" for transmission.
+  // FIXME 2: I seriously hope we do not need to make a copy of `msg`!
+  // OPTIMIZATION: ideally, we'd like the ability to peak at the rest of
+  //               the queue and transmit more than one message if possible.
+}
+
+
+/**
+ * Implements the destruction of a message queue.  Implementations
+ * must not free @a mq, but should take care of @a impl_state.
+ * Not sure there is anything to do here! (FIXME!)
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+service_mq_destroy (struct GNUNET_MQ_Handle *mq,
+                    void *impl_state)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  // FIXME
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+service_mq_cancel (struct GNUNET_MQ_Handle *mq,
+                   void *impl_state)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  // FIXME: semantics? What to do!?
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+service_mq_error_handler (void *cls,
+                          enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  // FIXME!
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received by the tokenizer for a client.
+ *
+ * Do not call #GNUNET_SERVER_mst_destroy() from within
+ * the scope of this callback.
+ *
+ * @param cls closure with the `struct GNUNET_SERVICE_Client *`
+ * @param client closure with the `struct GNUNET_SERVICE_Client *`
+ * @param message the actual message
+ * @return #GNUNET_OK on success (always)
+ */
+static int
+service_client_mst_cb (void *cls,
+                       void *client,
+                       const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  GNUNET_MQ_inject_message (client->mq,
+                            message);
+  return GNUNET_OK;
+}
+
+
+/**
+ * A client sent us data. Receive and process it.  If we are done,
+ * reschedule this task.
+ *
+ * @param cls the `struct GNUNET_SERVICE_Client` that sent us data.
+ */
+static void
+service_client_recv (void *cls)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  // FIXME: read into buffer, pass to MST, then client->mq inject!
+  // FIXME: revise MST API to avoid the memcpy!
+  // i.e.: GNUNET_MST_read (client->sock);
+}
+
+
 /**
  * We have successfully accepted a connection from a client.  Now
  * setup the client (with the scheduler) and tell the application.
@@ -427,10 +560,23 @@ start_client (struct GNUNET_SERVICE_Handle *sh,
                                client);
   client->sh = sh;
   client->sock = csock;
-  client->mq = NULL; // FIXME!
+  client->mq = GNUNET_MQ_queue_for_callbacks (&service_mq_send,
+                                              &service_mq_destroy,
+                                              &service_mq_cancel,
+                                              client,
+                                              sh->handlers,
+                                              &service_mq_error_handler,
+                                              client);
+  client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb,
+                                          client);
   client->user_context = sh->connect_cb (sh->cb_cls,
                                          client,
                                          client->mq);
+  GNUNET_MQ_set_handlers_closure (client->mq,
+                                  client->user_context);
+  client->recv_task = GNUNET_SCHEDULER_add_read (client->sock,
+                                                 &service_client_recv,
+                                                 client);
 }
 
 
@@ -595,6 +741,17 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c)
     GNUNET_SCHEDULER_cancel (c->warn_task);
     c->warn_task = NULL;
   }
+  if (NULL != c->recv_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->recv_task);
+    c->recv_task = NULL;
+  }
+  if (NULL != c->send_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->send_task);
+    c->send_task = NULL;
+  }
+  GNUNET_SERVER_mst_destroy (c->mst);
   GNUNET_MQ_destroy (c->mq);
   if (GNUNET_NO == c->persist)
   {