2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file consensus/gnunet-service-consensus.c
21 * @brief multi-peer set reconciliation
22 * @author Florian Dold <flo@dold.me>
26 #include "gnunet_util_lib.h"
27 #include "gnunet_block_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_applications.h"
30 #include "gnunet_set_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
40 * Vote that nothing should change.
41 * This option is never voted explicitly.
45 * Vote that an element should be added.
49 * Vote that an element should be removed.
55 enum EarlyStoppingPhase
57 EARLY_STOPPING_NONE = 0,
58 EARLY_STOPPING_ONE_MORE = 1,
59 EARLY_STOPPING_DONE = 2,
63 GNUNET_NETWORK_STRUCT_BEGIN
66 * Tuple of integers that together
67 * identify a task uniquely.
71 * A value from 'enum PhaseKind'.
73 uint16_t kind GNUNET_PACKED;
76 * Number of the first peer
79 int16_t peer1 GNUNET_PACKED;
82 * Number of the second peer in canonical order.
84 int16_t peer2 GNUNET_PACKED;
87 * Repetition of the gradecast phase.
89 int16_t repetition GNUNET_PACKED;
92 * Leader in the gradecast phase.
94 * Can be different from both peer1 and peer2.
96 int16_t leader GNUNET_PACKED;
103 int set_kind GNUNET_PACKED;
104 int k1 GNUNET_PACKED;
105 int k2 GNUNET_PACKED;
112 struct GNUNET_SET_Handle *h;
114 * GNUNET_YES if the set resulted
115 * from applying a referendum with contested
124 int diff_kind GNUNET_PACKED;
125 int k1 GNUNET_PACKED;
126 int k2 GNUNET_PACKED;
131 int rfn_kind GNUNET_PACKED;
132 int k1 GNUNET_PACKED;
133 int k2 GNUNET_PACKED;
137 GNUNET_NETWORK_STRUCT_END
141 PHASE_KIND_ALL_TO_ALL,
142 PHASE_KIND_ALL_TO_ALL_2,
143 PHASE_KIND_GRADECAST_LEADER,
144 PHASE_KIND_GRADECAST_ECHO,
145 PHASE_KIND_GRADECAST_ECHO_GRADE,
146 PHASE_KIND_GRADECAST_CONFIRM,
147 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
149 * Apply a repetition of the all-to-all
150 * gradecast to the current set.
152 PHASE_KIND_APPLY_REP,
162 * Last result set from a gradecast
164 SET_KIND_LAST_GRADECAST,
165 SET_KIND_LEADER_PROPOSAL,
166 SET_KIND_ECHO_RESULT,
172 DIFF_KIND_LEADER_PROPOSAL,
173 DIFF_KIND_LEADER_CONSENSUS,
174 DIFF_KIND_GRADECAST_RESULT,
182 RFN_KIND_GRADECAST_RESULT
188 struct SetKey input_set;
190 struct SetKey output_set;
191 struct RfnKey output_rfn;
192 struct DiffKey output_diff;
196 int transceive_contested;
198 struct GNUNET_SET_OperationHandle *op;
204 struct SetKey input_set;
208 * Closure for both @a start_task
209 * and @a cancel_task.
213 struct SetOpCls setop;
214 struct FinishCls finish;
219 typedef void (*TaskFunc) (struct TaskEntry *task);
222 * Node in the consensus task graph.
237 union TaskFuncCls cls;
244 * All steps of one session are in a
245 * linked list for easier deallocation.
250 * All steps of one session are in a
251 * linked list for easier deallocation.
255 struct ConsensusSession *session;
258 * Tasks that this step is composed of.
260 struct TaskEntry **tasks;
261 unsigned int tasks_len;
262 unsigned int tasks_cap;
264 unsigned int finished_tasks;
267 * Steps that have this step as dependency.
269 * We store pointers to subordinates rather
270 * than to prerequisites since it makes
271 * tracking the readiness of a task easier.
273 struct Step **subordinates;
274 unsigned int subordinates_len;
275 unsigned int subordinates_cap;
278 * Counter for the prerequisites of
281 size_t pending_prereq;
284 * Task that will run this step despite
285 * any pending prerequisites.
287 struct GNUNET_SCHEDULER_Task *timeout_task;
289 unsigned int is_running;
291 unsigned int is_finished;
294 * Synchrony round of the task.
295 * Determines the deadline for the task.
300 * Human-readable name for
301 * the task, used for debugging.
306 * When we're doing an early finish, how should this step be
308 * If GNUNET_YES, the step will be marked as finished
309 * without actually running its tasks.
310 * Otherwise, the step will still be run even after
313 * Note that a task may never be finished early if
314 * it is already running.
316 int early_finishable;
320 struct RfnElementInfo
322 const struct GNUNET_SET_Element *element;
325 * GNUNET_YES if the peer votes for the proposal.
330 * Proposal for this element,
331 * can only be VOTE_ADD or VOTE_REMOVE.
333 enum ReferendumVote proposal;
337 struct ReferendumEntry
342 * Elements where there is at least one proposed change.
344 * Maps the hash of the GNUNET_SET_Element
345 * to 'struct RfnElementInfo'.
347 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
349 unsigned int num_peers;
352 * Stores, for every peer in the session,
353 * whether the peer finished the whole referendum.
355 * Votes from peers are only counted if they're
356 * marked as committed (#GNUNET_YES) in the referendum.
358 * Otherwise (#GNUNET_NO), the requested changes are
359 * not counted for majority votes or thresholds.
365 * Contestation state of the peer. If a peer is contested, the values it
366 * contributed are still counted for applying changes, but the grading is
373 struct DiffElementInfo
375 const struct GNUNET_SET_Element *element;
378 * Positive weight for 'add', negative
379 * weights for 'remove'.
391 struct GNUNET_CONTAINER_MultiHashMap *changes;
396 struct SetHandle *prev;
397 struct SetHandle *next;
399 struct GNUNET_SET_Handle *h;
405 * A consensus session consists of one local client and the remote authorities.
407 struct ConsensusSession
410 * Consensus sessions are kept in a DLL.
412 struct ConsensusSession *next;
415 * Consensus sessions are kept in a DLL.
417 struct ConsensusSession *prev;
419 unsigned int num_client_insert_pending;
421 struct GNUNET_CONTAINER_MultiHashMap *setmap;
422 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
423 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
426 * Array of peers with length 'num_peers'.
428 int *peers_blacklisted;
431 * Mapping from (hashed) TaskKey to TaskEntry.
433 * We map the application_id for a round to the task that should be
434 * executed, so we don't have to go through all tasks whenever we get
435 * an incoming set op request.
437 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
439 struct Step *steps_head;
440 struct Step *steps_tail;
442 int conclude_started;
447 * Global consensus identification, computed
448 * from the session id and participating authorities.
450 struct GNUNET_HashCode global_id;
453 * Client that inhabits the session
455 struct GNUNET_SERVICE_Client *client;
458 * Queued messages to the client.
460 struct GNUNET_MQ_Handle *client_mq;
463 * Time when the conclusion of the consensus should begin.
465 struct GNUNET_TIME_Absolute conclude_start;
468 * Timeout for all rounds together, single rounds will schedule a timeout task
469 * with a fraction of the conclude timeout.
470 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
472 struct GNUNET_TIME_Absolute conclude_deadline;
474 struct GNUNET_PeerIdentity *peers;
477 * Number of peers in the consensus (length of the per-peer arrays; the local peer appears to be counted too, see local_peer_idx).
479 unsigned int num_peers;
482 * Index of the local peer in the peers array
484 unsigned int local_peer_idx;
487 * Listener for requests from other peers.
488 * Uses the session's global id as app id.
490 struct GNUNET_SET_ListenHandle *set_listener;
493 * State of our early stopping scheme.
498 * Our set size from the first round.
502 uint64_t *first_sizes_received;
505 * Bounded Eppstein lower bound.
507 uint64_t lower_bound;
509 struct SetHandle *set_handles_head;
510 struct SetHandle *set_handles_tail;
514 * Linked list of sessions this peer participates in.
516 static struct ConsensusSession *sessions_head;
519 * Linked list of sessions this peer participates in.
521 static struct ConsensusSession *sessions_tail;
524 * Configuration of the consensus service.
526 static const struct GNUNET_CONFIGURATION_Handle *cfg;
529 * Peer that runs this service.
531 static struct GNUNET_PeerIdentity my_peer;
536 struct GNUNET_STATISTICS_Handle *statistics;
540 finish_task (struct TaskEntry *task);
544 run_ready_steps (struct ConsensusSession *session);
548 phasename (uint16_t phase)
552 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
553 case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
554 case PHASE_KIND_FINISH: return "FINISH";
555 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
556 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
557 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
558 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
559 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
560 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
561 default: return "(unknown)";
567 setname (uint16_t kind)
571 case SET_KIND_CURRENT: return "CURRENT";
572 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
573 case SET_KIND_NONE: return "NONE";
574 default: return "(unknown)";
579 rfnname (uint16_t kind)
583 case RFN_KIND_NONE: return "NONE";
584 case RFN_KIND_ECHO: return "ECHO";
585 case RFN_KIND_CONFIRM: return "CONFIRM";
586 default: return "(unknown)";
591 diffname (uint16_t kind)
595 case DIFF_KIND_NONE: return "NONE";
596 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
597 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
598 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
599 default: return "(unknown)";
603 #ifdef GNUNET_EXTRA_LOGGING
607 debug_str_element (const struct GNUNET_SET_Element *el)
609 struct GNUNET_HashCode hash;
611 GNUNET_SET_element_hash (el, &hash);
613 return GNUNET_h2s (&hash);
617 debug_str_task_key (struct TaskKey *tk)
619 static char buf[256];
621 snprintf (buf, sizeof (buf),
622 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
623 phasename (tk->kind), tk->peer1, tk->peer2,
624 tk->leader, tk->repetition);
630 debug_str_diff_key (struct DiffKey *dk)
632 static char buf[256];
634 snprintf (buf, sizeof (buf),
635 "DiffKey kind=%s, k1=%d, k2=%d",
636 diffname (dk->diff_kind), dk->k1, dk->k2);
642 debug_str_set_key (const struct SetKey *sk)
644 static char buf[256];
646 snprintf (buf, sizeof (buf),
647 "SetKey kind=%s, k1=%d, k2=%d",
648 setname (sk->set_kind), sk->k1, sk->k2);
655 debug_str_rfn_key (const struct RfnKey *rk)
657 static char buf[256];
659 snprintf (buf, sizeof (buf),
660 "RfnKey kind=%s, k1=%d, k2=%d",
661 rfnname (rk->rfn_kind), rk->k1, rk->k2);
666 #endif /* GNUNET_EXTRA_LOGGING */
670 * Send the final result set of the consensus to the client, element by
674 * @param element the current element, NULL if all elements have been
676 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
/* Iterator callback that forwards the result set to the session's client.
 * cls is the struct TaskEntry; the session is reached via task->step.
 * NOTE(review): several lines of this function are not visible in this
 * view (the branch on a NULL element, the assignment of 'ce', the return
 * statements) -- the comments below cover the visible lines only. */
679 send_to_client_iter (void *cls,
680 const struct GNUNET_SET_Element *element)
682 struct TaskEntry *task = (struct TaskEntry *) cls;
683 struct ConsensusSession *session = task->step->session;
684 struct GNUNET_MQ_Envelope *ev;
688 struct GNUNET_CONSENSUS_ElementMessage *m;
689 const struct ConsensusElement *ce;
/* Only consensus elements are expected in the result set. */
691 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
694 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
699 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700 "P%d: sending element %s to client\n",
701 session->local_peer_idx,
702 debug_str_element (element));
/* The client message carries the bytes that follow the ConsensusElement
 * header, plus the payload type taken from that header.
 * NOTE(review): assumes element->size >= sizeof (struct ConsensusElement);
 * no check is visible here -- confirm. */
704 ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
705 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
706 m->element_type = ce->payload_type;
707 GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
708 GNUNET_MQ_send (session->client_mq, ev);
/* End of iteration: tell the client that the conclusion is done. */
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "P%d: finished iterating elements for client\n",
714 session->local_peer_idx);
715 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
716 GNUNET_MQ_send (session->client_mq, ev);
722 static struct SetEntry *
723 lookup_set (struct ConsensusSession *session, struct SetKey *key)
725 struct GNUNET_HashCode hash;
727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728 "P%u: looking up set {%s}\n",
729 session->local_peer_idx,
730 debug_str_set_key (key));
732 GNUNET_assert (SET_KIND_NONE != key->set_kind);
733 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
734 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
738 static struct DiffEntry *
739 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
741 struct GNUNET_HashCode hash;
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "P%u: looking up diff {%s}\n",
745 session->local_peer_idx,
746 debug_str_diff_key (key));
748 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
749 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
750 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
754 static struct ReferendumEntry *
755 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
757 struct GNUNET_HashCode hash;
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760 "P%u: looking up rfn {%s}\n",
761 session->local_peer_idx,
762 debug_str_rfn_key (key));
764 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
765 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
766 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
/* Record a change for 'element' in the given diff.  The weight must be
 * 1 or -1 (asserted below).  Entries of diff->changes are keyed by the
 * element hash.
 * NOTE(review): parts of this function are not visible in this view (the
 * weight parameter line, the guard around entry creation, the final
 * store of the weight) -- presumably the entry is only created when the
 * lookup finds none; confirm against the full source. */
771 diff_insert (struct DiffEntry *diff,
773 const struct GNUNET_SET_Element *element)
775 struct DiffElementInfo *di;
776 struct GNUNET_HashCode hash;
778 GNUNET_assert ( (1 == weight) || (-1 == weight));
780 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781 "diff_insert with element size %u\n",
784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785 "hashing element\n");
787 GNUNET_SET_element_hash (element, &hash);
789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
/* New entry: it owns a private copy of the element. */
796 di = GNUNET_new (struct DiffElementInfo);
797 di->element = GNUNET_SET_element_dup (element);
798 GNUNET_assert (GNUNET_OK ==
799 GNUNET_CONTAINER_multihashmap_put (diff->changes,
801 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
809 rfn_commit (struct ReferendumEntry *rfn,
810 uint16_t commit_peer)
812 GNUNET_assert (commit_peer < rfn->num_peers);
814 rfn->peer_commited[commit_peer] = GNUNET_YES;
819 rfn_contest (struct ReferendumEntry *rfn,
820 uint16_t contested_peer)
822 GNUNET_assert (contested_peer < rfn->num_peers);
824 rfn->peer_contested[contested_peer] = GNUNET_YES;
/* Walks over all peers of the referendum and tests each one for
 * "committed and not contested".
 * NOTE(review): the local declarations, the statement executed for a
 * matching peer and the return are not visible in this view --
 * presumably matching peers are counted and the count is returned;
 * confirm against the full source. */
829 rfn_noncontested (struct ReferendumEntry *rfn)
835 for (i = 0; i < rfn->num_peers; i++)
836 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
/* Register that 'voting_peer' votes on 'element' in the referendum.
 * Per-element state is kept in rfn->rfn_elements, keyed by the element
 * hash; each entry holds one vote slot per peer.
 * NOTE(review): some lines are not visible in this view (the guard
 * around entry creation and possibly a store of 'vote') -- presumably a
 * new entry is only created when the lookup finds none; confirm. */
844 rfn_vote (struct ReferendumEntry *rfn,
845 uint16_t voting_peer,
846 enum ReferendumVote vote,
847 const struct GNUNET_SET_Element *element)
849 struct RfnElementInfo *ri;
850 struct GNUNET_HashCode hash;
852 GNUNET_assert (voting_peer < rfn->num_peers);
854 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
855 since VOTE_KEEP is implicit in not voting. */
856 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
858 GNUNET_SET_element_hash (element, &hash);
859 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
/* New entry: private copy of the element and a zeroed vote array with
 * one slot per peer of the referendum. */
863 ri = GNUNET_new (struct RfnElementInfo);
864 ri->element = GNUNET_SET_element_dup (element);
865 ri->votes = GNUNET_new_array (rfn->num_peers, int);
866 GNUNET_assert (GNUNET_OK ==
867 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
869 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
872 ri->votes[voting_peer] = GNUNET_YES;
878 task_other_peer (struct TaskEntry *task)
880 uint16_t me = task->step->session->local_peer_idx;
881 if (task->key.peer1 == me)
882 return task->key.peer2;
883 return task->key.peer1;
/**
 * Three-way comparison of two uint64_t values, usable as a qsort()
 * comparator that sorts in ascending order.
 *
 * The arguments are read through const-qualified pointers, so the
 * const promised by the comparator signature is not cast away.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return -1 if *pa < *pb, 0 if both are equal, 1 if *pa > *pb
 */
static int
cmp_uint64_t (const void *pa, const void *pb)
{
  const uint64_t a = *(const uint64_t *) pa;
  const uint64_t b = *(const uint64_t *) pb;

  /* Compare instead of subtracting: a difference of two uint64_t values
     does not fit into an int. */
  return (a > b) - (a < b);
}
902 * Callback for set operation results. Called for each element
906 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
907 * @param current_size current set size
908 * @param status see enum GNUNET_SET_Status
/* Result callback of the set operation that belongs to a task (cls is
 * the struct TaskEntry).  It resolves the output set / diff / referendum
 * configured in the task's SetOpCls and then, depending on 'status',
 * records the reported element in those outputs or commits the remote
 * peer in the referendum.
 * NOTE(review): this view of the function is missing lines (branch
 * conditions, early returns, several call arguments); the comments below
 * describe only what the visible lines establish. */
911 set_result_cb (void *cls,
912 const struct GNUNET_SET_Element *element,
913 uint64_t current_size,
914 enum GNUNET_SET_Status status)
916 struct TaskEntry *task = cls;
917 struct ConsensusSession *session = task->step->session;
918 struct SetEntry *output_set = NULL;
919 struct DiffEntry *output_diff = NULL;
920 struct ReferendumEntry *output_rfn = NULL;
921 unsigned int other_idx;
922 struct SetOpCls *setop;
923 const struct ConsensusElement *consensus_element = NULL;
/* A reported element must be a consensus element; its data is kept for
 * the marker checks further down. */
927 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928 "P%u: got element of type %u, status %u\n",
929 session->local_peer_idx,
930 (unsigned) element->element_type,
932 GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
933 consensus_element = element->data;
936 setop = &task->cls.setop;
939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940 "P%u: got set result for {%s}, status %u\n",
941 session->local_peer_idx,
942 debug_str_task_key (&task->key),
/* Task state checks; the reactions to them are not visible here. */
945 if (GNUNET_NO == task->is_started)
951 if (GNUNET_YES == task->is_finished)
957 other_idx = task_other_peer (task);
/* Resolve every output this set operation is configured with; a
 * configured output must already exist in the session. */
959 if (SET_KIND_NONE != setop->output_set.set_kind)
961 output_set = lookup_set (session, &setop->output_set);
962 GNUNET_assert (NULL != output_set);
965 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
967 output_diff = lookup_diff (session, &setop->output_diff);
968 GNUNET_assert (NULL != output_diff);
971 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
973 output_rfn = lookup_rfn (session, &setop->output_rfn);
974 GNUNET_assert (NULL != output_rfn);
977 if (GNUNET_YES == session->peers_blacklisted[other_idx])
979 /* Peer might have been blacklisted
980 by a gradecast running in parallel, ignore elements from now */
981 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
983 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
/* Elements with a non-zero marker are handled here: a CONTESTED marker
 * contests the other peer in the output referendum, a SIZE marker
 * feeds the lower-bound computation. */
987 if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990 "P%u: got some marker\n",
991 session->local_peer_idx);
992 if ( (GNUNET_YES == setop->transceive_contested) &&
993 (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
995 GNUNET_assert (NULL != output_rfn);
996 rfn_contest (output_rfn, task_other_peer (task));
1000 if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1003 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004 "P%u: got size marker\n",
1005 session->local_peer_idx);
1008 struct ConsensusSizeElement *cse = (void *) consensus_element;
/* The size is only taken when the sender index inside the element
 * matches the peer this task talks to. */
1010 if (cse->sender_index == other_idx)
1012 if (NULL == session->first_sizes_received)
1013 session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1014 session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
/* The lower bound is read from a sorted copy of the received sizes.
 * NOTE(review): index num_peers / 3 + 1 is only within the array for
 * num_peers >= 2, and no release of 'copy' is visible in this view --
 * confirm both against the full source. */
1016 uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1017 qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1018 session->lower_bound = copy[session->num_peers / 3 + 1];
1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 "P%u: lower bound %llu\n",
1021 session->local_peer_idx,
1022 (long long) session->lower_bound);
/* ADD_LOCAL: the element is added to the output set, recorded with
 * weight 1 in the output diff and voted VOTE_ADD on behalf of the
 * other peer in the output referendum. */
1033 case GNUNET_SET_STATUS_ADD_LOCAL:
1034 GNUNET_assert (NULL != consensus_element);
1035 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036 "Adding element in Task {%s}\n",
1037 debug_str_task_key (&task->key));
1038 if (NULL != output_set)
1040 // FIXME: record pending adds, use callback
1041 GNUNET_SET_add_element (output_set->h,
1045 #ifdef GNUNET_EXTRA_LOGGING
1046 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047 "P%u: adding element %s into set {%s} of task {%s}\n",
1048 session->local_peer_idx,
1049 debug_str_element (element),
1050 debug_str_set_key (&setop->output_set),
1051 debug_str_task_key (&task->key));
1054 if (NULL != output_diff)
1056 diff_insert (output_diff, 1, element);
1057 #ifdef GNUNET_EXTRA_LOGGING
1058 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059 "P%u: adding element %s into diff {%s} of task {%s}\n",
1060 session->local_peer_idx,
1061 debug_str_element (element),
1062 debug_str_diff_key (&setop->output_diff),
1063 debug_str_task_key (&task->key));
1066 if (NULL != output_rfn)
1068 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1069 #ifdef GNUNET_EXTRA_LOGGING
1070 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1072 session->local_peer_idx,
1073 debug_str_element (element),
1074 debug_str_rfn_key (&setop->output_rfn),
1075 debug_str_task_key (&task->key));
1078 // XXX: add result to structures in task
/* ADD_REMOTE: the element is removed from the output set, recorded
 * with weight -1 in the output diff and voted VOTE_REMOVE on behalf of
 * the other peer.  The effect of the do_not_remove and CONTESTED-marker
 * checks is not visible in this view. */
1080 case GNUNET_SET_STATUS_ADD_REMOTE:
1081 GNUNET_assert (NULL != consensus_element);
1082 if (GNUNET_YES == setop->do_not_remove)
1084 if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1086 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087 "Removing element in Task {%s}\n",
1088 debug_str_task_key (&task->key));
1089 if (NULL != output_set)
1091 // FIXME: record pending adds, use callback
1092 GNUNET_SET_remove_element (output_set->h,
1096 #ifdef GNUNET_EXTRA_LOGGING
1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098 "P%u: removing element %s from set {%s} of task {%s}\n",
1099 session->local_peer_idx,
1100 debug_str_element (element),
1101 debug_str_set_key (&setop->output_set),
1102 debug_str_task_key (&task->key));
1105 if (NULL != output_diff)
1107 diff_insert (output_diff, -1, element);
1108 #ifdef GNUNET_EXTRA_LOGGING
1109 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110 "P%u: removing element %s from diff {%s} of task {%s}\n",
1111 session->local_peer_idx,
1112 debug_str_element (element),
1113 debug_str_diff_key (&setop->output_diff),
1114 debug_str_task_key (&task->key));
1117 if (NULL != output_rfn)
1119 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1120 #ifdef GNUNET_EXTRA_LOGGING
1121 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1123 session->local_peer_idx,
1124 debug_str_element (element),
1125 debug_str_rfn_key (&setop->output_rfn),
1126 debug_str_task_key (&task->key));
/* DONE: the other peer is marked as committed in the output
 * referendum; in the ALL_TO_ALL phase the reported set size is stored
 * as the session's first_size. */
1130 case GNUNET_SET_STATUS_DONE:
1131 // XXX: check first if any changes to the underlying
1132 // set are still pending
1133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134 "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1135 session->local_peer_idx,
1136 debug_str_task_key (&task->key),
1137 (unsigned int) task->step->finished_tasks,
1138 (unsigned int) task->step->tasks_len);
1139 if (NULL != output_rfn)
1141 rfn_commit (output_rfn, task_other_peer (task));
1143 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1145 session->first_size = current_size;
1149 case GNUNET_SET_STATUS_FAILURE:
1151 GNUNET_break_op (0);
1172 enum EvilnessSubType
1175 EVILNESS_SUB_REPLACEMENT,
1176 EVILNESS_SUB_NO_REPLACEMENT,
1181 enum EvilnessType type;
1182 enum EvilnessSubType subtype;
1188 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1190 if (0 == strcmp ("replace", evil_subtype_str))
1192 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1194 else if (0 == strcmp ("noreplace", evil_subtype_str))
1196 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1200 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1201 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1203 return GNUNET_SYSERR;
1210 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1214 char *evil_type_str = NULL;
1215 char *evil_subtype_str = NULL;
1217 GNUNET_assert (NULL != evil);
1219 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "P%u: no evilness\n",
1223 session->local_peer_idx);
1224 evil->type = EVILNESS_NONE;
1227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228 "P%u: got evilness spec\n",
1229 session->local_peer_idx);
1231 for (field = strtok (evil_spec, "/");
1233 field = strtok (NULL, "/"))
1235 unsigned int peer_num;
1236 unsigned int evil_num;
1239 evil_type_str = NULL;
1240 evil_subtype_str = NULL;
1242 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1246 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1247 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1253 GNUNET_assert (NULL != evil_type_str);
1254 GNUNET_assert (NULL != evil_subtype_str);
1256 if (peer_num == session->local_peer_idx)
1258 if (0 == strcmp ("slack", evil_type_str))
1260 evil->type = EVILNESS_SLACK;
1262 if (0 == strcmp ("slack-a2a", evil_type_str))
1264 evil->type = EVILNESS_SLACK_A2A;
1266 else if (0 == strcmp ("cram-all", evil_type_str))
1268 evil->type = EVILNESS_CRAM_ALL;
1269 evil->num = evil_num;
1270 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1273 else if (0 == strcmp ("cram-lead", evil_type_str))
1275 evil->type = EVILNESS_CRAM_LEAD;
1276 evil->num = evil_num;
1277 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1280 else if (0 == strcmp ("cram-echo", evil_type_str))
1282 evil->type = EVILNESS_CRAM_ECHO;
1283 evil->num = evil_num;
1284 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1289 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1290 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1296 /* No GNUNET_free since memory was allocated by libc */
1297 free (evil_type_str);
1298 evil_type_str = NULL;
1299 evil_subtype_str = NULL;
1302 evil->type = EVILNESS_NONE;
1304 GNUNET_free (evil_spec);
1305 /* no GNUNET_free_non_null since it wasn't
1306 * allocated with GNUNET_malloc */
1307 if (NULL != evil_type_str)
1308 free (evil_type_str);
1309 if (NULL != evil_subtype_str)
1310 free (evil_subtype_str);
1317 * Commit the appropriate set for a
/* Commit the task's input set to the task's pending set operation.
 * Before committing, marker elements may be added to the input set, and
 * (in the evilness-aware part) the configured misbehaviour may change
 * what gets committed.
 * NOTE(review): this view is missing lines (braces, preprocessor
 * conditionals, some assignments and call arguments); the comments
 * below describe only what the visible lines establish. */
1321 commit_set (struct ConsensusSession *session,
1322 struct TaskEntry *task)
1324 struct SetEntry *set;
1325 struct SetOpCls *setop = &task->cls.setop;
1327 GNUNET_assert (NULL != setop->op);
1328 set = lookup_set (session, &setop->input_set);
1329 GNUNET_assert (NULL != set);
/* A contested input set is announced to the other side by adding a
 * CONTESTED marker element to it. */
1331 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1333 struct GNUNET_SET_Element element;
1334 struct ConsensusElement ce = { 0 };
1335 ce.marker = CONSENSUS_MARKER_CONTESTED;
1337 element.size = sizeof (struct ConsensusElement);
1338 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1339 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
/* In the ALL_TO_ALL_2 phase a SIZE marker element carrying first_size
 * (network byte order) and the local peer index is added. */
1342 if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1344 struct GNUNET_SET_Element element;
1345 struct ConsensusSizeElement cse = {
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1350 cse.ce.marker = CONSENSUS_MARKER_SIZE;
1351 cse.size = GNUNET_htonll (session->first_size);
1352 cse.sender_index = session->local_peer_idx;
1353 element.data = &cse;
1354 element.size = sizeof (struct ConsensusSizeElement);
1355 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1356 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
/* Evilness-aware commit path. */
1362 struct Evilness evil;
1364 get_evilness (session, &evil);
1365 if (EVILNESS_NONE != evil.type)
1367 /* Useful for evaluation */
1368 GNUNET_STATISTICS_set (statistics,
1375 case EVILNESS_CRAM_ALL:
1376 case EVILNESS_CRAM_LEAD:
1377 case EVILNESS_CRAM_ECHO:
1378 /* We're not cramming elements in the
1379 all-to-all round, since that would just
1380 add more elements to the result set, but
1381 wouldn't test robustness. */
1382 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1384 GNUNET_SET_commit (setop->op, set->h);
/* CRAM_LEAD only applies to the leader phase on the CURRENT set and
 * CRAM_ECHO only to the echo phase; otherwise commit unchanged. */
1387 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1388 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1390 GNUNET_SET_commit (setop->op, set->h);
1393 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1395 GNUNET_SET_commit (setop->op, set->h);
/* Add evil.num stuffed elements to the set before committing it. */
1398 for (i = 0; i < evil.num; i++)
1400 struct GNUNET_SET_Element element;
1401 struct ConsensusStuffedElement se = {
1402 .ce.payload_type = 0,
1406 element.size = sizeof (struct ConsensusStuffedElement);
1407 element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
1409 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1411 /* Always generate a new element. */
1412 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
1414 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1416 /* Always cram the same elements, derived from counter. */
1417 GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1423 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1424 #ifdef GNUNET_EXTRA_LOGGING
1425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1427 session->local_peer_idx,
1428 debug_str_element (&element),
1429 debug_str_set_key (&setop->input_set),
1430 debug_str_task_key (&task->key));
1433 GNUNET_STATISTICS_update (statistics,
1434 "# stuffed elements",
1437 GNUNET_SET_commit (setop->op, set->h);
/* SLACK: no commit call is visible for this case, only the log. */
1439 case EVILNESS_SLACK:
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "P%u: evil peer: slacking\n",
1442 (unsigned int) session->local_peer_idx);
/* SLACK_A2A: in both all-to-all phases a freshly created empty set is
 * committed instead of the input set. */
1444 case EVILNESS_SLACK_A2A:
1445 if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1446 (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1448 struct GNUNET_SET_Handle *empty_set;
1449 empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1450 GNUNET_SET_commit (setop->op, empty_set);
1451 GNUNET_SET_destroy (empty_set);
1455 GNUNET_SET_commit (setop->op, set->h);
1459 GNUNET_SET_commit (setop->op, set->h);
/* Commit path keyed on the blacklist state of the other peer: the set
 * is committed for a non-blacklisted peer; a cancel of the operation is
 * the other visible outcome. */
1464 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1466 GNUNET_SET_commit (setop->op, set->h);
1470 /* For our testcases, we don't want the blacklisted
1472 GNUNET_SET_operation_cancel (setop->op);
1481 put_diff (struct ConsensusSession *session,
1482 struct DiffEntry *diff)
1484 struct GNUNET_HashCode hash;
1486 GNUNET_assert (NULL != diff);
1488 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1489 GNUNET_assert (GNUNET_OK ==
1490 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1491 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* Store a set entry in the session's set map, indexed by the hash of
 * its key.  The entry must carry a set handle.  The map is written
 * with the REPLACE option, so only a GNUNET_SYSERR result is treated
 * as fatal.
 * NOTE(review): the log format string is not visible in this view. */
1495 put_set (struct ConsensusSession *session,
1496 struct SetEntry *set)
1498 struct GNUNET_HashCode hash;
1500 GNUNET_assert (NULL != set->h);
1502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504 debug_str_set_key (&set->key));
1506 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1507 GNUNET_assert (GNUNET_SYSERR !=
1508 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1509 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1514 put_rfn (struct ConsensusSession *session,
1515 struct ReferendumEntry *rfn)
1517 struct GNUNET_HashCode hash;
1519 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1520 GNUNET_assert (GNUNET_OK ==
1521 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1522 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* Cancel hook for reconciliation tasks; cancellation is not implemented.
 * NOTE(review): the rest of the body is not visible in this view. */
1528 task_cancel_reconcile (struct TaskEntry *task)
1530 /* not implemented yet */
/* Turn the changes of a diff into votes of 'voting_peer' in a
 * referendum: every entry of diff->changes results in a VOTE_ADD or a
 * VOTE_REMOVE for its element.
 * NOTE(review): the conditions selecting between the two votes (and the
 * last parameter) are not visible in this view -- presumably the sign
 * of the entry's weight decides; confirm against the full source. */
1536 apply_diff_to_rfn (struct DiffEntry *diff,
1537 struct ReferendumEntry *rfn,
1538 uint16_t voting_peer,
1541 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1542 struct DiffElementInfo *di;
1544 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1546 while (GNUNET_YES ==
1547 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1549 (const void **) &di))
1553 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1557 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1561 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Allocate a diff entry together with its (initially empty) map of
 * changes.
 * NOTE(review): the signature and the return statement are not visible
 * in this view. */
1568 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1570 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
/**
 * Build a new diff out of two existing ones.
 *
 * A fresh diff is created with diff_create(); every change of @a diff_1
 * and then every change of @a diff_2 is inserted into it with
 * diff_insert(), passing along the change's weight and element.  The two
 * input diffs are only iterated, not modified.
 *
 * @param diff_1 first diff, inserted first
 * @param diff_2 second diff, inserted afterwards
 */
1577 diff_compose (struct DiffEntry *diff_1,
1578 struct DiffEntry *diff_2)
1580 struct DiffEntry *diff_new;
1581 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1582 struct DiffElementInfo *di;
1584 diff_new = diff_create ();
1586 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1587 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1589 diff_insert (diff_new, di->weight, di->element);
1591 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1593 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1594 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1596 diff_insert (diff_new, di->weight, di->element);
1598 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Allocate a referendum entry for @a size peers.
 *
 * The entry gets an empty rfn_elements map plus two per-peer int arrays
 * (peer_commited and peer_contested) with @a size slots each; num_peers
 * records @a size.  The key of the entry is not set here.
 *
 * @param size number of peers taking part in the referendum
 */
1604 struct ReferendumEntry *
1605 rfn_create (uint16_t size)
1607 struct ReferendumEntry *rfn;
1609 rfn = GNUNET_new (struct ReferendumEntry);
1610 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1611 rfn->peer_commited = GNUNET_new_array (size, int);
1612 rfn->peer_contested = GNUNET_new_array (size, int);
1613 rfn->num_peers = size;
/**
 * Release a diff entry.
 *
 * Destroys the diff's `changes` map.
 * NOTE(review): the values stored in the map are not released by the
 * map destroy call shown here -- confirm who owns them.
 *
 * @param diff diff to destroy
 */
1621 diff_destroy (struct DiffEntry *diff)
1623 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1630 * For a given majority, count what the outcome
1631 * is (add/remove/keep), and give the number
1632 * of peers that voted for this outcome.
/**
 * Determine the outcome of the referendum for one element.
 *
 * The tally runs over all rfn->num_peers peers and looks at
 * rfn->peer_commited[i] and ri->votes[i].  If the yes-votes exceed half
 * of the commited peers (integer division), the outcome is the element's
 * proposal and the majority is the number of yes-votes; otherwise the
 * outcome is VOTE_STAY and the majority is the number of commited peers
 * that did not vote yes.
 *
 * @param rfn referendum the element belongs to
 * @param ri per-element vote information
 * @param[out] ret_majority number of peers backing the outcome
 * @param[out] ret_vote the outcome
 */
1635 rfn_majority (const struct ReferendumEntry *rfn,
1636 const struct RfnElementInfo *ri,
1637 uint16_t *ret_majority,
1638 enum ReferendumVote *ret_vote)
1640 uint16_t votes_yes = 0;
1641 uint16_t num_commited = 0;
1644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1645 "Computing rfn majority for element %s of rfn {%s}\n",
1646 debug_str_element (ri->element),
1647 debug_str_rfn_key (&rfn->key));
1649 for (i = 0; i < rfn->num_peers; i++)
1651 if (GNUNET_NO == rfn->peer_commited[i])
1655 if (GNUNET_YES == ri->votes[i])
/* Strict majority: with an even number of commited peers a tie does not
   count as a win for the proposal. */
1659 if (votes_yes > (num_commited) / 2)
1661 *ret_vote = ri->proposal;
1662 *ret_majority = votes_yes;
1666 *ret_vote = VOTE_STAY;
1667 *ret_majority = num_commited - votes_yes;
/* Closure handed to set_copy_cb(): the task on whose behalf the copy is
   made, and the key under which the copied set is stored. */
1674 struct TaskEntry *task;
1675 struct SetKey dst_set_key;
/**
 * Callback invoked with the handle of a freshly copied set.
 *
 * Unpacks the struct SetCopyCls closure, allocates a SetHandle that is
 * linked into the session's set_handles list, and stores a new SetEntry
 * under the requested destination key via put_set().
 *
 * @param cls closure, a struct SetCopyCls
 * @param copy handle of the copied set
 */
1680 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1682 struct SetCopyCls *scc = cls;
1683 struct TaskEntry *task = scc->task;
1684 struct SetKey dst_set_key = scc->dst_set_key;
1685 struct SetEntry *set;
1686 struct SetHandle *sh = GNUNET_new (struct SetHandle);
1689 GNUNET_CONTAINER_DLL_insert (task->step->session->set_handles_head,
1690 task->step->session->set_handles_tail,
1694 set = GNUNET_new (struct SetEntry);
1696 set->key = dst_set_key;
1697 put_set (task->step->session, set);
1704 * Call the start function of the given
1705 * task again after we created a copy of the given set.
/**
 * Request a lazy copy of an existing set on behalf of a task.
 *
 * Allocates a struct SetCopyCls closure carrying the destination key,
 * looks up the source set in the task's session (it must exist) and asks
 * the SET service for a lazy copy of its handle.
 *
 * @param task task that needs the copy
 * @param src_set_key key of the set to copy, must be present in the session
 * @param dst_set_key key under which the copy is to be stored
 */
1708 create_set_copy_for_task (struct TaskEntry *task,
1709 struct SetKey *src_set_key,
1710 struct SetKey *dst_set_key)
1712 struct SetEntry *src_set;
1713 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716 "Copying set {%s} to {%s} for task {%s}\n",
1717 debug_str_set_key (src_set_key),
1718 debug_str_set_key (dst_set_key),
1719 debug_str_task_key (&task->key));
/* The key is copied by value, so the caller's key object does not need
   to outlive this call. */
1722 scc->dst_set_key = *dst_set_key;
1723 src_set = lookup_set (task->step->session, src_set_key);
1724 GNUNET_assert (NULL != src_set);
1725 GNUNET_SET_copy_lazy (src_set->h,
/* Progress tracker for a batch of set mutations issued by one task.
   set_mutation_done() consults its num_pending counter and its task
   pointer. */
1731 struct SetMutationProgressCls
1735 * Task to finish once all changes are through.
1737 struct TaskEntry *task;
/**
 * Continuation called when one pending set mutation has completed.
 *
 * At least one mutation must still be outstanding on entry (asserted).
 * The tracked task is picked up once the counter of pending mutations has
 * reached zero.
 *
 * @param cls closure, a struct SetMutationProgressCls
 */
1742 set_mutation_done (void *cls)
1744 struct SetMutationProgressCls *pc = cls;
1746 GNUNET_assert (pc->num_pending > 0);
1750 if (0 == pc->num_pending)
1752 struct TaskEntry *task = pc->task;
/**
 * Try to mark a step as finished ahead of time (early stopping).
 *
 * The checks at the top single out steps that are currently running, are
 * already finished, or are not flagged early_finishable.  A step that is
 * finished early is marked is_finished, each of its subordinates has its
 * pending_prereq counter decremented (asserted to be positive first) and
 * is then itself tried recursively.  Finally the session's ready steps
 * are run.
 *
 * @param step step to finish early, if possible
 */
1760 try_finish_step_early (struct Step *step)
1764 if (GNUNET_YES == step->is_running)
1766 if (GNUNET_YES == step->is_finished)
1768 if (GNUNET_NO == step->early_finishable)
1771 step->is_finished = GNUNET_YES;
1773 #ifdef GNUNET_EXTRA_LOGGING
1774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1775 "Finishing step `%s' early.\n",
1779 for (i = 0; i < step->subordinates_len; i++)
1781 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1782 step->subordinates[i]->pending_prereq--;
1783 #ifdef GNUNET_EXTRA_LOGGING
1784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785 "Decreased pending_prereq to %u for step `%s'.\n",
1786 (unsigned int) step->subordinates[i]->pending_prereq,
1787 step->subordinates[i]->debug_name);
1790 try_finish_step_early (step->subordinates[i]);
1793 // XXX: maybe schedule as task to avoid recursion?
1794 run_ready_steps (step->session);
/**
 * Finish a step whose tasks have all completed.
 *
 * Preconditions (asserted): every task of the step is finished, the step
 * is running and it is not yet marked finished.  Each subordinate step
 * has its pending_prereq counter decremented (asserted to be positive
 * first), then the step is marked finished and the session's ready steps
 * are run.
 *
 * @param step the step to finish
 */
1799 finish_step (struct Step *step)
1803 GNUNET_assert (step->finished_tasks == step->tasks_len);
1804 GNUNET_assert (GNUNET_YES == step->is_running);
1805 GNUNET_assert (GNUNET_NO == step->is_finished);
1807 #ifdef GNUNET_EXTRA_LOGGING
1808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809 "All tasks of step `%s' with %u subordinates finished.\n",
1811 step->subordinates_len);
1814 for (i = 0; i < step->subordinates_len; i++)
1816 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1817 step->subordinates[i]->pending_prereq--;
1818 #ifdef GNUNET_EXTRA_LOGGING
1819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1820 "Decreased pending_prereq to %u for step `%s'.\n",
1821 (unsigned int) step->subordinates[i]->pending_prereq,
1822 step->subordinates[i]->debug_name);
1827 step->is_finished = GNUNET_YES;
1829 // XXX: maybe schedule as task to avoid recursion?
1830 run_ready_steps (step->session);
1836 * Apply the result from one round of gradecasts (i.e. every peer
1837 * should have gradecasted) to the peer's current set.
1839 * @param task the task with context information
/* Overview of the visible logic:
   - input is the SET_KIND_CURRENT set and the RFN_KIND_GRADECAST_RESULT
     referendum of this repetition, output is the SET_KIND_CURRENT set of
     repetition + 1; a missing output set is requested as a copy of the
     input set via create_set_copy_for_task();
   - for every element of the referendum the majority outcome is computed
     and applied to the output set (add / remove / keep), counting each
     issued mutation in progress_cls->num_pending;
   - the smallest majority seen over all elements decides whether the
     session advances its early-stopping state. */
1842 task_start_apply_round (struct TaskEntry *task)
1844 struct ConsensusSession *session = task->step->session;
1845 struct SetKey sk_in;
1846 struct SetKey sk_out;
1847 struct RfnKey rk_in;
1848 struct SetEntry *set_out;
1849 struct ReferendumEntry *rfn_in;
1850 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1851 struct RfnElementInfo *ri;
1852 struct SetMutationProgressCls *progress_cls;
1853 uint16_t worst_majority = UINT16_MAX;
1855 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1856 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1857 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1859 set_out = lookup_set (session, &sk_out);
1860 if (NULL == set_out)
1862 create_set_copy_for_task (task, &sk_in, &sk_out);
1866 rfn_in = lookup_rfn (session, &rk_in);
1867 GNUNET_assert (NULL != rfn_in);
1869 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1870 progress_cls->task = task;
1872 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1874 while (GNUNET_YES ==
1875 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1877 (const void **) &ri))
1879 uint16_t majority_num;
1880 enum ReferendumVote majority_vote;
1882 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
/* Remember the weakest majority over all elements; it is compared
   against the early-stopping threshold after the loop. */
1884 if (worst_majority > majority_num)
1885 worst_majority = majority_num;
1887 switch (majority_vote)
1890 progress_cls->num_pending++;
1891 GNUNET_assert (GNUNET_OK ==
1892 GNUNET_SET_add_element (set_out->h,
1896 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1897 "P%u: apply round: adding element %s with %u-majority.\n",
1898 session->local_peer_idx,
1899 debug_str_element (ri->element), majority_num);
1902 progress_cls->num_pending++;
1903 GNUNET_assert (GNUNET_OK ==
1904 GNUNET_SET_remove_element (set_out->h,
1908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1909 "P%u: apply round: deleting element %s with %u-majority.\n",
1910 session->local_peer_idx,
1911 debug_str_element (ri->element), majority_num);
1914 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1915 "P%u: apply round: keeping element %s with %u-majority.\n",
1916 session->local_peer_idx,
1917 debug_str_element (ri->element), majority_num);
1926 if (0 == progress_cls->num_pending)
1928 // call closure right now, no pending ops
1929 GNUNET_free (progress_cls);
/* Early stopping needs every element's majority to reach
   2 * (num_peers / 3), using integer division.  If the referendum had no
   elements, worst_majority is still UINT16_MAX and the threshold is met. */
1934 uint16_t thresh = (session->num_peers / 3) * 2;
1936 if (worst_majority >= thresh)
1938 switch (session->early_stopping)
1940 case EARLY_STOPPING_NONE:
1941 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1942 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1943 "P%u: Stopping early (after one more superround)\n",
1944 session->local_peer_idx);
1946 case EARLY_STOPPING_ONE_MORE:
1947 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1948 session->local_peer_idx);
1949 session->early_stopping = EARLY_STOPPING_DONE;
1952 for (step = session->steps_head; NULL != step; step = step->next)
1953 try_finish_step_early (step);
1956 case EARLY_STOPPING_DONE:
1957 /* We shouldn't be here anymore after early stopping */
1965 else if (EARLY_STOPPING_NONE != session->early_stopping)
1967 // Our assumption about the number of bad peers
1969 GNUNET_break_op (0);
1973 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1974 session->local_peer_idx);
1977 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Grade the gradecast of one leader and record the result.
 *
 * Inputs are the leader's DIFF_KIND_LEADER_PROPOSAL diff and the
 * RFN_KIND_ECHO referendum for (repetition, leader); both must exist.
 * The output is the RFN_KIND_GRADECAST_RESULT referendum of the
 * repetition, created on first use.  The leader's proposal is first
 * applied to the output as the leader's votes, then adjusted per element
 * according to the echo referendum's majority.  Finally a confidence
 * value (2, 1 or 0) is derived from the number of non-contested peers.
 *
 * @param task the task with context information
 */
1982 task_start_grade (struct TaskEntry *task)
1984 struct ConsensusSession *session = task->step->session;
1985 struct ReferendumEntry *output_rfn;
1986 struct ReferendumEntry *input_rfn;
1987 struct DiffEntry *input_diff;
1988 struct RfnKey rfn_key;
1989 struct DiffKey diff_key;
1990 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1991 struct RfnElementInfo *ri;
1992 unsigned int gradecast_confidence = 2;
1994 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1995 output_rfn = lookup_rfn (session, &rfn_key);
1996 if (NULL == output_rfn)
1998 output_rfn = rfn_create (session->num_peers);
1999 output_rfn->key = rfn_key;
2000 put_rfn (session, output_rfn);
2003 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2004 input_diff = lookup_diff (session, &diff_key);
2005 GNUNET_assert (NULL != input_diff);
2007 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2008 input_rfn = lookup_rfn (session, &rfn_key);
2009 GNUNET_assert (NULL != input_rfn);
2011 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2013 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2015 while (GNUNET_YES ==
2016 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2018 (const void **) &ri))
2020 uint16_t majority_num;
2021 enum ReferendumVote majority_vote;
2023 // XXX: we need contested votes and non-contested votes here
2024 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* An outcome backed by at most num_peers / 3 peers is overridden and
   treated as a vote to remove the element. */
2026 if (majority_num <= session->num_peers / 3)
2027 majority_vote = VOTE_REMOVE;
2029 switch (majority_vote)
2034 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2037 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2044 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Confidence starts at 2; it is capped at 1 when fewer than
   2 * (num_peers / 3) peers are non-contested and drops to 0 when fewer
   than (num_peers / 3) + 1 are. */
2047 uint16_t noncontested;
2048 noncontested = rfn_noncontested (input_rfn);
2049 if (noncontested < (session->num_peers / 3) * 2)
2051 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2053 if (noncontested < (session->num_peers / 3) + 1)
2055 gradecast_confidence = 0;
/* The leader's votes are commited for confidence 1 and 2; the leader is
   blacklisted for confidence 0 and 1. */
2059 if (gradecast_confidence >= 1)
2060 rfn_commit (output_rfn, task->key.leader);
2062 if (gradecast_confidence <= 1)
2063 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/**
 * Start a pairwise set reconciliation task.
 *
 * The input set must exist and have a valid handle.  Any output set,
 * output referendum or output diff configured for the operation is
 * created if it does not exist yet (a missing output set is requested as
 * a copy of the input set).  What happens next depends on the local
 * peer's role in the task key: a task with the local peer as both peer1
 * and peer2 is handled separately; as peer1 the local peer prepares the
 * set operation, with a round context message describing the task; as
 * peer2 it waits for the remote request and commits only if the
 * operation handle is already present.
 *
 * @param task the task with context information
 */
2070 task_start_reconcile (struct TaskEntry *task)
2072 struct SetEntry *input;
2073 struct SetOpCls *setop = &task->cls.setop;
2074 struct ConsensusSession *session = task->step->session;
2076 input = lookup_set (session, &setop->input_set);
2077 GNUNET_assert (NULL != input);
2078 GNUNET_assert (NULL != input->h);
2080 /* We create the outputs for the operation here
2081 (rather than in the set operation callback)
2082 because we want something valid in there, even
2083 if the other peer doesn't talk to us */
2085 if (SET_KIND_NONE != setop->output_set.set_kind)
2087 /* If we don't have an existing output set,
2088 we clone the input set. */
2089 if (NULL == lookup_set (session, &setop->output_set))
2091 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2096 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2098 if (NULL == lookup_rfn (session, &setop->output_rfn))
2100 struct ReferendumEntry *rfn;
2102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2103 "P%u: output rfn <%s> missing, creating.\n",
2104 session->local_peer_idx,
2105 debug_str_rfn_key (&setop->output_rfn));
2107 rfn = rfn_create (session->num_peers);
2108 rfn->key = setop->output_rfn;
2109 put_rfn (session, rfn);
2113 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2115 if (NULL == lookup_diff (session, &setop->output_diff))
2117 struct DiffEntry *diff;
2119 diff = diff_create ();
2120 diff->key = setop->output_diff;
2121 put_diff (session, diff);
2125 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2127 /* XXX: mark the corresponding rfn as commited if necessary */
2132 if (task->key.peer1 == session->local_peer_idx)
2134 struct GNUNET_CONSENSUS_RoundContextMessage rcm;
2136 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137 "P%u: Looking up set {%s} to run remote union\n",
2138 session->local_peer_idx,
2139 debug_str_set_key (&setop->input_set));
/* The round context message carries the task key in network byte order
   so the remote peer can find the matching task. */
2141 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2142 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2144 rcm.kind = htons (task->key.kind);
2145 rcm.peer1 = htons (task->key.peer1);
2146 rcm.peer2 = htons (task->key.peer2);
2147 rcm.leader = htons (task->key.leader);
2148 rcm.repetition = htons (task->key.repetition);
2149 rcm.is_contested = htons (0);
2151 GNUNET_assert (NULL == setop->op);
2152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2153 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2155 struct GNUNET_SET_Option opts[] = {
2156 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2157 { GNUNET_SET_OPTION_END },
2160 // XXX: maybe this should be done while
2161 // setting up tasks already?
2162 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2163 &session->global_id,
2165 GNUNET_SET_RESULT_SYMMETRIC,
2170 commit_set (session, task);
2172 else if (task->key.peer2 == session->local_peer_idx)
2174 /* Wait for the other peer to contact us */
2175 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2176 session->local_peer_idx, task->key.peer1);
/* The operation handle is non-NULL here only if the remote request was
   accepted before this task got started. */
2178 if (NULL != setop->op)
2180 commit_set (session, task);
2185 /* We made an error while constructing the task graph. */
/**
 * Evaluate the echo referendum of one gradecast and build the echo
 * result set.
 *
 * The output set SET_KIND_ECHO_RESULT for (repetition, leader) is
 * requested as a copy of the leader's SET_KIND_LEADER_PROPOSAL set when
 * it does not exist yet.  The output set's handle is also registered
 * under the SET_KIND_LAST_GRADECAST key.  Then, for every element of the
 * RFN_KIND_ECHO referendum, the majority outcome is applied to the output
 * set; an element whose majority is below num_peers / 3 marks the output
 * set as contested.  Each issued mutation is counted in
 * progress_cls->num_pending.
 *
 * @param task the task with context information
 */
2192 task_start_eval_echo (struct TaskEntry *task)
2194 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2195 struct ReferendumEntry *input_rfn;
2196 struct RfnElementInfo *ri;
2197 struct SetEntry *output_set;
2198 struct SetMutationProgressCls *progress_cls;
2199 struct ConsensusSession *session = task->step->session;
2200 struct SetKey sk_in;
2201 struct SetKey sk_out;
2202 struct RfnKey rk_in;
2204 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2205 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2206 output_set = lookup_set (session, &sk_out);
2207 if (NULL == output_set)
2209 create_set_copy_for_task (task, &sk_in, &sk_out);
/* last_set shares the set handle with output_set; only the SetEntry
   wrapper is new. */
2215 // FIXME: should be marked as a shallow copy, so
2216 // we can destroy everything correctly
2217 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2218 last_set->h = output_set->h;
2219 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2220 put_set (session, last_set);
2223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2224 "Evaluating referendum in Task {%s}\n",
2225 debug_str_task_key (&task->key));
2227 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2228 progress_cls->task = task;
2230 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2231 input_rfn = lookup_rfn (session, &rk_in);
2233 GNUNET_assert (NULL != input_rfn);
2235 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2236 GNUNET_assert (NULL != iter);
2238 while (GNUNET_YES ==
2239 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2241 (const void **) &ri))
2243 enum ReferendumVote majority_vote;
2244 uint16_t majority_num;
2246 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
// NOTE(review): task_start_grade() compares against the same bound with
// `<=`; confirm that the strict `<` used here is intended.
2248 if (majority_num < session->num_peers / 3)
2250 /* It is not the case that all nonfaulty peers
2251 echoed the same value. Since we're doing a set reconciliation, we
2252 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2253 reconciliation as contested. Other peers might not know that the
2254 leader is faulty, thus we still re-distribute in the confirmation
2256 output_set->is_contested = GNUNET_YES;
2259 switch (majority_vote)
2262 progress_cls->num_pending++;
2263 GNUNET_assert (GNUNET_OK ==
2264 GNUNET_SET_add_element (output_set->h,
2270 progress_cls->num_pending++;
2271 GNUNET_assert (GNUNET_OK ==
2272 GNUNET_SET_remove_element (output_set->h,
2278 /* Nothing to do. */
2286 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2288 if (0 == progress_cls->num_pending)
2290 // call closure right now, no pending ops
2291 GNUNET_free (progress_cls);
/**
 * Start the final task of a session.
 *
 * Looks up the set named by the task's finish.input_set key (it must
 * exist) and starts iterating over its elements with
 * send_to_client_iter() as the per-element callback.
 *
 * @param task the task with context information
 */
2298 task_start_finish (struct TaskEntry *task)
2300 struct SetEntry *final_set;
2301 struct ConsensusSession *session = task->step->session;
2303 final_set = lookup_set (session, &task->cls.finish.input_set);
2305 GNUNET_assert (NULL != final_set);
2308 GNUNET_SET_iterate (final_set->h,
2309 send_to_client_iter,
/**
 * Start a single task.
 *
 * Preconditions (asserted): the task was neither started nor finished
 * before, and it has a start callback.  The task is marked as started.
 *
 * @param session session the task belongs to (used for logging)
 * @param task task to start
 */
2314 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2316 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2318 GNUNET_assert (GNUNET_NO == task->is_started);
2319 GNUNET_assert (GNUNET_NO == task->is_finished);
2320 GNUNET_assert (NULL != task->start);
2324 task->is_started = GNUNET_YES;
2331 * Run all steps of the session that don't have any
2332 * more dependencies.
/* A step is ready when it is not running, not finished and has no
   pending prerequisites.  A ready step must not have any finished tasks
   yet (asserted); it is marked running and all of its tasks are started.
   The scan starts at session->steps_head. */
2335 run_ready_steps (struct ConsensusSession *session)
2339 step = session->steps_head;
2341 while (NULL != step)
2343 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2347 GNUNET_assert (0 == step->finished_tasks);
2349 #ifdef GNUNET_EXTRA_LOGGING
2350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2351 session->local_peer_idx,
2353 step->round, step->tasks_len, step->subordinates_len);
2356 step->is_running = GNUNET_YES;
2357 for (i = 0; i < step->tasks_len; i++)
2358 start_task (session, step->tasks[i]);
2360 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2361 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2364 /* Running the next ready steps will be triggered by task completion */
/**
 * Mark a task as finished and finish its step if it was the last one.
 *
 * The task must not be finished already (asserted).  The step's
 * finished_tasks counter is incremented, and once it equals the step's
 * number of tasks, finish_step() is called for the step.
 *
 * @param task the task that completed
 */
2376 finish_task (struct TaskEntry *task)
2378 GNUNET_assert (GNUNET_NO == task->is_finished);
2379 task->is_finished = GNUNET_YES;
2381 task->step->finished_tasks++;
2383 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2384 "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2385 task->step->session->local_peer_idx,
2386 debug_str_task_key (&task->key),
2387 (unsigned int) task->step->finished_tasks,
2388 (unsigned int) task->step->tasks_len);
2390 if (task->step->finished_tasks == task->step->tasks_len)
2391 finish_step (task->step);
2396 * Search peer in the list of peers in session.
2398 * @param peer peer to find
2399 * @param session session with peer
2400 * @return index of peer, -1 if peer is not in session
/* Linear scan over session->peers; identities are compared byte-wise
   with memcmp over sizeof (struct GNUNET_PeerIdentity). */
2403 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2406 for (i = 0; i < session->num_peers; i++)
2407 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2414 * Compute a global, (hopefully) unique consensus session id,
2415 * from the local id of the consensus session, and the identities of all participants.
2416 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2417 * exactly the same peers, the global id will be different.
2419 * @param session session to generate the global id for
2420 * @param local_session_id local id of the consensus session
/* The id is derived with GNUNET_CRYPTO_kdf() and written directly into
   session->global_id; the derivation is asserted to succeed.  The
   visible length arguments show that an input of
   num_peers * sizeof (struct GNUNET_PeerIdentity) bytes and an input of
   one hash code are fed in, so session->peers and session->num_peers
   must be initialised before this is called. */
2423 compute_global_id (struct ConsensusSession *session,
2424 const struct GNUNET_HashCode *local_session_id)
2426 const char *salt = "gnunet-service-consensus/session_id";
2428 GNUNET_assert (GNUNET_YES ==
2429 GNUNET_CRYPTO_kdf (&session->global_id,
2430 sizeof (struct GNUNET_HashCode),
2434 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2436 sizeof (struct GNUNET_HashCode),
2442 * Compare two peer identities.
2444 * @param h1 some peer identity
2445 * @param h2 some peer identity
2446 * @return a positive value if h1 > h2, a negative value if h1 < h2 and 0 if h1 == h2 (the memcmp result, not necessarily 1 or -1).
/* Comparator with the signature expected by qsort(): both arguments
   point to a struct GNUNET_PeerIdentity and are compared byte-wise.  The
   result is whatever memcmp returns, so only its sign is meaningful. */
2449 peer_id_cmp (const void *h1, const void *h2)
2451 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2456 * Create the sorted list of peers for the session,
2457 * add the local peer if not in the join message.
2459 * @param session session to initialize
2460 * @param join_msg join message with the list of peers participating at the end
/* Steps visible below: read the peer count from the join message, check
   whether the local peer is among the listed peers, allocate the peer
   array (one slot larger if the local peer has to be added), fill it and
   sort it.
   NOTE(review): nothing in this function checks that the message really
   contains num_peers identities; presumably the caller validates the
   message size -- confirm. */
2463 initialize_session_peer_list (struct ConsensusSession *session,
2464 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
/* The peer identities follow directly after the fixed-size join message. */
2466 const struct GNUNET_PeerIdentity *msg_peers
2467 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2468 int local_peer_in_list;
2470 session->num_peers = ntohl (join_msg->num_peers);
2472 /* Peers in the join message, may or may not include the local peer,
2473 Add it if it is missing. */
2474 local_peer_in_list = GNUNET_NO;
2475 for (unsigned int i = 0; i < session->num_peers; i++)
2477 if (0 == memcmp (&msg_peers[i],
2479 sizeof (struct GNUNET_PeerIdentity)))
2481 local_peer_in_list = GNUNET_YES;
2485 if (GNUNET_NO == local_peer_in_list)
2486 session->num_peers++;
2488 session->peers = GNUNET_new_array (session->num_peers,
2489 struct GNUNET_PeerIdentity);
/* A missing local peer is placed in the extra last slot; the peers from
   the message are copied to the front (the copy length uses the count
   from the message, not the possibly incremented session->num_peers). */
2490 if (GNUNET_NO == local_peer_in_list)
2491 session->peers[session->num_peers - 1] = my_peer;
2493 GNUNET_memcpy (session->peers,
2495 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2496 qsort (session->peers,
2498 sizeof (struct GNUNET_PeerIdentity),
/**
 * Look up a task by its key.
 *
 * The key is hashed over its full sizeof (struct TaskKey) bytes and the
 * hash is used to query the session's taskmap.
 *
 * @param session session to search in
 * @param key key of the task
 * @return the result of the taskmap query for the key's hash
 */
2503 static struct TaskEntry *
2504 lookup_task (struct ConsensusSession *session,
2505 struct TaskKey *key)
2507 struct GNUNET_HashCode hash;
2510 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2512 GNUNET_h2s (&hash));
2513 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2518 * Called when another peer wants to do a set operation with the
2521 * @param cls closure
2522 * @param other_peer the other peer
2523 * @param context_msg message with application specific information from
2525 * @param request request from the other peer, use GNUNET_SET_accept
2526 * to accept it, otherwise the request will be refused
2527 * Note that we don't use a return value here, as it is also
2528 * necessary to specify the set we want to do the operation with,
2529 * which sometimes can be derived from the context message.
2530 * Also necessary to specify the timeout.
/* Flow of the visible code:
   - the context message must be present, of type
     GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT and exactly
     sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) bytes long;
     each violation is flagged with GNUNET_break_op;
   - the task key is rebuilt from the message fields (network byte
     order) and the task is looked up in the session;
   - a request for a finished task, or for a task in which the local peer
     is not peer2, is flagged as a protocol violation;
   - the request is accepted with the byzantine lower bound option and
     symmetric result mode, and the operation handle is stored in the
     task;
   - the set is commited right away only if the task was already started. */
2533 set_listen_cb (void *cls,
2534 const struct GNUNET_PeerIdentity *other_peer,
2535 const struct GNUNET_MessageHeader *context_msg,
2536 struct GNUNET_SET_Request *request)
2538 struct ConsensusSession *session = cls;
2540 struct TaskEntry *task;
2541 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2543 if (NULL == context_msg)
2545 GNUNET_break_op (0);
2549 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2551 GNUNET_break_op (0);
2555 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2557 GNUNET_break_op (0);
/* NOTE(review): this cast drops the const qualifier of context_msg; the
   visible code only reads through cm. */
2561 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2563 tk = ((struct TaskKey) {
2564 .kind = ntohs (cm->kind),
2565 .peer1 = ntohs (cm->peer1),
2566 .peer2 = ntohs (cm->peer2),
2567 .repetition = ntohs (cm->repetition),
2568 .leader = ntohs (cm->leader),
2571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2572 session->local_peer_idx, debug_str_task_key (&tk));
2574 task = lookup_task (session, &tk);
2578 GNUNET_break_op (0);
2582 if (GNUNET_YES == task->is_finished)
2584 GNUNET_break_op (0);
2588 if (task->key.peer2 != session->local_peer_idx)
2590 /* We're being asked, so we must be the 2nd peer. */
2591 GNUNET_break_op (0);
2595 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2596 (task->key.peer2 == session->local_peer_idx)));
2598 struct GNUNET_SET_Option opts[] = {
2599 { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2600 { GNUNET_SET_OPTION_END },
2603 task->cls.setop.op = GNUNET_SET_accept (request,
2604 GNUNET_SET_RESULT_SYMMETRIC,
2609 /* If the task hasn't been started yet,
2610 we wait for that until we commit. */
2612 if (GNUNET_YES == task->is_started)
2614 commit_set (session, task);
/**
 * Register a task with its step and with the task map.
 *
 * The task is duplicated onto the heap first, so callers may pass a
 * temporary; the copy is what gets stored.  The task's step must already
 * be set (asserted).  The step's task array is grown to
 * 3 * (capacity + 1) / 2 entries when it is full.  The copy is inserted
 * into @a taskmap under the hash of its key with UNIQUE_ONLY, so two
 * tasks with the same key trip the assertion.
 *
 * @param taskmap map from task key hash to task
 * @param t task to copy and register, with t->step set
 */
2621 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2622 struct TaskEntry *t)
2624 struct GNUNET_HashCode round_hash;
2627 GNUNET_assert (NULL != t->step);
2629 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2633 if (s->tasks_len == s->tasks_cap)
2635 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2636 GNUNET_array_grow (s->tasks,
2641 #ifdef GNUNET_EXTRA_LOGGING
2642 GNUNET_assert (NULL != s->debug_name);
2643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2644 debug_str_task_key (&t->key),
2648 s->tasks[s->tasks_len] = t;
2651 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2652 GNUNET_assert (GNUNET_OK ==
2653 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2654 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Assign timeouts to the steps of the task graph.
 *
 * Currently a placeholder: the visible body contains only comments and
 * installs no timeouts.
 *
 * @param session session whose task graph has been constructed
 */
2659 install_step_timeouts (struct ConsensusSession *session)
2661 /* Given the fully constructed task graph
2662 with rounds for tasks, we can give the tasks timeouts. */
2664 // unsigned int max_round;
2666 /* XXX: implement! */
2672 * Arrange two peers in some canonical order.
/* Both peer indices are passed by pointer and must be smaller than n
   (asserted).  The ordering decision is based on the cyclic distance
   ((b - a) + n) % n compared against n / 2.
   NOTE(review): a and b are assigned outside the lines shown here;
   presumably they are the two indices in ascending order -- confirm. */
2675 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2680 GNUNET_assert (*p1 < n);
2681 GNUNET_assert (*p2 < n);
2694 /* For uniformly random *p1, *p2,
2695 this condition is true with 50% chance */
2696 if (((b - a) + n) % n <= n / 2)
2710 * Record @a dep as a dependency of @a step.
/* Bookkeeping is done on both sides: @a step is appended to dep's
   subordinates array (grown to 3 * (capacity + 1) / 2 entries when full)
   and step->pending_prereq is incremented.  Asserted preconditions: the
   two steps differ, neither is NULL, and dep's round is not later than
   step's round. */
2713 step_depend_on (struct Step *step, struct Step *dep)
2715 /* We're not checking for cyclic dependencies,
2716 but this is a cheap sanity check. */
2717 GNUNET_assert (step != dep);
2718 GNUNET_assert (NULL != step);
2719 GNUNET_assert (NULL != dep);
2720 GNUNET_assert (dep->round <= step->round);
2722 #ifdef GNUNET_EXTRA_LOGGING
2723 /* Make sure we have complete debugging information.
2724 Also checks that we don't screw up too badly
2725 constructing the task graph. */
2726 GNUNET_assert (NULL != step->debug_name);
2727 GNUNET_assert (NULL != dep->debug_name);
2728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2729 "Making step `%s' depend on `%s'\n",
2734 if (dep->subordinates_cap == dep->subordinates_len)
2736 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2737 GNUNET_array_grow (dep->subordinates,
2738 dep->subordinates_cap,
2742 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2744 dep->subordinates[dep->subordinates_len] = step;
2745 dep->subordinates_len++;
2747 step->pending_prereq++;
/**
 * Allocate a new step and append it to the session's list of steps.
 *
 * @param session session the step belongs to
 * @param round round number recorded in the step
 * @param early_finishable stored in the step; consulted by
 *        try_finish_step_early()
 */
2751 static struct Step *
2752 create_step (struct ConsensusSession *session, int round, int early_finishable)
2755 step = GNUNET_new (struct Step);
2756 step->session = session;
2757 step->round = round;
2758 step->early_finishable = early_finishable;
2759 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2760 session->steps_tail,
2767 * Construct the task graph for a single
/* Steps created below, all flagged early-finishable:
     1. "disseminate leader": reconcile tasks keyed
        PHASE_KIND_GRADECAST_LEADER;
     2. "echo": reconcile tasks keyed PHASE_KIND_GRADECAST_ECHO, voting
        into the RFN_KIND_ECHO referendum;
     3. "echo grade": one local task running task_start_eval_echo;
     4. "confirm": reconcile tasks keyed PHASE_KIND_GRADECAST_CONFIRM,
        voting into the RFN_KIND_CONFIRM referendum and exchanging the
        contested flag;
     5. "confirm grade": one local task running task_start_grade.
   The first step depends on @a step_before, every later step depends on
   prev_step, and @a step_after is made to depend on the last step.
   rep and lead, used throughout, identify the gradecast repetition and
   its leader. */
2771 construct_task_graph_gradecast (struct ConsensusSession *session,
2774 struct Step *step_before,
2775 struct Step *step_after)
2777 uint16_t n = session->num_peers;
2778 uint16_t me = session->local_peer_idx;
2783 /* The task we're currently setting up. */
2784 struct TaskEntry task;
2787 struct Step *prev_step;
2793 round = step_before->round + 1;
2795 /* gcast step 1: leader disseminates */
2797 step = create_step (session, round, GNUNET_YES);
2799 #ifdef GNUNET_EXTRA_LOGGING
2800 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2802 step_depend_on (step, step_before);
2806 for (k = 0; k < n; k++)
2812 arrange_peers (&p1, &p2, n);
2813 task = ((struct TaskEntry) {
2815 .start = task_start_reconcile,
2816 .cancel = task_cancel_reconcile,
2817 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2819 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2820 put_task (session->taskmap, &task);
2822 /* We run this task to make sure that the leader
2823 has stored the SET_KIND_LEADER set of himself,
2824 so it can participate in the rest of the gradecast
2825 without the code having to handle any special cases. */
2826 task = ((struct TaskEntry) {
2828 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2829 .start = task_start_reconcile,
2830 .cancel = task_cancel_reconcile,
2832 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2833 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2834 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2835 put_task (session->taskmap, &task);
2841 arrange_peers (&p1, &p2, n);
2842 task = ((struct TaskEntry) {
2844 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2845 .start = task_start_reconcile,
2846 .cancel = task_cancel_reconcile,
2848 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2849 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2850 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2851 put_task (session->taskmap, &task);
2854 /* gcast phase 2: echo */
2857 step = create_step (session, round, GNUNET_YES);
2858 #ifdef GNUNET_EXTRA_LOGGING
2859 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2861 step_depend_on (step, prev_step);
2863 for (k = 0; k < n; k++)
2867 arrange_peers (&p1, &p2, n);
2868 task = ((struct TaskEntry) {
2870 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2871 .start = task_start_reconcile,
2872 .cancel = task_cancel_reconcile,
2874 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2875 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2876 put_task (session->taskmap, &task);
2880 /* Same round, since step only has local tasks */
2881 step = create_step (session, round, GNUNET_YES);
2882 #ifdef GNUNET_EXTRA_LOGGING
2883 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2885 step_depend_on (step, prev_step);
/* NOTE(review): the task created next uses -1/-1 as peers in its key, so
   the result of this arrange_peers call does not appear to be used --
   confirm whether the call is vestigial. */
2887 arrange_peers (&p1, &p2, n);
2888 task = ((struct TaskEntry) {
2889 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2891 .start = task_start_eval_echo
2893 put_task (session->taskmap, &task);
2897 step = create_step (session, round, GNUNET_YES);
2898 #ifdef GNUNET_EXTRA_LOGGING
2899 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2901 step_depend_on (step, prev_step);
2903 /* gcast phase 3: confirmation and grading */
2904 for (k = 0; k < n; k++)
2908 arrange_peers (&p1, &p2, n);
2909 task = ((struct TaskEntry) {
2911 .start = task_start_reconcile,
2912 .cancel = task_cancel_reconcile,
2913 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2915 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2916 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2917 /* If there was at least one element in the echo round that was
2918 contested (i.e. it had no n-t majority), then we let the other peers
2919 know, and other peers let us know. The contested flag for each peer is
2920 stored in the rfn. */
2921 task.cls.setop.transceive_contested = GNUNET_YES;
2922 put_task (session->taskmap, &task);
2926 /* Same round, since step only has local tasks */
2927 step = create_step (session, round, GNUNET_YES);
2928 #ifdef GNUNET_EXTRA_LOGGING
2929 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2931 step_depend_on (step, prev_step);
2933 task = ((struct TaskEntry) {
2935 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2936 .start = task_start_grade,
2938 put_task (session->taskmap, &task);
2940 step_depend_on (step_after, step);
/*
 * Set up the steps and tasks of the consensus protocol for one session.
 * Every task built here is registered in session->taskmap via put_task().
 *
 * Stages visible in this function, chained with step_depend_on():
 *   1. "all to all"   - PHASE_KIND_ALL_TO_ALL reconciliation tasks
 *   2. "all to all 2" - PHASE_KIND_ALL_TO_ALL_2 reconciliation tasks
 *   3. t + 1 sequential gradecast repetitions; each repetition builds one
 *      gradecast per possible leader (0 .. n-1) and one
 *      PHASE_KIND_APPLY_REP task
 *   4. "finish"       - a single PHASE_KIND_FINISH task
 *
 * NOTE(review): a number of lines of this function are not visible in this
 * excerpt (among them the declarations of i, t, lead, p1, p2 and step, the
 * code that picks p1/p2 inside the loops, and the updates of round and
 * prev_step between stages). Confirm those details against the full file.
 */
2945 construct_task_graph (struct ConsensusSession *session)
/* n: number of peers in the session; me: index of the local peer. */
2947 uint16_t n = session->num_peers;
2950 uint16_t me = session->local_peer_idx;
2952 /* The task we're currently setting up. */
2953 struct TaskEntry task;
2955 /* Current leader */
2959 struct Step *prev_step;
2961 unsigned int round = 0;
2965 // XXX: introduce first step,
2966 // where we wait for all insert acks
2967 // from the set service
2969 /* faster but brittle all-to-all */
2971 // XXX: Not implemented yet
2973 /* all-to-all step */
2975 step = create_step (session, round, GNUNET_NO);
2977 #ifdef GNUNET_EXTRA_LOGGING
2978 step->debug_name = GNUNET_strdup ("all to all");
/* One reconciliation task per loop iteration; how p1/p2 are chosen before
   arrange_peers() is not visible here. */
2981 for (i = 0; i < n; i++)
2988 arrange_peers (&p1, &p2, n);
2989 task = ((struct TaskEntry) {
2990 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2992 .start = task_start_reconcile,
2993 .cancel = task_cancel_reconcile,
/* Input and output are the same set (SET_KIND_CURRENT with k1 = 0; the
   members not listed in the compound literal are zero-initialized). */
2995 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2996 task.cls.setop.output_set = task.cls.setop.input_set;
2997 task.cls.setop.do_not_remove = GNUNET_YES;
2998 put_task (session->taskmap, &task);
/* Second all-to-all step; it depends on prev_step.
   NOTE(review): the doubled ';' on the next line is an empty statement. */
3003 step = create_step (session, round, GNUNET_NO);;
3004 #ifdef GNUNET_EXTRA_LOGGING
3005 step->debug_name = GNUNET_strdup ("all to all 2");
3007 step_depend_on (step, prev_step);
/* Same task layout as in the first all-to-all step, but keyed with
   PHASE_KIND_ALL_TO_ALL_2. */
3010 for (i = 0; i < n; i++)
3017 arrange_peers (&p1, &p2, n);
3018 task = ((struct TaskEntry) {
3019 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3021 .start = task_start_reconcile,
3022 .cancel = task_cancel_reconcile,
3024 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3025 task.cls.setop.output_set = task.cls.setop.input_set;
3026 task.cls.setop.do_not_remove = GNUNET_YES;
3027 put_task (session->taskmap, &task);
3037 /* Byzantine union */
3039 /* sequential repetitions of the gradecasts */
/* Runs t + 1 times; the definition of t is not visible in this excerpt. */
3040 for (i = 0; i < t + 1; i++)
3042 struct Step *step_rep_start;
3043 struct Step *step_rep_end;
3045 /* Every repetition is in a separate round. */
3046 step_rep_start = create_step (session, round, GNUNET_YES);
3047 #ifdef GNUNET_EXTRA_LOGGING
3048 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
/* The repetition can only begin once the previous stage is done. */
3051 step_depend_on (step_rep_start, prev_step);
3053 /* gradecast has three rounds */
3055 step_rep_end = create_step (session, round, GNUNET_YES);
3056 #ifdef GNUNET_EXTRA_LOGGING
3057 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3060 /* parallel gradecasts */
/* Each call gets the same start and end step, so all n gradecasts of this
   repetition sit between step_rep_start and step_rep_end. */
3061 for (lead = 0; lead < n; lead++)
3062 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* Task attached to the end step of the repetition; started through
   task_start_apply_round. */
3064 task = ((struct TaskEntry) {
3065 .step = step_rep_end,
3066 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3067 .start = task_start_apply_round,
3069 put_task (session->taskmap, &task);
/* The next repetition (or the finish step) depends on this end step. */
3071 prev_step = step_rep_end;
3074 /* There is no next gradecast round, thus the final
3075 start step is the overall end step of the gradecasts */
3077 step = create_step (session, round, GNUNET_NO);
3078 #ifdef GNUNET_EXTRA_LOGGING
3079 GNUNET_asprintf (&step->debug_name, "finish");
3081 step_depend_on (step, prev_step);
/* Final task: started through task_start_finish, reading the set keyed
   SET_KIND_LAST_GRADECAST (remaining key members zero-initialized). */
3083 task = ((struct TaskEntry) {
3085 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3086 .start = task_start_finish,
3088 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3090 put_task (session->taskmap, &task);
3096 * Check join message.
3098 * @param cls session of client that sent the message
3099 * @param m message sent by the client
3100 * @return #GNUNET_OK if @a m is well-formed
/*
 * Size validation for a join message: the bytes following the fixed-size
 * GNUNET_CONSENSUS_JoinMessage must be exactly num_peers peer identities.
 * Returns GNUNET_SYSERR on a mismatch; the success return is not visible in
 * this excerpt.
 */
3103 check_client_join (void *cls,
3104 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* num_peers arrives in network byte order. */
3106 uint32_t listed_peers = ntohl (m->num_peers);
/* NOTE(review): the product below is computed in size_t; where size_t is
   32 bits wide a very large num_peers could wrap. Confirm whether that
   matters for the supported targets. */
3108 if ( (ntohs (m->header.size) - sizeof (*m)) !=
3109 listed_peers * sizeof (struct GNUNET_PeerIdentity))
3112 return GNUNET_SYSERR;
3119 * Called when a client wants to join a consensus session.
3121 * @param cls session of client that sent the message
3122 * @param m message sent by the client
/*
 * Handle a join request: fill in the session from the message, start
 * listening for set operations under the session's global id, create the
 * per-session maps and the initial set, build the task graph and let the
 * client continue.
 *
 * NOTE(review): many argument lines of the calls below are not visible in
 * this excerpt; the comments only describe what can be seen.
 */
3125 handle_client_join (void *cls,
3126 const struct GNUNET_CONSENSUS_JoinMessage *m)
/* cls is the session belonging to the sending client. */
3128 struct ConsensusSession *session = cls;
3129 struct ConsensusSession *other_session;
3131 initialize_session_peer_list (session,
3133 compute_global_id (session,
3136 /* Check if some local client already owns the session.
3137 It is only legal to have a session with an existing global id
3138 if all other sessions with this global id are finished.*/
/* Walks all known sessions looking for a different session with the same
   global id. What happens on a match is not visible in this excerpt. */
3139 for (other_session = sessions_head;
3140 NULL != other_session;
3141 other_session = other_session->next)
3143 if ( (other_session != session) &&
3144 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3145 &other_session->global_id)) )
/* Start and deadline are converted from network to host representation. */
3149 session->conclude_deadline
3150 = GNUNET_TIME_absolute_ntoh (m->deadline);
3151 session->conclude_start
3152 = GNUNET_TIME_absolute_ntoh (m->start);
3153 session->local_peer_idx = get_peer_idx (&my_peer,
/* Aborts if the lookup of the local peer yielded -1. */
3155 GNUNET_assert (-1 != session->local_peer_idx);
3157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3158 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3159 GNUNET_h2s (&m->session_id),
3161 session->local_peer_idx,
3162 GNUNET_STRINGS_relative_time_to_string
3163 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
3164 session->conclude_deadline),
/* Listen for incoming union operations identified by the global id. */
3167 session->set_listener
3168 = GNUNET_SET_listen (cfg,
3169 GNUNET_SET_OPERATION_UNION,
3170 &session->global_id,
/* Per-session lookup tables for sets, tasks, diffs and rfns. */
3174 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
3176 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
3178 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
3180 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
/* Create the initial union set, keyed (SET_KIND_CURRENT, 0, 0), and track
   its handle in the session's set_handles list. How client_set itself is
   stored is not visible in this excerpt. */
3184 struct SetEntry *client_set;
3186 client_set = GNUNET_new (struct SetEntry);
3187 client_set->h = GNUNET_SET_create (cfg,
3188 GNUNET_SET_OPERATION_UNION);
3189 struct SetHandle *sh = GNUNET_new (struct SetHandle);
3190 sh->h = client_set->h;
3191 GNUNET_CONTAINER_DLL_insert (session->set_handles_head,
3192 session->set_handles_tail,
3194 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
/* One array slot per peer in the session. */
3199 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3202 /* Just construct the task graph,
3203 but don't run anything until the client calls conclude. */
3204 construct_task_graph (session);
3205 GNUNET_SERVICE_client_continue (session->client);
/*
 * Continuation handed to GNUNET_SET_add_element() by handle_client_insert().
 * NOTE(review): the body is not visible in this excerpt.
 */
3210 client_insert_done (void *cls)
3217 * Called when a client performs an insert operation.
3219 * @param cls client handle
3220 * @param msg message sent by the client
3221 * @return #GNUNET_OK (always well-formed)
/*
 * Validation hook for insert messages.
 * NOTE(review): the body is not visible in this excerpt; the accompanying
 * documentation says it always reports the message as well-formed.
 */
3224 check_client_insert (void *cls,
3225 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3232 * Called when a client performs an insert operation.
3234 * @param cls session of the client that sent the message
3235 * @param msg message sent by the client
/*
 * Handle an element insertion from the client: wrap the payload in a
 * ConsensusElement and add it to the session's initial set
 * (SET_KIND_CURRENT, 0, 0). Inserting after conclude has started gets the
 * client dropped.
 */
3238 handle_client_insert (void *cls,
3239 const struct GNUNET_CONSENSUS_ElementMessage *msg)
/* cls is the session belonging to the sending client. */
3241 struct ConsensusSession *session = cls;
3242 ssize_t element_size;
3243 struct GNUNET_SET_Handle *initial_set;
3244 struct ConsensusElement *ce;
/* No inserts once conclude was requested.
   NOTE(review): the early return after the drop is not visible here. */
3246 if (GNUNET_YES == session->conclude_started)
3249 GNUNET_SERVICE_client_drop (session->client);
/* Payload length = total message size minus the fixed message header. */
3253 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
/* The payload is copied directly behind the ConsensusElement header. */
3254 ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3255 GNUNET_memcpy (&ce[1], &msg[1], element_size);
3256 ce->payload_type = msg->element_type;
/* Set element describing header plus payload.
   NOTE(review): the line assigning the data pointer is not visible here. */
3258 struct GNUNET_SET_Element element = {
3259 .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
3260 .size = sizeof (struct ConsensusElement) + element_size,
/* Look up the initial set; it must exist. */
3265 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3266 struct SetEntry *entry;
3268 entry = lookup_set (session,
3270 GNUNET_assert (NULL != entry);
3271 initial_set = entry->h;
/* Count the outstanding insert, then add the element with
   client_insert_done as continuation. */
3274 session->num_client_insert_pending++;
3275 GNUNET_SET_add_element (initial_set,
3277 &client_insert_done,
3280 #ifdef GNUNET_EXTRA_LOGGING
3282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3283 "P%u: element %s added\n",
3284 session->local_peer_idx,
3285 debug_str_element (&element));
/* NOTE(review): the release of ce is not visible in this excerpt; confirm
   it is freed before returning. */
3289 GNUNET_SERVICE_client_continue (session->client);
3294 * Called when a client performs the conclude operation.
3296 * @param cls session of the client that sent the message
3297 * @param message message sent by the client
/*
 * Handle the client's conclude request: mark the session as concluding,
 * install the step timeouts and run whatever steps are ready. A second
 * conclude request gets the client dropped.
 */
3300 handle_client_conclude (void *cls,
3301 const struct GNUNET_MessageHeader *message)
/* cls is the session belonging to the sending client. */
3303 struct ConsensusSession *session = cls;
3305 if (GNUNET_YES == session->conclude_started)
3307 /* conclude started twice */
/* NOTE(review): the early return after the drop is not visible here. */
3309 GNUNET_SERVICE_client_drop (session->client);
3312 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3313 "conclude requested\n");
/* Set the flag first; handle_client_insert() checks it to reject inserts. */
3314 session->conclude_started = GNUNET_YES;
3315 install_step_timeouts (session);
3316 run_ready_steps (session);
3317 GNUNET_SERVICE_client_continue (session->client);
3322 * Called to clean up, after a shutdown has been requested.
3324 * @param cls closure
/*
 * Shutdown handler: logs at INFO level and destroys the statistics handle.
 * NOTE(review): the log text and the remaining call arguments are not
 * visible in this excerpt.
 */
3327 shutdown_task (void *cls)
3329 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3331 GNUNET_STATISTICS_destroy (statistics,
3338 * Start processing consensus requests.
3340 * @param cls closure
3341 * @param c configuration to use
3342 * @param service the initialized service
/*
 * Service start-up callback (the line carrying the function name is not
 * visible in this excerpt; presumably this is the service's init function).
 * It obtains the local peer identity, creates the "consensus" statistics
 * handle and registers shutdown_task() as shutdown handler.
 */
3346 const struct GNUNET_CONFIGURATION_Handle *c,
3347 struct GNUNET_SERVICE_Handle *service)
/* NOTE(review): the condition around this call is not visible; the lines
   that follow log an error and request scheduler shutdown, presumably when
   the identity cannot be retrieved. */
3351 GNUNET_CRYPTO_get_peer_identity (cfg,
3354 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3355 "Could not retrieve host identity\n");
3356 GNUNET_SCHEDULER_shutdown ();
3359 statistics = GNUNET_STATISTICS_create ("consensus",
3361 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3367 * Callback called when a client connects to the service.
3369 * @param cls closure for the service
3370 * @param c the new client that connected to the service
3371 * @param mq the message queue used to send messages to the client
/*
 * A new client connected: allocate a session for it, remember the client
 * handle and its message queue, and link the session into the global
 * session list.
 * NOTE(review): the return statement is not visible in this excerpt.
 */
3375 client_connect_cb (void *cls,
3376 struct GNUNET_SERVICE_Client *c,
3377 struct GNUNET_MQ_Handle *mq)
3379 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3381 session->client = c;
3382 session->client_mq = mq;
3383 GNUNET_CONTAINER_DLL_insert (sessions_head,
3391 * Callback called when a client disconnected from the service
3393 * @param cls closure for the service
3394 * @param c the client that disconnected
3395 * @param internal_cls the `struct ConsensusSession` of the disconnected client
/*
 * A client disconnected: stop listening for set operations, unlink the
 * session from the global list, destroy all set handles tracked by the
 * session and free the session.
 */
3398 client_disconnect_cb (void *cls,
3399 struct GNUNET_SERVICE_Client *c,
/* internal_cls carries the session of this client. */
3402 struct ConsensusSession *session = internal_cls;
/* The listener only exists after a successful join. */
3404 if (NULL != session->set_listener)
3406 GNUNET_SET_listen_cancel (session->set_listener);
3407 session->set_listener = NULL;
3409 GNUNET_CONTAINER_DLL_remove (sessions_head,
/* Pop the handles one by one and destroy the underlying set.
   NOTE(review): freeing of the SetHandle node itself is not visible in this
   excerpt; confirm against the full file. */
3413 while (session->set_handles_head)
3415 struct SetHandle *sh = session->set_handles_head;
3416 session->set_handles_head = sh->next;
3417 GNUNET_SET_destroy (sh->h);
/* NOTE(review): no release of the setmap/taskmap/diffmap/rfnmap created in
   handle_client_join() is visible here; check whether that happens
   elsewhere. */
3420 GNUNET_free (session);
3425 * Define "main" method using service macro.
/*
 * Argument list of the service definition (the line carrying the macro name
 * is not visible in this excerpt). It registers the disconnect callback and
 * three client message handlers: conclude (fixed size), insert and join
 * (variable size, each paired with its check_ function).
 */
3429 GNUNET_SERVICE_OPTION_NONE,
3432 &client_disconnect_cb,
3434 GNUNET_MQ_hd_fixed_size (client_conclude,
3435 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3436 struct GNUNET_MessageHeader,
3438 GNUNET_MQ_hd_var_size (client_insert,
3439 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3440 struct GNUNET_CONSENSUS_ElementMessage,
3442 GNUNET_MQ_hd_var_size (client_join,
3443 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3444 struct GNUNET_CONSENSUS_JoinMessage,
3446 GNUNET_MQ_handler_end ());
3448 /* end of gnunet-service-consensus.c */