2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_common.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_consensus_service.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_stream_lib.h"
35 #include "consensus_protocol.h"
37 #include "consensus.h"
41 * Number of IBFs in a strata estimator.
43 #define STRATA_COUNT 32
45 * Number of buckets per IBF.
47 #define STRATA_IBF_BUCKETS 80
49 * hash num parameter for the difference digests and strata estimators
51 #define STRATA_HASH_NUM 3
54 * Number of buckets that can be transmitted in one message.
56 #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values.
63 #define MAX_IBF_ORDER (16)
68 #define NUM_EXP_ROUNDS (4)
71 /* forward declarations */
73 struct ConsensusSession;
74 struct IncomingSocket;
75 struct ConsensusPeerInformation;
78 client_send_next (struct ConsensusSession *session);
81 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
84 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
87 send_ibf (struct ConsensusPeerInformation *cpi);
90 send_strata_estimator (struct ConsensusPeerInformation *cpi);
93 decode (struct ConsensusPeerInformation *cpi);
96 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
99 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
103 * An element that is waiting to be transmitted to the client.
105 struct PendingElement
108 * Pending elements are kept in a DLL.
110 struct PendingElement *next;
113 * Pending elements are kept in a DLL.
115 struct PendingElement *prev;
120 struct GNUNET_CONSENSUS_Element *element;
122 /* peer this element is coming from */
123 struct ConsensusPeerInformation *cpi;
129 struct ElementList *next;
130 struct GNUNET_CONSENSUS_Element *element;
131 struct GNUNET_HashCode *element_hash;
136 * Describes the current round a consensus session is in.
141 * Not started the protocol yet.
143 CONSENSUS_ROUND_BEGIN=0,
145 * Distribution of elements with the exponential scheme.
147 CONSENSUS_ROUND_EXCHANGE,
149 * Exchange which elements each peer has, but not the elements.
151 CONSENSUS_ROUND_INVENTORY,
153 * Collect and distribute missing values.
155 CONSENSUS_ROUND_STOCK,
157 * Consensus concluded.
159 CONSENSUS_ROUND_FINISH
164 * Information about a peer that is in a consensus session.
166 struct ConsensusPeerInformation
168 struct GNUNET_PeerIdentity peer_id;
171 * Socket for communicating with the peer, either created by the local peer,
172 * or the remote peer.
174 struct GNUNET_STREAM_Socket *socket;
177 * Message tokenizer, for the data received from this peer via the stream socket.
179 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
182 * Do we connect to the peer, or does the peer connect to us?
183 * Only valid for all-to-all phases
188 * Did we receive/send a consensus hello?
193 * Handle for currently active read
195 struct GNUNET_STREAM_ReadHandle *rh;
198 * Handle for currently active read
200 struct GNUNET_STREAM_WriteHandle *wh;
203 /* beginning of round */
205 /* we currently receive an ibf */
207 /* we currently transmit an ibf */
208 IBF_STATE_TRANSMITTING,
209 /* we decode a received ibf */
211 /* wait for elements and element requests */
212 IBF_STATE_ANTICIPATE_DIFF
216 * What is the order (=log2 size) of the ibf
217 * we're currently dealing with?
218 * Interpretation depends on ibf_state.
223 * The current IBF for this peer,
224 * purpose dependent on ibf_state
226 struct InvertibleBloomFilter *ibf;
229 * How many buckets have we transmitted/received?
230 * Interpretatin depends on ibf_state
232 int ibf_bucket_counter;
235 * Strata estimator of the peer, NULL if our peer
236 * initiated the reconciliation.
238 struct StrataEstimator *se;
241 * Element keys that this peer misses, but we have them.
243 struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
246 * Element keys that this peer has, but we miss.
248 struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
251 * Back-reference to the consensus session,
252 * to that ConsensusPeerInformation can be used as a closure
254 struct ConsensusSession *session;
257 * Messages queued for the current round.
259 struct QueuedMessage *messages_head;
262 * Messages queued for the current round.
264 struct QueuedMessage *messages_tail;
267 * True if we are actually replaying the strata message,
268 * e.g. currently handling the premature_strata_message.
270 int replaying_strata_message;
273 * A strata message that is not actually for the current round,
274 * used in the exp-scheme.
276 struct StrataMessage *premature_strata_message;
279 * We have finishes the exp-subround with the peer.
281 int exp_subround_finished;
283 int inventory_synced;
286 * Round this peer seems to be in, according to the last SE we got.
287 * Necessary to store this, as we sometimes need to respond to a request from an
288 * older round, while we are already in the next round.
290 enum ConsensusRound apparent_round;
294 typedef void (*QueuedMessageCallback) (void *msg);
297 * A doubly linked list of messages.
301 struct GNUNET_MessageHeader *msg;
304 * Queued messages are stored in a doubly linked list.
306 struct QueuedMessage *next;
309 * Queued messages are stored in a doubly linked list.
311 struct QueuedMessage *prev;
313 QueuedMessageCallback cb;
319 struct StrataEstimator
321 struct InvertibleBloomFilter **strata;
326 * A consensus session consists of one local client and the remote authorities.
328 struct ConsensusSession
331 * Consensus sessions are kept in a DLL.
333 struct ConsensusSession *next;
336 * Consensus sessions are kept in a DLL.
338 struct ConsensusSession *prev;
341 * Join message. Used to initialize the session later,
342 * if the identity of the local peer is not yet known.
343 * NULL if the session has been fully initialized.
345 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
348 * Global consensus identification, computed
349 * from the session id and participating authorities.
351 struct GNUNET_HashCode global_id;
354 * Local client in this consensus session.
355 * There is only one client per consensus session.
357 struct GNUNET_SERVER_Client *client;
360 * Elements in the consensus set of this session,
361 * all of them either have been sent by or approved by the client.
362 * Contains ElementList.
363 * Used as a unique-key hashmap.
365 struct GNUNET_CONTAINER_MultiHashMap *values;
368 * Elements that have not been approved (or rejected) by the client yet.
370 struct PendingElement *client_approval_head;
373 * Elements that have not been approved (or rejected) by the client yet.
375 struct PendingElement *client_approval_tail;
378 * Messages to be sent to the local client that owns this session
380 struct QueuedMessage *client_messages_head;
383 * Messages to be sent to the local client that owns this session
385 struct QueuedMessage *client_messages_tail;
388 * Currently active transmit handle for sending to the client
390 struct GNUNET_SERVER_TransmitHandle *client_th;
393 * Timeout for all rounds together, single rounds will schedule a timeout task
394 * with a fraction of the conclude timeout.
396 struct GNUNET_TIME_Relative conclude_timeout;
399 * Timeout task identifier for the current round
401 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
404 * Number of other peers in the consensus
406 unsigned int num_peers;
409 * Information about the other peers,
412 struct ConsensusPeerInformation *info;
415 * Index of the local peer in the peers array
420 * Strata estimator, computed online
422 struct StrataEstimator *se;
427 struct InvertibleBloomFilter **ibfs;
432 enum ConsensusRound current_round;
439 * Permutation of peers for the current round,
440 * maps logical index (for current round) to physical index (location in info array)
445 * The partner for the current exp-round
447 struct ConsensusPeerInformation* partner_outgoing;
450 * The partner for the current exp-round
452 struct ConsensusPeerInformation* partner_incoming;
457 * Sockets from other peers who want to communicate with us.
458 * It may not be known yet which consensus session they belong to.
459 * Also, the session might not exist yet locally.
461 struct IncomingSocket
464 * Incoming sockets are kept in a double linked list.
466 struct IncomingSocket *next;
469 * Incoming sockets are kept in a double linked list.
471 struct IncomingSocket *prev;
476 struct GNUNET_STREAM_Socket *socket;
479 * Handle for currently active read
481 struct GNUNET_STREAM_ReadHandle *rh;
484 * Peer that connected to us with the socket.
486 struct GNUNET_PeerIdentity peer_id;
489 * Message stream tokenizer for this socket.
491 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
494 * Peer-in-session this socket belongs to, once known, otherwise NULL.
496 struct ConsensusPeerInformation *cpi;
499 * Set to the global session id, if the peer sent us a hello-message,
500 * but the session does not exist yet.
502 struct GNUNET_HashCode *requested_gid;
506 static struct IncomingSocket *incoming_sockets_head;
507 static struct IncomingSocket *incoming_sockets_tail;
510 * Linked list of sesstions this peer participates in.
512 static struct ConsensusSession *sessions_head;
515 * Linked list of sesstions this peer participates in.
517 static struct ConsensusSession *sessions_tail;
520 * Configuration of the consensus service.
522 static const struct GNUNET_CONFIGURATION_Handle *cfg;
525 * Handle to the server for this service.
527 static struct GNUNET_SERVER_Handle *srv;
530 * Peer that runs this service.
532 static struct GNUNET_PeerIdentity *my_peer;
535 * Handle to the core service. Only used during service startup, will be NULL after that.
537 static struct GNUNET_CORE_Handle *core;
540 * Listener for sockets from peers that want to reconcile with us.
542 static struct GNUNET_STREAM_ListenSocket *listener;
546 * Queue a message to be sent to the inhabiting client of a session.
548 * @param session session
549 * @param msg message we want to queue
552 queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
554 struct QueuedMessage *qm;
555 qm = GNUNET_malloc (sizeof *qm);
557 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
561 * Queue a message to be sent to another peer
564 * @param msg message we want to queue
565 * @param cb callback, called when the message is given to strem
566 * @param cls closure for cb
569 queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
571 struct QueuedMessage *qm;
572 qm = GNUNET_malloc (sizeof *qm);
576 GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm);
578 write_queued (cpi, GNUNET_STREAM_OK, 0);
583 * Queue a message to be sent to another peer
586 * @param msg message we want to queue
589 queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg)
591 queue_peer_message_with_cls (cpi, msg, NULL, NULL);
597 clear_peer_messages (struct ConsensusPeerInformation *cpi)
599 cpi->messages_head = NULL;
600 cpi->messages_tail = NULL;
606 * Estimate set difference with two strata estimators,
607 * i.e. arrays of IBFs.
608 * Does not not modify its arguments.
610 * @param se1 first strata estimator
611 * @param se2 second strata estimator
612 * @return the estimated difference
615 estimate_difference (const struct StrataEstimator *se1,
616 const struct StrataEstimator *se2)
621 for (i = STRATA_COUNT - 1; i >= 0; i--)
623 struct InvertibleBloomFilter *diff;
624 /* number of keys decoded from the ibf */
628 /* FIXME: implement this without always allocating new IBFs */
629 diff = ibf_dup (se1->strata[i]);
630 ibf_subtract (diff, se2->strata[i]);
633 more = ibf_decode (diff, NULL, NULL);
634 if (GNUNET_NO == more)
639 if (GNUNET_SYSERR == more)
642 return count * (1 << (i + 1));
653 * Called when receiving data from a peer that is member of
654 * an inhabited consensus session.
656 * @param cls the closure from GNUNET_STREAM_read
657 * @param status the status of the stream at the time this function is called
658 * @param data traffic from the other side
659 * @param size the number of bytes available in data read; will be 0 on timeout
660 * @return number of bytes of processed from 'data' (any data remaining should be
661 * given to the next time the read processor is called).
664 session_stream_data_processor (void *cls,
665 enum GNUNET_STREAM_Status status,
669 struct ConsensusPeerInformation *cpi;
672 GNUNET_assert (GNUNET_STREAM_OK == status);
674 GNUNET_assert (NULL != cpi->mst);
675 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
676 if (GNUNET_SYSERR == ret)
678 /* FIXME: handle this correctly */
682 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
683 &session_stream_data_processor, cpi);
684 /* we always read all data */
689 * Called when we receive data from a peer that is not member of
690 * a session yet, or the session is not yet inhabited.
692 * @param cls the closure from GNUNET_STREAM_read
693 * @param status the status of the stream at the time this function is called
694 * @param data traffic from the other side
695 * @param size the number of bytes available in data read; will be 0 on timeout
696 * @return number of bytes of processed from 'data' (any data remaining should be
697 * given to the next time the read processor is called).
700 incoming_stream_data_processor (void *cls,
701 enum GNUNET_STREAM_Status status,
705 struct IncomingSocket *incoming;
708 GNUNET_assert (GNUNET_STREAM_OK == status);
710 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
711 if (GNUNET_SYSERR == ret)
713 /* FIXME: handle this correctly */
717 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
718 &incoming_stream_data_processor, incoming);
719 /* we always read all data */
725 send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
727 struct GNUNET_CONSENSUS_Element *element;
728 struct GNUNET_MessageHeader *element_msg;
733 element = head->element;
734 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
735 element_msg = GNUNET_malloc (msize);
736 element_msg->size = htons (msize);
737 switch (cpi->apparent_round)
739 case CONSENSUS_ROUND_STOCK:
740 case CONSENSUS_ROUND_EXCHANGE:
741 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
743 case CONSENSUS_ROUND_INVENTORY:
744 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
749 GNUNET_assert (NULL != element->data);
750 memcpy (&element_msg[1], element->data, element->size);
751 queue_peer_message (cpi, element_msg);
757 * Iterator to insert values into an ibf.
760 * @param key current key code
761 * @param value value in the hash map
762 * @return GNUNET_YES if we should continue to
767 ibf_values_iterator (void *cls,
768 const struct GNUNET_HashCode *key,
771 struct ConsensusPeerInformation *cpi;
772 struct ElementList *head;
773 struct IBF_Key ibf_key;
776 ibf_key = ibf_key_from_hashcode (head->element_hash);
777 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
778 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
783 * Create and populate an IBF for the specified peer,
784 * if it does not already exist.
786 * @param cpi peer to create the ibf for
789 prepare_ibf (struct ConsensusPeerInformation *cpi)
791 if (NULL == cpi->session->ibfs[cpi->ibf_order])
793 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
794 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
800 * Called when a remote peer wants to inform the local peer
801 * that the remote peer misses elements.
802 * Elements are not reconciled.
808 handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
815 exp_subround_finished (const struct ConsensusSession *session)
819 if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO))
821 if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO))
823 if (0 == not_finished)
829 inventory_round_finished (struct ConsensusSession *session)
834 for (i = 0; i < session->num_peers; i++)
835 if (GNUNET_YES == session->info[i].inventory_synced)
837 if (finished >= (session->num_peers / 2))
845 fin_sent_cb (void *cls)
847 struct ConsensusPeerInformation *cpi;
849 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
850 switch (cpi->session->current_round)
852 case CONSENSUS_ROUND_EXCHANGE:
853 case CONSENSUS_ROUND_STOCK:
854 if (cpi->session->current_round != cpi->apparent_round)
856 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
859 cpi->exp_subround_finished = GNUNET_YES;
860 /* the subround is only really over if *both* partners are done */
861 if (GNUNET_YES == exp_subround_finished (cpi->session))
862 subround_over (cpi->session, NULL);
864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
866 case CONSENSUS_ROUND_INVENTORY:
867 cpi->inventory_synced = GNUNET_YES;
868 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
869 round_over (cpi->session, NULL);
870 /* FIXME: maybe go to next round */
879 * Gets called when the other peer wants us to inform that
880 * it has decoded our ibf and sent us all elements / requests
883 handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
885 struct ConsensusRoundMessage *fin_msg;
887 switch (cpi->session->current_round)
889 case CONSENSUS_ROUND_INVENTORY:
890 cpi->inventory_synced = GNUNET_YES;
891 case CONSENSUS_ROUND_STOCK:
892 case CONSENSUS_ROUND_EXCHANGE:
893 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
894 fin_msg = GNUNET_malloc (sizeof *fin_msg);
895 fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
896 fin_msg->header.size = htons (sizeof *fin_msg);
897 fin_msg->round = cpi->apparent_round;
898 /* the subround os over once we kicked off sending the fin msg */
899 /* FIXME: assert we are talking to the right peer! */
900 queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi);
901 /* FIXME: mark peer as synced */
904 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
912 * The other peer wants us to inform that he sent us all the elements we requested.
915 handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
917 struct ConsensusRoundMessage *round_msg;
918 round_msg = (struct ConsensusRoundMessage *) msg;
919 /* FIXME: only call subround_over if round is the current one! */
920 switch (cpi->session->current_round)
922 case CONSENSUS_ROUND_EXCHANGE:
923 case CONSENSUS_ROUND_STOCK:
924 if (cpi->session->current_round != round_msg->round)
926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
927 cpi->ibf_state = IBF_STATE_NONE;
928 cpi->ibf_bucket_counter = 0;
931 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
932 cpi->exp_subround_finished = GNUNET_YES;
933 if (GNUNET_YES == exp_subround_finished (cpi->session))
934 subround_over (cpi->session, NULL);
936 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
938 case CONSENSUS_ROUND_INVENTORY:
939 cpi->inventory_synced = GNUNET_YES;
940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
941 if (inventory_round_finished (cpi->session))
942 round_over (cpi->session, NULL);
945 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
952 static struct StrataEstimator *
953 strata_estimator_create ()
955 struct StrataEstimator *se;
958 /* fixme: allocate everything in one chunk */
960 se = GNUNET_malloc (sizeof (struct StrataEstimator));
961 se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT);
962 for (i = 0; i < STRATA_COUNT; i++)
963 se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
969 strata_estimator_destroy (struct StrataEstimator *se)
972 for (i = 0; i < STRATA_COUNT; i++)
973 ibf_destroy (se->strata[i]);
974 GNUNET_free (se->strata);
980 is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
982 switch (strata_msg->round)
984 case CONSENSUS_ROUND_STOCK:
985 case CONSENSUS_ROUND_EXCHANGE:
986 /* here, we also have to compare subrounds */
987 if ( (strata_msg->round != session->current_round) ||
988 (strata_msg->exp_round != session->exp_round) ||
989 (strata_msg->exp_subround != session->exp_subround))
993 if (session->current_round != strata_msg->round)
1002 * Called when a peer sends us its strata estimator.
1003 * In response, we sent out IBF of appropriate size back.
1005 * @param cpi session
1006 * @param strata_msg message
1009 handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
1016 if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY))
1018 /* we still have to handle this request appropriately */
1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
1020 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1022 else if (is_premature_strata_message (cpi->session, strata_msg))
1024 if (GNUNET_NO == cpi->replaying_strata_message)
1026 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n",
1027 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
1028 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
1033 if (NULL == cpi->se)
1034 cpi->se = strata_estimator_create ();
1036 cpi->apparent_round = strata_msg->round;
1038 size = ntohs (strata_msg->header.size);
1039 buf = (void *) &strata_msg[1];
1040 for (i = 0; i < STRATA_COUNT; i++)
1043 res = ibf_read (&buf, &size, cpi->se->strata[i]);
1044 GNUNET_assert (GNUNET_OK == res);
1047 diff = estimate_difference (cpi->session->se, cpi->se);
1049 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
1050 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
1052 switch (cpi->session->current_round)
1054 case CONSENSUS_ROUND_EXCHANGE:
1055 case CONSENSUS_ROUND_INVENTORY:
1056 case CONSENSUS_ROUND_STOCK:
1057 /* send IBF of the right size */
1059 while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) )
1061 if (cpi->ibf_order > MAX_IBF_ORDER)
1062 cpi->ibf_order = MAX_IBF_ORDER;
1063 cpi->ibf_order += 1;
1064 /* create ibf if not already pre-computed */
1066 if (NULL != cpi->ibf)
1067 ibf_destroy (cpi->ibf);
1068 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1069 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1070 cpi->ibf_bucket_counter = 0;
1074 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1075 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1083 handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
1088 /* FIXME: find out if we're still expecting the same ibf! */
1090 cpi->apparent_round = cpi->session->current_round;
1092 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
1093 switch (cpi->ibf_state)
1095 case IBF_STATE_NONE:
1096 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1097 cpi->ibf_state = IBF_STATE_RECEIVING;
1098 cpi->ibf_order = digest->order;
1099 cpi->ibf_bucket_counter = 0;
1100 if (NULL != cpi->ibf)
1102 ibf_destroy (cpi->ibf);
1106 case IBF_STATE_ANTICIPATE_DIFF:
1107 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1108 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1109 cpi->ibf_state = IBF_STATE_RECEIVING;
1110 cpi->ibf_order = digest->order;
1111 cpi->ibf_bucket_counter = 0;
1112 if (NULL != cpi->ibf)
1114 ibf_destroy (cpi->ibf);
1118 case IBF_STATE_RECEIVING:
1121 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1125 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1127 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1132 if (NULL == cpi->ibf)
1133 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
1135 buf = (void *) &digest[1];
1136 ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
1138 cpi->ibf_bucket_counter += num_buckets;
1140 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1142 cpi->ibf_state = IBF_STATE_DECODING;
1143 cpi->ibf_bucket_counter = 0;
1145 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
1153 * Handle an element that another peer sent us
1156 handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
1158 struct PendingElement *pending_element;
1159 struct GNUNET_CONSENSUS_Element *element;
1160 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
1163 switch (cpi->session->current_round)
1165 case CONSENSUS_ROUND_STOCK:
1166 /* FIXME: check if we really expect the element */
1167 case CONSENSUS_ROUND_EXCHANGE:
1170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1174 size = ntohs (element_msg->size) - sizeof *element_msg;
1176 element = GNUNET_malloc (size + sizeof *element);
1177 element->size = size;
1178 memcpy (&element[1], &element_msg[1], size);
1179 element->data = &element[1];
1181 pending_element = GNUNET_malloc (sizeof *pending_element);
1182 pending_element->element = element;
1183 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element);
1185 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
1186 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
1187 client_element_msg->header.size = htons (size + sizeof *client_element_msg);
1188 memcpy (&client_element_msg[1], &element[1], size);
1190 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
1192 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size);
1194 client_send_next (cpi->session);
1201 * Handle a request for elements.
1203 * @param cpi peer that is requesting the element
1204 * @param msg the element request message
1207 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
1209 struct GNUNET_HashCode hashcode;
1210 struct IBF_Key *ibf_key;
1213 /* element requests are allowed in every round */
1215 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1216 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
1218 ibf_key = (struct IBF_Key *) &msg[1];
1221 struct ElementList *head;
1222 ibf_hashcode_from_key (*ibf_key, &hashcode);
1223 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1224 send_elements (cpi, head);
1231 * If necessary, send a message to the peer, depending on the current
1235 embrace_peer (struct ConsensusPeerInformation *cpi)
1237 GNUNET_assert (GNUNET_YES == cpi->hello);
1238 switch (cpi->session->current_round)
1240 case CONSENSUS_ROUND_EXCHANGE:
1241 if (cpi->session->partner_outgoing != cpi)
1244 case CONSENSUS_ROUND_INVENTORY:
1246 case CONSENSUS_ROUND_STOCK:
1247 if (cpi == cpi->session->partner_outgoing)
1248 send_strata_estimator (cpi);
1256 * Handle a HELLO-message, send when another peer wants to join a session where
1257 * our peer is a member. The session may or may not be inhabited yet.
1260 handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
1262 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
1263 struct ConsensusSession *session;
1265 session = sessions_head;
1266 while (NULL != session)
1268 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
1271 idx = get_peer_idx (&inc->peer_id, session);
1272 GNUNET_assert (-1 != idx);
1273 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
1274 inc->cpi = &session->info[idx];
1275 inc->cpi->mst = inc->mst;
1276 inc->cpi->hello = GNUNET_YES;
1277 inc->cpi->socket = inc->socket;
1278 embrace_peer (inc->cpi);
1281 session = session->next;
1283 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
1289 * Send a strata estimator.
1291 * @param cpi the peer
1294 send_strata_estimator (struct ConsensusPeerInformation *cpi)
1296 struct StrataMessage *strata_msg;
1301 cpi->apparent_round = cpi->session->current_round;
1302 cpi->ibf_state = IBF_STATE_NONE;
1303 cpi->ibf_bucket_counter = 0;
1305 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
1306 cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info));
1308 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1310 strata_msg = GNUNET_malloc (msize);
1311 strata_msg->header.size = htons (msize);
1312 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1313 strata_msg->round = cpi->session->current_round;
1314 strata_msg->exp_round = cpi->session->exp_round;
1315 strata_msg->exp_subround = cpi->session->exp_subround;
1317 buf = &strata_msg[1];
1318 for (i = 0; i < STRATA_COUNT; i++)
1320 ibf_write (cpi->session->se->strata[i], &buf, NULL);
1323 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
1328 * Send an IBF of the order specified in cpi.
1330 * @param cpi the peer
1333 send_ibf (struct ConsensusPeerInformation *cpi)
1335 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1336 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1338 cpi->ibf_bucket_counter = 0;
1339 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1343 struct DifferenceDigest *digest;
1346 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1347 /* limit to maximum */
1348 if (num_buckets > BUCKETS_PER_MESSAGE)
1349 num_buckets = BUCKETS_PER_MESSAGE;
1351 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1353 digest = GNUNET_malloc (msize);
1354 digest->header.size = htons (msize);
1355 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1356 digest->order = cpi->ibf_order;
1357 digest->round = cpi->apparent_round;
1360 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
1362 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
1364 cpi->ibf_bucket_counter += num_buckets;
1366 cpi->ibf_bucket_counter = 0;
1367 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1372 * Decode the current diff ibf, and send elements/requests/reports/
1374 * @param cpi partner peer
1377 decode (struct ConsensusPeerInformation *cpi)
1380 struct GNUNET_HashCode hashcode;
1383 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1389 res = ibf_decode (cpi->ibf, &side, &key);
1390 if (GNUNET_SYSERR == res)
1392 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1393 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1396 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1397 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1398 cpi->ibf_bucket_counter = 0;
1402 if (GNUNET_NO == res)
1404 struct ConsensusRoundMessage *msg;
1405 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1406 msg = GNUNET_malloc (sizeof *msg);
1407 msg->header.size = htons (sizeof *msg);
1408 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1409 msg->round = cpi->apparent_round;
1410 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1415 struct ElementList *head;
1416 /* we have the element(s), send it to the other peer */
1417 ibf_hashcode_from_key (key, &hashcode);
1418 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1419 send_elements (cpi, head);
1423 struct ElementRequest *msg;
1427 msize = (sizeof *msg) + sizeof (struct IBF_Key);
1428 msg = GNUNET_malloc (msize);
1429 switch (cpi->apparent_round)
1431 case CONSENSUS_ROUND_STOCK:
1432 /* FIXME: check if we really want to request the element */
1433 case CONSENSUS_ROUND_EXCHANGE:
1434 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1436 case CONSENSUS_ROUND_INVENTORY:
1437 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
1442 msg->header.size = htons (msize);
1443 p = (struct IBF_Key *) &msg[1];
1445 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1452 * Functions with this signature are called whenever a
1453 * complete message is received by the tokenizer.
1455 * Do not call GNUNET_SERVER_mst_destroy in callback
1457 * @param cls closure
1458 * @param client identification of the client
1459 * @param message the actual message
1460 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1463 mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1465 struct ConsensusPeerInformation *cpi;
1467 switch (ntohs (message->type))
1469 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
1470 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
1471 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
1472 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1473 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1474 return handle_p2p_element (cpi, message);
1475 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1476 return handle_p2p_element_report (cpi, message);
1477 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1478 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1479 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1480 return handle_p2p_synced (cpi, message);
1481 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1482 return handle_p2p_fin (cpi, message);
1484 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1485 ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1492 * Handle tokenized messages from stream sockets.
1493 * Delegate them if the socket belongs to a session,
1494 * handle hello messages otherwise.
1496 * Do not call GNUNET_SERVER_mst_destroy in callback
1498 * @param cls closure, unused
1499 * @param client incoming socket this message comes from
1500 * @param message the actual message
1502 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1505 mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
1507 struct IncomingSocket *inc;
1508 inc = (struct IncomingSocket *) client;
1509 switch (ntohs( message->type))
1511 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO:
1512 return handle_p2p_hello (inc, (struct ConsensusHello *) message);
1514 if (NULL != inc->cpi)
1515 return mst_session_callback (inc->cpi, client, message);
1516 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
1517 ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
1524 * Functions of this type are called upon new stream connection from other peers
1525 * or upon binding error which happen when the app_port given in
1526 * GNUNET_STREAM_listen() is already taken.
1528 * @param cls the closure from GNUNET_STREAM_listen
1529 * @param socket the socket representing the stream; NULL on binding error
1530 * @param initiator the identity of the peer who wants to establish a stream
1531 * with us; NULL on binding error
1532 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
1533 * stream (the socket will be invalid after the call)
1536 listen_cb (void *cls,
1537 struct GNUNET_STREAM_Socket *socket,
1538 const struct GNUNET_PeerIdentity *initiator)
1540 struct IncomingSocket *incoming;
1541 GNUNET_assert (NULL != socket);
1542 incoming = GNUNET_malloc (sizeof *incoming);
1543 incoming->socket = socket;
1544 incoming->peer_id = *initiator;
1545 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1546 &incoming_stream_data_processor, incoming);
1547 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
1548 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
1554 * Iterator over hash map entries.
1556 * @param cls closure
1557 * @param key current key code
1558 * @param value value in the hash map
1559 * @return GNUNET_YES if we should continue to
1564 destroy_element_list_iter (void *cls,
1565 const struct GNUNET_HashCode * key,
1568 struct ElementList *el;
1572 struct ElementList *el_old;
1575 GNUNET_free (el_old->element_hash);
1576 GNUNET_free (el_old->element);
1577 GNUNET_free (el_old);
1584 * Destroy a session, free all resources associated with it.
1586 * @param session the session to destroy
1589 destroy_session (struct ConsensusSession *session)
1593 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
1594 GNUNET_SERVER_client_drop (session->client);
1595 session->client = NULL;
1596 if (NULL != session->shuffle)
1598 GNUNET_free (session->shuffle);
1599 session->shuffle = NULL;
1601 if (NULL != session->se)
1603 strata_estimator_destroy (session->se);
1606 if (NULL != session->info)
1608 for (i = 0; i < session->num_peers; i++)
1610 struct ConsensusPeerInformation *cpi;
1611 cpi = &session->info[i];
1612 if ((NULL != cpi) && (NULL != cpi->socket))
1614 if (NULL != cpi->rh)
1616 GNUNET_STREAM_read_cancel (cpi->rh);
1619 if (NULL != cpi->wh)
1621 GNUNET_STREAM_write_cancel (cpi->wh);
1624 GNUNET_STREAM_close (cpi->socket);
1627 if (NULL != cpi->se)
1629 strata_estimator_destroy (cpi->se);
1632 if (NULL != cpi->ibf)
1634 ibf_destroy (cpi->ibf);
1637 if (NULL != cpi->mst)
1639 GNUNET_SERVER_mst_destroy (cpi->mst);
1643 GNUNET_free (session->info);
1644 session->info = NULL;
1646 if (NULL != session->ibfs)
1648 for (i = 0; i <= MAX_IBF_ORDER; i++)
1650 if (NULL != session->ibfs[i])
1652 ibf_destroy (session->ibfs[i]);
1653 session->ibfs[i] = NULL;
1656 GNUNET_free (session->ibfs);
1657 session->ibfs = NULL;
1659 if (NULL != session->values)
1661 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL);
1662 GNUNET_CONTAINER_multihashmap_destroy (session->values);
1663 session->values = NULL;
1665 GNUNET_free (session);
1670 * Disconnect a client, and destroy all sessions associated with it.
1672 * @param client the client to disconnect
1675 disconnect_client (struct GNUNET_SERVER_Client *client)
1677 struct ConsensusSession *session;
1678 GNUNET_SERVER_client_disconnect (client);
1680 /* if the client owns a session, remove it */
1681 session = sessions_head;
1682 while (NULL != session)
1684 if (client == session->client)
1686 destroy_session (session);
1689 session = session->next;
1695 * Compute a global, (hopefully) unique consensus session id,
1696 * from the local id of the consensus session, and the identities of all participants.
1697 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
1698 * exactly the same peers, the global id will be different.
1700 * @param session session to generate the global id for
1701 * @param session_id local id of the consensus session
1704 compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
1707 struct GNUNET_HashCode tmp;
1709 session->global_id = *session_id;
1710 for (i = 0; i < session->num_peers; ++i)
1712 GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
1713 session->global_id = tmp;
1714 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
1715 session->global_id = tmp;
1721 * Transmit a queued message to the session's client.
1723 * @param cls consensus session
1724 * @param size number of bytes available in buf
1725 * @param buf where the callee should write the message
1726 * @return number of bytes written to buf
1729 transmit_queued (void *cls, size_t size,
1732 struct ConsensusSession *session;
1733 struct QueuedMessage *qmsg;
1737 session->client_th = NULL;
1739 qmsg = session->client_messages_head;
1740 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1741 GNUNET_assert (qmsg);
1745 destroy_session (session);
1749 msg_size = ntohs (qmsg->msg->size);
1751 GNUNET_assert (size >= msg_size);
1753 memcpy (buf, qmsg->msg, msg_size);
1754 GNUNET_free (qmsg->msg);
1757 client_send_next (session);
1764 * Schedule transmitting the next queued message (if any) to the inhabiting client of a session.
1766 * @param session the consensus session
1769 client_send_next (struct ConsensusSession *session)
1772 GNUNET_assert (NULL != session);
1774 if (NULL != session->client_th)
1777 if (NULL != session->client_messages_head)
1780 msize = ntohs (session->client_messages_head->msg->size);
1781 session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
1782 GNUNET_TIME_UNIT_FOREVER_REL,
1783 &transmit_queued, session);
1789 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
1790 * the correct signature to be used with e.g. qsort.
1791 * We use this function instead.
1793 * @param h1 some hash code
1794 * @param h2 some hash code
1795 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
1798 hash_cmp (const void *h1, const void *h2)
1800 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
1805 * Search peer in the list of peers in session.
1807 * @param peer peer to find
1808 * @param session session with peer
1809 * @return index of peer, -1 if peer is not in session
1812 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1815 for (i = 0; i < session->num_peers; i++)
1816 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1823 * Called when stream has finishes writing the hello message
1826 hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1828 struct ConsensusPeerInformation *cpi;
1832 cpi->hello = GNUNET_YES;
1833 GNUNET_assert (GNUNET_STREAM_OK == status);
1839 * Called when we established a stream connection to another peer
1841 * @param cls cpi of the peer we just connected to
1842 * @param socket socket to use to communicate with the other side (read/write)
1845 open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1847 struct ConsensusPeerInformation *cpi;
1848 struct ConsensusHello *hello;
1851 hello = GNUNET_malloc (sizeof *hello);
1852 hello->header.size = htons (sizeof *hello);
1853 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1854 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1855 GNUNET_assert (NULL == cpi->mst);
1856 cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1858 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1859 GNUNET_free (hello);
1860 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1861 &session_stream_data_processor, cpi);
1866 * Create the sorted list of peers for the session,
1867 * add the local peer if not in the join message.
1870 initialize_session_peer_list (struct ConsensusSession *session)
1872 unsigned int local_peer_in_list;
1873 uint32_t listed_peers;
1874 const struct GNUNET_PeerIdentity *msg_peers;
1875 struct GNUNET_PeerIdentity *peers;
1878 GNUNET_assert (NULL != session->join_msg);
1880 /* peers in the join message, may or may not include the local peer */
1881 listed_peers = ntohl (session->join_msg->num_peers);
1883 session->num_peers = listed_peers;
1885 msg_peers = (struct GNUNET_PeerIdentity *) &session->join_msg[1];
1887 local_peer_in_list = GNUNET_NO;
1888 for (i = 0; i < listed_peers; i++)
1890 if (0 == memcmp (&msg_peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
1892 local_peer_in_list = GNUNET_YES;
1897 if (GNUNET_NO == local_peer_in_list)
1898 session->num_peers++;
1900 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1902 if (GNUNET_NO == local_peer_in_list)
1903 peers[session->num_peers - 1] = *my_peer;
1905 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1906 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1908 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1910 for (i = 0; i < session->num_peers; ++i)
1912 /* initialize back-references, so consensus peer information can
1913 * be used as closure */
1914 session->info[i].session = session;
1915 session->info[i].peer_id = peers[i];
1923 strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
1928 /* count trailing '1'-bits of v */
1929 for (i = 0; v & 1; v>>=1, i++)
1931 ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
1936 * Add incoming peer connections to the session,
1937 * for peers who have connected to us before the local session has been established
1939 * @param session ...
1942 add_incoming_peers (struct ConsensusSession *session)
1944 struct IncomingSocket *inc;
1945 inc = incoming_sockets_head;
1949 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
1952 for (i = 0; i < session->num_peers; i++)
1954 struct ConsensusPeerInformation *cpi;
1955 cpi = &session->info[i];
1956 if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity)))
1958 cpi->socket = inc->socket;
1960 inc->cpi->mst = inc->mst;
1961 inc->cpi->hello = GNUNET_YES;
1972 * Initialize the session, continue receiving messages from the owning client
1974 * @param session the session to initialize
1977 initialize_session (struct ConsensusSession *session)
1979 const struct ConsensusSession *other_session;
1981 GNUNET_assert (NULL != session->join_msg);
1982 initialize_session_peer_list (session);
1983 session->current_round = CONSENSUS_ROUND_BEGIN;
1984 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1985 compute_global_id (session, &session->join_msg->session_id);
1987 /* Check if some local client already owns the session. */
1988 other_session = sessions_head;
1989 while (NULL != other_session)
1991 if ((other_session != session) &&
1992 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1994 /* session already owned by another client */
1996 disconnect_client (session->client);
1999 other_session = other_session->next;
2002 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
2003 session->local_peer_idx = get_peer_idx (my_peer, session);
2004 GNUNET_assert (-1 != session->local_peer_idx);
2005 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
2006 session->se = strata_estimator_create ();
2007 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
2008 GNUNET_free (session->join_msg);
2009 session->join_msg = NULL;
2010 add_incoming_peers (session);
2011 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
2012 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2017 * Called when a client wants to join a consensus session.
2020 * @param client client that sent the message
2021 * @param m message sent by the client
2024 client_join (void *cls,
2025 struct GNUNET_SERVER_Client *client,
2026 const struct GNUNET_MessageHeader *m)
2028 struct ConsensusSession *session;
2030 // make sure the client has not already joined a session
2031 session = sessions_head;
2032 while (NULL != session)
2034 if (session->client == client)
2037 disconnect_client (client);
2040 session = session->next;
2043 session = GNUNET_malloc (sizeof (struct ConsensusSession));
2044 session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m);
2045 session->client = client;
2046 GNUNET_SERVER_client_keep (client);
2048 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2050 // Initialize session later if local peer identity is not known yet.
2051 if (NULL == my_peer)
2053 GNUNET_SERVER_disable_receive_done_warning (client);
2057 initialize_session (session);
2062 * Hash a block of data, producing a replicated ibf hash.
2065 hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
2067 struct IBF_Key ibf_key;
2068 GNUNET_CRYPTO_hash (block, size, ret);
2069 ibf_key = ibf_key_from_hashcode (ret);
2070 ibf_hashcode_from_key (ibf_key, ret);
2075 insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
2077 struct GNUNET_HashCode hash;
2078 struct ElementList *head;
2080 hash_for_ibf (element->data, element->size, &hash);
2082 head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
2088 head = GNUNET_malloc (sizeof *head);
2089 head->element = element;
2091 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2092 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
2093 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2094 strata_estimator_insert (session->se, &hash);
2096 for (i = 0; i <= MAX_IBF_ORDER; i++)
2097 if (NULL != session->ibfs[i])
2098 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
2102 struct ElementList *el;
2103 el = GNUNET_malloc (sizeof *el);
2104 head->element = element;
2106 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2107 while (NULL != head->next)
2115 * Called when a client performs an insert operation.
2117 * @param cls (unused)
2118 * @param client client handle
2119 * @param m message sent by the client
2122 client_insert (void *cls,
2123 struct GNUNET_SERVER_Client *client,
2124 const struct GNUNET_MessageHeader *m)
2126 struct ConsensusSession *session;
2127 struct GNUNET_CONSENSUS_ElementMessage *msg;
2128 struct GNUNET_CONSENSUS_Element *element;
2131 session = sessions_head;
2132 while (NULL != session)
2134 if (session->client == client)
2138 if (NULL == session)
2140 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to insert, but client is not in any session\n");
2141 GNUNET_SERVER_client_disconnect (client);
2145 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2146 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2148 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
2150 element->type = msg->element_type;
2151 element->size = element_size;
2152 memcpy (&element[1], &msg[1], element_size);
2153 element->data = &element[1];
2155 GNUNET_assert (NULL != element->data);
2157 insert_element (session, element);
2159 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2161 client_send_next (session);
2167 * Functions of this signature are called whenever writing operations
2168 * on a stream are executed
2170 * @param cls the closure from GNUNET_STREAM_write
2171 * @param status the status of the stream at the time this function is called;
2172 * GNUNET_STREAM_OK if writing to stream was completed successfully;
2173 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
2174 * (this doesn't mean that the data is never sent, the receiver may
2175 * have read the data but its ACKs may have been lost);
2176 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
2177 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
2179 * @param size the number of bytes written
2182 write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
2184 struct ConsensusPeerInformation *cpi;
2186 GNUNET_assert (GNUNET_STREAM_OK == status);
2189 if (NULL != cpi->messages_head)
2191 struct QueuedMessage *qm;
2192 qm = cpi->messages_head;
2193 GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm);
2194 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
2195 GNUNET_TIME_UNIT_FOREVER_REL,
2199 GNUNET_free (qm->msg);
2201 GNUNET_assert (NULL != cpi->wh);
2207 shuffle (struct ConsensusSession *session)
2209 /* FIXME: implement */
2214 * Find and set the partner_incoming and partner_outgoing of our peer,
2215 * one of them may not exist in most cases.
2217 * @param session the consensus session
2220 find_partners (struct ConsensusSession *session)
2222 int mark[session->num_peers];
2224 memset (mark, 0, session->num_peers * sizeof (int));
2225 session->partner_incoming = session->partner_outgoing = NULL;
2226 for (i = 0; i < session->num_peers; i++)
2231 arc = (i + (1 << session->exp_subround)) % session->num_peers;
2232 mark[i] = mark[arc] = 1;
2233 GNUNET_assert (i != arc);
2234 if (i == session->local_peer_idx)
2236 GNUNET_assert (NULL == session->partner_outgoing);
2237 session->partner_outgoing = &session->info[session->shuffle[arc]];
2238 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2240 if (arc == session->local_peer_idx)
2242 GNUNET_assert (NULL == session->partner_incoming);
2243 session->partner_incoming = &session->info[session->shuffle[i]];
2244 session->partner_incoming->exp_subround_finished = GNUNET_NO;
2251 replay_premature_message (struct ConsensusPeerInformation *cpi)
2253 if (NULL != cpi->premature_strata_message)
2255 struct StrataMessage *sm;
2257 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2258 sm = cpi->premature_strata_message;
2259 cpi->premature_strata_message = NULL;
2261 cpi->replaying_strata_message = GNUNET_YES;
2262 handle_p2p_strata (cpi, sm);
2263 cpi->replaying_strata_message = GNUNET_NO;
2271 * Do the next subround in the exp-scheme.
2272 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2274 * @param cls the session
2275 * @param tc task context, for when this task is invoked by the scheduler,
2276 * NULL if invoked for another reason
2279 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2281 struct ConsensusSession *session;
2284 /* don't kick off next subround if we're shutting down */
2285 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2288 /* don't send any messages from the last round */
2290 clear_peer_messages (session->partner_outgoing);
2291 clear_peer_messages (session->partner_incoming);
2292 for (i = 0; i < session->num_peers; i++)
2293 clear_peer_messages (&session->info[i]);
2295 /* cancel timeout */
2296 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2297 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2298 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2299 /* check if we are done with the log phase, 2-peer consensus only does one log round */
2300 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
2301 ((session->num_peers == 2) && (session->exp_round == 1)))
2303 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
2304 round_over (session, NULL);
2307 if (session->exp_round == 0)
2309 /* initialize everything for the log-rounds */
2310 session->exp_round = 1;
2311 session->exp_subround = 0;
2312 if (NULL == session->shuffle)
2313 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
2314 for (i = 0; i < session->num_peers; i++)
2315 session->shuffle[i] = i;
2317 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
2319 /* subrounds done, start new log-round */
2320 session->exp_round++;
2321 session->exp_subround = 0;
2326 session->exp_subround++;
2329 find_partners (session);
2331 #ifdef GNUNET_EXTRA_LOGGING
2335 if (session->partner_outgoing == NULL)
2338 out = (int) (session->partner_outgoing - session->info);
2339 if (session->partner_incoming == NULL)
2342 in = (int) (session->partner_incoming - session->info);
2343 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
2344 session->exp_round, session->exp_subround, in, out);
2346 #endif /* GNUNET_EXTRA_LOGGING */
2348 if (NULL != session->partner_incoming)
2350 session->partner_incoming->ibf_state = IBF_STATE_NONE;
2351 session->partner_incoming->exp_subround_finished = GNUNET_NO;
2352 session->partner_incoming->ibf_bucket_counter = 0;
2354 /* maybe there's an early strata estimator? */
2355 replay_premature_message (session->partner_incoming);
2358 if (NULL != session->partner_outgoing)
2360 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
2361 session->partner_outgoing->ibf_bucket_counter = 0;
2362 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2364 if (NULL == session->partner_outgoing->socket)
2366 session->partner_outgoing->socket =
2367 GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2368 open_cb, session->partner_outgoing,
2369 GNUNET_STREAM_OPTION_END);
2371 else if (GNUNET_YES == session->partner_outgoing->hello)
2373 send_strata_estimator (session->partner_outgoing);
2375 /* else: do nothing, the send hello cb will handle this */
2379 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
2380 subround_over, session);
2385 contact_peer_a2a (struct ConsensusPeerInformation *cpi)
2387 cpi->is_outgoing = GNUNET_YES;
2388 if (NULL == cpi->socket)
2390 cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2391 open_cb, cpi, GNUNET_STREAM_OPTION_END);
2393 else if (GNUNET_YES == cpi->hello)
2395 send_strata_estimator (cpi);
2400 * Start the inventory round, contact all peers we are supposed to contact.
2402 * @param session the current session
2405 start_inventory (struct ConsensusSession *session)
2410 for (i = 0; i < session->num_peers; i++)
2412 session->info[i].ibf_bucket_counter = 0;
2413 session->info[i].ibf_state = IBF_STATE_NONE;
2414 session->info[i].is_outgoing = GNUNET_NO;
2417 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
2418 i = (session->local_peer_idx + 1) % session->num_peers;
2421 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
2422 contact_peer_a2a (&session->info[i]);
2423 session->info[i].is_outgoing = GNUNET_YES;
2424 i = (i + 1) % session->num_peers;
2426 // tie-breaker for even number of peers
2427 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
2429 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
2430 session->info[last].is_outgoing = GNUNET_YES;
2431 contact_peer_a2a (&session->info[last]);
2434 for (i = 0; i < session->num_peers; i++)
2436 if (GNUNET_NO == session->info[i].is_outgoing)
2437 replay_premature_message (&session->info[i]);
2442 send_client_conclude_done (struct ConsensusSession *session)
2444 struct GNUNET_MessageHeader *msg;
2445 session->current_round = CONSENSUS_ROUND_FINISH;
2446 msg = GNUNET_malloc (sizeof *msg);
2447 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2448 msg->size = htons (sizeof *msg);
2449 queue_client_message (session, msg);
2450 client_send_next (session);
2454 * Start the next round.
2455 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2457 * @param cls the session
2458 * @param tc task context, for when this task is invoked by the scheduler,
2459 * NULL if invoked for another reason
2462 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2464 struct ConsensusSession *session;
2466 /* don't kick off next round if we're shutting down */
2467 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2471 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
2474 for (i = 0; i < session->num_peers; i++)
2475 clear_peer_messages (&session->info[i]);
2478 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2480 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2481 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2484 switch (session->current_round)
2486 case CONSENSUS_ROUND_BEGIN:
2487 session->current_round = CONSENSUS_ROUND_EXCHANGE;
2488 session->exp_round = 0;
2489 subround_over (session, NULL);
2491 case CONSENSUS_ROUND_EXCHANGE:
2492 /* handle two peers specially */
2493 if (session->num_peers <= 2)
2495 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
2496 send_client_conclude_done (session);
2499 session->current_round = CONSENSUS_ROUND_INVENTORY;
2500 start_inventory (session);
2502 case CONSENSUS_ROUND_INVENTORY:
2503 session->current_round = CONSENSUS_ROUND_STOCK;
2504 session->exp_round = 0;
2505 subround_over (session, NULL);
2507 case CONSENSUS_ROUND_STOCK:
2508 session->current_round = CONSENSUS_ROUND_FINISH;
2509 send_client_conclude_done (session);
2518 * Called when a client performs the conclude operation.
2520 * @param cls (unused)
2521 * @param client client handle
2522 * @param message message sent by the client
2525 client_conclude (void *cls,
2526 struct GNUNET_SERVER_Client *client,
2527 const struct GNUNET_MessageHeader *message)
2529 struct ConsensusSession *session;
2530 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
2532 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
2534 session = sessions_head;
2535 while ((session != NULL) && (session->client != client))
2536 session = session->next;
2537 if (NULL == session)
2539 /* client not found */
2541 GNUNET_SERVER_client_disconnect (client);
2545 if (CONSENSUS_ROUND_BEGIN != session->current_round)
2547 /* client requested conclude twice */
2549 /* client may still own a session, destroy it */
2550 disconnect_client (client);
2554 if (session->num_peers <= 1)
2556 send_client_conclude_done (session);
2560 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
2561 /* the 'begin' round is over, start with the next, real round */
2562 round_over (session, NULL);
2565 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2566 client_send_next (session);
2571 * Called when a client sends an ack
2573 * @param cls (unused)
2574 * @param client client handle
2575 * @param message message sent by the client
2578 client_ack (void *cls,
2579 struct GNUNET_SERVER_Client *client,
2580 const struct GNUNET_MessageHeader *message)
2582 struct ConsensusSession *session;
2583 struct GNUNET_CONSENSUS_AckMessage *msg;
2584 struct PendingElement *pending;
2585 struct GNUNET_CONSENSUS_Element *element;
2587 session = sessions_head;
2588 while (NULL != session)
2590 if (session->client == client)
2594 if (NULL == session)
2596 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
2597 GNUNET_SERVER_client_disconnect (client);
2601 pending = session->client_approval_head;
2603 GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending);
2605 msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
2609 element = pending->element;
2610 insert_element (session, element);
2611 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
2614 GNUNET_free (pending);
2616 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2621 * Task that disconnects from core.
2623 * @param cls core handle
2624 * @param tc context information (why was this task triggered now)
2627 disconnect_core (void *cls,
2628 const struct GNUNET_SCHEDULER_TaskContext *tc)
2632 GNUNET_CORE_disconnect (core);
2635 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
2640 core_startup (void *cls,
2641 struct GNUNET_CORE_Handle *core,
2642 const struct GNUNET_PeerIdentity *peer)
2644 struct ConsensusSession *session;
2646 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
2647 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
2648 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
2649 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
2651 session = sessions_head;
2652 while (NULL != session)
2654 if (NULL != session->join_msg)
2655 initialize_session (session);
2656 session = session->next;
2662 * Called to clean up, after a shutdown has been requested.
2664 * @param cls closure
2665 * @param tc context information (why was this task triggered now)
2668 shutdown_task (void *cls,
2669 const struct GNUNET_SCHEDULER_TaskContext *tc)
2671 /* FIXME: complete; write separate destructors for different data types */
2673 while (NULL != incoming_sockets_head)
2675 struct IncomingSocket *socket;
2676 socket = incoming_sockets_head;
2677 if (NULL != socket->rh)
2679 GNUNET_STREAM_read_cancel (socket->rh);
2682 if (NULL == socket->cpi)
2684 GNUNET_STREAM_close (socket->socket);
2685 socket->socket = NULL;
2686 if (NULL != socket->mst)
2688 GNUNET_SERVER_mst_destroy (socket->mst);
2692 incoming_sockets_head = incoming_sockets_head->next;
2693 GNUNET_free (socket);
2696 while (NULL != sessions_head)
2698 struct ConsensusSession *session;
2699 session = sessions_head->next;
2700 destroy_session (sessions_head);
2701 sessions_head = session;
2706 GNUNET_CORE_disconnect (core);
2710 if (NULL != listener)
2712 GNUNET_STREAM_listen_close (listener);
2716 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
2721 * Start processing consensus requests.
2723 * @param cls closure
2724 * @param server the initialized server
2725 * @param c configuration to use
2728 run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
2730 /* core is only used to retrieve the peer identity */
2731 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
2734 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2735 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2736 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2737 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2738 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
2739 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
2740 sizeof (struct GNUNET_CONSENSUS_AckMessage)},
2747 GNUNET_SERVER_add_handlers (server, server_handlers);
2749 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2751 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
2753 GNUNET_STREAM_OPTION_END);
2755 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
2756 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
2757 GNUNET_assert (NULL != core);
2759 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2764 * The main function for the consensus service.
2766 * @param argc number of arguments from the command line
2767 * @param argv command line arguments
2768 * @return 0 ok, 1 on error
2771 main (int argc, char *const *argv)
2774 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2775 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
2776 return (GNUNET_OK == ret) ? 0 : 1;