/*
      This file is part of GNUnet
      Copyright (C) 2012, 2013, 2017 GNUnet e.V.

      GNUnet is free software: you can redistribute it and/or modify it
      under the terms of the GNU Affero General Public License as published
      by the Free Software Foundation, either version 3 of the License,
      or (at your option) any later version.

      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.

      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.

     SPDX-License-Identifier: AGPL3.0-or-later
 */
/**
 * @file consensus/gnunet-service-consensus.c
 * @brief multi-peer set reconciliation
 * @author Florian Dold <flo@dold.me>
 */
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
42 * Vote that nothing should change.
43 * This option is never voted explicitly.
47 * Vote that an element should be added.
51 * Vote that an element should be removed.
/**
 * State of the early stopping scheme of a session.
 *
 * NOTE(review): the exact meaning of each phase is not visible in this
 * part of the file; confirm against the code that advances the state.
 */
enum EarlyStoppingPhase
{
  EARLY_STOPPING_NONE = 0,
  EARLY_STOPPING_ONE_MORE = 1,
  EARLY_STOPPING_DONE = 2,
};
65 GNUNET_NETWORK_STRUCT_BEGIN
68 * Tuple of integers that together
69 * identify a task uniquely.
74 * A value from 'enum PhaseKind'.
76 uint16_t kind GNUNET_PACKED;
79 * Number of the first peer
82 int16_t peer1 GNUNET_PACKED;
85 * Number of the second peer in canonical order.
87 int16_t peer2 GNUNET_PACKED;
90 * Repetition of the gradecast phase.
92 int16_t repetition GNUNET_PACKED;
95 * Leader in the gradecast phase.
97 * Can be different from both peer1 and peer2.
99 int16_t leader GNUNET_PACKED;
105 int set_kind GNUNET_PACKED;
106 int k1 GNUNET_PACKED;
107 int k2 GNUNET_PACKED;
114 struct GNUNET_SET_Handle *h;
116 * GNUNET_YES if the set resulted
117 * from applying a referendum with contested
126 int diff_kind GNUNET_PACKED;
127 int k1 GNUNET_PACKED;
128 int k2 GNUNET_PACKED;
133 int rfn_kind GNUNET_PACKED;
134 int k1 GNUNET_PACKED;
135 int k2 GNUNET_PACKED;
139 GNUNET_NETWORK_STRUCT_END
143 PHASE_KIND_ALL_TO_ALL,
144 PHASE_KIND_ALL_TO_ALL_2,
145 PHASE_KIND_GRADECAST_LEADER,
146 PHASE_KIND_GRADECAST_ECHO,
147 PHASE_KIND_GRADECAST_ECHO_GRADE,
148 PHASE_KIND_GRADECAST_CONFIRM,
149 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
151 * Apply a repetition of the all-to-all
152 * gradecast to the current set.
154 PHASE_KIND_APPLY_REP,
164 * Last result set from a gradecast
166 SET_KIND_LAST_GRADECAST,
167 SET_KIND_LEADER_PROPOSAL,
168 SET_KIND_ECHO_RESULT,
174 DIFF_KIND_LEADER_PROPOSAL,
175 DIFF_KIND_LEADER_CONSENSUS,
176 DIFF_KIND_GRADECAST_RESULT,
184 RFN_KIND_GRADECAST_RESULT
190 struct SetKey input_set;
192 struct SetKey output_set;
193 struct RfnKey output_rfn;
194 struct DiffKey output_diff;
198 int transceive_contested;
200 struct GNUNET_SET_OperationHandle *op;
206 struct SetKey input_set;
210 * Closure for both @a start_task
211 * and @a cancel_task.
215 struct SetOpCls setop;
216 struct FinishCls finish;
/* Forward declaration so the typedef below refers to the file-scope type. */
struct TaskEntry;

/**
 * Signature of the functions attached to a task.
 *
 * @param task the task the function operates on
 */
typedef void (*TaskFunc) (struct TaskEntry *task);
224 * Node in the consensus task graph.
239 union TaskFuncCls cls;
246 * All steps of one session are in a
247 * linked list for easier deallocation.
252 * All steps of one session are in a
253 * linked list for easier deallocation.
257 struct ConsensusSession *session;
260 * Tasks that this step is composed of.
262 struct TaskEntry **tasks;
263 unsigned int tasks_len;
264 unsigned int tasks_cap;
266 unsigned int finished_tasks;
269 * Tasks that have this task as dependency.
271 * We store pointers to subordinates rather
272 * than to prerequisites since it makes
273 * tracking the readiness of a task easier.
275 struct Step **subordinates;
276 unsigned int subordinates_len;
277 unsigned int subordinates_cap;
280 * Counter for the prerequisites of
283 size_t pending_prereq;
286 * Task that will run this step despite
287 * any pending prerequisites.
289 struct GNUNET_SCHEDULER_Task *timeout_task;
291 unsigned int is_running;
293 unsigned int is_finished;
296 * Synchrony round of the task.
297 * Determines the deadline for the task.
302 * Human-readable name for
303 * the task, used for debugging.
308 * When we're doing an early finish, how should this step be
310 * If GNUNET_YES, the step will be marked as finished
311 * without actually running its tasks.
312 * Otherwise, the step will still be run even after
315 * Note that a task may never be finished early if
316 * it is already running.
318 int early_finishable;
322 struct RfnElementInfo
324 const struct GNUNET_SET_Element *element;
327 * GNUNET_YES if the peer votes for the proposal.
332 * Proposal for this element,
333 * can only be VOTE_ADD or VOTE_REMOVE.
335 enum ReferendumVote proposal;
339 struct ReferendumEntry
344 * Elements where there is at least one proposed change.
346 * Maps the hash of the GNUNET_SET_Element
347 * to 'struct RfnElementInfo'.
349 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
351 unsigned int num_peers;
354 * Stores, for every peer in the session,
355 * whether the peer finished the whole referendum.
357 * Votes from peers are only counted if they're
358 * marked as commited (#GNUNET_YES) in the referendum.
360 * Otherwise (#GNUNET_NO), the requested changes are
361 * not counted for majority votes or thresholds.
367 * Contestation state of the peer. If a peer is contested, the values it
368 * contributed are still counted for applying changes, but the grading is
/**
 * A single weighted change recorded in a diff.
 */
struct DiffElementInfo
{
  /**
   * The element that is added or removed.
   */
  const struct GNUNET_SET_Element *element;

  /**
   * Positive weight for 'add', negative
   * weights for 'remove'.
   */
  int weight;
};
393 struct GNUNET_CONTAINER_MultiHashMap *changes;
/**
 * Node of a doubly-linked list of set handles.
 */
struct SetHandle
{
  struct SetHandle *prev;
  struct SetHandle *next;

  /**
   * The set handle this node refers to.
   */
  struct GNUNET_SET_Handle *h;
};
406 * A consensus session consists of one local client and the remote authorities.
408 struct ConsensusSession
411 * Consensus sessions are kept in a DLL.
413 struct ConsensusSession *next;
416 * Consensus sessions are kept in a DLL.
418 struct ConsensusSession *prev;
420 unsigned int num_client_insert_pending;
422 struct GNUNET_CONTAINER_MultiHashMap *setmap;
423 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
424 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
427 * Array of peers with length 'num_peers'.
429 int *peers_blacklisted;
432 * Mapping from (hashed) TaskKey to TaskEntry.
434 * We map the application_id for a round to the task that should be
435 * executed, so we don't have to go through all task whenever we get
436 * an incoming set op request.
438 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
440 struct Step *steps_head;
441 struct Step *steps_tail;
443 int conclude_started;
448 * Global consensus identification, computed
449 * from the session id and participating authorities.
451 struct GNUNET_HashCode global_id;
454 * Client that inhabits the session
456 struct GNUNET_SERVICE_Client *client;
459 * Queued messages to the client.
461 struct GNUNET_MQ_Handle *client_mq;
464 * Time when the conclusion of the consensus should begin.
466 struct GNUNET_TIME_Absolute conclude_start;
469 * Timeout for all rounds together, single rounds will schedule a timeout task
470 * with a fraction of the conclude timeout.
471 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
473 struct GNUNET_TIME_Absolute conclude_deadline;
475 struct GNUNET_PeerIdentity *peers;
478 * Number of other peers in the consensus.
480 unsigned int num_peers;
483 * Index of the local peer in the peers array
485 unsigned int local_peer_idx;
488 * Listener for requests from other peers.
489 * Uses the session's global id as app id.
491 struct GNUNET_SET_ListenHandle *set_listener;
494 * State of our early stopping scheme.
499 * Our set size from the first round.
503 uint64_t *first_sizes_received;
506 * Bounded Eppstein lower bound.
508 uint64_t lower_bound;
510 struct SetHandle *set_handles_head;
511 struct SetHandle *set_handles_tail;
515 * Linked list of sessions this peer participates in.
517 static struct ConsensusSession *sessions_head;
520 * Linked list of sessions this peer participates in.
522 static struct ConsensusSession *sessions_tail;
525 * Configuration of the consensus service.
527 static const struct GNUNET_CONFIGURATION_Handle *cfg;
530 * Peer that runs this service.
532 static struct GNUNET_PeerIdentity my_peer;
537 struct GNUNET_STATISTICS_Handle *statistics;
541 finish_task (struct TaskEntry *task);
545 run_ready_steps (struct ConsensusSession *session);
549 phasename (uint16_t phase)
553 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
557 case PHASE_KIND_FINISH: return "FINISH";
559 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
561 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
563 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
565 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
567 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
569 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
571 default: return "(unknown)";
577 setname (uint16_t kind)
581 case SET_KIND_CURRENT: return "CURRENT";
583 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585 case SET_KIND_NONE: return "NONE";
587 default: return "(unknown)";
593 rfnname (uint16_t kind)
597 case RFN_KIND_NONE: return "NONE";
599 case RFN_KIND_ECHO: return "ECHO";
601 case RFN_KIND_CONFIRM: return "CONFIRM";
603 default: return "(unknown)";
609 diffname (uint16_t kind)
613 case DIFF_KIND_NONE: return "NONE";
615 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
617 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
619 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
621 default: return "(unknown)";
626 #ifdef GNUNET_EXTRA_LOGGING
630 debug_str_element (const struct GNUNET_SET_Element *el)
632 struct GNUNET_HashCode hash;
634 GNUNET_SET_element_hash (el, &hash);
636 return GNUNET_h2s (&hash);
641 debug_str_task_key (struct TaskKey *tk)
643 static char buf[256];
645 snprintf (buf, sizeof(buf),
646 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
647 phasename (tk->kind), tk->peer1, tk->peer2,
648 tk->leader, tk->repetition);
655 debug_str_diff_key (struct DiffKey *dk)
657 static char buf[256];
659 snprintf (buf, sizeof(buf),
660 "DiffKey kind=%s, k1=%d, k2=%d",
661 diffname (dk->diff_kind), dk->k1, dk->k2);
668 debug_str_set_key (const struct SetKey *sk)
670 static char buf[256];
672 snprintf (buf, sizeof(buf),
673 "SetKey kind=%s, k1=%d, k2=%d",
674 setname (sk->set_kind), sk->k1, sk->k2);
681 debug_str_rfn_key (const struct RfnKey *rk)
683 static char buf[256];
685 snprintf (buf, sizeof(buf),
686 "RfnKey kind=%s, k1=%d, k2=%d",
687 rfnname (rk->rfn_kind), rk->k1, rk->k2);
693 #endif /* GNUNET_EXTRA_LOGGING */
697 * Send the final result set of the consensus to the client, element by
701 * @param element the current element, NULL if all elements have been
703 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
706 send_to_client_iter (void *cls,
707 const struct GNUNET_SET_Element *element)
709 struct TaskEntry *task = (struct TaskEntry *) cls;
710 struct ConsensusSession *session = task->step->session;
711 struct GNUNET_MQ_Envelope *ev;
715 struct GNUNET_CONSENSUS_ElementMessage *m;
716 const struct ConsensusElement *ce;
718 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
719 element->element_type);
722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n",
723 (unsigned) ce->marker);
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729 "P%d: sending element %s to client\n",
730 session->local_peer_idx,
731 debug_str_element (element));
733 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof(struct
735 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
736 m->element_type = ce->payload_type;
737 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof(struct
739 GNUNET_MQ_send (session->client_mq, ev);
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "P%d: finished iterating elements for client\n",
745 session->local_peer_idx);
746 ev = GNUNET_MQ_msg_header (
747 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
748 GNUNET_MQ_send (session->client_mq, ev);
754 static struct SetEntry *
755 lookup_set (struct ConsensusSession *session, struct SetKey *key)
757 struct GNUNET_HashCode hash;
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760 "P%u: looking up set {%s}\n",
761 session->local_peer_idx,
762 debug_str_set_key (key));
764 GNUNET_assert (SET_KIND_NONE != key->set_kind);
765 GNUNET_CRYPTO_hash (key, sizeof(struct SetKey), &hash);
766 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
770 static struct DiffEntry *
771 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
773 struct GNUNET_HashCode hash;
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776 "P%u: looking up diff {%s}\n",
777 session->local_peer_idx,
778 debug_str_diff_key (key));
780 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
781 GNUNET_CRYPTO_hash (key, sizeof(struct DiffKey), &hash);
782 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
786 static struct ReferendumEntry *
787 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
789 struct GNUNET_HashCode hash;
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "P%u: looking up rfn {%s}\n",
793 session->local_peer_idx,
794 debug_str_rfn_key (key));
796 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
797 GNUNET_CRYPTO_hash (key, sizeof(struct RfnKey), &hash);
798 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
803 diff_insert (struct DiffEntry *diff,
805 const struct GNUNET_SET_Element *element)
807 struct DiffElementInfo *di;
808 struct GNUNET_HashCode hash;
810 GNUNET_assert ((1 == weight) || (-1 == weight));
812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813 "diff_insert with element size %u\n",
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817 "hashing element\n");
819 GNUNET_SET_element_hash (element, &hash);
821 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
828 di = GNUNET_new (struct DiffElementInfo);
829 di->element = GNUNET_SET_element_dup (element);
830 GNUNET_assert (GNUNET_OK ==
831 GNUNET_CONTAINER_multihashmap_put (diff->changes,
833 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
841 rfn_commit (struct ReferendumEntry *rfn,
842 uint16_t commit_peer)
844 GNUNET_assert (commit_peer < rfn->num_peers);
846 rfn->peer_commited[commit_peer] = GNUNET_YES;
851 rfn_contest (struct ReferendumEntry *rfn,
852 uint16_t contested_peer)
854 GNUNET_assert (contested_peer < rfn->num_peers);
856 rfn->peer_contested[contested_peer] = GNUNET_YES;
861 rfn_noncontested (struct ReferendumEntry *rfn)
867 for (i = 0; i < rfn->num_peers; i++)
868 if ((GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO ==
869 rfn->peer_contested[i]))
877 rfn_vote (struct ReferendumEntry *rfn,
878 uint16_t voting_peer,
879 enum ReferendumVote vote,
880 const struct GNUNET_SET_Element *element)
882 struct RfnElementInfo *ri;
883 struct GNUNET_HashCode hash;
885 GNUNET_assert (voting_peer < rfn->num_peers);
887 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
888 since VOTE_KEEP is implicit in not voting. */
889 GNUNET_assert ((VOTE_ADD == vote) || (VOTE_REMOVE == vote));
891 GNUNET_SET_element_hash (element, &hash);
892 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
896 ri = GNUNET_new (struct RfnElementInfo);
897 ri->element = GNUNET_SET_element_dup (element);
898 ri->votes = GNUNET_new_array (rfn->num_peers, int);
899 GNUNET_assert (GNUNET_OK ==
900 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
902 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
905 ri->votes[voting_peer] = GNUNET_YES;
911 task_other_peer (struct TaskEntry *task)
913 uint16_t me = task->step->session->local_peer_idx;
915 if (task->key.peer1 == me)
916 return task->key.peer2;
917 return task->key.peer1;
/**
 * Three-way comparison of two uint64_t values, usable as
 * comparison function for qsort().
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return 0 if the values are equal, -1 if the first is smaller,
 *         1 if the first is larger
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  uint64_t a = *(const uint64_t *) pa;
  uint64_t b = *(const uint64_t *) pb;

  /* compare explicitly; subtracting could overflow the int result */
  if (a == b)
    return 0;
  if (a < b)
    return -1;
  return 1;
}
936 * Callback for set operation results. Called for each element
940 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
941 * @param current_size current set size
942 * @param status see enum GNUNET_SET_Status
945 set_result_cb (void *cls,
946 const struct GNUNET_SET_Element *element,
947 uint64_t current_size,
948 enum GNUNET_SET_Status status)
950 struct TaskEntry *task = cls;
951 struct ConsensusSession *session = task->step->session;
952 struct SetEntry *output_set = NULL;
953 struct DiffEntry *output_diff = NULL;
954 struct ReferendumEntry *output_rfn = NULL;
955 unsigned int other_idx;
956 struct SetOpCls *setop;
957 const struct ConsensusElement *consensus_element = NULL;
961 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962 "P%u: got element of type %u, status %u\n",
963 session->local_peer_idx,
964 (unsigned) element->element_type,
966 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT ==
967 element->element_type);
968 consensus_element = element->data;
971 setop = &task->cls.setop;
974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975 "P%u: got set result for {%s}, status %u\n",
976 session->local_peer_idx,
977 debug_str_task_key (&task->key),
980 if (GNUNET_NO == task->is_started)
986 if (GNUNET_YES == task->is_finished)
992 other_idx = task_other_peer (task);
994 if (SET_KIND_NONE != setop->output_set.set_kind)
996 output_set = lookup_set (session, &setop->output_set);
997 GNUNET_assert (NULL != output_set);
1000 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1002 output_diff = lookup_diff (session, &setop->output_diff);
1003 GNUNET_assert (NULL != output_diff);
1006 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1008 output_rfn = lookup_rfn (session, &setop->output_rfn);
1009 GNUNET_assert (NULL != output_rfn);
1012 if (GNUNET_YES == session->peers_blacklisted[other_idx])
1014 /* Peer might have been blacklisted
1015 by a gradecast running in parallel, ignore elements from now */
1016 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
1018 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
1022 if ((NULL != consensus_element) && (0 != consensus_element->marker))
1024 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1025 "P%u: got some marker\n",
1026 session->local_peer_idx);
1027 if ((GNUNET_YES == setop->transceive_contested) &&
1028 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker))
1030 GNUNET_assert (NULL != output_rfn);
1031 rfn_contest (output_rfn, task_other_peer (task));
1035 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "P%u: got size marker\n",
1039 session->local_peer_idx);
1042 struct ConsensusSizeElement *cse = (void *) consensus_element;
1044 if (cse->sender_index == other_idx)
1046 if (NULL == session->first_sizes_received)
1047 session->first_sizes_received = GNUNET_new_array (session->num_peers,
1049 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1051 uint64_t *copy = GNUNET_memdup (session->first_sizes_received,
1052 sizeof(uint64_t) * session->num_peers);
1053 qsort (copy, session->num_peers, sizeof(uint64_t), cmp_uint64_t);
1054 session->lower_bound = copy[session->num_peers / 3 + 1];
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056 "P%u: lower bound %llu\n",
1057 session->local_peer_idx,
1058 (long long) session->lower_bound);
1069 case GNUNET_SET_STATUS_ADD_LOCAL:
1070 GNUNET_assert (NULL != consensus_element);
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "Adding element in Task {%s}\n",
1073 debug_str_task_key (&task->key));
1074 if (NULL != output_set)
1076 // FIXME: record pending adds, use callback
1077 GNUNET_SET_add_element (output_set->h,
1081 #ifdef GNUNET_EXTRA_LOGGING
1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083 "P%u: adding element %s into set {%s} of task {%s}\n",
1084 session->local_peer_idx,
1085 debug_str_element (element),
1086 debug_str_set_key (&setop->output_set),
1087 debug_str_task_key (&task->key));
1090 if (NULL != output_diff)
1092 diff_insert (output_diff, 1, element);
1093 #ifdef GNUNET_EXTRA_LOGGING
1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095 "P%u: adding element %s into diff {%s} of task {%s}\n",
1096 session->local_peer_idx,
1097 debug_str_element (element),
1098 debug_str_diff_key (&setop->output_diff),
1099 debug_str_task_key (&task->key));
1102 if (NULL != output_rfn)
1104 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1105 #ifdef GNUNET_EXTRA_LOGGING
1106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1108 session->local_peer_idx,
1109 debug_str_element (element),
1110 debug_str_rfn_key (&setop->output_rfn),
1111 debug_str_task_key (&task->key));
1114 // XXX: add result to structures in task
1117 case GNUNET_SET_STATUS_ADD_REMOTE:
1118 GNUNET_assert (NULL != consensus_element);
1119 if (GNUNET_YES == setop->do_not_remove)
1121 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "Removing element in Task {%s}\n",
1125 debug_str_task_key (&task->key));
1126 if (NULL != output_set)
1128 // FIXME: record pending adds, use callback
1129 GNUNET_SET_remove_element (output_set->h,
1133 #ifdef GNUNET_EXTRA_LOGGING
1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135 "P%u: removing element %s from set {%s} of task {%s}\n",
1136 session->local_peer_idx,
1137 debug_str_element (element),
1138 debug_str_set_key (&setop->output_set),
1139 debug_str_task_key (&task->key));
1142 if (NULL != output_diff)
1144 diff_insert (output_diff, -1, element);
1145 #ifdef GNUNET_EXTRA_LOGGING
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "P%u: removing element %s from diff {%s} of task {%s}\n",
1148 session->local_peer_idx,
1149 debug_str_element (element),
1150 debug_str_diff_key (&setop->output_diff),
1151 debug_str_task_key (&task->key));
1154 if (NULL != output_rfn)
1156 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1157 #ifdef GNUNET_EXTRA_LOGGING
1158 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1159 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1160 session->local_peer_idx,
1161 debug_str_element (element),
1162 debug_str_rfn_key (&setop->output_rfn),
1163 debug_str_task_key (&task->key));
1168 case GNUNET_SET_STATUS_DONE:
1169 // XXX: check first if any changes to the underlying
1170 // set are still pending
1171 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1173 session->local_peer_idx,
1174 debug_str_task_key (&task->key),
1175 (unsigned int) task->step->finished_tasks,
1176 (unsigned int) task->step->tasks_len);
1177 if (NULL != output_rfn)
1179 rfn_commit (output_rfn, task_other_peer (task));
1181 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1183 session->first_size = current_size;
1188 case GNUNET_SET_STATUS_FAILURE:
1190 GNUNET_break_op (0);
1213 enum EvilnessSubType
1216 EVILNESS_SUB_REPLACEMENT,
1217 EVILNESS_SUB_NO_REPLACEMENT,
1222 enum EvilnessType type;
1223 enum EvilnessSubType subtype;
1229 parse_evilness_cram_subtype (const char *evil_subtype_str, struct
1232 if (0 == strcmp ("replace", evil_subtype_str))
1234 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1236 else if (0 == strcmp ("noreplace", evil_subtype_str))
1238 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1242 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1243 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1245 return GNUNET_SYSERR;
1252 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1256 char *evil_type_str = NULL;
1257 char *evil_subtype_str = NULL;
1259 GNUNET_assert (NULL != evil);
1261 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus",
1265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266 "P%u: no evilness\n",
1267 session->local_peer_idx);
1268 evil->type = EVILNESS_NONE;
1271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1272 "P%u: got evilness spec\n",
1273 session->local_peer_idx);
1275 for (field = strtok (evil_spec, "/");
1277 field = strtok (NULL, "/"))
1279 unsigned int peer_num;
1280 unsigned int evil_num;
1283 evil_type_str = NULL;
1284 evil_subtype_str = NULL;
1286 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str,
1287 &evil_subtype_str, &evil_num);
1291 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1292 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1298 GNUNET_assert (NULL != evil_type_str);
1299 GNUNET_assert (NULL != evil_subtype_str);
1301 if (peer_num == session->local_peer_idx)
1303 if (0 == strcmp ("slack", evil_type_str))
1305 evil->type = EVILNESS_SLACK;
1307 if (0 == strcmp ("slack-a2a", evil_type_str))
1309 evil->type = EVILNESS_SLACK_A2A;
1311 else if (0 == strcmp ("cram-all", evil_type_str))
1313 evil->type = EVILNESS_CRAM_ALL;
1314 evil->num = evil_num;
1315 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1318 else if (0 == strcmp ("cram-lead", evil_type_str))
1320 evil->type = EVILNESS_CRAM_LEAD;
1321 evil->num = evil_num;
1322 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1325 else if (0 == strcmp ("cram-echo", evil_type_str))
1327 evil->type = EVILNESS_CRAM_ECHO;
1328 evil->num = evil_num;
1329 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1334 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1335 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1341 /* No GNUNET_free since memory was allocated by libc */
1342 free (evil_type_str);
1343 evil_type_str = NULL;
1344 evil_subtype_str = NULL;
1347 evil->type = EVILNESS_NONE;
1349 GNUNET_free (evil_spec);
1350 /* no GNUNET_free_non_null since it wasn't
1351 * allocated with GNUNET_malloc */
1352 if (NULL != evil_type_str)
1353 free (evil_type_str);
1354 if (NULL != evil_subtype_str)
1355 free (evil_subtype_str);
1363 * Commit the appropriate set for a
1367 commit_set (struct ConsensusSession *session,
1368 struct TaskEntry *task)
1370 struct SetEntry *set;
1371 struct SetOpCls *setop = &task->cls.setop;
1373 GNUNET_assert (NULL != setop->op);
1374 set = lookup_set (session, &setop->input_set);
1375 GNUNET_assert (NULL != set);
1377 if ((GNUNET_YES == setop->transceive_contested) && (GNUNET_YES ==
1380 struct GNUNET_SET_Element element;
1381 struct ConsensusElement ce = { 0 };
1382 ce.marker = CONSENSUS_MARKER_CONTESTED;
1384 element.size = sizeof(struct ConsensusElement);
1385 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1386 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1389 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1391 struct GNUNET_SET_Element element;
1392 struct ConsensusSizeElement cse = {
1396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1397 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1398 cse.size = GNUNET_htonll (session->first_size);
1399 cse.sender_index = session->local_peer_idx;
1400 element.data = &cse;
1401 element.size = sizeof(struct ConsensusSizeElement);
1402 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1403 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1409 struct Evilness evil;
1411 get_evilness (session, &evil);
1412 if (EVILNESS_NONE != evil.type)
1414 /* Useful for evaluation */
1415 GNUNET_STATISTICS_set (statistics,
1422 case EVILNESS_CRAM_ALL:
1423 case EVILNESS_CRAM_LEAD:
1424 case EVILNESS_CRAM_ECHO:
1425 /* We're not cramming elements in the
1426 all-to-all round, since that would just
1427 add more elements to the result set, but
1428 wouldn't test robustness. */
1429 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1431 GNUNET_SET_commit (setop->op, set->h);
1434 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1435 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) ||
1436 (SET_KIND_CURRENT != set->key.set_kind) ))
1438 GNUNET_SET_commit (setop->op, set->h);
1441 if ((EVILNESS_CRAM_ECHO == evil.type) && (PHASE_KIND_GRADECAST_ECHO !=
1444 GNUNET_SET_commit (setop->op, set->h);
1447 for (i = 0; i < evil.num; i++)
1449 struct GNUNET_SET_Element element;
1450 struct ConsensusStuffedElement se = {
1451 .ce.payload_type = 0,
1455 element.size = sizeof(struct ConsensusStuffedElement);
1456 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1458 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1460 /* Always generate a new element. */
1461 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
1464 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1466 /* Always cram the same elements, derived from counter. */
1467 GNUNET_CRYPTO_hash (&i, sizeof(i), &se.rand);
1473 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1474 #ifdef GNUNET_EXTRA_LOGGING
1475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1477 session->local_peer_idx,
1478 debug_str_element (&element),
1479 debug_str_set_key (&setop->input_set),
1480 debug_str_task_key (&task->key));
1483 GNUNET_STATISTICS_update (statistics,
1484 "# stuffed elements",
1487 GNUNET_SET_commit (setop->op, set->h);
1490 case EVILNESS_SLACK:
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "P%u: evil peer: slacking\n",
1493 (unsigned int) session->local_peer_idx);
1496 case EVILNESS_SLACK_A2A:
1497 if ((PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) ||
1498 (PHASE_KIND_ALL_TO_ALL == task->key.kind))
1500 struct GNUNET_SET_Handle *empty_set;
1501 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1502 GNUNET_SET_commit (setop->op, empty_set);
1503 GNUNET_SET_destroy (empty_set);
1507 GNUNET_SET_commit (setop->op, set->h);
1512 GNUNET_SET_commit (setop->op, set->h);
1517 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1519 GNUNET_SET_commit (setop->op, set->h);
1523 /* For our testcases, we don't want the blacklisted
1525 GNUNET_SET_operation_cancel (setop->op);
1534 put_diff (struct ConsensusSession *session,
1535 struct DiffEntry *diff)
1537 struct GNUNET_HashCode hash;
1539 GNUNET_assert (NULL != diff);
1541 GNUNET_CRYPTO_hash (&diff->key, sizeof(struct DiffKey), &hash);
1542 GNUNET_assert (GNUNET_OK ==
1543 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash,
1545 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1550 put_set (struct ConsensusSession *session,
1551 struct SetEntry *set)
1553 struct GNUNET_HashCode hash;
1555 GNUNET_assert (NULL != set->h);
1557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1559 debug_str_set_key (&set->key));
1561 GNUNET_CRYPTO_hash (&set->key, sizeof(struct SetKey), &hash);
1562 GNUNET_assert (GNUNET_SYSERR !=
1563 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1564 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1569 put_rfn (struct ConsensusSession *session,
1570 struct ReferendumEntry *rfn)
1572 struct GNUNET_HashCode hash;
1574 GNUNET_CRYPTO_hash (&rfn->key, sizeof(struct RfnKey), &hash);
1575 GNUNET_assert (GNUNET_OK ==
1576 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1577 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Cancel a reconciliation task.  Cancellation is not implemented;
 * reaching this function fails an assertion.
 *
 * @param task the task that should be cancelled
 */
static void
task_cancel_reconcile (struct TaskEntry *task)
{
  /* not implemented yet */
  GNUNET_assert (0);
}
1590 apply_diff_to_rfn (struct DiffEntry *diff,
1591 struct ReferendumEntry *rfn,
1592 uint16_t voting_peer,
1595 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1596 struct DiffElementInfo *di;
1598 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1600 while (GNUNET_YES ==
1601 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1603 (const void **) &di))
1607 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1611 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1615 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1622 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1624 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/**
 * Build a new diff from the changes of two existing diffs.
 *
 * A fresh diff is obtained from diff_create(); then every
 * DiffElementInfo of diff_1->changes, followed by every one of
 * diff_2->changes, is passed to diff_insert() with its weight and
 * element.  Neither input diff is modified by the visible code.
 *
 * NOTE(review): how an element present in both inputs is combined
 * depends on diff_insert(), which is defined elsewhere.  The return
 * statement is not among the visible lines; presumably the new diff
 * is returned -- confirm.
 *
 * @param diff_1 first diff, inserted first
 * @param diff_2 second diff, inserted after @a diff_1
 */
1631 diff_compose (struct DiffEntry *diff_1,
1632 struct DiffEntry *diff_2)
1634 struct DiffEntry *diff_new;
1635 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1636 struct DiffElementInfo *di;
1638 diff_new = diff_create ();
1640 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1641 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL,
1646 diff_insert (diff_new, di->weight, di->element);
1648 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1650 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1651 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL,
1656 diff_insert (diff_new, di->weight, di->element);
1658 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Allocate an empty referendum for @a size peers.
 *
 * The entry gets an empty rfn_elements map and two per-peer int
 * arrays, peer_commited and peer_contested, each with @a size
 * entries as allocated by GNUNET_new_array.  num_peers is set to
 * @a size.  The key is not set here; callers in this file assign
 * rfn->key before calling put_rfn().
 *
 * @param size number of peers the referendum tracks
 * @return the new referendum entry (the return statement itself is
 *         not among the visible lines)
 */
1664 struct ReferendumEntry *
1665 rfn_create (uint16_t size)
1667 struct ReferendumEntry *rfn;
1669 rfn = GNUNET_new (struct ReferendumEntry);
1670 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1671 rfn->peer_commited = GNUNET_new_array (size, int);
1672 rfn->peer_contested = GNUNET_new_array (size, int);
1673 rfn->num_peers = size;
/**
 * Release a diff entry by destroying its `changes` map.
 *
 * NOTE(review): only the map destruction is visible here; confirm
 * that the DiffElementInfo values stored in the map and the entry
 * itself are released as well.
 *
 * @param diff diff entry to destroy
 */
1681 diff_destroy (struct DiffEntry *diff)
1683 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
/**
 * Determine the majority outcome for one element of a referendum.
 *
 * Walks over all rfn->num_peers peers, treating peers whose
 * peer_commited flag is GNUNET_NO and peers whose vote is GNUNET_YES
 * as the two special cases.  If the yes-votes exceed half of
 * num_commited (integer division), the outcome is the element's
 * proposal with votes_yes supporters; otherwise the outcome is
 * VOTE_STAY with (num_commited - votes_yes) supporters.
 *
 * NOTE(review): the statements that skip non-committed peers and
 * increment num_commited / votes_yes are not among the visible
 * lines -- confirm.
 *
 * @param rfn referendum the element belongs to
 * @param ri per-element vote information
 * @param[out] ret_majority number of peers backing the outcome
 * @param[out] ret_vote the outcome (ri->proposal or VOTE_STAY)
 */
1692 * For a given majority, count what the outcome
1693 * is (add/remove/keep), and give the number
1694 * of peers that voted for this outcome.
1697 rfn_majority (const struct ReferendumEntry *rfn,
1698 const struct RfnElementInfo *ri,
1699 uint16_t *ret_majority,
1700 enum ReferendumVote *ret_vote)
1702 uint16_t votes_yes = 0;
1703 uint16_t num_commited = 0;
1706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707 "Computing rfn majority for element %s of rfn {%s}\n",
1708 debug_str_element (ri->element),
1709 debug_str_rfn_key (&rfn->key));
1711 for (i = 0; i < rfn->num_peers; i++)
1713 if (GNUNET_NO == rfn->peer_commited[i])
1717 if (GNUNET_YES == ri->votes[i])
/* A strict majority of the counted peers is required for the
   proposal; a tie falls through to VOTE_STAY. */
1721 if (votes_yes > (num_commited) / 2)
1723 *ret_vote = ri->proposal;
1724 *ret_majority = votes_yes;
1728 *ret_vote = VOTE_STAY;
1729 *ret_majority = num_commited - votes_yes;
/* Closure fields read by set_copy_cb(): the task on whose behalf a
   set copy was requested, and the key under which the copy is to be
   stored.  NOTE(review): the enclosing struct header is not among the
   visible lines; set_copy_cb() accesses these members through a
   `struct SetCopyCls *`. */
1736 struct TaskEntry *task;
1737 struct SetKey dst_set_key;
/**
 * Callback invoked with the handle of a copied set.
 *
 * Reads the task and destination key from the SetCopyCls closure,
 * allocates a SetHandle that is inserted into the session's
 * set_handles list, and registers a new SetEntry under the
 * destination key via put_set().
 *
 * NOTE(review): the lines that store @a copy in the new SetHandle /
 * SetEntry, free the closure and restart the task are not among the
 * visible lines; put_set() asserts a non-NULL set->h, so the handle
 * must be assigned before that call -- confirm.
 *
 * @param cls closure, a `struct SetCopyCls *`
 * @param copy handle of the copied set
 */
1742 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1744 struct SetCopyCls *scc = cls;
1745 struct TaskEntry *task = scc->task;
1746 struct SetKey dst_set_key = scc->dst_set_key;
1747 struct SetEntry *set;
1748 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1751 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1752 task->step->session->set_handles_tail,
1756 set = GNUNET_new (struct SetEntry);
1758 set->key = dst_set_key;
1759 put_set (task->step->session, set);
/**
 * Request a copy of an existing set on behalf of a task.
 *
 * Allocates a SetCopyCls closure carrying the destination key, looks
 * up the source set in the task's session (asserted to exist) and
 * starts a lazy copy of its handle with GNUNET_SET_copy_lazy().
 *
 * NOTE(review): the callback and closure arguments passed to
 * GNUNET_SET_copy_lazy(), and the assignment of scc->task, are not
 * among the visible lines; presumably set_copy_cb and @e scc --
 * confirm.
 *
 * @param task task that needs the copy
 * @param src_set_key key of the set to copy; must be present in the
 *        session
 * @param dst_set_key key under which the copy will be stored
 */
1766 * Call the start function of the given
1767 * task again after we created a copy of the given set.
1770 create_set_copy_for_task (struct TaskEntry *task,
1771 struct SetKey *src_set_key,
1772 struct SetKey *dst_set_key)
1774 struct SetEntry *src_set;
1775 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778 "Copying set {%s} to {%s} for task {%s}\n",
1779 debug_str_set_key (src_set_key),
1780 debug_str_set_key (dst_set_key),
1781 debug_str_task_key (&task->key));
1784 scc->dst_set_key = *dst_set_key;
1785 src_set = lookup_set (task->step->session, src_set_key);
1786 GNUNET_assert (NULL != src_set);
1787 GNUNET_SET_copy_lazy (src_set->h,
/**
 * Bookkeeping shared by a batch of asynchronous set mutations.
 *
 * Users in this file allocate one instance per task, bump its
 * num_pending member for every add/remove they issue, and
 * set_mutation_done() checks that counter to detect completion.
 *
 * NOTE(review): the num_pending member declaration is not among the
 * visible lines.
 */
1793 struct SetMutationProgressCls
1797 * Task to finish once all changes are through.
1799 struct TaskEntry *task;
/**
 * Completion callback for a single set mutation.
 *
 * Asserts that at least one mutation is still pending.  Once
 * num_pending is zero, the task stored in the closure is picked up
 * for completion.
 *
 * NOTE(review): the decrement of num_pending and the statements that
 * free the closure and finish the task are not among the visible
 * lines -- confirm.
 *
 * @param cls closure, a `struct SetMutationProgressCls *`
 */
1804 set_mutation_done (void *cls)
1806 struct SetMutationProgressCls *pc = cls;
1808 GNUNET_assert (pc->num_pending > 0);
1812 if (0 == pc->num_pending)
1814 struct TaskEntry *task = pc->task;
/**
 * Mark a step as finished without running it, if that is allowed.
 *
 * Three guards are checked first: the step is running, the step is
 * already finished, or the step is not early_finishable.
 * NOTE(review): the guard bodies are not among the visible lines;
 * presumably each returns without further action -- confirm.
 *
 * Otherwise the step is marked finished, the pending_prereq counter
 * of every subordinate step is decremented (asserted to be positive
 * beforehand), and the function recurses into each subordinate.
 * Finally run_ready_steps() is called for the session so steps whose
 * prerequisites are now satisfied can start.
 *
 * @param step step to finish early
 */
1822 try_finish_step_early (struct Step *step)
1826 if (GNUNET_YES == step->is_running)
1828 if (GNUNET_YES == step->is_finished)
1830 if (GNUNET_NO == step->early_finishable)
1833 step->is_finished = GNUNET_YES;
1835 #ifdef GNUNET_EXTRA_LOGGING
1836 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837 "Finishing step `%s' early.\n",
1841 for (i = 0; i < step->subordinates_len; i++)
1843 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1844 step->subordinates[i]->pending_prereq--;
1845 #ifdef GNUNET_EXTRA_LOGGING
1846 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1847 "Decreased pending_prereq to %u for step `%s'.\n",
1848 (unsigned int) step->subordinates[i]->pending_prereq,
1849 step->subordinates[i]->debug_name);
1851 try_finish_step_early (step->subordinates[i]);
1854 // XXX: maybe schedule as task to avoid recursion?
1855 run_ready_steps (step->session);
/**
 * Finish a running step whose tasks have all completed.
 *
 * Preconditions (asserted): finished_tasks equals tasks_len, the
 * step is running, and it is not yet marked finished.
 *
 * The pending_prereq counter of each subordinate step is decremented
 * (asserted to be positive beforehand), the step is marked finished,
 * and run_ready_steps() is invoked so that steps without remaining
 * prerequisites get started.
 *
 * @param step step to finish
 */
1860 finish_step (struct Step *step)
1864 GNUNET_assert (step->finished_tasks == step->tasks_len);
1865 GNUNET_assert (GNUNET_YES == step->is_running);
1866 GNUNET_assert (GNUNET_NO == step->is_finished);
1868 #ifdef GNUNET_EXTRA_LOGGING
1869 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1870 "All tasks of step `%s' with %u subordinates finished.\n",
1872 step->subordinates_len);
1875 for (i = 0; i < step->subordinates_len; i++)
1877 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1878 step->subordinates[i]->pending_prereq--;
1879 #ifdef GNUNET_EXTRA_LOGGING
1880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1881 "Decreased pending_prereq to %u for step `%s'.\n",
1882 (unsigned int) step->subordinates[i]->pending_prereq,
1883 step->subordinates[i]->debug_name);
1887 step->is_finished = GNUNET_YES;
1889 // XXX: maybe schedule as task to avoid recursion?
1890 run_ready_steps (step->session);
/**
 * Start function of the "apply round" task.
 *
 * Keys used, all derived from task->key.repetition (rep):
 *  - input set:   SET_KIND_CURRENT at rep
 *  - input rfn:   RFN_KIND_GRADECAST_RESULT at rep (asserted present)
 *  - output set:  SET_KIND_CURRENT at rep + 1
 *
 * If the output set does not exist yet, a copy of the input set is
 * requested via create_set_copy_for_task().  NOTE(review): the
 * early return after that request is not among the visible lines --
 * confirm.
 *
 * For every element of the input referendum the majority is computed
 * with rfn_majority().  Depending on the outcome the element is
 * added to or removed from the output set (each such mutation
 * increments progress_cls->num_pending) or left alone.  The smallest
 * majority seen is tracked in worst_majority.  When no mutation was
 * issued, progress_cls is freed right away.
 *
 * Early stopping: with thresh = (num_peers / 3) * 2, a
 * worst_majority >= thresh advances session->early_stopping from
 * NONE to ONE_MORE, and from ONE_MORE to DONE; in the latter case
 * try_finish_step_early() is called on every step of the session.
 * If the threshold is missed although early stopping had already
 * begun, GNUNET_break_op (0) is raised.  worst_majority starts at
 * UINT16_MAX and is only lowered inside the element loop, so an
 * input referendum without elements meets the threshold.
 */
1895 * Apply the result from one round of gradecasts (i.e. every peer
1896 * should have gradecasted) to the peer's current set.
1898 * @param task the task with context information
1901 task_start_apply_round (struct TaskEntry *task)
1903 struct ConsensusSession *session = task->step->session;
1904 struct SetKey sk_in;
1905 struct SetKey sk_out;
1906 struct RfnKey rk_in;
1907 struct SetEntry *set_out;
1908 struct ReferendumEntry *rfn_in;
1909 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1910 struct RfnElementInfo *ri;
1911 struct SetMutationProgressCls *progress_cls;
1912 uint16_t worst_majority = UINT16_MAX;
1914 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1915 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1916 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1918 set_out = lookup_set (session, &sk_out);
1919 if (NULL == set_out)
1921 create_set_copy_for_task (task, &sk_in, &sk_out);
1925 rfn_in = lookup_rfn (session, &rk_in);
1926 GNUNET_assert (NULL != rfn_in);
1928 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1929 progress_cls->task = task;
1931 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1933 while (GNUNET_YES ==
1934 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1936 (const void **) &ri))
1938 uint16_t majority_num;
1939 enum ReferendumVote majority_vote;
1941 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1943 if (worst_majority > majority_num)
1944 worst_majority = majority_num;
1946 switch (majority_vote)
1949 progress_cls->num_pending++;
1950 GNUNET_assert (GNUNET_OK ==
1951 GNUNET_SET_add_element (set_out->h,
1955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956 "P%u: apply round: adding element %s with %u-majority.\n",
1957 session->local_peer_idx,
1958 debug_str_element (ri->element), majority_num);
1962 progress_cls->num_pending++;
1963 GNUNET_assert (GNUNET_OK ==
1964 GNUNET_SET_remove_element (set_out->h,
1968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1969 "P%u: apply round: deleting element %s with %u-majority.\n",
1970 session->local_peer_idx,
1971 debug_str_element (ri->element), majority_num);
1975 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976 "P%u: apply round: keeping element %s with %u-majority.\n",
1977 session->local_peer_idx,
1978 debug_str_element (ri->element), majority_num);
1988 if (0 == progress_cls->num_pending)
1990 // call closure right now, no pending ops
1991 GNUNET_free (progress_cls);
1996 uint16_t thresh = (session->num_peers / 3) * 2;
1998 if (worst_majority >= thresh)
2000 switch (session->early_stopping)
2002 case EARLY_STOPPING_NONE:
2003 session->early_stopping = EARLY_STOPPING_ONE_MORE;
2004 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2005 "P%u: Stopping early (after one more superround)\n",
2006 session->local_peer_idx);
2009 case EARLY_STOPPING_ONE_MORE:
2010 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2011 "P%u: finishing steps due to early finish\n",
2012 session->local_peer_idx);
2013 session->early_stopping = EARLY_STOPPING_DONE;
2016 for (step = session->steps_head; NULL != step; step = step->next)
2017 try_finish_step_early (step);
2021 case EARLY_STOPPING_DONE:
2022 /* We shouldn't be here anymore after early stopping */
2031 else if (EARLY_STOPPING_NONE != session->early_stopping)
2033 // Our assumption about the number of bad peers
2035 GNUNET_break_op (0);
2039 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2040 "P%u: NOT finishing early (majority not good enough)\n",
2041 session->local_peer_idx);
2044 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Start function of the "confirm grade" task: grade the leader's
 * gradecast and record the result in the gradecast-result referendum.
 *
 * Inputs, all looked up with task->key.repetition:
 *  - the DIFF_KIND_LEADER_PROPOSAL diff (asserted present)
 *  - an RFN_KIND_ECHO-keyed referendum (asserted present)
 * Output: the RFN_KIND_GRADECAST_RESULT referendum, created and
 * registered on first use.
 *
 * The leader's diff is first applied to the output referendum as
 * votes of task->key.leader.  Then, for every element of the input
 * referendum, the majority is computed; a majority supported by at
 * most num_peers / 3 peers is overridden to VOTE_REMOVE, and the
 * resulting add/remove vote is cast in the output referendum, again
 * on behalf of the leader.
 *
 * The confidence starts at 2 and is lowered depending on
 * rfn_noncontested() of the input referendum.  A confidence >= 1
 * commits the leader in the output referendum; a confidence <= 1
 * marks the leader as blacklisted in the session.
 *
 * @param task the grading task
 */
2049 task_start_grade (struct TaskEntry *task)
2051 struct ConsensusSession *session = task->step->session;
2052 struct ReferendumEntry *output_rfn;
2053 struct ReferendumEntry *input_rfn;
2054 struct DiffEntry *input_diff;
2055 struct RfnKey rfn_key;
2056 struct DiffKey diff_key;
2057 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2058 struct RfnElementInfo *ri;
2059 unsigned int gradecast_confidence = 2;
2061 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
2062 output_rfn = lookup_rfn (session, &rfn_key);
2063 if (NULL == output_rfn)
2065 output_rfn = rfn_create (session->num_peers);
2066 output_rfn->key = rfn_key;
2067 put_rfn (session, output_rfn);
2070 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition,
2072 input_diff = lookup_diff (session, &diff_key);
2073 GNUNET_assert (NULL != input_diff);
2075 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2077 input_rfn = lookup_rfn (session, &rfn_key);
2078 GNUNET_assert (NULL != input_rfn);
2080 iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2081 input_rfn->rfn_elements);
2083 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader,
2084 session->num_peers);
2086 while (GNUNET_YES ==
2087 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2089 (const void **) &ri))
2091 uint16_t majority_num;
2092 enum ReferendumVote majority_vote;
2094 // XXX: we need contested votes and non-contested votes here
2095 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* Too little support: override the outcome with VOTE_REMOVE. */
2097 if (majority_num <= session->num_peers / 3)
2098 majority_vote = VOTE_REMOVE;
2100 switch (majority_vote)
2106 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2110 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2118 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2121 uint16_t noncontested;
2122 noncontested = rfn_noncontested (input_rfn);
/* Fewer than (n / 3) * 2 non-contested peers caps the confidence
   at 1; fewer than n / 3 + 1 drops it to 0. */
2123 if (noncontested < (session->num_peers / 3) * 2)
2125 gradecast_confidence = GNUNET_MIN (1, gradecast_confidence);
2127 if (noncontested < (session->num_peers / 3) + 1)
2129 gradecast_confidence = 0;
/* Confidence 1 satisfies both conditions below: the leader is
   committed and blacklisted at the same time. */
2133 if (gradecast_confidence >= 1)
2134 rfn_commit (output_rfn, task->key.leader);
2136 if (gradecast_confidence <= 1)
2137 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/**
 * Start function of a reconciliation task (a set operation between
 * two peers identified by task->key.peer1 / peer2).
 *
 * The input set named by the task's SetOpCls must exist and have a
 * set handle (both asserted).  Before any network activity the
 * configured outputs are prepared so that they are valid even if the
 * remote peer never answers:
 *  - output set: if missing, a copy of the input set is requested
 *    via create_set_copy_for_task();
 *  - output rfn: if missing, created for num_peers peers and
 *    registered;
 *  - output diff: if missing, created empty and registered.
 *
 * Then, depending on the role of the local peer:
 *  - peer1 == peer2 == local: purely local task, no set operation;
 *  - local peer is peer1: build a round context message from the
 *    task key, prepare the set operation towards peer2 with the
 *    BYZANTINE option set to session->lower_bound, and commit our
 *    set;
 *  - local peer is peer2: wait for the remote request; if the
 *    operation handle already exists (the request arrived before
 *    the task started), commit right away;
 *  - otherwise the task graph was constructed incorrectly.
 *
 * @param task the reconciliation task
 */
2144 task_start_reconcile (struct TaskEntry *task)
2146 struct SetEntry *input;
2147 struct SetOpCls *setop = &task->cls.setop;
2148 struct ConsensusSession *session = task->step->session;
2150 input = lookup_set (session, &setop->input_set);
2151 GNUNET_assert (NULL != input);
2152 GNUNET_assert (NULL != input->h);
2154 /* We create the outputs for the operation here
2155 (rather than in the set operation callback)
2156 because we want something valid in there, even
2157 if the other peer doesn't talk to us */
2159 if (SET_KIND_NONE != setop->output_set.set_kind)
2161 /* If we don't have an existing output set,
2162 we clone the input set. */
2163 if (NULL == lookup_set (session, &setop->output_set))
2165 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2170 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2172 if (NULL == lookup_rfn (session, &setop->output_rfn))
2174 struct ReferendumEntry *rfn;
2176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177 "P%u: output rfn <%s> missing, creating.\n",
2178 session->local_peer_idx,
2179 debug_str_rfn_key (&setop->output_rfn));
2181 rfn = rfn_create (session->num_peers);
2182 rfn->key = setop->output_rfn;
2183 put_rfn (session, rfn);
2187 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2189 if (NULL == lookup_diff (session, &setop->output_diff))
2191 struct DiffEntry *diff;
2193 diff = diff_create ();
2194 diff->key = setop->output_diff;
2195 put_diff (session, diff);
2199 if ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 ==
2200 session->local_peer_idx))
2202 /* XXX: mark the corresponding rfn as commited if necessary */
2207 if (task->key.peer1 == session->local_peer_idx)
2209 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2212 "P%u: Looking up set {%s} to run remote union\n",
2213 session->local_peer_idx,
2214 debug_str_set_key (&setop->input_set));
2216 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2217 rcm.header.size = htons (sizeof(struct
2218 GNUNET_CONSENSUS_RoundContextMessage));
/* The task key travels in network byte order so the remote peer
   can find its matching task (see set_listen_cb). */
2220 rcm.kind = htons (task->key.kind);
2221 rcm.peer1 = htons (task->key.peer1);
2222 rcm.peer2 = htons (task->key.peer2);
2223 rcm.leader = htons (task->key.leader);
2224 rcm.repetition = htons (task->key.repetition);
2225 rcm.is_contested = htons (0);
2227 GNUNET_assert (NULL == setop->op);
2228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2229 "P%u: initiating set op with P%u, our set is %s\n",
2230 session->local_peer_idx, task->key.peer2, debug_str_set_key (
2231 &setop->input_set));
2233 struct GNUNET_SET_Option opts[] = {
2234 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2235 { GNUNET_SET_OPTION_END },
2238 // XXX: maybe this should be done while
2239 // setting up tasks already?
2240 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2241 &session->global_id,
2243 GNUNET_SET_RESULT_SYMMETRIC,
2248 commit_set (session, task);
2250 else if (task->key.peer2 == session->local_peer_idx)
2252 /* Wait for the other peer to contact us */
2253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2254 session->local_peer_idx, task->key.peer1);
2256 if (NULL != setop->op)
2258 commit_set (session, task);
2263 /* We made an error while constructing the task graph. */
/**
 * Start function of the "echo grade" task: evaluate the echo
 * referendum and derive the echo-result set from it.
 *
 * Keys (all with task->key.repetition; further key members are not
 * among the visible lines):
 *  - input set:  SET_KIND_LEADER_PROPOSAL
 *  - output set: SET_KIND_ECHO_RESULT
 *  - input rfn:  RFN_KIND_ECHO (asserted present)
 *
 * If the output set does not exist yet, a copy of the input set is
 * requested via create_set_copy_for_task().  The output set's handle
 * is additionally registered under the SET_KIND_LAST_GRADECAST key
 * through a second SetEntry that shares the same handle (see the
 * FIXME below).
 *
 * For every element of the input referendum the majority is
 * computed.  A majority backed by fewer than num_peers / 3 peers
 * marks the output set as contested.  The majority outcome is then
 * applied to the output set (add / remove / nothing), with each
 * mutation counted in progress_cls->num_pending.  If nothing is
 * pending at the end, progress_cls is freed immediately.
 *
 * NOTE(review): the contested check uses `<` whereas
 * task_start_grade() uses `<=` against num_peers / 3 -- confirm
 * that the difference is intended.
 *
 * @param task the echo evaluation task
 */
2270 task_start_eval_echo (struct TaskEntry *task)
2272 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2273 struct ReferendumEntry *input_rfn;
2274 struct RfnElementInfo *ri;
2275 struct SetEntry *output_set;
2276 struct SetMutationProgressCls *progress_cls;
2277 struct ConsensusSession *session = task->step->session;
2278 struct SetKey sk_in;
2279 struct SetKey sk_out;
2280 struct RfnKey rk_in;
2282 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition,
2284 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition,
2286 output_set = lookup_set (session, &sk_out);
2287 if (NULL == output_set)
2289 create_set_copy_for_task (task, &sk_in, &sk_out);
2295 // FIXME: should be marked as a shallow copy, so
2296 // we can destroy everything correctly
2297 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2298 last_set->h = output_set->h;
2299 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2300 put_set (session, last_set);
2303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304 "Evaluating referendum in Task {%s}\n",
2305 debug_str_task_key (&task->key));
2307 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2308 progress_cls->task = task;
2310 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2312 input_rfn = lookup_rfn (session, &rk_in);
2314 GNUNET_assert (NULL != input_rfn);
2316 iter = GNUNET_CONTAINER_multihashmap_iterator_create (
2317 input_rfn->rfn_elements);
2318 GNUNET_assert (NULL != iter);
2320 while (GNUNET_YES ==
2321 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2323 (const void **) &ri))
2325 enum ReferendumVote majority_vote;
2326 uint16_t majority_num;
2328 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2330 if (majority_num < session->num_peers / 3)
2332 /* It is not the case that all nonfaulty peers
2333 echoed the same value. Since we're doing a set reconciliation, we
2334 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2335 reconciliation as contested. Other peers might not know that the
2336 leader is faulty, thus we still re-distribute in the confirmation
2337 round. */output_set->is_contested = GNUNET_YES;
2340 switch (majority_vote)
2343 progress_cls->num_pending++;
2344 GNUNET_assert (GNUNET_OK ==
2345 GNUNET_SET_add_element (output_set->h,
2352 progress_cls->num_pending++;
2353 GNUNET_assert (GNUNET_OK ==
2354 GNUNET_SET_remove_element (output_set->h,
2361 /* Nothing to do. */
2370 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2372 if (0 == progress_cls->num_pending)
2374 // call closure right now, no pending ops
2375 GNUNET_free (progress_cls);
/**
 * Start function of the final task: hand the resulting set to the
 * client.
 *
 * Looks up the set named by task->cls.finish.input_set (asserted to
 * exist) and iterates over it with send_to_client_iter as the
 * element callback.
 *
 * NOTE(review): the closure argument of GNUNET_SET_iterate() is not
 * among the visible lines.
 *
 * @param task the finish task
 */
2382 task_start_finish (struct TaskEntry *task)
2384 struct SetEntry *final_set;
2385 struct ConsensusSession *session = task->step->session;
2387 final_set = lookup_set (session, &task->cls.finish.input_set);
2389 GNUNET_assert (NULL != final_set);
2392 GNUNET_SET_iterate (final_set->h,
2393 send_to_client_iter,
/**
 * Start a task of the session.
 *
 * Asserts that the task has been neither started nor finished and
 * that it has a start function, and marks the task as started.
 *
 * NOTE(review): the invocation of task->start is not among the
 * visible lines; its position relative to setting is_started should
 * be confirmed.
 *
 * @param session session the task belongs to (used for logging)
 * @param task task to start
 */
2399 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2401 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n",
2402 session->local_peer_idx, debug_str_task_key (&task->key));
2404 GNUNET_assert (GNUNET_NO == task->is_started);
2405 GNUNET_assert (GNUNET_NO == task->is_finished);
2406 GNUNET_assert (NULL != task->start);
2410 task->is_started = GNUNET_YES;
/**
 * Start every step of the session that is ready to run.
 *
 * Walks the session's step list from steps_head.  A step is ready
 * when it is not running, has no pending prerequisites
 * (pending_prereq == 0) and is not finished.  Such a step must not
 * have any finished tasks yet (asserted); it is marked running and
 * all of its tasks are started via start_task().
 *
 * A step may end up with finished_tasks == tasks_len right after
 * starting its tasks (e.g. when it has no tasks); this case is
 * detected explicitly because no task completion would otherwise
 * trigger finishing the step.
 *
 * NOTE(review): the statements advancing the iteration and handling
 * the immediate-finish case are not among the visible lines.
 *
 * @param session session whose steps are examined
 */
2415 * Run all steps of the session that don't any
2416 * more dependencies.
2419 run_ready_steps (struct ConsensusSession *session)
2423 step = session->steps_head;
2425 while (NULL != step)
2427 if ((GNUNET_NO == step->is_running) && (0 == step->pending_prereq) &&
2428 (GNUNET_NO == step->is_finished))
2432 GNUNET_assert (0 == step->finished_tasks);
2434 #ifdef GNUNET_EXTRA_LOGGING
2435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2436 "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2437 session->local_peer_idx,
2439 step->round, step->tasks_len, step->subordinates_len);
2442 step->is_running = GNUNET_YES;
2443 for (i = 0; i < step->tasks_len; i++)
2444 start_task (session, step->tasks[i]);
2446 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2447 if ((step->finished_tasks == step->tasks_len) && (GNUNET_NO ==
2451 /* Running the next ready steps will be triggered by task completion */
/**
 * Mark a task as finished and finish its step if it was the last
 * outstanding task.
 *
 * The task must not already be finished (asserted).  The step's
 * finished_tasks counter is incremented, and once it equals
 * tasks_len, finish_step() is called for the step.
 *
 * @param task task that has completed
 */
2462 finish_task (struct TaskEntry *task)
2464 GNUNET_assert (GNUNET_NO == task->is_finished);
2465 task->is_finished = GNUNET_YES;
2467 task->step->finished_tasks++;
2469 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2470 "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2471 task->step->session->local_peer_idx,
2472 debug_str_task_key (&task->key),
2473 (unsigned int) task->step->finished_tasks,
2474 (unsigned int) task->step->tasks_len);
2476 if (task->step->finished_tasks == task->step->tasks_len)
2477 finish_step (task->step);
/**
 * Linear search for a peer identity in session->peers.
 *
 * Compares @a peer against each of the session->num_peers entries
 * with GNUNET_memcmp.
 *
 * NOTE(review): the return statements are not among the visible
 * lines; per the description below the matching index is returned,
 * or -1 when there is no match.
 */
2482 * Search peer in the list of peers in session.
2484 * @param peer peer to find
2485 * @param session session with peer
2486 * @return index of peer, -1 if peer is not in session
2489 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct
2490 ConsensusSession *session)
2494 for (i = 0; i < session->num_peers; i++)
2495 if (0 == GNUNET_memcmp (peer, &session->peers[i]))
/**
 * Derive session->global_id with GNUNET_CRYPTO_kdf.
 *
 * The output is one GNUNET_HashCode written to session->global_id.
 * The KDF call is asserted to return GNUNET_YES.  Among its visible
 * inputs are a buffer of num_peers * sizeof (struct
 * GNUNET_PeerIdentity) bytes and one of sizeof (struct
 * GNUNET_HashCode) bytes.
 *
 * NOTE(review): the argument lines naming the salt and the input
 * buffers are not among the visible lines; presumably they are the
 * `salt` string, session->peers and @a local_session_id, which would
 * make the result depend on the order of the peer list -- confirm.
 */
2502 * Compute a global, (hopefully) unique consensus session id,
2503 * from the local id of the consensus session, and the identities of all participants.
2504 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2505 * exactly the same peers, the global id will be different.
2507 * @param session session to generate the global id for
2508 * @param local_session_id local id of the consensus session
2511 compute_global_id (struct ConsensusSession *session,
2512 const struct GNUNET_HashCode *local_session_id)
2514 const char *salt = "gnunet-service-consensus/session_id";
2516 GNUNET_assert (GNUNET_YES ==
2517 GNUNET_CRYPTO_kdf (&session->global_id,
2518 sizeof(struct GNUNET_HashCode),
2522 session->num_peers * sizeof(struct
2523 GNUNET_PeerIdentity),
2525 sizeof(struct GNUNET_HashCode),
/**
 * Byte-wise comparison of two peer identities.
 *
 * The function takes `const void *` arguments and returns an int,
 * i.e. it has the shape of a qsort() comparator.  The result is the
 * raw memcmp() value over sizeof (struct GNUNET_PeerIdentity)
 * bytes: negative, zero or positive, not necessarily exactly -1 or 1
 * as the description below suggests.
 */
2531 * Compare two peer identities.
2533 * @param h1 some peer identity
2534 * @param h2 some peer identity
2535 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2538 peer_id_cmp (const void *h1, const void *h2)
2540 return memcmp (h1, h2, sizeof(struct GNUNET_PeerIdentity));
/**
 * Build session->peers from the peer list that trails the join
 * message.
 *
 * num_peers is taken from the message (network byte order).  If the
 * local peer is not among the listed peers, num_peers is increased
 * by one and the local identity (my_peer) is stored in the last
 * slot.  The peers from the message are copied to the front of the
 * array, which is then sorted with qsort().
 *
 * NOTE(review): join_msg->num_peers is used here to index the
 * trailing peer array without a bounds check against the message
 * size; presumably the message was validated before this function is
 * called -- confirm.
 */
2545 * Create the sorted list of peers for the session,
2546 * add the local peer if not in the join message.
2548 * @param session session to initialize
2549 * @param join_msg join message with the list of peers participating at the end
2552 initialize_session_peer_list (struct ConsensusSession *session,
2554 GNUNET_CONSENSUS_JoinMessage *join_msg)
2556 const struct GNUNET_PeerIdentity *msg_peers
2557 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2558 int local_peer_in_list;
2560 session->num_peers = ntohl (join_msg->num_peers);
2562 /* Peers in the join message, may or may not include the local peer,
2563 Add it if it is missing. */
2564 local_peer_in_list = GNUNET_NO;
2565 for (unsigned int i = 0; i < session->num_peers; i++)
2567 if (0 == GNUNET_memcmp (&msg_peers[i],
2570 local_peer_in_list = GNUNET_YES;
2574 if (GNUNET_NO == local_peer_in_list)
2575 session->num_peers++;
2577 session->peers = GNUNET_new_array (session->num_peers,
2578 struct GNUNET_PeerIdentity);
/* The extra slot at the end holds the local peer; the copy below
   only fills the first ntohl (join_msg->num_peers) slots. */
2579 if (GNUNET_NO == local_peer_in_list)
2580 session->peers[session->num_peers - 1] = my_peer;
2582 GNUNET_memcpy (session->peers,
2584 ntohl (join_msg->num_peers) * sizeof(struct
2585 GNUNET_PeerIdentity));
2586 qsort (session->peers,
2588 sizeof(struct GNUNET_PeerIdentity),
/**
 * Find the task registered under a task key.
 *
 * The key is hashed over sizeof (struct TaskKey) bytes, matching
 * the way put_task() derives the map key, and looked up in
 * session->taskmap.
 *
 * @param session session whose taskmap is searched
 * @param key task key to look up
 * @return the result of the map lookup; set_listen_cb() treats a
 *         missing task as a protocol violation by the remote peer
 */
2593 static struct TaskEntry *
2594 lookup_task (struct ConsensusSession *session,
2595 struct TaskKey *key)
2597 struct GNUNET_HashCode hash;
2600 GNUNET_CRYPTO_hash (key, sizeof(struct TaskKey), &hash);
2601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2602 GNUNET_h2s (&hash));
2603 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
/**
 * Handle an incoming set operation request from a remote peer.
 *
 * The context message is validated step by step; each failed check
 * raises GNUNET_break_op (0):
 *  1. a context message is present,
 *  2. its type is GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT,
 *  3. its size equals sizeof (struct
 *     GNUNET_CONSENSUS_RoundContextMessage),
 *  4. the task key rebuilt from the message (kind, peer1, peer2,
 *     repetition, leader, converted with ntohs) names a known task,
 *  5. that task is not finished yet,
 *  6. the local peer is the task's peer2 (the requested side).
 * NOTE(review): the statements following each GNUNET_break_op are
 * not among the visible lines; presumably the function returns
 * without accepting the request -- confirm.
 *
 * On success the request is accepted with result mode
 * GNUNET_SET_RESULT_SYMMETRIC and the BYZANTINE option set to
 * session->lower_bound; the operation handle is stored in
 * task->cls.setop.op.  If the task has already been started,
 * commit_set() is called immediately; otherwise the commit is
 * deferred until the task starts.
 */
2608 * Called when another peer wants to do a set operation with the
2611 * @param cls closure
2612 * @param other_peer the other peer
2613 * @param context_msg message with application specific information from
2615 * @param request request from the other peer, use GNUNET_SET_accept
2616 * to accept it, otherwise the request will be refused
2617 * Note that we don't use a return value here, as it is also
2618 * necessary to specify the set we want to do the operation with,
2619 * whith sometimes can be derived from the context message.
2620 * Also necessary to specify the timeout.
2623 set_listen_cb (void *cls,
2624 const struct GNUNET_PeerIdentity *other_peer,
2625 const struct GNUNET_MessageHeader *context_msg,
2626 struct GNUNET_SET_Request *request)
2628 struct ConsensusSession *session = cls;
2630 struct TaskEntry *task;
2631 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2633 if (NULL == context_msg)
2635 GNUNET_break_op (0);
2639 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (
2642 GNUNET_break_op (0);
2646 if (sizeof(struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (
2649 GNUNET_break_op (0);
2653 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2655 tk = ((struct TaskKey) {
2656 .kind = ntohs (cm->kind),
2657 .peer1 = ntohs (cm->peer1),
2658 .peer2 = ntohs (cm->peer2),
2659 .repetition = ntohs (cm->repetition),
2660 .leader = ntohs (cm->leader),
2663 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2664 session->local_peer_idx, debug_str_task_key (&tk));
2666 task = lookup_task (session, &tk);
2670 GNUNET_break_op (0);
2674 if (GNUNET_YES == task->is_finished)
2676 GNUNET_break_op (0);
2680 if (task->key.peer2 != session->local_peer_idx)
2682 /* We're being asked, so we must be thne 2nd peer. */
2683 GNUNET_break_op (0);
2687 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2688 (task->key.peer2 == session->local_peer_idx)));
2690 struct GNUNET_SET_Option opts[] = {
2691 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2692 { GNUNET_SET_OPTION_END },
2695 task->cls.setop.op = GNUNET_SET_accept (request,
2696 GNUNET_SET_RESULT_SYMMETRIC,
2701 /* If the task hasn't been started yet,
2702 we wait for that until we commit. */
2704 if (GNUNET_YES == task->is_started)
2706 commit_set (session, task);
/**
 * Add a copy of a task to its step and to the task map.
 *
 * The passed entry is duplicated with GNUNET_memdup, so callers may
 * pass a stack-allocated template; the copy is what gets stored.
 * The step's task array is grown to 3 * (cap + 1) / 2 entries when
 * full.  The copy is stored in the array and registered in
 * @a taskmap under the hash of its TaskKey with UNIQUE_ONLY
 * (asserted GNUNET_OK), so task keys must be unique.
 *
 * NOTE(review): the declaration of `s` (presumably t->step) and the
 * increment of s->tasks_len are not among the visible lines.
 *
 * @param taskmap map from task key hash to task entry
 * @param t template of the task; t->step is asserted to be non-NULL
 */
2712 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2713 struct TaskEntry *t)
2715 struct GNUNET_HashCode round_hash;
2718 GNUNET_assert (NULL != t->step);
2720 t = GNUNET_memdup (t, sizeof(struct TaskEntry));
2724 if (s->tasks_len == s->tasks_cap)
2726 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2727 GNUNET_array_grow (s->tasks,
2732 #ifdef GNUNET_EXTRA_LOGGING
2733 GNUNET_assert (NULL != s->debug_name);
2734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2735 debug_str_task_key (&t->key),
2739 s->tasks[s->tasks_len] = t;
2742 GNUNET_CRYPTO_hash (&t->key, sizeof(struct TaskKey), &round_hash);
2743 GNUNET_assert (GNUNET_OK ==
2744 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2745 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Placeholder for assigning timeouts to the steps of a fully
 * constructed task graph.
 *
 * The visible body consists of comments only; no timeouts are
 * installed yet.
 *
 * @param session session whose steps would receive timeouts
 */
2750 install_step_timeouts (struct ConsensusSession *session)
2752 /* Given the fully constructed task graph
2753 with rounds for tasks, we can give the tasks timeouts. */
2755 // unsigned int max_round;
2757 /* XXX: implement! */
/**
 * Put a pair of peer indices into a canonical order, in place.
 *
 * Both indices are asserted to be smaller than @a n.  The ordering
 * decision is based on whether the distance from `a` to `b`
 * modulo @a n, ((b - a) + n) % n, is at most n / 2.
 *
 * NOTE(review): the definitions of `a` and `b` and the assignments
 * that write the result back to *p1 / *p2 are not among the visible
 * lines.
 *
 * @param[in,out] p1 first peer index
 * @param[in,out] p2 second peer index
 * @param n number of peers
 */
2762 * Arrange two peers in some canonical order.
2765 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2770 GNUNET_assert (*p1 < n);
2771 GNUNET_assert (*p2 < n);
2784 /* For uniformly random *p1, *p2,
2785 this condition is true with 50% chance */
2786 if (((b - a) + n) % n <= n / 2)
/**
 * Add an edge to the step dependency graph.
 *
 * @a step is appended to dep->subordinates (the array is grown to
 * 3 * (cap + 1) / 2 entries when full) and step->pending_prereq is
 * incremented, so @a step becomes ready only after @a dep and all
 * its other prerequisites have finished.
 *
 * Sanity checks (asserted): the two steps differ, neither is NULL,
 * and dep->round does not exceed step->round.
 *
 * @param step the dependent step
 * @param dep the step that must finish first
 */
2800 * Record @a dep as a dependency of @a step.
2803 step_depend_on (struct Step *step, struct Step *dep)
2805 /* We're not checking for cyclic dependencies,
2806 but this is a cheap sanity check. */
2807 GNUNET_assert (step != dep);
2808 GNUNET_assert (NULL != step);
2809 GNUNET_assert (NULL != dep);
2810 GNUNET_assert (dep->round <= step->round);
2812 #ifdef GNUNET_EXTRA_LOGGING
2813 /* Make sure we have complete debugging information.
2814 Also checks that we don't screw up too badly
2815 constructing the task graph. */
2816 GNUNET_assert (NULL != step->debug_name);
2817 GNUNET_assert (NULL != dep->debug_name);
2818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2819 "Making step `%s' depend on `%s'\n",
2824 if (dep->subordinates_cap == dep->subordinates_len)
2826 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2827 GNUNET_array_grow (dep->subordinates,
2828 dep->subordinates_cap,
2832 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2834 dep->subordinates[dep->subordinates_len] = step;
2835 dep->subordinates_len++;
2837 step->pending_prereq++;
/**
 * Allocate a step and append it to the session's step list.
 *
 * The new step records its session, round and early_finishable flag
 * and is inserted at the tail of the session's steps_head /
 * steps_tail list.  All other members keep the zero value given by
 * GNUNET_new.
 *
 * @param session session the step belongs to
 * @param round round number of the step
 * @param early_finishable GNUNET_YES if try_finish_step_early() may
 *        finish the step without running it
 * @return the new step (the return statement itself is not among the
 *         visible lines)
 */
2841 static struct Step *
2842 create_step (struct ConsensusSession *session, int round, int early_finishable)
2846 step = GNUNET_new (struct Step);
2847 step->session = session;
2848 step->round = round;
2849 step->early_finishable = early_finishable;
2850 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2851 session->steps_tail,
/**
 * Add the steps and tasks of one gradecast (one leader `lead` in
 * one repetition `rep`) to the session's task graph.
 *
 * Five steps are created, all early-finishable, each depending on
 * the previous one; the first depends on @a step_before and
 * @a step_after is made to depend on the last:
 *  1. "disseminate leader": PHASE_KIND_GRADECAST_LEADER
 *     reconciliations reading SET_KIND_CURRENT at `rep`; the
 *     variants that also define outputs write
 *     SET_KIND_LEADER_PROPOSAL and DIFF_KIND_LEADER_PROPOSAL.
 *  2. "echo leader": PHASE_KIND_GRADECAST_ECHO reconciliations of
 *     the leader proposal set, recording votes in RFN_KIND_ECHO.
 *  3. "echo grade leader": local task_start_eval_echo task.
 *  4. "confirm leader": PHASE_KIND_GRADECAST_CONFIRM reconciliations
 *     of SET_KIND_ECHO_RESULT, recording votes in RFN_KIND_CONFIRM
 *     and exchanging the contested flag.
 *  5. "confirm grade leader": local task_start_grade task.
 *
 * Tasks are stored via put_task(), which copies the stack-local
 * `task` template.
 *
 * NOTE(review): the `rep` and `lead` parameter declarations, the
 * round increments and the conditions choosing between the task
 * variants of step 1 are not among the visible lines.
 *
 * @param session session whose task graph is extended
 * @param step_before step that must finish before this gradecast
 * @param step_after step that has to wait for this gradecast
 */
2858 * Construct the task graph for a single
2862 construct_task_graph_gradecast (struct ConsensusSession *session,
2865 struct Step *step_before,
2866 struct Step *step_after)
2868 uint16_t n = session->num_peers;
2869 uint16_t me = session->local_peer_idx;
2874 /* The task we're currently setting up. */
2875 struct TaskEntry task;
2878 struct Step *prev_step;
2884 round = step_before->round + 1;
2886 /* gcast step 1: leader disseminates */
2888 step = create_step (session, round, GNUNET_YES);
2890 #ifdef GNUNET_EXTRA_LOGGING
2891 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead,
2894 step_depend_on (step, step_before);
2898 for (k = 0; k < n; k++)
2904 arrange_peers (&p1, &p2, n);
2905 task = ((struct TaskEntry) {
2907 .start = task_start_reconcile,
2908 .cancel = task_cancel_reconcile,
2909 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2912 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2913 put_task (session->taskmap, &task);
2915 /* We run this task to make sure that the leader
2916 has the stored the SET_KIND_LEADER set of himself,
2917 so it can participate in the rest of the gradecast
2918 without the code having to handle any special cases. */
2919 task = ((struct TaskEntry) {
2921 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2922 .start = task_start_reconcile,
2923 .cancel = task_cancel_reconcile,
2925 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2926 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2928 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2930 put_task (session->taskmap, &task);
2936 arrange_peers (&p1, &p2, n);
2937 task = ((struct TaskEntry) {
2939 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2941 .start = task_start_reconcile,
2942 .cancel = task_cancel_reconcile,
2944 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2945 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2947 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL,
2949 put_task (session->taskmap, &task);
2952 /* gcast phase 2: echo */
2955 step = create_step (session, round, GNUNET_YES);
2956 #ifdef GNUNET_EXTRA_LOGGING
2957 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2959 step_depend_on (step, prev_step);
2961 for (k = 0; k < n; k++)
2965 arrange_peers (&p1, &p2, n);
2966 task = ((struct TaskEntry) {
2968 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2969 .start = task_start_reconcile,
2970 .cancel = task_cancel_reconcile,
2972 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2974 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2975 put_task (session->taskmap, &task);
2979 /* Same round, since step only has local tasks */
2980 step = create_step (session, round, GNUNET_YES);
2981 #ifdef GNUNET_EXTRA_LOGGING
2982 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2984 step_depend_on (step, prev_step);
2986 arrange_peers (&p1, &p2, n);
2987 task = ((struct TaskEntry) {
2988 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep,
2991 .start = task_start_eval_echo
2993 put_task (session->taskmap, &task);
2997 step = create_step (session, round, GNUNET_YES);
2998 #ifdef GNUNET_EXTRA_LOGGING
2999 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
3001 step_depend_on (step, prev_step);
3003 /* gcast phase 3: confirmation and grading */
3004 for (k = 0; k < n; k++)
3008 arrange_peers (&p1, &p2, n);
3009 task = ((struct TaskEntry) {
3011 .start = task_start_reconcile,
3012 .cancel = task_cancel_reconcile,
3013 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep,
3016 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
3018 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
3019 /* If there was at least one element in the echo round that was
3020 contested (i.e. it had no n-t majority), then we let the other peers
3021 know, and other peers let us know. The contested flag for each peer is
3022 stored in the rfn. */
3023 task.cls.setop.transceive_contested = GNUNET_YES;
3024 put_task (session->taskmap, &task);
3028 /* Same round, since step only has local tasks */
3029 step = create_step (session, round, GNUNET_YES);
3030 #ifdef GNUNET_EXTRA_LOGGING
3031 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead,
3034 step_depend_on (step, prev_step);
3036 task = ((struct TaskEntry) {
3038 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep,
3040 .start = task_start_grade,
3042 put_task (session->taskmap, &task);
3044 step_depend_on (step_after, step);
/**
 * Build the task graph of @a session.
 *
 * Stages created here, in order: an "all to all" step, an "all to all 2"
 * step depending on it, a sequence of gradecast repetitions (i = 0 .. t),
 * each bracketed by a start step and an end step, and a closing "finish"
 * step.  Every task built here is registered in session->taskmap through
 * put_task().
 *
 * @param session session whose steps and tasks are created
 */
3049 construct_task_graph (struct ConsensusSession *session)
3051 uint16_t n = session->num_peers;
3054 uint16_t me = session->local_peer_idx;
3056 /* The task we're currently setting up. */
3057 struct TaskEntry task;
3059 /* Current leader */
3063 struct Step *prev_step;
3065 unsigned int round = 0;
3069 // XXX: introduce first step,
3070 // where we wait for all insert acks
3071 // from the set service
3073 /* faster but brittle all-to-all */
3075 // XXX: Not implemented yet
3077 /* all-to-all step */
3079 step = create_step (session, round, GNUNET_NO);
3081 #ifdef GNUNET_EXTRA_LOGGING
3082 step->debug_name = GNUNET_strdup ("all to all");
/* One reconcile task per peer index i. */
3085 for (i = 0; i < n; i++)
/* NOTE(review): arrange_peers() presumably puts the pair into canonical
   order before it is used in the task key -- confirm at its definition. */
3092 arrange_peers (&p1, &p2, n);
3093 task = ((struct TaskEntry) {
3094 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
3096 .start = task_start_reconcile,
3097 .cancel = task_cancel_reconcile,
/* The operation reads from and writes back to the same SET_KIND_CURRENT
   set, with do_not_remove enabled. */
3099 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3100 task.cls.setop.output_set = task.cls.setop.input_set;
3101 task.cls.setop.do_not_remove = GNUNET_YES;
3102 put_task (session->taskmap, &task);
/* Second all-to-all step, made dependent on the previous step. */
3107 step = create_step (session, round, GNUNET_NO);;
3108 #ifdef GNUNET_EXTRA_LOGGING
3109 step->debug_name = GNUNET_strdup ("all to all 2");
3111 step_depend_on (step, prev_step);
/* Same task layout as the first exchange, under PHASE_KIND_ALL_TO_ALL_2. */
3114 for (i = 0; i < n; i++)
3121 arrange_peers (&p1, &p2, n);
3122 task = ((struct TaskEntry) {
3123 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3125 .start = task_start_reconcile,
3126 .cancel = task_cancel_reconcile,
3128 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3129 task.cls.setop.output_set = task.cls.setop.input_set;
3130 task.cls.setop.do_not_remove = GNUNET_YES;
3131 put_task (session->taskmap, &task);
3140 /* Byzantine union */
3142 /* sequential repetitions of the gradecasts */
3143 for (i = 0; i < t + 1; i++)
3145 struct Step *step_rep_start;
3146 struct Step *step_rep_end;
3148 /* Every repetition is in a separate round. */
3149 step_rep_start = create_step (session, round, GNUNET_YES);
3150 #ifdef GNUNET_EXTRA_LOGGING
3151 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
/* A repetition starts only after the previous stage's last step. */
3154 step_depend_on (step_rep_start, prev_step);
3156 /* gradecast has three rounds */
3158 step_rep_end = create_step (session, round, GNUNET_YES);
3159 #ifdef GNUNET_EXTRA_LOGGING
3160 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3163 /* parallel gradecasts */
/* One gradecast sub-graph per possible leader, all sharing this
   repetition's start step. */
3164 for (lead = 0; lead < n; lead++)
3165 construct_task_graph_gradecast (session, i, lead, step_rep_start,
/* The repetition's end step carries the PHASE_KIND_APPLY_REP task. */
3168 task = ((struct TaskEntry) {
3169 .step = step_rep_end,
3170 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1 },
3171 .start = task_start_apply_round,
3173 put_task (session->taskmap, &task);
/* Whatever comes next depends on this repetition's end step. */
3175 prev_step = step_rep_end;
3178 /* There is no next gradecast round, thus the final
3179 start step is the overall end step of the gradecasts */
3181 step = create_step (session, round, GNUNET_NO);
3182 #ifdef GNUNET_EXTRA_LOGGING
3183 GNUNET_asprintf (&step->debug_name, "finish");
3185 step_depend_on (step, prev_step);
3187 task = ((struct TaskEntry) {
3189 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3190 .start = task_start_finish,
/* Only the set kind is spelled out; the remaining SetKey members are
   zero-initialized by the compound literal. */
3192 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3194 put_task (session->taskmap, &task);
3199 * Check join message.
3201 * @param cls session of client that sent the message
3202 * @param m message sent by the client
3203 * @return #GNUNET_OK if @a m is well-formed
/* Size check for a join message: the bytes following the fixed
   GNUNET_CONSENSUS_JoinMessage header must be exactly num_peers entries of
   struct GNUNET_PeerIdentity. */
3206 check_client_join (void *cls,
3207 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* The peer count is converted from network byte order. */
3209 uint32_t listed_peers = ntohl (m->num_peers);
/* Payload length (total size minus header) versus the expected length. */
3211 if ((ntohs (m->header.size) - sizeof(*m)) !=
3212 listed_peers * sizeof(struct GNUNET_PeerIdentity))
/* Mismatch: the message is rejected as malformed. */
3215 return GNUNET_SYSERR;
3222 * Called when a client wants to join a consensus session.
3224 * @param cls session of client that sent the message
3225 * @param m message sent by the client
/* Handle a join request: initialise the session's peer list and global id,
   look for another local session with the same global id, record the
   conclude start/deadline, locate the local peer in the peer list, start
   listening for set operations, create the session's maps and the client's
   initial set, and build the task graph.  Nothing is executed yet; the
   client is told to continue at the end. */
3228 handle_client_join (void *cls,
3229 const struct GNUNET_CONSENSUS_JoinMessage *m)
3231 struct ConsensusSession *session = cls;
3232 struct ConsensusSession *other_session;
/* The peer list and global id are set up first; the duplicate check below
   compares global ids. */
3234 initialize_session_peer_list (session,
3236 compute_global_id (session,
3239 /* Check if some local client already owns the session.
3240 It is only legal to have a session with an existing global id
3241 if all other sessions with this global id are finished.*/
/* Walk the global session list, skipping this session itself. */
3242 for (other_session = sessions_head;
3243 NULL != other_session;
3244 other_session = other_session->next)
3246 if ((other_session != session) &&
3247 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3248 &other_session->global_id)))
/* Deadline and start time are converted from their network representation. */
3252 session->conclude_deadline
3253 = GNUNET_TIME_absolute_ntoh (m->deadline);
3254 session->conclude_start
3255 = GNUNET_TIME_absolute_ntoh (m->start);
3256 session->local_peer_idx = get_peer_idx (&my_peer,
/* A result of -1 from get_peer_idx() is treated as fatal. */
3258 GNUNET_assert (-1 != session->local_peer_idx);
3260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3261 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3262 GNUNET_h2s (&m->session_id),
3264 session->local_peer_idx,
3265 GNUNET_STRINGS_relative_time_to_string
3266 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3267 session->conclude_deadline),
/* Listen for union set operations under the session's global id. */
3270 session->set_listener
3271 = GNUNET_SET_listen (cfg,
3272 GNUNET_SET_OPERATION_UNION,
3273 &session->global_id,
/* Per-session lookup tables, each created with an initial size of 1. */
3277 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3279 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3281 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3283 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the client's own union set, keyed (SET_KIND_CURRENT, 0, 0); its
   handle is also tracked in the session's set_handles list. */
3287 struct SetEntry *client_set;
3289 client_set = GNUNET_new (struct SetEntry);
3290 client_set->h = GNUNET_SET_create (cfg,
3291 GNUNET_SET_OPERATION_UNION);
3292 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3293 sh->h = client_set->h;
3294 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3295 session->set_handles_tail,
3297 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* Array with one entry per peer of the session. */
3302 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3305 /* Just construct the task graph,
3306 but don't run anything until the client calls conclude. */
3307 construct_task_graph (session);
3308 GNUNET_SERVICE_client_continue (session->client);
/* Callback that handle_client_insert() passes to GNUNET_SET_add_element().
   NOTE(review): presumably invoked once the set service has processed the
   insert, with the session as @a cls -- confirm. */
3313 client_insert_done (void *cls)
3320 * Called when a client performs an insert operation.
3322 * @param cls client handle
3323 * @param msg message sent by the client
3324 * @return #GNUNET_OK (always well-formed)
/* Validation hook for client insert messages; the accompanying doc comment
   states it always reports the message as well-formed.
   NOTE(review): presumably paired with handle_client_insert() through the
   client_insert handler entry of the service -- confirm. */
3327 check_client_insert (void *cls,
3328 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3335 * Called when a client performs an insert operation.
3337 * @param cls session of the client that sent the message
3338 * @param msg message sent by the client
/* Handle an element insert from the client: wrap the payload in a
   ConsensusElement, hand it to the session's (SET_KIND_CURRENT, 0, 0) set and
   let the client continue.  A client that inserts after conclude has started
   is dropped. */
3341 handle_client_insert (void *cls,
3342 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3344 struct ConsensusSession *session = cls;
3345 ssize_t element_size;
3346 struct GNUNET_SET_Handle *initial_set;
3347 struct ConsensusElement *ce;
/* Inserting once conclude has started is a protocol violation. */
3349 if (GNUNET_YES == session->conclude_started)
3352 GNUNET_SERVICE_client_drop (session->client);
/* Payload length: total message size minus the fixed message header. */
3356 element_size = ntohs (msg->header.size) - sizeof(struct
3357 GNUNET_CONSENSUS_ElementMessage);
/* Allocate header plus payload; the payload is copied directly behind the
   ConsensusElement header. */
3358 ce = GNUNET_malloc (sizeof(struct ConsensusElement) + element_size);
3359 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3360 ce->payload_type = msg->element_type;
/* Set element whose size covers the ConsensusElement header and payload. */
3362 struct GNUNET_SET_Element element = {
3363 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3364 .size = sizeof(struct ConsensusElement) + element_size,
/* The target set is looked up by key and must exist. */
3369 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3370 struct SetEntry *entry;
3372 entry = lookup_set (session,
3374 GNUNET_assert (NULL != entry);
3375 initial_set = entry->h;
/* Count the insert as pending before handing it to the set service;
   client_insert_done is passed along as the callback. */
3378 session->num_client_insert_pending++;
3379 GNUNET_SET_add_element (initial_set,
3381 &client_insert_done,
3384 #ifdef GNUNET_EXTRA_LOGGING
3386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3387 "P%u: element %s added\n",
3388 session->local_peer_idx,
3389 debug_str_element (&element));
3393 GNUNET_SERVICE_client_continue (session->client);
3398 * Called when a client performs the conclude operation.
3400 * @param cls session of the client that sent the message
3401 * @param message message sent by the client
/* Handle the client's conclude request: mark the session as concluding,
   install the step timeouts, run whatever steps are ready and let the client
   continue.  A second conclude on the same session drops the client. */
3404 handle_client_conclude (void *cls,
3405 const struct GNUNET_MessageHeader *message)
3407 struct ConsensusSession *session = cls;
3409 if (GNUNET_YES == session->conclude_started)
3411 /* conclude started twice */
3413 GNUNET_SERVICE_client_drop (session->client);
3416 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3417 "conclude requested\n");
/* The flag is set before any step runs; handle_client_insert() checks it to
   reject later inserts. */
3418 session->conclude_started = GNUNET_YES;
3419 install_step_timeouts (session);
3420 run_ready_steps (session);
3421 GNUNET_SERVICE_client_continue (session->client);
3426 * Called to clean up, after a shutdown has been requested.
3428 * @param cls closure
/* Shutdown hook: logs at INFO level and destroys the global statistics
   handle. */
3431 shutdown_task (void *cls)
3433 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3435 GNUNET_STATISTICS_destroy (statistics,
3442 * Start processing consensus requests.
3444 * @param cls closure
3445 * @param c configuration to use
3446 * @param service the initialized service
3450 const struct GNUNET_CONFIGURATION_Handle *c,
3451 struct GNUNET_SERVICE_Handle *service)
/* Service initialisation: obtain the local peer identity, create the
   "consensus" statistics handle and register shutdown_task with the
   scheduler. */
3455 GNUNET_CRYPTO_get_peer_identity (cfg,
/* Error path: report the failure and ask the scheduler to shut down. */
3458 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3459 "Could not retrieve host identity\n");
3460 GNUNET_SCHEDULER_shutdown ();
3463 statistics = GNUNET_STATISTICS_create ("consensus",
3465 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3471 * Callback called when a client connects to the service.
3473 * @param cls closure for the service
3474 * @param c the new client that connected to the service
3475 * @param mq the message queue used to send messages to the client
/* Allocate a fresh ConsensusSession for the connecting client, remember the
   client and its message queue in it, and link it into the global session
   list. */
3479 client_connect_cb (void *cls,
3480 struct GNUNET_SERVICE_Client *c,
3481 struct GNUNET_MQ_Handle *mq)
3483 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3485 session->client = c;
3486 session->client_mq = mq;
3487 GNUNET_CONTAINER_DLL_insert (sessions_head,
3495 * Callback called when a client disconnected from the service
3497 * @param cls closure for the service
3498 * @param c the client that disconnected
3499 * @param internal_cls the `struct ConsensusSession` of the client @a c
/* Tear down the session of a disconnected client: cancel the set listener if
   one is active, unlink the session from the global list, destroy every
   tracked set handle and free the session.
   NOTE(review): the maps and the peers_blacklisted array created in
   handle_client_join() are not visibly released here -- confirm whether they
   are freed elsewhere. */
3502 client_disconnect_cb (void *cls,
3503 struct GNUNET_SERVICE_Client *c,
3506 struct ConsensusSession *session = internal_cls;
/* The listener is optional, so cancel it only when present. */
3508 if (NULL != session->set_listener)
3510 GNUNET_SET_listen_cancel (session->set_listener);
3511 session->set_listener = NULL;
3513 GNUNET_CONTAINER_DLL_remove (sessions_head,
/* Pop set handles off the head of the list one at a time. */
3517 while (session->set_handles_head)
3519 struct SetHandle *sh = session->set_handles_head;
3520 session->set_handles_head = sh->next;
3521 GNUNET_SET_destroy (sh->h);
3524 GNUNET_free (session);
3529 * Define "main" method using service macro.
3533 GNUNET_SERVICE_OPTION_NONE,
3536 &client_disconnect_cb,
/* Client message handlers.  Conclude is a fixed-size message consisting of
   a bare message header. */
3538 GNUNET_MQ_hd_fixed_size (client_conclude,
3539 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3540 struct GNUNET_MessageHeader,
/* Insert and join are variable-size messages.
   NOTE(review): presumably each entry binds the matching check_client_* and
   handle_client_* functions -- confirm against the MQ API. */
3542 GNUNET_MQ_hd_var_size (client_insert,
3543 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3544 struct GNUNET_CONSENSUS_ElementMessage,
3546 GNUNET_MQ_hd_var_size (client_join,
3547 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3548 struct GNUNET_CONSENSUS_JoinMessage,
3550 GNUNET_MQ_handler_end ());
3552 /* end of gnunet-service-consensus.c */