migrate transport_core API to MQ
authorChristian Grothoff <christian@grothoff.org>
Fri, 8 Jul 2016 16:34:31 +0000 (16:34 +0000)
committerChristian Grothoff <christian@grothoff.org>
Fri, 8 Jul 2016 16:34:31 +0000 (16:34 +0000)
22 files changed:
src/cadet/gnunet-service-cadet_peer.c
src/core/test_core_api.c
src/core/test_core_api_reliability.c
src/core/test_core_quota_compliance.c
src/dht/gnunet-service-dht.c
src/dht/gnunet-service-dht.h
src/dht/gnunet-service-dht_neighbours.c
src/hostlist/gnunet-daemon-hostlist_client.c
src/hostlist/test_gnunet_daemon_hostlist.c
src/hostlist/test_gnunet_daemon_hostlist_reconnect.c
src/include/gnunet_transport_core_service.h
src/include/gnunet_transport_service.h
src/peerinfo-tool/gnunet-peerinfo.c
src/testbed/gnunet-service-testbed_connectionpool.c
src/testbed/gnunet-service-testbed_connectionpool.h
src/testbed/gnunet-service-testbed_oc.c
src/topology/gnunet-daemon-topology.c
src/transport/Makefile.am
src/transport/transport-testing.c
src/transport/transport_api.c
src/transport/transport_api_get_hello.c
src/transport/transport_api_offer_hello.c

index fa16db4bbb6b6e99c97b1be34607f385538d84a3..101b9e22a27ed14fa5c85040b48248b8a03079df 100644 (file)
@@ -245,14 +245,14 @@ static unsigned long long drop_percent;
 static struct GNUNET_CORE_Handle *core_handle;
 
 /**
- * Handle to communicate with ATS.
+ * Our configuration;
  */
-static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
 /**
- * Handle to try to start new connections.
+ * Handle to communicate with ATS.
  */
-static struct GNUNET_TRANSPORT_Handle *transport_handle;
+static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
 
 /**
  * Shutdown falg.
@@ -557,6 +557,7 @@ core_init (void *cls,
   const struct GNUNET_CONFIGURATION_Handle *c = cls;
   static int i = 0;
 
+  cfg = c;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
   if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)))
   {
@@ -1841,24 +1842,6 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c)
                                      NULL,      /* Don't notify about all outbound messages */
                                      GNUNET_NO, /* For header-only out notification */
                                      core_handlers);    /* Register these handlers */
-  if (GNUNET_YES !=
-      GNUNET_CONFIGURATION_get_value_yesno (c, "CADET", "DISABLE_TRY_CONNECT"))
-  {
-    transport_handle = GNUNET_TRANSPORT_connect (c, &my_full_id, NULL, /* cls */
-                                                 /* Notify callbacks */
-                                                 NULL, NULL, NULL);
-  }
-  else
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
-    LOG (GNUNET_ERROR_TYPE_WARNING, "*  DISABLE TRYING CONNECT in config  *\n");
-    LOG (GNUNET_ERROR_TYPE_WARNING, "*  Use this only for test purposes.  *\n");
-    LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
-    transport_handle = NULL;
-  }
-
-
-
   if (NULL == core_handle)
   {
     GNUNET_break (0);
@@ -1886,11 +1869,6 @@ GCP_shutdown (void)
     GNUNET_CORE_disconnect (core_handle);
     core_handle = NULL;
   }
-  if (NULL != transport_handle)
-  {
-    GNUNET_TRANSPORT_disconnect (transport_handle);
-    transport_handle = NULL;
-  }
   if (NULL != ats_ch)
   {
     GNUNET_ATS_connectivity_done (ats_ch);
@@ -2591,7 +2569,10 @@ GCP_try_connect (struct CadetPeer *peer)
   struct GNUNET_HELLO_Message *hello;
   struct GNUNET_MessageHeader *mh;
 
-  if (NULL == transport_handle)
+  if (GNUNET_YES !=
+      GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                            "CADET",
+                                            "DISABLE_TRY_CONNECT"))
     return;
   GCC_check_connections ();
   if (GNUNET_YES == GCP_is_neighbor (peer))
@@ -2606,7 +2587,7 @@ GCP_try_connect (struct CadetPeer *peer)
     GNUNET_TRANSPORT_offer_hello_cancel (peer->hello_offer);
     peer->hello_offer = NULL;
   }
-  peer->hello_offer = GNUNET_TRANSPORT_offer_hello (transport_handle,
+  peer->hello_offer = GNUNET_TRANSPORT_offer_hello (cfg,
                                                     mh,
                                                     &hello_offer_done,
                                                     peer);
index 92ee038da3f81a9dc274b268bd6585b93a5518eb..43f4c421e3443c1b8642ddc3f521094de03407f9 100644 (file)
@@ -65,9 +65,9 @@ process_hello (void *cls,
               "Received (my) `%s' from transport service\n", "HELLO");
   GNUNET_assert (message != NULL);
   if ((p == &p1) && (p2.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
   if ((p == &p2) && (p1.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
 }
 
 
@@ -280,7 +280,7 @@ setup_peer (struct PeerContext *p,
   GNUNET_assert (NULL != p->th);
   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
   GNUNET_assert (NULL != p->ats);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
   GNUNET_free (binary);
 }
 
index c7672afdb6582ecd659f9ae18a996ee81628ca3b..94c223b74c0bf3eab8371489dcfdf8b1c3daab64 100644 (file)
@@ -412,14 +412,14 @@ process_hello (void *cls,
   GNUNET_assert (message != NULL);
   p->hello = GNUNET_copy_message (message);
   if ((p == &p1) && (p2.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
   if ((p == &p2) && (p1.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
 
   if ((p == &p1) && (p2.hello != NULL))
-    GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL);
   if ((p == &p2) && (p1.hello != NULL))
-    GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL);
 }
 
 
@@ -442,7 +442,7 @@ setup_peer (struct PeerContext *p,
   GNUNET_assert (p->th != NULL);
   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
   GNUNET_assert (NULL != p->ats);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
   GNUNET_free (binary);
 }
 
index 05b1ae3d9ff3f321020910a103ea1c17422137b9..28d836e2e12588978ec77ec2c84a0b057b9b5b18 100644 (file)
@@ -547,14 +547,14 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
   p->hello = GNUNET_malloc (ntohs (message->size));
   memcpy (p->hello, message, ntohs (message->size));
   if ((p == &p1) && (p2.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p2.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p2.cfg, message, NULL, NULL);
   if ((p == &p2) && (p1.th != NULL))
-    GNUNET_TRANSPORT_offer_hello (p1.th, message, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p1.cfg, message, NULL, NULL);
 
   if ((p == &p1) && (p2.hello != NULL))
-    GNUNET_TRANSPORT_offer_hello (p1.th, p2.hello, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p1.cfg, p2.hello, NULL, NULL);
   if ((p == &p2) && (p1.hello != NULL))
-    GNUNET_TRANSPORT_offer_hello (p2.th, p1.hello, NULL, NULL);
+    GNUNET_TRANSPORT_offer_hello (p2.cfg, p1.hello, NULL, NULL);
 }
 
 
@@ -579,7 +579,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
   GNUNET_assert (p->th != NULL);
   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
   GNUNET_assert (NULL != p->ats);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
   GNUNET_free (binary);
 }
 
index abdd77548be2ac6b4b884f2afb036cfed238ab23..e3b9d59a4b0e029bd352118d0090c74e6bf4436c 100644 (file)
@@ -66,11 +66,6 @@ struct GNUNET_SERVER_Handle *GDS_server;
  */
 struct GNUNET_MessageHeader *GDS_my_hello;
 
-/**
- * Handle to the transport service, for getting our hello
- */
-struct GNUNET_TRANSPORT_Handle *GDS_transport_handle;
-
 /**
  * Handle to get our current HELLO.
  */
@@ -112,11 +107,6 @@ shutdown_task (void *cls)
     GNUNET_TRANSPORT_get_hello_cancel (ghh);
     ghh = NULL;
   }
-  if (GDS_transport_handle != NULL)
-  {
-    GNUNET_TRANSPORT_disconnect (GDS_transport_handle);
-    GDS_transport_handle = NULL;
-  }
   GDS_NEIGHBOURS_done ();
   GDS_DATACACHE_done ();
   GDS_ROUTING_done ();
@@ -170,15 +160,9 @@ run (void *cls,
   }
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 NULL);
-  GDS_transport_handle =
-      GNUNET_TRANSPORT_connect (GDS_cfg, NULL, NULL, NULL, NULL, NULL);
-  if (GDS_transport_handle == NULL)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _("Failed to connect to transport service!\n"));
-    return;
-  }
-  ghh = GNUNET_TRANSPORT_get_hello (GDS_transport_handle, &process_hello, NULL);
+  ghh = GNUNET_TRANSPORT_get_hello (GDS_cfg,
+                                    &process_hello,
+                                    NULL);
 }
 
 
index 6f641cb9606844422602a4c5560b80efad082693..4684c2324c31467f803fb9e47ec1e2d2643b6981 100644 (file)
@@ -57,9 +57,5 @@ extern struct GNUNET_SERVER_Handle *GDS_server;
  */
 extern struct GNUNET_MessageHeader *GDS_my_hello;
 
-/**
- * Handle to the transport service, for getting our hello
- */
-extern struct GNUNET_TRANSPORT_Handle *GDS_transport_handle;
 
 #endif
index 4add3c4aee0ea02d4b8426a903ad89ceb57432ca..b24a95ab241558eb0fa25bd27dfecbcaddda6c6a 100644 (file)
@@ -592,13 +592,11 @@ try_connect (const struct GNUNET_PeerIdentity *pid,
                                                       ci,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   }
-  if ( (NULL != GDS_transport_handle) &&
-       (NULL != ci->oh) &&
+  if ( (NULL != ci->oh) &&
        (NULL != h) )
     GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
-  if ( (NULL != GDS_transport_handle) &&
-       (NULL != h) )
-    ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_transport_handle,
+  if (NULL != h)
+    ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
                                            h,
                                            &offer_hello_done,
                                            ci);
index df0cabe1d348294a2f0eb800d14955e956e3b446..c8c74a9badc074971d949cdbe17f653f58e69d10 100644 (file)
@@ -142,6 +142,14 @@ struct Hostlist
 };
 
 
+struct HelloOffer
+{
+  struct HelloOffer *next;
+  struct HelloOffer *prev;
+  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
+};
+
+
 /**
  * Our configuration.
  */
@@ -152,11 +160,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
  */
 static struct GNUNET_STATISTICS_Handle *stats;
 
-/**
- * Transport handle.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
 /**
  * Proxy hostname or ip we are using (can be NULL).
  */
@@ -312,6 +315,25 @@ static unsigned int stat_hellos_obtained;
  */
 static unsigned int stat_connection_count;
 
+static struct HelloOffer *ho_head;
+
+static struct HelloOffer *ho_tail;
+
+
+/**
+ * Hello offer complete. Clean up.
+ */
+static void
+done_offer_hello (void *cls)
+{
+  struct HelloOffer *ho = cls;
+
+  GNUNET_CONTAINER_DLL_remove (ho_head,
+                               ho_tail,
+                               ho);
+  GNUNET_free (ho);
+}
+
 
 /**
  * Process downloaded bits by calling callback on each HELLO.
@@ -331,6 +353,7 @@ callback_download (void *ptr,
   static char download_buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
   const char *cbuf = ptr;
   const struct GNUNET_MessageHeader *msg;
+  struct HelloOffer *ho;
   size_t total;
   size_t cpy;
   size_t left;
@@ -390,7 +413,22 @@ callback_download (void *ptr,
                                 ("# valid HELLOs downloaded from hostlist servers"),
                                 1, GNUNET_NO);
       stat_hellos_obtained++;
-      GNUNET_TRANSPORT_offer_hello (transport, msg, NULL, NULL);
+
+      ho = GNUNET_new (struct HelloOffer);
+      ho->ohh = GNUNET_TRANSPORT_offer_hello (cfg,
+                                              msg,
+                                              &done_offer_hello,
+                                              ho);
+      if (NULL == ho->ohh)
+      {
+        GNUNET_free (ho);
+      }
+      else
+      {
+        GNUNET_CONTAINER_DLL_insert (ho_head,
+                                     ho_tail,
+                                     ho);
+      }
     }
     else
     {
@@ -405,7 +443,9 @@ callback_download (void *ptr,
       stat_hellos_obtained++;
       return total;
     }
-    memmove (download_buffer, &download_buffer[msize], download_pos - msize);
+    memmove (download_buffer,
+             &download_buffer[msize],
+             download_pos - msize);
     download_pos -= msize;
   }
   return total;
@@ -1532,13 +1572,6 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c,
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  transport = GNUNET_TRANSPORT_connect (c, NULL, NULL, NULL, NULL, NULL);
-  if (NULL == transport)
-  {
-    GNUNET_break (0);
-    curl_global_cleanup ();
-    return GNUNET_SYSERR;
-  }
   cfg = c;
   stats = st;
 
@@ -1687,8 +1720,18 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c,
 void
 GNUNET_HOSTLIST_client_stop ()
 {
+  struct HelloOffer *ho;
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Hostlist client shutdown\n");
+  while (NULL != (ho = ho_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (ho_head,
+                                 ho_tail,
+                                 ho);
+    GNUNET_TRANSPORT_offer_hello_cancel (ho->ohh);
+    GNUNET_free (ho);
+  }
   if (NULL != sget)
   {
     GNUNET_STATISTICS_get_cancel (sget);
@@ -1725,11 +1768,6 @@ GNUNET_HOSTLIST_client_stop ()
     ti_check_download = NULL;
     curl_global_cleanup ();
   }
-  if (NULL != transport)
-  {
-    GNUNET_TRANSPORT_disconnect (transport);
-    transport = NULL;
-  }
   GNUNET_free_non_null (proxy);
   proxy = NULL;
   GNUNET_free_non_null (proxy_username);
index 5f8ece9b8d5cbc735915a6e67295e160adcead59..6a5850c4d7796979cf1d22b573f0b6ba7bd14333 100644 (file)
@@ -147,7 +147,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
   p->th =
       GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, &notify_connect, NULL);
   GNUNET_assert (p->th != NULL);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
   GNUNET_free (binary);
 }
 
index 3dad137a255411f935f476cfa0c3c80ddc4b7ee8..30f26717f7b98758122531339fd65a29c5bbbc91 100644 (file)
@@ -116,7 +116,7 @@ setup_peer (struct PeerContext *p, const char *cfgname)
   p->th =
       GNUNET_TRANSPORT_connect (p->cfg, NULL, p, NULL, &notify_connect, NULL);
   GNUNET_assert (p->th != NULL);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th, &process_hello, p);
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg, &process_hello, p);
   GNUNET_free (binary);
 }
 
index 816d5efaa21ccb9ca637826976b3f2446f01322d..6dada4f54effe4aa3788fa6f6431f16374b02f50 100644 (file)
@@ -49,50 +49,42 @@ extern "C"
 #define GNUNET_TRANSPORT_CORE_VERSION 0x00000000
 
 
-/**
- * Function called by the transport for each received message.
- *
- * @param cls closure
- * @param peer (claimed) identity of the other peer
- * @param message the message
- */
-typedef void
-(*GNUNET_TRANSPORT_ReceiveCallback) (void *cls,
-                                     const struct GNUNET_PeerIdentity *peer,
-                                     const struct GNUNET_MessageHeader *message);
-
-
 /**
  * Opaque handle to the service.
  */
-struct GNUNET_TRANSPORT_Handle;
+struct GNUNET_TRANSPORT_CoreHandle;
 
 
 /**
- * Function called to notify CORE service that another
- * @a peer connected to us.
+ * Function called to notify transport users that another
+ * peer connected to us.
  *
  * @param cls closure
- * @param peer the peer that connected, never NULL
- * @param mq message queue for sending messages to this peer
+ * @param peer the peer that connected
+ * @param mq message queue to use to transmit to @a peer
+ * @return closure to use in MQ handlers
  */
-typedef void
-(*GNUNET_TRANSPORT_NotifyConnect) (void *cls,
+typedef void *
+(*GNUNET_TRANSPORT_NotifyConnecT) (void *cls,
                                    const struct GNUNET_PeerIdentity *peer,
                                    struct GNUNET_MQ_Handle *mq);
 
 
 /**
- * Function called to notify CORE service that another
- * @a peer disconnected from us.  The associated message
- * queue must not be used henceforth.
+ * Function called to notify transport users that another peer
+ * disconnected from us.  The message queue that was given to the
+ * connect notification will be destroyed and must not be used
+ * henceforth.
  *
- * @param cls closure
- * @param peer the peer that disconnected, never NULL
+ * @param cls closure from #GNUNET_TRANSPORT_connecT
+ * @param peer the peer that disconnected
+ * @param handlers_cls closure of the handlers, was returned from the
+ *                    connect notification callback
  */
 typedef void
-(*GNUNET_TRANSPORT_NotifyDisconnect) (void *cls,
-                                      const struct GNUNET_PeerIdentity *peer);
+(*GNUNET_TRANSPORT_NotifyDisconnecT) (void *cls,
+                                      const struct GNUNET_PeerIdentity *peer,
+                                      void *handler_cls);
 
 
 /**
@@ -108,34 +100,41 @@ typedef void
  *
  * @param cls the closure
  * @param neighbour peer that we have excess bandwidth to
+ * @param handlers_cls closure of the handlers, was returned from the
+ *                    connect notification callback
  */
 typedef void
-(*GNUNET_TRANSPORT_NotifyExcessBandwidth)(void *cls,
-                                          const struct GNUNET_PeerIdentity *neighbour);
+(*GNUNET_TRANSPORT_NotifyExcessBandwidtH)(void *cls,
+                                          const struct GNUNET_PeerIdentity *neighbour,
+                                          void *handlers_cls);
+
 
 
 /**
- * Connect to the transport service.
+ * Connect to the transport service.  Note that the connection may
+ * complete (or fail) asynchronously.
  *
  * @param cfg configuration to use
- * @param self our own identity (if API should check that it matches
+ * @param self our own identity (API should check that it matches
  *             the identity found by transport), or NULL (no check)
- * @param cls closure for the callbacks
- * @param rec_handlers NULL-terminated array of handlers for incoming
- *                     messages, or NULL
+ * @param handlers array of message handlers; note that the
+ *                 closures provided will be ignored and replaced
+ *                 with the respective return value from @a nc
+ * @param handlers array with handlers to call when we receive messages, or NULL
+ * @param cls closure for the @a nc, @a nd and @a neb callbacks
  * @param nc function to call on connect events, or NULL
  * @param nd function to call on disconnect events, or NULL
- * @param neb function to call if we have excess bandwidth to a peer
+ * @param neb function to call if we have excess bandwidth to a peer, or NULL
  * @return NULL on error
  */
-struct GNUNET_TRANSPORT_Handle *
+struct GNUNET_TRANSPORT_CoreHandle *
 GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_PeerIdentity *self,
+                               const struct GNUNET_MQ_MessageHandler *handlers,
                                void *cls,
-                               GNUNET_MQ_MessageHandler *rec_handlers,
-                               GNUNET_TRANSPORT_NotifyConnect nc,
-                               GNUNET_TRANSPORT_NotifyDisconnect nd,
-                               GNUNET_TRANSPORT_NotifyExcessBandwidth neb);
+                               GNUNET_TRANSPORT_NotifyConnecT nc,
+                               GNUNET_TRANSPORT_NotifyDisconnecT nd,
+                               GNUNET_TRANSPORT_NotifyExcessBandwidtH neb);
 
 
 /**
@@ -144,22 +143,19 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  * @param handle handle returned from connect
  */
 void
-GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle);
+GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle);
 
 
 /**
- * Checks if a given peer is connected to us. Convenience
- * API in case a client does not track connect/disconnect
- * events internally.
+ * Checks if a given peer is connected to us and get the message queue.
  *
  * @param handle connection to transport service
  * @param peer the peer to check
- * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
+ * @return NULL if disconnected, otherwise message queue for @a peer
  */
-int
-GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
-                                       const struct GNUNET_PeerIdentity *peer);
-
+struct GNUNET_MQ_Handle *
+GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
+                              const struct GNUNET_PeerIdentity *peer);
 
 
 #if 0                           /* keep Emacsens' auto-indent happy */
index 96182424ed66ce59d271baefb0ae26f16f2c527d..b40763b9272ab89bd8aac61731c2ff59e920e700 100644 (file)
@@ -65,12 +65,6 @@ typedef void
                                      const struct GNUNET_MessageHeader *message);
 
 
-/**
- * Opaque handle to the service.
- */
-struct GNUNET_TRANSPORT_Handle;
-
-
 /**
  * Function called to notify transport users that another
  * peer connected to us.
@@ -82,6 +76,7 @@ typedef void
 (*GNUNET_TRANSPORT_NotifyConnect) (void *cls,
                                    const struct GNUNET_PeerIdentity *peer);
 
+
 /**
  * Function called to notify transport users that another
  * peer disconnected from us.
@@ -288,13 +283,13 @@ struct GNUNET_TRANSPORT_GetHelloHandle;
  * Obtain updates on changes to the HELLO message for this peer. The callback
  * given in this function is never called synchronously.
  *
- * @param handle connection to transport service
+ * @param cfg configuration
  * @param rec function to call with the HELLO
  * @param rec_cls closure for @a rec
  * @return handle to cancel the operation
  */
 struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
                             void *rec_cls);
 
@@ -319,7 +314,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle;
  * the transport service may just ignore this message if the HELLO is
  * malformed or useless due to our local configuration.
  *
- * @param handle connection to transport service
+ * @param cfg configuration
  * @param hello the hello message
  * @param cont continuation to call when HELLO has been sent,
  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
@@ -330,7 +325,7 @@ struct GNUNET_TRANSPORT_OfferHelloHandle;
  *
  */
 struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
                               const struct GNUNET_MessageHeader *hello,
                               GNUNET_SCHEDULER_TaskCallback cont,
                               void *cont_cls);
index b6aa224fd3ac54e0a0bfc3824886d9da8b0b0cbd..14f1e46041468046257c34cec33db53bba648541 100644 (file)
@@ -182,11 +182,6 @@ static struct GNUNET_SCHEDULER_Task * tt;
  */
 static struct GNUNET_TRANSPORT_GetHelloHandle *gh;
 
-/**
- * Connection to transport service.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
 /**
  * Current iterator context (if active, otherwise NULL).
  */
@@ -641,11 +636,6 @@ shutdown_task (void *cls)
     GNUNET_TRANSPORT_get_hello_cancel (gh);
     gh = NULL;
   }
-  if (NULL != transport)
-  {
-    GNUNET_TRANSPORT_disconnect (transport);
-    transport = NULL;
-  }
   while (NULL != (pc = pc_head))
   {
     GNUNET_CONTAINER_DLL_remove (pc_head,
@@ -702,8 +692,6 @@ hello_callback (void *cls,
                                       &my_peer_identity));
   GNUNET_TRANSPORT_get_hello_cancel (gh);
   gh = NULL;
-  GNUNET_TRANSPORT_disconnect (transport);
-  transport = NULL;
   if (NULL != dump_hello)
     dump_my_hello ();
   tt = GNUNET_SCHEDULER_add_now (&state_machine, NULL);
@@ -740,11 +728,7 @@ testservice_task (void *cls,
        (GNUNET_YES == get_uri) ||
        (NULL != dump_hello) )
   {
-    transport = GNUNET_TRANSPORT_connect (cfg,
-                                          NULL,
-                                          NULL,
-                                          NULL, NULL, NULL);
-    gh = GNUNET_TRANSPORT_get_hello (transport,
+    gh = GNUNET_TRANSPORT_get_hello (cfg,
                                      &hello_callback,
                                      NULL);
   }
index 0fa2a64567500eea83f96cfcc62f9d4f555bc9dc..47b6fab0801af6662b39c317da961d91909b864a 100644 (file)
@@ -463,7 +463,8 @@ connection_ready (void *cls)
           entry->handle_core,
           entry->handle_transport,
           entry->handle_ats_connectivity,
-          entry->peer_identity);
+          entry->peer_identity,
+          entry->cfg);
 }
 
 
index 58942184009adf37d8639da80fe43f48569a2603..54b37f6d55cf47d9c16cef65857e62327462d132 100644 (file)
@@ -85,13 +85,15 @@ GST_connection_pool_destroy (void);
  * @param ac the handle to ATS, can be NULL if it is not requested
  * @param peer_id the identity of the peer. Will be NULL if ch is NULL. In other
  *          cases, its value being NULL means that CORE connection has failed.
+ * @param cfg configuration of the peer
  */
 typedef void
 (*GST_connection_pool_connection_ready_cb) (void *cls,
                                             struct GNUNET_CORE_Handle *ch,
                                             struct GNUNET_TRANSPORT_Handle *th,
                                             struct GNUNET_ATS_ConnectivityHandle *ac,
-                                            const struct GNUNET_PeerIdentity *peer_id);
+                                            const struct GNUNET_PeerIdentity *peer_id,
+                                            const struct GNUNET_CONFIGURATION_Handle *cfg);
 
 
 /**
index de462da7a52d45945483ab9550830d3695ce2002..8902a359c89ee6378dc3c8201c9223123af1cb90 100644 (file)
@@ -48,6 +48,11 @@ struct ConnectivitySuggestContext
    */
   struct GNUNET_TRANSPORT_Handle *th_;
 
+  /**
+   * Configuration of the peer from cache. Do not free!
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
   /**
    * The GetCacheHandle for the peer2's transport handle
    * (used to offer the HELLO to the peer).
@@ -699,13 +704,15 @@ overlay_connect_notify (void *cls,
  * @param th the handle to TRANSPORT. Can be NULL if it is not requested
  * @param ac the handle to ATS. Can be NULL if it is not requested
  * @param my_identity the identity of our peer
+ * @param cfg configuration of the peer
  */
 static void
 occ_cache_get_handle_ats_occ_cb (void *cls,
                                  struct GNUNET_CORE_Handle *ch,
                                  struct GNUNET_TRANSPORT_Handle *th,
                                  struct GNUNET_ATS_ConnectivityHandle *ac,
-                                 const struct GNUNET_PeerIdentity *my_identity)
+                                 const struct GNUNET_PeerIdentity *my_identity,
+                                 const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct OverlayConnectContext *occ = cls;
   struct LocalPeer2Context *lp2c;
@@ -754,7 +761,8 @@ occ_cache_get_handle_ats_rocc_cb (void *cls,
                                   struct GNUNET_CORE_Handle *ch,
                                   struct GNUNET_TRANSPORT_Handle *th,
                                   struct GNUNET_ATS_ConnectivityHandle *ac,
-                                  const struct GNUNET_PeerIdentity *my_identity)
+                                  const struct GNUNET_PeerIdentity *my_identity,
+                                  const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct RemoteOverlayConnectCtx *rocc = cls;
 
@@ -896,7 +904,7 @@ send_hello (void *cls)
              other_peer_str);
   GNUNET_free (other_peer_str);
   lp2c->ohh =
-      GNUNET_TRANSPORT_offer_hello (lp2c->tcc.th_,
+      GNUNET_TRANSPORT_offer_hello (lp2c->tcc.cfg,
                                     occ->hello,
                                     occ_hello_sent_cb,
                                     occ);
@@ -922,13 +930,15 @@ send_hello (void *cls)
  * @param th the handle to TRANSPORT. Can be NULL if it is not requested
  * @param ac the handle to ATS. Can be NULL if it is not requested
  * @param ignore_ peer identity which is ignored in this callback
- */
+ * @param cfg configuration of the peer
+*/
 static void
 p2_transport_connect_cache_callback (void *cls,
                                      struct GNUNET_CORE_Handle *ch,
                                      struct GNUNET_TRANSPORT_Handle *th,
                                      struct GNUNET_ATS_ConnectivityHandle *ac,
-                                     const struct GNUNET_PeerIdentity *ignore_)
+                                     const struct GNUNET_PeerIdentity *ignore_,
+                                     const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct OverlayConnectContext *occ = cls;
 
@@ -945,6 +955,7 @@ p2_transport_connect_cache_callback (void *cls,
     return;
   }
   occ->p2ctx.local.tcc.th_ = th;
+  occ->p2ctx.local.tcc.cfg = cfg;
   GNUNET_asprintf (&occ->emsg,
                    "0x%llx: Timeout while offering HELLO to %s",
                    occ->op_id,
@@ -1068,7 +1079,8 @@ p1_transport_connect_cache_callback (void *cls,
                                      struct GNUNET_CORE_Handle *ch,
                                      struct GNUNET_TRANSPORT_Handle *th,
                                      struct GNUNET_ATS_ConnectivityHandle *ac,
-                                     const struct GNUNET_PeerIdentity *ignore_)
+                                     const struct GNUNET_PeerIdentity *ignore_,
+                                     const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct OverlayConnectContext *occ = cls;
 
@@ -1092,7 +1104,7 @@ p1_transport_connect_cache_callback (void *cls,
                    "0x%llx: Timeout while acquiring HELLO of peer %s",
                    occ->op_id,
                    GNUNET_i2s (&occ->peer_identity));
-  occ->ghh = GNUNET_TRANSPORT_get_hello (occ->p1th_,
+  occ->ghh = GNUNET_TRANSPORT_get_hello (cfg,
                                          &hello_update_cb,
                                          occ);
 }
@@ -1112,7 +1124,8 @@ occ_cache_get_handle_core_cb (void *cls,
                               struct GNUNET_CORE_Handle *ch,
                               struct GNUNET_TRANSPORT_Handle *th,
                               struct GNUNET_ATS_ConnectivityHandle *ac,
-                              const struct GNUNET_PeerIdentity *my_identity)
+                              const struct GNUNET_PeerIdentity *my_identity,
+                              const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct OverlayConnectContext *occ = cls;
   const struct GNUNET_MessageHeader *hello;
@@ -1743,7 +1756,7 @@ attempt_connect_task (void *cls)
              GNUNET_i2s (&rocc->a_id),
              rocc->peer->id);
   rocc->ohh =
-      GNUNET_TRANSPORT_offer_hello (rocc->tcc.th_,
+      GNUNET_TRANSPORT_offer_hello (rocc->tcc.cfg,
                                     rocc->hello,
                                     &rocc_hello_sent_cb,
                                     rocc);
@@ -1772,7 +1785,8 @@ rocc_cache_get_handle_transport_cb (void *cls,
                                     struct GNUNET_CORE_Handle *ch,
                                     struct GNUNET_TRANSPORT_Handle *th,
                                     struct GNUNET_ATS_ConnectivityHandle *ac,
-                                    const struct GNUNET_PeerIdentity *ignore_)
+                                    const struct GNUNET_PeerIdentity *ignore_,
+                                    const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct RemoteOverlayConnectCtx *rocc = cls;
 
@@ -1783,6 +1797,7 @@ rocc_cache_get_handle_transport_cb (void *cls,
     return;
   }
   rocc->tcc.th_ = th;
+  rocc->tcc.cfg = cfg;
   if (GNUNET_YES ==
       GNUNET_TRANSPORT_check_peer_connected (rocc->tcc.th_,
                                              &rocc->a_id))
index eddac8c8ad01e35fcb443cb038f2550632160ef2..9baaf513d58669957c7307b0303d27223be9f581 100644 (file)
@@ -141,11 +141,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
  */
 static struct GNUNET_CORE_Handle *handle;
 
-/**
- * Handle to the TRANSPORT service.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
 /**
  * Handle to the ATS service.
  */
@@ -179,6 +174,11 @@ static struct GNUNET_TRANSPORT_Blacklist *blacklist;
  */
 static struct GNUNET_SCHEDULER_Task *add_task;
 
+/**
+ * Active HELLO offering to transport service.
+ */
+static struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
+
 /**
  * Flag to disallow non-friend connections (pure F2F mode).
  */
@@ -1007,6 +1007,16 @@ read_friends_file (const struct GNUNET_CONFIGURATION_Handle *cfg)
 }
 
 
+/**
+ * Hello offer complete. Clean up.
+ */
+static void
+done_offer_hello (void *cls)
+{
+  oh = NULL;
+}
+
+
 /**
  * This function is called whenever an encrypted HELLO message is
  * received.
@@ -1055,11 +1065,12 @@ handle_encrypted_hello (void *cls,
         (friend_count < minimum_friend_count))
       return GNUNET_OK;
   }
-  if (NULL != transport)
-    GNUNET_TRANSPORT_offer_hello (transport,
-                                  message,
-                                  NULL,
-                                  NULL);
+  if (NULL != oh)
+    GNUNET_TRANSPORT_offer_hello_cancel (oh);
+  oh = GNUNET_TRANSPORT_offer_hello (cfg,
+                                     message,
+                                     &done_offer_hello,
+                                     NULL);
   return GNUNET_OK;
 }
 
@@ -1136,11 +1147,6 @@ cleaning_task (void *cls)
     GNUNET_PEERINFO_notify_cancel (peerinfo_notify);
     peerinfo_notify = NULL;
   }
-  if (NULL != transport)
-  {
-    GNUNET_TRANSPORT_disconnect (transport);
-    transport = NULL;
-  }
   if (NULL != handle)
   {
     GNUNET_CORE_disconnect (handle);
@@ -1152,6 +1158,11 @@ cleaning_task (void *cls)
     GNUNET_SCHEDULER_cancel (add_task);
     add_task = NULL;
   }
+  if (NULL != oh)
+  {
+    GNUNET_TRANSPORT_offer_hello_cancel (oh);
+    oh = NULL;
+  }
   GNUNET_CONTAINER_multipeermap_iterate (peers,
                                          &free_peer,
                                          NULL);
@@ -1223,12 +1234,6 @@ run (void *cls,
                                             &blacklist_check,
                                             NULL);
   ats = GNUNET_ATS_connectivity_init (cfg);
-  transport = GNUNET_TRANSPORT_connect (cfg,
-                                        NULL,
-                                        NULL,
-                                        NULL,
-                                        NULL,
-                                        NULL);
   handle =
       GNUNET_CORE_connect (cfg, NULL,
                            &core_init,
@@ -1239,14 +1244,6 @@ run (void *cls,
                            handlers);
   GNUNET_SCHEDULER_add_shutdown (&cleaning_task,
                                 NULL);
-  if (NULL == transport)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Failed to connect to `%s' service.\n"),
-                "transport");
-    GNUNET_SCHEDULER_shutdown ();
-    return;
-  }
   if (NULL == handle)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
index 3a1170c102bf0dbc21e5446a17ec16a33af85bf2..48793bd879eb300f6dc6b2eddf0ad235fd2e4418 100644 (file)
@@ -163,10 +163,12 @@ libgnunettransporttesting_la_LDFLAGS = \
 
 libgnunettransport_la_SOURCES = \
   transport_api.c transport.h \
-  transport_api_blacklist.c \
   transport_api_address_to_string.c \
+  transport_api_blacklist.c \
+  transport_api_get_hello.c \
   transport_api_monitor_peers.c \
-  transport_api_monitor_plugins.c
+  transport_api_monitor_plugins.c \
+  transport_api_offer_hello.c
 
 libgnunettransport_la_LIBADD = \
   $(top_builddir)/src/hello/libgnunethello.la \
index 4a514ea724cbfe404b15da11bd8524c1824641b7..4a3bf3c3e78fd31797068687af6bc45e1f27bbac 100644 (file)
@@ -246,7 +246,7 @@ offer_hello (void *cls)
   if (NULL != cc->oh)
     GNUNET_TRANSPORT_offer_hello_cancel (cc->oh);
   cc->oh =
-    GNUNET_TRANSPORT_offer_hello (cc->p1->th,
+    GNUNET_TRANSPORT_offer_hello (cc->p1->cfg,
                                   (const struct GNUNET_MessageHeader *) cc->p2->hello,
                                   &hello_offered,
                                   cc);
@@ -380,7 +380,7 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_handle *tth
     GNUNET_TRANSPORT_TESTING_stop_peer (tth, p);
     return NULL;
   }
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th,
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
                                        &get_hello,
                                        p);
   GNUNET_assert (p->ghh != NULL);
@@ -465,7 +465,7 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct PeerContext *p,
                                     &notify_disconnect);
   GNUNET_assert (NULL != p->th);
   p->ats = GNUNET_ATS_connectivity_init (p->cfg);
-  p->ghh = GNUNET_TRANSPORT_get_hello (p->th,
+  p->ghh = GNUNET_TRANSPORT_get_hello (p->cfg,
                                        &get_hello,
                                        p);
   GNUNET_assert (NULL != p->ghh);
index 59f249686f19d8fd621e0d0df9170c38a79de4b4..e7db5493eaf3c29dc29e317ae12719e0616dd696 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009-2013 GNUnet e.V.
+     Copyright (C) 2009-2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -163,86 +163,6 @@ struct Neighbour
 };
 
 
-/**
- * Linked list of functions to call whenever our HELLO is updated.
- */
-struct GNUNET_TRANSPORT_GetHelloHandle
-{
-
-  /**
-   * This is a doubly linked list.
-   */
-  struct GNUNET_TRANSPORT_GetHelloHandle *next;
-
-  /**
-   * This is a doubly linked list.
-   */
-  struct GNUNET_TRANSPORT_GetHelloHandle *prev;
-
-  /**
-   * Transport handle.
-   */
-  struct GNUNET_TRANSPORT_Handle *handle;
-
-  /**
-   * Callback to call once we got our HELLO.
-   */
-  GNUNET_TRANSPORT_HelloUpdateCallback rec;
-
-  /**
-   * Task for calling the HelloUpdateCallback when we already have a HELLO
-   */
-  struct GNUNET_SCHEDULER_Task *notify_task;
-
-  /**
-   * Closure for @e rec.
-   */
-  void *rec_cls;
-
-};
-
-
-/**
- * Entry in linked list for all offer-HELLO requests.
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle
-{
-  /**
-   * For the DLL.
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
-
-  /**
-   * For the DLL.
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *next;
-
-  /**
-   * Transport service handle we use for transmission.
-   */
-  struct GNUNET_TRANSPORT_Handle *th;
-
-  /**
-   * Transmission handle for this request.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *tth;
-
-  /**
-   * Function to call once we are done.
-   */
-  GNUNET_SCHEDULER_TaskCallback cont;
-
-  /**
-   * Closure for @e cont
-   */
-  void *cls;
-
-  /**
-   * The HELLO message to be transmitted.
-   */
-  struct GNUNET_MessageHeader *msg;
-};
-
 
 /**
  * Handle for the transport service (includes all of the
@@ -276,16 +196,6 @@ struct GNUNET_TRANSPORT_Handle
    */
   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
 
-  /**
-   * Head of DLL of control messages.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *control_head;
-
-  /**
-   * Tail of DLL of control messages.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
-
   /**
    * The current HELLO message for this peer.  Updated
    * whenever transports change their addresses.
@@ -295,32 +205,7 @@ struct GNUNET_TRANSPORT_Handle
   /**
    * My client connection to the transport service.
    */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * Handle to our registration with the client for notification.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *cth;
-
-  /**
-   * Linked list of pending requests for our HELLO.
-   */
-  struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
-
-  /**
-   * Linked list of pending requests for our HELLO.
-   */
-  struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
-
-  /**
-   * Linked list of pending offer HELLO requests head
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
-
-  /**
-   * Linked list of pending offer HELLO requests tail
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * My configuration.
@@ -458,7 +343,8 @@ outbound_bw_tracker_update (void *cls)
        GNUNET_STRINGS_relative_time_to_string (delay,
                                                GNUNET_NO));
   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
-      n->hn, delay.rel_value_us);
+                                     n->hn,
+                                     delay.rel_value_us);
   schedule_transmission (n->h);
 }
 
@@ -558,268 +444,296 @@ neighbour_delete (void *cls,
 
 
 /**
- * Function we use for handling incoming messages.
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Error receiving from transport service, disconnecting temporarily.\n");
+  h->reconnecting = GNUNET_YES;
+  disconnect_and_schedule_reconnect (h);
+}
+
+
+/**
+ * Function we use for checking incoming HELLO messages.
  *
  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
- * @param msg message received, NULL on timeout or fatal error
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_hello (void *cls,
+             const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_PeerIdentity me;
+
+  if (GNUNET_OK !=
+      GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+                           &me))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
+       (unsigned int) ntohs (msg->size),
+       GNUNET_i2s (&me));
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming HELLO messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
  */
 static void
-demultiplexer (void *cls,
-               const struct GNUNET_MessageHeader *msg)
+handle_hello (void *cls,
+              const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+
+  GNUNET_free_non_null (h->my_hello);
+  h->my_hello = GNUNET_copy_message (msg);
+}
+
+
+/**
+ * Function we use for handling incoming connect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param cim message received
+ */
+static void
+handle_connect (void *cls,
+                const struct ConnectInfoMessage *cim)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct Neighbour *n;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving CONNECT message for `%s'.\n",
+       GNUNET_i2s (&cim->id));
+  n = neighbour_find (h, &cim->id);
+  if (NULL != n)
+  {
+    GNUNET_break (0);
+    h->reconnecting = GNUNET_YES;
+    disconnect_and_schedule_reconnect (h);
+    return;
+  }
+  n = neighbour_add (h,
+                     &cim->id);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving CONNECT message for `%s' with quota %u\n",
+       GNUNET_i2s (&cim->id),
+       ntohl (cim->quota_out.value__));
+  GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+                                         cim->quota_out);
+  if (NULL != h->nc_cb)
+    h->nc_cb (h->cls,
+              &n->id);
+}
+
+
+/**
+ * Function we use for handling incoming disconnect messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param dim message received
+ */
+static void
+handle_disconnect (void *cls,
+                   const struct DisconnectInfoMessage *dim)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct Neighbour *n;
+
+  GNUNET_break (ntohl (dim->reserved) == 0);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving DISCONNECT message for `%s'.\n",
+       GNUNET_i2s (&dim->peer));
+  n = neighbour_find (h, &dim->peer);
+  if (NULL == n)
+  {
+    GNUNET_break (0);
+    h->reconnecting = GNUNET_YES;
+    disconnect_and_schedule_reconnect (h);
+    return;
+  }
+  neighbour_delete (h,
+                    &dim->peer,
+                    n);
+}
+
+
+/**
+ * Function we use for handling incoming send-ok messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param okm message received
+ */
+static void
+handle_send_ok (void *cls,
+                const struct SendOkMessage *okm)
 {
   struct GNUNET_TRANSPORT_Handle *h = cls;
-  const struct DisconnectInfoMessage *dim;
-  const struct ConnectInfoMessage *cim;
-  const struct InboundMessage *im;
-  const struct GNUNET_MessageHeader *imm;
-  const struct SendOkMessage *okm;
-  const struct QuotaSetMessage *qm;
-  struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-  struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
   struct Neighbour *n;
-  struct GNUNET_PeerIdentity me;
-  uint16_t size;
   uint32_t bytes_msg;
   uint32_t bytes_physical;
 
-  GNUNET_assert (NULL != h->client);
-  if (GNUNET_YES == h->reconnecting)
+  bytes_msg = ntohl (okm->bytes_msg);
+  bytes_physical = ntohl (okm->bytes_physical);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving SEND_OK message, transmission to %s %s.\n",
+       GNUNET_i2s (&okm->peer),
+       ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
+  n = neighbour_find (h,
+                      &okm->peer);
+  if (NULL == n)
   {
+    /* We should never get a 'SEND_OK' for a peer that we are not
+       connected to */
+    GNUNET_break (0);
+    h->reconnecting = GNUNET_YES;
+    disconnect_and_schedule_reconnect (h);
     return;
   }
-  if (NULL == msg)
+  if (bytes_physical > bytes_msg)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Error receiving from transport service, disconnecting temporarily.\n");
+         "Overhead for %u byte message was %u\n",
+         bytes_msg,
+         bytes_physical - bytes_msg);
+    n->traffic_overhead += bytes_physical - bytes_msg;
+  }
+  GNUNET_break (GNUNET_NO == n->is_ready);
+  n->is_ready = GNUNET_YES;
+  if (NULL != n->unready_warn_task)
+  {
+    GNUNET_SCHEDULER_cancel (n->unready_warn_task);
+    n->unready_warn_task = NULL;
+  }
+  if ((NULL != n->th) && (NULL == n->hn))
+  {
+    GNUNET_assert (NULL != n->th->timeout_task);
+    GNUNET_SCHEDULER_cancel (n->th->timeout_task);
+    n->th->timeout_task = NULL;
+    /* we've been waiting for this (congestion, not quota,
+     * caused delayed transmission) */
+    n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
+                                          n,
+                                          0);
+  }
+  schedule_transmission (h);
+}
+
+
+/**
+ * Function we use for checking incoming "inbound" messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static int
+check_recv (void *cls,
+             const struct InboundMessage *im)
+{
+  const struct GNUNET_MessageHeader *imm;
+  uint16_t size;
+
+  size = ntohs (im->header.size);
+  if (size <
+      sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  imm = (const struct GNUNET_MessageHeader *) &im[1];
+  if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function we use for handling incoming messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param im message received
+ */
+static void
+handle_recv (void *cls,
+             const struct InboundMessage *im)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  const struct GNUNET_MessageHeader *imm
+    = (const struct GNUNET_MessageHeader *) &im[1];
+  struct Neighbour *n;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received message of type %u with %u bytes from `%s'.\n",
+       (unsigned int) ntohs (imm->type),
+       (unsigned int) ntohs (imm->size),
+       GNUNET_i2s (&im->peer));
+  n = neighbour_find (h, &im->peer);
+  if (NULL == n)
+  {
+    GNUNET_break (0);
     h->reconnecting = GNUNET_YES;
     disconnect_and_schedule_reconnect (h);
     return;
   }
-  GNUNET_CLIENT_receive (h->client,
-                         &demultiplexer,
-                         h,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  size = ntohs (msg->size);
-  switch (ntohs (msg->type))
-  {
-  case GNUNET_MESSAGE_TYPE_HELLO:
-    if (GNUNET_OK !=
-        GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
-                             &me))
-    {
-      GNUNET_break (0);
-      break;
-    }
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
-         (unsigned int) size,
-         GNUNET_i2s (&me));
-    GNUNET_free_non_null (h->my_hello);
-    h->my_hello = NULL;
-    if (size < sizeof (struct GNUNET_MessageHeader))
-    {
-      GNUNET_break (0);
-      break;
-    }
-    h->my_hello = GNUNET_copy_message (msg);
-    hwl = h->hwl_head;
-    while (NULL != hwl)
-    {
-      next_hwl = hwl->next;
-      hwl->rec (hwl->rec_cls,
-                h->my_hello);
-      hwl = next_hwl;
-    }
-    break;
-  case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
-    if (size < sizeof (struct ConnectInfoMessage))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    cim = (const struct ConnectInfoMessage *) msg;
-    if (size !=
-        sizeof (struct ConnectInfoMessage))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving CONNECT message for `%s'.\n",
-         GNUNET_i2s (&cim->id));
-    n = neighbour_find (h, &cim->id);
-    if (NULL != n)
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    n = neighbour_add (h,
-                       &cim->id);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving CONNECT message for `%s' with quota %u\n",
-         GNUNET_i2s (&cim->id),
-         ntohl (cim->quota_out.value__));
-    GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
-                                           cim->quota_out);
-    if (NULL != h->nc_cb)
-      h->nc_cb (h->cls,
-                &n->id);
-    break;
-  case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
-    if (size != sizeof (struct DisconnectInfoMessage))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    dim = (const struct DisconnectInfoMessage *) msg;
-    GNUNET_break (ntohl (dim->reserved) == 0);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving DISCONNECT message for `%s'.\n",
-         GNUNET_i2s (&dim->peer));
-    n = neighbour_find (h, &dim->peer);
-    if (NULL == n)
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    neighbour_delete (h,
-                      &dim->peer,
-                      n);
-    break;
-  case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
-    if (size != sizeof (struct SendOkMessage))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    okm = (const struct SendOkMessage *) msg;
-    bytes_msg = ntohl (okm->bytes_msg);
-    bytes_physical = ntohl (okm->bytes_physical);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving SEND_OK message, transmission to %s %s.\n",
-         GNUNET_i2s (&okm->peer),
-         ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+  if (NULL != h->rec)
+    h->rec (h->cls,
+            &im->peer,
+            imm);
+}
 
-    n = neighbour_find (h,
-                        &okm->peer);
-    if (NULL == n)
-    {
-      /* We should never get a 'SEND_OK' for a peer that we are not
-         connected to */
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    if (bytes_physical > bytes_msg)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Overhead for %u byte message was %u\n",
-           bytes_msg,
-           bytes_physical - bytes_msg);
-      n->traffic_overhead += bytes_physical - bytes_msg;
-    }
-    GNUNET_break (GNUNET_NO == n->is_ready);
-    n->is_ready = GNUNET_YES;
-    if (NULL != n->unready_warn_task)
-    {
-      GNUNET_SCHEDULER_cancel (n->unready_warn_task);
-      n->unready_warn_task = NULL;
-    }
-    if ((NULL != n->th) && (NULL == n->hn))
-    {
-      GNUNET_assert (NULL != n->th->timeout_task);
-      GNUNET_SCHEDULER_cancel (n->th->timeout_task);
-      n->th->timeout_task = NULL;
-      /* we've been waiting for this (congestion, not quota,
-       * caused delayed transmission) */
-      n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
-                                            n,
-                                            0);
-    }
-    schedule_transmission (h);
-    break;
-  case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
-    if (size <
-        sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    im = (const struct InboundMessage *) msg;
-    imm = (const struct GNUNET_MessageHeader *) &im[1];
-    if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Received message of type %u with %u bytes from `%s'.\n",
-         (unsigned int) ntohs (imm->type),
-         (unsigned int) ntohs (imm->size),
-         GNUNET_i2s (&im->peer));
-    n = neighbour_find (h, &im->peer);
-    if (NULL == n)
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    if (NULL != h->rec)
-      h->rec (h->cls,
-              &im->peer,
-              imm);
-    break;
-  case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
-    if (size != sizeof (struct QuotaSetMessage))
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    qm = (const struct QuotaSetMessage *) msg;
-    n = neighbour_find (h, &qm->peer);
-    if (NULL == n)
-    {
-      GNUNET_break (0);
-      h->reconnecting = GNUNET_YES;
-      disconnect_and_schedule_reconnect (h);
-      break;
-    }
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Receiving SET_QUOTA message for `%s' with quota %u\n",
-         GNUNET_i2s (&qm->peer),
-         ntohl (qm->quota.value__));
-    GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
-                                           qm->quota);
-    break;
-  default:
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         _("Received unexpected message of type %u in %s:%u\n"),
-         ntohs (msg->type),
-         __FILE__,
-         __LINE__);
+
+/**
+ * Function we use for handling incoming set quota messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
+ */
+static void
+handle_set_quota (void *cls,
+                  const struct QuotaSetMessage *qm)
+{
+  struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct Neighbour *n;
+
+  n = neighbour_find (h, &qm->peer);
+  if (NULL == n)
+  {
     GNUNET_break (0);
-    break;
+    h->reconnecting = GNUNET_YES;
+    disconnect_and_schedule_reconnect (h);
+    return;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Receiving SET_QUOTA message for `%s' with quota %u\n",
+       GNUNET_i2s (&qm->peer),
+       ntohl (qm->quota.value__));
+  GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
+                                         qm->quota);
 }
 
 
@@ -854,104 +768,53 @@ timeout_request_due_to_congestion (void *cls)
 
 
 /**
- * Transmit message(s) to service.
+ * Transmit ready message(s) to service.
  *
- * @param cls handle to transport
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
+ * @param h handle to transport
  */
-static size_t
-transport_notify_ready (void *cls,
-                        size_t size,
-                        void *buf)
+static void
+transmit_ready (struct GNUNET_TRANSPORT_Handle *h)
 {
-  struct GNUNET_TRANSPORT_Handle *h = cls;
   struct GNUNET_TRANSPORT_TransmitHandle *th;
   struct GNUNET_TIME_Relative delay;
   struct Neighbour *n;
-  char *cbuf;
-  struct OutboundMessage obm;
-  size_t ret;
-  size_t nret;
+  struct OutboundMessage *obm;
+  struct GNUNET_MQ_Envelope *env;
   size_t mret;
 
-  GNUNET_assert (NULL != h->client);
-  h->cth = NULL;
-  if (NULL == buf)
-  {
-    /* transmission failed */
-    disconnect_and_schedule_reconnect (h);
-    return 0;
-  }
-
-  cbuf = buf;
-  ret = 0;
-  /* first send control messages */
-  while ( (NULL != (th = h->control_head)) &&
-          (th->notify_size <= size) )
-  {
-    GNUNET_CONTAINER_DLL_remove (h->control_head,
-                                 h->control_tail,
-                                 th);
-    nret = th->notify (th->notify_cls,
-                       size,
-                       &cbuf[ret]);
-    delay = GNUNET_TIME_absolute_get_duration (th->request_start);
-    if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
-      LOG (GNUNET_ERROR_TYPE_WARNING,
-           "Added %u bytes of control message at %u after %s delay\n",
-           nret,
-           ret,
-           GNUNET_STRINGS_relative_time_to_string (delay,
-                                                   GNUNET_YES));
-    else
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Added %u bytes of control message at %u after %s delay\n",
-           nret,
-           ret,
-           GNUNET_STRINGS_relative_time_to_string (delay,
-                                                   GNUNET_YES));
-    GNUNET_free (th);
-    ret += nret;
-    size -= nret;
-  }
-
-  /* then, if possible and no control messages pending, send data messages */
-  while ( (NULL == h->control_head) &&
-          (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
+  GNUNET_assert (NULL != h->mq);
+  while (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
   {
+    th = n->th;
     if (GNUNET_YES != n->is_ready)
     {
       /* peer not ready, wait for notification! */
       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
       n->hn = NULL;
       GNUNET_assert (NULL == n->th->timeout_task);
-      n->th->timeout_task
+      th->timeout_task
         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
-                                        (n->th->timeout),
+                                        (th->timeout),
                                         &timeout_request_due_to_congestion,
-                                        n->th);
+                                        th);
       continue;
     }
-    th = n->th;
-    if (th->notify_size + sizeof (struct OutboundMessage) > size)
-      break;                    /* does not fit */
-    if (GNUNET_BANDWIDTH_tracker_get_delay
-        (&n->out_tracker,
-         th->notify_size).rel_value_us > 0)
+    if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
+                                            th->notify_size).rel_value_us > 0)
       break;                    /* too early */
     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
     n->hn = NULL;
     n->th = NULL;
-    GNUNET_assert (size >= sizeof (struct OutboundMessage));
+    env = GNUNET_MQ_msg_extra (obm,
+                               th->notify_size,
+                               GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
     mret = th->notify (th->notify_cls,
-                       size - sizeof (struct OutboundMessage),
-                       &cbuf[ret + sizeof (struct OutboundMessage)]);
-    GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
+                       th->notify_size,
+                       &obm[1]);
     if (0 == mret)
     {
       GNUNET_free (th);
+      GNUNET_MQ_discard (env);
       continue;
     }
     if (NULL != n->unready_warn_task)
@@ -961,20 +824,13 @@ transport_notify_ready (void *cls,
                                         n);
     n->last_payload = GNUNET_TIME_absolute_get ();
     n->is_ready = GNUNET_NO;
-    GNUNET_assert (mret + sizeof (struct OutboundMessage) <
-                   GNUNET_SERVER_MAX_MESSAGE_SIZE);
-    obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
-    obm.header.size = htons (mret + sizeof (struct OutboundMessage));
-    obm.reserved = htonl (0);
-    obm.timeout =
+    obm->reserved = htonl (0);
+    obm->timeout =
       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
                                  (th->timeout));
-    obm.peer = n->id;
-    memcpy (&cbuf[ret],
-            &obm,
-            sizeof (struct OutboundMessage));
-    ret += (mret + sizeof (struct OutboundMessage));
-    size -= (mret + sizeof (struct OutboundMessage));
+    obm->peer = n->id;
+    GNUNET_MQ_send (h->mq,
+                    env);
     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
                                       mret);
     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
@@ -995,14 +851,9 @@ transport_notify_ready (void *cls,
                                                    GNUNET_YES),
            (unsigned int) n->out_tracker.available_bytes_per_s__);
     GNUNET_free (th);
-    break;
   }
   /* if there are more pending messages, try to schedule those */
   schedule_transmission (h);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting %u bytes to transport service\n",
-       ret);
-  return ret;
 }
 
 
@@ -1016,12 +867,11 @@ static void
 schedule_transmission_task (void *cls)
 {
   struct GNUNET_TRANSPORT_Handle *h = cls;
-  size_t size;
   struct GNUNET_TRANSPORT_TransmitHandle *th;
   struct Neighbour *n;
 
   h->quota_task = NULL;
-  GNUNET_assert (NULL != h->client);
+  GNUNET_assert (NULL != h->mq);
   /* destroy all requests that have timed out */
   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
@@ -1040,29 +890,12 @@ schedule_transmission_task (void *cls)
                                     NULL));
     GNUNET_free (th);
   }
-  if (NULL != h->cth)
-    return;
-  if (NULL != h->control_head)
-  {
-    size = h->control_head->notify_size;
-  }
-  else
-  {
-    n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
-    if (NULL == n)
-      return;                   /* no pending messages */
-    size = n->th->notify_size + sizeof (struct OutboundMessage);
-  }
+  n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
+  if (NULL == n)
+    return;                   /* no pending messages */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Calling notify_transmit_ready\n");
-  h->cth
-    = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                           size,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           GNUNET_NO,
-                                           &transport_notify_ready,
-                                           h);
-  GNUNET_assert (NULL != h->cth);
+  transmit_ready (h);
 }
 
 
@@ -1078,15 +911,13 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
   struct GNUNET_TIME_Relative delay;
   struct Neighbour *n;
 
-  GNUNET_assert (NULL != h->client);
+  GNUNET_assert (NULL != h->mq);
   if (NULL != h->quota_task)
   {
     GNUNET_SCHEDULER_cancel (h->quota_task);
     h->quota_task = NULL;
   }
-  if (NULL != h->control_head)
-    delay = GNUNET_TIME_UNIT_ZERO;
-  else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+  if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
   {
     delay =
         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
@@ -1110,83 +941,6 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
 }
 
 
-/**
- * Queue control request for transmission to the transport
- * service.
- *
- * @param h handle to the transport service
- * @param size number of bytes to be transmitted
- * @param notify function to call to get the content
- * @param notify_cls closure for @a notify
- * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
- */
-static struct GNUNET_TRANSPORT_TransmitHandle *
-schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
-                           size_t size,
-                           GNUNET_TRANSPORT_TransmitReadyNotify notify,
-                           void *notify_cls)
-{
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Control transmit of %u bytes requested\n",
-       size);
-  th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
-  th->notify = notify;
-  th->notify_cls = notify_cls;
-  th->notify_size = size;
-  th->request_start = GNUNET_TIME_absolute_get ();
-  GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
-                                    h->control_tail,
-                                    th);
-  schedule_transmission (h);
-  return th;
-}
-
-
-/**
- * Transmit START message to service.
- *
- * @param cls unused
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_start (void *cls,
-            size_t size,
-            void *buf)
-{
-  struct GNUNET_TRANSPORT_Handle *h = cls;
-  struct StartMessage s;
-  uint32_t options;
-
-  if (NULL == buf)
-  {
-    /* Can only be shutdown, just give up */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Shutdown while trying to transmit START request.\n");
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting START request.\n");
-  GNUNET_assert (size >= sizeof (struct StartMessage));
-  s.header.size = htons (sizeof (struct StartMessage));
-  s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
-  options = 0;
-  if (h->check_self)
-    options |= 1;
-  if (NULL != h->rec)
-    options |= 2;
-  s.options = htonl (options);
-  s.self = h->self;
-  memcpy (buf, &s, sizeof (struct StartMessage));
-  GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  return sizeof (struct StartMessage);
-}
-
-
 /**
  * Try again to connect to transport service.
  *
@@ -1195,20 +949,61 @@ send_start (void *cls,
 static void
 reconnect (void *cls)
 {
+  GNUNET_MQ_hd_var_size (hello,
+                         GNUNET_MESSAGE_TYPE_HELLO,
+                         struct GNUNET_MessageHeader);
+  GNUNET_MQ_hd_fixed_size (connect,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
+                           struct ConnectInfoMessage);
+  GNUNET_MQ_hd_fixed_size (disconnect,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
+                           struct DisconnectInfoMessage);
+  GNUNET_MQ_hd_fixed_size (send_ok,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
+                           struct SendOkMessage);
+  GNUNET_MQ_hd_var_size (recv,
+                         GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
+                         struct InboundMessage);
+  GNUNET_MQ_hd_fixed_size (set_quota,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
+                           struct QuotaSetMessage);
   struct GNUNET_TRANSPORT_Handle *h = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_hello_handler (h),
+    make_connect_handler (h),
+    make_disconnect_handler (h),
+    make_send_ok_handler (h),
+    make_recv_handler (h),
+    make_set_quota_handler (h),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *env;
+  struct StartMessage *s;
+  uint32_t options;
 
   h->reconnect_task = NULL;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Connecting to transport service.\n");
-  GNUNET_assert (NULL == h->client);
-  GNUNET_assert (NULL == h->control_head);
-  GNUNET_assert (NULL == h->control_tail);
+  GNUNET_assert (NULL == h->mq);
   h->reconnecting = GNUNET_NO;
-  h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
-
-  GNUNET_assert (NULL != h->client);
-  schedule_control_transmit (h, sizeof (struct StartMessage),
-                             &send_start, h);
+  h->mq = GNUNET_CLIENT_connecT (h->cfg,
+                                 "transport",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+    return;
+  env = GNUNET_MQ_msg (s,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+  options = 0;
+  if (h->check_self)
+    options |= 1;
+  if (NULL != h->rec)
+    options |= 2;
+  s->options = htonl (options);
+  s->self = h->self;
+  GNUNET_MQ_send (h->mq,
+                  env);
 }
 
 
@@ -1221,20 +1016,11 @@ reconnect (void *cls)
 static void
 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-
   GNUNET_assert (NULL == h->reconnect_task);
-  if (NULL != h->cth)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
-    h->cth = NULL;
-  }
-  if (NULL != h->client)
+  if (NULL != h->mq)
   {
-    GNUNET_CLIENT_disconnect (h->client);
-    h->client = NULL;
-/*    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "Client disconnect done \n");*/
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
   /* Forget about all neighbours that we used to be connected to */
   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
@@ -1245,16 +1031,6 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
     GNUNET_SCHEDULER_cancel (h->quota_task);
     h->quota_task = NULL;
   }
-  while ((NULL != (th = h->control_head)))
-  {
-    GNUNET_CONTAINER_DLL_remove (h->control_head,
-                                 h->control_tail,
-                                 th);
-    th->notify (th->notify_cls,
-                0,
-                NULL);
-    GNUNET_free (th);
-  }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Scheduling task to reconnect to transport service in %s.\n",
        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
@@ -1268,109 +1044,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
 
 
 /**
- * Cancel control request for transmission to the transport service.
- *
- * @param th handle to the transport service
- * @param tth transmit handle to cancel
- */
-static void
-cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
-                         struct GNUNET_TRANSPORT_TransmitHandle *tth)
-{
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Canceling transmit of contral transmission requested\n");
-  GNUNET_CONTAINER_DLL_remove (th->control_head,
-                               th->control_tail,
-                               tth);
-  GNUNET_free (tth);
-}
-
-
-/**
- * Send HELLO message to the service.
- *
- * @param cls the HELLO message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_hello (void *cls,
-            size_t size,
-            void *buf)
-{
-  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
-  struct GNUNET_MessageHeader *msg = ohh->msg;
-  uint16_t ssize;
-
-  if (NULL == buf)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Timeout while trying to transmit `%s' request.\n",
-         "HELLO");
-    if (NULL != ohh->cont)
-      ohh->cont (ohh->cls);
-    GNUNET_free (msg);
-    GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
-                                 ohh->th->oh_tail,
-                                 ohh);
-    GNUNET_free (ohh);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting `%s' request.\n",
-       "HELLO");
-  ssize = ntohs (msg->size);
-  GNUNET_assert (size >= ssize);
-  memcpy (buf,
-          msg,
-          ssize);
-  GNUNET_free (msg);
-  if (NULL != ohh->cont)
-    ohh->cont (ohh->cls);
-  GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
-                               ohh->th->oh_tail,
-                               ohh);
-  GNUNET_free (ohh);
-  return ssize;
-}
-
-
-/**
- * Send traffic metric message to the service.
- *
- * @param cls the message to send
- * @param size number of bytes available in @a buf
- * @param buf where to copy the message
- * @return number of bytes copied to @a buf
- */
-static size_t
-send_metric (void *cls,
-             size_t size,
-             void *buf)
-{
-  struct TrafficMetricMessage *msg = cls;
-  uint16_t ssize;
-
-  if (NULL == buf)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
-    GNUNET_free (msg);
-    return 0;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting TRAFFIC_METRIC request.\n");
-  ssize = ntohs (msg->header.size);
-  GNUNET_assert (size >= ssize);
-  memcpy (buf, msg, ssize);
-  GNUNET_free (msg);
-  return ssize;
-}
-
-
-/**
- * Set transport metrics for a peer and a direction
+ * Set transport metrics for a peer and a direction.
  *
  * @param handle transport handle
  * @param peer the peer to set the metric for
@@ -1388,101 +1062,21 @@ GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
                                      struct GNUNET_TIME_Relative delay_in,
                                      struct GNUNET_TIME_Relative delay_out)
 {
+  struct GNUNET_MQ_Envelope *env;
   struct TrafficMetricMessage *msg;
 
-  msg = GNUNET_new (struct TrafficMetricMessage);
-  msg->header.size = htons (sizeof (struct TrafficMetricMessage));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
+  if (NULL == handle->mq)
+    return;
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
   msg->reserved = htonl (0);
   msg->peer = *peer;
   GNUNET_ATS_properties_hton (&msg->properties,
                               prop);
   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
-  schedule_control_transmit (handle,
-                             sizeof (struct TrafficMetricMessage),
-                             &send_metric,
-                             msg);
-}
-
-
-/**
- * Offer the transport service the HELLO of another peer.  Note that
- * the transport service may just ignore this message if the HELLO is
- * malformed or useless due to our local configuration.
- *
- * @param handle connection to transport service
- * @param hello the hello message
- * @param cont continuation to call when HELLO has been sent,
- *     tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
- *     tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
- * @param cont_cls closure for @a cont
- * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
- *      in case of failure @a cont will not be called
- *
- */
-struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
-                              const struct GNUNET_MessageHeader *hello,
-                              GNUNET_SCHEDULER_TaskCallback cont,
-                              void *cont_cls)
-{
-  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
-  struct GNUNET_MessageHeader *msg;
-  struct GNUNET_PeerIdentity peer;
-  uint16_t size;
-
-  if (NULL == handle->client)
-    return NULL;
-  GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
-  size = ntohs (hello->size);
-  GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
-  if (GNUNET_OK !=
-      GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
-                           &peer))
-  {
-    GNUNET_break (0);
-    return NULL;
-  }
-
-  msg = GNUNET_malloc (size);
-  memcpy (msg, hello, size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Offering HELLO message of `%s' to transport for validation.\n",
-       GNUNET_i2s (&peer));
-
-  ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
-  ohh->th = handle;
-  ohh->cont = cont;
-  ohh->cls = cont_cls;
-  ohh->msg = msg;
-  ohh->tth = schedule_control_transmit (handle,
-                                        size,
-                                        &send_hello,
-                                        ohh);
-  GNUNET_CONTAINER_DLL_insert (handle->oh_head,
-                               handle->oh_tail,
-                               ohh);
-  return ohh;
-}
-
-
-/**
- * Cancel the request to transport to offer the HELLO message
- *
- * @param ohh the handle for the operation to cancel
- */
-void
-GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
-{
-  struct GNUNET_TRANSPORT_Handle *th = ohh->th;
-
-  cancel_control_transmit (ohh->th, ohh->tth);
-  GNUNET_CONTAINER_DLL_remove (th->oh_head,
-                               th->oh_tail,
-                               ohh);
-  GNUNET_free (ohh->msg);
-  GNUNET_free (ohh);
+  GNUNET_MQ_send (handle->mq,
+                  env);
 }
 
 
@@ -1505,76 +1099,6 @@ GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
 }
 
 
-/**
- * Task to call the HelloUpdateCallback of the GetHelloHandle
- *
- * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
- */
-static void
-call_hello_update_cb_async (void *cls)
-{
-  struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
-
-  GNUNET_assert (NULL != ghh->handle->my_hello);
-  GNUNET_assert (NULL != ghh->notify_task);
-  ghh->notify_task = NULL;
-  ghh->rec (ghh->rec_cls,
-            ghh->handle->my_hello);
-}
-
-
-/**
- * Obtain the HELLO message for this peer.  The callback given in this function
- * is never called synchronously.
- *
- * @param handle connection to transport service
- * @param rec function to call with the HELLO, sender will be our peer
- *            identity; message and sender will be NULL on timeout
- *            (handshake with transport service pending/failed).
- *             cost estimate will be 0.
- * @param rec_cls closure for @a rec
- * @return handle to cancel the operation
- */
-struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
-                            GNUNET_TRANSPORT_HelloUpdateCallback rec,
-                            void *rec_cls)
-{
-  struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-
-  hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
-  hwl->rec = rec;
-  hwl->rec_cls = rec_cls;
-  hwl->handle = handle;
-  GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
-                               handle->hwl_tail,
-                               hwl);
-  if (NULL != handle->my_hello)
-    hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
-                                                 hwl);
-  return hwl;
-}
-
-
-/**
- * Stop receiving updates about changes to our HELLO message.
- *
- * @param ghh handle to cancel
- */
-void
-GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
-{
-  struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
-
-  if (NULL != ghh->notify_task)
-    GNUNET_SCHEDULER_cancel (ghh->notify_task);
-  GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
-                               handle->hwl_tail,
-                               ghh);
-  GNUNET_free (ghh);
-}
-
-
 /**
  * Connect to the transport service.  Note that the connection may
  * complete (or fail) asynchronously.
@@ -1629,40 +1153,35 @@ GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
                            GNUNET_TRANSPORT_NotifyDisconnect nd,
                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
 {
-  struct GNUNET_TRANSPORT_Handle *ret;
+  struct GNUNET_TRANSPORT_Handle *h;
 
-  ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
+  h = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
   if (NULL != self)
   {
-    ret->self = *self;
-    ret->check_self = GNUNET_YES;
+    h->self = *self;
+    h->check_self = GNUNET_YES;
   }
-  ret->cfg = cfg;
-  ret->cls = cls;
-  ret->rec = rec;
-  ret->nc_cb = nc;
-  ret->nd_cb = nd;
-  ret->neb_cb = neb;
-  ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+  h->cfg = cfg;
+  h->cls = cls;
+  h->rec = rec;
+  h->nc_cb = nc;
+  h->nd_cb = nd;
+  h->neb_cb = neb;
+  h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Connecting to transport service.\n");
-  ret->client = GNUNET_CLIENT_connect ("transport",
-                                       cfg);
-  if (NULL == ret->client)
+  reconnect (h);
+  if (NULL == h->mq)
   {
-    GNUNET_free (ret);
+    GNUNET_free (h);
     return NULL;
   }
-  ret->neighbours =
+  h->neighbours =
     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
                                           GNUNET_YES);
-  ret->ready_heap =
+  h->ready_heap =
       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  schedule_control_transmit (ret,
-                             sizeof (struct StartMessage),
-                             &send_start,
-                             ret);
-  return ret;
+  return h;
 }
 
 
@@ -1694,8 +1213,6 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
   }
   GNUNET_free_non_null (handle->my_hello);
   handle->my_hello = NULL;
-  GNUNET_assert (NULL == handle->hwl_head);
-  GNUNET_assert (NULL == handle->hwl_tail);
   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
   handle->ready_heap = NULL;
   GNUNET_free (handle);
index 8087159c61ef23e616de1801f585ff24c418228c..9a65616a9cbcbf6bfb1aab087309aa6976b564bd 100644 (file)
 
 
 /**
- * Linked list of functions to call whenever our HELLO is updated.
+ * Functions to call with this peer's HELLO.
  */
 struct GNUNET_TRANSPORT_GetHelloHandle
 {
 
   /**
-   * This is a doubly linked list.
+   * Our configuration.
    */
-  struct GNUNET_TRANSPORT_GetHelloHandle *next;
-
-  /**
-   * This is a doubly linked list.
-   */
-  struct GNUNET_TRANSPORT_GetHelloHandle *prev;
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
    * Transport handle.
    */
-  struct GNUNET_TRANSPORT_Handle *handle;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Callback to call once we got our HELLO.
    */
   GNUNET_TRANSPORT_HelloUpdateCallback rec;
 
+  /**
+   * Closure for @e rec.
+   */
+  void *rec_cls;
+
   /**
    * Task for calling the HelloUpdateCallback when we already have a HELLO
    */
   struct GNUNET_SCHEDULER_Task *notify_task;
 
   /**
-   * Closure for @e rec.
+   * ID of the task trying to reconnect to the service.
    */
-  void *rec_cls;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+  /**
+   * Delay until we try to reconnect.
+   */
+  struct GNUNET_TIME_Relative reconnect_delay;
 
 };
 
 
+/**
+ * Function we use for checking incoming HELLO messages.
+ *
+ * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
+ * @param msg message received
+ * @return #GNUNET_OK if message is well-formed
+ */
+static int
+check_hello (void *cls,
+             const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_PeerIdentity me;
+
+  if (GNUNET_OK !=
+      GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
+                           &me))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
+              (unsigned int) ntohs (msg->size),
+              GNUNET_i2s (&me));
+  return GNUNET_OK;
+}
+
 
 /**
- * Task to call the HelloUpdateCallback of the GetHelloHandle
+ * Function we use for handling incoming HELLO messages.
  *
- * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
+ * @param cls closure, a `struct GNUNET_TRANSPORT_GetHelloHandle *`
+ * @param msg message received
  */
 static void
-call_hello_update_cb_async (void *cls)
+handle_hello (void *cls,
+              const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
 
-  GNUNET_assert (NULL != ghh->handle->my_hello);
-  GNUNET_assert (NULL != ghh->notify_task);
-  ghh->notify_task = NULL;
   ghh->rec (ghh->rec_cls,
-            ghh->handle->my_hello);
+            msg);
+}
+
+
+/**
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param ghh transport service to reconnect
+ */
+static void
+schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh);
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_Handle *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Error receiving from transport service, disconnecting temporarily.\n");
+  GNUNET_MQ_destroy (ghh->mq);
+  ghh->mq = NULL;
+  schedule_reconnect (ghh);
+}
+
+
+/**
+ * Try again to connect to transport service.
+ *
+ * @param cls the handle to the transport service
+ */
+static void
+reconnect (void *cls)
+{
+  GNUNET_MQ_hd_var_size (hello,
+                         GNUNET_MESSAGE_TYPE_HELLO,
+                         struct GNUNET_MessageHeader);
+  struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_hello_handler (ghh),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *env;
+  struct StartMessage *s;
+
+  ghh->reconnect_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Connecting to transport service.\n");
+  GNUNET_assert (NULL == ghh->mq);
+  ghh->mq = GNUNET_CLIENT_connecT (ghh->cfg,
+                                   "transport",
+                                   handlers,
+                                   &mq_error_handler,
+                                   ghh);
+  if (NULL == ghh->mq)
+    return;
+  env = GNUNET_MQ_msg (s,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+  s->options = htonl (0);
+  GNUNET_MQ_send (ghh->mq,
+                  env);
+}
+
+
+/**
+ * Function that will schedule the job that will try
+ * to connect us again to the client.
+ *
+ * @param ghh transport service to reconnect
+ */
+static void
+schedule_reconnect (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
+{
+  ghh->reconnect_task =
+      GNUNET_SCHEDULER_add_delayed (ghh->reconnect_delay,
+                                    &reconnect,
+                                    ghh);
+  ghh->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ghh->reconnect_delay);
 }
 
 
@@ -95,7 +214,7 @@ call_hello_update_cb_async (void *cls)
  * Obtain the HELLO message for this peer.  The callback given in this function
  * is never called synchronously.
  *
- * @param handle connection to transport service
+ * @param cfg configuration
  * @param rec function to call with the HELLO, sender will be our peer
  *            identity; message and sender will be NULL on timeout
  *            (handshake with transport service pending/failed).
@@ -104,23 +223,23 @@ call_hello_update_cb_async (void *cls)
  * @return handle to cancel the operation
  */
 struct GNUNET_TRANSPORT_GetHelloHandle *
-GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_get_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
                             void *rec_cls)
 {
-  struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
-
-  hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
-  hwl->rec = rec;
-  hwl->rec_cls = rec_cls;
-  hwl->handle = handle;
-  GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
-                               handle->hwl_tail,
-                               hwl);
-  if (NULL != handle->my_hello)
-    hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
-                                                 hwl);
-  return hwl;
+  struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
+
+  ghh = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
+  ghh->rec = rec;
+  ghh->rec_cls = rec_cls;
+  ghh->cfg = cfg;
+  reconnect (ghh);
+  if (NULL == ghh->mq)
+  {
+    GNUNET_free (ghh);
+    return NULL;
+  }
+  return ghh;
 }
 
 
@@ -132,15 +251,13 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
 void
 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
 {
-  struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
-
-  if (NULL != ghh->notify_task)
-    GNUNET_SCHEDULER_cancel (ghh->notify_task);
-  GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
-                               handle->hwl_tail,
-                               ghh);
+  if (NULL != ghh->mq)
+  {
+    GNUNET_MQ_destroy (ghh->mq);
+    ghh->mq = NULL;
+  }
   GNUNET_free (ghh);
 }
 
 
-/* end of transport_api_hello.c */
+/* end of transport_api_get_hello.c */
index 0abce2d62ee1c0de565aabcf218e2a2e4d42dd35..951ab9ba41ade8cca8b5096141fc6624fd4733f9 100644 (file)
  * @brief library to offer HELLOs to transport service
  * @author Christian Grothoff
  */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_hello_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_transport_service.h"
+
 
 /**
  * Entry in linked list for all offer-HELLO requests.
  */
 struct GNUNET_TRANSPORT_OfferHelloHandle
 {
-  /**
-   * For the DLL.
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
-
-  /**
-   * For the DLL.
-   */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *next;
 
   /**
    * Transport service handle we use for transmission.
    */
-  struct GNUNET_TRANSPORT_Handle *th;
-
-  /**
-   * Transmission handle for this request.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *tth;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Function to call once we are done.
@@ -59,20 +51,31 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
    */
   void *cls;
 
-  /**
-   * The HELLO message to be transmitted.
-   */
-  struct GNUNET_MessageHeader *msg;
 };
 
 
+/**
+ * Done sending HELLO message to the service, notify application.
+ *
+ * @param cls the handle for the operation
+ */
+static void
+finished_hello (void *cls)
+{
+  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
+
+  if (NULL != ohh->cont)
+    ohh->cont (ohh->cls);
+  GNUNET_TRANSPORT_offer_hello_cancel (ohh);
+}
+
 
 /**
  * Offer the transport service the HELLO of another peer.  Note that
  * the transport service may just ignore this message if the HELLO is
  * malformed or useless due to our local configuration.
  *
- * @param handle connection to transport service
+ * @param cfg configuration
  * @param hello the hello message
  * @param cont continuation to call when HELLO has been sent,
  *     tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
@@ -83,46 +86,43 @@ struct GNUNET_TRANSPORT_OfferHelloHandle
  *
  */
 struct GNUNET_TRANSPORT_OfferHelloHandle *
-GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
+GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg,
                               const struct GNUNET_MessageHeader *hello,
                               GNUNET_SCHEDULER_TaskCallback cont,
                               void *cont_cls)
 {
-  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
-  struct GNUNET_MessageHeader *msg;
+  struct GNUNET_TRANSPORT_OfferHelloHandle *ohh
+    = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_PeerIdentity peer;
-  uint16_t size;
 
-  if (NULL == handle->mq)
-    return NULL;
-  GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
-  size = ntohs (hello->size);
-  GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
   if (GNUNET_OK !=
       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
                            &peer))
   {
     GNUNET_break (0);
+    GNUNET_free (ohh);
+    return NULL;
+  }
+  ohh->mq = GNUNET_CLIENT_connecT (cfg,
+                                   "transport",
+                                   NULL,
+                                   NULL,
+                                   ohh);
+  if (NULL == ohh->mq)
+  {
+    GNUNET_free (ohh);
     return NULL;
   }
-
-  msg = GNUNET_malloc (size);
-  memcpy (msg, hello, size);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Offering HELLO message of `%s' to transport for validation.\n",
-       GNUNET_i2s (&peer));
-  ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
-  ohh->th = handle;
   ohh->cont = cont;
   ohh->cls = cont_cls;
-  ohh->msg = msg;
-  ohh->tth = schedule_control_transmit (handle,
-                                        size,
-                                        &send_hello,
-                                        ohh);
-  GNUNET_CONTAINER_DLL_insert (handle->oh_head,
-                               handle->oh_tail,
-                               ohh);
+  GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
+  env = GNUNET_MQ_msg_copy (hello);
+  GNUNET_MQ_notify_sent (env,
+                         &finished_hello,
+                         ohh);
+  GNUNET_MQ_send (ohh->mq,
+                  env);
   return ohh;
 }
 
@@ -135,13 +135,7 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
 void
 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
 {
-  struct GNUNET_TRANSPORT_Handle *th = ohh->th;
-
-  cancel_control_transmit (ohh->th, ohh->tth);
-  GNUNET_CONTAINER_DLL_remove (th->oh_head,
-                               th->oh_tail,
-                               ohh);
-  GNUNET_free (ohh->msg);
+  GNUNET_MQ_destroy (ohh->mq);
   GNUNET_free (ohh);
 }