converting monitor plugin functionality to MQ
authorChristian Grothoff <christian@grothoff.org>
Wed, 6 Jul 2016 22:52:03 +0000 (22:52 +0000)
committerChristian Grothoff <christian@grothoff.org>
Wed, 6 Jul 2016 22:52:03 +0000 (22:52 +0000)
src/transport/transport_api_monitor_plugins.c

index eef4a083065a0b655a0d7dd426c4d8250d2fc1c9..01ec2074a61be06a154c30addea551c8f0bfe9dd 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2014 GNUnet e.V.
+     Copyright (C) 2014, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
   /**
    * Connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Our configuration.
@@ -72,7 +72,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
   /**
    * Task ID for reconnect.
    */
-  struct GNUNET_SCHEDULER_Task * reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
 };
 
@@ -95,39 +95,6 @@ struct GNUNET_TRANSPORT_PluginSession
 };
 
 
-/**
- * Function called with responses from the service.
- *
- * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
- */
-static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Send our subscription request to the service.
- *
- * @param pal_ctx our context
- */
-static void
-send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
-{
-  struct GNUNET_MessageHeader msg;
-
-  msg.size = htons (sizeof (struct GNUNET_MessageHeader));
-  msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CLIENT_transmit_and_get_response (pm->client,
-                                                          &msg,
-                                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                                          GNUNET_YES,
-                                                          &response_processor,
-                                                          pm));
-}
-
 
 /**
  * Task run to re-establish the connection.
@@ -135,15 +102,7 @@ send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  */
 static void
-do_plugin_connect (void *cls)
-{
-  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-
-  pm->reconnect_task = NULL;
-  pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
-  GNUNET_assert (NULL != pm->client);
-  send_plugin_mon_request (pm);
-}
+do_plugin_connect (void *cls);
 
 
 /**
@@ -184,8 +143,8 @@ free_entry (void *cls,
 static void
 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  GNUNET_CLIENT_disconnect (pm->client);
-  pm->client = NULL;
+  GNUNET_MQ_destroy (pm->mq);
+  pm->mq = NULL;
   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
                                            &free_entry,
                                            pm);
@@ -257,15 +216,44 @@ locate_by_id (void *cls,
  * Function called with responses from the service.
  *
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
+ * @paramm tpmm message with event data
+ * @return #GNUNET_Ok if message is well-formed
+ */
+static int
+check_event (void *cls,
+             const struct TransportPluginMonitorMessage *tpmm)
+{
+  const char *pname;
+  size_t pname_len;
+  size_t paddr_len;
+
+  pname = (const char *) &tpmm[1];
+  pname_len = ntohs (tpmm->plugin_name_len);
+  paddr_len = ntohs (tpmm->plugin_address_len);
+  if ( (pname_len +
+        paddr_len +
+        sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
+       ( (0 != pname_len) &&
+         ('\0' != pname[pname_len - 1]) ) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called with responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @paramm tpmm message with event data
  */
 static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg)
+handle_event (void *cls,
+              const struct TransportPluginMonitorMessage *tpmm)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-  const struct TransportPluginMonitorMessage *tpmm;
   struct GNUNET_TRANSPORT_PluginSession *ps;
   const char *pname;
   const void *paddr;
@@ -276,47 +264,9 @@ response_processor (void *cls,
   struct GNUNET_HELLO_Address addr;
   struct SearchContext rv;
 
-  if (NULL == msg)
-  {
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
-       (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
-  {
-    /* we are in sync */
-    pm->cb (pm->cb_cls,
-            NULL,
-            NULL,
-            NULL);
-    GNUNET_CLIENT_receive (pm->client,
-                           &response_processor,
-                           pm,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-    return;
-  }
-
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
-       (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  tpmm = (const struct TransportPluginMonitorMessage *) msg;
   pname = (const char *) &tpmm[1];
   pname_len = ntohs (tpmm->plugin_name_len);
   paddr_len = ntohs (tpmm->plugin_address_len);
-  if ( (pname_len +
-        paddr_len +
-        sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
-       ( (0 != pname_len) &&
-         ('\0' != pname[pname_len - 1]) ) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
   paddr = &pname[pname_len];
   ps = NULL;
   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
@@ -372,10 +322,83 @@ response_processor (void *cls,
                                                            ps));
     GNUNET_free (ps);
   }
-  GNUNET_CLIENT_receive (pm->client,
-                         &response_processor,
-                         pm,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Function called with sync responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param msg message from the service
+ */
+static void
+handle_sync (void *cls,
+             const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  /* we are in sync, notify callback */
+  pm->cb (pm->cb_cls,
+          NULL,
+          NULL,
+          NULL);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_NSE_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  reconnect_plugin_ctx (pm);
+}
+
+
+/**
+ * Task run to re-establish the connection.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ */
+static void
+do_plugin_connect (void *cls)
+{
+  GNUNET_MQ_hd_var_size (event,
+                         GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
+                         struct TransportPluginMonitorMessage);
+  GNUNET_MQ_hd_fixed_size (sync,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
+                           struct GNUNET_MessageHeader);
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_event_handler (pm),
+    make_sync_handler (pm),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *env;
+
+  pm->reconnect_task = NULL;
+  pm->mq = GNUNET_CLIENT_connecT (pm->cfg,
+                                  "transport",
+                                  handlers,
+                                  &mq_error_handler,
+                                  pm);
+  if (NULL == pm->mq)
+    return;
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
+  GNUNET_MQ_send (pm->mq,
+                  env);
 }
 
 
@@ -394,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                   void *cb_cls)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm;
-  struct GNUNET_CLIENT_Connection *client;
 
-  client = GNUNET_CLIENT_connect ("transport",
-                                  cfg);
-  if (NULL == client)
-    return NULL;
   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
   pm->cb = cb;
   pm->cb_cls = cb_cls;
   pm->cfg = cfg;
-  pm->client = client;
+  do_plugin_connect (pm);
+  if (NULL == pm->mq)
+  {
+    GNUNET_free (pm);
+    return NULL;
+  }
   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
-  send_plugin_mon_request (pm);
   return pm;
 }
 
@@ -422,10 +444,10 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
 void
 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  if (NULL != pm->client)
+  if (NULL != pm->mq)
   {
-    GNUNET_CLIENT_disconnect (pm->client);
-    pm->client = NULL;
+    GNUNET_MQ_destroy (pm->mq);
+    pm->mq = NULL;
   }
   if (NULL != pm->reconnect_task)
   {