2 This file is part of GNUnet
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
38 * Log macro that prefixes the local peer and the peer we are in contact with.
40 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
41 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
45 * Number of exponential rounds, used in the exp and completion round.
47 #define NUM_EXP_ROUNDS 4
49 /* forward declarations */
51 /* mutual recursion with struct ConsensusSession */
52 struct ConsensusPeerInformation;
54 /* mutual recursion with round_over */
56 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
60 * Describes the current round a consensus session is in.
65 * Not started the protocol yet.
67 CONSENSUS_ROUND_BEGIN=0,
69 * Distribution of elements with the exponential scheme.
71 CONSENSUS_ROUND_EXCHANGE,
73 * Exchange which elements each peer has, but don't
74 * transmit the element's data, only their SHA-512 hashes.
75 * This round uses the all-to-all scheme.
77 CONSENSUS_ROUND_INVENTORY,
79 * Collect and distribute missing values with the exponential scheme.
81 CONSENSUS_ROUND_COMPLETION,
83 * Consensus concluded. After timeout and finished communication with client,
84 * consensus session will be destroyed.
86 CONSENSUS_ROUND_FINISH
91 * Complete information about the current round and all
97 * The current main round.
99 enum ConsensusRound round;
101 * The current exp round, valid if
102 * the main round is an exp round.
106 * The current exp subround, valid if
107 * the main round is an exp round.
109 uint32_t exp_subround;
114 * A consensus session consists of one local client and the remote authorities.
116 struct ConsensusSession
119 * Consensus sessions are kept in a DLL.
121 struct ConsensusSession *next;
124 * Consensus sessions are kept in a DLL.
126 struct ConsensusSession *prev;
129 * Global consensus identification, computed
130 * from the session id and participating authorities.
132 struct GNUNET_HashCode global_id;
135 * Client that inhabits the session
137 struct GNUNET_SERVER_Client *client;
140 * Queued messages to the client.
142 struct GNUNET_MQ_Handle *client_mq;
145 * Time when the conclusion of the consensus should begin.
147 struct GNUNET_TIME_Absolute conclude_start;
150 * Timeout for all rounds together, single rounds will schedule a timeout task
151 * with a fraction of the conclude timeout.
152 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
154 struct GNUNET_TIME_Absolute conclude_deadline;
157 * Timeout task identifier for the current round.
159 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
162 * Number of other peers in the consensus.
164 unsigned int num_peers;
167 * Information about the other peers,
170 struct ConsensusPeerInformation *info;
173 * Index of the local peer in the peers array
175 unsigned int local_peer_idx;
180 enum ConsensusRound current_round;
183 * Permutation of peers for the current round,
188 * Inverse permutation of peers for the current round,
190 uint32_t *shuffle_inv;
193 * Current round of the exponential scheme.
198 * Current sub-round of the exponential scheme.
200 uint32_t exp_subround;
203 * The partner for the current exp-round
205 struct ConsensusPeerInformation *partner_outgoing;
208 * The partner for the current exp-round
210 struct ConsensusPeerInformation *partner_incoming;
213 * The consensus set of this session.
215 struct GNUNET_SET_Handle *element_set;
218 * Listener for requests from other peers.
219 * Uses the session's global id as app id.
221 struct GNUNET_SET_ListenHandle *set_listener;
226 * Information about a peer that is in a consensus session.
228 struct ConsensusPeerInformation
231 * Peer identitty of the peer in the consensus session
233 struct GNUNET_PeerIdentity peer_id;
236 * Back-reference to the consensus session,
237 * to that ConsensusPeerInformation can be used as a closure
239 struct ConsensusSession *session;
242 * We have finishes the exp-subround with the peer.
244 int exp_subround_finished;
247 * Set operation we are currently executing with this peer.
249 struct GNUNET_SET_OperationHandle *set_op;
252 * Set operation we are planning on executing with this peer.
254 struct GNUNET_SET_OperationHandle *delayed_set_op;
257 * Info about the round of the delayed set operation.
259 struct RoundInfo delayed_round_info;
264 * Linked list of sessions this peer participates in.
266 static struct ConsensusSession *sessions_head;
269 * Linked list of sessions this peer participates in.
271 static struct ConsensusSession *sessions_tail;
274 * Configuration of the consensus service.
276 static const struct GNUNET_CONFIGURATION_Handle *cfg;
279 * Handle to the server for this service.
281 static struct GNUNET_SERVER_Handle *srv;
284 * Peer that runs this service.
286 static struct GNUNET_PeerIdentity my_peer;
290 have_exp_subround_finished (const struct ConsensusSession *session)
294 if ( (NULL != session->partner_outgoing) &&
295 (GNUNET_NO == session->partner_outgoing->exp_subround_finished) )
297 if ( (NULL != session->partner_incoming) &&
298 (GNUNET_NO == session->partner_incoming->exp_subround_finished) )
300 if (0 == not_finished)
307 * Destroy a session, free all resources associated with it.
309 * @param session the session to destroy
312 destroy_session (struct ConsensusSession *session)
316 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
317 if (NULL != session->element_set)
319 GNUNET_SET_destroy (session->element_set);
320 session->element_set = NULL;
322 if (NULL != session->set_listener)
324 GNUNET_SET_listen_cancel (session->set_listener);
325 session->set_listener = NULL;
327 if (NULL != session->client_mq)
329 GNUNET_MQ_destroy (session->client_mq);
330 session->client_mq = NULL;
332 if (NULL != session->client)
334 GNUNET_SERVER_client_disconnect (session->client);
335 session->client = NULL;
337 if (NULL != session->shuffle)
339 GNUNET_free (session->shuffle);
340 session->shuffle = NULL;
342 if (NULL != session->shuffle_inv)
344 GNUNET_free (session->shuffle_inv);
345 session->shuffle_inv = NULL;
347 if (NULL != session->info)
349 for (i = 0; i < session->num_peers; i++)
351 struct ConsensusPeerInformation *cpi;
352 cpi = &session->info[i];
353 if (NULL != cpi->set_op)
355 GNUNET_SET_operation_cancel (cpi->set_op);
359 GNUNET_free (session->info);
360 session->info = NULL;
362 GNUNET_free (session);
367 * Iterator for set elements.
370 * @param element the current element, NULL if all elements have been
372 * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
375 send_to_client_iter (void *cls,
376 const struct GNUNET_SET_Element *element)
378 struct ConsensusSession *session = cls;
379 struct GNUNET_MQ_Envelope *ev;
383 struct GNUNET_CONSENSUS_ElementMessage *m;
385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
386 session->local_peer_idx);
388 ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
389 m->element_type = htons (element->type);
390 memcpy (&m[1], element->data, element->size);
391 GNUNET_MQ_send (session->client_mq, ev);
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
396 session->local_peer_idx);
397 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
398 GNUNET_MQ_send (session->client_mq, ev);
405 * Start the next round.
406 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
408 * @param cls the session
409 * @param tc task context, for when this task is invoked by the scheduler,
410 * NULL if invoked for another reason
413 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
415 struct ConsensusSession *session;
417 /* don't kick off next round if we're shutting down */
418 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
424 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
426 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
427 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
430 switch (session->current_round)
432 case CONSENSUS_ROUND_BEGIN:
433 session->current_round = CONSENSUS_ROUND_EXCHANGE;
434 session->exp_round = 0;
435 subround_over (session, NULL);
437 case CONSENSUS_ROUND_EXCHANGE:
438 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
439 session->local_peer_idx);
440 session->current_round = CONSENSUS_ROUND_FINISH;
441 GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
450 * Create a new permutation for the session's peers in session->shuffle.
451 * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
452 * both the global session id and the current round index.
454 * @param session the session to create the new permutation for
457 shuffle (struct ConsensusSession *session)
460 uint32_t randomness[session->num_peers-1];
462 if (NULL == session->shuffle)
463 session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
464 if (NULL == session->shuffle_inv)
465 session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
467 GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
468 &session->exp_round, sizeof (uint32_t),
469 &session->global_id, sizeof (struct GNUNET_HashCode),
472 for (i = 0; i < session->num_peers; i++)
473 session->shuffle[i] = i;
475 for (i = session->num_peers - 1; i > 0; i--)
479 x = randomness[i-1] % session->num_peers;
480 tmp = session->shuffle[x];
481 session->shuffle[x] = session->shuffle[i];
482 session->shuffle[i] = tmp;
485 /* create the inverse */
486 for (i = 0; i < session->num_peers; i++)
487 session->shuffle_inv[session->shuffle[i]] = i;
492 * Find and set the partner_incoming and partner_outgoing of our peer,
493 * one of them may not exist (and thus set to NULL) if the number of peers
494 * in the session is not a power of two.
496 * @param session the consensus session
499 find_partners (struct ConsensusSession *session)
502 unsigned int num_ghosts;
503 unsigned int largest_arc;
506 /* shuffled local index */
507 int my_idx = session->shuffle[session->local_peer_idx];
509 /* distance to neighboring peer in current subround */
510 arc = 1 << session->exp_subround;
512 while (largest_arc < session->num_peers)
514 num_ghosts = largest_arc - session->num_peers;
515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
519 if (0 == (my_idx & arc))
521 /* we are outgoing */
522 partner_idx = (my_idx + arc) % session->num_peers;
523 session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
524 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
525 /* are we a 'ghost' of a peer that would exist if
526 * the number of peers was a power of two, and thus have to partner
527 * with an additional peer?
529 if (my_idx < num_ghosts)
531 int ghost_partner_idx;
532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
533 ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
534 /* platform dependent; modulo sometimes returns negative values */
535 if (ghost_partner_idx < 0)
536 ghost_partner_idx += session->num_peers;
537 /* we only need to have a ghost partner if the partner is outgoing */
538 if (0 == (ghost_partner_idx & arc))
540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
541 session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
542 session->partner_incoming->exp_subround_finished = GNUNET_NO;
546 session->partner_incoming = NULL;
549 /* we only have an incoming connection */
550 partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
552 partner_idx += session->num_peers;
553 session->partner_outgoing = NULL;
554 session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
555 session->partner_incoming->exp_subround_finished = GNUNET_NO;
560 * Callback for set operation results. Called for each element
564 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
565 * @param status see enum GNUNET_SET_Status
568 set_result_cb (void *cls,
569 const struct GNUNET_SET_Element *element,
570 enum GNUNET_SET_Status status)
572 struct ConsensusPeerInformation *cpi = cls;
573 unsigned int remote_idx = cpi - cpi->session->info;
574 unsigned int local_idx = cpi->session->local_peer_idx;
576 GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
577 (cpi == cpi->session->partner_incoming));
581 case GNUNET_SET_STATUS_OK:
582 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
583 local_idx, remote_idx);
585 case GNUNET_SET_STATUS_FAILURE:
586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
587 local_idx, remote_idx);
590 case GNUNET_SET_STATUS_HALF_DONE:
591 case GNUNET_SET_STATUS_DONE:
592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
593 local_idx, remote_idx);
594 cpi->exp_subround_finished = GNUNET_YES;
596 if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
600 subround_over (cpi->session, NULL);
604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
613 switch (cpi->session->current_round)
615 case CONSENSUS_ROUND_EXCHANGE:
616 GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
626 * Compare the round the session is in with the round of the given context message.
628 * @param session a consensus session
629 * @param ri a round context message
630 * @return 0 if it's the same round, -1 if the session is in an earlier round,
631 * 1 if the session is in a later round
634 rounds_compare (struct ConsensusSession *session,
635 struct RoundInfo* ri)
637 if (session->current_round < ri->round)
639 if (session->current_round > ri->round)
641 if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
643 if (session->exp_round < ri->exp_round)
645 if (session->exp_round > ri->exp_round)
647 if (session->exp_subround < ri->exp_subround)
649 if (session->exp_subround < ri->exp_subround)
653 /* comparing rounds when we are not in a exp round */
659 * Do the next subround in the exp-scheme.
660 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
662 * @param cls the session
663 * @param tc task context, for when this task is invoked by the scheduler,
664 * NULL if invoked for another reason
667 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
669 struct ConsensusSession *session;
672 /* don't kick off next subround if we're shutting down */
673 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
677 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
679 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
680 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
683 if (session->exp_round >= NUM_EXP_ROUNDS)
685 round_over (session, NULL);
689 if (session->exp_round == 0)
691 /* initialize everything for the log-rounds */
692 session->exp_round = 1;
693 session->exp_subround = 0;
694 if (NULL == session->shuffle)
695 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
696 if (NULL == session->shuffle_inv)
697 session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
698 for (i = 0; i < session->num_peers; i++)
699 session->shuffle[i] = session->shuffle_inv[i] = i;
701 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
703 /* subrounds done, start new log-round */
704 session->exp_round++;
705 session->exp_subround = 0;
710 session->exp_subround++;
713 /* determine the incoming and outgoing partner */
714 find_partners (session);
716 GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
717 GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
719 /* initiate set operation with the outgoing partner */
720 if (NULL != session->partner_outgoing)
722 struct GNUNET_CONSENSUS_RoundContextMessage *msg;
723 msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
724 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
725 msg->header.size = htons (sizeof *msg);
726 msg->round = htonl (session->current_round);
727 msg->exp_round = htonl (session->exp_round);
728 msg->exp_subround = htonl (session->exp_subround);
730 if (NULL != session->partner_outgoing->set_op)
732 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
734 session->partner_outgoing->set_op =
735 GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
737 (struct GNUNET_MessageHeader *) msg,
739 GNUNET_SET_RESULT_ADDED,
740 set_result_cb, session->partner_outgoing);
742 GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
745 /* commit to the delayed set operation */
746 if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
748 int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
750 if (NULL != session->partner_incoming->set_op)
752 GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
753 session->partner_incoming->set_op = NULL;
757 GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set);
758 session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
759 session->partner_incoming->delayed_set_op = NULL;
760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
761 session->local_peer_idx, (int) (session->partner_incoming - session->info));
765 /* this should not happen -- a round has been skipped! */
770 #ifdef GNUNET_EXTRA_LOGGING
774 if (session->partner_outgoing == NULL)
777 out = (int) (session->partner_outgoing - session->info);
778 if (session->partner_incoming == NULL)
781 in = (int) (session->partner_incoming - session->info);
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
783 session->exp_round, session->exp_subround, in, out);
785 #endif /* GNUNET_EXTRA_LOGGING */
791 * Search peer in the list of peers in session.
793 * @param peer peer to find
794 * @param session session with peer
795 * @return index of peer, -1 if peer is not in session
798 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
801 for (i = 0; i < session->num_peers; i++)
802 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
809 * Compute a global, (hopefully) unique consensus session id,
810 * from the local id of the consensus session, and the identities of all participants.
811 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
812 * exactly the same peers, the global id will be different.
814 * @param session session to generate the global id for
815 * @param session_id local id of the consensus session
818 compute_global_id (struct ConsensusSession *session,
819 const struct GNUNET_HashCode *session_id)
822 struct GNUNET_HashCode tmp;
823 struct GNUNET_HashCode phash;
825 /* FIXME: use kdf? */
827 session->global_id = *session_id;
828 for (i = 0; i < session->num_peers; ++i)
830 GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
831 GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
832 session->global_id = tmp;
833 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
834 session->global_id = tmp;
840 * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have
841 * the correct signature to be used with e.g. qsort.
842 * We use this function instead.
844 * @param h1 some hash code
845 * @param h2 some hash code
846 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
849 hash_cmp (const void *h1, const void *h2)
851 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
856 * Create the sorted list of peers for the session,
857 * add the local peer if not in the join message.
860 initialize_session_peer_list (struct ConsensusSession *session,
861 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
863 unsigned int local_peer_in_list;
864 uint32_t listed_peers;
865 const struct GNUNET_PeerIdentity *msg_peers;
866 struct GNUNET_PeerIdentity *peers;
869 GNUNET_assert (NULL != join_msg);
871 /* peers in the join message, may or may not include the local peer */
872 listed_peers = ntohl (join_msg->num_peers);
874 session->num_peers = listed_peers;
876 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
878 local_peer_in_list = GNUNET_NO;
879 for (i = 0; i < listed_peers; i++)
881 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
883 local_peer_in_list = GNUNET_YES;
888 if (GNUNET_NO == local_peer_in_list)
889 session->num_peers++;
891 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
893 if (GNUNET_NO == local_peer_in_list)
894 peers[session->num_peers - 1] = my_peer;
896 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
897 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
899 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
901 for (i = 0; i < session->num_peers; ++i)
903 /* initialize back-references, so consensus peer information can
904 * be used as closure */
905 session->info[i].session = session;
906 session->info[i].peer_id = peers[i];
914 * Called when another peer wants to do a set operation with the
918 * @param other_peer the other peer
919 * @param context_msg message with application specific information from
921 * @param request request from the other peer, use GNUNET_SET_accept
922 * to accept it, otherwise the request will be refused
923 * Note that we don't use a return value here, as it is also
924 * necessary to specify the set we want to do the operation with,
925 * whith sometimes can be derived from the context message.
926 * Also necessary to specify the timeout.
929 set_listen_cb (void *cls,
930 const struct GNUNET_PeerIdentity *other_peer,
931 const struct GNUNET_MessageHeader *context_msg,
932 struct GNUNET_SET_Request *request)
934 struct ConsensusSession *session = cls;
935 struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
936 struct ConsensusPeerInformation *cpi;
937 struct GNUNET_SET_OperationHandle *set_op;
938 struct RoundInfo round_info;
942 if (NULL == context_msg)
948 index = get_peer_idx (other_peer, session);
956 round_info.round = ntohl (msg->round);
957 round_info.exp_round = ntohl (msg->exp_round);
958 round_info.exp_subround = ntohl (msg->exp_subround);
960 cpi = &session->info[index];
962 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
964 switch (session->current_round)
966 case CONSENSUS_ROUND_BEGIN:
967 /* we're in the begin round, so requests for the exchange round may
968 * come in, they will be delayed for now! */
969 case CONSENSUS_ROUND_EXCHANGE:
970 cmp = rounds_compare (session, &round_info);
973 /* the other peer is too late */
977 /* kill old request, if any. this is legal,
978 * as the other peer would not make a new request if it would want to
979 * complete the old one! */
980 if (NULL != cpi->set_op)
982 GNUNET_SET_operation_cancel (cpi->set_op);
985 set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
986 set_result_cb, &session->info[index]);
989 cpi->set_op = set_op;
990 GNUNET_SET_commit (set_op, session->element_set);
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
995 /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */
996 cpi->delayed_set_op = set_op;
997 cpi->delayed_round_info = round_info;
998 cpi->exp_subround_finished = GNUNET_YES;
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
1003 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
1004 session->local_peer_idx, session->current_round, index);
1005 GNUNET_break_op (0);
1012 * Initialize the session, continue receiving messages from the owning client
1014 * @param session the session to initialize
1015 * @param join_msg the join message from the client
1018 initialize_session (struct ConsensusSession *session,
1019 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1021 struct ConsensusSession *other_session;
1023 initialize_session_peer_list (session, join_msg);
1024 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1025 compute_global_id (session, &join_msg->session_id);
1027 /* check if some local client already owns the session.
1028 * it is only legal to have a session with an existing global id
1029 * if all other sessions with this global id are finished.*/
1030 other_session = sessions_head;
1031 while (NULL != other_session)
1033 if ((other_session != session) &&
1034 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1036 if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1039 destroy_session (session);
1044 other_session = other_session->next;
1047 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
1048 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
1050 session->local_peer_idx = get_peer_idx (&my_peer, session);
1051 GNUNET_assert (-1 != session->local_peer_idx);
1052 session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1053 GNUNET_assert (NULL != session->element_set);
1054 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1055 &session->global_id,
1056 set_listen_cb, session);
1057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1062 static struct ConsensusSession *
1063 get_session_by_client (struct GNUNET_SERVER_Client *client)
1065 struct ConsensusSession *session;
1067 session = sessions_head;
1068 while (NULL != session)
1070 if (session->client == client)
1072 session = session->next;
1079 * Called when a client wants to join a consensus session.
1082 * @param client client that sent the message
1083 * @param m message sent by the client
1086 client_join (void *cls,
1087 struct GNUNET_SERVER_Client *client,
1088 const struct GNUNET_MessageHeader *m)
1090 struct ConsensusSession *session;
1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1094 session = get_session_by_client (client);
1095 if (NULL != session)
1098 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1101 session = GNUNET_new (struct ConsensusSession);
1102 session->client = client;
1103 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1104 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1105 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1106 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1113 * Called when a client performs an insert operation.
1115 * @param cls (unused)
1116 * @param client client handle
1117 * @param m message sent by the client
1120 client_insert (void *cls,
1121 struct GNUNET_SERVER_Client *client,
1122 const struct GNUNET_MessageHeader *m)
1124 struct ConsensusSession *session;
1125 struct GNUNET_CONSENSUS_ElementMessage *msg;
1126 struct GNUNET_SET_Element *element;
1127 ssize_t element_size;
1129 session = get_session_by_client (client);
1131 if (NULL == session)
1134 GNUNET_SERVER_client_disconnect (client);
1138 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1141 GNUNET_SERVER_client_disconnect (client);
1145 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1146 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1147 if (element_size < 0)
1153 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1154 element->type = msg->element_type;
1155 element->size = element_size;
1156 memcpy (&element[1], &msg[1], element_size);
1157 element->data = &element[1];
1158 GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1159 GNUNET_free (element);
1160 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1167 * Called when a client performs the conclude operation.
1169 * @param cls (unused)
1170 * @param client client handle
1171 * @param message message sent by the client
1174 client_conclude (void *cls,
1175 struct GNUNET_SERVER_Client *client,
1176 const struct GNUNET_MessageHeader *message)
1178 struct ConsensusSession *session;
1180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1181 session = get_session_by_client (client);
1182 if (NULL == session)
1184 /* client not found */
1186 GNUNET_SERVER_client_disconnect (client);
1189 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1191 /* client requested conclude twice */
1195 if (session->num_peers <= 1)
1197 session->current_round = CONSENSUS_ROUND_FINISH;
1198 GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1202 /* the 'begin' round is over, start with the next, actual round */
1203 round_over (session, NULL);
1206 GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1207 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1212 * Called to clean up, after a shutdown has been requested.
1214 * @param cls closure
1215 * @param tc context information (why was this task triggered now)
1218 shutdown_task (void *cls,
1219 const struct GNUNET_SCHEDULER_TaskContext *tc)
1221 while (NULL != sessions_head)
1222 destroy_session (sessions_head);
1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1229 * Clean up after a client after it is
1230 * disconnected (either by us or by itself)
1232 * @param cls closure, unused
1233 * @param client the client to clean up after
1236 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1238 struct ConsensusSession *session;
1240 session = get_session_by_client (client);
1241 if (NULL == session)
1243 if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1244 (CONSENSUS_ROUND_FINISH == session->current_round))
1245 destroy_session (session);
1247 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1252 * Start processing consensus requests.
1254 * @param cls closure
1255 * @param server the initialized server
1256 * @param c configuration to use
1259 run (void *cls, struct GNUNET_SERVER_Handle *server,
1260 const struct GNUNET_CONFIGURATION_Handle *c)
1262 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1263 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1264 sizeof (struct GNUNET_MessageHeader)},
1265 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1266 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1272 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1274 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1276 GNUNET_SCHEDULER_shutdown ();
1279 GNUNET_SERVER_add_handlers (server, server_handlers);
1280 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1281 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1282 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1287 * The main function for the consensus service.
1289 * @param argc number of arguments from the command line
1290 * @param argv command line arguments
1291 * @return 0 ok, 1 on error
1294 main (int argc, char *const *argv)
1297 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1298 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1299 return (GNUNET_OK == ret) ? 0 : 1;