2 This file is part of GNUnet
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
38 * Log macro that prefixes the local peer and the peer we are in contact with.
40 * @param kind log level
41 * @param cpi ConsensusPeerInformation of the partner peer
42 * @param m log message
44 #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
45 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
49 * Number of exponential rounds, used in the exp and completion round.
51 #define NUM_EXP_REPETITIONS 4
54 /* forward declarations */
56 /* mutual recursion with struct ConsensusSession */
57 struct ConsensusPeerInformation;
59 /* mutual recursion with round_over */
61 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
65 * Describes the current round a consensus session is in.
70 * Not started the protocol yet.
72 CONSENSUS_ROUND_BEGIN=0,
74 * Distribution of elements with the exponential scheme.
76 CONSENSUS_ROUND_EXCHANGE,
78 * Collect and distribute missing values.
80 CONSENSUS_ROUND_COMPLETION,
82 * Consensus concluded. After timeout and finished communication with client,
83 * consensus session will be destroyed.
85 CONSENSUS_ROUND_FINISH
90 * Information about the current round.
95 * The current main round.
97 enum ConsensusRound round;
99 * The current exp round repetition, valid if
100 * the main round is an exp round.
102 uint32_t exp_repetition;
104 * The current exp subround, valid if
105 * the main round is an exp round.
107 uint32_t exp_subround;
112 * A consensus session consists of one local client and the remote authorities.
114 struct ConsensusSession
117 * Consensus sessions are kept in a DLL.
119 struct ConsensusSession *next;
122 * Consensus sessions are kept in a DLL.
124 struct ConsensusSession *prev;
127 * Global consensus identification, computed
128 * from the session id and participating authorities.
130 struct GNUNET_HashCode global_id;
133 * Client that inhabits the session
135 struct GNUNET_SERVER_Client *client;
138 * Queued messages to the client.
140 struct GNUNET_MQ_Handle *client_mq;
143 * Time when the conclusion of the consensus should begin.
145 struct GNUNET_TIME_Absolute conclude_start;
148 * Timeout for all rounds together, single rounds will schedule a timeout task
149 * with a fraction of the conclude timeout.
150 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
152 struct GNUNET_TIME_Absolute conclude_deadline;
155 * Timeout task identifier for the current round or subround.
157 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
160 * Number of other peers in the consensus.
162 unsigned int num_peers;
165 * Information about the other peers,
168 struct ConsensusPeerInformation *info;
171 * Index of the local peer in the peers array
173 unsigned int local_peer_idx;
178 enum ConsensusRound current_round;
181 * Permutation of peers for the current round,
186 * Inverse permutation of peers for the current round,
188 uint32_t *shuffle_inv;
191 * Current round of the exponential scheme.
193 uint32_t exp_repetition;
196 * Current sub-round of the exponential scheme.
198 uint32_t exp_subround;
201 * The partner for the current exp-round.
202 * The local peer will initiate the set reconciliation with the
205 struct ConsensusPeerInformation *partner_outgoing;
208 * The partner for the current exp-round
209 * The incoming peer will initiate the set reconciliation with
212 struct ConsensusPeerInformation *partner_incoming;
215 * The consensus set of this session.
217 struct GNUNET_SET_Handle *element_set;
220 * Listener for requests from other peers.
221 * Uses the session's global id as app id.
223 struct GNUNET_SET_ListenHandle *set_listener;
228 * Information about a peer that is in a consensus session.
230 struct ConsensusPeerInformation
233 * Peer identitty of the peer in the consensus session
235 struct GNUNET_PeerIdentity peer_id;
238 * Back-reference to the consensus session,
239 * to that ConsensusPeerInformation can be used as a closure
241 struct ConsensusSession *session;
244 * Have we finished the set operation for this (sub-)round?
249 * Set operation we are currently executing with this peer.
251 struct GNUNET_SET_OperationHandle *set_op;
254 * Set operation we are planning on executing with this peer.
256 struct GNUNET_SET_OperationHandle *delayed_set_op;
259 * Info about the round of the delayed set operation.
261 struct RoundInfo delayed_round_info;
266 * Linked list of sessions this peer participates in.
268 static struct ConsensusSession *sessions_head;
271 * Linked list of sessions this peer participates in.
273 static struct ConsensusSession *sessions_tail;
276 * Configuration of the consensus service.
278 static const struct GNUNET_CONFIGURATION_Handle *cfg;
281 * Handle to the server for this service.
283 static struct GNUNET_SERVER_Handle *srv;
286 * Peer that runs this service.
288 static struct GNUNET_PeerIdentity my_peer;
292 * Check if the current subround has finished.
293 * Must only be called when an exp-round is the current round.
295 * @param session session to check for exp-round completion
296 * @return GNUNET_YES if the subround has finished,
300 have_exp_subround_finished (const struct ConsensusSession *session)
304 GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
307 if ( (NULL != session->partner_outgoing) &&
308 (GNUNET_NO == session->partner_outgoing->set_op_finished) )
310 if ( (NULL != session->partner_incoming) &&
311 (GNUNET_NO == session->partner_incoming->set_op_finished) )
313 if (0 == not_finished)
320 * Destroy a session, free all resources associated with it.
322 * @param session the session to destroy
325 destroy_session (struct ConsensusSession *session)
329 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
330 if (NULL != session->element_set)
332 GNUNET_SET_destroy (session->element_set);
333 session->element_set = NULL;
335 if (NULL != session->set_listener)
337 GNUNET_SET_listen_cancel (session->set_listener);
338 session->set_listener = NULL;
340 if (NULL != session->client_mq)
342 GNUNET_MQ_destroy (session->client_mq);
343 session->client_mq = NULL;
345 if (NULL != session->client)
347 GNUNET_SERVER_client_disconnect (session->client);
348 session->client = NULL;
350 if (NULL != session->shuffle)
352 GNUNET_free (session->shuffle);
353 session->shuffle = NULL;
355 if (NULL != session->shuffle_inv)
357 GNUNET_free (session->shuffle_inv);
358 session->shuffle_inv = NULL;
360 if (NULL != session->info)
362 for (i = 0; i < session->num_peers; i++)
364 struct ConsensusPeerInformation *cpi;
365 cpi = &session->info[i];
366 if (NULL != cpi->set_op)
368 GNUNET_SET_operation_cancel (cpi->set_op);
372 GNUNET_free (session->info);
373 session->info = NULL;
375 GNUNET_free (session);
380 * Iterator for set elements.
383 * @param element the current element, NULL if all elements have been
385 * @return GNUNET_YES to continue iterating, GNUNET_NO to stop.
388 send_to_client_iter (void *cls,
389 const struct GNUNET_SET_Element *element)
391 struct ConsensusSession *session = cls;
392 struct GNUNET_MQ_Envelope *ev;
396 struct GNUNET_CONSENSUS_ElementMessage *m;
398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: got element for client\n",
399 session->local_peer_idx);
401 ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
402 m->element_type = htons (element->type);
403 memcpy (&m[1], element->data, element->size);
404 GNUNET_MQ_send (session->client_mq, ev);
408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished iterating elements for client\n",
409 session->local_peer_idx);
410 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
411 GNUNET_MQ_send (session->client_mq, ev);
418 * Start the next round.
419 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
421 * @param cls the session
422 * @param tc task context, for when this task is invoked by the scheduler,
423 * NULL if invoked for another reason
426 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
428 struct ConsensusSession *session;
432 /* don't kick off next round if we're shutting down */
433 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: round over\n", session->local_peer_idx);
440 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
442 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
444 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
445 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
448 for (i = 0; i < session->num_peers; i++)
450 if (NULL != session->info[i].set_op)
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
453 session->local_peer_idx, i);
454 GNUNET_SET_operation_cancel (session->info[i].set_op);
455 session->info[i].set_op = NULL;
457 /* we're in the new round, nothing finished yet */
458 session->info[i].set_op_finished = GNUNET_NO;
461 switch (session->current_round)
463 case CONSENSUS_ROUND_BEGIN:
464 session->current_round = CONSENSUS_ROUND_EXCHANGE;
465 session->exp_repetition = 0;
466 subround_over (session, NULL);
468 case CONSENSUS_ROUND_EXCHANGE:
469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: finished, sending elements to client\n",
470 session->local_peer_idx);
471 session->current_round = CONSENSUS_ROUND_FINISH;
472 res = GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
473 if (GNUNET_SYSERR == res)
475 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: set invalid\n");
477 else if (GNUNET_NO == res)
479 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "can't iterate set: iterator already active\n");
489 * Create a new permutation for the session's peers in session->shuffle.
490 * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
491 * both the global session id and the current round index.
493 * @param session the session to create the new permutation for
496 shuffle (struct ConsensusSession *session)
499 uint32_t randomness[session->num_peers-1];
501 if (NULL == session->shuffle)
502 session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
503 if (NULL == session->shuffle_inv)
504 session->shuffle_inv = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle_inv));
506 GNUNET_CRYPTO_kdf (randomness, sizeof (randomness),
507 &session->exp_repetition, sizeof (uint32_t),
508 &session->global_id, sizeof (struct GNUNET_HashCode),
511 for (i = 0; i < session->num_peers; i++)
512 session->shuffle[i] = i;
514 for (i = session->num_peers - 1; i > 0; i--)
518 x = randomness[i-1] % session->num_peers;
519 tmp = session->shuffle[x];
520 session->shuffle[x] = session->shuffle[i];
521 session->shuffle[i] = tmp;
524 /* create the inverse */
525 for (i = 0; i < session->num_peers; i++)
526 session->shuffle_inv[session->shuffle[i]] = i;
531 * Find and set the partner_incoming and partner_outgoing of our peer,
532 * one of them may not exist (and thus set to NULL) if the number of peers
533 * in the session is not a power of two.
535 * @param session the consensus session
538 find_partners (struct ConsensusSession *session)
541 unsigned int num_ghosts;
542 unsigned int largest_arc;
545 /* shuffled local index */
546 int my_idx = session->shuffle[session->local_peer_idx];
548 /* distance to neighboring peer in current subround */
549 arc = 1 << session->exp_subround;
551 while (largest_arc < session->num_peers)
553 num_ghosts = largest_arc - session->num_peers;
554 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "largest arc: %u\n", largest_arc);
555 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "arc: %u\n", arc);
556 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "num ghosts: %u\n", num_ghosts);
558 if (0 == (my_idx & arc))
560 /* we are outgoing */
561 partner_idx = (my_idx + arc) % session->num_peers;
562 session->partner_outgoing = &session->info[session->shuffle_inv[partner_idx]];
563 GNUNET_assert (GNUNET_NO == session->partner_outgoing->set_op_finished);
564 /* are we a 'ghost' of a peer that would exist if
565 * the number of peers was a power of two, and thus have to partner
566 * with an additional peer?
568 if (my_idx < num_ghosts)
570 int ghost_partner_idx;
571 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers);
572 ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
573 /* platform dependent; modulo sometimes returns negative values */
574 if (ghost_partner_idx < 0)
575 ghost_partner_idx += session->num_peers;
576 /* we only need to have a ghost partner if the partner is outgoing */
577 if (0 == (ghost_partner_idx & arc))
579 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ghost partner is %d\n", ghost_partner_idx);
580 session->partner_incoming = &session->info[session->shuffle_inv[ghost_partner_idx]];
581 GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
585 session->partner_incoming = NULL;
588 /* we only have an incoming connection */
589 partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
591 partner_idx += session->num_peers;
592 session->partner_outgoing = NULL;
593 session->partner_incoming = &session->info[session->shuffle_inv[partner_idx]];
594 GNUNET_assert (GNUNET_NO == session->partner_incoming->set_op_finished);
599 * Callback for set operation results. Called for each element
603 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
604 * @param status see enum GNUNET_SET_Status
607 set_result_cb (void *cls,
608 const struct GNUNET_SET_Element *element,
609 enum GNUNET_SET_Status status)
611 struct ConsensusPeerInformation *cpi = cls;
612 unsigned int remote_idx = cpi - cpi->session->info;
613 unsigned int local_idx = cpi->session->local_peer_idx;
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u with status %u\n",
616 local_idx, remote_idx, (unsigned int) status);
618 GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
619 (cpi == cpi->session->partner_incoming));
623 case GNUNET_SET_STATUS_OK:
624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: element\n",
625 local_idx, remote_idx);
627 case GNUNET_SET_STATUS_FAILURE:
628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: failure\n",
629 local_idx, remote_idx);
632 case GNUNET_SET_STATUS_HALF_DONE:
633 case GNUNET_SET_STATUS_DONE:
634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: set result from P%u: done\n",
635 local_idx, remote_idx);
636 cpi->set_op_finished = GNUNET_YES;
638 if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: all reconciliations of subround done\n",
642 subround_over (cpi->session, NULL);
646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting for further set results\n",
655 switch (cpi->session->current_round)
657 case CONSENSUS_ROUND_COMPLETION:
658 case CONSENSUS_ROUND_EXCHANGE:
659 GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
669 * Compare the round the session is in with the round of the given context message.
671 * @param session a consensus session
672 * @param ri a round context message
673 * @return 0 if it's the same round, -1 if the session is in an earlier round,
674 * 1 if the session is in a later round
677 rounds_compare (struct ConsensusSession *session,
678 struct RoundInfo* ri)
680 if (session->current_round < ri->round)
682 if (session->current_round > ri->round)
684 if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
686 if (session->exp_repetition < ri->exp_repetition)
688 if (session->exp_repetition > ri->exp_repetition)
690 if (session->exp_subround < ri->exp_subround)
692 if (session->exp_subround > ri->exp_subround)
696 /* other rounds have no subrounds / repetitions to compare */
702 * Do the next subround in the exp-scheme.
703 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
705 * @param cls the session
706 * @param tc task context, for when this task is invoked by the scheduler,
707 * NULL if invoked for another reason
710 subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
712 struct ConsensusSession *session;
713 struct GNUNET_TIME_Relative subround_timeout;
716 /* don't kick off next subround if we're shutting down */
717 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
722 GNUNET_assert (CONSENSUS_ROUND_EXCHANGE == session->current_round);
726 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
727 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "P%u: consensus subround timed out\n",
728 session->local_peer_idx);
732 if (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)
734 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
735 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
738 for (i = 0; i < session->num_peers; i++)
740 if (NULL != session->info[i].set_op)
742 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: canceling stray op with P%d\n",
743 session->local_peer_idx, i);
744 GNUNET_SET_operation_cancel (session->info[i].set_op);
745 session->info[i].set_op = NULL;
747 /* we're in the new round, nothing finished yet */
748 session->info[i].set_op_finished = GNUNET_NO;
751 if (session->exp_repetition >= NUM_EXP_REPETITIONS)
753 round_over (session, NULL);
757 if (session->exp_repetition == 0)
759 /* initialize everything for the log-rounds */
760 session->exp_repetition = 1;
761 session->exp_subround = 0;
762 if (NULL == session->shuffle)
763 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
764 if (NULL == session->shuffle_inv)
765 session->shuffle_inv = GNUNET_malloc ((sizeof (int)) * session->num_peers);
766 for (i = 0; i < session->num_peers; i++)
767 session->shuffle[i] = session->shuffle_inv[i] = i;
769 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
771 /* subrounds done, start new log-round */
772 session->exp_repetition++;
773 session->exp_subround = 0;
778 session->exp_subround++;
782 GNUNET_TIME_relative_divide (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline),
783 2 * NUM_EXP_REPETITIONS * ((int) ceil (log2 (session->num_peers))));
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "subround timeout: %u ms\n", subround_timeout.rel_value_us / 1000);
787 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (subround_timeout, subround_over, session);
789 /* determine the incoming and outgoing partner */
790 find_partners (session);
792 GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]);
793 GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]);
795 /* initiate set operation with the outgoing partner */
796 if (NULL != session->partner_outgoing)
798 struct GNUNET_CONSENSUS_RoundContextMessage *msg;
799 msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
800 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
801 msg->header.size = htons (sizeof *msg);
802 msg->round = htonl (session->current_round);
803 msg->exp_repetition = htonl (session->exp_repetition);
804 msg->exp_subround = htonl (session->exp_subround);
806 if (NULL != session->partner_outgoing->set_op)
809 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
811 session->partner_outgoing->set_op =
812 GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
814 (struct GNUNET_MessageHeader *) msg,
816 GNUNET_SET_RESULT_ADDED,
817 set_result_cb, session->partner_outgoing);
819 if (GNUNET_OK != GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set))
822 session->partner_outgoing->set_op = NULL;
823 session->partner_outgoing->set_op_finished = GNUNET_YES;
827 /* commit to the delayed set operation */
828 if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op))
830 int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info);
832 if (NULL != session->partner_incoming->set_op)
835 GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
836 session->partner_incoming->set_op = NULL;
840 if (GNUNET_OK != GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set))
844 session->partner_incoming->set_op = session->partner_incoming->delayed_set_op;
845 session->partner_incoming->delayed_set_op = NULL;
846 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d resumed delayed round with P%d\n",
847 session->local_peer_idx, (int) (session->partner_incoming - session->info));
851 /* this should not happen -- a round has been skipped! */
856 #ifdef GNUNET_EXTRA_LOGGING
860 if (session->partner_outgoing == NULL)
863 out = (int) (session->partner_outgoing - session->info);
864 if (session->partner_incoming == NULL)
867 in = (int) (session->partner_incoming - session->info);
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
869 session->exp_repetition, session->exp_subround, in, out);
871 #endif /* GNUNET_EXTRA_LOGGING */
877 * Search peer in the list of peers in session.
879 * @param peer peer to find
880 * @param session session with peer
881 * @return index of peer, -1 if peer is not in session
884 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
887 for (i = 0; i < session->num_peers; i++)
888 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
895 * Compute a global, (hopefully) unique consensus session id,
896 * from the local id of the consensus session, and the identities of all participants.
897 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
898 * exactly the same peers, the global id will be different.
900 * @param session session to generate the global id for
901 * @param session_id local id of the consensus session
904 compute_global_id (struct ConsensusSession *session,
905 const struct GNUNET_HashCode *session_id)
908 struct GNUNET_HashCode tmp;
909 struct GNUNET_HashCode phash;
911 /* FIXME: use kdf? */
913 session->global_id = *session_id;
914 for (i = 0; i < session->num_peers; ++i)
916 GNUNET_CRYPTO_hash (&session->info[i].peer_id, sizeof (struct GNUNET_PeerIdentity), &phash);
917 GNUNET_CRYPTO_hash_xor (&session->global_id, &phash, &tmp);
918 session->global_id = tmp;
919 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
920 session->global_id = tmp;
926 * Compare two peer identities.
928 * @param h1 some peer identity
929 * @param h2 some peer identity
930 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
933 peer_id_cmp (const void *h1, const void *h2)
935 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
940 * Create the sorted list of peers for the session,
941 * add the local peer if not in the join message.
944 initialize_session_peer_list (struct ConsensusSession *session,
945 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
947 unsigned int local_peer_in_list;
948 uint32_t listed_peers;
949 const struct GNUNET_PeerIdentity *msg_peers;
950 struct GNUNET_PeerIdentity *peers;
953 GNUNET_assert (NULL != join_msg);
955 /* peers in the join message, may or may not include the local peer */
956 listed_peers = ntohl (join_msg->num_peers);
958 session->num_peers = listed_peers;
960 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
962 local_peer_in_list = GNUNET_NO;
963 for (i = 0; i < listed_peers; i++)
965 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
967 local_peer_in_list = GNUNET_YES;
972 if (GNUNET_NO == local_peer_in_list)
973 session->num_peers++;
975 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
977 if (GNUNET_NO == local_peer_in_list)
978 peers[session->num_peers - 1] = my_peer;
980 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
981 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
983 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
985 for (i = 0; i < session->num_peers; ++i)
987 /* initialize back-references, so consensus peer information can
988 * be used as closure */
989 session->info[i].session = session;
990 session->info[i].peer_id = peers[i];
998 * Called when another peer wants to do a set operation with the
1001 * @param cls closure
1002 * @param other_peer the other peer
1003 * @param context_msg message with application specific information from
1005 * @param request request from the other peer, use GNUNET_SET_accept
1006 * to accept it, otherwise the request will be refused
1007 * Note that we don't use a return value here, as it is also
1008 * necessary to specify the set we want to do the operation with,
1009 * whith sometimes can be derived from the context message.
1010 * Also necessary to specify the timeout.
1013 set_listen_cb (void *cls,
1014 const struct GNUNET_PeerIdentity *other_peer,
1015 const struct GNUNET_MessageHeader *context_msg,
1016 struct GNUNET_SET_Request *request)
1018 struct ConsensusSession *session = cls;
1019 struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
1020 struct ConsensusPeerInformation *cpi;
1021 struct GNUNET_SET_OperationHandle *set_op;
1022 struct RoundInfo round_info;
1026 if (NULL == context_msg)
1028 GNUNET_break_op (0);
1032 index = get_peer_idx (other_peer, session);
1036 GNUNET_break_op (0);
1040 round_info.round = ntohl (msg->round);
1041 round_info.exp_repetition = ntohl (msg->exp_repetition);
1042 round_info.exp_subround = ntohl (msg->exp_subround);
1044 cpi = &session->info[index];
1046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d got set request from P%d\n", session->local_peer_idx, index);
1048 switch (session->current_round)
1050 case CONSENSUS_ROUND_BEGIN:
1051 /* we're in the begin round, so requests for the exchange round may
1052 * come in, they will be delayed for now! */
1053 case CONSENSUS_ROUND_EXCHANGE:
1054 cmp = rounds_compare (session, &round_info);
1057 /* the other peer is too late */
1058 LOG_PP (GNUNET_ERROR_TYPE_DEBUG, cpi, "too late for the current round\n");
1061 /* kill old request, if any. this is legal,
1062 * as the other peer would not make a new request if it would want to
1063 * complete the old one! */
1064 if (NULL != cpi->set_op)
1066 LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got new request from same peer, canceling old one\n");
1067 GNUNET_SET_operation_cancel (cpi->set_op);
1070 set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
1071 set_result_cb, &session->info[index]);
1074 /* we're in exactly the right round for the incoming request */
1075 if (cpi != cpi->session->partner_incoming)
1077 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%u: got request from %u (with matching round), "
1078 "but incoming partner is %d\n", cpi->session->local_peer_idx, cpi - cpi->session->info,
1079 ((NULL == cpi->session->partner_incoming) ? -1 : (cpi->session->partner_incoming - cpi->session->info)));
1080 GNUNET_SET_operation_cancel (set_op);
1083 cpi->set_op = set_op;
1084 if (GNUNET_OK != GNUNET_SET_commit (set_op, session->element_set))
1088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d commited to set request from P%d\n", session->local_peer_idx, index);
1092 /* we still have wait until we have finished the current round,
1093 * as the other peer's round is larger */
1094 cpi->delayed_set_op = set_op;
1095 cpi->delayed_round_info = round_info;
1096 /* The current setop is finished, as we canceled the current setop above. */
1097 cpi->set_op_finished = GNUNET_YES;
1098 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d delaying set request from P%d\n", session->local_peer_idx, index);
1102 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "P%d got unexpected set request in round %d from P%d\n",
1103 session->local_peer_idx, session->current_round, index);
1104 GNUNET_break_op (0);
1111 * Initialize the session, continue receiving messages from the owning client
1113 * @param session the session to initialize
1114 * @param join_msg the join message from the client
1117 initialize_session (struct ConsensusSession *session,
1118 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
1120 struct ConsensusSession *other_session;
1122 initialize_session_peer_list (session, join_msg);
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
1124 compute_global_id (session, &join_msg->session_id);
1126 /* check if some local client already owns the session.
1127 * it is only legal to have a session with an existing global id
1128 * if all other sessions with this global id are finished.*/
1129 other_session = sessions_head;
1130 while (NULL != other_session)
1132 if ((other_session != session) &&
1133 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
1135 if (CONSENSUS_ROUND_FINISH != other_session->current_round)
1138 destroy_session (session);
1143 other_session = other_session->next;
1146 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
1147 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
1149 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
1150 (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
1152 session->local_peer_idx = get_peer_idx (&my_peer, session);
1153 GNUNET_assert (-1 != session->local_peer_idx);
1154 session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1155 GNUNET_assert (NULL != session->element_set);
1156 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
1157 &session->global_id,
1158 set_listen_cb, session);
1159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
1160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1164 static struct ConsensusSession *
1165 get_session_by_client (struct GNUNET_SERVER_Client *client)
1167 struct ConsensusSession *session;
1169 session = sessions_head;
1170 while (NULL != session)
1172 if (session->client == client)
1174 session = session->next;
1181 * Called when a client wants to join a consensus session.
1184 * @param client client that sent the message
1185 * @param m message sent by the client
1188 client_join (void *cls,
1189 struct GNUNET_SERVER_Client *client,
1190 const struct GNUNET_MessageHeader *m)
1192 struct ConsensusSession *session;
1194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
1196 session = get_session_by_client (client);
1197 if (NULL != session)
1200 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1203 session = GNUNET_new (struct ConsensusSession);
1204 session->client = client;
1205 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
1206 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
1207 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
1208 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
1215 * Called when a client performs an insert operation.
1217 * @param cls (unused)
1218 * @param client client handle
1219 * @param m message sent by the client
1222 client_insert (void *cls,
1223 struct GNUNET_SERVER_Client *client,
1224 const struct GNUNET_MessageHeader *m)
1226 struct ConsensusSession *session;
1227 struct GNUNET_CONSENSUS_ElementMessage *msg;
1228 struct GNUNET_SET_Element *element;
1229 ssize_t element_size;
1231 session = get_session_by_client (client);
1233 if (NULL == session)
1236 GNUNET_SERVER_client_disconnect (client);
1240 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1243 GNUNET_SERVER_client_disconnect (client);
1247 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1248 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1249 if (element_size < 0)
1255 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
1256 element->type = msg->element_type;
1257 element->size = element_size;
1258 memcpy (&element[1], &msg[1], element_size);
1259 element->data = &element[1];
1260 GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
1261 GNUNET_free (element);
1262 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1264 // GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
1269 * Called when a client performs the conclude operation.
1271 * @param cls (unused)
1272 * @param client client handle
1273 * @param message message sent by the client
1276 client_conclude (void *cls,
1277 struct GNUNET_SERVER_Client *client,
1278 const struct GNUNET_MessageHeader *message)
1280 struct ConsensusSession *session;
1282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
1283 session = get_session_by_client (client);
1284 if (NULL == session)
1286 /* client not found */
1288 GNUNET_SERVER_client_disconnect (client);
1291 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1293 /* client requested conclude twice */
1297 if (session->num_peers <= 1)
1299 session->current_round = CONSENSUS_ROUND_FINISH;
1300 GNUNET_SET_iterate (session->element_set, send_to_client_iter, session);
1304 /* the 'begin' round is over, start with the next, actual round */
1305 round_over (session, NULL);
1308 GNUNET_assert (CONSENSUS_ROUND_BEGIN != session->current_round);
1309 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1314 * Called to clean up, after a shutdown has been requested.
1316 * @param cls closure
1317 * @param tc context information (why was this task triggered now)
1320 shutdown_task (void *cls,
1321 const struct GNUNET_SCHEDULER_TaskContext *tc)
1323 while (NULL != sessions_head)
1324 destroy_session (sessions_head);
1326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1331 * Clean up after a client after it is
1332 * disconnected (either by us or by itself)
1334 * @param cls closure, unused
1335 * @param client the client to clean up after
1338 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1340 struct ConsensusSession *session;
1342 session = get_session_by_client (client);
1343 if (NULL == session)
1345 if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1346 (CONSENSUS_ROUND_FINISH == session->current_round))
1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, destroying session\n");
1349 destroy_session (session);
1352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1357 * Start processing consensus requests.
1359 * @param cls closure
1360 * @param server the initialized server
1361 * @param c configuration to use
1364 run (void *cls, struct GNUNET_SERVER_Handle *server,
1365 const struct GNUNET_CONFIGURATION_Handle *c)
1367 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1368 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
1369 sizeof (struct GNUNET_MessageHeader)},
1370 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1371 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
1377 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
1379 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
1381 GNUNET_SCHEDULER_shutdown ();
1384 GNUNET_SERVER_add_handlers (server, server_handlers);
1385 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1386 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
1387 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1392 * The main function for the consensus service.
1394 * @param argc number of arguments from the command line
1395 * @param argv command line arguments
1396 * @return 0 ok, 1 on error
1399 main (int argc, char *const *argv)
1402 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1404 return (GNUNET_OK == ret) ? 0 : 1;