X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fgnunet-service-consensus.c;h=cb7ab2c6835f8477f0fa7aef88b91e80df6b6b0c;hb=d88cf34549d5aa0d8367ace1d5482289d4925525;hp=d223360dcbc9f89355e59af5e65f4c7869481f93;hpb=4dc3faf5e88b8ca602602aa28a6ff76c02d34848;p=oweals%2Fgnunet.git diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index d223360dc..cb7ab2c68 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2012 Christian Grothoff (and other contributing authors) + (C) 2012, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,7 +20,7 @@ /** * @file consensus/gnunet-service-consensus.c - * @brief + * @brief multi-peer set reconciliation * @author Florian Dold */ @@ -29,213 +29,90 @@ #include "gnunet_protocols.h" #include "gnunet_applications.h" #include "gnunet_util_lib.h" +#include "gnunet_set_service.h" #include "gnunet_consensus_service.h" -#include "gnunet_core_service.h" -#include "gnunet_stream_lib.h" #include "consensus_protocol.h" -#include "ibf.h" #include "consensus.h" /** - * Number of IBFs in a strata estimator. + * Log macro that prefixes the local peer and the peer we are in contact with. */ -#define STRATA_COUNT 32 -/** - * Number of buckets per IBF. - */ -#define STRATA_IBF_BUCKETS 80 -/** - * hash num parameter for the difference digests and strata estimators - */ -#define STRATA_HASH_NUM 3 +#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) -/** - * Number of buckets that can be transmitted in one message. - */ -#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) /** - * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). - * Choose this value so that computing the IBF is still cheaper - * than transmitting all values. + * Number of exponential rounds, used in the exp and completion round. */ -#define MAX_IBF_ORDER (32) - +#define NUM_EXP_ROUNDS 4 /* forward declarations */ -struct ConsensusSession; -struct IncomingSocket; +/* mutual recursion with struct ConsensusSession */ struct ConsensusPeerInformation; +/* mutual recursion with round_over */ static void -send_next (struct ConsensusSession *session); - -static void -write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); - -static void -write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); - -static void -write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size); - -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); - +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); -/** - * An element that is waiting to be transmitted. - */ -struct PendingElement -{ - /** - * Pending elements are kept in a DLL. - */ - struct PendingElement *next; - - /** - * Pending elements are kept in a DLL. - */ - struct PendingElement *prev; - - /** - * The actual element - */ - struct GNUNET_CONSENSUS_Element *element; - - /* peer this element is coming from */ - struct ConsensusPeerInformation *cpi; -}; /** - * Information about a peer that is in a consensus session. + * Describes the current round a consensus session is in. */ -struct ConsensusPeerInformation +enum ConsensusRound { - struct GNUNET_STREAM_Socket *socket; - - /** - * Is socket's connection established, i.e. can we write to it? - * Only relevent on outgoing cpi. - */ - int is_connected; - - /** - * Type of the peer in the all-to-all rounds, - * GNUNET_YES if we initiate reconciliation. - */ - int is_outgoing; - - /** - * if the peer did something wrong, and was disconnected, - * never interact with this peer again. - */ - int is_bad; - - /** - * Did we receive/send a consensus hello? - */ - int hello; - - /** - * Handle for currently active read - */ - struct GNUNET_STREAM_ReadHandle *rh; - - /** - * Handle for currently active read - */ - struct GNUNET_STREAM_WriteHandle *wh; - - enum { - IBF_STATE_NONE, - IBF_STATE_RECEIVING, - IBF_STATE_TRANSMITTING, - IBF_STATE_DECODING - } ibf_state ; - - /** - * What is the order (=log2 size) of the ibf - * we're currently dealing with? - */ - int ibf_order; - /** - * The current IBF for this peer, - * purpose dependent on ibf_state + * Not started the protocol yet. */ - struct InvertibleBloomFilter *ibf; - + CONSENSUS_ROUND_BEGIN=0, /** - * How many buckets have we transmitted/received (depending on state)? + * Distribution of elements with the exponential scheme. */ - int ibf_bucket_counter; - + CONSENSUS_ROUND_EXCHANGE, /** - * Strata estimator of the peer, NULL if our peer - * initiated the reconciliation. + * Exchange which elements each peer has, but don't + * transmit the element's data, only their SHA-512 hashes. + * This round uses the all-to-all scheme. */ - struct InvertibleBloomFilter **strata; - + CONSENSUS_ROUND_INVENTORY, /** - * difference estimated with the current strata estimator + * Collect and distribute missing values with the exponential scheme. */ - unsigned int diff; - - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - + CONSENSUS_ROUND_COMPLETION, /** - * Back-reference to the consensus session, - * to that ConsensusPeerInformation can be used as a closure + * Consensus concluded. After timeout and finished communication with client, + * consensus session will be destroyed. */ - struct ConsensusSession *session; - - struct PendingElement *send_pending_head; - struct PendingElement *send_pending_tail; + CONSENSUS_ROUND_FINISH }; -struct QueuedMessage -{ - struct GNUNET_MessageHeader *msg; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; -}; -enum ConsensusRound +/** + * Complete information about the current round and all + * subrounds. + */ +struct RoundInfo { /** - * distribution of information with the exponential scheme + * The current main round. */ - CONSENSUS_ROUND_EXP_EXCHANGE, + enum ConsensusRound round; /** - * All-to-all, exchange missing values + * The current exp round, valid if + * the main round is an exp round. */ - CONSENSUS_ROUND_A2A_EXCHANGE, + uint32_t exp_round; /** - * All-to-all, check what values are missing, don't exchange anything + * The current exp subround, valid if + * the main round is an exp round. */ - CONSENSUS_ROUND_A2A_INVENTORY - - /* - a round to exchange the information for fraud-detection - CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT - */ + uint32_t exp_subround; }; /** * A consensus session consists of one local client and the remote authorities. - * */ struct ConsensusSession { @@ -249,167 +126,139 @@ struct ConsensusSession */ struct ConsensusSession *prev; - /** - * Join message. Used to initialize the session later, - * if the identity of the local peer is not yet known. - * NULL if the session has been fully initialized. - */ - struct GNUNET_CONSENSUS_JoinMessage *join_msg; - /** * Global consensus identification, computed - * from the local id and participating authorities. + * from the session id and participating authorities. */ struct GNUNET_HashCode global_id; /** - * Local client in this consensus session. - * There is only one client per consensus session. + * Client that inhabits the session */ struct GNUNET_SERVER_Client *client; /** - * Values in the consensus set of this session, - * all of them either have been sent by or approved by the client. - * Contains GNUNET_CONSENSUS_Element. - */ - struct GNUNET_CONTAINER_MultiHashMap *values; - - /** - * Elements that have not been approved (or rejected) by the client yet. + * Queued messages to the client. */ - struct PendingElement *approval_pending_head; + struct GNUNET_MQ_Handle *client_mq; /** - * Elements that have not been approved (or rejected) by the client yet. + * Timeout for all rounds together, single rounds will schedule a timeout task + * with a fraction of the conclude timeout. + * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. */ - struct PendingElement *approval_pending_tail; - + struct GNUNET_TIME_Relative conclude_timeout; + /** - * Messages to be sent to the local client that owns this session + * Timeout task identifier for the current round. */ - struct QueuedMessage *client_messages_head; + GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; /** - * Messages to be sent to the local client that owns this session + * Number of other peers in the consensus. */ - struct QueuedMessage *client_messages_tail; + unsigned int num_peers; /** - * Currently active transmit handle for sending to the client + * Information about the other peers, + * their state, etc. */ - struct GNUNET_SERVER_TransmitHandle *th; + struct ConsensusPeerInformation *info; /** - * Once conclude_requested is GNUNET_YES, the client may not - * insert any more values. + * Index of the local peer in the peers array */ - int conclude_requested; + unsigned int local_peer_idx; /** - * Minimum number of peers to form a consensus group + * Current round */ - int conclude_group_min; + enum ConsensusRound current_round; /** - * Number of other peers in the consensus + * Permutation of peers for the current round, + * maps logical index (for current round) to physical index (location in info array) */ - unsigned int num_peers; + uint32_t *shuffle; /** - * Information about the other peers, - * their state, etc. + * Current round of the exponential scheme. */ - struct ConsensusPeerInformation *info; + uint32_t exp_round; /** - * Sorted array of peer identities in this consensus session, - * includes the local peer. + * Current sub-round of the exponential scheme. */ - struct GNUNET_PeerIdentity *peers; + uint32_t exp_subround; /** - * Index of the local peer in the peers array + * The partner for the current exp-round */ - int local_peer_idx; + struct ConsensusPeerInformation *partner_outgoing; /** - * Strata estimator, computed online + * The partner for the current exp-round */ - struct InvertibleBloomFilter **strata; + struct ConsensusPeerInformation *partner_incoming; /** - * Pre-computed IBFs + * The consensus set of this session. */ - struct InvertibleBloomFilter **ibfs; + struct GNUNET_SET_Handle *element_set; /** - * Current round + * Listener for requests from other peers. + * Uses the session's global id as app id. */ - enum ConsensusRound current_round; + struct GNUNET_SET_ListenHandle *set_listener; }; /** - * Sockets from other peers who want to communicate with us. - * It may not be known yet which consensus session they belong to. + * Information about a peer that is in a consensus session. */ -struct IncomingSocket +struct ConsensusPeerInformation { /** - * Incoming sockets are kept in a double linked list. + * Peer identitty of the peer in the consensus session */ - struct IncomingSocket *next; + struct GNUNET_PeerIdentity peer_id; /** - * Incoming sockets are kept in a double linked list. - */ - struct IncomingSocket *prev; - - /** - * The actual socket. - */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Handle for currently active read + * Back-reference to the consensus session, + * to that ConsensusPeerInformation can be used as a closure */ - struct GNUNET_STREAM_ReadHandle *rh; + struct ConsensusSession *session; /** - * Peer that connected to us with the socket. + * We have finishes the exp-subround with the peer. */ - struct GNUNET_PeerIdentity *peer; + int exp_subround_finished; /** - * Message stream tokenizer for this socket. + * Set operation we are currently executing with this peer. */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct GNUNET_SET_OperationHandle *set_op; /** - * Peer-in-session this socket belongs to, once known, otherwise NULL. + * Set operation we are planning on executing with this peer. */ - struct ConsensusPeerInformation *cpi; + struct GNUNET_SET_OperationHandle *delayed_set_op; /** - * Set to the global session id, if the peer sent us a hello-message, - * but the session does not exist yet. - * - * FIXME: not implemented yet + * Info about the round of the delayed set operation. */ - struct GNUNET_HashCode *requested_gid; + struct RoundInfo delayed_round_info; }; -static struct IncomingSocket *incoming_sockets_head; -static struct IncomingSocket *incoming_sockets_tail; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_head; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_tail; @@ -426,559 +275,507 @@ static struct GNUNET_SERVER_Handle *srv; /** * Peer that runs this service. */ -static struct GNUNET_PeerIdentity *my_peer; +static struct GNUNET_PeerIdentity my_peer; -/** - * Handle to the core service. Only used during service startup, will be NULL after that. - */ -static struct GNUNET_CORE_Handle *core; -/** - * Listener for sockets from peers that want to reconcile with us. - */ -static struct GNUNET_STREAM_ListenSocket *listener; +static int +have_exp_subround_finished (const struct ConsensusSession *session) +{ + int not_finished; + not_finished = 0; + if ( (NULL != session->partner_outgoing) && + (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) + not_finished++; + if ( (NULL != session->partner_incoming) && + (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) + not_finished++; + if (0 == not_finished) + return GNUNET_YES; + return GNUNET_NO; +} /** - * Queue a message to be sent to the inhabiting client of a sessino - * - * @param session session - * @param msg message we want to queue + * Destroy a session, free all resources associated with it. + * + * @param session the session to destroy */ static void -queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) +destroy_session (struct ConsensusSession *session) { - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); + int i; + + GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); + if (NULL != session->element_set) + { + GNUNET_SET_destroy (session->element_set); + session->element_set = NULL; + } + if (NULL != session->set_listener) + { + GNUNET_SET_listen_cancel (session->set_listener); + session->set_listener = NULL; + } + if (NULL != session->client_mq) + { + GNUNET_MQ_destroy (session->client_mq); + session->client_mq = NULL; + } + if (NULL != session->client) + { + GNUNET_SERVER_client_disconnect (session->client); + session->client = NULL; + } + if (NULL != session->shuffle) + { + GNUNET_free (session->shuffle); + session->shuffle = NULL; + } + if (NULL != session->info) + { + for (i = 0; i < session->num_peers; i++) + { + struct ConsensusPeerInformation *cpi; + cpi = &session->info[i]; + if (NULL != cpi->set_op) + { + GNUNET_SET_operation_cancel (cpi->set_op); + cpi->set_op = NULL; + } + } + GNUNET_free (session->info); + session->info = NULL; + } + GNUNET_free (session); } + /** - * Get peer index associated with the peer information, - * unique for every session among all peers. + * Iterator for set elements. + * + * @param cls closure + * @param element the current element, NULL if all elements have been + * iterated over + * @return GNUNET_YES to continue iterating, GNUNET_NO to stop. */ static int -get_cpi_index (struct ConsensusPeerInformation *cpi) +send_to_client_iter (void *cls, + const struct GNUNET_SET_Element *element) { - return cpi - cpi->session->info; + struct ConsensusSession *session = cls; + struct GNUNET_MQ_Envelope *ev; + + if (NULL != element) + { + struct GNUNET_CONSENSUS_ElementMessage *m; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got element for client\n", + session->local_peer_idx); + + ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); + m->element_type = htons (element->type); + memcpy (&m[1], element->data, element->size); + GNUNET_MQ_send (session->client_mq, ev); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: finished iterating elements for client\n", + session->local_peer_idx); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); + GNUNET_MQ_send (session->client_mq, ev); + } + return GNUNET_YES; } + /** - * Mark the peer as bad, free as state we don't need anymore. + * Start the next round. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). + * + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason */ -static void -mark_peer_bad (struct ConsensusPeerInformation *cpi) +static void +round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi)); - cpi->is_bad = GNUNET_YES; - /* FIXME: free ibfs etc. */ + struct ConsensusSession *session; + + /* don't kick off next round if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + session = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); + + if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; + } + + switch (session->current_round) + { + case CONSENSUS_ROUND_BEGIN: + session->current_round = CONSENSUS_ROUND_EXCHANGE; + session->exp_round = 0; + subround_over (session, NULL); + break; + case CONSENSUS_ROUND_EXCHANGE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: finished, sending elements to client\n", + session->local_peer_idx); + session->current_round = CONSENSUS_ROUND_FINISH; + GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); + break; + default: + GNUNET_assert (0); + } } /** - * Estimate set difference with two strata estimators, - * i.e. arrays of IBFs. + * Create a new permutation for the session's peers in session->shuffle. + * Uses a Fisher-Yates shuffle with pseudo-randomness coming from + * both the global session id and the current round index. + * + * @param session the session to create the new permutation for */ -static int -estimate_difference (struct InvertibleBloomFilter** strata1, - struct InvertibleBloomFilter** strata2) +static void +shuffle (struct ConsensusSession *session) { - int i; - int count; - count = 0; - for (i = STRATA_COUNT - 1; i >= 0; i--) + uint32_t i; + uint32_t randomness[session->num_peers-1]; + + if (NULL == session->shuffle) + session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); + + GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), + &session->exp_round, sizeof (uint32_t), + &session->global_id, sizeof (struct GNUNET_HashCode), + NULL); + + for (i = 0; i < session->num_peers; i++) + session->shuffle[i] = i; + + for (i = session->num_peers - 1; i > 0; i--) { - struct InvertibleBloomFilter *diff; - int ibf_count; - int more; - ibf_count = 0; - diff = ibf_dup (strata1[i]); - ibf_subtract (diff, strata2[i]); - for (;;) - { - more = ibf_decode (diff, NULL, NULL); - if (GNUNET_NO == more) - { - count += ibf_count; - break; - } - if (GNUNET_SYSERR == more) - { - ibf_destroy (diff); - return count * (1 << (i + 1)); - } - ibf_count++; - } - ibf_destroy (diff); + uint32_t x; + uint32_t tmp; + x = randomness[i-1] % session->num_peers; + tmp = session->shuffle[x]; + session->shuffle[x] = session->shuffle[i]; + session->shuffle[i] = tmp; } - return count; } /** - * Called when receiving data from a peer that is member of - * an inhabited consensus session. + * Find and set the partner_incoming and partner_outgoing of our peer, + * one of them may not exist (and thus set to NULL) if the number of peers + * in the session is not a power of two. * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). + * @param session the consensus session */ -static size_t -session_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +static void +find_partners (struct ConsensusSession *session) { - struct ConsensusPeerInformation *cpi; - int ret; + unsigned int arc; + unsigned int num_ghosts; + unsigned int largest_arc; + int partner_idx; - GNUNET_assert (GNUNET_STREAM_OK == status); + /* shuffled local index */ + int my_idx = session->shuffle[session->local_peer_idx]; - cpi = cls; + /* distance to neighboring peer in current subround */ + arc = 1 << session->exp_subround; + largest_arc = 1; + while (largest_arc < session->num_peers) + largest_arc <<= 1; + num_ghosts = largest_arc - session->num_peers; - GNUNET_assert (NULL != cpi->mst); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts); - ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); - if (GNUNET_SYSERR == ret) + if (0 == (my_idx & arc)) { - /* FIXME: handle this correctly */ - GNUNET_assert (0); + /* we are outgoing */ + partner_idx = (my_idx + arc) % session->num_peers; + session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; + session->partner_outgoing->exp_subround_finished = GNUNET_NO; + /* are we a 'ghost' of a peer that would exist if + * the number of peers was a power of two, and thus have to partner + * with an additional peer? + */ + if (my_idx < num_ghosts) + { + int ghost_partner_idx; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers); + ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is before %d\n", ghost_partner_idx); + /* platform dependent; modulo sometimes returns negative values */ + if (ghost_partner_idx < 0) + ghost_partner_idx += session->num_peers; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is after %d\n", ghost_partner_idx); + session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; + return; + } + session->partner_incoming = NULL; + return; } - - /* read again */ - cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); - - /* we always read all data */ - return size; + partner_idx = (my_idx - (int) arc) % (int) session->num_peers; + if (partner_idx < 0) + partner_idx += session->num_peers; + session->partner_outgoing = NULL; + session->partner_incoming = &session->info[session->shuffle[partner_idx]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; } + /** - * Called when we receive data from a peer that is not member of - * a session yet, or the session is not yet inhabited. + * Callback for set operation results. Called for each element + * in the result set. * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). + * @param cls closure + * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK + * @param status see enum GNUNET_SET_Status */ -static size_t -incoming_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) +static void +set_result_cb (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) { - struct IncomingSocket *incoming; - int ret; + struct ConsensusPeerInformation *cpi = cls; + unsigned int remote_idx = cpi - cpi->session->info; + unsigned int local_idx = cpi->session->local_peer_idx; - GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert ((cpi == cpi->session->partner_outgoing) || + (cpi == cpi->session->partner_incoming)); - incoming = cls; - - ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); - if (GNUNET_SYSERR == ret) + switch (status) { - /* FIXME: handle this correctly */ - GNUNET_assert (0); + case GNUNET_SET_STATUS_OK: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: element\n", + local_idx, remote_idx); + break; + case GNUNET_SET_STATUS_FAILURE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: failure\n", + local_idx, remote_idx); + cpi->set_op = NULL; + return; + case GNUNET_SET_STATUS_HALF_DONE: + case GNUNET_SET_STATUS_DONE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: done\n", + local_idx, remote_idx); + cpi->exp_subround_finished = GNUNET_YES; + cpi->set_op = NULL; + if (have_exp_subround_finished (cpi->session) == GNUNET_YES) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: all reconciliations of subround done\n", + local_idx); + subround_over (cpi->session, NULL); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: waiting for further set results\n", + local_idx); + } + return; + default: + GNUNET_break (0); + return; } - /* read again */ - incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); - - /* we always read all data */ - return size; + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); + break; + default: + GNUNET_break (0); + return; + } } /** - * Iterator to insert values into an ibf. + * Compare the round the session is in with the round of the given context message. * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @param session a consensus session + * @param round a round context message + * @return 0 if it's the same round, -1 if the session is in an earlier round, + * 1 if the session is in a later round */ static int -ibf_values_iterator (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct ConsensusPeerInformation *cpi; - cpi = cls; - ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key)); - return GNUNET_YES; -} - -static void -prepare_ibf (struct ConsensusPeerInformation *cpi) +rounds_compare (struct ConsensusSession *session, + struct RoundInfo* ri) { - if (NULL == cpi->session->ibfs[cpi->ibf_order]) + if (session->current_round < ri->round) + return -1; + if (session->current_round > ri->round) + return 1; + if (session->current_round == CONSENSUS_ROUND_EXCHANGE) { - cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); + if (session->exp_round < ri->exp_round) + return -1; + if (session->exp_round > ri->exp_round) + return 1; + if (session->exp_subround < ri->exp_subround) + return -1; + if (session->exp_subround < ri->exp_subround) + return 1; + return 0; } + /* comparing rounds when we are not in a exp round */ + GNUNET_assert (0); } /** - * Called when a peer sends us its strata estimator. - * In response, we sent out IBF of appropriate size back. + * Do the next subround in the exp-scheme. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). * - * @param cpi session - * @param strata_msg message + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason */ -static int -handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) +static void +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct ConsensusSession *session; int i; - uint64_t *key_src; - uint32_t *hash_src; - uint8_t *count_src; - GNUNET_assert (GNUNET_NO == cpi->is_outgoing); - - if (NULL == cpi->strata) + /* don't kick off next subround if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + session = cls; + /* cancel timeout */ + if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) { - cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); - for (i = 0; i < STRATA_COUNT; i++) - cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; } - - /* for correct message alignment, copy bucket types seperately */ - key_src = (uint64_t *) &strata_msg[1]; - - for (i = 0; i < STRATA_COUNT; i++) + + if (session->exp_round >= NUM_EXP_ROUNDS) { - memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); - key_src += STRATA_IBF_BUCKETS; + round_over (session, NULL); + return; } - hash_src = (uint32_t *) key_src; - - for (i = 0; i < STRATA_COUNT; i++) + if (session->exp_round == 0) { - memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); - hash_src += STRATA_IBF_BUCKETS; + /* initialize everything for the log-rounds */ + session->exp_round = 1; + session->exp_subround = 0; + if (NULL == session->shuffle) + session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); + for (i = 0; i < session->num_peers; i++) + session->shuffle[i] = i; } - - count_src = (uint8_t *) hash_src; - - for (i = 0; i < STRATA_COUNT; i++) + else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) { - memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); - count_src += STRATA_IBF_BUCKETS; + /* subrounds done, start new log-round */ + session->exp_round++; + session->exp_subround = 0; + //shuffle (session); + } + else + { + session->exp_subround++; } - cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); - - /* send IBF of the right size */ - cpi->ibf_order = 0; - while ((1 << cpi->ibf_order) < cpi->diff) - cpi->ibf_order++; - if (cpi->ibf_order > MAX_IBF_ORDER) - cpi->ibf_order = MAX_IBF_ORDER; - cpi->ibf_order += 2; - /* create ibf if not already pre-computed */ - prepare_ibf (cpi); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; - write_ibf (cpi, GNUNET_STREAM_OK, 0); - - return GNUNET_YES; -} - - -static int -handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) -{ - int num_buckets; - uint64_t *key_src; - uint32_t *hash_src; - uint8_t *count_src; + /* determine the incoming and outgoing partner */ + find_partners (session); - num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; + GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]); + GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]); - if (IBF_STATE_NONE == cpi->ibf_state) + /* initiate set operation with the outgoing partner */ + if (NULL != session->partner_outgoing) { - cpi->ibf_state = IBF_STATE_RECEIVING; - cpi->ibf_order = digest->order; - cpi->ibf_bucket_counter = 0; + struct GNUNET_CONSENSUS_RoundContextMessage *msg; + msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); + msg->header.size = htons (sizeof *msg); + msg->round = htonl (session->current_round); + msg->exp_round = htonl (session->exp_round); + msg->exp_subround = htonl (session->exp_subround); + + if (NULL != session->partner_outgoing->set_op) + { + GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); + } + session->partner_outgoing->set_op = + GNUNET_SET_prepare (&session->partner_outgoing->peer_id, + &session->global_id, + (struct GNUNET_MessageHeader *) msg, + 0, /* FIXME: salt */ + GNUNET_SET_RESULT_ADDED, + set_result_cb, session->partner_outgoing); + GNUNET_free (msg); + GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); } - if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || - (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) ) + /* commit to the delayed set operation */ + if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op)) { - mark_peer_bad (cpi); - return GNUNET_NO; - } + int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); - - if (NULL == cpi->ibf) - cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); - - key_src = (uint64_t *) &digest[1]; - - memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src); - hash_src += num_buckets; - - hash_src = (uint32_t *) key_src; - - memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); - hash_src += num_buckets; - - count_src = (uint8_t *) hash_src; - - memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src); - - cpi->ibf_bucket_counter += num_buckets; - - if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); - GNUNET_assert (NULL != cpi->wh); - cpi->ibf_state = IBF_STATE_DECODING; - prepare_ibf (cpi); - ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); - write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0); - } - return GNUNET_YES; -} - - -static int -handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) -{ - struct PendingElement *pending_element; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; - size_t size; - - size = ntohs (element_msg->size) - sizeof *element_msg; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size); - - element = GNUNET_malloc (size + sizeof *element); - element->size = size; - memcpy (&element[1], &element_msg[1], size); - element->data = &element[1]; - - pending_element = GNUNET_malloc (sizeof *pending_element); - pending_element->element = element; - GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element); - - client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); - client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); - client_element_msg->header.size = htons (size + sizeof *client_element_msg); - memcpy (&client_element_msg[1], &element[1], size); - - queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n"); - - send_next (cpi->session); - - return GNUNET_YES; -} - - -/** - * Handle a request for elements. - * Only allowed in exchange-rounds. - * - * FIXME: implement - */ -static int -handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) -{ - /* FIXME: implement */ - return GNUNET_YES; -} - - -/** - * Handle a HELLO-message, send when another peer wants to join a session where - * our peer is a member. The session may or may not be inhabited yet. - */ -static int -handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) -{ - /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ - struct ConsensusSession *session; - session = sessions_head; - while (NULL != session) - { - if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) + if (NULL != session->partner_incoming->set_op) { - int idx; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n"); - idx = get_peer_idx (inc->peer, session); - GNUNET_assert (-1 != idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx); - inc->cpi = &session->info[idx]; - GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing); - inc->cpi->mst = inc->mst; - inc->cpi->hello = GNUNET_YES; - inc->cpi->socket = inc->socket; - return GNUNET_YES; + GNUNET_SET_operation_cancel (session->partner_incoming->set_op); + session->partner_incoming->set_op = NULL; + } + if (cmp == 0) + { + GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set); + session->partner_incoming->set_op = session->partner_incoming->delayed_set_op; + session->partner_incoming->delayed_set_op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d resumed delayed round with P%d\n", + session->local_peer_idx, (int) (session->partner_incoming - session->info)); + } + else + { + /* this should not happen -- a round has been skipped! */ + GNUNET_break_op (0); } - session = session->next; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n"); - GNUNET_break (0); - return GNUNET_NO; -} - -/** - * Functions with this signature are called whenever a - * complete message is received by the tokenizer. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct ConsensusPeerInformation *cpi; - cpi = cls; - switch (ntohs (message->type)) +#ifdef GNUNET_EXTRA_LOGGING { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: - return handle_p2p_strata (cpi, (struct StrataMessage *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: - return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: - return handle_p2p_element (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: - return handle_p2p_element_request (cpi, (struct ElementRequest *) message); - default: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); - /* FIXME: handle correctly */ - GNUNET_assert (0); + int in; + int out; + if (session->partner_outgoing == NULL) + out = -1; + else + out = (int) (session->partner_outgoing - session->info); + if (session->partner_incoming == NULL) + in = -1; + else + in = (int) (session->partner_incoming - session->info); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, + session->exp_round, session->exp_subround, in, out); } - return GNUNET_OK; -} +#endif /* GNUNET_EXTRA_LOGGING */ - -/** - * Handle tokenized messages from stream sockets. - * Delegate them if the socket belongs to a session, - * handle hello messages otherwise. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure, unused - * @param client incoming socket this message comes from - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct IncomingSocket *inc; - inc = (struct IncomingSocket *) client; - switch (ntohs( message->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: - return handle_p2p_hello (inc, (struct ConsensusHello *) message); - default: - if (NULL != inc->cpi) - return mst_session_callback (inc->cpi, client, message); - /* FIXME: disconnect peer properly */ - GNUNET_assert (0); - } - return GNUNET_OK; } /** - * Functions of this type are called upon new stream connection from other peers - * or upon binding error which happen when the app_port given in - * GNUNET_STREAM_listen() is already taken. + * Search peer in the list of peers in session. * - * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream; NULL on binding error - * @param initiator the identity of the peer who wants to establish a stream - * with us; NULL on binding error - * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the - * stream (the socket will be invalid after the call) + * @param peer peer to find + * @param session session with peer + * @return index of peer, -1 if peer is not in session */ static int -listen_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) -{ - struct IncomingSocket *incoming; - - GNUNET_assert (NULL != socket); - - incoming = GNUNET_malloc (sizeof *incoming); - - incoming->socket = socket; - incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); - - incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &incoming_stream_data_processor, incoming); - - - incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); - - GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); - - return GNUNET_OK; -} - - -static void -destroy_session (struct ConsensusSession *session) -{ - /* FIXME: more stuff to free! */ - GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); - GNUNET_SERVER_client_drop (session->client); - GNUNET_free (session); -} - - -/** - * Disconnect a client, and destroy all sessions associated with it. - * - * @param client the client to disconnect - */ -static void -disconnect_client (struct GNUNET_SERVER_Client *client) +get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) { - struct ConsensusSession *session; - GNUNET_SERVER_client_disconnect (client); - - /* if the client owns a session, remove it */ - session = sessions_head; - while (NULL != session) - { - if (client == session->client) - { - destroy_session (session); - break; - } - session = session->next; - } + int i; + for (i = 0; i < session->num_peers; i++) + if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) + return i; + return -1; } @@ -988,98 +785,24 @@ disconnect_client (struct GNUNET_SERVER_Client *client) * Thus, if the local id of two consensus sessions coincide, but are not comprised of * exactly the same peers, the global id will be different. * - * @param local_id local id of the consensus session - * @param peers array of all peers participating in the consensus session - * @param num_peers number of elements in the peers array - * @param dst where the result is stored, may not be NULL + * @param session session to generate the global id for + * @param session_id local id of the consensus session */ static void -compute_global_id (const struct GNUNET_HashCode *local_id, - const struct GNUNET_PeerIdentity *peers, int num_peers, - struct GNUNET_HashCode *dst) +compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id) { int i; struct GNUNET_HashCode tmp; - *dst = *local_id; - for (i = 0; i < num_peers; ++i) - { - GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); - *dst = tmp; - GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); - *dst = tmp; - } -} - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls consensus session - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) -{ - struct ConsensusSession *session; - struct QueuedMessage *qmsg; - size_t msg_size; - - session = cls; - session->th = NULL; + /* FIXME: use kdf? */ - - qmsg = session->client_messages_head; - GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); - GNUNET_assert (qmsg); - - if (NULL == buf) - { - destroy_session (session); - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); - - send_next (session); - - return msg_size; -} - - -/** - * Schedule sending the next message (if there is any) to a client. - * - * @param cli the client to send the next message to - */ -static void -send_next (struct ConsensusSession *session) -{ - - GNUNET_assert (NULL != session); - - if (NULL != session->th) - return; - - if (NULL != session->client_messages_head) + session->global_id = *session_id; + for (i = 0; i < session->num_peers; ++i) { - int msize; - msize = ntohs (session->client_messages_head->msg->size); - session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, session); + GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp); + session->global_id = tmp; + GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); + session->global_id = tmp; } } @@ -1093,762 +816,362 @@ send_next (struct ConsensusSession *session) * @param h2 some hash code * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. */ -static int -hash_cmp (const void *a, const void *b) -{ - return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b); -} - - -/** - * Search peer in the list of peers in session. - * - * @param peer peer to find - * @param session session with peer - * @return index of peer, -1 if peer is not in session - */ -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) -{ - const struct GNUNET_PeerIdentity *needle; - needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); - if (NULL == needle) - return -1; - return needle - session->peers; -} - - - -/** - * Called when stream has finishes writing the hello message - */ -static void -hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) -{ - struct ConsensusPeerInformation *cpi; - - cpi = cls; - cpi->hello = GNUNET_YES; - - GNUNET_assert (GNUNET_STREAM_OK == status); - - if (cpi->session->conclude_requested) - { - write_strata (cpi, GNUNET_STREAM_OK, 0); - } -} - - -/** - * Functions of this type will be called when a stream is established - * - * @param cls the closure from GNUNET_STREAM_open - * @param socket socket to use to communicate with the other side (read/write) - */ -static void -open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) -{ - struct ConsensusPeerInformation *cpi; - struct ConsensusHello *hello; - - - cpi = cls; - cpi->is_connected = GNUNET_YES; - - hello = GNUNET_malloc (sizeof *hello); - hello->header.size = htons (sizeof *hello); - hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); - memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); - - cpi->wh = - GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); - - cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &session_stream_data_processor, cpi); - -} - - -static void -initialize_session_info (struct ConsensusSession *session) -{ - int i; - int last; - - for (i = 0; i < session->num_peers; ++i) - { - /* initialize back-references, so consensus peer information can - * be used as closure */ - session->info[i].session = session; - } - - session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; - - last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; - i = (session->local_peer_idx + 1) % session->num_peers; - while (i != last) - { - session->info[i].is_outgoing = GNUNET_YES; - session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); - session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]); - i = (i + 1) % session->num_peers; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); - } - // tie-breaker for even number of peers - if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) - { - session->info[last].is_outgoing = GNUNET_YES; - session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); - session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); - } -} - - -/** - * Create the sorted list of peers for the session, - * add the local peer if not in the join message. - */ -static void -initialize_session_peer_list (struct ConsensusSession *session) -{ - int local_peer_in_list; - int listed_peers; - const struct GNUNET_PeerIdentity *msg_peers; - unsigned int i; - - GNUNET_assert (NULL != session->join_msg); - - /* peers in the join message, may or may not include the local peer */ - listed_peers = ntohs (session->join_msg->num_peers); - - session->num_peers = listed_peers; - - msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; - - local_peer_in_list = GNUNET_NO; - for (i = 0; i < listed_peers; i++) - { - if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) - { - local_peer_in_list = GNUNET_YES; - break; - } - } - - if (GNUNET_NO == local_peer_in_list) - session->num_peers++; - - session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); - - if (GNUNET_NO == local_peer_in_list) - session->peers[session->num_peers - 1] = *my_peer; - - memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); - qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); -} - - -static void -strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key) -{ - uint32_t v; - int i; - v = key->bits[0]; - /* count trailing '1'-bits of v */ - for (i = 0; v & 1; v>>=1, i++) - /* empty */; - ibf_insert (strata[i], ibf_key_from_hashcode (key)); -} - - -/** - * Initialize the session, continue receiving messages from the owning client - * - * @param session the session to initialize - */ -static void -initialize_session (struct ConsensusSession *session) -{ - const struct ConsensusSession *other_session; - int i; - - GNUNET_assert (NULL != session->join_msg); - - initialize_session_peer_list (session); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); - - compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id); - - /* Check if some local client already owns the session. */ - other_session = sessions_head; - while (NULL != other_session) - { - if ((other_session != session) && - (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) - { - /* session already owned by another client */ - GNUNET_break (0); - disconnect_client (session->client); - return; - } - other_session = other_session->next; - } - - session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); - - session->local_peer_idx = get_peer_idx (my_peer, session); - GNUNET_assert (-1 != session->local_peer_idx); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); - - session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); - for (i = 0; i < STRATA_COUNT; i++) - session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); - - session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); - - session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); - initialize_session_info (session); - - GNUNET_free (session->join_msg); - session->join_msg = NULL; - - GNUNET_SERVER_receive_done (session->client, GNUNET_OK); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); -} - - -/** - * Called when a client wants to join a consensus session. - * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client - */ -static void -client_join (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) -{ - struct ConsensusSession *session; - - // make sure the client has not already joined a session - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - { - GNUNET_break (0); - disconnect_client (client); - return; - } - session = session->next; - } - - session = GNUNET_malloc (sizeof (struct ConsensusSession)); - session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); - session->client = client; - GNUNET_SERVER_client_keep (client); - - GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); - - // Initialize session later if local peer identity is not known yet. - if (NULL == my_peer) - { - GNUNET_SERVER_disable_receive_done_warning (client); - return; - } - - initialize_session (session); -} - - -/** - * Called when a client performs an insert operation. - * - * @param cls (unused) - * @param client client handle - * @param message message sent by the client - */ -void -client_insert (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) -{ - struct ConsensusSession *session; - struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_HashCode key; - int element_size; - - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - break; - } - - if (NULL == session) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); - GNUNET_SERVER_client_disconnect (client); - return; - } - - msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; - element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); - - element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); - - element->type = msg->element_type; - element->size = element_size; - memcpy (&element[1], &msg[1], element_size); - element->data = &element[1]; - - GNUNET_assert (NULL != element->data); - - GNUNET_CRYPTO_hash (element, element_size, &key); - - GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - strata_insert (session->strata, &key); - - GNUNET_SERVER_receive_done (client, GNUNET_OK); - - send_next (session); -} - - - - -/** - * Functions of this signature are called whenever writing operations - * on a stream are executed - * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written - */ -static void -write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) +static int +hash_cmp (const void *h1, const void *h2) { - GNUNET_assert (GNUNET_STREAM_OK == status); - /* just wait for the ibf */ + return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2); } + /** - * Functions of this signature are called whenever writing operations - * on a stream are executed - * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written + * Create the sorted list of peers for the session, + * add the local peer if not in the join message. */ -static void -write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) +static void +initialize_session_peer_list (struct ConsensusSession *session, + struct GNUNET_CONSENSUS_JoinMessage *join_msg) { - struct ConsensusPeerInformation *cpi; - struct StrataMessage *strata_msg; - size_t msize; - int i; - uint64_t *key_dst; - uint32_t *hash_dst; - uint8_t *count_dst; - - cpi = cls; - cpi->wh = NULL; - - GNUNET_assert (GNUNET_STREAM_OK == status); - - GNUNET_assert (GNUNET_YES == cpi->is_outgoing); - - /* FIXME: handle this */ - GNUNET_assert (GNUNET_STREAM_OK == status); + unsigned int local_peer_in_list; + uint32_t listed_peers; + const struct GNUNET_PeerIdentity *msg_peers; + struct GNUNET_PeerIdentity *peers; + unsigned int i; - msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); + GNUNET_assert (NULL != join_msg); - strata_msg = GNUNET_malloc (msize); - strata_msg->header.size = htons (msize); - strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); + /* peers in the join message, may or may not include the local peer */ + listed_peers = ntohl (join_msg->num_peers); + + session->num_peers = listed_peers; - /* for correct message alignment, copy bucket types seperately */ - key_dst = (uint64_t *) &strata_msg[1]; + msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; - for (i = 0; i < STRATA_COUNT; i++) + local_peer_in_list = GNUNET_NO; + for (i = 0; i < listed_peers; i++) { - memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst); - key_dst += STRATA_IBF_BUCKETS; + if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) + { + local_peer_in_list = GNUNET_YES; + break; + } } - hash_dst = (uint32_t *) key_dst; + if (GNUNET_NO == local_peer_in_list) + session->num_peers++; - for (i = 0; i < STRATA_COUNT; i++) - { - memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); - hash_dst += STRATA_IBF_BUCKETS; - } + peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); + + if (GNUNET_NO == local_peer_in_list) + peers[session->num_peers - 1] = my_peer; + + memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); + qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); - count_dst = (uint8_t *) hash_dst; + session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); - for (i = 0; i < STRATA_COUNT; i++) + for (i = 0; i < session->num_peers; ++i) { - memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); - count_dst += STRATA_IBF_BUCKETS; + /* initialize back-references, so consensus peer information can + * be used as closure */ + session->info[i].session = session; + session->info[i].peer_id = peers[i]; } - cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, - write_strata_done, cpi); - - GNUNET_assert (NULL != cpi->wh); + GNUNET_free (peers); } /** - * Functions of this signature are called whenever writing operations - * on a stream are executed + * Called when another peer wants to do a set operation with the + * local peer. * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written + * @param cls closure + * @param other_peer the other peer + * @param context_msg message with application specific information from + * the other peer + * @param request request from the other peer, use GNUNET_SET_accept + * to accept it, otherwise the request will be refused + * Note that we don't use a return value here, as it is also + * necessary to specify the set we want to do the operation with, + * whith sometimes can be derived from the context message. + * Also necessary to specify the timeout. */ -static void -write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) +static void +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) { + struct ConsensusSession *session = cls; + struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; struct ConsensusPeerInformation *cpi; - struct DifferenceDigest *digest; - int msize; - uint64_t *key_dst; - uint32_t *hash_dst; - uint8_t *count_dst; - int num_buckets; - - cpi = cls; - cpi->wh = NULL; - - GNUNET_assert (GNUNET_STREAM_OK == status); - - GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); + struct GNUNET_SET_OperationHandle *set_op; + struct RoundInfo round_info; + int index; + int cmp; - if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) + if (NULL == context_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); - /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ + GNUNET_break_op (0); return; } - /* remaining buckets */ - num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; - - /* limit to maximum */ - if (num_buckets > BUCKETS_PER_MESSAGE) - num_buckets = BUCKETS_PER_MESSAGE; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<ibf_order)); - - msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); - - digest = GNUNET_malloc (msize); - digest->header.size = htons (msize); - digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); - digest->order = cpi->ibf_order; + index = get_peer_idx (other_peer, session); - key_dst = (uint64_t *) &digest[1]; - - memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst); - key_dst += num_buckets; - - hash_dst = (uint32_t *) key_dst; - - memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst); - hash_dst += num_buckets; - - count_dst = (uint8_t *) hash_dst; + if (index < 0) + { + GNUNET_break_op (0); + return; + } - memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst); + round_info.round = ntohl (msg->round); + round_info.exp_round = ntohl (msg->exp_round); + round_info.exp_subround = ntohl (msg->exp_subround); - cpi->ibf_bucket_counter += num_buckets; + cpi = &session->info[index]; - cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, - write_ibf, cpi); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got set request from P%d\n", session->local_peer_idx, index); - GNUNET_assert (NULL != cpi->wh); + switch (session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + cmp = rounds_compare (session, &round_info); + if (cmp > 0) + { + /* the other peer is too late */ + GNUNET_break_op (0); + return; + } + /* kill old request, if any. this is legal, + * as the other peer would not make a new request if it would want to + * complete the old one! */ + if (NULL != cpi->set_op) + { + GNUNET_SET_operation_cancel (cpi->set_op); + cpi->set_op = NULL; + } + set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, + set_result_cb, &session->info[index]); + if (cmp == 0) + { + cpi->set_op = set_op; + GNUNET_SET_commit (set_op, session->element_set); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d commited to set request from P%d\n", session->local_peer_idx, index); + } + else + { + /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */ + cpi->delayed_set_op = set_op; + cpi->delayed_round_info = round_info; + cpi->exp_subround_finished = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d delaying set request from P%d\n", session->local_peer_idx, index); + } + break; + default: + GNUNET_break_op (0); + return; + } } /** - * Functions of this signature are called whenever writing operations - * on a stream are executed + * Initialize the session, continue receiving messages from the owning client * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written + * @param session the session to initialize + * @param join_msg the join message from the client */ -static void -write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) +static void +initialize_session (struct ConsensusSession *session, + struct GNUNET_CONSENSUS_JoinMessage *join_msg) { - struct ConsensusPeerInformation *cpi; - uint64_t key; - struct GNUNET_HashCode hashcode; - int side; - int msize; - - GNUNET_assert (GNUNET_STREAM_OK == status); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); - - cpi = cls; - cpi->wh = NULL; + struct ConsensusSession *other_session; - GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); + initialize_session_peer_list (session, join_msg); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); + compute_global_id (session, &join_msg->session_id); - for (;;) + /* check if some local client already owns the session. + * it is only legal to have a session with an existing global id + * if all other sessions with this global id are finished.*/ + other_session = sessions_head; + while (NULL != other_session) { - int res; - res = ibf_decode (cpi->ibf, &side, &key); - if (GNUNET_SYSERR == res) - { - cpi->ibf_order++; - prepare_ibf (cpi); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_state = IBF_STATE_TRANSMITTING; - write_ibf (cls, status, size); - return; - } - if (GNUNET_NO == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); - return; - } - if (-1 == side) - { - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_MessageHeader *element_msg; - ibf_hashcode_from_key (key, &hashcode); - /* FIXME: this only transmits one element stored with the key */ - element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); - if (NULL == element) - continue; - msize = sizeof (struct GNUNET_MessageHeader) + element->size; - element_msg = GNUNET_malloc (msize); - element_msg->size = htons (msize); - element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); - GNUNET_assert (NULL != element->data); - memcpy (&element_msg[1], element->data, element->size); - cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, - write_requests_and_elements, cpi); - GNUNET_free (element_msg); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); - - GNUNET_assert (NULL != cpi->wh); - return; - } - else + if ((other_session != session) && + (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) { - struct ElementRequest *msg; - size_t msize; - uint64_t *p; - - msize = (sizeof *msg) + sizeof (uint64_t); - msg = GNUNET_malloc (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); - msg->header.size = htons (msize); - p = (uint64_t *) &msg[1]; - *p = key; - - cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, - write_requests_and_elements, cpi); - GNUNET_assert (NULL != cpi->wh); - GNUNET_free (msg); - return; + if (CONSENSUS_ROUND_FINISH != other_session->current_round) + { + GNUNET_break (0); + destroy_session (session); + return; + } + break; } + other_session = other_session->next; } + session->local_peer_idx = get_peer_idx (&my_peer, session); + GNUNET_assert (-1 != session->local_peer_idx); + session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + GNUNET_assert (NULL != session->element_set); + session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, + &session->global_id, + set_listen_cb, session); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); } - -/* -static void -select_best_group (struct ConsensusSession *session) +static struct ConsensusSession * +get_session_by_client (struct GNUNET_SERVER_Client *client) { + struct ConsensusSession *session; + + session = sessions_head; + while (NULL != session) + { + if (session->client == client) + return session; + session = session->next; + } + return NULL; } -*/ /** - * Called when a client performs the conclude operation. + * Called when a client wants to join a consensus session. * - * @param cls (unused) - * @param client client handle - * @param message message sent by the client + * @param cls unused + * @param client client that sent the message + * @param m message sent by the client */ static void -client_conclude (void *cls, +client_join (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *m) { struct ConsensusSession *session; - int i; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); - session = sessions_head; - while ((session != NULL) && (session->client != client)) - session = session->next; - if (NULL == session) - { - /* client not found */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n"); - if (GNUNET_YES == session->conclude_requested) + session = get_session_by_client (client); + if (NULL != session) { - /* client requested conclude twice */ GNUNET_break (0); - disconnect_client (client); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - - session->conclude_requested = GNUNET_YES; - - for (i = 0; i < session->num_peers; i++) - { - if ( (GNUNET_YES == session->info[i].is_outgoing) && - (GNUNET_YES == session->info[i].hello) ) - { - /* kick off transmitting strata by calling the write continuation */ - write_strata (&session->info[i], GNUNET_STREAM_OK, 0); - } - } - + session = GNUNET_new (struct ConsensusSession); + session->client = client; + session->client_mq = GNUNET_MQ_queue_for_server_client (client); + GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); + initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); GNUNET_SERVER_receive_done (client, GNUNET_OK); - send_next (session); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n"); } /** - * Called when a client sends an ack + * Called when a client performs an insert operation. * * @param cls (unused) * @param client client handle - * @param message message sent by the client + * @param m message sent by the client */ void -client_ack (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +client_insert (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { struct ConsensusSession *session; - struct GNUNET_CONSENSUS_AckMessage *msg; - struct PendingElement *pending; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_HashCode key; + struct GNUNET_CONSENSUS_ElementMessage *msg; + struct GNUNET_SET_Element *element; + ssize_t element_size; - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - break; - } + session = get_session_by_client (client); if (NULL == session) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); + GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); return; } - pending = session->approval_pending_head; - - GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending); - - msg = (struct GNUNET_CONSENSUS_AckMessage *) message; - - if (msg->keep) + if (CONSENSUS_ROUND_BEGIN != session->current_round) { - element = pending->element; - GNUNET_CRYPTO_hash (element, element->size, &key); + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } - GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - strata_insert (session->strata, &key); + msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; + element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + if (element_size < 0) + { + GNUNET_break (0); + return; } + element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); + element->type = msg->element_type; + element->size = element_size; + memcpy (&element[1], &msg[1], element_size); + element->data = &element[1]; + GNUNET_SET_add_element (session->element_set, element, NULL, NULL); + GNUNET_free (element); GNUNET_SERVER_receive_done (client, GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); } + /** - * Task that disconnects from core. + * Called when a client performs the conclude operation. * - * @param cls core handle - * @param tc context information (why was this task triggered now) + * @param cls (unused) + * @param client client handle + * @param message message sent by the client */ static void -disconnect_core (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_CORE_disconnect (core); - core = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); -} - - -static void -core_startup (void *cls, - struct GNUNET_CORE_Handle *core, - const struct GNUNET_PeerIdentity *peer) +client_conclude (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { struct ConsensusSession *session; + struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; - my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); - /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ - GNUNET_SCHEDULER_add_now (&disconnect_core, core); - GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); - session = sessions_head; - while (NULL != session) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); + cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; + session = get_session_by_client (client); + if (NULL == session) { - if (NULL != session->join_msg) - initialize_session (session); - session = session->next; + /* client not found */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } + if (CONSENSUS_ROUND_BEGIN != session->current_round) + { + /* client requested conclude twice */ + GNUNET_break (0); + return; + } + if (session->num_peers <= 1) + { + /* FIXME: what to do here? */ + //send_client_conclude_done (session); + } + else + { + session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); + /* the 'begin' round is over, start with the next, actual round */ + round_over (session, NULL); } + + GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1862,58 +1185,33 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - - /* FIXME: complete; write separate destructors for different data types */ - - while (NULL != incoming_sockets_head) - { - struct IncomingSocket *socket; - socket = incoming_sockets_head; - if (NULL == socket->cpi) - { - GNUNET_STREAM_close (socket->socket); - } - incoming_sockets_head = incoming_sockets_head->next; - GNUNET_free (socket); - } - while (NULL != sessions_head) - { - struct ConsensusSession *session; - int i; - - session = sessions_head; - - for (i = 0; session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - if ((NULL != cpi) && (NULL != cpi->socket)) - { - GNUNET_STREAM_close (cpi->socket); - } - } - - if (NULL != session->client) - GNUNET_SERVER_client_disconnect (session->client); + destroy_session (sessions_head); - sessions_head = sessions_head->next; - GNUNET_free (session); - } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); +} - if (NULL != core) - { - GNUNET_CORE_disconnect (core); - core = NULL; - } - if (NULL != listener) - { - GNUNET_STREAM_listen_close (listener); - listener = NULL; - } +/** + * Clean up after a client after it is + * disconnected (either by us or by itself) + * + * @param cls closure, unused + * @param client the client to clean up after + */ +void +handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + struct ConsensusSession *session; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); + session = get_session_by_client (client); + if (NULL == session) + return; + if ((CONSENSUS_ROUND_BEGIN == session->current_round) || + (CONSENSUS_ROUND_FINISH == session->current_round)) + destroy_session (session); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client disconnected, but waiting for consensus to finish\n"); } @@ -1925,39 +1223,30 @@ shutdown_task (void *cls, * @param c configuration to use */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) +run (void *cls, struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) { - /* core is only used to retrieve the peer identity */ - static const struct GNUNET_CORE_MessageHandler core_handlers[] = { - {NULL, 0, 0} - }; static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { - {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, - {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, - {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, - sizeof (struct GNUNET_CONSENSUS_AckMessage)}, + {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, + {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, {NULL, NULL, 0, 0} }; cfg = c; srv = server; - + if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &my_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_SERVER_add_handlers (server, server_handlers); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); - - listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, - listen_cb, NULL, - GNUNET_STREAM_OPTION_END); - - - /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ - core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); - GNUNET_assert (NULL != core); - - GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); + GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); } @@ -1973,7 +1262,7 @@ main (int argc, char *const *argv) { int ret; ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); return (GNUNET_OK == ret) ? 0 : 1; }