X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconsensus%2Fgnunet-service-consensus.c;h=ffd9786d33522098acf1f6efa80995b28e8a6a8a;hb=fdaa7877af4902433a51c217ea379e1accb63090;hp=a7640c51f003f8e52c6cf3dc5f7b3f6556372744;hpb=e77e2db24ef3681f207521e539a2c1ca3584efda;p=oweals%2Fgnunet.git diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index a7640c51f..ffd9786d3 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet - (C) 2012 Christian Grothoff (and other contributing authors) + (C) 2012, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -25,21 +25,16 @@ */ #include "platform.h" -#include "gnunet_common.h" +#include "gnunet_util_lib.h" #include "gnunet_protocols.h" #include "gnunet_applications.h" -#include "gnunet_util_lib.h" +#include "gnunet_set_service.h" #include "gnunet_consensus_service.h" -#include "gnunet_core_service.h" -#include "gnunet_stream_lib.h" - #include "consensus_protocol.h" #include "consensus.h" -#include "ibf.h" -#include "strata_estimator.h" -/* +/** * Log macro that prefixes the local peer and the peer we are in contact with. */ #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ @@ -47,82 +42,19 @@ /** - * Number of IBFs in a strata estimator. - */ -#define SE_STRATA_COUNT 32 -/** - * Size of the IBFs in the strata estimator. - */ -#define SE_IBF_SIZE 80 -/** - * hash num parameter for the difference digests and strata estimators - */ -#define SE_IBF_HASH_NUM 3 - -/** - * Number of buckets that can be transmitted in one message. - */ -#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) - -/** - * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). - * Choose this value so that computing the IBF is still cheaper - * than transmitting all values. - */ -#define MAX_IBF_ORDER (16) - -/** - * Number of exponential rounds, used in the inventory and completion round. + * Number of exponential rounds, used in the exp and completion round. */ -#define NUM_EXP_ROUNDS (4) - +#define NUM_EXP_ROUNDS 4 /* forward declarations */ /* mutual recursion with struct ConsensusSession */ struct ConsensusPeerInformation; -struct MessageQueue; - /* mutual recursion with round_over */ static void subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); -/* mutial recursion with transmit_queued */ -static void -client_send_next (struct MessageQueue *mq); - -/* mutual recursion with mst_session_callback */ -static void -open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); - -static int -mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); - - -/** - * Additional information about a consensus element. - */ -struct ElementInfo -{ - /** - * The element itself. - */ - struct GNUNET_CONSENSUS_Element *element; - /** - * Hash of the element - */ - struct GNUNET_HashCode *element_hash; - /** - * Number of other peers that have the element in the inventory. - */ - unsigned int inventory_count; - /** - * Bitmap of peers that have this element in their inventory - */ - uint8_t *inventory_bitmap; -}; - /** * Describes the current round a consensus session is in. @@ -138,7 +70,8 @@ enum ConsensusRound */ CONSENSUS_ROUND_EXCHANGE, /** - * Exchange which elements each peer has, but not the elements. + * Exchange which elements each peer has, but don't + * transmit the element's data, only their SHA-512 hashes. * This round uses the all-to-all scheme. */ CONSENSUS_ROUND_INVENTORY, @@ -153,79 +86,27 @@ enum ConsensusRound CONSENSUS_ROUND_FINISH }; -/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ /** - * State of another peer with respect to the - * current ibf. + * Complete information about the current round and all + * subrounds. */ -enum ConsensusIBFState { +struct RoundInfo +{ /** - * There is nothing going on with the IBF. + * The current main round. */ - IBF_STATE_NONE=0, + enum ConsensusRound round; /** - * We currently receive an ibf. + * The current exp round, valid if + * the main round is an exp round. */ - IBF_STATE_RECEIVING, - /* - * we decode a received ibf - */ - IBF_STATE_DECODING, + uint32_t exp_round; /** - * wait for elements and element requests + * The current exp subround, valid if + * the main round is an exp round. */ - IBF_STATE_ANTICIPATE_DIFF -}; - - -typedef void (*AddCallback) (struct MessageQueue *mq); -typedef void (*MessageSentCallback) (void *cls); - - -/** - * Collection of the state necessary to read and write gnunet messages - * to a stream socket. Should be used as closure for stream_data_processor. - */ -struct MessageStreamState -{ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - struct MessageQueue *mq; - void *mst_cls; - struct GNUNET_STREAM_Socket *socket; - struct GNUNET_STREAM_ReadHandle *rh; - struct GNUNET_STREAM_WriteHandle *wh; -}; - - -struct ServerClientSocketState -{ - struct GNUNET_SERVER_Client *client; - struct GNUNET_SERVER_TransmitHandle* th; -}; - - -/** - * Generic message queue, for queueing outgoing messages. - */ -struct MessageQueue -{ - void *state; - AddCallback add_cb; - struct PendingMessage *pending_head; - struct PendingMessage *pending_tail; - struct PendingMessage *current_pm; -}; - - -struct PendingMessage -{ - struct GNUNET_MessageHeader *msg; - struct MessageQueue *parent_queue; - struct PendingMessage *next; - struct PendingMessage *prev; - MessageSentCallback sent_cb; - void *sent_cb_cls; + uint32_t exp_subround; }; @@ -244,13 +125,6 @@ struct ConsensusSession */ struct ConsensusSession *prev; - /** - * Join message. Used to initialize the session later, - * if the identity of the local peer is not yet known. - * NULL if the session has been fully initialized. - */ - struct GNUNET_CONSENSUS_JoinMessage *join_msg; - /** * Global consensus identification, computed * from the session id and participating authorities. @@ -258,45 +132,34 @@ struct ConsensusSession struct GNUNET_HashCode global_id; /** - * The server's client and associated local state + * Client that inhabits the session */ - struct ServerClientSocketState scss; + struct GNUNET_SERVER_Client *client; /** * Queued messages to the client. */ - struct MessageQueue *client_mq; - - /** - * IBF_Key -> 2^(HashCode*) - * FIXME: - * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. - */ - struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; + struct GNUNET_MQ_Handle *client_mq; /** - * Maps HashCodes to ElementInfos + * Time when the conclusion of the consensus should begin. */ - struct GNUNET_CONTAINER_MultiHashMap *values; - - /** - * Currently active transmit handle for sending to the client - */ - struct GNUNET_SERVER_TransmitHandle *client_th; + struct GNUNET_TIME_Absolute conclude_start; /** * Timeout for all rounds together, single rounds will schedule a timeout task * with a fraction of the conclude timeout. + * Only valid once the current round is not CONSENSUS_ROUND_BEGIN. */ - struct GNUNET_TIME_Relative conclude_timeout; - + struct GNUNET_TIME_Absolute conclude_deadline; + /** - * Timeout task identifier for the current round + * Timeout task identifier for the current round. */ GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; /** - * Number of other peers in the consensus + * Number of other peers in the consensus. */ unsigned int num_peers; @@ -306,26 +169,11 @@ struct ConsensusSession */ struct ConsensusPeerInformation *info; - /** - * GNUNET_YES if the client has called conclude. - * */ - int conclude; - /** * Index of the local peer in the peers array */ unsigned int local_peer_idx; - /** - * Strata estimator, computed online - */ - struct StrataEstimator *se; - - /** - * Pre-computed IBFs - */ - struct InvertibleBloomFilter **ibfs; - /** * Current round */ @@ -333,81 +181,56 @@ struct ConsensusSession /** * Permutation of peers for the current round, - * maps logical index (for current round) to physical index (location in info array) - */ - int *shuffle; - - int exp_round; - - int exp_subround; - - /** - * The partner for the current exp-round */ - struct ConsensusPeerInformation* partner_outgoing; + uint32_t *shuffle; /** - * The partner for the current exp-round + * Inverse permutation of peers for the current round, */ - struct ConsensusPeerInformation* partner_incoming; -}; + uint32_t *shuffle_inv; - -/** - * Information about a peer that is in a consensus session. - */ -struct ConsensusPeerInformation -{ /** - * Peer identitty of the peer in the consensus session + * Current round of the exponential scheme. */ - struct GNUNET_PeerIdentity peer_id; + uint32_t exp_round; /** - * Do we connect to the peer, or does the peer connect to us? - * Only valid for all-to-all phases + * Current sub-round of the exponential scheme. */ - int is_outgoing; + uint32_t exp_subround; /** - * Did we receive/send a consensus hello? - */ - int hello; - - /* - * FIXME + * The partner for the current exp-round */ - struct MessageStreamState mss; + struct ConsensusPeerInformation *partner_outgoing; /** - * Current state + * The partner for the current exp-round */ - enum ConsensusIBFState ibf_state; + struct ConsensusPeerInformation *partner_incoming; /** - * What is the order (=log2 size) of the ibf - * we're currently dealing with? - * Interpretation depends on ibf_state. + * The consensus set of this session. */ - int ibf_order; + struct GNUNET_SET_Handle *element_set; /** - * The current IBF for this peer, - * purpose dependent on ibf_state + * Listener for requests from other peers. + * Uses the session's global id as app id. */ - struct InvertibleBloomFilter *ibf; + struct GNUNET_SET_ListenHandle *set_listener; +}; - /** - * How many buckets have we transmitted/received? - * Interpretatin depends on ibf_state - */ - int ibf_bucket_counter; +/** + * Information about a peer that is in a consensus session. + */ +struct ConsensusPeerInformation +{ /** - * Strata estimator of the peer, NULL if our peer - * initiated the reconciliation. + * Peer identitty of the peer in the consensus session */ - struct StrataEstimator *se; + struct GNUNET_PeerIdentity peer_id; /** * Back-reference to the consensus session, @@ -415,93 +238,28 @@ struct ConsensusPeerInformation */ struct ConsensusSession *session; - /** - * True if we are actually replaying the strata message, - * e.g. currently handling the premature_strata_message. - */ - int replaying_strata_message; - - /** - * A strata message that is not actually for the current round, - * used in the exp-scheme. - */ - struct StrataMessage *premature_strata_message; - /** * We have finishes the exp-subround with the peer. */ int exp_subround_finished; /** - * GNUNET_YES if we synced inventory with this peer; - * GNUNET_NO otherwise. - */ - int inventory_synced; - - /** - * Round this peer seems to be in, according to the last SE we got. - * Necessary to store this, as we sometimes need to respond to a request from an - * older round, while we are already in the next round. - */ - enum ConsensusRound apparent_round; -}; - - -/** - * Sockets from other peers who want to communicate with us. - * It may not be known yet which consensus session they belong to, we have to wait for the - * peer's hello. - * Also, the session might not exist yet locally, we have to wait for a local client to connect. - */ -struct IncomingSocket -{ - /** - * Incoming sockets are kept in a double linked list. - */ - struct IncomingSocket *next; - - /** - * Incoming sockets are kept in a double linked list. - */ - struct IncomingSocket *prev; - - /** - * Peer that connected to us with the socket. + * Set operation we are currently executing with this peer. */ - struct GNUNET_PeerIdentity peer_id; + struct GNUNET_SET_OperationHandle *set_op; /** - * Peer-in-session this socket belongs to, once known, otherwise NULL. + * Set operation we are planning on executing with this peer. */ - struct ConsensusPeerInformation *cpi; + struct GNUNET_SET_OperationHandle *delayed_set_op; /** - * Set to the global session id, if the peer sent us a hello-message, - * but the session does not exist yet. - */ - struct GNUNET_HashCode *requested_gid; - - /* - * Timeout, will disconnect the socket if not yet in a session. - * FIXME: implement + * Info about the round of the delayed set operation. */ - GNUNET_SCHEDULER_TaskIdentifier timeout; - - /* FIXME */ - struct MessageStreamState mss; + struct RoundInfo delayed_round_info; }; -/** - * Linked list of incoming sockets. - */ -static struct IncomingSocket *incoming_sockets_head; - -/** - * Linked list of incoming sockets. - */ -static struct IncomingSocket *incoming_sockets_tail; - /** * Linked list of sessions this peer participates in. */ @@ -525,1364 +283,375 @@ static struct GNUNET_SERVER_Handle *srv; /** * Peer that runs this service. */ -static struct GNUNET_PeerIdentity *my_peer; - -/** - * Handle to the core service. Only used during service startup, will be NULL after that. - */ -static struct GNUNET_CORE_Handle *core; - -/** - * Listener for sockets from peers that want to reconcile with us. - */ -static struct GNUNET_STREAM_ListenSocket *listener; - - -/** - * Transmit a queued message to the session's client. - * - * @param cls consensus session - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) -{ - struct MessageQueue *mq = cls; - struct PendingMessage *pm = mq->pending_head; - struct ServerClientSocketState *state = mq->state; - size_t msg_size; - - GNUNET_assert (NULL != pm); - GNUNET_assert (NULL != buf); - msg_size = ntohs (pm->msg->size); - GNUNET_assert (size >= msg_size); - memcpy (buf, pm->msg, msg_size); - GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); - state->th = NULL; - client_send_next (cls); - GNUNET_free (pm); - return msg_size; -} - - -static void -client_send_next (struct MessageQueue *mq) -{ - struct ServerClientSocketState *state = mq->state; - int msize; - - GNUNET_assert (NULL != state); - - if ( (NULL != state->th) || - (NULL == mq->pending_head) ) - return; - msize = ntohs (mq->pending_head->msg->size); - state->th = - GNUNET_SERVER_notify_transmit_ready (state->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, mq); -} +static struct GNUNET_PeerIdentity my_peer; -struct MessageQueue * -create_message_queue_for_server_client (struct ServerClientSocketState *scss) +static int +have_exp_subround_finished (const struct ConsensusSession *session) { - struct MessageQueue *mq; - mq = GNUNET_new (struct MessageQueue); - mq->add_cb = client_send_next; - mq->state = scss; - return mq; + int not_finished; + not_finished = 0; + if ( (NULL != session->partner_outgoing) && + (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) + not_finished++; + if ( (NULL != session->partner_incoming) && + (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) + not_finished++; + if (0 == not_finished) + return GNUNET_YES; + return GNUNET_NO; } /** - * Functions of this signature are called whenever writing operations - * on a stream are executed + * Destroy a session, free all resources associated with it. * - * @param cls the closure from GNUNET_STREAM_write - * @param status the status of the stream at the time this function is called; - * GNUNET_STREAM_OK if writing to stream was completed successfully; - * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully - * (this doesn't mean that the data is never sent, the receiver may - * have read the data but its ACKs may have been lost); - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot - * be processed. - * @param size the number of bytes written + * @param session the session to destroy */ -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) -{ - struct MessageQueue *mq = cls; - struct MessageStreamState *mss = mq->state; - struct PendingMessage *pm; - - GNUNET_assert (GNUNET_STREAM_OK == status); - - /* call cb for message we finished sending */ - pm = mq->current_pm; - if (NULL != pm) - { - if (NULL != pm->sent_cb) - pm->sent_cb (pm->sent_cb_cls); - GNUNET_free (pm); - } - - mss->wh = NULL; - - pm = mq->pending_head; - mq->current_pm = pm; - if (NULL == pm) - return; - GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); - mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); - GNUNET_assert (NULL != mss->wh); -} - - static void -stream_socket_add_cb (struct MessageQueue *mq) -{ - if (NULL != mq->current_pm) - return; - write_queued (mq, GNUNET_STREAM_OK, 0); -} - - -struct MessageQueue * -create_message_queue_for_stream_socket (struct MessageStreamState *mss) +destroy_session (struct ConsensusSession *session) { - struct MessageQueue *mq; - mq = GNUNET_new (struct MessageQueue); - mq->state = mss; - mq->add_cb = stream_socket_add_cb; - return mq; -} - + int i; -struct PendingMessage * -new_pending_message (uint16_t size, uint16_t type) -{ - struct PendingMessage *pm; - pm = GNUNET_malloc (sizeof *pm + size); - pm->msg = (void *) &pm[1]; - pm->msg->size = htons (size); - pm->msg->type = htons (type); - return pm; + GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); + if (NULL != session->element_set) + { + GNUNET_SET_destroy (session->element_set); + session->element_set = NULL; + } + if (NULL != session->set_listener) + { + GNUNET_SET_listen_cancel (session->set_listener); + session->set_listener = NULL; + } + if (NULL != session->client_mq) + { + GNUNET_MQ_destroy (session->client_mq); + session->client_mq = NULL; + } + if (NULL != session->client) + { + GNUNET_SERVER_client_disconnect (session->client); + session->client = NULL; + } + if (NULL != session->shuffle) + { + GNUNET_free (session->shuffle); + session->shuffle = NULL; + } + if (NULL != session->shuffle_inv) + { + GNUNET_free (session->shuffle_inv); + session->shuffle_inv = NULL; + } + if (NULL != session->info) + { + for (i = 0; i < session->num_peers; i++) + { + struct ConsensusPeerInformation *cpi; + cpi = &session->info[i]; + if (NULL != cpi->set_op) + { + GNUNET_SET_operation_cancel (cpi->set_op); + cpi->set_op = NULL; + } + } + GNUNET_free (session->info); + session->info = NULL; + } + GNUNET_free (session); } /** - * Queue a message in a message queue. + * Iterator for set elements. * - * @param queue the message queue - * @param pending message, message with additional information + * @param cls closure + * @param element the current element, NULL if all elements have been + * iterated over + * @return GNUNET_YES to continue iterating, GNUNET_NO to stop. */ -void -message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) +static int +send_to_client_iter (void *cls, + const struct GNUNET_SET_Element *element) { - GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); - queue->add_cb (queue); -} - + struct ConsensusSession *session = cls; + struct GNUNET_MQ_Envelope *ev; -/** - * Called when we receive data from a peer via stream. - * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) -{ - struct MessageStreamState *mss = cls; - int ret; + if (NULL != element) + { + struct GNUNET_CONSENSUS_ElementMessage *m; - mss->rh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n", + session->local_peer_idx); - if (GNUNET_STREAM_OK != status) - { - /* FIXME: handle this correctly */ - GNUNET_break (0); - return 0; + ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); + m->element_type = htons (element->type); + memcpy (&m[1], element->data, element->size); + GNUNET_MQ_send (session->client_mq, ev); } - GNUNET_assert (NULL != mss->mst); - ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); - if (GNUNET_SYSERR == ret) + else { - /* FIXME: handle this correctly */ - GNUNET_break (0); - return 0; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n", + session->local_peer_idx); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); + GNUNET_MQ_send (session->client_mq, ev); } - /* read again */ - mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); - /* we always read all data */ - return size; + return GNUNET_YES; } /** - * Send element or element report to the peer specified in cpi. + * Start the next round. + * This function can be invoked as a timeout task, or called manually (tc will be NULL then). * - * @param cpi peer to send the elements to - * @param head head of the element list + * @param cls the session + * @param tc task context, for when this task is invoked by the scheduler, + * NULL if invoked for another reason */ static void -send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) -{ - struct PendingMessage *pm; - - switch (cpi->apparent_round) - { - case CONSENSUS_ROUND_COMPLETION: - case CONSENSUS_ROUND_EXCHANGE: - pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, - GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); - memcpy (&pm->msg[1], e->element->data, e->element->size); - message_queue_add (cpi->mss.mq, pm); - break; - case CONSENSUS_ROUND_INVENTORY: - pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), - GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); - memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); - message_queue_add (cpi->mss.mq, pm); - break; - default: - GNUNET_break (0); - } -} - - -/** - * Iterator to insert values into an ibf. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -ibf_values_iterator (void *cls, - const struct GNUNET_HashCode *key, - void *value) +round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct ConsensusPeerInformation *cpi = cls; - struct ElementInfo *e = value; - struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); - - GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); - ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); - return GNUNET_YES; -} + struct ConsensusSession *session; -/** - * Create and populate an IBF for the specified peer, - * if it does not already exist. - * - * @param cpi peer to create the ibf for - */ -static void -prepare_ibf (struct ConsensusPeerInformation *cpi) -{ - if (NULL != cpi->session->ibfs[cpi->ibf_order]) + /* don't kick off next round if we're shutting down */ + if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); - GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); -} - -/** - * Called when a remote peer wants to inform the local peer - * that the remote peer misses elements. - * Elements are not reconciled. - * - * @param cpi session - * @param msg message - */ -static int -handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) -{ - GNUNET_assert (0); -} - - -static int -exp_subround_finished (const struct ConsensusSession *session) -{ - int not_finished; - not_finished = 0; - if ( (NULL != session->partner_outgoing) && - (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) - not_finished++; - if ( (NULL != session->partner_incoming) && - (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) - not_finished++; - if (0 == not_finished) - return GNUNET_YES; - return GNUNET_NO; -} - - -static int -inventory_round_finished (struct ConsensusSession *session) -{ - int i; - int finished; - finished = 0; - for (i = 0; i < session->num_peers; i++) - if (GNUNET_YES == session->info[i].inventory_synced) - finished++; - if (finished >= (session->num_peers / 2)) - return GNUNET_YES; - return GNUNET_NO; -} - - -static void -clear_message_stream_state (struct MessageStreamState *mss) -{ - if (NULL != mss->mst) - { - GNUNET_SERVER_mst_destroy (mss->mst); - mss->mst = NULL; - } - if (NULL != mss->rh) - { - GNUNET_STREAM_read_cancel (mss->rh); - mss->rh = NULL; - } - if (NULL != mss->wh) - { - GNUNET_STREAM_write_cancel (mss->wh); - mss->wh = NULL; - } - if (NULL != mss->socket) - { - GNUNET_STREAM_close (mss->socket); - mss->socket = NULL; - } - if (NULL != mss->mq) - { - GNUNET_free (mss->mq); - mss->mq = NULL; - } -} - - -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -destroy_element_info_iter (void *cls, - const struct GNUNET_HashCode * key, - void *value) -{ - struct ElementInfo *ei = value; - GNUNET_free (ei->element); - GNUNET_free (ei->element_hash); - GNUNET_free (ei); - return GNUNET_YES; -} - - -/** - * Destroy a session, free all resources associated with it. - * - * @param session the session to destroy - */ -static void -destroy_session (struct ConsensusSession *session) -{ - int i; - - GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); - GNUNET_SERVER_client_drop (session->scss.client); - session->scss.client = NULL; - if (NULL != session->client_mq) - { - GNUNET_free (session->client_mq); - session->client_mq = NULL; - } - if (NULL != session->shuffle) - { - GNUNET_free (session->shuffle); - session->shuffle = NULL; - } - if (NULL != session->se) - { - strata_estimator_destroy (session->se); - session->se = NULL; - } - if (NULL != session->info) - { - for (i = 0; i < session->num_peers; i++) - { - struct ConsensusPeerInformation *cpi; - cpi = &session->info[i]; - clear_message_stream_state (&cpi->mss); - if (NULL != cpi->se) - { - strata_estimator_destroy (cpi->se); - cpi->se = NULL; - } - if (NULL != cpi->ibf) - { - ibf_destroy (cpi->ibf); - cpi->ibf = NULL; - } - } - GNUNET_free (session->info); - session->info = NULL; - } - if (NULL != session->ibfs) - { - for (i = 0; i <= MAX_IBF_ORDER; i++) - { - if (NULL != session->ibfs[i]) - { - ibf_destroy (session->ibfs[i]); - session->ibfs[i] = NULL; - } - } - GNUNET_free (session->ibfs); - session->ibfs = NULL; - } - if (NULL != session->values) - { - GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); - GNUNET_CONTAINER_multihashmap_destroy (session->values); - session->values = NULL; - } + session = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx); - if (NULL != session->ibf_key_map) + if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); - session->ibf_key_map = NULL; + GNUNET_SCHEDULER_cancel (session->round_timeout_tid); + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; } - GNUNET_free (session); -} - -static void -send_client_conclude_done (struct ConsensusSession *session) -{ - struct PendingMessage *pm; - - /* check if client is even there anymore */ - if (NULL == session->scss.client) - return; - pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), - GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); - message_queue_add (session->client_mq, pm); -} - - -/** - * Check if a strata message is for the current round or not - * - * @param session session we are in - * @param strata_msg the strata message to check - * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise - */ -static int -is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) -{ - switch (strata_msg->round) + switch (session->current_round) { - case CONSENSUS_ROUND_COMPLETION: + case CONSENSUS_ROUND_BEGIN: + session->current_round = CONSENSUS_ROUND_EXCHANGE; + session->exp_round = 0; + subround_over (session, NULL); + break; case CONSENSUS_ROUND_EXCHANGE: - /* here, we also have to compare subrounds */ - if ( (strata_msg->round != session->current_round) || - (strata_msg->exp_round != session->exp_round) || - (strata_msg->exp_subround != session->exp_subround) ) - return GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n", + session->local_peer_idx); + session->current_round = CONSENSUS_ROUND_FINISH; + GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); break; default: - if (session->current_round != strata_msg->round) - return GNUNET_YES; - break; + GNUNET_assert (0); } - return GNUNET_NO; -} - - -/** - * Send a strata estimator. - * - * @param cpi the peer - */ -static void -send_strata_estimator (struct ConsensusPeerInformation *cpi) -{ - struct PendingMessage *pm; - struct StrataMessage *strata_msg; - - /* FIXME: why is this correct? */ - cpi->apparent_round = cpi->session->current_round; - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; - - LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); - - pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), - GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); - strata_msg = (struct StrataMessage *) pm->msg; - strata_msg->round = cpi->session->current_round; - strata_msg->exp_round = cpi->session->exp_round; - strata_msg->exp_subround = cpi->session->exp_subround; - strata_estimator_write (cpi->session->se, &strata_msg[1]); - message_queue_add (cpi->mss.mq, pm); } /** - * Send an IBF of the order specified in cpi. + * Create a new permutation for the session's peers in session->shuffle. + * Uses a Fisher-Yates shuffle with pseudo-randomness coming from + * both the global session id and the current round index. * - * @param cpi the peer + * @param session the session to create the new permutation for */ static void -send_ibf (struct ConsensusPeerInformation *cpi) -{ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - - cpi->ibf_bucket_counter = 0; - while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) - { - unsigned int num_buckets; - struct PendingMessage *pm; - struct DifferenceDigest *digest; - - num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; - /* limit to maximum */ - if (num_buckets > BUCKETS_PER_MESSAGE) - num_buckets = BUCKETS_PER_MESSAGE; - - pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), - GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); - digest = (struct DifferenceDigest *) pm->msg; - digest->order = cpi->ibf_order; - digest->round = cpi->apparent_round; - ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); - cpi->ibf_bucket_counter += num_buckets; - message_queue_add (cpi->mss.mq, pm); - } - cpi->ibf_bucket_counter = 0; - cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; -} - - -/** - * Called when a peer sends us its strata estimator. - * In response, we sent out IBF of appropriate size back. - * - * @param cpi session - * @param strata_msg message - */ -static int -handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) +shuffle (struct ConsensusSession *session) { - unsigned int diff; - - if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && - (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) - { - /* we still have to handle this request appropriately */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - } - else if (is_premature_strata_message (cpi->session, strata_msg)) - { - if (GNUNET_NO == cpi->replaying_strata_message) - { - LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", - strata_msg->exp_round, strata_msg->exp_subround); - cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); - } - return GNUNET_YES; - } - - if (NULL == cpi->se) - cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); + uint32_t i; + uint32_t randomness[session->num_peers-1]; - cpi->apparent_round = strata_msg->round; + if (NULL == session->shuffle) + session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); + if (NULL == session->shuffle_inv) + session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv)); - if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) - { - LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); - return GNUNET_NO; - } - strata_estimator_read (&strata_msg[1], cpi->se); - GNUNET_assert (NULL != cpi->session->se); - diff = strata_estimator_difference (cpi->session->se, cpi->se); + GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), + &session->exp_round, sizeof (uint32_t), + &session->global_id, sizeof (struct GNUNET_HashCode), + NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); + for (i = 0; i < session->num_peers; i++) + session->shuffle[i] = i; - switch (cpi->session->current_round) + for (i = session->num_peers - 1; i > 0; i--) { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_INVENTORY: - case CONSENSUS_ROUND_COMPLETION: - /* send IBF of the right size */ - cpi->ibf_order = 0; - while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) - cpi->ibf_order++; - if (cpi->ibf_order > MAX_IBF_ORDER) - cpi->ibf_order = MAX_IBF_ORDER; - cpi->ibf_order += 1; - /* create ibf if not already pre-computed */ - prepare_ibf (cpi); - if (NULL != cpi->ibf) - ibf_destroy (cpi->ibf); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_bucket_counter = 0; - send_ibf (cpi); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - break; + uint32_t x; + uint32_t tmp; + x = randomness[i-1] % session->num_peers; + tmp = session->shuffle[x]; + session->shuffle[x] = session->shuffle[i]; + session->shuffle[i] = tmp; } - return GNUNET_YES; -} - - -static int -send_elements_iterator (void *cls, - const struct GNUNET_HashCode * key, - void *value) -{ - struct ConsensusPeerInformation *cpi = cls; - struct ElementInfo *ei; - ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); - if (NULL == ei) - { - LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", - GNUNET_h2s((struct GNUNET_HashCode *) value)); - return GNUNET_YES; - } - LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); - send_element_or_report (cpi, ei); - return GNUNET_YES; + /* create the inverse */ + for (i = 0; i < session->num_peers; i++) + session->shuffle_inv[session->shuffle[i]] = i; } /** - * Decode the current diff ibf, and send elements/requests/reports/ + * Find and set the partner_incoming and partner_outgoing of our peer, + * one of them may not exist (and thus set to NULL) if the number of peers + * in the session is not a power of two. * - * @param cpi partner peer + * @param session the consensus session */ static void -decode (struct ConsensusPeerInformation *cpi) +find_partners (struct ConsensusSession *session) { - struct IBF_Key key; - int side; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - - while (1) - { - int res; - - res = ibf_decode (cpi->ibf, &side, &key); - if (GNUNET_SYSERR == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); - /* decoding failed, we tell the other peer by sending our ibf with a larger order */ - cpi->ibf_order++; - prepare_ibf (cpi); - cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); - cpi->ibf_bucket_counter = 0; - send_ibf (cpi); - return; - } - if (GNUNET_NO == res) - { - struct PendingMessage *pm; - struct ConsensusRoundMessage *rmsg; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); - - pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); - rmsg = (struct ConsensusRoundMessage *) pm->msg; - rmsg->round = cpi->apparent_round; - message_queue_add (cpi->mss.mq, pm); - return; - } - if (-1 == side) - { - struct GNUNET_HashCode hashcode; - /* we have the element(s), send it to the other peer */ - ibf_hashcode_from_key (key, &hashcode); - GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); - } - else + unsigned int arc; + unsigned int num_ghosts; + unsigned int largest_arc; + int partner_idx; + + /* shuffled local index */ + int my_idx = session->shuffle[session->local_peer_idx]; + + /* distance to neighboring peer in current subround */ + arc = 1 << session->exp_subround; + largest_arc = 1; + while (largest_arc < session->num_peers) + largest_arc <<= 1; + num_ghosts = largest_arc - session->num_peers; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts); + + if (0 == (my_idx & arc)) + { + /* we are outgoing */ + partner_idx = (my_idx + arc) % session->num_peers; + session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]]; + session->partner_outgoing->exp_subround_finished = GNUNET_NO; + /* are we a 'ghost' of a peer that would exist if + * the number of peers was a power of two, and thus have to partner + * with an additional peer? + */ + if (my_idx < num_ghosts) { - struct PendingMessage *pm; - uint16_t type; - - switch (cpi->apparent_round) + int ghost_partner_idx; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers); + ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers; + /* platform dependent; modulo sometimes returns negative values */ + if (ghost_partner_idx < 0) + ghost_partner_idx += session->num_peers; + /* we only need to have a ghost partner if the partner is outgoing */ + if (0 == (ghost_partner_idx & arc)) { - case CONSENSUS_ROUND_COMPLETION: - /* FIXME: check if we really want to request the element */ - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_INVENTORY: - type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; - break; - default: - GNUNET_assert (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx); + session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; + return; } - pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), - type); - *(struct IBF_Key *) &pm->msg[1] = key; - message_queue_add (cpi->mss.mq, pm); } - } -} - - -static int -handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) -{ - int num_buckets; - - /* FIXME: find out if we're still expecting the same ibf! */ - - cpi->apparent_round = cpi->session->current_round; - // FIXME: check header.size >= sizeof (DD) - num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; - switch (cpi->ibf_state) - { - case IBF_STATE_NONE: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->ibf_state = IBF_STATE_RECEIVING; - cpi->ibf_order = digest->order; - cpi->ibf_bucket_counter = 0; - if (NULL != cpi->ibf) - { - ibf_destroy (cpi->ibf); - cpi->ibf = NULL; - } - break; - case IBF_STATE_ANTICIPATE_DIFF: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n", - cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->ibf_state = IBF_STATE_RECEIVING; - cpi->ibf_order = digest->order; - cpi->ibf_bucket_counter = 0; - if (NULL != cpi->ibf) - { - ibf_destroy (cpi->ibf); - cpi->ibf = NULL; - } - break; - case IBF_STATE_RECEIVING: - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - return GNUNET_YES; - } - - if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - return GNUNET_YES; - } - - if (NULL == cpi->ibf) - cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); - - ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); - cpi->ibf_bucket_counter += num_buckets; - - if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) - { - cpi->ibf_state = IBF_STATE_DECODING; - cpi->ibf_bucket_counter = 0; - prepare_ibf (cpi); - ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); - decode (cpi); - } - return GNUNET_YES; -} - - -/** - * Insert an element into the consensus set of the specified session. - * The element will not be copied, and freed when destroying the session. - * - * @param session session for new element - * @param element element to insert - */ -static void -insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) -{ - struct GNUNET_HashCode hash; - struct ElementInfo *e; - struct IBF_Key ibf_key; - int i; - - e = GNUNET_new (struct ElementInfo); - e->element = element; - e->element_hash = GNUNET_new (struct GNUNET_HashCode); - GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); - ibf_key = ibf_key_from_hashcode (e->element_hash); - ibf_hashcode_from_key (ibf_key, &hash); - strata_estimator_insert (session->se, &hash); - GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - for (i = 0; i <= MAX_IBF_ORDER; i++) - { - if (NULL == session->ibfs[i]) - continue; - ibf_insert (session->ibfs[i], ibf_key); - } -} - - -/** - * Handle an element that another peer sent us - */ -static int -handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) -{ - struct GNUNET_CONSENSUS_Element *element; - size_t size; - - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_COMPLETION: - /* FIXME: check if we really expect the element */ - case CONSENSUS_ROUND_EXCHANGE: - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n"); - return GNUNET_YES; - } - - size = ntohs (element_msg->size) - sizeof *element_msg; - - element = GNUNET_malloc (size + sizeof *element); - element->size = size; - memcpy (&element[1], &element_msg[1], size); - element->data = &element[1]; - - LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); - - insert_element (cpi->session, element); - - return GNUNET_YES; -} - - -/** - * Handle a request for elements. - * - * @param cpi peer that is requesting the element - * @param msg the element request message - */ -static int -handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) -{ - struct GNUNET_HashCode hashcode; - struct IBF_Key *ibf_key; - unsigned int num; - - /* element requests are allowed in every round */ - - num = ntohs (msg->header.size) / sizeof (struct IBF_Key); - - ibf_key = (struct IBF_Key *) &msg[1]; - while (num--) - { - ibf_hashcode_from_key (*ibf_key, &hashcode); - GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); - ibf_key++; - } - return GNUNET_YES; -} - -static int -is_peer_connected (struct ConsensusPeerInformation *cpi) -{ - if (NULL == cpi->mss.socket) - return GNUNET_NO; - return GNUNET_YES; -} - - -static void -ensure_peer_connected (struct ConsensusPeerInformation *cpi) -{ - if (NULL != cpi->mss.socket) - return; - cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, - open_cb, cpi, GNUNET_STREAM_OPTION_END); -} - - -/** - * If necessary, send a message to the peer, depending on the current - * round. - */ -static void -embrace_peer (struct ConsensusPeerInformation *cpi) -{ - if (GNUNET_NO == is_peer_connected (cpi)) - { - ensure_peer_connected (cpi); - return; - } - if (GNUNET_NO == cpi->hello) + session->partner_incoming = NULL; return; - /* FIXME: correctness of switch */ - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_INVENTORY: - if (cpi->session->partner_outgoing != cpi) - break; - /* fallthrough */ - case CONSENSUS_ROUND_COMPLETION: - send_strata_estimator (cpi); - default: - break; } + /* we only have an incoming connection */ + partner_idx = (my_idx - (int) arc) % (int) session->num_peers; + if (partner_idx < 0) + partner_idx += session->num_peers; + session->partner_outgoing = NULL; + session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]]; + session->partner_incoming->exp_subround_finished = GNUNET_NO; } /** - * Called when stream has finishes writing the hello message - */ -static void -hello_cont (void *cls) -{ - struct ConsensusPeerInformation *cpi = cls; - - cpi->hello = GNUNET_YES; - embrace_peer (cpi); -} - - -/** - * Called when we established a stream connection to another peer + * Callback for set operation results. Called for each element + * in the result set. * - * @param cls cpi of the peer we just connected to - * @param socket socket to use to communicate with the other side (read/write) + * @param cls closure + * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK + * @param status see enum GNUNET_SET_Status */ static void -open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) +set_result_cb (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) { struct ConsensusPeerInformation *cpi = cls; - struct PendingMessage *pm; - struct ConsensusHello *hello; - - GNUNET_assert (NULL == cpi->mss.mst); - GNUNET_assert (NULL == cpi->mss.mq); - - cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); - cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); - cpi->mss.mst_cls = cpi; - - pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); - hello = (struct ConsensusHello *) pm->msg; - memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); - pm->sent_cb = hello_cont; - pm->sent_cb_cls = cpi; - message_queue_add (cpi->mss.mq, pm); - cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &stream_data_processor, &cpi->mss); -} - - -static void -replay_premature_message (struct ConsensusPeerInformation *cpi) -{ - if (NULL != cpi->premature_strata_message) - { - struct StrataMessage *sm; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); - sm = cpi->premature_strata_message; - cpi->premature_strata_message = NULL; - - cpi->replaying_strata_message = GNUNET_YES; - handle_p2p_strata (cpi, sm); - cpi->replaying_strata_message = GNUNET_NO; - - GNUNET_free (sm); - } -} - - -/** - * Start the inventory round, contact all peers we are supposed to contact. - * - * @param session the current session - */ -static void -start_inventory (struct ConsensusSession *session) -{ - int i; - int last; - - for (i = 0; i < session->num_peers; i++) - { - session->info[i].ibf_bucket_counter = 0; - session->info[i].ibf_state = IBF_STATE_NONE; - session->info[i].is_outgoing = GNUNET_NO; - } - - last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; - i = (session->local_peer_idx + 1) % session->num_peers; - while (i != last) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); - session->info[i].is_outgoing = GNUNET_YES; - embrace_peer (&session->info[i]); - i = (i + 1) % session->num_peers; - } - // tie-breaker for even number of peers - if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); - session->info[last].is_outgoing = GNUNET_YES; - embrace_peer (&session->info[last]); - } - - for (i = 0; i < session->num_peers; i++) - { - if (GNUNET_NO == session->info[i].is_outgoing) - replay_premature_message (&session->info[i]); - } -} - - -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -send_client_elements_iter (void *cls, - const struct GNUNET_HashCode * key, - void *value) -{ - struct ConsensusSession *session = cls; - struct ElementInfo *ei = value; - struct PendingMessage *pm; - - /* is the client still there? */ - if (NULL == session->scss.client) - return GNUNET_NO; - - pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, - GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); - message_queue_add (session->client_mq, pm); - return GNUNET_YES; -} + unsigned int remote_idx = cpi - cpi->session->info; + unsigned int local_idx = cpi->session->local_peer_idx; + GNUNET_assert ((cpi == cpi->session->partner_outgoing) || + (cpi == cpi->session->partner_incoming)); - -/** - * Start the next round. - * This function can be invoked as a timeout task, or called manually (tc will be NULL then). - * - * @param cls the session - * @param tc task context, for when this task is invoked by the scheduler, - * NULL if invoked for another reason - */ -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct ConsensusSession *session; - - /* don't kick off next round if we're shutting down */ - if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - - session = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); - - if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) - { - GNUNET_SCHEDULER_cancel (session->round_timeout_tid); - session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; - } - - switch (session->current_round) + switch (status) { - case CONSENSUS_ROUND_BEGIN: - session->current_round = CONSENSUS_ROUND_EXCHANGE; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_EXCHANGE: - /* handle two peers specially */ - if (session->num_peers <= 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); - GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); - send_client_conclude_done (session); - session->current_round = CONSENSUS_ROUND_FINISH; - return; - } - session->current_round = CONSENSUS_ROUND_INVENTORY; - start_inventory (session); - break; - case CONSENSUS_ROUND_INVENTORY: - session->current_round = CONSENSUS_ROUND_COMPLETION; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_COMPLETION: - session->current_round = CONSENSUS_ROUND_FINISH; - send_client_conclude_done (session); + case GNUNET_SET_STATUS_OK: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n", + local_idx, remote_idx); break; - default: - GNUNET_assert (0); - } -} - - -static void -fin_sent_cb (void *cls) -{ - struct ConsensusPeerInformation *cpi; - cpi = cls; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_COMPLETION: - if (cpi->session->current_round != cpi->apparent_round) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); - break; - } + case GNUNET_SET_STATUS_FAILURE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n", + local_idx, remote_idx); + cpi->set_op = NULL; + return; + case GNUNET_SET_STATUS_HALF_DONE: + case GNUNET_SET_STATUS_DONE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n", + local_idx, remote_idx); cpi->exp_subround_finished = GNUNET_YES; - /* the subround is only really over if *both* partners are done */ - if (GNUNET_YES == exp_subround_finished (cpi->session)) + cpi->set_op = NULL; + if (have_exp_subround_finished (cpi->session) == GNUNET_YES) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n", + local_idx); subround_over (cpi->session, NULL); + } else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) - round_over (cpi->session, NULL); - /* FIXME: maybe go to next round */ - break; - default: - GNUNET_break (0); - } -} - - -/** - * The other peer wants us to inform that he sent us all the elements we requested. - */ -static int -handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) -{ - struct ConsensusRoundMessage *round_msg; - round_msg = (struct ConsensusRoundMessage *) msg; - /* FIXME: only call subround_over if round is the current one! */ - switch (cpi->session->current_round) - { - case CONSENSUS_ROUND_EXCHANGE: - case CONSENSUS_ROUND_COMPLETION: - if (cpi->session->current_round != round_msg->round) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->ibf_state = IBF_STATE_NONE; - cpi->ibf_bucket_counter = 0; - break; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n", + local_idx); } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - cpi->exp_subround_finished = GNUNET_YES; - if (GNUNET_YES == exp_subround_finished (cpi->session)) - subround_over (cpi->session, NULL); - else - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); - break; - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); - if (inventory_round_finished (cpi->session)) - round_over (cpi->session, NULL); - break; + return; default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); - break; + GNUNET_break (0); + return; } - return GNUNET_YES; -} - - -/** - * Gets called when the other peer wants us to inform that - * it has decoded our ibf and sent us all elements / requests - */ -static int -handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) -{ - struct PendingMessage *pm; - struct ConsensusRoundMessage *fin_msg; - /* FIXME: why handle current round?? */ switch (cpi->session->current_round) { - case CONSENSUS_ROUND_INVENTORY: - cpi->inventory_synced = GNUNET_YES; - case CONSENSUS_ROUND_COMPLETION: case CONSENSUS_ROUND_EXCHANGE: - LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); - pm = new_pending_message (sizeof *fin_msg, - GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); - fin_msg = (struct ConsensusRoundMessage *) pm->msg; - fin_msg->round = cpi->apparent_round; - /* the subround is over once we kicked off sending the fin msg */ - /* FIXME: assert we are talking to the right peer! */ - /* FIXME: mark peer as synced */ - pm->sent_cb = fin_sent_cb; - pm->sent_cb_cls = cpi; - message_queue_add (cpi->mss.mq, pm); + GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); break; default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); - break; - } - return GNUNET_YES; -} - - -/** - * Functions with this signature are called whenever a - * complete message is received by the tokenizer. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure - * @param client identification of the client - * @param message the actual message - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct ConsensusPeerInformation *cpi = cls; - GNUNET_assert (NULL == client); - GNUNET_assert (NULL != cls); - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: - return handle_p2p_strata (cpi, (struct StrataMessage *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: - return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: - return handle_p2p_element (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: - return handle_p2p_element_report (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: - return handle_p2p_element_request (cpi, (struct ElementRequest *) message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: - return handle_p2p_synced (cpi, message); - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: - return handle_p2p_fin (cpi, message); - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", - ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); - } - return GNUNET_OK; -} - - -static void -shuffle (struct ConsensusSession *session) -{ - /* adapted from random_permute in util/crypto_random.c */ - /* FIXME - unsigned int *ret; - unsigned int i; - unsigned int tmp; - uint32_t x; - - GNUNET_assert (n > 0); - ret = GNUNET_malloc (n * sizeof (unsigned int)); - for (i = 0; i < n; i++) - ret[i] = i; - for (i = n - 1; i > 0; i--) - { - x = GNUNET_CRYPTO_random_u32 (mode, i + 1); - tmp = ret[x]; - ret[x] = ret[i]; - ret[i] = tmp; + GNUNET_break (0); + return; } - */ } /** - * Find and set the partner_incoming and partner_outgoing of our peer, - * one of them may not exist in most cases. + * Compare the round the session is in with the round of the given context message. * - * @param session the consensus session + * @param session a consensus session + * @param ri a round context message + * @return 0 if it's the same round, -1 if the session is in an earlier round, + * 1 if the session is in a later round */ -static void -find_partners (struct ConsensusSession *session) -{ - int mark[session->num_peers]; - int i; - memset (mark, 0, session->num_peers * sizeof (int)); - session->partner_incoming = session->partner_outgoing = NULL; - for (i = 0; i < session->num_peers; i++) - { - int arc; - if (0 != mark[i]) - continue; - arc = (i + (1 << session->exp_subround)) % session->num_peers; - mark[i] = mark[arc] = 1; - GNUNET_assert (i != arc); - if (i == session->local_peer_idx) - { - GNUNET_assert (NULL == session->partner_outgoing); - session->partner_outgoing = &session->info[session->shuffle[arc]]; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; - } - if (arc == session->local_peer_idx) - { - GNUNET_assert (NULL == session->partner_incoming); - session->partner_incoming = &session->info[session->shuffle[i]]; - session->partner_incoming->exp_subround_finished = GNUNET_NO; - } +static int +rounds_compare (struct ConsensusSession *session, + struct RoundInfo* ri) +{ + if (session->current_round < ri->round) + return -1; + if (session->current_round > ri->round) + return 1; + if (session->current_round == CONSENSUS_ROUND_EXCHANGE) + { + if (session->exp_round < ri->exp_round) + return -1; + if (session->exp_round > ri->exp_round) + return 1; + if (session->exp_subround < ri->exp_subround) + return -1; + if (session->exp_subround < ri->exp_subround) + return 1; + return 0; } + /* comparing rounds when we are not in a exp round */ + GNUNET_assert (0); } @@ -1905,17 +674,18 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) return; session = cls; /* cancel timeout */ - if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) + if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK) + { GNUNET_SCHEDULER_cancel (session->round_timeout_tid); - session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; - /* check if we are done with the log phase, 2-peer consensus only does one log round */ - if ( (session->exp_round == NUM_EXP_ROUNDS) || - ((session->num_peers == 2) && (session->exp_round == 1))) + session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; + } + + if (session->exp_round >= NUM_EXP_ROUNDS) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); round_over (session, NULL); return; } + if (session->exp_round == 0) { /* initialize everything for the log-rounds */ @@ -1923,8 +693,10 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) session->exp_subround = 0; if (NULL == session->shuffle) session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); + if (NULL == session->shuffle_inv) + session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers); for (i = 0; i < session->num_peers; i++) - session->shuffle[i] = i; + session->shuffle[i] = session->shuffle_inv[i] = i; } else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) { @@ -1933,13 +705,68 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) session->exp_subround = 0; shuffle (session); } - else + else { session->exp_subround++; } + /* determine the incoming and outgoing partner */ find_partners (session); + GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]); + GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]); + + /* initiate set operation with the outgoing partner */ + if (NULL != session->partner_outgoing) + { + struct GNUNET_CONSENSUS_RoundContextMessage *msg; + msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); + msg->header.size = htons (sizeof *msg); + msg->round = htonl (session->current_round); + msg->exp_round = htonl (session->exp_round); + msg->exp_subround = htonl (session->exp_subround); + + if (NULL != session->partner_outgoing->set_op) + { + GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); + } + session->partner_outgoing->set_op = + GNUNET_SET_prepare (&session->partner_outgoing->peer_id, + &session->global_id, + (struct GNUNET_MessageHeader *) msg, + 0, /* FIXME: salt */ + GNUNET_SET_RESULT_ADDED, + set_result_cb, session->partner_outgoing); + GNUNET_free (msg); + GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); + } + + /* commit to the delayed set operation */ + if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op)) + { + int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info); + + if (NULL != session->partner_incoming->set_op) + { + GNUNET_SET_operation_cancel (session->partner_incoming->set_op); + session->partner_incoming->set_op = NULL; + } + if (cmp == 0) + { + GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set); + session->partner_incoming->set_op = session->partner_incoming->delayed_set_op; + session->partner_incoming->delayed_set_op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n", + session->local_peer_idx, (int) (session->partner_incoming - session->info)); + } + else + { + /* this should not happen -- a round has been skipped! */ + GNUNET_break_op (0); + } + } + #ifdef GNUNET_EXTRA_LOGGING { int in; @@ -1952,34 +779,11 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) in = -1; else in = (int) (session->partner_incoming - session->info); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, session->exp_round, session->exp_subround, in, out); } #endif /* GNUNET_EXTRA_LOGGING */ - if (NULL != session->partner_incoming) - { - session->partner_incoming->ibf_state = IBF_STATE_NONE; - session->partner_incoming->exp_subround_finished = GNUNET_NO; - session->partner_incoming->ibf_bucket_counter = 0; - - /* maybe there's an early strata estimator? */ - replay_premature_message (session->partner_incoming); - } - - if (NULL != session->partner_outgoing) - { - session->partner_outgoing->ibf_state = IBF_STATE_NONE; - session->partner_outgoing->ibf_bucket_counter = 0; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; - /* make sure peer is connected and send the SE */ - embrace_peer (session->partner_outgoing); - } - - /* - session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), - subround_over, session); - */ } @@ -2001,146 +805,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess } -/** - * Handle a HELLO-message, send when another peer wants to join a session where - * our peer is a member. The session may or may not be inhabited yet. - */ -static int -handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) -{ - struct ConsensusSession *session; - - if (NULL != inc->requested_gid) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); - return GNUNET_YES; - } - if (NULL != inc->cpi) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); - return GNUNET_YES; - } - - for (session = sessions_head; NULL != session; session = session->next) - { - int idx; - struct ConsensusPeerInformation *cpi; - if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) - continue; - idx = get_peer_idx (&inc->peer_id, session); - GNUNET_assert (-1 != idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); - cpi = &session->info[idx]; - inc->cpi = cpi; - cpi->mss = inc->mss; - cpi = &session->info[idx]; - cpi->hello = GNUNET_YES; - cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); - embrace_peer (cpi); - return GNUNET_YES; - } - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); - inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); - return GNUNET_YES; -} - - - -/** - * Handle tokenized messages from stream sockets. - * Delegate them if the socket belongs to a session, - * handle hello messages otherwise. - * - * Do not call GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure, unused - * @param client incoming socket this message comes from - * @param message the actual message - * - * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing - */ -static int -mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) -{ - struct IncomingSocket *inc; - GNUNET_assert (NULL == client); - GNUNET_assert (NULL != cls); - inc = (struct IncomingSocket *) cls; - switch (ntohs( message->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: - return handle_p2p_hello (inc, (struct ConsensusHello *) message); - default: - if (NULL != inc->cpi) - return mst_session_callback (inc->cpi, client, message); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", - ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); - } - return GNUNET_OK; -} - - -/** - * Functions of this type are called upon new stream connection from other peers - * or upon binding error which happen when the app_port given in - * GNUNET_STREAM_listen() is already taken. - * - * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream; NULL on binding error - * @param initiator the identity of the peer who wants to establish a stream - * with us; NULL on binding error - * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the - * stream (the socket will be invalid after the call) - */ -static int -listen_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) -{ - struct IncomingSocket *incoming; - - if (NULL == socket) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - incoming = GNUNET_malloc (sizeof *incoming); - incoming->peer_id = *initiator; - incoming->mss.socket = socket; - incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &stream_data_processor, &incoming->mss); - incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); - incoming->mss.mst_cls = incoming; - GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); - return GNUNET_OK; -} - - -/** - * Disconnect a client, and destroy all sessions associated with it. - * - * @param client the client to disconnect - */ -static void -disconnect_client (struct GNUNET_SERVER_Client *client) -{ - struct ConsensusSession *session; - GNUNET_SERVER_client_disconnect (client); - - /* if the client owns a session, remove it */ - session = sessions_head; - while (NULL != session) - { - if (client == session->scss.client) - { - destroy_session (session); - break; - } - session = session->next; - } -} - - /** * Compute a global, (hopefully) unique consensus session id, * from the local id of the consensus session, and the identities of all participants. @@ -2151,15 +815,20 @@ disconnect_client (struct GNUNET_SERVER_Client *client) * @param session_id local id of the consensus session */ static void -compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id) +compute_global_id (struct ConsensusSession *session, + const struct GNUNET_HashCode *session_id) { int i; struct GNUNET_HashCode tmp; + struct GNUNET_HashCode phash; + + /* FIXME: use kdf? */ session->global_id = *session_id; for (i = 0; i < session->num_peers; ++i) { - GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp); + GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash); + GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp); session->global_id = tmp; GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); session->global_id = tmp; @@ -2188,7 +857,8 @@ hash_cmp (const void *h1, const void *h2) * add the local peer if not in the join message. */ static void -initialize_session_peer_list (struct ConsensusSession *session) +initialize_session_peer_list (struct ConsensusSession *session, + struct GNUNET_CONSENSUS_JoinMessage *join_msg) { unsigned int local_peer_in_list; uint32_t listed_peers; @@ -2196,19 +866,19 @@ initialize_session_peer_list (struct ConsensusSession *session) struct GNUNET_PeerIdentity *peers; unsigned int i; - GNUNET_assert (NULL != session->join_msg); + GNUNET_assert (NULL != join_msg); /* peers in the join message, may or may not include the local peer */ - listed_peers = ntohl (session->join_msg->num_peers); - + listed_peers = ntohl (join_msg->num_peers); + session->num_peers = listed_peers; - msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1]; + msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; local_peer_in_list = GNUNET_NO; for (i = 0; i < listed_peers; i++) { - if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) { local_peer_in_list = GNUNET_YES; break; @@ -2221,7 +891,7 @@ initialize_session_peer_list (struct ConsensusSession *session) peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); if (GNUNET_NO == local_peer_in_list) - peers[session->num_peers - 1] = *my_peer; + peers[session->num_peers - 1] = my_peer; memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); @@ -2236,37 +906,104 @@ initialize_session_peer_list (struct ConsensusSession *session) session->info[i].peer_id = peers[i]; } - free (peers); + GNUNET_free (peers); } /** - * Add incoming peer connections to the session, - * for peers who have connected to us before the local session has been established + * Called when another peer wants to do a set operation with the + * local peer. * - * @param session ... + * @param cls closure + * @param other_peer the other peer + * @param context_msg message with application specific information from + * the other peer + * @param request request from the other peer, use GNUNET_SET_accept + * to accept it, otherwise the request will be refused + * Note that we don't use a return value here, as it is also + * necessary to specify the set we want to do the operation with, + * whith sometimes can be derived from the context message. + * Also necessary to specify the timeout. */ static void -add_incoming_peers (struct ConsensusSession *session) +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) { - struct IncomingSocket *inc; - int i; + struct ConsensusSession *session = cls; + struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; struct ConsensusPeerInformation *cpi; + struct GNUNET_SET_OperationHandle *set_op; + struct RoundInfo round_info; + int index; + int cmp; - for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) + if (NULL == context_msg) { - if ( (NULL == inc->requested_gid) || - (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) - continue; - for (i = 0; i < session->num_peers; i++) - { - cpi = &session->info[i]; - cpi->peer_id = inc->peer_id; - cpi->mss = inc->mss; - cpi->hello = GNUNET_YES; - inc->cpi = cpi; + GNUNET_break_op (0); + return; + } + + index = get_peer_idx (other_peer, session); + + if (index < 0) + { + GNUNET_break_op (0); + return; + } + + round_info.round = ntohl (msg->round); + round_info.exp_round = ntohl (msg->exp_round); + round_info.exp_subround = ntohl (msg->exp_subround); + + cpi = &session->info[index]; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index); + + switch (session->current_round) + { + case CONSENSUS_ROUND_BEGIN: + /* we're in the begin round, so requests for the exchange round may + * come in, they will be delayed for now! */ + case CONSENSUS_ROUND_EXCHANGE: + cmp = rounds_compare (session, &round_info); + if (cmp > 0) + { + /* the other peer is too late */ + GNUNET_break_op (0); + return; + } + /* kill old request, if any. this is legal, + * as the other peer would not make a new request if it would want to + * complete the old one! */ + if (NULL != cpi->set_op) + { + GNUNET_SET_operation_cancel (cpi->set_op); + cpi->set_op = NULL; + } + set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, + set_result_cb, &session->info[index]); + if (cmp == 0) + { + cpi->set_op = set_op; + GNUNET_SET_commit (set_op, session->element_set); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index); + } + else + { + /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */ + cpi->delayed_set_op = set_op; + cpi->delayed_round_info = round_info; + cpi->exp_subround_finished = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index); + } break; - } + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n", + session->local_peer_idx, session->current_round, index); + GNUNET_break_op (0); + return; } } @@ -2275,45 +1012,66 @@ add_incoming_peers (struct ConsensusSession *session) * Initialize the session, continue receiving messages from the owning client * * @param session the session to initialize + * @param join_msg the join message from the client */ static void -initialize_session (struct ConsensusSession *session) +initialize_session (struct ConsensusSession *session, + struct GNUNET_CONSENSUS_JoinMessage *join_msg) { struct ConsensusSession *other_session; - GNUNET_assert (NULL != session->join_msg); - initialize_session_peer_list (session); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); - compute_global_id (session, &session->join_msg->session_id); + initialize_session_peer_list (session, join_msg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); + compute_global_id (session, &join_msg->session_id); - /* Check if some local client already owns the session. */ + /* check if some local client already owns the session. + * it is only legal to have a session with an existing global id + * if all other sessions with this global id are finished.*/ other_session = sessions_head; while (NULL != other_session) { - if ((other_session != session) && + if ((other_session != session) && (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) { - if (GNUNET_NO == other_session->conclude) + if (CONSENSUS_ROUND_FINISH != other_session->current_round) { GNUNET_break (0); destroy_session (session); return; } - GNUNET_SERVER_client_drop (other_session->scss.client); - other_session->scss.client = NULL; break; } other_session = other_session->next; } - session->local_peer_idx = get_peer_idx (my_peer, session); + session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline); + session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start); + + session->local_peer_idx = get_peer_idx (&my_peer, session); GNUNET_assert (-1 != session->local_peer_idx); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); - GNUNET_free (session->join_msg); - session->join_msg = NULL; - add_incoming_peers (session); - GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); + session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + GNUNET_assert (NULL != session->element_set); + session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, + &session->global_id, + set_listen_cb, session); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); +} + + +static struct ConsensusSession * +get_session_by_client (struct GNUNET_SERVER_Client *client) +{ + struct ConsensusSession *session; + + session = sessions_head; + while (NULL != session) + { + if (session->client == client) + return session; + session = session->next; + } + return NULL; } @@ -2331,45 +1089,26 @@ client_join (void *cls, { struct ConsensusSession *session; - // make sure the client has not already joined a session - session = sessions_head; - while (NULL != session) - { - if (session->scss.client == client) - { - GNUNET_break (0); - disconnect_client (client); - return; - } - session = session->next; - } - - session = GNUNET_new (struct ConsensusSession); - session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); - /* these have to be initialized here, as the client can already start to give us values */ - session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); - session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); - session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); - session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); - session->scss.client = client; - session->client_mq = create_message_queue_for_server_client (&session->scss); - GNUNET_SERVER_client_keep (client); - - GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n"); - // Initialize session later if local peer identity is not known yet. - if (NULL == my_peer) + session = get_session_by_client (client); + if (NULL != session) { - GNUNET_SERVER_disable_receive_done_warning (client); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + session = GNUNET_new (struct ConsensusSession); + session->client = client; + session->client_mq = GNUNET_MQ_queue_for_server_client (client); + GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); + initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); + GNUNET_SERVER_receive_done (client, GNUNET_OK); - initialize_session (session); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n"); } - - /** * Called when a client performs an insert operation. * @@ -2379,39 +1118,48 @@ client_join (void *cls, */ void client_insert (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { struct ConsensusSession *session; struct GNUNET_CONSENSUS_ElementMessage *msg; - struct GNUNET_CONSENSUS_Element *element; - int element_size; + struct GNUNET_SET_Element *element; + ssize_t element_size; - session = sessions_head; - while (NULL != session) + session = get_session_by_client (client); + + if (NULL == session) { - if (session->scss.client == client) - break; + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; } - if (NULL == session) + if (CONSENSUS_ROUND_BEGIN != session->current_round) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n"); + GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); return; } msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; - element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); - element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); + element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + if (element_size < 0) + { + GNUNET_break (0); + return; + } + + element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); element->type = msg->element_type; element->size = element_size; memcpy (&element[1], &msg[1], element_size); element->data = &element[1]; - GNUNET_assert (NULL != element->data); - insert_element (session, element); - + GNUNET_SET_add_element (session->element_set, element, NULL, NULL); + GNUNET_free (element); GNUNET_SERVER_receive_done (client, GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx); } @@ -2428,13 +1176,9 @@ client_conclude (void *cls, const struct GNUNET_MessageHeader *message) { struct ConsensusSession *session; - struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; - - cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; - session = sessions_head; - while ((session != NULL) && (session->scss.client != client)) - session = session->next; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); + session = get_session_by_client (client); if (NULL == session) { /* client not found */ @@ -2442,70 +1186,28 @@ client_conclude (void *cls, GNUNET_SERVER_client_disconnect (client); return; } - if (CONSENSUS_ROUND_BEGIN != session->current_round) { /* client requested conclude twice */ GNUNET_break (0); - /* client may still own a session, destroy it */ - disconnect_client (client); return; } - - session->conclude = GNUNET_YES; - if (session->num_peers <= 1) { - send_client_conclude_done (session); + session->current_round = CONSENSUS_ROUND_FINISH; + GNUNET_SET_iterate (session->element_set, send_to_client_iter, session); } else { - session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); - /* the 'begin' round is over, start with the next, real round */ + /* the 'begin' round is over, start with the next, actual round */ round_over (session, NULL); } + GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round); GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Task that disconnects from core. - * - * @param cls core handle - * @param tc context information (why was this task triggered now) - */ -static void -disconnect_core (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - if (core != NULL) - { - GNUNET_CORE_disconnect (core); - core = NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); -} - - -static void -core_startup (void *cls, - struct GNUNET_CORE_Handle *core, - const struct GNUNET_PeerIdentity *peer) -{ - struct ConsensusSession *session; - - my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); - /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ - GNUNET_SCHEDULER_add_now (&disconnect_core, core); - GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); - /* initialize sessions that are waiting for the local peer identity */ - for (session = sessions_head; NULL != session; session = session->next) - if (NULL != session->join_msg) - initialize_session (session); -} - - /** * Called to clean up, after a shutdown has been requested. * @@ -2516,37 +1218,33 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - while (NULL != incoming_sockets_head) - { - struct IncomingSocket *socket; - socket = incoming_sockets_head; - if (NULL == socket->cpi) - clear_message_stream_state (&socket->mss); - incoming_sockets_head = incoming_sockets_head->next; - GNUNET_free (socket); - } - while (NULL != sessions_head) - { - struct ConsensusSession *session; - session = sessions_head->next; destroy_session (sessions_head); - sessions_head = session; - } - if (NULL != core) - { - GNUNET_CORE_disconnect (core); - core = NULL; - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); +} - if (NULL != listener) - { - GNUNET_STREAM_listen_close (listener); - listener = NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); +/** + * Clean up after a client after it is + * disconnected (either by us or by itself) + * + * @param cls closure, unused + * @param client the client to clean up after + */ +void +handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + struct ConsensusSession *session; + + session = get_session_by_client (client); + if (NULL == session) + return; + if ((CONSENSUS_ROUND_BEGIN == session->current_round) || + (CONSENSUS_ROUND_FINISH == session->current_round)) + destroy_session (session); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); } @@ -2558,37 +1256,29 @@ shutdown_task (void *cls, * @param c configuration to use */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) +run (void *cls, struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) { - /* core is only used to retrieve the peer identity */ - static const struct GNUNET_CORE_MessageHandler core_handlers[] = { - {NULL, 0, 0} - }; static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { - {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, - {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, - sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, + sizeof (struct GNUNET_MessageHeader)}, + {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, + {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, {NULL, NULL, 0, 0} }; cfg = c; srv = server; - + if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_SERVER_add_handlers (server, server_handlers); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); - - listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, - &listen_cb, NULL, - GNUNET_STREAM_OPTION_END); - - /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ - core = GNUNET_CORE_connect (c, NULL, - &core_startup, NULL, - NULL, NULL, GNUNET_NO, NULL, - GNUNET_NO, core_handlers); - GNUNET_assert (NULL != core); + GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); } @@ -2605,7 +1295,7 @@ main (int argc, char *const *argv) { int ret; ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); return (GNUNET_OK == ret) ? 0 : 1; }