2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold <flo@dold.me>
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
42 * Vote that nothing should change.
43 * This option is never voted explicitly.
47 * Vote that an element should be added.
51 * Vote that an element should be removed.
57 enum EarlyStoppingPhase
59 EARLY_STOPPING_NONE = 0,
60 EARLY_STOPPING_ONE_MORE = 1,
61 EARLY_STOPPING_DONE = 2,
65 GNUNET_NETWORK_STRUCT_BEGIN
68 * Tuple of integers that together
69 * identify a task uniquely.
73 * A value from 'enum PhaseKind'.
75 uint16_t kind GNUNET_PACKED;
78 * Number of the first peer
81 int16_t peer1 GNUNET_PACKED;
84 * Number of the second peer in canonical order.
86 int16_t peer2 GNUNET_PACKED;
89 * Repetition of the gradecast phase.
91 int16_t repetition GNUNET_PACKED;
94 * Leader in the gradecast phase.
96 * Can be different from both peer1 and peer2.
98 int16_t leader GNUNET_PACKED;
105 int set_kind GNUNET_PACKED;
106 int k1 GNUNET_PACKED;
107 int k2 GNUNET_PACKED;
114 struct GNUNET_SET_Handle *h;
116 * GNUNET_YES if the set resulted
117 * from applying a referendum with contested
126 int diff_kind GNUNET_PACKED;
127 int k1 GNUNET_PACKED;
128 int k2 GNUNET_PACKED;
133 int rfn_kind GNUNET_PACKED;
134 int k1 GNUNET_PACKED;
135 int k2 GNUNET_PACKED;
139 GNUNET_NETWORK_STRUCT_END
143 PHASE_KIND_ALL_TO_ALL,
144 PHASE_KIND_ALL_TO_ALL_2,
145 PHASE_KIND_GRADECAST_LEADER,
146 PHASE_KIND_GRADECAST_ECHO,
147 PHASE_KIND_GRADECAST_ECHO_GRADE,
148 PHASE_KIND_GRADECAST_CONFIRM,
149 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
151 * Apply a repetition of the all-to-all
152 * gradecast to the current set.
154 PHASE_KIND_APPLY_REP,
164 * Last result set from a gradecast
166 SET_KIND_LAST_GRADECAST,
167 SET_KIND_LEADER_PROPOSAL,
168 SET_KIND_ECHO_RESULT,
174 DIFF_KIND_LEADER_PROPOSAL,
175 DIFF_KIND_LEADER_CONSENSUS,
176 DIFF_KIND_GRADECAST_RESULT,
184 RFN_KIND_GRADECAST_RESULT
190 struct SetKey input_set;
192 struct SetKey output_set;
193 struct RfnKey output_rfn;
194 struct DiffKey output_diff;
198 int transceive_contested;
200 struct GNUNET_SET_OperationHandle *op;
206 struct SetKey input_set;
210 * Closure for both @a start_task
211 * and @a cancel_task.
215 struct SetOpCls setop;
216 struct FinishCls finish;
221 typedef void (*TaskFunc) (struct TaskEntry *task);
224 * Node in the consensus task graph.
239 union TaskFuncCls cls;
246 * All steps of one session are in a
247 * linked list for easier deallocation.
252 * All steps of one session are in a
253 * linked list for easier deallocation.
257 struct ConsensusSession *session;
260 * Tasks that this step is composed of.
262 struct TaskEntry **tasks;
263 unsigned int tasks_len;
264 unsigned int tasks_cap;
266 unsigned int finished_tasks;
   * Steps that have this step as dependency.
271 * We store pointers to subordinates rather
272 * than to prerequisites since it makes
273 * tracking the readiness of a task easier.
275 struct Step **subordinates;
276 unsigned int subordinates_len;
277 unsigned int subordinates_cap;
280 * Counter for the prerequisites of
283 size_t pending_prereq;
286 * Task that will run this step despite
287 * any pending prerequisites.
289 struct GNUNET_SCHEDULER_Task *timeout_task;
291 unsigned int is_running;
293 unsigned int is_finished;
296 * Synchrony round of the task.
297 * Determines the deadline for the task.
302 * Human-readable name for
303 * the task, used for debugging.
308 * When we're doing an early finish, how should this step be
310 * If GNUNET_YES, the step will be marked as finished
311 * without actually running its tasks.
312 * Otherwise, the step will still be run even after
315 * Note that a task may never be finished early if
316 * it is already running.
318 int early_finishable;
322 struct RfnElementInfo
324 const struct GNUNET_SET_Element *element;
327 * GNUNET_YES if the peer votes for the proposal.
332 * Proposal for this element,
333 * can only be VOTE_ADD or VOTE_REMOVE.
335 enum ReferendumVote proposal;
339 struct ReferendumEntry
344 * Elements where there is at least one proposed change.
346 * Maps the hash of the GNUNET_SET_Element
347 * to 'struct RfnElementInfo'.
349 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
351 unsigned int num_peers;
354 * Stores, for every peer in the session,
355 * whether the peer finished the whole referendum.
357 * Votes from peers are only counted if they're
   * marked as committed (#GNUNET_YES) in the referendum.
360 * Otherwise (#GNUNET_NO), the requested changes are
361 * not counted for majority votes or thresholds.
367 * Contestation state of the peer. If a peer is contested, the values it
368 * contributed are still counted for applying changes, but the grading is
375 struct DiffElementInfo
377 const struct GNUNET_SET_Element *element;
380 * Positive weight for 'add', negative
381 * weights for 'remove'.
393 struct GNUNET_CONTAINER_MultiHashMap *changes;
398 struct SetHandle *prev;
399 struct SetHandle *next;
401 struct GNUNET_SET_Handle *h;
407 * A consensus session consists of one local client and the remote authorities.
409 struct ConsensusSession
412 * Consensus sessions are kept in a DLL.
414 struct ConsensusSession *next;
417 * Consensus sessions are kept in a DLL.
419 struct ConsensusSession *prev;
421 unsigned int num_client_insert_pending;
423 struct GNUNET_CONTAINER_MultiHashMap *setmap;
424 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
428 * Array of peers with length 'num_peers'.
430 int *peers_blacklisted;
433 * Mapping from (hashed) TaskKey to TaskEntry.
435 * We map the application_id for a round to the task that should be
   * executed, so we don't have to go through all tasks whenever we get
437 * an incoming set op request.
439 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
441 struct Step *steps_head;
442 struct Step *steps_tail;
444 int conclude_started;
449 * Global consensus identification, computed
450 * from the session id and participating authorities.
452 struct GNUNET_HashCode global_id;
455 * Client that inhabits the session
457 struct GNUNET_SERVICE_Client *client;
460 * Queued messages to the client.
462 struct GNUNET_MQ_Handle *client_mq;
465 * Time when the conclusion of the consensus should begin.
467 struct GNUNET_TIME_Absolute conclude_start;
470 * Timeout for all rounds together, single rounds will schedule a timeout task
471 * with a fraction of the conclude timeout.
472 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
474 struct GNUNET_TIME_Absolute conclude_deadline;
476 struct GNUNET_PeerIdentity *peers;
479 * Number of other peers in the consensus.
481 unsigned int num_peers;
484 * Index of the local peer in the peers array
486 unsigned int local_peer_idx;
489 * Listener for requests from other peers.
490 * Uses the session's global id as app id.
492 struct GNUNET_SET_ListenHandle *set_listener;
495 * State of our early stopping scheme.
500 * Our set size from the first round.
504 uint64_t *first_sizes_received;
507 * Bounded Eppstein lower bound.
509 uint64_t lower_bound;
511 struct SetHandle *set_handles_head;
512 struct SetHandle *set_handles_tail;
516 * Linked list of sessions this peer participates in.
518 static struct ConsensusSession *sessions_head;
521 * Linked list of sessions this peer participates in.
523 static struct ConsensusSession *sessions_tail;
526 * Configuration of the consensus service.
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
531 * Peer that runs this service.
533 static struct GNUNET_PeerIdentity my_peer;
538 struct GNUNET_STATISTICS_Handle *statistics;
542 finish_task (struct TaskEntry *task);
546 run_ready_steps (struct ConsensusSession *session);
550 phasename (uint16_t phase)
554 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556 case PHASE_KIND_FINISH: return "FINISH";
557 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563 default: return "(unknown)";
569 setname (uint16_t kind)
573 case SET_KIND_CURRENT: return "CURRENT";
574 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575 case SET_KIND_NONE: return "NONE";
576 default: return "(unknown)";
581 rfnname (uint16_t kind)
585 case RFN_KIND_NONE: return "NONE";
586 case RFN_KIND_ECHO: return "ECHO";
587 case RFN_KIND_CONFIRM: return "CONFIRM";
588 default: return "(unknown)";
593 diffname (uint16_t kind)
597 case DIFF_KIND_NONE: return "NONE";
598 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601 default: return "(unknown)";
605 #ifdef GNUNET_EXTRA_LOGGING
609 debug_str_element (const struct GNUNET_SET_Element *el)
611 struct GNUNET_HashCode hash;
613 GNUNET_SET_element_hash (el, &hash);
615 return GNUNET_h2s (&hash);
619 debug_str_task_key (struct TaskKey *tk)
621 static char buf[256];
623 snprintf (buf, sizeof (buf),
624 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625 phasename (tk->kind), tk->peer1, tk->peer2,
626 tk->leader, tk->repetition);
632 debug_str_diff_key (struct DiffKey *dk)
634 static char buf[256];
636 snprintf (buf, sizeof (buf),
637 "DiffKey kind=%s, k1=%d, k2=%d",
638 diffname (dk->diff_kind), dk->k1, dk->k2);
644 debug_str_set_key (const struct SetKey *sk)
646 static char buf[256];
648 snprintf (buf, sizeof (buf),
649 "SetKey kind=%s, k1=%d, k2=%d",
650 setname (sk->set_kind), sk->k1, sk->k2);
657 debug_str_rfn_key (const struct RfnKey *rk)
659 static char buf[256];
661 snprintf (buf, sizeof (buf),
662 "RfnKey kind=%s, k1=%d, k2=%d",
663 rfnname (rk->rfn_kind), rk->k1, rk->k2);
668 #endif /* GNUNET_EXTRA_LOGGING */
672 * Send the final result set of the consensus to the client, element by
676 * @param element the current element, NULL if all elements have been
678 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
681 send_to_client_iter (void *cls,
682 const struct GNUNET_SET_Element *element)
684 struct TaskEntry *task = (struct TaskEntry *) cls;
685 struct ConsensusSession *session = task->step->session;
686 struct GNUNET_MQ_Envelope *ev;
690 struct GNUNET_CONSENSUS_ElementMessage *m;
691 const struct ConsensusElement *ce;
693 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "P%d: sending element %s to client\n",
703 session->local_peer_idx,
704 debug_str_element (element));
706 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708 m->element_type = ce->payload_type;
709 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710 GNUNET_MQ_send (session->client_mq, ev);
714 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715 "P%d: finished iterating elements for client\n",
716 session->local_peer_idx);
717 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718 GNUNET_MQ_send (session->client_mq, ev);
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
727 struct GNUNET_HashCode hash;
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "P%u: looking up set {%s}\n",
731 session->local_peer_idx,
732 debug_str_set_key (key));
734 GNUNET_assert (SET_KIND_NONE != key->set_kind);
735 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
743 struct GNUNET_HashCode hash;
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "P%u: looking up diff {%s}\n",
747 session->local_peer_idx,
748 debug_str_diff_key (key));
750 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
759 struct GNUNET_HashCode hash;
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "P%u: looking up rfn {%s}\n",
763 session->local_peer_idx,
764 debug_str_rfn_key (key));
766 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
773 diff_insert (struct DiffEntry *diff,
775 const struct GNUNET_SET_Element *element)
777 struct DiffElementInfo *di;
778 struct GNUNET_HashCode hash;
780 GNUNET_assert ( (1 == weight) || (-1 == weight));
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "diff_insert with element size %u\n",
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "hashing element\n");
789 GNUNET_SET_element_hash (element, &hash);
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
798 di = GNUNET_new (struct DiffElementInfo);
799 di->element = GNUNET_SET_element_dup (element);
800 GNUNET_assert (GNUNET_OK ==
801 GNUNET_CONTAINER_multihashmap_put (diff->changes,
803 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
811 rfn_commit (struct ReferendumEntry *rfn,
812 uint16_t commit_peer)
814 GNUNET_assert (commit_peer < rfn->num_peers);
816 rfn->peer_commited[commit_peer] = GNUNET_YES;
821 rfn_contest (struct ReferendumEntry *rfn,
822 uint16_t contested_peer)
824 GNUNET_assert (contested_peer < rfn->num_peers);
826 rfn->peer_contested[contested_peer] = GNUNET_YES;
831 rfn_noncontested (struct ReferendumEntry *rfn)
837 for (i = 0; i < rfn->num_peers; i++)
838 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
846 rfn_vote (struct ReferendumEntry *rfn,
847 uint16_t voting_peer,
848 enum ReferendumVote vote,
849 const struct GNUNET_SET_Element *element)
851 struct RfnElementInfo *ri;
852 struct GNUNET_HashCode hash;
854 GNUNET_assert (voting_peer < rfn->num_peers);
856 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857 since VOTE_KEEP is implicit in not voting. */
858 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
860 GNUNET_SET_element_hash (element, &hash);
861 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
865 ri = GNUNET_new (struct RfnElementInfo);
866 ri->element = GNUNET_SET_element_dup (element);
867 ri->votes = GNUNET_new_array (rfn->num_peers, int);
868 GNUNET_assert (GNUNET_OK ==
869 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
871 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
874 ri->votes[voting_peer] = GNUNET_YES;
880 task_other_peer (struct TaskEntry *task)
882 uint16_t me = task->step->session->local_peer_idx;
883 if (task->key.peer1 == me)
884 return task->key.peer2;
885 return task->key.peer1;
/**
 * Three-way comparison of two uint64_t values, usable with qsort()
 * to sort in ascending order.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return negative if *pa < *pb, 0 if equal, positive if *pa > *pb
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  /* Keep the const qualifier of the qsort() arguments. */
  const uint64_t a = *(const uint64_t *) pa;
  const uint64_t b = *(const uint64_t *) pb;

  /* Comparison instead of subtraction: a - b does not fit in an int. */
  return (a > b) - (a < b);
}
904 * Callback for set operation results. Called for each element
908 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
909 * @param current_size current set size
910 * @param status see enum GNUNET_SET_Status
913 set_result_cb (void *cls,
914 const struct GNUNET_SET_Element *element,
915 uint64_t current_size,
916 enum GNUNET_SET_Status status)
918 struct TaskEntry *task = cls;
919 struct ConsensusSession *session = task->step->session;
920 struct SetEntry *output_set = NULL;
921 struct DiffEntry *output_diff = NULL;
922 struct ReferendumEntry *output_rfn = NULL;
923 unsigned int other_idx;
924 struct SetOpCls *setop;
925 const struct ConsensusElement *consensus_element = NULL;
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930 "P%u: got element of type %u, status %u\n",
931 session->local_peer_idx,
932 (unsigned) element->element_type,
934 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935 consensus_element = element->data;
938 setop = &task->cls.setop;
941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942 "P%u: got set result for {%s}, status %u\n",
943 session->local_peer_idx,
944 debug_str_task_key (&task->key),
947 if (GNUNET_NO == task->is_started)
953 if (GNUNET_YES == task->is_finished)
959 other_idx = task_other_peer (task);
961 if (SET_KIND_NONE != setop->output_set.set_kind)
963 output_set = lookup_set (session, &setop->output_set);
964 GNUNET_assert (NULL != output_set);
967 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
969 output_diff = lookup_diff (session, &setop->output_diff);
970 GNUNET_assert (NULL != output_diff);
973 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
975 output_rfn = lookup_rfn (session, &setop->output_rfn);
976 GNUNET_assert (NULL != output_rfn);
979 if (GNUNET_YES == session->peers_blacklisted[other_idx])
981 /* Peer might have been blacklisted
982 by a gradecast running in parallel, ignore elements from now */
983 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
985 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
989 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992 "P%u: got some marker\n",
993 session->local_peer_idx);
994 if ( (GNUNET_YES == setop->transceive_contested) &&
995 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
997 GNUNET_assert (NULL != output_rfn);
998 rfn_contest (output_rfn, task_other_peer (task));
1002 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006 "P%u: got size marker\n",
1007 session->local_peer_idx);
1010 struct ConsensusSizeElement *cse = (void *) consensus_element;
1012 if (cse->sender_index == other_idx)
1014 if (NULL == session->first_sizes_received)
1015 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1018 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020 session->lower_bound = copy[session->num_peers / 3 + 1];
1021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022 "P%u: lower bound %llu\n",
1023 session->local_peer_idx,
1024 (long long) session->lower_bound);
1035 case GNUNET_SET_STATUS_ADD_LOCAL:
1036 GNUNET_assert (NULL != consensus_element);
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Adding element in Task {%s}\n",
1039 debug_str_task_key (&task->key));
1040 if (NULL != output_set)
1042 // FIXME: record pending adds, use callback
1043 GNUNET_SET_add_element (output_set->h,
1047 #ifdef GNUNET_EXTRA_LOGGING
1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049 "P%u: adding element %s into set {%s} of task {%s}\n",
1050 session->local_peer_idx,
1051 debug_str_element (element),
1052 debug_str_set_key (&setop->output_set),
1053 debug_str_task_key (&task->key));
1056 if (NULL != output_diff)
1058 diff_insert (output_diff, 1, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061 "P%u: adding element %s into diff {%s} of task {%s}\n",
1062 session->local_peer_idx,
1063 debug_str_element (element),
1064 debug_str_diff_key (&setop->output_diff),
1065 debug_str_task_key (&task->key));
1068 if (NULL != output_rfn)
1070 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1071 #ifdef GNUNET_EXTRA_LOGGING
1072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1074 session->local_peer_idx,
1075 debug_str_element (element),
1076 debug_str_rfn_key (&setop->output_rfn),
1077 debug_str_task_key (&task->key));
1080 // XXX: add result to structures in task
1082 case GNUNET_SET_STATUS_ADD_REMOTE:
1083 GNUNET_assert (NULL != consensus_element);
1084 if (GNUNET_YES == setop->do_not_remove)
1086 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089 "Removing element in Task {%s}\n",
1090 debug_str_task_key (&task->key));
1091 if (NULL != output_set)
1093 // FIXME: record pending adds, use callback
1094 GNUNET_SET_remove_element (output_set->h,
1098 #ifdef GNUNET_EXTRA_LOGGING
1099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1100 "P%u: removing element %s from set {%s} of task {%s}\n",
1101 session->local_peer_idx,
1102 debug_str_element (element),
1103 debug_str_set_key (&setop->output_set),
1104 debug_str_task_key (&task->key));
1107 if (NULL != output_diff)
1109 diff_insert (output_diff, -1, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1112 "P%u: removing element %s from diff {%s} of task {%s}\n",
1113 session->local_peer_idx,
1114 debug_str_element (element),
1115 debug_str_diff_key (&setop->output_diff),
1116 debug_str_task_key (&task->key));
1119 if (NULL != output_rfn)
1121 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1122 #ifdef GNUNET_EXTRA_LOGGING
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1125 session->local_peer_idx,
1126 debug_str_element (element),
1127 debug_str_rfn_key (&setop->output_rfn),
1128 debug_str_task_key (&task->key));
1132 case GNUNET_SET_STATUS_DONE:
1133 // XXX: check first if any changes to the underlying
1134 // set are still pending
1135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1137 session->local_peer_idx,
1138 debug_str_task_key (&task->key),
1139 (unsigned int) task->step->finished_tasks,
1140 (unsigned int) task->step->tasks_len);
1141 if (NULL != output_rfn)
1143 rfn_commit (output_rfn, task_other_peer (task));
1145 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1147 session->first_size = current_size;
1151 case GNUNET_SET_STATUS_FAILURE:
1153 GNUNET_break_op (0);
1174 enum EvilnessSubType
1177 EVILNESS_SUB_REPLACEMENT,
1178 EVILNESS_SUB_NO_REPLACEMENT,
1183 enum EvilnessType type;
1184 enum EvilnessSubType subtype;
1190 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1192 if (0 == strcmp ("replace", evil_subtype_str))
1194 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1196 else if (0 == strcmp ("noreplace", evil_subtype_str))
1198 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1202 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1203 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1205 return GNUNET_SYSERR;
1212 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1216 char *evil_type_str = NULL;
1217 char *evil_subtype_str = NULL;
1219 GNUNET_assert (NULL != evil);
1221 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224 "P%u: no evilness\n",
1225 session->local_peer_idx);
1226 evil->type = EVILNESS_NONE;
1229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230 "P%u: got evilness spec\n",
1231 session->local_peer_idx);
1233 for (field = strtok (evil_spec, "/");
1235 field = strtok (NULL, "/"))
1237 unsigned int peer_num;
1238 unsigned int evil_num;
1241 evil_type_str = NULL;
1242 evil_subtype_str = NULL;
1244 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1248 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1249 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1255 GNUNET_assert (NULL != evil_type_str);
1256 GNUNET_assert (NULL != evil_subtype_str);
1258 if (peer_num == session->local_peer_idx)
1260 if (0 == strcmp ("slack", evil_type_str))
1262 evil->type = EVILNESS_SLACK;
1264 if (0 == strcmp ("slack-a2a", evil_type_str))
1266 evil->type = EVILNESS_SLACK_A2A;
1268 else if (0 == strcmp ("cram-all", evil_type_str))
1270 evil->type = EVILNESS_CRAM_ALL;
1271 evil->num = evil_num;
1272 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1275 else if (0 == strcmp ("cram-lead", evil_type_str))
1277 evil->type = EVILNESS_CRAM_LEAD;
1278 evil->num = evil_num;
1279 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1282 else if (0 == strcmp ("cram-echo", evil_type_str))
1284 evil->type = EVILNESS_CRAM_ECHO;
1285 evil->num = evil_num;
1286 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1291 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1298 /* No GNUNET_free since memory was allocated by libc */
1299 free (evil_type_str);
1300 evil_type_str = NULL;
1301 evil_subtype_str = NULL;
1304 evil->type = EVILNESS_NONE;
1306 GNUNET_free (evil_spec);
1307 /* no GNUNET_free_non_null since it wasn't
1308 * allocated with GNUNET_malloc */
1309 if (NULL != evil_type_str)
1310 free (evil_type_str);
1311 if (NULL != evil_subtype_str)
1312 free (evil_subtype_str);
1319 * Commit the appropriate set for a
1323 commit_set (struct ConsensusSession *session,
1324 struct TaskEntry *task)
1326 struct SetEntry *set;
1327 struct SetOpCls *setop = &task->cls.setop;
1329 GNUNET_assert (NULL != setop->op);
1330 set = lookup_set (session, &setop->input_set);
1331 GNUNET_assert (NULL != set);
1333 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1335 struct GNUNET_SET_Element element;
1336 struct ConsensusElement ce = { 0 };
1337 ce.marker = CONSENSUS_MARKER_CONTESTED;
1339 element.size = sizeof (struct ConsensusElement);
1340 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1341 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1344 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1346 struct GNUNET_SET_Element element;
1347 struct ConsensusSizeElement cse = {
1351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1352 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1353 cse.size = GNUNET_htonll (session->first_size);
1354 cse.sender_index = session->local_peer_idx;
1355 element.data = &cse;
1356 element.size = sizeof (struct ConsensusSizeElement);
1357 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1358 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1364 struct Evilness evil;
1366 get_evilness (session, &evil);
1367 if (EVILNESS_NONE != evil.type)
1369 /* Useful for evaluation */
1370 GNUNET_STATISTICS_set (statistics,
1377 case EVILNESS_CRAM_ALL:
1378 case EVILNESS_CRAM_LEAD:
1379 case EVILNESS_CRAM_ECHO:
1380 /* We're not cramming elements in the
1381 all-to-all round, since that would just
1382 add more elements to the result set, but
1383 wouldn't test robustness. */
1384 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1386 GNUNET_SET_commit (setop->op, set->h);
1389 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1390 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1392 GNUNET_SET_commit (setop->op, set->h);
1395 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1397 GNUNET_SET_commit (setop->op, set->h);
1400 for (i = 0; i < evil.num; i++)
1402 struct GNUNET_SET_Element element;
1403 struct ConsensusStuffedElement se = {
1404 .ce.payload_type = 0,
1408 element.size = sizeof (struct ConsensusStuffedElement);
1409 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1411 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1413 /* Always generate a new element. */
1414 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1416 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1418 /* Always cram the same elements, derived from counter. */
1419 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1425 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1426 #ifdef GNUNET_EXTRA_LOGGING
1427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1428 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1429 session->local_peer_idx,
1430 debug_str_element (&element),
1431 debug_str_set_key (&setop->input_set),
1432 debug_str_task_key (&task->key));
1435 GNUNET_STATISTICS_update (statistics,
1436 "# stuffed elements",
1439 GNUNET_SET_commit (setop->op, set->h);
1441 case EVILNESS_SLACK:
1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1443 "P%u: evil peer: slacking\n",
1444 (unsigned int) session->local_peer_idx);
1446 case EVILNESS_SLACK_A2A:
1447 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1448 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1450 struct GNUNET_SET_Handle *empty_set;
1451 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1452 GNUNET_SET_commit (setop->op, empty_set);
1453 GNUNET_SET_destroy (empty_set);
1457 GNUNET_SET_commit (setop->op, set->h);
1461 GNUNET_SET_commit (setop->op, set->h);
1466 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1468 GNUNET_SET_commit (setop->op, set->h);
1472 /* For our testcases, we don't want the blacklisted
1474 GNUNET_SET_operation_cancel (setop->op);
1483 put_diff (struct ConsensusSession *session,
1484 struct DiffEntry *diff)
1486 struct GNUNET_HashCode hash;
1488 GNUNET_assert (NULL != diff);
1490 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1491 GNUNET_assert (GNUNET_OK ==
1492 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1493 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1497 put_set (struct ConsensusSession *session,
1498 struct SetEntry *set)
1500 struct GNUNET_HashCode hash;
1502 GNUNET_assert (NULL != set->h);
1504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1506 debug_str_set_key (&set->key));
1508 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1509 GNUNET_assert (GNUNET_SYSERR !=
1510 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1511 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1516 put_rfn (struct ConsensusSession *session,
1517 struct ReferendumEntry *rfn)
1519 struct GNUNET_HashCode hash;
1521 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1522 GNUNET_assert (GNUNET_OK ==
1523 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1524 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1530 task_cancel_reconcile (struct TaskEntry *task)
1532 /* not implemented yet */
1538 apply_diff_to_rfn (struct DiffEntry *diff,
1539 struct ReferendumEntry *rfn,
1540 uint16_t voting_peer,
1543 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1544 struct DiffElementInfo *di;
1546 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1548 while (GNUNET_YES ==
1549 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1551 (const void **) &di))
1555 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1559 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1563 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
// NOTE(review): the signature line is not visible here; this looks like
// the body of diff_create, which diff_compose and task_start_reconcile
// call without arguments -- confirm.
// Allocates a DiffEntry and gives it a change map created with an
// initial size of 8.
1570 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1572 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
// Build a new diff out of two existing diffs.
// A fresh diff is created with diff_create; every change of @a diff_1 and
// then every change of @a diff_2 is inserted into it with diff_insert,
// using the weight and element stored in the change.  Neither input diff
// is modified by the visible code.
//
// @param diff_1 first diff to read
// @param diff_2 second diff to read, applied after @a diff_1
// @return presumably the new diff (return statement not visible here)
1579 diff_compose (struct DiffEntry *diff_1,
1580 struct DiffEntry *diff_2)
1582 struct DiffEntry *diff_new;
1583 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1584 struct DiffElementInfo *di;
1586 diff_new = diff_create ();
// First pass: changes of diff_1.
1588 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1589 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1591 diff_insert (diff_new, di->weight, di->element);
1593 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
// Second pass: changes of diff_2.
1595 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1596 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1598 diff_insert (diff_new, di->weight, di->element);
1600 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
// Allocate a referendum entry for @a size peers.
// Creates the element map (initial size 8) and two int arrays with
// @a size entries each (peer_commited and peer_contested), and records
// the peer count in num_peers.
//
// @param size number of peers that take part in the referendum
// @return presumably the new entry (return statement not visible here)
1606 struct ReferendumEntry *
1607 rfn_create (uint16_t size)
1609 struct ReferendumEntry *rfn;
1611 rfn = GNUNET_new (struct ReferendumEntry);
1612 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1613 rfn->peer_commited = GNUNET_new_array (size, int);
1614 rfn->peer_contested = GNUNET_new_array (size, int);
1615 rfn->num_peers = size;
// Release a diff entry: destroys the hash map stored in diff->changes.
// NOTE(review): whether the stored DiffElementInfo values and the entry
// itself are released as well is not visible here -- confirm.
//
// @param diff diff entry to destroy
1623 diff_destroy (struct DiffEntry *diff)
1625 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1632 * For a given majority, count what the outcome
1633 * is (add/remove/keep), and give the number
1634 * of peers that voted for this outcome.
// Determine the outcome of a referendum for one element.
// Loops over all peers of the referendum, looking at the commit flag of
// each peer and at its vote for the element.  If the yes-votes exceed
// half of the committed peers (integer division), the outcome is the
// proposal of the element and the majority is the number of yes-votes;
// otherwise the outcome is VOTE_STAY and the majority is the number of
// committed peers minus the yes-votes.
//
// @param rfn referendum that holds the per-peer commit flags
// @param ri per-element vote information
// @param[out] ret_majority size of the winning side
// @param[out] ret_vote the winning vote
1637 rfn_majority (const struct ReferendumEntry *rfn,
1638 const struct RfnElementInfo *ri,
1639 uint16_t *ret_majority,
1640 enum ReferendumVote *ret_vote)
1642 uint16_t votes_yes = 0;
1643 uint16_t num_commited = 0;
1646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1647 "Computing rfn majority for element %s of rfn {%s}\n",
1648 debug_str_element (ri->element),
1649 debug_str_rfn_key (&rfn->key));
1651 for (i = 0; i < rfn->num_peers; i++)
// NOTE(review): the statements controlled by the two checks below are
// not visible here; presumably uncommitted peers are skipped and the
// counters num_commited and votes_yes are updated -- confirm.
1653 if (GNUNET_NO == rfn->peer_commited[i])
1657 if (GNUNET_YES == ri->votes[i])
1661 if (votes_yes > (num_commited) / 2)
1663 *ret_vote = ri->proposal;
1664 *ret_majority = votes_yes;
1668 *ret_vote = VOTE_STAY;
1669 *ret_majority = num_commited - votes_yes;
// NOTE(review): the struct header is not visible here; these are the two
// members that set_copy_cb reads from its struct SetCopyCls closure.
// Task on whose behalf the set copy is made.
1676 struct TaskEntry *task;
// Key under which the copied set is stored in the session.
1677 struct SetKey dst_set_key;
// Callback invoked with the copy of a set.
// Reads the task and the destination key from the SetCopyCls closure,
// allocates a SetHandle that is linked into the set_handles list of the
// session, and stores a new SetEntry under the destination key via
// put_set.
// NOTE(review): the assignments of @a copy to the new handle and entry,
// the release of the closure and the restart of the task are not visible
// here; put_set requires set->h to be non-NULL -- confirm.
//
// @param cls closure, a struct SetCopyCls
// @param copy handle of the copied set
1682 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1684 struct SetCopyCls *scc = cls;
1685 struct TaskEntry *task = scc->task;
1686 struct SetKey dst_set_key = scc->dst_set_key;
1687 struct SetEntry *set;
1688 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1691 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1692 task->step->session->set_handles_tail,
1696 set = GNUNET_new (struct SetEntry);
1698 set->key = dst_set_key;
1699 put_set (task->step->session, set);
1706 * Call the start function of the given
1707 * task again after we created a copy of the given set.
// Request a copy of an existing set for a task.
// Allocates a SetCopyCls closure holding the destination key, looks up
// the source set in the session of the task (asserted to exist) and
// starts the copy with GNUNET_SET_copy_lazy on the source handle.
//
// @param task task that needs the copy
// @param src_set_key key of the set to copy; must be present in the session
// @param dst_set_key key the copy is to be stored under
1710 create_set_copy_for_task (struct TaskEntry *task,
1711 struct SetKey *src_set_key,
1712 struct SetKey *dst_set_key)
1714 struct SetEntry *src_set;
1715 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718 "Copying set {%s} to {%s} for task {%s}\n",
1719 debug_str_set_key (src_set_key),
1720 debug_str_set_key (dst_set_key),
1721 debug_str_task_key (&task->key));
1724 scc->dst_set_key = *dst_set_key;
1725 src_set = lookup_set (task->step->session, src_set_key);
1726 GNUNET_assert (NULL != src_set);
// NOTE(review): the remaining arguments of the call are not visible
// here; presumably set_copy_cb and scc -- confirm.
1727 GNUNET_SET_copy_lazy (src_set->h,
// Closure that tracks outstanding set mutations for one task.
// Users of this struct also access a num_pending counter whose
// declaration is not visible here.
1733 struct SetMutationProgressCls
1737 * Task to finish once all changes are through.
1739 struct TaskEntry *task;
// Completion callback for a single set mutation.
// Asserts that at least one mutation is still pending and, once the
// pending count is zero, picks up the task stored in the closure.
// NOTE(review): the decrement of num_pending and what is done with the
// task are not visible here; presumably the closure is freed and the
// task is finished -- confirm.
//
// @param cls closure, a struct SetMutationProgressCls
1744 set_mutation_done (void *cls)
1746 struct SetMutationProgressCls *pc = cls;
1748 GNUNET_assert (pc->num_pending > 0);
1752 if (0 == pc->num_pending)
1754 struct TaskEntry *task = pc->task;
// Try to mark a step as finished before its tasks have run.
// The step is checked for being running, already finished, or not early
// finishable.  After those checks it is marked finished, the
// pending_prereq counter of every subordinate is decremented (asserted
// to be positive beforehand) and the function recurses into each
// subordinate.  Finally run_ready_steps is called on the session.
//
// @param step step to finish early
1762 try_finish_step_early (struct Step *step)
// NOTE(review): the statements controlled by these three checks are not
// visible here; presumably each one returns early -- confirm.
1766 if (GNUNET_YES == step->is_running)
1768 if (GNUNET_YES == step->is_finished)
1770 if (GNUNET_NO == step->early_finishable)
1773 step->is_finished = GNUNET_YES;
1775 #ifdef GNUNET_EXTRA_LOGGING
1776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1777 "Finishing step `%s' early.\n",
1781 for (i = 0; i < step->subordinates_len; i++)
1783 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1784 step->subordinates[i]->pending_prereq--;
1785 #ifdef GNUNET_EXTRA_LOGGING
1786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787 "Decreased pending_prereq to %u for step `%s'.\n",
1788 (unsigned int) step->subordinates[i]->pending_prereq,
1789 step->subordinates[i]->debug_name);
// Propagate the early finish down the dependency graph.
1792 try_finish_step_early (step->subordinates[i]);
1795 // XXX: maybe schedule as task to avoid recursion?
1796 run_ready_steps (step->session);
// Finish a step whose tasks have all completed.
// Preconditions (asserted): every task of the step is finished, the step
// is running and it is not yet marked finished.  The pending_prereq
// counter of each subordinate step is decremented, the step is marked
// finished and run_ready_steps is called so that newly unblocked steps
// can start.
//
// @param step step to finish
1801 finish_step (struct Step *step)
1805 GNUNET_assert (step->finished_tasks == step->tasks_len);
1806 GNUNET_assert (GNUNET_YES == step->is_running);
1807 GNUNET_assert (GNUNET_NO == step->is_finished);
1809 #ifdef GNUNET_EXTRA_LOGGING
1810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811 "All tasks of step `%s' with %u subordinates finished.\n",
1813 step->subordinates_len);
1816 for (i = 0; i < step->subordinates_len; i++)
1818 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1819 step->subordinates[i]->pending_prereq--;
1820 #ifdef GNUNET_EXTRA_LOGGING
1821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1822 "Decreased pending_prereq to %u for step `%s'.\n",
1823 (unsigned int) step->subordinates[i]->pending_prereq,
1824 step->subordinates[i]->debug_name);
1829 step->is_finished = GNUNET_YES;
1831 // XXX: maybe schedule as task to avoid recursion?
1832 run_ready_steps (step->session);
1838 * Apply the result from one round of gradecasts (i.e. every peer
1839 * should have gradecasted) to the peer's current set.
1841 * @param task the task with context information
// Start function of the apply-round task.
// Reads the SET_KIND_CURRENT set and the RFN_KIND_GRADECAST_RESULT
// referendum of the current repetition and writes to the
// SET_KIND_CURRENT set of the next repetition.  For every element of the
// referendum the majority vote is applied to the output set, and the
// smallest majority seen is used for the early-stopping decision.
1844 task_start_apply_round (struct TaskEntry *task)
1846 struct ConsensusSession *session = task->step->session;
1847 struct SetKey sk_in;
1848 struct SetKey sk_out;
1849 struct RfnKey rk_in;
1850 struct SetEntry *set_out;
1851 struct ReferendumEntry *rfn_in;
1852 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1853 struct RfnElementInfo *ri;
1854 struct SetMutationProgressCls *progress_cls;
// Smallest majority over all elements; starts at the maximum value.
1855 uint16_t worst_majority = UINT16_MAX;
1857 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1858 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1859 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
// Without an output set, a copy of the input set is requested first.
1861 set_out = lookup_set (session, &sk_out);
1862 if (NULL == set_out)
1864 create_set_copy_for_task (task, &sk_in, &sk_out);
1868 rfn_in = lookup_rfn (session, &rk_in);
1869 GNUNET_assert (NULL != rfn_in);
1871 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1872 progress_cls->task = task;
1874 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1876 while (GNUNET_YES ==
1877 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1879 (const void **) &ri))
1881 uint16_t majority_num;
1882 enum ReferendumVote majority_vote;
1884 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1886 if (worst_majority > majority_num)
1887 worst_majority = majority_num;
// Each add or remove issued below counts as one pending mutation.
1889 switch (majority_vote)
1892 progress_cls->num_pending++;
1893 GNUNET_assert (GNUNET_OK ==
1894 GNUNET_SET_add_element (set_out->h,
1898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1899 "P%u: apply round: adding element %s with %u-majority.\n",
1900 session->local_peer_idx,
1901 debug_str_element (ri->element), majority_num);
1904 progress_cls->num_pending++;
1905 GNUNET_assert (GNUNET_OK ==
1906 GNUNET_SET_remove_element (set_out->h,
1910 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911 "P%u: apply round: deleting element %s with %u-majority.\n",
1912 session->local_peer_idx,
1913 debug_str_element (ri->element), majority_num);
1916 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917 "P%u: apply round: keeping element %s with %u-majority.\n",
1918 session->local_peer_idx,
1919 debug_str_element (ri->element), majority_num);
1928 if (0 == progress_cls->num_pending)
1930 // call closure right now, no pending ops
1931 GNUNET_free (progress_cls);
// Early stopping needs the weakest majority to reach
// 2 * (num_peers / 3), computed with integer division.
1936 uint16_t thresh = (session->num_peers / 3) * 2;
1938 if (worst_majority >= thresh)
1940 switch (session->early_stopping)
1942 case EARLY_STOPPING_NONE:
1943 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1944 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1945 "P%u: Stopping early (after one more superround)\n",
1946 session->local_peer_idx);
1948 case EARLY_STOPPING_ONE_MORE:
1949 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1950 session->local_peer_idx);
1951 session->early_stopping = EARLY_STOPPING_DONE;
// Offer every step of the session the chance to finish early.
1954 for (step = session->steps_head; NULL != step; step = step->next)
1955 try_finish_step_early (step);
1958 case EARLY_STOPPING_DONE:
1959 /* We shouldn't be here anymore after early stopping */
1967 else if (EARLY_STOPPING_NONE != session->early_stopping)
1969 // Our assumption about the number of bad peers
1971 GNUNET_break_op (0);
1975 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1976 session->local_peer_idx);
1979 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
// Start function of the confirm-grade task of one gradecast.
// Combines the leader proposal diff and the echo referendum for the
// leader of this task into the RFN_KIND_GRADECAST_RESULT referendum of
// the current repetition (created here if missing).  All votes written
// to the result are attributed to the leader.  Depending on the
// resulting confidence the leader is committed in the result and/or
// blacklisted.
//
// @param task the task with context information
1984 task_start_grade (struct TaskEntry *task)
1986 struct ConsensusSession *session = task->step->session;
1987 struct ReferendumEntry *output_rfn;
1988 struct ReferendumEntry *input_rfn;
1989 struct DiffEntry *input_diff;
1990 struct RfnKey rfn_key;
1991 struct DiffKey diff_key;
1992 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1993 struct RfnElementInfo *ri;
// Starts at 2 and is only ever lowered further down.
1994 unsigned int gradecast_confidence = 2;
1996 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1997 output_rfn = lookup_rfn (session, &rfn_key);
1998 if (NULL == output_rfn)
2000 output_rfn = rfn_create (session->num_peers);
2001 output_rfn->key = rfn_key;
2002 put_rfn (session, output_rfn);
// Both inputs are asserted to exist.
2005 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2006 input_diff = lookup_diff (session, &diff_key);
2007 GNUNET_assert (NULL != input_diff);
2009 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2010 input_rfn = lookup_rfn (session, &rfn_key);
2011 GNUNET_assert (NULL != input_rfn);
2013 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
// The changes proposed by the leader are recorded as votes first.
2015 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2017 while (GNUNET_YES ==
2018 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2020 (const void **) &ri))
2022 uint16_t majority_num;
2023 enum ReferendumVote majority_vote;
2025 // XXX: we need contested votes and non-contested votes here
2026 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
// A majority of at most num_peers / 3 is overridden to VOTE_REMOVE.
2028 if (majority_num <= session->num_peers / 3)
2029 majority_vote = VOTE_REMOVE;
2031 switch (majority_vote)
2036 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2039 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2046 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
// Lower the confidence according to how many peers of the echo
// referendum are non-contested.
2049 uint16_t noncontested;
2050 noncontested = rfn_noncontested (input_rfn);
2051 if (noncontested < (session->num_peers / 3) * 2)
2053 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2055 if (noncontested < (session->num_peers / 3) + 1)
2057 gradecast_confidence = 0;
// Confidence 1 or 2 commits the leader; confidence 0 or 1 blacklists it.
2061 if (gradecast_confidence >= 1)
2062 rfn_commit (output_rfn, task->key.leader);
2064 if (gradecast_confidence <= 1)
2065 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
// Start function of a reconcile task (a set operation between two peers).
// Looks up the input set (which must exist and have a handle), makes sure
// the configured output set, referendum and diff exist, and then acts
// according to the role of the local peer: as peer1 it prepares the set
// operation towards peer2 and commits; as peer2 it commits only if the
// remote operation has already arrived.
//
// @param task the reconcile task; task->cls.setop describes inputs and outputs
2072 task_start_reconcile (struct TaskEntry *task)
2074 struct SetEntry *input;
2075 struct SetOpCls *setop = &task->cls.setop;
2076 struct ConsensusSession *session = task->step->session;
2078 input = lookup_set (session, &setop->input_set);
2079 GNUNET_assert (NULL != input);
2080 GNUNET_assert (NULL != input->h);
2082 /* We create the outputs for the operation here
2083 (rather than in the set operation callback)
2084 because we want something valid in there, even
2085 if the other peer doesn't talk to us */
2087 if (SET_KIND_NONE != setop->output_set.set_kind)
2089 /* If we don't have an existing output set,
2090 we clone the input set. */
2091 if (NULL == lookup_set (session, &setop->output_set))
2093 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2098 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2100 if (NULL == lookup_rfn (session, &setop->output_rfn))
2102 struct ReferendumEntry *rfn;
2104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2105 "P%u: output rfn <%s> missing, creating.\n",
2106 session->local_peer_idx,
2107 debug_str_rfn_key (&setop->output_rfn));
2109 rfn = rfn_create (session->num_peers);
2110 rfn->key = setop->output_rfn;
2111 put_rfn (session, rfn);
2115 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2117 if (NULL == lookup_diff (session, &setop->output_diff))
2119 struct DiffEntry *diff;
2121 diff = diff_create ();
2122 diff->key = setop->output_diff;
2123 put_diff (session, diff);
// Both peers of the task are the local peer: purely local task.
2127 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2129 /* XXX: mark the corresponding rfn as commited if necessary */
// Local peer is peer1: it initiates the set operation.
2134 if (task->key.peer1 == session->local_peer_idx)
2136 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2139 "P%u: Looking up set {%s} to run remote union\n",
2140 session->local_peer_idx,
2141 debug_str_set_key (&setop->input_set));
// The context message carries the task key in network byte order.
2143 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2144 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2146 rcm.kind = htons (task->key.kind);
2147 rcm.peer1 = htons (task->key.peer1);
2148 rcm.peer2 = htons (task->key.peer2);
2149 rcm.leader = htons (task->key.leader);
2150 rcm.repetition = htons (task->key.repetition);
2151 rcm.is_contested = htons (0);
2153 GNUNET_assert (NULL == setop->op);
2154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2155 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2157 struct GNUNET_SET_Option opts[] = {
2158 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2159 { GNUNET_SET_OPTION_END },
2162 // XXX: maybe this should be done while
2163 // setting up tasks already?
2164 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2165 &session->global_id,
2167 GNUNET_SET_RESULT_SYMMETRIC,
2172 commit_set (session, task);
// Local peer is peer2: the operation is accepted in set_listen_cb.
2174 else if (task->key.peer2 == session->local_peer_idx)
2176 /* Wait for the other peer to contact us */
2177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2178 session->local_peer_idx, task->key.peer1);
2180 if (NULL != setop->op)
2182 commit_set (session, task);
2187 /* We made an error while constructing the task graph. */
// Start function of the echo-grade task of one gradecast.
// Evaluates the RFN_KIND_ECHO referendum for the leader of this task and
// applies the majority vote of every element to the
// SET_KIND_ECHO_RESULT set, which is copied from the
// SET_KIND_LEADER_PROPOSAL set if it does not exist yet.  The handle of
// the output set is also registered under SET_KIND_LAST_GRADECAST.
//
// @param task the task with context information
2194 task_start_eval_echo (struct TaskEntry *task)
2196 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2197 struct ReferendumEntry *input_rfn;
2198 struct RfnElementInfo *ri;
2199 struct SetEntry *output_set;
2200 struct SetMutationProgressCls *progress_cls;
2201 struct ConsensusSession *session = task->step->session;
2202 struct SetKey sk_in;
2203 struct SetKey sk_out;
2204 struct RfnKey rk_in;
2206 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2207 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
// Without an output set, a copy of the leader proposal is requested.
2208 output_set = lookup_set (session, &sk_out);
2209 if (NULL == output_set)
2211 create_set_copy_for_task (task, &sk_in, &sk_out);
2217 // FIXME: should be marked as a shallow copy, so
2218 // we can destroy everything correctly
// This second entry shares the set handle of output_set.
2219 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2220 last_set->h = output_set->h;
2221 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2222 put_set (session, last_set);
2225 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226 "Evaluating referendum in Task {%s}\n",
2227 debug_str_task_key (&task->key));
2229 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2230 progress_cls->task = task;
2232 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2233 input_rfn = lookup_rfn (session, &rk_in);
2235 GNUNET_assert (NULL != input_rfn);
2237 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2238 GNUNET_assert (NULL != iter);
2240 while (GNUNET_YES ==
2241 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2243 (const void **) &ri))
2245 enum ReferendumVote majority_vote;
2246 uint16_t majority_num;
2248 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2250 if (majority_num < session->num_peers / 3)
2252 /* It is not the case that all nonfaulty peers
2253 echoed the same value. Since we're doing a set reconciliation, we
2254 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2255 reconciliation as contested. Other peers might not know that the
2256 leader is faulty, thus we still re-distribute in the confirmation
2258 output_set->is_contested = GNUNET_YES;
2261 switch (majority_vote)
2264 progress_cls->num_pending++;
2265 GNUNET_assert (GNUNET_OK ==
2266 GNUNET_SET_add_element (output_set->h,
2272 progress_cls->num_pending++;
2273 GNUNET_assert (GNUNET_OK ==
2274 GNUNET_SET_remove_element (output_set->h,
2280 /* Nothing to do. */
2288 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2290 if (0 == progress_cls->num_pending)
2292 // call closure right now, no pending ops
2293 GNUNET_free (progress_cls);
// Start function of the finish task.
// Looks up the set named by task->cls.finish.input_set (asserted to
// exist) and iterates over it with send_to_client_iter as the callback.
//
// @param task the finish task
2300 task_start_finish (struct TaskEntry *task)
2302 struct SetEntry *final_set;
2303 struct ConsensusSession *session = task->step->session;
2305 final_set = lookup_set (session, &task->cls.finish.input_set);
2307 GNUNET_assert (NULL != final_set);
2310 GNUNET_SET_iterate (final_set->h,
2311 send_to_client_iter,
// Start a single task of a step.
// Asserts that the task is neither started nor finished and that it has
// a start function, and marks it as started.
// NOTE(review): the call of task->start is not visible here -- confirm
// its position relative to the is_started assignment.
//
// @param session session the task belongs to (used for logging here)
// @param task task to start
2316 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2320 GNUNET_assert (GNUNET_NO == task->is_started);
2321 GNUNET_assert (GNUNET_NO == task->is_finished);
2322 GNUNET_assert (NULL != task->start);
2326 task->is_started = GNUNET_YES;
2333 * Run all steps of the session that don't have any
2334 * more dependencies.
// Walk the step list of the session, starting at steps_head, and start
// every step that is not running, not finished and has no pending
// prerequisites.  Starting a step marks it as running and starts each of
// its tasks with start_task.
//
// @param session session whose steps are examined
2337 run_ready_steps (struct ConsensusSession *session)
2341 step = session->steps_head;
2343 while (NULL != step)
2345 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2349 GNUNET_assert (0 == step->finished_tasks);
2351 #ifdef GNUNET_EXTRA_LOGGING
2352 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2353 session->local_peer_idx,
2355 step->round, step->tasks_len, step->subordinates_len);
2358 step->is_running = GNUNET_YES;
2359 for (i = 0; i < step->tasks_len; i++)
2360 start_task (session, step->tasks[i]);
2362 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2363 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2366 /* Running the next ready steps will be triggered by task completion */
// Mark a task as finished and account for it in its step.
// The task must not be finished already (asserted).  The finished_tasks
// counter of the step is incremented and, once it equals tasks_len, the
// step itself is finished via finish_step.
//
// @param task task that has completed
2378 finish_task (struct TaskEntry *task)
2380 GNUNET_assert (GNUNET_NO == task->is_finished);
2381 task->is_finished = GNUNET_YES;
2383 task->step->finished_tasks++;
2385 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2386 "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2387 task->step->session->local_peer_idx,
2388 debug_str_task_key (&task->key),
2389 (unsigned int) task->step->finished_tasks,
2390 (unsigned int) task->step->tasks_len);
2392 if (task->step->finished_tasks == task->step->tasks_len)
2393 finish_step (task->step);
2398 * Search peer in the list of peers in session.
2400 * @param peer peer to find
2401 * @param session session with peer
2402 * @return index of peer, -1 if peer is not in session
// Linear search over session->peers; entries are compared to @a peer
// byte-wise with memcmp over the full struct GNUNET_PeerIdentity.
2405 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2408 for (i = 0; i < session->num_peers; i++)
2409 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2416 * Compute a global, (hopefully) unique consensus session id,
2417 * from the local id of the consensus session, and the identities of all participants.
2418 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2419 * exactly the same peers, the global id will be different.
2421 * @param session session to generate the global id for
2422 * @param local_session_id local id of the consensus session
// Derives session->global_id with GNUNET_CRYPTO_kdf, which is asserted
// to return GNUNET_YES.  The output is one struct GNUNET_HashCode; the
// visible length arguments cover the whole peer array
// (num_peers * sizeof (struct GNUNET_PeerIdentity)) and one hash code.
// NOTE(review): the data pointers passed to the kdf and the use of the
// salt string are on lines not visible here -- confirm.
2425 compute_global_id (struct ConsensusSession *session,
2426 const struct GNUNET_HashCode *local_session_id)
2428 const char *salt = "gnunet-service-consensus/session_id";
2430 GNUNET_assert (GNUNET_YES ==
2431 GNUNET_CRYPTO_kdf (&session->global_id,
2432 sizeof (struct GNUNET_HashCode),
2436 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2438 sizeof (struct GNUNET_HashCode),
2444 * Compare two peer identities.
2446 * @param h1 some peer identity
2447 * @param h2 some peer identity
2448 * @return a value > 0 if h1 > h2, a value < 0 if h1 < h2 and 0 if h1 == h2.
// Comparator with the qsort signature (initialize_session_peer_list sorts
// session->peers with qsort).  The result is the raw memcmp result over
// sizeof (struct GNUNET_PeerIdentity) bytes, so only its sign is
// meaningful.
2451 peer_id_cmp (const void *h1, const void *h2)
2453 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2458 * Create the sorted list of peers for the session,
2459 * add the local peer if not in the join message.
2461 * @param session session to initialize
2462 * @param join_msg join message with the list of peers participating at the end
// The peer identities are read from the memory that directly follows the
// join message header.  The peer count is taken from the message
// (network byte order) and incremented by one if the local peer is not
// listed; in that case the local peer is placed in the last slot.  The
// listed peers are copied to the front of the array, which is then
// sorted with qsort.
// NOTE(review): the visible code does not check the message size against
// num_peers -- confirm that the caller validates it.
2465 initialize_session_peer_list (struct ConsensusSession *session,
2466 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2468 const struct GNUNET_PeerIdentity *msg_peers
2469 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2470 int local_peer_in_list;
2472 session->num_peers = ntohl (join_msg->num_peers);
2474 /* Peers in the join message, may or may not include the local peer,
2475 Add it if it is missing. */
2476 local_peer_in_list = GNUNET_NO;
2477 for (unsigned int i = 0; i < session->num_peers; i++)
2479 if (0 == memcmp (&msg_peers[i],
2481 sizeof (struct GNUNET_PeerIdentity)))
2483 local_peer_in_list = GNUNET_YES;
2487 if (GNUNET_NO == local_peer_in_list)
2488 session->num_peers++;
2490 session->peers = GNUNET_new_array (session->num_peers,
2491 struct GNUNET_PeerIdentity);
2492 if (GNUNET_NO == local_peer_in_list)
2493 session->peers[session->num_peers - 1] = my_peer;
// Copies only as many peers as the message announced.
2495 GNUNET_memcpy (session->peers,
2497 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2498 qsort (session->peers,
2500 sizeof (struct GNUNET_PeerIdentity),
// Find the task stored under a task key.
// The key is hashed over sizeof (struct TaskKey) bytes and the hash is
// looked up in session->taskmap; this matches how put_task stores tasks.
//
// @param session session whose taskmap is searched
// @param key key of the task
// @return the result of the hash map lookup for that key
2505 static struct TaskEntry *
2506 lookup_task (struct ConsensusSession *session,
2507 struct TaskKey *key)
2509 struct GNUNET_HashCode hash;
2512 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2514 GNUNET_h2s (&hash));
2515 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2520 * Called when another peer wants to do a set operation with the
2523 * @param cls closure
2524 * @param other_peer the other peer
2525 * @param context_msg message with application specific information from
2527 * @param request request from the other peer, use GNUNET_SET_accept
2528 * to accept it, otherwise the request will be refused
2529 * Note that we don't use a return value here, as it is also
2530 * necessary to specify the set we want to do the operation with,
2531 * which sometimes can be derived from the context message.
2532 * Also necessary to specify the timeout.
// Validation performed on the request, each failure being reported with
// GNUNET_break_op: the context message must be present, must have the
// round-context type and exactly the size of a round-context message;
// the task named by it must not be finished; and the local peer must be
// peer2 of that task.  The task key is rebuilt from the message fields
// (converted from network byte order) and looked up with lookup_task.
// The request is then accepted with the byzantine option and committed
// right away if the task has already been started.
2535 set_listen_cb (void *cls,
2536 const struct GNUNET_PeerIdentity *other_peer,
2537 const struct GNUNET_MessageHeader *context_msg,
2538 struct GNUNET_SET_Request *request)
2540 struct ConsensusSession *session = cls;
2542 struct TaskEntry *task;
2543 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2545 if (NULL == context_msg)
2547 GNUNET_break_op (0);
2551 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2553 GNUNET_break_op (0);
2557 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2559 GNUNET_break_op (0);
2563 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2565 tk = ((struct TaskKey) {
2566 .kind = ntohs (cm->kind),
2567 .peer1 = ntohs (cm->peer1),
2568 .peer2 = ntohs (cm->peer2),
2569 .repetition = ntohs (cm->repetition),
2570 .leader = ntohs (cm->leader),
2573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2574 session->local_peer_idx, debug_str_task_key (&tk));
2576 task = lookup_task (session, &tk);
2580 GNUNET_break_op (0);
2584 if (GNUNET_YES == task->is_finished)
2586 GNUNET_break_op (0);
2590 if (task->key.peer2 != session->local_peer_idx)
2592 /* We're being asked, so we must be the 2nd peer. */
2593 GNUNET_break_op (0);
2597 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2598 (task->key.peer2 == session->local_peer_idx)));
2600 struct GNUNET_SET_Option opts[] = {
2601 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2602 { GNUNET_SET_OPTION_END },
2605 task->cls.setop.op = GNUNET_SET_accept (request,
2606 GNUNET_SET_RESULT_SYMMETRIC,
2611 /* If the task hasn't been started yet,
2612 we wait for that until we commit. */
2614 if (GNUNET_YES == task->is_started)
2616 commit_set (session, task);
// Add a task to its step and to the task map.
// The entry is duplicated with GNUNET_memdup before it is stored, so the
// caller keeps ownership of the struct it passed in (the graph builders
// pass the address of a local variable).  The copy is appended to the
// tasks array of the step, which grows to 3 * (cap + 1) / 2 entries when
// full, and is registered in @a taskmap under the hash of its key with
// UNIQUE_ONLY, so a duplicate task key trips the assertion.
//
// @param taskmap map from hashed task keys to task entries
// @param t task template; t->step must be non-NULL
2623 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2624 struct TaskEntry *t)
2626 struct GNUNET_HashCode round_hash;
2629 GNUNET_assert (NULL != t->step);
2631 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2635 if (s->tasks_len == s->tasks_cap)
2637 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2638 GNUNET_array_grow (s->tasks,
2643 #ifdef GNUNET_EXTRA_LOGGING
2644 GNUNET_assert (NULL != s->debug_name);
2645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2646 debug_str_task_key (&t->key),
2650 s->tasks[s->tasks_len] = t;
2653 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2654 GNUNET_assert (GNUNET_OK ==
2655 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2656 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
// Placeholder for assigning timeouts to the steps of a session.
// The visible body contains only comments; nothing is installed yet.
//
// @param session session whose task graph has been constructed
2661 install_step_timeouts (struct ConsensusSession *session)
2663 /* Given the fully constructed task graph
2664 with rounds for tasks, we can give the tasks timeouts. */
2666 // unsigned int max_round;
2668 /* XXX: implement! */
2674 * Arrange two peers in some canonical order.
// Both peer indices are asserted to be smaller than @a n.  The ordering
// decision is based on whether ((b - a) + n) % n is at most n / 2.
// NOTE(review): the definitions of a and b and the assignments made in
// either branch are not visible here -- confirm.
//
// @param p1 in/out: index of one peer
// @param p2 in/out: index of the other peer
// @param n total number of peers
2677 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2682 GNUNET_assert (*p1 < n);
2683 GNUNET_assert (*p2 < n);
2696 /* For uniformly random *p1, *p2,
2697 this condition is true with 50% chance */
2698 if (((b - a) + n) % n <= n / 2)
2712 * Record @a dep as a dependency of @a step.
// The dependency is stored on the prerequisite side: @a step is appended
// to the subordinates array of @a dep (which grows to 3 * (cap + 1) / 2
// entries when full) and the pending_prereq counter of @a step is
// incremented.  Asserted: the two steps differ, both are non-NULL, and
// the round of @a dep is not later than the round of @a step.
//
// @param step step that has to wait
// @param dep step that must finish first
2715 step_depend_on (struct Step *step, struct Step *dep)
2717 /* We're not checking for cyclic dependencies,
2718 but this is a cheap sanity check. */
2719 GNUNET_assert (step != dep);
2720 GNUNET_assert (NULL != step);
2721 GNUNET_assert (NULL != dep);
2722 GNUNET_assert (dep->round <= step->round);
2724 #ifdef GNUNET_EXTRA_LOGGING
2725 /* Make sure we have complete debugging information.
2726 Also checks that we don't screw up too badly
2727 constructing the task graph. */
2728 GNUNET_assert (NULL != step->debug_name);
2729 GNUNET_assert (NULL != dep->debug_name);
2730 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2731 "Making step `%s' depend on `%s'\n",
2736 if (dep->subordinates_cap == dep->subordinates_len)
2738 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2739 GNUNET_array_grow (dep->subordinates,
2740 dep->subordinates_cap,
2744 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2746 dep->subordinates[dep->subordinates_len] = step;
2747 dep->subordinates_len++;
2749 step->pending_prereq++;
// Allocate a step and append it to the step list of the session.
//
// @param session session that owns the step
// @param round round number stored in the step
// @param early_finishable flag stored in the step; it is read by
//        try_finish_step_early
// @return presumably the new step (return statement not visible here)
2753 static struct Step *
2754 create_step (struct ConsensusSession *session, int round, int early_finishable)
2757 step = GNUNET_new (struct Step);
2758 step->session = session;
2759 step->round = round;
2760 step->early_finishable = early_finishable;
2761 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2762 session->steps_tail,
2769 * Construct the task graph for a single
// Five early-finishable steps are created for one gradecast with leader
// lead in repetition rep: disseminate, echo, echo grade, confirm and
// confirm grade.  The first step depends on @a step_before, each later
// step depends on prev_step, and @a step_after is made to depend on the
// last step.  Reconcile tasks use task_start_reconcile; the two grading
// steps use task_start_eval_echo and task_start_grade.
// NOTE(review): the parameters rep and lead, the updates of prev_step
// and round, and the loop bodies that pick p1 and p2 are on lines not
// visible here -- confirm.
2773 construct_task_graph_gradecast (struct ConsensusSession *session,
2776 struct Step *step_before,
2777 struct Step *step_after)
2779 uint16_t n = session->num_peers;
2780 uint16_t me = session->local_peer_idx;
2785 /* The task we're currently setting up. */
2786 struct TaskEntry task;
2789 struct Step *prev_step;
2795 round = step_before->round + 1;
2797 /* gcast step 1: leader disseminates */
2799 step = create_step (session, round, GNUNET_YES);
2801 #ifdef GNUNET_EXTRA_LOGGING
2802 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2804 step_depend_on (step, step_before);
2808 for (k = 0; k < n; k++)
2814 arrange_peers (&p1, &p2, n);
2815 task = ((struct TaskEntry) {
2817 .start = task_start_reconcile,
2818 .cancel = task_cancel_reconcile,
2819 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2821 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2822 put_task (session->taskmap, &task);
2824 /* We run this task to make sure that the leader
2825 has stored its own SET_KIND_LEADER_PROPOSAL set,
2826 so it can participate in the rest of the gradecast
2827 without the code having to handle any special cases. */
2828 task = ((struct TaskEntry) {
2830 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2831 .start = task_start_reconcile,
2832 .cancel = task_cancel_reconcile,
2834 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2835 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2836 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2837 put_task (session->taskmap, &task);
2843 arrange_peers (&p1, &p2, n);
2844 task = ((struct TaskEntry) {
2846 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2847 .start = task_start_reconcile,
2848 .cancel = task_cancel_reconcile,
2850 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2851 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2852 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2853 put_task (session->taskmap, &task);
2856 /* gcast phase 2: echo */
2859 step = create_step (session, round, GNUNET_YES);
2860 #ifdef GNUNET_EXTRA_LOGGING
2861 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2863 step_depend_on (step, prev_step);
2865 for (k = 0; k < n; k++)
2869 arrange_peers (&p1, &p2, n);
2870 task = ((struct TaskEntry) {
2872 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2873 .start = task_start_reconcile,
2874 .cancel = task_cancel_reconcile,
2876 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2877 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2878 put_task (session->taskmap, &task);
2882 /* Same round, since step only has local tasks */
2883 step = create_step (session, round, GNUNET_YES);
2884 #ifdef GNUNET_EXTRA_LOGGING
2885 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2887 step_depend_on (step, prev_step);
2889 arrange_peers (&p1, &p2, n);
// Local-only task: both peer fields of the key are set to -1.
2890 task = ((struct TaskEntry) {
2891 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2893 .start = task_start_eval_echo
2895 put_task (session->taskmap, &task);
2899 step = create_step (session, round, GNUNET_YES);
2900 #ifdef GNUNET_EXTRA_LOGGING
2901 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2903 step_depend_on (step, prev_step);
2905 /* gcast phase 3: confirmation and grading */
2906 for (k = 0; k < n; k++)
2910 arrange_peers (&p1, &p2, n);
2911 task = ((struct TaskEntry) {
2913 .start = task_start_reconcile,
2914 .cancel = task_cancel_reconcile,
2915 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2917 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2918 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2919 /* If there was at least one element in the echo round that was
2920 contested (i.e. it had no n-t majority), then we let the other peers
2921 know, and other peers let us know. The contested flag for each peer is
2922 stored in the rfn. */
2923 task.cls.setop.transceive_contested = GNUNET_YES;
2924 put_task (session->taskmap, &task);
2928 /* Same round, since step only has local tasks */
2929 step = create_step (session, round, GNUNET_YES);
2930 #ifdef GNUNET_EXTRA_LOGGING
2931 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2933 step_depend_on (step, prev_step);
// Local-only grading task, again keyed with peer fields of -1.
2935 task = ((struct TaskEntry) {
2937 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2938 .start = task_start_grade,
2940 put_task (session->taskmap, &task);
// The step after the gradecast waits for the final grading step.
2942 step_depend_on (step_after, step);
/* Builds the task graph of a consensus session and registers every task
   in session->taskmap via put_task().  The visible phases are, in order:
   an "all to all" reconcile step, a second "all to all 2" reconcile step,
   t + 1 sequential gradecast repetitions (each one running a gradecast
   for every leader 0..n-1), and a final "finish" step.
   NOTE(review): this extract has lost lines -- the return type, braces,
   #endif lines, the declarations of t, i, lead, p1, p2 and step, and
   presumably the statements that assign p1/p2/prev_step and advance
   `round`.  Confirm details against the pristine file. */
2947 construct_task_graph (struct ConsensusSession *session)
2949 uint16_t n = session->num_peers;
/* Index of the local peer; no use of `me` survives in the visible lines. */
2952 uint16_t me = session->local_peer_idx;
2954 /* The task we're currently setting up. */
2955 struct TaskEntry task;
2957 /* Current leader */
2961 struct Step *prev_step;
2963 unsigned int round = 0;
2967 // XXX: introduce first step,
2968 // where we wait for all insert acks
2969 // from the set service
2971 /* faster but brittle all-to-all */
2973 // XXX: Not implemented yet
2975 /* all-to-all step */
2977 step = create_step (session, round, GNUNET_NO);
2979 #ifdef GNUNET_EXTRA_LOGGING
2980 step->debug_name = GNUNET_strdup ("all to all");
/* One pass per peer index; how p1/p2 are chosen before arrange_peers()
   is called is not visible in this extract. */
2983 for (i = 0; i < n; i++)
2990 arrange_peers (&p1, &p2, n);
2991 task = ((struct TaskEntry) {
2992 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2994 .start = task_start_reconcile,
2995 .cancel = task_cancel_reconcile,
/* The reconcile reads from and writes to the same set key. */
2997 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2998 task.cls.setop.output_set = task.cls.setop.input_set;
2999 task.cls.setop.do_not_remove = GNUNET_YES;
3000 put_task (session->taskmap, &task);
/* NOTE(review): the next line ends in ";;" -- the second semicolon is an
   empty statement and could be dropped. */
3005 step = create_step (session, round, GNUNET_NO);;
3006 #ifdef GNUNET_EXTRA_LOGGING
3007 step->debug_name = GNUNET_strdup ("all to all 2");
/* prev_step is presumably the first all-to-all step; its assignment is
   not visible here -- TODO confirm. */
3009 step_depend_on (step, prev_step);
3012 for (i = 0; i < n; i++)
3019 arrange_peers (&p1, &p2, n);
3020 task = ((struct TaskEntry) {
3021 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3023 .start = task_start_reconcile,
3024 .cancel = task_cancel_reconcile,
3026 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3027 task.cls.setop.output_set = task.cls.setop.input_set;
3028 task.cls.setop.do_not_remove = GNUNET_YES;
3029 put_task (session->taskmap, &task);
3039 /* Byzantine union */
3041 /* sequential repetitions of the gradecasts */
/* t is not declared in any visible line; the loop runs t + 1 times. */
3042 for (i = 0; i < t + 1; i++)
3044 struct Step *step_rep_start;
3045 struct Step *step_rep_end;
3047 /* Every repetition is in a separate round. */
3048 step_rep_start = create_step (session, round, GNUNET_YES);
3049 #ifdef GNUNET_EXTRA_LOGGING
3050 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
/* Chain this repetition after whatever step came before it. */
3053 step_depend_on (step_rep_start, prev_step);
3055 /* gradecast has three rounds */
3057 step_rep_end = create_step (session, round, GNUNET_YES);
3058 #ifdef GNUNET_EXTRA_LOGGING
3059 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3062 /* parallel gradecasts */
/* Each leader's gradecast is built between the same start and end step. */
3063 for (lead = 0; lead < n; lead++)
3064 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* The apply task for repetition i is attached to the repetition's end step. */
3066 task = ((struct TaskEntry) {
3067 .step = step_rep_end,
3068 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3069 .start = task_start_apply_round,
3071 put_task (session->taskmap, &task);
/* The next repetition (or the finish step) depends on this end step. */
3073 prev_step = step_rep_end;
3076 /* There is no next gradecast round, thus the final
3077 start step is the overall end step of the gradecasts */
3079 step = create_step (session, round, GNUNET_NO);
3080 #ifdef GNUNET_EXTRA_LOGGING
/* NOTE(review): constant name passed as a format string; the other
   constant debug names in this function use GNUNET_strdup(). */
3081 GNUNET_asprintf (&step->debug_name, "finish");
3083 step_depend_on (step, prev_step);
3085 task = ((struct TaskEntry) {
3087 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3088 .start = task_start_finish,
/* Only the set kind is given; the remaining SetKey members are
   zero-initialized by the compound literal. */
3090 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3092 put_task (session->taskmap, &task);
3098 * Check join message.
3100 * @param cls session of client that sent the message
3101 * @param m message sent by the client
3102 * @return #GNUNET_OK if @a m is well-formed
/* Size check for a join message: the bytes that follow the fixed part *m
   must be exactly num_peers entries of struct GNUNET_PeerIdentity,
   otherwise GNUNET_SYSERR is returned.
   NOTE(review): the return type, braces and the success return are not
   visible in this extract. */
3105 check_client_join (void *cls,
3106 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* Peer count as announced by the client, converted from network byte order. */
3108 uint32_t listed_peers = ntohl (m->num_peers);
/* Compare the variable-size tail of the message with the announced count. */
3110 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3111 listed_peers * sizeof (struct GNUNET_PeerIdentity))
3114 return GNUNET_SYSERR;
3121 * Called when a client wants to join a consensus session.
3123 * @param cls session of client that sent the message
3124 * @param m message sent by the client
/* Handles a client's join request.  In the visible order it: initializes
   the session's peer list and global id, scans the existing sessions for
   one with the same global id, records the conclude start/deadline, looks
   up the local peer's index, starts a set listener, creates the four
   per-session maps, creates the initial set, allocates the blacklist
   array, builds the task graph and finally lets the client continue.
   NOTE(review): many continuation lines (call arguments, braces, early
   returns) are missing from this extract. */
3127 handle_client_join (void *cls,
3128 const struct GNUNET_CONSENSUS_JoinMessage *m)
3130 struct ConsensusSession *session = cls;
3131 struct ConsensusSession *other_session;
/* Remaining arguments of the next two calls are not visible here. */
3133 initialize_session_peer_list (session,
3135 compute_global_id (session,
3138 /* Check if some local client already owns the session.
3139 It is only legal to have a session with an existing global id
3140 if all other sessions with this global id are finished.*/
3141 for (other_session = sessions_head;
3142 NULL != other_session;
3143 other_session = other_session->next)
/* A different session with an identical global id is a conflict; what
   is done on a match is not visible in this extract. */
3145 if ( (other_session != session) &&
3146 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3147 &other_session->global_id)) )
/* Timing bounds come from the client, converted from network byte order. */
3151 session->conclude_deadline
3152 = GNUNET_TIME_absolute_ntoh (m->deadline);
3153 session->conclude_start
3154 = GNUNET_TIME_absolute_ntoh (m->start);
3155 session->local_peer_idx = get_peer_idx (&my_peer,
/* NOTE(review): assumes get_peer_idx() reports "not found" as -1; if the
   searched list derives from the client's message, a join that omits
   the local peer would abort the whole service here -- TODO confirm. */
3157 GNUNET_assert (-1 != session->local_peer_idx);
3159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3160 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3161 GNUNET_h2s (&m->session_id),
3163 session->local_peer_idx,
3164 GNUNET_STRINGS_relative_time_to_string
3165 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3166 session->conclude_deadline),
/* Listen for union set operations identified by the session's global id. */
3169 session->set_listener
3170 = GNUNET_SET_listen (cfg,
3171 GNUNET_SET_OPERATION_UNION,
3172 &session->global_id,
/* All four maps start with an initial size of 1; their second argument
   is not visible here. */
3176 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3178 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3180 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3182 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the initial union set and also record its handle in the
   session's set_handles list (client_disconnect_cb destroys the
   handles found in that list). */
3186 struct SetEntry *client_set;
3188 client_set = GNUNET_new (struct SetEntry);
3189 client_set->h = GNUNET_SET_create (cfg,
3190 GNUNET_SET_OPERATION_UNION);
3191 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3192 sh->h = client_set->h;
3193 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3194 session->set_handles_tail,
/* Same key that handle_client_insert() uses to look the set up again. */
3196 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* One blacklist slot per peer; the element type is not visible here. */
3201 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3204 /* Just construct the task graph,
3205 but don't run anything until the client calls conclude. */
3206 construct_task_graph (session);
3207 GNUNET_SERVICE_client_continue (session->client);
/* Callback that handle_client_insert() hands to GNUNET_SET_add_element().
   NOTE(review): the return type and the body are not visible in this
   extract, so what it does with cls cannot be stated here. */
3212 client_insert_done (void *cls)
3219 * Called when a client performs an insert operation.
3222 * @param cls session of client that sent the message
3222 * @param msg message sent by the client
3223 * @return #GNUNET_OK (always well-formed)
/* Validation hook for client insert messages.
   NOTE(review): only the signature survives in this extract; the
   preceding documentation states that #GNUNET_OK is always returned. */
3226 check_client_insert (void *cls,
3227 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3234 * Called when a client performs an insert operation.
3236 * @param cls session of client that sent the message
3237 * @param msg message sent by the client
/* Handles an element insertion by the client: wraps the message payload
   in a struct ConsensusElement, adds it to the session's initial set
   (key SET_KIND_CURRENT, 0, 0) and lets the client continue.  Inserts
   that arrive after conclude has started cause the client to be dropped.
   NOTE(review): braces, the early return and some initializer and
   argument lines are missing from this extract. */
3240 handle_client_insert (void *cls,
3241 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3243 struct ConsensusSession *session = cls;
3244 ssize_t element_size;
3245 struct GNUNET_SET_Handle *initial_set;
3246 struct ConsensusElement *ce;
/* No further elements are accepted once conclude was requested. */
3248 if (GNUNET_YES == session->conclude_started)
3251 GNUNET_SERVICE_client_drop (session->client);
/* Payload length = total message size minus the fixed message struct. */
3255 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
/* The payload bytes are copied directly behind the ConsensusElement header.
   NOTE(review): no release of ce is visible in this extract -- confirm it
   is freed after the element has been handed to the set. */
3256 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3257 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3258 ce->payload_type = msg->element_type;
/* Set element covering header plus payload; the .data initializer is not
   visible here (presumably ce -- TODO confirm). */
3260 struct GNUNET_SET_Element element = {
3261 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3262 .size = sizeof (struct ConsensusElement) + element_size,
/* Find the set the element goes into; it must already exist. */
3267 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3268 struct SetEntry *entry;
3270 entry = lookup_set (session,
3272 GNUNET_assert (NULL != entry);
3273 initial_set = entry->h;
/* Count the insert as pending before handing it to the set;
   client_insert_done is passed as the callback. */
3276 session->num_client_insert_pending++;
3277 GNUNET_SET_add_element (initial_set,
3279 &client_insert_done,
3282 #ifdef GNUNET_EXTRA_LOGGING
3284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3285 "P%u: element %s added\n",
3286 session->local_peer_idx,
3287 debug_str_element (&element));
3291 GNUNET_SERVICE_client_continue (session->client);
3296 * Called when a client performs the conclude operation.
3298 * @param cls session of client that sent the message
3299 * @param message message sent by the client
/* Handles the client's conclude request: marks the session as concluding,
   installs the step timeouts, runs whatever steps are ready and lets the
   client continue.  A second conclude request drops the client.
   NOTE(review): braces and the early return are missing from this
   extract; `message` is not read in any visible line. */
3302 handle_client_conclude (void *cls,
3303 const struct GNUNET_MessageHeader *message)
3305 struct ConsensusSession *session = cls;
3307 if (GNUNET_YES == session->conclude_started)
3309 /* conclude started twice */
3311 GNUNET_SERVICE_client_drop (session->client);
3314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3315 "conclude requested\n");
/* Set the flag first; the insert and conclude handlers test it to reject
   late requests. */
3316 session->conclude_started = GNUNET_YES;
3317 install_step_timeouts (session);
3318 run_ready_steps (session);
3319 GNUNET_SERVICE_client_continue (session->client);
3324 * Called to clean up, after a shutdown has been requested.
3326 * @param cls closure
/* Shutdown handler registered by the service's start function: logs at
   INFO level and destroys the statistics handle.
   NOTE(review): the log message and the second argument of
   GNUNET_STATISTICS_destroy() are not visible in this extract. */
3329 shutdown_task (void *cls)
3331 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3333 GNUNET_STATISTICS_destroy (statistics,
3340 * Start processing consensus requests.
3342 * @param cls closure
3343 * @param c configuration to use
3344 * @param service the initialized service
/* Service start function (its name line is missing from this extract):
   obtains the local peer identity, creates the "consensus" statistics
   handle and registers shutdown_task for shutdown.  When the identity
   cannot be retrieved an error is logged and scheduler shutdown is
   requested.
   NOTE(review): the condition guarding the error path and the early
   return are not visible here. */
3348 const struct GNUNET_CONFIGURATION_Handle *c,
3349 struct GNUNET_SERVICE_Handle *service)
3353 GNUNET_CRYPTO_get_peer_identity (cfg,
3356 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3357 "Could not retrieve host identity\n");
3358 GNUNET_SCHEDULER_shutdown ();
3361 statistics = GNUNET_STATISTICS_create ("consensus",
3363 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3369 * Callback called when a client connects to the service.
3371 * @param cls closure for the service
3372 * @param c the new client that connected to the service
3373 * @param mq the message queue used to send messages to the client
/* Connect callback: allocates a fresh struct ConsensusSession for the new
   client, stores the client handle and its message queue in it and links
   it into the list headed by sessions_head.
   NOTE(review): the return type and return statement are not visible in
   this extract; client_disconnect_cb treats its internal_cls argument as
   this session. */
3377 client_connect_cb (void *cls,
3378 struct GNUNET_SERVICE_Client *c,
3379 struct GNUNET_MQ_Handle *mq)
3381 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3383 session->client = c;
3384 session->client_mq = mq;
3385 GNUNET_CONTAINER_DLL_insert (sessions_head,
3393 * Callback called when a client disconnected from the service
3395 * @param cls closure for the service
3396 * @param c the client that disconnected
3397 * @param internal_cls the `struct ConsensusSession` associated with @a c
/* Disconnect callback: cancels the session's set listener if one exists,
   calls GNUNET_CONTAINER_DLL_remove() on the sessions_head list
   (presumably unlinking this session), destroys every set handle in the
   session's set_handles list and frees the session.
   NOTE(review): no cleanup of setmap/taskmap/diffmap/rfnmap or of
   peers_blacklisted is visible in this extract -- confirm it happens
   elsewhere or in the missing lines. */
3400 client_disconnect_cb (void *cls,
3401 struct GNUNET_SERVICE_Client *c,
/* internal_cls carries the session, not the client handle. */
3404 struct ConsensusSession *session = internal_cls;
3406 if (NULL != session->set_listener)
3408 GNUNET_SET_listen_cancel (session->set_listener);
3409 session->set_listener = NULL;
3411 GNUNET_CONTAINER_DLL_remove (sessions_head,
/* Pop handles from the head one by one; set_handles_tail is not updated,
   which is harmless because the session is freed right after the loop.
   Freeing of sh itself is not visible here -- TODO confirm. */
3415 while (session->set_handles_head)
3417 struct SetHandle *sh = session->set_handles_head;
3418 session->set_handles_head = sh->next;
3419 GNUNET_SET_destroy (sh->h);
3422 GNUNET_free (session);
3427 * Define "main" method using service macro.
/* Argument list of the service-definition macro (the macro line itself is
   missing from this extract).  Three client message handlers are
   registered: conclude as a fixed-size message, insert and join as
   variable-size messages. */
3431 GNUNET_SERVICE_OPTION_NONE,
3434 &client_disconnect_cb,
/* Conclude carries only a bare message header. */
3436 GNUNET_MQ_hd_fixed_size (client_conclude,
3437 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3438 struct GNUNET_MessageHeader,
/* Insert: the element payload follows the fixed struct. */
3440 GNUNET_MQ_hd_var_size (client_insert,
3441 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3442 struct GNUNET_CONSENSUS_ElementMessage,
/* Join: the peer identities follow the fixed struct. */
3444 GNUNET_MQ_hd_var_size (client_join,
3445 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3446 struct GNUNET_CONSENSUS_JoinMessage,
3448 GNUNET_MQ_handler_end ());
3450 /* end of gnunet-service-consensus.c */