- delete unused message type
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index 2f59b86bcb96d68e8cbb1add11719889cdb453f8..ffd9786d33522098acf1f6efa80995b28e8a6a8a 100644 (file)
@@ -1,10 +1,10 @@
 /*
       This file is part of GNUnet
 /*
       This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
+      (C) 2012, 2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
  */
 
 #include "platform.h"
  */
 
 #include "platform.h"
-#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
-#include "gnunet_util_lib.h"
+#include "gnunet_set_service.h"
 #include "gnunet_consensus_service.h"
 #include "gnunet_consensus_service.h"
-#include "gnunet_core_service.h"
-#include "gnunet_stream_lib.h"
 #include "consensus_protocol.h"
 #include "consensus_protocol.h"
-#include "ibf.h"
 #include "consensus.h"
 
 
 /**
 #include "consensus.h"
 
 
 /**
- * Number of IBFs in a strata estimator.
- */
-#define STRATA_COUNT 32
-/**
- * Number of buckets per IBF.
+ * Log macro that prefixes the local peer and the peer we are in contact with.
  */
  */
-#define STRATA_IBF_BUCKETS 80
-/**
- * hash num parameter for the difference digests and strata estimators
- */
-#define STRATA_HASH_NUM 3
+#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
+   cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
 
 
-/**
- * Number of buckets that can be transmitted in one message.
- */
-#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
 
 /**
 
 /**
- * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
- * Choose this value so that computing the IBF is still cheaper
- * than transmitting all values.
+ * Number of exponential rounds, used in the exp and completion round.
  */
  */
-#define MAX_IBF_ORDER (16)
-
+#define NUM_EXP_ROUNDS 4
 
 /* forward declarations */
 
 
 /* forward declarations */
 
-struct ConsensusSession;
-struct IncomingSocket;
+/* mutual recursion with struct ConsensusSession */
 struct ConsensusPeerInformation;
 
 struct ConsensusPeerInformation;
 
+/* mutual recursion with round_over */
 static void
 static void
-send_next (struct ConsensusSession *session);
-
-static void 
-write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
-
-static void 
-write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
-
-static void 
-write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size);
-
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
-
-
-/**
- * An element that is waiting to be transmitted.
- */
-struct PendingElement
-{
-  /**
-   * Pending elements are kept in a DLL.
-   */
-  struct PendingElement *next;
-
-  /**
-   * Pending elements are kept in a DLL.
-   */
-  struct PendingElement *prev;
+subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
-  /**
-   * The actual element
-   */
-  struct GNUNET_CONSENSUS_Element *element;
-
-  /* peer this element is coming from */
-  struct ConsensusPeerInformation *cpi;
-};
 
 /**
 
 /**
- * Information about a peer that is in a consensus session.
+ * Describes the current round a consensus session is in.
  */
  */
-struct ConsensusPeerInformation
+enum ConsensusRound
 {
   /**
 {
   /**
-   * Socket for communicating with the peer, either created by the local peer,
-   * or the remote peer.
-   */
-  struct GNUNET_STREAM_Socket *socket;
-
-  /**
-   * Message tokenizer, for the data received from this peer via the stream socket.
-   */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-
-  /**
-   * Is socket's connection established, i.e. can we write to it?
-   * Only relevent to outgoing cpi.
-   */
-  int is_connected;
-
-  /**
-   * Type of the peer in the all-to-all rounds,
-   * GNUNET_YES if we initiate reconciliation.
-   */
-  int is_outgoing;
-
-  /**
-   * if the peer did something wrong, and was disconnected,
-   * never interact with this peer again.
-   */
-  int is_bad;
-
-  /**
-   * Did we receive/send a consensus hello?
-   */
-  int hello;
-
-  /**
-   * Handle for currently active read
-   */
-  struct GNUNET_STREAM_ReadHandle *rh;
-
-  /**
-   * Handle for currently active read
-   */
-  struct GNUNET_STREAM_WriteHandle *wh;
-
-  enum {
-    /* beginning of round */
-    IBF_STATE_NONE=0,
-    /* we currently receive an ibf */
-    IBF_STATE_RECEIVING,
-    /* we currently transmit an ibf */
-    IBF_STATE_TRANSMITTING,
-    /* we decode a received ibf */
-    IBF_STATE_DECODING,
-    /* wait for elements and element requests */
-    IBF_STATE_ANTICIPATE_DIFF
-  } ibf_state ;
-
-  /**
-   * What is the order (=log2 size) of the ibf
-   * we're currently dealing with?
-   * Interpretation depends on ibf_state.
-   */
-  int ibf_order;
-
-  /**
-   * The current IBF for this peer,
-   * purpose dependent on ibf_state
-   */
-  struct InvertibleBloomFilter *ibf;
-
-  /**
-   * How many buckets have we transmitted/received? 
-   * Interpretatin depends on ibf_state
-   */
-  int ibf_bucket_counter;
-
-  /**
-   * Strata estimator of the peer, NULL if our peer
-   * initiated the reconciliation.
-   */
-  struct InvertibleBloomFilter **strata;
-
-  /**
-   * Elements that the peer is missing from us.
-   */
-  uint64_t *missing_local;
-
-  /**
-   * Number of elements in missing_local
+   * Not started the protocol yet.
    */
    */
-  unsigned int num_missing_local;
-
+  CONSENSUS_ROUND_BEGIN=0,
   /**
   /**
-   * Elements that this peer told us *we* don't have,
-   * i.e. we are the remote peer that has some values missing.
+   * Distribution of elements with the exponential scheme.
    */
    */
-  uint64_t *missing_remote;
-
+  CONSENSUS_ROUND_EXCHANGE,
   /**
   /**
-   * Number of elements in missing_local
+   * Exchange which elements each peer has, but don't
+   * transmit the element's data, only their SHA-512 hashes.
+   * This round uses the all-to-all scheme.
    */
    */
-  unsigned int num_missing_remote;
-
+  CONSENSUS_ROUND_INVENTORY,
   /**
   /**
-   * Back-reference to the consensus session,
-   * to that ConsensusPeerInformation can be used as a closure
+   * Collect and distribute missing values with the exponential scheme.
    */
    */
-  struct ConsensusSession *session;
-
+  CONSENSUS_ROUND_COMPLETION,
   /**
   /**
-   * When decoding the IBF, requests for elements and outgoing elements
-   * have to be queued, to ensure that messages actually fit in the stream buffer.
+   * Consensus concluded. After timeout and finished communication with client,
+   * consensus session will be destroyed.
    */
    */
-  struct QueuedMessage *requests_and_elements_head;
-  struct QueuedMessage *requests_and_elements_tail;
+  CONSENSUS_ROUND_FINISH
 };
 
 };
 
-/**
- * A doubly linked list of messages.
- */
-struct QueuedMessage
-{
-  struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *next;
-
-  /**
-   * Queued messages are stored in a doubly linked list.
-   */
-  struct QueuedMessage *prev;
-};
 
 /**
 
 /**
- * Describes the current round a consensus session is in.
+ * Complete information about the current round and all
+ * subrounds.
  */
  */
-enum ConsensusRound
+struct RoundInfo
 {
   /**
 {
   /**
-   * Not started the protocl yet
-   */
-  CONSENSUS_ROUND_BEGIN=0,
-  /**
-   * distribution of information with the exponential scheme.
+   * The current main round.
    */
    */
-  CONSENSUS_ROUND_EXP_EXCHANGE,
+  enum ConsensusRound round;
   /**
   /**
-   * All-to-all, exchange missing values.
+   * The current exp round, valid if
+   * the main round is an exp round.
    */
    */
-  CONSENSUS_ROUND_A2A_EXCHANGE,
+  uint32_t exp_round;
   /**
   /**
-   * All-to-all, check what values are missing, don't exchange anything.
+   * The current exp subround, valid if
+   * the main round is an exp round.
    */
    */
-  CONSENSUS_ROUND_A2A_INVENTORY,
-  /**
-   * All-to-all round to exchange information for byzantine fault detection.
-   */
-  CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT,
-  /**
-   * Rounds are over
-   */
-  CONSENSUS_ROUND_FINISH
+  uint32_t exp_subround;
 };
 
 
 };
 
 
@@ -296,169 +125,148 @@ struct ConsensusSession
    */
   struct ConsensusSession *prev;
 
    */
   struct ConsensusSession *prev;
 
-  /**
-   * Join message. Used to initialize the session later,
-   * if the identity of the local peer is not yet known.
-   * NULL if the session has been fully initialized.
-   */
-  struct GNUNET_CONSENSUS_JoinMessage *join_msg;
-
   /**
   * Global consensus identification, computed
   /**
   * Global consensus identification, computed
-  * from the local id and participating authorities.
+  * from the session id and participating authorities.
   */
   struct GNUNET_HashCode global_id;
 
   /**
   */
   struct GNUNET_HashCode global_id;
 
   /**
-   * Local client in this consensus session.
-   * There is only one client per consensus session.
+   * Client that inhabits the session
    */
   struct GNUNET_SERVER_Client *client;
 
   /**
    */
   struct GNUNET_SERVER_Client *client;
 
   /**
-   * Values in the consensus set of this session,
-   * all of them either have been sent by or approved by the client.
-   * Contains GNUNET_CONSENSUS_Element.
+   * Queued messages to the client.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap *values;
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
 
   /**
-   * Elements that have not been approved (or rejected) by the client yet.
+   * Time when the conclusion of the consensus should begin.
    */
    */
-  struct PendingElement *approval_pending_head;
+  struct GNUNET_TIME_Absolute conclude_start;
 
   /**
 
   /**
-   * Elements that have not been approved (or rejected) by the client yet.
+   * Timeout for all rounds together, single rounds will schedule a timeout task
+   * with a fraction of the conclude timeout.
+   * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
    */
    */
-  struct PendingElement *approval_pending_tail;
+  struct GNUNET_TIME_Absolute conclude_deadline;
 
   /**
 
   /**
-   * Messages to be sent to the local client that owns this session
+   * Timeout task identifier for the current round.
    */
    */
-  struct QueuedMessage *client_messages_head;
+  GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
 
   /**
 
   /**
-   * Messages to be sent to the local client that owns this session
+   * Number of other peers in the consensus.
    */
    */
-  struct QueuedMessage *client_messages_tail;
+  unsigned int num_peers;
 
   /**
 
   /**
-   * Currently active transmit handle for sending to the client
+   * Information about the other peers,
+   * their state, etc.
    */
    */
-  struct GNUNET_SERVER_TransmitHandle *th;
+  struct ConsensusPeerInformation *info;
 
   /**
 
   /**
-   * Timeout for all rounds together, single rounds will schedule a timeout task
-   * with a fraction of the conclude timeout.
+   * Index of the local peer in the peers array
    */
    */
-  struct GNUNET_TIME_Relative conclude_timeout;
-  
+  unsigned int local_peer_idx;
+
   /**
   /**
-   * Timeout task identifier for the current round
+   * Current round
    */
    */
-  GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
+  enum ConsensusRound current_round;
 
   /**
 
   /**
-   * Number of other peers in the consensus
+   * Permutation of peers for the current round,
    */
    */
-  unsigned int num_peers;
+  uint32_t *shuffle;
 
   /**
 
   /**
-   * Information about the other peers,
-   * their state, etc.
+   * Inverse permutation of peers for the current round,
    */
    */
-  struct ConsensusPeerInformation *info;
+  uint32_t *shuffle_inv;
 
   /**
 
   /**
-   * Sorted array of peer identities in this consensus session,
-   * includes the local peer.
+   * Current round of the exponential scheme.
    */
    */
-  struct GNUNET_PeerIdentity *peers;
+  uint32_t exp_round;
 
   /**
 
   /**
-   * Index of the local peer in the peers array
+   * Current sub-round of the exponential scheme.
    */
    */
-  int local_peer_idx;
+  uint32_t exp_subround;
 
   /**
 
   /**
-   * Strata estimator, computed online
+   * The partner for the current exp-round
    */
    */
-  struct InvertibleBloomFilter **strata;
+  struct ConsensusPeerInformation *partner_outgoing;
 
   /**
 
   /**
-   * Pre-computed IBFs
+   * The partner for the current exp-round
    */
    */
-  struct InvertibleBloomFilter **ibfs;
+  struct ConsensusPeerInformation *partner_incoming;
 
   /**
 
   /**
-   * Current round
+   * The consensus set of this session.
    */
    */
-  enum ConsensusRound current_round;
+  struct GNUNET_SET_Handle *element_set;
+
+  /**
+   * Listener for requests from other peers.
+   * Uses the session's global id as app id.
+   */
+  struct GNUNET_SET_ListenHandle *set_listener;
 };
 
 
 /**
 };
 
 
 /**
- * Sockets from other peers who want to communicate with us.
- * It may not be known yet which consensus session they belong to.
- * Also, the session might not exist yet locally.
+ * Information about a peer that is in a consensus session.
  */
  */
-struct IncomingSocket
+struct ConsensusPeerInformation
 {
   /**
 {
   /**
-   * Incoming sockets are kept in a double linked list.
-   */
-  struct IncomingSocket *next;
-
-  /**
-   * Incoming sockets are kept in a double linked list.
-   */
-  struct IncomingSocket *prev;
-
-  /**
-   * The actual socket.
+   * Peer identitty of the peer in the consensus session
    */
    */
-  struct GNUNET_STREAM_Socket *socket;
+  struct GNUNET_PeerIdentity peer_id;
 
   /**
 
   /**
-   * Handle for currently active read
+   * Back-reference to the consensus session,
+   * to that ConsensusPeerInformation can be used as a closure
    */
    */
-  struct GNUNET_STREAM_ReadHandle *rh;
+  struct ConsensusSession *session;
 
   /**
 
   /**
-   * Peer that connected to us with the socket.
+   * We have finishes the exp-subround with the peer.
    */
    */
-  struct GNUNET_PeerIdentity *peer;
+  int exp_subround_finished;
 
   /**
 
   /**
-   * Message stream tokenizer for this socket.
+   * Set operation we are currently executing with this peer.
    */
    */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  struct GNUNET_SET_OperationHandle *set_op;
 
   /**
 
   /**
-   * Peer-in-session this socket belongs to, once known, otherwise NULL.
+   * Set operation we are planning on executing with this peer.
    */
    */
-  struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *delayed_set_op;
 
   /**
 
   /**
-   * Set to the global session id, if the peer sent us a hello-message,
-   * but the session does not exist yet.
-   *
-   * FIXME: not implemented yet
+   * Info about the round of the delayed set operation.
    */
    */
-  struct GNUNET_HashCode *requested_gid;
+  struct RoundInfo delayed_round_info;
 };
 
 
 };
 
 
-static struct IncomingSocket *incoming_sockets_head;
-static struct IncomingSocket *incoming_sockets_tail;
-
 /**
 /**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
  */
 static struct ConsensusSession *sessions_head;
 
 /**
  */
 static struct ConsensusSession *sessions_head;
 
 /**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
  */
 static struct ConsensusSession *sessions_tail;
 
  */
 static struct ConsensusSession *sessions_tail;
 
@@ -475,1661 +283,853 @@ static struct GNUNET_SERVER_Handle *srv;
 /**
  * Peer that runs this service.
  */
 /**
  * Peer that runs this service.
  */
-static struct GNUNET_PeerIdentity *my_peer;
-
-/**
- * Handle to the core service. Only used during service startup, will be NULL after that.
- */
-static struct GNUNET_CORE_Handle *core;
-
-/**
- * Listener for sockets from peers that want to reconcile with us.
- */
-static struct GNUNET_STREAM_ListenSocket *listener;
+static struct GNUNET_PeerIdentity my_peer;
 
 
 
 
-/**
- * Queue a message to be sent to the inhabiting client of a session.
- *
- * @param session session
- * @param msg message we want to queue
- */
-static void
-queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
-{
-  struct QueuedMessage *qm;
-  qm = GNUNET_malloc (sizeof *qm);
-  qm->msg = msg;
-  GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
-}
-
-/**
- * Get peer index associated with the peer information,
- * unique for every session among all peers.
- */
 static int
 static int
-get_cpi_index (struct ConsensusPeerInformation *cpi)
-{
-  return cpi - cpi->session->info;
-}
-
-/**
- * Mark the peer as bad, free state we don't need anymore.
- *
- * @param cpi consensus peer information of the bad peer
- */
-static void
-mark_peer_bad (struct ConsensusPeerInformation *cpi)
+have_exp_subround_finished (const struct ConsensusSession *session)
 {
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi));
-  cpi->is_bad = GNUNET_YES;
-  /* FIXME: free ibfs etc. */
+  int not_finished;
+  not_finished = 0;
+  if ( (NULL != session->partner_outgoing) &&
+       (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
+    not_finished++;
+  if ( (NULL != session->partner_incoming) &&
+       (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
+    not_finished++;
+  if (0 == not_finished)
+    return GNUNET_YES;
+  return GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Estimate set difference with two strata estimators,
- * i.e. arrays of IBFs.
- * Does not not modify its arguments.
+ * Destroy a session, free all resources associated with it.
  *
  *
- * @param strata1 first strata estimator
- * @param strata2 second strata estimator
- * @return the estimated difference
+ * @param session the session to destroy
  */
  */
-static int
-estimate_difference (struct InvertibleBloomFilter** strata1,
-                     struct InvertibleBloomFilter** strata2)
+static void
+destroy_session (struct ConsensusSession *session)
 {
   int i;
 {
   int i;
-  int count;
-  count = 0;
-  for (i = STRATA_COUNT - 1; i >= 0; i--)
+
+  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+  if (NULL != session->element_set)
+  {
+    GNUNET_SET_destroy (session->element_set);
+    session->element_set = NULL;
+  }
+  if (NULL != session->set_listener)
+  {
+    GNUNET_SET_listen_cancel (session->set_listener);
+    session->set_listener = NULL;
+  }
+  if (NULL != session->client_mq)
   {
   {
-    struct InvertibleBloomFilter *diff;
-    /* number of keys decoded from the ibf */
-    int ibf_count;
-    int more;
-    ibf_count = 0;
-    /* FIXME: implement this without always allocating new IBFs */
-    diff = ibf_dup (strata1[i]);
-    ibf_subtract (diff, strata2[i]);
-    for (;;)
+    GNUNET_MQ_destroy (session->client_mq);
+    session->client_mq = NULL;
+  }
+  if (NULL != session->client)
+  {
+    GNUNET_SERVER_client_disconnect (session->client);
+    session->client = NULL;
+  }
+  if (NULL != session->shuffle)
+  {
+    GNUNET_free (session->shuffle);
+    session->shuffle = NULL;
+  }
+  if (NULL != session->shuffle_inv)
+  {
+    GNUNET_free (session->shuffle_inv);
+    session->shuffle_inv = NULL;
+  }
+  if (NULL != session->info)
+  {
+    for (i = 0; i < session->num_peers; i++)
     {
     {
-      more = ibf_decode (diff, NULL, NULL);
-      if (GNUNET_NO == more)
+      struct ConsensusPeerInformation *cpi;
+      cpi = &session->info[i];
+      if (NULL != cpi->set_op)
       {
       {
-        count += ibf_count;
-        break;
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
       }
       }
-      if (GNUNET_SYSERR == more)
-      {
-        ibf_destroy (diff);
-        return count * (1 << (i + 1));
-      }
-      ibf_count++;
     }
     }
-    ibf_destroy (diff);
+    GNUNET_free (session->info);
+    session->info = NULL;
   }
   }
-  return count;
+  GNUNET_free (session);
 }
 
 
 /**
 }
 
 
 /**
- * Called when receiving data from a peer that is member of
- * an inhabited consensus session.
+ * Iterator for set elements.
  *
  *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
+ * @param cls closure
+ * @param element the current element, NULL if all elements have been
+ *        iterated over
+ * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
  */
  */
-static size_t
-session_stream_data_processor (void *cls,
-                       enum GNUNET_STREAM_Status status,
-                       const void *data,
-                       size_t size)
+static int
+send_to_client_iter (void *cls,
+                     const struct GNUNET_SET_Element *element)
 {
 {
-  struct ConsensusPeerInformation *cpi;
-  int ret;
+  struct ConsensusSession *session = cls;
+  struct GNUNET_MQ_Envelope *ev;
+
+  if (NULL != element)
+  {
+    struct GNUNET_CONSENSUS_ElementMessage *m;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
+                session->local_peer_idx);
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  cpi = cls;
-  GNUNET_assert (NULL != cpi->mst);
-  ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
-  if (GNUNET_SYSERR == ret)
+    ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+    m->element_type = htons (element->type);
+    memcpy (&m[1], element->data, element->size);
+    GNUNET_MQ_send (session->client_mq, ev);
+  }
+  else
   {
   {
-    /* FIXME: handle this correctly */
-    GNUNET_assert (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
+                session->local_peer_idx);
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
+    GNUNET_MQ_send (session->client_mq, ev);
   }
   }
-  /* read again */
-  cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                &session_stream_data_processor, cpi);
-  /* we always read all data */
-  return size;
+  return GNUNET_YES;
 }
 
 }
 
+
 /**
 /**
- * Called when we receive data from a peer that is not member of
- * a session yet, or the session is not yet inhabited.
+ * Start the next round.
+ * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
  *
  *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
+ * @param cls the session
+ * @param tc task context, for when this task is invoked by the scheduler,
+ *           NULL if invoked for another reason
  */
  */
-static size_t
-incoming_stream_data_processor (void *cls,
-                       enum GNUNET_STREAM_Status status,
-                       const void *data,
-                       size_t size)
+static void
+round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 {
-  struct IncomingSocket *incoming;
-  int ret;
+  struct ConsensusSession *session;
+
+  /* don't kick off next round if we're shutting down */
+  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+
+  session = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
+
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
+  }
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  incoming = cls;
-  ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
-  if (GNUNET_SYSERR == ret)
+  switch (session->current_round)
   {
   {
-    /* FIXME: handle this correctly */
-    GNUNET_assert (0);
+    case CONSENSUS_ROUND_BEGIN:
+      session->current_round = CONSENSUS_ROUND_EXCHANGE;
+      session->exp_round = 0;
+      subround_over (session, NULL);
+      break;
+    case CONSENSUS_ROUND_EXCHANGE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
+                  session->local_peer_idx);
+      session->current_round = CONSENSUS_ROUND_FINISH;
+      GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
+      break;
+    default:
+      GNUNET_assert (0);
   }
   }
-  /* read again */
-  incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &incoming_stream_data_processor, incoming);
-  /* we always read all data */
-  return size;
 }
 
 
 /**
 }
 
 
 /**
- * Iterator over hash map entries.
- * Queue elements to be sent to the peer in cls.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
  *
  *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param session the session to create the new permutation for
  */
  */
-static int
-send_element_iter (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
+static void
+shuffle (struct ConsensusSession *session)
 {
 {
-  struct ConsensusPeerInformation *cpi;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct QueuedMessage *qm;
-  struct GNUNET_MessageHeader *element_msg;
-  size_t msize;
-  cpi = cls;
-  element = value;
-  msize = sizeof (struct GNUNET_MessageHeader) + element->size;
-  element_msg = GNUNET_malloc (msize);
-  element_msg->size = htons (msize);
-  if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
-    element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
-  else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
-    element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE);
-  else
-    GNUNET_assert (0);
-  GNUNET_assert (NULL != element->data);
-  memcpy (&element_msg[1], element->data, element->size);
-  qm = GNUNET_malloc (sizeof *qm);
-  qm->msg = element_msg;
-  GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
-  return GNUNET_YES;
-}
+  uint32_t i;
+  uint32_t randomness[session->num_peers-1];
 
 
-/**
- * Iterator to insert values into an ibf.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-ibf_values_iterator (void *cls,
-                     const struct GNUNET_HashCode *key,
-                     void *value)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
-  return GNUNET_YES;
+  if (NULL == session->shuffle)
+    session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
+  if (NULL == session->shuffle_inv)
+    session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
+
+  GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
+                    &session->exp_round, sizeof (uint32_t),
+                     &session->global_id, sizeof (struct GNUNET_HashCode),
+                    NULL);
+
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle[i] = i;
+
+  for (i = session->num_peers - 1; i > 0; i--)
+  {
+    uint32_t x;
+    uint32_t tmp;
+    x = randomness[i-1] % session->num_peers;
+    tmp = session->shuffle[x];
+    session->shuffle[x] = session->shuffle[i];
+    session->shuffle[i] = tmp;
+  }
+
+  /* create the inverse */
+  for (i = 0; i < session->num_peers; i++)
+    session->shuffle_inv[session->shuffle[i]] = i;
 }
 
 }
 
+
 /**
 /**
- * Create and populate an IBF for the specified peer,
- * if it does not already exist.
+ * Find and set the partner_incoming and partner_outgoing of our peer,
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
  *
  *
- * @param peer to create the ibf for
+ * @param session the consensus session
  */
 static void
  */
 static void
-prepare_ibf (struct ConsensusPeerInformation *cpi)
+find_partners (struct ConsensusSession *session)
 {
 {
-  if (NULL == cpi->session->ibfs[cpi->ibf_order])
+  unsigned int arc;
+  unsigned int num_ghosts;
+  unsigned int largest_arc;
+  int partner_idx;
+
+  /* shuffled local index */
+  int my_idx = session->shuffle[session->local_peer_idx];
+
+  /* distance to neighboring peer in current subround */
+  arc = 1 << session->exp_subround;
+  largest_arc = 1;
+  while (largest_arc < session->num_peers)
+    largest_arc <<= 1;
+  num_ghosts = largest_arc - session->num_peers;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
+
+  if (0 == (my_idx & arc))
   {
   {
-    cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
-    GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
+    /* we are outgoing */
+    partner_idx = (my_idx + arc) % session->num_peers;
+    session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
+    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+    /* are we a 'ghost' of a peer that would exist if
+     * the number of peers was a power of two, and thus have to partner
+     * with an additional peer?
+     */
+    if (my_idx < num_ghosts)
+    {
+      int ghost_partner_idx;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
+      ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+      /* platform dependent; modulo sometimes returns negative values */
+      if (ghost_partner_idx < 0)
+        ghost_partner_idx += session->num_peers;
+      /* we only need to have a ghost partner if the partner is outgoing */
+      if (0 == (ghost_partner_idx & arc))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
+        session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
+        session->partner_incoming->exp_subround_finished = GNUNET_NO;
+        return;
+      }
+    }
+    session->partner_incoming = NULL;
+    return;
   }
   }
+  /* we only have an incoming connection */
+  partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+  if (partner_idx < 0)
+    partner_idx += session->num_peers;
+  session->partner_outgoing = NULL;
+  session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
+  session->partner_incoming->exp_subround_finished = GNUNET_NO;
 }
 
 
 /**
 }
 
 
 /**
- * Called when a remote peer wants to inform the local peer
- * that the remote peer misses elements.
- * Elements are not reconciled.
+ * Callback for set operation results. Called for each element
+ * in the result set.
  *
  *
- * @param cpi session
- * @param msg message
+ * @param cls closure
+ * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param status see enum GNUNET_SET_Status
  */
  */
-static int
-handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
+static void
+set_result_cb (void *cls,
+               const struct GNUNET_SET_Element *element,
+               enum GNUNET_SET_Status status)
 {
 {
-  uint64_t key;
-  key = *(uint64_t *) &msg[1];
-  GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key);
-  return GNUNET_OK;
-}
+  struct ConsensusPeerInformation *cpi = cls;
+  unsigned int remote_idx = cpi - cpi->session->info;
+  unsigned int local_idx = cpi->session->local_peer_idx;
 
 
+  GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
+                 (cpi == cpi->session->partner_incoming));
 
 
-/**
- * Called when a remote peer wants to inform the local peer
- * that the local peer misses elements.
- * Elements are not reconciled.
- *
- * @param cpi session
- * @param msg message
- */
-static int
-handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
-{
-  uint64_t key;
-  key = *(uint64_t *) &msg[1];
-  GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key);
-  return GNUNET_OK;
-}
-
-
-/**
- * Called when a peer sends us its strata estimator.
- * In response, we sent out IBF of appropriate size back.
- *
- * @param cpi session
- * @param strata_msg message
- */
-static int
-handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
-{
-  int i;
-  unsigned int diff;
-  void *buf;
-  size_t size;
-
-  GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
-
-  if (NULL == cpi->strata)
+  switch (status)
   {
   {
-    cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
-    for (i = 0; i < STRATA_COUNT; i++)
-      cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
-  }
-
-  size = ntohs (strata_msg->header.size);
-  buf = (void *) &strata_msg[1];
-  for (i = 0; i < STRATA_COUNT; i++)
-  {
-    int res;
-    res = ibf_read (&buf, &size, cpi->strata[i]);
-    GNUNET_assert (GNUNET_OK == res);
-  }
-
-  diff = estimate_difference (cpi->session->strata, cpi->strata);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff);
-
-  if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) ||
-       (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round))
-  {
-    /* send IBF of the right size */
-    cpi->ibf_order = 0;
-    while ((1 << cpi->ibf_order) < diff)
-      cpi->ibf_order++;
-    if (cpi->ibf_order > MAX_IBF_ORDER)
-      cpi->ibf_order = MAX_IBF_ORDER;
-    cpi->ibf_order += 1;
-    /* create ibf if not already pre-computed */
-    prepare_ibf (cpi);
-    cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-    cpi->ibf_state = IBF_STATE_TRANSMITTING;
-    cpi->ibf_bucket_counter = 0;
-    write_ibf (cpi, GNUNET_STREAM_OK, 0);
-  }
-
-  return GNUNET_YES;
-}
-
-
-static int
-handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
-{
-  int num_buckets;
-  void *buf;
-
-  num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
-  switch (cpi->ibf_state)
-  {
-    case IBF_STATE_NONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order);
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
-      {
-        GNUNET_free (cpi->ibf);
-        cpi->ibf = NULL;
-      }
+    case GNUNET_SET_STATUS_OK:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
+                  local_idx, remote_idx);
       break;
       break;
-    case IBF_STATE_ANTICIPATE_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order);
-      cpi->ibf_state = IBF_STATE_RECEIVING;
-      cpi->ibf_order = digest->order;
-      cpi->ibf_bucket_counter = 0;
-      if (NULL != cpi->ibf)
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
+                  local_idx, remote_idx);
+      cpi->set_op = NULL;
+      return;
+    case GNUNET_SET_STATUS_HALF_DONE:
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
+                  local_idx, remote_idx);
+      cpi->exp_subround_finished = GNUNET_YES;
+      cpi->set_op = NULL;
+      if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
       {
       {
-        ibf_destroy (cpi->ibf);
-        cpi->ibf = NULL;
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
+                    local_idx);
+        subround_over (cpi->session, NULL);
       }
       }
-      break;
-    case IBF_STATE_RECEIVING:
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state);
-      mark_peer_bad (cpi);
-      return GNUNET_NO;
-  }
-
-  if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
-    mark_peer_bad (cpi);
-    return GNUNET_NO;
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
-              cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
-
-  if (NULL == cpi->ibf)
-    cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
-
-  buf = (void *) &digest[1];
-  ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
-
-  cpi->ibf_bucket_counter += num_buckets;
-
-  if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
-    cpi->ibf_state = IBF_STATE_DECODING;
-    prepare_ibf (cpi);
-    ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
-    write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0);
-  }
-  return GNUNET_YES;
-}
-
-
-static int
-handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
-{
-  struct PendingElement *pending_element;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
-  size_t size;
-
-  size = ntohs (element_msg->size) - sizeof *element_msg;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
-
-  element = GNUNET_malloc (size + sizeof *element);
-  element->size = size;
-  memcpy (&element[1], &element_msg[1], size);
-  element->data = &element[1];
-
-  pending_element = GNUNET_malloc (sizeof *pending_element);
-  pending_element->element = element;
-  GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
-
-  client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
-  client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
-  client_element_msg->header.size = htons (size + sizeof *client_element_msg);
-  memcpy (&client_element_msg[1], &element[1], size);
-
-  queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
-
-  send_next (cpi->session);
-  
-  return GNUNET_YES;
-}
-
-
-/**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
- */
-static void 
-write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  GNUNET_assert (NULL == cpi->wh);
-  cpi->wh = NULL;
-  if (NULL != cpi->requests_and_elements_head)
-  {
-    struct QueuedMessage *qm;
-    qm = cpi->requests_and_elements_head;
-    GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
-
-    cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
-                                   GNUNET_TIME_UNIT_FOREVER_REL,
-                                   write_requested_elements, cpi);
-    GNUNET_assert (NULL != cpi->wh);
-  }
-}
-
-
-/**
- * Handle a request for elements.
- * Only allowed in exchange-rounds.
- */
-static int
-handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
-{
-  struct GNUNET_HashCode *hashcode;
-  unsigned int num;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n");
-  num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode);
-  hashcode = (struct GNUNET_HashCode *) &msg[1];
-  while (num--)
-  {
-    GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state);
-    GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi);
-    if (NULL == cpi->wh)
-      write_requested_elements (cpi, GNUNET_STREAM_OK, 0);
-    hashcode++;
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Handle a HELLO-message, send when another peer wants to join a session where
- * our peer is a member. The session may or may not be inhabited yet.
- */
-static int
-handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
-{
-  /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
-  struct ConsensusSession *session;
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
-    {
-      int idx;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n");
-      idx = get_peer_idx (inc->peer, session);
-      GNUNET_assert (-1 != idx);
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx);
-      inc->cpi = &session->info[idx];
-      GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
-      inc->cpi->mst = inc->mst;
-      inc->cpi->hello = GNUNET_YES;
-      inc->cpi->socket = inc->socket;
-
-      if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) &&
-           (GNUNET_YES == inc->cpi->is_outgoing))
+      else
       {
       {
-        write_strata (&session->info[idx], GNUNET_STREAM_OK, 0);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
+                    local_idx);
       }
       }
-      return GNUNET_YES;
-    }
-    session = session->next;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n");
-  GNUNET_break (0);
-  return GNUNET_NO;
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete message is received by the tokenizer.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure
- * @param client identification of the client
- * @param message the actual message
- *
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi =  cls;
-  switch (ntohs (message->type))
-  {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
-      return handle_p2p_strata (cpi, (struct StrataMessage *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
-      return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
-      return handle_p2p_element (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL:
-      return handle_p2p_missing_local (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE:
-      return handle_p2p_missing_remote (cpi, message);
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
-      return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
+      return;
     default:
     default:
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
-      /* FIXME: handle correctly */
-      GNUNET_assert (0);
+      GNUNET_break (0);
+      return;
   }
   }
-  return GNUNET_OK;
-}
-
 
 
-/**
- * Handle tokenized messages from stream sockets.
- * Delegate them if the socket belongs to a session,
- * handle hello messages otherwise.
- *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure, unused
- * @param client incoming socket this message comes from
- * @param message the actual message
- *
- * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
- */
-static int
-mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
-{
-  struct IncomingSocket *inc;
-  inc = (struct IncomingSocket *) client;
-  switch (ntohs( message->type))
+  switch (cpi->session->current_round)
   {
   {
-    case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
-      return handle_p2p_hello (inc, (struct ConsensusHello *) message);
+    case CONSENSUS_ROUND_EXCHANGE:
+      GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
+      break;
     default:
     default:
-      if (NULL != inc->cpi)
-        return mst_session_callback (inc->cpi, client, message);
-      /* FIXME: disconnect peer properly */
-      GNUNET_assert (0);
+      GNUNET_break (0);
+      return;
   }
   }
-  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Functions of this type are called upon new stream connection from other peers
- * or upon binding error which happen when the app_port given in
- * GNUNET_STREAM_listen() is already taken.
+ * Compare the round the session is in with the round of the given context message.
  *
  *
- * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
- * @param initiator the identity of the peer who wants to establish a stream
- *            with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- *             stream (the socket will be invalid after the call)
+ * @param session a consensus session
+ * @param ri a round context message
+ * @return 0 if it's the same round, -1 if the session is in an earlier round,
+ *         1 if the session is in a later round
  */
 static int
  */
 static int
-listen_cb (void *cls,
-           struct GNUNET_STREAM_Socket *socket,
-           const struct GNUNET_PeerIdentity *initiator)
-{
-  struct IncomingSocket *incoming;
-
-  GNUNET_assert (NULL != socket);
-
-  incoming = GNUNET_malloc (sizeof *incoming);
-
-  incoming->socket = socket;
-  incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
-
-  incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     &incoming_stream_data_processor, incoming);
-
-  incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
-
-  GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
-
-  return GNUNET_OK;
-}
-
-
-/**
- * Destroy a session, free all resources associated with it.
- * 
- * @param session the session to destroy
- */
-static void
-destroy_session (struct ConsensusSession *session)
+rounds_compare (struct ConsensusSession *session,
+                struct RoundInfo* ri)
 {
 {
-  /* FIXME: more stuff to free! */
-  GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
-  GNUNET_SERVER_client_drop (session->client);
-  GNUNET_free (session);
-}
-
-
-/**
- * Disconnect a client, and destroy all sessions associated with it.
- *
- * @param client the client to disconnect
- */
-static void
-disconnect_client (struct GNUNET_SERVER_Client *client)
-{
-  struct ConsensusSession *session;
-  GNUNET_SERVER_client_disconnect (client);
-  
-  /* if the client owns a session, remove it */
-  session = sessions_head;
-  while (NULL != session)
+  if (session->current_round < ri->round)
+    return -1;
+  if (session->current_round > ri->round)
+    return 1;
+  if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
   {
   {
-    if (client == session->client)
-    {
-      destroy_session (session);
-      break;
-    }
-    session = session->next;
+    if (session->exp_round < ri->exp_round)
+      return -1;
+    if (session->exp_round > ri->exp_round)
+      return 1;
+    if (session->exp_subround < ri->exp_subround)
+      return -1;
+    if (session->exp_subround < ri->exp_subround)
+      return 1;
+    return 0;
   }
   }
+  /* comparing rounds when we are not in a exp round */
+  GNUNET_assert (0);
 }
 
 
 /**
 }
 
 
 /**
- * Compute a global, (hopefully) unique consensus session id,
- * from the local id of the consensus session, and the identities of all participants.
- * Thus, if the local id of two consensus sessions coincide, but are not comprised of
- * exactly the same peers, the global id will be different.
+ * Do the next subround in the exp-scheme.
+ * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
  *
  *
- * @param local_id local id of the consensus session
- * @param peers array of all peers participating in the consensus session
- * @param num_peers number of elements in the peers array
- * @param dst where the result is stored, may not be NULL
+ * @param cls the session
+ * @param tc task context, for when this task is invoked by the scheduler,
+ *           NULL if invoked for another reason
  */
 static void
  */
 static void
-compute_global_id (const struct GNUNET_HashCode *local_id,
-                   const struct GNUNET_PeerIdentity *peers, int num_peers, 
-                   struct GNUNET_HashCode *dst)
-{
-  int i;
-  struct GNUNET_HashCode tmp;
-
-  *dst = *local_id;
-  for (i = 0; i < num_peers; ++i)
-  {
-    GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
-    *dst = tmp;
-    GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
-    *dst = tmp;
-  }
-}
-
-
-/**
- * Transmit a queued message to the session's client.
- *
- * @param cls consensus session
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_queued (void *cls, size_t size,
-                 void *buf)
+subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  struct QueuedMessage *qmsg;
-  size_t msg_size;
+  int i;
 
 
+  /* don't kick off next subround if we're shutting down */
+  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
   session = cls;
   session = cls;
-  session->th = NULL;
-
-  qmsg = session->client_messages_head;
-  GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
-  GNUNET_assert (qmsg);
-
-  if (NULL == buf)
+  /* cancel timeout */
+  if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
   {
   {
-    destroy_session (session);
-    return 0;
+    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
+    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
 
   }
 
-  msg_size = ntohs (qmsg->msg->size);
-
-  GNUNET_assert (size >= msg_size);
-
-  memcpy (buf, qmsg->msg, msg_size);
-  GNUNET_free (qmsg->msg);
-  GNUNET_free (qmsg);
-
-  send_next (session);
-
-  return msg_size;
-}
-
-
-/**
- * Schedule transmitting the next queued message (if any) to a client.
- *
- * @param cli the client to send the next message to
- */
-static void
-send_next (struct ConsensusSession *session)
-{
-
-  GNUNET_assert (NULL != session);
-
-  if (NULL != session->th)
-    return;
-
-  if (NULL != session->client_messages_head)
+  if (session->exp_round >= NUM_EXP_ROUNDS)
   {
   {
-    int msize;
-    msize = ntohs (session->client_messages_head->msg->size);
-    session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 
-                                                       GNUNET_TIME_UNIT_FOREVER_REL,
-                                                       &transmit_queued, session);
-  }
-}
-
-
-/**
- * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
- * the correct signature to be used with e.g. qsort.
- * We use this function instead.
- *
- * @param h1 some hash code
- * @param h2 some hash code
- * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
- */
-static int
-hash_cmp (const void *a, const void *b)
-{
-  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
-}
-
-
-/**
- * Search peer in the list of peers in session.
- *
- * @param peer peer to find
- * @param session session with peer
- * @return index of peer, -1 if peer is not in session
- */
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
-{
-  const struct GNUNET_PeerIdentity *needle;
-  needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
-  if (NULL == needle)
-    return -1;
-  return needle - session->peers;
-}
-
-
-/**
- * Called when stream has finishes writing the hello message
- */
-static void
-hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
-
-  cpi = cls;
-  cpi->wh = NULL;
-  cpi->hello = GNUNET_YES;
-  
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-
-  /* FIXME: other rounds */
-
-  if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) &&
-       (GNUNET_YES == cpi->is_outgoing))
-  {
-    write_strata (cpi, GNUNET_STREAM_OK, 0);
+    round_over (session, NULL);
+    return;
   }
   }
-}
 
 
-
-/**
- * Called when we established a stream connection to another peer
- *
- * @param cls cpi of the peer we just connected to
- * @param socket socket to use to communicate with the other side (read/write)
- */
-static void
-open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
-{
-  struct ConsensusPeerInformation *cpi;
-  struct ConsensusHello *hello;
-
-  cpi = cls;
-  cpi->is_connected = GNUNET_YES;
-  cpi->wh = NULL;
-
-  hello = GNUNET_malloc (sizeof *hello);
-  hello->header.size = htons (sizeof *hello);
-  hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
-  memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
-
-  cpi->wh =
-      GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
-
-  cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
-                                &session_stream_data_processor, cpi);
-}
-
-
-static void
-initialize_session_info (struct ConsensusSession *session)
-{
-  int i;
-  int last;
-
-  for (i = 0; i < session->num_peers; ++i)
+  if (session->exp_round == 0)
   {
   {
-    /* initialize back-references, so consensus peer information can
-     * be used as closure */
-    session->info[i].session = session;
+    /* initialize everything for the log-rounds */
+    session->exp_round = 1;
+    session->exp_subround = 0;
+    if (NULL == session->shuffle)
+      session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
+    if (NULL == session->shuffle_inv)
+      session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
+    for (i = 0; i < session->num_peers; i++)
+      session->shuffle[i] = session->shuffle_inv[i] = i;
   }
   }
-
-  session->current_round = CONSENSUS_ROUND_BEGIN;
-
-  last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
-  i = (session->local_peer_idx + 1) % session->num_peers;
-  while (i != last)
+  else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
   {
-    session->info[i].is_outgoing = GNUNET_YES;
-    session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                                  open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
-    session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
-    i = (i + 1) % session->num_peers;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
+    /* subrounds done, start new log-round */
+    session->exp_round++;
+    session->exp_subround = 0;
+    shuffle (session);
   }
   }
-  // tie-breaker for even number of peers
-  if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
+  else
   {
   {
-    session->info[last].is_outgoing = GNUNET_YES;
-    session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                                     open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
-    session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
-
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
+    session->exp_subround++;
   }
   }
-}
-
-
-/**
- * Create the sorted list of peers for the session,
- * add the local peer if not in the join message.
- */
-static void
-initialize_session_peer_list (struct ConsensusSession *session)
-{
-  int local_peer_in_list;
-  int listed_peers;
-  const struct GNUNET_PeerIdentity *msg_peers;
-  unsigned int i;
 
 
-  GNUNET_assert (NULL != session->join_msg);
+  /* determine the incoming and outgoing partner */
+  find_partners (session);
 
 
-  /* peers in the join message, may or may not include the local peer */
-  listed_peers = ntohs (session->join_msg->num_peers);
-  
-  session->num_peers = listed_peers;
+  GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
+  GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
 
 
-  msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
-
-  local_peer_in_list = GNUNET_NO;
-  for (i = 0; i < listed_peers; i++)
+  /* initiate set operation with the outgoing partner */
+  if (NULL != session->partner_outgoing)
   {
   {
-    if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
+    struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+    msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+    msg->header.size = htons (sizeof *msg);
+    msg->round = htonl (session->current_round);
+    msg->exp_round = htonl (session->exp_round);
+    msg->exp_subround = htonl (session->exp_subround);
+
+    if (NULL != session->partner_outgoing->set_op)
     {
     {
-      local_peer_in_list = GNUNET_YES;
-      break;
+      GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
     }
     }
+    session->partner_outgoing->set_op =
+        GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
+                            &session->global_id,
+                            (struct GNUNET_MessageHeader *) msg,
+                            0, /* FIXME: salt */
+                            GNUNET_SET_RESULT_ADDED,
+                            set_result_cb, session->partner_outgoing);
+    GNUNET_free (msg);
+    GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
   }
 
   }
 
-  if (GNUNET_NO == local_peer_in_list)
-    session->num_peers++;
-
-  session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
-
-  if (GNUNET_NO == local_peer_in_list)
-    session->peers[session->num_peers - 1] = *my_peer;
-
-  memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
-  qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
-}
-
-
-static void
-strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
-{
-  uint32_t v;
-  int i;
-  v = key->bits[0];
-  /* count trailing '1'-bits of v */
-  for (i = 0; v & 1; v>>=1, i++)
-    /* empty */;
-  ibf_insert (strata[i], ibf_key_from_hashcode (key));
-}
-
-
-/**
- * Add incoming peer connections to the session,
- * for peers who have connected to us before the local session has been established
- *
- * @param session ...
- */
-static void
-add_incoming_peers (struct ConsensusSession *session)
-{
-  struct IncomingSocket *inc;
-  inc = incoming_sockets_head;
-
-  while (NULL != inc)
+  /* commit to the delayed set operation */
+  if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
   {
   {
-    if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
+    int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
+
+    if (NULL != session->partner_incoming->set_op)
     {
     {
-      int i;
-      for (i = 0; i < session->num_peers; i++)
-      {
-        struct ConsensusPeerInformation *cpi;
-        cpi = &session->info[i];
-        if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
-        {
-          if (GNUNET_YES == cpi->is_outgoing)
-          {
-            /* FIXME: disconnect */
-            continue;
-          }
-          cpi->socket = inc->socket;
-          inc->cpi = cpi;
-          inc->cpi->mst = inc->mst;
-          inc->cpi->hello = GNUNET_YES;
-          break;
-        }
-      }
+      GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
+      session->partner_incoming->set_op = NULL;
     }
     }
-    inc = inc->next;
-  }
-}
-
-
-/**
- * Initialize the session, continue receiving messages from the owning client
- *
- * @param session the session to initialize
- */
-static void
-initialize_session (struct ConsensusSession *session)
-{
-  const struct ConsensusSession *other_session;
-  int i;
-
-  GNUNET_assert (NULL != session->join_msg);
-
-  initialize_session_peer_list (session);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
-
-  compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
-
-  /* Check if some local client already owns the session. */
-  other_session = sessions_head;
-  while (NULL != other_session)
-  {
-    if ((other_session != session) && 
-        (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
+    if (cmp == 0)
     {
     {
-      /* session already owned by another client */
-      GNUNET_break (0);
-      disconnect_client (session->client);
-      return;
+      GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
+      session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
+      session->partner_incoming->delayed_set_op = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
+                  session->local_peer_idx, (int) (session->partner_incoming - session->info));
     }
     }
-    other_session = other_session->next;
-  }
-
-  session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
-
-  session->local_peer_idx = get_peer_idx (my_peer, session);
-  GNUNET_assert (-1 != session->local_peer_idx);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
-
-  session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
-  for (i = 0; i < STRATA_COUNT; i++)
-    session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
-
-  session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
-
-  session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
-  initialize_session_info (session);
-
-  GNUNET_free (session->join_msg);
-  session->join_msg = NULL;
-
-  add_incoming_peers (session);
-
-  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
-}
-
-
-/**
- * Called when a client wants to join a consensus session.
- *
- * @param cls unused
- * @param client client that sent the message
- * @param m message sent by the client
- */
-static void
-client_join (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *m)
-{
-  struct ConsensusSession *session;
-
-  // make sure the client has not already joined a session
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
+    else
     {
     {
-      GNUNET_break (0);
-      disconnect_client (client);
-      return;
+      /* this should not happen -- a round has been skipped! */
+      GNUNET_break_op (0);
     }
     }
-    session = session->next;
   }
 
   }
 
-  session = GNUNET_malloc (sizeof (struct ConsensusSession));
-  session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
-  session->client = client;
-  GNUNET_SERVER_client_keep (client);
-
-  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
-
-  // Initialize session later if local peer identity is not known yet.
-  if (NULL == my_peer)
+#ifdef GNUNET_EXTRA_LOGGING
   {
   {
-    GNUNET_SERVER_disable_receive_done_warning (client);
-    return;
+    int in;
+    int out;
+    if (session->partner_outgoing == NULL)
+      out = -1;
+    else
+      out = (int) (session->partner_outgoing - session->info);
+    if (session->partner_incoming == NULL)
+      in = -1;
+    else
+      in = (int) (session->partner_incoming - session->info);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
+                session->exp_round, session->exp_subround, in, out);
   }
   }
+#endif /* GNUNET_EXTRA_LOGGING */
 
 
-  initialize_session (session);
 }
 
 
 /**
 }
 
 
 /**
- * Hash a block of data, producing a replicated ibf hash.
+ * Search peer in the list of peers in session.
+ *
+ * @param peer peer to find
+ * @param session session with peer
+ * @return index of peer, -1 if peer is not in session
  */
  */
-static void
-hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
+static int
+get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
 {
 {
-  struct IBF_Key ibf_key;
-  GNUNET_CRYPTO_hash (block, size, ret);
-  ibf_key = ibf_key_from_hashcode (ret);
-  ibf_hashcode_from_key (ibf_key, ret);
+  int i;
+  for (i = 0; i < session->num_peers; i++)
+    if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
+      return i;
+  return -1;
 }
 
 
 /**
 }
 
 
 /**
- * Called when a client performs an insert operation.
+ * Compute a global, (hopefully) unique consensus session id,
+ * from the local id of the consensus session, and the identities of all participants.
+ * Thus, if the local id of two consensus sessions coincide, but are not comprised of
+ * exactly the same peers, the global id will be different.
  *
  *
- * @param cls (unused)
- * @param client client handle
- * @param message message sent by the client
+ * @param session session to generate the global id for
+ * @param session_id local id of the consensus session
  */
  */
-void
-client_insert (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *m)
+static void
+compute_global_id (struct ConsensusSession *session,
+                  const struct GNUNET_HashCode *session_id)
 {
 {
-  struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_ElementMessage *msg;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_HashCode hash;
-  int element_size;
+  int i;
+  struct GNUNET_HashCode tmp;
+  struct GNUNET_HashCode phash;
 
 
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
+  /* FIXME: use kdf? */
 
 
-  if (NULL == session)
+  session->global_id = *session_id;
+  for (i = 0; i < session->num_peers; ++i)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
-    GNUNET_SERVER_client_disconnect (client);
-    return;
+    GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
+    GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
+    session->global_id = tmp;
+    GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
+    session->global_id = tmp;
   }
   }
-
-  msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
-  element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
-
-  element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
-
-  element->type = msg->element_type;
-  element->size = element_size;
-  memcpy (&element[1], &msg[1], element_size);
-  element->data = &element[1];
-
-  GNUNET_assert (NULL != element->data);
-
-  hash_for_ibf (element, element_size, &hash);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash));
-
-  GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  strata_insert (session->strata, &hash);
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
-  send_next (session);
-}
-
-
-/**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
- */
-static void 
-write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  cpi->wh = NULL;
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  /* just wait for the ibf */
 }
 
 
 /**
 }
 
 
 /**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
+ * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
+ * the correct signature to be used with e.g. qsort.
+ * We use this function instead.
  *
  *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
+ * @param h1 some hash code
+ * @param h2 some hash code
+ * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
  */
  */
-static void 
-write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
-  struct ConsensusPeerInformation *cpi;
-  struct StrataMessage *strata_msg;
-  void *buf;
-  size_t msize;
-  int i;
-
-  cpi = cls;
-  cpi->wh = NULL;
-
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-
-  GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
-
-  /* FIXME: handle this */
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-
-  msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
-
-  strata_msg = GNUNET_malloc (msize);
-  strata_msg->header.size = htons (msize);
-  strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
-  
-  buf = &strata_msg[1];
-  for (i = 0; i < STRATA_COUNT; i++)
-  {
-    ibf_write (cpi->session->strata[i], &buf, NULL);
-  }
-
-  cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
-                                 write_strata_done, cpi);
-
-  GNUNET_assert (NULL != cpi->wh);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
-}
-
-
-static void 
-write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+static int
+hash_cmp (const void *h1, const void *h2)
 {
 {
-  struct ConsensusPeerInformation *cpi;
-  cpi = cls;
-  cpi->wh = NULL;
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n");
+  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
 }
 
 
 /**
 }
 
 
 /**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
+ * Create the sorted list of peers for the session,
+ * add the local peer if not in the join message.
  */
  */
-static void 
-write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+static void
+initialize_session_peer_list (struct ConsensusSession *session,
+                              struct GNUNET_CONSENSUS_JoinMessage *join_msg)
 {
 {
-  struct ConsensusPeerInformation *cpi;
-  struct DifferenceDigest *digest;
-  int msize;
-  int num_buckets;
-  void *buf;
+  unsigned int local_peer_in_list;
+  uint32_t listed_peers;
+  const struct GNUNET_PeerIdentity *msg_peers;
+  struct GNUNET_PeerIdentity *peers;
+  unsigned int i;
 
 
-  cpi = cls;
-  cpi->wh = NULL;
+  GNUNET_assert (NULL != join_msg);
 
 
-  GNUNET_assert (GNUNET_STREAM_OK == status);
-  GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
+  /* peers in the join message, may or may not include the local peer */
+  listed_peers = ntohl (join_msg->num_peers);
 
 
-  /* we should not be done here! */
-  GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order));
+  session->num_peers = listed_peers;
 
 
-  /* remaining buckets */
-  num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
+  msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
 
 
-  /* limit to maximum */
-  if (num_buckets > BUCKETS_PER_MESSAGE)
-    num_buckets = BUCKETS_PER_MESSAGE;
+  local_peer_in_list = GNUNET_NO;
+  for (i = 0; i < listed_peers; i++)
+  {
+    if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
+    {
+      local_peer_in_list = GNUNET_YES;
+      break;
+    }
+  }
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order));
+  if (GNUNET_NO == local_peer_in_list)
+    session->num_peers++;
 
 
-  msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
+  peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
 
 
-  digest = GNUNET_malloc (msize);
-  digest->header.size = htons (msize);
-  digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
-  digest->order = cpi->ibf_order;
+  if (GNUNET_NO == local_peer_in_list)
+    peers[session->num_peers - 1] = my_peer;
 
 
-  buf = &digest[1];
-  ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
+  memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
+  qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
 
 
-  cpi->ibf_bucket_counter += num_buckets;
+  session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
 
 
-  /* we have to set the new state here, because of non-deterministic schedulung */
-  if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
-    /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
-    cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
-    cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi);
-  }
-  else
+  for (i = 0; i < session->num_peers; ++i)
   {
   {
-    cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi);
+    /* initialize back-references, so consensus peer information can
+     * be used as closure */
+    session->info[i].session = session;
+    session->info[i].peer_id = peers[i];
   }
 
   }
 
-  GNUNET_assert (NULL != cpi->wh);
+  GNUNET_free (peers);
 }
 
 
 /**
 }
 
 
 /**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
+ * Called when another peer wants to do a set operation with the
+ * local peer.
  *
  *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_STREAM_OK if writing to stream was completed successfully;
- *          GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- *          (this doesn't mean that the data is never sent, the receiver may
- *          have read the data but its ACKs may have been lost);
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- *          be processed.
- * @param size the number of bytes written
+ * @param cls closure
+ * @param other_peer the other peer
+ * @param context_msg message with application specific information from
+ *        the other peer
+ * @param request request from the other peer, use GNUNET_SET_accept
+ *        to accept it, otherwise the request will be refused
+ *        Note that we don't use a return value here, as it is also
+ *        necessary to specify the set we want to do the operation with,
+ *        whith sometimes can be derived from the context message.
+ *        Also necessary to specify the timeout.
  */
  */
-static void 
-write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+static void
+set_listen_cb (void *cls,
+               const struct GNUNET_PeerIdentity *other_peer,
+               const struct GNUNET_MessageHeader *context_msg,
+               struct GNUNET_SET_Request *request)
 {
 {
+  struct ConsensusSession *session = cls;
+  struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
   struct ConsensusPeerInformation *cpi;
   struct ConsensusPeerInformation *cpi;
-  struct IBF_Key key;
-  struct GNUNET_HashCode hashcode;
-  int side;
-
-  GNUNET_assert (GNUNET_STREAM_OK == status);
+  struct GNUNET_SET_OperationHandle *set_op;
+  struct RoundInfo round_info;
+  int index;
+  int cmp;
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
+  if (NULL == context_msg)
+  {
+    GNUNET_break_op (0);
+    return;
+  }
 
 
-  cpi = cls;
-  GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
-  cpi->wh = NULL;
+  index = get_peer_idx (other_peer, session);
 
 
-  if (NULL != cpi->requests_and_elements_head)
+  if (index < 0)
   {
   {
-    struct QueuedMessage *qm;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n");
-    qm = cpi->requests_and_elements_head;
-    GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
-
-    cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
-                                   GNUNET_TIME_UNIT_FOREVER_REL,
-                                   write_requests_and_elements, cpi);
-    GNUNET_assert (NULL != cpi->wh);
-    /* some elements / requests have queued up, we have to transmit them first */
+    GNUNET_break_op (0);
     return;
   }
 
     return;
   }
 
-  for (;;)
+  round_info.round = ntohl (msg->round);
+  round_info.exp_round = ntohl (msg->exp_round);
+  round_info.exp_subround = ntohl (msg->exp_subround);
+
+  cpi = &session->info[index];
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
+
+  switch (session->current_round)
   {
   {
-    int res;
-    res = ibf_decode (cpi->ibf, &side, &key);
-    if (GNUNET_SYSERR == res)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
-      /* decoding failed, we tell the other peer by sending our ibf with a larger order */
-      cpi->ibf_order++;
-      prepare_ibf (cpi);
-      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-      cpi->ibf_state = IBF_STATE_TRANSMITTING;
-      cpi->ibf_bucket_counter = 0;
-      write_ibf (cls, status, size);
-      return;
-    }
-    if (GNUNET_NO == res)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
-      return;
-    }
-    if (-1 == side)
-    {
-      /* we have the element, send it to the other peer */
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n");
-      ibf_hashcode_from_key (key, &hashcode);
-      GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
-      /* send the first message, because we can! */
-      if (NULL != cpi->requests_and_elements_head)
-      {
-        struct QueuedMessage *qm;
-        qm = cpi->requests_and_elements_head;
-        GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n");
-        cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
-                                       GNUNET_TIME_UNIT_FOREVER_REL,
-                                       write_requests_and_elements, cpi);
-        GNUNET_assert (NULL != cpi->wh);
-      }
-      else
+    case CONSENSUS_ROUND_BEGIN:
+      /* we're in the begin round, so requests for the exchange round may
+       * come in, they will be delayed for now! */
+    case CONSENSUS_ROUND_EXCHANGE:
+      cmp = rounds_compare (session, &round_info);
+      if (cmp > 0)
       {
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode));
+        /* the other peer is too late */
+        GNUNET_break_op (0);
+        return;
       }
       }
-      return;
-    }
-    else
-    {
-      struct ElementRequest *msg;
-      size_t msize;
-      struct IBF_Key *p;
-
-      msize = (sizeof *msg) + sizeof (uint64_t);
-      msg = GNUNET_malloc (msize);
-      if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
+      /* kill old request, if any. this is legal,
+       * as the other peer would not make a new request if it would want to
+       * complete the old one! */
+      if (NULL != cpi->set_op)
       {
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n");
-        msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
       }
       }
-      else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
+      set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                                       set_result_cb, &session->info[index]);
+      if (cmp == 0)
       {
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n");
-        msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL);
+        cpi->set_op = set_op;
+        GNUNET_SET_commit (set_op, session->element_set);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
       }
       else
       {
       }
       else
       {
-        GNUNET_assert (0);
+        /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
+        cpi->delayed_set_op = set_op;
+        cpi->delayed_round_info = round_info;
+        cpi->exp_subround_finished = GNUNET_YES;
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
       }
       }
-      msg->header.size = htons (msize);
-      p = (struct IBF_Key *) &msg[1];
-      *p = key;
-
-      cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
-                                     write_requests_and_elements, cpi);
-      GNUNET_assert (NULL != cpi->wh);
-      GNUNET_free (msg);
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
+                  session->local_peer_idx, session->current_round, index);
+      GNUNET_break_op (0);
       return;
       return;
-    }
   }
 }
 
 
   }
 }
 
 
-static double
-compute_similarity (struct ConsensusSession *session, int p1, int p2)
-{
-  /* FIXME: simplistic dummy implementation, use real set union/intersecion */
-  return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) /
-      ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1));
-}
-
-
+/**
+ * Initialize the session, continue receiving messages from the owning client
+ *
+ * @param session the session to initialize
+ * @param join_msg the join message from the client
+ */
 static void
 static void
-select_fittest_group (struct ConsensusSession *session)
+initialize_session (struct ConsensusSession *session,
+                    struct GNUNET_CONSENSUS_JoinMessage *join_msg)
 {
 {
-  /* simplistic implementation: compute the similarity with the latest strata estimator,
-   * rank the results once */
-  struct GNUNET_PeerIdentity *group;
-  double rating[session->num_peers];
-  struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg;
-  size_t msize;
-  int i;
-  int j;
-  /* number of peers in the consensus group */
-  int k;
+  struct ConsensusSession *other_session;
 
 
-  k = ceil(session->num_peers / 3.0) * 2;
-  group = GNUNET_malloc (k * sizeof *group);
+  initialize_session_peer_list (session, join_msg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
+  compute_global_id (session, &join_msg->session_id);
 
 
-  /* do strata subtraction */
-  /* FIXME: we know the real sets, subtract them! */
-  for (i = 0; i < session->num_peers; i++)
+  /* check if some local client already owns the session.
+   * it is only legal to have a session with an existing global id
+   * if all other sessions with this global id are finished.*/
+  other_session = sessions_head;
+  while (NULL != other_session)
   {
   {
-    rating[i] = 0;
-    for (j = 0; j < i; j++)
+    if ((other_session != session) &&
+        (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
     {
     {
-      double sim;
-      sim = compute_similarity (session, i, j);
-      rating[i] += sim;
-      rating[j] += sim;
+      if (CONSENSUS_ROUND_FINISH != other_session->current_round)
+      {
+        GNUNET_break (0);
+        destroy_session (session);
+        return;
+      }
+      break;
     }
     }
-  }
-  for (i = 0; i < k; i++)
-  {
-    int best_idx = 0;
-    for (j = 1; j < session->num_peers; j++)
-      if (rating[j] > rating[best_idx])
-        best_idx = j;
-    rating[best_idx] = -1;
-    group[i] = session->peers[best_idx];
+    other_session = other_session->next;
   }
 
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n");
+  session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
+  session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
 
 
-  msize = sizeof *done_msg + k * sizeof *group;
+  session->local_peer_idx = get_peer_idx (&my_peer, session);
+  GNUNET_assert (-1 != session->local_peer_idx);
+  session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_assert (NULL != session->element_set);
+  session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
+                                             &session->global_id,
+                                             set_listen_cb, session);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
+}
 
 
-  done_msg = GNUNET_malloc (msize);
-  done_msg->header.size = htons (msize);
-  done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
-  memcpy (&done_msg[1], group, k * sizeof *group);
 
 
-  queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg);
-  send_next (session);
+static struct ConsensusSession *
+get_session_by_client (struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
+
+  session = sessions_head;
+  while (NULL != session)
+  {
+    if (session->client == client)
+      return session;
+    session = session->next;
+  }
+  return NULL;
 }
 
 
 /**
 }
 
 
 /**
- * Select and kick off the next round, based on the current round.
- * @param cls the session
- * @param tc task context, for when this task is invoked by the scheduler,
- *           NULL if invoked for another reason
+ * Called when a client wants to join a consensus session.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
  */
  */
-static void 
-round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+client_join (void *cls,
+             struct GNUNET_SERVER_Client *client,
+             const struct GNUNET_MessageHeader *m)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  int i;
-
-  /* don't kick off next round if we're shutting down */
-  if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
-  session = cls;
 
 
-  if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
-  {
-    GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-    session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
 
 
-  for (i = 0; i < session->num_peers; i++)
+  session = get_session_by_client (client);
+  if (NULL != session)
   {
   {
-    if ((NULL != session->info) && (NULL != session->info[i].wh))
-      GNUNET_STREAM_write_cancel (session->info[i].wh);
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
   }
   }
+  session = GNUNET_new (struct ConsensusSession);
+  session->client = client;
+  session->client_mq = GNUNET_MQ_queue_for_server_client (client);
+  GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
+  initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
 
-  switch (session->current_round)
-  {
-    case CONSENSUS_ROUND_BEGIN:
-    {
-      session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
-      session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
-                                                                 round_over, session);
-      for (i = 0; i < session->num_peers; i++)
-      {
-        /* we can only talk to hello'ed peers */
-        if ( (GNUNET_YES == session->info[i].is_outgoing) &&
-             (GNUNET_YES == session->info[i].hello) )
-        {
-          /* kick off transmitting strata by calling the write continuation */
-          write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
-        }
-      }
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value);
-      break;
-    }
-    case CONSENSUS_ROUND_A2A_EXCHANGE:
-    {
-      session->current_round = CONSENSUS_ROUND_A2A_INVENTORY;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n");
-      session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
-                                                                 round_over, session);
-      for (i = 0; i < session->num_peers; i++)
-      {
-        session->info[i].ibf_state = IBF_STATE_NONE;
-        if ( (GNUNET_YES == session->info[i].is_outgoing) &&
-             (GNUNET_YES == session->info[i].hello) )
-        {
-          /* kick off transmitting strata by calling the write continuation */
-          write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
-        }
-      }
-      break;
-    }
-    case CONSENSUS_ROUND_A2A_INVENTORY:
-      /* finally, we are done and select the most fitting group */
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n");
-      session->current_round = CONSENSUS_ROUND_FINISH;
-      select_fittest_group (session);
-      break;
-    default:
-      GNUNET_assert (0);
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
 }
 
 
 /**
 }
 
 
 /**
- * Called when a client performs the conclude operation.
+ * Called when a client performs an insert operation.
  *
  * @param cls (unused)
  * @param client client handle
  *
  * @param cls (unused)
  * @param client client handle
- * @param message message sent by the client
+ * @param m message sent by the client
  */
  */
-static void
-client_conclude (void *cls,
-                 struct GNUNET_SERVER_Client *client,
-                 const struct GNUNET_MessageHeader *message)
+void
+client_insert (void *cls,
+               struct GNUNET_SERVER_Client *client,
+               const struct GNUNET_MessageHeader *m)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
+  struct GNUNET_CONSENSUS_ElementMessage *msg;
+  struct GNUNET_SET_Element *element;
+  ssize_t element_size;
 
 
-  cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
+  session = get_session_by_client (client);
 
 
-  session = sessions_head;
-  while ((session != NULL) && (session->client != client))
-    session = session->next;
   if (NULL == session)
   {
   if (NULL == session)
   {
-    /* client not found */
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
@@ -2137,116 +1137,74 @@ client_conclude (void *cls,
 
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
 
   if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
-    /* client requested conclude twice */
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round);
     GNUNET_break (0);
     GNUNET_break (0);
-    disconnect_client (client);
+    GNUNET_SERVER_client_disconnect (client);
     return;
   }
 
     return;
   }
 
-  session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
-
-  /* the 'begin' round is over, start with the next, real round */
-  round_over (session, NULL);
+  msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
+  element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
+  if (element_size < 0)
+  {
+    GNUNET_break (0);
+    return;
+  }
 
 
+  element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
+  element->type = msg->element_type;
+  element->size = element_size;
+  memcpy (&element[1], &msg[1], element_size);
+  element->data = &element[1];
+  GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
+  GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
-  send_next (session);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
 }
 
 
 /**
 }
 
 
 /**
- * Called when a client sends an ack
+ * Called when a client performs the conclude operation.
  *
  * @param cls (unused)
  * @param client client handle
  * @param message message sent by the client
  */
  *
  * @param cls (unused)
  * @param client client handle
  * @param message message sent by the client
  */
-void
-client_ack (void *cls,
-             struct GNUNET_SERVER_Client *client,
-             const struct GNUNET_MessageHeader *message)
+static void
+client_conclude (void *cls,
+                 struct GNUNET_SERVER_Client *client,
+                 const struct GNUNET_MessageHeader *message)
 {
   struct ConsensusSession *session;
 {
   struct ConsensusSession *session;
-  struct GNUNET_CONSENSUS_AckMessage *msg;
-  struct PendingElement *pending;
-  struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_HashCode key;
-
-  session = sessions_head;
-  while (NULL != session)
-  {
-    if (session->client == client)
-      break;
-  }
 
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
+  session = get_session_by_client (client);
   if (NULL == session)
   {
   if (NULL == session)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
+    /* client not found */
+    GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
-  pending = session->approval_pending_head;
-
-  GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending);
-
-  msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
-
-  if (msg->keep)
+  if (CONSENSUS_ROUND_BEGIN != session->current_round)
   {
   {
-    int i;
-    element = pending->element;
-    hash_for_ibf (element, element->size, &key);
-
-    GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-    strata_insert (session->strata, &key);
-    
-    for (i = 0; i <= MAX_IBF_ORDER; i++)
-    {
-      if (NULL != session->ibfs[i])
-        ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
-    }
+    /* client requested conclude twice */
+    GNUNET_break (0);
+    return;
   }
   }
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-/**
- * Task that disconnects from core.
- *
- * @param cls core handle
- * @param tc context information (why was this task triggered now)
- */
-static void
-disconnect_core (void *cls,
-                 const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  GNUNET_CORE_disconnect (core);
-  core = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
-}
-
-
-static void
-core_startup (void *cls,
-              struct GNUNET_CORE_Handle *core,
-              const struct GNUNET_PeerIdentity *peer)
-{
-  struct ConsensusSession *session;
-
-  my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
-  /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
-  GNUNET_SCHEDULER_add_now (&disconnect_core, core);
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
-
-  session = sessions_head;
-  while (NULL != session)
+  if (session->num_peers <= 1)
   {
   {
-    if (NULL != session->join_msg)
-      initialize_session (session);
-    session = session->next;
+    session->current_round = CONSENSUS_ROUND_FINISH;
+    GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
+  }
+  else
+  {
+    /* the 'begin' round is over, start with the next, actual round */
+    round_over (session, NULL);
   }
   }
+
+  GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 }
 
 
@@ -2260,58 +1218,33 @@ static void
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  /* FIXME: complete; write separate destructors for different data types */
-
-  while (NULL != incoming_sockets_head)
-  {
-    struct IncomingSocket *socket;
-    socket = incoming_sockets_head;
-    if (NULL == socket->cpi)
-    {
-      GNUNET_STREAM_close (socket->socket);
-    }
-    incoming_sockets_head = incoming_sockets_head->next;
-    GNUNET_free (socket);
-  }
-
   while (NULL != sessions_head)
   while (NULL != sessions_head)
-  {
-    struct ConsensusSession *session;
-    int i;
-
-    session = sessions_head;
-
-    if (NULL != session->info)
-      for (i = 0; i < session->num_peers; i++)
-      {
-        struct ConsensusPeerInformation *cpi;
-        cpi = &session->info[i];
-        if ((NULL != cpi) && (NULL != cpi->socket))
-        {
-          GNUNET_STREAM_close (cpi->socket);
-        }
-      }
-
-    if (NULL != session->client)
-      GNUNET_SERVER_client_disconnect (session->client);
+    destroy_session (sessions_head);
 
 
-    sessions_head = sessions_head->next;
-    GNUNET_free (session);
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
+}
 
 
-  if (NULL != core)
-  {
-    GNUNET_CORE_disconnect (core);
-    core = NULL;
-  }
 
 
-  if (NULL != listener)
-  {
-    GNUNET_STREAM_listen_close (listener);
-    listener = NULL;
-  } 
+/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  struct ConsensusSession *session;
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+  session = get_session_by_client (client);
+  if (NULL == session)
+    return;
+  if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+      (CONSENSUS_ROUND_FINISH == session->current_round))
+    destroy_session (session);
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
 }
 
 
 }
 
 
@@ -2323,38 +1256,30 @@ shutdown_task (void *cls,
  * @param c configuration to use
  */
 static void
  * @param c configuration to use
  */
 static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+     const struct GNUNET_CONFIGURATION_Handle *c)
 {
 {
-  /* core is only used to retrieve the peer identity */
-  static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
-    {NULL, 0, 0}
-  };
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
-    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
     {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
-        sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
-    {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
-        sizeof (struct GNUNET_CONSENSUS_AckMessage)},
+        sizeof (struct GNUNET_MessageHeader)},
+    {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+    {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
     {NULL, NULL, 0, 0}
   };
 
   cfg = c;
   srv = server;
-
+  if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   GNUNET_SERVER_add_handlers (server, server_handlers);
   GNUNET_SERVER_add_handlers (server, server_handlers);
-
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
-
-  listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
-                                   listen_cb, NULL,
-                                   GNUNET_STREAM_OPTION_END);
-
-  /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
-  core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
-  GNUNET_assert (NULL != core);
-
-  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
+  GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
 }
 
 
 }
 
 
@@ -2370,7 +1295,7 @@ main (int argc, char *const *argv)
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 
   return (GNUNET_OK == ret) ? 0 : 1;
 }