2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
36 #include "consensus_protocol.h"
37 #include "consensus.h"
39 #include "strata_estimator.h"
43 * Log macro that prefixes the local peer and the peer we are in contact with.
45 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
46 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
50 * Number of IBFs in a strata estimator.
52 #define SE_STRATA_COUNT 32
54 * Size of the IBFs in the strata estimator.
56 #define SE_IBF_SIZE 80
58 * hash num parameter for the difference digests and strata estimators
60 #define SE_IBF_HASH_NUM 3
63 * Number of buckets that can be transmitted in one message.
65 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
68 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
69 * Choose this value so that computing the IBF is still cheaper
70 * than transmitting all values.
72 #define MAX_IBF_ORDER (16)
75 * Number of exponential rounds, used in the inventory and completion round.
77 #define NUM_EXP_ROUNDS (4)
80 /* forward declarations */
82 /* mutual recursion with struct ConsensusSession */
83 struct ConsensusPeerInformation;
87 /* mutual recursion with round_over */
89 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
91 /* mutial recursion with transmit_queued */
93 client_send_next (struct MessageQueue *mq);
95 /* mutual recursion with mst_session_callback */
97 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
100 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
104 * Additional information about a consensus element.
109 * The element itself.
111 struct GNUNET_CONSENSUS_Element *element;
113 * Hash of the element
115 struct GNUNET_HashCode *element_hash;
117 * Number of other peers that have the element in the inventory.
119 unsigned int inventory_count;
121 * Bitmap of peers that have this element in their inventory
123 uint8_t *inventory_bitmap;
128 * Describes the current round a consensus session is in.
133 * Not started the protocol yet.
135 CONSENSUS_ROUND_BEGIN=0,
137 * Distribution of elements with the exponential scheme.
139 CONSENSUS_ROUND_EXCHANGE,
141 * Exchange which elements each peer has, but not the elements.
142 * This round uses the all-to-all scheme.
144 CONSENSUS_ROUND_INVENTORY,
146 * Collect and distribute missing values with the exponential scheme.
148 CONSENSUS_ROUND_COMPLETION,
150 * Consensus concluded. After timeout and finished communication with client,
151 * consensus session will be destroyed.
153 CONSENSUS_ROUND_FINISH
156 /* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
159 * State of another peer with respect to the
162 enum ConsensusIBFState {
164 * There is nothing going on with the IBF.
168 * We currently receive an ibf.
172 * we decode a received ibf
176 * wait for elements and element requests
178 IBF_STATE_ANTICIPATE_DIFF
182 typedef void (*AddCallback) (struct MessageQueue *mq);
183 typedef void (*MessageSentCallback) (void *cls);
187 * Collection of the state necessary to read and write gnunet messages
188 * to a stream socket. Should be used as closure for stream_data_processor.
190 struct MessageStreamState
192 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
193 struct MessageQueue *mq;
195 struct GNUNET_STREAM_Socket *socket;
196 struct GNUNET_STREAM_ReadHandle *rh;
197 struct GNUNET_STREAM_WriteHandle *wh;
201 struct ServerClientSocketState
203 struct GNUNET_SERVER_Client *client;
204 struct GNUNET_SERVER_TransmitHandle* th;
209 * Generic message queue, for queueing outgoing messages.
215 struct PendingMessage *pending_head;
216 struct PendingMessage *pending_tail;
217 struct PendingMessage *current_pm;
221 struct PendingMessage
223 struct GNUNET_MessageHeader *msg;
224 struct MessageQueue *parent_queue;
225 struct PendingMessage *next;
226 struct PendingMessage *prev;
227 MessageSentCallback sent_cb;
233 * A consensus session consists of one local client and the remote authorities.
235 struct ConsensusSession
238 * Consensus sessions are kept in a DLL.
240 struct ConsensusSession *next;
243 * Consensus sessions are kept in a DLL.
245 struct ConsensusSession *prev;
248 * Join message. Used to initialize the session later,
249 * if the identity of the local peer is not yet known.
250 * NULL if the session has been fully initialized.
252 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
255 * Global consensus identification, computed
256 * from the session id and participating authorities.
258 struct GNUNET_HashCode global_id;
261 * The server's client and associated local state
263 struct ServerClientSocketState scss;
266 * Queued messages to the client.
268 struct MessageQueue *client_mq;
271 * IBF_Key -> 2^(HashCode*)
273 * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
275 struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
278 * Maps HashCodes to ElementInfos
280 struct GNUNET_CONTAINER_MultiHashMap *values;
283 * Currently active transmit handle for sending to the client
285 struct GNUNET_SERVER_TransmitHandle *client_th;
288 * Timeout for all rounds together, single rounds will schedule a timeout task
289 * with a fraction of the conclude timeout.
291 struct GNUNET_TIME_Relative conclude_timeout;
294 * Timeout task identifier for the current round
296 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
299 * Number of other peers in the consensus
301 unsigned int num_peers;
304 * Information about the other peers,
307 struct ConsensusPeerInformation *info;
310 * GNUNET_YES if the client has called conclude.
315 * Index of the local peer in the peers array
317 unsigned int local_peer_idx;
320 * Strata estimator, computed online
322 struct StrataEstimator *se;
327 struct InvertibleBloomFilter **ibfs;
332 enum ConsensusRound current_round;
335 * Permutation of peers for the current round,
336 * maps logical index (for current round) to physical index (location in info array)
345 * The partner for the current exp-round
347 struct ConsensusPeerInformation* partner_outgoing;
350 * The partner for the current exp-round
352 struct ConsensusPeerInformation* partner_incoming;
357 * Information about a peer that is in a consensus session.
359 struct ConsensusPeerInformation
362 * Peer identitty of the peer in the consensus session
364 struct GNUNET_PeerIdentity peer_id;
367 * Do we connect to the peer, or does the peer connect to us?
368 * Only valid for all-to-all phases
373 * Did we receive/send a consensus hello?
380 struct MessageStreamState mss;
385 enum ConsensusIBFState ibf_state;
388 * What is the order (=log2 size) of the ibf
389 * we're currently dealing with?
390 * Interpretation depends on ibf_state.
395 * The current IBF for this peer,
396 * purpose dependent on ibf_state
398 struct InvertibleBloomFilter *ibf;
401 * How many buckets have we transmitted/received?
402 * Interpretatin depends on ibf_state
404 int ibf_bucket_counter;
407 * Strata estimator of the peer, NULL if our peer
408 * initiated the reconciliation.
410 struct StrataEstimator *se;
413 * Back-reference to the consensus session,
414 * to that ConsensusPeerInformation can be used as a closure
416 struct ConsensusSession *session;
419 * True if we are actually replaying the strata message,
420 * e.g. currently handling the premature_strata_message.
422 int replaying_strata_message;
425 * A strata message that is not actually for the current round,
426 * used in the exp-scheme.
428 struct StrataMessage *premature_strata_message;
431 * We have finishes the exp-subround with the peer.
433 int exp_subround_finished;
436 * GNUNET_YES if we synced inventory with this peer;
437 * GNUNET_NO otherwise.
439 int inventory_synced;
442 * Round this peer seems to be in, according to the last SE we got.
443 * Necessary to store this, as we sometimes need to respond to a request from an
444 * older round, while we are already in the next round.
446 enum ConsensusRound apparent_round;
451 * Sockets from other peers who want to communicate with us.
452 * It may not be known yet which consensus session they belong to, we have to wait for the
454 * Also, the session might not exist yet locally, we have to wait for a local client to connect.
456 struct IncomingSocket
459 * Incoming sockets are kept in a double linked list.
461 struct IncomingSocket *next;
464 * Incoming sockets are kept in a double linked list.
466 struct IncomingSocket *prev;
469 * Peer that connected to us with the socket.
471 struct GNUNET_PeerIdentity peer_id;
474 * Peer-in-session this socket belongs to, once known, otherwise NULL.
476 struct ConsensusPeerInformation *cpi;
479 * Set to the global session id, if the peer sent us a hello-message,
480 * but the session does not exist yet.
482 struct GNUNET_HashCode *requested_gid;
485 * Timeout, will disconnect the socket if not yet in a session.
488 GNUNET_SCHEDULER_TaskIdentifier timeout;
491 struct MessageStreamState mss;
496 * Linked list of incoming sockets.
498 static struct IncomingSocket *incoming_sockets_head;
501 * Linked list of incoming sockets.
503 static struct IncomingSocket *incoming_sockets_tail;
506 * Linked list of sessions this peer participates in.
508 static struct ConsensusSession *sessions_head;
511 * Linked list of sessions this peer participates in.
513 static struct ConsensusSession *sessions_tail;
516 * Configuration of the consensus service.
518 static const struct GNUNET_CONFIGURATION_Handle *cfg;
521 * Handle to the server for this service.
523 static struct GNUNET_SERVER_Handle *srv;
526 * Peer that runs this service.
528 static struct GNUNET_PeerIdentity *my_peer;
531 * Handle to the core service. Only used during service startup, will be NULL after that.
533 static struct GNUNET_CORE_Handle *core;
536 * Listener for sockets from peers that want to reconcile with us.
538 static struct GNUNET_STREAM_ListenSocket *listener;
542 * Transmit a queued message to the session's client.
544 * @param cls consensus session
545 * @param size number of bytes available in buf
546 * @param buf where the callee should write the message
547 * @return number of bytes written to buf
550 transmit_queued (void *cls, size_t size,
553 struct MessageQueue *mq = cls;
554 struct PendingMessage *pm = mq->pending_head;
555 struct ServerClientSocketState *state = mq->state;
558 GNUNET_assert (NULL != pm);
559 GNUNET_assert (NULL != buf);
560 msg_size = ntohs (pm->msg->size);
561 GNUNET_assert (size >= msg_size);
562 memcpy (buf, pm->msg, msg_size);
563 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
565 client_send_next (cls);
572 client_send_next (struct MessageQueue *mq)
574 struct ServerClientSocketState *state = mq->state;
577 GNUNET_assert (NULL != state);
579 if ( (NULL != state->th) ||
580 (NULL == mq->pending_head) )
582 msize = ntohs (mq->pending_head->msg->size);
584 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
585 GNUNET_TIME_UNIT_FOREVER_REL,
586 &transmit_queued, mq);
590 struct MessageQueue *
591 create_message_queue_for_server_client (struct ServerClientSocketState *scss)
593 struct MessageQueue *mq;
594 mq = GNUNET_new (struct MessageQueue);
595 mq->add_cb = client_send_next;
602 * Functions of this signature are called whenever writing operations
603 * on a stream are executed
605 * @param cls the closure from GNUNET_STREAM_write
606 * @param status the status of the stream at the time this function is called;
607 * GNUNET_STREAM_OK if writing to stream was completed successfully;
608 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
609 * (this doesn't mean that the data is never sent, the receiver may
610 * have read the data but its ACKs may have been lost);
611 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
612 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
614 * @param size the number of bytes written
617 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
619 struct MessageQueue *mq = cls;
620 struct MessageStreamState *mss = mq->state;
621 struct PendingMessage *pm;
623 GNUNET_assert (GNUNET_STREAM_OK == status);
625 /* call cb for message we finished sending */
629 if (NULL != pm->sent_cb)
630 pm->sent_cb (pm->sent_cb_cls);
636 pm = mq->pending_head;
640 GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
641 mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
642 GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
643 GNUNET_assert (NULL != mss->wh);
648 stream_socket_add_cb (struct MessageQueue *mq)
650 if (NULL != mq->current_pm)
652 write_queued (mq, GNUNET_STREAM_OK, 0);
656 struct MessageQueue *
657 create_message_queue_for_stream_socket (struct MessageStreamState *mss)
659 struct MessageQueue *mq;
660 mq = GNUNET_new (struct MessageQueue);
662 mq->add_cb = stream_socket_add_cb;
667 struct PendingMessage *
668 new_pending_message (uint16_t size, uint16_t type)
670 struct PendingMessage *pm;
671 pm = GNUNET_malloc (sizeof *pm + size);
672 pm->msg = (void *) &pm[1];
673 pm->msg->size = htons (size);
674 pm->msg->type = htons (type);
680 * Queue a message in a message queue.
682 * @param queue the message queue
683 * @param pending message, message with additional information
686 message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg)
688 GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg);
689 queue->add_cb (queue);
694 * Called when we receive data from a peer via stream.
696 * @param cls the closure from GNUNET_STREAM_read
697 * @param status the status of the stream at the time this function is called
698 * @param data traffic from the other side
699 * @param size the number of bytes available in data read; will be 0 on timeout
700 * @return number of bytes of processed from 'data' (any data remaining should be
701 * given to the next time the read processor is called).
704 stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size)
706 struct MessageStreamState *mss = cls;
711 if (GNUNET_STREAM_OK != status)
713 /* FIXME: handle this correctly */
717 GNUNET_assert (NULL != mss->mst);
718 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES);
719 if (GNUNET_SYSERR == ret)
721 /* FIXME: handle this correctly */
726 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss);
727 /* we always read all data */
733 * Send element or element report to the peer specified in cpi.
735 * @param cpi peer to send the elements to
736 * @param head head of the element list
739 send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e)
741 struct PendingMessage *pm;
743 switch (cpi->apparent_round)
745 case CONSENSUS_ROUND_COMPLETION:
746 case CONSENSUS_ROUND_EXCHANGE:
747 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size,
748 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
749 memcpy (&pm->msg[1], e->element->data, e->element->size);
750 message_queue_add (cpi->mss.mq, pm);
752 case CONSENSUS_ROUND_INVENTORY:
753 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode),
754 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
755 memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode));
756 message_queue_add (cpi->mss.mq, pm);
765 * Iterator to insert values into an ibf.
768 * @param key current key code
769 * @param value value in the hash map
770 * @return GNUNET_YES if we should continue to
775 ibf_values_iterator (void *cls,
776 const struct GNUNET_HashCode *key,
779 struct ConsensusPeerInformation *cpi = cls;
780 struct ElementInfo *e = value;
781 struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash);
783 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
784 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
789 * Create and populate an IBF for the specified peer,
790 * if it does not already exist.
792 * @param cpi peer to create the ibf for
795 prepare_ibf (struct ConsensusPeerInformation *cpi)
797 if (NULL != cpi->session->ibfs[cpi->ibf_order])
799 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
800 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
805 * Called when a remote peer wants to inform the local peer
806 * that the remote peer misses elements.
807 * Elements are not reconciled.
813 handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
820 exp_subround_finished (const struct ConsensusSession *session)
824 if ( (NULL != session->partner_outgoing) &&
825 (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
827 if ( (NULL != session->partner_incoming) &&
828 (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
830 if (0 == not_finished)
837 inventory_round_finished (struct ConsensusSession *session)
842 for (i = 0; i < session->num_peers; i++)
843 if (GNUNET_YES == session->info[i].inventory_synced)
845 if (finished >= (session->num_peers / 2))
852 clear_message_stream_state (struct MessageStreamState *mss)
854 if (NULL != mss->mst)
856 GNUNET_SERVER_mst_destroy (mss->mst);
861 GNUNET_STREAM_read_cancel (mss->rh);
866 GNUNET_STREAM_write_cancel (mss->wh);
869 if (NULL != mss->socket)
871 GNUNET_STREAM_close (mss->socket);
876 GNUNET_free (mss->mq);
883 * Iterator over hash map entries.
886 * @param key current key code
887 * @param value value in the hash map
888 * @return GNUNET_YES if we should continue to
893 destroy_element_info_iter (void *cls,
894 const struct GNUNET_HashCode * key,
897 struct ElementInfo *ei = value;
898 GNUNET_free (ei->element);
899 GNUNET_free (ei->element_hash);
906 * Destroy a session, free all resources associated with it.
908 * @param session the session to destroy
911 destroy_session (struct ConsensusSession *session)
915 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
916 GNUNET_SERVER_client_drop (session->scss.client);
917 session->scss.client = NULL;
918 if (NULL != session->client_mq)
920 GNUNET_free (session->client_mq);
921 session->client_mq = NULL;
923 if (NULL != session->shuffle)
925 GNUNET_free (session->shuffle);
926 session->shuffle = NULL;
928 if (NULL != session->se)
930 strata_estimator_destroy (session->se);
933 if (NULL != session->info)
935 for (i = 0; i < session->num_peers; i++)
937 struct ConsensusPeerInformation *cpi;
938 cpi = &session->info[i];
939 clear_message_stream_state (&cpi->mss);
942 strata_estimator_destroy (cpi->se);
945 if (NULL != cpi->ibf)
947 ibf_destroy (cpi->ibf);
951 GNUNET_free (session->info);
952 session->info = NULL;
954 if (NULL != session->ibfs)
956 for (i = 0; i <= MAX_IBF_ORDER; i++)
958 if (NULL != session->ibfs[i])
960 ibf_destroy (session->ibfs[i]);
961 session->ibfs[i] = NULL;
964 GNUNET_free (session->ibfs);
965 session->ibfs = NULL;
967 if (NULL != session->values)
969 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL);
970 GNUNET_CONTAINER_multihashmap_destroy (session->values);
971 session->values = NULL;
974 if (NULL != session->ibf_key_map)
976 GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map);
977 session->ibf_key_map = NULL;
979 GNUNET_free (session);
984 send_client_conclude_done (struct ConsensusSession *session)
986 struct PendingMessage *pm;
988 /* check if client is even there anymore */
989 if (NULL == session->scss.client)
991 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader),
992 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
993 message_queue_add (session->client_mq, pm);
998 * Check if a strata message is for the current round or not
1000 * @param session session we are in
1001 * @param strata_msg the strata message to check
1002 * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise
1005 is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
1007 switch (strata_msg->round)
1009 case CONSENSUS_ROUND_COMPLETION:
1010 case CONSENSUS_ROUND_EXCHANGE:
1011 /* here, we also have to compare subrounds */
1012 if ( (strata_msg->round != session->current_round) ||
1013 (strata_msg->exp_round != session->exp_round) ||
1014 (strata_msg->exp_subround != session->exp_subround) )
1018 if (session->current_round != strata_msg->round)
1027 * Send a strata estimator.
1029 * @param cpi the peer
1032 send_strata_estimator (struct ConsensusPeerInformation *cpi)
1034 struct PendingMessage *pm;
1035 struct StrataMessage *strata_msg;
1037 /* FIXME: why is this correct? */
1038 cpi->apparent_round = cpi->session->current_round;
1039 cpi->ibf_state = IBF_STATE_NONE;
1040 cpi->ibf_bucket_counter = 0;
1042 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round);
1044 pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE),
1045 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1046 strata_msg = (struct StrataMessage *) pm->msg;
1047 strata_msg->round = cpi->session->current_round;
1048 strata_msg->exp_round = cpi->session->exp_round;
1049 strata_msg->exp_subround = cpi->session->exp_subround;
1050 strata_estimator_write (cpi->session->se, &strata_msg[1]);
1051 message_queue_add (cpi->mss.mq, pm);
1056 * Send an IBF of the order specified in cpi.
1058 * @param cpi the peer
1061 send_ibf (struct ConsensusPeerInformation *cpi)
1063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1064 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1066 cpi->ibf_bucket_counter = 0;
1067 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1069 unsigned int num_buckets;
1070 struct PendingMessage *pm;
1071 struct DifferenceDigest *digest;
1073 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1074 /* limit to maximum */
1075 if (num_buckets > BUCKETS_PER_MESSAGE)
1076 num_buckets = BUCKETS_PER_MESSAGE;
1078 pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE),
1079 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1080 digest = (struct DifferenceDigest *) pm->msg;
1081 digest->order = cpi->ibf_order;
1082 digest->round = cpi->apparent_round;
1083 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]);
1084 cpi->ibf_bucket_counter += num_buckets;
1085 message_queue_add (cpi->mss.mq, pm);
1087 cpi->ibf_bucket_counter = 0;
1088 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1093 * Called when a peer sends us its strata estimator.
1094 * In response, we sent out IBF of appropriate size back.
1096 * @param cpi session
1097 * @param strata_msg message
1100 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1104 if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) &&
1105 (strata_msg->round == CONSENSUS_ROUND_INVENTORY) )
1107 /* we still have to handle this request appropriately */
1108 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
1109 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1111 else if (is_premature_strata_message (cpi->session, strata_msg))
1113 if (GNUNET_NO == cpi->replaying_strata_message)
1115 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n",
1116 strata_msg->exp_round, strata_msg->exp_subround);
1117 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header);
1122 if (NULL == cpi->se)
1123 cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
1125 cpi->apparent_round = strata_msg->round;
1127 if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
1129 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n");
1132 strata_estimator_read (&strata_msg[1], cpi->se);
1133 GNUNET_assert (NULL != cpi->session->se);
1134 diff = strata_estimator_difference (cpi->session->se, cpi->se);
1136 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1137 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
1139 switch (cpi->session->current_round)
1141 case CONSENSUS_ROUND_EXCHANGE:
1142 case CONSENSUS_ROUND_INVENTORY:
1143 case CONSENSUS_ROUND_COMPLETION:
1144 /* send IBF of the right size */
1146 while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) )
1148 if (cpi->ibf_order > MAX_IBF_ORDER)
1149 cpi->ibf_order = MAX_IBF_ORDER;
1150 cpi->ibf_order += 1;
1151 /* create ibf if not already pre-computed */
1153 if (NULL != cpi->ibf)
1154 ibf_destroy (cpi->ibf);
1155 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1156 cpi->ibf_bucket_counter = 0;
1160 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1161 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1170 send_elements_iterator (void *cls,
1171 const struct GNUNET_HashCode * key,
1174 struct ConsensusPeerInformation *cpi = cls;
1175 struct ElementInfo *ei;
1176 ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value);
1179 LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n",
1180 GNUNET_h2s((struct GNUNET_HashCode *) value));
1183 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n");
1184 send_element_or_report (cpi, ei);
1190 * Decode the current diff ibf, and send elements/requests/reports/
1192 * @param cpi partner peer
1195 decode (struct ConsensusPeerInformation *cpi)
1200 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1206 res = ibf_decode (cpi->ibf, &side, &key);
1207 if (GNUNET_SYSERR == res)
1209 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1210 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1213 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1214 cpi->ibf_bucket_counter = 0;
1218 if (GNUNET_NO == res)
1220 struct PendingMessage *pm;
1221 struct ConsensusRoundMessage *rmsg;
1222 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1224 pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1225 rmsg = (struct ConsensusRoundMessage *) pm->msg;
1226 rmsg->round = cpi->apparent_round;
1227 message_queue_add (cpi->mss.mq, pm);
1232 struct GNUNET_HashCode hashcode;
1233 /* we have the element(s), send it to the other peer */
1234 ibf_hashcode_from_key (key, &hashcode);
1235 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1239 struct PendingMessage *pm;
1242 switch (cpi->apparent_round)
1244 case CONSENSUS_ROUND_COMPLETION:
1245 /* FIXME: check if we really want to request the element */
1246 case CONSENSUS_ROUND_EXCHANGE:
1247 case CONSENSUS_ROUND_INVENTORY:
1248 type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST;
1253 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key),
1255 *(struct IBF_Key *) &pm->msg[1] = key;
1256 message_queue_add (cpi->mss.mq, pm);
1263 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1267 /* FIXME: find out if we're still expecting the same ibf! */
1269 cpi->apparent_round = cpi->session->current_round;
1270 // FIXME: check header.size >= sizeof (DD)
1271 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
1272 switch (cpi->ibf_state)
1274 case IBF_STATE_NONE:
1275 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1276 cpi->ibf_state = IBF_STATE_RECEIVING;
1277 cpi->ibf_order = digest->order;
1278 cpi->ibf_bucket_counter = 0;
1279 if (NULL != cpi->ibf)
1281 ibf_destroy (cpi->ibf);
1285 case IBF_STATE_ANTICIPATE_DIFF:
1286 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1287 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1288 cpi->ibf_state = IBF_STATE_RECEIVING;
1289 cpi->ibf_order = digest->order;
1290 cpi->ibf_bucket_counter = 0;
1291 if (NULL != cpi->ibf)
1293 ibf_destroy (cpi->ibf);
1297 case IBF_STATE_RECEIVING:
1300 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1304 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1306 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1310 if (NULL == cpi->ibf)
1311 cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM);
1313 ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1314 cpi->ibf_bucket_counter += num_buckets;
1316 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1318 cpi->ibf_state = IBF_STATE_DECODING;
1319 cpi->ibf_bucket_counter = 0;
1321 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
1329 * Insert an element into the consensus set of the specified session.
1330 * The element will not be copied, and freed when destroying the session.
1332 * @param session session for new element
1333 * @param element element to insert
1336 insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
1338 struct GNUNET_HashCode hash;
1339 struct ElementInfo *e;
1340 struct IBF_Key ibf_key;
1343 e = GNUNET_new (struct ElementInfo);
1344 e->element = element;
1345 e->element_hash = GNUNET_new (struct GNUNET_HashCode);
1346 GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash);
1347 ibf_key = ibf_key_from_hashcode (e->element_hash);
1348 ibf_hashcode_from_key (ibf_key, &hash);
1349 strata_estimator_insert (session->se, &hash);
1350 GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e,
1351 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1352 GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash,
1353 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1355 for (i = 0; i <= MAX_IBF_ORDER; i++)
1357 if (NULL == session->ibfs[i])
1359 ibf_insert (session->ibfs[i], ibf_key);
1365 * Handle an element that another peer sent us
1368 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1370 struct GNUNET_CONSENSUS_Element *element;
1373 switch (cpi->session->current_round)
1375 case CONSENSUS_ROUND_COMPLETION:
1376 /* FIXME: check if we really expect the element */
1377 case CONSENSUS_ROUND_EXCHANGE:
1380 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1384 size = ntohs (element_msg->size) - sizeof *element_msg;
1386 element = GNUNET_malloc (size + sizeof *element);
1387 element->size = size;
1388 memcpy (&element[1], &element_msg[1], size);
1389 element->data = &element[1];
1391 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n");
1393 insert_element (cpi->session, element);
1400 * Handle a request for elements.
1402 * @param cpi peer that is requesting the element
1403 * @param msg the element request message
1406 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
1408 struct GNUNET_HashCode hashcode;
1409 struct IBF_Key *ibf_key;
1412 /* element requests are allowed in every round */
1414 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1416 ibf_key = (struct IBF_Key *) &msg[1];
1419 ibf_hashcode_from_key (*ibf_key, &hashcode);
1420 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi);
1427 is_peer_connected (struct ConsensusPeerInformation *cpi)
1429 if (NULL == cpi->mss.socket)
1436 ensure_peer_connected (struct ConsensusPeerInformation *cpi)
1438 if (NULL != cpi->mss.socket)
1440 cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
1441 open_cb, cpi, GNUNET_STREAM_OPTION_END);
1446 * If necessary, send a message to the peer, depending on the current
1450 embrace_peer (struct ConsensusPeerInformation *cpi)
1452 if (GNUNET_NO == is_peer_connected (cpi))
1454 ensure_peer_connected (cpi);
1457 if (GNUNET_NO == cpi->hello)
1459 /* FIXME: correctness of switch */
1460 switch (cpi->session->current_round)
1462 case CONSENSUS_ROUND_EXCHANGE:
1463 case CONSENSUS_ROUND_INVENTORY:
1464 if (cpi->session->partner_outgoing != cpi)
1467 case CONSENSUS_ROUND_COMPLETION:
1468 send_strata_estimator (cpi);
1476 * Called when stream has finishes writing the hello message
1479 hello_cont (void *cls)
1481 struct ConsensusPeerInformation *cpi = cls;
1483 cpi->hello = GNUNET_YES;
1489 * Called when we established a stream connection to another peer
1491 * @param cls cpi of the peer we just connected to
1492 * @param socket socket to use to communicate with the other side (read/write)
1495 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1497 struct ConsensusPeerInformation *cpi = cls;
1498 struct PendingMessage *pm;
1499 struct ConsensusHello *hello;
1501 GNUNET_assert (NULL == cpi->mss.mst);
1502 GNUNET_assert (NULL == cpi->mss.mq);
1504 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
1505 cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1506 cpi->mss.mst_cls = cpi;
1508 pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1509 hello = (struct ConsensusHello *) pm->msg;
1510 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1511 pm->sent_cb = hello_cont;
1512 pm->sent_cb_cls = cpi;
1513 message_queue_add (cpi->mss.mq, pm);
1514 cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1515 &stream_data_processor, &cpi->mss);
1520 replay_premature_message (struct ConsensusPeerInformation *cpi)
1522 if (NULL != cpi->premature_strata_message)
1524 struct StrataMessage *sm;
1526 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
1527 sm = cpi->premature_strata_message;
1528 cpi->premature_strata_message = NULL;
1530 cpi->replaying_strata_message = GNUNET_YES;
1531 handle_p2p_strata (cpi, sm);
1532 cpi->replaying_strata_message = GNUNET_NO;
1540 * Start the inventory round, contact all peers we are supposed to contact.
1542 * @param session the current session
1545 start_inventory (struct ConsensusSession *session)
1550 for (i = 0; i < session->num_peers; i++)
1552 session->info[i].ibf_bucket_counter = 0;
1553 session->info[i].ibf_state = IBF_STATE_NONE;
1554 session->info[i].is_outgoing = GNUNET_NO;
1557 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1558 i = (session->local_peer_idx + 1) % session->num_peers;
1561 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
1562 session->info[i].is_outgoing = GNUNET_YES;
1563 embrace_peer (&session->info[i]);
1564 i = (i + 1) % session->num_peers;
1566 // tie-breaker for even number of peers
1567 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1569 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
1570 session->info[last].is_outgoing = GNUNET_YES;
1571 embrace_peer (&session->info[last]);
1574 for (i = 0; i < session->num_peers; i++)
1576 if (GNUNET_NO == session->info[i].is_outgoing)
1577 replay_premature_message (&session->info[i]);
1583 * Iterator over hash map entries.
1585 * @param cls closure
1586 * @param key current key code
1587 * @param value value in the hash map
1588 * @return GNUNET_YES if we should continue to
1593 send_client_elements_iter (void *cls,
1594 const struct GNUNET_HashCode * key,
1597 struct ConsensusSession *session = cls;
1598 struct ElementInfo *ei = value;
1599 struct PendingMessage *pm;
1601 /* is the client still there? */
1602 if (NULL == session->scss.client)
1605 pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size,
1606 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1607 message_queue_add (session->client_mq, pm);
1614 * Start the next round.
1615 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1617 * @param cls the session
1618 * @param tc task context, for when this task is invoked by the scheduler,
1619 * NULL if invoked for another reason
1622 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1624 struct ConsensusSession *session;
1626 /* don't kick off next round if we're shutting down */
1627 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1631 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
1633 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1635 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1636 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1639 switch (session->current_round)
1641 case CONSENSUS_ROUND_BEGIN:
1642 session->current_round = CONSENSUS_ROUND_EXCHANGE;
1643 session->exp_round = 0;
1644 subround_over (session, NULL);
1646 case CONSENSUS_ROUND_EXCHANGE:
1647 /* handle two peers specially */
1648 if (session->num_peers <= 2)
1650 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
1651 GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
1652 send_client_conclude_done (session);
1653 session->current_round = CONSENSUS_ROUND_FINISH;
1656 session->current_round = CONSENSUS_ROUND_INVENTORY;
1657 start_inventory (session);
1659 case CONSENSUS_ROUND_INVENTORY:
1660 session->current_round = CONSENSUS_ROUND_COMPLETION;
1661 session->exp_round = 0;
1662 subround_over (session, NULL);
1664 case CONSENSUS_ROUND_COMPLETION:
1665 session->current_round = CONSENSUS_ROUND_FINISH;
1666 send_client_conclude_done (session);
1675 fin_sent_cb (void *cls)
1677 struct ConsensusPeerInformation *cpi;
1679 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
1680 switch (cpi->session->current_round)
1682 case CONSENSUS_ROUND_EXCHANGE:
1683 case CONSENSUS_ROUND_COMPLETION:
1684 if (cpi->session->current_round != cpi->apparent_round)
1686 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
1689 cpi->exp_subround_finished = GNUNET_YES;
1690 /* the subround is only really over if *both* partners are done */
1691 if (GNUNET_YES == exp_subround_finished (cpi->session))
1692 subround_over (cpi->session, NULL);
1694 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
1696 case CONSENSUS_ROUND_INVENTORY:
1697 cpi->inventory_synced = GNUNET_YES;
1698 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
1699 round_over (cpi->session, NULL);
1700 /* FIXME: maybe go to next round */
1709 * The other peer wants us to inform that he sent us all the elements we requested.
1712 handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1714 struct ConsensusRoundMessage *round_msg;
1715 round_msg = (struct ConsensusRoundMessage *) msg;
1716 /* FIXME: only call subround_over if round is the current one! */
1717 switch (cpi->session->current_round)
1719 case CONSENSUS_ROUND_EXCHANGE:
1720 case CONSENSUS_ROUND_COMPLETION:
1721 if (cpi->session->current_round != round_msg->round)
1723 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1724 cpi->ibf_state = IBF_STATE_NONE;
1725 cpi->ibf_bucket_counter = 0;
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1729 cpi->exp_subround_finished = GNUNET_YES;
1730 if (GNUNET_YES == exp_subround_finished (cpi->session))
1731 subround_over (cpi->session, NULL);
1733 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
1735 case CONSENSUS_ROUND_INVENTORY:
1736 cpi->inventory_synced = GNUNET_YES;
1737 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1738 if (inventory_round_finished (cpi->session))
1739 round_over (cpi->session, NULL);
1742 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
1750 * Gets called when the other peer wants us to inform that
1751 * it has decoded our ibf and sent us all elements / requests
1754 handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
1756 struct PendingMessage *pm;
1757 struct ConsensusRoundMessage *fin_msg;
1759 /* FIXME: why handle current round?? */
1760 switch (cpi->session->current_round)
1762 case CONSENSUS_ROUND_INVENTORY:
1763 cpi->inventory_synced = GNUNET_YES;
1764 case CONSENSUS_ROUND_COMPLETION:
1765 case CONSENSUS_ROUND_EXCHANGE:
1766 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n");
1767 pm = new_pending_message (sizeof *fin_msg,
1768 GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
1769 fin_msg = (struct ConsensusRoundMessage *) pm->msg;
1770 fin_msg->round = cpi->apparent_round;
1771 /* the subround is over once we kicked off sending the fin msg */
1772 /* FIXME: assert we are talking to the right peer! */
1773 /* FIXME: mark peer as synced */
1774 pm->sent_cb = fin_sent_cb;
1775 pm->sent_cb_cls = cpi;
1776 message_queue_add (cpi->mss.mq, pm);
1779 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
1787 * Functions with this signature are called whenever a
1788 * complete message is received by the tokenizer.
1790 * Do not call GNUNET_SERVER_mst_destroy in callback
1792 * @param cls closure
1793 * @param client identification of the client
1794 * @param message the actual message
1795 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1798 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1800 struct ConsensusPeerInformation *cpi = cls;
1801 GNUNET_assert (NULL == client);
1802 GNUNET_assert (NULL != cls);
1803 switch (ntohs (message->type))
1805 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
1806 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
1807 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
1808 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1809 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1810 return handle_p2p_element (cpi, message);
1811 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1812 return handle_p2p_element_report (cpi, message);
1813 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1814 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1815 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1816 return handle_p2p_synced (cpi, message);
1817 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1818 return handle_p2p_fin (cpi, message);
1820 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1821 ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1828 shuffle (struct ConsensusSession *session)
1830 /* adapted from random_permute in util/crypto_random.c */
1837 GNUNET_assert (n > 0);
1838 ret = GNUNET_malloc (n * sizeof (unsigned int));
1839 for (i = 0; i < n; i++)
1841 for (i = n - 1; i > 0; i--)
1843 x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
1853 * Find and set the partner_incoming and partner_outgoing of our peer,
1854 * one of them may not exist in most cases.
1856 * @param session the consensus session
1859 find_partners (struct ConsensusSession *session)
1861 int mark[session->num_peers];
1863 memset (mark, 0, session->num_peers * sizeof (int));
1864 session->partner_incoming = session->partner_outgoing = NULL;
1865 for (i = 0; i < session->num_peers; i++)
1870 arc = (i + (1 << session->exp_subround)) % session->num_peers;
1871 mark[i] = mark[arc] = 1;
1872 GNUNET_assert (i != arc);
1873 if (i == session->local_peer_idx)
1875 GNUNET_assert (NULL == session->partner_outgoing);
1876 session->partner_outgoing = &session->info[session->shuffle[arc]];
1877 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1879 if (arc == session->local_peer_idx)
1881 GNUNET_assert (NULL == session->partner_incoming);
1882 session->partner_incoming = &session->info[session->shuffle[i]];
1883 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1890 * Do the next subround in the exp-scheme.
1891 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1893 * @param cls the session
1894 * @param tc task context, for when this task is invoked by the scheduler,
1895 * NULL if invoked for another reason
1898 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1900 struct ConsensusSession *session;
1903 /* don't kick off next subround if we're shutting down */
1904 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1907 /* cancel timeout */
1908 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1909 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1910 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1911 /* check if we are done with the log phase, 2-peer consensus only does one log round */
1912 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
1913 ((session->num_peers == 2) && (session->exp_round == 1)))
1915 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
1916 round_over (session, NULL);
1919 if (session->exp_round == 0)
1921 /* initialize everything for the log-rounds */
1922 session->exp_round = 1;
1923 session->exp_subround = 0;
1924 if (NULL == session->shuffle)
1925 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
1926 for (i = 0; i < session->num_peers; i++)
1927 session->shuffle[i] = i;
1929 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
1931 /* subrounds done, start new log-round */
1932 session->exp_round++;
1933 session->exp_subround = 0;
1938 session->exp_subround++;
1941 find_partners (session);
1943 #ifdef GNUNET_EXTRA_LOGGING
1947 if (session->partner_outgoing == NULL)
1950 out = (int) (session->partner_outgoing - session->info);
1951 if (session->partner_incoming == NULL)
1954 in = (int) (session->partner_incoming - session->info);
1955 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
1956 session->exp_round, session->exp_subround, in, out);
1958 #endif /* GNUNET_EXTRA_LOGGING */
1960 if (NULL != session->partner_incoming)
1962 session->partner_incoming->ibf_state = IBF_STATE_NONE;
1963 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1964 session->partner_incoming->ibf_bucket_counter = 0;
1966 /* maybe there's an early strata estimator? */
1967 replay_premature_message (session->partner_incoming);
1970 if (NULL != session->partner_outgoing)
1972 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1973 session->partner_outgoing->ibf_bucket_counter = 0;
1974 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1975 /* make sure peer is connected and send the SE */
1976 embrace_peer (session->partner_outgoing);
1980 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
1981 subround_over, session);
1987 * Search peer in the list of peers in session.
1989 * @param peer peer to find
1990 * @param session session with peer
1991 * @return index of peer, -1 if peer is not in session
1994 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1997 for (i = 0; i < session->num_peers; i++)
1998 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
2005 * Handle a HELLO-message, send when another peer wants to join a session where
2006 * our peer is a member. The session may or may not be inhabited yet.
2009 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
2011 struct ConsensusSession *session;
2013 if (NULL != inc->requested_gid)
2015 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n");
2018 if (NULL != inc->cpi)
2020 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n");
2024 for (session = sessions_head; NULL != session; session = session->next)
2027 struct ConsensusPeerInformation *cpi;
2028 if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
2030 idx = get_peer_idx (&inc->peer_id, session);
2031 GNUNET_assert (-1 != idx);
2032 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
2033 cpi = &session->info[idx];
2035 cpi->mss = inc->mss;
2036 cpi = &session->info[idx];
2037 cpi->hello = GNUNET_YES;
2038 cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss);
2042 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
2043 inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode));
2050 * Handle tokenized messages from stream sockets.
2051 * Delegate them if the socket belongs to a session,
2052 * handle hello messages otherwise.
2054 * Do not call GNUNET_SERVER_mst_destroy in callback
2056 * @param cls closure, unused
2057 * @param client incoming socket this message comes from
2058 * @param message the actual message
2060 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
2063 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
2065 struct IncomingSocket *inc;
2066 GNUNET_assert (NULL == client);
2067 GNUNET_assert (NULL != cls);
2068 inc = (struct IncomingSocket *) cls;
2069 switch (ntohs( message->type))
2071 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
2072 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
2074 if (NULL != inc->cpi)
2075 return mst_session_callback (inc->cpi, client, message);
2076 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
2077 ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
2084 * Functions of this type are called upon new stream connection from other peers
2085 * or upon binding error which happen when the app_port given in
2086 * GNUNET_STREAM_listen() is already taken.
2088 * @param cls the closure from GNUNET_STREAM_listen
2089 * @param socket the socket representing the stream; NULL on binding error
2090 * @param initiator the identity of the peer who wants to establish a stream
2091 * with us; NULL on binding error
2092 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
2093 * stream (the socket will be invalid after the call)
2096 listen_cb (void *cls,
2097 struct GNUNET_STREAM_Socket *socket,
2098 const struct GNUNET_PeerIdentity *initiator)
2100 struct IncomingSocket *incoming;
2105 return GNUNET_SYSERR;
2107 incoming = GNUNET_malloc (sizeof *incoming);
2108 incoming->peer_id = *initiator;
2109 incoming->mss.socket = socket;
2110 incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
2111 &stream_data_processor, &incoming->mss);
2112 incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
2113 incoming->mss.mst_cls = incoming;
2114 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
2120 * Disconnect a client, and destroy all sessions associated with it.
2122 * @param client the client to disconnect
2125 disconnect_client (struct GNUNET_SERVER_Client *client)
2127 struct ConsensusSession *session;
2128 GNUNET_SERVER_client_disconnect (client);
2130 /* if the client owns a session, remove it */
2131 session = sessions_head;
2132 while (NULL != session)
2134 if (client == session->scss.client)
2136 destroy_session (session);
2139 session = session->next;
2145 * Compute a global, (hopefully) unique consensus session id,
2146 * from the local id of the consensus session, and the identities of all participants.
2147 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2148 * exactly the same peers, the global id will be different.
2150 * @param session session to generate the global id for
2151 * @param session_id local id of the consensus session
2154 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
2157 struct GNUNET_HashCode tmp;
2159 session->global_id = *session_id;
2160 for (i = 0; i < session->num_peers; ++i)
2162 GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
2163 session->global_id = tmp;
2164 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
2165 session->global_id = tmp;
2171 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
2172 * the correct signature to be used with e.g. qsort.
2173 * We use this function instead.
2175 * @param h1 some hash code
2176 * @param h2 some hash code
2177 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2180 hash_cmp (const void *h1, const void *h2)
2182 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
2187 * Create the sorted list of peers for the session,
2188 * add the local peer if not in the join message.
2191 initialize_session_peer_list (struct ConsensusSession *session)
2193 unsigned int local_peer_in_list;
2194 uint32_t listed_peers;
2195 const struct GNUNET_PeerIdentity *msg_peers;
2196 struct GNUNET_PeerIdentity *peers;
2199 GNUNET_assert (NULL != session->join_msg);
2201 /* peers in the join message, may or may not include the local peer */
2202 listed_peers = ntohl (session->join_msg->num_peers);
2204 session->num_peers = listed_peers;
2206 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
2208 local_peer_in_list = GNUNET_NO;
2209 for (i = 0; i < listed_peers; i++)
2211 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
2213 local_peer_in_list = GNUNET_YES;
2218 if (GNUNET_NO == local_peer_in_list)
2219 session->num_peers++;
2221 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2223 if (GNUNET_NO == local_peer_in_list)
2224 peers[session->num_peers - 1] = *my_peer;
2226 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2227 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
2229 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
2231 for (i = 0; i < session->num_peers; ++i)
2233 /* initialize back-references, so consensus peer information can
2234 * be used as closure */
2235 session->info[i].session = session;
2236 session->info[i].peer_id = peers[i];
2244 * Add incoming peer connections to the session,
2245 * for peers who have connected to us before the local session has been established
2247 * @param session ...
2250 add_incoming_peers (struct ConsensusSession *session)
2252 struct IncomingSocket *inc;
2254 struct ConsensusPeerInformation *cpi;
2256 for (inc = incoming_sockets_head; NULL != inc; inc = inc->next)
2258 if ( (NULL == inc->requested_gid) ||
2259 (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) )
2261 for (i = 0; i < session->num_peers; i++)
2263 cpi = &session->info[i];
2264 cpi->peer_id = inc->peer_id;
2265 cpi->mss = inc->mss;
2266 cpi->hello = GNUNET_YES;
2275 * Initialize the session, continue receiving messages from the owning client
2277 * @param session the session to initialize
2280 initialize_session (struct ConsensusSession *session)
2282 struct ConsensusSession *other_session;
2284 GNUNET_assert (NULL != session->join_msg);
2285 initialize_session_peer_list (session);
2286 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
2287 compute_global_id (session, &session->join_msg->session_id);
2289 /* Check if some local client already owns the session. */
2290 other_session = sessions_head;
2291 while (NULL != other_session)
2293 if ((other_session != session) &&
2294 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2296 if (GNUNET_NO == other_session->conclude)
2299 destroy_session (session);
2302 GNUNET_SERVER_client_drop (other_session->scss.client);
2303 other_session->scss.client = NULL;
2306 other_session = other_session->next;
2309 session->local_peer_idx = get_peer_idx (my_peer, session);
2310 GNUNET_assert (-1 != session->local_peer_idx);
2311 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2312 GNUNET_free (session->join_msg);
2313 session->join_msg = NULL;
2314 add_incoming_peers (session);
2315 GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK);
2316 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2321 * Called when a client wants to join a consensus session.
2324 * @param client client that sent the message
2325 * @param m message sent by the client
2328 client_join (void *cls,
2329 struct GNUNET_SERVER_Client *client,
2330 const struct GNUNET_MessageHeader *m)
2332 struct ConsensusSession *session;
2334 // make sure the client has not already joined a session
2335 session = sessions_head;
2336 while (NULL != session)
2338 if (session->scss.client == client)
2341 disconnect_client (client);
2344 session = session->next;
2347 session = GNUNET_new (struct ConsensusSession);
2348 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2349 /* these have to be initialized here, as the client can already start to give us values */
2350 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2351 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2352 session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2353 session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM);
2354 session->scss.client = client;
2355 session->client_mq = create_message_queue_for_server_client (&session->scss);
2356 GNUNET_SERVER_client_keep (client);
2358 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2360 // Initialize session later if local peer identity is not known yet.
2361 if (NULL == my_peer)
2363 GNUNET_SERVER_disable_receive_done_warning (client);
2367 initialize_session (session);
2374 * Called when a client performs an insert operation.
2376 * @param cls (unused)
2377 * @param client client handle
2378 * @param m message sent by the client
2381 client_insert (void *cls,
2382 struct GNUNET_SERVER_Client *client,
2383 const struct GNUNET_MessageHeader *m)
2385 struct ConsensusSession *session;
2386 struct GNUNET_CONSENSUS_ElementMessage *msg;
2387 struct GNUNET_CONSENSUS_Element *element;
2390 session = sessions_head;
2391 while (NULL != session)
2393 if (session->scss.client == client)
2397 if (NULL == session)
2399 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
2400 GNUNET_SERVER_client_disconnect (client);
2404 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2405 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2406 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
2407 element->type = msg->element_type;
2408 element->size = element_size;
2409 memcpy (&element[1], &msg[1], element_size);
2410 element->data = &element[1];
2411 GNUNET_assert (NULL != element->data);
2412 insert_element (session, element);
2414 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2419 * Called when a client performs the conclude operation.
2421 * @param cls (unused)
2422 * @param client client handle
2423 * @param message message sent by the client
2426 client_conclude (void *cls,
2427 struct GNUNET_SERVER_Client *client,
2428 const struct GNUNET_MessageHeader *message)
2430 struct ConsensusSession *session;
2431 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
2433 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2435 session = sessions_head;
2436 while ((session != NULL) && (session->scss.client != client))
2437 session = session->next;
2438 if (NULL == session)
2440 /* client not found */
2442 GNUNET_SERVER_client_disconnect (client);
2446 if (CONSENSUS_ROUND_BEGIN != session->current_round)
2448 /* client requested conclude twice */
2450 /* client may still own a session, destroy it */
2451 disconnect_client (client);
2455 session->conclude = GNUNET_YES;
2457 if (session->num_peers <= 1)
2459 send_client_conclude_done (session);
2463 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
2464 /* the 'begin' round is over, start with the next, real round */
2465 round_over (session, NULL);
2468 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2473 * Task that disconnects from core.
2475 * @param cls core handle
2476 * @param tc context information (why was this task triggered now)
2479 disconnect_core (void *cls,
2480 const struct GNUNET_SCHEDULER_TaskContext *tc)
2484 GNUNET_CORE_disconnect (core);
2487 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
2492 core_startup (void *cls,
2493 struct GNUNET_CORE_Handle *core,
2494 const struct GNUNET_PeerIdentity *peer)
2496 struct ConsensusSession *session;
2498 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
2499 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2500 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2501 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2502 /* initialize sessions that are waiting for the local peer identity */
2503 for (session = sessions_head; NULL != session; session = session->next)
2504 if (NULL != session->join_msg)
2505 initialize_session (session);
2510 * Called to clean up, after a shutdown has been requested.
2512 * @param cls closure
2513 * @param tc context information (why was this task triggered now)
2516 shutdown_task (void *cls,
2517 const struct GNUNET_SCHEDULER_TaskContext *tc)
2519 while (NULL != incoming_sockets_head)
2521 struct IncomingSocket *socket;
2522 socket = incoming_sockets_head;
2523 if (NULL == socket->cpi)
2524 clear_message_stream_state (&socket->mss);
2525 incoming_sockets_head = incoming_sockets_head->next;
2526 GNUNET_free (socket);
2529 while (NULL != sessions_head)
2531 struct ConsensusSession *session;
2532 session = sessions_head->next;
2533 destroy_session (sessions_head);
2534 sessions_head = session;
2539 GNUNET_CORE_disconnect (core);
2543 if (NULL != listener)
2545 GNUNET_STREAM_listen_close (listener);
2549 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
2554 * Start processing consensus requests.
2556 * @param cls closure
2557 * @param server the initialized server
2558 * @param c configuration to use
2561 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
2563 /* core is only used to retrieve the peer identity */
2564 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
2567 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2568 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2569 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2570 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2571 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
2578 GNUNET_SERVER_add_handlers (server, server_handlers);
2580 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2582 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
2584 GNUNET_STREAM_OPTION_END);
2586 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
2587 core = GNUNET_CORE_connect (c, NULL,
2588 &core_startup, NULL,
2589 NULL, NULL, GNUNET_NO, NULL,
2590 GNUNET_NO, core_handlers);
2591 GNUNET_assert (NULL != core);
2592 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2597 * The main function for the consensus service.
2599 * @param argc number of arguments from the command line
2600 * @param argv command line arguments
2601 * @return 0 ok, 1 on error
2604 main (int argc, char *const *argv)
2607 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2608 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
2609 return (GNUNET_OK == ret) ? 0 : 1;