- delete unused message type
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index 4b69c5327e5799744b9b97d346ac9fc004d5a573..ffd9786d33522098acf1f6efa80995b28e8a6a8a 100644 (file)
@@ -1,10 +1,10 @@
 /*
       This file is part of GNUnet
 /*
       This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
+      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
  */
 
 #include "platform.h"
  */
 
 #include "platform.h"
-#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
-#include "gnunet_util_lib.h"
+#include "gnunet_set_service.h"
 #include "gnunet_consensus_service.h"
 #include "gnunet_consensus_service.h"
-#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
 #include "consensus_protocol.h"
 #include "consensus_protocol.h"
-#include "ibf.h"
 #include "consensus.h"
 
 
 /**
 #include "consensus.h"
 
 
 /**
- * Number of IBFs in a strata estimator.
- */
-#define STRATA_COUNT 32
-/**
- * Number of buckets per IBF.
- */
-#define STRATA_IBF_BUCKETS 80
-/**
- * hash num parameter for the difference digests and strata estimators
- */
-#define STRATA_HASH_NUM 3
-
-/**
- * Number of buckets that can be transmitted in one message.
+ * Log macro that prefixes the local peer and the peer we are in contact with.
  */
  */
-#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
+#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
+   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
 
 
-/**
- * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
- * Choose this value so that computing the IBF is still cheaper
- * than transmitting all values.
- */
-#define MAX_IBF_ORDER (16)
 
 /**
 
 /**
- * Number exp-rounds.
+ * Number of exponential rounds, used in the exp and completion round.
  */
  */
-#define NUM_EXP_ROUNDS (4)
-
+#define NUM_EXP_ROUNDS 4
 
 /* forward declarations */
 
 
 /* forward declarations */
 
-struct ConsensusSession;
-struct IncomingSocket;
+/* mutual recursion with struct ConsensusSession */
 struct ConsensusPeerInformation;
 
 struct ConsensusPeerInformation;
 
-static void
-client_send_next (struct ConsensusSession *session);
-
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
-
-static void 
-round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-static void
-send_ibf (struct ConsensusPeerInformation *cpi);
-
-static void
-send_strata_estimator (struct ConsensusPeerInformation *cpi);
-
-static void
-decode (struct ConsensusPeerInformation *cpi);
-
-static void 
-write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
-
+/* mutual recursion with round_over */
 static void
 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 static void
 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
-/**
- * An element that is waiting to be transmitted to the client.
- */
-struct PendingElement
-{
-  /**
-   * Pending elements are kept in a DLL.
-   */
-  struct PendingElement *next;
-
-  /**
-   * Pending elements are kept in a DLL.
-   */
-  struct PendingElement *prev;
-
-  /**
-   * The actual element
-   */
-  struct GNUNET_CONSENSUS_Element *element;
-
-  /* peer this element is coming from */
-  struct ConsensusPeerInformation *cpi;
-};
-
-
-struct ElementList
-{
-  struct ElementList *next;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_HashCode *element_hash;
-};
-
-
 /**
  * Describes the current round a consensus session is in.
  */
 /**
  * Describes the current round a consensus session is in.
  */
@@ -146,179 +70,43 @@ enum ConsensusRound
    */
   CONSENSUS_ROUND_EXCHANGE,
   /**
    */
   CONSENSUS_ROUND_EXCHANGE,
   /**
-   * Exchange which elements each peer has, but not the elements.
+   * Exchange which elements each peer has, but don't
+   * transmit the element's data, only their SHA-512 hashes.
+   * This round uses the all-to-all scheme.
    */
   CONSENSUS_ROUND_INVENTORY,
   /**
    */
   CONSENSUS_ROUND_INVENTORY,
   /**
-   * Collect and distribute missing values.
+   * Collect and distribute missing values with the exponential scheme.
    */
    */
-  CONSENSUS_ROUND_STOCK,
+  CONSENSUS_ROUND_COMPLETION,
   /**
   /**
-   * Consensus concluded.
+   * Consensus concluded. After timeout and finished communication with client,
+   * consensus session will be destroyed.
    */
   CONSENSUS_ROUND_FINISH
 };
 
 
 /**
    */
   CONSENSUS_ROUND_FINISH
 };
 
 
 /**
- * Information about a peer that is in a consensus session.
+ * Complete information about the current round and all
+ * subrounds.
  */
  */
-struct ConsensusPeerInformation
+struct RoundInfo
 {
 {
-  struct GNUNET_PeerIdentity peer_id;
-
-  /**
-   * Socket for communicating with the peer, either created by the local peer,
-   * or the remote peer.
-   */
-  struct GNUNET_STREAM_Socket *socket;
-
-  /**
-   * Message tokenizer, for the data received from this peer via the stream socket.
-   */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-
-  /**
-   * Do we connect to the peer, or does the peer connect to us?
-   * Only valid for all-to-all phases
-   */
-  int is_outgoing;
-
-  /**
-   * Did we receive/send a consensus hello?
-   */
-  int hello;
-
-  /**
-   * Handle for currently active read
-   */
-  struct GNUNET_STREAM_ReadHandle *rh;
-
-  /**
-   * Handle for currently active read
-   */
-  struct GNUNET_STREAM_WriteHandle *wh;
-
-  enum {
-    /* beginning of round */
-    IBF_STATE_NONE=0,
-    /* we currently receive an ibf */
-    IBF_STATE_RECEIVING,
-    /* we currently transmit an ibf */
-    IBF_STATE_TRANSMITTING,
-    /* we decode a received ibf */
-    IBF_STATE_DECODING,
-    /* wait for elements and element requests */
-    IBF_STATE_ANTICIPATE_DIFF
-  } ibf_state ;
-
-  /**
-   * What is the order (=log2 size) of the ibf
-   * we're currently dealing with?
-   * Interpretation depends on ibf_state.
-   */
-  int ibf_order;
-
-  /**
-   * The current IBF for this peer,
-   * purpose dependent on ibf_state
-   */
-  struct InvertibleBloomFilter *ibf;
-
-  /**
-   * How many buckets have we transmitted/received? 
-   * Interpretatin depends on ibf_state
-   */
-  int ibf_bucket_counter;
-
-  /**
-   * Strata estimator of the peer, NULL if our peer
-   * initiated the reconciliation.
-   */
-  struct StrataEstimator *se;
-
-  /**
-   * Element keys that this peer misses, but we have them.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
-
-  /**
-   * Element keys that this peer has, but we miss.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
-
-  /**
-   * Back-reference to the consensus session,
-   * to that ConsensusPeerInformation can be used as a closure
-   */
-  struct ConsensusSession *session;
-
-  /**
-   * Messages queued for the current round.
-   */
-  struct QueuedMessage *messages_head;
-
-  /**
-   * Messages queued for the current round.
-   */
-  struct QueuedMessage *messages_tail;
-
   /**
   /**
-   * True if we are actually replaying the strata message,
-   * e.g. currently handling the premature_strata_message.
+   * The current main round.
    */
    */
-  int replaying_strata_message;
-
-  /**
-   * A strata message that is not actually for the current round,
-   * used in the exp-scheme.
-   */
-  struct StrataMessage *premature_strata_message;
-
-  /**
-   * We have finishes the exp-subround with the peer.
-   */
-  int exp_subround_finished;
-
-  int inventory_synced;
-
-  /**
-   * Round this peer seems to be in, according to the last SE we got.
-   * Necessary to store this, as we sometimes need to respond to a request from an
-   * older round, while we are already in the next round.
-   */
-  enum ConsensusRound apparent_round;
-
-};
-
-typedef void (*QueuedMessageCallback) (void *msg);
-
-/**
- * A doubly linked list of messages.
- */
-struct QueuedMessage
-{
-  struct GNUNET_MessageHeader *msg;
-
+  enum ConsensusRound round;
   /**
   /**
-   * Queued messages are stored in a doubly linked list.
+   * The current exp round, valid if
+   * the main round is an exp round.
    */
    */
-  struct QueuedMessage *next;
-
+  uint32_t exp_round;
   /**
   /**
-   * Queued messages are stored in a doubly linked list.
+   * The current exp subround, valid if
+   * the main round is an exp round.
    */
    */
-  struct QueuedMessage *prev;
-
-  QueuedMessageCallback cb;
-  
-  void *cls;
-};
-
-
-struct StrataEstimator
-{
-  struct InvertibleBloomFilter **strata;
+  uint32_t exp_subround;
 };
 
 
 };
 
 
@@ -337,13 +125,6 @@ struct ConsensusSession
    */
   struct ConsensusSession *prev;
 
    */
   struct ConsensusSession *prev;
 
-  /**
-   * Join message. Used to initialize the session later,
-   * if the identity of the local peer is not yet known.
-   * NULL if the session has been fully initialized.
-   */
-  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
-
   /**
   * Global consensus identification, computed
   * from the session id and participating authorities.
   /**
   * Global consensus identification, computed
   * from the session id and participating authorities.
@@ -351,57 +132,34 @@ struct ConsensusSession
   struct GNUNET_HashCode global_id;
 
   /**
   struct GNUNET_HashCode global_id;
 
   /**
-   * Local client in this consensus session.
-   * There is only one client per consensus session.
+   * Client that inhabits the session
    */
   struct GNUNET_SERVER_Client *client;
 
   /**
    */
   struct GNUNET_SERVER_Client *client;
 
   /**
-   * Elements in the consensus set of this session,
-   * all of them either have been sent by or approved by the client.
-   * Contains ElementList.
-   * Used as a unique-key hashmap.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *values;
-
-  /**
-   * Elements that have not been approved (or rejected) by the client yet.
+   * Queued messages to the client.
    */
    */
-  struct PendingElement *client_approval_head;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
 
   /**
-   * Elements that have not been approved (or rejected) by the client yet.
+   * Time when the conclusion of the consensus should begin.
    */
    */
-  struct PendingElement *client_approval_tail;
-
-  /**
-   * Messages to be sent to the local client that owns this session
-   */
-  struct QueuedMessage *client_messages_head;
-
-  /**
-   * Messages to be sent to the local client that owns this session
-   */
-  struct QueuedMessage *client_messages_tail;
-
-  /**
-   * Currently active transmit handle for sending to the client
-   */
-  struct GNUNET_SERVER_TransmitHandle *client_th;
+  struct GNUNET_TIME_Absolute conclude_start;
 
   /**
    * Timeout for all rounds together, single rounds will schedule a timeout task
    * with a fraction of the conclude timeout.
 
   /**
    * Timeout for all rounds together, single rounds will schedule a timeout task
    * with a fraction of the conclude timeout.
+   * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
    */
    */
-  struct GNUNET_TIME_Relative conclude_timeout;
-  
+  struct GNUNET_TIME_Absolute conclude_deadline;
+
   /**
   /**
-   * Timeout task identifier for the current round
+   * Timeout task identifier for the current round.
    */
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   /**
    */
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   /**
-   * Number of other peers in the consensus
+   * Number of other peers in the consensus.
    */
   unsigned int num_peers;
 
    */
   unsigned int num_peers;
 
@@ -414,105 +172,101 @@ struct ConsensusSession
   /**
    * Index of the local peer in the peers array
    */
   /**
    * Index of the local peer in the peers array
    */
-  int local_peer_idx;
+  unsigned int local_peer_idx;
 
   /**
 
   /**
-   * Strata estimator, computed online
+   * Current round
    */
    */
-  struct StrataEstimator *se;
+  enum ConsensusRound current_round;
 
   /**
 
   /**
-   * Pre-computed IBFs
+   * Permutation of peers for the current round,
    */
    */
-  struct InvertibleBloomFilter **ibfs;
+  uint32_t *shuffle;
 
   /**
 
   /**
-   * Current round
+   * Inverse permutation of peers for the current round,
    */
    */
-  enum ConsensusRound current_round;
-
-  int exp_round;
+  uint32_t *shuffle_inv;
 
 
-  int exp_subround;
+  /**
+   * Current round of the exponential scheme.
+   */
+  uint32_t exp_round;
 
   /**
 
   /**
-   * Permutation of peers for the current round,
-   * maps logical index (for current round) to physical index (location in info array)
+   * Current sub-round of the exponential scheme.
    */
    */
-  int *shuffle;
+  uint32_t exp_subround;
 
   /**
    * The partner for the current exp-round
    */
 
   /**
    * The partner for the current exp-round
    */
-  struct ConsensusPeerInformationpartner_outgoing;
+  struct ConsensusPeerInformation *partner_outgoing;
 
   /**
    * The partner for the current exp-round
    */
 
   /**
    * The partner for the current exp-round
    */
-  struct ConsensusPeerInformation* partner_incoming;
-};
+  struct ConsensusPeerInformation *partner_incoming;
 
 
-
-/**
- * Sockets from other peers who want to communicate with us.
- * It may not be known yet which consensus session they belong to.
- * Also, the session might not exist yet locally.
- */
-struct IncomingSocket
-{
   /**
   /**
-   * Incoming sockets are kept in a double linked list.
+   * The consensus set of this session.
    */
    */
-  struct IncomingSocket *next;
+  struct GNUNET_SET_Handle *element_set;
 
   /**
 
   /**
-   * Incoming sockets are kept in a double linked list.
+   * Listener for requests from other peers.
+   * Uses the session's global id as app id.
    */
    */
-  struct IncomingSocket *prev;
+  struct GNUNET_SET_ListenHandle *set_listener;
+};
+
 
 
+/**
+ * Information about a peer that is in a consensus session.
+ */
+struct ConsensusPeerInformation
+{
   /**
   /**
-   * The actual socket.
+   * Peer identitty of the peer in the consensus session
    */
    */
-  struct GNUNET_STREAM_Socket *socket;
+  struct GNUNET_PeerIdentity peer_id;
 
   /**
 
   /**
-   * Handle for currently active read
+   * Back-reference to the consensus session,
+   * to that ConsensusPeerInformation can be used as a closure
    */
    */
-  struct GNUNET_STREAM_ReadHandle *rh;
+  struct ConsensusSession *session;
 
   /**
 
   /**
-   * Peer that connected to us with the socket.
+   * We have finishes the exp-subround with the peer.
    */
    */
-  struct GNUNET_PeerIdentity peer_id;
+  int exp_subround_finished;
 
   /**
 
   /**
-   * Message stream tokenizer for this socket.
+   * Set operation we are currently executing with this peer.
    */
    */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  struct GNUNET_SET_OperationHandle *set_op;
 
   /**
 
   /**
-   * Peer-in-session this socket belongs to, once known, otherwise NULL.
+   * Set operation we are planning on executing with this peer.
    */
    */
-  struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *delayed_set_op;
 
   /**
 
   /**
-   * Set to the global session id, if the peer sent us a hello-message,
-   * but the session does not exist yet.
+   * Info about the round of the delayed set operation.
    */
    */
-  struct GNUNET_HashCode *requested_gid;
+  struct RoundInfo delayed_round_info;
 };
 
 
 };
 
 
-static struct IncomingSocket *incoming_sockets_head;
-static struct IncomingSocket *incoming_sockets_tail;
-
 /**
 /**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
  */
 static struct ConsensusSession *sessions_head;
 
 /**
  */
 static struct ConsensusSession *sessions_head;
 
 /**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
  */
 static struct ConsensusSession *sessions_tail;
 
  */
 static struct ConsensusSession *sessions_tail;
 
@@ -529,1741 +283,375 @@ static struct GNUNET_SERVER_Handle *srv;
 /**
  * Peer that runs this service.
  */
 /**
  * Peer that runs this service.
  */
-static struct GNUNET_PeerIdentity *my_peer;
+static struct GNUNET_PeerIdentity my_peer;
 
 
-/**
- * Handle to the core service. Only used during service startup, will be NULL after that.
- */
-static struct GNUNET_CORE_Handle *core;
-
-/**
- * Listener for sockets from peers that want to reconcile with us.
- */
-static struct GNUNET_STREAM_ListenSocket *listener;
-
-
-/**
- * Queue a message to be sent to the inhabiting client of a session.
- *
- * @param session session
- * @param msg message we want to queue
- */
-static void
-queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
-{
-  struct QueuedMessage *qm;
-  qm = GNUNET_malloc (sizeof *qm);
-  qm->msg = msg;
-  GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
-}
 
 
-/**
- * Queue a message to be sent to another peer
- *
- * @param cpi peer
- * @param msg message we want to queue
- * @param cb callback, called when the message is given to strem
- * @param cls closure for cb
- */
-static void
-queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
+static int
+have_exp_subround_finished (const struct ConsensusSession *session)
 {
 {
-  struct QueuedMessage *qm;
-  qm = GNUNET_malloc (sizeof *qm);
-  qm->msg = msg;
-  qm->cls = cls;
-  qm->cb = cb;
-  GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm);
-  if (cpi->wh == NULL)
-    write_queued (cpi, GNUNET_STREAM_OK, 0);
+  int not_finished;
+  not_finished = 0;
+  if ( (NULL != session->partner_outgoing) &&
+       (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
+    not_finished++;
+  if ( (NULL != session->partner_incoming) &&
+       (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
+    not_finished++;
+  if (0 == not_finished)
+    return GNUNET_YES;
+  return GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Queue a message to be sent to another peer
+ * Destroy a session, free all resources associated with it.
  *
  *
- * @param cpi peer
- * @param msg message we want to queue
+ * @param session the session to destroy
  */
 static void
  */
 static void
-queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg)
-{
-  queue_peer_message_with_cls (cpi, msg, NULL, NULL);
-}
-
-
-/*
-static void
-clear_peer_messages (struct ConsensusPeerInformation *cpi)
-{
-  cpi->messages_head = NULL;
-  cpi->messages_tail = NULL;
-}
-*/
-
-
-/**
- * Estimate set difference with two strata estimators,
- * i.e. arrays of IBFs.
- * Does not not modify its arguments.
- *
- * @param se1 first strata estimator
- * @param se2 second strata estimator
- * @return the estimated difference
- */
-static int
-estimate_difference (const struct StrataEstimator *se1,
-                     const struct StrataEstimator *se2)
+destroy_session (struct ConsensusSession *session)
 {
   int i;
 {
   int i;
-  int count;
-  count = 0;
-  for (i = STRATA_COUNT - 1; i >= 0; i--)
+
+  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+  if (NULL != session->element_set)
+  {
+    GNUNET_SET_destroy (session->element_set);
+    session->element_set = NULL;
+  }
+  if (NULL != session->set_listener)
+  {
+    GNUNET_SET_listen_cancel (session->set_listener);
+    session->set_listener = NULL;
+  }
+  if (NULL != session->client_mq)
+  {
+    GNUNET_MQ_destroy (session->client_mq);
+    session->client_mq = NULL;
+  }
+  if (NULL != session->client)
+  {
+    GNUNET_SERVER_client_disconnect (session->client);
+    session->client = NULL;
+  }
+  if (NULL != session->shuffle)
+  {
+    GNUNET_free (session->shuffle);
+    session->shuffle = NULL;
+  }
+  if (NULL != session->shuffle_inv)
   {
   {
-    struct InvertibleBloomFilter *diff;
-    /* number of keys decoded from the ibf */
-    int ibf_count;
-    int more;
-    ibf_count = 0;
-    /* FIXME: implement this without always allocating new IBFs */
-    diff = ibf_dup (se1->strata[i]);
-    ibf_subtract (diff, se2->strata[i]);
-    for (;;)
+    GNUNET_free (session->shuffle_inv);
+    session->shuffle_inv = NULL;
+  }
+  if (NULL != session->info)
+  {
+    for (i = 0; i < session->num_peers; i++)
     {
     {
-      more = ibf_decode (diff, NULL, NULL);
-      if (GNUNET_NO == more)
-      {
-        count += ibf_count;
-        break;
-      }
-      if (GNUNET_SYSERR == more)
+      struct ConsensusPeerInformation *cpi;
+      cpi = &session->info[i];
+      if (NULL != cpi->set_op)
       {
       {
-        ibf_destroy (diff);
-        return count * (1 << (i + 1));
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
       }
       }
-      ibf_count++;
     }
     }
-    ibf_destroy (diff);
+    GNUNET_free (session->info);
+    session->info = NULL;
   }
   }
-  return count;
+  GNUNET_free (session);
 }
 
 
 /**
 }
 
 
 /**
- * Called when receiving data from a peer that is member of
- * an inhabited consensus session.
+ * Iterator for set elements.
  *
  *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
+ * @param cls closure
+ * @param element the current element, NULL if all elements have been
+ *        iterated over
+ * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
  */
  */
-static size_t
-session_stream_data_processor (void *cls,
-                       enum GNUNET_STREAM_Status status,
-                       const void *data,
-                       size_t size)
+static int
+send_to_client_iter (void *cls,
+                     const struct GNUNET_SET_Element *element)
 {
 {
-  struct ConsensusPeerInformation *cpi;
-  int ret;
+  struct ConsensusSession *session = cls;
+  struct GNUNET_MQ_Envelope *ev;
+
+  if (NULL != element)
+  {
+    struct GNUNET_CONSENSUS_ElementMessage *m;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
+                session->local_peer_idx);
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  cpi = cls;
-  GNUNET_assert (NULL != cpi->mst);
-  ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
-  if (GNUNET_SYSERR == ret)
+    ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+    m->element_type = htons (element->type);
+    memcpy (&m[1], element->data, element->size);
+    GNUNET_MQ_send (session->client_mq, ev);
+  }
+  else
   {
   {
-    /* FIXME: handle this correctly */
-    GNUNET_assert (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
+                session->local_peer_idx);
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
+    GNUNET_MQ_send (session->client_mq, ev);
   }
   }
-  /* read again */
-  cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                &session_stream_data_processor, cpi);
-  /* we always read all data */
-  return size;
+  return GNUNET_YES;
 }
 
 }
 
+
 /**
 /**
- * Called when we receive data from a peer that is not member of
- * a session yet, or the session is not yet inhabited.
+ * Start the next round.
+ * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
  *
  *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
+ * @param cls the session
+ * @param tc task context, for when this task is invoked by the scheduler,
+ *           NULL if invoked for another reason
  */
  */
-static size_t
-incoming_stream_data_processor (void *cls,
-                       enum GNUNET_STREAM_Status status,
-                       const void *data,
-                       size_t size)
+static void
+round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  struct IncomingSocket *incoming;
-  int ret;
+  struct ConsensusSession *session;
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  incoming = cls;
-  ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
-  if (GNUNET_SYSERR == ret)
-  {
-    /* FIXME: handle this correctly */
-    GNUNET_assert (0);
-  }
-  /* read again */
-  incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &incoming_stream_data_processor, incoming);
-  /* we always read all data */
-  return size;
-}
-
-
-static void
-send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
-{
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_MessageHeader *element_msg;
-  size_t msize;
-
-  while (NULL != head)
-  {
-    element = head->element;
-    msize = sizeof (struct GNUNET_MessageHeader) + element->size;
-    element_msg = GNUNET_malloc (msize);
-    element_msg->size = htons (msize);
-    switch (cpi->apparent_round)
-    {
-      case CONSENSUS_ROUND_STOCK:
-      case CONSENSUS_ROUND_EXCHANGE:
-        element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
-        break;
-      case CONSENSUS_ROUND_INVENTORY:
-        element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
-        break;
-      default:
-        GNUNET_break (0);
-    }
-    GNUNET_assert (NULL != element->data);
-    memcpy (&element_msg[1], element->data, element->size);
-    queue_peer_message (cpi, element_msg);
-    head = head->next;
-  }
-}
-
-/**
- * Iterator to insert values into an ibf.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-ibf_values_iterator (void *cls,
-                     const struct GNUNET_HashCode *key,
-                     void *value)
-{
-  struct ConsensusPeerInformation *cpi;
-  struct ElementList *head;
-  struct IBF_Key ibf_key;
-  cpi = cls;
-  head = value;
-  ibf_key = ibf_key_from_hashcode (head->element_hash);
-  GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
-  ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
-  return GNUNET_YES;
-}
-
-/**
- * Create and populate an IBF for the specified peer,
- * if it does not already exist.
- *
- * @param cpi peer to create the ibf for
- */
-static void
-prepare_ibf (struct ConsensusPeerInformation *cpi)
-{
-  if (NULL == cpi->session->ibfs[cpi->ibf_order])
-  {
-    cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
-    GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
-  }
-}
-
-
-/**
- * Called when a remote peer wants to inform the local peer
- * that the remote peer misses elements.
- * Elements are not reconciled.
- *
- * @param cpi session
- * @param msg message
- */
-static int
-handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  GNUNET_assert (0);
-}
-
-
-static int
-exp_subround_finished (const struct ConsensusSession *session)
-{
-  int not_finished;
-  not_finished = 0;
-  if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO))
-      not_finished++;
-  if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO))
-      not_finished++;
-  if (0 == not_finished)
-    return GNUNET_YES;
-  return GNUNET_NO;
-}
-
-static int
-inventory_round_finished (struct ConsensusSession *session)
-{
-  int i;
-  int finished;
-  finished = 0;
-  for (i = 0; i < session->num_peers; i++)
-    if (GNUNET_YES == session->info[i].inventory_synced)
-      finished++;
-  if (finished >= (session->num_peers / 2))
-    return GNUNET_YES;
-  return GNUNET_NO;
-}
-
-
-
-static void
-fin_sent_cb (void *cls)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_STOCK:
-      if (cpi->session->current_round != cpi->apparent_round)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
-        break;
-      }
-      cpi->exp_subround_finished = GNUNET_YES;
-      /* the subround is only really over if *both* partners are done */
-      if (GNUNET_YES == exp_subround_finished (cpi->session))
-        subround_over (cpi->session, NULL);
-      else
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-      if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
-        round_over (cpi->session, NULL);
-      /* FIXME: maybe go to next round */
-      break;
-    default:
-      GNUNET_break (0);
-  }
-}
-
-
-/**
- * Gets called when the other peer wants us to inform that
- * it has decoded our ibf and sent us all elements / requests
- */
-static int
-handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  struct ConsensusRoundMessage *fin_msg;
-
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-    case CONSENSUS_ROUND_STOCK:
-    case CONSENSUS_ROUND_EXCHANGE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      fin_msg = GNUNET_malloc (sizeof *fin_msg);
-      fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
-      fin_msg->header.size = htons (sizeof *fin_msg);
-      fin_msg->round = cpi->apparent_round;
-      /* the subround os over once we kicked off sending the fin msg */
-      /* FIXME: assert we are talking to the right peer! */
-      queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi);
-      /* FIXME: mark peer as synced */
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
-      break;
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * The other peer wants us to inform that he sent us all the elements we requested.
- */
-static int
-handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  struct ConsensusRoundMessage *round_msg;
-  round_msg = (struct ConsensusRoundMessage *) msg;
-  /* FIXME: only call subround_over if round is the current one! */
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_STOCK:
-      if (cpi->session->current_round != round_msg->round)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-        cpi->ibf_state = IBF_STATE_NONE;
-        cpi->ibf_bucket_counter = 0;
-        break;
-      }
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->exp_subround_finished = GNUNET_YES;
-      if (GNUNET_YES == exp_subround_finished (cpi->session))
-        subround_over (cpi->session, NULL);
-      else
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
-    break;
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      if (inventory_round_finished (cpi->session))
-        round_over (cpi->session, NULL);
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
-      break;
-  }
-  return GNUNET_YES;
-}
-
-
-static struct StrataEstimator *
-strata_estimator_create ()
-{
-  struct StrataEstimator *se;
-  int i;
-
-  /* fixme: allocate everything in one chunk */
-
-  se = GNUNET_malloc (sizeof (struct StrataEstimator));
-  se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT);
-  for (i = 0; i < STRATA_COUNT; i++)
-    se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
-
-  return se;
-}
-
-static void
-strata_estimator_destroy (struct StrataEstimator *se)
-{
-  int i;
-  for (i = 0; i < STRATA_COUNT; i++)
-    ibf_destroy (se->strata[i]);
-  GNUNET_free (se->strata);
-  GNUNET_free (se);
-}
-
-
-static int
-is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
-{
-  switch (strata_msg->round)
-  {
-    case CONSENSUS_ROUND_STOCK:
-    case CONSENSUS_ROUND_EXCHANGE:
-      /* here, we also have to compare subrounds */
-      if ( (strata_msg->round != session->current_round) ||
-           (strata_msg->exp_round != session->exp_round) ||
-           (strata_msg->exp_subround != session->exp_subround))
-        return GNUNET_YES;
-      break;
-    default:
-      if (session->current_round != strata_msg->round)
-        return GNUNET_YES;
-    break;
-  }
-  return GNUNET_NO;
-}
-
-
-/**
- * Called when a peer sends us its strata estimator.
- * In response, we sent out IBF of appropriate size back.
- *
- * @param cpi session
- * @param strata_msg message
- */
-static int
-handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
-{
-  int i;
-  unsigned int diff;
-  void *buf;
-  size_t size;
-
-  if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY))
-  {
-    /* we still have to handle this request appropriately */
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
-                cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-  }
-  else if (is_premature_strata_message (cpi->session, strata_msg))
-  {
-    if (GNUNET_NO == cpi->replaying_strata_message)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n",
-                  cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
-      cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
-    }
-    return GNUNET_YES;
-  }
-
-  if (NULL == cpi->se)
-    cpi->se = strata_estimator_create ();
-
-  cpi->apparent_round = strata_msg->round;
-
-  size = ntohs (strata_msg->header.size);
-  buf = (void *) &strata_msg[1];
-  for (i = 0; i < STRATA_COUNT; i++)
-  {
-    int res;
-    res = ibf_read (&buf, &size, cpi->se->strata[i]);
-    GNUNET_assert (GNUNET_OK == res);
-  }
-
-  diff = estimate_difference (cpi->session->se, cpi->se);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
-
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_INVENTORY:
-    case CONSENSUS_ROUND_STOCK:
-      /* send IBF of the right size */
-      cpi->ibf_order = 0;
-      while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) )
-        cpi->ibf_order++;
-      if (cpi->ibf_order > MAX_IBF_ORDER)
-        cpi->ibf_order = MAX_IBF_ORDER;
-      cpi->ibf_order += 1;
-      /* create ibf if not already pre-computed */
-      prepare_ibf (cpi);
-      if (NULL != cpi->ibf)
-        ibf_destroy (cpi->ibf);
-      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-      cpi->ibf_state = IBF_STATE_TRANSMITTING;
-      cpi->ibf_bucket_counter = 0;
-      send_ibf (cpi);
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
-                  cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      break;
-  }
-  return GNUNET_YES;
-}
-
-
-static int
-handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
-{
-  int num_buckets;
-  void *buf;
-
-  /* FIXME: find out if we're still expecting the same ibf! */
-
-  cpi->apparent_round = cpi->session->current_round;
-
-  num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
-  switch (cpi->ibf_state)
-  {
-    case IBF_STATE_NONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-      break;
-    case IBF_STATE_ANTICIPATE_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
-                  cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-      break;
-    case IBF_STATE_RECEIVING:
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      return GNUNET_YES;
-  }
-
-  if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-    return GNUNET_YES;
-  }
-
-
-  if (NULL == cpi->ibf)
-    cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
-
-  buf = (void *) &digest[1];
-  ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
-
-  cpi->ibf_bucket_counter += num_buckets;
-
-  if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
-  {
-    cpi->ibf_state = IBF_STATE_DECODING;
-    cpi->ibf_bucket_counter = 0;
-    prepare_ibf (cpi);
-    ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
-    decode (cpi);
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Handle an element that another peer sent us
- */
-static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
-{
-  struct PendingElement *pending_element;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
-  size_t size;
-
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_STOCK:
-      /* FIXME: check if we really expect the element */
-    case CONSENSUS_ROUND_EXCHANGE:
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
-      return GNUNET_YES;
-  }
-
-  size = ntohs (element_msg->size) - sizeof *element_msg;
-
-  element = GNUNET_malloc (size + sizeof *element);
-  element->size = size;
-  memcpy (&element[1], &element_msg[1], size);
-  element->data = &element[1];
-
-  pending_element = GNUNET_malloc (sizeof *pending_element);
-  pending_element->element = element;
-  GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element);
-
-  client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
-  client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
-  client_element_msg->header.size = htons (size + sizeof *client_element_msg);
-  memcpy (&client_element_msg[1], &element[1], size);
-
-  queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size);
-
-  client_send_next (cpi->session);
-  
-  return GNUNET_YES;
-}
-
-
-/**
- * Handle a request for elements.
- * 
- * @param cpi peer that is requesting the element
- * @param msg the element request message
- */
-static int
-handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
-{
-  struct GNUNET_HashCode hashcode;
-  struct IBF_Key *ibf_key;
-  unsigned int num;
-
-  /* element requests are allowed in every round */
-
-  num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
-  
-  ibf_key = (struct IBF_Key *) &msg[1];
-  while (num--)
-  {
-    struct ElementList *head;
-    ibf_hashcode_from_key (*ibf_key, &hashcode);
-    head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
-    send_elements (cpi, head);
-    ibf_key++;
-  }
-  return GNUNET_YES;
-}
-
-/**
- * If necessary, send a message to the peer, depending on the current
- * round.
- */
-static void
-embrace_peer (struct ConsensusPeerInformation *cpi)
-{
-  GNUNET_assert (GNUNET_YES == cpi->hello);
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-      if (cpi->session->partner_outgoing != cpi)
-        break;
-      /* fallthrough */
-    case CONSENSUS_ROUND_INVENTORY:
-      /* fallthrough */
-    case CONSENSUS_ROUND_STOCK:
-      if (cpi == cpi->session->partner_outgoing)
-        send_strata_estimator (cpi);
-    default:
-      break;
-  }
-}
-
-
-/**
- * Handle a HELLO-message, send when another peer wants to join a session where
- * our peer is a member. The session may or may not be inhabited yet.
- */
-static int
-handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
-{
-  /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
-  struct ConsensusSession *session;
-
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
-    {
-      int idx;
-      idx = get_peer_idx (&inc->peer_id, session);
-      GNUNET_assert (-1 != idx);
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
-      inc->cpi = &session->info[idx];
-      inc->cpi->mst = inc->mst;
-      inc->cpi->hello = GNUNET_YES;
-      inc->cpi->socket = inc->socket;
-      embrace_peer (inc->cpi);
-      return GNUNET_YES;
-    }
-    session = session->next;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
-  return GNUNET_NO;
-}
-
-
-/**
- * Send a strata estimator.
- *
- * @param cpi the peer
- */
-static void
-send_strata_estimator (struct ConsensusPeerInformation *cpi)
-{
-  struct StrataMessage *strata_msg;
-  void *buf;
-  size_t msize;
-  int i;
-
-  cpi->apparent_round = cpi->session->current_round;
-  cpi->ibf_state = IBF_STATE_NONE;
-  cpi->ibf_bucket_counter = 0;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
-              cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info));
-
-  msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
-
-  strata_msg = GNUNET_malloc (msize);
-  strata_msg->header.size = htons (msize);
-  strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
-  strata_msg->round = cpi->session->current_round;
-  strata_msg->exp_round = cpi->session->exp_round;
-  strata_msg->exp_subround = cpi->session->exp_subround;
-  
-  buf = &strata_msg[1];
-  for (i = 0; i < STRATA_COUNT; i++)
-  {
-    ibf_write (cpi->session->se->strata[i], &buf, NULL);
-  }
-
-  queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
-}
-
-
-/**
- * Send an IBF of the order specified in cpi.
- *
- * @param cpi the peer
- */
-static void
-send_ibf (struct ConsensusPeerInformation *cpi)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-
-  cpi->ibf_bucket_counter = 0;
-  while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
-  {
-    int num_buckets;
-    void *buf;
-    struct DifferenceDigest *digest;
-    int msize;
-
-    num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
-    /* limit to maximum */
-    if (num_buckets > BUCKETS_PER_MESSAGE)
-      num_buckets = BUCKETS_PER_MESSAGE;
-
-    msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
-
-    digest = GNUNET_malloc (msize);
-    digest->header.size = htons (msize);
-    digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
-    digest->order = cpi->ibf_order;
-    digest->round = cpi->apparent_round;
-
-    buf = &digest[1];
-    ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
-
-    queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
-
-    cpi->ibf_bucket_counter += num_buckets;
-  }
-  cpi->ibf_bucket_counter = 0;
-  cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
-}
-
-
-/**
- * Decode the current diff ibf, and send elements/requests/reports/
- *
- * @param cpi partner peer
- */
-static void
-decode (struct ConsensusPeerInformation *cpi)
-{
-  struct IBF_Key key;
-  struct GNUNET_HashCode hashcode;
-  int side;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-
-  for (;;)
-  {
-    int res;
-
-    res = ibf_decode (cpi->ibf, &side, &key);
-    if (GNUNET_SYSERR == res)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
-      /* decoding failed, we tell the other peer by sending our ibf with a larger order */
-      cpi->ibf_order++;
-      prepare_ibf (cpi);
-      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-      cpi->ibf_state = IBF_STATE_TRANSMITTING;
-      cpi->ibf_bucket_counter = 0;
-      send_ibf (cpi);
-      return;
-    }
-    if (GNUNET_NO == res)
-    {
-      struct ConsensusRoundMessage *msg;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
-      msg = GNUNET_malloc (sizeof *msg);
-      msg->header.size = htons (sizeof *msg);
-      msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
-      msg->round = cpi->apparent_round;
-      queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
-      return;
-    }
-    if (-1 == side)
-    {
-      struct ElementList *head;
-      /* we have the element(s), send it to the other peer */
-      ibf_hashcode_from_key (key, &hashcode);
-      head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
-      send_elements (cpi, head);
-    }
-    else
-    {
-      struct ElementRequest *msg;
-      size_t msize;
-      struct IBF_Key *p;
-
-      msize = (sizeof *msg) + sizeof (struct IBF_Key);
-      msg = GNUNET_malloc (msize);
-      switch (cpi->apparent_round)
-      {
-        case CONSENSUS_ROUND_STOCK:
-          /* FIXME: check if we really want to request the element */
-        case CONSENSUS_ROUND_EXCHANGE:
-          msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
-          break;
-        case CONSENSUS_ROUND_INVENTORY:
-          msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
-          break;
-        default:
-          GNUNET_assert (0);
-      }
-      msg->header.size = htons (msize);
-      p = (struct IBF_Key *) &msg[1];
-      *p = key;
-      queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
-    }
-  }
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete message is received by the tokenizer.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi =  cls;
-  switch (ntohs (message->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
-      return handle_p2p_strata (cpi, (struct StrataMessage *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
-      return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
-      return handle_p2p_element (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
-      return handle_p2p_element_report (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
-      return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
-      return handle_p2p_synced (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
-      return handle_p2p_fin (cpi, message);
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
-                  ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Handle tokenized messages from stream sockets.
- * Delegate them if the socket belongs to a session,
- * handle hello messages otherwise.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure, unused
- * @param client incoming socket this message comes from
- * @param message the actual message
- *
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct IncomingSocket *inc;
-  inc = (struct IncomingSocket *) client;
-  switch (ntohs( message->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
-      return handle_p2p_hello (inc, (struct ConsensusHello *) message);
-    default:
-      if (NULL != inc->cpi)
-        return mst_session_callback (inc->cpi, client, message);
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
-                  ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new stream connection from other peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
- *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
- * @param initiator the identity of the peer who wants to establish a stream
- *            with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- *             stream (the socket will be invalid after the call)
- */
-static int
-listen_cb (void *cls,
-           struct GNUNET_STREAM_Socket *socket,
-           const struct GNUNET_PeerIdentity *initiator)
-{
-  struct IncomingSocket *incoming;
-  GNUNET_assert (NULL != socket);
-  incoming = GNUNET_malloc (sizeof *incoming);
-  incoming->socket = socket;
-  incoming->peer_id = *initiator;
-  incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &incoming_stream_data_processor, incoming);
-  incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
-  GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
-  return GNUNET_OK;
-}
-
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-destroy_element_list_iter (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
-{
-  struct ElementList *el;
-  el = value;
-  while (NULL != el)
-  {
-    struct ElementList *el_old;
-    el_old = el;
-    el = el->next;
-    GNUNET_free (el_old->element_hash);
-    GNUNET_free (el_old->element);
-    GNUNET_free (el_old);
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Destroy a session, free all resources associated with it.
- * 
- * @param session the session to destroy
- */
-static void
-destroy_session (struct ConsensusSession *session)
-{
-  int i;
-
-  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
-  GNUNET_SERVER_client_drop (session->client);
-  session->client = NULL;
-  if (NULL != session->shuffle)
-  {
-    GNUNET_free (session->shuffle);
-    session->shuffle = NULL;
-  }
-  if (NULL != session->se)
-  {
-    strata_estimator_destroy (session->se);
-    session->se = NULL;
-  }
-  if (NULL != session->info)
-  {
-    for (i = 0; i < session->num_peers; i++)
-    {
-      struct ConsensusPeerInformation *cpi;
-      cpi = &session->info[i];
-      if ((NULL != cpi) && (NULL != cpi->socket))
-      {
-        if (NULL != cpi->rh)
-        {
-          GNUNET_STREAM_read_cancel (cpi->rh);
-          cpi->rh = NULL;
-        } 
-        if (NULL != cpi->wh)
-        {
-          GNUNET_STREAM_write_cancel (cpi->wh);
-          cpi->wh = NULL;
-        } 
-        GNUNET_STREAM_close (cpi->socket);
-        cpi->socket = NULL;
-      }
-      if (NULL != cpi->se)
-      {
-        strata_estimator_destroy (cpi->se);
-        cpi->se = NULL;
-      }
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-      if (NULL != cpi->mst)
-      {
-        GNUNET_SERVER_mst_destroy (cpi->mst);
-        cpi->mst = NULL;
-      }
-    }
-    GNUNET_free (session->info);
-    session->info = NULL;
-  }
-  if (NULL != session->ibfs)
-  {
-    for (i = 0; i <= MAX_IBF_ORDER; i++)
-    {
-      if (NULL != session->ibfs[i])
-      {
-        ibf_destroy (session->ibfs[i]);
-        session->ibfs[i] = NULL;
-      }
-    }
-    GNUNET_free (session->ibfs);
-    session->ibfs = NULL;
-  }
-  if (NULL != session->values)
-  {
-    GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL);
-    GNUNET_CONTAINER_multihashmap_destroy (session->values);
-    session->values = NULL;
-  }
-  GNUNET_free (session);
-}
-
-
-/**
- * Disconnect a client, and destroy all sessions associated with it.
- *
- * @param client the client to disconnect
- */
-static void
-disconnect_client (struct GNUNET_SERVER_Client *client)
-{
-  struct ConsensusSession *session;
-  GNUNET_SERVER_client_disconnect (client);
-  
-  /* if the client owns a session, remove it */
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (client == session->client)
-    {
-      destroy_session (session);
-      break;
-    }
-    session = session->next;
-  }
-}
-
-
-/**
- * Compute a global, (hopefully) unique consensus session id,
- * from the local id of the consensus session, and the identities of all participants.
- * Thus, if the local id of two consensus sessions coincide, but are not comprised of
- * exactly the same peers, the global id will be different.
- *
- * @param session session to generate the global id for
- * @param session_id local id of the consensus session
- */
-static void
-compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
-{
-  int i;
-  struct GNUNET_HashCode tmp;
-
-  session->global_id = *session_id;
-  for (i = 0; i < session->num_peers; ++i)
-  {
-    GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
-    session->global_id = tmp;
-    GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
-    session->global_id = tmp;
-  }
-}
-
-
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_queued (void *cls, size_t size,
-                 void *buf)
-{
-  struct ConsensusSession *session;
-  struct QueuedMessage *qmsg;
-  size_t msg_size;
-
-  session = cls;
-  session->client_th = NULL;
-
-  qmsg = session->client_messages_head;
-  GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
-  GNUNET_assert (qmsg);
-
-  if (NULL == buf)
-  {
-    destroy_session (session);
-    return 0;
-  }
-
-  msg_size = ntohs (qmsg->msg->size);
-
-  GNUNET_assert (size >= msg_size);
-
-  memcpy (buf, qmsg->msg, msg_size);
-  GNUNET_free (qmsg->msg);
-  GNUNET_free (qmsg);
-
-  client_send_next (session);
-
-  return msg_size;
-}
-
-
-/**
- * Schedule transmitting the next queued message (if any) to the inhabiting client of a session.
- *
- * @param session the consensus session
- */
-static void
-client_send_next (struct ConsensusSession *session)
-{
-
-  GNUNET_assert (NULL != session);
-
-  if (NULL != session->client_th)
-    return;
-
-  if (NULL != session->client_messages_head)
-  {
-    int msize;
-    msize = ntohs (session->client_messages_head->msg->size);
-    session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
-                                                              GNUNET_TIME_UNIT_FOREVER_REL,
-                                                              &transmit_queued, session);
-  }
-}
-
-
-/**
- * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
- * the correct signature to be used with e.g. qsort.
- * We use this function instead.
- *
- * @param h1 some hash code
- * @param h2 some hash code
- * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
- */
-static int
-hash_cmp (const void *h1, const void *h2)
-{
-  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
-}
-
-
-/**
- * Search peer in the list of peers in session.
- *
- * @param peer peer to find
- * @param session session with peer
- * @return index of peer, -1 if peer is not in session
- */
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
-{
-  int i;
-  for (i = 0; i < session->num_peers; i++)
-    if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
-      return i;
-  return -1;
-}
-
-
-/**
- * Called when stream has finishes writing the hello message
- */
-static void
-hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
-
-  cpi = cls;
-  cpi->wh = NULL;
-  cpi->hello = GNUNET_YES;
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  embrace_peer (cpi);
-}
-
-
-/**
- * Called when we established a stream connection to another peer
- *
- * @param cls cpi of the peer we just connected to
- * @param socket socket to use to communicate with the other side (read/write)
- */
-static void
-open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
-{
-  struct ConsensusPeerInformation *cpi;
-  struct ConsensusHello *hello;
-
-  cpi = cls;
-  hello = GNUNET_malloc (sizeof *hello);
-  hello->header.size = htons (sizeof *hello);
-  hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
-  memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
-  GNUNET_assert (NULL == cpi->mst);
-  cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
-  cpi->wh =
-      GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
-  GNUNET_free (hello);
-  cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                &session_stream_data_processor, cpi);
-}
-
-
-/**
- * Create the sorted list of peers for the session,
- * add the local peer if not in the join message.
- */
-static void
-initialize_session_peer_list (struct ConsensusSession *session)
-{
-  unsigned int local_peer_in_list;
-  uint32_t listed_peers;
-  const struct GNUNET_PeerIdentity *msg_peers;
-  struct GNUNET_PeerIdentity *peers;
-  unsigned int i;
-
-  GNUNET_assert (NULL != session->join_msg);
-
-  /* peers in the join message, may or may not include the local peer */
-  listed_peers = ntohl (session->join_msg->num_peers);
-  
-  session->num_peers = listed_peers;
-
-  msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
-
-  local_peer_in_list = GNUNET_NO;
-  for (i = 0; i < listed_peers; i++)
-  {
-    if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
-    {
-      local_peer_in_list = GNUNET_YES;
-      break;
-    }
-  }
-
-  if (GNUNET_NO == local_peer_in_list)
-    session->num_peers++;
-
-  peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
-
-  if (GNUNET_NO == local_peer_in_list)
-    peers[session->num_peers - 1] = *my_peer;
-
-  memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
-  qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
-
-  session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
-
-  for (i = 0; i < session->num_peers; ++i)
-  {
-    /* initialize back-references, so consensus peer information can
-     * be used as closure */
-    session->info[i].session = session;
-    session->info[i].peer_id = peers[i];
-  }
-
-  free (peers);
-}
-
-
-static void
-strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
-{
-  uint32_t v;
-  int i;
-  v = key->bits[0];
-  /* count trailing '1'-bits of v */
-  for (i = 0; v & 1; v>>=1, i++)
-    /* empty */;
-  ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
-}
-
-
-/**
- * Add incoming peer connections to the session,
- * for peers who have connected to us before the local session has been established
- *
- * @param session ...
- */
-static void
-add_incoming_peers (struct ConsensusSession *session)
-{
-  struct IncomingSocket *inc;
-  inc = incoming_sockets_head;
-
-  while (NULL != inc)
-  {
-    if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
-    {
-      int i;
-      for (i = 0; i < session->num_peers; i++)
-      {
-        struct ConsensusPeerInformation *cpi;
-        cpi = &session->info[i];
-        if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity)))
-        {
-          cpi->socket = inc->socket;
-          inc->cpi = cpi;
-          inc->cpi->mst = inc->mst;
-          inc->cpi->hello = GNUNET_YES;
-          break;
-        }
-      }
-    }
-    inc = inc->next;
-  }
-}
-
-
-/**
- * Initialize the session, continue receiving messages from the owning client
- *
- * @param session the session to initialize
- */
-static void
-initialize_session (struct ConsensusSession *session)
-{
-  const struct ConsensusSession *other_session;
-
-  GNUNET_assert (NULL != session->join_msg);
-  initialize_session_peer_list (session);
-  session->current_round = CONSENSUS_ROUND_BEGIN;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
-  compute_global_id (session, &session->join_msg->session_id);
-
-  /* Check if some local client already owns the session. */
-  other_session = sessions_head;
-  while (NULL != other_session)
-  {
-    if ((other_session != session) && 
-        (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
-    {
-      /* session already owned by another client */
-      GNUNET_break (0);
-      disconnect_client (session->client);
-      return;
-    }
-    other_session = other_session->next;
-  }
-
-  session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
-  session->local_peer_idx = get_peer_idx (my_peer, session);
-  GNUNET_assert (-1 != session->local_peer_idx);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
-  session->se = strata_estimator_create ();
-  session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
-  GNUNET_free (session->join_msg);
-  session->join_msg = NULL;
-  add_incoming_peers (session);
-  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
-}
-
-
-/**
- * Called when a client wants to join a consensus session.
- *
- * @param cls unused
- * @param client client that sent the message
- * @param m message sent by the client
- */
-static void
-client_join (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *m)
-{
-  struct ConsensusSession *session;
-
-  // make sure the client has not already joined a session
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-    {
-      GNUNET_break (0);
-      disconnect_client (client);
-      return;
-    }
-    session = session->next;
-  }
-
-  session = GNUNET_malloc (sizeof (struct ConsensusSession));
-  session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
-  session->client = client;
-  GNUNET_SERVER_client_keep (client);
-
-  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
-
-  // Initialize session later if local peer identity is not known yet.
-  if (NULL == my_peer)
-  {
-    GNUNET_SERVER_disable_receive_done_warning (client);
+  /* don't kick off next round if we're shutting down */
+  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
     return;
-  }
-
-  initialize_session (session);
-}
-
-
-/**
- * Hash a block of data, producing a replicated ibf hash.
- */
-static void
-hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
-{
-  struct IBF_Key ibf_key;
-  GNUNET_CRYPTO_hash (block, size, ret);
-  ibf_key = ibf_key_from_hashcode (ret);
-  ibf_hashcode_from_key (ibf_key, ret);
-}
 
 
+  session = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
 
 
-static void
-insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
-{
-  struct GNUNET_HashCode hash;
-  struct ElementList *head;
-
-  hash_for_ibf (element->data, element->size, &hash);
-
-  head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
-
-  if (NULL == head)
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
   {
   {
-    int i;
-
-    head = GNUNET_malloc (sizeof *head);
-    head->element = element;
-    head->next = NULL;
-    head->element_hash = GNUNET_memdup (&hash, sizeof hash);
-    GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-    strata_estimator_insert (session->se, &hash);
-
-    for (i = 0; i <= MAX_IBF_ORDER; i++)
-      if (NULL != session->ibfs[i])
-        ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
+    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
   }
-  else
+
+  switch (session->current_round)
   {
   {
-    struct ElementList *el;
-    el = GNUNET_malloc (sizeof *el);
-    head->element = element;
-    head->next = NULL;
-    head->element_hash = GNUNET_memdup (&hash, sizeof hash);
-    while (NULL != head->next)
-      head = head->next;
-    head->next = el;
+    case CONSENSUS_ROUND_BEGIN:
+      session->current_round = CONSENSUS_ROUND_EXCHANGE;
+      session->exp_round = 0;
+      subround_over (session, NULL);
+      break;
+    case CONSENSUS_ROUND_EXCHANGE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
+                  session->local_peer_idx);
+      session->current_round = CONSENSUS_ROUND_FINISH;
+      GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
+      break;
+    default:
+      GNUNET_assert (0);
   }
 }
 
 
 /**
   }
 }
 
 
 /**
- * Called when a client performs an insert operation.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
  *
  *
- * @param cls (unused)
- * @param client client handle
- * @param m message sent by the client
+ * @param session the session to create the new permutation for
  */
  */
-void
-client_insert (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *m)
+static void
+shuffle (struct ConsensusSession *session)
 {
 {
-  struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_ElementMessage *msg;
-  struct GNUNET_CONSENSUS_Element *element;
-  int element_size;
-
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
-
-  if (NULL == session)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
-    GNUNET_SERVER_client_disconnect (client);
-    return;
-  }
-
-  msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
-  element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+  uint32_t i;
+  uint32_t randomness[session->num_peers-1];
 
 
-  element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
+  if (NULL == session->shuffle)
+    session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
+  if (NULL == session->shuffle_inv)
+    session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
 
 
-  element->type = msg->element_type;
-  element->size = element_size;
-  memcpy (&element[1], &msg[1], element_size);
-  element->data = &element[1];
-
-  GNUNET_assert (NULL != element->data);
-
-  insert_element (session, element);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
+                    &session->exp_round, sizeof (uint32_t),
+                     &session->global_id, sizeof (struct GNUNET_HashCode),
+                    NULL);
 
 
-  client_send_next (session);
-}
-
-
-
-/**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
- */
-static void 
-write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle[i] = i;
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  cpi = cls;
-  cpi->wh = NULL;
-  if (NULL != cpi->messages_head)
+  for (i = session->num_peers - 1; i > 0; i--)
   {
   {
-    struct QueuedMessage *qm;
-    qm = cpi->messages_head;
-    GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm);
-    cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
-                                   GNUNET_TIME_UNIT_FOREVER_REL,
-                                   write_queued, cpi);
-    if (NULL != qm->cb)
-      qm->cb (qm->cls);
-    GNUNET_free (qm->msg);
-    GNUNET_free (qm);
-    GNUNET_assert (NULL != cpi->wh);
+    uint32_t x;
+    uint32_t tmp;
+    x = randomness[i-1] % session->num_peers;
+    tmp = session->shuffle[x];
+    session->shuffle[x] = session->shuffle[i];
+    session->shuffle[i] = tmp;
   }
   }
-}
-
 
 
-static void
-shuffle (struct ConsensusSession *session)
-{
-  /* FIXME: implement */
+  /* create the inverse */
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle_inv[session->shuffle[i]] = i;
 }
 
 
 /**
  * Find and set the partner_incoming and partner_outgoing of our peer,
 }
 
 
 /**
  * Find and set the partner_incoming and partner_outgoing of our peer,
- * one of them may not exist in most cases.
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
  *
  * @param session the consensus session
  */
 static void
 find_partners (struct ConsensusSession *session)
 {
  *
  * @param session the consensus session
  */
 static void
 find_partners (struct ConsensusSession *session)
 {
-  int mark[session->num_peers];
-  int i;
-  memset (mark, 0, session->num_peers * sizeof (int));
-  session->partner_incoming = session->partner_outgoing = NULL;
-  for (i = 0; i < session->num_peers; i++)
-  {
-    int arc;
-    if (0 != mark[i])
-      continue;
-    arc = (i + (1 << session->exp_subround)) % session->num_peers;
-    mark[i] = mark[arc] = 1;
-    GNUNET_assert (i != arc);
-    if (i == session->local_peer_idx)
-    {
-      GNUNET_assert (NULL == session->partner_outgoing);
-      session->partner_outgoing = &session->info[session->shuffle[arc]];
-      session->partner_outgoing->exp_subround_finished = GNUNET_NO;
-    }
-    if (arc == session->local_peer_idx)
+  unsigned int arc;
+  unsigned int num_ghosts;
+  unsigned int largest_arc;
+  int partner_idx;
+
+  /* shuffled local index */
+  int my_idx = session->shuffle[session->local_peer_idx];
+
+  /* distance to neighboring peer in current subround */
+  arc = 1 << session->exp_subround;
+  largest_arc = 1;
+  while (largest_arc < session->num_peers)
+    largest_arc <<= 1;
+  num_ghosts = largest_arc - session->num_peers;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
+
+  if (0 == (my_idx & arc))
+  {
+    /* we are outgoing */
+    partner_idx = (my_idx + arc) % session->num_peers;
+    session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
+    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+    /* are we a 'ghost' of a peer that would exist if
+     * the number of peers was a power of two, and thus have to partner
+     * with an additional peer?
+     */
+    if (my_idx < num_ghosts)
     {
     {
-      GNUNET_assert (NULL == session->partner_incoming);
-      session->partner_incoming = &session->info[session->shuffle[i]];
-      session->partner_incoming->exp_subround_finished = GNUNET_NO;
+      int ghost_partner_idx;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
+      ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+      /* platform dependent; modulo sometimes returns negative values */
+      if (ghost_partner_idx < 0)
+        ghost_partner_idx += session->num_peers;
+      /* we only need to have a ghost partner if the partner is outgoing */
+      if (0 == (ghost_partner_idx & arc))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
+        session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
+        session->partner_incoming->exp_subround_finished = GNUNET_NO;
+        return;
+      }
     }
     }
+    session->partner_incoming = NULL;
+    return;
   }
   }
+  /* we only have an incoming connection */
+  partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+  if (partner_idx < 0)
+    partner_idx += session->num_peers;
+  session->partner_outgoing = NULL;
+  session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
+  session->partner_incoming->exp_subround_finished = GNUNET_NO;
 }
 
 
 }
 
 
+/**
+ * Callback for set operation results. Called for each element
+ * in the result set.
+ *
+ * @param cls closure
+ * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param status see enum GNUNET_SET_Status
+ */
 static void
 static void
-replay_premature_message (struct ConsensusPeerInformation *cpi)
+set_result_cb (void *cls,
+               const struct GNUNET_SET_Element *element,
+               enum GNUNET_SET_Status status)
 {
 {
-  if (NULL != cpi->premature_strata_message)
+  struct ConsensusPeerInformation *cpi = cls;
+  unsigned int remote_idx = cpi - cpi->session->info;
+  unsigned int local_idx = cpi->session->local_peer_idx;
+
+  GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
+                 (cpi == cpi->session->partner_incoming));
+
+  switch (status)
   {
   {
-    struct StrataMessage *sm;
+    case GNUNET_SET_STATUS_OK:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
+                  local_idx, remote_idx);
+      break;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
+                  local_idx, remote_idx);
+      cpi->set_op = NULL;
+      return;
+    case GNUNET_SET_STATUS_HALF_DONE:
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
+                  local_idx, remote_idx);
+      cpi->exp_subround_finished = GNUNET_YES;
+      cpi->set_op = NULL;
+      if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
+                    local_idx);
+        subround_over (cpi->session, NULL);
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
+                    local_idx);
+      }
+      return;
+    default:
+      GNUNET_break (0);
+      return;
+  }
 
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
-    sm = cpi->premature_strata_message;
-    cpi->premature_strata_message = NULL;
+  switch (cpi->session->current_round)
+  {
+    case CONSENSUS_ROUND_EXCHANGE:
+      GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
+      break;
+    default:
+      GNUNET_break (0);
+      return;
+  }
+}
 
 
-    cpi->replaying_strata_message = GNUNET_YES;
-    handle_p2p_strata (cpi, sm);
-    cpi->replaying_strata_message = GNUNET_NO;
 
 
-    GNUNET_free (sm);
+/**
+ * Compare the round the session is in with the round of the given context message.
+ *
+ * @param session a consensus session
+ * @param ri a round context message
+ * @return 0 if it's the same round, -1 if the session is in an earlier round,
+ *         1 if the session is in a later round
+ */
+static int
+rounds_compare (struct ConsensusSession *session,
+                struct RoundInfo* ri)
+{
+  if (session->current_round < ri->round)
+    return -1;
+  if (session->current_round > ri->round)
+    return 1;
+  if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
+  {
+    if (session->exp_round < ri->exp_round)
+      return -1;
+    if (session->exp_round > ri->exp_round)
+      return 1;
+    if (session->exp_subround < ri->exp_subround)
+      return -1;
+    if (session->exp_subround < ri->exp_subround)
+      return 1;
+    return 0;
   }
   }
+  /* comparing rounds when we are not in a exp round */
+  GNUNET_assert (0);
 }
 
 
 }
 
 
@@ -2285,25 +673,19 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
   session = cls;
   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
   session = cls;
-  /* don't send any messages from the last round */
-  /*
-  clear_peer_messages (session->partner_outgoing);
-  clear_peer_messages (session->partner_incoming);
-  for (i = 0; i < session->num_peers; i++)
-    clear_peer_messages (&session->info[i]);
-  */
   /* cancel timeout */
   /* cancel timeout */
-  if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
+  {
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-  session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-  /* check if we are done with the log phase, 2-peer consensus only does one log round */
-  if ( (session->exp_round == NUM_EXP_ROUNDS) ||
-       ((session->num_peers == 2) && (session->exp_round == 1)))
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
+  }
+
+  if (session->exp_round >= NUM_EXP_ROUNDS)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
     round_over (session, NULL);
     return;
   }
     round_over (session, NULL);
     return;
   }
+
   if (session->exp_round == 0)
   {
     /* initialize everything for the log-rounds */
   if (session->exp_round == 0)
   {
     /* initialize everything for the log-rounds */
@@ -2311,8 +693,10 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     session->exp_subround = 0;
     if (NULL == session->shuffle)
       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
     session->exp_subround = 0;
     if (NULL == session->shuffle)
       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
+    if (NULL == session->shuffle_inv)
+      session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
     for (i = 0; i < session->num_peers; i++)
     for (i = 0; i < session->num_peers; i++)
-      session->shuffle[i] = i;
+      session->shuffle[i] = session->shuffle_inv[i] = i;
   }
   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
   }
   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
@@ -2321,13 +705,68 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     session->exp_subround = 0;
     shuffle (session);
   }
     session->exp_subround = 0;
     shuffle (session);
   }
-  else 
+  else
   {
     session->exp_subround++;
   }
 
   {
     session->exp_subround++;
   }
 
+  /* determine the incoming and outgoing partner */
   find_partners (session);
 
   find_partners (session);
 
+  GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
+  GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
+
+  /* initiate set operation with the outgoing partner */
+  if (NULL != session->partner_outgoing)
+  {
+    struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+    msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+    msg->header.size = htons (sizeof *msg);
+    msg->round = htonl (session->current_round);
+    msg->exp_round = htonl (session->exp_round);
+    msg->exp_subround = htonl (session->exp_subround);
+
+    if (NULL != session->partner_outgoing->set_op)
+    {
+      GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
+    }
+    session->partner_outgoing->set_op =
+        GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
+                            &session->global_id,
+                            (struct GNUNET_MessageHeader *) msg,
+                            0, /* FIXME: salt */
+                            GNUNET_SET_RESULT_ADDED,
+                            set_result_cb, session->partner_outgoing);
+    GNUNET_free (msg);
+    GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
+  }
+
+  /* commit to the delayed set operation */
+  if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
+  {
+    int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
+
+    if (NULL != session->partner_incoming->set_op)
+    {
+      GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
+      session->partner_incoming->set_op = NULL;
+    }
+    if (cmp == 0)
+    {
+      GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
+      session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
+      session->partner_incoming->delayed_set_op = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
+                  session->local_peer_idx, (int) (session->partner_incoming - session->info));
+    }
+    else
+    {
+      /* this should not happen -- a round has been skipped! */
+      GNUNET_break_op (0);
+    }
+  }
+
 #ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
 #ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
@@ -2340,203 +779,357 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       in = -1;
     else
       in = (int) (session->partner_incoming - session->info);
       in = -1;
     else
       in = (int) (session->partner_incoming - session->info);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
                 session->exp_round, session->exp_subround, in, out);
   }
 #endif /* GNUNET_EXTRA_LOGGING */
 
                 session->exp_round, session->exp_subround, in, out);
   }
 #endif /* GNUNET_EXTRA_LOGGING */
 
-  if (NULL != session->partner_incoming)
+}
+
+
+/**
+ * Search peer in the list of peers in session.
+ *
+ * @param peer peer to find
+ * @param session session with peer
+ * @return index of peer, -1 if peer is not in session
+ */
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
+{
+  int i;
+  for (i = 0; i < session->num_peers; i++)
+    if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
+      return i;
+  return -1;
+}
+
+
+/**
+ * Compute a global, (hopefully) unique consensus session id,
+ * from the local id of the consensus session, and the identities of all participants.
+ * Thus, if the local id of two consensus sessions coincide, but are not comprised of
+ * exactly the same peers, the global id will be different.
+ *
+ * @param session session to generate the global id for
+ * @param session_id local id of the consensus session
+ */
+static void
+compute_global_id (struct ConsensusSession *session,
+                  const struct GNUNET_HashCode *session_id)
+{
+  int i;
+  struct GNUNET_HashCode tmp;
+  struct GNUNET_HashCode phash;
+
+  /* FIXME: use kdf? */
+
+  session->global_id = *session_id;
+  for (i = 0; i < session->num_peers; ++i)
+  {
+    GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
+    GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
+    session->global_id = tmp;
+    GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
+    session->global_id = tmp;
+  }
+}
+
+
+/**
+ * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
+ * the correct signature to be used with e.g. qsort.
+ * We use this function instead.
+ *
+ * @param h1 some hash code
+ * @param h2 some hash code
+ * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
+ */
+static int
+hash_cmp (const void *h1, const void *h2)
+{
+  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
+}
+
+
+/**
+ * Create the sorted list of peers for the session,
+ * add the local peer if not in the join message.
+ */
+static void
+initialize_session_peer_list (struct ConsensusSession *session,
+                              struct GNUNET_CONSENSUS_JoinMessage *join_msg)
+{
+  unsigned int local_peer_in_list;
+  uint32_t listed_peers;
+  const struct GNUNET_PeerIdentity *msg_peers;
+  struct GNUNET_PeerIdentity *peers;
+  unsigned int i;
+
+  GNUNET_assert (NULL != join_msg);
+
+  /* peers in the join message, may or may not include the local peer */
+  listed_peers = ntohl (join_msg->num_peers);
+
+  session->num_peers = listed_peers;
+
+  msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
+
+  local_peer_in_list = GNUNET_NO;
+  for (i = 0; i < listed_peers; i++)
   {
   {
-    session->partner_incoming->ibf_state = IBF_STATE_NONE;
-    session->partner_incoming->exp_subround_finished = GNUNET_NO;
-    session->partner_incoming->ibf_bucket_counter = 0;
+    if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      local_peer_in_list = GNUNET_YES;
+      break;
+    }
+  }
+
+  if (GNUNET_NO == local_peer_in_list)
+    session->num_peers++;
+
+  peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
+
+  if (GNUNET_NO == local_peer_in_list)
+    peers[session->num_peers - 1] = my_peer;
+
+  memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
+  qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
 
 
-    /* maybe there's an early strata estimator? */
-    replay_premature_message (session->partner_incoming);
-  }
+  session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
 
 
-  if (NULL != session->partner_outgoing)
+  for (i = 0; i < session->num_peers; ++i)
   {
   {
-    session->partner_outgoing->ibf_state = IBF_STATE_NONE;
-    session->partner_outgoing->ibf_bucket_counter = 0;
-    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
-
-    if (NULL == session->partner_outgoing->socket)
-    {
-      session->partner_outgoing->socket =
-          GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                              open_cb, session->partner_outgoing,
-                              GNUNET_STREAM_OPTION_END);
-    }
-    else if (GNUNET_YES == session->partner_outgoing->hello)
-    {
-      send_strata_estimator (session->partner_outgoing);
-    }
-    /* else: do nothing, the send hello cb will handle this */
+    /* initialize back-references, so consensus peer information can
+     * be used as closure */
+    session->info[i].session = session;
+    session->info[i].peer_id = peers[i];
   }
 
   }
 
-  /*
-  session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
-                                                                   subround_over, session);
-  */
+  GNUNET_free (peers);
 }
 
 }
 
+
+/**
+ * Called when another peer wants to do a set operation with the
+ * local peer.
+ *
+ * @param cls closure
+ * @param other_peer the other peer
+ * @param context_msg message with application specific information from
+ *        the other peer
+ * @param request request from the other peer, use GNUNET_SET_accept
+ *        to accept it, otherwise the request will be refused
+ *        Note that we don't use a return value here, as it is also
+ *        necessary to specify the set we want to do the operation with,
+ *        whith sometimes can be derived from the context message.
+ *        Also necessary to specify the timeout.
+ */
 static void
 static void
-contact_peer_a2a (struct ConsensusPeerInformation *cpi)
+set_listen_cb (void *cls,
+               const struct GNUNET_PeerIdentity *other_peer,
+               const struct GNUNET_MessageHeader *context_msg,
+               struct GNUNET_SET_Request *request)
 {
 {
-  cpi->is_outgoing = GNUNET_YES;
-  if (NULL == cpi->socket)
+  struct ConsensusSession *session = cls;
+  struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
+  struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *set_op;
+  struct RoundInfo round_info;
+  int index;
+  int cmp;
+
+  if (NULL == context_msg)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  index = get_peer_idx (other_peer, session);
+
+  if (index < 0)
   {
   {
-    cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                      open_cb, cpi, GNUNET_STREAM_OPTION_END);
+    GNUNET_break_op (0);
+    return;
   }
   }
-  else if (GNUNET_YES == cpi->hello)
+
+  round_info.round = ntohl (msg->round);
+  round_info.exp_round = ntohl (msg->exp_round);
+  round_info.exp_subround = ntohl (msg->exp_subround);
+
+  cpi = &session->info[index];
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
+
+  switch (session->current_round)
   {
   {
-    send_strata_estimator (cpi);
+    case CONSENSUS_ROUND_BEGIN:
+      /* we're in the begin round, so requests for the exchange round may
+       * come in, they will be delayed for now! */
+    case CONSENSUS_ROUND_EXCHANGE:
+      cmp = rounds_compare (session, &round_info);
+      if (cmp > 0)
+      {
+        /* the other peer is too late */
+        GNUNET_break_op (0);
+        return;
+      }
+      /* kill old request, if any. this is legal,
+       * as the other peer would not make a new request if it would want to
+       * complete the old one! */
+      if (NULL != cpi->set_op)
+      {
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
+      }
+      set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                                       set_result_cb, &session->info[index]);
+      if (cmp == 0)
+      {
+        cpi->set_op = set_op;
+        GNUNET_SET_commit (set_op, session->element_set);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
+      }
+      else
+      {
+        /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
+        cpi->delayed_set_op = set_op;
+        cpi->delayed_round_info = round_info;
+        cpi->exp_subround_finished = GNUNET_YES;
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
+      }
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
+                  session->local_peer_idx, session->current_round, index);
+      GNUNET_break_op (0);
+      return;
   }
 }
 
   }
 }
 
+
 /**
 /**
- * Start the inventory round, contact all peers we are supposed to contact.
+ * Initialize the session, continue receiving messages from the owning client
  *
  *
- * @param session the current session
+ * @param session the session to initialize
+ * @param join_msg the join message from the client
  */
 static void
  */
 static void
-start_inventory (struct ConsensusSession *session)
+initialize_session (struct ConsensusSession *session,
+                    struct GNUNET_CONSENSUS_JoinMessage *join_msg)
 {
 {
-  int i;
-  int last;
+  struct ConsensusSession *other_session;
 
 
-  for (i = 0; i < session->num_peers; i++)
-  {
-    session->info[i].ibf_bucket_counter = 0;
-    session->info[i].ibf_state = IBF_STATE_NONE;
-    session->info[i].is_outgoing = GNUNET_NO;
-  }
+  initialize_session_peer_list (session, join_msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
+  compute_global_id (session, &join_msg->session_id);
 
 
-  last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
-  i = (session->local_peer_idx + 1) % session->num_peers;
-  while (i != last)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
-    contact_peer_a2a (&session->info[i]);
-    session->info[i].is_outgoing = GNUNET_YES;
-    i = (i + 1) % session->num_peers;
-  }
-  // tie-breaker for even number of peers
-  if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
+  /* check if some local client already owns the session.
+   * it is only legal to have a session with an existing global id
+   * if all other sessions with this global id are finished.*/
+  other_session = sessions_head;
+  while (NULL != other_session)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
-    session->info[last].is_outgoing = GNUNET_YES;
-    contact_peer_a2a (&session->info[last]);
+    if ((other_session != session) &&
+        (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
+    {
+      if (CONSENSUS_ROUND_FINISH != other_session->current_round)
+      {
+        GNUNET_break (0);
+        destroy_session (session);
+        return;
+      }
+      break;
+    }
+    other_session = other_session->next;
   }
 
   }
 
-  for (i = 0; i < session->num_peers; i++)
-  {
-    if (GNUNET_NO == session->info[i].is_outgoing)
-      replay_premature_message (&session->info[i]);
-  }
+  session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
+  session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
+
+  session->local_peer_idx = get_peer_idx (&my_peer, session);
+  GNUNET_assert (-1 != session->local_peer_idx);
+  session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_assert (NULL != session->element_set);
+  session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
+                                             &session->global_id,
+                                             set_listen_cb, session);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
 }
 
 }
 
-static void
-send_client_conclude_done (struct ConsensusSession *session)
+
+static struct ConsensusSession *
+get_session_by_client (struct GNUNET_SERVER_Client *client)
 {
 {
-  struct GNUNET_MessageHeader *msg;
-  session->current_round = CONSENSUS_ROUND_FINISH;
-  msg = GNUNET_malloc (sizeof *msg);
-  msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
-  msg->size = htons (sizeof *msg);
-  queue_client_message (session, msg);
-  client_send_next (session);
+  struct ConsensusSession *session;
+
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (session->client == client)
+      return session;
+    session = session->next;
+  }
+  return NULL;
 }
 
 }
 
+
 /**
 /**
- * Start the next round.
- * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
+ * Called when a client wants to join a consensus session.
  *
  *
- * @param cls the session
- * @param tc task context, for when this task is invoked by the scheduler,
- *           NULL if invoked for another reason
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
  */
  */
-static void 
-round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+client_join (void *cls,
+             struct GNUNET_SERVER_Client *client,
+             const struct GNUNET_MessageHeader *m)
 {
   struct ConsensusSession *session;
 
 {
   struct ConsensusSession *session;
 
-  /* don't kick off next round if we're shutting down */
-  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
-
-  session = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
-
-  /*
-  for (i = 0; i < session->num_peers; i++)
-    clear_peer_messages (&session->info[i]);
-  */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
 
 
-  if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
+  session = get_session_by_client (client);
+  if (NULL != session)
   {
   {
-    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
   }
   }
+  session = GNUNET_new (struct ConsensusSession);
+  session->client = client;
+  session->client_mq = GNUNET_MQ_queue_for_server_client (client);
+  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+  initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
 
-  switch (session->current_round)
-  {
-    case CONSENSUS_ROUND_BEGIN:
-      session->current_round = CONSENSUS_ROUND_EXCHANGE;
-      session->exp_round = 0;
-      subround_over (session, NULL);
-      break;
-    case CONSENSUS_ROUND_EXCHANGE:
-      /* handle two peers specially */
-      if (session->num_peers <= 2)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
-        send_client_conclude_done (session);
-        return;
-      }
-      session->current_round = CONSENSUS_ROUND_INVENTORY;
-      start_inventory (session);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      session->current_round = CONSENSUS_ROUND_STOCK;
-      session->exp_round = 0;
-      subround_over (session, NULL);
-      break;
-    case CONSENSUS_ROUND_STOCK:
-      session->current_round = CONSENSUS_ROUND_FINISH;
-      send_client_conclude_done (session);
-      break;
-    default:
-      GNUNET_assert (0);
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
 }
 
 
 /**
 }
 
 
 /**
- * Called when a client performs the conclude operation.
+ * Called when a client performs an insert operation.
  *
  * @param cls (unused)
  * @param client client handle
  *
  * @param cls (unused)
  * @param client client handle
- * @param message message sent by the client
+ * @param m message sent by the client
  */
  */
-static void
-client_conclude (void *cls,
-                 struct GNUNET_SERVER_Client *client,
-                 const struct GNUNET_MessageHeader *message)
+void
+client_insert (void *cls,
+               struct GNUNET_SERVER_Client *client,
+               const struct GNUNET_MessageHeader *m)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
+  struct GNUNET_CONSENSUS_ElementMessage *msg;
+  struct GNUNET_SET_Element *element;
+  ssize_t element_size;
 
 
-  cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
+  session = get_session_by_client (client);
 
 
-  session = sessions_head;
-  while ((session != NULL) && (session->client != client))
-    session = session->next;
   if (NULL == session)
   {
   if (NULL == session)
   {
-    /* client not found */
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
@@ -2544,117 +1137,74 @@ client_conclude (void *cls,
 
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
 
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
-    /* client requested conclude twice */
     GNUNET_break (0);
     GNUNET_break (0);
-    /* client may still own a session, destroy it */
-    disconnect_client (client);
+    GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
     return;
   }
 
-  if (session->num_peers <= 1)
-  {
-    send_client_conclude_done (session);
-  }
-  else
+  msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
+  element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+  if (element_size < 0)
   {
   {
-    session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
-    /* the 'begin' round is over, start with the next, real round */
-    round_over (session, NULL);
+    GNUNET_break (0);
+    return;
   }
 
   }
 
+  element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
+  element->type = msg->element_type;
+  element->size = element_size;
+  memcpy (&element[1], &msg[1], element_size);
+  element->data = &element[1];
+  GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
+  GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
-  client_send_next (session);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
 }
 
 
 /**
 }
 
 
 /**
- * Called when a client sends an ack
+ * Called when a client performs the conclude operation.
  *
  * @param cls (unused)
  * @param client client handle
  * @param message message sent by the client
  */
  *
  * @param cls (unused)
  * @param client client handle
  * @param message message sent by the client
  */
-void
-client_ack (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *message)
+static void
+client_conclude (void *cls,
+                 struct GNUNET_SERVER_Client *client,
+                 const struct GNUNET_MessageHeader *message)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_AckMessage *msg;
-  struct PendingElement *pending;
-  struct GNUNET_CONSENSUS_Element *element;
-
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
 
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
+  session = get_session_by_client (client);
   if (NULL == session)
   {
   if (NULL == session)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
+    /* client not found */
+    GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
-  pending = session->client_approval_head;
-
-  GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending);
-
-  msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
-
-  if (msg->keep)
+  if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
   {
-    element = pending->element;
-    insert_element (session, element);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
+    /* client requested conclude twice */
+    GNUNET_break (0);
+    return;
   }
   }
-
-  GNUNET_free (pending);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Task that disconnects from core.
- *
- * @param cls core handle
- * @param tc context information (why was this task triggered now)
- */
-static void
-disconnect_core (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  if (core != NULL)
+  if (session->num_peers <= 1)
   {
   {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
+    session->current_round = CONSENSUS_ROUND_FINISH;
+    GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
   }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
-}
-
-
-static void
-core_startup (void *cls,
-              struct GNUNET_CORE_Handle *core,
-              const struct GNUNET_PeerIdentity *peer)
-{
-  struct ConsensusSession *session;
-
-  my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
-  /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
-  GNUNET_SCHEDULER_add_now (&disconnect_core, core);
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
-
-  session = sessions_head;
-  while (NULL != session)
+  else
   {
   {
-    if (NULL != session->join_msg)
-      initialize_session (session);
-    session = session->next;
+    /* the 'begin' round is over, start with the next, actual round */
+    round_over (session, NULL);
   }
   }
+
+  GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 }
 
 
@@ -2668,52 +1218,33 @@ static void
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  /* FIXME: complete; write separate destructors for different data types */
-
-  while (NULL != incoming_sockets_head)
-  {
-    struct IncomingSocket *socket;
-    socket = incoming_sockets_head;
-    if (NULL != socket->rh)
-    {
-      GNUNET_STREAM_read_cancel (socket->rh);
-      socket->rh = NULL;
-    } 
-    if (NULL == socket->cpi)
-    {
-      GNUNET_STREAM_close (socket->socket);
-      socket->socket = NULL;
-      if (NULL != socket->mst)
-      {
-        GNUNET_SERVER_mst_destroy (socket->mst);
-        socket->mst = NULL;
-      }
-    }
-    incoming_sockets_head = incoming_sockets_head->next;
-    GNUNET_free (socket);
-  }
-
   while (NULL != sessions_head)
   while (NULL != sessions_head)
-  {
-    struct ConsensusSession *session;
-    session = sessions_head->next;
     destroy_session (sessions_head);
     destroy_session (sessions_head);
-    sessions_head = session;
-  }
 
 
-  if (NULL != core)
-  {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
+}
 
 
-  if (NULL != listener)
-  {
-    GNUNET_STREAM_listen_close (listener);
-    listener = NULL;
-  } 
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
+
+  session = get_session_by_client (client);
+  if (NULL == session)
+    return;
+  if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+      (CONSENSUS_ROUND_FINISH == session->current_round))
+    destroy_session (session);
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
 }
 
 
 }
 
 
@@ -2725,38 +1256,30 @@ shutdown_task (void *cls,
  * @param c configuration to use
  */
 static void
  * @param c configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
 {
-  /* core is only used to retrieve the peer identity */
-  static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
-    {NULL, 0, 0}
-  };
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
-    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
-        sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
-    {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
-        sizeof (struct GNUNET_CONSENSUS_AckMessage)},
+        sizeof (struct GNUNET_MessageHeader)},
+    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
-
+  if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   GNUNET_SERVER_add_handlers (server, server_handlers);
   GNUNET_SERVER_add_handlers (server, server_handlers);
-
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
-
-  listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                   listen_cb, NULL,
-                                   GNUNET_STREAM_OPTION_END);
-
-  /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
-  core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
-  GNUNET_assert (NULL != core);
-
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
+  GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 }
 
 
 }
 
 
@@ -2772,7 +1295,7 @@ main (int argc, char *const *argv)
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 
   return (GNUNET_OK == ret) ? 0 : 1;
 }