-converting CORE service to new transport MQ API
authorChristian Grothoff <christian@grothoff.org>
Tue, 26 Jul 2016 21:12:56 +0000 (21:12 +0000)
committerChristian Grothoff <christian@grothoff.org>
Tue, 26 Jul 2016 21:12:56 +0000 (21:12 +0000)
14 files changed:
src/core/Makefile.am
src/core/core_api.c
src/core/gnunet-service-core.c
src/core/gnunet-service-core_clients.c
src/core/gnunet-service-core_kx.c
src/core/gnunet-service-core_kx.h
src/core/gnunet-service-core_neighbours.c [deleted file]
src/core/gnunet-service-core_neighbours.h [deleted file]
src/core/gnunet-service-core_sessions.c
src/core/test_core_api_data.conf
src/core/test_core_api_reliability.c
src/core/test_core_api_start_only.c
src/core/test_core_defaults.conf
src/core/test_core_quota_compliance.c

index be855b453dc4331f0d7163f2f3599e9a82544832..3437aa43a4d0cfa6933d4e98d61f622425707fd0 100644 (file)
@@ -42,7 +42,6 @@ bin_PROGRAMS = \
 gnunet_service_core_SOURCES = \
  gnunet-service-core.c gnunet-service-core.h \
  gnunet-service-core_clients.c gnunet-service-core_clients.h \
- gnunet-service-core_neighbours.c gnunet-service-core_neighbours.h \
  gnunet-service-core_kx.c gnunet-service-core_kx.h \
  gnunet-service-core_sessions.c gnunet-service-core_sessions.h \
  gnunet-service-core_typemap.c gnunet-service-core_typemap.h
@@ -60,7 +59,9 @@ gnunet_core_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la
 
 if HAVE_TESTING
-  TESTING_TESTS = test_core_api_send_to_self test_core_api_mq
+  TESTING_TESTS = \
+    test_core_api_send_to_self \
+    test_core_api_mq
 endif
 
 check_PROGRAMS = \
index dd912ac22f9e9a725c70941ddc95bb676759f04a..caf614afc186f88553ad7fa812d13a408656cbe6 100644 (file)
@@ -674,6 +674,8 @@ handle_send_ready (void *cls,
   struct GNUNET_TIME_Relative delay;
   struct GNUNET_TIME_Relative overdue;
   unsigned int ret;
+  unsigned int priority;
+  int cork;
 
   GNUNET_break (GNUNET_NO == h->currently_down);
   pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
@@ -708,11 +710,12 @@ handle_send_ready (void *cls,
   sm->priority = htonl ((uint32_t) th->priority);
   sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
   sm->peer = pr->peer;
-  sm->cork = htonl ((uint32_t) th->cork);
+  sm->cork = htonl ((uint32_t) (cork = th->cork));
   sm->reserved = htonl (0);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling get_message with buffer of %u bytes\n",
-              (unsigned int) th->msize);
+              "Calling get_message with buffer of %u bytes (%s)\n",
+              (unsigned int) th->msize,
+             cork ? "corked" : "uncorked");
   /* FIXME: this is ugly and a bit brutal, but "get_message"
      may call GNUNET_CORE_notify_transmit_ready() which
      may call GNUNET_MQ_send() as well, and we MUST get this
@@ -725,30 +728,33 @@ handle_send_ready (void *cls,
      be required */
   GNUNET_MQ_send (h->mq,
                   env);
+  delay = GNUNET_TIME_absolute_get_duration (th->request_time);
+  overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
+  priority = th->priority;
   ret = th->get_message (th->get_message_cls,
                          th->msize,
                          &sm[1]);
+  /* after this point, 'th' should not be used anymore, it
+     may now be about another message! */
   sm->header.size = htons (ret + sizeof (struct SendMessage));
-  delay = GNUNET_TIME_absolute_get_duration (th->request_time);
-  overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
   if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Transmitting overdue %u bytes to `%s' at priority %u with %s delay %s\n",
          ret,
          GNUNET_i2s (&pr->peer),
-         (unsigned int) th->priority,
+         priority,
          GNUNET_STRINGS_relative_time_to_string (delay,
                                                  GNUNET_YES),
-         (th->cork) ? " (corked)" : "");
+         (cork) ? " (corked)" : " (uncorked)");
   else
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Transmitting %u bytes to `%s' at priority %u with %s delay %s\n",
          ret,
          GNUNET_i2s (&pr->peer),
-         (unsigned int) th->priority,
+         priority,
          GNUNET_STRINGS_relative_time_to_string (delay,
                                                  GNUNET_YES),
-         (th->cork) ? " (corked)" : "");
+         (cork) ? " (corked)" : " (uncorked)");
 }
 
 
@@ -995,9 +1001,10 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
     return NULL;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asking core for transmission of %u bytes to `%s'\n",
+       "Asking core for transmission of %u bytes to `%s'%s\n",
        (unsigned int) notify_size,
-       GNUNET_i2s (target));
+       GNUNET_i2s (target),
+       cork ? " (corked)" : "");
   pr = GNUNET_CONTAINER_multipeermap_get (handle->peers,
                                           target);
   if (NULL == pr)
index 70b83b24cdc067e7d5e20fc4962084ec90cbe1be..f9391e61690ada9c40754ae921948b2934f9d454 100644 (file)
@@ -29,7 +29,6 @@
 #include "gnunet-service-core.h"
 #include "gnunet-service-core_clients.h"
 #include "gnunet-service-core_kx.h"
-#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_sessions.h"
 #include "gnunet-service-core_typemap.h"
 
@@ -67,13 +66,13 @@ shutdown_task (void *cls)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service shutting down.\n");
   GSC_CLIENTS_done ();
-  GSC_NEIGHBOURS_done ();
   GSC_SESSIONS_done ();
   GSC_KX_done ();
   GSC_TYPEMAP_done ();
   if (NULL != GSC_stats)
   {
-    GNUNET_STATISTICS_destroy (GSC_stats, GNUNET_NO);
+    GNUNET_STATISTICS_destroy (GSC_stats,
+                              GNUNET_NO);
     GSC_stats = NULL;
   }
   GSC_cfg = NULL;
@@ -88,7 +87,8 @@ shutdown_task (void *cls)
  * @param c configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
+run (void *cls,
+     struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
   struct GNUNET_CRYPTO_EddsaPrivateKey *pk;
@@ -97,7 +97,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
   GSC_cfg = c;
   GSC_server = server;
   if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_filename (GSC_cfg, "PEER", "PRIVATE_KEY",
+      GNUNET_CONFIGURATION_get_value_filename (GSC_cfg,
+                                              "PEER",
+                                              "PRIVATE_KEY",
                                                &keyfile))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -105,7 +107,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  GSC_stats = GNUNET_STATISTICS_create ("core", GSC_cfg);
+  GSC_stats = GNUNET_STATISTICS_create ("core",
+                                       GSC_cfg);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 NULL);
   GNUNET_SERVER_suspend (server);
@@ -113,9 +116,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
   pk = GNUNET_CRYPTO_eddsa_key_create_from_file (keyfile);
   GNUNET_free (keyfile);
   GNUNET_assert (NULL != pk);
-  if ((GNUNET_OK != GSC_KX_init (pk,
-                                 server)) ||
-      (GNUNET_OK != GSC_NEIGHBOURS_init ()))
+  if (GNUNET_OK != GSC_KX_init (pk,
+                               server))
   {
     GNUNET_SCHEDULER_shutdown ();
     return;
@@ -124,7 +126,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
   GSC_CLIENTS_init (GSC_server);
   GNUNET_SERVER_resume (GSC_server);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              _("Core service of `%4s' ready.\n"),
+              _("Core service of `%s' ready.\n"),
               GNUNET_i2s (&GSC_my_identity));
 }
 
index 3030eb701bb7698809a5a27d4abeeb7488df2e29..2f042dfbf8b28c02421262c51db4cbbfac239a89 100644 (file)
@@ -29,7 +29,6 @@
 #include "gnunet_transport_service.h"
 #include "gnunet-service-core.h"
 #include "gnunet-service-core_clients.h"
-#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_sessions.h"
 #include "gnunet-service-core_typemap.h"
 #include "core.h"
@@ -402,8 +401,8 @@ handle_client_send_request (void *cls,
     /* dequeue and recycle memory from pending request, there can only
        be at most one per client and peer */
     GNUNET_STATISTICS_update (GSC_stats,
-                              gettext_noop
-                              ("# dequeuing CAR (duplicate request)"), 1,
+                              gettext_noop ("# dequeuing CAR (duplicate request)"),
+                             1,
                               GNUNET_NO);
     GSC_SESSIONS_dequeue_request (car);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -476,7 +475,8 @@ handle_client_send (void *cls,
   if (msize < sizeof (struct SendMessage))
   {
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVER_receive_done (client,
+                               GNUNET_SYSERR);
     return;
   }
   sm = (const struct SendMessage *) message;
@@ -487,12 +487,12 @@ handle_client_send (void *cls,
   {
     /* client did not send INIT first! */
     GNUNET_break (0);
-    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    GNUNET_SERVER_receive_done (client,
+                               GNUNET_SYSERR);
     return;
   }
-  tc.car
-    = GNUNET_CONTAINER_multipeermap_get (c->requests,
-                                         &sm->peer);
+  tc.car = GNUNET_CONTAINER_multipeermap_get (c->requests,
+                                             &sm->peer);
   if (NULL == tc.car)
   {
     /* Must have been that we first approved the request, then got disconnected
@@ -501,9 +501,9 @@ handle_client_send (void *cls,
      * might also now be *again* connected.  So this can happen (but should be
      * rare).  If it does happen, the message is discarded. */
     GNUNET_STATISTICS_update (GSC_stats,
-                              gettext_noop
-                              ("# messages discarded (session disconnected)"),
-                              1, GNUNET_NO);
+                              gettext_noop ("# messages discarded (session disconnected)"),
+                              1,
+                             GNUNET_NO);
     GNUNET_SERVER_receive_done (client,
                                 GNUNET_OK);
     return;
@@ -519,7 +519,7 @@ handle_client_send (void *cls,
                                                         GNUNET_YES),
                 msize,
                 GNUNET_i2s (&sm->peer),
-                tc.cork ? "" : " (corked)");
+                tc.cork ? " (cork)" : " (uncorked)");
   else
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Client waited %s for transmission of %u bytes to `%s'%s\n",
@@ -527,7 +527,7 @@ handle_client_send (void *cls,
                                                         GNUNET_YES),
                 msize,
                 GNUNET_i2s (&sm->peer),
-                tc.cork ? "" : " (corked)");
+                tc.cork ? " (cork)" : " (uncorked)");
 
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (c->requests,
index 6a71099ae46e2faf6be2a9bc027a55e9a1a5f10f..d2b46ff4196f5e271cf7674beba22f6989747f8d 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009-2013 GNUnet e.V.
+     Copyright (C) 2009-2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -27,9 +27,9 @@
 #include "gnunet-service-core_kx.h"
 #include "gnunet-service-core.h"
 #include "gnunet-service-core_clients.h"
-#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_sessions.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_transport_core_service.h"
 #include "gnunet_constants.h"
 #include "gnunet_signatures.h"
 #include "gnunet_protocols.h"
@@ -87,8 +87,8 @@ struct EphemeralKeyMessage
   int32_t sender_status GNUNET_PACKED;
 
   /**
-   * An ECC signature of the @e origin_identity asserting the validity of
-   * the given ephemeral key.
+   * An ECC signature of the @e origin_identity asserting the validity
+   * of the given ephemeral key.
    */
   struct GNUNET_CRYPTO_EddsaSignature signature;
 
@@ -113,7 +113,8 @@ struct EphemeralKeyMessage
   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
 
   /**
-   * Public key of the signing peer (persistent version, not the ephemeral public key).
+   * Public key of the signing peer (persistent version, not the
+   * ephemeral public key).
    */
   struct GNUNET_PeerIdentity origin_identity;
 
@@ -202,8 +203,9 @@ struct EncryptedMessage
   /**
    * MAC of the encrypted message (starting at @e sequence_number),
    * used to verify message integrity. Everything after this value
-   * (excluding this value itself) will be encrypted and authenticated.
-   * #ENCRYPTED_HEADER_SIZE must be set to the offset of the *next* field.
+   * (excluding this value itself) will be encrypted and
+   * authenticated.  #ENCRYPTED_HEADER_SIZE must be set to the offset
+   * of the *next* field.
    */
   struct GNUNET_HashCode hmac;
 
@@ -216,7 +218,7 @@ struct EncryptedMessage
   /**
    * Reserved, always zero.
    */
-  uint32_t reserved;
+  uint32_t reserved GNUNET_PACKED;
 
   /**
    * Timestamp.  Used to prevent replay of ancient messages
@@ -254,8 +256,13 @@ struct GSC_KeyExchangeInfo
   /**
    * Identity of the peer.
    */
-  struct GNUNET_PeerIdentity peer;
+  const struct GNUNET_PeerIdentity *peer;
 
+  /**
+   * Message queue for sending messages to @a peer.
+   */
+  struct GNUNET_MQ_Handle *mq;
+  
   /**
    * PING message we transmit to the other peer.
    */
@@ -309,8 +316,8 @@ struct GSC_KeyExchangeInfo
   struct GNUNET_SCHEDULER_Task *keep_alive_task;
 
   /**
-   * Bit map indicating which of the 32 sequence numbers before the last
-   * were received (good for accepting out-of-order packets and
+   * Bit map indicating which of the 32 sequence numbers before the
+   * last were received (good for accepting out-of-order packets and
    * estimating reliability of the connection)
    */
   unsigned int last_packets_bitmap;
@@ -330,6 +337,11 @@ struct GSC_KeyExchangeInfo
    */
   uint32_t ping_challenge;
 
+  /**
+   * #GNUNET_YES if this peer currently has excess bandwidth.
+   */
+  int has_excess_bandwidth;
+
   /**
    * What is our connection status?
    */
@@ -338,6 +350,11 @@ struct GSC_KeyExchangeInfo
 };
 
 
+/**
+ * Transport service.
+ */
+static struct GNUNET_TRANSPORT_CoreHandle *transport;
+
 /**
  * Our private key.
  */
@@ -396,7 +413,7 @@ monitor_notify (struct GNUNET_SERVER_Client *client,
   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY);
   msg.header.size = htons (sizeof (msg));
   msg.state = htonl ((uint32_t) kx->status);
-  msg.peer = kx->peer;
+  msg.peer = *kx->peer;
   msg.timeout = GNUNET_TIME_absolute_hton (kx->timeout);
   GNUNET_SERVER_notification_context_unicast (nc,
                                               client,
@@ -434,7 +451,7 @@ monitor_notify_all (struct GSC_KeyExchangeInfo *kx)
   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY);
   msg.header.size = htons (sizeof (msg));
   msg.state = htonl ((uint32_t) kx->status);
-  msg.peer = kx->peer;
+  msg.peer = *kx->peer;
   msg.timeout = GNUNET_TIME_absolute_hton (kx->timeout);
   GNUNET_SERVER_notification_context_broadcast (nc,
                                                 &msg.header,
@@ -566,9 +583,14 @@ do_encrypt (struct GSC_KeyExchangeInfo *kx,
     return GNUNET_NO;
   }
   GNUNET_assert (size ==
-                 GNUNET_CRYPTO_symmetric_encrypt (in, (uint16_t) size,
-                                            &kx->encrypt_key, iv, out));
-  GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# bytes encrypted"), size,
+                 GNUNET_CRYPTO_symmetric_encrypt (in,
+                                                 (uint16_t) size,
+                                                 &kx->encrypt_key,
+                                                 iv,
+                                                 out));
+  GNUNET_STATISTICS_update (GSC_stats,
+                           gettext_noop ("# bytes encrypted"),
+                           size,
                             GNUNET_NO);
   /* the following is too sensitive to write to log files by accident,
      so we require manual intervention to get this one... */
@@ -576,19 +598,19 @@ do_encrypt (struct GSC_KeyExchangeInfo *kx,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Encrypted %u bytes for `%s' using key %u, IV %u\n",
               (unsigned int) size,
-              GNUNET_i2s (&kx->peer),
-              (unsigned int) kx->encrypt_key.crc32, GNUNET_CRYPTO_crc32_n (iv,
-                                                                           sizeof
-                                                                           (iv)));
+              GNUNET_i2s (kx->peer),
+              (unsigned int) kx->encrypt_key.crc32,
+             GNUNET_CRYPTO_crc32_n (iv,
+                                    sizeof (iv)));
 #endif
   return GNUNET_OK;
 }
 
 
 /**
- * Decrypt size bytes from @a in and write the result to @a out.  Use the
- * @a kx key for inbound traffic of the given neighbour.  This function does
- * NOT do any integrity-checks on the result.
+ * Decrypt size bytes from @a in and write the result to @a out.  Use
+ * the @a kx key for inbound traffic of the given neighbour.  This
+ * function does NOT do any integrity-checks on the result.
  *
  * @param kx key information context
  * @param iv initialization vector to use
@@ -636,7 +658,7 @@ do_decrypt (struct GSC_KeyExchangeInfo *kx,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from `%s' using key %u, IV %u\n",
               (unsigned int) size,
-              GNUNET_i2s (&kx->peer),
+              GNUNET_i2s (kx->peer),
               (unsigned int) kx->decrypt_key.crc32,
               GNUNET_CRYPTO_crc32_n (iv,
                                      sizeof
@@ -690,23 +712,34 @@ setup_fresh_ping (struct GSC_KeyExchangeInfo *kx)
   pm->header.size = htons (sizeof (struct PingMessage));
   pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
   pm->iv_seed = calculate_seed (kx);
-  derive_iv (&iv, &kx->encrypt_key, pm->iv_seed, &kx->peer);
+  derive_iv (&iv,
+            &kx->encrypt_key,
+            pm->iv_seed,
+            kx->peer);
   pp.challenge = kx->ping_challenge;
-  pp.target = kx->peer;
-  do_encrypt (kx, &iv, &pp.target, &pm->target,
+  pp.target = *kx->peer;
+  do_encrypt (kx,
+             &iv,
+             &pp.target,
+             &pm->target,
               sizeof (struct PingMessage) - ((void *) &pm->target -
                                              (void *) pm));
 }
 
 
 /**
- * Start the key exchange with the given peer.
+ * Function called by transport to notify us that
+ * a peer connected to us (on the network level).
+ * Starts the key exchange with the given peer.
  *
+ * @param cls closure (NULL)
  * @param pid identity of the peer to do a key exchange with
  * @return key exchange information context
  */
-struct GSC_KeyExchangeInfo *
-GSC_KX_start (const struct GNUNET_PeerIdentity *pid)
+static void *
+handle_transport_notify_connect (void *cls,
+                                 const struct GNUNET_PeerIdentity *pid,
+                                struct GNUNET_MQ_Handle *mq)
 {
   struct GSC_KeyExchangeInfo *kx;
   struct GNUNET_HashCode h1;
@@ -720,7 +753,8 @@ GSC_KX_start (const struct GNUNET_PeerIdentity *pid)
                             1,
                             GNUNET_NO);
   kx = GNUNET_new (struct GSC_KeyExchangeInfo);
-  kx->peer = *pid;
+  kx->mq = mq;
+  kx->peer = pid;
   kx->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
   GNUNET_CONTAINER_DLL_insert (kx_head,
                               kx_tail,
@@ -755,16 +789,29 @@ GSC_KX_start (const struct GNUNET_PeerIdentity *pid)
 
 
 /**
+ * Function called by transport telling us that a peer
+ * disconnected.
  * Stop key exchange with the given peer.  Clean up key material.
  *
- * @param kx key exchange to stop
+ * @param cls closure
+ * @param peer the peer that disconnected
+ * @param handler_cls the `struct GSC_KeyExchangeInfo` of the peer
  */
-void
-GSC_KX_stop (struct GSC_KeyExchangeInfo *kx)
+static void
+handle_transport_notify_disconnect (void *cls,
+                                    const struct GNUNET_PeerIdentity *peer,
+                                   void *handler_cls)
 {
-  GSC_SESSIONS_end (&kx->peer);
-  GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# key exchanges stopped"),
-                            1, GNUNET_NO);
+  struct GSC_KeyExchangeInfo *kx = handler_cls;
+  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer `%s' disconnected from us.\n",
+              GNUNET_i2s (peer));
+  GSC_SESSIONS_end (kx->peer);
+  GNUNET_STATISTICS_update (GSC_stats,
+                           gettext_noop ("# key exchanges stopped"),
+                            1,
+                           GNUNET_NO);
   if (NULL != kx->retry_set_key_task)
   {
     GNUNET_SCHEDULER_cancel (kx->retry_set_key_task);
@@ -792,13 +839,15 @@ GSC_KX_stop (struct GSC_KeyExchangeInfo *kx)
 static void
 send_ping (struct GSC_KeyExchangeInfo *kx)
 {
+  struct GNUNET_MQ_Envelope *env;
+  
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop ("# PING messages transmitted"),
                             1,
                             GNUNET_NO);
-  GSC_NEIGHBOURS_transmit (&kx->peer,
-                           &kx->ping.header,
-                           kx->set_key_retry_frequency);
+  env = GNUNET_MQ_msg_copy (&kx->ping.header);
+  GNUNET_MQ_send (kx->mq,
+                 env);
 }
 
 
@@ -821,10 +870,10 @@ derive_session_keys (struct GSC_KeyExchangeInfo *kx)
     return;
   }
   derive_aes_key (&GSC_my_identity,
-                 &kx->peer,
+                 kx->peer,
                  &key_material,
                  &kx->encrypt_key);
-  derive_aes_key (&kx->peer,
+  derive_aes_key (kx->peer,
                  &GSC_my_identity,
                  &key_material,
                  &kx->decrypt_key);
@@ -840,27 +889,19 @@ derive_session_keys (struct GSC_KeyExchangeInfo *kx)
  * We received a #GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY message.
  * Validate and update our key material and status.
  *
- * @param kx key exchange status for the corresponding peer
- * @param msg the set key message we received
+ * @param cls key exchange status for the corresponding peer
+ * @param m the set key message we received
  */
-void
-GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
-                            const struct GNUNET_MessageHeader *msg)
+static void
+handle_ephemeral_key (void *cls,
+                     const struct EphemeralKeyMessage *m)
 {
-  const struct EphemeralKeyMessage *m;
+  struct GSC_KeyExchangeInfo *kx = cls;
   struct GNUNET_TIME_Absolute start_t;
   struct GNUNET_TIME_Absolute end_t;
   struct GNUNET_TIME_Absolute now;
   enum GNUNET_CORE_KxState sender_status;
-  uint16_t size;
 
-  size = ntohs (msg->size);
-  if (sizeof (struct EphemeralKeyMessage) != size)
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  m = (const struct EphemeralKeyMessage *) msg;
   end_t = GNUNET_TIME_absolute_ntoh (m->expiration_time);
   if ( ( (GNUNET_CORE_KX_STATE_KEY_RECEIVED == kx->status) ||
         (GNUNET_CORE_KX_STATE_UP == kx->status) ||
@@ -880,19 +921,19 @@ GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
 
   if (0 !=
       memcmp (&m->origin_identity,
-             &kx->peer,
+             kx->peer,
               sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Received EPHEMERAL_KEY from %s, but expected %s\n",
                 GNUNET_i2s (&m->origin_identity),
-                GNUNET_i2s_full (&kx->peer));
+                GNUNET_i2s_full (kx->peer));
     GNUNET_break_op (0);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives EPHEMERAL_KEY request from `%s'.\n",
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   if ((ntohl (m->purpose.size) !=
        sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose) +
        sizeof (struct GNUNET_TIME_AbsoluteNBO) +
@@ -915,7 +956,7 @@ GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                _("Ephemeral key message from peer `%s' rejected as its validity range does not match our system time (%llu not in [%llu,%llu]).\n"),
-               GNUNET_i2s (&kx->peer),
+               GNUNET_i2s (kx->peer),
                (unsigned long long) now.abs_value_us,
                 (unsigned long long) start_t.abs_value_us,
                 (unsigned long long) end_t.abs_value_us);
@@ -937,11 +978,11 @@ GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
     break;
   case GNUNET_CORE_KX_STATE_KEY_SENT:
     /* fine, need to send our key after updating our status, see below */
-    GSC_SESSIONS_reinit (&kx->peer);
+    GSC_SESSIONS_reinit (kx->peer);
     break;
   case GNUNET_CORE_KX_STATE_KEY_RECEIVED:
     /* other peer already got our key, but typemap did go down */
-    GSC_SESSIONS_reinit (&kx->peer);
+    GSC_SESSIONS_reinit (kx->peer);
     break;
   case GNUNET_CORE_KX_STATE_UP:
     /* other peer already got our key, typemap NOT down */
@@ -1006,26 +1047,20 @@ GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
  * We received a PING message.  Validate and transmit
  * a PONG message.
  *
- * @param kx key exchange status for the corresponding peer
- * @param msg the encrypted PING message itself
+ * @param cls key exchange status for the corresponding peer
+ * @param m the encrypted PING message itself
  */
-void
-GSC_KX_handle_ping (struct GSC_KeyExchangeInfo *kx,
-                    const struct GNUNET_MessageHeader *msg)
+static void
+handle_ping (void *cls,
+            const struct PingMessage *m)
 {
-  const struct PingMessage *m;
+  struct GSC_KeyExchangeInfo *kx = cls;
   struct PingMessage t;
   struct PongMessage tx;
-  struct PongMessage tp;
+  struct PongMessage *tp;
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_CRYPTO_SymmetricInitializationVector iv;
-  uint16_t msize;
 
-  msize = ntohs (msg->size);
-  if (msize != sizeof (struct PingMessage))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop ("# PING messages received"),
                             1,
@@ -1041,13 +1076,18 @@ GSC_KX_handle_ping (struct GSC_KeyExchangeInfo *kx,
                              GNUNET_NO);
     return;
   }
-  m = (const struct PingMessage *) msg;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives PING request from `%s'.\n",
-              GNUNET_i2s (&kx->peer));
-  derive_iv (&iv, &kx->decrypt_key, m->iv_seed, &GSC_my_identity);
+              GNUNET_i2s (kx->peer));
+  derive_iv (&iv,
+            &kx->decrypt_key,
+            m->iv_seed,
+            &GSC_my_identity);
   if (GNUNET_OK !=
-      do_decrypt (kx, &iv, &m->target, &t.target,
+      do_decrypt (kx,
+                 &iv,
+                 &m->target,
+                 &t.target,
                   sizeof (struct PingMessage) - ((void *) &m->target -
                                                  (void *) m)))
   {
@@ -1062,11 +1102,11 @@ GSC_KX_handle_ping (struct GSC_KeyExchangeInfo *kx,
     if (GNUNET_CORE_KX_STATE_REKEY_SENT != kx->status)
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Decryption of PING from peer `%s' failed\n",
-                  GNUNET_i2s (&kx->peer));
+                  GNUNET_i2s (kx->peer));
     else
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Decryption of PING from peer `%s' failed after rekey (harmless)\n",
-                  GNUNET_i2s (&kx->peer));
+                  GNUNET_i2s (kx->peer));
     GNUNET_break_op (0);
     return;
   }
@@ -1074,27 +1114,26 @@ GSC_KX_handle_ping (struct GSC_KeyExchangeInfo *kx,
   tx.reserved = 0;
   tx.challenge = t.challenge;
   tx.target = t.target;
-  tp.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PONG);
-  tp.header.size = htons (sizeof (struct PongMessage));
-  tp.iv_seed = calculate_seed (kx);
+  env = GNUNET_MQ_msg (tp,
+                      GNUNET_MESSAGE_TYPE_CORE_PONG);
+  tp->iv_seed = calculate_seed (kx);
   derive_pong_iv (&iv,
                   &kx->encrypt_key,
-                  tp.iv_seed,
+                  tp->iv_seed,
                   t.challenge,
-                  &kx->peer);
+                  kx->peer);
   do_encrypt (kx,
               &iv,
               &tx.challenge,
-              &tp.challenge,
-              sizeof (struct PongMessage) - ((void *) &tp.challenge -
-                                             (void *) &tp));
+              &tp->challenge,
+              sizeof (struct PongMessage) - ((void *) &tp->challenge -
+                                             (void *) tp));
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop ("# PONG messages created"),
                             1,
                             GNUNET_NO);
-  GSC_NEIGHBOURS_transmit (&kx->peer,
-                           &tp.header,
-                           kx->set_key_retry_frequency);
+  GNUNET_MQ_send (kx->mq,
+                 env);
 }
 
 
@@ -1119,7 +1158,7 @@ send_keep_alive (void *cls)
                               gettext_noop ("# sessions terminated by timeout"),
                               1,
                               GNUNET_NO);
-    GSC_SESSIONS_end (&kx->peer);
+    GSC_SESSIONS_end (kx->peer);
     kx->status = GNUNET_CORE_KX_STATE_KEY_SENT;
     monitor_notify_all (kx);
     send_key (kx);
@@ -1127,7 +1166,7 @@ send_keep_alive (void *cls)
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Sending KEEPALIVE to `%s'\n",
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop ("# keepalive messages sent"),
                             1,
@@ -1182,23 +1221,16 @@ update_timeout (struct GSC_KeyExchangeInfo *kx)
  * We received a PONG message.  Validate and update our status.
  *
  * @param kx key exchange context for the the PONG
- * @param msg the encrypted PONG message itself
+ * @param m the encrypted PONG message itself
  */
-void
-GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
-                    const struct GNUNET_MessageHeader *msg)
+static void
+handle_pong (void *cls,
+            const struct PongMessage *m)
 {
-  const struct PongMessage *m;
+  struct GSC_KeyExchangeInfo *kx = cls;
   struct PongMessage t;
   struct GNUNET_CRYPTO_SymmetricInitializationVector iv;
-  uint16_t msize;
 
-  msize = ntohs (msg->size);
-  if (sizeof (struct PongMessage) != msize)
-  {
-    GNUNET_break_op (0);
-    return;
-  }
   GNUNET_STATISTICS_update (GSC_stats,
                             gettext_noop ("# PONG messages received"),
                             1,
@@ -1225,10 +1257,9 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
     GNUNET_break (0);
     return;
   }
-  m = (const struct PongMessage *) msg;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Core service receives PONG response from `%s'.\n",
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   /* mark as garbage, just to be sure */
   memset (&t, 255, sizeof (t));
   derive_pong_iv (&iv,
@@ -1252,14 +1283,14 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
                             1,
                             GNUNET_NO);
   if ((0 != memcmp (&t.target,
-                    &kx->peer,
+                    kx->peer,
                     sizeof (struct GNUNET_PeerIdentity))) ||
       (kx->ping_challenge != t.challenge))
   {
     /* PONG malformed */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Received malformed PONG wanted sender `%s' with challenge %u\n",
-                GNUNET_i2s (&kx->peer),
+                GNUNET_i2s (kx->peer),
                 (unsigned int) kx->ping_challenge);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Received malformed PONG received from `%s' with challenge %u\n",
@@ -1269,7 +1300,7 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received PONG from `%s'\n",
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   /* no need to resend key any longer */
   if (NULL != kx->retry_set_key_task)
   {
@@ -1291,7 +1322,7 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
                               GNUNET_NO);
     kx->status = GNUNET_CORE_KX_STATE_UP;
     monitor_notify_all (kx);
-    GSC_SESSIONS_create (&kx->peer, kx);
+    GSC_SESSIONS_create (kx->peer, kx);
     GNUNET_assert (NULL == kx->keep_alive_task);
     update_timeout (kx);
     break;
@@ -1326,6 +1357,8 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
 static void
 send_key (struct GSC_KeyExchangeInfo *kx)
 {
+  struct GNUNET_MQ_Envelope *env;
+  
   GNUNET_assert (GNUNET_CORE_KX_STATE_DOWN != kx->status);
   if (NULL != kx->retry_set_key_task)
   {
@@ -1335,12 +1368,12 @@ send_key (struct GSC_KeyExchangeInfo *kx)
   /* always update sender status in SET KEY message */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Sending key to `%s' (my status: %d)\n",
-              GNUNET_i2s (&kx->peer),
+              GNUNET_i2s (kx->peer),
              kx->status);
   current_ekm.sender_status = htonl ((int32_t) (kx->status));
-  GSC_NEIGHBOURS_transmit (&kx->peer,
-                           &current_ekm.header,
-                           kx->set_key_retry_frequency);
+  env = GNUNET_MQ_msg_copy (&current_ekm.header);
+  GNUNET_MQ_send (kx->mq,
+                 env);
   if (GNUNET_CORE_KX_STATE_KEY_SENT != kx->status)
     send_ping (kx);
   kx->retry_set_key_task =
@@ -1364,9 +1397,9 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx,
 {
   size_t used = payload_size + sizeof (struct EncryptedMessage);
   char pbuf[used];              /* plaintext */
-  char cbuf[used];              /* ciphertext */
   struct EncryptedMessage *em;  /* encrypted message */
   struct EncryptedMessage *ph;  /* plaintext header */
+  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_CRYPTO_SymmetricInitializationVector iv;
   struct GNUNET_CRYPTO_AuthKey auth_key;
 
@@ -1376,17 +1409,16 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx,
   ph->reserved = 0;
   ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
   GNUNET_memcpy (&ph[1],
-          payload,
-          payload_size);
-
-  em = (struct EncryptedMessage *) cbuf;
-  em->header.size = htons (used);
-  em->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
+                payload,
+                payload_size);
+  env = GNUNET_MQ_msg_extra (em,
+                            payload_size,
+                            GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE);
   em->iv_seed = ph->iv_seed;
   derive_iv (&iv,
              &kx->encrypt_key,
              ph->iv_seed,
-             &kx->peer);
+             kx->peer);
   GNUNET_assert (GNUNET_OK ==
                  do_encrypt (kx,
                              &iv,
@@ -1396,7 +1428,7 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Encrypted %u bytes for %s\n",
               (unsigned int) (used - ENCRYPTED_HEADER_SIZE),
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   derive_auth_key (&auth_key,
                   &kx->encrypt_key,
                   ph->iv_seed);
@@ -1404,9 +1436,8 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx,
                       &em->sequence_number,
                       used - ENCRYPTED_HEADER_SIZE,
                       &em->hmac);
-  GSC_NEIGHBOURS_transmit (&kx->peer,
-                           &em->header,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
+  GNUNET_MQ_send (kx->mq,
+                 env);
 }
 
 
@@ -1428,18 +1459,41 @@ struct DeliverMessageContext
 };
 
 
+/**
+ * We received an encrypted message.  Check that it is
+ * well-formed (size-wise).
+ *
+ * @param cls key exchange context for encrypting the message
+ * @param m encrypted message
+ * @return #GNUNET_OK if @a msg is well-formed (size-wise)
+ */
+static int
+check_encrypted (void *cls,
+                const struct EncryptedMessage *m)
+{
+  uint16_t size = ntohs (m->header.size) - sizeof (*m);
+
+  if (size < sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
 /**
  * We received an encrypted message.  Decrypt, validate and
  * pass on to the appropriate clients.
  *
- * @param kx key exchange context for encrypting the message
- * @param msg encrypted message
+ * @param cls key exchange context for encrypting the message
+ * @param m encrypted message
  */
-void
-GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
-                                 const struct GNUNET_MessageHeader *msg)
+static void
+handle_encrypted (void *cls,
+                 const struct EncryptedMessage *m)
 {
-  const struct EncryptedMessage *m;
+  struct GSC_KeyExchangeInfo *kx = cls;
   struct EncryptedMessage *pt;  /* plaintext */
   struct GNUNET_HashCode ph;
   uint32_t snum;
@@ -1447,16 +1501,9 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
   struct GNUNET_CRYPTO_SymmetricInitializationVector iv;
   struct GNUNET_CRYPTO_AuthKey auth_key;
   struct DeliverMessageContext dmc;
-  uint16_t size = ntohs (msg->size);
+  uint16_t size = ntohs (m->header.size);
   char buf[size] GNUNET_ALIGN;
 
-  if (size <
-      sizeof (struct EncryptedMessage) + sizeof (struct GNUNET_MessageHeader))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-  m = (const struct EncryptedMessage *) msg;
   if (GNUNET_CORE_KX_STATE_UP != kx->status)
   {
     GNUNET_STATISTICS_update (GSC_stats,
@@ -1469,11 +1516,11 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                _("Session to peer `%s' went down due to key expiration (should not happen)\n"),
-               GNUNET_i2s (&kx->peer));
+               GNUNET_i2s (kx->peer));
     GNUNET_STATISTICS_update (GSC_stats,
                               gettext_noop ("# sessions terminated by key expiration"),
                               1, GNUNET_NO);
-    GSC_SESSIONS_end (&kx->peer);
+    GSC_SESSIONS_end (kx->peer);
     if (NULL != kx->keep_alive_task)
     {
       GNUNET_SCHEDULER_cancel (kx->keep_alive_task);
@@ -1500,7 +1547,7 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
     /* checksum failed */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Failed checksum validation for a message from `%s'\n",
-               GNUNET_i2s (&kx->peer));
+               GNUNET_i2s (kx->peer));
     return;
   }
   derive_iv (&iv,
@@ -1518,7 +1565,7 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Decrypted %u bytes from %s\n",
               (unsigned int) (size - ENCRYPTED_HEADER_SIZE),
-              GNUNET_i2s (&kx->peer));
+              GNUNET_i2s (kx->peer));
   pt = (struct EncryptedMessage *) buf;
 
   /* validate sequence number */
@@ -1596,9 +1643,10 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
                             size - sizeof (struct EncryptedMessage),
                             GNUNET_NO);
   dmc.kx = kx;
-  dmc.peer = &kx->peer;
+  dmc.peer = kx->peer;
   if (GNUNET_OK !=
-      GNUNET_SERVER_mst_receive (mst, &dmc,
+      GNUNET_SERVER_mst_receive (mst,
+                                &dmc,
                                  &buf[sizeof (struct EncryptedMessage)],
                                  size - sizeof (struct EncryptedMessage),
                                  GNUNET_YES,
@@ -1608,44 +1656,31 @@ GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
 
 
 /**
- * Obtain the array of message handlers provided by KX.
+ * One of our neighbours has excess bandwidth, remember this.
  *
- * @return NULL-entry terminated array of handlers
+ * @param cls NULL
+ * @param pid identity of the peer with excess bandwidth
+ * @param connect_cls the `struct Neighbour`
  */
-const struct GNUNET_MQ_MessageHandler *
-GSC_KX_get_handlers (void)
+static void
+handle_transport_notify_excess_bw (void *cls,
+                                   const struct GNUNET_PeerIdentity *pid,
+                                  void *connect_cls)
 {
-#if 0
-  GNUNET_MQ_hd_fixed_size (ephemeral_key,
-                          GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY,
-                          struct EphemeralKeyMessage);
-  GNUNET_MQ_hd_fixed_size (ping,
-                          PING,
-                          struct PingMessage);
-  GNUNET_MQ_hd_fixed_size (pong,
-                          PING,
-                          struct PongMessage);
-  GNUNET_MQ_hd_var_size (encrypted,
-                        PING,
-                        struct ping);
-#endif
-  static struct GNUNET_MQ_MessageHandler handlers[] = {
-#if 0
-    make_ephemeral_key_handler (),
-    make_ping_handler (),
-    make_pong_handler (),
-    make_encrypted_handler (),
-#endif
-    GNUNET_MQ_handler_end()
-  };
-  return handlers;
+  struct GSC_KeyExchangeInfo *kx = connect_cls;  
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer %s has excess bandwidth available\n",
+              GNUNET_i2s (pid));
+  kx->has_excess_bandwidth = GNUNET_YES;
+  GSC_SESSIONS_solicit (pid);
 }
 
 
 /**
- * Deliver P2P message to interested clients.
- * Invokes send twice, once for clients that want the full message, and once
- * for clients that only want the header
+ * Deliver P2P message to interested clients.  Invokes send twice,
+ * once for clients that want the full message, and once for clients
+ * that only want the header
  *
  * @param cls always NULL
  * @param client who sent us the message (struct GSC_KeyExchangeInfo)
@@ -1778,11 +1813,31 @@ int
 GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk,
              struct GNUNET_SERVER_Handle *server)
 {
+  GNUNET_MQ_hd_fixed_size (ephemeral_key,
+                          GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY,
+                          struct EphemeralKeyMessage);
+  GNUNET_MQ_hd_fixed_size (ping,
+                          GNUNET_MESSAGE_TYPE_CORE_PING,
+                          struct PingMessage);
+  GNUNET_MQ_hd_fixed_size (pong,
+                          GNUNET_MESSAGE_TYPE_CORE_PONG,
+                          struct PongMessage);
+  GNUNET_MQ_hd_var_size (encrypted,
+                        GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE,
+                        struct EncryptedMessage);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_ephemeral_key_handler (NULL),
+    make_ping_handler (NULL),
+    make_pong_handler (NULL),
+    make_encrypted_handler (NULL),
+    GNUNET_MQ_handler_end()
+  };
+
   nc = GNUNET_SERVER_notification_context_create (server,
                                                   1);
   my_private_key = pk;
   GNUNET_CRYPTO_eddsa_key_get_public (my_private_key,
-                                                 &GSC_my_identity.public_key);
+                                      &GSC_my_identity.public_key);
   my_ephemeral_key = GNUNET_CRYPTO_ecdhe_key_create ();
   if (NULL == my_ephemeral_key)
   {
@@ -1796,6 +1851,19 @@ GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk,
                                              &do_rekey,
                                              NULL);
   mst = GNUNET_SERVER_mst_create (&deliver_message, NULL);
+  transport 
+    = GNUNET_TRANSPORT_core_connect (GSC_cfg,
+                                    &GSC_my_identity,
+                                    handlers,
+                                    NULL,
+                                    &handle_transport_notify_connect,
+                                    &handle_transport_notify_disconnect,
+                                    &handle_transport_notify_excess_bw);
+  if (NULL == transport)
+  {
+    GSC_KX_done ();
+    return GNUNET_SYSERR;
+  }
   return GNUNET_OK;
 }
 
@@ -1806,6 +1874,11 @@ GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk,
 void
 GSC_KX_done ()
 {
+  if (NULL != transport)
+  {
+    GNUNET_TRANSPORT_core_disconnect (transport);
+    transport = NULL;
+  }
   if (NULL != rekey_task)
   {
     GNUNET_SCHEDULER_cancel (rekey_task);
@@ -1834,6 +1907,32 @@ GSC_KX_done ()
 }
 
 
+ /**
+ * Check how many messages are queued for the given neighbour.
+ *
+ * @param kxinfo data about neighbour to check
+ * @return number of items in the message queue
+ */
+unsigned int
+GSC_NEIGHBOURS_get_queue_length (const struct GSC_KeyExchangeInfo *kxinfo)
+{
+  return GNUNET_MQ_get_length (kxinfo->mq);
+}
+
+
+/**
+ * Check if the given neighbour has excess bandwidth available.
+ *
+ * @param target neighbour to check
+ * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
+ */
+int
+GSC_NEIGHBOURS_check_excess_bandwidth (const struct GSC_KeyExchangeInfo *kxinfo)
+{
+  return kxinfo->has_excess_bandwidth;
+}
+
+
 /**
  * Handle #GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS request.  For this
  * request type, the client does not have to have transmitted an INIT
@@ -1860,7 +1959,9 @@ GSC_KX_handle_client_monitor_peers (void *cls,
   done_msg.header.size = htons (sizeof (struct MonitorNotifyMessage));
   done_msg.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY);
   done_msg.state = htonl ((uint32_t) GNUNET_CORE_KX_ITERATION_FINISHED);
-  memset (&done_msg.peer, 0, sizeof (struct GNUNET_PeerIdentity));
+  memset (&done_msg.peer,
+         0,
+         sizeof (struct GNUNET_PeerIdentity));
   done_msg.timeout = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_FOREVER_ABS);
   GNUNET_SERVER_notification_context_unicast (nc,
                                               client,
index 7649315ce49d225a54b45fa8d8860d3564d5f31a..8614f090fec6b2292dd0acb1486d63b164e1c5d5 100644 (file)
 struct GSC_KeyExchangeInfo;
 
 
-/**
- * Obtain the array of message handlers provided by KX.
- *
- * @return NULL-entry terminated array of handlers
- */
-const struct GNUNET_MQ_MessageHandler *
-GSC_KX_get_handlers (void);
-
-
-/**
- * We received a EPHEMERAL_KEY message.  Validate and update
- * our key material and status.
- *
- * @param kx key exchange status for the corresponding peer
- * @param msg the set key message we received
- */
-void
-GSC_KX_handle_ephemeral_key (struct GSC_KeyExchangeInfo *kx,
-                            const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * We received a PING message.  Validate and transmit
- * a PONG message.
- *
- * @param kx key exchange status for the corresponding peer
- * @param msg the encrypted PING message itself
- */
-void
-GSC_KX_handle_ping (struct GSC_KeyExchangeInfo *kx,
-                    const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * We received a PONG message.  Validate and update our status.
- *
- * @param kx key exchange status for the corresponding peer
- * @param msg the encrypted PONG message itself
- */
-void
-GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
-                    const struct GNUNET_MessageHeader *msg);
-
-
 /**
  * Encrypt and transmit a message with the given payload.
  *
@@ -89,57 +45,47 @@ GSC_KX_handle_pong (struct GSC_KeyExchangeInfo *kx,
  */
 void
 GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx,
-                             const void *payload, size_t payload_size);
+                             const void *payload,
+                            size_t payload_size);
 
 
 /**
- * We received an encrypted message.  Decrypt, validate and
- * pass on to the appropriate clients.
- *
- * @param kx key exchange information context
- * @param msg encrypted message
- */
-void
-GSC_KX_handle_encrypted_message (struct GSC_KeyExchangeInfo *kx,
-                                 const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Start the key exchange with the given peer.
+ * Initialize KX subsystem.
  *
- * @param pid identity of the peer to do a key exchange with
- * @return key exchange information context
+ * @param pk private key to use for the peer
+ * @param server the server of the CORE service
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
-struct GSC_KeyExchangeInfo *
-GSC_KX_start (const struct GNUNET_PeerIdentity *pid);
+int
+GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk,
+             struct GNUNET_SERVER_Handle *server);
 
 
 /**
- * Stop key exchange with the given peer.  Clean up key material.
- *
- * @param kx key exchange to stop
+ * Shutdown KX subsystem.
  */
 void
-GSC_KX_stop (struct GSC_KeyExchangeInfo *kx);
+GSC_KX_done (void);
 
 
 /**
- * Initialize KX subsystem.
+ * Check if the given neighbour has excess bandwidth available.
  *
- * @param pk private key to use for the peer
- * @param server the server of the CORE service
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ * @param target neighbour to check
+ * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
  */
 int
-GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk,
-             struct GNUNET_SERVER_Handle *server);
+GSC_NEIGHBOURS_check_excess_bandwidth (const struct GSC_KeyExchangeInfo *target);
 
 
 /**
- * Shutdown KX subsystem.
+ * Check how many messages are queued for the given neighbour.
+ *
+ * @param target neighbour to check
+ * @return number of items in the message queue
  */
-void
-GSC_KX_done (void);
+unsigned int
+GSC_NEIGHBOURS_get_queue_length (const struct GSC_KeyExchangeInfo *target);
 
 
 /**
diff --git a/src/core/gnunet-service-core_neighbours.c b/src/core/gnunet-service-core_neighbours.c
deleted file mode 100644 (file)
index 7af49a3..0000000
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
-     This file is part of GNUnet.
-     Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
-*/
-
-/**
- * @file core/gnunet-service-core_neighbours.c
- * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
- * @author Christian Grothoff
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_statistics_service.h"
-#include "gnunet_transport_service.h"
-#include "gnunet_transport_core_service.h"
-#include "gnunet-service-core.h"
-#include "gnunet-service-core_neighbours.h"
-#include "gnunet-service-core_kx.h"
-#include "gnunet-service-core_sessions.h"
-#include "gnunet_constants.h"
-
-
-/**
- * Message ready for transmission via transport service.  This struct
- * is followed by the actual content of the message.
- */
-struct NeighbourMessageEntry
-{
-
-  /**
-   * We keep messages in a doubly linked list.
-   */
-  struct NeighbourMessageEntry *next;
-
-  /**
-   * We keep messages in a doubly linked list.
-   */
-  struct NeighbourMessageEntry *prev;
-
-  /**
-   * By when are we supposed to transmit this message?
-   */
-  struct GNUNET_TIME_Absolute deadline;
-
-  /**
-   * What time did we submit the request?
-   */
-  struct GNUNET_TIME_Absolute submission_time;
-
-  /**
-   * How long is the message? (number of bytes following the `struct
-   * MessageEntry`, but not including the size of `struct
-   * MessageEntry` itself!)
-   */
-  size_t size;
-
-};
-
-
-/**
- * Data kept per transport-connected peer.
- */
-struct Neighbour
-{
-
-  /**
-   * Head of the batched message queue (already ordered, transmit
-   * starting with the head).
-   */
-  struct NeighbourMessageEntry *message_head;
-
-  /**
-   * Tail of the batched message queue (already ordered, append new
-   * messages to tail).
-   */
-  struct NeighbourMessageEntry *message_tail;
-
-  /**
-   * Handle for pending requests for transmission to this peer
-   * with the transport service.  NULL if no request is pending.
-   */
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-
-  /**
-   * Information about the key exchange with the other peer.
-   */
-  struct GSC_KeyExchangeInfo *kxinfo;
-
-  /**
-   * Identity of the other peer.
-   */
-  struct GNUNET_PeerIdentity peer;
-
-  /**
-   * ID of task used for re-trying plaintext scheduling.
-   */
-  struct GNUNET_SCHEDULER_Task *retry_plaintext_task;
-
-  /**
-   * How many messages are in the queue for this neighbour?
-   */
-  unsigned int queue_size;
-
-  /**
-   * #GNUNET_YES if this peer currently has excess bandwidth.
-   */
-  int has_excess_bandwidth;
-
-};
-
-
-/**
- * Map of peer identities to `struct Neighbour`.
- */
-static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
-
-/**
- * Transport service.
- */
-static struct GNUNET_TRANSPORT_Handle *transport;
-
-
-/**
- * Find the entry for the given neighbour.
- *
- * @param peer identity of the neighbour
- * @return NULL if we are not connected, otherwise the
- *         neighbour's entry.
- */
-static struct Neighbour *
-find_neighbour (const struct GNUNET_PeerIdentity *peer)
-{
-  if (NULL == neighbours)
-    return NULL;
-  return GNUNET_CONTAINER_multipeermap_get (neighbours,
-                                            peer);
-}
-
-
-/**
- * Free the given entry for the neighbour.
- *
- * @param n neighbour to free
- */
-static void
-free_neighbour (struct Neighbour *n)
-{
-  struct NeighbourMessageEntry *m;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Destroying neighbour entry for peer `%s'\n",
-              GNUNET_i2s (&n->peer));
-  while (NULL != (m = n->message_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (n->message_head,
-                                 n->message_tail,
-                                 m);
-    n->queue_size--;
-    GNUNET_free (m);
-  }
-  GNUNET_assert (0 == n->queue_size);
-  if (NULL != n->th)
-  {
-    GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
-    n->th = NULL;
-  }
-  GNUNET_STATISTICS_update (GSC_stats,
-                            gettext_noop
-                            ("# sessions terminated by transport disconnect"),
-                            1, GNUNET_NO);
-  if (NULL != n->kxinfo)
-  {
-    GSC_KX_stop (n->kxinfo);
-    n->kxinfo = NULL;
-  }
-  if (NULL != n->retry_plaintext_task)
-  {
-    GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
-    n->retry_plaintext_task = NULL;
-  }
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multipeermap_remove (neighbours,
-                                                       &n->peer, n));
-  GNUNET_STATISTICS_set (GSC_stats,
-                         gettext_noop ("# neighbour entries allocated"),
-                         GNUNET_CONTAINER_multipeermap_size (neighbours),
-                         GNUNET_NO);
-  GNUNET_free (n);
-}
-
-
-/**
- * Check if we have encrypted messages for the specified neighbour
- * pending, and if so, check with the transport about sending them
- * out.
- *
- * @param n neighbour to check.
- */
-static void
-process_queue (struct Neighbour *n);
-
-
-/**
- * Function called when the transport service is ready to receive a
- * message for the respective peer
- *
- * @param cls neighbour to use message from
- * @param size number of bytes we can transmit
- * @param buf where to copy the message
- * @return number of bytes transmitted
- */
-static size_t
-transmit_ready (void *cls,
-                size_t size,
-                void *buf)
-{
-  struct Neighbour *n = cls;
-  struct NeighbourMessageEntry *m;
-  size_t ret;
-  char *cbuf;
-  struct GNUNET_TIME_Relative delay;
-  struct GNUNET_TIME_Relative overdue;
-
-  n->th = NULL;
-  m = n->message_head;
-  if (NULL == m)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  GNUNET_CONTAINER_DLL_remove (n->message_head,
-                               n->message_tail,
-                               m);
-  n->queue_size--;
-  if (NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Transmission of message of type %u and size %u failed\n",
-                (unsigned int)
-                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
-                (unsigned int) m->size);
-    GNUNET_free (m);
-    process_queue (n);
-    return 0;
-  }
-  delay = GNUNET_TIME_absolute_get_duration (m->submission_time);
-  overdue = GNUNET_TIME_absolute_get_duration (m->deadline);
-  cbuf = buf;
-  GNUNET_assert (size >= m->size);
-  GNUNET_memcpy (cbuf,
-          &m[1],
-          m->size);
-  ret = m->size;
-  if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Copied overdue message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
-                (unsigned int)
-                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
-                (unsigned int) ret,
-                GNUNET_i2s (&n->peer),
-                GNUNET_STRINGS_relative_time_to_string (delay,
-                                                        GNUNET_YES));
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
-                (unsigned int)
-                ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
-                (unsigned int) ret,
-                GNUNET_i2s (&n->peer),
-                GNUNET_STRINGS_relative_time_to_string (delay,
-                                                        GNUNET_YES));
-  GNUNET_free (m);
-  n->has_excess_bandwidth = GNUNET_NO;
-  process_queue (n);
-  GNUNET_STATISTICS_update (GSC_stats,
-                            gettext_noop
-                            ("# encrypted bytes given to transport"), ret,
-                            GNUNET_NO);
-  return ret;
-}
-
-
-/**
- * Check if we have messages for the specified neighbour pending, and
- * if so, check with the transport about sending them out.
- *
- * @param n neighbour to check.
- */
-static void
-process_queue (struct Neighbour *n)
-{
-  struct NeighbourMessageEntry *m;
-
-  if (NULL != n->th)
-    return;                     /* request already pending */
-  m = n->message_head;
-  if (NULL == m)
-  {
-    /* notify sessions that the queue is empty and more messages
-     * could thus be queued now */
-    GSC_SESSIONS_solicit (&n->peer);
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Asking transport for transmission of %u bytes to `%s' in next %s\n",
-              (unsigned int) m->size,
-              GNUNET_i2s (&n->peer),
-              GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
-                                                      GNUNET_NO));
-  m->submission_time = GNUNET_TIME_absolute_get ();
-  n->th
-    = GNUNET_TRANSPORT_notify_transmit_ready (transport,
-                                              &n->peer,
-                                              m->size,
-                                              GNUNET_TIME_absolute_get_remaining (m->deadline),
-                                              &transmit_ready,
-                                              n);
-  if (NULL != n->th)
-    return;
-  /* message request too large or duplicate request */
-  GNUNET_break (0);
-  /* discard encrypted message */
-  GNUNET_CONTAINER_DLL_remove (n->message_head,
-                               n->message_tail,
-                               m);
-  n->queue_size--;
-  GNUNET_free (m);
-  process_queue (n);
-}
-
-
-/**
- * Function called by transport to notify us that
- * a peer connected to us (on the network level).
- *
- * @param cls closure
- * @param peer the peer that connected
- */
-static void
-handle_transport_notify_connect (void *cls,
-                                 const struct GNUNET_PeerIdentity *peer)
-{
-  struct Neighbour *n;
-
-  if (0 == memcmp (peer,
-                   &GSC_my_identity,
-                   sizeof (struct GNUNET_PeerIdentity)))
-  {
-    GNUNET_break (0);
-    return;
-  }
-  n = find_neighbour (peer);
-  if (NULL != n)
-  {
-    /* duplicate connect notification!? */
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s exists already\n",
-                GNUNET_i2s (peer));
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received connection from `%s'.\n",
-              GNUNET_i2s (peer));
-  n = GNUNET_new (struct Neighbour);
-  n->peer = *peer;
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multipeermap_put (neighbours,
-                                                    &n->peer,
-                                                    n,
-                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  GNUNET_STATISTICS_set (GSC_stats,
-                         gettext_noop ("# neighbour entries allocated"),
-                         GNUNET_CONTAINER_multipeermap_size (neighbours),
-                         GNUNET_NO);
-  n->kxinfo = GSC_KX_start (peer);
-}
-
-
-/**
- * Function called by transport telling us that a peer
- * disconnected.
- *
- * @param cls closure
- * @param peer the peer that disconnected
- */
-static void
-handle_transport_notify_disconnect (void *cls,
-                                    const struct GNUNET_PeerIdentity *peer)
-{
-  struct Neighbour *n;
-
-  if (0 == memcmp (peer,
-                   &GSC_my_identity,
-                   sizeof (struct GNUNET_PeerIdentity)))
-  {
-    GNUNET_break (0);
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer `%s' disconnected from us; received notification from transport.\n",
-              GNUNET_i2s (peer));
-  n = find_neighbour (peer);
-  if (NULL == n)
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (peer));
-    return;
-  }
-  free_neighbour (n);
-}
-
-
-/**
- * Function called by the transport for each received message.
- *
- * @param cls closure
- * @param peer (claimed) identity of the other peer
- * @param message the message
- */
-static void
-handle_transport_receive (void *cls,
-                          const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *message)
-{
-  struct Neighbour *n;
-  uint16_t type;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u from `%s', demultiplexing.\n",
-              (unsigned int) ntohs (message->type),
-              GNUNET_i2s (peer));
-  if (0 == memcmp (peer,
-                   &GSC_my_identity,
-                   sizeof (struct GNUNET_PeerIdentity)))
-  {
-    GNUNET_break (0);
-    return;
-  }
-  n = find_neighbour (peer);
-  if (NULL == n)
-  {
-    /* received message from peer that is not connected!? */
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (peer));
-    return;
-  }
-  type = ntohs (message->type);
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY:
-    GSC_KX_handle_ephemeral_key (n->kxinfo, message);
-    break;
-  case GNUNET_MESSAGE_TYPE_CORE_PING:
-    GSC_KX_handle_ping (n->kxinfo, message);
-    break;
-  case GNUNET_MESSAGE_TYPE_CORE_PONG:
-    GSC_KX_handle_pong (n->kxinfo, message);
-    break;
-  case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
-    GSC_KX_handle_encrypted_message (n->kxinfo, message);
-    break;
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                _("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
-                (unsigned int) type,
-                (unsigned int) ntohs (message->size),
-                GNUNET_i2s (peer));
-    return;
-  }
-}
-
-
-/**
- * Transmit the given message to the given target.
- *
- * @param target peer that should receive the message (must be connected)
- * @param msg message to transmit
- * @param timeout by when should the transmission be done?
- */
-void
-GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
-                         const struct GNUNET_MessageHeader *msg,
-                         struct GNUNET_TIME_Relative timeout)
-{
-  struct NeighbourMessageEntry *me;
-  struct Neighbour *n;
-  size_t msize;
-
-  n = find_neighbour (target);
-  if (NULL == n)
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (target));
-    return;
-  }
-  msize = ntohs (msg->size);
-  me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
-  me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
-  me->size = msize;
-  GNUNET_memcpy (&me[1],
-                msg,
-                msize);
-  GNUNET_CONTAINER_DLL_insert_tail (n->message_head,
-                                    n->message_tail,
-                                    me);
-  n->queue_size++;
-  process_queue (n);
-}
-
-
-/**
- * One of our neighbours has excess bandwidth, remember this.
- *
- * @param cls NULL
- * @param pid identity of the peer with excess bandwidth
- */
-static void
-handle_transport_notify_excess_bw (void *cls,
-                                   const struct GNUNET_PeerIdentity *pid)
-{
-  struct Neighbour *n;
-
-  n = find_neighbour (pid);
-  if (NULL == n)
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (pid));
-    return;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer %s has excess bandwidth available\n",
-              GNUNET_i2s (pid));
-  n->has_excess_bandwidth = GNUNET_YES;
-  GSC_SESSIONS_solicit (pid);
-}
-
-
-/**
- * Check how many messages are queued for the given neighbour.
- *
- * @param target neighbour to check
- * @return number of items in the message queue
- */
-unsigned int
-GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
-{
-  struct Neighbour *n;
-
-  n = find_neighbour (target);
-  if (NULL == n)
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (target));
-    return UINT_MAX;
-  }
-  return n->queue_size;
-}
-
-
-/**
- * Check if the given neighbour has excess bandwidth available.
- *
- * @param target neighbour to check
- * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
- */
-int
-GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
-{
-  struct Neighbour *n;
-
-  n = find_neighbour (target);
-  if (NULL == n)
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Peer %s not found\n",
-                GNUNET_i2s (target));
-    return GNUNET_SYSERR;
-  }
-  return n->has_excess_bandwidth;
-}
-
-
-/**
- * Initialize neighbours subsystem.
- */
-int
-GSC_NEIGHBOURS_init ()
-{
-  neighbours = GNUNET_CONTAINER_multipeermap_create (128,
-                                                     GNUNET_YES);
-  transport =
-      GNUNET_TRANSPORT_connect2 (GSC_cfg,
-                                 &GSC_my_identity,
-                                 NULL,
-                                 &handle_transport_receive,
-                                 &handle_transport_notify_connect,
-                                 &handle_transport_notify_disconnect,
-                                 &handle_transport_notify_excess_bw);
-  if (NULL == transport)
-  {
-    GNUNET_CONTAINER_multipeermap_destroy (neighbours);
-    neighbours = NULL;
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Wrapper around #free_neighbour().
- *
- * @param cls unused
- * @param key peer identity
- * @param value the `struct Neighbour` to free
- * @return #GNUNET_OK (continue to iterate)
- */
-static int
-free_neighbour_helper (void *cls,
-                      const struct GNUNET_PeerIdentity * key,
-                      void *value)
-{
-  struct Neighbour *n = value;
-
-  /* transport should have 'disconnected' all neighbours... */
-  GNUNET_break (0);
-  free_neighbour (n);
-  return GNUNET_OK;
-}
-
-
-/**
- * Shutdown neighbours subsystem.
- */
-void
-GSC_NEIGHBOURS_done ()
-{
-  if (NULL != transport)
-  {
-    GNUNET_TRANSPORT_disconnect (transport);
-    transport = NULL;
-  }
-  if (NULL != neighbours)
-  {
-    GNUNET_CONTAINER_multipeermap_iterate (neighbours,
-                                           &free_neighbour_helper,
-                                          NULL);
-    GNUNET_CONTAINER_multipeermap_destroy (neighbours);
-    neighbours = NULL;
-  }
-}
-
-/* end of gnunet-service-core_neighbours.c */
diff --git a/src/core/gnunet-service-core_neighbours.h b/src/core/gnunet-service-core_neighbours.h
deleted file mode 100644 (file)
index b366d6b..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
-     This file is part of GNUnet.
-     Copyright (C) 2009-2014 GNUnet e.V.
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
-*/
-
-/**
- * @file core/gnunet-service-core_neighbours.h
- * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
- * @author Christian Grothoff
- */
-#ifndef GNUNET_SERVICE_CORE_NEIGHBOURS_H
-#define GNUNET_SERVICE_CORE_NEIGHBOURS_H
-
-#include "gnunet_util_lib.h"
-
-/**
- * Transmit the given message to the given target.  Note that a
- * non-control messages should only be transmitted after a
- * #GSC_SESSION_solicit() call was made (that call is always invoked
- * when the message queue is empty).  Outbound quotas and memory
- * bounds will then be enfoced (as #GSC_SESSION_solicit() is only called
- * if sufficient bandwidth is available).
- *
- * @param target peer that should receive the message (must be connected)
- * @param msg message to transmit
- * @param timeout by when should the transmission be done?
- */
-void
-GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
-                         const struct GNUNET_MessageHeader *msg,
-                         struct GNUNET_TIME_Relative timeout);
-
-
-/**
- * Check if the given neighbour has excess bandwidth available.
- *
- * @param target neighbour to check
- * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
- */
-int
-GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target);
-
-
-/**
- * Check how many messages are queued for the given neighbour.
- *
- * @param target neighbour to check
- * @return number of items in the message queue
- */
-unsigned int
-GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target);
-
-
-/**
- * Initialize neighbours subsystem.
- */
-int
-GSC_NEIGHBOURS_init (void);
-
-
-/**
- * Shutdown neighbours subsystem.
- */
-void
-GSC_NEIGHBOURS_done (void);
-
-
-#endif
index 41d3cc24bd2237657f0b82666990a4ebaf868e58..ef5285b45761321c6b09d1ddfa7407774abd140c 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009-2014 GNUnet e.V.
+     Copyright (C) 2009-2014, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -25,7 +25,6 @@
  */
 #include "platform.h"
 #include "gnunet-service-core.h"
-#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_kx.h"
 #include "gnunet-service-core_typemap.h"
 #include "gnunet-service-core_sessions.h"
@@ -88,8 +87,13 @@ struct Session
   /**
    * Identity of the other peer.
    */
-  struct GNUNET_PeerIdentity peer;
+  const struct GNUNET_PeerIdentity *peer;
 
+  /**
+   * Key exchange state for this peer.
+   */ 
+  struct GSC_KeyExchangeInfo *kx;
+  
   /**
    * Head of list of requests from clients for transmission to
    * this peer.
@@ -112,11 +116,6 @@ struct Session
    */
   struct SessionMessageEntry *sme_tail;
 
-  /**
-   * Information about the key exchange with the other peer.
-   */
-  struct GSC_KeyExchangeInfo *kxinfo;
-
   /**
    * Current type map for this peer.
    */
@@ -197,7 +196,10 @@ static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
 static struct Session *
 find_session (const struct GNUNET_PeerIdentity *peer)
 {
-  return GNUNET_CONTAINER_multipeermap_get (sessions, peer);
+  if (NULL == sessions)
+    return NULL;
+  return GNUNET_CONTAINER_multipeermap_get (sessions,
+                                           peer);
 }
 
 
@@ -218,8 +220,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
   if (NULL == session)
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Destroying session for peer `%4s'\n",
-              GNUNET_i2s (&session->peer));
+              "Destroying session for peer `%s'\n",
+              GNUNET_i2s (session->peer));
   if (NULL != session->cork_task)
   {
     GNUNET_SCHEDULER_cancel (session->cork_task);
@@ -244,13 +246,15 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
     GNUNET_SCHEDULER_cancel (session->typemap_task);
     session->typemap_task = NULL;
   }
-  GSC_CLIENTS_notify_clients_about_neighbour (&session->peer,
-                                              session->tmap, NULL);
+  GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
+                                              session->tmap,
+                                             NULL);
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (sessions,
-                                                       &session->peer,
+                                                       session->peer,
                                                        session));
-  GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
+  GNUNET_STATISTICS_set (GSC_stats,
+                        gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (sessions),
                          GNUNET_NO);
   GSC_TYPEMAP_destroy (session->tmap);
@@ -285,7 +289,7 @@ transmit_typemap_task (void *cls)
                             1,
                             GNUNET_NO);
   hdr = GSC_TYPEMAP_compute_type_map_message ();
-  GSC_KX_encrypt_and_transmit (session->kxinfo,
+  GSC_KX_encrypt_and_transmit (session->kx,
                                hdr,
                                ntohs (hdr->size));
   GNUNET_free (hdr);
@@ -327,14 +331,15 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
               GNUNET_i2s (peer));
   session = GNUNET_new (struct Session);
   session->tmap = GSC_TYPEMAP_create ();
-  session->peer = *peer;
-  session->kxinfo = kx;
+  session->peer = peer;
+  session->kx = kx;
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multipeermap_put (sessions,
-                                                    &session->peer,
+                                                    session->peer,
                                                     session,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
+  GNUNET_STATISTICS_set (GSC_stats,
+                        gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (sessions),
                          GNUNET_NO);
   GSC_CLIENTS_notify_clients_about_neighbour (peer,
@@ -433,7 +438,7 @@ notify_client_about_session (void *cls,
   struct Session *session = value;
 
   GSC_CLIENTS_notify_client_about_neighbour (client,
-                                             &session->peer,
+                                             session->peer,
                                              NULL,      /* old TMAP: none */
                                              session->tmap);
   return GNUNET_OK;
@@ -620,7 +625,7 @@ try_transmission (struct Session *session)
   if (GNUNET_YES != session->ready_to_transmit)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Already ready to transmit, not evaluating queue\n");
+                "Not yet ready to transmit, not evaluating queue\n");
     return;
   }
   msize = 0;
@@ -628,13 +633,13 @@ try_transmission (struct Session *session)
   /* if the peer has excess bandwidth, background traffic is allowed,
      otherwise not */
   if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
-      GSC_NEIGHBOURS_get_queue_size (&session->peer))
+      GSC_NEIGHBOURS_get_queue_length (session->kx))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Transmission queue already very long, waiting...\n");
     return; /* queue already too long */
   }
-  excess = GSC_NEIGHBOURS_check_excess_bandwidth (&session->peer);
+  excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
   if (GNUNET_YES == excess)
     maxp = GNUNET_CORE_PRIO_BACKGROUND;
   else
@@ -769,7 +774,7 @@ try_transmission (struct Session *session)
                            GNUNET_NO);
     /* now actually transmit... */
     session->ready_to_transmit = GNUNET_NO;
-    GSC_KX_encrypt_and_transmit (session->kxinfo,
+    GSC_KX_encrypt_and_transmit (session->kx,
                                  pbuf,
                                  used);
   }
@@ -797,7 +802,9 @@ do_restart_typemap_message (void *cls,
 
   size = ntohs (hdr->size);
   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
-  GNUNET_memcpy (&sme[1], hdr, size);
+  GNUNET_memcpy (&sme[1],
+                hdr,
+                size);
   sme->size = size;
   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
   GNUNET_CONTAINER_DLL_insert (session->sme_head,
@@ -871,12 +878,18 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
     return;
   msize = ntohs (msg->size);
   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
-  GNUNET_memcpy (&sme[1], msg, msize);
+  GNUNET_memcpy (&sme[1],
+                msg,
+                msize);
   sme->size = msize;
   sme->priority = priority;
   if (GNUNET_YES == cork)
+  {
     sme->deadline =
         GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Mesage corked, delaying transmission\n");
+  }
   pos = session->sme_head;
   while ( (NULL != pos) &&
           (pos->priority >= sme->priority) )
@@ -998,9 +1011,9 @@ free_session_helper (void *cls,
                      const struct GNUNET_PeerIdentity *key,
                      void *value)
 {
-  struct Session *session = value;
+  /* struct Session *session = value; */
 
-  GSC_SESSIONS_end (&session->peer);
+  GSC_SESSIONS_end (key);
   return GNUNET_OK;
 }
 
@@ -1014,7 +1027,8 @@ GSC_SESSIONS_done ()
   if (NULL != sessions)
   {
     GNUNET_CONTAINER_multipeermap_iterate (sessions,
-                                           &free_session_helper, NULL);
+                                           &free_session_helper,
+                                          NULL);
     GNUNET_CONTAINER_multipeermap_destroy (sessions);
     sessions = NULL;
   }
index 53f864bddec3765c8a9387377bbaddccea1a291e..a13cc87064c6efb11ff813a0061ad029b72e613e 100644 (file)
@@ -8,3 +8,4 @@ WAN_QUOTA_OUT = 64 kiB
 [core]
 PORT = 2092
 UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-core.sock
+
index 9924bef971478b87dd60d82c7be62761475fa8e8..a4a6d182cf8fb088df2ecbc1dc842ebce9a8d8e7 100644 (file)
@@ -74,6 +74,8 @@ static struct PeerContext p1;
 
 static struct PeerContext p2;
 
+static struct GNUNET_CORE_TransmitHandle *nth;
+
 static int ok;
 
 static int32_t tr_n;
@@ -131,24 +133,6 @@ terminate_peer (struct PeerContext *p)
 }
 
 
-static void
-terminate_task (void *cls)
-{
-  unsigned long long delta;
-
-  delta = GNUNET_TIME_absolute_get_duration (start_time).rel_value_us;
-  FPRINTF (stderr,
-           "\nThroughput was %llu kb/s\n",
-           total_bytes * 1000000LL / 1024 / delta);
-  GAUGER ("CORE",
-          "Core throughput/s",
-          total_bytes * 1000000LL / 1024 / delta,
-          "kb/s");
-  GNUNET_SCHEDULER_shutdown ();
-  ok = 0;
-}
-
-
 static void
 terminate_task_error (void *cls)
 {
@@ -162,13 +146,29 @@ terminate_task_error (void *cls)
 static void
 do_shutdown (void *cls)
 {
+  unsigned long long delta;
+
+  delta = GNUNET_TIME_absolute_get_duration (start_time).rel_value_us;
+  FPRINTF (stderr,
+           "\nThroughput was %llu kb/s\n",
+           total_bytes * 1000000LL / 1024 / delta);
+  GAUGER ("CORE",
+          "Core throughput/s",
+          total_bytes * 1000000LL / 1024 / delta,
+          "kb/s");
   if (NULL != err_task)
   {
     GNUNET_SCHEDULER_cancel (err_task);
     err_task = NULL;
   }
+  if (NULL != nth)
+  {
+    GNUNET_CORE_notify_transmit_ready_cancel (nth);
+    nth = NULL;
+  }
   terminate_peer (&p1);
   terminate_peer (&p2);
+  
 }
 
 
@@ -182,16 +182,19 @@ transmit_ready (void *cls,
   unsigned int s;
   unsigned int ret;
 
+  nth = NULL;
   GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
   if (NULL == buf)
   {
     if (NULL != p1.ch)
       GNUNET_break (NULL !=
-                    GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
-                                                       GNUNET_CORE_PRIO_BEST_EFFORT,
-                                                       FAST_TIMEOUT, &p2.id,
-                                                       get_size (tr_n),
-                                                       &transmit_ready, &p1));
+                    (nth = GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
+                                                             GNUNET_CORE_PRIO_BEST_EFFORT,
+                                                             FAST_TIMEOUT,
+                                                             &p2.id,
+                                                             get_size (tr_n),
+                                                             &transmit_ready,
+                                                             &p1)));
     return 0;
   }
   GNUNET_assert (tr_n < TOTAL_MSGS);
@@ -223,7 +226,8 @@ transmit_ready (void *cls,
   GNUNET_SCHEDULER_cancel (err_task);
   err_task =
       GNUNET_SCHEDULER_add_delayed (TIMEOUT,
-                                    &terminate_task_error, NULL);
+                                    &terminate_task_error,
+                                   NULL);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Returning total message block of size %u\n",
               ret);
@@ -238,9 +242,11 @@ connect_notify (void *cls,
 {
   struct PeerContext *pc = cls;
 
-  if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (&pc->id,
+                  peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
     return;
-  GNUNET_assert (pc->connect_status == 0);
+  GNUNET_assert (0 == pc->connect_status);
   pc->connect_status = 1;
   if (pc == &p1)
   {
@@ -252,14 +258,19 @@ connect_notify (void *cls,
                 GNUNET_i2s (&p2.id));
     GNUNET_SCHEDULER_cancel (err_task);
     err_task =
-        GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
+        GNUNET_SCHEDULER_add_delayed (TIMEOUT,
+                                     &terminate_task_error,
+                                     NULL);
     start_time = GNUNET_TIME_absolute_get ();
     GNUNET_break (NULL !=
-                  GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
-                                                     GNUNET_CORE_PRIO_BEST_EFFORT,
-                                                     TIMEOUT, &p2.id,
-                                                     get_size (0),
-                                                     &transmit_ready, &p1));
+                  (nth = GNUNET_CORE_notify_transmit_ready (p1.ch,
+                                                           GNUNET_NO,
+                                                           GNUNET_CORE_PRIO_BEST_EFFORT,
+                                                           TIMEOUT,
+                                                           &p2.id,
+                                                           get_size (0),
+                                                           &transmit_ready,
+                                                           &p1)));
   }
 }
 
@@ -342,7 +353,8 @@ process_mtype (void *cls,
                 ntohs (message->size),
                 ntohl (hdr->num));
     GNUNET_SCHEDULER_cancel (err_task);
-    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, NULL);
+    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
+                                        NULL);
     return GNUNET_SYSERR;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -354,8 +366,8 @@ process_mtype (void *cls,
     FPRINTF (stderr, "%s",  ".");
   if (n == TOTAL_MSGS)
   {
-    GNUNET_SCHEDULER_cancel (err_task);
-    GNUNET_SCHEDULER_add_now (&terminate_task, NULL);
+    ok = 0;
+    GNUNET_SCHEDULER_shutdown ();
   }
   else
   {
index 42bf4e452665792bba3bebe6af32ce1251e77a15..07a40218942348f61c8123d2e0741b3160d185d4 100644 (file)
 #include "platform.h"
 #include "gnunet_arm_service.h"
 #include "gnunet_core_service.h"
-#include "gnunet_getopt_lib.h"
-#include "gnunet_os_lib.h"
-#include "gnunet_program_lib.h"
-#include "gnunet_scheduler_lib.h"
+#include "gnunet_util_lib.h"
 
 #define TIMEOUT 5
 
@@ -46,32 +43,28 @@ static struct PeerContext p1;
 
 static struct PeerContext p2;
 
-static struct GNUNET_SCHEDULER_Task * timeout_task_id;
+static struct GNUNET_SCHEDULER_Task *timeout_task_id;
 
 static int ok;
 
-#if VERBOSE
-#define OKPP do { ok++; FPRINTF (stderr, "Now at stage %u at %s:%u\n", ok, __FILE__, __LINE__); } while (0)
-#else
-#define OKPP do { ok++; } while (0)
-#endif
-
-
 
 static void
-connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
+connect_notify (void *cls,
+               const struct GNUNET_PeerIdentity *peer)
 {
 }
 
 
 static void
-disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
+disconnect_notify (void *cls,
+                  const struct GNUNET_PeerIdentity *peer)
 {
 }
 
 
 static int
-inbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
+inbound_notify (void *cls,
+               const struct GNUNET_PeerIdentity *other,
                 const struct GNUNET_MessageHeader *message)
 {
   return GNUNET_OK;
@@ -79,7 +72,8 @@ inbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
 
 
 static int
-outbound_notify (void *cls, const struct GNUNET_PeerIdentity *other,
+outbound_notify (void *cls,
+                const struct GNUNET_PeerIdentity *other,
                  const struct GNUNET_MessageHeader *message)
 {
   return GNUNET_OK;
@@ -112,33 +106,45 @@ init_notify (void *cls,
   {
     /* connect p2 */
     p2.ch =
-        GNUNET_CORE_connect (p2.cfg, &p2, &init_notify, &connect_notify,
-                             &disconnect_notify, &inbound_notify, GNUNET_YES,
-                             &outbound_notify, GNUNET_YES, handlers);
+        GNUNET_CORE_connect (p2.cfg,
+                            &p2,
+                            &init_notify,
+                            &connect_notify,
+                             &disconnect_notify,
+                            &inbound_notify, GNUNET_YES,
+                             &outbound_notify, GNUNET_YES,
+                            handlers);
   }
   else
   {
     GNUNET_assert (p == &p2);
     GNUNET_SCHEDULER_cancel (timeout_task_id);
-    GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+    timeout_task_id = NULL;
+    GNUNET_SCHEDULER_add_now (&shutdown_task,
+                             NULL);
   }
 }
 
 
 static void
-setup_peer (struct PeerContext *p, const char *cfgname)
+setup_peer (struct PeerContext *p,
+           const char *cfgname)
 {
   char *binary;
 
   binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-arm");
   p->cfg = GNUNET_CONFIGURATION_create ();
   p->arm_proc =
-    GNUNET_OS_start_process (GNUNET_YES, GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
+    GNUNET_OS_start_process (GNUNET_YES,
+                            GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
                             NULL, NULL, NULL,
                             binary,
                             "gnunet-service-arm",
-                            "-c", cfgname, NULL);
-  GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_load (p->cfg, cfgname));
+                            "-c", cfgname,
+                            NULL);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONFIGURATION_load (p->cfg,
+                                           cfgname));
   GNUNET_free (binary);
 }
 
@@ -147,12 +153,12 @@ static void
 timeout_task (void *cls)
 {
   FPRINTF (stderr, "%s",  "Timeout.\n");
-  if (p1.ch != NULL)
+  if (NULL != p1.ch)
   {
     GNUNET_CORE_disconnect (p1.ch);
     p1.ch = NULL;
   }
-  if (p2.ch != NULL)
+  if (NULL != p2.ch)
   {
     GNUNET_CORE_disconnect (p2.ch);
     p2.ch = NULL;
@@ -162,34 +168,47 @@ timeout_task (void *cls)
 
 
 static void
-run (void *cls, char *const *args, const char *cfgfile,
+run (void *cls,
+     char *const *args,
+     const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   GNUNET_assert (ok == 1);
-  OKPP;
+  ok++;
   setup_peer (&p1, "test_core_api_peer1.conf");
   setup_peer (&p2, "test_core_api_peer2.conf");
   timeout_task_id =
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                    (GNUNET_TIME_UNIT_MINUTES, TIMEOUT),
+                                    (GNUNET_TIME_UNIT_MINUTES,
+                                    TIMEOUT),
                                     &timeout_task, NULL);
-  p1.ch =
-      GNUNET_CORE_connect (p1.cfg, &p1, &init_notify, &connect_notify,
-                           &disconnect_notify, &inbound_notify, GNUNET_YES,
-                           &outbound_notify, GNUNET_YES, handlers);
+  p1.ch = GNUNET_CORE_connect (p1.cfg,
+                              &p1,
+                              &init_notify,
+                              &connect_notify,
+                              &disconnect_notify,
+                              &inbound_notify, GNUNET_YES,
+                              &outbound_notify, GNUNET_YES,
+                              handlers);
 }
 
 
 static void
 stop_arm (struct PeerContext *p)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stopping peer\n");
-  if (0 != GNUNET_OS_process_kill (p->arm_proc, GNUNET_TERM_SIG))
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
-  if (GNUNET_OS_process_wait (p->arm_proc) != GNUNET_OK)
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "waitpid");
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ARM process %u stopped\n",
-              GNUNET_OS_process_get_pid (p->arm_proc));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Stopping peer\n");
+  if (0 != GNUNET_OS_process_kill (p->arm_proc,
+                                  GNUNET_TERM_SIG))
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                        "kill");
+  if (GNUNET_OK !=
+      GNUNET_OS_process_wait (p->arm_proc))
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                        "waitpid");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "ARM process %u stopped\n",
+              (unsigned int) GNUNET_OS_process_get_pid (p->arm_proc));
   GNUNET_OS_process_destroy (p->arm_proc);
   p->arm_proc = NULL;
   GNUNET_CONFIGURATION_destroy (p->cfg);
@@ -199,7 +218,8 @@ stop_arm (struct PeerContext *p)
 static int
 check ()
 {
-  char *const argv[] = { "test-core-api-start-only",
+  char *const argv[] = {
+    "test-core-api-start-only",
     "-c",
     "test_core_api_data.conf",
     NULL
@@ -211,9 +231,15 @@ check ()
   GNUNET_DISK_directory_remove ("/tmp/test-gnunet-core-peer-2");
 
   ok = 1;
-  GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv,
-                      "test-core-api-start-only", "nohelp", options, &run, &ok);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test finished\n");
+  GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
+                     argv,
+                      "test-core-api-start-only",
+                     "nohelp",
+                     options,
+                     &run,
+                     &ok);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Test finished\n");
   stop_arm (&p1);
   stop_arm (&p2);
   return ok;
@@ -221,7 +247,8 @@ check ()
 
 
 int
-main (int argc, char *argv[])
+main (int argc,
+      char *argv[])
 {
   int ret;
 
index 3f6b3dce3f3b98fb5b77e78cf0514b904eaf70fc..eb9a1c379b381ccd371bf0522f4a7be78bb69ebd 100644 (file)
@@ -25,3 +25,4 @@ BROADCAST = NO
 
 [peerinfo]
 NO_IO = YES
+
index a0c456cef8282ecad10f4d81dd0ebb83cb2ea184..04f79b959eb2742034d841a587a8aeff7f3d7a79 100644 (file)
@@ -112,7 +112,7 @@ struct TestMessage
 static void
 terminate_peer (struct PeerContext *p)
 {
-  if (p->nth != NULL)
+  if (NULL != p->nth)
   {
     GNUNET_CORE_notify_transmit_ready_cancel (p->nth);
     p->nth = NULL;
@@ -156,9 +156,18 @@ terminate_peer (struct PeerContext *p)
 
 
 static void
-terminate_task (void *cls)
+shutdown_task (void *cls)
 {
-  err_task = NULL;
+  if (NULL != err_task)
+  {
+    GNUNET_SCHEDULER_cancel (err_task);
+    err_task = NULL;
+  }
+  if (NULL != measure_task)
+  {
+    GNUNET_SCHEDULER_cancel (measure_task);
+    measure_task = NULL;
+  }
   terminate_peer (&p1);
   terminate_peer (&p2);
 }
@@ -169,15 +178,8 @@ terminate_task_error (void *cls)
 {
   err_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-             "Testcase failed!\n");
-  terminate_peer (&p1);
-  terminate_peer (&p2);
-  //GNUNET_break (0);
-  if (NULL != measure_task)
-  {
-    GNUNET_SCHEDULER_cancel (measure_task);
-    measure_task = NULL;
-  }
+             "Testcase failed (timeout)!\n");
+  GNUNET_SCHEDULER_shutdown ();
   ok = 42;
 }
 
@@ -245,37 +247,63 @@ measurement_stop (void *cls)
     ok = 1; /* fail */
   else
     ok = 0; /* pass */
-  GNUNET_STATISTICS_get (p1.stats, "core", "# discarded CORE_SEND requests",
-                         NULL, &print_stat, &p1);
-
-  GNUNET_STATISTICS_get (p1.stats, "core",
+  GNUNET_STATISTICS_get (p1.stats,
+                        "core",
+                        "# discarded CORE_SEND requests",
+                         NULL,
+                        &print_stat,
+                        &p1);
+  GNUNET_STATISTICS_get (p1.stats,
+                        "core",
                          "# discarded CORE_SEND request bytes",
-                         NULL, &print_stat, &p1);
-  GNUNET_STATISTICS_get (p1.stats, "core",
+                         NULL,
+                        &print_stat,
+                        &p1);
+  GNUNET_STATISTICS_get (p1.stats,
+                        "core",
                          "# discarded lower priority CORE_SEND requests",
-                         NULL, &print_stat, NULL);
-  GNUNET_STATISTICS_get (p1.stats, "core",
+                         NULL,
+                        &print_stat,
+                        NULL);
+  GNUNET_STATISTICS_get (p1.stats,
+                        "core",
                          "# discarded lower priority CORE_SEND request bytes",
-                         NULL, &print_stat, &p1);
-  GNUNET_STATISTICS_get (p2.stats, "core", "# discarded CORE_SEND requests",
-                         NULL, &print_stat, &p2);
-
-  GNUNET_STATISTICS_get (p2.stats, "core",
+                         NULL,
+                        &print_stat,
+                        &p1);
+  GNUNET_STATISTICS_get (p2.stats,
+                        "core",
+                        "# discarded CORE_SEND requests",
+                         NULL,
+                        &print_stat,
+                        &p2);
+
+  GNUNET_STATISTICS_get (p2.stats,
+                        "core",
                          "# discarded CORE_SEND request bytes",
-                         NULL, &print_stat, &p2);
-  GNUNET_STATISTICS_get (p2.stats, "core",
+                         NULL,
+                        &print_stat,
+                        &p2);
+  GNUNET_STATISTICS_get (p2.stats,
+                        "core",
                          "# discarded lower priority CORE_SEND requests",
-                         NULL, &print_stat, &p2);
-  GNUNET_STATISTICS_get (p2.stats, "core",
+                         NULL,
+                        &print_stat,
+                        &p2);
+  GNUNET_STATISTICS_get (p2.stats,
+                        "core",
                          "# discarded lower priority CORE_SEND request bytes",
-                         NULL, &print_stat, &p2);
+                         NULL,
+                        &print_stat,
+                        &p2);
 
   if (ok != 0)
     kind = GNUNET_ERROR_TYPE_ERROR;
   switch (test)
   {
   case SYMMETRIC:
-    GNUNET_log (kind, "Core quota compliance test with symmetric quotas: %s\n",
+    GNUNET_log (kind,
+               "Core quota compliance test with symmetric quotas: %s\n",
                 (0 == ok) ? "PASSED" : "FAILED");
     break;
   case ASYMMETRIC_SEND_LIMITED:
@@ -289,23 +317,34 @@ measurement_stop (void *cls)
                 (0 == ok) ? "PASSED" : "FAILED");
     break;
   };
-  GNUNET_log (kind, "Peer 1 send  rate: %llu b/s (%llu bytes in %llu ms)\n",
-              throughput_out, total_bytes_sent, delta);
-  GNUNET_log (kind, "Peer 1 send quota: %llu b/s\n", current_quota_p1_out);
-  GNUNET_log (kind, "Peer 2 receive  rate: %llu b/s (%llu bytes in %llu ms)\n",
-              throughput_in, total_bytes_recv, delta);
-  GNUNET_log (kind, "Peer 2 receive quota: %llu b/s\n", current_quota_p2_in);
+  GNUNET_log (kind,
+             "Peer 1 send  rate: %llu b/s (%llu bytes in %llu ms)\n",
+              throughput_out,
+             total_bytes_sent,
+             delta);
+  GNUNET_log (kind,
+             "Peer 1 send quota: %llu b/s\n",
+             current_quota_p1_out);
+  GNUNET_log (kind,
+             "Peer 2 receive  rate: %llu b/s (%llu bytes in %llu ms)\n",
+              throughput_in,
+             total_bytes_recv,
+             delta);
+  GNUNET_log (kind,
+             "Peer 2 receive quota: %llu b/s\n",
+             current_quota_p2_in);
 /*
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Max. inbound  quota allowed: %llu b/s\n",max_quota_in );
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Max. outbound quota allowed: %llu b/s\n",max_quota_out);
 */
-  GNUNET_SCHEDULER_cancel (err_task);
-  err_task = GNUNET_SCHEDULER_add_now (&terminate_task, NULL);
+  GNUNET_SCHEDULER_shutdown ();
 }
 
 
 static size_t
-transmit_ready (void *cls, size_t size, void *buf)
+transmit_ready (void *cls,
+               size_t size,
+               void *buf)
 {
   char *cbuf = buf;
   struct TestMessage hdr;
@@ -313,43 +352,56 @@ transmit_ready (void *cls, size_t size, void *buf)
 
   p1.nth = NULL;
   GNUNET_assert (size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
-  if (buf == NULL)
+  if (NULL == buf)
   {
-    if ((p1.ch != NULL) && (p1.connect_status == 1))
+    if ( (NULL != p1.ch) &&
+        (1 == p1.connect_status) )
       GNUNET_break (NULL !=
                     (p1.nth =
-                     GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
+                     GNUNET_CORE_notify_transmit_ready (p1.ch,
+                                                       GNUNET_NO,
                                                         GNUNET_CORE_PRIO_BEST_EFFORT,
-                                                        FAST_TIMEOUT, &p2.id,
+                                                        FAST_TIMEOUT,
+                                                       &p2.id,
                                                         MESSAGESIZE,
-                                                        &transmit_ready, &p1)));
+                                                        &transmit_ready,
+                                                       &p1)));
     return 0;
   }
   GNUNET_assert (tr_n < TOTAL_MSGS);
   ret = 0;
   GNUNET_assert (size >= MESSAGESIZE);
-  GNUNET_assert (buf != NULL);
+  GNUNET_assert (NULL != buf);
   cbuf = buf;
   do
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending message %u of size %u at offset %u\n", tr_n,
-                MESSAGESIZE, ret);
+                "Sending message %u of size %u at offset %u\n",
+               tr_n,
+                MESSAGESIZE,
+               ret);
     hdr.header.size = htons (MESSAGESIZE);
     hdr.header.type = htons (MTYPE);
     hdr.num = htonl (tr_n);
-    GNUNET_memcpy (&cbuf[ret], &hdr, sizeof (struct TestMessage));
+    GNUNET_memcpy (&cbuf[ret],
+                  &hdr,
+                  sizeof (struct TestMessage));
     ret += sizeof (struct TestMessage);
-    memset (&cbuf[ret], tr_n, MESSAGESIZE - sizeof (struct TestMessage));
+    memset (&cbuf[ret],
+           tr_n,
+           MESSAGESIZE - sizeof (struct TestMessage));
     ret += MESSAGESIZE - sizeof (struct TestMessage);
     tr_n++;
-    if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 16))
+    if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                      16))
       break;                    /* sometimes pack buffer full, sometimes not */
   }
   while (size - ret >= MESSAGESIZE);
   GNUNET_SCHEDULER_cancel (err_task);
   err_task =
-      GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
+      GNUNET_SCHEDULER_add_delayed (TIMEOUT,
+                                   &terminate_task_error,
+                                   NULL);
 
   total_bytes_sent += ret;
   return ret;
@@ -357,48 +409,60 @@ transmit_ready (void *cls, size_t size, void *buf)
 
 
 static void
-connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
+connect_notify (void *cls,
+               const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *pc = cls;
 
-  if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (&pc->id,
+                  peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
     return;                     /* loopback */
-  GNUNET_assert (pc->connect_status == 0);
+  GNUNET_assert (0 == pc->connect_status);
   pc->connect_status = 1;
   if (pc == &p1)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Encrypted connection established to peer `%4s'\n",
+                "Encrypted connection established to peer `%s'\n",
                 GNUNET_i2s (peer));
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Asking core (1) for transmission to peer `%4s'\n",
+                "Asking core (1) for transmission to peer `%s'\n",
                 GNUNET_i2s (&p2.id));
     GNUNET_SCHEDULER_cancel (err_task);
     err_task =
-        GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
+        GNUNET_SCHEDULER_add_delayed (TIMEOUT,
+                                     &terminate_task_error,
+                                     NULL);
     start_time = GNUNET_TIME_absolute_get ();
     running = GNUNET_YES;
     measure_task =
-        GNUNET_SCHEDULER_add_delayed (MEASUREMENT_LENGTH, &measurement_stop,
+        GNUNET_SCHEDULER_add_delayed (MEASUREMENT_LENGTH,
+                                     &measurement_stop,
                                       NULL);
 
     GNUNET_break (NULL !=
                   (p1.nth =
-                   GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
+                   GNUNET_CORE_notify_transmit_ready (p1.ch,
+                                                     GNUNET_NO,
                                                       GNUNET_CORE_PRIO_BEST_EFFORT,
-                                                      TIMEOUT, &p2.id,
+                                                      TIMEOUT,
+                                                     &p2.id,
                                                       MESSAGESIZE,
-                                                      &transmit_ready, &p1)));
+                                                      &transmit_ready,
+                                                     &p1)));
   }
 }
 
 
 static void
-disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
+disconnect_notify (void *cls,
+                  const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *pc = cls;
 
-  if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (&pc->id,
+                  peer,
+                  sizeof (struct GNUNET_PeerIdentity)))
     return;                     /* loopback */
   pc->connect_status = 0;
   if (NULL != measure_task)
@@ -408,12 +472,13 @@ disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer)
     GNUNET_SCHEDULER_cancel (measure_task);
     measure_task = NULL;
   }
-  if (pc->nth != NULL)
+  if (NULL != pc->nth)
   {
     GNUNET_CORE_notify_transmit_ready_cancel (pc->nth);
     pc->nth = NULL;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%4s' cut\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Encrypted connection to `%s' cut\n",
               GNUNET_i2s (peer));
 }
 
@@ -424,7 +489,7 @@ inbound_notify (void *cls,
                 const struct GNUNET_MessageHeader *message)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core provides inbound data from `%4s' %u.\n",
+              "Core provides inbound data from `%s' %u.\n",
               GNUNET_i2s (other),
               (unsigned int) ntohs (message->size));
   total_bytes_recv += ntohs (message->size);
@@ -438,7 +503,7 @@ outbound_notify (void *cls,
                  const struct GNUNET_MessageHeader *message)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core notifies about outbound data for `%4s'.\n",
+              "Core notifies about outbound data for `%s'.\n",
               GNUNET_i2s (other));
   return GNUNET_OK;
 }
@@ -449,7 +514,8 @@ transmit_ready (void *cls, size_t size, void *buf);
 
 
 static int
-process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer,
+process_mtype (void *cls,
+              const struct GNUNET_PeerIdentity *peer,
                const struct GNUNET_MessageHeader *message)
 {
   static int n;
@@ -462,34 +528,46 @@ process_mtype (void *cls, const struct GNUNET_PeerIdentity *peer,
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Expected message %u of size %u, got %u bytes of message %u\n",
-                n, MESSAGESIZE, ntohs (message->size), ntohl (hdr->num));
+                n,
+               MESSAGESIZE,
+               ntohs (message->size),
+               ntohl (hdr->num));
     GNUNET_SCHEDULER_cancel (err_task);
-    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, NULL);
+    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
+                                        NULL);
     return GNUNET_SYSERR;
   }
   if (ntohl (hdr->num) != n)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Expected message %u of size %u, got %u bytes of message %u\n",
-                n, MESSAGESIZE, ntohs (message->size), ntohl (hdr->num));
+                n,
+               MESSAGESIZE,
+               ntohs (message->size),
+               ntohl (hdr->num));
     GNUNET_SCHEDULER_cancel (err_task);
-    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, NULL);
+    err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error,
+                                        NULL);
     return GNUNET_SYSERR;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got message %u of size %u\n",
-              ntohl (hdr->num), ntohs (message->size));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Got message %u of size %u\n",
+              ntohl (hdr->num),
+             ntohs (message->size));
   n++;
   if (0 == (n % 10))
     FPRINTF (stderr, "%s",  ".");
 
-
-  if (running == GNUNET_YES)
+  if (GNUNET_YES == running)
     GNUNET_break (NULL !=
-                  GNUNET_CORE_notify_transmit_ready (p1.ch, GNUNET_NO,
-                                                     GNUNET_CORE_PRIO_BEST_EFFORT,
-                                                     FAST_TIMEOUT, &p2.id,
-                                                     MESSAGESIZE,
-                                                     &transmit_ready, &p1));
+                  (p1.nth = GNUNET_CORE_notify_transmit_ready (p1.ch,
+                                                              GNUNET_NO,
+                                                              GNUNET_CORE_PRIO_BEST_EFFORT,
+                                                              FAST_TIMEOUT,
+                                                              &p2.id,
+                                                              MESSAGESIZE,
+                                                              &transmit_ready,
+                                                              &p1)));
   return GNUNET_OK;
 }
 
@@ -518,9 +596,16 @@ init_notify (void *cls,
     OKPP;
     /* connect p2 */
     p2.ch =
-        GNUNET_CORE_connect (p2.cfg, &p2, &init_notify, &connect_notify,
-                             &disconnect_notify, &inbound_notify, GNUNET_YES,
-                             &outbound_notify, GNUNET_YES, handlers);
+        GNUNET_CORE_connect (p2.cfg,
+                            &p2,
+                            &init_notify,
+                            &connect_notify,
+                             &disconnect_notify,
+                            &inbound_notify,
+                            GNUNET_YES,
+                             &outbound_notify,
+                            GNUNET_YES,
+                            handlers);
   }
   else
   {
@@ -550,12 +635,13 @@ offer_hello_done (void *cls)
 
 
 static void
-process_hello (void *cls, const struct GNUNET_MessageHeader *message)
+process_hello (void *cls,
+              const struct GNUNET_MessageHeader *message)
 {
   struct PeerContext *p = cls;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received (my) `%s' from transport service\n", "HELLO");
+              "Received (my) HELLO from transport service\n");
   GNUNET_assert (message != NULL);
   p->hello = GNUNET_malloc (ntohs (message->size));
   GNUNET_memcpy (p->hello, message, ntohs (message->size));
@@ -583,7 +669,8 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
 
 
 static void
-setup_peer (struct PeerContext *p, const char *cfgname)
+setup_peer (struct PeerContext *p,
+           const char *cfgname)
 {
   char *binary;
 
@@ -606,13 +693,19 @@ setup_peer (struct PeerContext *p, const char *cfgname)
 
 
 static void
-run (void *cls, char *const *args, const char *cfgfile,
+run (void *cls,
+     char *const *args,
+     const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   GNUNET_assert (ok == 1);
   OKPP;
   err_task =
-      GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL);
+      GNUNET_SCHEDULER_add_delayed (TIMEOUT,
+                                   &terminate_task_error,
+                                   NULL);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
   if (test == SYMMETRIC)
   {
     setup_peer (&p1, "test_core_quota_peer1.conf");
@@ -657,11 +750,16 @@ run (void *cls, char *const *args, const char *cfgfile,
 static void
 stop_arm (struct PeerContext *p)
 {
-  if (0 != GNUNET_OS_process_kill (p->arm_proc, GNUNET_TERM_SIG))
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
-  if (GNUNET_OS_process_wait (p->arm_proc) != GNUNET_OK)
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "waitpid");
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ARM process %u stopped\n",
+  if (0 != GNUNET_OS_process_kill (p->arm_proc,
+                                  GNUNET_TERM_SIG))
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                        "kill");
+  if (GNUNET_OK !=
+      GNUNET_OS_process_wait (p->arm_proc))
+    GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                        "waitpid");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "ARM process %u stopped\n",
               GNUNET_OS_process_get_pid (p->arm_proc));
   GNUNET_OS_process_destroy (p->arm_proc);
   p->arm_proc = NULL;
@@ -672,7 +770,8 @@ stop_arm (struct PeerContext *p)
 static int
 check ()
 {
-  char *const argv[] = { "test-core-quota-compliance",
+  char *const argv[] = {
+    "test-core-quota-compliance",
     "-c",
     "test_core_api_data.conf",
     NULL
@@ -681,8 +780,12 @@ check ()
     GNUNET_GETOPT_OPTION_END
   };
   ok = 1;
-  GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, argv,
-                      "test-core-quota-compliance", "nohelp", options, &run,
+  GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
+                     argv,
+                      "test-core-quota-compliance",
+                     "nohelp",
+                     options,
+                     &run,
                       &ok);
   stop_arm (&p1);
   stop_arm (&p2);
@@ -691,7 +794,8 @@ check ()
 
 
 int
-main (int argc, char *argv[])
+main (int argc,
+      char *argv[])
 {
   int ret;