-static int
-hash_cmp (const void *a, const void *b)
-{
- return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b);
-}
-
-
-/**
- * Search peer in the list of peers in session.
- *
- * @param peer peer to find
- * @param session session with peer
- * @return index of peer, -1 if peer is not in session
- */
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
-{
- const struct GNUNET_PeerIdentity *needle;
- needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
- if (NULL == needle)
- return -1;
- return needle - session->peers;
-}
-
-
-
-/**
- * Called when stream has finishes writing the hello message
- */
-static void
-hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
-{
- struct ConsensusPeerInformation *cpi;
-
- cpi = cls;
- cpi->hello = GNUNET_YES;
-
- GNUNET_assert (GNUNET_STREAM_OK == status);
-
- if (cpi->session->conclude_requested)
- {
- write_strata (cpi, GNUNET_STREAM_OK, 0);
- }
-}
-
-
-/**
- * Functions of this type will be called when a stream is established
- *
- * @param cls the closure from GNUNET_STREAM_open
- * @param socket socket to use to communicate with the other side (read/write)
- */
-static void
-open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
-{
- struct ConsensusPeerInformation *cpi;
- struct ConsensusHello *hello;
-
-
- cpi = cls;
- cpi->is_connected = GNUNET_YES;
-
- hello = GNUNET_malloc (sizeof *hello);
- hello->header.size = htons (sizeof *hello);
- hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
- memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
-
- cpi->wh =
- GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
-
- cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
- &session_stream_data_processor, cpi);
-
-}
-
-
-static void
-initialize_session_info (struct ConsensusSession *session)
-{
- int i;
- int last;
-
- for (i = 0; i < session->num_peers; ++i)
- {
- /* initialize back-references, so consensus peer information can
- * be used as closure */
- session->info[i].session = session;
- }
-
- session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
-
- last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
- i = (session->local_peer_idx + 1) % session->num_peers;
- while (i != last)
- {
- session->info[i].is_outgoing = GNUNET_YES;
- session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
- open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
- session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
- i = (i + 1) % session->num_peers;
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
- }
- // tie-breaker for even number of peers
- if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
- {
- session->info[last].is_outgoing = GNUNET_YES;
- session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
- open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
- session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
- }
-}
-
-
-/**
- * Create the sorted list of peers for the session,
- * add the local peer if not in the join message.
- */
-static void
-initialize_session_peer_list (struct ConsensusSession *session)
-{
- int local_peer_in_list;
- int listed_peers;
- const struct GNUNET_PeerIdentity *msg_peers;
- unsigned int i;
-
- GNUNET_assert (NULL != session->join_msg);
-
- /* peers in the join message, may or may not include the local peer */
- listed_peers = ntohs (session->join_msg->num_peers);
-
- session->num_peers = listed_peers;
-
- msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
-
- local_peer_in_list = GNUNET_NO;
- for (i = 0; i < listed_peers; i++)
- {
- if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
- {
- local_peer_in_list = GNUNET_YES;
- break;
- }
- }
-
- if (GNUNET_NO == local_peer_in_list)
- session->num_peers++;
-
- session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
-
- if (GNUNET_NO == local_peer_in_list)
- session->peers[session->num_peers - 1] = *my_peer;
-
- memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
- qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
-}
-
-
-static void
-strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key)
-{
- uint32_t v;
- int i;
- v = key->bits[0];
- /* count trailing '1'-bits of v */
- for (i = 0; v & 1; v>>=1, i++)
- /* empty */;
- ibf_insert (strata[i], ibf_key_from_hashcode (key));
-}
-
-
-/**
- * Initialize the session, continue receiving messages from the owning client
- *
- * @param session the session to initialize
- */
-static void
-initialize_session (struct ConsensusSession *session)
-{
- const struct ConsensusSession *other_session;
- int i;
-
- GNUNET_assert (NULL != session->join_msg);
-
- initialize_session_peer_list (session);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
-
- compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
-
- /* Check if some local client already owns the session. */
- other_session = sessions_head;
- while (NULL != other_session)
- {
- if ((other_session != session) &&
- (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
- {
- /* session already owned by another client */
- GNUNET_break (0);
- disconnect_client (session->client);
- return;
- }
- other_session = other_session->next;
- }
-
- session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
-
- session->local_peer_idx = get_peer_idx (my_peer, session);
- GNUNET_assert (-1 != session->local_peer_idx);
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
-
- session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
- for (i = 0; i < STRATA_COUNT; i++)
- session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
-
- session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
-
- session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
- initialize_session_info (session);
-
- GNUNET_free (session->join_msg);
- session->join_msg = NULL;
-
- GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
-}
-
-
-/**
- * Called when a client wants to join a consensus session.
- *
- * @param cls unused
- * @param client client that sent the message
- * @param m message sent by the client
- */
-static void
-client_join (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
-{
- struct ConsensusSession *session;
-
- // make sure the client has not already joined a session
- session = sessions_head;
- while (NULL != session)
- {
- if (session->client == client)
- {
- GNUNET_break (0);
- disconnect_client (client);
- return;
- }
- session = session->next;
- }
-
- session = GNUNET_malloc (sizeof (struct ConsensusSession));
- session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
- session->client = client;
- GNUNET_SERVER_client_keep (client);
-
- GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
-
- // Initialize session later if local peer identity is not known yet.
- if (NULL == my_peer)
- {
- GNUNET_SERVER_disable_receive_done_warning (client);
- return;
- }
-
- initialize_session (session);
-}
-
-
-/**
- * Called when a client performs an insert operation.
- *
- * @param cls (unused)
- * @param client client handle
- * @param message message sent by the client
- */
-void
-client_insert (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
-{
- struct ConsensusSession *session;
- struct GNUNET_CONSENSUS_ElementMessage *msg;
- struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_HashCode key;
- int element_size;
-
- session = sessions_head;
- while (NULL != session)
- {
- if (session->client == client)
- break;
- }
-
- if (NULL == session)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
- GNUNET_SERVER_client_disconnect (client);
- return;
- }
-
- msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
- element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
-
- element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
-
- element->type = msg->element_type;
- element->size = element_size;
- memcpy (&element[1], &msg[1], element_size);
- element->data = &element[1];
-
- GNUNET_assert (NULL != element->data);
-
- GNUNET_CRYPTO_hash (element, element_size, &key);
-
- GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- strata_insert (session->strata, &key);
-
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
- send_next (session);
-}
-
-
-
-
-/**
- * Functions of this signature are called whenever writing operations
- * on a stream are executed
- *
- * @param cls the closure from GNUNET_STREAM_write
- * @param status the status of the stream at the time this function is called;
- * GNUNET_STREAM_OK if writing to stream was completed successfully;
- * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
- * (this doesn't mean that the data is never sent, the receiver may
- * have read the data but its ACKs may have been lost);
- * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
- * be processed.
- * @param size the number of bytes written
- */
-static void
-write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)