tighten formatting rules
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
index eb42d6fbce8cf4a9ca284dab5c23b3963145d3ce..6c446fc7c74d607d6190a631a53e12ddb3c02f35 100644 (file)
@@ -1,22 +1,22 @@
 /*
      This file is part of GNUnet.
-     (C) 2009-2013 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009-2014, 2016 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
+     Affero General Public License for more details.
 
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
-*/
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
+ */
 
 /**
  * @file core/gnunet-service-core_sessions.c
  */
 #include "platform.h"
 #include "gnunet-service-core.h"
-#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_kx.h"
 #include "gnunet-service-core_typemap.h"
 #include "gnunet-service-core_sessions.h"
-#include "gnunet-service-core_clients.h"
 #include "gnunet_constants.h"
+#include "core.h"
 
-/**
- * How often do we transmit our typemap?
- */
-#define TYPEMAP_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
 
 /**
- * How often do we transmit our typemap on first attempt?
+ * How many encrypted messages do we queue at most?
+ * Needed to bound memory consumption.
  */
-#define TYPEMAP_FREQUENCY_FIRST GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
 
 
 /**
@@ -49,7 +45,6 @@
  */
 struct SessionMessageEntry
 {
-
   /**
    * We keep messages in a doubly linked list.
    */
@@ -60,6 +55,21 @@ struct SessionMessageEntry
    */
   struct SessionMessageEntry *prev;
 
+  /**
+   * How important is this message.
+   */
+  enum GNUNET_MQ_PriorityPreferences priority;
+
+  /**
+   * Flag set to #GNUNET_YES if this is a typemap message.
+   */
+  int is_typemap;
+
+  /**
+   * Flag set to #GNUNET_YES if this is a typemap confirmation message.
+   */
+  int is_typemap_confirm;
+
   /**
    * Deadline for transmission, 1s after we received it (if we
    * are not corking), otherwise "now".  Note that this message
@@ -68,12 +78,11 @@ struct SessionMessageEntry
   struct GNUNET_TIME_Absolute deadline;
 
   /**
-   * How long is the message? (number of bytes following the "struct
-   * MessageEntry", but not including the size of "struct
-   * MessageEntry" itself!)
+   * How long is the message? (number of bytes following the `struct
+   * MessageEntry`, but not including the size of `struct
+   * MessageEntry` itself!)
    */
   size_t size;
-
 };
 
 
@@ -85,7 +94,12 @@ struct Session
   /**
    * Identity of the other peer.
    */
-  struct GNUNET_PeerIdentity peer;
+  const struct GNUNET_PeerIdentity *peer;
+
+  /**
+   * Key exchange state for this peer.
+   */
+  struct GSC_KeyExchangeInfo *kx;
 
   /**
    * Head of list of requests from clients for transmission to
@@ -109,11 +123,6 @@ struct Session
    */
   struct SessionMessageEntry *sme_tail;
 
-  /**
-   * Information about the key exchange with the other peer.
-   */
-  struct GSC_KeyExchangeInfo *kxinfo;
-
   /**
    * Current type map for this peer.
    */
@@ -122,18 +131,18 @@ struct Session
   /**
    * Task to transmit corked messages with a delay.
    */
-  GNUNET_SCHEDULER_TaskIdentifier cork_task;
+  struct GNUNET_SCHEDULER_Task *cork_task;
 
   /**
    * Task to transmit our type map.
    */
-  GNUNET_SCHEDULER_TaskIdentifier typemap_task;
+  struct GNUNET_SCHEDULER_Task *typemap_task;
 
   /**
-   * Is the neighbour queue empty and thus ready for us
-   * to transmit an encrypted message?
+   * Retransmission delay we currently use for the typemap
+   * transmissions (if not confirmed).
    */
-  int ready_to_transmit;
+  struct GNUNET_TIME_Relative typemap_delay;
 
   /**
    * Is this the first time we're sending the typemap? If so,
@@ -144,8 +153,34 @@ struct Session
 };
 
 
+GNUNET_NETWORK_STRUCT_BEGIN
+
 /**
- * Map of peer identities to 'struct Session'.
+ * Message sent to confirm that a typemap was received.
+ */
+struct TypeMapConfirmationMessage
+{
+  /**
+   * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Reserved, always zero.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * Hash of the (decompressed) type map that was received.
+   */
+  struct GNUNET_HashCode tm_hash;
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
+/**
+ * Map of peer identities to `struct Session`.
  */
 static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
 
@@ -160,6 +195,8 @@ static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
 static struct Session *
 find_session (const struct GNUNET_PeerIdentity *peer)
 {
+  if (NULL == sessions)
+    return NULL;
   return GNUNET_CONTAINER_multipeermap_get (sessions, peer);
 }
 
@@ -180,32 +217,39 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
   session = find_session (pid);
   if (NULL == session)
     return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n",
-              GNUNET_i2s (&session->peer));
-  if (GNUNET_SCHEDULER_NO_TASK != session->cork_task)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Destroying session for peer `%s'\n",
+              GNUNET_i2s (session->peer));
+  if (NULL != session->cork_task)
   {
     GNUNET_SCHEDULER_cancel (session->cork_task);
-    session->cork_task = GNUNET_SCHEDULER_NO_TASK;
+    session->cork_task = NULL;
   }
   while (NULL != (car = session->active_client_request_head))
   {
     GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
-                                 session->active_client_request_tail, car);
-    GSC_CLIENTS_reject_request (car);
+                                 session->active_client_request_tail,
+                                 car);
+    GSC_CLIENTS_reject_request (car, GNUNET_NO);
   }
   while (NULL != (sme = session->sme_head))
   {
     GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
     GNUNET_free (sme);
   }
-  GNUNET_SCHEDULER_cancel (session->typemap_task);
-  GSC_CLIENTS_notify_clients_about_neighbour (&session->peer,
-                                              session->tmap, NULL);
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multipeermap_remove (sessions,
-                                                       &session->peer,
-                                                       session));
-  GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
+  if (NULL != session->typemap_task)
+  {
+    GNUNET_SCHEDULER_cancel (session->typemap_task);
+    session->typemap_task = NULL;
+  }
+  GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
+                                              session->tmap,
+                                              NULL);
+  GNUNET_assert (
+    GNUNET_YES ==
+    GNUNET_CONTAINER_multipeermap_remove (sessions, session->peer, session));
+  GNUNET_STATISTICS_set (GSC_stats,
+                         gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (sessions),
                          GNUNET_NO);
   GSC_TYPEMAP_destroy (session->tmap);
@@ -216,41 +260,54 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
 
 /**
  * Transmit our current typemap message to the other peer.
- * (Done periodically in case an update got lost).
+ * (Done periodically until the typemap is confirmed).
  *
- * @param cls the 'struct Session*'
- * @param tc unused
+ * @param cls the `struct Session *`
  */
 static void
-transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+transmit_typemap_task (void *cls)
 {
   struct Session *session = cls;
   struct GNUNET_MessageHeader *hdr;
   struct GNUNET_TIME_Relative delay;
 
-  if (0 == session->first_typemap)
-  {
-    delay = TYPEMAP_FREQUENCY_FIRST;
-    session->first_typemap = 1;
-  }
-  else
-  {
-    delay = TYPEMAP_FREQUENCY;
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending TYPEMAP to %s\n",
+              GNUNET_i2s (session->peer));
+  session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
+  delay = session->typemap_delay;
   /* randomize a bit to avoid spont. sync */
   delay.rel_value_us +=
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
+    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
   session->typemap_task =
-      GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session);
+    GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session);
   GNUNET_STATISTICS_update (GSC_stats,
-                            gettext_noop ("# type map refreshes sent"), 1,
+                            gettext_noop ("# type map refreshes sent"),
+                            1,
                             GNUNET_NO);
   hdr = GSC_TYPEMAP_compute_type_map_message ();
-  GSC_KX_encrypt_and_transmit (session->kxinfo, hdr, ntohs (hdr->size));
+  GSC_KX_encrypt_and_transmit (session->kx, hdr, ntohs (hdr->size));
   GNUNET_free (hdr);
 }
 
 
+/**
+ * Restart the typemap task for the given session.
+ *
+ * @param session session to restart typemap transmission for
+ */
+static void
+start_typemap_task (struct Session *session)
+{
+  if (NULL != session->typemap_task)
+    GNUNET_SCHEDULER_cancel (session->typemap_task);
+  session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
+  session->typemap_task = GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
+                                                        &transmit_typemap_task,
+                                                        session);
+}
+
+
 /**
  * Create a session, a key exchange was just completed.
  *
@@ -263,43 +320,126 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
 {
   struct Session *session;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Creating session for peer `%s'\n",
               GNUNET_i2s (peer));
   session = GNUNET_new (struct Session);
   session->tmap = GSC_TYPEMAP_create ();
-  session->peer = *peer;
-  session->kxinfo = kx;
-  session->typemap_task =
-      GNUNET_SCHEDULER_add_now (&transmit_typemap_task, session);
+  session->peer = peer;
+  session->kx = kx;
   GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multipeermap_put (sessions, peer,
-                                                    session,
-                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
+                 GNUNET_CONTAINER_multipeermap_put (
+                   sessions,
+                   session->peer,
+                   session,
+                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  GNUNET_STATISTICS_set (GSC_stats,
+                         gettext_noop ("# peers connected"),
                          GNUNET_CONTAINER_multipeermap_size (sessions),
                          GNUNET_NO);
-  GSC_CLIENTS_notify_clients_about_neighbour (peer,
-                                              NULL, session->tmap);
+  GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, session->tmap);
+  start_typemap_task (session);
+}
+
+
+/**
+ * The other peer has indicated that it 'lost' the session
+ * (KX down), reinitialize the session on our end, in particular
+ * this means to restart the typemap transmission.
+ *
+ * @param peer peer that is now connected
+ */
+void
+GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
+{
+  struct Session *session;
+
+  session = find_session (peer);
+  if (NULL == session)
+  {
+    /* KX/session is new for both sides; thus no need to restart what
+       has not yet begun */
+    return;
+  }
+  start_typemap_task (session);
+}
+
+
+/**
+ * The other peer has confirmed receiving our type map,
+ * check if it is current and if so, stop retransmitting it.
+ *
+ * @param peer peer that confirmed the type map
+ * @param msg confirmation message we received
+ */
+void
+GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
+                              const struct GNUNET_MessageHeader *msg)
+{
+  const struct TypeMapConfirmationMessage *cmsg;
+  struct Session *session;
+
+  session = find_session (peer);
+  if (NULL == session)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  if (ntohs (msg->size) != sizeof(struct TypeMapConfirmationMessage))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  cmsg = (const struct TypeMapConfirmationMessage *) msg;
+  if (GNUNET_YES != GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
+  {
+    /* our typemap has changed in the meantime, do not
+       accept confirmation */
+    GNUNET_STATISTICS_update (GSC_stats,
+                              gettext_noop (
+                                "# outdated typemap confirmations received"),
+                              1,
+                              GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Got outdated typemap confirmated from peer `%s'\n",
+                GNUNET_i2s (session->peer));
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Got typemap confirmation from peer `%s'\n",
+              GNUNET_i2s (session->peer));
+  if (NULL != session->typemap_task)
+  {
+    GNUNET_SCHEDULER_cancel (session->typemap_task);
+    session->typemap_task = NULL;
+  }
+  GNUNET_STATISTICS_update (GSC_stats,
+                            gettext_noop (
+                              "# valid typemap confirmations received"),
+                            1,
+                            GNUNET_NO);
 }
 
 
 /**
  * Notify the given client about the session (client is new).
  *
- * @param cls the 'struct GSC_Client'
+ * @param cls the `struct GSC_Client`
  * @param key peer identity
- * @param value the 'struct Session'
- * @return GNUNET_OK (continue to iterate)
+ * @param value the `struct Session`
+ * @return #GNUNET_OK (continue to iterate)
  */
 static int
-notify_client_about_session (void *cls, const struct GNUNET_PeerIdentity * key,
+notify_client_about_session (void *cls,
+                             const struct GNUNET_PeerIdentity *key,
                              void *value)
 {
   struct GSC_Client *client = cls;
   struct Session *session = value;
 
-  GSC_CLIENTS_notify_client_about_neighbour (client, &session->peer,
-                                             NULL,      /* old TMAP: none */
+  GSC_CLIENTS_notify_client_about_neighbour (client,
+                                             session->peer,
+                                             NULL, /* old TMAP: none */
                                              session->tmap);
   return GNUNET_OK;
 }
@@ -314,7 +454,8 @@ void
 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
 {
   /* notify new client about existing sessions */
-  GNUNET_CONTAINER_multipeermap_iterate (sessions, &notify_client_about_session,
+  GNUNET_CONTAINER_multipeermap_iterate (sessions,
+                                         &notify_client_about_session,
                                          client);
 }
 
@@ -334,8 +475,8 @@ try_transmission (struct Session *session);
  *
  * @param car request to queue; this handle is then shared between
  *         the caller (CLIENTS subsystem) and SESSIONS and must not
- *         be released by either until either 'GNUNET_SESSIONS_dequeue',
- *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
+ *         be released by either until either #GSC_SESSIONS_dequeue(),
+ *         #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
  *         have been invoked on it
  */
 void
@@ -344,24 +485,25 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
   struct Session *session;
 
   session = find_session (&car->target);
-  if (session == NULL)
+  if (NULL == session)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Dropped client request for transmission (am disconnected)\n");
-    GNUNET_break (0);           /* should have been rejected earlier */
-    GSC_CLIENTS_reject_request (car);
+    GNUNET_break (0);  /* should have been rejected earlier */
+    GSC_CLIENTS_reject_request (car, GNUNET_NO);
     return;
   }
   if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
   {
     GNUNET_break (0);
-    GSC_CLIENTS_reject_request (car);
+    GSC_CLIENTS_reject_request (car, GNUNET_YES);
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received client transmission request. queueing\n");
-  GNUNET_CONTAINER_DLL_insert (session->active_client_request_head,
-                               session->active_client_request_tail, car);
+  GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head,
+                                    session->active_client_request_tail,
+                                    car);
   try_transmission (session);
 }
 
@@ -375,78 +517,68 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
 void
 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
 {
-  struct Session *s;
+  struct Session *session;
 
-  if (0 ==
-      memcmp (&car->target, &GSC_my_identity,
-              sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (&car->target,
+                   &GSC_my_identity,
+                   sizeof(struct GNUNET_PeerIdentity)))
     return;
-  s = find_session (&car->target);
-  GNUNET_assert (NULL != s);
-  GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
-                               s->active_client_request_tail, car);
-}
-
-
-/**
- * Discard all expired active transmission requests from clients.
- *
- * @param session session to clean up
- */
-static void
-discard_expired_requests (struct Session *session)
-{
-  struct GSC_ClientActiveRequest *pos;
-  struct GSC_ClientActiveRequest *nxt;
-  struct GNUNET_TIME_Absolute now;
-
-  now = GNUNET_TIME_absolute_get ();
-  pos = NULL;
-  nxt = session->active_client_request_head;
-  while (NULL != nxt)
-  {
-    pos = nxt;
-    nxt = pos->next;
-    if ((pos->deadline.abs_value_us < now.abs_value_us) &&
-        (GNUNET_YES != pos->was_solicited))
-    {
-      GNUNET_STATISTICS_update (GSC_stats,
-                                gettext_noop
-                                ("# messages discarded (expired prior to transmission)"),
-                                1, GNUNET_NO);
-      GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
-                                   session->active_client_request_tail, pos);
-      GSC_CLIENTS_reject_request (pos);
-    }
-  }
+  session = find_session (&car->target);
+  GNUNET_assert (NULL != session);
+  GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
+                               session->active_client_request_tail,
+                               car);
+  /* dequeueing of 'high' priority messages may unblock
+     transmission for lower-priority messages, so we also
+     need to try in this case. */
+  try_transmission (session);
 }
 
 
 /**
- * Solicit messages for transmission.
+ * Solicit messages for transmission, starting with those of the highest
+ * priority.
  *
  * @param session session to solict messages for
+ * @param msize how many bytes do we have already
  */
 static void
-solicit_messages (struct Session *session)
+solicit_messages (struct Session *session, size_t msize)
 {
   struct GSC_ClientActiveRequest *car;
   struct GSC_ClientActiveRequest *nxt;
   size_t so_size;
+  enum GNUNET_MQ_PriorityPreferences pmax;
 
-  discard_expired_requests (session);
-  so_size = 0;
+  so_size = msize;
+  pmax = GNUNET_MQ_PRIO_BACKGROUND;
+  for (car = session->active_client_request_head; NULL != car; car = car->next)
+  {
+    if (GNUNET_YES == car->was_solicited)
+      continue;
+    pmax = GNUNET_MAX (pmax, car->priority & GNUNET_MQ_PRIORITY_MASK);
+  }
   nxt = session->active_client_request_head;
   while (NULL != (car = nxt))
   {
     nxt = car->next;
+    if (car->priority < pmax)
+      continue;
     if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
       break;
     so_size += car->msize;
-    if (car->was_solicited == GNUNET_YES)
+    if (GNUNET_YES == car->was_solicited)
       continue;
     car->was_solicited = GNUNET_YES;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Soliciting message with priority %u\n",
+                car->priority);
     GSC_CLIENTS_solicit_request (car);
+    /* The above call may *dequeue* requests and thereby
+       clobber 'nxt'. Hence we need to restart from the
+       head of the list. */
+    nxt = session->active_client_request_head;
+    so_size = msize;
   }
 }
 
@@ -455,22 +587,22 @@ solicit_messages (struct Session *session)
  * Some messages were delayed (corked), but the timeout has now expired.
  * Send them now.
  *
- * @param cls 'struct Session' with the messages to transmit now
- * @param tc scheduler context (unused)
+ * @param cls `struct Session` with the messages to transmit now
  */
 static void
-pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+pop_cork_task (void *cls)
 {
   struct Session *session = cls;
 
-  session->cork_task = GNUNET_SCHEDULER_NO_TASK;
+  session->cork_task = NULL;
   try_transmission (session);
 }
 
 
 /**
  * Try to perform a transmission on the given session. Will solicit
- * additional messages if the 'sme' queue is not full enough.
+ * additional messages if the 'sme' queue is not full enough or has
+ * only low-priority messages.
  *
  * @param session session to transmit messages from
  */
@@ -481,52 +613,142 @@ try_transmission (struct Session *session)
   size_t msize;
   struct GNUNET_TIME_Absolute now;
   struct GNUNET_TIME_Absolute min_deadline;
+  enum GNUNET_MQ_PriorityPreferences maxp;
+  enum GNUNET_MQ_PriorityPreferences maxpc;
+  struct GSC_ClientActiveRequest *car;
+  int excess;
 
-  if (GNUNET_YES != session->ready_to_transmit)
-    return;
   msize = 0;
   min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
-  /* check 'ready' messages */
+  /* if the peer has excess bandwidth, background traffic is allowed,
+     otherwise not */
+  if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
+      GSC_NEIGHBOURS_get_queue_length (session->kx))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Transmission queue already very long, waiting...\n");
+    return;   /* queue already too long */
+  }
+  excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
+  if (GNUNET_YES == excess)
+    maxp = GNUNET_MQ_PRIO_BACKGROUND;
+  else
+    maxp = GNUNET_MQ_PRIO_BEST_EFFORT;
+  /* determine highest priority of 'ready' messages we already solicited from clients */
   pos = session->sme_head;
   while ((NULL != pos) &&
          (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
   {
     GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
     msize += pos->size;
+    maxp = GNUNET_MAX (maxp, pos->priority & GNUNET_MQ_PRIORITY_MASK);
     min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline);
     pos = pos->next;
   }
+  GNUNET_log (
+    GNUNET_ERROR_TYPE_DEBUG,
+    "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
+    maxp,
+    (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
+    GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
+                                              min_deadline),
+                                            GNUNET_YES));
+
+  if (maxp < GNUNET_MQ_PRIO_CRITICAL_CONTROL)
+  {
+    /* if highest already solicited priority from clients is not critical,
+       check if there are higher-priority messages to be solicited from clients */
+    if (GNUNET_YES == excess)
+      maxpc = GNUNET_MQ_PRIO_BACKGROUND;
+    else
+      maxpc = GNUNET_MQ_PRIO_BEST_EFFORT;
+    for (car = session->active_client_request_head; NULL != car;
+         car = car->next)
+    {
+      if (GNUNET_YES == car->was_solicited)
+        continue;
+      maxpc = GNUNET_MAX (maxpc, car->priority & GNUNET_MQ_PRIORITY_MASK);
+    }
+    if (maxpc > maxp)
+    {
+      /* we have messages waiting for solicitation that have a higher
+         priority than those that we already accepted; solicit the
+         high-priority messages first */
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Soliciting messages based on priority (%u > %u)\n",
+                  maxpc,
+                  maxp);
+      solicit_messages (session, 0);
+      return;
+    }
+  }
+  else
+  {
+    /* never solicit more, we have critical messages to process */
+    excess = GNUNET_NO;
+    maxpc = GNUNET_MQ_PRIO_BACKGROUND;
+  }
   now = GNUNET_TIME_absolute_get ();
-  if ((msize == 0) ||
-      ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
-       (min_deadline.abs_value_us > now.abs_value_us)))
+  if (((GNUNET_YES == excess) || (maxpc >= GNUNET_MQ_PRIO_BEST_EFFORT)) &&
+      ((0 == msize) ||
+       ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
+        (min_deadline.abs_value_us > now.abs_value_us))))
   {
-    /* not enough ready yet, try to solicit more */
-    solicit_messages (session);
+    /* not enough ready yet (tiny message & cork possible), or no messages at all,
+       and either excess bandwidth or best-effort or higher message waiting at
+       client; in this case, we try to solicit more */
+    GNUNET_log (
+      GNUNET_ERROR_TYPE_DEBUG,
+      "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
+      excess,
+      maxpc,
+      (unsigned int) msize,
+      GNUNET_STRINGS_relative_time_to_string (
+        GNUNET_TIME_absolute_get_remaining (
+          min_deadline),
+        GNUNET_YES));
+    solicit_messages (session, msize);
     if (msize > 0)
     {
       /* if there is data to send, just not yet, make sure we do transmit
        * it once the deadline is reached */
-      if (session->cork_task != GNUNET_SCHEDULER_NO_TASK)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Corking until %s\n",
+                  GNUNET_STRINGS_relative_time_to_string (
+                    GNUNET_TIME_absolute_get_remaining (min_deadline),
+                    GNUNET_YES));
+      if (NULL != session->cork_task)
         GNUNET_SCHEDULER_cancel (session->cork_task);
       session->cork_task =
-          GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
-                                        (min_deadline), &pop_cork_task,
-                                        session);
+        GNUNET_SCHEDULER_add_at (min_deadline, &pop_cork_task, session);
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Queue empty, waiting for solicitations\n");
     }
     return;
   }
-  /* create plaintext buffer of all messages, encrypt and transmit */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Building combined plaintext buffer to transmit message!\n");
+  /* create plaintext buffer of all messages (that fit), encrypt and
+     transmit */
   {
     static unsigned long long total_bytes;
     static unsigned int total_msgs;
-    char pbuf[msize];           /* plaintext */
+    char pbuf[msize]; /* plaintext */
     size_t used;
 
     used = 0;
     while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize))
     {
-      memcpy (&pbuf[used], &pos[1], pos->size);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Adding message of type %d (%d/%d) to payload for %s\n",
+                  ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
+                  pos->is_typemap,
+                  pos->is_typemap_confirm,
+                  GNUNET_i2s (session->peer));
+      GNUNET_memcpy (&pbuf[used], &pos[1], pos->size);
       used += pos->size;
       GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos);
       GNUNET_free (pos);
@@ -540,52 +762,73 @@ try_transmission (struct Session *session)
       total_msgs = 1;
       total_bytes = used;
     }
-    GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message",
-                           total_bytes / total_msgs, GNUNET_NO);
+    GNUNET_STATISTICS_set (GSC_stats,
+                           "# avg payload per encrypted message",
+                           total_bytes / total_msgs,
+                           GNUNET_NO);
     /* now actually transmit... */
-    session->ready_to_transmit = GNUNET_NO;
-    GSC_KX_encrypt_and_transmit (session->kxinfo, pbuf, used);
+    GSC_KX_encrypt_and_transmit (session->kx, pbuf, used);
   }
 }
 
 
 /**
- * Send a message to the neighbour now.
+ * Send an updated typemap message to the neighbour now,
+ * and restart typemap transmissions.
  *
  * @param cls the message
  * @param key neighbour's identity
- * @param value 'struct Neighbour' of the target
- * @return always GNUNET_OK
+ * @param value `struct Neighbour` of the target
+ * @return always #GNUNET_OK
  */
 static int
-do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
+do_restart_typemap_message (void *cls,
+                            const struct GNUNET_PeerIdentity *key,
+                            void *value)
 {
   const struct GNUNET_MessageHeader *hdr = cls;
   struct Session *session = value;
-  struct SessionMessageEntry *m;
+  struct SessionMessageEntry *sme;
   uint16_t size;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Restarting sending TYPEMAP to %s\n",
+              GNUNET_i2s (session->peer));
   size = ntohs (hdr->size);
-  m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
-  memcpy (&m[1], hdr, size);
-  m->size = size;
-  GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m);
+  for (sme = session->sme_head; NULL != sme; sme = sme->next)
+  {
+    if (GNUNET_YES == sme->is_typemap)
+    {
+      GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
+      GNUNET_free (sme);
+      break;
+    }
+  }
+  sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + size);
+  sme->is_typemap = GNUNET_YES;
+  GNUNET_memcpy (&sme[1], hdr, size);
+  sme->size = size;
+  sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
+  GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
   try_transmission (session);
+  start_typemap_task (session);
   return GNUNET_OK;
 }
 
 
 /**
- * Broadcast a message to all neighbours.
+ * Broadcast an updated typemap message to all neighbours.
+ * Restarts the retransmissions until the typemaps are confirmed.
  *
  * @param msg message to transmit
  */
 void
-GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
+GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
 {
   if (NULL == sessions)
     return;
-  GNUNET_CONTAINER_multipeermap_iterate (sessions, &do_send_message,
+  GNUNET_CONTAINER_multipeermap_iterate (sessions,
+                                         &do_restart_typemap_message,
                                          (void *) msg);
 }
 
@@ -602,10 +845,12 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
 {
   struct Session *session;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transport solicits for %s\n",
+              GNUNET_i2s (pid));
   session = find_session (pid);
   if (NULL == session)
     return;
-  session->ready_to_transmit = GNUNET_YES;
   try_transmission (session);
 }
 
@@ -616,86 +861,51 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
  * @param car original request that was queued and then solicited;
  *            this handle will now be 'owned' by the SESSIONS subsystem
  * @param msg message to transmit
- * @param cork is corking allowed?
+ * @param priority how important is this message
  */
 void
 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
-                       const struct GNUNET_MessageHeader *msg, int cork)
+                       const struct GNUNET_MessageHeader *msg,
+                       enum GNUNET_MQ_PriorityPreferences priority)
 {
   struct Session *session;
   struct SessionMessageEntry *sme;
+  struct SessionMessageEntry *pos;
   size_t msize;
 
   session = find_session (&car->target);
   if (NULL == session)
     return;
   msize = ntohs (msg->size);
-  sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
-  memcpy (&sme[1], msg, msize);
+  sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + msize);
+  GNUNET_memcpy (&sme[1], msg, msize);
   sme->size = msize;
-  if (GNUNET_YES == cork)
+  sme->priority = priority;
+  if (0 != (GNUNET_MQ_PREF_CORK_ALLOWED & priority))
+  {
     sme->deadline =
-        GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
-  GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme);
+      GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Mesage corked, delaying transmission\n");
+  }
+  pos = session->sme_head;
+  while ((NULL != pos) && (pos->priority >= sme->priority))
+    pos = pos->next;
+  if (NULL == pos)
+    GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
+                                      session->sme_tail,
+                                      sme);
+  else
+    GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
+                                       session->sme_tail,
+                                       pos->prev,
+                                       sme);
   try_transmission (session);
 }
 
 
 /**
- * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
- *
- * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
- * @param key identity of the connected peer
- * @param value the 'struct Neighbour' for the peer
- * @return GNUNET_OK (continue to iterate)
- */
-#include "core.h"
-static int
-queue_connect_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
-{
-  struct GNUNET_SERVER_TransmitContext *tc = cls;
-  struct Session *session = value;
-  struct ConnectNotifyMessage cnm;
-
-  /* FIXME: code duplication with clients... */
-  cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
-  cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
-  cnm.reserved = htonl (0);
-  cnm.peer = session->peer;
-  GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header);
-  return GNUNET_OK;
-}
-
-
-/**
- * Handle CORE_ITERATE_PEERS request. For this request type, the client
- * does not have to have transmitted an INIT request.  All current peers
- * are returned, regardless of which message types they accept.
- *
- * @param cls unused
- * @param client client sending the iteration request
- * @param message iteration request message
- */
-void
-GSC_SESSIONS_handle_client_iterate_peers (void *cls,
-                                          struct GNUNET_SERVER_Client *client,
-                                          const struct GNUNET_MessageHeader
-                                          *message)
-{
-  struct GNUNET_MessageHeader done_msg;
-  struct GNUNET_SERVER_TransmitContext *tc;
-
-  tc = GNUNET_SERVER_transmit_context_create (client);
-  GNUNET_CONTAINER_multipeermap_iterate (sessions, &queue_connect_message, tc);
-  done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
-  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
-  GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
-  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
-}
-
-
-/**
- * We've received a typemap message from a peer, update ours.
+ * We have received a typemap message from a peer, update ours.
  * Notifies clients about the session.
  *
  * @param peer peer this is about
@@ -707,18 +917,48 @@ GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
 {
   struct Session *session;
   struct GSC_TypeMap *nmap;
+  struct SessionMessageEntry *sme;
+  struct TypeMapConfirmationMessage *tmc;
 
   nmap = GSC_TYPEMAP_get_from_message (msg);
   if (NULL == nmap)
-    return;                     /* malformed */
+  {
+    GNUNET_break_op (0);
+    return;   /* malformed */
+  }
   session = find_session (peer);
   if (NULL == session)
   {
+    GSC_TYPEMAP_destroy (nmap);
     GNUNET_break (0);
     return;
   }
-  GSC_CLIENTS_notify_clients_about_neighbour (peer,
-                                              session->tmap, nmap);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received TYPEMAP from %s\n",
+              GNUNET_i2s (session->peer));
+  for (sme = session->sme_head; NULL != sme; sme = sme->next)
+  {
+    if (GNUNET_YES == sme->is_typemap_confirm)
+    {
+      GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
+      GNUNET_free (sme);
+      break;
+    }
+  }
+  sme = GNUNET_malloc (sizeof(struct SessionMessageEntry)
+                       + sizeof(struct TypeMapConfirmationMessage));
+  sme->deadline = GNUNET_TIME_absolute_get ();
+  sme->size = sizeof(struct TypeMapConfirmationMessage);
+  sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
+  sme->is_typemap_confirm = GNUNET_YES;
+  tmc = (struct TypeMapConfirmationMessage *) &sme[1];
+  tmc->header.size = htons (sizeof(struct TypeMapConfirmationMessage));
+  tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
+  tmc->reserved = htonl (0);
+  GSC_TYPEMAP_hash (nmap, &tmc->tm_hash);
+  GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
+  try_transmission (session);
+  GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
   GSC_TYPEMAP_destroy (session->tmap);
   session->tmap = nmap;
 }
@@ -739,15 +979,14 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
   struct Session *session;
   struct GSC_TypeMap *nmap;
 
-  if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+  if (0 == memcmp (peer, &GSC_my_identity, sizeof(struct GNUNET_PeerIdentity)))
     return;
   session = find_session (peer);
   GNUNET_assert (NULL != session);
   if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1))
-    return;                     /* already in it */
+    return; /* already in it */
   nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1);
-  GSC_CLIENTS_notify_clients_about_neighbour (peer,
-                                              session->tmap, nmap);
+  GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
   GSC_TYPEMAP_destroy (session->tmap);
   session->tmap = nmap;
 }
@@ -759,24 +998,27 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
 void
 GSC_SESSIONS_init ()
 {
-  sessions = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
+  sessions = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
 }
 
 
 /**
- * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
+ * Helper function for #GSC_SESSIONS_done() to free all
+ * active sessions.
  *
  * @param cls NULL
  * @param key identity of the connected peer
- * @param value the 'struct Session' for the peer
- * @return GNUNET_OK (continue to iterate)
+ * @param value the `struct Session` for the peer
+ * @return #GNUNET_OK (continue to iterate)
  */
 static int
-free_session_helper (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
+free_session_helper (void *cls,
+                     const struct GNUNET_PeerIdentity *key,
+                     void *value)
 {
-  struct Session *session = value;
+  /* struct Session *session = value; */
 
-  GSC_SESSIONS_end (&session->peer);
+  GSC_SESSIONS_end (key);
   return GNUNET_OK;
 }
 
@@ -789,10 +1031,13 @@ GSC_SESSIONS_done ()
 {
   if (NULL != sessions)
   {
-    GNUNET_CONTAINER_multipeermap_iterate (sessions, &free_session_helper, NULL);
+    GNUNET_CONTAINER_multipeermap_iterate (sessions,
+                                           &free_session_helper,
+                                           NULL);
     GNUNET_CONTAINER_multipeermap_destroy (sessions);
     sessions = NULL;
   }
 }
 
+
 /* end of gnunet-service-core_sessions.c */