2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "gnunet_consensus_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
42 * Vote that nothing should change.
43 * This option is never voted explicitly.
47 * Vote that an element should be added.
51 * Vote that an element should be removed.
/**
 * Values describing the progress of a session's early stopping scheme.
 *
 * NOTE(review): the meaning of the individual values is taken from their
 * names only; confirm against the code that advances the phase.
 */
enum EarlyStoppingPhase
{
  EARLY_STOPPING_NONE = 0,
  EARLY_STOPPING_ONE_MORE = 1,
  EARLY_STOPPING_DONE = 2,
};
65 GNUNET_NETWORK_STRUCT_BEGIN
68 * Tuple of integers that together
69 * identify a task uniquely.
73 * A value from 'enum PhaseKind'.
75 uint16_t kind GNUNET_PACKED;
78 * Number of the first peer
81 int16_t peer1 GNUNET_PACKED;
84 * Number of the second peer in canonical order.
86 int16_t peer2 GNUNET_PACKED;
89 * Repetition of the gradecast phase.
91 int16_t repetition GNUNET_PACKED;
94 * Leader in the gradecast phase.
96 * Can be different from both peer1 and peer2.
98 int16_t leader GNUNET_PACKED;
105 int set_kind GNUNET_PACKED;
106 int k1 GNUNET_PACKED;
107 int k2 GNUNET_PACKED;
114 struct GNUNET_SET_Handle *h;
116 * GNUNET_YES if the set resulted
117 * from applying a referendum with contested
126 int diff_kind GNUNET_PACKED;
127 int k1 GNUNET_PACKED;
128 int k2 GNUNET_PACKED;
133 int rfn_kind GNUNET_PACKED;
134 int k1 GNUNET_PACKED;
135 int k2 GNUNET_PACKED;
139 GNUNET_NETWORK_STRUCT_END
143 PHASE_KIND_ALL_TO_ALL,
144 PHASE_KIND_ALL_TO_ALL_2,
145 PHASE_KIND_GRADECAST_LEADER,
146 PHASE_KIND_GRADECAST_ECHO,
147 PHASE_KIND_GRADECAST_ECHO_GRADE,
148 PHASE_KIND_GRADECAST_CONFIRM,
149 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
151 * Apply a repetition of the all-to-all
152 * gradecast to the current set.
154 PHASE_KIND_APPLY_REP,
164 * Last result set from a gradecast
166 SET_KIND_LAST_GRADECAST,
167 SET_KIND_LEADER_PROPOSAL,
168 SET_KIND_ECHO_RESULT,
174 DIFF_KIND_LEADER_PROPOSAL,
175 DIFF_KIND_LEADER_CONSENSUS,
176 DIFF_KIND_GRADECAST_RESULT,
184 RFN_KIND_GRADECAST_RESULT
190 struct SetKey input_set;
192 struct SetKey output_set;
193 struct RfnKey output_rfn;
194 struct DiffKey output_diff;
198 int transceive_contested;
200 struct GNUNET_SET_OperationHandle *op;
206 struct SetKey input_set;
210 * Closure for both @a start_task
211 * and @a cancel_task.
215 struct SetOpCls setop;
216 struct FinishCls finish;
/**
 * Signature of a function that operates on one task of the
 * consensus task graph.
 *
 * @param task the task entry to operate on
 */
typedef void (*TaskFunc) (struct TaskEntry *task);
224 * Node in the consensus task graph.
239 union TaskFuncCls cls;
246 * All steps of one session are in a
247 * linked list for easier deallocation.
252 * All steps of one session are in a
253 * linked list for easier deallocation.
257 struct ConsensusSession *session;
260 * Tasks that this step is composed of.
262 struct TaskEntry **tasks;
263 unsigned int tasks_len;
264 unsigned int tasks_cap;
266 unsigned int finished_tasks;
269 * Tasks that have this task as dependency.
271 * We store pointers to subordinates rather
272 * than to prerequisites since it makes
273 * tracking the readiness of a task easier.
275 struct Step **subordinates;
276 unsigned int subordinates_len;
277 unsigned int subordinates_cap;
280 * Counter for the prerequisites of
283 size_t pending_prereq;
286 * Task that will run this step despite
287 * any pending prerequisites.
289 struct GNUNET_SCHEDULER_Task *timeout_task;
291 unsigned int is_running;
293 unsigned int is_finished;
296 * Synchrony round of the task.
297 * Determines the deadline for the task.
302 * Human-readable name for
303 * the task, used for debugging.
308 * When we're doing an early finish, how should this step be
310 * If GNUNET_YES, the step will be marked as finished
311 * without actually running its tasks.
312 * Otherwise, the step will still be run even after
315 * Note that a task may never be finished early if
316 * it is already running.
318 int early_finishable;
322 struct RfnElementInfo
324 const struct GNUNET_SET_Element *element;
327 * GNUNET_YES if the peer votes for the proposal.
332 * Proposal for this element,
333 * can only be VOTE_ADD or VOTE_REMOVE.
335 enum ReferendumVote proposal;
339 struct ReferendumEntry
344 * Elements where there is at least one proposed change.
346 * Maps the hash of the GNUNET_SET_Element
347 * to 'struct RfnElementInfo'.
349 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
351 unsigned int num_peers;
354 * Stores, for every peer in the session,
355 * whether the peer finished the whole referendum.
357 * Votes from peers are only counted if they're
358 * marked as commited (#GNUNET_YES) in the referendum.
360 * Otherwise (#GNUNET_NO), the requested changes are
361 * not counted for majority votes or thresholds.
367 * Contestation state of the peer. If a peer is contested, the values it
368 * contributed are still counted for applying changes, but the grading is
/**
 * One recorded change of a diff: the affected element and the
 * direction of the change.
 */
struct DiffElementInfo
{
  /**
   * The element that is added or removed.
   */
  const struct GNUNET_SET_Element *element;

  /**
   * Positive weight for 'add', negative
   * weights for 'remove'.
   */
  int weight;
};
393 struct GNUNET_CONTAINER_MultiHashMap *changes;
398 struct SetHandle *prev;
399 struct SetHandle *next;
401 struct GNUNET_SET_Handle *h;
/*
 * Reviewer notes on struct ConsensusSession.
 *
 * NOTE(review): this listing omits short source lines (braces, comment
 * delimiters and short member declarations).  Code elsewhere in this file
 * accesses session->first_size, which is not declared on any line shown
 * here, so the member list below is incomplete -- confirm against the
 * full source before relying on it.
 */
407 * A consensus session consists of one local client and the remote authorities.
409 struct ConsensusSession
412 * Consensus sessions are kept in a DLL.
414 struct ConsensusSession *next;
417 * Consensus sessions are kept in a DLL.
419 struct ConsensusSession *prev;
421 unsigned int num_client_insert_pending;
/* Maps keyed by the hash of a SetKey / RfnKey / DiffKey; see
   lookup_set(), lookup_rfn() and lookup_diff(). */
423 struct GNUNET_CONTAINER_MultiHashMap *setmap;
424 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
425 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
428 * Array of peers with length 'num_peers'.
/* Indexed by peer index.  set_result_cb() ignores added/removed element
   results from a peer whose entry is GNUNET_YES, and commit_set() only
   commits when the other peer's entry is GNUNET_NO. */
430 int *peers_blacklisted;
433 * Mapping from (hashed) TaskKey to TaskEntry.
435 * We map the application_id for a round to the task that should be
436 * executed, so we don't have to go through all task whenever we get
437 * an incoming set op request.
439 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
441 struct Step *steps_head;
442 struct Step *steps_tail;
444 int conclude_started;
449 * Global consensus identification, computed
450 * from the session id and participating authorities.
452 struct GNUNET_HashCode global_id;
455 * Client that inhabits the session
457 struct GNUNET_SERVICE_Client *client;
460 * Queued messages to the client.
462 struct GNUNET_MQ_Handle *client_mq;
465 * Time when the conclusion of the consensus should begin.
467 struct GNUNET_TIME_Absolute conclude_start;
470 * Timeout for all rounds together, single rounds will schedule a timeout task
471 * with a fraction of the conclude timeout.
472 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
474 struct GNUNET_TIME_Absolute conclude_deadline;
476 struct GNUNET_PeerIdentity *peers;
479 * Number of other peers in the consensus.
/* NOTE(review): local_peer_idx below is documented as an index into the
   peers array, which suggests num_peers counts the local peer as well --
   confirm and adjust the wording above if so. */
481 unsigned int num_peers;
484 * Index of the local peer in the peers array
486 unsigned int local_peer_idx;
489 * Listener for requests from other peers.
490 * Uses the session's global id as app id.
492 struct GNUNET_SET_ListenHandle *set_listener;
495 * State of our early stopping scheme.
500 * Our set size from the first round.
/* Allocated lazily in set_result_cb() with num_peers entries; entry i holds
   the size reported by peer i in a size marker element. */
504 uint64_t *first_sizes_received;
507 * Bounded Eppstein lower bound.
/* Set in set_result_cb() from the sorted copy of first_sizes_received. */
509 uint64_t lower_bound;
511 struct SetHandle *set_handles_head;
512 struct SetHandle *set_handles_tail;
516 * Linked list of sessions this peer participates in.
518 static struct ConsensusSession *sessions_head;
521 * Linked list of sessions this peer participates in.
523 static struct ConsensusSession *sessions_tail;
526 * Configuration of the consensus service.
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
531 * Peer that runs this service.
533 static struct GNUNET_PeerIdentity my_peer;
538 struct GNUNET_STATISTICS_Handle *statistics;
/* Forward declarations of functions that are defined later in the file.
   NOTE(review): the line carrying the storage class and return type of
   each prototype is not part of this listing. */
542 finish_task (struct TaskEntry *task);
546 run_ready_steps (struct ConsensusSession *session);
550 phasename (uint16_t phase)
554 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556 case PHASE_KIND_FINISH: return "FINISH";
557 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563 default: return "(unknown)";
569 setname (uint16_t kind)
573 case SET_KIND_CURRENT: return "CURRENT";
574 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575 case SET_KIND_NONE: return "NONE";
576 default: return "(unknown)";
581 rfnname (uint16_t kind)
585 case RFN_KIND_NONE: return "NONE";
586 case RFN_KIND_ECHO: return "ECHO";
587 case RFN_KIND_CONFIRM: return "CONFIRM";
588 default: return "(unknown)";
593 diffname (uint16_t kind)
597 case DIFF_KIND_NONE: return "NONE";
598 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601 default: return "(unknown)";
605 #ifdef GNUNET_EXTRA_LOGGING
609 debug_str_element (const struct GNUNET_SET_Element *el)
611 struct GNUNET_HashCode hash;
613 GNUNET_SET_element_hash (el, &hash);
615 return GNUNET_h2s (&hash);
619 debug_str_task_key (struct TaskKey *tk)
621 static char buf[256];
623 snprintf (buf, sizeof (buf),
624 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625 phasename (tk->kind), tk->peer1, tk->peer2,
626 tk->leader, tk->repetition);
632 debug_str_diff_key (struct DiffKey *dk)
634 static char buf[256];
636 snprintf (buf, sizeof (buf),
637 "DiffKey kind=%s, k1=%d, k2=%d",
638 diffname (dk->diff_kind), dk->k1, dk->k2);
644 debug_str_set_key (const struct SetKey *sk)
646 static char buf[256];
648 snprintf (buf, sizeof (buf),
649 "SetKey kind=%s, k1=%d, k2=%d",
650 setname (sk->set_kind), sk->k1, sk->k2);
657 debug_str_rfn_key (const struct RfnKey *rk)
659 static char buf[256];
661 snprintf (buf, sizeof (buf),
662 "RfnKey kind=%s, k1=%d, k2=%d",
663 rfnname (rk->rfn_kind), rk->k1, rk->k2);
668 #endif /* GNUNET_EXTRA_LOGGING */
672 * Send the final result set of the consensus to the client, element by
676 * @param element the current element, NULL if all elements have been
678 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
/*
 * send_to_client_iter: element iterator that forwards result elements to
 * the session's client.
 *
 * cls is the TaskEntry whose step identifies the session.  For an element,
 * a CONSENSUS_CLIENT_RECEIVED_ELEMENT message is queued whose payload is
 * the data following the ConsensusElement header; the other visible branch
 * queues CONSENSUS_CLIENT_CONCLUDE_DONE.
 *
 * NOTE(review): this listing omits short source lines, among them the
 * condition selecting between the two branches, the assignment of `ce`
 * and the return statements -- confirm against the full source.
 */
681 send_to_client_iter (void *cls,
682 const struct GNUNET_SET_Element *element)
684 struct TaskEntry *task = (struct TaskEntry *) cls;
685 struct ConsensusSession *session = task->step->session;
686 struct GNUNET_MQ_Envelope *ev;
690 struct GNUNET_CONSENSUS_ElementMessage *m;
691 const struct ConsensusElement *ce;
693 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
696 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "P%d: sending element %s to client\n",
703 session->local_peer_idx,
704 debug_str_element (element));
/* The message carries only the payload behind the ConsensusElement header.
   NOTE(review): element->size is not checked against
   sizeof (struct ConsensusElement) on any visible line; the subtraction
   would wrap for a shorter element. */
706 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
707 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
708 m->element_type = ce->payload_type;
709 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710 GNUNET_MQ_send (session->client_mq, ev);
714 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
715 "P%d: finished iterating elements for client\n",
716 session->local_peer_idx);
717 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
718 GNUNET_MQ_send (session->client_mq, ev);
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
727 struct GNUNET_HashCode hash;
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "P%u: looking up set {%s}\n",
731 session->local_peer_idx,
732 debug_str_set_key (key));
734 GNUNET_assert (SET_KIND_NONE != key->set_kind);
735 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
743 struct GNUNET_HashCode hash;
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "P%u: looking up diff {%s}\n",
747 session->local_peer_idx,
748 debug_str_diff_key (key));
750 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
751 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
759 struct GNUNET_HashCode hash;
761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762 "P%u: looking up rfn {%s}\n",
763 session->local_peer_idx,
764 debug_str_rfn_key (key));
766 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
767 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
/*
 * diff_insert: record a change for `element` in `diff`.
 *
 * The weight must be 1 or -1 (asserted below).  The entry is looked up in
 * diff->changes by the element's hash; when a new DiffElementInfo is
 * created it receives a duplicate of the element and is stored under that
 * hash.
 *
 * NOTE(review): this listing omits short source lines, among them the
 * parameter line declaring `weight`, one log format string, the condition
 * guarding the allocation and whatever follows the map insertion --
 * confirm against the full source.
 */
773 diff_insert (struct DiffEntry *diff,
775 const struct GNUNET_SET_Element *element)
777 struct DiffElementInfo *di;
778 struct GNUNET_HashCode hash;
780 GNUNET_assert ( (1 == weight) || (-1 == weight));
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "diff_insert with element size %u\n",
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787 "hashing element\n");
789 GNUNET_SET_element_hash (element, &hash);
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
798 di = GNUNET_new (struct DiffElementInfo);
799 di->element = GNUNET_SET_element_dup (element);
800 GNUNET_assert (GNUNET_OK ==
801 GNUNET_CONTAINER_multihashmap_put (diff->changes,
803 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
811 rfn_commit (struct ReferendumEntry *rfn,
812 uint16_t commit_peer)
814 GNUNET_assert (commit_peer < rfn->num_peers);
816 rfn->peer_commited[commit_peer] = GNUNET_YES;
821 rfn_contest (struct ReferendumEntry *rfn,
822 uint16_t contested_peer)
824 GNUNET_assert (contested_peer < rfn->num_peers);
826 rfn->peer_contested[contested_peer] = GNUNET_YES;
831 rfn_noncontested (struct ReferendumEntry *rfn)
837 for (i = 0; i < rfn->num_peers; i++)
838 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
846 rfn_vote (struct ReferendumEntry *rfn,
847 uint16_t voting_peer,
848 enum ReferendumVote vote,
849 const struct GNUNET_SET_Element *element)
851 struct RfnElementInfo *ri;
852 struct GNUNET_HashCode hash;
854 GNUNET_assert (voting_peer < rfn->num_peers);
856 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857 since VOTE_KEEP is implicit in not voting. */
858 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
860 GNUNET_SET_element_hash (element, &hash);
861 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
865 ri = GNUNET_new (struct RfnElementInfo);
866 ri->element = GNUNET_SET_element_dup (element);
867 ri->votes = GNUNET_new_array (rfn->num_peers, int);
868 GNUNET_assert (GNUNET_OK ==
869 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
871 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
874 ri->votes[voting_peer] = GNUNET_YES;
880 task_other_peer (struct TaskEntry *task)
882 uint16_t me = task->step->session->local_peer_idx;
883 if (task->key.peer1 == me)
884 return task->key.peer2;
885 return task->key.peer1;
/**
 * Comparison function for qsort() over an array of uint64_t,
 * ordering the values ascending.
 *
 * The values are compared, not subtracted, so the result is
 * correct over the whole 64-bit range.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return 0 if both are equal, -1 if *pa is smaller, 1 if *pa is larger
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  const uint64_t a = *(const uint64_t *) pa;
  const uint64_t b = *(const uint64_t *) pb;

  if (a == b)
    return 0;
  return (a < b) ? -1 : 1;
}
904 * Callback for set operation results. Called for each element
908 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
909 * @param current_size current set size
910 * @param status see enum GNUNET_SET_Status
/*
 * set_result_cb: callback for results of the set operation of a task.
 *
 * cls is the TaskEntry the operation belongs to.  The function resolves
 * the output set, diff and referendum named in the task's SetOpCls (each
 * only if its kind is not *_KIND_NONE), handles marker elements, and then
 * acts on `status`:
 *  - ADD_LOCAL:  add the element to the output set, insert it into the
 *                output diff with weight 1, vote VOTE_ADD for the other peer
 *  - ADD_REMOTE: remove the element from the output set, insert it into the
 *                output diff with weight -1, vote VOTE_REMOVE
 *  - DONE:       commit the other peer in the output referendum; for
 *                PHASE_KIND_ALL_TO_ALL remember current_size as first_size
 *  - FAILURE:    reported with GNUNET_break_op()
 *
 * NOTE(review): this listing omits short source lines (braces, `break;`,
 * `return;`, `#endif`, short call arguments and short statements), so the
 * exact control flow between the visible lines has to be confirmed against
 * the full source.
 */
913 set_result_cb (void *cls,
914 const struct GNUNET_SET_Element *element,
915 uint64_t current_size,
916 enum GNUNET_SET_Status status)
918 struct TaskEntry *task = cls;
919 struct ConsensusSession *session = task->step->session;
920 struct SetEntry *output_set = NULL;
921 struct DiffEntry *output_diff = NULL;
922 struct ReferendumEntry *output_rfn = NULL;
923 unsigned int other_idx;
924 struct SetOpCls *setop;
925 const struct ConsensusElement *consensus_element = NULL;
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930 "P%u: got element of type %u, status %u\n",
931 session->local_peer_idx,
932 (unsigned) element->element_type,
934 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
935 consensus_element = element->data;
938 setop = &task->cls.setop;
941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942 "P%u: got set result for {%s}, status %u\n",
943 session->local_peer_idx,
944 debug_str_task_key (&task->key),
/* Results are checked against the task state: one test for a task that has
   not been started, one for a task that is already finished. */
947 if (GNUNET_NO == task->is_started)
953 if (GNUNET_YES == task->is_finished)
959 other_idx = task_other_peer (task);
/* Resolve the outputs this operation feeds; a configured output must exist. */
961 if (SET_KIND_NONE != setop->output_set.set_kind)
963 output_set = lookup_set (session, &setop->output_set);
964 GNUNET_assert (NULL != output_set);
967 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
969 output_diff = lookup_diff (session, &setop->output_diff);
970 GNUNET_assert (NULL != output_diff);
973 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
975 output_rfn = lookup_rfn (session, &setop->output_rfn);
976 GNUNET_assert (NULL != output_rfn);
979 if (GNUNET_YES == session->peers_blacklisted[other_idx])
981 /* Peer might have been blacklisted
982 by a gradecast running in parallel, ignore elements from now */
983 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
985 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
/* Marker elements (non-zero marker) carry protocol information instead of
   client payload. */
989 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
991 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
992 "P%u: got some marker\n",
993 session->local_peer_idx);
994 if ( (GNUNET_YES == setop->transceive_contested) &&
995 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
997 GNUNET_assert (NULL != output_rfn);
998 rfn_contest (output_rfn, task_other_peer (task));
1002 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1005 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1006 "P%u: got size marker\n",
1007 session->local_peer_idx);
1010 struct ConsensusSizeElement *cse = (void *) consensus_element;
/* A size marker is only accepted when it names the peer it came from. */
1012 if (cse->sender_index == other_idx)
1014 if (NULL == session->first_sizes_received)
1015 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
/* NOTE(review): no release of `copy` is visible in this listing; confirm
   the duplicate is freed after the lower bound has been read.
   NOTE(review): `copy` has num_peers entries, so the index
   num_peers / 3 + 1 is one past the end when num_peers is 1; confirm the
   session guarantees at least two peers or guard the index.
   NOTE(review): "%llu" is paired with a (long long) cast; the cast should
   be (unsigned long long). */
1018 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020 session->lower_bound = copy[session->num_peers / 3 + 1];
1021 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1022 "P%u: lower bound %llu\n",
1023 session->local_peer_idx,
1024 (long long) session->lower_bound);
1034 case GNUNET_SET_STATUS_ADD_LOCAL:
1035 GNUNET_assert (NULL != consensus_element);
1036 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037 "Adding element in Task {%s}\n",
1038 debug_str_task_key (&task->key));
1039 if (NULL != output_set)
1041 // FIXME: record pending adds, use callback
1042 GNUNET_SET_add_element (output_set->h,
1046 #ifdef GNUNET_EXTRA_LOGGING
1047 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1048 "P%u: adding element %s into set {%s} of task {%s}\n",
1049 session->local_peer_idx,
1050 debug_str_element (element),
1051 debug_str_set_key (&setop->output_set),
1052 debug_str_task_key (&task->key));
1055 if (NULL != output_diff)
1057 diff_insert (output_diff, 1, element);
1058 #ifdef GNUNET_EXTRA_LOGGING
1059 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1060 "P%u: adding element %s into diff {%s} of task {%s}\n",
1061 session->local_peer_idx,
1062 debug_str_element (element),
1063 debug_str_diff_key (&setop->output_diff),
1064 debug_str_task_key (&task->key));
1067 if (NULL != output_rfn)
1069 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1070 #ifdef GNUNET_EXTRA_LOGGING
1071 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1072 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1073 session->local_peer_idx,
1074 debug_str_element (element),
1075 debug_str_rfn_key (&setop->output_rfn),
1076 debug_str_task_key (&task->key));
1079 // XXX: add result to structures in task
1081 case GNUNET_SET_STATUS_ADD_REMOTE:
1082 GNUNET_assert (NULL != consensus_element);
/* Two conditions are tested before the removal is applied: the operation's
   do_not_remove flag and the contested marker. */
1083 if (GNUNET_YES == setop->do_not_remove)
1085 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1087 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088 "Removing element in Task {%s}\n",
1089 debug_str_task_key (&task->key));
1090 if (NULL != output_set)
1092 // FIXME: record pending adds, use callback
1093 GNUNET_SET_remove_element (output_set->h,
1097 #ifdef GNUNET_EXTRA_LOGGING
1098 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1099 "P%u: removing element %s from set {%s} of task {%s}\n",
1100 session->local_peer_idx,
1101 debug_str_element (element),
1102 debug_str_set_key (&setop->output_set),
1103 debug_str_task_key (&task->key));
1106 if (NULL != output_diff)
1108 diff_insert (output_diff, -1, element);
1109 #ifdef GNUNET_EXTRA_LOGGING
1110 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1111 "P%u: removing element %s from diff {%s} of task {%s}\n",
1112 session->local_peer_idx,
1113 debug_str_element (element),
1114 debug_str_diff_key (&setop->output_diff),
1115 debug_str_task_key (&task->key));
1118 if (NULL != output_rfn)
1120 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1121 #ifdef GNUNET_EXTRA_LOGGING
1122 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1123 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1124 session->local_peer_idx,
1125 debug_str_element (element),
1126 debug_str_rfn_key (&setop->output_rfn),
1127 debug_str_task_key (&task->key));
1131 case GNUNET_SET_STATUS_DONE:
1132 // XXX: check first if any changes to the underlying
1133 // set are still pending
1134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135 "Finishing setop in Task {%s}\n",
1136 debug_str_task_key (&task->key));
/* Only votes of committed peers count (see rfn_noncontested). */
1137 if (NULL != output_rfn)
1139 rfn_commit (output_rfn, task_other_peer (task));
/* The size after the first all-to-all round is what commit_set() later
   sends in the size marker element. */
1141 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1143 session->first_size = current_size;
1147 case GNUNET_SET_STATUS_FAILURE:
1149 GNUNET_break_op (0);
/*
 * Variant of the "cram" evilness types.  parse_evilness_cram_subtype() maps
 * "replace" to EVILNESS_SUB_REPLACEMENT and "noreplace" to
 * EVILNESS_SUB_NO_REPLACEMENT; commit_set() generates a new random element
 * per iteration for the former and derives the element from the loop
 * counter for the latter.
 *
 * NOTE(review): this listing omits short source lines, so further
 * enumerators may exist -- confirm against the full source.
 */
1170 enum EvilnessSubType
1173 EVILNESS_SUB_REPLACEMENT,
1174 EVILNESS_SUB_NO_REPLACEMENT,
1179 enum EvilnessType type;
1180 enum EvilnessSubType subtype;
1186 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1188 if (0 == strcmp ("replace", evil_subtype_str))
1190 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1192 else if (0 == strcmp ("noreplace", evil_subtype_str))
1194 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1198 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1199 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1201 return GNUNET_SYSERR;
1208 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1212 char *evil_type_str = NULL;
1213 char *evil_subtype_str = NULL;
1215 GNUNET_assert (NULL != evil);
1217 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220 "P%u: no evilness\n",
1221 session->local_peer_idx);
1222 evil->type = EVILNESS_NONE;
1225 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1226 "P%u: got evilness spec\n",
1227 session->local_peer_idx);
1229 for (field = strtok (evil_spec, "/");
1231 field = strtok (NULL, "/"))
1233 unsigned int peer_num;
1234 unsigned int evil_num;
1237 evil_type_str = NULL;
1238 evil_subtype_str = NULL;
1240 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1244 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1245 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1251 GNUNET_assert (NULL != evil_type_str);
1252 GNUNET_assert (NULL != evil_subtype_str);
1254 if (peer_num == session->local_peer_idx)
1256 if (0 == strcmp ("slack", evil_type_str))
1258 evil->type = EVILNESS_SLACK;
1260 if (0 == strcmp ("slack-a2a", evil_type_str))
1262 evil->type = EVILNESS_SLACK_A2A;
1264 else if (0 == strcmp ("cram-all", evil_type_str))
1266 evil->type = EVILNESS_CRAM_ALL;
1267 evil->num = evil_num;
1268 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1271 else if (0 == strcmp ("cram-lead", evil_type_str))
1273 evil->type = EVILNESS_CRAM_LEAD;
1274 evil->num = evil_num;
1275 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1278 else if (0 == strcmp ("cram-echo", evil_type_str))
1280 evil->type = EVILNESS_CRAM_ECHO;
1281 evil->num = evil_num;
1282 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1287 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1288 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1294 /* No GNUNET_free since memory was allocated by libc */
1295 free (evil_type_str);
1296 evil_type_str = NULL;
1297 evil_subtype_str = NULL;
1300 evil->type = EVILNESS_NONE;
1302 GNUNET_free (evil_spec);
1303 /* no GNUNET_free_non_null since it wasn't
1304 * allocated with GNUNET_malloc */
1305 if (NULL != evil_type_str)
1306 free (evil_type_str);
1307 if (NULL != evil_subtype_str)
1308 free (evil_subtype_str);
1315 * Commit the appropriate set for a
/*
 * commit_set: commit the input set of a task to the task's set operation.
 *
 * The set is looked up by the operation's input_set key.  Before the
 * commit, marker elements may be added to it:
 *  - a CONSENSUS_MARKER_CONTESTED element, if the operation transceives
 *    contestation and the set is contested
 *  - a CONSENSUS_MARKER_SIZE element carrying session->first_size and the
 *    local peer index, for PHASE_KIND_ALL_TO_ALL_2 tasks
 * The remaining code selects what is committed depending on the configured
 * evilness (see get_evilness) and on whether the other peer is blacklisted.
 *
 * NOTE(review): this listing omits short source lines (braces, `break;`,
 * preprocessor conditionals, short declarations and short statements), so
 * the nesting of the evilness code and of the final commit/cancel code has
 * to be confirmed against the full source.
 */
1319 commit_set (struct ConsensusSession *session,
1320 struct TaskEntry *task)
1322 struct SetEntry *set;
1323 struct SetOpCls *setop = &task->cls.setop;
1325 GNUNET_assert (NULL != setop->op);
1326 set = lookup_set (session, &setop->input_set);
1327 GNUNET_assert (NULL != set);
1329 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
/* NOTE(review): the assignment of element.data for this marker element is
   not among the visible lines; confirm it points at `ce`. */
1331 struct GNUNET_SET_Element element;
1332 struct ConsensusElement ce = { 0 };
1333 ce.marker = CONSENSUS_MARKER_CONTESTED;
1335 element.size = sizeof (struct ConsensusElement);
1336 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1337 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1340 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1342 struct GNUNET_SET_Element element;
1343 struct ConsensusSizeElement cse = { 0 };
1344 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
1345 cse.ce.marker = CONSENSUS_MARKER_SIZE;
/* NOTE(review): size is converted with GNUNET_htonll() while sender_index
   is stored as is (set_result_cb compares it unconverted as well); confirm
   the width of sender_index makes that safe on the wire. */
1346 cse.size = GNUNET_htonll (session->first_size);
1347 cse.sender_index = session->local_peer_idx;
1348 element.data = &cse;
1349 element.size = sizeof (struct ConsensusSizeElement);
1350 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1351 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1357 struct Evilness evil;
1359 get_evilness (session, &evil);
1360 if (EVILNESS_NONE != evil.type)
1362 /* Useful for evaluation */
1363 GNUNET_STATISTICS_set (statistics,
1370 case EVILNESS_CRAM_ALL:
1371 case EVILNESS_CRAM_LEAD:
1372 case EVILNESS_CRAM_ECHO:
1373 /* We're not cramming elements in the
1374 all-to-all round, since that would just
1375 add more elements to the result set, but
1376 wouldn't test robustness. */
1377 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1379 GNUNET_SET_commit (setop->op, set->h);
/* cram-lead only applies to the leader phase on the current set,
   cram-echo only to the echo phase; otherwise the set is committed as is. */
1382 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1383 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1385 GNUNET_SET_commit (setop->op, set->h);
1388 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1390 GNUNET_SET_commit (setop->op, set->h);
/* Add evil.num stuffed elements to the set before committing it. */
1393 for (i = 0; i < evil.num; i++)
1395 struct GNUNET_SET_Element element;
1396 struct ConsensusStuffedElement se = { 0 };
1398 element.size = sizeof (struct ConsensusStuffedElement);
1399 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1401 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1403 /* Always generate a new element. */
1404 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1406 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1408 /* Always cram the same elements, derived from counter. */
1409 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1415 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1416 #ifdef GNUNET_EXTRA_LOGGING
1417 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1418 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1419 session->local_peer_idx,
1420 debug_str_element (&element),
1421 debug_str_set_key (&setop->input_set),
1422 debug_str_task_key (&task->key));
1425 GNUNET_STATISTICS_update (statistics,
1426 "# stuffed elements",
1429 GNUNET_SET_commit (setop->op, set->h);
1431 case EVILNESS_SLACK:
1432 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1433 "P%u: evil peer: slacking\n",
1434 (unsigned int) session->local_peer_idx);
1436 case EVILNESS_SLACK_A2A:
/* In the all-to-all phases a freshly created, empty set is committed
   instead of the real one. */
1437 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1438 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1440 struct GNUNET_SET_Handle *empty_set;
1441 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1442 GNUNET_SET_commit (setop->op, empty_set);
1443 GNUNET_SET_destroy (empty_set);
1447 GNUNET_SET_commit (setop->op, set->h);
1451 GNUNET_SET_commit (setop->op, set->h);
/* Commit only towards peers that are not blacklisted; the other visible
   branch cancels the operation. */
1456 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1458 GNUNET_SET_commit (setop->op, set->h);
1462 /* For our testcases, we don't want the blacklisted
1464 GNUNET_SET_operation_cancel (setop->op);
1472 put_diff (struct ConsensusSession *session,
1473 struct DiffEntry *diff)
1475 struct GNUNET_HashCode hash;
1477 GNUNET_assert (NULL != diff);
1479 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1480 GNUNET_assert (GNUNET_OK ==
1481 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1482 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/*
 * put_set: store a set entry in the session's set map under the hash of
 * its key.  The entry must already have a set handle (asserted).
 *
 * Unlike put_diff() and put_rfn(), which insert with UNIQUE_ONLY and
 * require GNUNET_OK, this function inserts with the REPLACE option and only
 * rejects GNUNET_SYSERR, so an entry already stored under the same key is
 * replaced.
 * NOTE(review): no release of a replaced entry is visible here; confirm
 * who owns it.
 *
 * NOTE(review): this listing omits short source lines, among them the
 * format string of the log call below.
 */
1486 put_set (struct ConsensusSession *session,
1487 struct SetEntry *set)
1489 struct GNUNET_HashCode hash;
1491 GNUNET_assert (NULL != set->h);
1493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1495 debug_str_set_key (&set->key));
1497 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1498 GNUNET_assert (GNUNET_SYSERR !=
1499 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1500 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1505 put_rfn (struct ConsensusSession *session,
1506 struct ReferendumEntry *rfn)
1508 struct GNUNET_HashCode hash;
1510 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1511 GNUNET_assert (GNUNET_OK ==
1512 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1513 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Cancel a reconciliation task.
 * Currently does nothing.
 *
 * @param task the task to cancel (unused)
 */
static void
task_cancel_reconcile (struct TaskEntry *task)
{
  /* not implemented yet */
}
/*
 * apply_diff_to_rfn: turn the changes of a diff into votes of one peer.
 *
 * Iterates over all entries of diff->changes and calls rfn_vote() for
 * voting_peer with VOTE_ADD or VOTE_REMOVE on the entry's element.
 *
 * NOTE(review): this listing omits short source lines, among them the last
 * parameter, an argument of the iterator call and the conditions that
 * choose between the two votes (presumably the sign of the entry's weight
 * -- confirm against the full source).
 */
1527 apply_diff_to_rfn (struct DiffEntry *diff,
1528 struct ReferendumEntry *rfn,
1529 uint16_t voting_peer,
1532 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1533 struct DiffElementInfo *di;
1535 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1537 while (GNUNET_YES ==
1538 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1540 (const void **) &di))
1544 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1548 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1552 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1559 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1561 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/**
 * Build a new diff that contains the changes of two diffs.
 *
 * A fresh diff is created, then every change of @a diff_1 followed by
 * every change of @a diff_2 is inserted into it via diff_insert, using
 * the change's own weight and element.  The visible code only reads
 * from the two input diffs.
 *
 * @param diff_1 first diff to read changes from
 * @param diff_2 second diff to read changes from
 */
1568 diff_compose (struct DiffEntry *diff_1,
1569 struct DiffEntry *diff_2)
1571 struct DiffEntry *diff_new;
1572 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1573 struct DiffElementInfo *di;
1575 diff_new = diff_create ();
/* First pass: changes of diff_1. */
1577 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1578 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1580 diff_insert (diff_new, di->weight, di->element);
1582 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Second pass: changes of diff_2, inserted into the same new diff. */
1584 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1585 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1587 diff_insert (diff_new, di->weight, di->element);
1589 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Allocate a referendum for a given number of peers.
 *
 * Creates the element map and two per-peer int arrays (peer_commited
 * and peer_contested), each with @a size entries, and records @a size
 * as the referendum's num_peers.
 *
 * @param size number of peers taking part in the referendum
 * @return the newly allocated referendum entry
 */
1595 struct ReferendumEntry *
1596 rfn_create (uint16_t size)
1598 struct ReferendumEntry *rfn;
1600 rfn = GNUNET_new (struct ReferendumEntry);
1601 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1602 rfn->peer_commited = GNUNET_new_array (size, int);
1603 rfn->peer_contested = GNUNET_new_array (size, int);
1604 rfn->num_peers = size;
/**
 * Release a diff.
 *
 * Destroys the diff's map of changes.
 * NOTE(review): whether the DiffEntry itself and the entries stored in
 * the map are freed as well is not visible in this view -- confirm.
 *
 * @param diff diff to destroy
 */
1612 diff_destroy (struct DiffEntry *diff)
1614 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1621 * For a given majority, count what the outcome
1622 * is (add/remove/keep), and give the number
1623 * of peers that voted for this outcome.
/**
 * Determine the majority outcome for one element of a referendum.
 *
 * Iterates over all peers of @a rfn, looking at each peer's committed
 * flag and its vote on @a ri.  If the "yes" votes exceed half of the
 * committed peers, the outcome is the element's proposal and the
 * majority size is the number of "yes" votes.  Otherwise the outcome is
 * VOTE_STAY and the majority size is the number of committed peers that
 * did not vote "yes".
 * NOTE(review): the counter updates inside the loop are not visible in
 * this view; the description above assumes uncommitted peers are
 * skipped -- confirm.
 *
 * @param rfn referendum the element belongs to
 * @param ri vote information for the element
 * @param ret_majority set to the number of peers backing the outcome
 * @param ret_vote set to the outcome
 */
1626 rfn_majority (const struct ReferendumEntry *rfn,
1627 const struct RfnElementInfo *ri,
1628 uint16_t *ret_majority,
1629 enum ReferendumVote *ret_vote)
1631 uint16_t votes_yes = 0;
1632 uint16_t num_commited = 0;
1635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636 "Computing rfn majority for element %s of rfn {%s}\n",
1637 debug_str_element (ri->element),
1638 debug_str_rfn_key (&rfn->key));
1640 for (i = 0; i < rfn->num_peers; i++)
1642 if (GNUNET_NO == rfn->peer_commited[i])
1646 if (GNUNET_YES == ri->votes[i])
/* Strict majority: exactly half of the committed peers is not enough. */
1650 if (votes_yes > (num_commited) / 2)
1652 *ret_vote = ri->proposal;
1653 *ret_majority = votes_yes;
1657 *ret_vote = VOTE_STAY;
1658 *ret_majority = num_commited - votes_yes;
/* Closure handed from create_set_copy_for_task to set_copy_cb:
   the task that requested the copy ... */
1665 struct TaskEntry *task;
/* ... and the key under which the copied set is stored. */
1666 struct SetKey dst_set_key;
/**
 * Callback invoked with the result of a lazy set copy.
 *
 * Allocates a SetHandle and links it into the session's list of set
 * handles, then creates a SetEntry under the destination key taken from
 * the closure and stores it with put_set.
 * NOTE(review): the lines that attach @a copy to the new handle and
 * entry, release the closure and resume the task are not visible in
 * this view -- confirm against the full file.
 *
 * @param cls closure, a struct SetCopyCls
 * @param copy handle of the freshly copied set
 */
1671 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1673 struct SetCopyCls *scc = cls;
1674 struct TaskEntry *task = scc->task;
1675 struct SetKey dst_set_key = scc->dst_set_key;
1676 struct SetEntry *set;
1677 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1680 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1681 task->step->session->set_handles_tail,
1685 set = GNUNET_new (struct SetEntry);
1687 set->key = dst_set_key;
1688 put_set (task->step->session, set);
1695 * Call the start function of the given
1696 * task again after we created a copy of the given set.
/**
 * Request a copy of an existing set on behalf of a task.
 *
 * Allocates a SetCopyCls closure, records the destination key in it,
 * looks up the source set (which must exist) and starts a lazy copy of
 * its set handle.
 * NOTE(review): the remaining arguments of GNUNET_SET_copy_lazy are not
 * visible in this view; presumably set_copy_cb with the closure --
 * confirm.
 *
 * @param task task that needs the copy
 * @param src_set_key key of the set to copy from
 * @param dst_set_key key under which the copy is to be stored
 */
1699 create_set_copy_for_task (struct TaskEntry *task,
1700 struct SetKey *src_set_key,
1701 struct SetKey *dst_set_key)
1703 struct SetEntry *src_set;
1704 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707 "Copying set {%s} to {%s} for task {%s}\n",
1708 debug_str_set_key (src_set_key),
1709 debug_str_set_key (dst_set_key),
1710 debug_str_task_key (&task->key));
1713 scc->dst_set_key = *dst_set_key;
1714 src_set = lookup_set (task->step->session, src_set_key);
1715 GNUNET_assert (NULL != src_set);
1716 GNUNET_SET_copy_lazy (src_set->h,
/* Closure used to track outstanding set mutations for one task.
   The users of this struct also read and update a num_pending
   counter; its declaration is not visible in this view. */
1722 struct SetMutationProgressCls
1726 * Task to finish once all changes are through.
1728 struct TaskEntry *task;
/**
 * Callback for the completion of one pending set mutation.
 *
 * Requires that at least one mutation is still pending.  Once the
 * pending counter is zero, the task stored in the closure is picked up.
 * NOTE(review): the counter decrement and what is done with the task
 * (presumably freeing the closure and finishing the task) are not
 * visible in this view -- confirm.
 *
 * @param cls closure, a struct SetMutationProgressCls
 */
1733 set_mutation_done (void *cls)
1735 struct SetMutationProgressCls *pc = cls;
1737 GNUNET_assert (pc->num_pending > 0);
1741 if (0 == pc->num_pending)
1743 struct TaskEntry *task = pc->task;
/**
 * Mark a step as finished without running it, if that is permitted.
 *
 * The step's is_running, is_finished and early_finishable flags are
 * checked first (presumably each check bails out; the statements under
 * the conditions are not visible in this view).  The step is then
 * marked finished, the pending_prereq counter of every subordinate step
 * is decremented, and the same procedure is applied recursively to each
 * subordinate.  Finally the session's ready steps are run.
 *
 * @param step step to finish early
 */
1751 try_finish_step_early (struct Step *step)
1755 if (GNUNET_YES == step->is_running)
1757 if (GNUNET_YES == step->is_finished)
1759 if (GNUNET_NO == step->early_finishable)
1762 step->is_finished = GNUNET_YES;
1764 #ifdef GNUNET_EXTRA_LOGGING
1765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1766 "Finishing step `%s' early.\n",
1770 for (i = 0; i < step->subordinates_len; i++)
1772 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1773 step->subordinates[i]->pending_prereq--;
1774 #ifdef GNUNET_EXTRA_LOGGING
1775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1776 "Decreased pending_prereq to %u for step `%s'.\n",
1777 (unsigned int) step->subordinates[i]->pending_prereq,
1778 step->subordinates[i]->debug_name);
/* Propagate the early finish down the dependency graph. */
1781 try_finish_step_early (step->subordinates[i]);
1784 // XXX: maybe schedule as task to avoid recursion?
1785 run_ready_steps (step->session);
/**
 * Finish a running step whose tasks have all completed.
 *
 * Preconditions (asserted): every task of the step is finished, the
 * step is running, and it has not been marked finished yet.  The
 * pending_prereq counter of each subordinate step is decremented, the
 * step is marked finished, and the session's ready steps are run.
 *
 * @param step step to finish
 */
1790 finish_step (struct Step *step)
1794 GNUNET_assert (step->finished_tasks == step->tasks_len);
1795 GNUNET_assert (GNUNET_YES == step->is_running);
1796 GNUNET_assert (GNUNET_NO == step->is_finished);
1798 #ifdef GNUNET_EXTRA_LOGGING
1799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800 "All tasks of step `%s' with %u subordinates finished.\n",
1802 step->subordinates_len);
1805 for (i = 0; i < step->subordinates_len; i++)
1807 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1808 step->subordinates[i]->pending_prereq--;
1809 #ifdef GNUNET_EXTRA_LOGGING
1810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1811 "Decreased pending_prereq to %u for step `%s'.\n",
1812 (unsigned int) step->subordinates[i]->pending_prereq,
1813 step->subordinates[i]->debug_name);
1818 step->is_finished = GNUNET_YES;
1820 // XXX: maybe schedule as task to avoid recursion?
1821 run_ready_steps (step->session);
1827 * Apply the result from one round of gradecasts (i.e. every peer
1828 * should have gradecasted) to the peer's current set.
1830 * @param task the task with context information
/**
 * Start function of the "apply round" task.
 *
 * Input is the SET_KIND_CURRENT set and the RFN_KIND_GRADECAST_RESULT
 * referendum of the task's repetition; output is the SET_KIND_CURRENT
 * set of the following repetition.  For every element of the
 * referendum the majority outcome is computed and, depending on it, the
 * element is added to or removed from the output set, or left alone.
 * The smallest majority seen is then compared against
 * (num_peers / 3) * 2 to drive the session's early-stopping state.
 */
1833 task_start_apply_round (struct TaskEntry *task)
1835 struct ConsensusSession *session = task->step->session;
1836 struct SetKey sk_in;
1837 struct SetKey sk_out;
1838 struct RfnKey rk_in;
1839 struct SetEntry *set_out;
1840 struct ReferendumEntry *rfn_in;
1841 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1842 struct RfnElementInfo *ri;
1843 struct SetMutationProgressCls *progress_cls;
1844 uint16_t worst_majority = UINT16_MAX;
1846 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1847 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1848 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
/* If the output set does not exist yet, request a copy of the input
   set.  NOTE(review): presumably the function returns here and is
   re-entered once the copy exists -- the return is not visible. */
1850 set_out = lookup_set (session, &sk_out);
1851 if (NULL == set_out)
1853 create_set_copy_for_task (task, &sk_in, &sk_out);
1857 rfn_in = lookup_rfn (session, &rk_in);
1858 GNUNET_assert (NULL != rfn_in);
/* Counts the set mutations issued below that are still outstanding. */
1860 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1861 progress_cls->task = task;
1863 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1865 while (GNUNET_YES ==
1866 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1868 (const void **) &ri))
1870 uint16_t majority_num;
1871 enum ReferendumVote majority_vote;
1873 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
/* Track the weakest majority over all elements of this round. */
1875 if (worst_majority > majority_num)
1876 worst_majority = majority_num;
1878 switch (majority_vote)
1881 progress_cls->num_pending++;
1882 GNUNET_assert (GNUNET_OK ==
1883 GNUNET_SET_add_element (set_out->h,
1887 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1888 "P%u: apply round: adding element %s with %u-majority.\n",
1889 session->local_peer_idx,
1890 debug_str_element (ri->element), majority_num);
1893 progress_cls->num_pending++;
1894 GNUNET_assert (GNUNET_OK ==
1895 GNUNET_SET_remove_element (set_out->h,
1899 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1900 "P%u: apply round: deleting element %s with %u-majority.\n",
1901 session->local_peer_idx,
1902 debug_str_element (ri->element), majority_num);
1905 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1906 "P%u: apply round: keeping element %s with %u-majority.\n",
1907 session->local_peer_idx,
1908 debug_str_element (ri->element), majority_num);
1917 if (0 == progress_cls->num_pending)
1919 // call closure right now, no pending ops
1920 GNUNET_free (progress_cls);
/* Early stopping: a round whose weakest majority reaches the threshold
   advances the state NONE -> ONE_MORE -> DONE; on the transition to
   DONE every step of the session is offered an early finish. */
1925 uint16_t thresh = (session->num_peers / 3) * 2;
1927 if (worst_majority >= thresh)
1929 switch (session->early_stopping)
1931 case EARLY_STOPPING_NONE:
1932 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1933 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1934 "P%u: Stopping early (after one more superround)\n",
1935 session->local_peer_idx);
1937 case EARLY_STOPPING_ONE_MORE:
1938 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1939 session->local_peer_idx);
1940 session->early_stopping = EARLY_STOPPING_DONE;
1943 for (step = session->steps_head; NULL != step; step = step->next)
1944 try_finish_step_early (step);
1947 case EARLY_STOPPING_DONE:
1948 /* We shouldn't be here anymore after early stopping */
/* A weak majority after early stopping had already begun is reported
   as a protocol violation by the other side. */
1956 else if (EARLY_STOPPING_NONE != session->early_stopping)
1958 // Our assumption about the number of bad peers
1960 GNUNET_break_op (0);
1964 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1965 session->local_peer_idx);
1968 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Start function of the gradecast grading task for one leader.
 *
 * Folds the leader's proposal diff and the outcome of a referendum into
 * the RFN_KIND_GRADECAST_RESULT referendum of the task's repetition
 * (created on first use), with all votes attributed to the leader.
 * The number of non-contested peers then determines a confidence value
 * starting at 2: it is capped at 1 below (num_peers / 3) * 2 and set to
 * 0 below (num_peers / 3) + 1.  With confidence >= 1 the leader is
 * committed in the output referendum; with confidence <= 1 the leader
 * is blacklisted.
 *
 * NOTE(review): the input referendum is looked up as RFN_KIND_ECHO,
 * while the confirm step that precedes this task in the task graph
 * writes RFN_KIND_CONFIRM -- confirm that this is intended.
 *
 * @param task the grading task
 */
1973 task_start_grade (struct TaskEntry *task)
1975 struct ConsensusSession *session = task->step->session;
1976 struct ReferendumEntry *output_rfn;
1977 struct ReferendumEntry *input_rfn;
1978 struct DiffEntry *input_diff;
1979 struct RfnKey rfn_key;
1980 struct DiffKey diff_key;
1981 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1982 struct RfnElementInfo *ri;
1983 unsigned int gradecast_confidence = 2;
/* The result referendum is shared by all leaders of this repetition;
   whichever grading task runs first creates it. */
1985 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1986 output_rfn = lookup_rfn (session, &rfn_key);
1987 if (NULL == output_rfn)
1989 output_rfn = rfn_create (session->num_peers);
1990 output_rfn->key = rfn_key;
1991 put_rfn (session, output_rfn);
1994 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1995 input_diff = lookup_diff (session, &diff_key);
1996 GNUNET_assert (NULL != input_diff);
1998 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1999 input_rfn = lookup_rfn (session, &rfn_key);
2000 GNUNET_assert (NULL != input_rfn);
2002 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
/* Start from the leader's own proposal ... */
2004 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
/* ... then overlay the majority outcome of each referendum element. */
2006 while (GNUNET_YES ==
2007 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2009 (const void **) &ri))
2011 uint16_t majority_num;
2012 enum ReferendumVote majority_vote;
2014 // XXX: we need contested votes and non-contested votes here
2015 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* A majority of at most a third of the peers is overridden to a removal. */
2017 if (majority_num <= session->num_peers / 3)
2018 majority_vote = VOTE_REMOVE;
2020 switch (majority_vote)
2025 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2028 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2035 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2038 uint16_t noncontested;
2039 noncontested = rfn_noncontested (input_rfn);
2040 if (noncontested < (session->num_peers / 3) * 2)
2042 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2044 if (noncontested < (session->num_peers / 3) + 1)
2046 gradecast_confidence = 0;
2050 if (gradecast_confidence >= 1)
2051 rfn_commit (output_rfn, task->key.leader);
2053 if (gradecast_confidence <= 1)
2054 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/**
 * Start function of a set reconciliation task between two peers.
 *
 * Makes sure the configured outputs exist before any network activity:
 * a missing output set is requested as a copy of the input set, and a
 * missing output referendum or diff is created empty.  Afterwards the
 * behavior depends on the local peer's role in the task key:
 *  - local peer is peer1: builds a round context message from the task
 *    key, prepares the set operation towards peer2 and commits the set;
 *  - local peer is peer2: waits for the remote request, and commits
 *    right away if the operation handle is already present;
 *  - local peer is both: handled separately (body not visible here).
 *
 * @param task the reconciliation task
 */
2061 task_start_reconcile (struct TaskEntry *task)
2063 struct SetEntry *input;
2064 struct SetOpCls *setop = &task->cls.setop;
2065 struct ConsensusSession *session = task->step->session;
2067 input = lookup_set (session, &setop->input_set);
2068 GNUNET_assert (NULL != input);
2069 GNUNET_assert (NULL != input->h);
2071 /* We create the outputs for the operation here
2072 (rather than in the set operation callback)
2073 because we want something valid in there, even
2074 if the other peer doesn't talk to us */
2076 if (SET_KIND_NONE != setop->output_set.set_kind)
2078 /* If we don't have an existing output set,
2079 we clone the input set. */
2080 if (NULL == lookup_set (session, &setop->output_set))
2082 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2087 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2089 if (NULL == lookup_rfn (session, &setop->output_rfn))
2091 struct ReferendumEntry *rfn;
2093 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2094 "P%u: output rfn <%s> missing, creating.\n",
2095 session->local_peer_idx,
2096 debug_str_rfn_key (&setop->output_rfn));
2098 rfn = rfn_create (session->num_peers);
2099 rfn->key = setop->output_rfn;
2100 put_rfn (session, rfn);
2104 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2106 if (NULL == lookup_diff (session, &setop->output_diff))
2108 struct DiffEntry *diff;
2110 diff = diff_create ();
2111 diff->key = setop->output_diff;
2112 put_diff (session, diff);
/* Purely local task: both ends of the operation are this peer. */
2116 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2118 /* XXX: mark the corresponding rfn as commited if necessary */
/* We are the initiating side. */
2123 if (task->key.peer1 == session->local_peer_idx)
2125 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2127 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2128 "P%u: Looking up set {%s} to run remote union\n",
2129 session->local_peer_idx,
2130 debug_str_set_key (&setop->input_set));
2132 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2133 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
/* The task key travels in network byte order so that the remote peer
   can locate its matching task. */
2135 rcm.kind = htons (task->key.kind);
2136 rcm.peer1 = htons (task->key.peer1);
2137 rcm.peer2 = htons (task->key.peer2);
2138 rcm.leader = htons (task->key.leader);
2139 rcm.repetition = htons (task->key.repetition);
2141 GNUNET_assert (NULL == setop->op);
2142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2143 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2145 struct GNUNET_SET_Option opts[] = {
2146 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2147 { GNUNET_SET_OPTION_END },
2150 // XXX: maybe this should be done while
2151 // setting up tasks already?
2152 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2153 &session->global_id,
2155 GNUNET_SET_RESULT_SYMMETRIC,
2160 commit_set (session, task);
/* We are the accepting side. */
2162 else if (task->key.peer2 == session->local_peer_idx)
2164 /* Wait for the other peer to contact us */
2165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2166 session->local_peer_idx, task->key.peer1);
/* The request may have arrived before this task was started. */
2168 if (NULL != setop->op)
2170 commit_set (session, task);
2175 /* We made an error while constructing the task graph. */
/**
 * Start function of the task that evaluates the echo referendum of a
 * gradecast.
 *
 * Works on the SET_KIND_ECHO_RESULT set for (repetition, leader),
 * requesting it as a copy of the leader's SET_KIND_LEADER_PROPOSAL set
 * if it does not exist yet.  The same set handle is additionally
 * registered under SET_KIND_LAST_GRADECAST.  For every element of the
 * RFN_KIND_ECHO referendum the majority outcome is computed; a majority
 * smaller than num_peers / 3 marks the output set as contested, and the
 * outcome decides whether the element is added to or removed from the
 * output set, or left alone.
 *
 * @param task the evaluation task
 */
2182 task_start_eval_echo (struct TaskEntry *task)
2184 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2185 struct ReferendumEntry *input_rfn;
2186 struct RfnElementInfo *ri;
2187 struct SetEntry *output_set;
2188 struct SetMutationProgressCls *progress_cls;
2189 struct ConsensusSession *session = task->step->session;
2190 struct SetKey sk_in;
2191 struct SetKey sk_out;
2192 struct RfnKey rk_in;
2194 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2195 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
/* NOTE(review): presumably the function returns after requesting the
   copy and is re-entered once it exists -- the return is not visible. */
2196 output_set = lookup_set (session, &sk_out);
2197 if (NULL == output_set)
2199 create_set_copy_for_task (task, &sk_in, &sk_out);
/* Alias entry: shares the handle of output_set instead of owning one. */
2205 // FIXME: should be marked as a shallow copy, so
2206 // we can destroy everything correctly
2207 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2208 last_set->h = output_set->h;
2209 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2210 put_set (session, last_set);
2213 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2214 "Evaluating referendum in Task {%s}\n",
2215 debug_str_task_key (&task->key));
/* Counts the set mutations issued below that are still outstanding. */
2217 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2218 progress_cls->task = task;
2220 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2221 input_rfn = lookup_rfn (session, &rk_in);
2223 GNUNET_assert (NULL != input_rfn);
2225 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2226 GNUNET_assert (NULL != iter);
2228 while (GNUNET_YES ==
2229 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2231 (const void **) &ri))
2233 enum ReferendumVote majority_vote;
2234 uint16_t majority_num;
2236 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2238 if (majority_num < session->num_peers / 3)
2240 /* It is not the case that all nonfaulty peers
2241 echoed the same value. Since we're doing a set reconciliation, we
2242 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2243 reconciliation as contested. Other peers might not know that the
2244 leader is faulty, thus we still re-distribute in the confirmation
2246 output_set->is_contested = GNUNET_YES;
2249 switch (majority_vote)
2252 progress_cls->num_pending++;
2253 GNUNET_assert (GNUNET_OK ==
2254 GNUNET_SET_add_element (output_set->h,
2260 progress_cls->num_pending++;
2261 GNUNET_assert (GNUNET_OK ==
2262 GNUNET_SET_remove_element (output_set->h,
2268 /* Nothing to do. */
2276 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2278 if (0 == progress_cls->num_pending)
2280 // call closure right now, no pending ops
2281 GNUNET_free (progress_cls);
/**
 * Start function of the final task of a session.
 *
 * Looks up the set named by the task's finish.input_set key (it must
 * exist) and starts iterating over it with send_to_client_iter as the
 * element callback.
 *
 * @param task the finishing task
 */
2288 task_start_finish (struct TaskEntry *task)
2290 struct SetEntry *final_set;
2291 struct ConsensusSession *session = task->step->session;
2293 final_set = lookup_set (session, &task->cls.finish.input_set);
2295 GNUNET_assert (NULL != final_set);
2298 GNUNET_SET_iterate (final_set->h,
2299 send_to_client_iter,
/**
 * Start a single task.
 *
 * The task must be neither started nor finished and must have a start
 * function.  The task is marked as started.
 * NOTE(review): the invocation of the start function itself is not
 * visible in this view -- confirm its position relative to setting
 * is_started.
 *
 * @param session session the task belongs to (used for logging)
 * @param task task to start
 */
2304 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2308 GNUNET_assert (GNUNET_NO == task->is_started);
2309 GNUNET_assert (GNUNET_NO == task->is_finished);
2310 GNUNET_assert (NULL != task->start);
2314 task->is_started = GNUNET_YES;
2321 * Run all steps of the session that don't have any
2322 * more dependencies.
/* Walks the session's step list from steps_head.  A step is ready when
   it is not running, has no pending prerequisites and is not finished;
   a ready step is marked running and all of its tasks are started. */
2325 run_ready_steps (struct ConsensusSession *session)
2329 step = session->steps_head;
2331 while (NULL != step)
2333 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2337 GNUNET_assert (0 == step->finished_tasks);
2339 #ifdef GNUNET_EXTRA_LOGGING
2340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2341 session->local_peer_idx,
2343 step->round, step->tasks_len, step->subordinates_len);
2346 step->is_running = GNUNET_YES;
2347 for (i = 0; i < step->tasks_len; i++)
2348 start_task (session, step->tasks[i]);
2350 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2351 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2354 /* Running the next ready steps will be triggered by task completion */
/**
 * Mark a task as finished and account for it in its step.
 *
 * The task must not have been finished before.  When the step's count
 * of finished tasks reaches its total number of tasks, the step itself
 * is finished.
 *
 * @param task task that completed
 */
2366 finish_task (struct TaskEntry *task)
2368 GNUNET_assert (GNUNET_NO == task->is_finished);
2369 task->is_finished = GNUNET_YES;
2371 task->step->finished_tasks++;
2373 if (task->step->finished_tasks == task->step->tasks_len)
2374 finish_step (task->step);
2379 * Search peer in the list of peers in session.
2381 * @param peer peer to find
2382 * @param session session with peer
2383 * @return index of peer, -1 if peer is not in session
/* Linear scan over session->peers, comparing identities bytewise with
   memcmp.  NOTE(review): the return statements are not visible in this
   view -- confirm which value is returned on a match and on a miss. */
2386 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2389 for (i = 0; i < session->num_peers; i++)
2390 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2397 * Compute a global, (hopefully) unique consensus session id,
2398 * from the local id of the consensus session, and the identities of all participants.
2399 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2400 * exactly the same peers, the global id will be different.
2402 * @param session session to generate the global id for
2403 * @param local_session_id local id of the consensus session
/* Derives session->global_id with GNUNET_CRYPTO_kdf, using a fixed salt
   string; the derivation is asserted to succeed.  The visible length
   arguments show that the whole peer array and one hash-code-sized
   value are fed in.  NOTE(review): the matching data pointers are not
   visible in this view (presumably session->peers and
   local_session_id) -- confirm. */
2406 compute_global_id (struct ConsensusSession *session,
2407 const struct GNUNET_HashCode *local_session_id)
2409 const char *salt = "gnunet-service-consensus/session_id";
2411 GNUNET_assert (GNUNET_YES ==
2412 GNUNET_CRYPTO_kdf (&session->global_id,
2413 sizeof (struct GNUNET_HashCode),
2417 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2419 sizeof (struct GNUNET_HashCode),
2425 * Compare two peer identities.
2427 * @param h1 some peer identity
2428 * @param h2 some peer identity
2429 * @return a positive value if h1 > h2, a negative value if h1 < h2 and 0 if h1 == h2.
/* Comparator with the signature qsort expects.  The result is whatever
   memcmp yields for the two identities, so callers may rely only on
   its sign, not on its magnitude. */
2432 peer_id_cmp (const void *h1, const void *h2)
2434 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2439 * Create the sorted list of peers for the session,
2440 * add the local peer if not in the join message.
2442 * @param session session to initialize
2443 * @param join_msg join message with the list of peers participating at the end
/* The peer identities are read from the memory directly following the
   join message.  The session's array gets one extra slot when the
   local peer is not among them; that slot (the last one) is filled
   with my_peer before the message's peers are copied to the front and
   the whole array is sorted. */
2446 initialize_session_peer_list (struct ConsensusSession *session,
2447 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2449 const struct GNUNET_PeerIdentity *msg_peers
2450 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2451 int local_peer_in_list;
2453 session->num_peers = ntohl (join_msg->num_peers);
2455 /* Peers in the join message, may or may not include the local peer,
2456 Add it if it is missing. */
2457 local_peer_in_list = GNUNET_NO;
2458 for (unsigned int i = 0; i < session->num_peers; i++)
2460 if (0 == memcmp (&msg_peers[i],
2462 sizeof (struct GNUNET_PeerIdentity)))
2464 local_peer_in_list = GNUNET_YES;
2468 if (GNUNET_NO == local_peer_in_list)
2469 session->num_peers++;
2471 session->peers = GNUNET_new_array (session->num_peers,
2472 struct GNUNET_PeerIdentity);
2473 if (GNUNET_NO == local_peer_in_list)
2474 session->peers[session->num_peers - 1] = my_peer;
/* The copy length is the count from the message, not the possibly
   incremented session->num_peers. */
2476 GNUNET_memcpy (session->peers,
2478 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2479 qsort (session->peers,
2481 sizeof (struct GNUNET_PeerIdentity),
/**
 * Find a task of a session by its key.
 *
 * The task map is indexed by the hash of the complete TaskKey struct.
 *
 * @param session session whose taskmap is searched
 * @param key key of the task to find
 * @return the result of the map lookup for the hashed key
 */
2486 static struct TaskEntry *
2487 lookup_task (struct ConsensusSession *session,
2488 struct TaskKey *key)
2490 struct GNUNET_HashCode hash;
2493 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2495 GNUNET_h2s (&hash));
2496 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2501 * Called when another peer wants to do a set operation with the
2504 * @param cls closure
2505 * @param other_peer the other peer
2506 * @param context_msg message with application specific information from
2508 * @param request request from the other peer, use GNUNET_SET_accept
2509 * to accept it, otherwise the request will be refused
2510 * Note that we don't use a return value here, as it is also
2511 * necessary to specify the set we want to do the operation with,
2512 * which sometimes can be derived from the context message.
2513 * Also necessary to specify the timeout.
/* Validation performed before accepting, each failure being reported
   with GNUNET_break_op: a context message must be present, must have
   the round-context type and exactly the size of the round-context
   struct; the task named by it must exist, must not be finished, and
   must list the local peer as peer2.  The request is then accepted
   with the same byzantine option as on the initiating side, and the
   set is committed at once if the task has already been started. */
2516 set_listen_cb (void *cls,
2517 const struct GNUNET_PeerIdentity *other_peer,
2518 const struct GNUNET_MessageHeader *context_msg,
2519 struct GNUNET_SET_Request *request)
2521 struct ConsensusSession *session = cls;
2523 struct TaskEntry *task;
2524 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2526 if (NULL == context_msg)
2528 GNUNET_break_op (0);
2532 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2534 GNUNET_break_op (0);
2538 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2540 GNUNET_break_op (0);
2544 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
/* Rebuild the task key from the network-byte-order message fields. */
2546 tk = ((struct TaskKey) {
2547 .kind = ntohs (cm->kind),
2548 .peer1 = ntohs (cm->peer1),
2549 .peer2 = ntohs (cm->peer2),
2550 .repetition = ntohs (cm->repetition),
2551 .leader = ntohs (cm->leader),
2554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2555 session->local_peer_idx, debug_str_task_key (&tk));
2557 task = lookup_task (session, &tk);
2561 GNUNET_break_op (0);
2565 if (GNUNET_YES == task->is_finished)
2567 GNUNET_break_op (0);
2571 if (task->key.peer2 != session->local_peer_idx)
2573 /* We're being asked, so we must be the 2nd peer. */
2574 GNUNET_break_op (0);
2578 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2579 (task->key.peer2 == session->local_peer_idx)));
2581 struct GNUNET_SET_Option opts[] = {
2582 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2583 { GNUNET_SET_OPTION_END },
2586 task->cls.setop.op = GNUNET_SET_accept (request,
2587 GNUNET_SET_RESULT_SYMMETRIC,
2592 /* If the task hasn't been started yet,
2593 we wait for that until we commit. */
2595 if (GNUNET_YES == task->is_started)
2597 commit_set (session, task);
/**
 * Register a task with its step and with the task map.
 *
 * The passed entry is duplicated on the heap, so the caller's struct
 * can be reused afterwards.  The duplicate is appended to the step's
 * task array, which is grown to 3 * (capacity + 1) / 2 entries when
 * full, and inserted into @a taskmap under the hash of its key.  The
 * insertion is UNIQUE_ONLY and asserted, so registering two tasks with
 * the same key trips the assertion.
 *
 * @param taskmap map from hashed task keys to tasks
 * @param t template of the task; t->step must be set
 */
2604 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2605 struct TaskEntry *t)
2607 struct GNUNET_HashCode round_hash;
2610 GNUNET_assert (NULL != t->step);
/* From here on, t refers to the heap copy. */
2612 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2616 if (s->tasks_len == s->tasks_cap)
2618 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2619 GNUNET_array_grow (s->tasks,
2624 #ifdef GNUNET_EXTRA_LOGGING
2625 GNUNET_assert (NULL != s->debug_name);
2626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2627 debug_str_task_key (&t->key),
2631 s->tasks[s->tasks_len] = t;
2634 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2635 GNUNET_assert (GNUNET_OK ==
2636 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2637 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Placeholder for assigning timeouts to the steps of a session.
 *
 * The visible body contains only comments; no timeouts are installed.
 *
 * @param session session whose steps would receive timeouts
 */
2642 install_step_timeouts (struct ConsensusSession *session)
2644 /* Given the fully constructed task graph
2645 with rounds for tasks, we can give the tasks timeouts. */
2647 // unsigned int max_round;
2649 /* XXX: implement! */
2655 * Arrange two peers in some canonical order.
/* Both peer indices are passed by pointer and must be smaller than n.
   The ordering decision visible here compares the forward distance
   from a to b modulo n against n / 2.
   NOTE(review): how a and b are derived from the two indices and what
   is written back to *p1 and *p2 is not visible in this view --
   confirm against the full file. */
2658 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2663 GNUNET_assert (*p1 < n);
2664 GNUNET_assert (*p2 < n);
2677 /* For uniformly random *p1, *p2,
2678 this condition is true with 50% chance */
2679 if (((b - a) + n) % n <= n / 2)
2693 * Record @a dep as a dependency of @a step.
/* Bookkeeping is kept on both sides: step is appended to dep's array
   of subordinates (grown to 3 * (capacity + 1) / 2 entries when full),
   and step's pending_prereq counter is incremented.  The round of dep
   must not be later than the round of step. */
2696 step_depend_on (struct Step *step, struct Step *dep)
2698 /* We're not checking for cyclic dependencies,
2699 but this is a cheap sanity check. */
2700 GNUNET_assert (step != dep);
2701 GNUNET_assert (NULL != step);
2702 GNUNET_assert (NULL != dep);
2703 GNUNET_assert (dep->round <= step->round);
2705 #ifdef GNUNET_EXTRA_LOGGING
2706 /* Make sure we have complete debugging information.
2707 Also checks that we don't screw up too badly
2708 constructing the task graph. */
2709 GNUNET_assert (NULL != step->debug_name);
2710 GNUNET_assert (NULL != dep->debug_name);
2711 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2712 "Making step `%s' depend on `%s'\n",
2717 if (dep->subordinates_cap == dep->subordinates_len)
2719 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2720 GNUNET_array_grow (dep->subordinates,
2721 dep->subordinates_cap,
2725 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2727 dep->subordinates[dep->subordinates_len] = step;
2728 dep->subordinates_len++;
2730 step->pending_prereq++;
/**
 * Allocate a step and append it to the session's list of steps.
 *
 * @param session session the step belongs to
 * @param round round number recorded in the step
 * @param early_finishable value stored in the step's early_finishable
 *        flag, which try_finish_step_early checks
 * @return the new step
 */
2734 static struct Step *
2735 create_step (struct ConsensusSession *session, int round, int early_finishable)
2738 step = GNUNET_new (struct Step);
2739 step->session = session;
2740 step->round = round;
2741 step->early_finishable = early_finishable;
2742 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2743 session->steps_tail,
2750 * Construct the task graph for a single
/* Builds a chain of five early-finishable steps between step_before
   and step_after: leader dissemination, echo, echo grading,
   confirmation and confirmation grading.  The first step depends on
   step_before, each later step depends on its predecessor, and
   step_after is made to depend on the last one.  The repetition and
   leader numbers appear below as rep and lead; their parameter
   declarations are not visible in this view. */
2754 construct_task_graph_gradecast (struct ConsensusSession *session,
2757 struct Step *step_before,
2758 struct Step *step_after)
2760 uint16_t n = session->num_peers;
2761 uint16_t me = session->local_peer_idx;
2766 /* The task we're currently setting up. */
2767 struct TaskEntry task;
2770 struct Step *prev_step;
2776 round = step_before->round + 1;
2778 /* gcast step 1: leader disseminates */
2780 step = create_step (session, round, GNUNET_YES);
2782 #ifdef GNUNET_EXTRA_LOGGING
2783 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2785 step_depend_on (step, step_before);
2789 for (k = 0; k < n; k++)
2795 arrange_peers (&p1, &p2, n);
2796 task = ((struct TaskEntry) {
2798 .start = task_start_reconcile,
2799 .cancel = task_cancel_reconcile,
2800 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2802 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2803 put_task (session->taskmap, &task);
2805 /* We run this task to make sure that the leader
2806 has stored the SET_KIND_LEADER set of himself,
2807 so he can participate in the rest of the gradecast
2808 without the code having to handle any special cases. */
2809 task = ((struct TaskEntry) {
2811 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2812 .start = task_start_reconcile,
2813 .cancel = task_cancel_reconcile,
2815 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2816 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2817 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2818 put_task (session->taskmap, &task);
/* Task keyed with lead as leader: the reconciliation result is stored
   as that leader's proposal set and proposal diff. */
2824 arrange_peers (&p1, &p2, n);
2825 task = ((struct TaskEntry) {
2827 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2828 .start = task_start_reconcile,
2829 .cancel = task_cancel_reconcile,
2831 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2832 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2833 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2834 put_task (session->taskmap, &task);
2837 /* gcast phase 2: echo */
2840 step = create_step (session, round, GNUNET_YES);
2841 #ifdef GNUNET_EXTRA_LOGGING
2842 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2844 step_depend_on (step, prev_step);
2846 for (k = 0; k < n; k++)
2850 arrange_peers (&p1, &p2, n);
2851 task = ((struct TaskEntry) {
2853 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2854 .start = task_start_reconcile,
2855 .cancel = task_cancel_reconcile,
2857 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2858 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2859 put_task (session->taskmap, &task);
2863 /* Same round, since step only has local tasks */
2864 step = create_step (session, round, GNUNET_YES);
2865 #ifdef GNUNET_EXTRA_LOGGING
2866 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2868 step_depend_on (step, prev_step);
2870 arrange_peers (&p1, &p2, n);
/* Local evaluation task; its key carries -1 for both peer fields. */
2871 task = ((struct TaskEntry) {
2872 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2874 .start = task_start_eval_echo
2876 put_task (session->taskmap, &task);
2880 step = create_step (session, round, GNUNET_YES);
2881 #ifdef GNUNET_EXTRA_LOGGING
2882 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2884 step_depend_on (step, prev_step);
2886 /* gcast phase 3: confirmation and grading */
2887 for (k = 0; k < n; k++)
2891 arrange_peers (&p1, &p2, n);
2892 task = ((struct TaskEntry) {
2894 .start = task_start_reconcile,
2895 .cancel = task_cancel_reconcile,
2896 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2898 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2899 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2900 /* If there was at least one element in the echo round that was
2901 contested (i.e. it had no n-t majority), then we let the other peers
2902 know, and other peers let us know. The contested flag for each peer is
2903 stored in the rfn. */
2904 task.cls.setop.transceive_contested = GNUNET_YES;
2905 put_task (session->taskmap, &task);
2909 /* Same round, since step only has local tasks */
2910 step = create_step (session, round, GNUNET_YES);
2911 #ifdef GNUNET_EXTRA_LOGGING
2912 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2914 step_depend_on (step, prev_step);
/* Local grading task; its key carries -1 for both peer fields. */
2916 task = ((struct TaskEntry) {
2918 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2919 .start = task_start_grade,
2921 put_task (session->taskmap, &task);
/* Whatever follows this gradecast has to wait for its final step. */
2923 step_depend_on (step_after, step);
/* Build the step/task graph for @a session.  In the lines visible here the
   graph consists of: an "all to all" step, an "all to all 2" step that
   depends on prev_step, one start/end step pair per gradecast repetition
   (each pair wrapping one gradecast per leader), and a final "finish" step.
   Every task is registered in session->taskmap through put_task().
   NOTE(review): several short lines of this function (braces, small
   assignments, some declarations) are not visible in this excerpt. */
2928 construct_task_graph (struct ConsensusSession *session)
/* Number of peers in the session. */
2930 uint16_t n = session->num_peers;
/* Index of the local peer within the session. */
2933 uint16_t me = session->local_peer_idx;
2935 /* The task we're currently setting up. */
2936 struct TaskEntry task;
2938 /* Current leader */
/* Step that the next created step is made to depend on. */
2942 struct Step *prev_step;
/* Round number handed to create_step(); any updates to it happen on lines
   not visible in this excerpt. */
2944 unsigned int round = 0;
2948 // XXX: introduce first step,
2949 // where we wait for all insert acks
2950 // from the set service
2952 /* faster but brittle all-to-all */
2954 // XXX: Not implemented yet
2956 /* all-to-all step */
2958 step = create_step (session, round, GNUNET_NO);
2960 #ifdef GNUNET_EXTRA_LOGGING
2961 step->debug_name = GNUNET_strdup ("all to all");
/* One reconcile task per iteration; p1/p2 are set on lines not shown and
   then passed through arrange_peers(). */
2964 for (i = 0; i < n; i++)
2971 arrange_peers (&p1, &p2, n);
2972 task = ((struct TaskEntry) {
2973 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2975 .start = task_start_reconcile,
2976 .cancel = task_cancel_reconcile,
/* Input and output are the same set (SET_KIND_CURRENT), and the
   do_not_remove flag is switched on for this set operation. */
2978 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2979 task.cls.setop.output_set = task.cls.setop.input_set;
2980 task.cls.setop.do_not_remove = GNUNET_YES;
2981 put_task (session->taskmap, &task);
/* NOTE(review): the doubled ';' below is only an empty statement. */
2986 step = create_step (session, round, GNUNET_NO);;
2987 #ifdef GNUNET_EXTRA_LOGGING
2988 step->debug_name = GNUNET_strdup ("all to all 2");
2990 step_depend_on (step, prev_step);
/* Second all-to-all pass: same task layout as above, but keyed with
   PHASE_KIND_ALL_TO_ALL_2. */
2993 for (i = 0; i < n; i++)
3000 arrange_peers (&p1, &p2, n);
3001 task = ((struct TaskEntry) {
3002 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3004 .start = task_start_reconcile,
3005 .cancel = task_cancel_reconcile,
3007 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3008 task.cls.setop.output_set = task.cls.setop.input_set;
3009 task.cls.setop.do_not_remove = GNUNET_YES;
3010 put_task (session->taskmap, &task);
3020 /* Byzantine union */
3022 /* sequential repetitions of the gradecasts */
/* t is declared on a line not visible here; presumably the number of
   tolerated faulty peers -- TODO confirm. */
3023 for (i = 0; i < t + 1; i++)
3025 struct Step *step_rep_start;
3026 struct Step *step_rep_end;
3028 /* Every repetition is in a separate round. */
3029 step_rep_start = create_step (session, round, GNUNET_YES);
3030 #ifdef GNUNET_EXTRA_LOGGING
3031 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3034 step_depend_on (step_rep_start, prev_step);
3036 /* gradecast has three rounds */
3038 step_rep_end = create_step (session, round, GNUNET_YES);
3039 #ifdef GNUNET_EXTRA_LOGGING
3040 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3043 /* parallel gradecasts */
3044 for (lead = 0; lead < n; lead++)
3045 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* Task attached to the repetition's end step; it is started through
   task_start_apply_round. */
3047 task = ((struct TaskEntry) {
3048 .step = step_rep_end,
3049 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3050 .start = task_start_apply_round,
3052 put_task (session->taskmap, &task);
/* The next repetition (or the finish step) depends on this end step. */
3054 prev_step = step_rep_end;
3057 /* There is no next gradecast round, thus the final
3058 start step is the overall end step of the gradecasts */
3060 step = create_step (session, round, GNUNET_NO);
3061 #ifdef GNUNET_EXTRA_LOGGING
3062 GNUNET_asprintf (&step->debug_name, "finish");
3064 step_depend_on (step, prev_step);
3066 task = ((struct TaskEntry) {
3068 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3069 .start = task_start_finish,
/* Only the first member of the SetKey is given; the remaining members of
   the compound literal are zero-initialised by the language. */
3071 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3073 put_task (session->taskmap, &task);
3079 * Check join message.
3081 * @param cls session of client that sent the message
3082 * @param m message sent by the client
3083 * @return #GNUNET_OK if @a m is well-formed
/* Size validation for a JOIN message: the bytes following the fixed-size
   message struct must be exactly num_peers peer identities. */
3086 check_client_join (void *cls,
3087 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* num_peers is converted from network byte order. */
3089 uint32_t listed_peers = ntohl (m->num_peers);
/* NOTE(review): the product below is computed in size_t; where size_t is
   32 bits wide a very large num_peers could wrap -- confirm whether that
   matters for the supported platforms. */
3091 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3092 listed_peers * sizeof (struct GNUNET_PeerIdentity))
/* Size mismatch: report the message as malformed.  (The success return is
   on a line not visible in this excerpt.) */
3095 return GNUNET_SYSERR;
3102 * Called when a client wants to join a consensus session.
3104 * @param cls session of client that sent the message
3105 * @param m message sent by the client
/* Handle a JOIN message: set up the session's peer list and global id,
   look for another session with the same global id, record start/deadline,
   create the set listener, the lookup maps and the initial
   SET_KIND_CURRENT set, build the task graph and let the client continue.
   NOTE(review): many argument lines and short statements of this function
   are not visible in this excerpt. */
3108 handle_client_join (void *cls,
3109 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* The handler closure is the session itself. */
3111 struct ConsensusSession *session = cls;
3112 struct ConsensusSession *other_session;
3114 initialize_session_peer_list (session,
3116 compute_global_id (session,
3119 /* Check if some local client already owns the session.
3120 It is only legal to have a session with an existing global id
3121 if all other sessions with this global id are finished.*/
3122 for (other_session = sessions_head;
3123 NULL != other_session;
3124 other_session = other_session->next)
/* Match: a different session object carrying the same global id.  What is
   done on a match is on lines not visible in this excerpt. */
3126 if ( (other_session != session) &&
3127 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3128 &other_session->global_id)) )
/* Deadline and start time are converted from their network representation. */
3132 session->conclude_deadline
3133 = GNUNET_TIME_absolute_ntoh (m->deadline);
3134 session->conclude_start
3135 = GNUNET_TIME_absolute_ntoh (m->start);
3136 session->local_peer_idx = get_peer_idx (&my_peer,
/* Abort if the lookup produced -1 (presumably "local peer not in the
   list" -- confirm against get_peer_idx). */
3138 GNUNET_assert (-1 != session->local_peer_idx);
3140 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3141 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3142 GNUNET_h2s (&m->session_id),
3144 session->local_peer_idx,
3145 GNUNET_STRINGS_relative_time_to_string
3146 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3147 session->conclude_deadline),
/* Listen for union set operations addressed to this session's global id. */
3150 session->set_listener
3151 = GNUNET_SET_listen (cfg,
3152 GNUNET_SET_OPERATION_UNION,
3153 &session->global_id,
/* Four maps, each created with an initial size hint of 1. */
3157 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3159 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3161 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3163 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the initial set entry, keyed { SET_KIND_CURRENT, 0, 0 }. */
3167 struct SetEntry *client_set;
3169 client_set = GNUNET_new (struct SetEntry);
3170 client_set->h = GNUNET_SET_create (cfg,
3171 GNUNET_SET_OPERATION_UNION);
/* The raw set handle is additionally tracked in the session's set_handles
   list; client_disconnect_cb walks that list and destroys each handle. */
3172 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3173 sh->h = client_set->h;
3174 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3175 session->set_handles_tail,
3177 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* Array with one entry per peer in the session. */
3182 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3185 /* Just construct the task graph,
3186 but don't run anything until the client calls conclude. */
3187 construct_task_graph (session);
3188 GNUNET_SERVICE_client_continue (session->client);
/* Callback handed to GNUNET_SET_add_element() by handle_client_insert.
   NOTE(review): the body is not visible in this excerpt. */
3193 client_insert_done (void *cls)
3200 * Called when a client performs an insert operation.
3202 * @param cls session of the client that sent the message
3203 * @param msg message sent by the client
3204 * @return #GNUNET_OK (always well-formed)
/* Validation hook paired with handle_client_insert for the variable-size
   CLIENT_INSERT message.  NOTE(review): the body is not visible in this
   excerpt. */
3207 check_client_insert (void *cls,
3208 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3215 * Called when a client performs an insert operation.
3217 * @param cls session of the client that sent the message
3218 * @param msg message sent by the client
/* Handle an INSERT message: wrap the client's payload in a
   ConsensusElement and add it to the session's SET_KIND_CURRENT set.
   NOTE(review): some short lines of this function are not visible in this
   excerpt. */
3221 handle_client_insert (void *cls,
3222 const struct GNUNET_CONSENSUS_ElementMessage *msg)
/* The handler closure is the session itself. */
3224 struct ConsensusSession *session = cls;
3225 ssize_t element_size;
3226 struct GNUNET_SET_Handle *initial_set;
3227 struct ConsensusElement *ce;
/* Once conclude has started, an insert causes the client to be dropped. */
3229 if (GNUNET_YES == session->conclude_started)
3232 GNUNET_SERVICE_client_drop (session->client);
/* Payload length = total message size minus the fixed message struct. */
3236 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
/* NOTE(review): no release of ce is visible in this excerpt -- confirm
   it is freed on a line not shown. */
3237 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
/* The payload is copied to the bytes directly following the
   ConsensusElement header. */
3238 GNUNET_memcpy (&ce[1], &msg[1], element_size);
/* Copied as-is; no byte-order conversion is applied here. */
3239 ce->payload_type = msg->element_type;
/* The set element covers the ConsensusElement header plus the payload. */
3241 struct GNUNET_SET_Element element = {
3242 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3243 .size = sizeof (struct ConsensusElement) + element_size,
/* Same key that handle_client_join uses for the initial client set. */
3248 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3249 struct SetEntry *entry;
3251 entry = lookup_set (session,
3253 GNUNET_assert (NULL != entry);
3254 initial_set = entry->h;
/* Counted before the add is issued; client_insert_done is passed as the
   callback.  No decrement is visible in this excerpt. */
3257 session->num_client_insert_pending++;
3258 GNUNET_SET_add_element (initial_set,
3260 &client_insert_done,
3263 #ifdef GNUNET_EXTRA_LOGGING
3265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3266 "P%u: element %s added\n",
3267 session->local_peer_idx,
3268 debug_str_element (&element));
3272 GNUNET_SERVICE_client_continue (session->client);
3277 * Called when a client performs the conclude operation.
3279 * @param cls session of the client that sent the message
3280 * @param message message sent by the client
/* Handle a CONCLUDE message: mark the session as concluding, install the
   step timeouts and run whatever steps are ready.  A second CONCLUDE for
   the same session causes the client to be dropped. */
3283 handle_client_conclude (void *cls,
3284 const struct GNUNET_MessageHeader *message)
/* The handler closure is the session itself. */
3286 struct ConsensusSession *session = cls;
3288 if (GNUNET_YES == session->conclude_started)
3290 /* conclude started twice */
3292 GNUNET_SERVICE_client_drop (session->client);
3295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3296 "conclude requested\n");
/* The flag is set first; handle_client_insert checks it to refuse inserts
   from this point on. */
3297 session->conclude_started = GNUNET_YES;
3298 install_step_timeouts (session);
3299 run_ready_steps (session);
3300 GNUNET_SERVICE_client_continue (session->client);
3305 * Called to clean up, after a shutdown has been requested.
3307 * @param cls closure
/* Shutdown hook registered via GNUNET_SCHEDULER_add_shutdown(): logs at
   INFO level and destroys the global statistics handle.
   NOTE(review): the remaining call arguments are not visible in this
   excerpt. */
3310 shutdown_task (void *cls)
3312 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3314 GNUNET_STATISTICS_destroy (statistics,
3321 * Start processing consensus requests.
3323 * @param cls closure
3324 * @param c configuration to use
3325 * @param service the initialized service
/* Service initialisation (the function name and first parameter are on a
   line not visible in this excerpt): obtain the local peer identity, create
   the "consensus" statistics handle and register shutdown_task. */
3329 const struct GNUNET_CONFIGURATION_Handle *c,
3330 struct GNUNET_SERVICE_Handle *service)
/* The condition wrapped around this call is not visible; the lines that
   follow form the failure path (log an error, request scheduler shutdown). */
3334 GNUNET_CRYPTO_get_peer_identity (cfg,
3337 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3338 "Could not retrieve host identity\n");
3339 GNUNET_SCHEDULER_shutdown ();
3342 statistics = GNUNET_STATISTICS_create ("consensus",
/* shutdown_task is the function that destroys the statistics handle. */
3344 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3350 * Callback called when a client connects to the service.
3352 * @param cls closure for the service
3353 * @param c the new client that connected to the service
3354 * @param mq the message queue used to send messages to the client
/* Per-client setup: allocate a ConsensusSession, remember the client and
   its message queue, and link the session into the global session list.
   NOTE(review): the return statement is not visible here; presumably the
   session is returned, since client_disconnect_cb receives it as
   internal_cls -- confirm. */
3358 client_connect_cb (void *cls,
3359 struct GNUNET_SERVICE_Client *c,
3360 struct GNUNET_MQ_Handle *mq)
3362 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3364 session->client = c;
3365 session->client_mq = mq;
3366 GNUNET_CONTAINER_DLL_insert (sessions_head,
3374 * Callback called when a client disconnected from the service
3376 * @param cls closure for the service
3377 * @param c the client that disconnected
3378 * @param internal_cls the `struct ConsensusSession` associated with @a c
/* Per-client teardown: cancel the set listener if one exists, unlink the
   session from the global list, destroy every tracked set handle and free
   the session.
   NOTE(review): no cleanup of the session's maps (setmap, taskmap, diffmap,
   rfnmap) or of peers_blacklisted is visible in this excerpt -- confirm
   whether they are released elsewhere. */
3381 client_disconnect_cb (void *cls,
3382 struct GNUNET_SERVICE_Client *c,
/* internal_cls is the session of the disconnecting client. */
3385 struct ConsensusSession *session = internal_cls;
3387 if (NULL != session->set_listener)
3389 GNUNET_SET_listen_cancel (session->set_listener);
3390 session->set_listener = NULL;
3392 GNUNET_CONTAINER_DLL_remove (sessions_head,
/* Pop set handles from the head one at a time, destroying the underlying
   set for each.  (Freeing of sh itself is not visible in this excerpt.) */
3396 while (session->set_handles_head)
3398 struct SetHandle *sh = session->set_handles_head;
3399 session->set_handles_head = sh->next;
3400 GNUNET_SET_destroy (sh->h);
3403 GNUNET_free (session);
3408 * Define "main" method using service macro.
/* Arguments of the service-definition macro (its opening line is not
   visible in this excerpt): service options, the disconnect callback, and
   the table of client message handlers. */
3412 GNUNET_SERVICE_OPTION_NONE,
3415 &client_disconnect_cb,
/* CONCLUDE: fixed-size message consisting of a bare message header. */
3417 GNUNET_MQ_hd_fixed_size (client_conclude,
3418 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3419 struct GNUNET_MessageHeader,
/* INSERT: variable-size message (element payload follows the struct);
   presumably dispatched to check_client_insert / handle_client_insert. */
3421 GNUNET_MQ_hd_var_size (client_insert,
3422 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3423 struct GNUNET_CONSENSUS_ElementMessage,
/* JOIN: variable-size message (peer identities follow the struct);
   presumably dispatched to check_client_join / handle_client_join. */
3425 GNUNET_MQ_hd_var_size (client_join,
3426 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3427 struct GNUNET_CONSENSUS_JoinMessage,
/* Terminates the handler list. */
3429 GNUNET_MQ_handler_end ());
3431 /* end of gnunet-service-consensus.c */