- delete unused message type
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index a7640c51f003f8e52c6cf3dc5f7b3f6556372744..ffd9786d33522098acf1f6efa80995b28e8a6a8a 100644 (file)
@@ -1,10 +1,10 @@
 /*
       This file is part of GNUnet
 /*
       This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
+      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
  */
 
 #include "platform.h"
  */
 
 #include "platform.h"
-#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
-#include "gnunet_util_lib.h"
+#include "gnunet_set_service.h"
 #include "gnunet_consensus_service.h"
 #include "gnunet_consensus_service.h"
-#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
-
 #include "consensus_protocol.h"
 #include "consensus.h"
 #include "consensus_protocol.h"
 #include "consensus.h"
-#include "ibf.h"
-#include "strata_estimator.h"
 
 
 
 
-/*
+/**
  * Log macro that prefixes the local peer and the peer we are in contact with.
  */
 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
  * Log macro that prefixes the local peer and the peer we are in contact with.
  */
 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
 
 
 /**
 
 
 /**
- * Number of IBFs in a strata estimator.
- */
-#define SE_STRATA_COUNT 32
-/**
- * Size of the IBFs in the strata estimator.
- */
-#define SE_IBF_SIZE 80
-/**
- * hash num parameter for the difference digests and strata estimators
- */
-#define SE_IBF_HASH_NUM 3
-
-/**
- * Number of buckets that can be transmitted in one message.
- */
-#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
-
-/**
- * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
- * Choose this value so that computing the IBF is still cheaper
- * than transmitting all values.
- */
-#define MAX_IBF_ORDER (16)
-
-/**
- * Number of exponential rounds, used in the inventory and completion round.
+ * Number of exponential rounds, used in the exp and completion round.
  */
  */
-#define NUM_EXP_ROUNDS (4)
-
+#define NUM_EXP_ROUNDS 4
 
 /* forward declarations */
 
 /* mutual recursion with struct ConsensusSession */
 struct ConsensusPeerInformation;
 
 
 /* forward declarations */
 
 /* mutual recursion with struct ConsensusSession */
 struct ConsensusPeerInformation;
 
-struct MessageQueue;
-
 /* mutual recursion with round_over */
 static void
 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 /* mutual recursion with round_over */
 static void
 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
-/* mutial recursion with transmit_queued */
-static void
-client_send_next (struct MessageQueue *mq);
-
-/* mutual recursion with mst_session_callback */
-static void
-open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
-
-static int
-mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
-
-
-/**
- * Additional information about a consensus element.
- */
-struct ElementInfo
-{
-  /**
-   * The element itself.
-   */
-  struct GNUNET_CONSENSUS_Element *element;
-  /**
-   * Hash of the element
-   */
-  struct GNUNET_HashCode *element_hash;
-  /**
-   * Number of other peers that have the element in the inventory.
-   */
-  unsigned int inventory_count;
-  /**
-   * Bitmap of peers that have this element in their inventory
-   */
-  uint8_t *inventory_bitmap;
-};
-
 
 /**
  * Describes the current round a consensus session is in.
 
 /**
  * Describes the current round a consensus session is in.
@@ -138,7 +70,8 @@ enum ConsensusRound
    */
   CONSENSUS_ROUND_EXCHANGE,
   /**
    */
   CONSENSUS_ROUND_EXCHANGE,
   /**
-   * Exchange which elements each peer has, but not the elements.
+   * Exchange which elements each peer has, but don't
+   * transmit the element's data, only their SHA-512 hashes.
    * This round uses the all-to-all scheme.
    */
   CONSENSUS_ROUND_INVENTORY,
    * This round uses the all-to-all scheme.
    */
   CONSENSUS_ROUND_INVENTORY,
@@ -153,79 +86,27 @@ enum ConsensusRound
   CONSENSUS_ROUND_FINISH
 };
 
   CONSENSUS_ROUND_FINISH
 };
 
-/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
 
 /**
 
 /**
- * State of another peer with respect to the
- * current ibf.
+ * Complete information about the current round and all
+ * subrounds.
  */
  */
-enum ConsensusIBFState {
+struct RoundInfo
+{
   /**
   /**
-   * There is nothing going on with the IBF.
+   * The current main round.
    */
    */
-  IBF_STATE_NONE=0,
+  enum ConsensusRound round;
   /**
   /**
-   * We currently receive an ibf.
+   * The current exp round, valid if
+   * the main round is an exp round.
    */
    */
-  IBF_STATE_RECEIVING,
-  /*
-   * we decode a received ibf
-  */
-  IBF_STATE_DECODING,
+  uint32_t exp_round;
   /**
   /**
-   *  wait for elements and element requests
+   * The current exp subround, valid if
+   * the main round is an exp round.
    */
    */
-  IBF_STATE_ANTICIPATE_DIFF
-};
-
-
-typedef void (*AddCallback) (struct MessageQueue *mq);
-typedef void (*MessageSentCallback) (void *cls);
-
-
-/**
- * Collection of the state necessary to read and write gnunet messages 
- * to a stream socket. Should be used as closure for stream_data_processor.
- */
-struct MessageStreamState
-{
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-  struct MessageQueue *mq;
-  void *mst_cls;
-  struct GNUNET_STREAM_Socket *socket;
-  struct GNUNET_STREAM_ReadHandle *rh;
-  struct GNUNET_STREAM_WriteHandle *wh;
-};
-
-
-struct ServerClientSocketState
-{
-  struct GNUNET_SERVER_Client *client;
-  struct GNUNET_SERVER_TransmitHandle* th;
-};
-
-
-/**
- * Generic message queue, for queueing outgoing messages.
- */
-struct MessageQueue
-{
-  void *state;
-  AddCallback add_cb;
-  struct PendingMessage *pending_head;
-  struct PendingMessage *pending_tail;
-  struct PendingMessage *current_pm;
-};
-
-
-struct PendingMessage
-{
-  struct GNUNET_MessageHeader *msg;
-  struct MessageQueue *parent_queue;
-  struct PendingMessage *next;
-  struct PendingMessage *prev;
-  MessageSentCallback sent_cb;
-  void *sent_cb_cls;
+  uint32_t exp_subround;
 };
 
 
 };
 
 
@@ -244,13 +125,6 @@ struct ConsensusSession
    */
   struct ConsensusSession *prev;
 
    */
   struct ConsensusSession *prev;
 
-  /**
-   * Join message. Used to initialize the session later,
-   * if the identity of the local peer is not yet known.
-   * NULL if the session has been fully initialized.
-   */
-  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
-
   /**
   * Global consensus identification, computed
   * from the session id and participating authorities.
   /**
   * Global consensus identification, computed
   * from the session id and participating authorities.
@@ -258,45 +132,34 @@ struct ConsensusSession
   struct GNUNET_HashCode global_id;
 
   /**
   struct GNUNET_HashCode global_id;
 
   /**
-   * The server's client and associated local state
+   * Client that inhabits the session
    */
    */
-  struct ServerClientSocketState scss;
+  struct GNUNET_SERVER_Client *client;
 
   /**
    * Queued messages to the client.
    */
 
   /**
    * Queued messages to the client.
    */
-  struct MessageQueue *client_mq;
-
-  /**
-   * IBF_Key -> 2^(HashCode*)
-   * FIXME:
-   * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
 
   /**
-   * Maps HashCodes to ElementInfos
+   * Time when the conclusion of the consensus should begin.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap *values;
-
-  /**
-   * Currently active transmit handle for sending to the client
-   */
-  struct GNUNET_SERVER_TransmitHandle *client_th;
+  struct GNUNET_TIME_Absolute conclude_start;
 
   /**
    * Timeout for all rounds together, single rounds will schedule a timeout task
    * with a fraction of the conclude timeout.
 
   /**
    * Timeout for all rounds together, single rounds will schedule a timeout task
    * with a fraction of the conclude timeout.
+   * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
    */
    */
-  struct GNUNET_TIME_Relative conclude_timeout;
-  
+  struct GNUNET_TIME_Absolute conclude_deadline;
+
   /**
   /**
-   * Timeout task identifier for the current round
+   * Timeout task identifier for the current round.
    */
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   /**
    */
   GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   /**
-   * Number of other peers in the consensus
+   * Number of other peers in the consensus.
    */
   unsigned int num_peers;
 
    */
   unsigned int num_peers;
 
@@ -306,26 +169,11 @@ struct ConsensusSession
    */
   struct ConsensusPeerInformation *info;
 
    */
   struct ConsensusPeerInformation *info;
 
-  /**
-   * GNUNET_YES if the client has called conclude.
-   * */
-  int conclude;
-
   /**
    * Index of the local peer in the peers array
    */
   unsigned int local_peer_idx;
 
   /**
    * Index of the local peer in the peers array
    */
   unsigned int local_peer_idx;
 
-  /**
-   * Strata estimator, computed online
-   */
-  struct StrataEstimator *se;
-
-  /**
-   * Pre-computed IBFs
-   */
-  struct InvertibleBloomFilter **ibfs;
-
   /**
    * Current round
    */
   /**
    * Current round
    */
@@ -333,81 +181,56 @@ struct ConsensusSession
 
   /**
    * Permutation of peers for the current round,
 
   /**
    * Permutation of peers for the current round,
-   * maps logical index (for current round) to physical index (location in info array)
-   */
-  int *shuffle;
-
-  int exp_round;
-
-  int exp_subround;
-
-  /**
-   * The partner for the current exp-round
    */
    */
-  struct ConsensusPeerInformation* partner_outgoing;
+  uint32_t *shuffle;
 
   /**
 
   /**
-   * The partner for the current exp-round
+   * Inverse permutation of peers for the current round,
    */
    */
-  struct ConsensusPeerInformation* partner_incoming;
-};
+  uint32_t *shuffle_inv;
 
 
-
-/**
- * Information about a peer that is in a consensus session.
- */
-struct ConsensusPeerInformation
-{
   /**
   /**
-   * Peer identitty of the peer in the consensus session
+   * Current round of the exponential scheme.
    */
    */
-  struct GNUNET_PeerIdentity peer_id;
+  uint32_t exp_round;
 
   /**
 
   /**
-   * Do we connect to the peer, or does the peer connect to us?
-   * Only valid for all-to-all phases
+   * Current sub-round of the exponential scheme.
    */
    */
-  int is_outgoing;
+  uint32_t exp_subround;
 
   /**
 
   /**
-   * Did we receive/send a consensus hello?
-   */
-  int hello;
-
-  /*
-   * FIXME
+   * The partner for the current exp-round
    */
    */
-  struct MessageStreamState mss;
+  struct ConsensusPeerInformation *partner_outgoing;
 
   /**
 
   /**
-   * Current state
+   * The partner for the current exp-round
    */
    */
-  enum ConsensusIBFState ibf_state;
+  struct ConsensusPeerInformation *partner_incoming;
 
   /**
 
   /**
-   * What is the order (=log2 size) of the ibf
-   * we're currently dealing with?
-   * Interpretation depends on ibf_state.
+   * The consensus set of this session.
    */
    */
-  int ibf_order;
+  struct GNUNET_SET_Handle *element_set;
 
   /**
 
   /**
-   * The current IBF for this peer,
-   * purpose dependent on ibf_state
+   * Listener for requests from other peers.
+   * Uses the session's global id as app id.
    */
    */
-  struct InvertibleBloomFilter *ibf;
+  struct GNUNET_SET_ListenHandle *set_listener;
+};
 
 
-  /**
-   * How many buckets have we transmitted/received? 
-   * Interpretatin depends on ibf_state
-   */
-  int ibf_bucket_counter;
 
 
+/**
+ * Information about a peer that is in a consensus session.
+ */
+struct ConsensusPeerInformation
+{
   /**
   /**
-   * Strata estimator of the peer, NULL if our peer
-   * initiated the reconciliation.
+   * Peer identitty of the peer in the consensus session
    */
    */
-  struct StrataEstimator *se;
+  struct GNUNET_PeerIdentity peer_id;
 
   /**
    * Back-reference to the consensus session,
 
   /**
    * Back-reference to the consensus session,
@@ -415,93 +238,28 @@ struct ConsensusPeerInformation
    */
   struct ConsensusSession *session;
 
    */
   struct ConsensusSession *session;
 
-  /**
-   * True if we are actually replaying the strata message,
-   * e.g. currently handling the premature_strata_message.
-   */
-  int replaying_strata_message;
-
-  /**
-   * A strata message that is not actually for the current round,
-   * used in the exp-scheme.
-   */
-  struct StrataMessage *premature_strata_message;
-
   /**
    * We have finishes the exp-subround with the peer.
    */
   int exp_subround_finished;
 
   /**
   /**
    * We have finishes the exp-subround with the peer.
    */
   int exp_subround_finished;
 
   /**
-   * GNUNET_YES if we synced inventory with this peer;
-   * GNUNET_NO otherwise.
-   */
-  int inventory_synced;
-
-  /**
-   * Round this peer seems to be in, according to the last SE we got.
-   * Necessary to store this, as we sometimes need to respond to a request from an
-   * older round, while we are already in the next round.
-   */
-  enum ConsensusRound apparent_round;
-};
-
-
-/**
- * Sockets from other peers who want to communicate with us.
- * It may not be known yet which consensus session they belong to, we have to wait for the
- * peer's hello.
- * Also, the session might not exist yet locally, we have to wait for a local client to connect.
- */
-struct IncomingSocket
-{
-  /**
-   * Incoming sockets are kept in a double linked list.
-   */
-  struct IncomingSocket *next;
-
-  /**
-   * Incoming sockets are kept in a double linked list.
-   */
-  struct IncomingSocket *prev;
-
-  /**
-   * Peer that connected to us with the socket.
+   * Set operation we are currently executing with this peer.
    */
    */
-  struct GNUNET_PeerIdentity peer_id;
+  struct GNUNET_SET_OperationHandle *set_op;
 
   /**
 
   /**
-   * Peer-in-session this socket belongs to, once known, otherwise NULL.
+   * Set operation we are planning on executing with this peer.
    */
    */
-  struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *delayed_set_op;
 
   /**
 
   /**
-   * Set to the global session id, if the peer sent us a hello-message,
-   * but the session does not exist yet.
-   */
-  struct GNUNET_HashCode *requested_gid;
-
-  /*
-   * Timeout, will disconnect the socket if not yet in a session.
-   * FIXME: implement
+   * Info about the round of the delayed set operation.
    */
    */
-  GNUNET_SCHEDULER_TaskIdentifier timeout;
-
-  /* FIXME */
-  struct MessageStreamState mss;
+  struct RoundInfo delayed_round_info;
 };
 
 
 };
 
 
-/**
- * Linked list of incoming sockets.
- */
-static struct IncomingSocket *incoming_sockets_head;
-
-/**
- * Linked list of incoming sockets.
- */
-static struct IncomingSocket *incoming_sockets_tail;
-
 /**
  * Linked list of sessions this peer participates in.
  */
 /**
  * Linked list of sessions this peer participates in.
  */
@@ -525,1364 +283,375 @@ static struct GNUNET_SERVER_Handle *srv;
 /**
  * Peer that runs this service.
  */
 /**
  * Peer that runs this service.
  */
-static struct GNUNET_PeerIdentity *my_peer;
-
-/**
- * Handle to the core service. Only used during service startup, will be NULL after that.
- */
-static struct GNUNET_CORE_Handle *core;
-
-/**
- * Listener for sockets from peers that want to reconcile with us.
- */
-static struct GNUNET_STREAM_ListenSocket *listener;
-
-
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_queued (void *cls, size_t size,
-                 void *buf)
-{
-  struct MessageQueue *mq = cls;
-  struct PendingMessage *pm = mq->pending_head;
-  struct ServerClientSocketState *state = mq->state;
-  size_t msg_size;
-
-  GNUNET_assert (NULL != pm);
-  GNUNET_assert (NULL != buf);
-  msg_size = ntohs (pm->msg->size);
-  GNUNET_assert (size >= msg_size);
-  memcpy (buf, pm->msg, msg_size);
-  GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
-  state->th = NULL;
-  client_send_next (cls);
-  GNUNET_free (pm);
-  return msg_size;
-}
-
-
-static void
-client_send_next (struct MessageQueue *mq)
-{
-  struct ServerClientSocketState *state = mq->state;
-  int msize;
-
-  GNUNET_assert (NULL != state);
-
-  if ( (NULL != state->th) ||
-       (NULL == mq->pending_head) )
-    return;
-  msize = ntohs (mq->pending_head->msg->size);
-  state->th = 
-      GNUNET_SERVER_notify_transmit_ready (state->client, msize, 
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           &transmit_queued, mq);
-}
+static struct GNUNET_PeerIdentity my_peer;
 
 
 
 
-struct MessageQueue *
-create_message_queue_for_server_client (struct ServerClientSocketState *scss)
+static int
+have_exp_subround_finished (const struct ConsensusSession *session)
 {
 {
-  struct MessageQueue *mq;
-  mq = GNUNET_new (struct MessageQueue);
-  mq->add_cb = client_send_next;
-  mq->state = scss;
-  return mq;
+  int not_finished;
+  not_finished = 0;
+  if ( (NULL != session->partner_outgoing) &&
+       (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
+    not_finished++;
+  if ( (NULL != session->partner_incoming) &&
+       (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
+    not_finished++;
+  if (0 == not_finished)
+    return GNUNET_YES;
+  return GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
+ * Destroy a session, free all resources associated with it.
  *
  *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
+ * @param session the session to destroy
  */
  */
-static void 
-write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct MessageQueue *mq = cls;
-  struct MessageStreamState *mss = mq->state;
-  struct PendingMessage *pm;
-
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  
-  /* call cb for message we finished sending */
-  pm = mq->current_pm;
-  if (NULL != pm)
-  {
-    if (NULL != pm->sent_cb)
-      pm->sent_cb (pm->sent_cb_cls);
-    GNUNET_free (pm);
-  }
-
-  mss->wh = NULL;
-
-  pm = mq->pending_head;
-  mq->current_pm = pm;
-  if (NULL == pm)
-    return;
-  GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
-  mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
-                                 GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
-  GNUNET_assert (NULL != mss->wh);
-}
-
-
 static void
 static void
-stream_socket_add_cb (struct MessageQueue *mq)
-{
-  if (NULL != mq->current_pm)
-    return;
-  write_queued (mq, GNUNET_STREAM_OK, 0);
-}
-
-
-struct MessageQueue *
-create_message_queue_for_stream_socket (struct MessageStreamState *mss)
+destroy_session (struct ConsensusSession *session)
 {
 {
-  struct MessageQueue *mq;
-  mq = GNUNET_new (struct MessageQueue);
-  mq->state = mss;
-  mq->add_cb = stream_socket_add_cb;
-  return mq;
-}
-
+  int i;
 
 
-struct PendingMessage *
-new_pending_message (uint16_t size, uint16_t type)
-{
-  struct PendingMessage *pm;
-  pm = GNUNET_malloc (sizeof *pm + size);
-  pm->msg = (void *) &pm[1];
-  pm->msg->size = htons (size);
-  pm->msg->type = htons (type);
-  return pm;
+  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+  if (NULL != session->element_set)
+  {
+    GNUNET_SET_destroy (session->element_set);
+    session->element_set = NULL;
+  }
+  if (NULL != session->set_listener)
+  {
+    GNUNET_SET_listen_cancel (session->set_listener);
+    session->set_listener = NULL;
+  }
+  if (NULL != session->client_mq)
+  {
+    GNUNET_MQ_destroy (session->client_mq);
+    session->client_mq = NULL;
+  }
+  if (NULL != session->client)
+  {
+    GNUNET_SERVER_client_disconnect (session->client);
+    session->client = NULL;
+  }
+  if (NULL != session->shuffle)
+  {
+    GNUNET_free (session->shuffle);
+    session->shuffle = NULL;
+  }
+  if (NULL != session->shuffle_inv)
+  {
+    GNUNET_free (session->shuffle_inv);
+    session->shuffle_inv = NULL;
+  }
+  if (NULL != session->info)
+  {
+    for (i = 0; i < session->num_peers; i++)
+    {
+      struct ConsensusPeerInformation *cpi;
+      cpi = &session->info[i];
+      if (NULL != cpi->set_op)
+      {
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
+      }
+    }
+    GNUNET_free (session->info);
+    session->info = NULL;
+  }
+  GNUNET_free (session);
 }
 
 
 /**
 }
 
 
 /**
- * Queue a message in a message queue.
+ * Iterator for set elements.
  *
  *
- * @param queue the message queue
- * @param pending message, message with additional information
+ * @param cls closure
+ * @param element the current element, NULL if all elements have been
+ *        iterated over
+ * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
  */
  */
-void
-message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg)
+static int
+send_to_client_iter (void *cls,
+                     const struct GNUNET_SET_Element *element)
 {
 {
-  GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg);
-  queue->add_cb (queue);
-}
-
+  struct ConsensusSession *session = cls;
+  struct GNUNET_MQ_Envelope *ev;
 
 
-/**
- * Called when we receive data from a peer via stream.
- *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t
-stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size)
-{
-  struct MessageStreamState *mss = cls;
-  int ret;
+  if (NULL != element)
+  {
+    struct GNUNET_CONSENSUS_ElementMessage *m;
 
 
-  mss->rh = NULL;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
+                session->local_peer_idx);
 
 
-  if (GNUNET_STREAM_OK != status)
-  {
-    /* FIXME: handle this correctly */
-    GNUNET_break (0);
-    return 0;
+    ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+    m->element_type = htons (element->type);
+    memcpy (&m[1], element->data, element->size);
+    GNUNET_MQ_send (session->client_mq, ev);
   }
   }
-  GNUNET_assert (NULL != mss->mst);
-  ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES);
-  if (GNUNET_SYSERR == ret)
+  else
   {
   {
-    /* FIXME: handle this correctly */
-    GNUNET_break (0);
-    return 0;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
+                session->local_peer_idx);
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
+    GNUNET_MQ_send (session->client_mq, ev);
   }
   }
-  /* read again */
-  mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss);
-  /* we always read all data */
-  return size;
+  return GNUNET_YES;
 }
 
 
 /**
 }
 
 
 /**
- * Send element or element report to the peer specified in cpi.
+ * Start the next round.
+ * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
  *
  *
- * @param cpi peer to send the elements to
- * @param head head of the element list
+ * @param cls the session
+ * @param tc task context, for when this task is invoked by the scheduler,
+ *           NULL if invoked for another reason
  */
 static void
  */
 static void
-send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e)
-{
-  struct PendingMessage *pm;
-
-  switch (cpi->apparent_round)
-  {
-    case CONSENSUS_ROUND_COMPLETION:
-    case CONSENSUS_ROUND_EXCHANGE:
-      pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size,
-                                GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
-      memcpy (&pm->msg[1], e->element->data, e->element->size);
-      message_queue_add (cpi->mss.mq, pm);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode),
-                                GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
-      memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode));
-      message_queue_add (cpi->mss.mq, pm);
-      break;
-    default:
-      GNUNET_break (0);
-  }
-}
-
-
-/**
- * Iterator to insert values into an ibf.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-ibf_values_iterator (void *cls,
-                     const struct GNUNET_HashCode *key,
-                     void *value)
+round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  struct ConsensusPeerInformation *cpi = cls;
-  struct ElementInfo *e = value;
-  struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash);
-
-  GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
-  ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
-  return GNUNET_YES;
-}
+  struct ConsensusSession *session;
 
 
-/**
- * Create and populate an IBF for the specified peer,
- * if it does not already exist.
- *
- * @param cpi peer to create the ibf for
- */
-static void
-prepare_ibf (struct ConsensusPeerInformation *cpi)
-{
-  if (NULL != cpi->session->ibfs[cpi->ibf_order])
+  /* don't kick off next round if we're shutting down */
+  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
     return;
-  cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
-  GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
-}
-
 
 
-/**
- * Called when a remote peer wants to inform the local peer
- * that the remote peer misses elements.
- * Elements are not reconciled.
- *
- * @param cpi session
- * @param msg message
- */
-static int
-handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  GNUNET_assert (0);
-}
-
-
-static int
-exp_subround_finished (const struct ConsensusSession *session)
-{
-  int not_finished;
-  not_finished = 0;
-  if ( (NULL != session->partner_outgoing) && 
-       (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
-    not_finished++;
-  if ( (NULL != session->partner_incoming) &&
-       (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
-    not_finished++;
-  if (0 == not_finished)
-    return GNUNET_YES;
-  return GNUNET_NO;
-}
-
-
-static int
-inventory_round_finished (struct ConsensusSession *session)
-{
-  int i;
-  int finished;
-  finished = 0;
-  for (i = 0; i < session->num_peers; i++)
-    if (GNUNET_YES == session->info[i].inventory_synced)
-      finished++;
-  if (finished >= (session->num_peers / 2))
-    return GNUNET_YES;
-  return GNUNET_NO;
-}
-
-
-static void
-clear_message_stream_state (struct MessageStreamState *mss)
-{
-  if (NULL != mss->mst)
-  {
-    GNUNET_SERVER_mst_destroy (mss->mst);
-    mss->mst = NULL;
-  }
-  if (NULL != mss->rh)
-  {
-    GNUNET_STREAM_read_cancel (mss->rh);
-    mss->rh = NULL;
-  } 
-  if (NULL != mss->wh)
-  {
-    GNUNET_STREAM_write_cancel (mss->wh);
-    mss->wh = NULL;
-  } 
-  if (NULL != mss->socket)
-  {
-    GNUNET_STREAM_close (mss->socket);
-    mss->socket = NULL;
-  }
-  if (NULL != mss->mq)
-  {
-    GNUNET_free (mss->mq);
-    mss->mq = NULL;
-  }
-}
-
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-destroy_element_info_iter (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
-{
-  struct ElementInfo *ei = value;
-  GNUNET_free (ei->element);
-  GNUNET_free (ei->element_hash);
-  GNUNET_free (ei);
-  return GNUNET_YES;
-}
-
-
-/**
- * Destroy a session, free all resources associated with it.
- * 
- * @param session the session to destroy
- */
-static void
-destroy_session (struct ConsensusSession *session)
-{
-  int i;
-
-  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
-  GNUNET_SERVER_client_drop (session->scss.client);
-  session->scss.client = NULL;
-  if (NULL != session->client_mq)
-  {
-    GNUNET_free (session->client_mq);
-    session->client_mq = NULL;
-  }
-  if (NULL != session->shuffle)
-  {
-    GNUNET_free (session->shuffle);
-    session->shuffle = NULL;
-  }
-  if (NULL != session->se)
-  {
-    strata_estimator_destroy (session->se);
-    session->se = NULL;
-  }
-  if (NULL != session->info)
-  {
-    for (i = 0; i < session->num_peers; i++)
-    {
-      struct ConsensusPeerInformation *cpi;
-      cpi = &session->info[i];
-      clear_message_stream_state (&cpi->mss);
-      if (NULL != cpi->se)
-      {
-        strata_estimator_destroy (cpi->se);
-        cpi->se = NULL;
-      }
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-    }
-    GNUNET_free (session->info);
-    session->info = NULL;
-  }
-  if (NULL != session->ibfs)
-  {
-    for (i = 0; i <= MAX_IBF_ORDER; i++)
-    {
-      if (NULL != session->ibfs[i])
-      {
-        ibf_destroy (session->ibfs[i]);
-        session->ibfs[i] = NULL;
-      }
-    }
-    GNUNET_free (session->ibfs);
-    session->ibfs = NULL;
-  }
-  if (NULL != session->values)
-  {
-    GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL);
-    GNUNET_CONTAINER_multihashmap_destroy (session->values);
-    session->values = NULL;
-  }
+  session = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
 
 
-  if (NULL != session->ibf_key_map)
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
   {
   {
-    GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map);
-    session->ibf_key_map = NULL;
+    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
   }
-  GNUNET_free (session);
-}
-
 
 
-static void
-send_client_conclude_done (struct ConsensusSession *session)
-{
-  struct PendingMessage *pm;
-
-  /* check if client is even there anymore */
-  if (NULL == session->scss.client)
-    return;
-  pm = new_pending_message (sizeof (struct GNUNET_MessageHeader),
-                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
-  message_queue_add (session->client_mq, pm);
-}
-
-
-/**
- * Check if a strata message is for the current round or not
- *
- * @param session session we are in
- * @param strata_msg the strata message to check
- * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise
- */
-static int
-is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
-{
-  switch (strata_msg->round)
+  switch (session->current_round)
   {
   {
-    case CONSENSUS_ROUND_COMPLETION:
+    case CONSENSUS_ROUND_BEGIN:
+      session->current_round = CONSENSUS_ROUND_EXCHANGE;
+      session->exp_round = 0;
+      subround_over (session, NULL);
+      break;
     case CONSENSUS_ROUND_EXCHANGE:
     case CONSENSUS_ROUND_EXCHANGE:
-      /* here, we also have to compare subrounds */
-      if ( (strata_msg->round != session->current_round) ||
-           (strata_msg->exp_round != session->exp_round) ||
-           (strata_msg->exp_subround != session->exp_subround) )
-        return GNUNET_YES;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
+                  session->local_peer_idx);
+      session->current_round = CONSENSUS_ROUND_FINISH;
+      GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
       break;
     default:
       break;
     default:
-      if (session->current_round != strata_msg->round)
-        return GNUNET_YES;
-    break;
+      GNUNET_assert (0);
   }
   }
-  return GNUNET_NO;
-}
-
-
-/**
- * Send a strata estimator.
- *
- * @param cpi the peer
- */
-static void
-send_strata_estimator (struct ConsensusPeerInformation *cpi)
-{
-  struct PendingMessage *pm;
-  struct StrataMessage *strata_msg;
-
-  /* FIXME: why is this correct? */
-  cpi->apparent_round = cpi->session->current_round;
-  cpi->ibf_state = IBF_STATE_NONE;
-  cpi->ibf_bucket_counter = 0;
-
-  LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round);
-
-  pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE),
-                            GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
-  strata_msg = (struct StrataMessage *) pm->msg;
-  strata_msg->round = cpi->session->current_round;
-  strata_msg->exp_round = cpi->session->exp_round;
-  strata_msg->exp_subround = cpi->session->exp_subround;
-  strata_estimator_write (cpi->session->se, &strata_msg[1]);
-  message_queue_add (cpi->mss.mq, pm);
 }
 
 
 /**
 }
 
 
 /**
- * Send an IBF of the order specified in cpi.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
  *
  *
- * @param cpi the peer
+ * @param session the session to create the new permutation for
  */
 static void
  */
 static void
-send_ibf (struct ConsensusPeerInformation *cpi)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-
-  cpi->ibf_bucket_counter = 0;
-  while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
-  {
-    unsigned int num_buckets;
-    struct PendingMessage *pm;
-    struct DifferenceDigest *digest;
-
-    num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
-    /* limit to maximum */
-    if (num_buckets > BUCKETS_PER_MESSAGE)
-      num_buckets = BUCKETS_PER_MESSAGE;
-
-    pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE),
-                              GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
-    digest = (struct DifferenceDigest *) pm->msg;
-    digest->order = cpi->ibf_order;
-    digest->round = cpi->apparent_round;
-    ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]);
-    cpi->ibf_bucket_counter += num_buckets;
-    message_queue_add (cpi->mss.mq, pm);
-  }
-  cpi->ibf_bucket_counter = 0;
-  cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
-}
-
-
-/**
- * Called when a peer sends us its strata estimator.
- * In response, we sent out IBF of appropriate size back.
- *
- * @param cpi session
- * @param strata_msg message
- */
-static int
-handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
+shuffle (struct ConsensusSession *session)
 {
 {
-  unsigned int diff;
-
-  if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) &&
-       (strata_msg->round == CONSENSUS_ROUND_INVENTORY) )
-  {
-    /* we still have to handle this request appropriately */
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
-                cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-  }
-  else if (is_premature_strata_message (cpi->session, strata_msg))
-  {
-    if (GNUNET_NO == cpi->replaying_strata_message)
-    {
-      LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n",
-              strata_msg->exp_round, strata_msg->exp_subround);
-      cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header);
-    }
-    return GNUNET_YES;
-  }
-
-  if (NULL == cpi->se)
-    cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
+  uint32_t i;
+  uint32_t randomness[session->num_peers-1];
 
 
-  cpi->apparent_round = strata_msg->round;
+  if (NULL == session->shuffle)
+    session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
+  if (NULL == session->shuffle_inv)
+    session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
 
 
-  if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
-  {
-    LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n");
-    return GNUNET_NO;
-  }
-  strata_estimator_read (&strata_msg[1], cpi->se);
-  GNUNET_assert (NULL != cpi->session->se);
-  diff = strata_estimator_difference (cpi->session->se, cpi->se);
+  GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
+                    &session->exp_round, sizeof (uint32_t),
+                     &session->global_id, sizeof (struct GNUNET_HashCode),
+                    NULL);
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle[i] = i;
 
 
-  switch (cpi->session->current_round)
+  for (i = session->num_peers - 1; i > 0; i--)
   {
   {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_INVENTORY:
-    case CONSENSUS_ROUND_COMPLETION:
-      /* send IBF of the right size */
-      cpi->ibf_order = 0;
-      while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) )
-        cpi->ibf_order++;
-      if (cpi->ibf_order > MAX_IBF_ORDER)
-        cpi->ibf_order = MAX_IBF_ORDER;
-      cpi->ibf_order += 1;
-      /* create ibf if not already pre-computed */
-      prepare_ibf (cpi);
-      if (NULL != cpi->ibf)
-        ibf_destroy (cpi->ibf);
-      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-      cpi->ibf_bucket_counter = 0;
-      send_ibf (cpi);
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
-                  cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      break;
+    uint32_t x;
+    uint32_t tmp;
+    x = randomness[i-1] % session->num_peers;
+    tmp = session->shuffle[x];
+    session->shuffle[x] = session->shuffle[i];
+    session->shuffle[i] = tmp;
   }
   }
-  return GNUNET_YES;
-}
-
-
 
 
-static int
-send_elements_iterator (void *cls,
-                        const struct GNUNET_HashCode * key,
-                        void *value)
-{
-  struct ConsensusPeerInformation *cpi = cls;
-  struct ElementInfo *ei;
-  ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value);
-  if (NULL == ei)
-  {
-    LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n",
-            GNUNET_h2s((struct GNUNET_HashCode *) value));
-    return GNUNET_YES;
-  }
-  LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n");
-  send_element_or_report (cpi, ei);
-  return GNUNET_YES;
+  /* create the inverse */
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle_inv[session->shuffle[i]] = i;
 }
 
 
 /**
 }
 
 
 /**
- * Decode the current diff ibf, and send elements/requests/reports/
+ * Find and set the partner_incoming and partner_outgoing of our peer,
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
  *
  *
- * @param cpi partner peer
+ * @param session the consensus session
  */
 static void
  */
 static void
-decode (struct ConsensusPeerInformation *cpi)
+find_partners (struct ConsensusSession *session)
 {
 {
-  struct IBF_Key key;
-  int side;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-
-  while (1)
-  {
-    int res;
-
-    res = ibf_decode (cpi->ibf, &side, &key);
-    if (GNUNET_SYSERR == res)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
-      /* decoding failed, we tell the other peer by sending our ibf with a larger order */
-      cpi->ibf_order++;
-      prepare_ibf (cpi);
-      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-      cpi->ibf_bucket_counter = 0;
-      send_ibf (cpi);
-      return;
-    }
-    if (GNUNET_NO == res)
-    {
-      struct PendingMessage *pm;
-      struct ConsensusRoundMessage *rmsg;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
-
-      pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
-      rmsg = (struct ConsensusRoundMessage *) pm->msg;
-      rmsg->round = cpi->apparent_round;
-      message_queue_add (cpi->mss.mq, pm);
-      return;
-    }
-    if (-1 == side)
-    {
-      struct GNUNET_HashCode hashcode;
-      /* we have the element(s), send it to the other peer */
-      ibf_hashcode_from_key (key, &hashcode);
-      GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
-    }
-    else
+  unsigned int arc;
+  unsigned int num_ghosts;
+  unsigned int largest_arc;
+  int partner_idx;
+
+  /* shuffled local index */
+  int my_idx = session->shuffle[session->local_peer_idx];
+
+  /* distance to neighboring peer in current subround */
+  arc = 1 << session->exp_subround;
+  largest_arc = 1;
+  while (largest_arc < session->num_peers)
+    largest_arc <<= 1;
+  num_ghosts = largest_arc - session->num_peers;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
+
+  if (0 == (my_idx & arc))
+  {
+    /* we are outgoing */
+    partner_idx = (my_idx + arc) % session->num_peers;
+    session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
+    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+    /* are we a 'ghost' of a peer that would exist if
+     * the number of peers was a power of two, and thus have to partner
+     * with an additional peer?
+     */
+    if (my_idx < num_ghosts)
     {
     {
-      struct PendingMessage *pm;
-      uint16_t type;
-
-      switch (cpi->apparent_round)
+      int ghost_partner_idx;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
+      ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+      /* platform dependent; modulo sometimes returns negative values */
+      if (ghost_partner_idx < 0)
+        ghost_partner_idx += session->num_peers;
+      /* we only need to have a ghost partner if the partner is outgoing */
+      if (0 == (ghost_partner_idx & arc))
       {
       {
-        case CONSENSUS_ROUND_COMPLETION:
-          /* FIXME: check if we really want to request the element */
-        case CONSENSUS_ROUND_EXCHANGE:
-        case CONSENSUS_ROUND_INVENTORY:
-          type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST;
-          break;
-        default:
-          GNUNET_assert (0);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
+        session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
+        session->partner_incoming->exp_subround_finished = GNUNET_NO;
+        return;
       }
       }
-      pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key),
-                                type);
-      *(struct IBF_Key *) &pm->msg[1] = key;
-      message_queue_add (cpi->mss.mq, pm);
     }
     }
-  }
-}
-
-
-static int
-handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
-{
-  int num_buckets;
-
-  /* FIXME: find out if we're still expecting the same ibf! */
-
-  cpi->apparent_round = cpi->session->current_round;
-  // FIXME: check header.size >= sizeof (DD)
-  num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
-  switch (cpi->ibf_state)
-  {
-    case IBF_STATE_NONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-      break;
-    case IBF_STATE_ANTICIPATE_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
-                  cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
-      {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
-      }
-      break;
-    case IBF_STATE_RECEIVING:
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      return GNUNET_YES;
-  }
-
-  if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-    return GNUNET_YES;
-  }
-
-  if (NULL == cpi->ibf)
-    cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
-
-  ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
-  cpi->ibf_bucket_counter += num_buckets;
-
-  if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
-  {
-    cpi->ibf_state = IBF_STATE_DECODING;
-    cpi->ibf_bucket_counter = 0;
-    prepare_ibf (cpi);
-    ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
-    decode (cpi);
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Insert an element into the consensus set of the specified session.
- * The element will not be copied, and freed when destroying the session.
- *
- * @param session session for new element
- * @param element element to insert
- */
-static void
-insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
-{
-  struct GNUNET_HashCode hash;
-  struct ElementInfo *e;
-  struct IBF_Key ibf_key;
-  int i;
-
-  e = GNUNET_new (struct ElementInfo);
-  e->element = element;
-  e->element_hash = GNUNET_new (struct GNUNET_HashCode);
-  GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
-  ibf_key = ibf_key_from_hashcode (e->element_hash);
-  ibf_hashcode_from_key (ibf_key, &hash);
-  strata_estimator_insert (session->se, &hash);
-  GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  for (i = 0; i <= MAX_IBF_ORDER; i++)
-  {
-    if (NULL == session->ibfs[i])
-      continue;
-    ibf_insert (session->ibfs[i], ibf_key);
-  }
-}
-
-
-/**
- * Handle an element that another peer sent us
- */
-static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
-{
-  struct GNUNET_CONSENSUS_Element *element;
-  size_t size;
-
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_COMPLETION:
-      /* FIXME: check if we really expect the element */
-    case CONSENSUS_ROUND_EXCHANGE:
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
-      return GNUNET_YES;
-  }
-
-  size = ntohs (element_msg->size) - sizeof *element_msg;
-
-  element = GNUNET_malloc (size + sizeof *element);
-  element->size = size;
-  memcpy (&element[1], &element_msg[1], size);
-  element->data = &element[1];
-
-  LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
-
-  insert_element (cpi->session, element);
-
-  return GNUNET_YES;
-}
-
-
-/**
- * Handle a request for elements.
- * 
- * @param cpi peer that is requesting the element
- * @param msg the element request message
- */
-static int
-handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
-{
-  struct GNUNET_HashCode hashcode;
-  struct IBF_Key *ibf_key;
-  unsigned int num;
-
-  /* element requests are allowed in every round */
-
-  num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
-  
-  ibf_key = (struct IBF_Key *) &msg[1];
-  while (num--)
-  {
-    ibf_hashcode_from_key (*ibf_key, &hashcode);
-    GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
-    ibf_key++;
-  }
-  return GNUNET_YES;
-}
-
-static int
-is_peer_connected (struct ConsensusPeerInformation *cpi)
-{
-  if (NULL == cpi->mss.socket)
-    return GNUNET_NO;
-  return GNUNET_YES;
-}
-
-
-static void
-ensure_peer_connected (struct ConsensusPeerInformation *cpi)
-{
-  if (NULL != cpi->mss.socket)
-    return;
-  cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                        open_cb, cpi, GNUNET_STREAM_OPTION_END);
-}
-
-
-/**
- * If necessary, send a message to the peer, depending on the current
- * round.
- */
-static void
-embrace_peer (struct ConsensusPeerInformation *cpi)
-{
-  if (GNUNET_NO == is_peer_connected (cpi))
-  {
-    ensure_peer_connected (cpi);
-    return;
-  }
-  if (GNUNET_NO == cpi->hello)
+    session->partner_incoming = NULL;
     return;
     return;
-  /* FIXME: correctness of switch */
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_INVENTORY:
-      if (cpi->session->partner_outgoing != cpi)
-        break;
-      /* fallthrough */
-    case CONSENSUS_ROUND_COMPLETION:
-        send_strata_estimator (cpi);
-    default:
-      break;
   }
   }
+  /* we only have an incoming connection */
+  partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+  if (partner_idx < 0)
+    partner_idx += session->num_peers;
+  session->partner_outgoing = NULL;
+  session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
+  session->partner_incoming->exp_subround_finished = GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Called when stream has finishes writing the hello message
- */
-static void
-hello_cont (void *cls)
-{
-  struct ConsensusPeerInformation *cpi = cls;
-
-  cpi->hello = GNUNET_YES;
-  embrace_peer (cpi);
-}
-
-
-/**
- * Called when we established a stream connection to another peer
+ * Callback for set operation results. Called for each element
+ * in the result set.
  *
  *
- * @param cls cpi of the peer we just connected to
- * @param socket socket to use to communicate with the other side (read/write)
+ * @param cls closure
+ * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param status see enum GNUNET_SET_Status
  */
 static void
  */
 static void
-open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
+set_result_cb (void *cls,
+               const struct GNUNET_SET_Element *element,
+               enum GNUNET_SET_Status status)
 {
   struct ConsensusPeerInformation *cpi = cls;
 {
   struct ConsensusPeerInformation *cpi = cls;
-  struct PendingMessage *pm;
-  struct ConsensusHello *hello;
-
-  GNUNET_assert (NULL == cpi->mss.mst);
-  GNUNET_assert (NULL == cpi->mss.mq);
-
-  cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
-  cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
-  cpi->mss.mst_cls = cpi;
-
-  pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
-  hello = (struct ConsensusHello *) pm->msg;
-  memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
-  pm->sent_cb = hello_cont;
-  pm->sent_cb_cls = cpi;
-  message_queue_add (cpi->mss.mq, pm);
-  cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                &stream_data_processor, &cpi->mss);
-}
-
-
-static void
-replay_premature_message (struct ConsensusPeerInformation *cpi)
-{
-  if (NULL != cpi->premature_strata_message)
-  {
-    struct StrataMessage *sm;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
-    sm = cpi->premature_strata_message;
-    cpi->premature_strata_message = NULL;
-
-    cpi->replaying_strata_message = GNUNET_YES;
-    handle_p2p_strata (cpi, sm);
-    cpi->replaying_strata_message = GNUNET_NO;
-
-    GNUNET_free (sm);
-  }
-}
-
-
-/**
- * Start the inventory round, contact all peers we are supposed to contact.
- *
- * @param session the current session
- */
-static void
-start_inventory (struct ConsensusSession *session)
-{
-  int i;
-  int last;
-
-  for (i = 0; i < session->num_peers; i++)
-  {
-    session->info[i].ibf_bucket_counter = 0;
-    session->info[i].ibf_state = IBF_STATE_NONE;
-    session->info[i].is_outgoing = GNUNET_NO;
-  }
-
-  last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
-  i = (session->local_peer_idx + 1) % session->num_peers;
-  while (i != last)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
-    session->info[i].is_outgoing = GNUNET_YES;
-    embrace_peer (&session->info[i]);
-    i = (i + 1) % session->num_peers;
-  }
-  // tie-breaker for even number of peers
-  if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
-    session->info[last].is_outgoing = GNUNET_YES;
-    embrace_peer (&session->info[last]);
-  }
-
-  for (i = 0; i < session->num_peers; i++)
-  {
-    if (GNUNET_NO == session->info[i].is_outgoing)
-      replay_premature_message (&session->info[i]);
-  }
-}
-
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-send_client_elements_iter (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
-{
-  struct ConsensusSession *session = cls;
-  struct ElementInfo *ei = value;
-  struct PendingMessage *pm;
-
-  /* is the client still there? */
-  if (NULL == session->scss.client)
-    return GNUNET_NO;
-
-  pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size,
-                            GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
-  message_queue_add (session->client_mq, pm);
-  return GNUNET_YES;
-}
+  unsigned int remote_idx = cpi - cpi->session->info;
+  unsigned int local_idx = cpi->session->local_peer_idx;
 
 
+  GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
+                 (cpi == cpi->session->partner_incoming));
 
 
-
-/**
- * Start the next round.
- * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
- *
- * @param cls the session
- * @param tc task context, for when this task is invoked by the scheduler,
- *           NULL if invoked for another reason
- */
-static void 
-round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct ConsensusSession *session;
-
-  /* don't kick off next round if we're shutting down */
-  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
-
-  session = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
-
-  if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
-  {
-    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-  }
-
-  switch (session->current_round)
+  switch (status)
   {
   {
-    case CONSENSUS_ROUND_BEGIN:
-      session->current_round = CONSENSUS_ROUND_EXCHANGE;
-      session->exp_round = 0;
-      subround_over (session, NULL);
-      break;
-    case CONSENSUS_ROUND_EXCHANGE:
-      /* handle two peers specially */
-      if (session->num_peers <= 2)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
-        GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
-        send_client_conclude_done (session);
-        session->current_round = CONSENSUS_ROUND_FINISH;
-        return;
-      }
-      session->current_round = CONSENSUS_ROUND_INVENTORY;
-      start_inventory (session);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      session->current_round = CONSENSUS_ROUND_COMPLETION;
-      session->exp_round = 0;
-      subround_over (session, NULL);
-      break;
-    case CONSENSUS_ROUND_COMPLETION:
-      session->current_round = CONSENSUS_ROUND_FINISH;
-      send_client_conclude_done (session);
+    case GNUNET_SET_STATUS_OK:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
+                  local_idx, remote_idx);
       break;
       break;
-    default:
-      GNUNET_assert (0);
-  }
-}
-
-
-static void
-fin_sent_cb (void *cls)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_COMPLETION:
-      if (cpi->session->current_round != cpi->apparent_round)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
-        break;
-      }
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
+                  local_idx, remote_idx);
+      cpi->set_op = NULL;
+      return;
+    case GNUNET_SET_STATUS_HALF_DONE:
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
+                  local_idx, remote_idx);
       cpi->exp_subround_finished = GNUNET_YES;
       cpi->exp_subround_finished = GNUNET_YES;
-      /* the subround is only really over if *both* partners are done */
-      if (GNUNET_YES == exp_subround_finished (cpi->session))
+      cpi->set_op = NULL;
+      if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
+                    local_idx);
         subround_over (cpi->session, NULL);
         subround_over (cpi->session, NULL);
+      }
       else
       else
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
-      break;
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-      if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
-        round_over (cpi->session, NULL);
-      /* FIXME: maybe go to next round */
-      break;
-    default:
-      GNUNET_break (0);
-  }
-}
-
-
-/**
- * The other peer wants us to inform that he sent us all the elements we requested.
- */
-static int
-handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  struct ConsensusRoundMessage *round_msg;
-  round_msg = (struct ConsensusRoundMessage *) msg;
-  /* FIXME: only call subround_over if round is the current one! */
-  switch (cpi->session->current_round)
-  {
-    case CONSENSUS_ROUND_EXCHANGE:
-    case CONSENSUS_ROUND_COMPLETION:
-      if (cpi->session->current_round != round_msg->round)
       {
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-        cpi->ibf_state = IBF_STATE_NONE;
-        cpi->ibf_bucket_counter = 0;
-        break;
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
+                    local_idx);
       }
       }
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      cpi->exp_subround_finished = GNUNET_YES;
-      if (GNUNET_YES == exp_subround_finished (cpi->session))
-        subround_over (cpi->session, NULL);
-      else
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
-    break;
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      if (inventory_round_finished (cpi->session))
-        round_over (cpi->session, NULL);
-      break;
+      return;
     default:
     default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
-      break;
+      GNUNET_break (0);
+      return;
   }
   }
-  return GNUNET_YES;
-}
-
-
-/**
- * Gets called when the other peer wants us to inform that
- * it has decoded our ibf and sent us all elements / requests
- */
-static int
-handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  struct PendingMessage *pm;
-  struct ConsensusRoundMessage *fin_msg;
 
 
-  /* FIXME: why handle current round?? */
   switch (cpi->session->current_round)
   {
   switch (cpi->session->current_round)
   {
-    case CONSENSUS_ROUND_INVENTORY:
-      cpi->inventory_synced = GNUNET_YES;
-    case CONSENSUS_ROUND_COMPLETION:
     case CONSENSUS_ROUND_EXCHANGE:
     case CONSENSUS_ROUND_EXCHANGE:
-      LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n");
-      pm = new_pending_message (sizeof *fin_msg,
-                                GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
-      fin_msg = (struct ConsensusRoundMessage *) pm->msg;
-      fin_msg->round = cpi->apparent_round;
-      /* the subround is over once we kicked off sending the fin msg */
-      /* FIXME: assert we are talking to the right peer! */
-      /* FIXME: mark peer as synced */
-      pm->sent_cb = fin_sent_cb;
-      pm->sent_cb_cls = cpi;
-      message_queue_add (cpi->mss.mq, pm);
+      GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
       break;
     default:
       break;
     default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
-      break;
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete message is received by the tokenizer.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct ConsensusPeerInformation *cpi = cls;
-  GNUNET_assert (NULL == client);
-  GNUNET_assert (NULL != cls);
-  switch (ntohs (message->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
-      return handle_p2p_strata (cpi, (struct StrataMessage *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
-      return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
-      return handle_p2p_element (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
-      return handle_p2p_element_report (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
-      return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
-      return handle_p2p_synced (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
-      return handle_p2p_fin (cpi, message);
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
-                  ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
-  }
-  return GNUNET_OK;
-}
-
-
-static void
-shuffle (struct ConsensusSession *session)
-{
-  /* adapted from random_permute in util/crypto_random.c */
-  /* FIXME
-  unsigned int *ret;
-  unsigned int i;
-  unsigned int tmp;
-  uint32_t x;
-
-  GNUNET_assert (n > 0);
-  ret = GNUNET_malloc (n * sizeof (unsigned int));
-  for (i = 0; i < n; i++)
-    ret[i] = i;
-  for (i = n - 1; i > 0; i--)
-  {
-    x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
-    tmp = ret[x];
-    ret[x] = ret[i];
-    ret[i] = tmp;
+      GNUNET_break (0);
+      return;
   }
   }
-  */
 }
 
 
 /**
 }
 
 
 /**
- * Find and set the partner_incoming and partner_outgoing of our peer,
- * one of them may not exist in most cases.
+ * Compare the round the session is in with the round of the given context message.
  *
  *
- * @param session the consensus session
+ * @param session a consensus session
+ * @param ri a round context message
+ * @return 0 if it's the same round, -1 if the session is in an earlier round,
+ *         1 if the session is in a later round
  */
  */
-static void
-find_partners (struct ConsensusSession *session)
-{
-  int mark[session->num_peers];
-  int i;
-  memset (mark, 0, session->num_peers * sizeof (int));
-  session->partner_incoming = session->partner_outgoing = NULL;
-  for (i = 0; i < session->num_peers; i++)
-  {
-    int arc;
-    if (0 != mark[i])
-      continue;
-    arc = (i + (1 << session->exp_subround)) % session->num_peers;
-    mark[i] = mark[arc] = 1;
-    GNUNET_assert (i != arc);
-    if (i == session->local_peer_idx)
-    {
-      GNUNET_assert (NULL == session->partner_outgoing);
-      session->partner_outgoing = &session->info[session->shuffle[arc]];
-      session->partner_outgoing->exp_subround_finished = GNUNET_NO;
-    }
-    if (arc == session->local_peer_idx)
-    {
-      GNUNET_assert (NULL == session->partner_incoming);
-      session->partner_incoming = &session->info[session->shuffle[i]];
-      session->partner_incoming->exp_subround_finished = GNUNET_NO;
-    }
+static int
+rounds_compare (struct ConsensusSession *session,
+                struct RoundInfo* ri)
+{
+  if (session->current_round < ri->round)
+    return -1;
+  if (session->current_round > ri->round)
+    return 1;
+  if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
+  {
+    if (session->exp_round < ri->exp_round)
+      return -1;
+    if (session->exp_round > ri->exp_round)
+      return 1;
+    if (session->exp_subround < ri->exp_subround)
+      return -1;
+    if (session->exp_subround < ri->exp_subround)
+      return 1;
+    return 0;
   }
   }
+  /* comparing rounds when we are not in a exp round */
+  GNUNET_assert (0);
 }
 
 
 }
 
 
@@ -1905,17 +674,18 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     return;
   session = cls;
   /* cancel timeout */
     return;
   session = cls;
   /* cancel timeout */
-  if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
+  {
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-  session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-  /* check if we are done with the log phase, 2-peer consensus only does one log round */
-  if ( (session->exp_round == NUM_EXP_ROUNDS) ||
-       ((session->num_peers == 2) && (session->exp_round == 1)))
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
+  }
+
+  if (session->exp_round >= NUM_EXP_ROUNDS)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
     round_over (session, NULL);
     return;
   }
     round_over (session, NULL);
     return;
   }
+
   if (session->exp_round == 0)
   {
     /* initialize everything for the log-rounds */
   if (session->exp_round == 0)
   {
     /* initialize everything for the log-rounds */
@@ -1923,8 +693,10 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     session->exp_subround = 0;
     if (NULL == session->shuffle)
       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
     session->exp_subround = 0;
     if (NULL == session->shuffle)
       session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
+    if (NULL == session->shuffle_inv)
+      session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
     for (i = 0; i < session->num_peers; i++)
     for (i = 0; i < session->num_peers; i++)
-      session->shuffle[i] = i;
+      session->shuffle[i] = session->shuffle_inv[i] = i;
   }
   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
   }
   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
@@ -1933,13 +705,68 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     session->exp_subround = 0;
     shuffle (session);
   }
     session->exp_subround = 0;
     shuffle (session);
   }
-  else 
+  else
   {
     session->exp_subround++;
   }
 
   {
     session->exp_subround++;
   }
 
+  /* determine the incoming and outgoing partner */
   find_partners (session);
 
   find_partners (session);
 
+  GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
+  GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
+
+  /* initiate set operation with the outgoing partner */
+  if (NULL != session->partner_outgoing)
+  {
+    struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+    msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+    msg->header.size = htons (sizeof *msg);
+    msg->round = htonl (session->current_round);
+    msg->exp_round = htonl (session->exp_round);
+    msg->exp_subround = htonl (session->exp_subround);
+
+    if (NULL != session->partner_outgoing->set_op)
+    {
+      GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
+    }
+    session->partner_outgoing->set_op =
+        GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
+                            &session->global_id,
+                            (struct GNUNET_MessageHeader *) msg,
+                            0, /* FIXME: salt */
+                            GNUNET_SET_RESULT_ADDED,
+                            set_result_cb, session->partner_outgoing);
+    GNUNET_free (msg);
+    GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
+  }
+
+  /* commit to the delayed set operation */
+  if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
+  {
+    int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
+
+    if (NULL != session->partner_incoming->set_op)
+    {
+      GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
+      session->partner_incoming->set_op = NULL;
+    }
+    if (cmp == 0)
+    {
+      GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
+      session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
+      session->partner_incoming->delayed_set_op = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
+                  session->local_peer_idx, (int) (session->partner_incoming - session->info));
+    }
+    else
+    {
+      /* this should not happen -- a round has been skipped! */
+      GNUNET_break_op (0);
+    }
+  }
+
 #ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
 #ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
@@ -1952,34 +779,11 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       in = -1;
     else
       in = (int) (session->partner_incoming - session->info);
       in = -1;
     else
       in = (int) (session->partner_incoming - session->info);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
                 session->exp_round, session->exp_subround, in, out);
   }
 #endif /* GNUNET_EXTRA_LOGGING */
 
                 session->exp_round, session->exp_subround, in, out);
   }
 #endif /* GNUNET_EXTRA_LOGGING */
 
-  if (NULL != session->partner_incoming)
-  {
-    session->partner_incoming->ibf_state = IBF_STATE_NONE;
-    session->partner_incoming->exp_subround_finished = GNUNET_NO;
-    session->partner_incoming->ibf_bucket_counter = 0;
-
-    /* maybe there's an early strata estimator? */
-    replay_premature_message (session->partner_incoming);
-  }
-
-  if (NULL != session->partner_outgoing)
-  {
-    session->partner_outgoing->ibf_state = IBF_STATE_NONE;
-    session->partner_outgoing->ibf_bucket_counter = 0;
-    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
-    /* make sure peer is connected and send the SE */
-    embrace_peer (session->partner_outgoing);
-  }
-
-  /*
-  session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
-                                                                   subround_over, session);
-  */
 }
 
 
 }
 
 
@@ -2001,146 +805,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
 }
 
 
 }
 
 
-/**
- * Handle a HELLO-message, send when another peer wants to join a session where
- * our peer is a member. The session may or may not be inhabited yet.
- */
-static int
-handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
-{
-  struct ConsensusSession *session;
-
-  if (NULL != inc->requested_gid)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n");
-    return GNUNET_YES;
-  }
-  if (NULL != inc->cpi)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n");
-    return GNUNET_YES;
-  }
-
-  for (session = sessions_head; NULL != session; session = session->next)
-  {
-    int idx;
-    struct ConsensusPeerInformation *cpi;
-    if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
-      continue;
-    idx = get_peer_idx (&inc->peer_id, session);
-    GNUNET_assert (-1 != idx);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
-    cpi = &session->info[idx];
-    inc->cpi = cpi;
-    cpi->mss = inc->mss;
-    cpi = &session->info[idx];
-    cpi->hello = GNUNET_YES;
-    cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
-    embrace_peer (cpi);
-    return GNUNET_YES;        
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
-  inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode));
-  return GNUNET_YES;
-}
-
-
-
-/**
- * Handle tokenized messages from stream sockets.
- * Delegate them if the socket belongs to a session,
- * handle hello messages otherwise.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure, unused
- * @param client incoming socket this message comes from
- * @param message the actual message
- *
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct IncomingSocket *inc;
-  GNUNET_assert (NULL == client);
-  GNUNET_assert (NULL != cls);
-  inc = (struct IncomingSocket *) cls;
-  switch (ntohs( message->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
-      return handle_p2p_hello (inc, (struct ConsensusHello *) message);
-    default:
-      if (NULL != inc->cpi)
-        return mst_session_callback (inc->cpi, client, message);
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
-                  ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new stream connection from other peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
- *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
- * @param initiator the identity of the peer who wants to establish a stream
- *            with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- *             stream (the socket will be invalid after the call)
- */
-static int
-listen_cb (void *cls,
-           struct GNUNET_STREAM_Socket *socket,
-           const struct GNUNET_PeerIdentity *initiator)
-{
-  struct IncomingSocket *incoming;
-
-  if (NULL == socket)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  incoming = GNUNET_malloc (sizeof *incoming);
-  incoming->peer_id = *initiator;
-  incoming->mss.socket = socket;
-  incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &stream_data_processor, &incoming->mss);
-  incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
-  incoming->mss.mst_cls = incoming;
-  GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
-  return GNUNET_OK;
-}
-
-
-/**
- * Disconnect a client, and destroy all sessions associated with it.
- *
- * @param client the client to disconnect
- */
-static void
-disconnect_client (struct GNUNET_SERVER_Client *client)
-{
-  struct ConsensusSession *session;
-  GNUNET_SERVER_client_disconnect (client);
-  
-  /* if the client owns a session, remove it */
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (client == session->scss.client)
-    {
-      destroy_session (session);
-      break;
-    }
-    session = session->next;
-  }
-}
-
-
 /**
  * Compute a global, (hopefully) unique consensus session id,
  * from the local id of the consensus session, and the identities of all participants.
 /**
  * Compute a global, (hopefully) unique consensus session id,
  * from the local id of the consensus session, and the identities of all participants.
@@ -2151,15 +815,20 @@ disconnect_client (struct GNUNET_SERVER_Client *client)
  * @param session_id local id of the consensus session
  */
 static void
  * @param session_id local id of the consensus session
  */
 static void
-compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
+compute_global_id (struct ConsensusSession *session,
+                  const struct GNUNET_HashCode *session_id)
 {
   int i;
   struct GNUNET_HashCode tmp;
 {
   int i;
   struct GNUNET_HashCode tmp;
+  struct GNUNET_HashCode phash;
+
+  /* FIXME: use kdf? */
 
   session->global_id = *session_id;
   for (i = 0; i < session->num_peers; ++i)
   {
 
   session->global_id = *session_id;
   for (i = 0; i < session->num_peers; ++i)
   {
-    GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
+    GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
+    GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
     session->global_id = tmp;
     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
     session->global_id = tmp;
     session->global_id = tmp;
     GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
     session->global_id = tmp;
@@ -2188,7 +857,8 @@ hash_cmp (const void *h1, const void *h2)
  * add the local peer if not in the join message.
  */
 static void
  * add the local peer if not in the join message.
  */
 static void
-initialize_session_peer_list (struct ConsensusSession *session)
+initialize_session_peer_list (struct ConsensusSession *session,
+                              struct GNUNET_CONSENSUS_JoinMessage *join_msg)
 {
   unsigned int local_peer_in_list;
   uint32_t listed_peers;
 {
   unsigned int local_peer_in_list;
   uint32_t listed_peers;
@@ -2196,19 +866,19 @@ initialize_session_peer_list (struct ConsensusSession *session)
   struct GNUNET_PeerIdentity *peers;
   unsigned int i;
 
   struct GNUNET_PeerIdentity *peers;
   unsigned int i;
 
-  GNUNET_assert (NULL != session->join_msg);
+  GNUNET_assert (NULL != join_msg);
 
   /* peers in the join message, may or may not include the local peer */
 
   /* peers in the join message, may or may not include the local peer */
-  listed_peers = ntohl (session->join_msg->num_peers);
-  
+  listed_peers = ntohl (join_msg->num_peers);
+
   session->num_peers = listed_peers;
 
   session->num_peers = listed_peers;
 
-  msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
+  msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
 
   local_peer_in_list = GNUNET_NO;
   for (i = 0; i < listed_peers; i++)
   {
 
   local_peer_in_list = GNUNET_NO;
   for (i = 0; i < listed_peers; i++)
   {
-    if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+    if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
     {
       local_peer_in_list = GNUNET_YES;
       break;
     {
       local_peer_in_list = GNUNET_YES;
       break;
@@ -2221,7 +891,7 @@ initialize_session_peer_list (struct ConsensusSession *session)
   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   if (GNUNET_NO == local_peer_in_list)
   peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   if (GNUNET_NO == local_peer_in_list)
-    peers[session->num_peers - 1] = *my_peer;
+    peers[session->num_peers - 1] = my_peer;
 
   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
 
   memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
   qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
@@ -2236,37 +906,104 @@ initialize_session_peer_list (struct ConsensusSession *session)
     session->info[i].peer_id = peers[i];
   }
 
     session->info[i].peer_id = peers[i];
   }
 
-  free (peers);
+  GNUNET_free (peers);
 }
 
 
 /**
 }
 
 
 /**
- * Add incoming peer connections to the session,
- * for peers who have connected to us before the local session has been established
+ * Called when another peer wants to do a set operation with the
+ * local peer.
  *
  *
- * @param session ...
+ * @param cls closure
+ * @param other_peer the other peer
+ * @param context_msg message with application specific information from
+ *        the other peer
+ * @param request request from the other peer, use GNUNET_SET_accept
+ *        to accept it, otherwise the request will be refused
+ *        Note that we don't use a return value here, as it is also
+ *        necessary to specify the set we want to do the operation with,
+ *        whith sometimes can be derived from the context message.
+ *        Also necessary to specify the timeout.
  */
 static void
  */
 static void
-add_incoming_peers (struct ConsensusSession *session)
+set_listen_cb (void *cls,
+               const struct GNUNET_PeerIdentity *other_peer,
+               const struct GNUNET_MessageHeader *context_msg,
+               struct GNUNET_SET_Request *request)
 {
 {
-  struct IncomingSocket *inc;
-  int i;
+  struct ConsensusSession *session = cls;
+  struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
   struct ConsensusPeerInformation *cpi;
   struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *set_op;
+  struct RoundInfo round_info;
+  int index;
+  int cmp;
 
 
-  for (inc = incoming_sockets_head; NULL != inc; inc = inc->next)
+  if (NULL == context_msg)
   {
   {
-    if ( (NULL == inc->requested_gid) ||
-         (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) )
-      continue;
-    for (i = 0; i < session->num_peers; i++)
-    {
-      cpi = &session->info[i];
-      cpi->peer_id = inc->peer_id;
-      cpi->mss = inc->mss;
-      cpi->hello = GNUNET_YES;
-      inc->cpi = cpi;
+    GNUNET_break_op (0);
+    return;
+  }
+
+  index = get_peer_idx (other_peer, session);
+
+  if (index < 0)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  round_info.round = ntohl (msg->round);
+  round_info.exp_round = ntohl (msg->exp_round);
+  round_info.exp_subround = ntohl (msg->exp_subround);
+
+  cpi = &session->info[index];
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
+
+  switch (session->current_round)
+  {
+    case CONSENSUS_ROUND_BEGIN:
+      /* we're in the begin round, so requests for the exchange round may
+       * come in, they will be delayed for now! */
+    case CONSENSUS_ROUND_EXCHANGE:
+      cmp = rounds_compare (session, &round_info);
+      if (cmp > 0)
+      {
+        /* the other peer is too late */
+        GNUNET_break_op (0);
+        return;
+      }
+      /* kill old request, if any. this is legal,
+       * as the other peer would not make a new request if it would want to
+       * complete the old one! */
+      if (NULL != cpi->set_op)
+      {
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
+      }
+      set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                                       set_result_cb, &session->info[index]);
+      if (cmp == 0)
+      {
+        cpi->set_op = set_op;
+        GNUNET_SET_commit (set_op, session->element_set);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
+      }
+      else
+      {
+        /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
+        cpi->delayed_set_op = set_op;
+        cpi->delayed_round_info = round_info;
+        cpi->exp_subround_finished = GNUNET_YES;
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
+      }
       break;
       break;
-    }
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
+                  session->local_peer_idx, session->current_round, index);
+      GNUNET_break_op (0);
+      return;
   }
 }
 
   }
 }
 
@@ -2275,45 +1012,66 @@ add_incoming_peers (struct ConsensusSession *session)
  * Initialize the session, continue receiving messages from the owning client
  *
  * @param session the session to initialize
  * Initialize the session, continue receiving messages from the owning client
  *
  * @param session the session to initialize
+ * @param join_msg the join message from the client
  */
 static void
  */
 static void
-initialize_session (struct ConsensusSession *session)
+initialize_session (struct ConsensusSession *session,
+                    struct GNUNET_CONSENSUS_JoinMessage *join_msg)
 {
   struct ConsensusSession *other_session;
 
 {
   struct ConsensusSession *other_session;
 
-  GNUNET_assert (NULL != session->join_msg);
-  initialize_session_peer_list (session);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
-  compute_global_id (session, &session->join_msg->session_id);
+  initialize_session_peer_list (session, join_msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
+  compute_global_id (session, &join_msg->session_id);
 
 
-  /* Check if some local client already owns the session. */
+  /* check if some local client already owns the session.
+   * it is only legal to have a session with an existing global id
+   * if all other sessions with this global id are finished.*/
   other_session = sessions_head;
   while (NULL != other_session)
   {
   other_session = sessions_head;
   while (NULL != other_session)
   {
-    if ((other_session != session) && 
+    if ((other_session != session) &&
         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
     {
         (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
     {
-      if (GNUNET_NO == other_session->conclude)
+      if (CONSENSUS_ROUND_FINISH != other_session->current_round)
       {
         GNUNET_break (0);
         destroy_session (session);
         return;
       }
       {
         GNUNET_break (0);
         destroy_session (session);
         return;
       }
-      GNUNET_SERVER_client_drop (other_session->scss.client);
-      other_session->scss.client = NULL;
       break;
     }
     other_session = other_session->next;
   }
 
       break;
     }
     other_session = other_session->next;
   }
 
-  session->local_peer_idx = get_peer_idx (my_peer, session);
+  session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
+  session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
+
+  session->local_peer_idx = get_peer_idx (&my_peer, session);
   GNUNET_assert (-1 != session->local_peer_idx);
   GNUNET_assert (-1 != session->local_peer_idx);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
-  GNUNET_free (session->join_msg);
-  session->join_msg = NULL;
-  add_incoming_peers (session);
-  GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
+  session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_assert (NULL != session->element_set);
+  session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
+                                             &session->global_id,
+                                             set_listen_cb, session);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
+}
+
+
+static struct ConsensusSession *
+get_session_by_client (struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
+
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (session->client == client)
+      return session;
+    session = session->next;
+  }
+  return NULL;
 }
 
 
 }
 
 
@@ -2331,45 +1089,26 @@ client_join (void *cls,
 {
   struct ConsensusSession *session;
 
 {
   struct ConsensusSession *session;
 
-  // make sure the client has not already joined a session
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->scss.client == client)
-    {
-      GNUNET_break (0);
-      disconnect_client (client);
-      return;
-    }
-    session = session->next;
-  }
-
-  session = GNUNET_new (struct ConsensusSession);
-  session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
-  /* these have to be initialized here, as the client can already start to give us values */
-  session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
-  session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
-  session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
-  session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
-  session->scss.client = client;
-  session->client_mq = create_message_queue_for_server_client (&session->scss);
-  GNUNET_SERVER_client_keep (client);
-
-  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
 
 
-  // Initialize session later if local peer identity is not known yet.
-  if (NULL == my_peer)
+  session = get_session_by_client (client);
+  if (NULL != session)
   {
   {
-    GNUNET_SERVER_disable_receive_done_warning (client);
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
     return;
   }
+  session = GNUNET_new (struct ConsensusSession);
+  session->client = client;
+  session->client_mq = GNUNET_MQ_queue_for_server_client (client);
+  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+  initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
 
-  initialize_session (session);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
 }
 
 
 }
 
 
-
-
 /**
  * Called when a client performs an insert operation.
  *
 /**
  * Called when a client performs an insert operation.
  *
@@ -2379,39 +1118,48 @@ client_join (void *cls,
  */
 void
 client_insert (void *cls,
  */
 void
 client_insert (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *m)
+               struct GNUNET_SERVER_Client *client,
+               const struct GNUNET_MessageHeader *m)
 {
   struct ConsensusSession *session;
   struct GNUNET_CONSENSUS_ElementMessage *msg;
 {
   struct ConsensusSession *session;
   struct GNUNET_CONSENSUS_ElementMessage *msg;
-  struct GNUNET_CONSENSUS_Element *element;
-  int element_size;
+  struct GNUNET_SET_Element *element;
+  ssize_t element_size;
 
 
-  session = sessions_head;
-  while (NULL != session)
+  session = get_session_by_client (client);
+
+  if (NULL == session)
   {
   {
-    if (session->scss.client == client)
-      break;
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
   }
 
   }
 
-  if (NULL == session)
+  if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
+    GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
   msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
-  element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
-  element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
+  element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+  if (element_size < 0)
+  {
+    GNUNET_break (0);
+    return;
+  }
+
+  element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
   element->type = msg->element_type;
   element->size = element_size;
   memcpy (&element[1], &msg[1], element_size);
   element->data = &element[1];
   element->type = msg->element_type;
   element->size = element_size;
   memcpy (&element[1], &msg[1], element_size);
   element->data = &element[1];
-  GNUNET_assert (NULL != element->data);
-  insert_element (session, element);
-
+  GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
+  GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
 }
 
 
 }
 
 
@@ -2428,13 +1176,9 @@ client_conclude (void *cls,
                  const struct GNUNET_MessageHeader *message)
 {
   struct ConsensusSession *session;
                  const struct GNUNET_MessageHeader *message)
 {
   struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
-
-  cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
 
 
-  session = sessions_head;
-  while ((session != NULL) && (session->scss.client != client))
-    session = session->next;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
+  session = get_session_by_client (client);
   if (NULL == session)
   {
     /* client not found */
   if (NULL == session)
   {
     /* client not found */
@@ -2442,70 +1186,28 @@ client_conclude (void *cls,
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
     /* client requested conclude twice */
     GNUNET_break (0);
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
     /* client requested conclude twice */
     GNUNET_break (0);
-    /* client may still own a session, destroy it */
-    disconnect_client (client);
     return;
   }
     return;
   }
-
-  session->conclude = GNUNET_YES;
-
   if (session->num_peers <= 1)
   {
   if (session->num_peers <= 1)
   {
-    send_client_conclude_done (session);
+    session->current_round = CONSENSUS_ROUND_FINISH;
+    GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
   }
   else
   {
   }
   else
   {
-    session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
-    /* the 'begin' round is over, start with the next, real round */
+    /* the 'begin' round is over, start with the next, actual round */
     round_over (session, NULL);
   }
 
     round_over (session, NULL);
   }
 
+  GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
-/**
- * Task that disconnects from core.
- *
- * @param cls core handle
- * @param tc context information (why was this task triggered now)
- */
-static void
-disconnect_core (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  if (core != NULL)
-  {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
-}
-
-
-static void
-core_startup (void *cls,
-              struct GNUNET_CORE_Handle *core,
-              const struct GNUNET_PeerIdentity *peer)
-{
-  struct ConsensusSession *session;
-
-  my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
-  /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
-  GNUNET_SCHEDULER_add_now (&disconnect_core, core);
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
-  /* initialize sessions that are waiting for the local peer identity */
-  for (session = sessions_head; NULL != session; session = session->next)
-    if (NULL != session->join_msg)
-      initialize_session (session);
-}
-
-
 /**
  * Called to clean up, after a shutdown has been requested.
  *
 /**
  * Called to clean up, after a shutdown has been requested.
  *
@@ -2516,37 +1218,33 @@ static void
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  while (NULL != incoming_sockets_head)
-  {
-    struct IncomingSocket *socket;
-    socket = incoming_sockets_head;
-    if (NULL == socket->cpi)
-      clear_message_stream_state (&socket->mss);
-    incoming_sockets_head = incoming_sockets_head->next;
-    GNUNET_free (socket);
-  }
-
   while (NULL != sessions_head)
   while (NULL != sessions_head)
-  {
-    struct ConsensusSession *session;
-    session = sessions_head->next;
     destroy_session (sessions_head);
     destroy_session (sessions_head);
-    sessions_head = session;
-  }
 
 
-  if (NULL != core)
-  {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
+}
 
 
-  if (NULL != listener)
-  {
-    GNUNET_STREAM_listen_close (listener);
-    listener = NULL;
-  } 
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
+
+  session = get_session_by_client (client);
+  if (NULL == session)
+    return;
+  if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+      (CONSENSUS_ROUND_FINISH == session->current_round))
+    destroy_session (session);
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
 }
 
 
 }
 
 
@@ -2558,37 +1256,29 @@ shutdown_task (void *cls,
  * @param c configuration to use
  */
 static void
  * @param c configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
 {
-  /* core is only used to retrieve the peer identity */
-  static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
-    {NULL, 0, 0}
-  };
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
-    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
-        sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+        sizeof (struct GNUNET_MessageHeader)},
+    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
-
+  if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   GNUNET_SERVER_add_handlers (server, server_handlers);
   GNUNET_SERVER_add_handlers (server, server_handlers);
-
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
-
-  listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                   &listen_cb, NULL,
-                                   GNUNET_STREAM_OPTION_END);
-
-  /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
-  core = GNUNET_CORE_connect (c, NULL, 
-                             &core_startup, NULL, 
-                             NULL, NULL, GNUNET_NO, NULL, 
-                             GNUNET_NO, core_handlers);
-  GNUNET_assert (NULL != core);
+  GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 }
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 }
 
@@ -2605,7 +1295,7 @@ main (int argc, char *const *argv)
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 
   return (GNUNET_OK == ret) ? 0 : 1;
 }