2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
42 * Vote that nothing should change.
43 * This option is never voted explicitly.
47 * Vote that an element should be added.
51 * Vote that an element should be removed.
57 enum EarlyStoppingPhase
59 EARLY_STOPPING_NONE = 0,
60 EARLY_STOPPING_ONE_MORE = 1,
61 EARLY_STOPPING_DONE = 2,
65 GNUNET_NETWORK_STRUCT_BEGIN
68 * Tuple of integers that together
69 * identify a task uniquely.
73 * A value from 'enum PhaseKind'.
75 uint16_t kind GNUNET_PACKED;
78 * Number of the first peer
81 int16_t peer1 GNUNET_PACKED;
84 * Number of the second peer in canonical order.
86 int16_t peer2 GNUNET_PACKED;
89 * Repetition of the gradecast phase.
91 int16_t repetition GNUNET_PACKED;
94 * Leader in the gradecast phase.
96 * Can be different from both peer1 and peer2.
98 int16_t leader GNUNET_PACKED;
105 int set_kind GNUNET_PACKED;
106 int k1 GNUNET_PACKED;
107 int k2 GNUNET_PACKED;
114 struct GNUNET_SET_Handle *h;
116 * GNUNET_YES if the set resulted
117 * from applying a referendum with contested
126 int diff_kind GNUNET_PACKED;
127 int k1 GNUNET_PACKED;
128 int k2 GNUNET_PACKED;
133 int rfn_kind GNUNET_PACKED;
134 int k1 GNUNET_PACKED;
135 int k2 GNUNET_PACKED;
139 GNUNET_NETWORK_STRUCT_END
143 PHASE_KIND_ALL_TO_ALL,
144 PHASE_KIND_ALL_TO_ALL_2,
145 PHASE_KIND_GRADECAST_LEADER,
146 PHASE_KIND_GRADECAST_ECHO,
147 PHASE_KIND_GRADECAST_ECHO_GRADE,
148 PHASE_KIND_GRADECAST_CONFIRM,
149 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
151 * Apply a repetition of the all-to-all
152 * gradecast to the current set.
154 PHASE_KIND_APPLY_REP,
164 * Last result set from a gradecast
166 SET_KIND_LAST_GRADECAST,
167 SET_KIND_LEADER_PROPOSAL,
168 SET_KIND_ECHO_RESULT,
174 DIFF_KIND_LEADER_PROPOSAL,
175 DIFF_KIND_LEADER_CONSENSUS,
176 DIFF_KIND_GRADECAST_RESULT,
184 RFN_KIND_GRADECAST_RESULT
190 struct SetKey input_set;
192 struct SetKey output_set;
193 struct RfnKey output_rfn;
194 struct DiffKey output_diff;
198 int transceive_contested;
200 struct GNUNET_SET_OperationHandle *op;
206 struct SetKey input_set;
210 * Closure for both @a start_task
211 * and @a cancel_task.
215 struct SetOpCls setop;
216 struct FinishCls finish;
221 typedef void (*TaskFunc) (struct TaskEntry *task);
224 * Node in the consensus task graph.
239 union TaskFuncCls cls;
246 * All steps of one session are in a
247 * linked list for easier deallocation.
252 * All steps of one session are in a
253 * linked list for easier deallocation.
257 struct ConsensusSession *session;
260 * Tasks that this step is composed of.
262 struct TaskEntry **tasks;
263 unsigned int tasks_len;
264 unsigned int tasks_cap;
266 unsigned int finished_tasks;
269 * Tasks that have this task as dependency.
271 * We store pointers to subordinates rather
272 * than to prerequisites since it makes
273 * tracking the readiness of a task easier.
275 struct Step **subordinates;
276 unsigned int subordinates_len;
277 unsigned int subordinates_cap;
280 * Counter for the prerequisites of
283 size_t pending_prereq;
286 * Task that will run this step despite
287 * any pending prerequisites.
289 struct GNUNET_SCHEDULER_Task *timeout_task;
291 unsigned int is_running;
293 unsigned int is_finished;
296 * Synchrony round of the task.
297 * Determines the deadline for the task.
302 * Human-readable name for
303 * the task, used for debugging.
308 * When we're doing an early finish, how should this step be
310 * If GNUNET_YES, the step will be marked as finished
311 * without actually running its tasks.
312 * Otherwise, the step will still be run even after
315 * Note that a task may never be finished early if
316 * it is already running.
318 int early_finishable;
322 struct RfnElementInfo
324 const struct GNUNET_SET_Element *element;
327 * GNUNET_YES if the peer votes for the proposal.
332 * Proposal for this element,
333 * can only be VOTE_ADD or VOTE_REMOVE.
335 enum ReferendumVote proposal;
339 struct ReferendumEntry
344 * Elements where there is at least one proposed change.
346 * Maps the hash of the GNUNET_SET_Element
347 * to 'struct RfnElementInfo'.
349 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
351 unsigned int num_peers;
354 * Stores, for every peer in the session,
355 * whether the peer finished the whole referendum.
357 * Votes from peers are only counted if they're
358 * marked as committed (#GNUNET_YES) in the referendum.
360 * Otherwise (#GNUNET_NO), the requested changes are
361 * not counted for majority votes or thresholds.
367 * Contestation state of the peer. If a peer is contested, the values it
368 * contributed are still counted for applying changes, but the grading is
375 struct DiffElementInfo
377 const struct GNUNET_SET_Element *element;
380 * Positive weight for 'add', negative
381 * weights for 'remove'.
393 struct GNUNET_CONTAINER_MultiHashMap *changes;
399 * A consensus session consists of one local client and the remote authorities.
401 struct ConsensusSession
404 * Consensus sessions are kept in a DLL.
406 struct ConsensusSession *next;
409 * Consensus sessions are kept in a DLL.
411 struct ConsensusSession *prev;
413 unsigned int num_client_insert_pending;
415 struct GNUNET_CONTAINER_MultiHashMap *setmap;
416 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
417 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
420 * Array of peers with length 'num_peers'.
422 int *peers_blacklisted;
425 * Mapping from (hashed) TaskKey to TaskEntry.
427 * We map the application_id for a round to the task that should be
428 * executed, so we don't have to go through all task whenever we get
429 * an incoming set op request.
431 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
433 struct Step *steps_head;
434 struct Step *steps_tail;
436 int conclude_started;
441 * Global consensus identification, computed
442 * from the session id and participating authorities.
444 struct GNUNET_HashCode global_id;
447 * Client that inhabits the session
449 struct GNUNET_SERVICE_Client *client;
452 * Queued messages to the client.
454 struct GNUNET_MQ_Handle *client_mq;
457 * Time when the conclusion of the consensus should begin.
459 struct GNUNET_TIME_Absolute conclude_start;
462 * Timeout for all rounds together, single rounds will schedule a timeout task
463 * with a fraction of the conclude timeout.
464 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
466 struct GNUNET_TIME_Absolute conclude_deadline;
468 struct GNUNET_PeerIdentity *peers;
471 * Number of other peers in the consensus.
473 unsigned int num_peers;
476 * Index of the local peer in the peers array
478 unsigned int local_peer_idx;
481 * Listener for requests from other peers.
482 * Uses the session's global id as app id.
484 struct GNUNET_SET_ListenHandle *set_listener;
487 * State of our early stopping scheme.
492 * Our set size from the first round.
496 uint64_t *first_sizes_received;
499 * Bounded Eppstein lower bound.
501 uint64_t lower_bound;
505 * Linked list of sessions this peer participates in.
507 static struct ConsensusSession *sessions_head;
510 * Linked list of sessions this peer participates in.
512 static struct ConsensusSession *sessions_tail;
515 * Configuration of the consensus service.
517 static const struct GNUNET_CONFIGURATION_Handle *cfg;
520 * Peer that runs this service.
522 static struct GNUNET_PeerIdentity my_peer;
527 struct GNUNET_STATISTICS_Handle *statistics;
531 finish_task (struct TaskEntry *task);
535 run_ready_steps (struct ConsensusSession *session);
/**
 * Translate a phase kind into a fixed, human-readable name.
 * Values without a matching case yield "(unknown)".
 *
 * NOTE(review): the return-type line, the switch header and the braces of
 * this function are not present in this listing.
 */
539 phasename (uint16_t phase)
543 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
544 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
545 case PHASE_KIND_FINISH: return "FINISH";
546 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
547 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
548 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
549 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
550 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
551 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
552 default: return "(unknown)";
558 setname (uint16_t kind)
562 case SET_KIND_CURRENT: return "CURRENT";
563 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
564 case SET_KIND_NONE: return "NONE";
565 default: return "(unknown)";
570 rfnname (uint16_t kind)
574 case RFN_KIND_NONE: return "NONE";
575 case RFN_KIND_ECHO: return "ECHO";
576 case RFN_KIND_CONFIRM: return "CONFIRM";
577 default: return "(unknown)";
/**
 * Translate a diff kind into a fixed, human-readable name.
 * Values without a matching case yield "(unknown)".
 *
 * NOTE(review): the return-type line, the switch header and the braces of
 * this function are not present in this listing.
 */
582 diffname (uint16_t kind)
586 case DIFF_KIND_NONE: return "NONE";
587 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
588 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
589 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
590 default: return "(unknown)";
594 #ifdef GNUNET_EXTRA_LOGGING
/**
 * Produce a short printable identifier for a set element (debugging aid,
 * only compiled with GNUNET_EXTRA_LOGGING).
 *
 * The element is hashed with GNUNET_SET_element_hash() and the hash is
 * rendered with GNUNET_h2s(); the string returned is whatever GNUNET_h2s()
 * hands back.
 */
598 debug_str_element (const struct GNUNET_SET_Element *el)
600 struct GNUNET_HashCode hash;
602 GNUNET_SET_element_hash (el, &hash);
604 return GNUNET_h2s (&hash);
/**
 * Format a task key (kind, both peers, leader, repetition) for log output.
 *
 * The text is written into a function-local static buffer, so every call
 * overwrites the text produced by the previous call to this function.
 * NOTE(review): the return statement is not present in this listing;
 * presumably the buffer is returned.
 */
608 debug_str_task_key (struct TaskKey *tk)
610 static char buf[256];
612 snprintf (buf, sizeof (buf),
613 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
614 phasename (tk->kind), tk->peer1, tk->peer2,
615 tk->leader, tk->repetition);
/**
 * Format a diff key (kind, k1, k2) for log output.
 *
 * The text is written into a function-local static buffer, so every call
 * overwrites the text produced by the previous call to this function.
 * NOTE(review): the return statement is not present in this listing;
 * presumably the buffer is returned.
 */
621 debug_str_diff_key (struct DiffKey *dk)
623 static char buf[256];
625 snprintf (buf, sizeof (buf),
626 "DiffKey kind=%s, k1=%d, k2=%d",
627 diffname (dk->diff_kind), dk->k1, dk->k2);
/**
 * Format a set key (kind, k1, k2) for log output.
 *
 * The text is written into a function-local static buffer, so every call
 * overwrites the text produced by the previous call to this function.
 * NOTE(review): the return statement is not present in this listing;
 * presumably the buffer is returned.
 */
633 debug_str_set_key (const struct SetKey *sk)
635 static char buf[256];
637 snprintf (buf, sizeof (buf),
638 "SetKey kind=%s, k1=%d, k2=%d",
639 setname (sk->set_kind), sk->k1, sk->k2);
/**
 * Format a referendum key (kind, k1, k2) for log output.
 *
 * The text is written into a function-local static buffer, so every call
 * overwrites the text produced by the previous call to this function.
 * NOTE(review): the return statement is not present in this listing;
 * presumably the buffer is returned.
 */
646 debug_str_rfn_key (const struct RfnKey *rk)
648 static char buf[256];
650 snprintf (buf, sizeof (buf),
651 "RfnKey kind=%s, k1=%d, k2=%d",
652 rfnname (rk->rfn_kind), rk->k1, rk->k2);
657 #endif /* GNUNET_EXTRA_LOGGING */
/*
 * Set iteration callback: forwards one element of the result set to the
 * client of the session that owns the task passed as closure, and sends a
 * CONCLUDE_DONE message once iteration is over.
 *
 * NOTE(review): several lines of this function (braces, the NULL check on
 * the element, the assignment of 'ce', the handling after the marker log and
 * the return statements) are not present in this listing.
 */
661 * Send the final result set of the consensus to the client, element by
665 * @param element the current element, NULL if all elements have been
667 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
670 send_to_client_iter (void *cls,
671 const struct GNUNET_SET_Element *element)
673 struct TaskEntry *task = (struct TaskEntry *) cls;
674 struct ConsensusSession *session = task->step->session;
675 struct GNUNET_MQ_Envelope *ev;
679 struct GNUNET_CONSENSUS_ElementMessage *m;
680 const struct ConsensusElement *ce;
/* Only consensus elements are expected in the result set. */
682 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
685 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691 "P%d: sending element %s to client\n",
692 session->local_peer_idx,
693 debug_str_element (element));
/* The client message carries the bytes that follow the ConsensusElement
   header, tagged with the payload type taken from that header. */
695 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
696 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
697 m->element_type = ce->payload_type;
698 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
699 GNUNET_MQ_send (session->client_mq, ev);
/* End of iteration: tell the client that the conclusion is complete. */
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704 "P%d: finished iterating elements for client\n",
705 session->local_peer_idx);
706 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
707 GNUNET_MQ_send (session->client_mq, ev);
/**
 * Look up a set entry of a session by its key.
 *
 * The key must not be of kind SET_KIND_NONE (asserted).  The complete
 * SetKey structure is hashed and the hash is used as the key into
 * session->setmap.
 *
 * @param session session whose set map is searched
 * @param key key identifying the set
 * @return result of the map lookup for the hashed key
 */
713 static struct SetEntry *
714 lookup_set (struct ConsensusSession *session, struct SetKey *key)
716 struct GNUNET_HashCode hash;
718 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719 "P%u: looking up set {%s}\n",
720 session->local_peer_idx,
721 debug_str_set_key (key));
723 GNUNET_assert (SET_KIND_NONE != key->set_kind);
724 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
725 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
/**
 * Look up a diff entry of a session by its key.
 *
 * The key must not be of kind DIFF_KIND_NONE (asserted).  The complete
 * DiffKey structure is hashed and the hash is used as the key into
 * session->diffmap.
 *
 * @param session session whose diff map is searched
 * @param key key identifying the diff
 * @return result of the map lookup for the hashed key
 */
729 static struct DiffEntry *
730 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
732 struct GNUNET_HashCode hash;
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "P%u: looking up diff {%s}\n",
736 session->local_peer_idx,
737 debug_str_diff_key (key));
739 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
740 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
741 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
/**
 * Look up a referendum entry of a session by its key.
 *
 * The key must not be of kind RFN_KIND_NONE (asserted).  The complete
 * RfnKey structure is hashed and the hash is used as the key into
 * session->rfnmap.
 *
 * @param session session whose referendum map is searched
 * @param key key identifying the referendum
 * @return result of the map lookup for the hashed key
 */
745 static struct ReferendumEntry *
746 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
748 struct GNUNET_HashCode hash;
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "P%u: looking up rfn {%s}\n",
752 session->local_peer_idx,
753 debug_str_rfn_key (key));
755 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
756 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
757 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
/**
 * Record a change for one element in a diff.
 *
 * The weight must be exactly 1 or -1 (asserted).  The element is hashed and
 * looked up in diff->changes; a new DiffElementInfo holding a private copy
 * of the element is created and stored under that hash.
 *
 * NOTE(review): the 'weight' parameter line, the condition guarding the
 * creation of a new entry and the statements that store the weight are not
 * present in this listing.
 */
762 diff_insert (struct DiffEntry *diff,
764 const struct GNUNET_SET_Element *element)
766 struct DiffElementInfo *di;
767 struct GNUNET_HashCode hash;
769 GNUNET_assert ( (1 == weight) || (-1 == weight));
771 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
772 "diff_insert with element size %u\n",
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776 "hashing element\n");
778 GNUNET_SET_element_hash (element, &hash);
780 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
/* The diff keeps its own duplicate of the element. */
787 di = GNUNET_new (struct DiffElementInfo);
788 di->element = GNUNET_SET_element_dup (element);
789 GNUNET_assert (GNUNET_OK ==
790 GNUNET_CONTAINER_multihashmap_put (diff->changes,
792 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
/**
 * Mark a peer as committed in a referendum.
 *
 * @param rfn the referendum
 * @param commit_peer index of the peer, must be below rfn->num_peers
 *        (asserted)
 */
800 rfn_commit (struct ReferendumEntry *rfn,
801 uint16_t commit_peer)
803 GNUNET_assert (commit_peer < rfn->num_peers);
805 rfn->peer_commited[commit_peer] = GNUNET_YES;
/**
 * Mark a peer as contested in a referendum.
 *
 * @param rfn the referendum
 * @param contested_peer index of the peer, must be below rfn->num_peers
 *        (asserted)
 */
810 rfn_contest (struct ReferendumEntry *rfn,
811 uint16_t contested_peer)
813 GNUNET_assert (contested_peer < rfn->num_peers);
815 rfn->peer_contested[contested_peer] = GNUNET_YES;
/**
 * Walk over all peers of a referendum and select those that are committed
 * and not contested.
 *
 * NOTE(review): the loop body and the return statement are not present in
 * this listing; presumably the selected peers are counted and the count is
 * returned.
 */
820 rfn_noncontested (struct ReferendumEntry *rfn)
826 for (i = 0; i < rfn->num_peers; i++)
827 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
/**
 * Record that a peer votes for a change to an element in a referendum.
 *
 * The element is hashed and looked up in rfn->rfn_elements; a new
 * RfnElementInfo (with a private copy of the element and a vote array of
 * rfn->num_peers entries) is created and stored under that hash, and the
 * voting peer's slot in the vote array is set to GNUNET_YES.
 *
 * NOTE(review): the condition guarding the creation of a new entry and the
 * statement that stores the proposal are not present in this listing.
 *
 * @param rfn the referendum
 * @param voting_peer index of the voting peer, must be below rfn->num_peers
 * @param vote VOTE_ADD or VOTE_REMOVE (asserted)
 * @param element element the vote is about
 */
835 rfn_vote (struct ReferendumEntry *rfn,
836 uint16_t voting_peer,
837 enum ReferendumVote vote,
838 const struct GNUNET_SET_Element *element)
840 struct RfnElementInfo *ri;
841 struct GNUNET_HashCode hash;
843 GNUNET_assert (voting_peer < rfn->num_peers);
845 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
846 since VOTE_STAY is implicit in not voting. */
847 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
849 GNUNET_SET_element_hash (element, &hash);
850 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
/* The referendum keeps its own duplicate of the element and one vote slot
   per peer. */
854 ri = GNUNET_new (struct RfnElementInfo);
855 ri->element = GNUNET_SET_element_dup (element);
856 ri->votes = GNUNET_new_array (rfn->num_peers, int);
857 GNUNET_assert (GNUNET_OK ==
858 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
860 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
863 ri->votes[voting_peer] = GNUNET_YES;
/**
 * Determine the remote peer of a two-peer task.
 *
 * @param task task whose key names two peers
 * @return key.peer2 if key.peer1 is the local peer's index, otherwise
 *         key.peer1
 */
869 task_other_peer (struct TaskEntry *task)
871 uint16_t me = task->step->session->local_peer_idx;
872 if (task->key.peer1 == me)
873 return task->key.peer2;
874 return task->key.peer1;
/**
 * Three-way comparison of two uint64_t values, suitable as a qsort()
 * comparator that sorts in ascending order.
 *
 * The values are compared directly instead of being subtracted, so the
 * result is correct over the whole 64-bit range.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return -1 if *pa < *pb, 0 if both are equal, 1 if *pa > *pb
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  const uint64_t a = *(const uint64_t *) pa;
  const uint64_t b = *(const uint64_t *) pb;

  if (a == b)
    return 0;
  if (a < b)
    return -1;
  return 1;
}
/*
 * Result callback of a set operation belonging to a task (passed as the
 * closure).  Resolves the output set / diff / referendum configured for the
 * operation, handles marker elements, and then records the element in the
 * outputs according to the status.
 *
 * NOTE(review): many short lines of this function (braces, returns, the
 * switch header, break statements, some call arguments and #endif lines) are
 * not present in this listing, so the exact control flow cannot be read off
 * the lines below.
 */
893 * Callback for set operation results. Called for each element
897 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
898 * @param current_size current set size
899 * @param status see enum GNUNET_SET_Status
902 set_result_cb (void *cls,
903 const struct GNUNET_SET_Element *element,
904 uint64_t current_size,
905 enum GNUNET_SET_Status status)
907 struct TaskEntry *task = cls;
908 struct ConsensusSession *session = task->step->session;
909 struct SetEntry *output_set = NULL;
910 struct DiffEntry *output_diff = NULL;
911 struct ReferendumEntry *output_rfn = NULL;
912 unsigned int other_idx;
913 struct SetOpCls *setop;
914 const struct ConsensusElement *consensus_element = NULL;
918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919 "P%u: got element of type %u, status %u\n",
920 session->local_peer_idx,
921 (unsigned) element->element_type,
923 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
924 consensus_element = element->data;
927 setop = &task->cls.setop;
930 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
931 "P%u: got set result for {%s}, status %u\n",
932 session->local_peer_idx,
933 debug_str_task_key (&task->key),
/* Results for tasks that are not started, or are already finished, are
   treated separately (the handling itself is not shown in this listing). */
936 if (GNUNET_NO == task->is_started)
942 if (GNUNET_YES == task->is_finished)
948 other_idx = task_other_peer (task);
/* Each of the three outputs is optional; a configured output must exist. */
950 if (SET_KIND_NONE != setop->output_set.set_kind)
952 output_set = lookup_set (session, &setop->output_set);
953 GNUNET_assert (NULL != output_set);
956 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
958 output_diff = lookup_diff (session, &setop->output_diff);
959 GNUNET_assert (NULL != output_diff);
962 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
964 output_rfn = lookup_rfn (session, &setop->output_rfn);
965 GNUNET_assert (NULL != output_rfn);
968 if (GNUNET_YES == session->peers_blacklisted[other_idx])
970 /* Peer might have been blacklisted
971 by a gradecast running in parallel, ignore elements from now */
972 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
974 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
/* Elements with a non-zero marker carry protocol information (contestation
   or set size) instead of client payload. */
978 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
980 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
981 "P%u: got some marker\n",
982 session->local_peer_idx);
983 if ( (GNUNET_YES == setop->transceive_contested) &&
984 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
986 GNUNET_assert (NULL != output_rfn);
987 rfn_contest (output_rfn, task_other_peer (task));
991 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
994 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
995 "P%u: got size marker\n",
996 session->local_peer_idx);
999 struct ConsensusSizeElement *cse = (void *) consensus_element;
/* A size report is only accepted when it names the peer this set operation
   is running with. */
1001 if (cse->sender_index == other_idx)
1003 if (NULL == session->first_sizes_received)
1004 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1005 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
/* The lower bound is read from a sorted copy of the reported sizes.
   NOTE(review): no release of 'copy' is visible in this listing - confirm it
   is freed.  NOTE(review): the index num_peers / 3 + 1 equals num_peers when
   num_peers is 1, which is outside the copied array - confirm that case
   cannot occur here. */
1007 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1008 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1009 session->lower_bound = copy[session->num_peers / 3 + 1];
1010 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1011 "P%u: lower bound %llu\n",
1012 session->local_peer_idx,
1013 (long long) session->lower_bound);
/* ADD_LOCAL: the element is added to the output set and diff (weight 1) and
   counted as a VOTE_ADD of the other peer in the output referendum. */
1023 case GNUNET_SET_STATUS_ADD_LOCAL:
1024 GNUNET_assert (NULL != consensus_element);
1025 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026 "Adding element in Task {%s}\n",
1027 debug_str_task_key (&task->key));
1028 if (NULL != output_set)
1030 // FIXME: record pending adds, use callback
1031 GNUNET_SET_add_element (output_set->h,
1035 #ifdef GNUNET_EXTRA_LOGGING
1036 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1037 "P%u: adding element %s into set {%s} of task {%s}\n",
1038 session->local_peer_idx,
1039 debug_str_element (element),
1040 debug_str_set_key (&setop->output_set),
1041 debug_str_task_key (&task->key));
1044 if (NULL != output_diff)
1046 diff_insert (output_diff, 1, element);
1047 #ifdef GNUNET_EXTRA_LOGGING
1048 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1049 "P%u: adding element %s into diff {%s} of task {%s}\n",
1050 session->local_peer_idx,
1051 debug_str_element (element),
1052 debug_str_diff_key (&setop->output_diff),
1053 debug_str_task_key (&task->key));
1056 if (NULL != output_rfn)
1058 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1061 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1062 session->local_peer_idx,
1063 debug_str_element (element),
1064 debug_str_rfn_key (&setop->output_rfn),
1065 debug_str_task_key (&task->key));
1068 // XXX: add result to structures in task
/* ADD_REMOTE: the element is removed from the output set, entered into the
   diff with weight -1 and counted as a VOTE_REMOVE of the other peer. */
1070 case GNUNET_SET_STATUS_ADD_REMOTE:
1071 GNUNET_assert (NULL != consensus_element);
1072 if (GNUNET_YES == setop->do_not_remove)
1074 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Removing element in Task {%s}\n",
1078 debug_str_task_key (&task->key));
1079 if (NULL != output_set)
1081 // FIXME: record pending adds, use callback
1082 GNUNET_SET_remove_element (output_set->h,
1086 #ifdef GNUNET_EXTRA_LOGGING
1087 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1088 "P%u: removing element %s from set {%s} of task {%s}\n",
1089 session->local_peer_idx,
1090 debug_str_element (element),
1091 debug_str_set_key (&setop->output_set),
1092 debug_str_task_key (&task->key));
1095 if (NULL != output_diff)
1097 diff_insert (output_diff, -1, element);
1098 #ifdef GNUNET_EXTRA_LOGGING
1099 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1100 "P%u: removing element %s from diff {%s} of task {%s}\n",
1101 session->local_peer_idx,
1102 debug_str_element (element),
1103 debug_str_diff_key (&setop->output_diff),
1104 debug_str_task_key (&task->key));
1107 if (NULL != output_rfn)
1109 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1112 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1113 session->local_peer_idx,
1114 debug_str_element (element),
1115 debug_str_rfn_key (&setop->output_rfn),
1116 debug_str_task_key (&task->key));
/* DONE: the other peer is committed in the output referendum; for the first
   all-to-all phase the reported set size is remembered in the session. */
1120 case GNUNET_SET_STATUS_DONE:
1121 // XXX: check first if any changes to the underlying
1122 // set are still pending
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "Finishing setop in Task {%s}\n",
1125 debug_str_task_key (&task->key));
1126 if (NULL != output_rfn)
1128 rfn_commit (output_rfn, task_other_peer (task));
1130 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1132 session->first_size = current_size;
1136 case GNUNET_SET_STATUS_FAILURE:
1138 GNUNET_break_op (0);
1159 enum EvilnessSubType
1162 EVILNESS_SUB_REPLACEMENT,
1163 EVILNESS_SUB_NO_REPLACEMENT,
1168 enum EvilnessType type;
1169 enum EvilnessSubType subtype;
/**
 * Parse the subtype component of an evilness specification.
 *
 * "replace" selects EVILNESS_SUB_REPLACEMENT and "noreplace" selects
 * EVILNESS_SUB_NO_REPLACEMENT; any other string is logged as an error and
 * reported with GNUNET_SYSERR.
 *
 * NOTE(review): the return-type line and the success return are not present
 * in this listing; callers compare the result against GNUNET_OK.
 *
 * @param evil_subtype_str subtype string to parse
 * @param evil structure whose 'subtype' field is set on success
 */
1175 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1177 if (0 == strcmp ("replace", evil_subtype_str))
1179 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1181 else if (0 == strcmp ("noreplace", evil_subtype_str))
1183 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1190 return GNUNET_SYSERR;
1197 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1201 char *evil_type_str = NULL;
1202 char *evil_subtype_str = NULL;
1204 GNUNET_assert (NULL != evil);
1206 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209 "P%u: no evilness\n",
1210 session->local_peer_idx);
1211 evil->type = EVILNESS_NONE;
1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1215 "P%u: got evilness spec\n",
1216 session->local_peer_idx);
1218 for (field = strtok (evil_spec, "/");
1220 field = strtok (NULL, "/"))
1222 unsigned int peer_num;
1223 unsigned int evil_num;
1226 evil_type_str = NULL;
1227 evil_subtype_str = NULL;
1229 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1233 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1234 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1240 GNUNET_assert (NULL != evil_type_str);
1241 GNUNET_assert (NULL != evil_subtype_str);
1243 if (peer_num == session->local_peer_idx)
1245 if (0 == strcmp ("slack", evil_type_str))
1247 evil->type = EVILNESS_SLACK;
1249 if (0 == strcmp ("slack-a2a", evil_type_str))
1251 evil->type = EVILNESS_SLACK_A2A;
1253 else if (0 == strcmp ("cram-all", evil_type_str))
1255 evil->type = EVILNESS_CRAM_ALL;
1256 evil->num = evil_num;
1257 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1260 else if (0 == strcmp ("cram-lead", evil_type_str))
1262 evil->type = EVILNESS_CRAM_LEAD;
1263 evil->num = evil_num;
1264 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1267 else if (0 == strcmp ("cram-echo", evil_type_str))
1269 evil->type = EVILNESS_CRAM_ECHO;
1270 evil->num = evil_num;
1271 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1276 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1283 /* No GNUNET_free since memory was allocated by libc */
1284 free (evil_type_str);
1285 evil_type_str = NULL;
1286 evil_subtype_str = NULL;
1289 evil->type = EVILNESS_NONE;
1291 GNUNET_free (evil_spec);
1292 /* no GNUNET_free_non_null since it wasn't
1293 * allocated with GNUNET_malloc */
1294 if (NULL != evil_type_str)
1295 free (evil_type_str);
1296 if (NULL != evil_subtype_str)
1297 free (evil_subtype_str);
/*
 * Commits the input set of a task's set operation.  Before committing, a
 * "contested" marker element and/or a size marker element may be inserted
 * into the set; an evil peer additionally crams extra elements into it or
 * commits an empty set, depending on its evilness type.
 *
 * NOTE(review): many short lines of this function (braces, the switch
 * header, break/else lines, preprocessor conditionals, some call arguments
 * and some assignments) are not present in this listing.
 */
1304 * Commit the appropriate set for a
1308 commit_set (struct ConsensusSession *session,
1309 struct TaskEntry *task)
1311 struct SetEntry *set;
1312 struct SetOpCls *setop = &task->cls.setop;
1314 GNUNET_assert (NULL != setop->op);
1315 set = lookup_set (session, &setop->input_set);
1316 GNUNET_assert (NULL != set);
/* A contested input set is announced to the other peer through a marker
   element, if this operation is configured to exchange contestation. */
1318 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1320 struct GNUNET_SET_Element element;
1321 struct ConsensusElement ce = { 0 };
1322 ce.marker = CONSENSUS_MARKER_CONTESTED;
1324 element.size = sizeof (struct ConsensusElement);
1325 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1326 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
/* In the second all-to-all phase the local first-round set size and the
   local peer index are sent along as a size marker element. */
1329 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1331 struct GNUNET_SET_Element element;
1332 struct ConsensusSizeElement cse = { 0 };
1333 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1334 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1335 cse.size = GNUNET_htonll (session->first_size);
1336 cse.sender_index = session->local_peer_idx;
1337 element.data = &cse;
1338 element.size = sizeof (struct ConsensusSizeElement);
1339 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1340 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1346 struct Evilness evil;
1348 get_evilness (session, &evil);
1349 if (EVILNESS_NONE != evil.type)
1351 /* Useful for evaluation */
1352 GNUNET_STATISTICS_set (statistics,
1359 case EVILNESS_CRAM_ALL:
1360 case EVILNESS_CRAM_LEAD:
1361 case EVILNESS_CRAM_ECHO:
1362 /* We're not cramming elements in the
1363 all-to-all round, since that would just
1364 add more elements to the result set, but
1365 wouldn't test robustness. */
1366 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1368 GNUNET_SET_commit (setop->op, set->h);
/* CRAM_LEAD only applies to the leader phase on the current set, and
   CRAM_ECHO only to the echo phase; otherwise the set is committed as is. */
1371 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1372 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1374 GNUNET_SET_commit (setop->op, set->h);
1377 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1379 GNUNET_SET_commit (setop->op, set->h);
/* Cram evil.num stuffed elements into the set before committing it. */
1382 for (i = 0; i < evil.num; i++)
1384 struct GNUNET_SET_Element element;
1385 struct ConsensusStuffedElement se = { 0 };
1387 element.size = sizeof (struct ConsensusStuffedElement);
1388 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1390 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1392 /* Always generate a new element. */
1393 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1395 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1397 /* Always cram the same elements, derived from counter. */
1398 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1404 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1405 #ifdef GNUNET_EXTRA_LOGGING
1406 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1407 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1408 session->local_peer_idx,
1409 debug_str_element (&element),
1410 debug_str_set_key (&setop->input_set),
1411 debug_str_task_key (&task->key));
1414 GNUNET_STATISTICS_update (statistics,
1415 "# stuffed elements",
1418 GNUNET_SET_commit (setop->op, set->h);
1420 case EVILNESS_SLACK:
1421 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1422 "P%u: evil peer: slacking\n",
1423 (unsigned int) session->local_peer_idx);
/* SLACK_A2A commits a freshly created empty set in both all-to-all phases
   and the real set in every other phase. */
1425 case EVILNESS_SLACK_A2A:
1426 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1427 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1429 struct GNUNET_SET_Handle *empty_set;
1430 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1431 GNUNET_SET_commit (setop->op, empty_set);
1432 GNUNET_SET_destroy (empty_set);
1436 GNUNET_SET_commit (setop->op, set->h);
1440 GNUNET_SET_commit (setop->op, set->h);
/* Non-evil path: commit towards peers that are not blacklisted and cancel
   the operation for blacklisted ones. */
1445 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1447 GNUNET_SET_commit (setop->op, set->h);
1451 /* For our testcases, we don't want the blacklisted
1453 GNUNET_SET_operation_cancel (setop->op);
/**
 * Store a diff in the session's diff map under the hash of its key.
 * The insertion uses the UNIQUE_ONLY option and is asserted to succeed, so
 * a diff with the same key must not be stored already.
 *
 * @param session session owning the diff map
 * @param diff diff to store, must not be NULL (asserted)
 */
1461 put_diff (struct ConsensusSession *session,
1462 struct DiffEntry *diff)
1464 struct GNUNET_HashCode hash;
1466 GNUNET_assert (NULL != diff);
1468 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1469 GNUNET_assert (GNUNET_OK ==
1470 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1471 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Store a set entry in the session's set map under the hash of its key.
 * Unlike the diff and referendum maps, the insertion uses the REPLACE
 * option; only a GNUNET_SYSERR result is treated as fatal.
 *
 * @param session session owning the set map
 * @param set set entry to store; its handle must not be NULL (asserted)
 */
1475 put_set (struct ConsensusSession *session,
1476 struct SetEntry *set)
1478 struct GNUNET_HashCode hash;
1480 GNUNET_assert (NULL != set->h);
1482 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1484 debug_str_set_key (&set->key));
1486 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1487 GNUNET_assert (GNUNET_SYSERR !=
1488 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1489 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
/**
 * Store a referendum in the session's referendum map under the hash of its
 * key.  The insertion uses the UNIQUE_ONLY option and is asserted to
 * succeed, so a referendum with the same key must not be stored already.
 *
 * @param session session owning the referendum map
 * @param rfn referendum to store
 */
1494 put_rfn (struct ConsensusSession *session,
1495 struct ReferendumEntry *rfn)
1497 struct GNUNET_HashCode hash;
1499 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1500 GNUNET_assert (GNUNET_OK ==
1501 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1502 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Cancel hook for reconciliation tasks.  Currently a placeholder: the only
 * visible content is the "not implemented yet" remark.
 *
 * @param task task to cancel
 */
1508 task_cancel_reconcile (struct TaskEntry *task)
1510 /* not implemented yet */
/**
 * Turn every change recorded in a diff into a vote of one peer in a
 * referendum: the diff's changes are iterated and rfn_vote() is called with
 * VOTE_ADD or VOTE_REMOVE for the element of each change.
 *
 * NOTE(review): the last parameter line and the conditions selecting
 * between the two votes are not present in this listing; presumably the
 * choice depends on the recorded change's weight.
 */
1516 apply_diff_to_rfn (struct DiffEntry *diff,
1517 struct ReferendumEntry *rfn,
1518 uint16_t voting_peer,
1521 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1522 struct DiffElementInfo *di;
1524 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1526 while (GNUNET_YES ==
1527 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1529 (const void **) &di))
1533 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1537 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1541 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/*
 * Body of the diff constructor: allocates a zeroed DiffEntry and gives it
 * an empty change map (initial size 8).
 *
 * NOTE(review): the function header and the return statement are not
 * present in this listing; presumably this is diff_create(), which is
 * called by diff_compose().
 */
1548 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1550 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/**
 * Build a new diff from two existing diffs: every change of @a diff_1 and
 * then every change of @a diff_2 is inserted into a freshly created diff
 * with diff_insert(), keeping each change's weight and element.
 *
 * NOTE(review): the return-type line and the return statement are not
 * present in this listing; presumably the new diff is returned.
 *
 * @param diff_1 first diff, not modified
 * @param diff_2 second diff, not modified
 */
1557 diff_compose (struct DiffEntry *diff_1,
1558 struct DiffEntry *diff_2)
1560 struct DiffEntry *diff_new;
1561 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1562 struct DiffElementInfo *di;
1564 diff_new = diff_create ();
1566 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1567 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1569 diff_insert (diff_new, di->weight, di->element);
1571 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1573 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1574 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1576 diff_insert (diff_new, di->weight, di->element);
1578 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Allocate a referendum for `size` peers: an element map plus two per-peer
   int arrays (peer_commited and peer_contested) with `size` entries each.
   The peer count is remembered in num_peers. */
1584 struct ReferendumEntry *
1585 rfn_create (uint16_t size)
1587 struct ReferendumEntry *rfn;
1589 rfn = GNUNET_new (struct ReferendumEntry);
1590 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/* One flag per participating peer. */
1591 rfn->peer_commited = GNUNET_new_array (size, int);
1592 rfn->peer_contested = GNUNET_new_array (size, int);
1593 rfn->num_peers = size;
/* Release the change map owned by `diff`.
   NOTE(review): whether the entry itself or the stored element infos are
   released as well is not visible in this excerpt. */
1601 diff_destroy (struct DiffEntry *diff)
1603 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1610 * For a given majority, count what the outcome
1611 * is (add/remove/keep), and give the number
1612 * of peers that voted for this outcome.
1615 rfn_majority (const struct ReferendumEntry *rfn,
1616 const struct RfnElementInfo *ri,
1617 uint16_t *ret_majority,
1618 enum ReferendumVote *ret_vote)
1620 uint16_t votes_yes = 0;
1621 uint16_t num_commited = 0;
1624 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1625 "Computing rfn majority for element %s of rfn {%s}\n",
1626 debug_str_element (ri->element),
1627 debug_str_rfn_key (&rfn->key));
1629 for (i = 0; i < rfn->num_peers; i++)
1631 if (GNUNET_NO == rfn->peer_commited[i])
1635 if (GNUNET_YES == ri->votes[i])
1639 if (votes_yes > (num_commited) / 2)
1641 *ret_vote = ri->proposal;
1642 *ret_majority = votes_yes;
1646 *ret_vote = VOTE_STAY;
1647 *ret_majority = num_commited - votes_yes;
/* Closure handed from create_set_copy_for_task() to set_copy_cb(). */
/* Task on whose behalf the set is being copied. */
1654 struct TaskEntry *task;
/* Key under which set_copy_cb() stores the copy. */
1655 struct SetKey dst_set_key;
/* Callback receiving the handle of a copied set.  Creates a SetEntry
   carrying the destination key from the closure and registers it in the
   session of the closure's task.
   NOTE(review): put_set() asserts that the entry has a handle; the
   statement storing `copy` in the entry and whatever happens after
   registration are not visible in this excerpt. */
1660 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1662 struct SetCopyCls *scc = cls;
/* Pull everything needed out of the closure up front. */
1663 struct TaskEntry *task = scc->task;
1664 struct SetKey dst_set_key = scc->dst_set_key;
1665 struct SetEntry *set;
1668 set = GNUNET_new (struct SetEntry);
1670 set->key = dst_set_key;
1671 put_set (task->step->session, set);
1678 * Call the start function of the given
1679 * task again after we created a copy of the given set.
/* Start copying the set stored under src_set_key so that it becomes
   available under dst_set_key.  The source set must exist.  A SetCopyCls
   closure is allocated here and the destination key is copied into it by
   value, so the caller's key does not have to outlive this call.
   NOTE(review): the remaining arguments of GNUNET_SET_copy_lazy() are not
   visible in this excerpt; presumably set_copy_cb and the closure. */
1682 create_set_copy_for_task (struct TaskEntry *task,
1683 struct SetKey *src_set_key,
1684 struct SetKey *dst_set_key)
1686 struct SetEntry *src_set;
1687 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690 "Copying set {%s} to {%s} for task {%s}\n",
1691 debug_str_set_key (src_set_key),
1692 debug_str_set_key (dst_set_key),
1693 debug_str_task_key (&task->key));
1696 scc->dst_set_key = *dst_set_key;
/* The set to copy from has to be registered in the session already. */
1697 src_set = lookup_set (task->step->session, src_set_key);
1698 GNUNET_assert (NULL != src_set);
1699 GNUNET_SET_copy_lazy (src_set->h,
/* Closure used to track the set mutations issued by a task; it is
   allocated by the task start functions and consumed by
   set_mutation_done(), which also reads a num_pending counter from it. */
1705 struct SetMutationProgressCls
1709 * Task to finish once all changes are through.
1711 struct TaskEntry *task;
/* Completion callback for one set mutation.  Requires that at least one
   mutation is still pending; once the pending counter is zero the task
   stored in the closure is picked up.
   NOTE(review): the decrement of num_pending and the statements following
   the lookup of the task are not visible in this excerpt. */
1716 set_mutation_done (void *cls)
1718 struct SetMutationProgressCls *pc = cls;
/* A completion without an outstanding mutation would be a bookkeeping bug. */
1720 GNUNET_assert (pc->num_pending > 0);
1724 if (0 == pc->num_pending)
1726 struct TaskEntry *task = pc->task;
/* Try to mark `step` as finished without running it.  The step's
   is_running, is_finished and early_finishable flags are checked first.
   When the step is finished early, every subordinate loses one pending
   prerequisite and is itself given the chance to finish early
   (recursively); afterwards the session's ready steps are run.
   NOTE(review): the statements guarded by the three initial checks are
   not visible in this excerpt; presumably they are early returns. */
1734 try_finish_step_early (struct Step *step)
1738 if (GNUNET_YES == step->is_running)
1740 if (GNUNET_YES == step->is_finished)
1742 if (GNUNET_NO == step->early_finishable)
1745 step->is_finished = GNUNET_YES;
1747 #ifdef GNUNET_EXTRA_LOGGING
1748 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1749 "Finishing step `%s' early.\n",
/* Release the steps that were waiting for this one. */
1753 for (i = 0; i < step->subordinates_len; i++)
1755 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1756 step->subordinates[i]->pending_prereq--;
1757 #ifdef GNUNET_EXTRA_LOGGING
1758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1759 "Decreased pending_prereq to %u for step `%s'.\n",
1760 (unsigned int) step->subordinates[i]->pending_prereq,
1761 step->subordinates[i]->debug_name);
/* Early finishing propagates down the dependency graph. */
1764 try_finish_step_early (step->subordinates[i]);
1767 // XXX: maybe schedule as task to avoid recursion?
1768 run_ready_steps (step->session);
/* Finish a running step whose tasks have all completed: every subordinate
   loses one pending prerequisite, the step is marked finished, and the
   session's ready steps are run.  The step must be running, not yet
   finished, and have finished_tasks equal to tasks_len. */
1773 finish_step (struct Step *step)
1777 GNUNET_assert (step->finished_tasks == step->tasks_len);
1778 GNUNET_assert (GNUNET_YES == step->is_running);
1779 GNUNET_assert (GNUNET_NO == step->is_finished);
1781 #ifdef GNUNET_EXTRA_LOGGING
1782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783 "All tasks of step `%s' with %u subordinates finished.\n",
1785 step->subordinates_len);
/* Release the steps that were waiting for this one. */
1788 for (i = 0; i < step->subordinates_len; i++)
1790 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1791 step->subordinates[i]->pending_prereq--;
1792 #ifdef GNUNET_EXTRA_LOGGING
1793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794 "Decreased pending_prereq to %u for step `%s'.\n",
1795 (unsigned int) step->subordinates[i]->pending_prereq,
1796 step->subordinates[i]->debug_name);
1801 step->is_finished = GNUNET_YES;
1803 // XXX: maybe schedule as task to avoid recursion?
1804 run_ready_steps (step->session);
1810 * Apply the result from one round of gradecasts (i.e. every peer
1811 * should have gradecasted) to the peer's current set.
1813 * @param task the task with context information
/* Start function of the "apply round" task.  Reads the gradecast result
   referendum of the task's repetition and applies the majority outcome of
   each element to the "current" set of the next repetition, then decides
   about early stopping from the weakest majority that was seen.
   NOTE(review): the case labels of both switch statements, the arguments
   after the set handle in the add/remove calls, and the statements that
   follow create_set_copy_for_task() and GNUNET_free() are not visible in
   this excerpt. */
1816 task_start_apply_round (struct TaskEntry *task)
1818 struct ConsensusSession *session = task->step->session;
1819 struct SetKey sk_in;
1820 struct SetKey sk_out;
1821 struct RfnKey rk_in;
1822 struct SetEntry *set_out;
1823 struct ReferendumEntry *rfn_in;
1824 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1825 struct RfnElementInfo *ri;
1826 struct SetMutationProgressCls *progress_cls;
/* Smallest majority seen over all elements; starts at the maximum. */
1827 uint16_t worst_majority = UINT16_MAX;
/* Input: current set and gradecast result of this repetition.
   Output: current set of the following repetition. */
1829 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1830 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1831 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
/* Without an output set, a copy of the input set is requested first. */
1833 set_out = lookup_set (session, &sk_out);
1834 if (NULL == set_out)
1836 create_set_copy_for_task (task, &sk_in, &sk_out);
1840 rfn_in = lookup_rfn (session, &rk_in);
1841 GNUNET_assert (NULL != rfn_in);
/* Tracks how many set mutations issued below are still outstanding. */
1843 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1844 progress_cls->task = task;
1846 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1848 while (GNUNET_YES ==
1849 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1851 (const void **) &ri))
1853 uint16_t majority_num;
1854 enum ReferendumVote majority_vote;
1856 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
/* Remember the weakest majority for the early-stopping decision. */
1858 if (worst_majority > majority_num)
1859 worst_majority = majority_num;
1861 switch (majority_vote)
1864 progress_cls->num_pending++;
1865 GNUNET_assert (GNUNET_OK ==
1866 GNUNET_SET_add_element (set_out->h,
1870 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1871 "P%u: apply round: adding element %s with %u-majority.\n",
1872 session->local_peer_idx,
1873 debug_str_element (ri->element), majority_num);
1876 progress_cls->num_pending++;
1877 GNUNET_assert (GNUNET_OK ==
1878 GNUNET_SET_remove_element (set_out->h,
1882 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1883 "P%u: apply round: deleting element %s with %u-majority.\n",
1884 session->local_peer_idx,
1885 debug_str_element (ri->element), majority_num);
1888 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1889 "P%u: apply round: keeping element %s with %u-majority.\n",
1890 session->local_peer_idx,
1891 debug_str_element (ri->element), majority_num);
/* No mutation was issued, so no completion callback will use the closure. */
1900 if (0 == progress_cls->num_pending)
1902 // call closure right now, no pending ops
1903 GNUNET_free (progress_cls);
/* Early stopping is considered once the weakest majority reaches
   two times a third (integer division) of the peers. */
1908 uint16_t thresh = (session->num_peers / 3) * 2;
1910 if (worst_majority >= thresh)
1912 switch (session->early_stopping)
1914 case EARLY_STOPPING_NONE:
1915 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1916 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1917 "P%u: Stopping early (after one more superround)\n",
1918 session->local_peer_idx);
1920 case EARLY_STOPPING_ONE_MORE:
1921 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1922 session->local_peer_idx);
1923 session->early_stopping = EARLY_STOPPING_DONE;
/* Give every step of the session the chance to finish early. */
1926 for (step = session->steps_head; NULL != step; step = step->next)
1927 try_finish_step_early (step);
1930 case EARLY_STOPPING_DONE:
1931 /* We shouldn't be here anymore after early stopping */
/* Early stopping had begun, but this round's majority is too weak. */
1939 else if (EARLY_STOPPING_NONE != session->early_stopping)
1941 // Our assumption about the number of bad peers
1943 GNUNET_break_op (0);
1947 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1948 session->local_peer_idx);
1951 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Start function of the "confirm grade" task of one gradecast.  Feeds the
   leader's proposal diff and the outcome of the echo referendum into the
   gradecast result referendum as votes of the leader, then derives a
   confidence value (starting at 2) from the number of non-contested
   peers.  The leader's votes are committed when the confidence is at
   least 1; the leader is blacklisted when it is at most 1.
   NOTE(review): the case labels of the switch and the closing statements
   of the function are not visible in this excerpt. */
1956 task_start_grade (struct TaskEntry *task)
1958 struct ConsensusSession *session = task->step->session;
1959 struct ReferendumEntry *output_rfn;
1960 struct ReferendumEntry *input_rfn;
1961 struct DiffEntry *input_diff;
1962 struct RfnKey rfn_key;
1963 struct DiffKey diff_key;
1964 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1965 struct RfnElementInfo *ri;
1966 unsigned int gradecast_confidence = 2;
/* The result referendum of the repetition is created on first use. */
1968 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1969 output_rfn = lookup_rfn (session, &rfn_key);
1970 if (NULL == output_rfn)
1972 output_rfn = rfn_create (session->num_peers);
1973 output_rfn->key = rfn_key;
1974 put_rfn (session, output_rfn);
/* Both inputs must already exist at this point. */
1977 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1978 input_diff = lookup_diff (session, &diff_key);
1979 GNUNET_assert (NULL != input_diff);
1981 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1982 input_rfn = lookup_rfn (session, &rfn_key);
1983 GNUNET_assert (NULL != input_rfn);
1985 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
/* The leader's proposal is recorded as the leader's own votes. */
1987 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1989 while (GNUNET_YES ==
1990 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1992 (const void **) &ri))
1994 uint16_t majority_num;
1995 enum ReferendumVote majority_vote;
1997 // XXX: we need contested votes and non-contested votes here
1998 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* An outcome backed by at most a third of the peers is overridden. */
2000 if (majority_num <= session->num_peers / 3)
2001 majority_vote = VOTE_REMOVE;
2003 switch (majority_vote)
2008 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2011 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2018 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Fewer non-contested peers lower the confidence: to at most 1 below
   2 * (n / 3), and to 0 below (n / 3) + 1. */
2021 uint16_t noncontested;
2022 noncontested = rfn_noncontested (input_rfn);
2023 if (noncontested < (session->num_peers / 3) * 2)
2025 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2027 if (noncontested < (session->num_peers / 3) + 1)
2029 gradecast_confidence = 0;
2033 if (gradecast_confidence >= 1)
2034 rfn_commit (output_rfn, task->key.leader);
2036 if (gradecast_confidence <= 1)
2037 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/* Start function of a reconcile task, i.e. a set operation between peer1
   and peer2 of the task key.  Makes sure the configured output set,
   referendum and diff exist, then acts according to the local peer's
   role: as peer1 it prepares the set operation with peer2 and commits; as
   peer2 it waits for the other peer and commits only if an operation
   handle is already present.
   NOTE(review): several statements (among them what follows
   create_set_copy_for_task(), the body of the peer1 == peer2 case, some
   arguments of GNUNET_SET_prepare() and the final error branch) are not
   visible in this excerpt. */
2044 task_start_reconcile (struct TaskEntry *task)
2046 struct SetEntry *input;
2047 struct SetOpCls *setop = &task->cls.setop;
2048 struct ConsensusSession *session = task->step->session;
/* The input set and its handle must exist before the task starts. */
2050 input = lookup_set (session, &setop->input_set);
2051 GNUNET_assert (NULL != input);
2052 GNUNET_assert (NULL != input->h);
2054 /* We create the outputs for the operation here
2055 (rather than in the set operation callback)
2056 because we want something valid in there, even
2057 if the other peer doesn't talk to us */
2059 if (SET_KIND_NONE != setop->output_set.set_kind)
2061 /* If we don't have an existing output set,
2062 we clone the input set. */
2063 if (NULL == lookup_set (session, &setop->output_set))
2065 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
/* Output referendum: created empty if it does not exist yet. */
2070 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2072 if (NULL == lookup_rfn (session, &setop->output_rfn))
2074 struct ReferendumEntry *rfn;
2076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077 "P%u: output rfn <%s> missing, creating.\n",
2078 session->local_peer_idx,
2079 debug_str_rfn_key (&setop->output_rfn));
2081 rfn = rfn_create (session->num_peers);
2082 rfn->key = setop->output_rfn;
2083 put_rfn (session, rfn);
/* Output diff: created empty if it does not exist yet. */
2087 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2089 if (NULL == lookup_diff (session, &setop->output_diff))
2091 struct DiffEntry *diff;
2093 diff = diff_create ();
2094 diff->key = setop->output_diff;
2095 put_diff (session, diff);
/* Both peers of the task are the local peer. */
2099 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2101 /* XXX: mark the corresponding rfn as commited if necessary */
/* The local peer is peer1: it initiates the set operation. */
2106 if (task->key.peer1 == session->local_peer_idx)
2108 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2111 "P%u: Looking up set {%s} to run remote union\n",
2112 session->local_peer_idx,
2113 debug_str_set_key (&setop->input_set));
/* The context message carries the task key in network byte order. */
2115 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2116 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2118 rcm.kind = htons (task->key.kind);
2119 rcm.peer1 = htons (task->key.peer1);
2120 rcm.peer2 = htons (task->key.peer2);
2121 rcm.leader = htons (task->key.leader);
2122 rcm.repetition = htons (task->key.repetition);
2124 GNUNET_assert (NULL == setop->op);
2125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2126 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2128 struct GNUNET_SET_Option opts[] = {
2129 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2130 { GNUNET_SET_OPTION_END },
2133 // XXX: maybe this should be done while
2134 // setting up tasks already?
2135 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2136 &session->global_id,
2138 GNUNET_SET_RESULT_SYMMETRIC,
2143 commit_set (session, task);
/* The local peer is peer2: the other side initiates. */
2145 else if (task->key.peer2 == session->local_peer_idx)
2147 /* Wait for the other peer to contact us */
2148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2149 session->local_peer_idx, task->key.peer1);
/* An operation handle is already present, so commit right away. */
2151 if (NULL != setop->op)
2153 commit_set (session, task);
2158 /* We made an error while constructing the task graph. */
/* Start function of the "echo grade" task of one gradecast.  Evaluates
   the echo referendum for the task's repetition and leader and applies
   the majority outcome of each element to the echo result set; the set is
   flagged as contested when an element's majority is below a third of the
   peers.  The echo result set is additionally registered under the
   SET_KIND_LAST_GRADECAST key, sharing the same handle.
   NOTE(review): the case labels of the switch, the remaining arguments of
   the add/remove calls and the statements that follow
   create_set_copy_for_task() and GNUNET_free() are not visible in this
   excerpt. */
2165 task_start_eval_echo (struct TaskEntry *task)
2167 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2168 struct ReferendumEntry *input_rfn;
2169 struct RfnElementInfo *ri;
2170 struct SetEntry *output_set;
2171 struct SetMutationProgressCls *progress_cls;
2172 struct ConsensusSession *session = task->step->session;
2173 struct SetKey sk_in;
2174 struct SetKey sk_out;
2175 struct RfnKey rk_in;
/* Input: the leader's proposal.  Output: the echo result for that leader. */
2177 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2178 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2179 output_set = lookup_set (session, &sk_out);
/* Without an output set, a copy of the input set is requested first. */
2180 if (NULL == output_set)
2182 create_set_copy_for_task (task, &sk_in, &sk_out);
2188 // FIXME: should be marked as a shallow copy, so
2189 // we can destroy everything correctly
2190 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2191 last_set->h = output_set->h;
2192 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2193 put_set (session, last_set);
2196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2197 "Evaluating referendum in Task {%s}\n",
2198 debug_str_task_key (&task->key));
/* Tracks how many set mutations issued below are still outstanding. */
2200 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2201 progress_cls->task = task;
2203 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2204 input_rfn = lookup_rfn (session, &rk_in);
2206 GNUNET_assert (NULL != input_rfn);
2208 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2209 GNUNET_assert (NULL != iter);
2211 while (GNUNET_YES ==
2212 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2214 (const void **) &ri))
2216 enum ReferendumVote majority_vote;
2217 uint16_t majority_num;
2219 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2221 if (majority_num < session->num_peers / 3)
2223 /* It is not the case that all nonfaulty peers
2224 echoed the same value. Since we're doing a set reconciliation, we
2225 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2226 reconciliation as contested. Other peers might not know that the
2227 leader is faulty, thus we still re-distribute in the confirmation
2229 output_set->is_contested = GNUNET_YES;
2232 switch (majority_vote)
2235 progress_cls->num_pending++;
2236 GNUNET_assert (GNUNET_OK ==
2237 GNUNET_SET_add_element (output_set->h,
2243 progress_cls->num_pending++;
2244 GNUNET_assert (GNUNET_OK ==
2245 GNUNET_SET_remove_element (output_set->h,
2251 /* Nothing to do. */
2259 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* No mutation was issued, so no completion callback will use the closure. */
2261 if (0 == progress_cls->num_pending)
2263 // call closure right now, no pending ops
2264 GNUNET_free (progress_cls);
/* Start function of the finish task: looks up the set named by the
   task's finish closure (it must exist) and iterates over it with
   send_to_client_iter.
   NOTE(review): the closure argument of the iteration is not visible in
   this excerpt. */
2271 task_start_finish (struct TaskEntry *task)
2273 struct SetEntry *final_set;
2274 struct ConsensusSession *session = task->step->session;
2276 final_set = lookup_set (session, &task->cls.finish.input_set);
2278 GNUNET_assert (NULL != final_set);
2281 GNUNET_SET_iterate (final_set->h,
2282 send_to_client_iter,
/* Start a task that has neither been started nor finished and that has a
   start function, and mark it as started.
   NOTE(review): the invocation of task->start and its position relative
   to the is_started assignment are not visible in this excerpt. */
2287 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
/* A task may be started only once and needs a start function. */
2291 GNUNET_assert (GNUNET_NO == task->is_started);
2292 GNUNET_assert (GNUNET_NO == task->is_finished);
2293 GNUNET_assert (NULL != task->start);
2297 task->is_started = GNUNET_YES;
2304 * Run all steps of the session that don't have any
2305 * more pending dependencies.
/* Walk the session's step list and start every step that is not running,
   not finished and has no pending prerequisites: the step is marked as
   running and all of its tasks are started.
   NOTE(review): the statement guarded by the final check (presumably a
   call that finishes the step) and the loop advance are not visible in
   this excerpt. */
2308 run_ready_steps (struct ConsensusSession *session)
2312 step = session->steps_head;
2314 while (NULL != step)
2316 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
/* A step that has not run yet cannot have finished tasks. */
2320 GNUNET_assert (0 == step->finished_tasks);
2322 #ifdef GNUNET_EXTRA_LOGGING
2323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2324 session->local_peer_idx,
2326 step->round, step->tasks_len, step->subordinates_len);
2329 step->is_running = GNUNET_YES;
2330 for (i = 0; i < step->tasks_len; i++)
2331 start_task (session, step->tasks[i]);
2333 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2334 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2337 /* Running the next ready steps will be triggered by task completion */
2349 finish_task (struct TaskEntry *task)
2351 GNUNET_assert (GNUNET_NO == task->is_finished);
2352 task->is_finished = GNUNET_YES;
2354 task->step->finished_tasks++;
2356 if (task->step->finished_tasks == task->step->tasks_len)
2357 finish_step (task->step);
2362 * Search peer in the list of peers in session.
2364 * @param peer peer to find
2365 * @param session session with peer
2366 * @return index of peer, -1 if peer is not in session
2369 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2372 for (i = 0; i < session->num_peers; i++)
2373 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2380 * Compute a global, (hopefully) unique consensus session id,
2381 * from the local id of the consensus session, and the identities of all participants.
2382 * Thus, if the local ids of two consensus sessions coincide, but the sessions do not consist of
2383 * exactly the same peers, the global ids will be different.
2385 * @param session session to generate the global id for
2386 * @param local_session_id local id of the consensus session
/* Derive session->global_id with GNUNET_CRYPTO_kdf().  The output is one
   GNUNET_HashCode; the visible length arguments are the size of the whole
   peer list (num_peers identities) and the size of one hash code.
   NOTE(review): the pointer arguments paired with those lengths and the
   use of `salt` are not visible in this excerpt; going by the parameters,
   presumably the peer list and local_session_id. */
2389 compute_global_id (struct ConsensusSession *session,
2390 const struct GNUNET_HashCode *local_session_id)
2392 const char *salt = "gnunet-service-consensus/session_id";
/* Key derivation is not allowed to fail. */
2394 GNUNET_assert (GNUNET_YES ==
2395 GNUNET_CRYPTO_kdf (&session->global_id,
2396 sizeof (struct GNUNET_HashCode),
2400 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2402 sizeof (struct GNUNET_HashCode),
2408 * Compare two peer identities.
2410 * @param h1 some peer identity
2411 * @param h2 some peer identity
2412 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2415 peer_id_cmp (const void *h1, const void *h2)
2417 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2422 * Create the sorted list of peers for the session,
2423 * add the local peer if not in the join message.
2425 * @param session session to initialize
2426 * @param join_msg join message with the list of peers participating at the end
/* Fill session->peers and session->num_peers from a join message.  The
   peer identities are read from the memory directly behind the message
   struct; num_peers comes from the message in network byte order.  If the
   local peer is not among them, the list is grown by one and my_peer is
   placed in the last slot.  The list is then sorted with qsort().
   NOTE(review): this function does not check num_peers against the actual
   message size; presumably the caller has validated the message.  The
   second operand of the membership comparison, the source argument of the
   copy and the element count and comparator of qsort() are not visible in
   this excerpt. */
2429 initialize_session_peer_list (struct ConsensusSession *session,
2430 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
/* The peer identities follow the fixed-size part of the message. */
2432 const struct GNUNET_PeerIdentity *msg_peers
2433 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2434 int local_peer_in_list;
2436 session->num_peers = ntohl (join_msg->num_peers);
2438 /* Peers in the join message, may or may not include the local peer,
2439 Add it if it is missing. */
2440 local_peer_in_list = GNUNET_NO;
2441 for (unsigned int i = 0; i < session->num_peers; i++)
2443 if (0 == memcmp (&msg_peers[i],
2445 sizeof (struct GNUNET_PeerIdentity)))
2447 local_peer_in_list = GNUNET_YES;
/* Reserve one extra slot for the local peer if it was not listed. */
2451 if (GNUNET_NO == local_peer_in_list)
2452 session->num_peers++;
2454 session->peers = GNUNET_new_array (session->num_peers,
2455 struct GNUNET_PeerIdentity);
2456 if (GNUNET_NO == local_peer_in_list)
2457 session->peers[session->num_peers - 1] = my_peer;
/* Only as many identities as the message announced are copied. */
2459 GNUNET_memcpy (session->peers,
2461 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2462 qsort (session->peers,
2464 sizeof (struct GNUNET_PeerIdentity),
2469 static struct TaskEntry *
2470 lookup_task (struct ConsensusSession *session,
2471 struct TaskKey *key)
2473 struct GNUNET_HashCode hash;
2476 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2477 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2478 GNUNET_h2s (&hash));
2479 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2484 * Called when another peer wants to do a set operation with the local peer.
2487 * @param cls closure
2488 * @param other_peer the other peer
2489 * @param context_msg message with application specific information from the other peer
2491 * @param request request from the other peer, use GNUNET_SET_accept
2492 * to accept it, otherwise the request will be refused
2493 * Note that we don't use a return value here, as it is also
2494 * necessary to specify the set we want to do the operation with,
2495 * which sometimes can be derived from the context message.
2496 * It is also necessary to specify the timeout.
/* Handle an incoming set operation request for a session.  The context
   message must be present, of the round-context type and exactly the size
   of a round-context message.  Its fields are converted from network byte
   order into a TaskKey, which is used to find the matching task.  The
   request is accepted (symmetric result mode, byzantine option set from
   session->lower_bound) and the resulting operation stored in the task.
   If the task has already been started, the set is committed right away.
   NOTE(review): the statements that follow each GNUNET_break_op() are not
   visible in this excerpt (presumably early returns), nor are the check
   preceding the break after lookup_task() and the trailing arguments of
   GNUNET_SET_accept(). */
2499 set_listen_cb (void *cls,
2500 const struct GNUNET_PeerIdentity *other_peer,
2501 const struct GNUNET_MessageHeader *context_msg,
2502 struct GNUNET_SET_Request *request)
2504 struct ConsensusSession *session = cls;
2506 struct TaskEntry *task;
2507 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
/* Validate the context message before interpreting it. */
2509 if (NULL == context_msg)
2511 GNUNET_break_op (0);
2515 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2517 GNUNET_break_op (0);
2521 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2523 GNUNET_break_op (0);
2527 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
/* Rebuild the task key the sender put into the message. */
2529 tk = ((struct TaskKey) {
2530 .kind = ntohs (cm->kind),
2531 .peer1 = ntohs (cm->peer1),
2532 .peer2 = ntohs (cm->peer2),
2533 .repetition = ntohs (cm->repetition),
2534 .leader = ntohs (cm->leader),
2537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2538 session->local_peer_idx, debug_str_task_key (&tk));
2540 task = lookup_task (session, &tk);
2544 GNUNET_break_op (0);
/* Requests for tasks that are already finished are protocol violations. */
2548 if (GNUNET_YES == task->is_finished)
2550 GNUNET_break_op (0);
2554 if (task->key.peer2 != session->local_peer_idx)
2556 /* We're being asked, so we must be the 2nd peer. */
2557 GNUNET_break_op (0);
/* A task with the local peer on both sides never involves a remote request. */
2561 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2562 (task->key.peer2 == session->local_peer_idx)));
2564 struct GNUNET_SET_Option opts[] = {
2565 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2566 { GNUNET_SET_OPTION_END },
2569 task->cls.setop.op = GNUNET_SET_accept (request,
2570 GNUNET_SET_RESULT_SYMMETRIC,
2575 /* If the task hasn't been started yet,
2576 we wait for that until we commit. */
2578 if (GNUNET_YES == task->is_started)
2580 commit_set (session, task);
/* Add a copy of the task `t` to its step's task array and to `taskmap`.
   The entry is duplicated with GNUNET_memdup(), so the caller may pass a
   temporary.  The task must have a step assigned.  Task keys must be
   unique: the map put uses UNIQUE_ONLY and anything but GNUNET_OK trips
   the assertion.
   NOTE(review): the declaration and assignment of `s` (presumably
   t->step), the remaining arguments of GNUNET_array_grow() and the
   increment of tasks_len are not visible in this excerpt. */
2587 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2588 struct TaskEntry *t)
2590 struct GNUNET_HashCode round_hash;
2593 GNUNET_assert (NULL != t->step);
/* From here on `t` refers to the heap copy, not to the caller's struct. */
2595 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
/* Grow the step's task array once it is full. */
2599 if (s->tasks_len == s->tasks_cap)
2601 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2602 GNUNET_array_grow (s->tasks,
2607 #ifdef GNUNET_EXTRA_LOGGING
2608 GNUNET_assert (NULL != s->debug_name);
2609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2610 debug_str_task_key (&t->key),
2614 s->tasks[s->tasks_len] = t;
/* The map is keyed by the hash over the raw bytes of the TaskKey. */
2617 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2618 GNUNET_assert (GNUNET_OK ==
2619 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2620 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* Placeholder for assigning timeouts to the steps of a session; the
   visible body contains only comments. */
2625 install_step_timeouts (struct ConsensusSession *session)
2627 /* Given the fully constructed task graph
2628 with rounds for tasks, we can give the tasks timeouts. */
2630 // unsigned int max_round;
2632 /* XXX: implement! */
2638 * Arrange two peers in some canonical order.
/* Put the peer indices *p1 and *p2 into an order that depends only on the
   pair of values and on n.  Both indices must be smaller than n.
   NOTE(review): the assignments of `a` and `b` and the statements that
   write the result back through p1 and p2 are not visible in this
   excerpt. */
2641 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2646 GNUNET_assert (*p1 < n);
2647 GNUNET_assert (*p2 < n);
2660 /* For uniformly random *p1, *p2,
2661 this condition is true with 50% chance */
2662 if (((b - a) + n) % n <= n / 2)
2676 * Record @a dep as a dependency of @a step.
/* Make `step` wait for `dep`: `step` is appended to dep's subordinate
   array (which is grown when full) and step's pending prerequisite
   counter is incremented.  The two steps must be distinct, non-NULL, and
   dep's round must not be later than step's round. */
2679 step_depend_on (struct Step *step, struct Step *dep)
2681 /* We're not checking for cyclic dependencies,
2682 but this is a cheap sanity check. */
2683 GNUNET_assert (step != dep);
2684 GNUNET_assert (NULL != step);
2685 GNUNET_assert (NULL != dep);
2686 GNUNET_assert (dep->round <= step->round);
2688 #ifdef GNUNET_EXTRA_LOGGING
2689 /* Make sure we have complete debugging information.
2690 Also checks that we don't screw up too badly
2691 constructing the task graph. */
2692 GNUNET_assert (NULL != step->debug_name);
2693 GNUNET_assert (NULL != dep->debug_name);
2694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2695 "Making step `%s' depend on `%s'\n",
/* Grow the subordinate array once it is full. */
2700 if (dep->subordinates_cap == dep->subordinates_len)
2702 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2703 GNUNET_array_grow (dep->subordinates,
2704 dep->subordinates_cap,
2708 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2710 dep->subordinates[dep->subordinates_len] = step;
2711 dep->subordinates_len++;
/* finish_step() and try_finish_step_early() decrement this counter again. */
2713 step->pending_prereq++;
/* Allocate a step for `session` with the given round number and
   early_finishable flag and append it to the tail of the session's step
   list.
   NOTE(review): the element argument of the list insertion and the return
   statement are not visible in this excerpt. */
2717 static struct Step *
2718 create_step (struct ConsensusSession *session, int round, int early_finishable)
2721 step = GNUNET_new (struct Step);
2722 step->session = session;
2723 step->round = round;
2724 step->early_finishable = early_finishable;
2725 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2726 session->steps_tail,
2733 * Construct the task graph for a single gradecast.
/* Create the steps and tasks of one gradecast (repetition `rep`, leader
   `lead`) and hook them in between step_before and step_after.  Five
   steps are created, each depending on the previous one: disseminate
   leader, echo, echo grade, confirm and confirm grade.  All of them are
   created as early-finishable.  step_after is made to depend on the last
   step.
   NOTE(review): the `rep` and `lead` parameters, several declarations,
   the assignments of p1/p2, `round` updates, prev_step assignments, the
   `.step` initialisers and the conditions separating the leader and
   non-leader cases are not visible in this excerpt. */
2737 construct_task_graph_gradecast (struct ConsensusSession *session,
2740 struct Step *step_before,
2741 struct Step *step_after)
2743 uint16_t n = session->num_peers;
2744 uint16_t me = session->local_peer_idx;
2749 /* The task we're currently setting up. */
2750 struct TaskEntry task;
2753 struct Step *prev_step;
2759 round = step_before->round + 1;
2761 /* gcast step 1: leader disseminates */
2763 step = create_step (session, round, GNUNET_YES);
2765 #ifdef GNUNET_EXTRA_LOGGING
2766 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2768 step_depend_on (step, step_before);
/* These tasks use `me` as the leader in their key and have no output set. */
2772 for (k = 0; k < n; k++)
2778 arrange_peers (&p1, &p2, n);
2779 task = ((struct TaskEntry) {
2781 .start = task_start_reconcile,
2782 .cancel = task_cancel_reconcile,
2783 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2785 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2786 put_task (session->taskmap, &task);
2788 /* We run this task to make sure that the leader
2789 has stored the SET_KIND_LEADER set of himself,
2790 so he can participate in the rest of the gradecast
2791 without the code having to handle any special cases. */
2792 task = ((struct TaskEntry) {
2794 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2795 .start = task_start_reconcile,
2796 .cancel = task_cancel_reconcile,
2798 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2799 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2800 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2801 put_task (session->taskmap, &task);
/* This task uses `lead` as the leader and stores the result as the
   leader proposal set and diff. */
2807 arrange_peers (&p1, &p2, n);
2808 task = ((struct TaskEntry) {
2810 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2811 .start = task_start_reconcile,
2812 .cancel = task_cancel_reconcile,
2814 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2815 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2816 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2817 put_task (session->taskmap, &task);
2820 /* gcast phase 2: echo */
2823 step = create_step (session, round, GNUNET_YES);
2824 #ifdef GNUNET_EXTRA_LOGGING
2825 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2827 step_depend_on (step, prev_step);
/* Echo tasks reconcile the leader proposal and record votes in the echo
   referendum. */
2829 for (k = 0; k < n; k++)
2833 arrange_peers (&p1, &p2, n);
2834 task = ((struct TaskEntry) {
2836 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2837 .start = task_start_reconcile,
2838 .cancel = task_cancel_reconcile,
2840 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2841 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2842 put_task (session->taskmap, &task);
2846 /* Same round, since step only has local tasks */
2847 step = create_step (session, round, GNUNET_YES);
2848 #ifdef GNUNET_EXTRA_LOGGING
2849 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2851 step_depend_on (step, prev_step);
/* NOTE(review): the task key below uses -1/-1 instead of p1/p2, so the
   result of this arrange_peers() call appears to be unused. */
2853 arrange_peers (&p1, &p2, n);
2854 task = ((struct TaskEntry) {
2855 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2857 .start = task_start_eval_echo
2859 put_task (session->taskmap, &task);
2863 step = create_step (session, round, GNUNET_YES);
2864 #ifdef GNUNET_EXTRA_LOGGING
2865 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2867 step_depend_on (step, prev_step);
2869 /* gcast phase 3: confirmation and grading */
2870 for (k = 0; k < n; k++)
2874 arrange_peers (&p1, &p2, n);
2875 task = ((struct TaskEntry) {
2877 .start = task_start_reconcile,
2878 .cancel = task_cancel_reconcile,
2879 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2881 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2882 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2883 /* If there was at least one element in the echo round that was
2884 contested (i.e. it had no n-t majority), then we let the other peers
2885 know, and other peers let us know. The contested flag for each peer is
2886 stored in the rfn. */
2887 task.cls.setop.transceive_contested = GNUNET_YES;
2888 put_task (session->taskmap, &task);
2892 /* Same round, since step only has local tasks */
2893 step = create_step (session, round, GNUNET_YES);
2894 #ifdef GNUNET_EXTRA_LOGGING
2895 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2897 step_depend_on (step, prev_step);
/* The local grading task; it has no cancel function. */
2899 task = ((struct TaskEntry) {
2901 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2902 .start = task_start_grade,
2904 put_task (session->taskmap, &task);
/* Whatever follows this gradecast has to wait for the grading step. */
2906 step_depend_on (step_after, step);
/* Build the whole task graph of a session, in this order: two all-to-all
   reconciliation steps, `t + 1` sequential gradecast repetitions, and one
   final "finish" step.  The visible code only registers steps and tasks
   (create_step / step_depend_on / put_task) and delegates the per-leader
   gradecast wiring to construct_task_graph_gradecast().
   NOTE(review): the declarations of `t`, `i`, `lead`, `p1`, `p2` and `step`,
   as well as the updates of `round` and `prev_step` between sections, are on
   lines not shown in this excerpt.  `t` is presumably the number of tolerated
   faulty peers -- confirm against its declaration. */
2911 construct_task_graph (struct ConsensusSession *session)
/* Number of peers in the session and our own index among them. */
2913 uint16_t n = session->num_peers;
2916 uint16_t me = session->local_peer_idx;
2918 /* The task we're currently setting up. */
2919 struct TaskEntry task;
2921 /* Current leader */
2925 struct Step *prev_step;
/* Round number handed to create_step(); starts at 0. */
2927 unsigned int round = 0;
2931 // XXX: introduce first step,
2932 // where we wait for all insert acks
2933 // from the set service
2935 /* faster but brittle all-to-all */
2937 // XXX: Not implemented yet
2939 /* all-to-all step */
2941 step = create_step (session, round, GNUNET_NO);
2943 #ifdef GNUNET_EXTRA_LOGGING
2944 step->debug_name = GNUNET_strdup ("all to all");
/* Loop over the n peer indices and register one PHASE_KIND_ALL_TO_ALL
   reconcile task per iteration, keyed by the pair (p1, p2) after
   arrange_peers().  The lines assigning p1/p2 are not visible here. */
2947 for (i = 0; i < n; i++)
2954 arrange_peers (&p1, &p2, n);
2955 task = ((struct TaskEntry) {
2956 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2958 .start = task_start_reconcile,
2959 .cancel = task_cancel_reconcile,
/* Input and output are the same set key (SET_KIND_CURRENT, 0), with
   do_not_remove enabled for the set operation. */
2961 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2962 task.cls.setop.output_set = task.cls.setop.input_set;
2963 task.cls.setop.do_not_remove = GNUNET_YES;
2964 put_task (session->taskmap, &task);
/* Second all-to-all step; it depends on the previous step.
   NOTE(review): the doubled ';;' below is an empty statement and harmless. */
2969 step = create_step (session, round, GNUNET_NO);;
2970 #ifdef GNUNET_EXTRA_LOGGING
2971 step->debug_name = GNUNET_strdup ("all to all 2");
2973 step_depend_on (step, prev_step);
/* Same task layout as the first all-to-all pass, but keyed with
   PHASE_KIND_ALL_TO_ALL_2. */
2976 for (i = 0; i < n; i++)
2983 arrange_peers (&p1, &p2, n);
2984 task = ((struct TaskEntry) {
2985 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
2987 .start = task_start_reconcile,
2988 .cancel = task_cancel_reconcile,
2990 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2991 task.cls.setop.output_set = task.cls.setop.input_set;
2992 task.cls.setop.do_not_remove = GNUNET_YES;
2993 put_task (session->taskmap, &task);
3003 /* Byzantine union */
3005 /* sequential repetitions of the gradecasts */
3006 for (i = 0; i < t + 1; i++)
/* Each repetition is bracketed by its own start and end step. */
3008 struct Step *step_rep_start;
3009 struct Step *step_rep_end;
3011 /* Every repetition is in a separate round. */
3012 step_rep_start = create_step (session, round, GNUNET_YES);
3013 #ifdef GNUNET_EXTRA_LOGGING
3014 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
/* The repetition may only start once the preceding step is done. */
3017 step_depend_on (step_rep_start, prev_step);
3019 /* gradecast has three rounds */
3021 step_rep_end = create_step (session, round, GNUNET_YES);
3022 #ifdef GNUNET_EXTRA_LOGGING
3023 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3026 /* parallel gradecasts */
/* One gradecast per possible leader index, all placed between the
   start and end step of repetition i. */
3027 for (lead = 0; lead < n; lead++)
3028 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* A PHASE_KIND_APPLY_REP task is attached to the repetition's end step. */
3030 task = ((struct TaskEntry) {
3031 .step = step_rep_end,
3032 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3033 .start = task_start_apply_round,
3035 put_task (session->taskmap, &task);
/* The next repetition (or the finish step) depends on this end step. */
3037 prev_step = step_rep_end;
3040 /* There is no next gradecast round, thus the final
3041 start step is the overall end step of the gradecasts */
3043 step = create_step (session, round, GNUNET_NO);
3044 #ifdef GNUNET_EXTRA_LOGGING
3045 GNUNET_asprintf (&step->debug_name, "finish");
3047 step_depend_on (step, prev_step);
/* Final task: PHASE_KIND_FINISH, reading from the SET_KIND_LAST_GRADECAST
   set key. */
3049 task = ((struct TaskEntry) {
3051 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3052 .start = task_start_finish,
3054 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3056 put_task (session->taskmap, &task);
3062 * Check join message.
3064 * @param cls session of client that sent the message
3065 * @param m message sent by the client
3066 * @return #GNUNET_OK if @a m is well-formed
/* Size validation for a join message: the bytes following the fixed part
   of the message must be exactly `num_peers` peer identities. */
3069 check_client_join (void *cls,
3070 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* Peer count as announced by the client (network byte order on the wire). */
3072 uint32_t listed_peers = ntohl (m->num_peers);
/* NOTE(review): the subtraction is evaluated in an unsigned type, so it
   relies on header.size >= sizeof (*m) -- presumably guaranteed by the MQ
   dispatcher for variable-size handlers; TODO confirm. */
3074 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3075 listed_peers * sizeof (struct GNUNET_PeerIdentity))
/* Payload length does not match the announced peer count: reject. */
3078 return GNUNET_SYSERR;
3085 * Called when a client wants to join a consensus session.
3087 * @param cls session of client that sent the message
3088 * @param m message sent by the client
/* Set up a freshly joined session: peer list, global id, timing, set
   listener, the per-session maps, the initial "current" set and the task
   graph.  Nothing is run here; see the comment before
   construct_task_graph() below.  Several call arguments are on lines not
   shown in this excerpt. */
3091 handle_client_join (void *cls,
3092 const struct GNUNET_CONSENSUS_JoinMessage *m)
3094 struct ConsensusSession *session = cls;
3095 struct ConsensusSession *other_session;
/* Derive the peer list and the global session id for this session. */
3097 initialize_session_peer_list (session,
3099 compute_global_id (session,
3102 /* Check if some local client already owns the session.
3103 It is only legal to have a session with an existing global id
3104 if all other sessions with this global id are finished.*/
3105 for (other_session = sessions_head;
3106 NULL != other_session;
3107 other_session = other_session->next)
/* Match: a different session object with an identical global id.  What
   happens on a match is on lines not shown here. */
3109 if ( (other_session != session) &&
3110 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3111 &other_session->global_id)) )
/* Convert the client-supplied deadline and start time to host byte order. */
3115 session->conclude_deadline
3116 = GNUNET_TIME_absolute_ntoh (m->deadline);
3117 session->conclude_start
3118 = GNUNET_TIME_absolute_ntoh (m->start);
/* NOTE(review): the assertion aborts the service when the lookup yields
   -1 -- presumably when the client's peer list omits the local peer;
   confirm, and consider dropping the client instead of asserting. */
3119 session->local_peer_idx = get_peer_idx (&my_peer,
3121 GNUNET_assert (-1 != session->local_peer_idx);
3123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3124 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3125 GNUNET_h2s (&m->session_id),
3127 session->local_peer_idx,
3128 GNUNET_STRINGS_relative_time_to_string
3129 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3130 session->conclude_deadline),
/* Listen for incoming union set operations addressed to this session's
   global id. */
3133 session->set_listener
3134 = GNUNET_SET_listen (cfg,
3135 GNUNET_SET_OPERATION_UNION,
3136 &session->global_id,
/* Per-session lookup tables, each created with an initial size of 1. */
3140 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3142 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3144 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3146 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the union set keyed (SET_KIND_CURRENT, 0, 0); handle_client_insert()
   looks up a set with this same key to add client elements. */
3150 struct SetEntry *client_set;
3152 client_set = GNUNET_new (struct SetEntry);
3153 client_set->h = GNUNET_SET_create (cfg,
3154 GNUNET_SET_OPERATION_UNION);
3155 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* One array slot per peer in the session. */
3160 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3163 /* Just construct the task graph,
3164 but don't run anything until the client calls conclude. */
3165 construct_task_graph (session);
/* Tell the service we are ready for the client's next message. */
3166 GNUNET_SERVICE_client_continue (session->client);
/* Callback handed to GNUNET_SET_add_element() by handle_client_insert().
   Its body is not visible in this excerpt. */
3171 client_insert_done (void *cls)
3178 * Called when a client performs an insert operation.
3180 * @param cls client handle
3181 * @param msg message sent by the client
3182 * @return #GNUNET_OK (always well-formed)
/* Validation counterpart of handle_client_insert(); presumably wired in by
   the GNUNET_MQ_hd_var_size (client_insert, ...) entry of the service's
   handler table -- confirm against the MQ macro.  Its body is not visible
   in this excerpt. */
3185 check_client_insert (void *cls,
3186 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3193 * Called when a client performs an insert operation.
3195 * @param cls session of the client that sent the message
3196 * @param msg message sent by the client
/* Wrap the payload of a client's insert message into a ConsensusElement and
   add it to the session's (SET_KIND_CURRENT, 0, 0) set.  Inserts are refused
   once conclude has started. */
3199 handle_client_insert (void *cls,
3200 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3202 struct ConsensusSession *session = cls;
3203 ssize_t element_size;
3204 struct GNUNET_SET_Handle *initial_set;
3205 struct ConsensusElement *ce;
/* Inserting after conclude was requested is a protocol violation: the
   client is dropped. */
3207 if (GNUNET_YES == session->conclude_started)
3210 GNUNET_SERVICE_client_drop (session->client);
/* Payload length = total message size minus the fixed message struct. */
3214 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
/* Allocate the element header plus payload in one chunk and copy the
   payload directly behind the header.
   NOTE(review): the release of `ce` is not visible in this excerpt --
   confirm it is freed after GNUNET_SET_add_element(). */
3215 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3216 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3217 ce->payload_type = msg->element_type;
/* Set-service view of the element: consensus block type, size covering
   header and payload. */
3219 struct GNUNET_SET_Element element = {
3220 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3221 .size = sizeof (struct ConsensusElement) + element_size,
/* Find the set stored under (SET_KIND_CURRENT, 0, 0); it must exist. */
3226 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3227 struct SetEntry *entry;
3229 entry = lookup_set (session,
3231 GNUNET_assert (NULL != entry);
3232 initial_set = entry->h;
/* Count the outstanding insert, then hand the element to the set service
   with client_insert_done as continuation. */
3235 session->num_client_insert_pending++;
3236 GNUNET_SET_add_element (initial_set,
3238 &client_insert_done,
3241 #ifdef GNUNET_EXTRA_LOGGING
3243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3244 "P%u: element %s added\n",
3245 session->local_peer_idx,
3246 debug_str_element (&element));
/* Ready for the client's next message. */
3250 GNUNET_SERVICE_client_continue (session->client);
3255 * Called when a client performs the conclude operation.
3257 * @param cls session of the client that sent the message
3258 * @param message message sent by the client
/* Start running the previously constructed task graph.  A second conclude
   request for the same session causes the client to be dropped. */
3261 handle_client_conclude (void *cls,
3262 const struct GNUNET_MessageHeader *message)
3264 struct ConsensusSession *session = cls;
3266 if (GNUNET_YES == session->conclude_started)
3268 /* conclude started twice */
3270 GNUNET_SERVICE_client_drop (session->client);
3273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3274 "conclude requested\n");
/* Mark the session as concluding (this also makes handle_client_insert()
   refuse further inserts), install the step timeouts and run whatever
   steps are ready. */
3275 session->conclude_started = GNUNET_YES;
3276 install_step_timeouts (session);
3277 run_ready_steps (session);
3278 GNUNET_SERVICE_client_continue (session->client);
3283 * Called to clean up, after a shutdown has been requested.
3285 * @param cls closure
/* Shutdown hook registered from the service's init function via
   GNUNET_SCHEDULER_add_shutdown(): logs at INFO level and destroys the
   statistics handle.  The remaining call arguments are on lines not shown
   in this excerpt. */
3288 shutdown_task (void *cls)
3290 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3292 GNUNET_STATISTICS_destroy (statistics,
3299 * Start processing consensus requests.
3301 * @param cls closure
3302 * @param c configuration to use
3303 * @param service the initialized service
/* Service initialisation (the function-name line is not visible in this
   excerpt): fetch the local peer identity, create the "consensus"
   statistics handle and register shutdown_task. */
3307 const struct GNUNET_CONFIGURATION_Handle *c,
3308 struct GNUNET_SERVICE_Handle *service)
/* The error log and scheduler shutdown below form the failure path of the
   identity lookup; the enclosing condition is on lines not shown here. */
3312 GNUNET_CRYPTO_get_peer_identity (cfg,
3315 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3316 "Could not retrieve host identity\n");
3317 GNUNET_SCHEDULER_shutdown ();
3320 statistics = GNUNET_STATISTICS_create ("consensus",
3322 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3328 * Callback called when a client connects to the service.
3330 * @param cls closure for the service
3331 * @param c the new client that connected to the service
3332 * @param mq the message queue used to send messages to the client
/* Allocate a zero-initialised session for the new client, remember the
   client handle and its message queue, and link the session into the
   global session list headed by sessions_head.  The return statement is on
   a line not shown in this excerpt. */
3336 client_connect_cb (void *cls,
3337 struct GNUNET_SERVICE_Client *c,
3338 struct GNUNET_MQ_Handle *mq)
3340 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3342 session->client = c;
3343 session->client_mq = mq;
3344 GNUNET_CONTAINER_DLL_insert (sessions_head,
3352 * Callback called when a client disconnected from the service
3354 * @param cls closure for the service
3355 * @param c the client that disconnected
3356 * @param internal_cls the `struct ConsensusSession` belonging to @a c
/* Tear down the session of a disconnected client: cancel its set listener
   (if any), unlink it from the session list and free the session struct.
   NOTE(review): the maps created in handle_client_join() (setmap, taskmap,
   diffmap, rfnmap) and peers_blacklisted are not released in the visible
   lines -- confirm whether they are cleaned up elsewhere or leak here. */
3359 client_disconnect_cb (void *cls,
3360 struct GNUNET_SERVICE_Client *c,
/* internal_cls is used as this client's session object. */
3363 struct ConsensusSession *session = internal_cls;
3365 if (NULL != session->set_listener)
3367 GNUNET_SET_listen_cancel (session->set_listener);
3368 session->set_listener = NULL;
3370 GNUNET_CONTAINER_DLL_remove (sessions_head,
3373 GNUNET_free (session);
3378 * Define "main" method using service macro.
/* Arguments of the service "main" macro (its opening line is not visible in
   this excerpt): service options, the disconnect callback and the table of
   client message handlers. */
3382 GNUNET_SERVICE_OPTION_NONE,
3385 &client_disconnect_cb,
/* CONCLUDE is a fixed-size message consisting of a bare message header. */
3387 GNUNET_MQ_hd_fixed_size (client_conclude,
3388 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3389 struct GNUNET_MessageHeader,
/* INSERT and JOIN are variable-size messages; presumably the macro pairs
   each with the matching check_client_* / handle_client_* functions --
   confirm against the MQ API. */
3391 GNUNET_MQ_hd_var_size (client_insert,
3392 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3393 struct GNUNET_CONSENSUS_ElementMessage,
3395 GNUNET_MQ_hd_var_size (client_join,
3396 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3397 struct GNUNET_CONSENSUS_JoinMessage,
/* Terminates the handler table. */
3399 GNUNET_MQ_handler_end ());
3401 /* end of gnunet-service-consensus.c */