REST/NAMESTORE: rework API
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
index 7c11194f2ce3d1cee9f7b60600a88a5faeb9124d..f0acc24085f9c57614099d9152a15d0793ca3afc 100644 (file)
@@ -1,21 +1,21 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2014 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2014, 2016 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     SPDX-License-Identifier: AGPL3.0-or-later
 */
 
 /**
@@ -41,7 +41,7 @@ struct GNUNET_TRANSPORT_PluginMonitor
   /**
    * Connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Our configuration.
@@ -72,14 +72,14 @@ struct GNUNET_TRANSPORT_PluginMonitor
   /**
    * Task ID for reconnect.
    */
-  struct GNUNET_SCHEDULER_Task * reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
 };
 
 
 /**
  * Abstract representation of a plugin's session.
- * Corresponds to the `struct Session` within the TRANSPORT service.
+ * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
  */
 struct GNUNET_TRANSPORT_PluginSession
 {
@@ -95,57 +95,14 @@ struct GNUNET_TRANSPORT_PluginSession
 };
 
 
-/**
- * Function called with responses from the service.
- *
- * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
- */
-static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Send our subscription request to the service.
- *
- * @param pal_ctx our context
- */
-static void
-send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
-{
-  struct GNUNET_MessageHeader msg;
-
-  msg.size = htons (sizeof (struct GNUNET_MessageHeader));
-  msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CLIENT_transmit_and_get_response (pm->client,
-                                                          &msg,
-                                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                                          GNUNET_YES,
-                                                          &response_processor,
-                                                          pm));
-}
-
 
 /**
  * Task run to re-establish the connection.
  *
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param tc scheduler context, unused
  */
 static void
-do_plugin_connect (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-
-  pm->reconnect_task = NULL;
-  pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
-  GNUNET_assert (NULL != pm->client);
-  send_plugin_mon_request (pm);
-}
+do_plugin_connect (void *cls);
 
 
 /**
@@ -178,21 +135,6 @@ free_entry (void *cls,
 }
 
 
-/**
- * We got disconnected, remove all existing entries from
- * the map and notify client.
- *
- * @param pm montitor that got disconnected
- */
-static void
-clear_map (struct GNUNET_TRANSPORT_PluginMonitor *pm)
-{
-  GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
-                                           &free_entry,
-                                           pm);
-}
-
-
 /**
  * Cut the existing connection and reconnect.
  *
@@ -201,9 +143,11 @@ clear_map (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 static void
 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  GNUNET_CLIENT_disconnect (pm->client);
-  pm->client = NULL;
-  clear_map (pm);
+  GNUNET_MQ_destroy (pm->mq);
+  pm->mq = NULL;
+  GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
+                                           &free_entry,
+                                           pm);
   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
                                                      &do_plugin_connect,
@@ -272,15 +216,44 @@ locate_by_id (void *cls,
  * Function called with responses from the service.
  *
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
+ * @paramm tpmm message with event data
+ * @return #GNUNET_Ok if message is well-formed
+ */
+static int
+check_event (void *cls,
+             const struct TransportPluginMonitorMessage *tpmm)
+{
+  const char *pname;
+  size_t pname_len;
+  size_t paddr_len;
+
+  pname = (const char *) &tpmm[1];
+  pname_len = ntohs (tpmm->plugin_name_len);
+  paddr_len = ntohs (tpmm->plugin_address_len);
+  if ( (pname_len +
+        paddr_len +
+        sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
+       ( (0 != pname_len) &&
+         ('\0' != pname[pname_len - 1]) ) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called with responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @paramm tpmm message with event data
  */
 static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg)
+handle_event (void *cls,
+              const struct TransportPluginMonitorMessage *tpmm)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-  const struct TransportPluginMonitorMessage *tpmm;
   struct GNUNET_TRANSPORT_PluginSession *ps;
   const char *pname;
   const void *paddr;
@@ -291,47 +264,9 @@ response_processor (void *cls,
   struct GNUNET_HELLO_Address addr;
   struct SearchContext rv;
 
-  if (NULL == msg)
-  {
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
-       (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
-  {
-    /* we are in sync */
-    pm->cb (pm->cb_cls,
-            NULL,
-            NULL,
-            NULL);
-    GNUNET_CLIENT_receive (pm->client,
-                           &response_processor,
-                           pm,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-    return;
-  }
-
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
-       (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  tpmm = (const struct TransportPluginMonitorMessage *) msg;
   pname = (const char *) &tpmm[1];
   pname_len = ntohs (tpmm->plugin_name_len);
   paddr_len = ntohs (tpmm->plugin_address_len);
-  if ( (pname_len +
-        paddr_len +
-        sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
-       ( (0 != pname_len) &&
-         ('\0' != pname[pname_len - 1]) ) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
   paddr = &pname[pname_len];
   ps = NULL;
   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
@@ -387,10 +322,83 @@ response_processor (void *cls,
                                                            ps));
     GNUNET_free (ps);
   }
-  GNUNET_CLIENT_receive (pm->client,
-                         &response_processor,
-                         pm,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Function called with sync responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param msg message from the service
+ */
+static void
+handle_sync (void *cls,
+             const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  /* we are in sync, notify callback */
+  pm->cb (pm->cb_cls,
+          NULL,
+          NULL,
+          NULL);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_NSE_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  reconnect_plugin_ctx (pm);
+}
+
+
+/**
+ * Task run to re-establish the connection.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ */
+static void
+do_plugin_connect (void *cls)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (event,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
+                           struct TransportPluginMonitorMessage,
+                           pm),
+    GNUNET_MQ_hd_fixed_size (sync,
+                             GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
+                             struct GNUNET_MessageHeader,
+                             pm),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *env;
+
+  pm->reconnect_task = NULL;
+  pm->mq = GNUNET_CLIENT_connect (pm->cfg,
+                                  "transport",
+                                  handlers,
+                                  &mq_error_handler,
+                                  pm);
+  if (NULL == pm->mq)
+    return;
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
+  GNUNET_MQ_send (pm->mq,
+                  env);
 }
 
 
@@ -409,19 +417,18 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                   void *cb_cls)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm;
-  struct GNUNET_CLIENT_Connection *client;
 
-  client = GNUNET_CLIENT_connect ("transport",
-                                  cfg);
-  if (NULL == client)
-    return NULL;
   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
   pm->cb = cb;
   pm->cb_cls = cb_cls;
   pm->cfg = cfg;
-  pm->client = client;
+  do_plugin_connect (pm);
+  if (NULL == pm->mq)
+  {
+    GNUNET_free (pm);
+    return NULL;
+  }
   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
-  send_plugin_mon_request (pm);
   return pm;
 }
 
@@ -437,17 +444,19 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
 void
 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  if (NULL != pm->client)
+  if (NULL != pm->mq)
   {
-    GNUNET_CLIENT_disconnect (pm->client);
-    pm->client = NULL;
+    GNUNET_MQ_destroy (pm->mq);
+    pm->mq = NULL;
   }
   if (NULL != pm->reconnect_task)
   {
     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
     pm->reconnect_task = NULL;
   }
-  clear_map (pm);
+  GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
+                                           &free_entry,
+                                           pm);
   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
   GNUNET_free (pm);
 }