2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
17 * @file consensus/gnunet-service-consensus.c
18 * @brief multi-peer set reconciliation
19 * @author Florian Dold <flo@dold.me>
23 #include "gnunet_util_lib.h"
24 #include "gnunet_block_lib.h"
25 #include "gnunet_protocols.h"
26 #include "gnunet_applications.h"
27 #include "gnunet_set_service.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_consensus_service.h"
30 #include "consensus_protocol.h"
31 #include "consensus.h"
37 * Vote that nothing should change.
38 * This option is never voted explicitly.
42 * Vote that an element should be added.
46 * Vote that an element should be removed.
52 enum EarlyStoppingPhase
54 EARLY_STOPPING_NONE = 0,
55 EARLY_STOPPING_ONE_MORE = 1,
56 EARLY_STOPPING_DONE = 2,
60 GNUNET_NETWORK_STRUCT_BEGIN
63 * Tuple of integers that together
64 * identify a task uniquely.
68 * A value from 'enum PhaseKind'.
70 uint16_t kind GNUNET_PACKED;
73 * Number of the first peer
76 int16_t peer1 GNUNET_PACKED;
79 * Number of the second peer in canonical order.
81 int16_t peer2 GNUNET_PACKED;
84 * Repetition of the gradecast phase.
86 int16_t repetition GNUNET_PACKED;
89 * Leader in the gradecast phase.
91 * Can be different from both peer1 and peer2.
93 int16_t leader GNUNET_PACKED;
100 int set_kind GNUNET_PACKED;
101 int k1 GNUNET_PACKED;
102 int k2 GNUNET_PACKED;
109 struct GNUNET_SET_Handle *h;
111 * GNUNET_YES if the set resulted
112 * from applying a referendum with contested
121 int diff_kind GNUNET_PACKED;
122 int k1 GNUNET_PACKED;
123 int k2 GNUNET_PACKED;
128 int rfn_kind GNUNET_PACKED;
129 int k1 GNUNET_PACKED;
130 int k2 GNUNET_PACKED;
134 GNUNET_NETWORK_STRUCT_END
138 PHASE_KIND_ALL_TO_ALL,
139 PHASE_KIND_ALL_TO_ALL_2,
140 PHASE_KIND_GRADECAST_LEADER,
141 PHASE_KIND_GRADECAST_ECHO,
142 PHASE_KIND_GRADECAST_ECHO_GRADE,
143 PHASE_KIND_GRADECAST_CONFIRM,
144 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
146 * Apply a repetition of the all-to-all
147 * gradecast to the current set.
149 PHASE_KIND_APPLY_REP,
159 * Last result set from a gradecast
161 SET_KIND_LAST_GRADECAST,
162 SET_KIND_LEADER_PROPOSAL,
163 SET_KIND_ECHO_RESULT,
169 DIFF_KIND_LEADER_PROPOSAL,
170 DIFF_KIND_LEADER_CONSENSUS,
171 DIFF_KIND_GRADECAST_RESULT,
179 RFN_KIND_GRADECAST_RESULT
185 struct SetKey input_set;
187 struct SetKey output_set;
188 struct RfnKey output_rfn;
189 struct DiffKey output_diff;
193 int transceive_contested;
195 struct GNUNET_SET_OperationHandle *op;
201 struct SetKey input_set;
205 * Closure for both @a start_task
206 * and @a cancel_task.
210 struct SetOpCls setop;
211 struct FinishCls finish;
216 typedef void (*TaskFunc) (struct TaskEntry *task);
219 * Node in the consensus task graph.
234 union TaskFuncCls cls;
241 * All steps of one session are in a
242 * linked list for easier deallocation.
247 * All steps of one session are in a
248 * linked list for easier deallocation.
252 struct ConsensusSession *session;
255 * Tasks that this step is composed of.
257 struct TaskEntry **tasks;
258 unsigned int tasks_len;
259 unsigned int tasks_cap;
261 unsigned int finished_tasks;
264 * Tasks that have this task as dependency.
266 * We store pointers to subordinates rather
267 * than to prerequisites since it makes
268 * tracking the readiness of a task easier.
270 struct Step **subordinates;
271 unsigned int subordinates_len;
272 unsigned int subordinates_cap;
275 * Counter for the prerequisites of
278 size_t pending_prereq;
281 * Task that will run this step despite
282 * any pending prerequisites.
284 struct GNUNET_SCHEDULER_Task *timeout_task;
286 unsigned int is_running;
288 unsigned int is_finished;
291 * Synchrony round of the task.
292 * Determines the deadline for the task.
297 * Human-readable name for
298 * the task, used for debugging.
303 * When we're doing an early finish, how should this step be
305 * If GNUNET_YES, the step will be marked as finished
306 * without actually running its tasks.
307 * Otherwise, the step will still be run even after
310 * Note that a task may never be finished early if
311 * it is already running.
313 int early_finishable;
317 struct RfnElementInfo
319 const struct GNUNET_SET_Element *element;
322 * GNUNET_YES if the peer votes for the proposal.
327 * Proposal for this element,
328 * can only be VOTE_ADD or VOTE_REMOVE.
330 enum ReferendumVote proposal;
334 struct ReferendumEntry
339 * Elements where there is at least one proposed change.
341 * Maps the hash of the GNUNET_SET_Element
342 * to 'struct RfnElementInfo'.
344 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
346 unsigned int num_peers;
349 * Stores, for every peer in the session,
350 * whether the peer finished the whole referendum.
352 * Votes from peers are only counted if they're
353 * marked as commited (#GNUNET_YES) in the referendum.
355 * Otherwise (#GNUNET_NO), the requested changes are
356 * not counted for majority votes or thresholds.
362 * Contestation state of the peer. If a peer is contested, the values it
363 * contributed are still counted for applying changes, but the grading is
370 struct DiffElementInfo
372 const struct GNUNET_SET_Element *element;
375 * Positive weight for 'add', negative
376 * weights for 'remove'.
388 struct GNUNET_CONTAINER_MultiHashMap *changes;
393 struct SetHandle *prev;
394 struct SetHandle *next;
396 struct GNUNET_SET_Handle *h;
402 * A consensus session consists of one local client and the remote authorities.
404 struct ConsensusSession
407 * Consensus sessions are kept in a DLL.
409 struct ConsensusSession *next;
412 * Consensus sessions are kept in a DLL.
414 struct ConsensusSession *prev;
416 unsigned int num_client_insert_pending;
418 struct GNUNET_CONTAINER_MultiHashMap *setmap;
419 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
420 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
423 * Array of peers with length 'num_peers'.
425 int *peers_blacklisted;
428 * Mapping from (hashed) TaskKey to TaskEntry.
430 * We map the application_id for a round to the task that should be
431 * executed, so we don't have to go through all task whenever we get
432 * an incoming set op request.
434 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
436 struct Step *steps_head;
437 struct Step *steps_tail;
439 int conclude_started;
444 * Global consensus identification, computed
445 * from the session id and participating authorities.
447 struct GNUNET_HashCode global_id;
450 * Client that inhabits the session
452 struct GNUNET_SERVICE_Client *client;
455 * Queued messages to the client.
457 struct GNUNET_MQ_Handle *client_mq;
460 * Time when the conclusion of the consensus should begin.
462 struct GNUNET_TIME_Absolute conclude_start;
465 * Timeout for all rounds together, single rounds will schedule a timeout task
466 * with a fraction of the conclude timeout.
467 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
469 struct GNUNET_TIME_Absolute conclude_deadline;
471 struct GNUNET_PeerIdentity *peers;
474 * Number of other peers in the consensus.
476 unsigned int num_peers;
479 * Index of the local peer in the peers array
481 unsigned int local_peer_idx;
484 * Listener for requests from other peers.
485 * Uses the session's global id as app id.
487 struct GNUNET_SET_ListenHandle *set_listener;
490 * State of our early stopping scheme.
495 * Our set size from the first round.
499 uint64_t *first_sizes_received;
502 * Bounded Eppstein lower bound.
504 uint64_t lower_bound;
506 struct SetHandle *set_handles_head;
507 struct SetHandle *set_handles_tail;
511 * Linked list of sessions this peer participates in.
513 static struct ConsensusSession *sessions_head;
516 * Linked list of sessions this peer participates in.
518 static struct ConsensusSession *sessions_tail;
521 * Configuration of the consensus service.
523 static const struct GNUNET_CONFIGURATION_Handle *cfg;
526 * Peer that runs this service.
528 static struct GNUNET_PeerIdentity my_peer;
533 struct GNUNET_STATISTICS_Handle *statistics;
537 finish_task (struct TaskEntry *task);
541 run_ready_steps (struct ConsensusSession *session);
545 phasename (uint16_t phase)
549 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
550 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
551 case PHASE_KIND_FINISH: return "FINISH";
552 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
553 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
554 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
555 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
556 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
557 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
558 default: return "(unknown)";
564 setname (uint16_t kind)
568 case SET_KIND_CURRENT: return "CURRENT";
569 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
570 case SET_KIND_NONE: return "NONE";
571 default: return "(unknown)";
576 rfnname (uint16_t kind)
580 case RFN_KIND_NONE: return "NONE";
581 case RFN_KIND_ECHO: return "ECHO";
582 case RFN_KIND_CONFIRM: return "CONFIRM";
583 default: return "(unknown)";
588 diffname (uint16_t kind)
592 case DIFF_KIND_NONE: return "NONE";
593 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
594 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
595 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
596 default: return "(unknown)";
600 #ifdef GNUNET_EXTRA_LOGGING
604 debug_str_element (const struct GNUNET_SET_Element *el)
606 struct GNUNET_HashCode hash;
608 GNUNET_SET_element_hash (el, &hash);
610 return GNUNET_h2s (&hash);
614 debug_str_task_key (struct TaskKey *tk)
616 static char buf[256];
618 snprintf (buf, sizeof (buf),
619 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
620 phasename (tk->kind), tk->peer1, tk->peer2,
621 tk->leader, tk->repetition);
627 debug_str_diff_key (struct DiffKey *dk)
629 static char buf[256];
631 snprintf (buf, sizeof (buf),
632 "DiffKey kind=%s, k1=%d, k2=%d",
633 diffname (dk->diff_kind), dk->k1, dk->k2);
639 debug_str_set_key (const struct SetKey *sk)
641 static char buf[256];
643 snprintf (buf, sizeof (buf),
644 "SetKey kind=%s, k1=%d, k2=%d",
645 setname (sk->set_kind), sk->k1, sk->k2);
652 debug_str_rfn_key (const struct RfnKey *rk)
654 static char buf[256];
656 snprintf (buf, sizeof (buf),
657 "RfnKey kind=%s, k1=%d, k2=%d",
658 rfnname (rk->rfn_kind), rk->k1, rk->k2);
663 #endif /* GNUNET_EXTRA_LOGGING */
667 * Send the final result set of the consensus to the client, element by
671 * @param element the current element, NULL if all elements have been
673 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
676 send_to_client_iter (void *cls,
677 const struct GNUNET_SET_Element *element)
679 struct TaskEntry *task = (struct TaskEntry *) cls;
680 struct ConsensusSession *session = task->step->session;
681 struct GNUNET_MQ_Envelope *ev;
685 struct GNUNET_CONSENSUS_ElementMessage *m;
686 const struct ConsensusElement *ce;
688 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697 "P%d: sending element %s to client\n",
698 session->local_peer_idx,
699 debug_str_element (element));
701 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
702 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
703 m->element_type = ce->payload_type;
704 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
705 GNUNET_MQ_send (session->client_mq, ev);
709 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710 "P%d: finished iterating elements for client\n",
711 session->local_peer_idx);
712 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
713 GNUNET_MQ_send (session->client_mq, ev);
719 static struct SetEntry *
720 lookup_set (struct ConsensusSession *session, struct SetKey *key)
722 struct GNUNET_HashCode hash;
724 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
725 "P%u: looking up set {%s}\n",
726 session->local_peer_idx,
727 debug_str_set_key (key));
729 GNUNET_assert (SET_KIND_NONE != key->set_kind);
730 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
731 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
735 static struct DiffEntry *
736 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
738 struct GNUNET_HashCode hash;
740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741 "P%u: looking up diff {%s}\n",
742 session->local_peer_idx,
743 debug_str_diff_key (key));
745 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
746 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
747 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
751 static struct ReferendumEntry *
752 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
754 struct GNUNET_HashCode hash;
756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757 "P%u: looking up rfn {%s}\n",
758 session->local_peer_idx,
759 debug_str_rfn_key (key));
761 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
762 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
763 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
768 diff_insert (struct DiffEntry *diff,
770 const struct GNUNET_SET_Element *element)
772 struct DiffElementInfo *di;
773 struct GNUNET_HashCode hash;
775 GNUNET_assert ( (1 == weight) || (-1 == weight));
777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778 "diff_insert with element size %u\n",
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "hashing element\n");
784 GNUNET_SET_element_hash (element, &hash);
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
793 di = GNUNET_new (struct DiffElementInfo);
794 di->element = GNUNET_SET_element_dup (element);
795 GNUNET_assert (GNUNET_OK ==
796 GNUNET_CONTAINER_multihashmap_put (diff->changes,
798 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
806 rfn_commit (struct ReferendumEntry *rfn,
807 uint16_t commit_peer)
809 GNUNET_assert (commit_peer < rfn->num_peers);
811 rfn->peer_commited[commit_peer] = GNUNET_YES;
816 rfn_contest (struct ReferendumEntry *rfn,
817 uint16_t contested_peer)
819 GNUNET_assert (contested_peer < rfn->num_peers);
821 rfn->peer_contested[contested_peer] = GNUNET_YES;
826 rfn_noncontested (struct ReferendumEntry *rfn)
832 for (i = 0; i < rfn->num_peers; i++)
833 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
841 rfn_vote (struct ReferendumEntry *rfn,
842 uint16_t voting_peer,
843 enum ReferendumVote vote,
844 const struct GNUNET_SET_Element *element)
846 struct RfnElementInfo *ri;
847 struct GNUNET_HashCode hash;
849 GNUNET_assert (voting_peer < rfn->num_peers);
851 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
852 since VOTE_KEEP is implicit in not voting. */
853 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
855 GNUNET_SET_element_hash (element, &hash);
856 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
860 ri = GNUNET_new (struct RfnElementInfo);
861 ri->element = GNUNET_SET_element_dup (element);
862 ri->votes = GNUNET_new_array (rfn->num_peers, int);
863 GNUNET_assert (GNUNET_OK ==
864 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
866 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
869 ri->votes[voting_peer] = GNUNET_YES;
875 task_other_peer (struct TaskEntry *task)
877 uint16_t me = task->step->session->local_peer_idx;
878 if (task->key.peer1 == me)
879 return task->key.peer2;
880 return task->key.peer1;
885 cmp_uint64_t (const void *pa, const void *pb)
887 uint64_t a = *(uint64_t *) pa;
888 uint64_t b = *(uint64_t *) pb;
899 * Callback for set operation results. Called for each element
903 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
904 * @param current_size current set size
905 * @param status see enum GNUNET_SET_Status
908 set_result_cb (void *cls,
909 const struct GNUNET_SET_Element *element,
910 uint64_t current_size,
911 enum GNUNET_SET_Status status)
913 struct TaskEntry *task = cls;
914 struct ConsensusSession *session = task->step->session;
915 struct SetEntry *output_set = NULL;
916 struct DiffEntry *output_diff = NULL;
917 struct ReferendumEntry *output_rfn = NULL;
918 unsigned int other_idx;
919 struct SetOpCls *setop;
920 const struct ConsensusElement *consensus_element = NULL;
924 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925 "P%u: got element of type %u, status %u\n",
926 session->local_peer_idx,
927 (unsigned) element->element_type,
929 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
930 consensus_element = element->data;
933 setop = &task->cls.setop;
936 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
937 "P%u: got set result for {%s}, status %u\n",
938 session->local_peer_idx,
939 debug_str_task_key (&task->key),
942 if (GNUNET_NO == task->is_started)
948 if (GNUNET_YES == task->is_finished)
954 other_idx = task_other_peer (task);
956 if (SET_KIND_NONE != setop->output_set.set_kind)
958 output_set = lookup_set (session, &setop->output_set);
959 GNUNET_assert (NULL != output_set);
962 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
964 output_diff = lookup_diff (session, &setop->output_diff);
965 GNUNET_assert (NULL != output_diff);
968 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
970 output_rfn = lookup_rfn (session, &setop->output_rfn);
971 GNUNET_assert (NULL != output_rfn);
974 if (GNUNET_YES == session->peers_blacklisted[other_idx])
976 /* Peer might have been blacklisted
977 by a gradecast running in parallel, ignore elements from now */
978 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
980 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
984 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987 "P%u: got some marker\n",
988 session->local_peer_idx);
989 if ( (GNUNET_YES == setop->transceive_contested) &&
990 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
992 GNUNET_assert (NULL != output_rfn);
993 rfn_contest (output_rfn, task_other_peer (task));
997 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001 "P%u: got size marker\n",
1002 session->local_peer_idx);
1005 struct ConsensusSizeElement *cse = (void *) consensus_element;
1007 if (cse->sender_index == other_idx)
1009 if (NULL == session->first_sizes_received)
1010 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1011 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1013 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1014 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1015 session->lower_bound = copy[session->num_peers / 3 + 1];
1016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017 "P%u: lower bound %llu\n",
1018 session->local_peer_idx,
1019 (long long) session->lower_bound);
1030 case GNUNET_SET_STATUS_ADD_LOCAL:
1031 GNUNET_assert (NULL != consensus_element);
1032 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033 "Adding element in Task {%s}\n",
1034 debug_str_task_key (&task->key));
1035 if (NULL != output_set)
1037 // FIXME: record pending adds, use callback
1038 GNUNET_SET_add_element (output_set->h,
1042 #ifdef GNUNET_EXTRA_LOGGING
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "P%u: adding element %s into set {%s} of task {%s}\n",
1045 session->local_peer_idx,
1046 debug_str_element (element),
1047 debug_str_set_key (&setop->output_set),
1048 debug_str_task_key (&task->key));
1051 if (NULL != output_diff)
1053 diff_insert (output_diff, 1, element);
1054 #ifdef GNUNET_EXTRA_LOGGING
1055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056 "P%u: adding element %s into diff {%s} of task {%s}\n",
1057 session->local_peer_idx,
1058 debug_str_element (element),
1059 debug_str_diff_key (&setop->output_diff),
1060 debug_str_task_key (&task->key));
1063 if (NULL != output_rfn)
1065 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1066 #ifdef GNUNET_EXTRA_LOGGING
1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1069 session->local_peer_idx,
1070 debug_str_element (element),
1071 debug_str_rfn_key (&setop->output_rfn),
1072 debug_str_task_key (&task->key));
1075 // XXX: add result to structures in task
1077 case GNUNET_SET_STATUS_ADD_REMOTE:
1078 GNUNET_assert (NULL != consensus_element);
1079 if (GNUNET_YES == setop->do_not_remove)
1081 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1083 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084 "Removing element in Task {%s}\n",
1085 debug_str_task_key (&task->key));
1086 if (NULL != output_set)
1088 // FIXME: record pending adds, use callback
1089 GNUNET_SET_remove_element (output_set->h,
1093 #ifdef GNUNET_EXTRA_LOGGING
1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095 "P%u: removing element %s from set {%s} of task {%s}\n",
1096 session->local_peer_idx,
1097 debug_str_element (element),
1098 debug_str_set_key (&setop->output_set),
1099 debug_str_task_key (&task->key));
1102 if (NULL != output_diff)
1104 diff_insert (output_diff, -1, element);
1105 #ifdef GNUNET_EXTRA_LOGGING
1106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107 "P%u: removing element %s from diff {%s} of task {%s}\n",
1108 session->local_peer_idx,
1109 debug_str_element (element),
1110 debug_str_diff_key (&setop->output_diff),
1111 debug_str_task_key (&task->key));
1114 if (NULL != output_rfn)
1116 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1117 #ifdef GNUNET_EXTRA_LOGGING
1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1120 session->local_peer_idx,
1121 debug_str_element (element),
1122 debug_str_rfn_key (&setop->output_rfn),
1123 debug_str_task_key (&task->key));
1127 case GNUNET_SET_STATUS_DONE:
1128 // XXX: check first if any changes to the underlying
1129 // set are still pending
1130 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1132 session->local_peer_idx,
1133 debug_str_task_key (&task->key),
1134 (unsigned int) task->step->finished_tasks,
1135 (unsigned int) task->step->tasks_len);
1136 if (NULL != output_rfn)
1138 rfn_commit (output_rfn, task_other_peer (task));
1140 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1142 session->first_size = current_size;
1146 case GNUNET_SET_STATUS_FAILURE:
1148 GNUNET_break_op (0);
1169 enum EvilnessSubType
1172 EVILNESS_SUB_REPLACEMENT,
1173 EVILNESS_SUB_NO_REPLACEMENT,
1178 enum EvilnessType type;
1179 enum EvilnessSubType subtype;
1185 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1187 if (0 == strcmp ("replace", evil_subtype_str))
1189 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1191 else if (0 == strcmp ("noreplace", evil_subtype_str))
1193 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1198 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1200 return GNUNET_SYSERR;
1207 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1211 char *evil_type_str = NULL;
1212 char *evil_subtype_str = NULL;
1214 GNUNET_assert (NULL != evil);
1216 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219 "P%u: no evilness\n",
1220 session->local_peer_idx);
1221 evil->type = EVILNESS_NONE;
1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225 "P%u: got evilness spec\n",
1226 session->local_peer_idx);
1228 for (field = strtok (evil_spec, "/");
1230 field = strtok (NULL, "/"))
1232 unsigned int peer_num;
1233 unsigned int evil_num;
1236 evil_type_str = NULL;
1237 evil_subtype_str = NULL;
1239 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1243 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1244 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1250 GNUNET_assert (NULL != evil_type_str);
1251 GNUNET_assert (NULL != evil_subtype_str);
1253 if (peer_num == session->local_peer_idx)
1255 if (0 == strcmp ("slack", evil_type_str))
1257 evil->type = EVILNESS_SLACK;
1259 if (0 == strcmp ("slack-a2a", evil_type_str))
1261 evil->type = EVILNESS_SLACK_A2A;
1263 else if (0 == strcmp ("cram-all", evil_type_str))
1265 evil->type = EVILNESS_CRAM_ALL;
1266 evil->num = evil_num;
1267 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1270 else if (0 == strcmp ("cram-lead", evil_type_str))
1272 evil->type = EVILNESS_CRAM_LEAD;
1273 evil->num = evil_num;
1274 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1277 else if (0 == strcmp ("cram-echo", evil_type_str))
1279 evil->type = EVILNESS_CRAM_ECHO;
1280 evil->num = evil_num;
1281 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1286 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1287 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1293 /* No GNUNET_free since memory was allocated by libc */
1294 free (evil_type_str);
1295 evil_type_str = NULL;
1296 evil_subtype_str = NULL;
1299 evil->type = EVILNESS_NONE;
1301 GNUNET_free (evil_spec);
1302 /* no GNUNET_free_non_null since it wasn't
1303 * allocated with GNUNET_malloc */
1304 if (NULL != evil_type_str)
1305 free (evil_type_str);
1306 if (NULL != evil_subtype_str)
1307 free (evil_subtype_str);
1314 * Commit the appropriate set for a
1318 commit_set (struct ConsensusSession *session,
1319 struct TaskEntry *task)
1321 struct SetEntry *set;
1322 struct SetOpCls *setop = &task->cls.setop;
1324 GNUNET_assert (NULL != setop->op);
1325 set = lookup_set (session, &setop->input_set);
1326 GNUNET_assert (NULL != set);
1328 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1330 struct GNUNET_SET_Element element;
1331 struct ConsensusElement ce = { 0 };
1332 ce.marker = CONSENSUS_MARKER_CONTESTED;
1334 element.size = sizeof (struct ConsensusElement);
1335 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1336 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1339 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1341 struct GNUNET_SET_Element element;
1342 struct ConsensusSizeElement cse = {
1346 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1347 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1348 cse.size = GNUNET_htonll (session->first_size);
1349 cse.sender_index = session->local_peer_idx;
1350 element.data = &cse;
1351 element.size = sizeof (struct ConsensusSizeElement);
1352 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1353 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1359 struct Evilness evil;
1361 get_evilness (session, &evil);
1362 if (EVILNESS_NONE != evil.type)
1364 /* Useful for evaluation */
1365 GNUNET_STATISTICS_set (statistics,
1372 case EVILNESS_CRAM_ALL:
1373 case EVILNESS_CRAM_LEAD:
1374 case EVILNESS_CRAM_ECHO:
1375 /* We're not cramming elements in the
1376 all-to-all round, since that would just
1377 add more elements to the result set, but
1378 wouldn't test robustness. */
1379 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1381 GNUNET_SET_commit (setop->op, set->h);
1384 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1385 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1387 GNUNET_SET_commit (setop->op, set->h);
1390 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1392 GNUNET_SET_commit (setop->op, set->h);
1395 for (i = 0; i < evil.num; i++)
1397 struct GNUNET_SET_Element element;
1398 struct ConsensusStuffedElement se = {
1399 .ce.payload_type = 0,
1403 element.size = sizeof (struct ConsensusStuffedElement);
1404 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1406 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1408 /* Always generate a new element. */
1409 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1411 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1413 /* Always cram the same elements, derived from counter. */
1414 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1420 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1421 #ifdef GNUNET_EXTRA_LOGGING
1422 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1423 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1424 session->local_peer_idx,
1425 debug_str_element (&element),
1426 debug_str_set_key (&setop->input_set),
1427 debug_str_task_key (&task->key));
1430 GNUNET_STATISTICS_update (statistics,
1431 "# stuffed elements",
1434 GNUNET_SET_commit (setop->op, set->h);
1436 case EVILNESS_SLACK:
1437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1438 "P%u: evil peer: slacking\n",
1439 (unsigned int) session->local_peer_idx);
1441 case EVILNESS_SLACK_A2A:
1442 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1443 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1445 struct GNUNET_SET_Handle *empty_set;
1446 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1447 GNUNET_SET_commit (setop->op, empty_set);
1448 GNUNET_SET_destroy (empty_set);
1452 GNUNET_SET_commit (setop->op, set->h);
1456 GNUNET_SET_commit (setop->op, set->h);
1461 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1463 GNUNET_SET_commit (setop->op, set->h);
1467 /* For our testcases, we don't want the blacklisted
1469 GNUNET_SET_operation_cancel (setop->op);
1478 put_diff (struct ConsensusSession *session,
1479 struct DiffEntry *diff)
1481 struct GNUNET_HashCode hash;
1483 GNUNET_assert (NULL != diff);
1485 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1486 GNUNET_assert (GNUNET_OK ==
1487 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1488 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1492 put_set (struct ConsensusSession *session,
1493 struct SetEntry *set)
1495 struct GNUNET_HashCode hash;
1497 GNUNET_assert (NULL != set->h);
1499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501 debug_str_set_key (&set->key));
1503 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1504 GNUNET_assert (GNUNET_SYSERR !=
1505 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1506 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1511 put_rfn (struct ConsensusSession *session,
1512 struct ReferendumEntry *rfn)
1514 struct GNUNET_HashCode hash;
1516 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1517 GNUNET_assert (GNUNET_OK ==
1518 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1519 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1525 task_cancel_reconcile (struct TaskEntry *task)
1527 /* not implemented yet */
1533 apply_diff_to_rfn (struct DiffEntry *diff,
1534 struct ReferendumEntry *rfn,
1535 uint16_t voting_peer,
1538 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1539 struct DiffElementInfo *di;
1541 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1543 while (GNUNET_YES ==
1544 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1546 (const void **) &di))
1550 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1554 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1558 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1565 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1567 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1574 diff_compose (struct DiffEntry *diff_1,
1575 struct DiffEntry *diff_2)
1577 struct DiffEntry *diff_new;
1578 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1579 struct DiffElementInfo *di;
1581 diff_new = diff_create ();
1583 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1584 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1586 diff_insert (diff_new, di->weight, di->element);
1588 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1590 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1591 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1593 diff_insert (diff_new, di->weight, di->element);
1595 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1601 struct ReferendumEntry *
1602 rfn_create (uint16_t size)
1604 struct ReferendumEntry *rfn;
1606 rfn = GNUNET_new (struct ReferendumEntry);
1607 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1608 rfn->peer_commited = GNUNET_new_array (size, int);
1609 rfn->peer_contested = GNUNET_new_array (size, int);
1610 rfn->num_peers = size;
1618 diff_destroy (struct DiffEntry *diff)
1620 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1627 * For a given majority, count what the outcome
1628 * is (add/remove/keep), and give the number
1629 * of peers that voted for this outcome.
1632 rfn_majority (const struct ReferendumEntry *rfn,
1633 const struct RfnElementInfo *ri,
1634 uint16_t *ret_majority,
1635 enum ReferendumVote *ret_vote)
1637 uint16_t votes_yes = 0;
1638 uint16_t num_commited = 0;
1641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642 "Computing rfn majority for element %s of rfn {%s}\n",
1643 debug_str_element (ri->element),
1644 debug_str_rfn_key (&rfn->key));
1646 for (i = 0; i < rfn->num_peers; i++)
1648 if (GNUNET_NO == rfn->peer_commited[i])
1652 if (GNUNET_YES == ri->votes[i])
1656 if (votes_yes > (num_commited) / 2)
1658 *ret_vote = ri->proposal;
1659 *ret_majority = votes_yes;
1663 *ret_vote = VOTE_STAY;
1664 *ret_majority = num_commited - votes_yes;
1671 struct TaskEntry *task;
1672 struct SetKey dst_set_key;
1677 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1679 struct SetCopyCls *scc = cls;
1680 struct TaskEntry *task = scc->task;
1681 struct SetKey dst_set_key = scc->dst_set_key;
1682 struct SetEntry *set;
1683 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1686 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1687 task->step->session->set_handles_tail,
1691 set = GNUNET_new (struct SetEntry);
1693 set->key = dst_set_key;
1694 put_set (task->step->session, set);
1701 * Call the start function of the given
1702 * task again after we created a copy of the given set.
1705 create_set_copy_for_task (struct TaskEntry *task,
1706 struct SetKey *src_set_key,
1707 struct SetKey *dst_set_key)
1709 struct SetEntry *src_set;
1710 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1713 "Copying set {%s} to {%s} for task {%s}\n",
1714 debug_str_set_key (src_set_key),
1715 debug_str_set_key (dst_set_key),
1716 debug_str_task_key (&task->key));
1719 scc->dst_set_key = *dst_set_key;
1720 src_set = lookup_set (task->step->session, src_set_key);
1721 GNUNET_assert (NULL != src_set);
1722 GNUNET_SET_copy_lazy (src_set->h,
1728 struct SetMutationProgressCls
1732 * Task to finish once all changes are through.
1734 struct TaskEntry *task;
1739 set_mutation_done (void *cls)
1741 struct SetMutationProgressCls *pc = cls;
1743 GNUNET_assert (pc->num_pending > 0);
1747 if (0 == pc->num_pending)
1749 struct TaskEntry *task = pc->task;
1757 try_finish_step_early (struct Step *step)
1761 if (GNUNET_YES == step->is_running)
1763 if (GNUNET_YES == step->is_finished)
1765 if (GNUNET_NO == step->early_finishable)
1768 step->is_finished = GNUNET_YES;
1770 #ifdef GNUNET_EXTRA_LOGGING
1771 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1772 "Finishing step `%s' early.\n",
1776 for (i = 0; i < step->subordinates_len; i++)
1778 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1779 step->subordinates[i]->pending_prereq--;
1780 #ifdef GNUNET_EXTRA_LOGGING
1781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782 "Decreased pending_prereq to %u for step `%s'.\n",
1783 (unsigned int) step->subordinates[i]->pending_prereq,
1784 step->subordinates[i]->debug_name);
1787 try_finish_step_early (step->subordinates[i]);
1790 // XXX: maybe schedule as task to avoid recursion?
1791 run_ready_steps (step->session);
1796 finish_step (struct Step *step)
1800 GNUNET_assert (step->finished_tasks == step->tasks_len);
1801 GNUNET_assert (GNUNET_YES == step->is_running);
1802 GNUNET_assert (GNUNET_NO == step->is_finished);
1804 #ifdef GNUNET_EXTRA_LOGGING
1805 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806 "All tasks of step `%s' with %u subordinates finished.\n",
1808 step->subordinates_len);
1811 for (i = 0; i < step->subordinates_len; i++)
1813 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1814 step->subordinates[i]->pending_prereq--;
1815 #ifdef GNUNET_EXTRA_LOGGING
1816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817 "Decreased pending_prereq to %u for step `%s'.\n",
1818 (unsigned int) step->subordinates[i]->pending_prereq,
1819 step->subordinates[i]->debug_name);
1824 step->is_finished = GNUNET_YES;
1826 // XXX: maybe schedule as task to avoid recursion?
1827 run_ready_steps (step->session);
1833 * Apply the result from one round of gradecasts (i.e. every peer
1834 * should have gradecasted) to the peer's current set.
1836 * @param task the task with context information
1839 task_start_apply_round (struct TaskEntry *task)
1841 struct ConsensusSession *session = task->step->session;
1842 struct SetKey sk_in;
1843 struct SetKey sk_out;
1844 struct RfnKey rk_in;
1845 struct SetEntry *set_out;
1846 struct ReferendumEntry *rfn_in;
1847 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1848 struct RfnElementInfo *ri;
1849 struct SetMutationProgressCls *progress_cls;
1850 uint16_t worst_majority = UINT16_MAX;
1852 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1853 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1854 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1856 set_out = lookup_set (session, &sk_out);
1857 if (NULL == set_out)
1859 create_set_copy_for_task (task, &sk_in, &sk_out);
1863 rfn_in = lookup_rfn (session, &rk_in);
1864 GNUNET_assert (NULL != rfn_in);
1866 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1867 progress_cls->task = task;
1869 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1871 while (GNUNET_YES ==
1872 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1874 (const void **) &ri))
1876 uint16_t majority_num;
1877 enum ReferendumVote majority_vote;
1879 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1881 if (worst_majority > majority_num)
1882 worst_majority = majority_num;
1884 switch (majority_vote)
1887 progress_cls->num_pending++;
1888 GNUNET_assert (GNUNET_OK ==
1889 GNUNET_SET_add_element (set_out->h,
1893 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1894 "P%u: apply round: adding element %s with %u-majority.\n",
1895 session->local_peer_idx,
1896 debug_str_element (ri->element), majority_num);
1899 progress_cls->num_pending++;
1900 GNUNET_assert (GNUNET_OK ==
1901 GNUNET_SET_remove_element (set_out->h,
1905 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906 "P%u: apply round: deleting element %s with %u-majority.\n",
1907 session->local_peer_idx,
1908 debug_str_element (ri->element), majority_num);
1911 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1912 "P%u: apply round: keeping element %s with %u-majority.\n",
1913 session->local_peer_idx,
1914 debug_str_element (ri->element), majority_num);
1923 if (0 == progress_cls->num_pending)
1925 // call closure right now, no pending ops
1926 GNUNET_free (progress_cls);
1931 uint16_t thresh = (session->num_peers / 3) * 2;
1933 if (worst_majority >= thresh)
1935 switch (session->early_stopping)
1937 case EARLY_STOPPING_NONE:
1938 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1939 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1940 "P%u: Stopping early (after one more superround)\n",
1941 session->local_peer_idx);
1943 case EARLY_STOPPING_ONE_MORE:
1944 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1945 session->local_peer_idx);
1946 session->early_stopping = EARLY_STOPPING_DONE;
1949 for (step = session->steps_head; NULL != step; step = step->next)
1950 try_finish_step_early (step);
1953 case EARLY_STOPPING_DONE:
1954 /* We shouldn't be here anymore after early stopping */
1962 else if (EARLY_STOPPING_NONE != session->early_stopping)
1964 // Our assumption about the number of bad peers
1966 GNUNET_break_op (0);
1970 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1971 session->local_peer_idx);
1974 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1979 task_start_grade (struct TaskEntry *task)
1981 struct ConsensusSession *session = task->step->session;
1982 struct ReferendumEntry *output_rfn;
1983 struct ReferendumEntry *input_rfn;
1984 struct DiffEntry *input_diff;
1985 struct RfnKey rfn_key;
1986 struct DiffKey diff_key;
1987 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1988 struct RfnElementInfo *ri;
1989 unsigned int gradecast_confidence = 2;
1991 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1992 output_rfn = lookup_rfn (session, &rfn_key);
1993 if (NULL == output_rfn)
1995 output_rfn = rfn_create (session->num_peers);
1996 output_rfn->key = rfn_key;
1997 put_rfn (session, output_rfn);
2000 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2001 input_diff = lookup_diff (session, &diff_key);
2002 GNUNET_assert (NULL != input_diff);
2004 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2005 input_rfn = lookup_rfn (session, &rfn_key);
2006 GNUNET_assert (NULL != input_rfn);
2008 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2010 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2012 while (GNUNET_YES ==
2013 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2015 (const void **) &ri))
2017 uint16_t majority_num;
2018 enum ReferendumVote majority_vote;
2020 // XXX: we need contested votes and non-contested votes here
2021 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2023 if (majority_num <= session->num_peers / 3)
2024 majority_vote = VOTE_REMOVE;
2026 switch (majority_vote)
2031 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2034 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2041 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2044 uint16_t noncontested;
2045 noncontested = rfn_noncontested (input_rfn);
2046 if (noncontested < (session->num_peers / 3) * 2)
2048 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2050 if (noncontested < (session->num_peers / 3) + 1)
2052 gradecast_confidence = 0;
2056 if (gradecast_confidence >= 1)
2057 rfn_commit (output_rfn, task->key.leader);
2059 if (gradecast_confidence <= 1)
2060 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2067 task_start_reconcile (struct TaskEntry *task)
2069 struct SetEntry *input;
2070 struct SetOpCls *setop = &task->cls.setop;
2071 struct ConsensusSession *session = task->step->session;
2073 input = lookup_set (session, &setop->input_set);
2074 GNUNET_assert (NULL != input);
2075 GNUNET_assert (NULL != input->h);
2077 /* We create the outputs for the operation here
2078 (rather than in the set operation callback)
2079 because we want something valid in there, even
2080 if the other peer doesn't talk to us */
2082 if (SET_KIND_NONE != setop->output_set.set_kind)
2084 /* If we don't have an existing output set,
2085 we clone the input set. */
2086 if (NULL == lookup_set (session, &setop->output_set))
2088 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2093 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2095 if (NULL == lookup_rfn (session, &setop->output_rfn))
2097 struct ReferendumEntry *rfn;
2099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2100 "P%u: output rfn <%s> missing, creating.\n",
2101 session->local_peer_idx,
2102 debug_str_rfn_key (&setop->output_rfn));
2104 rfn = rfn_create (session->num_peers);
2105 rfn->key = setop->output_rfn;
2106 put_rfn (session, rfn);
2110 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2112 if (NULL == lookup_diff (session, &setop->output_diff))
2114 struct DiffEntry *diff;
2116 diff = diff_create ();
2117 diff->key = setop->output_diff;
2118 put_diff (session, diff);
2122 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2124 /* XXX: mark the corresponding rfn as commited if necessary */
2129 if (task->key.peer1 == session->local_peer_idx)
2131 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134 "P%u: Looking up set {%s} to run remote union\n",
2135 session->local_peer_idx,
2136 debug_str_set_key (&setop->input_set));
2138 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2139 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2141 rcm.kind = htons (task->key.kind);
2142 rcm.peer1 = htons (task->key.peer1);
2143 rcm.peer2 = htons (task->key.peer2);
2144 rcm.leader = htons (task->key.leader);
2145 rcm.repetition = htons (task->key.repetition);
2146 rcm.is_contested = htons (0);
2148 GNUNET_assert (NULL == setop->op);
2149 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2150 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2152 struct GNUNET_SET_Option opts[] = {
2153 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2154 { GNUNET_SET_OPTION_END },
2157 // XXX: maybe this should be done while
2158 // setting up tasks alreays?
2159 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2160 &session->global_id,
2162 GNUNET_SET_RESULT_SYMMETRIC,
2167 commit_set (session, task);
2169 else if (task->key.peer2 == session->local_peer_idx)
2171 /* Wait for the other peer to contact us */
2172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2173 session->local_peer_idx, task->key.peer1);
2175 if (NULL != setop->op)
2177 commit_set (session, task);
2182 /* We made an error while constructing the task graph. */
2189 task_start_eval_echo (struct TaskEntry *task)
2191 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2192 struct ReferendumEntry *input_rfn;
2193 struct RfnElementInfo *ri;
2194 struct SetEntry *output_set;
2195 struct SetMutationProgressCls *progress_cls;
2196 struct ConsensusSession *session = task->step->session;
2197 struct SetKey sk_in;
2198 struct SetKey sk_out;
2199 struct RfnKey rk_in;
2201 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2202 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2203 output_set = lookup_set (session, &sk_out);
2204 if (NULL == output_set)
2206 create_set_copy_for_task (task, &sk_in, &sk_out);
2212 // FIXME: should be marked as a shallow copy, so
2213 // we can destroy everything correctly
2214 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2215 last_set->h = output_set->h;
2216 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2217 put_set (session, last_set);
2220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2221 "Evaluating referendum in Task {%s}\n",
2222 debug_str_task_key (&task->key));
2224 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2225 progress_cls->task = task;
2227 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2228 input_rfn = lookup_rfn (session, &rk_in);
2230 GNUNET_assert (NULL != input_rfn);
2232 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2233 GNUNET_assert (NULL != iter);
2235 while (GNUNET_YES ==
2236 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2238 (const void **) &ri))
2240 enum ReferendumVote majority_vote;
2241 uint16_t majority_num;
2243 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2245 if (majority_num < session->num_peers / 3)
2247 /* It is not the case that all nonfaulty peers
2248 echoed the same value. Since we're doing a set reconciliation, we
2249 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2250 reconciliation as contested. Other peers might not know that the
2251 leader is faulty, thus we still re-distribute in the confirmation
2253 output_set->is_contested = GNUNET_YES;
2256 switch (majority_vote)
2259 progress_cls->num_pending++;
2260 GNUNET_assert (GNUNET_OK ==
2261 GNUNET_SET_add_element (output_set->h,
2267 progress_cls->num_pending++;
2268 GNUNET_assert (GNUNET_OK ==
2269 GNUNET_SET_remove_element (output_set->h,
2275 /* Nothing to do. */
2283 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2285 if (0 == progress_cls->num_pending)
2287 // call closure right now, no pending ops
2288 GNUNET_free (progress_cls);
2295 task_start_finish (struct TaskEntry *task)
2297 struct SetEntry *final_set;
2298 struct ConsensusSession *session = task->step->session;
2300 final_set = lookup_set (session, &task->cls.finish.input_set);
2302 GNUNET_assert (NULL != final_set);
2305 GNUNET_SET_iterate (final_set->h,
2306 send_to_client_iter,
2311 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2315 GNUNET_assert (GNUNET_NO == task->is_started);
2316 GNUNET_assert (GNUNET_NO == task->is_finished);
2317 GNUNET_assert (NULL != task->start);
2321 task->is_started = GNUNET_YES;
2328 * Run all steps of the session that don't any
2329 * more dependencies.
2332 run_ready_steps (struct ConsensusSession *session)
2336 step = session->steps_head;
2338 while (NULL != step)
2340 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2344 GNUNET_assert (0 == step->finished_tasks);
2346 #ifdef GNUNET_EXTRA_LOGGING
2347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2348 session->local_peer_idx,
2350 step->round, step->tasks_len, step->subordinates_len);
2353 step->is_running = GNUNET_YES;
2354 for (i = 0; i < step->tasks_len; i++)
2355 start_task (session, step->tasks[i]);
2357 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2358 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2361 /* Running the next ready steps will be triggered by task completion */
2373 finish_task (struct TaskEntry *task)
2375 GNUNET_assert (GNUNET_NO == task->is_finished);
2376 task->is_finished = GNUNET_YES;
2378 task->step->finished_tasks++;
2380 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2381 "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2382 task->step->session->local_peer_idx,
2383 debug_str_task_key (&task->key),
2384 (unsigned int) task->step->finished_tasks,
2385 (unsigned int) task->step->tasks_len);
2387 if (task->step->finished_tasks == task->step->tasks_len)
2388 finish_step (task->step);
2393 * Search peer in the list of peers in session.
2395 * @param peer peer to find
2396 * @param session session with peer
2397 * @return index of peer, -1 if peer is not in session
2400 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2403 for (i = 0; i < session->num_peers; i++)
2404 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2411 * Compute a global, (hopefully) unique consensus session id,
2412 * from the local id of the consensus session, and the identities of all participants.
2413 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2414 * exactly the same peers, the global id will be different.
2416 * @param session session to generate the global id for
2417 * @param local_session_id local id of the consensus session
2420 compute_global_id (struct ConsensusSession *session,
2421 const struct GNUNET_HashCode *local_session_id)
2423 const char *salt = "gnunet-service-consensus/session_id";
2425 GNUNET_assert (GNUNET_YES ==
2426 GNUNET_CRYPTO_kdf (&session->global_id,
2427 sizeof (struct GNUNET_HashCode),
2431 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2433 sizeof (struct GNUNET_HashCode),
2439 * Compare two peer identities.
2441 * @param h1 some peer identity
2442 * @param h2 some peer identity
2443 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2446 peer_id_cmp (const void *h1, const void *h2)
2448 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2453 * Create the sorted list of peers for the session,
2454 * add the local peer if not in the join message.
2456 * @param session session to initialize
2457 * @param join_msg join message with the list of peers participating at the end
2460 initialize_session_peer_list (struct ConsensusSession *session,
2461 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2463 const struct GNUNET_PeerIdentity *msg_peers
2464 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2465 int local_peer_in_list;
2467 session->num_peers = ntohl (join_msg->num_peers);
2469 /* Peers in the join message, may or may not include the local peer,
2470 Add it if it is missing. */
2471 local_peer_in_list = GNUNET_NO;
2472 for (unsigned int i = 0; i < session->num_peers; i++)
2474 if (0 == memcmp (&msg_peers[i],
2476 sizeof (struct GNUNET_PeerIdentity)))
2478 local_peer_in_list = GNUNET_YES;
2482 if (GNUNET_NO == local_peer_in_list)
2483 session->num_peers++;
2485 session->peers = GNUNET_new_array (session->num_peers,
2486 struct GNUNET_PeerIdentity);
2487 if (GNUNET_NO == local_peer_in_list)
2488 session->peers[session->num_peers - 1] = my_peer;
2490 GNUNET_memcpy (session->peers,
2492 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2493 qsort (session->peers,
2495 sizeof (struct GNUNET_PeerIdentity),
2500 static struct TaskEntry *
2501 lookup_task (struct ConsensusSession *session,
2502 struct TaskKey *key)
2504 struct GNUNET_HashCode hash;
2507 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2509 GNUNET_h2s (&hash));
2510 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2515 * Called when another peer wants to do a set operation with the
2518 * @param cls closure
2519 * @param other_peer the other peer
2520 * @param context_msg message with application specific information from
2522 * @param request request from the other peer, use GNUNET_SET_accept
2523 * to accept it, otherwise the request will be refused
2524 * Note that we don't use a return value here, as it is also
2525 * necessary to specify the set we want to do the operation with,
2526 * whith sometimes can be derived from the context message.
2527 * Also necessary to specify the timeout.
2530 set_listen_cb (void *cls,
2531 const struct GNUNET_PeerIdentity *other_peer,
2532 const struct GNUNET_MessageHeader *context_msg,
2533 struct GNUNET_SET_Request *request)
2535 struct ConsensusSession *session = cls;
2537 struct TaskEntry *task;
2538 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2540 if (NULL == context_msg)
2542 GNUNET_break_op (0);
2546 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2548 GNUNET_break_op (0);
2552 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2554 GNUNET_break_op (0);
2558 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2560 tk = ((struct TaskKey) {
2561 .kind = ntohs (cm->kind),
2562 .peer1 = ntohs (cm->peer1),
2563 .peer2 = ntohs (cm->peer2),
2564 .repetition = ntohs (cm->repetition),
2565 .leader = ntohs (cm->leader),
2568 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2569 session->local_peer_idx, debug_str_task_key (&tk));
2571 task = lookup_task (session, &tk);
2575 GNUNET_break_op (0);
2579 if (GNUNET_YES == task->is_finished)
2581 GNUNET_break_op (0);
2585 if (task->key.peer2 != session->local_peer_idx)
2587 /* We're being asked, so we must be thne 2nd peer. */
2588 GNUNET_break_op (0);
2592 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2593 (task->key.peer2 == session->local_peer_idx)));
2595 struct GNUNET_SET_Option opts[] = {
2596 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2597 { GNUNET_SET_OPTION_END },
2600 task->cls.setop.op = GNUNET_SET_accept (request,
2601 GNUNET_SET_RESULT_SYMMETRIC,
2606 /* If the task hasn't been started yet,
2607 we wait for that until we commit. */
2609 if (GNUNET_YES == task->is_started)
2611 commit_set (session, task);
2618 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2619 struct TaskEntry *t)
2621 struct GNUNET_HashCode round_hash;
2624 GNUNET_assert (NULL != t->step);
2626 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2630 if (s->tasks_len == s->tasks_cap)
2632 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2633 GNUNET_array_grow (s->tasks,
2638 #ifdef GNUNET_EXTRA_LOGGING
2639 GNUNET_assert (NULL != s->debug_name);
2640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2641 debug_str_task_key (&t->key),
2645 s->tasks[s->tasks_len] = t;
2648 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2649 GNUNET_assert (GNUNET_OK ==
2650 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2651 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2656 install_step_timeouts (struct ConsensusSession *session)
2658 /* Given the fully constructed task graph
2659 with rounds for tasks, we can give the tasks timeouts. */
2661 // unsigned int max_round;
2663 /* XXX: implement! */
2669 * Arrange two peers in some canonical order.
2672 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2677 GNUNET_assert (*p1 < n);
2678 GNUNET_assert (*p2 < n);
2691 /* For uniformly random *p1, *p2,
2692 this condition is true with 50% chance */
2693 if (((b - a) + n) % n <= n / 2)
2707 * Record @a dep as a dependency of @a step.
2710 step_depend_on (struct Step *step, struct Step *dep)
2712 /* We're not checking for cyclic dependencies,
2713 but this is a cheap sanity check. */
2714 GNUNET_assert (step != dep);
2715 GNUNET_assert (NULL != step);
2716 GNUNET_assert (NULL != dep);
2717 GNUNET_assert (dep->round <= step->round);
2719 #ifdef GNUNET_EXTRA_LOGGING
2720 /* Make sure we have complete debugging information.
2721 Also checks that we don't screw up too badly
2722 constructing the task graph. */
2723 GNUNET_assert (NULL != step->debug_name);
2724 GNUNET_assert (NULL != dep->debug_name);
2725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2726 "Making step `%s' depend on `%s'\n",
2731 if (dep->subordinates_cap == dep->subordinates_len)
2733 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2734 GNUNET_array_grow (dep->subordinates,
2735 dep->subordinates_cap,
2739 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2741 dep->subordinates[dep->subordinates_len] = step;
2742 dep->subordinates_len++;
2744 step->pending_prereq++;
2748 static struct Step *
2749 create_step (struct ConsensusSession *session, int round, int early_finishable)
2752 step = GNUNET_new (struct Step);
2753 step->session = session;
2754 step->round = round;
2755 step->early_finishable = early_finishable;
2756 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2757 session->steps_tail,
2764 * Construct the task graph for a single
2768 construct_task_graph_gradecast (struct ConsensusSession *session,
2771 struct Step *step_before,
2772 struct Step *step_after)
2774 uint16_t n = session->num_peers;
2775 uint16_t me = session->local_peer_idx;
2780 /* The task we're currently setting up. */
2781 struct TaskEntry task;
2784 struct Step *prev_step;
2790 round = step_before->round + 1;
2792 /* gcast step 1: leader disseminates */
2794 step = create_step (session, round, GNUNET_YES);
2796 #ifdef GNUNET_EXTRA_LOGGING
2797 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2799 step_depend_on (step, step_before);
2803 for (k = 0; k < n; k++)
2809 arrange_peers (&p1, &p2, n);
2810 task = ((struct TaskEntry) {
2812 .start = task_start_reconcile,
2813 .cancel = task_cancel_reconcile,
2814 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2816 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2817 put_task (session->taskmap, &task);
2819 /* We run this task to make sure that the leader
2820 has the stored the SET_KIND_LEADER set of himself,
2821 so he can participate in the rest of the gradecast
2822 without the code having to handle any special cases. */
2823 task = ((struct TaskEntry) {
2825 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2826 .start = task_start_reconcile,
2827 .cancel = task_cancel_reconcile,
2829 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2830 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2831 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2832 put_task (session->taskmap, &task);
2838 arrange_peers (&p1, &p2, n);
2839 task = ((struct TaskEntry) {
2841 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2842 .start = task_start_reconcile,
2843 .cancel = task_cancel_reconcile,
2845 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2846 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2847 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2848 put_task (session->taskmap, &task);
2851 /* gcast phase 2: echo */
2854 step = create_step (session, round, GNUNET_YES);
2855 #ifdef GNUNET_EXTRA_LOGGING
2856 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2858 step_depend_on (step, prev_step);
2860 for (k = 0; k < n; k++)
2864 arrange_peers (&p1, &p2, n);
2865 task = ((struct TaskEntry) {
2867 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2868 .start = task_start_reconcile,
2869 .cancel = task_cancel_reconcile,
2871 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2872 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2873 put_task (session->taskmap, &task);
2877 /* Same round, since step only has local tasks */
2878 step = create_step (session, round, GNUNET_YES);
2879 #ifdef GNUNET_EXTRA_LOGGING
2880 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2882 step_depend_on (step, prev_step);
2884 arrange_peers (&p1, &p2, n);
2885 task = ((struct TaskEntry) {
2886 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2888 .start = task_start_eval_echo
2890 put_task (session->taskmap, &task);
2894 step = create_step (session, round, GNUNET_YES);
2895 #ifdef GNUNET_EXTRA_LOGGING
2896 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2898 step_depend_on (step, prev_step);
2900 /* gcast phase 3: confirmation and grading */
2901 for (k = 0; k < n; k++)
2905 arrange_peers (&p1, &p2, n);
2906 task = ((struct TaskEntry) {
2908 .start = task_start_reconcile,
2909 .cancel = task_cancel_reconcile,
2910 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2912 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2913 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2914 /* If there was at least one element in the echo round that was
2915 contested (i.e. it had no n-t majority), then we let the other peers
2916 know, and other peers let us know. The contested flag for each peer is
2917 stored in the rfn. */
2918 task.cls.setop.transceive_contested = GNUNET_YES;
2919 put_task (session->taskmap, &task);
2923 /* Same round, since step only has local tasks */
2924 step = create_step (session, round, GNUNET_YES);
2925 #ifdef GNUNET_EXTRA_LOGGING
2926 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2928 step_depend_on (step, prev_step);
2930 task = ((struct TaskEntry) {
2932 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2933 .start = task_start_grade,
2935 put_task (session->taskmap, &task);
2937 step_depend_on (step_after, step);
2942 construct_task_graph (struct ConsensusSession *session)
2944 uint16_t n = session->num_peers;
2947 uint16_t me = session->local_peer_idx;
2949 /* The task we're currently setting up. */
2950 struct TaskEntry task;
2952 /* Current leader */
2956 struct Step *prev_step;
2958 unsigned int round = 0;
2962 // XXX: introduce first step,
2963 // where we wait for all insert acks
2964 // from the set service
2966 /* faster but brittle all-to-all */
2968 // XXX: Not implemented yet
2970 /* all-to-all step */
2972 step = create_step (session, round, GNUNET_NO);
2974 #ifdef GNUNET_EXTRA_LOGGING
2975 step->debug_name = GNUNET_strdup ("all to all");
2978 for (i = 0; i < n; i++)
2985 arrange_peers (&p1, &p2, n);
2986 task = ((struct TaskEntry) {
2987 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2989 .start = task_start_reconcile,
2990 .cancel = task_cancel_reconcile,
2992 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2993 task.cls.setop.output_set = task.cls.setop.input_set;
2994 task.cls.setop.do_not_remove = GNUNET_YES;
2995 put_task (session->taskmap, &task);
3000 step = create_step (session, round, GNUNET_NO);;
3001 #ifdef GNUNET_EXTRA_LOGGING
3002 step->debug_name = GNUNET_strdup ("all to all 2");
3004 step_depend_on (step, prev_step);
3007 for (i = 0; i < n; i++)
3014 arrange_peers (&p1, &p2, n);
3015 task = ((struct TaskEntry) {
3016 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3018 .start = task_start_reconcile,
3019 .cancel = task_cancel_reconcile,
3021 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3022 task.cls.setop.output_set = task.cls.setop.input_set;
3023 task.cls.setop.do_not_remove = GNUNET_YES;
3024 put_task (session->taskmap, &task);
3034 /* Byzantine union */
3036 /* sequential repetitions of the gradecasts */
3037 for (i = 0; i < t + 1; i++)
3039 struct Step *step_rep_start;
3040 struct Step *step_rep_end;
3042 /* Every repetition is in a separate round. */
3043 step_rep_start = create_step (session, round, GNUNET_YES);
3044 #ifdef GNUNET_EXTRA_LOGGING
3045 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3048 step_depend_on (step_rep_start, prev_step);
3050 /* gradecast has three rounds */
3052 step_rep_end = create_step (session, round, GNUNET_YES);
3053 #ifdef GNUNET_EXTRA_LOGGING
3054 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3057 /* parallel gradecasts */
3058 for (lead = 0; lead < n; lead++)
3059 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3061 task = ((struct TaskEntry) {
3062 .step = step_rep_end,
3063 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3064 .start = task_start_apply_round,
3066 put_task (session->taskmap, &task);
3068 prev_step = step_rep_end;
3071 /* There is no next gradecast round, thus the final
3072 start step is the overall end step of the gradecasts */
3074 step = create_step (session, round, GNUNET_NO);
3075 #ifdef GNUNET_EXTRA_LOGGING
3076 GNUNET_asprintf (&step->debug_name, "finish");
3078 step_depend_on (step, prev_step);
3080 task = ((struct TaskEntry) {
3082 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3083 .start = task_start_finish,
3085 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3087 put_task (session->taskmap, &task);
3093 * Check join message.
3095 * @param cls session of client that sent the message
3096 * @param m message sent by the client
3097 * @return #GNUNET_OK if @a m is well-formed
3100 check_client_join (void *cls,
3101 const struct GNUNET_CONSENSUS_JoinMessage *m)
3103 uint32_t listed_peers = ntohl (m->num_peers);
3105 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3106 listed_peers * sizeof (struct GNUNET_PeerIdentity))
3109 return GNUNET_SYSERR;
3116 * Called when a client wants to join a consensus session.
3118 * @param cls session of client that sent the message
3119 * @param m message sent by the client
3122 handle_client_join (void *cls,
3123 const struct GNUNET_CONSENSUS_JoinMessage *m)
3125 struct ConsensusSession *session = cls;
3126 struct ConsensusSession *other_session;
3128 initialize_session_peer_list (session,
3130 compute_global_id (session,
3133 /* Check if some local client already owns the session.
3134 It is only legal to have a session with an existing global id
3135 if all other sessions with this global id are finished.*/
3136 for (other_session = sessions_head;
3137 NULL != other_session;
3138 other_session = other_session->next)
3140 if ( (other_session != session) &&
3141 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3142 &other_session->global_id)) )
3146 session->conclude_deadline
3147 = GNUNET_TIME_absolute_ntoh (m->deadline);
3148 session->conclude_start
3149 = GNUNET_TIME_absolute_ntoh (m->start);
3150 session->local_peer_idx = get_peer_idx (&my_peer,
3152 GNUNET_assert (-1 != session->local_peer_idx);
3154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3155 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3156 GNUNET_h2s (&m->session_id),
3158 session->local_peer_idx,
3159 GNUNET_STRINGS_relative_time_to_string
3160 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3161 session->conclude_deadline),
3164 session->set_listener
3165 = GNUNET_SET_listen (cfg,
3166 GNUNET_SET_OPERATION_UNION,
3167 &session->global_id,
3171 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3173 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3175 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3177 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
3181 struct SetEntry *client_set;
3183 client_set = GNUNET_new (struct SetEntry);
3184 client_set->h = GNUNET_SET_create (cfg,
3185 GNUNET_SET_OPERATION_UNION);
3186 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3187 sh->h = client_set->h;
3188 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3189 session->set_handles_tail,
3191 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3196 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3199 /* Just construct the task graph,
3200 but don't run anything until the client calls conclude. */
3201 construct_task_graph (session);
3202 GNUNET_SERVICE_client_continue (session->client);
3207 client_insert_done (void *cls)
3214 * Called when a client performs an insert operation.
3216 * @param cls client handle
3217 * @param msg message sent by the client
3218 * @return #GNUNET_OK (always well-formed)
3221 check_client_insert (void *cls,
3222 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3229 * Called when a client performs an insert operation.
3231 * @param cls client handle
3232 * @param msg message sent by the client
3235 handle_client_insert (void *cls,
3236 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3238 struct ConsensusSession *session = cls;
3239 ssize_t element_size;
3240 struct GNUNET_SET_Handle *initial_set;
3241 struct ConsensusElement *ce;
3243 if (GNUNET_YES == session->conclude_started)
3246 GNUNET_SERVICE_client_drop (session->client);
3250 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3251 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3252 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3253 ce->payload_type = msg->element_type;
3255 struct GNUNET_SET_Element element = {
3256 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3257 .size = sizeof (struct ConsensusElement) + element_size,
3262 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3263 struct SetEntry *entry;
3265 entry = lookup_set (session,
3267 GNUNET_assert (NULL != entry);
3268 initial_set = entry->h;
3271 session->num_client_insert_pending++;
3272 GNUNET_SET_add_element (initial_set,
3274 &client_insert_done,
3277 #ifdef GNUNET_EXTRA_LOGGING
3279 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3280 "P%u: element %s added\n",
3281 session->local_peer_idx,
3282 debug_str_element (&element));
3286 GNUNET_SERVICE_client_continue (session->client);
3291 * Called when a client performs the conclude operation.
3293 * @param cls client handle
3294 * @param message message sent by the client
3297 handle_client_conclude (void *cls,
3298 const struct GNUNET_MessageHeader *message)
3300 struct ConsensusSession *session = cls;
3302 if (GNUNET_YES == session->conclude_started)
3304 /* conclude started twice */
3306 GNUNET_SERVICE_client_drop (session->client);
3309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3310 "conclude requested\n");
3311 session->conclude_started = GNUNET_YES;
3312 install_step_timeouts (session);
3313 run_ready_steps (session);
3314 GNUNET_SERVICE_client_continue (session->client);
3319 * Called to clean up, after a shutdown has been requested.
3321 * @param cls closure
3324 shutdown_task (void *cls)
3326 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3328 GNUNET_STATISTICS_destroy (statistics,
3335 * Start processing consensus requests.
3337 * @param cls closure
3338 * @param c configuration to use
3339 * @param service the initialized service
3343 const struct GNUNET_CONFIGURATION_Handle *c,
3344 struct GNUNET_SERVICE_Handle *service)
3348 GNUNET_CRYPTO_get_peer_identity (cfg,
3351 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3352 "Could not retrieve host identity\n");
3353 GNUNET_SCHEDULER_shutdown ();
3356 statistics = GNUNET_STATISTICS_create ("consensus",
3358 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3364 * Callback called when a client connects to the service.
3366 * @param cls closure for the service
3367 * @param c the new client that connected to the service
3368 * @param mq the message queue used to send messages to the client
3372 client_connect_cb (void *cls,
3373 struct GNUNET_SERVICE_Client *c,
3374 struct GNUNET_MQ_Handle *mq)
3376 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3378 session->client = c;
3379 session->client_mq = mq;
3380 GNUNET_CONTAINER_DLL_insert (sessions_head,
3388 * Callback called when a client disconnected from the service
3390 * @param cls closure for the service
3391 * @param c the client that disconnected
3392 * @param internal_cls should be equal to @a c
3395 client_disconnect_cb (void *cls,
3396 struct GNUNET_SERVICE_Client *c,
3399 struct ConsensusSession *session = internal_cls;
3401 if (NULL != session->set_listener)
3403 GNUNET_SET_listen_cancel (session->set_listener);
3404 session->set_listener = NULL;
3406 GNUNET_CONTAINER_DLL_remove (sessions_head,
3410 while (session->set_handles_head)
3412 struct SetHandle *sh = session->set_handles_head;
3413 session->set_handles_head = sh->next;
3414 GNUNET_SET_destroy (sh->h);
3417 GNUNET_free (session);
3422 * Define "main" method using service macro.
3426 GNUNET_SERVICE_OPTION_NONE,
3429 &client_disconnect_cb,
3431 GNUNET_MQ_hd_fixed_size (client_conclude,
3432 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3433 struct GNUNET_MessageHeader,
3435 GNUNET_MQ_hd_var_size (client_insert,
3436 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3437 struct GNUNET_CONSENSUS_ElementMessage,
3439 GNUNET_MQ_hd_var_size (client_join,
3440 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3441 struct GNUNET_CONSENSUS_JoinMessage,
3443 GNUNET_MQ_handler_end ());
3445 /* end of gnunet-service-consensus.c */