2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
42 * Vote that nothing should change.
43 * This option is never voted explicitly.
47 * Vote that an element should be added.
51 * Vote that an element should be removed.
/**
 * Phases of the early stopping scheme.
 *
 * The numeric values are fixed explicitly; the precise meaning of each
 * phase is defined by the code that advances the scheme.
 */
enum EarlyStoppingPhase
{
  EARLY_STOPPING_NONE = 0,
  EARLY_STOPPING_ONE_MORE = 1,
  EARLY_STOPPING_DONE = 2,
};
65 GNUNET_NETWORK_STRUCT_BEGIN
68 * Tuple of integers that together
69 * identify a task uniquely.
73 * A value from 'enum PhaseKind'.
75 uint16_t kind GNUNET_PACKED;
78 * Number of the first peer
81 int16_t peer1 GNUNET_PACKED;
84 * Number of the second peer in canonical order.
86 int16_t peer2 GNUNET_PACKED;
89 * Repetition of the gradecast phase.
91 int16_t repetition GNUNET_PACKED;
94 * Leader in the gradecast phase.
96 * Can be different from both peer1 and peer2.
98 int16_t leader GNUNET_PACKED;
105 int set_kind GNUNET_PACKED;
106 int k1 GNUNET_PACKED;
107 int k2 GNUNET_PACKED;
114 struct GNUNET_SET_Handle *h;
116 * GNUNET_YES if the set resulted
117 * from applying a referendum with contested
126 int diff_kind GNUNET_PACKED;
127 int k1 GNUNET_PACKED;
128 int k2 GNUNET_PACKED;
133 int rfn_kind GNUNET_PACKED;
134 int k1 GNUNET_PACKED;
135 int k2 GNUNET_PACKED;
139 GNUNET_NETWORK_STRUCT_END
143 PHASE_KIND_ALL_TO_ALL,
144 PHASE_KIND_ALL_TO_ALL_2,
145 PHASE_KIND_GRADECAST_LEADER,
146 PHASE_KIND_GRADECAST_ECHO,
147 PHASE_KIND_GRADECAST_ECHO_GRADE,
148 PHASE_KIND_GRADECAST_CONFIRM,
149 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
151 * Apply a repetition of the all-to-all
152 * gradecast to the current set.
154 PHASE_KIND_APPLY_REP,
164 * Last result set from a gradecast
166 SET_KIND_LAST_GRADECAST,
167 SET_KIND_LEADER_PROPOSAL,
168 SET_KIND_ECHO_RESULT,
174 DIFF_KIND_LEADER_PROPOSAL,
175 DIFF_KIND_LEADER_CONSENSUS,
176 DIFF_KIND_GRADECAST_RESULT,
184 RFN_KIND_GRADECAST_RESULT
190 struct SetKey input_set;
192 struct SetKey output_set;
193 struct RfnKey output_rfn;
194 struct DiffKey output_diff;
198 int transceive_contested;
200 struct GNUNET_SET_OperationHandle *op;
206 struct SetKey input_set;
210 * Closure for both @a start_task
211 * and @a cancel_task.
215 struct SetOpCls setop;
216 struct FinishCls finish;
221 typedef void (*TaskFunc) (struct TaskEntry *task);
224 * Node in the consensus task graph.
239 union TaskFuncCls cls;
246 * All steps of one session are in a
247 * linked list for easier deallocation.
252 * All steps of one session are in a
253 * linked list for easier deallocation.
257 struct ConsensusSession *session;
260 * Tasks that this step is composed of.
262 struct TaskEntry **tasks;
263 unsigned int tasks_len;
264 unsigned int tasks_cap;
266 unsigned int finished_tasks;
269 * Steps that have this step as a dependency.
271 * We store pointers to subordinates rather
272 * than to prerequisites since it makes
273 * tracking the readiness of a task easier.
275 struct Step **subordinates;
276 unsigned int subordinates_len;
277 unsigned int subordinates_cap;
280 * Counter for the prerequisites of
283 size_t pending_prereq;
286 * Task that will run this step despite
287 * any pending prerequisites.
289 struct GNUNET_SCHEDULER_Task *timeout_task;
291 unsigned int is_running;
293 unsigned int is_finished;
296 * Synchrony round of the task.
297 * Determines the deadline for the task.
302 * Human-readable name for
303 * the task, used for debugging.
308 * When we're doing an early finish, how should this step be
310 * If GNUNET_YES, the step will be marked as finished
311 * without actually running its tasks.
312 * Otherwise, the step will still be run even after
315 * Note that a task may never be finished early if
316 * it is already running.
318 int early_finishable;
322 struct RfnElementInfo
324 const struct GNUNET_SET_Element *element;
327 * GNUNET_YES if the peer votes for the proposal.
332 * Proposal for this element,
333 * can only be VOTE_ADD or VOTE_REMOVE.
335 enum ReferendumVote proposal;
339 struct ReferendumEntry
344 * Elements where there is at least one proposed change.
346 * Maps the hash of the GNUNET_SET_Element
347 * to 'struct RfnElementInfo'.
349 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
351 unsigned int num_peers;
354 * Stores, for every peer in the session,
355 * whether the peer finished the whole referendum.
357 * Votes from peers are only counted if they're
358 * marked as committed (#GNUNET_YES) in the referendum.
360 * Otherwise (#GNUNET_NO), the requested changes are
361 * not counted for majority votes or thresholds.
367 * Contestation state of the peer. If a peer is contested, the values it
368 * contributed are still counted for applying changes, but the grading is
375 struct DiffElementInfo
377 const struct GNUNET_SET_Element *element;
380 * Positive weight for 'add', negative
381 * weights for 'remove'.
393 struct GNUNET_CONTAINER_MultiHashMap *changes;
398 struct SetHandle *prev;
399 struct SetHandle *next;
401 struct GNUNET_SET_Handle *h;
407 * A consensus session consists of one local client and the remote authorities.
409 struct ConsensusSession
412 * Consensus sessions are kept in a DLL.
414 struct ConsensusSession *next;
417 * Consensus sessions are kept in a DLL.
419 struct ConsensusSession *prev;
421 unsigned int num_client_insert_pending;
423 struct GNUNET_CONTAINER_MultiHashMap *setmap;
424 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
428 * Array of blacklist flags, one per peer, with length 'num_peers'.
430 int *peers_blacklisted;
433 * Mapping from (hashed) TaskKey to TaskEntry.
435 * We map the application_id for a round to the task that should be
436 * executed, so we don't have to go through all task whenever we get
437 * an incoming set op request.
439 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
441 struct Step *steps_head;
442 struct Step *steps_tail;
444 int conclude_started;
449 * Global consensus identification, computed
450 * from the session id and participating authorities.
452 struct GNUNET_HashCode global_id;
455 * Client that inhabits the session
457 struct GNUNET_SERVICE_Client *client;
460 * Queued messages to the client.
462 struct GNUNET_MQ_Handle *client_mq;
465 * Time when the conclusion of the consensus should begin.
467 struct GNUNET_TIME_Absolute conclude_start;
470 * Timeout for all rounds together, single rounds will schedule a timeout task
471 * with a fraction of the conclude timeout.
472 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
474 struct GNUNET_TIME_Absolute conclude_deadline;
476 struct GNUNET_PeerIdentity *peers;
479 * Number of other peers in the consensus.
481 unsigned int num_peers;
484 * Index of the local peer in the peers array
486 unsigned int local_peer_idx;
489 * Listener for requests from other peers.
490 * Uses the session's global id as app id.
492 struct GNUNET_SET_ListenHandle *set_listener;
495 * State of our early stopping scheme.
500 * Our set size from the first round.
504 uint64_t *first_sizes_received;
507 * Bounded Eppstein lower bound.
509 uint64_t lower_bound;
511 struct SetHandle *set_handles_head;
512 struct SetHandle *set_handles_tail;
516 * Linked list of sessions this peer participates in.
518 static struct ConsensusSession *sessions_head;
521 * Linked list of sessions this peer participates in.
523 static struct ConsensusSession *sessions_tail;
526 * Configuration of the consensus service.
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
531 * Peer that runs this service.
533 static struct GNUNET_PeerIdentity my_peer;
538 struct GNUNET_STATISTICS_Handle *statistics;
542 finish_task (struct TaskEntry *task);
546 run_ready_steps (struct ConsensusSession *session);
550 phasename (uint16_t phase)
554 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556 case PHASE_KIND_FINISH: return "FINISH";
557 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563 default: return "(unknown)";
569 setname (uint16_t kind)
573 case SET_KIND_CURRENT: return "CURRENT";
574 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575 case SET_KIND_NONE: return "NONE";
576 default: return "(unknown)";
581 rfnname (uint16_t kind)
585 case RFN_KIND_NONE: return "NONE";
586 case RFN_KIND_ECHO: return "ECHO";
587 case RFN_KIND_CONFIRM: return "CONFIRM";
588 default: return "(unknown)";
593 diffname (uint16_t kind)
597 case DIFF_KIND_NONE: return "NONE";
598 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601 default: return "(unknown)";
605 #ifdef GNUNET_EXTRA_LOGGING
609 debug_str_element (const struct GNUNET_SET_Element *el)
611 struct GNUNET_HashCode hash;
613 GNUNET_SET_element_hash (el, &hash);
615 return GNUNET_h2s (&hash);
619 debug_str_task_key (struct TaskKey *tk)
621 static char buf[256];
623 snprintf (buf, sizeof (buf),
624 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625 phasename (tk->kind), tk->peer1, tk->peer2,
626 tk->leader, tk->repetition);
632 debug_str_diff_key (struct DiffKey *dk)
634 static char buf[256];
636 snprintf (buf, sizeof (buf),
637 "DiffKey kind=%s, k1=%d, k2=%d",
638 diffname (dk->diff_kind), dk->k1, dk->k2);
644 debug_str_set_key (const struct SetKey *sk)
646 static char buf[256];
648 snprintf (buf, sizeof (buf),
649 "SetKey kind=%s, k1=%d, k2=%d",
650 setname (sk->set_kind), sk->k1, sk->k2);
657 debug_str_rfn_key (const struct RfnKey *rk)
659 static char buf[256];
661 snprintf (buf, sizeof (buf),
662 "RfnKey kind=%s, k1=%d, k2=%d",
663 rfnname (rk->rfn_kind), rk->k1, rk->k2);
668 #endif /* GNUNET_EXTRA_LOGGING */
672 * Send the final result set of the consensus to the client, element by
676 * @param element the current element, NULL if all elements have been
678 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
681 send_to_client_iter (void *cls,
682 const struct GNUNET_SET_Element *element)
684 struct TaskEntry *task = (struct TaskEntry *) cls;
685 struct ConsensusSession *session = task->step->session;
686 struct GNUNET_MQ_Envelope *ev;
690 struct GNUNET_CONSENSUS_ElementMessage *m;
691 const struct ConsensusElement *ce;
693 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
696 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "P%d: sending element %s to client\n",
703 session->local_peer_idx,
704 debug_str_element (element));
706 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708 m->element_type = ce->payload_type;
709 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710 GNUNET_MQ_send (session->client_mq, ev);
714 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715 "P%d: finished iterating elements for client\n",
716 session->local_peer_idx);
717 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718 GNUNET_MQ_send (session->client_mq, ev);
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
727 struct GNUNET_HashCode hash;
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "P%u: looking up set {%s}\n",
731 session->local_peer_idx,
732 debug_str_set_key (key));
734 GNUNET_assert (SET_KIND_NONE != key->set_kind);
735 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
743 struct GNUNET_HashCode hash;
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "P%u: looking up diff {%s}\n",
747 session->local_peer_idx,
748 debug_str_diff_key (key));
750 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
759 struct GNUNET_HashCode hash;
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "P%u: looking up rfn {%s}\n",
763 session->local_peer_idx,
764 debug_str_rfn_key (key));
766 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
773 diff_insert (struct DiffEntry *diff,
775 const struct GNUNET_SET_Element *element)
777 struct DiffElementInfo *di;
778 struct GNUNET_HashCode hash;
780 GNUNET_assert ( (1 == weight) || (-1 == weight));
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "diff_insert with element size %u\n",
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "hashing element\n");
789 GNUNET_SET_element_hash (element, &hash);
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
798 di = GNUNET_new (struct DiffElementInfo);
799 di->element = GNUNET_SET_element_dup (element);
800 GNUNET_assert (GNUNET_OK ==
801 GNUNET_CONTAINER_multihashmap_put (diff->changes,
803 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
811 rfn_commit (struct ReferendumEntry *rfn,
812 uint16_t commit_peer)
814 GNUNET_assert (commit_peer < rfn->num_peers);
816 rfn->peer_commited[commit_peer] = GNUNET_YES;
821 rfn_contest (struct ReferendumEntry *rfn,
822 uint16_t contested_peer)
824 GNUNET_assert (contested_peer < rfn->num_peers);
826 rfn->peer_contested[contested_peer] = GNUNET_YES;
831 rfn_noncontested (struct ReferendumEntry *rfn)
837 for (i = 0; i < rfn->num_peers; i++)
838 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
846 rfn_vote (struct ReferendumEntry *rfn,
847 uint16_t voting_peer,
848 enum ReferendumVote vote,
849 const struct GNUNET_SET_Element *element)
851 struct RfnElementInfo *ri;
852 struct GNUNET_HashCode hash;
854 GNUNET_assert (voting_peer < rfn->num_peers);
856 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
857 since VOTE_KEEP is implicit in not voting. */
858 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
860 GNUNET_SET_element_hash (element, &hash);
861 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
865 ri = GNUNET_new (struct RfnElementInfo);
866 ri->element = GNUNET_SET_element_dup (element);
867 ri->votes = GNUNET_new_array (rfn->num_peers, int);
868 GNUNET_assert (GNUNET_OK ==
869 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
871 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
874 ri->votes[voting_peer] = GNUNET_YES;
880 task_other_peer (struct TaskEntry *task)
882 uint16_t me = task->step->session->local_peer_idx;
883 if (task->key.peer1 == me)
884 return task->key.peer2;
885 return task->key.peer1;
/**
 * Three-way comparison of two uint64_t values, usable with qsort().
 *
 * The result is computed from comparisons rather than a subtraction,
 * so it cannot wrap or be truncated when narrowed to int.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return -1 if *pa < *pb, 0 if they are equal, 1 if *pa > *pb
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  uint64_t a = *(const uint64_t *) pa;
  uint64_t b = *(const uint64_t *) pb;

  if (a == b)
    return 0;
  if (a < b)
    return -1;
  return 1;
}
904 * Callback for set operation results. Called for each element
908 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
909 * @param current_size current set size
910 * @param status see enum GNUNET_SET_Status
913 set_result_cb (void *cls,
914 const struct GNUNET_SET_Element *element,
915 uint64_t current_size,
916 enum GNUNET_SET_Status status)
918 struct TaskEntry *task = cls;
919 struct ConsensusSession *session = task->step->session;
920 struct SetEntry *output_set = NULL;
921 struct DiffEntry *output_diff = NULL;
922 struct ReferendumEntry *output_rfn = NULL;
923 unsigned int other_idx;
924 struct SetOpCls *setop;
925 const struct ConsensusElement *consensus_element = NULL;
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930 "P%u: got element of type %u, status %u\n",
931 session->local_peer_idx,
932 (unsigned) element->element_type,
934 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935 consensus_element = element->data;
938 setop = &task->cls.setop;
941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942 "P%u: got set result for {%s}, status %u\n",
943 session->local_peer_idx,
944 debug_str_task_key (&task->key),
947 if (GNUNET_NO == task->is_started)
953 if (GNUNET_YES == task->is_finished)
959 other_idx = task_other_peer (task);
961 if (SET_KIND_NONE != setop->output_set.set_kind)
963 output_set = lookup_set (session, &setop->output_set);
964 GNUNET_assert (NULL != output_set);
967 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
969 output_diff = lookup_diff (session, &setop->output_diff);
970 GNUNET_assert (NULL != output_diff);
973 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
975 output_rfn = lookup_rfn (session, &setop->output_rfn);
976 GNUNET_assert (NULL != output_rfn);
979 if (GNUNET_YES == session->peers_blacklisted[other_idx])
981 /* Peer might have been blacklisted
982 by a gradecast running in parallel, ignore elements from now */
983 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
985 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
989 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
991 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
992 "P%u: got some marker\n",
993 session->local_peer_idx);
994 if ( (GNUNET_YES == setop->transceive_contested) &&
995 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
997 GNUNET_assert (NULL != output_rfn);
998 rfn_contest (output_rfn, task_other_peer (task));
1002 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1005 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1006 "P%u: got size marker\n",
1007 session->local_peer_idx);
1010 struct ConsensusSizeElement *cse = (void *) consensus_element;
1012 if (cse->sender_index == other_idx)
1014 if (NULL == session->first_sizes_received)
1015 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1018 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020 session->lower_bound = copy[session->num_peers / 3 + 1];
1021 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1022 "P%u: lower bound %llu\n",
1023 session->local_peer_idx,
1024 (long long) session->lower_bound);
1035 case GNUNET_SET_STATUS_ADD_LOCAL:
1036 GNUNET_assert (NULL != consensus_element);
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Adding element in Task {%s}\n",
1039 debug_str_task_key (&task->key));
1040 if (NULL != output_set)
1042 // FIXME: record pending adds, use callback
1043 GNUNET_SET_add_element (output_set->h,
1047 #ifdef GNUNET_EXTRA_LOGGING
1048 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1049 "P%u: adding element %s into set {%s} of task {%s}\n",
1050 session->local_peer_idx,
1051 debug_str_element (element),
1052 debug_str_set_key (&setop->output_set),
1053 debug_str_task_key (&task->key));
1056 if (NULL != output_diff)
1058 diff_insert (output_diff, 1, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1060 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1061 "P%u: adding element %s into diff {%s} of task {%s}\n",
1062 session->local_peer_idx,
1063 debug_str_element (element),
1064 debug_str_diff_key (&setop->output_diff),
1065 debug_str_task_key (&task->key));
1068 if (NULL != output_rfn)
1070 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1071 #ifdef GNUNET_EXTRA_LOGGING
1072 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1073 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1074 session->local_peer_idx,
1075 debug_str_element (element),
1076 debug_str_rfn_key (&setop->output_rfn),
1077 debug_str_task_key (&task->key));
1080 // XXX: add result to structures in task
1082 case GNUNET_SET_STATUS_ADD_REMOTE:
1083 GNUNET_assert (NULL != consensus_element);
1084 if (GNUNET_YES == setop->do_not_remove)
1086 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1088 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1089 "Removing element in Task {%s}\n",
1090 debug_str_task_key (&task->key));
1091 if (NULL != output_set)
1093 // FIXME: record pending adds, use callback
1094 GNUNET_SET_remove_element (output_set->h,
1098 #ifdef GNUNET_EXTRA_LOGGING
1099 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1100 "P%u: removing element %s from set {%s} of task {%s}\n",
1101 session->local_peer_idx,
1102 debug_str_element (element),
1103 debug_str_set_key (&setop->output_set),
1104 debug_str_task_key (&task->key));
1107 if (NULL != output_diff)
1109 diff_insert (output_diff, -1, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1111 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1112 "P%u: removing element %s from diff {%s} of task {%s}\n",
1113 session->local_peer_idx,
1114 debug_str_element (element),
1115 debug_str_diff_key (&setop->output_diff),
1116 debug_str_task_key (&task->key));
1119 if (NULL != output_rfn)
1121 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1122 #ifdef GNUNET_EXTRA_LOGGING
1123 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1124 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1125 session->local_peer_idx,
1126 debug_str_element (element),
1127 debug_str_rfn_key (&setop->output_rfn),
1128 debug_str_task_key (&task->key));
1132 case GNUNET_SET_STATUS_DONE:
1133 // XXX: check first if any changes to the underlying
1134 // set are still pending
1135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136 "Finishing setop in Task {%s}\n",
1137 debug_str_task_key (&task->key));
1138 if (NULL != output_rfn)
1140 rfn_commit (output_rfn, task_other_peer (task));
1142 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1144 session->first_size = current_size;
1148 case GNUNET_SET_STATUS_FAILURE:
1150 GNUNET_break_op (0);
1171 enum EvilnessSubType
1174 EVILNESS_SUB_REPLACEMENT,
1175 EVILNESS_SUB_NO_REPLACEMENT,
1180 enum EvilnessType type;
1181 enum EvilnessSubType subtype;
1187 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1189 if (0 == strcmp ("replace", evil_subtype_str))
1191 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1193 else if (0 == strcmp ("noreplace", evil_subtype_str))
1195 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1199 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1200 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1202 return GNUNET_SYSERR;
1209 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1213 char *evil_type_str = NULL;
1214 char *evil_subtype_str = NULL;
1216 GNUNET_assert (NULL != evil);
1218 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221 "P%u: no evilness\n",
1222 session->local_peer_idx);
1223 evil->type = EVILNESS_NONE;
1226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227 "P%u: got evilness spec\n",
1228 session->local_peer_idx);
1230 for (field = strtok (evil_spec, "/");
1232 field = strtok (NULL, "/"))
1234 unsigned int peer_num;
1235 unsigned int evil_num;
1238 evil_type_str = NULL;
1239 evil_subtype_str = NULL;
1241 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1245 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1246 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1252 GNUNET_assert (NULL != evil_type_str);
1253 GNUNET_assert (NULL != evil_subtype_str);
1255 if (peer_num == session->local_peer_idx)
1257 if (0 == strcmp ("slack", evil_type_str))
1259 evil->type = EVILNESS_SLACK;
1261 if (0 == strcmp ("slack-a2a", evil_type_str))
1263 evil->type = EVILNESS_SLACK_A2A;
1265 else if (0 == strcmp ("cram-all", evil_type_str))
1267 evil->type = EVILNESS_CRAM_ALL;
1268 evil->num = evil_num;
1269 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1272 else if (0 == strcmp ("cram-lead", evil_type_str))
1274 evil->type = EVILNESS_CRAM_LEAD;
1275 evil->num = evil_num;
1276 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1279 else if (0 == strcmp ("cram-echo", evil_type_str))
1281 evil->type = EVILNESS_CRAM_ECHO;
1282 evil->num = evil_num;
1283 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1288 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1289 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1295 /* No GNUNET_free since memory was allocated by libc */
1296 free (evil_type_str);
1297 evil_type_str = NULL;
1298 evil_subtype_str = NULL;
1301 evil->type = EVILNESS_NONE;
1303 GNUNET_free (evil_spec);
1304 /* no GNUNET_free_non_null since it wasn't
1305 * allocated with GNUNET_malloc */
1306 if (NULL != evil_type_str)
1307 free (evil_type_str);
1308 if (NULL != evil_subtype_str)
1309 free (evil_subtype_str);
1316 * Commit the appropriate set for a
1320 commit_set (struct ConsensusSession *session,
1321 struct TaskEntry *task)
1323 struct SetEntry *set;
1324 struct SetOpCls *setop = &task->cls.setop;
1326 GNUNET_assert (NULL != setop->op);
1327 set = lookup_set (session, &setop->input_set);
1328 GNUNET_assert (NULL != set);
1330 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1332 struct GNUNET_SET_Element element;
1333 struct ConsensusElement ce = { 0 };
1334 ce.marker = CONSENSUS_MARKER_CONTESTED;
1336 element.size = sizeof (struct ConsensusElement);
1337 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1338 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1341 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1343 struct GNUNET_SET_Element element;
1344 struct ConsensusSizeElement cse = {
1348 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1349 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1350 cse.size = GNUNET_htonll (session->first_size);
1351 cse.sender_index = session->local_peer_idx;
1352 element.data = &cse;
1353 element.size = sizeof (struct ConsensusSizeElement);
1354 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1355 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1361 struct Evilness evil;
1363 get_evilness (session, &evil);
1364 if (EVILNESS_NONE != evil.type)
1366 /* Useful for evaluation */
1367 GNUNET_STATISTICS_set (statistics,
1374 case EVILNESS_CRAM_ALL:
1375 case EVILNESS_CRAM_LEAD:
1376 case EVILNESS_CRAM_ECHO:
1377 /* We're not cramming elements in the
1378 all-to-all round, since that would just
1379 add more elements to the result set, but
1380 wouldn't test robustness. */
1381 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1383 GNUNET_SET_commit (setop->op, set->h);
1386 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1387 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1389 GNUNET_SET_commit (setop->op, set->h);
1392 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1394 GNUNET_SET_commit (setop->op, set->h);
1397 for (i = 0; i < evil.num; i++)
1399 struct GNUNET_SET_Element element;
1400 struct ConsensusStuffedElement se = {
1401 .ce.payload_type = 0,
1405 element.size = sizeof (struct ConsensusStuffedElement);
1406 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1408 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1410 /* Always generate a new element. */
1411 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1413 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1415 /* Always cram the same elements, derived from counter. */
1416 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1422 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1423 #ifdef GNUNET_EXTRA_LOGGING
1424 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1425 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1426 session->local_peer_idx,
1427 debug_str_element (&element),
1428 debug_str_set_key (&setop->input_set),
1429 debug_str_task_key (&task->key));
1432 GNUNET_STATISTICS_update (statistics,
1433 "# stuffed elements",
1436 GNUNET_SET_commit (setop->op, set->h);
1438 case EVILNESS_SLACK:
1439 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1440 "P%u: evil peer: slacking\n",
1441 (unsigned int) session->local_peer_idx);
1443 case EVILNESS_SLACK_A2A:
1444 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1445 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1447 struct GNUNET_SET_Handle *empty_set;
1448 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1449 GNUNET_SET_commit (setop->op, empty_set);
1450 GNUNET_SET_destroy (empty_set);
1454 GNUNET_SET_commit (setop->op, set->h);
1458 GNUNET_SET_commit (setop->op, set->h);
1463 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1465 GNUNET_SET_commit (setop->op, set->h);
1469 /* For our testcases, we don't want the blacklisted
1471 GNUNET_SET_operation_cancel (setop->op);
1479 put_diff (struct ConsensusSession *session,
1480 struct DiffEntry *diff)
1482 struct GNUNET_HashCode hash;
1484 GNUNET_assert (NULL != diff);
1486 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1487 GNUNET_assert (GNUNET_OK ==
1488 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1489 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1493 put_set (struct ConsensusSession *session,
1494 struct SetEntry *set)
1496 struct GNUNET_HashCode hash;
1498 GNUNET_assert (NULL != set->h);
1500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1502 debug_str_set_key (&set->key));
1504 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1505 GNUNET_assert (GNUNET_SYSERR !=
1506 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1507 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1512 put_rfn (struct ConsensusSession *session,
1513 struct ReferendumEntry *rfn)
1515 struct GNUNET_HashCode hash;
1517 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1518 GNUNET_assert (GNUNET_OK ==
1519 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1520 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Cancel a reconciliation task.
 *
 * Currently a placeholder that does nothing.
 *
 * @param task task to cancel (unused)
 */
static void
task_cancel_reconcile (struct TaskEntry *task)
{
  /* not implemented yet */
}
1534 apply_diff_to_rfn (struct DiffEntry *diff,
1535 struct ReferendumEntry *rfn,
1536 uint16_t voting_peer,
1539 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1540 struct DiffElementInfo *di;
1542 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1544 while (GNUNET_YES ==
1545 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1547 (const void **) &di))
1551 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1555 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1559 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1566 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1568 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/* Build a new diff from two existing ones: every element of diff_1 and
   then every element of diff_2 is inserted (with its weight) into a diff
   obtained from diff_create().  The inputs are only read here. */
1575 diff_compose (struct DiffEntry *diff_1,
1576 struct DiffEntry *diff_2)
1578 struct DiffEntry *diff_new;
1579 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1580 struct DiffElementInfo *di;
1582 diff_new = diff_create ();
/* First pass: changes of diff_1. */
1584 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1585 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1587 diff_insert (diff_new, di->weight, di->element);
1589 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Second pass: changes of diff_2, inserted into the same result. */
1591 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1592 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1594 diff_insert (diff_new, di->weight, di->element);
1596 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Allocate a referendum for `size` peers: an element map plus the
   per-peer `peer_commited` and `peer_contested` int arrays, each with
   `size` entries.  `num_peers` records the size. */
1602 struct ReferendumEntry *
1603 rfn_create (uint16_t size)
1605 struct ReferendumEntry *rfn;
1607 rfn = GNUNET_new (struct ReferendumEntry);
1608 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1609 rfn->peer_commited = GNUNET_new_array (size, int);
1610 rfn->peer_contested = GNUNET_new_array (size, int);
1611 rfn->num_peers = size;
/* Release a diff: destroys its `changes` multihashmap.
   NOTE(review): any further cleanup is not visible in this excerpt. */
1619 diff_destroy (struct DiffEntry *diff)
1621 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1628 * For a given majority, count what the outcome
1629 * is (add/remove/keep), and give the number
1630 * of peers that voted for this outcome.
/* Determine the outcome of the vote on one referendum element.
   Loops over all rfn->num_peers peers, testing peer_commited[i] and
   ri->votes[i] (the counter updates are not visible in this excerpt;
   presumably non-committed peers are skipped).
   Result: if votes_yes is strictly more than half of num_commited
   (integer division), *ret_vote is ri->proposal and *ret_majority is
   votes_yes; otherwise *ret_vote is VOTE_STAY and *ret_majority is
   num_commited - votes_yes. */
1633 rfn_majority (const struct ReferendumEntry *rfn,
1634 const struct RfnElementInfo *ri,
1635 uint16_t *ret_majority,
1636 enum ReferendumVote *ret_vote)
1638 uint16_t votes_yes = 0;
1639 uint16_t num_commited = 0;
1642 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1643 "Computing rfn majority for element %s of rfn {%s}\n",
1644 debug_str_element (ri->element),
1645 debug_str_rfn_key (&rfn->key));
1647 for (i = 0; i < rfn->num_peers; i++)
1649 if (GNUNET_NO == rfn->peer_commited[i])
1653 if (GNUNET_YES == ri->votes[i])
1657 if (votes_yes > (num_commited) / 2)
1659 *ret_vote = ri->proposal;
1660 *ret_majority = votes_yes;
1664 *ret_vote = VOTE_STAY;
1665 *ret_majority = num_commited - votes_yes;
/* Fields of the closure handed to set_copy_cb: the task on whose behalf
   the copy is made, and the key under which the copy is stored. */
1672 struct TaskEntry *task;
1673 struct SetKey dst_set_key;
/* Callback receiving the copied set handle `copy`.
   Reads the task and destination key from the SetCopyCls closure, links a
   new SetHandle into the session's set_handles list, then creates a
   SetEntry keyed by dst_set_key and registers it via put_set().
   NOTE(review): the lines that store `copy` in the handle/entry, free the
   closure and re-run the task are not visible in this excerpt. */
1678 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1680 struct SetCopyCls *scc = cls;
1681 struct TaskEntry *task = scc->task;
1682 struct SetKey dst_set_key = scc->dst_set_key;
1683 struct SetEntry *set;
1684 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1687 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1688 task->step->session->set_handles_tail,
1692 set = GNUNET_new (struct SetEntry);
1694 set->key = dst_set_key;
1695 put_set (task->step->session, set);
1702 * Call the start function of the given
1703 * task again after we created a copy of the given set.
/* Request a lazy copy of the set stored under `src_set_key`, to be stored
   under `dst_set_key` on behalf of `task`.
   A SetCopyCls closure is allocated and filled with the destination key;
   the source set must already exist in the session (asserted).
   NOTE(review): the callback arguments passed to GNUNET_SET_copy_lazy are
   not visible in this excerpt; presumably set_copy_cb with `scc`. */
1706 create_set_copy_for_task (struct TaskEntry *task,
1707 struct SetKey *src_set_key,
1708 struct SetKey *dst_set_key)
1710 struct SetEntry *src_set;
1711 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1713 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714 "Copying set {%s} to {%s} for task {%s}\n",
1715 debug_str_set_key (src_set_key),
1716 debug_str_set_key (dst_set_key),
1717 debug_str_task_key (&task->key));
1720 scc->dst_set_key = *dst_set_key;
1721 src_set = lookup_set (task->step->session, src_set_key);
1722 GNUNET_assert (NULL != src_set);
1723 GNUNET_SET_copy_lazy (src_set->h,
/* Closure shared by all pending set mutations of one task; used by
   set_mutation_done together with a `num_pending` counter (the counter's
   declaration is not visible in this excerpt). */
1729 struct SetMutationProgressCls
1733 * Task to finish once all changes are through.
1735 struct TaskEntry *task;
/* Completion callback for a single set mutation.
   Asserts that at least one mutation is still outstanding; once
   pc->num_pending is 0 the associated task is picked up.
   NOTE(review): the decrement of num_pending and what is done with `task`
   are not visible in this excerpt. */
1740 set_mutation_done (void *cls)
1742 struct SetMutationProgressCls *pc = cls;
1744 GNUNET_assert (pc->num_pending > 0);
1748 if (0 == pc->num_pending)
1750 struct TaskEntry *task = pc->task;
/* Try to mark `step` as finished ahead of time (early stopping).
   Three guards test is_running, is_finished and early_finishable; their
   bodies are not visible in this excerpt (presumably each returns).
   Otherwise the step is marked finished, each subordinate's pending_prereq
   is decremented (asserted > 0 beforehand), the function recurses into
   each subordinate, and run_ready_steps() is called for the session. */
1758 try_finish_step_early (struct Step *step)
1762 if (GNUNET_YES == step->is_running)
1764 if (GNUNET_YES == step->is_finished)
1766 if (GNUNET_NO == step->early_finishable)
1769 step->is_finished = GNUNET_YES;
1771 #ifdef GNUNET_EXTRA_LOGGING
1772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1773 "Finishing step `%s' early.\n",
1777 for (i = 0; i < step->subordinates_len; i++)
1779 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1780 step->subordinates[i]->pending_prereq--;
1781 #ifdef GNUNET_EXTRA_LOGGING
1782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783 "Decreased pending_prereq to %u for step `%s'.\n",
1784 (unsigned int) step->subordinates[i]->pending_prereq,
1785 step->subordinates[i]->debug_name);
1788 try_finish_step_early (step->subordinates[i]);
1791 // XXX: maybe schedule as task to avoid recursion?
1792 run_ready_steps (step->session);
/* Regular completion of a step.
   Preconditions (asserted): all tasks of the step have finished, the step
   is running, and it has not been marked finished yet.
   Each subordinate's pending_prereq is decremented, the step is marked
   finished, and run_ready_steps() is called so that newly unblocked steps
   can start. */
1797 finish_step (struct Step *step)
1801 GNUNET_assert (step->finished_tasks == step->tasks_len);
1802 GNUNET_assert (GNUNET_YES == step->is_running);
1803 GNUNET_assert (GNUNET_NO == step->is_finished);
1805 #ifdef GNUNET_EXTRA_LOGGING
1806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1807 "All tasks of step `%s' with %u subordinates finished.\n",
1809 step->subordinates_len);
1812 for (i = 0; i < step->subordinates_len; i++)
1814 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1815 step->subordinates[i]->pending_prereq--;
1816 #ifdef GNUNET_EXTRA_LOGGING
1817 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818 "Decreased pending_prereq to %u for step `%s'.\n",
1819 (unsigned int) step->subordinates[i]->pending_prereq,
1820 step->subordinates[i]->debug_name);
1825 step->is_finished = GNUNET_YES;
1827 // XXX: maybe schedule as task to avoid recursion?
1828 run_ready_steps (step->session);
1834 * Apply the result from one round of gradecasts (i.e. every peer
1835 * should have gradecasted) to the peer's current set.
1837 * @param task the task with context information
/* Start callback: apply the gradecast result referendum of repetition
   `task->key.repetition` to the CURRENT set of the following repetition.
   If the output set does not exist yet, a copy of the input set is
   requested through create_set_copy_for_task() (the early return that
   presumably follows is not visible in this excerpt).
   For every element of the referendum, rfn_majority() decides whether the
   element is added, removed or kept; set mutations are counted in
   progress_cls->num_pending.  The smallest majority seen is then compared
   against a threshold to drive the session's early-stopping state. */
1840 task_start_apply_round (struct TaskEntry *task)
1842 struct ConsensusSession *session = task->step->session;
1843 struct SetKey sk_in;
1844 struct SetKey sk_out;
1845 struct RfnKey rk_in;
1846 struct SetEntry *set_out;
1847 struct ReferendumEntry *rfn_in;
1848 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1849 struct RfnElementInfo *ri;
1850 struct SetMutationProgressCls *progress_cls;
/* Starts at the maximum so that the first element's majority replaces it. */
1851 uint16_t worst_majority = UINT16_MAX;
1853 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1854 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1855 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1857 set_out = lookup_set (session, &sk_out);
1858 if (NULL == set_out)
1860 create_set_copy_for_task (task, &sk_in, &sk_out);
1864 rfn_in = lookup_rfn (session, &rk_in);
1865 GNUNET_assert (NULL != rfn_in);
1867 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1868 progress_cls->task = task;
1870 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1872 while (GNUNET_YES ==
1873 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1875 (const void **) &ri))
1877 uint16_t majority_num;
1878 enum ReferendumVote majority_vote;
1880 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
/* Track the weakest majority over all elements of this round. */
1882 if (worst_majority > majority_num)
1883 worst_majority = majority_num;
1885 switch (majority_vote)
1888 progress_cls->num_pending++;
1889 GNUNET_assert (GNUNET_OK ==
1890 GNUNET_SET_add_element (set_out->h,
1894 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1895 "P%u: apply round: adding element %s with %u-majority.\n",
1896 session->local_peer_idx,
1897 debug_str_element (ri->element), majority_num);
1900 progress_cls->num_pending++;
1901 GNUNET_assert (GNUNET_OK ==
1902 GNUNET_SET_remove_element (set_out->h,
1906 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1907 "P%u: apply round: deleting element %s with %u-majority.\n",
1908 session->local_peer_idx,
1909 debug_str_element (ri->element), majority_num);
1912 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1913 "P%u: apply round: keeping element %s with %u-majority.\n",
1914 session->local_peer_idx,
1915 debug_str_element (ri->element), majority_num);
1924 if (0 == progress_cls->num_pending)
1926 // call closure right now, no pending ops
1927 GNUNET_free (progress_cls);
/* Early-stopping threshold: 2 * (num_peers / 3), using integer division. */
1932 uint16_t thresh = (session->num_peers / 3) * 2;
1934 if (worst_majority >= thresh)
1936 switch (session->early_stopping)
1938 case EARLY_STOPPING_NONE:
1939 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1940 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1941 "P%u: Stopping early (after one more superround)\n",
1942 session->local_peer_idx);
1944 case EARLY_STOPPING_ONE_MORE:
1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1946 session->local_peer_idx);
1947 session->early_stopping = EARLY_STOPPING_DONE;
/* Offer every step of the session the chance to finish early. */
1950 for (step = session->steps_head; NULL != step; step = step->next)
1951 try_finish_step_early (step);
1954 case EARLY_STOPPING_DONE:
1955 /* We shouldn't be here anymore after early stopping */
1963 else if (EARLY_STOPPING_NONE != session->early_stopping)
1965 // Our assumption about the number of bad peers
1967 GNUNET_break_op (0);
1971 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1972 session->local_peer_idx);
1975 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Start callback: grade the gradecast of leader `task->key.leader` for
   repetition `task->key.repetition`.
   The GRADECAST_RESULT referendum for the repetition is looked up or
   created.  The leader's proposal diff is applied to it as votes of the
   leader, then each element of the leader's ECHO referendum is voted on
   according to its majority.  Finally a confidence value (starting at 2)
   is derived from the number of non-contested peers and decides whether
   the leader is committed and/or blacklisted. */
1980 task_start_grade (struct TaskEntry *task)
1982 struct ConsensusSession *session = task->step->session;
1983 struct ReferendumEntry *output_rfn;
1984 struct ReferendumEntry *input_rfn;
1985 struct DiffEntry *input_diff;
1986 struct RfnKey rfn_key;
1987 struct DiffKey diff_key;
1988 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1989 struct RfnElementInfo *ri;
1990 unsigned int gradecast_confidence = 2;
1992 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1993 output_rfn = lookup_rfn (session, &rfn_key);
1994 if (NULL == output_rfn)
1996 output_rfn = rfn_create (session->num_peers);
1997 output_rfn->key = rfn_key;
1998 put_rfn (session, output_rfn);
2001 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2002 input_diff = lookup_diff (session, &diff_key);
2003 GNUNET_assert (NULL != input_diff);
2005 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2006 input_rfn = lookup_rfn (session, &rfn_key);
2007 GNUNET_assert (NULL != input_rfn);
2009 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2011 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2013 while (GNUNET_YES ==
2014 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2016 (const void **) &ri))
2018 uint16_t majority_num;
2019 enum ReferendumVote majority_vote;
2021 // XXX: we need contested votes and non-contested votes here
2022 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* A majority of at most num_peers / 3 is overridden to VOTE_REMOVE. */
2024 if (majority_num <= session->num_peers / 3)
2025 majority_vote = VOTE_REMOVE;
2027 switch (majority_vote)
2032 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2035 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2042 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2045 uint16_t noncontested;
2046 noncontested = rfn_noncontested (input_rfn);
/* Below 2 * (num_peers / 3) non-contested peers: confidence is capped at 1. */
2047 if (noncontested < (session->num_peers / 3) * 2)
2049 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
/* Below (num_peers / 3) + 1 non-contested peers: confidence drops to 0. */
2051 if (noncontested < (session->num_peers / 3) + 1)
2053 gradecast_confidence = 0;
/* Confidence >= 1 commits the leader in the output referendum;
   confidence <= 1 blacklists the leader (both happen at exactly 1). */
2057 if (gradecast_confidence >= 1)
2058 rfn_commit (output_rfn, task->key.leader);
2060 if (gradecast_confidence <= 1)
2061 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/* Start callback for reconcile tasks (set operations between two peers).
   The input set must exist and carry a handle.  Missing outputs (set,
   referendum, diff) named in the SetOpCls are created up front.
   Depending on the local peer's role in the task key:
   - peer1 == peer2 == local: purely local task (handling not visible in
     this excerpt);
   - peer1 == local: build a round context message from the task key,
     prepare the set operation towards peer2 and call commit_set();
   - peer2 == local: wait for the remote request; commit_set() is only
     called when an operation is already present;
   - otherwise: treated as a task-graph construction error. */
2068 task_start_reconcile (struct TaskEntry *task)
2070 struct SetEntry *input;
2071 struct SetOpCls *setop = &task->cls.setop;
2072 struct ConsensusSession *session = task->step->session;
2074 input = lookup_set (session, &setop->input_set);
2075 GNUNET_assert (NULL != input);
2076 GNUNET_assert (NULL != input->h);
2078 /* We create the outputs for the operation here
2079 (rather than in the set operation callback)
2080 because we want something valid in there, even
2081 if the other peer doesn't talk to us */
2083 if (SET_KIND_NONE != setop->output_set.set_kind)
2085 /* If we don't have an existing output set,
2086 we clone the input set. */
2087 if (NULL == lookup_set (session, &setop->output_set))
2089 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2094 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2096 if (NULL == lookup_rfn (session, &setop->output_rfn))
2098 struct ReferendumEntry *rfn;
2100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2101 "P%u: output rfn <%s> missing, creating.\n",
2102 session->local_peer_idx,
2103 debug_str_rfn_key (&setop->output_rfn));
2105 rfn = rfn_create (session->num_peers);
2106 rfn->key = setop->output_rfn;
2107 put_rfn (session, rfn);
2111 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2113 if (NULL == lookup_diff (session, &setop->output_diff))
2115 struct DiffEntry *diff;
2117 diff = diff_create ();
2118 diff->key = setop->output_diff;
2119 put_diff (session, diff);
2123 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2125 /* XXX: mark the corresponding rfn as commited if necessary */
2130 if (task->key.peer1 == session->local_peer_idx)
2132 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2135 "P%u: Looking up set {%s} to run remote union\n",
2136 session->local_peer_idx,
2137 debug_str_set_key (&setop->input_set));
/* All fields of the context message are sent in network byte order. */
2139 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2140 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2142 rcm.kind = htons (task->key.kind);
2143 rcm.peer1 = htons (task->key.peer1);
2144 rcm.peer2 = htons (task->key.peer2);
2145 rcm.leader = htons (task->key.leader);
2146 rcm.repetition = htons (task->key.repetition);
2147 rcm.is_contested = htons (0);
2149 GNUNET_assert (NULL == setop->op);
2150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2151 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2153 struct GNUNET_SET_Option opts[] = {
2154 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2155 { GNUNET_SET_OPTION_END },
2158 // XXX: maybe this should be done while
2159 // setting up tasks already?
2160 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2161 &session->global_id,
2163 GNUNET_SET_RESULT_SYMMETRIC,
2168 commit_set (session, task);
2170 else if (task->key.peer2 == session->local_peer_idx)
2172 /* Wait for the other peer to contact us */
2173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2174 session->local_peer_idx, task->key.peer1);
2176 if (NULL != setop->op)
2178 commit_set (session, task);
2183 /* We made an error while constructing the task graph. */
/* Start callback: evaluate the ECHO referendum for the leader and
   repetition given in the task key.
   If the ECHO_RESULT output set does not exist yet, a copy of the
   LEADER_PROPOSAL set is requested via create_set_copy_for_task().
   A second SetEntry (key SET_KIND_LAST_GRADECAST) is registered that
   shares the output set's handle.  For every referendum element the
   majority is computed; a majority below num_peers / 3 marks the output
   set as contested, and the majority vote decides whether the element is
   added to or removed from the output set, or left alone.
   Pending mutations are counted in progress_cls->num_pending. */
2190 task_start_eval_echo (struct TaskEntry *task)
2192 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2193 struct ReferendumEntry *input_rfn;
2194 struct RfnElementInfo *ri;
2195 struct SetEntry *output_set;
2196 struct SetMutationProgressCls *progress_cls;
2197 struct ConsensusSession *session = task->step->session;
2198 struct SetKey sk_in;
2199 struct SetKey sk_out;
2200 struct RfnKey rk_in;
2202 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2203 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2204 output_set = lookup_set (session, &sk_out);
2205 if (NULL == output_set)
2207 create_set_copy_for_task (task, &sk_in, &sk_out);
2213 // FIXME: should be marked as a shallow copy, so
2214 // we can destroy everything correctly
2215 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2216 last_set->h = output_set->h;
2217 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2218 put_set (session, last_set);
2221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2222 "Evaluating referendum in Task {%s}\n",
2223 debug_str_task_key (&task->key));
2225 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2226 progress_cls->task = task;
2228 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2229 input_rfn = lookup_rfn (session, &rk_in);
2231 GNUNET_assert (NULL != input_rfn);
2233 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2234 GNUNET_assert (NULL != iter);
2236 while (GNUNET_YES ==
2237 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2239 (const void **) &ri))
2241 enum ReferendumVote majority_vote;
2242 uint16_t majority_num;
2244 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2246 if (majority_num < session->num_peers / 3)
2248 /* It is not the case that all nonfaulty peers
2249 echoed the same value. Since we're doing a set reconciliation, we
2250 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2251 reconciliation as contested. Other peers might not know that the
2252 leader is faulty, thus we still re-distribute in the confirmation
2254 output_set->is_contested = GNUNET_YES;
2257 switch (majority_vote)
2260 progress_cls->num_pending++;
2261 GNUNET_assert (GNUNET_OK ==
2262 GNUNET_SET_add_element (output_set->h,
2268 progress_cls->num_pending++;
2269 GNUNET_assert (GNUNET_OK ==
2270 GNUNET_SET_remove_element (output_set->h,
2276 /* Nothing to do. */
2284 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2286 if (0 == progress_cls->num_pending)
2288 // call closure right now, no pending ops
2289 GNUNET_free (progress_cls);
/* Start callback of the finishing task: looks up the final set named in
   task->cls.finish.input_set (it must exist) and iterates over it with
   send_to_client_iter (remaining iterate arguments are not visible in
   this excerpt). */
2296 task_start_finish (struct TaskEntry *task)
2298 struct SetEntry *final_set;
2299 struct ConsensusSession *session = task->step->session;
2301 final_set = lookup_set (session, &task->cls.finish.input_set);
2303 GNUNET_assert (NULL != final_set);
2306 GNUNET_SET_iterate (final_set->h,
2307 send_to_client_iter,
/* Start a single task.  The task must be neither started nor finished and
   must have a start callback (all asserted); it is then flagged as
   started.  NOTE(review): the invocation of task->start is not visible in
   this excerpt. */
2312 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2316 GNUNET_assert (GNUNET_NO == task->is_started);
2317 GNUNET_assert (GNUNET_NO == task->is_finished);
2318 GNUNET_assert (NULL != task->start);
2322 task->is_started = GNUNET_YES;
2329 * Run all steps of the session that don't have any
2330 * more dependencies.
/* Walk the session's step list from steps_head and start every step that
   is not running, not finished and has no pending prerequisites: the step
   is flagged as running and all of its tasks are started.  A step whose
   tasks are all already finished at that point (e.g. no tasks at all) is
   handled right here (the call made in that case is not visible in this
   excerpt). */
2333 run_ready_steps (struct ConsensusSession *session)
2337 step = session->steps_head;
2339 while (NULL != step)
2341 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2345 GNUNET_assert (0 == step->finished_tasks);
2347 #ifdef GNUNET_EXTRA_LOGGING
2348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2349 session->local_peer_idx,
2351 step->round, step->tasks_len, step->subordinates_len);
2354 step->is_running = GNUNET_YES;
2355 for (i = 0; i < step->tasks_len; i++)
2356 start_task (session, step->tasks[i]);
2358 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2359 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2362 /* Running the next ready steps will be triggered by task completion */
/* Mark `task` as finished (it must not be finished already) and bump its
   step's finished_tasks counter; when that counter reaches the step's
   tasks_len, the whole step is finished via finish_step(). */
2374 finish_task (struct TaskEntry *task)
2376 GNUNET_assert (GNUNET_NO == task->is_finished);
2377 task->is_finished = GNUNET_YES;
2379 task->step->finished_tasks++;
2381 if (task->step->finished_tasks == task->step->tasks_len)
2382 finish_step (task->step);
2387 * Search peer in the list of peers in session.
2389 * @param peer peer to find
2390 * @param session session with peer
2391 * @return index of peer, -1 if peer is not in session
/* Linear search over session->peers, comparing identities bytewise with
   memcmp (the return statements are not visible in this excerpt). */
2394 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2397 for (i = 0; i < session->num_peers; i++)
2398 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2405 * Compute a global, (hopefully) unique consensus session id,
2406 * from the local id of the consensus session, and the identities of all participants.
2407 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2408 * exactly the same peers, the global id will be different.
2410 * @param session session to generate the global id for
2411 * @param local_session_id local id of the consensus session
/* Fill session->global_id (one GNUNET_HashCode) using GNUNET_CRYPTO_kdf;
   the call is asserted to return GNUNET_YES.  Visible inputs are a fixed
   salt string, a buffer of num_peers peer identities and a
   HashCode-sized buffer (presumably session->peers and
   local_session_id; the pointer arguments are not visible in this
   excerpt). */
2414 compute_global_id (struct ConsensusSession *session,
2415 const struct GNUNET_HashCode *local_session_id)
2417 const char *salt = "gnunet-service-consensus/session_id";
2419 GNUNET_assert (GNUNET_YES ==
2420 GNUNET_CRYPTO_kdf (&session->global_id,
2421 sizeof (struct GNUNET_HashCode),
2425 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2427 sizeof (struct GNUNET_HashCode),
2433 * Compare two peer identities.
2435 * @param h1 some peer identity
2436 * @param h2 some peer identity
2437 * @return positive if h1 > h2, negative if h1 < h2 and 0 if h1 == h2 (result of memcmp).
/* qsort-compatible comparator: bytewise comparison of two peer
   identities.  Returns memcmp's result directly, so only the sign of the
   return value is meaningful. */
2440 peer_id_cmp (const void *h1, const void *h2)
2442 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2447 * Create the sorted list of peers for the session,
2448 * add the local peer if not in the join message.
2450 * @param session session to initialize
2451 * @param join_msg join message with the list of peers participating at the end
/* Build session->peers from the peer identities that trail the join
   message.  num_peers is read from the message in network byte order.
   If the local peer is not among the listed peers, the list is enlarged
   by one and the local peer is stored in the last slot; the message's
   peers are then copied to the front and the array is sorted with
   qsort. */
2454 initialize_session_peer_list (struct ConsensusSession *session,
2455 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
/* The peer identities directly follow the fixed-size message struct. */
2457 const struct GNUNET_PeerIdentity *msg_peers
2458 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2459 int local_peer_in_list;
2461 session->num_peers = ntohl (join_msg->num_peers);
2463 /* Peers in the join message, may or may not include the local peer,
2464 Add it if it is missing. */
2465 local_peer_in_list = GNUNET_NO;
2466 for (unsigned int i = 0; i < session->num_peers; i++)
2468 if (0 == memcmp (&msg_peers[i],
2470 sizeof (struct GNUNET_PeerIdentity)))
2472 local_peer_in_list = GNUNET_YES;
2476 if (GNUNET_NO == local_peer_in_list)
2477 session->num_peers++;
2479 session->peers = GNUNET_new_array (session->num_peers,
2480 struct GNUNET_PeerIdentity);
2481 if (GNUNET_NO == local_peer_in_list)
2482 session->peers[session->num_peers - 1] = my_peer;
/* Copies only the peers from the message, so the extra last slot (if any)
   is left untouched. */
2484 GNUNET_memcpy (session->peers,
2486 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2487 qsort (session->peers,
2489 sizeof (struct GNUNET_PeerIdentity),
/* Find the task registered under `key` in session->taskmap.  The map key
   is the hash over the raw bytes of the TaskKey struct, matching what
   put_task() uses.  Returns whatever the multihashmap lookup yields. */
2494 static struct TaskEntry *
2495 lookup_task (struct ConsensusSession *session,
2496 struct TaskKey *key)
2498 struct GNUNET_HashCode hash;
2501 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2503 GNUNET_h2s (&hash));
2504 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2509 * Called when another peer wants to do a set operation with the
2512 * @param cls closure
2513 * @param other_peer the other peer
2514 * @param context_msg message with application specific information from
2516 * @param request request from the other peer, use GNUNET_SET_accept
2517 * to accept it, otherwise the request will be refused
2518 * Note that we don't use a return value here, as it is also
2519 * necessary to specify the set we want to do the operation with,
2520 * which sometimes can be derived from the context message.
2521 * Also necessary to specify the timeout.
/* Listener callback for incoming set operation requests.
   The context message must be present, have the ROUND_CONTEXT type and
   exactly the size of GNUNET_CONSENSUS_RoundContextMessage; otherwise
   GNUNET_break_op is raised.  The task key is rebuilt from the message
   fields (network to host byte order) and looked up.  Requests for
   unknown or already finished tasks, or tasks where the local peer is not
   peer2, are likewise flagged with GNUNET_break_op.  Otherwise the
   request is accepted and, if the task has already been started,
   commit_set() is called right away. */
2524 set_listen_cb (void *cls,
2525 const struct GNUNET_PeerIdentity *other_peer,
2526 const struct GNUNET_MessageHeader *context_msg,
2527 struct GNUNET_SET_Request *request)
2529 struct ConsensusSession *session = cls;
2531 struct TaskEntry *task;
2532 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2534 if (NULL == context_msg)
2536 GNUNET_break_op (0);
2540 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2542 GNUNET_break_op (0);
2546 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2548 GNUNET_break_op (0);
2552 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2554 tk = ((struct TaskKey) {
2555 .kind = ntohs (cm->kind),
2556 .peer1 = ntohs (cm->peer1),
2557 .peer2 = ntohs (cm->peer2),
2558 .repetition = ntohs (cm->repetition),
2559 .leader = ntohs (cm->leader),
2562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2563 session->local_peer_idx, debug_str_task_key (&tk));
2565 task = lookup_task (session, &tk);
2569 GNUNET_break_op (0);
2573 if (GNUNET_YES == task->is_finished)
2575 GNUNET_break_op (0);
2579 if (task->key.peer2 != session->local_peer_idx)
2581 /* We're being asked, so we must be the 2nd peer. */
2582 GNUNET_break_op (0);
2586 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2587 (task->key.peer2 == session->local_peer_idx)));
2589 struct GNUNET_SET_Option opts[] = {
2590 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2591 { GNUNET_SET_OPTION_END },
2594 task->cls.setop.op = GNUNET_SET_accept (request,
2595 GNUNET_SET_RESULT_SYMMETRIC,
2600 /* If the task hasn't been started yet,
2601 we wait for that until we commit. */
2603 if (GNUNET_YES == task->is_started)
2605 commit_set (session, task);
/* Register a task: `*t` is duplicated with GNUNET_memdup (callers pass a
   stack-allocated template), the copy is appended to its step's task
   array and inserted into `taskmap` under the hash of its TaskKey.
   The task must reference a step.  The task array grows to
   3 * (cap + 1) / 2 entries when full.  Duplicate keys trip the
   assertion on the UNIQUE_ONLY put. */
2612 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2613 struct TaskEntry *t)
2615 struct GNUNET_HashCode round_hash;
2618 GNUNET_assert (NULL != t->step);
2620 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2624 if (s->tasks_len == s->tasks_cap)
2626 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2627 GNUNET_array_grow (s->tasks,
2632 #ifdef GNUNET_EXTRA_LOGGING
2633 GNUNET_assert (NULL != s->debug_name);
2634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2635 debug_str_task_key (&t->key),
2639 s->tasks[s->tasks_len] = t;
2642 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2643 GNUNET_assert (GNUNET_OK ==
2644 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2645 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* Placeholder for assigning timeouts to the steps of a fully constructed
   task graph; no functionality is implemented yet. */
2650 install_step_timeouts (struct ConsensusSession *session)
2652 /* Given the fully constructed task graph
2653 with rounds for tasks, we can give the tasks timeouts. */
2655 // unsigned int max_round;
2657 /* XXX: implement! */
2663 * Arrange two peers in some canonical order.
/* Put the two peer indices *p1 and *p2 into a canonical order, in place.
   Both indices must be smaller than `n` (asserted).  The decision is based
   on the cyclic distance (b - a) modulo n compared against n / 2.
   NOTE(review): the assignment of a and b and the actual swap are not
   visible in this excerpt. */
2666 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2671 GNUNET_assert (*p1 < n);
2672 GNUNET_assert (*p2 < n);
2685 /* For uniformly random *p1, *p2,
2686 this condition is true with 50% chance */
2687 if (((b - a) + n) % n <= n / 2)
2701 * Record @a dep as a dependency of @a step.
/* Make `step` wait for `dep`: `step` is appended to dep's subordinates
   array (grown to 3 * (cap + 1) / 2 entries when full) and step's
   pending_prereq counter is incremented.
   Asserted: both pointers are non-NULL and distinct, and dep's round is
   not later than step's round. */
2704 step_depend_on (struct Step *step, struct Step *dep)
2706 /* We're not checking for cyclic dependencies,
2707 but this is a cheap sanity check. */
2708 GNUNET_assert (step != dep);
2709 GNUNET_assert (NULL != step);
2710 GNUNET_assert (NULL != dep);
2711 GNUNET_assert (dep->round <= step->round);
2713 #ifdef GNUNET_EXTRA_LOGGING
2714 /* Make sure we have complete debugging information.
2715 Also checks that we don't screw up too badly
2716 constructing the task graph. */
2717 GNUNET_assert (NULL != step->debug_name);
2718 GNUNET_assert (NULL != dep->debug_name);
2719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2720 "Making step `%s' depend on `%s'\n",
2725 if (dep->subordinates_cap == dep->subordinates_len)
2727 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2728 GNUNET_array_grow (dep->subordinates,
2729 dep->subordinates_cap,
2733 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2735 dep->subordinates[dep->subordinates_len] = step;
2736 dep->subordinates_len++;
2738 step->pending_prereq++;
/* Allocate a new step for `session`, record its round number and whether
   it may be finished early, and append it to the tail of the session's
   step list. */
2742 static struct Step *
2743 create_step (struct ConsensusSession *session, int round, int early_finishable)
2746 step = GNUNET_new (struct Step);
2747 step->session = session;
2748 step->round = round;
2749 step->early_finishable = early_finishable;
2750 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2751 session->steps_tail,
2758 * Construct the task graph for a single
/* Add the steps and tasks of one gradecast (leader `lead`, repetition
   `rep`) to the session's task graph, chained between `step_before` and
   `step_after`.  Steps are created in this order, each depending on the
   previous one and all early-finishable:
     1. "disseminate leader": reconcile tasks with kind GRADECAST_LEADER
     2. "echo leader":        reconcile tasks with kind GRADECAST_ECHO
     3. "echo grade leader":  local task running task_start_eval_echo
     4. "confirm leader":     reconcile tasks with kind GRADECAST_CONFIRM
     5. "confirm grade leader": local task running task_start_grade
   Finally `step_after` is made to depend on the last step.
   NOTE(review): the peer-pair selection inside the loops and the round
   increments between steps are not visible in this excerpt. */
2762 construct_task_graph_gradecast (struct ConsensusSession *session,
2765 struct Step *step_before,
2766 struct Step *step_after)
2768 uint16_t n = session->num_peers;
2769 uint16_t me = session->local_peer_idx;
2774 /* The task we're currently setting up. */
2775 struct TaskEntry task;
2778 struct Step *prev_step;
2784 round = step_before->round + 1;
2786 /* gcast step 1: leader disseminates */
2788 step = create_step (session, round, GNUNET_YES);
2790 #ifdef GNUNET_EXTRA_LOGGING
2791 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2793 step_depend_on (step, step_before);
2797 for (k = 0; k < n; k++)
2803 arrange_peers (&p1, &p2, n);
2804 task = ((struct TaskEntry) {
2806 .start = task_start_reconcile,
2807 .cancel = task_cancel_reconcile,
2808 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2810 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2811 put_task (session->taskmap, &task);
2813 /* We run this task to make sure that the leader
2814 has stored the SET_KIND_LEADER set of himself,
2815 so he can participate in the rest of the gradecast
2816 without the code having to handle any special cases. */
2817 task = ((struct TaskEntry) {
2819 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2820 .start = task_start_reconcile,
2821 .cancel = task_cancel_reconcile,
2823 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2824 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2825 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2826 put_task (session->taskmap, &task);
2832 arrange_peers (&p1, &p2, n);
2833 task = ((struct TaskEntry) {
2835 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2836 .start = task_start_reconcile,
2837 .cancel = task_cancel_reconcile,
2839 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2840 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2841 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2842 put_task (session->taskmap, &task);
2845 /* gcast phase 2: echo */
2848 step = create_step (session, round, GNUNET_YES);
2849 #ifdef GNUNET_EXTRA_LOGGING
2850 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2852 step_depend_on (step, prev_step);
2854 for (k = 0; k < n; k++)
2858 arrange_peers (&p1, &p2, n);
2859 task = ((struct TaskEntry) {
2861 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2862 .start = task_start_reconcile,
2863 .cancel = task_cancel_reconcile,
2865 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2866 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2867 put_task (session->taskmap, &task);
2871 /* Same round, since step only has local tasks */
2872 step = create_step (session, round, GNUNET_YES);
2873 #ifdef GNUNET_EXTRA_LOGGING
2874 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2876 step_depend_on (step, prev_step);
2878 arrange_peers (&p1, &p2, n);
2879 task = ((struct TaskEntry) {
2880 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2882 .start = task_start_eval_echo
2884 put_task (session->taskmap, &task);
2888 step = create_step (session, round, GNUNET_YES);
2889 #ifdef GNUNET_EXTRA_LOGGING
2890 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2892 step_depend_on (step, prev_step);
2894 /* gcast phase 3: confirmation and grading */
2895 for (k = 0; k < n; k++)
2899 arrange_peers (&p1, &p2, n);
2900 task = ((struct TaskEntry) {
2902 .start = task_start_reconcile,
2903 .cancel = task_cancel_reconcile,
2904 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2906 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2907 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2908 /* If there was at least one element in the echo round that was
2909 contested (i.e. it had no n-t majority), then we let the other peers
2910 know, and other peers let us know. The contested flag for each peer is
2911 stored in the rfn. */
2912 task.cls.setop.transceive_contested = GNUNET_YES;
2913 put_task (session->taskmap, &task);
2917 /* Same round, since step only has local tasks */
2918 step = create_step (session, round, GNUNET_YES);
2919 #ifdef GNUNET_EXTRA_LOGGING
2920 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2922 step_depend_on (step, prev_step);
2924 task = ((struct TaskEntry) {
2926 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2927 .start = task_start_grade,
2929 put_task (session->taskmap, &task);
2931 step_depend_on (step_after, step);
/* Build the step/task graph of a session: two all-to-all reconciliation
   steps, then t + 1 sequential gradecast repetitions (each running one
   gradecast per possible leader), then a final "finish" step.  Steps are
   ordered with step_depend_on() and every task is registered in
   session->taskmap via put_task(). */
2936 construct_task_graph (struct ConsensusSession *session)
2938 uint16_t n = session->num_peers;
2941 uint16_t me = session->local_peer_idx;
2943 /* The task we're currently setting up. */
2944 struct TaskEntry task;
2946 /* Current leader */
2950 struct Step *prev_step;
2952 unsigned int round = 0;
2956 // XXX: introduce first step,
2957 // where we wait for all insert acks
2958 // from the set service
2960 /* faster but brittle all-to-all */
2962 // XXX: Not implemented yet
2964 /* all-to-all step */
2966 step = create_step (session, round, GNUNET_NO);
2968 #ifdef GNUNET_EXTRA_LOGGING
2969 step->debug_name = GNUNET_strdup ("all to all");
/* One reconcile task per iteration, keyed by the peer pair (p1, p2) as left
   by arrange_peers() (presumably canonical order -- confirm). */
2972 for (i = 0; i < n; i++)
2979 arrange_peers (&p1, &p2, n);
2980 task = ((struct TaskEntry) {
2981 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2983 .start = task_start_reconcile,
2984 .cancel = task_cancel_reconcile,
/* Reconcile the current set in place: the output set uses the same key as
   the input set, with do_not_remove enabled. */
2986 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2987 task.cls.setop.output_set = task.cls.setop.input_set;
2988 task.cls.setop.do_not_remove = GNUNET_YES;
2989 put_task (session->taskmap, &task);
/* Second all-to-all pass: same task setup under PHASE_KIND_ALL_TO_ALL_2,
   ordered after prev_step.  (The doubled ';' below is an empty statement.) */
2994 step = create_step (session, round, GNUNET_NO);;
2995 #ifdef GNUNET_EXTRA_LOGGING
2996 step->debug_name = GNUNET_strdup ("all to all 2");
2998 step_depend_on (step, prev_step);
3001 for (i = 0; i < n; i++)
3008 arrange_peers (&p1, &p2, n);
3009 task = ((struct TaskEntry) {
3010 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3012 .start = task_start_reconcile,
3013 .cancel = task_cancel_reconcile,
3015 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3016 task.cls.setop.output_set = task.cls.setop.input_set;
3017 task.cls.setop.do_not_remove = GNUNET_YES;
3018 put_task (session->taskmap, &task);
3028 /* Byzantine union */
3030 /* sequential repetitions of the gradecasts */
3031 for (i = 0; i < t + 1; i++)
3033 struct Step *step_rep_start;
3034 struct Step *step_rep_end;
3036 /* Every repetition is in a separate round. */
3037 step_rep_start = create_step (session, round, GNUNET_YES);
3038 #ifdef GNUNET_EXTRA_LOGGING
3039 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
/* Chain this repetition behind whatever step prev_step currently names; for
   later repetitions that is the previous repetition's end step. */
3042 step_depend_on (step_rep_start, prev_step);
3044 /* gradecast has three rounds */
3046 step_rep_end = create_step (session, round, GNUNET_YES);
3047 #ifdef GNUNET_EXTRA_LOGGING
3048 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3051 /* parallel gradecasts */
3052 for (lead = 0; lead < n; lead++)
3053 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* Local task (no peers, no leader) attached to this repetition's end step;
   it is started through task_start_apply_round. */
3055 task = ((struct TaskEntry) {
3056 .step = step_rep_end,
3057 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3058 .start = task_start_apply_round,
3060 put_task (session->taskmap, &task);
3062 prev_step = step_rep_end;
3065 /* There is no next gradecast round, thus the final
3066 start step is the overall end step of the gradecasts */
3068 step = create_step (session, round, GNUNET_NO);
3069 #ifdef GNUNET_EXTRA_LOGGING
3070 GNUNET_asprintf (&step->debug_name, "finish");
3072 step_depend_on (step, prev_step);
3074 task = ((struct TaskEntry) {
3076 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3077 .start = task_start_finish,
/* Only the set kind is spelled out; the remaining SetKey members are
   zero-initialized by the compound literal. */
3079 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3081 put_task (session->taskmap, &task);
3087 * Check join message.
3089 * @param cls session of client that sent the message
3090 * @param m message sent by the client
3091 * @return #GNUNET_OK if @a m is well-formed
3094 check_client_join (void *cls,
3095 const struct GNUNET_CONSENSUS_JoinMessage *m)
3097 uint32_t listed_peers = ntohl (m->num_peers);
3099 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3100 listed_peers * sizeof (struct GNUNET_PeerIdentity))
3103 return GNUNET_SYSERR;
3110 * Called when a client wants to join a consensus session.
3112 * @param cls session of client that sent the message
3113 * @param m message sent by the client
/* Handle a join request: initialise the session's peer list and global id,
   record the conclude start/deadline, start listening for set operations
   under the global id, create the per-session maps and the initial
   SET_KIND_CURRENT set, allocate the blacklist array, build the task graph
   and finally call GNUNET_SERVICE_client_continue(). */
3116 handle_client_join (void *cls,
3117 const struct GNUNET_CONSENSUS_JoinMessage *m)
3119 struct ConsensusSession *session = cls;
3120 struct ConsensusSession *other_session;
3122 initialize_session_peer_list (session,
3124 compute_global_id (session,
3127 /* Check if some local client already owns the session.
3128 It is only legal to have a session with an existing global id
3129 if all other sessions with this global id are finished.*/
/* NOTE(review): confirm what is done when another session with the same
   global id is found; other_session is not used again further down. */
3130 for (other_session = sessions_head;
3131 NULL != other_session;
3132 other_session = other_session->next)
3134 if ( (other_session != session) &&
3135 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3136 &other_session->global_id)) )
/* Deadline and start time are converted from the message with
   GNUNET_TIME_absolute_ntoh(). */
3140 session->conclude_deadline
3141 = GNUNET_TIME_absolute_ntoh (m->deadline);
3142 session->conclude_start
3143 = GNUNET_TIME_absolute_ntoh (m->start);
/* Look up our own identity; the assertion below requires that the lookup
   did not yield -1. */
3144 session->local_peer_idx = get_peer_idx (&my_peer,
3146 GNUNET_assert (-1 != session->local_peer_idx);
3148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3149 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3150 GNUNET_h2s (&m->session_id),
3152 session->local_peer_idx,
3153 GNUNET_STRINGS_relative_time_to_string
3154 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3155 session->conclude_deadline),
/* Listen for union set operations identified by the session's global id. */
3158 session->set_listener
3159 = GNUNET_SET_listen (cfg,
3160 GNUNET_SET_OPERATION_UNION,
3161 &session->global_id,
/* Per-session lookup tables for sets, tasks, diffs and rfns. */
3165 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3167 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3169 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3171 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the set with key { SET_KIND_CURRENT, 0, 0 } -- the same key that
   handle_client_insert() looks up -- and record its handle in the session's
   set-handle list, which client_disconnect_cb() walks to destroy the sets. */
3175 struct SetEntry *client_set;
3177 client_set = GNUNET_new (struct SetEntry);
3178 client_set->h = GNUNET_SET_create (cfg,
3179 GNUNET_SET_OPERATION_UNION);
3180 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3181 sh->h = client_set->h;
3182 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3183 session->set_handles_tail,
3185 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* One blacklist entry per peer of the session. */
3190 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3193 /* Just construct the task graph,
3194 but don't run anything until the client calls conclude. */
3195 construct_task_graph (session);
3196 GNUNET_SERVICE_client_continue (session->client);
/* Completion callback handed to GNUNET_SET_add_element() by
   handle_client_insert(). */
3201 client_insert_done (void *cls)
3208 * Called when a client performs an insert operation.
3210 * @param cls client handle
3211 * @param msg message sent by the client
3212 * @return #GNUNET_OK (always well-formed)
/* Validation callback for insert messages; the documentation above states
   that every message is accepted (#GNUNET_OK). */
3215 check_client_insert (void *cls,
3216 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3223 * Called when a client performs an insert operation.
3225 * @param cls session of the client that sent the message
3226 * @param msg message sent by the client
/* Handle an element insert from the client: wrap the payload in a
   ConsensusElement and add it to the session's { SET_KIND_CURRENT, 0, 0 }
   set.  If conclude has already started, the client is dropped instead. */
3229 handle_client_insert (void *cls,
3230 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3232 struct ConsensusSession *session = cls;
3233 ssize_t element_size;
3234 struct GNUNET_SET_Handle *initial_set;
3235 struct ConsensusElement *ce;
/* Inserts are not accepted once conclude has been requested. */
3237 if (GNUNET_YES == session->conclude_started)
3240 GNUNET_SERVICE_client_drop (session->client);
/* The payload is whatever follows the fixed-size message header; it is
   copied directly behind a freshly allocated ConsensusElement header. */
3244 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3245 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3246 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3247 ce->payload_type = msg->element_type;
/* The set element spans the ConsensusElement header plus the payload. */
3249 struct GNUNET_SET_Element element = {
3250 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3251 .size = sizeof (struct ConsensusElement) + element_size,
/* Look up the set created in handle_client_join(); it must exist. */
3256 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3257 struct SetEntry *entry;
3259 entry = lookup_set (session,
3261 GNUNET_assert (NULL != entry);
3262 initial_set = entry->h;
/* Count the insert as pending before handing the element to the set
   service; client_insert_done is passed as the completion callback. */
3265 session->num_client_insert_pending++;
3266 GNUNET_SET_add_element (initial_set,
3268 &client_insert_done,
3271 #ifdef GNUNET_EXTRA_LOGGING
3273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3274 "P%u: element %s added\n",
3275 session->local_peer_idx,
3276 debug_str_element (&element));
3280 GNUNET_SERVICE_client_continue (session->client);
3285 * Called when a client performs the conclude operation.
3287 * @param cls session of the client that sent the message
3288 * @param message message sent by the client
/* Handle the client's conclude request: mark the session as concluding,
   then call install_step_timeouts() and run_ready_steps().  A conclude
   request on a session that is already concluding drops the client. */
3291 handle_client_conclude (void *cls,
3292 const struct GNUNET_MessageHeader *message)
3294 struct ConsensusSession *session = cls;
3296 if (GNUNET_YES == session->conclude_started)
3298 /* conclude started twice */
3300 GNUNET_SERVICE_client_drop (session->client);
3303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3304 "conclude requested\n");
/* Set the flag first; handle_client_insert() checks it to reject inserts
   from now on. */
3305 session->conclude_started = GNUNET_YES;
3306 install_step_timeouts (session);
3307 run_ready_steps (session);
3308 GNUNET_SERVICE_client_continue (session->client);
3313 * Called to clean up, after a shutdown has been requested.
3315 * @param cls closure
/* Shutdown hook (registered with GNUNET_SCHEDULER_add_shutdown() during
   service start-up): logs at INFO level and destroys the statistics handle. */
3318 shutdown_task (void *cls)
3320 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3322 GNUNET_STATISTICS_destroy (statistics,
3329 * Start processing consensus requests.
3331 * @param cls closure
3332 * @param c configuration to use
3333 * @param service the initialized service
3337 const struct GNUNET_CONFIGURATION_Handle *c,
3338 struct GNUNET_SERVICE_Handle *service)
/* Service start-up: obtain our own peer identity, create the "consensus"
   statistics handle and register shutdown_task as shutdown hook. */
3342 GNUNET_CRYPTO_get_peer_identity (cfg,
/* Presumably the failure path of the identity lookup: log the error and
   ask the scheduler to shut down -- TODO confirm the guarding condition. */
3345 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3346 "Could not retrieve host identity\n");
3347 GNUNET_SCHEDULER_shutdown ();
3350 statistics = GNUNET_STATISTICS_create ("consensus",
3352 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3358 * Callback called when a client connects to the service.
3360 * @param cls closure for the service
3361 * @param c the new client that connected to the service
3362 * @param mq the message queue used to send messages to the client
/* Allocate a session for the newly connected client, remember the client
   handle and its message queue, and link the session into the list headed
   by sessions_head. */
3366 client_connect_cb (void *cls,
3367 struct GNUNET_SERVICE_Client *c,
3368 struct GNUNET_MQ_Handle *mq)
3370 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3372 session->client = c;
3373 session->client_mq = mq;
3374 GNUNET_CONTAINER_DLL_insert (sessions_head,
3382 * Callback called when a client disconnected from the service
3384 * @param cls closure for the service
3385 * @param c the client that disconnected
3386 * @param internal_cls the `struct ConsensusSession` of the client that disconnected
/* Tear down the session of a disconnected client: cancel the set listener
   (if any), unlink the session from the global session list, destroy every
   set handle recorded for the session and free the session itself.
   NOTE(review): setmap, taskmap, diffmap, rfnmap and peers_blacklisted,
   which handle_client_join() allocates, are not visibly released here --
   confirm they are cleaned up elsewhere. */
3389 client_disconnect_cb (void *cls,
3390 struct GNUNET_SERVICE_Client *c,
/* internal_cls is used as the client's session object. */
3393 struct ConsensusSession *session = internal_cls;
3395 if (NULL != session->set_listener)
3397 GNUNET_SET_listen_cancel (session->set_listener);
3398 session->set_listener = NULL;
3400 GNUNET_CONTAINER_DLL_remove (sessions_head,
/* Pop set handles off the head of the list one by one and destroy the
   underlying sets. */
3404 while (session->set_handles_head)
3406 struct SetHandle *sh = session->set_handles_head;
3407 session->set_handles_head = sh->next;
3408 GNUNET_SET_destroy (sh->h);
3411 GNUNET_free (session);
3416 * Define "main" method using service macro.
3420 GNUNET_SERVICE_OPTION_NONE,
3423 &client_disconnect_cb,
/* Conclude carries no payload: registered as a fixed-size message that is
   just a GNUNET_MessageHeader. */
3425 GNUNET_MQ_hd_fixed_size (client_conclude,
3426 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3427 struct GNUNET_MessageHeader,
/* Insert and join are registered as variable-size messages; the matching
   check_client_insert() / check_client_join() functions are defined above. */
3429 GNUNET_MQ_hd_var_size (client_insert,
3430 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3431 struct GNUNET_CONSENSUS_ElementMessage,
3433 GNUNET_MQ_hd_var_size (client_join,
3434 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3435 struct GNUNET_CONSENSUS_JoinMessage,
3437 GNUNET_MQ_handler_end ());
3439 /* end of gnunet-service-consensus.c */