hxing typemap
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
index 13593e9d67698248e887e400524c41c31e3af3a3..e97954438229ca2591e304f5b8cb3b59bcf91fc6 100644 (file)
@@ -1,11 +1,39 @@
-/* code for managing of 'encrypted' sessions (key exchange done) */
+/*
+     This file is part of GNUnet.
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
 
+/**
+ * @file core/gnunet-service-core_sessions.c
+ * @brief code for managing of 'encrypted' sessions (key exchange done) 
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_service_core.h"
+#include "gnunet_service_core_neighbours.h"
+#include "gnunet_service_core_kx.h"
+#include "gnunet_service_core_sessions.h"
 
 /**
  * Record kept for each request for transmission issued by a
  * client that is still pending.
  */
-struct ClientActiveRequest;
+struct GSC_ClientActiveRequest;
 
 /**
  * Data kept per session.
@@ -21,13 +49,13 @@ struct Session
    * Head of list of requests from clients for transmission to
    * this peer.
    */
-  struct ClientActiveRequest *active_client_request_head;
+  struct GSC_ClientActiveRequest *active_client_request_head;
 
   /**
    * Tail of list of requests from clients for transmission to
    * this peer.
    */
-  struct ClientActiveRequest *active_client_request_tail;
+  struct GSC_ClientActiveRequest *active_client_request_tail;
 
   /**
    * Performance data for the peer.
@@ -39,11 +67,6 @@ struct Session
    */
   struct GSC_KeyExchangeInfo *kxinfo;
 
-  /**
-   * ID of task used for sending keep-alive pings.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier keep_alive_task;
-
   /**
    * ID of task used for cleaning up dead neighbour entries.
    */
@@ -66,16 +89,6 @@ struct Session
    */
   struct GNUNET_TIME_Absolute last_activity;
 
-  /**
-   * Tracking bandwidth for sending to this peer.
-   */
-  struct GNUNET_BANDWIDTH_Tracker available_send_window;
-
-  /**
-   * Tracking bandwidth for receiving from this peer.
-   */
-  struct GNUNET_BANDWIDTH_Tracker available_recv_window;
-
   /**
    * How valueable were the messages of this peer recently?
    */
@@ -86,23 +99,6 @@ struct Session
    */
   unsigned int ats_count;
 
-  /**
-   * Bit map indicating which of the 32 sequence numbers before the last
-   * were received (good for accepting out-of-order packets and
-   * estimating reliability of the connection)
-   */
-  unsigned int last_packets_bitmap;
-
-  /**
-   * last sequence number received on this connection (highest)
-   */
-  uint32_t last_sequence_number_received;
-
-  /**
-   * last sequence number transmitted
-   */
-  uint32_t last_sequence_number_sent;
-
   /**
    * Available bandwidth in for this peer (current target).
    */
@@ -280,9 +276,8 @@ handle_peer_status_change (struct Neighbour *n)
 static void
 schedule_peer_messages (struct Neighbour *n)
 {
-  struct SendMessageReady smr;
-  struct ClientActiveRequest *car;
-  struct ClientActiveRequest *pos;
+  struct GSC_ClientActiveRequest *car;
+  struct GSC_ClientActiveRequest *pos;
   struct Client *c;
   struct MessageEntry *mqe;
   unsigned int queue_size;
@@ -326,20 +321,7 @@ schedule_peer_messages (struct Neighbour *n)
               "Permitting client transmission request to `%s'\n",
               GNUNET_i2s (&n->peer));
 #endif
-  c = car->client;
-  GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
-                               n->active_client_request_tail, car);
-  GNUNET_assert (GNUNET_YES ==
-                 GNUNET_CONTAINER_multihashmap_remove (c->requests,
-                                                       &n->peer.hashPubKey,
-                                                       car));
-  smr.header.size = htons (sizeof (struct SendMessageReady));
-  smr.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_READY);
-  smr.size = htons (car->msize);
-  smr.smr_id = car->smr_id;
-  smr.peer = n->peer;
-  send_to_client (c, &smr.header, GNUNET_NO);
-  GNUNET_free (car);
+  GSC_CLIENTS_solicite_request (car);
 }
 
 
@@ -354,7 +336,7 @@ static void
 free_neighbour (struct Neighbour *n)
 {
   struct MessageEntry *m;
-  struct ClientActiveRequest *car;
+  struct GSC_ClientActiveRequest *car;
 
 #if DEBUG_CORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -396,8 +378,6 @@ free_neighbour (struct Neighbour *n)
   if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (n->quota_update_task);
   if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel (n->dead_clean_task);
-  if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (n->keep_alive_task);
   if (n->status == PEER_STATE_KEY_CONFIRMED)
     GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
@@ -410,64 +390,6 @@ free_neighbour (struct Neighbour *n)
 
 
 
-/**
- * Task triggered when a neighbour entry is about to time out
- * (and we should prevent this by sending a PING).
- *
- * @param cls the 'struct Neighbour'
- * @param tc scheduler context (not used)
- */
-static void
-send_keep_alive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Neighbour *n = cls;
-  struct GNUNET_TIME_Relative retry;
-  struct GNUNET_TIME_Relative left;
-  struct MessageEntry *me;
-  struct PingMessage pp;
-  struct PingMessage *pm;
-  struct GNUNET_CRYPTO_AesInitializationVector iv;
-
-  n->keep_alive_task = GNUNET_SCHEDULER_NO_TASK;
-  /* send PING */
-  me = GNUNET_malloc (sizeof (struct MessageEntry) +
-                      sizeof (struct PingMessage));
-  me->deadline = GNUNET_TIME_relative_to_absolute (MAX_PING_DELAY);
-  me->priority = PING_PRIORITY;
-  me->size = sizeof (struct PingMessage);
-  GNUNET_CONTAINER_DLL_insert_after (n->encrypted_head, n->encrypted_tail,
-                                     n->encrypted_tail, me);
-  pm = (struct PingMessage *) &me[1];
-  pm->header.size = htons (sizeof (struct PingMessage));
-  pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING);
-  pm->iv_seed =
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-  derive_iv (&iv, &n->encrypt_key, pm->iv_seed, &n->peer);
-  pp.challenge = n->ping_challenge;
-  pp.target = n->peer;
-#if DEBUG_HANDSHAKE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Encrypting `%s' message with challenge %u for `%4s' using key %u, IV %u (salt %u).\n",
-              "PING", (unsigned int) n->ping_challenge, GNUNET_i2s (&n->peer),
-              (unsigned int) n->encrypt_key.crc32, GNUNET_CRYPTO_crc32_n (&iv,
-                                                                          sizeof
-                                                                          (iv)),
-              pm->iv_seed);
-#endif
-  do_encrypt (n, &iv, &pp.target, &pm->target,
-              sizeof (struct PingMessage) - ((void *) &pm->target -
-                                             (void *) pm));
-  process_encrypted_neighbour_queue (n);
-  /* reschedule PING job */
-  left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n));
-  retry =
-      GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2),
-                                MIN_PING_FREQUENCY);
-  n->keep_alive_task =
-      GNUNET_SCHEDULER_add_delayed (retry, &send_keep_alive, n);
-
-}
-
 /**
  * Consider freeing the given neighbour since we may not need
  * to keep it around anymore.
@@ -594,6 +516,525 @@ notify_encrypted_transmit_ready (void *cls, size_t size, void *buf)
 }
 
 
+
+
+
+/**
+ * Select messages for transmission.  This heuristic uses a combination
+ * of earliest deadline first (EDF) scheduling (with bounded horizon)
+ * and priority-based discard (in case no feasible schedule exist) and
+ * speculative optimization (defer any kind of transmission until
+ * we either create a batch of significant size, 25% of max, or until
+ * we are close to a deadline).  Furthermore, when scheduling the
+ * heuristic also packs as many messages into the batch as possible,
+ * starting with those with the earliest deadline.  Yes, this is fun.
+ *
+ * @param n neighbour to select messages from
+ * @param size number of bytes to select for transmission
+ * @param retry_time set to the time when we should try again
+ *        (only valid if this function returns zero)
+ * @return number of bytes selected, or 0 if we decided to
+ *         defer scheduling overall; in that case, retry_time is set.
+ */
+static size_t
+select_messages (struct Neighbour *n, size_t size,
+                 struct GNUNET_TIME_Relative *retry_time)
+{
+  struct MessageEntry *pos;
+  struct MessageEntry *min;
+  struct MessageEntry *last;
+  unsigned int min_prio;
+  struct GNUNET_TIME_Absolute t;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative delta;
+  uint64_t avail;
+  struct GNUNET_TIME_Relative slack;    /* how long could we wait before missing deadlines? */
+  size_t off;
+  uint64_t tsize;
+  unsigned int queue_size;
+  int discard_low_prio;
+
+  GNUNET_assert (NULL != n->messages);
+  now = GNUNET_TIME_absolute_get ();
+  /* last entry in linked list of messages processed */
+  last = NULL;
+  /* should we remove the entry with the lowest
+   * priority from consideration for scheduling at the
+   * end of the loop? */
+  queue_size = 0;
+  tsize = 0;
+  pos = n->messages;
+  while (pos != NULL)
+  {
+    queue_size++;
+    tsize += pos->size;
+    pos = pos->next;
+  }
+  discard_low_prio = GNUNET_YES;
+  while (GNUNET_YES == discard_low_prio)
+  {
+    min = NULL;
+    min_prio = UINT_MAX;
+    discard_low_prio = GNUNET_NO;
+    /* calculate number of bytes available for transmission at time "t" */
+    avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window);
+    t = now;
+    /* how many bytes have we (hypothetically) scheduled so far */
+    off = 0;
+    /* maximum time we can wait before transmitting anything
+     * and still make all of our deadlines */
+    slack = GNUNET_TIME_UNIT_FOREVER_REL;
+    pos = n->messages;
+    /* note that we use "*2" here because we want to look
+     * a bit further into the future; much more makes no
+     * sense since new message might be scheduled in the
+     * meantime... */
+    while ((pos != NULL) && (off < size * 2))
+    {
+      if (pos->do_transmit == GNUNET_YES)
+      {
+        /* already removed from consideration */
+        pos = pos->next;
+        continue;
+      }
+      if (discard_low_prio == GNUNET_NO)
+      {
+        delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline);
+        if (delta.rel_value > 0)
+        {
+          // FIXME: HUH? Check!
+          t = pos->deadline;
+          avail +=
+              GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta);
+        }
+        if (avail < pos->size)
+        {
+          // FIXME: HUH? Check!
+          discard_low_prio = GNUNET_YES;        /* we could not schedule this one! */
+        }
+        else
+        {
+          avail -= pos->size;
+          /* update slack, considering both its absolute deadline
+           * and relative deadlines caused by other messages
+           * with their respective load */
+          slack =
+              GNUNET_TIME_relative_min (slack,
+                                        GNUNET_BANDWIDTH_value_get_delay_for
+                                        (n->bw_out, avail));
+          if (pos->deadline.abs_value <= now.abs_value)
+          {
+            /* now or never */
+            slack = GNUNET_TIME_UNIT_ZERO;
+          }
+          else if (GNUNET_YES == pos->got_slack)
+          {
+            /* should be soon now! */
+            slack =
+                GNUNET_TIME_relative_min (slack,
+                                          GNUNET_TIME_absolute_get_remaining
+                                          (pos->slack_deadline));
+          }
+          else
+          {
+            slack =
+                GNUNET_TIME_relative_min (slack,
+                                          GNUNET_TIME_absolute_get_difference
+                                          (now, pos->deadline));
+            pos->got_slack = GNUNET_YES;
+            pos->slack_deadline =
+                GNUNET_TIME_absolute_min (pos->deadline,
+                                          GNUNET_TIME_relative_to_absolute
+                                          (GNUNET_CONSTANTS_MAX_CORK_DELAY));
+          }
+        }
+      }
+      off += pos->size;
+      t = GNUNET_TIME_absolute_max (pos->deadline, t);  // HUH? Check!
+      if (pos->priority <= min_prio)
+      {
+        /* update min for discard */
+        min_prio = pos->priority;
+        min = pos;
+      }
+      pos = pos->next;
+    }
+    if (discard_low_prio)
+    {
+      GNUNET_assert (min != NULL);
+      /* remove lowest-priority entry from consideration */
+      min->do_transmit = GNUNET_YES;    /* means: discard (for now) */
+    }
+    last = pos;
+  }
+  /* guard against sending "tiny" messages with large headers without
+   * urgent deadlines */
+  if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) &&
+      (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2))
+  {
+    /* less than 25% of message would be filled with deadlines still
+     * being met if we delay by one second or more; so just wait for
+     * more data; but do not wait longer than 1s (since we don't want
+     * to delay messages for a really long time either). */
+    *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY;
+    /* reset do_transmit values for next time */
+    while (pos != last)
+    {
+      pos->do_transmit = GNUNET_NO;
+      pos = pos->next;
+    }
+    GNUNET_STATISTICS_update (stats,
+                              gettext_noop
+                              ("# transmissions delayed due to corking"), 1,
+                              GNUNET_NO);
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n",
+                (unsigned long long) retry_time->rel_value, (unsigned int) off,
+                (unsigned int) size);
+#endif
+    return 0;
+  }
+  /* select marked messages (up to size) for transmission */
+  off = 0;
+  pos = n->messages;
+  while (pos != last)
+  {
+    if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO))
+    {
+      pos->do_transmit = GNUNET_YES;    /* mark for transmission */
+      off += pos->size;
+      size -= pos->size;
+#if DEBUG_CORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Selecting message of size %u for transmission\n",
+                  (unsigned int) pos->size);
+#endif
+    }
+    else
+    {
+#if DEBUG_CORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Not selecting message of size %u for transmission at this time (maximum is %u)\n",
+                  (unsigned int) pos->size, size);
+#endif
+      pos->do_transmit = GNUNET_NO;     /* mark for not transmitting! */
+    }
+    pos = pos->next;
+  }
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n",
+              (unsigned long long) off, (unsigned long long) tsize, queue_size,
+              (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer));
+#endif
+  return off;
+}
+
+
+/**
+ * Batch multiple messages into a larger buffer.
+ *
+ * @param n neighbour to take messages from
+ * @param buf target buffer
+ * @param size size of buf
+ * @param deadline set to transmission deadline for the result
+ * @param retry_time set to the time when we should try again
+ *        (only valid if this function returns zero)
+ * @param priority set to the priority of the batch
+ * @return number of bytes written to buf (can be zero)
+ */
+static size_t
+batch_message (struct Neighbour *n, char *buf, size_t size,
+               struct GNUNET_TIME_Absolute *deadline,
+               struct GNUNET_TIME_Relative *retry_time, unsigned int *priority)
+{
+  char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
+  struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb;
+  struct MessageEntry *pos;
+  struct MessageEntry *prev;
+  struct MessageEntry *next;
+  size_t ret;
+
+  ret = 0;
+  *priority = 0;
+  *deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
+  *retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
+  if (0 == select_messages (n, size, retry_time))
+  {
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No messages selected, will try again in %llu ms\n",
+                retry_time->rel_value);
+#endif
+    return 0;
+  }
+  ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND);
+  ntm->ats_count = htonl (0);
+  ntm->ats.type = htonl (0);
+  ntm->ats.value = htonl (0);
+  ntm->peer = n->peer;
+  pos = n->messages;
+  prev = NULL;
+  while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader)))
+  {
+    next = pos->next;
+    if (GNUNET_YES == pos->do_transmit)
+    {
+      GNUNET_assert (pos->size <= size);
+      /* do notifications */
+      /* FIXME: track if we have *any* client that wants
+       * full notifications and only do this if that is
+       * actually true */
+      if (pos->size <
+          GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage))
+      {
+        memcpy (&ntm[1], &pos[1], pos->size);
+        ntm->header.size =
+            htons (sizeof (struct NotifyTrafficMessage) +
+                   sizeof (struct GNUNET_MessageHeader));
+        send_to_all_clients (&ntm->header, GNUNET_YES,
+                             GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND);
+      }
+      else
+      {
+        /* message too large for 'full' notifications, we do at
+         * least the 'hdr' type */
+        memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader));
+      }
+      ntm->header.size =
+          htons (sizeof (struct NotifyTrafficMessage) + pos->size);
+      send_to_all_clients (&ntm->header, GNUNET_YES,
+                           GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND);
+#if DEBUG_HANDSHAKE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Encrypting %u bytes with message of type %u and size %u\n",
+                  pos->size,
+                  (unsigned int)
+                  ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
+                  (unsigned int)
+                  ntohs (((const struct GNUNET_MessageHeader *)
+                          &pos[1])->size));
+#endif
+      /* copy for encrypted transmission */
+      memcpy (&buf[ret], &pos[1], pos->size);
+      ret += pos->size;
+      size -= pos->size;
+      *priority += pos->priority;
+#if DEBUG_CORE > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Adding plaintext message of size %u with deadline %llu ms to batch\n",
+                  (unsigned int) pos->size,
+                  (unsigned long long)
+                  GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value);
+#endif
+      deadline->abs_value =
+          GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value);
+      GNUNET_free (pos);
+      if (prev == NULL)
+        n->messages = next;
+      else
+        prev->next = next;
+    }
+    else
+    {
+      prev = pos;
+    }
+    pos = next;
+  }
+#if DEBUG_CORE > 1
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Deadline for message batch is %llu ms\n",
+              GNUNET_TIME_absolute_get_remaining (*deadline).rel_value);
+#endif
+  return ret;
+}
+
+
+/**
+ * Remove messages with deadlines that have long expired from
+ * the queue.
+ *
+ * @param n neighbour to inspect
+ */
+static void
+discard_expired_messages (struct Neighbour *n)
+{
+  struct MessageEntry *prev;
+  struct MessageEntry *next;
+  struct MessageEntry *pos;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative delta;
+  int disc;
+  unsigned int queue_length;
+
+  disc = GNUNET_NO;
+  now = GNUNET_TIME_absolute_get ();
+  prev = NULL;
+  queue_length = 0;
+  pos = n->messages;
+  while (pos != NULL)
+  {
+    queue_length++;
+    next = pos->next;
+    delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
+    if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value)
+    {
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Message is %llu ms past due, discarding.\n",
+                  delta.rel_value);
+#endif
+      if (prev == NULL)
+        n->messages = next;
+      else
+        prev->next = next;
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop
+                                ("# messages discarded (expired prior to transmission)"),
+                                1, GNUNET_NO);
+      disc = GNUNET_YES;
+      GNUNET_free (pos);
+    }
+    else
+      prev = pos;
+    pos = next;
+  }
+  if ( (GNUNET_YES == disc) &&
+       (queue_length == MAX_PEER_QUEUE_SIZE) )
+    schedule_peer_messages (n);
+}
+
+
+/**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+retry_plaintext_processing (void *cls,
+                            const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Neighbour *n = cls;
+
+  n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
+  process_plaintext_neighbour_queue (n);
+}
+
+
+/**
+ * Check if we have plaintext messages for the specified neighbour
+ * pending, and if so, consider batching and encrypting them (and
+ * then trigger processing of the encrypted queue if needed).
+ *
+ * @param n neighbour to check.
+ */
+static void
+process_plaintext_neighbour_queue (struct Neighbour *n)
+{
+  char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)];    /* plaintext */
+  size_t used;
+  struct EncryptedMessage *em;  /* encrypted message */
+  struct EncryptedMessage *ph;  /* plaintext header */
+  struct MessageEntry *me;
+  unsigned int priority;
+  struct GNUNET_TIME_Absolute deadline;
+  struct GNUNET_TIME_Relative retry_time;
+  struct GNUNET_CRYPTO_AesInitializationVector iv;
+  struct GNUNET_CRYPTO_AuthKey auth_key;
+
+  if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
+    n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  switch (n->status)
+  {
+  case PEER_STATE_DOWN:
+    send_key (n);
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    return;
+  case PEER_STATE_KEY_SENT:
+    if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
+      n->retry_set_key_task =
+          GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
+                                        &set_key_retry_task, n);
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    return;
+  case PEER_STATE_KEY_RECEIVED:
+    if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK)
+      n->retry_set_key_task =
+          GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency,
+                                        &set_key_retry_task, n);
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Not yet connected to `%4s', deferring processing of plaintext messages.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    return;
+  case PEER_STATE_KEY_CONFIRMED:
+    /* ready to continue */
+    break;
+  }
+  discard_expired_messages (n);
+  if (n->messages == NULL)
+  {
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Plaintext message queue for `%4s' is empty.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    return;                     /* no pending messages */
+  }
+  if (n->encrypted_head != NULL)
+  {
+#if DEBUG_CORE > 2
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    return;                     /* wait for messages already encrypted to be
+                                 * processed first! */
+  }
+  ph = (struct EncryptedMessage *) pbuf;
+  deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
+  priority = 0;
+  used = sizeof (struct EncryptedMessage);
+  used +=
+      batch_message (n, &pbuf[used],
+                     GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline,
+                     &retry_time, &priority);
+  if (used == sizeof (struct EncryptedMessage))
+  {
+#if DEBUG_CORE > 1
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No messages selected for transmission to `%4s' at this time, will try again later.\n",
+                GNUNET_i2s (&n->peer));
+#endif
+    /* no messages selected for sending, try again later... */
+    n->retry_plaintext_task =
+        GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing,
+                                      n);
+    return;
+  }
+  GSC_KX_encrypt_and_transmit (n->kx,
+                              &pbuf[struct EncryptedMessage],
+                              used - sizeof (struct EncryptedMessage));
+  schedule_peer_messages (n);
+}
+
+
+
+
 /**
  * Check if we have encrypted messages for the specified neighbour
  * pending, and if so, check with the transport about sending them
@@ -664,11 +1105,7 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid)
 #endif
   n = GNUNET_malloc (sizeof (struct Neighbour));
   n->peer = *pid;
-  GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
-  now = GNUNET_TIME_absolute_get ();
-  n->encrypt_key_created = now;
-  n->last_activity = now;
-  n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
+  n->last_activity = GNUNET_TIME_absolute_get ();
   n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
   n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT;
   n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX);
@@ -688,8 +1125,655 @@ create_neighbour (const struct GNUNET_PeerIdentity *pid)
 }
 
 
+
+/**
+ * We have a new client, notify it about all current sessions.
+ *
+ * @param client the new client
+ */
+void
+GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
+{
+  /* notify new client about existing neighbours */
+  GNUNET_CONTAINER_multihashmap_iterate (neighbours,
+                                        &notify_client_about_neighbour, client);
+}
+
+
+/**
+ * Queue a request from a client for transmission to a particular peer.
+ *
+ * @param car request to queue; this handle is then shared between
+ *         the caller (CLIENTS subsystem) and SESSIONS and must not
+ *         be released by either until either 'GNUNET_SESSIONS_dequeue',
+ *         'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed'
+ *         have been invoked on it
+ */
+void
+GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
+{
+  struct Neighbour *n; // FIXME: session...
+
+  n = find_neighbour (&car->peer);
+  if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
+      (n->status != PEER_STATE_KEY_CONFIRMED))
+  {
+    /* neighbour must have disconnected since request was issued,
+     * ignore (client will realize it once it processes the
+     * disconnect notification) */
+#if DEBUG_CORE_CLIENT
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Dropped client request for transmission (am disconnected)\n");
+#endif
+    GNUNET_STATISTICS_update (stats,
+                              gettext_noop
+                              ("# send requests dropped (disconnected)"), 1,
+                              GNUNET_NO);
+    GSC_CLIENTS_reject_requests (car);
+    return;
+  }
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received client transmission request. queueing\n");
+#endif
+    GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
+                                 n->active_client_request_tail, car);
+
+  // schedule_peer_messages (n);
+}
+
+
+/**
+ * Dequeue a request from a client from transmission to a particular peer.
+ *
+ * @param car request to dequeue; this handle will then be 'owned' by
+ *        the caller (CLIENTS sysbsystem)
+ */
+void
+GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
+{
+  struct Session *s;
+
+  s = find_session (&car->peer);
+  GNUNET_CONTAINER_DLL_remove (s->active_client_request_head,
+                               s->active_client_request_tail, car);
+}
+
+
+
+/**
+ * Transmit a message to a particular peer.
+ *
+ * @param car original request that was queued and then solicited;
+ *            this handle will now be 'owned' by the SESSIONS subsystem
+ * @param msg message to transmit
+ */
+void
+GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
+                      const struct GNUNET_MessageHeader *msg)
+{
+  struct MessageEntry *prev;
+  struct MessageEntry *pos;
+  struct MessageEntry *e;
+  struct MessageEntry *min_prio_entry;
+  struct MessageEntry *min_prio_prev;
+  unsigned int min_prio;
+  unsigned int queue_size;
+
+  n = find_neighbour (&sm->peer);
+  if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
+      (n->status != PEER_STATE_KEY_CONFIRMED))
+  {
+    /* attempt to send message to peer that is not connected anymore
+     * (can happen due to asynchrony) */
+    GNUNET_STATISTICS_update (stats,
+                              gettext_noop
+                              ("# messages discarded (disconnected)"), 1,
+                              GNUNET_NO);
+    if (client != NULL)
+      GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    return;
+  }
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
+              "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
+#endif
+  discard_expired_messages (n);
+  /* bound queue size */
+  /* NOTE: this entire block to bound the queue size should be
+   * obsolete with the new client-request code and the
+   * 'schedule_peer_messages' mechanism; we still have this code in
+   * here for now as a sanity check for the new mechanmism;
+   * ultimately, we should probably simply reject SEND messages that
+   * are not 'approved' (or provide a new core API for very unreliable
+   * delivery that always sends with priority 0).  Food for thought. */
+  min_prio = UINT32_MAX;
+  min_prio_entry = NULL;
+  min_prio_prev = NULL;
+  queue_size = 0;
+  prev = NULL;
+  pos = n->messages;
+  while (pos != NULL)
+  {
+    if (pos->priority <= min_prio)
+    {
+      min_prio_entry = pos;
+      min_prio_prev = prev;
+      min_prio = pos->priority;
+    }
+    queue_size++;
+    prev = pos;
+    pos = pos->next;
+  }
+  if (queue_size >= MAX_PEER_QUEUE_SIZE)
+  {
+    /* queue full */
+    if (ntohl (sm->priority) <= min_prio)
+    {
+      /* discard new entry; this should no longer happen! */
+      GNUNET_break (0);
+#if DEBUG_CORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
+                  queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
+                  (unsigned int) msize, (unsigned int) ntohs (message->type));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                                gettext_noop ("# discarded CORE_SEND requests"),
+                                1, GNUNET_NO);
+
+      if (client != NULL)
+        GNUNET_SERVER_receive_done (client, GNUNET_OK);
+      return;
+    }
+    GNUNET_assert (min_prio_entry != NULL);
+    /* discard "min_prio_entry" */
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Queue full, discarding existing older request\n");
+#endif
+    GNUNET_STATISTICS_update (stats,
+                              gettext_noop
+                              ("# discarded lower priority CORE_SEND requests"),
+                              1, GNUNET_NO);
+    if (min_prio_prev == NULL)
+      n->messages = min_prio_entry->next;
+    else
+      min_prio_prev->next = min_prio_entry->next;
+    GNUNET_free (min_prio_entry);
+  }
+
+#if DEBUG_CORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Adding transmission request for `%4s' of size %u to queue\n",
+              GNUNET_i2s (&sm->peer), (unsigned int) msize);
+#endif
+  e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
+  e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
+  e->priority = ntohl (sm->priority);
+  e->size = msize;
+  if (GNUNET_YES != (int) ntohl (sm->cork))
+    e->got_slack = GNUNET_YES;
+  memcpy (&e[1], &sm[1], msize);
+
+  /* insert, keep list sorted by deadline */
+  prev = NULL;
+  pos = n->messages;
+  while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
+  {
+    prev = pos;
+    pos = pos->next;
+  }
+  if (prev == NULL)
+    n->messages = e;
+  else
+    prev->next = e;
+  e->next = pos;
+
+  /* consider scheduling now */
+  process_plaintext_neighbour_queue (n);
+
+}
+
+
+
+/**
+ * Send a message to the neighbour.
+ *
+ * @param cls the message
+ * @param key neighbour's identity
+ * @param value 'struct Neighbour' of the target
+ * @return always GNUNET_OK
+ */
+static int
+do_send_message (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct GNUNET_MessageHeader *hdr = cls;
+  struct Neighbour *n = value;
+  struct MessageEntry *m;
+  uint16_t size;
+
+  size = ntohs (hdr->size);
+  m = GNUNET_malloc (sizeof (struct MessageEntry) + size);
+  memcpy (&m[1], hdr, size);
+  m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
+  m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
+  m->priority = UINT_MAX;
+  m->sender_status = n->status;
+  m->size = size;
+  GNUNET_CONTAINER_DLL_insert (n->message_head,
+                              n->message_tail,
+                              m);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Broadcast a message to all neighbours.
+ *
+ * @param msg message to transmit
+ */
+void
+GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
+{
+  if (NULL == sessions)
+    return;
+  GNUNET_CONTAINER_multihashmap_iterate (sessions,
+                                         &do_send_message, msg);
+}
+
+
+/**
+ * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
+ *
+ * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
+ * @param key identity of the connected peer
+ * @param value the 'struct Neighbour' for the peer
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct GNUNET_SERVER_TransmitContext *tc = cls;
+  struct Neighbour *n = value;
+  char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
+  struct GNUNET_TRANSPORT_ATS_Information *ats;
+  size_t size;
+  struct ConnectNotifyMessage *cnm;
+
+  cnm = (struct ConnectNotifyMessage *) buf;
+  if (n->status != PEER_STATE_KEY_CONFIRMED)
+    return GNUNET_OK;
+  size =
+      sizeof (struct ConnectNotifyMessage) +
+      (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+  if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    /* recovery strategy: throw away performance data */
+    GNUNET_array_grow (n->ats, n->ats_count, 0);
+    size =
+        sizeof (struct PeerStatusNotifyMessage) +
+        n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+  }
+  cnm = (struct ConnectNotifyMessage *) buf;
+  cnm->header.size = htons (size);
+  cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+  cnm->ats_count = htonl (n->ats_count);
+  ats = &cnm->ats;
+  memcpy (ats, n->ats,
+          n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
+  ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+  ats[n->ats_count].value = htonl (0);
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
+              "NOTIFY_CONNECT");
+#endif
+  cnm->peer = n->peer;
+  GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header);
+  return GNUNET_OK;
+}
+
+
+/**
+ * End the session with the given peer (we are no longer
+ * connected). 
+ *
+ * @param pid identity of peer to kill session with
+ */
+void
+GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
+{
+}
+
+
+/**
+ * Traffic is being solicited for the given peer.  This means that the
+ * message queue on the transport-level (NEIGHBOURS subsystem) is now
+ * empty and it is now OK to transmit another (non-control) message.
+ *
+ * @param pid identity of peer ready to receive data
+ */
+void
+GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
+{
+}
+
+
+/**
+ * Transmit a message to a particular peer.
+ *
+ * @param car original request that was queued and then solicited,
+ *            ownership does not change (dequeue will be called soon).
+ * @param msg message to transmit
+ */
+void
+GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
+                      const struct GNUNET_MessageHeader *msg)
+{
+}
+
+
+/**
+ * We have a new client, notify it about all current sessions.
+ *
+ * @param client the new client
+ */
+void
+GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
+{
+}
+
+
+/**
+ * Handle CORE_ITERATE_PEERS request. For this request type, the client
+ * does not have to have transmitted an INIT request.  All current peers
+ * are returned, regardless of which message types they accept. 
+ *
+ * @param cls unused
+ * @param client client sending the iteration request
+ * @param message iteration request message
+ */
+void
+GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client,
+                                         const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_MessageHeader done_msg;
+  struct GNUNET_SERVER_TransmitContext *tc;
+
+  tc = GNUNET_SERVER_transmit_context_create (client);
+  GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message,
+                                        tc);
+  done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
+  GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
+  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Handle CORE_PEER_CONNECTED request.   Notify client about connection
+ * to the given neighbour.  For this request type, the client does not
+ * have to have transmitted an INIT request.  All current peers are
+ * returned, regardless of which message types they accept.
+ *
+ * @param cls unused
+ * @param client client sending the iteration request
+ * @param message iteration request message
+ */
+void
+GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client,
+                                     const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_MessageHeader done_msg;
+  struct GNUNET_SERVER_TransmitContext *tc;
+  const struct GNUNET_PeerIdentity *peer;
+
+  peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK!
+  tc = GNUNET_SERVER_transmit_context_create (client);
+  GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey,
+                                              &queue_connect_message, tc);
+  done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
+  done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
+  GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
+  GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+
+/**
+ * Handle REQUEST_INFO request. For this request type, the client must
+ * have transmitted an INIT first.
+ *
+ * @param cls unused
+ * @param client client sending the request
+ * @param message iteration request message
+ */
+void
+GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client,
+                                        const struct GNUNET_MessageHeader *message)
+{
+  const struct RequestInfoMessage *rcm;
+  struct GSC_Client *pos;
+  struct Neighbour *n;
+  struct ConfigurationInfoMessage cim;
+  int32_t want_reserv;
+  int32_t got_reserv;
+  unsigned long long old_preference;
+  struct GNUNET_TIME_Relative rdelay;
+
+  rdelay = GNUNET_TIME_relative_get_zero ();
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n",
+              "REQUEST_INFO");
+#endif
+  rcm = (const struct RequestInfoMessage *) message;
+  n = find_neighbour (&rcm->peer);
+  memset (&cim, 0, sizeof (cim));
+  if ((n != NULL) && (GNUNET_YES == n->is_connected))
+  {
+    want_reserv = ntohl (rcm->reserve_inbound);
+    if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
+    {
+      n->bw_out_internal_limit = rcm->limit_outbound;
+      if (n->bw_out.value__ !=
+          GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
+                                      n->bw_out_external_limit).value__)
+      {
+        n->bw_out =
+            GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
+                                        n->bw_out_external_limit);
+        GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
+                                               n->bw_out);
+        GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
+        handle_peer_status_change (n);
+      }
+    }
+    if (want_reserv < 0)
+    {
+      got_reserv = want_reserv;
+    }
+    else if (want_reserv > 0)
+    {
+      rdelay =
+          GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
+                                              want_reserv);
+      if (rdelay.rel_value == 0)
+        got_reserv = want_reserv;
+      else
+        got_reserv = 0;         /* all or nothing */
+    }
+    else
+      got_reserv = 0;
+    GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv);
+    old_preference = n->current_preference;
+    n->current_preference += GNUNET_ntohll (rcm->preference_change);
+    if (old_preference > n->current_preference)
+    {
+      /* overflow; cap at maximum value */
+      n->current_preference = ULLONG_MAX;
+    }
+    update_preference_sum (n->current_preference - old_preference);
+#if DEBUG_CORE_QUOTA
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n",
+                (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv,
+                (unsigned long long) rdelay.rel_value);
+#endif
+    cim.reserved_amount = htonl (got_reserv);
+    cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
+    cim.bw_out = n->bw_out;
+    cim.preference = n->current_preference;
+  }
+  else
+  {
+    /* Technically, this COULD happen (due to asynchronous behavior),
+     * but it should be rare, so we should generate an info event
+     * to help diagnosis of serious errors that might be masked by this */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                _
+                ("Client asked for preference change with peer `%s', which is not connected!\n"),
+                GNUNET_i2s (&rcm->peer));
+    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    return;
+  }
+  cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
+  cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
+  cim.peer = rcm->peer;
+  cim.rim_id = rcm->rim_id;
+#if DEBUG_CORE_CLIENT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
+              "CONFIGURATION_INFO");
+#endif
+  GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Create a session, a key exchange was just completed.
+ *
+ * @param peer peer that is now connected
+ * @param kx key exchange that completed
+ */
+void
+GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
+                    struct GSC_KeyExchangeInfo *kx)
+{
+    {
+      struct GNUNET_MessageHeader *hdr;
+
+      hdr = compute_type_map_message ();
+      send_type_map_to_neighbour (hdr, &n->peer.hashPubKey, n);
+      GNUNET_free (hdr);
+    }
+    if (n->bw_out_external_limit.value__ != t.inbound_bw_limit.value__)
+    {
+      n->bw_out_external_limit = t.inbound_bw_limit;
+      n->bw_out =
+          GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                      n->bw_out_internal_limit);
+      GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                             n->bw_out);
+      GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
+    }
+#if DEBUG_CORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Confirmed key via `%s' message for peer `%4s'\n", "PONG",
+                GNUNET_i2s (&n->peer));
+#endif
+
+
+    size =
+        sizeof (struct ConnectNotifyMessage) +
+        (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      /* recovery strategy: throw away performance data */
+      GNUNET_array_grow (n->ats, n->ats_count, 0);
+      size =
+          sizeof (struct PeerStatusNotifyMessage) +
+          n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
+    }
+    cnm = (struct ConnectNotifyMessage *) buf;
+    cnm->header.size = htons (size);
+    cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
+    cnm->ats_count = htonl (n->ats_count);
+    cnm->peer = n->peer;
+    mats = &cnm->ats;
+    memcpy (mats, n->ats,
+            n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
+    mats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+    mats[n->ats_count].value = htonl (0);
+    send_to_all_clients (&cnm->header, GNUNET_NO,
+                         GNUNET_CORE_OPTION_SEND_CONNECT);
+    process_encrypted_neighbour_queue (n);
+    n->last_activity = GNUNET_TIME_absolute_get ();
+
+  if (n->status == PEER_STATE_KEY_CONFIRMED)
+  {
+    now = GNUNET_TIME_absolute_get ();
+    n->last_activity = now;
+    changed = GNUNET_YES;
+    if (!up)
+    {
+      GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
+                                1, GNUNET_NO);
+      n->time_established = now;
+    }
+    if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
+      GNUNET_SCHEDULER_cancel (n->keep_alive_task);
+    n->keep_alive_task =
+        GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide
+                                      (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                       2), &send_keep_alive, n);
+  }
+
+
+}
+
+
+/**
+ * Update information about a session.
+ *
+ * @param peer peer who's session should be updated
+ * @param bw_out new outbound bandwidth limit for the peer
+ * @param atsi performance information
+ * @param atsi_count number of performance records supplied
+ */
+void
+GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer,
+                    struct GNUNET_BANDWIDTH_Value32NBO bw_out,
+                    const struct GNUNET_TRANSPORT_ATS_Information *atsi,
+                    uint32_t atsi_count)
+{
+  if (bw_out_external_limit.value__ != pt->inbound_bw_limit.value__)
+  {
+#if DEBUG_CORE_SET_QUOTA
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received %u b/s as new inbound limit for peer `%4s'\n",
+                (unsigned int) ntohl (pt->inbound_bw_limit.value__),
+                GNUNET_i2s (&n->peer));
+#endif
+    n->bw_out_external_limit = pt->inbound_bw_limit;
+    n->bw_out =
+        GNUNET_BANDWIDTH_value_min (n->bw_out_external_limit,
+                                    n->bw_out_internal_limit);
+    GNUNET_BANDWIDTH_tracker_update_quota (&n->available_send_window,
+                                           n->bw_out);
+    GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
+  }
+
+}
+
+
+/**
+ * Initialize sessions subsystem.
+ */
 int
-GSC_NEIGHBOURS_init ()
+GSC_SESSIONS_init ()
 {
   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
   self.public_key = &my_public_key;
@@ -701,8 +1785,11 @@ GSC_NEIGHBOURS_init ()
 }
 
 
+/**
+ * Shutdown sessions subsystem.
+ */
 void
-GSC_NEIGHBOURS_done ()
+GSC_SESSIONS_done ()
 {
   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
                                          NULL);