2 This file is part of GNUnet
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
58 enum EarlyStoppingPhase
60 EARLY_STOPPING_NONE = 0,
61 EARLY_STOPPING_ONE_MORE = 1,
62 EARLY_STOPPING_DONE = 2,
66 GNUNET_NETWORK_STRUCT_BEGIN
69 struct ContestedPayload
74 * Tuple of integers that together
75 * identify a task uniquely.
79 * A value from 'enum PhaseKind'.
81 uint16_t kind GNUNET_PACKED;
84 * Number of the first peer
87 int16_t peer1 GNUNET_PACKED;
90 * Number of the second peer in canonical order.
92 int16_t peer2 GNUNET_PACKED;
95 * Repetition of the gradecast phase.
97 int16_t repetition GNUNET_PACKED;
100 * Leader in the gradecast phase.
102 * Can be different from both peer1 and peer2.
104 int16_t leader GNUNET_PACKED;
111 int set_kind GNUNET_PACKED;
112 int k1 GNUNET_PACKED;
113 int k2 GNUNET_PACKED;
120 struct GNUNET_SET_Handle *h;
122 * GNUNET_YES if the set resulted
123 * from applying a referendum with contested
132 int diff_kind GNUNET_PACKED;
133 int k1 GNUNET_PACKED;
134 int k2 GNUNET_PACKED;
139 int rfn_kind GNUNET_PACKED;
140 int k1 GNUNET_PACKED;
141 int k2 GNUNET_PACKED;
145 GNUNET_NETWORK_STRUCT_END
149 PHASE_KIND_ALL_TO_ALL,
150 PHASE_KIND_GRADECAST_LEADER,
151 PHASE_KIND_GRADECAST_ECHO,
152 PHASE_KIND_GRADECAST_ECHO_GRADE,
153 PHASE_KIND_GRADECAST_CONFIRM,
154 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
156 * Apply a repetition of the all-to-all
157 * gradecast to the current set.
159 PHASE_KIND_APPLY_REP,
169 * Last result set from a gradecast
171 SET_KIND_LAST_GRADECAST,
172 SET_KIND_LEADER_PROPOSAL,
173 SET_KIND_ECHO_RESULT,
179 DIFF_KIND_LEADER_PROPOSAL,
180 DIFF_KIND_LEADER_CONSENSUS,
181 DIFF_KIND_GRADECAST_RESULT,
189 RFN_KIND_GRADECAST_RESULT
195 struct SetKey input_set;
197 struct SetKey output_set;
198 struct RfnKey output_rfn;
199 struct DiffKey output_diff;
203 int transceive_contested;
205 struct GNUNET_SET_OperationHandle *op;
211 struct SetKey input_set;
215 * Closure for both @a start_task
216 * and @a cancel_task.
220 struct SetOpCls setop;
221 struct FinishCls finish;
226 typedef void (*TaskFunc) (struct TaskEntry *task);
229 * Node in the consensus task graph.
244 union TaskFuncCls cls;
251 * All steps of one session are in a
252 * linked list for easier deallocation.
257 * All steps of one session are in a
258 * linked list for easier deallocation.
262 struct ConsensusSession *session;
265 * Tasks that this step is composed of.
267 struct TaskEntry **tasks;
268 unsigned int tasks_len;
269 unsigned int tasks_cap;
271 unsigned int finished_tasks;
274 * Steps that have this step as dependency.
276 * We store pointers to subordinates rather
277 * than to prerequisites since it makes
278 * tracking the readiness of a task easier.
280 struct Step **subordinates;
281 unsigned int subordinates_len;
282 unsigned int subordinates_cap;
285 * Counter for the prerequisites of
288 size_t pending_prereq;
291 * Task that will run this step despite
292 * any pending prerequisites.
294 struct GNUNET_SCHEDULER_Task *timeout_task;
296 unsigned int is_running;
298 unsigned int is_finished;
301 * Synchrony round of the task.
302 * Determines the deadline for the task.
307 * Human-readable name for
308 * the task, used for debugging.
313 * When we're doing an early finish, how should this step be
315 * If GNUNET_YES, the step will be marked as finished
316 * without actually running its tasks.
317 * Otherwise, the step will still be run even after
320 * Note that a task may never be finished early if
321 * it is already running.
323 int early_finishable;
327 struct RfnElementInfo
329 const struct GNUNET_SET_Element *element;
332 * GNUNET_YES if the peer votes for the proposal.
337 * Proposal for this element,
338 * can only be VOTE_ADD or VOTE_REMOVE.
340 enum ReferendumVote proposal;
344 struct ReferendumEntry
349 * Elements where there is at least one proposed change.
351 * Maps the hash of the GNUNET_SET_Element
352 * to 'struct RfnElementInfo'.
354 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
356 unsigned int num_peers;
359 * Stores, for every peer in the session,
360 * whether the peer finished the whole referendum.
362 * Votes from peers are only counted if they're
363 * marked as committed (#GNUNET_YES) in the referendum.
365 * Otherwise (#GNUNET_NO), the requested changes are
366 * not counted for majority votes or thresholds.
372 * Contestation state of the peer. If a peer is contested, the values it
373 * contributed are still counted for applying changes, but the grading is
380 struct DiffElementInfo
382 const struct GNUNET_SET_Element *element;
385 * Positive weight for 'add', negative
386 * weights for 'remove'.
398 struct GNUNET_CONTAINER_MultiHashMap *changes;
404 * A consensus session consists of one local client and the remote authorities.
406 struct ConsensusSession
409 * Consensus sessions are kept in a DLL.
411 struct ConsensusSession *next;
414 * Consensus sessions are kept in a DLL.
416 struct ConsensusSession *prev;
418 unsigned int num_client_insert_pending;
420 struct GNUNET_CONTAINER_MultiHashMap *setmap;
421 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
425 * Array of peers with length 'num_peers'.
427 int *peers_blacklisted;
430 * Mapping from (hashed) TaskKey to TaskEntry.
432 * We map the application_id for a round to the task that should be
433 * executed, so we don't have to go through all tasks whenever we get
434 * an incoming set op request.
436 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
438 struct Step *steps_head;
439 struct Step *steps_tail;
441 int conclude_started;
446 * Global consensus identification, computed
447 * from the session id and participating authorities.
449 struct GNUNET_HashCode global_id;
452 * Client that inhabits the session
454 struct GNUNET_SERVICE_Client *client;
457 * Queued messages to the client.
459 struct GNUNET_MQ_Handle *client_mq;
462 * Time when the conclusion of the consensus should begin.
464 struct GNUNET_TIME_Absolute conclude_start;
467 * Timeout for all rounds together, single rounds will schedule a timeout task
468 * with a fraction of the conclude timeout.
469 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
471 struct GNUNET_TIME_Absolute conclude_deadline;
473 struct GNUNET_PeerIdentity *peers;
476 * Number of other peers in the consensus.
478 unsigned int num_peers;
481 * Index of the local peer in the peers array
483 unsigned int local_peer_idx;
486 * Listener for requests from other peers.
487 * Uses the session's global id as app id.
489 struct GNUNET_SET_ListenHandle *set_listener;
492 * State of our early stopping scheme.
498 * Linked list of sessions this peer participates in.
500 static struct ConsensusSession *sessions_head;
503 * Linked list of sessions this peer participates in.
505 static struct ConsensusSession *sessions_tail;
508 * Configuration of the consensus service.
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
513 * Peer that runs this service.
515 static struct GNUNET_PeerIdentity my_peer;
520 struct GNUNET_STATISTICS_Handle *statistics;
524 finish_task (struct TaskEntry *task);
528 run_ready_steps (struct ConsensusSession *session);
532 phasename (uint16_t phase)
536 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
537 case PHASE_KIND_FINISH: return "FINISH";
538 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
539 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
540 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
541 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
542 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
543 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
544 default: return "(unknown)";
550 setname (uint16_t kind)
554 case SET_KIND_CURRENT: return "CURRENT";
555 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
556 case SET_KIND_NONE: return "NONE";
557 default: return "(unknown)";
562 rfnname (uint16_t kind)
566 case RFN_KIND_NONE: return "NONE";
567 case RFN_KIND_ECHO: return "ECHO";
568 case RFN_KIND_CONFIRM: return "CONFIRM";
569 default: return "(unknown)";
574 diffname (uint16_t kind)
578 case DIFF_KIND_NONE: return "NONE";
579 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
580 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
581 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
582 default: return "(unknown)";
586 #ifdef GNUNET_EXTRA_LOGGING
590 debug_str_element (const struct GNUNET_SET_Element *el)
592 struct GNUNET_HashCode hash;
594 GNUNET_SET_element_hash (el, &hash);
596 return GNUNET_h2s (&hash);
600 debug_str_task_key (struct TaskKey *tk)
602 static char buf[256];
604 snprintf (buf, sizeof (buf),
605 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
606 phasename (tk->kind), tk->peer1, tk->peer2,
607 tk->leader, tk->repetition);
613 debug_str_diff_key (struct DiffKey *dk)
615 static char buf[256];
617 snprintf (buf, sizeof (buf),
618 "DiffKey kind=%s, k1=%d, k2=%d",
619 diffname (dk->diff_kind), dk->k1, dk->k2);
625 debug_str_set_key (const struct SetKey *sk)
627 static char buf[256];
629 snprintf (buf, sizeof (buf),
630 "SetKey kind=%s, k1=%d, k2=%d",
631 setname (sk->set_kind), sk->k1, sk->k2);
638 debug_str_rfn_key (const struct RfnKey *rk)
640 static char buf[256];
642 snprintf (buf, sizeof (buf),
643 "RfnKey kind=%s, k1=%d, k2=%d",
644 rfnname (rk->rfn_kind), rk->k1, rk->k2);
649 #endif /* GNUNET_EXTRA_LOGGING */
653 * Send the final result set of the consensus to the client, element by
657 * @param element the current element, NULL if all elements have been
659 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
662 send_to_client_iter (void *cls,
663 const struct GNUNET_SET_Element *element)
665 struct TaskEntry *task = (struct TaskEntry *) cls;
666 struct ConsensusSession *session = task->step->session;
667 struct GNUNET_MQ_Envelope *ev;
671 struct GNUNET_CONSENSUS_ElementMessage *m;
673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674 "P%d: sending element %s to client\n",
675 session->local_peer_idx,
676 debug_str_element (element));
678 ev = GNUNET_MQ_msg_extra (m, element->size,
679 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
680 m->element_type = htons (element->element_type);
681 GNUNET_memcpy (&m[1], element->data, element->size);
682 GNUNET_MQ_send (session->client_mq, ev);
686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687 "P%d: finished iterating elements for client\n",
688 session->local_peer_idx);
689 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
690 GNUNET_MQ_send (session->client_mq, ev);
696 static struct SetEntry *
697 lookup_set (struct ConsensusSession *session, struct SetKey *key)
699 struct GNUNET_HashCode hash;
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "P%u: looking up set {%s}\n",
703 session->local_peer_idx,
704 debug_str_set_key (key));
706 GNUNET_assert (SET_KIND_NONE != key->set_kind);
707 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
708 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
712 static struct DiffEntry *
713 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
715 struct GNUNET_HashCode hash;
717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "P%u: looking up diff {%s}\n",
719 session->local_peer_idx,
720 debug_str_diff_key (key));
722 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
723 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
724 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
728 static struct ReferendumEntry *
729 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
731 struct GNUNET_HashCode hash;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "P%u: looking up rfn {%s}\n",
735 session->local_peer_idx,
736 debug_str_rfn_key (key));
738 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
739 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
740 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
745 diff_insert (struct DiffEntry *diff,
747 const struct GNUNET_SET_Element *element)
749 struct DiffElementInfo *di;
750 struct GNUNET_HashCode hash;
752 GNUNET_assert ( (1 == weight) || (-1 == weight));
754 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755 "diff_insert with element size %u\n",
758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759 "hashing element\n");
761 GNUNET_SET_element_hash (element, &hash);
763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
770 di = GNUNET_new (struct DiffElementInfo);
771 di->element = GNUNET_SET_element_dup (element);
772 GNUNET_assert (GNUNET_OK ==
773 GNUNET_CONTAINER_multihashmap_put (diff->changes,
775 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
783 rfn_commit (struct ReferendumEntry *rfn,
784 uint16_t commit_peer)
786 GNUNET_assert (commit_peer < rfn->num_peers);
788 rfn->peer_commited[commit_peer] = GNUNET_YES;
793 rfn_contest (struct ReferendumEntry *rfn,
794 uint16_t contested_peer)
796 GNUNET_assert (contested_peer < rfn->num_peers);
798 rfn->peer_contested[contested_peer] = GNUNET_YES;
803 rfn_noncontested (struct ReferendumEntry *rfn)
809 for (i = 0; i < rfn->num_peers; i++)
810 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
818 rfn_vote (struct ReferendumEntry *rfn,
819 uint16_t voting_peer,
820 enum ReferendumVote vote,
821 const struct GNUNET_SET_Element *element)
823 struct RfnElementInfo *ri;
824 struct GNUNET_HashCode hash;
826 GNUNET_assert (voting_peer < rfn->num_peers);
828 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
829 since VOTE_KEEP is implicit in not voting. */
830 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
832 GNUNET_SET_element_hash (element, &hash);
833 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
837 ri = GNUNET_new (struct RfnElementInfo);
838 ri->element = GNUNET_SET_element_dup (element);
839 ri->votes = GNUNET_new_array (rfn->num_peers, int);
840 GNUNET_assert (GNUNET_OK ==
841 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
843 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
846 ri->votes[voting_peer] = GNUNET_YES;
852 task_other_peer (struct TaskEntry *task)
854 uint16_t me = task->step->session->local_peer_idx;
855 if (task->key.peer1 == me)
856 return task->key.peer2;
857 return task->key.peer1;
862 * Callback for set operation results. Called for each element
866 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
867 * @param status see enum GNUNET_SET_Status
870 set_result_cb (void *cls,
871 const struct GNUNET_SET_Element *element,
872 enum GNUNET_SET_Status status)
874 struct TaskEntry *task = cls;
875 struct ConsensusSession *session = task->step->session;
876 struct SetEntry *output_set = NULL;
877 struct DiffEntry *output_diff = NULL;
878 struct ReferendumEntry *output_rfn = NULL;
879 unsigned int other_idx;
880 struct SetOpCls *setop;
882 setop = &task->cls.setop;
885 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886 "P%u: got set result for {%s}, status %u\n",
887 session->local_peer_idx,
888 debug_str_task_key (&task->key),
891 if (GNUNET_NO == task->is_started)
897 if (GNUNET_YES == task->is_finished)
903 other_idx = task_other_peer (task);
905 if (SET_KIND_NONE != setop->output_set.set_kind)
907 output_set = lookup_set (session, &setop->output_set);
908 GNUNET_assert (NULL != output_set);
911 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
913 output_diff = lookup_diff (session, &setop->output_diff);
914 GNUNET_assert (NULL != output_diff);
917 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
919 output_rfn = lookup_rfn (session, &setop->output_rfn);
920 GNUNET_assert (NULL != output_rfn);
923 if (GNUNET_YES == session->peers_blacklisted[other_idx])
925 /* Peer might have been blacklisted
926 by a gradecast running in parallel, ignore elements from now */
927 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
929 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
933 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
935 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
937 GNUNET_assert (NULL != output_rfn);
938 rfn_contest (output_rfn, task_other_peer (task));
945 case GNUNET_SET_STATUS_ADD_LOCAL:
946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947 "Adding element in Task {%s}\n",
948 debug_str_task_key (&task->key));
949 if (NULL != output_set)
951 // FIXME: record pending adds, use callback
952 GNUNET_SET_add_element (output_set->h,
956 #ifdef GNUNET_EXTRA_LOGGING
957 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
958 "P%u: adding element %s into set {%s} of task {%s}\n",
959 session->local_peer_idx,
960 debug_str_element (element),
961 debug_str_set_key (&setop->output_set),
962 debug_str_task_key (&task->key));
965 if (NULL != output_diff)
967 diff_insert (output_diff, 1, element);
968 #ifdef GNUNET_EXTRA_LOGGING
969 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
970 "P%u: adding element %s into diff {%s} of task {%s}\n",
971 session->local_peer_idx,
972 debug_str_element (element),
973 debug_str_diff_key (&setop->output_diff),
974 debug_str_task_key (&task->key));
977 if (NULL != output_rfn)
979 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
980 #ifdef GNUNET_EXTRA_LOGGING
981 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
982 "P%u: adding element %s into rfn {%s} of task {%s}\n",
983 session->local_peer_idx,
984 debug_str_element (element),
985 debug_str_rfn_key (&setop->output_rfn),
986 debug_str_task_key (&task->key));
989 // XXX: add result to structures in task
991 case GNUNET_SET_STATUS_ADD_REMOTE:
992 if (GNUNET_YES == setop->do_not_remove)
994 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997 "Removing element in Task {%s}\n",
998 debug_str_task_key (&task->key));
999 if (NULL != output_set)
1001 // FIXME: record pending adds, use callback
1002 GNUNET_SET_remove_element (output_set->h,
1006 #ifdef GNUNET_EXTRA_LOGGING
1007 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1008 "P%u: removing element %s from set {%s} of task {%s}\n",
1009 session->local_peer_idx,
1010 debug_str_element (element),
1011 debug_str_set_key (&setop->output_set),
1012 debug_str_task_key (&task->key));
1015 if (NULL != output_diff)
1017 diff_insert (output_diff, -1, element);
1018 #ifdef GNUNET_EXTRA_LOGGING
1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1020 "P%u: removing element %s from diff {%s} of task {%s}\n",
1021 session->local_peer_idx,
1022 debug_str_element (element),
1023 debug_str_diff_key (&setop->output_diff),
1024 debug_str_task_key (&task->key));
1027 if (NULL != output_rfn)
1029 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1030 #ifdef GNUNET_EXTRA_LOGGING
1031 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1032 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1033 session->local_peer_idx,
1034 debug_str_element (element),
1035 debug_str_rfn_key (&setop->output_rfn),
1036 debug_str_task_key (&task->key));
1040 case GNUNET_SET_STATUS_DONE:
1041 // XXX: check first if any changes to the underlying
1042 // set are still pending
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "Finishing setop in Task {%s}\n",
1045 debug_str_task_key (&task->key));
1046 if (NULL != output_rfn)
1048 rfn_commit (output_rfn, task_other_peer (task));
1052 case GNUNET_SET_STATUS_FAILURE:
1054 GNUNET_break_op (0);
1074 enum EvilnessSubType
1077 EVILNESS_SUB_REPLACEMENT,
1078 EVILNESS_SUB_NO_REPLACEMENT,
1083 enum EvilnessType type;
1084 enum EvilnessSubType subtype;
1090 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1092 if (0 == strcmp ("replace", evil_subtype_str))
1094 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1096 else if (0 == strcmp ("noreplace", evil_subtype_str))
1098 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1102 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1103 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1105 return GNUNET_SYSERR;
1112 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1116 char *evil_type_str = NULL;
1117 char *evil_subtype_str = NULL;
1119 GNUNET_assert (NULL != evil);
1121 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "P%u: no evilness\n",
1125 session->local_peer_idx);
1126 evil->type = EVILNESS_NONE;
1129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130 "P%u: got evilness spec\n",
1131 session->local_peer_idx);
1133 for (field = strtok (evil_spec, "/");
1135 field = strtok (NULL, "/"))
1137 unsigned int peer_num;
1138 unsigned int evil_num;
1141 evil_type_str = NULL;
1142 evil_subtype_str = NULL;
1144 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1148 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1149 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1155 GNUNET_assert (NULL != evil_type_str);
1156 GNUNET_assert (NULL != evil_subtype_str);
1158 if (peer_num == session->local_peer_idx)
1160 if (0 == strcmp ("slack", evil_type_str))
1162 evil->type = EVILNESS_SLACK;
1164 else if (0 == strcmp ("cram-all", evil_type_str))
1166 evil->type = EVILNESS_CRAM_ALL;
1167 evil->num = evil_num;
1168 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1171 else if (0 == strcmp ("cram-lead", evil_type_str))
1173 evil->type = EVILNESS_CRAM_LEAD;
1174 evil->num = evil_num;
1175 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1178 else if (0 == strcmp ("cram-echo", evil_type_str))
1180 evil->type = EVILNESS_CRAM_ECHO;
1181 evil->num = evil_num;
1182 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1194 /* No GNUNET_free since memory was allocated by libc */
1195 free (evil_type_str);
1196 evil_type_str = NULL;
1197 evil_subtype_str = NULL;
1200 evil->type = EVILNESS_NONE;
1202 GNUNET_free (evil_spec);
1203 /* no GNUNET_free_non_null since it wasn't
1204 * allocated with GNUNET_malloc */
1205 if (NULL != evil_type_str)
1206 free (evil_type_str);
1207 if (NULL != evil_subtype_str)
1208 free (evil_subtype_str);
1215 * Commit the appropriate set for a
1219 commit_set (struct ConsensusSession *session,
1220 struct TaskEntry *task)
1222 struct SetEntry *set;
1223 struct SetOpCls *setop = &task->cls.setop;
1225 GNUNET_assert (NULL != setop->op);
1226 set = lookup_set (session, &setop->input_set);
1227 GNUNET_assert (NULL != set);
1232 struct Evilness evil;
1234 get_evilness (session, &evil);
1235 if (EVILNESS_NONE != evil.type)
1237 /* Useful for evaluation */
1238 GNUNET_STATISTICS_set (statistics,
1245 case EVILNESS_CRAM_ALL:
1246 case EVILNESS_CRAM_LEAD:
1247 case EVILNESS_CRAM_ECHO:
1248 /* We're not cramming elements in the
1249 all-to-all round, since that would just
1250 add more elements to the result set, but
1251 wouldn't test robustness. */
1252 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1254 GNUNET_SET_commit (setop->op, set->h);
1257 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1258 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1260 GNUNET_SET_commit (setop->op, set->h);
1263 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1265 GNUNET_SET_commit (setop->op, set->h);
1268 for (i = 0; i < evil.num; i++)
1270 struct GNUNET_HashCode hash;
1271 struct GNUNET_SET_Element element;
1272 element.data = &hash;
1273 element.size = sizeof (struct GNUNET_HashCode);
1274 element.element_type = 0;
1276 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1278 /* Always generate a new element. */
1279 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1281 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1283 /* Always cram the same elements, derived from counter. */
1284 GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1290 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1291 #ifdef GNUNET_EXTRA_LOGGING
1292 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1293 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1294 session->local_peer_idx,
1295 debug_str_element (&element),
1296 debug_str_set_key (&setop->input_set),
1297 debug_str_task_key (&task->key));
1300 GNUNET_STATISTICS_update (statistics,
1301 "# stuffed elements",
1304 GNUNET_SET_commit (setop->op, set->h);
1306 case EVILNESS_SLACK:
1307 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1308 "P%u: evil peer: slacking\n",
1309 (unsigned int) session->local_peer_idx);
1313 GNUNET_SET_commit (setop->op, set->h);
1318 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1320 struct GNUNET_SET_Element element;
1321 struct ContestedPayload payload;
1322 element.data = &payload;
1323 element.size = sizeof (struct ContestedPayload);
1324 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1325 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1327 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1329 GNUNET_SET_commit (setop->op, set->h);
1333 /* For our testcases, we don't want the blacklisted
1335 GNUNET_SET_operation_cancel (setop->op);
1343 put_diff (struct ConsensusSession *session,
1344 struct DiffEntry *diff)
1346 struct GNUNET_HashCode hash;
1348 GNUNET_assert (NULL != diff);
1350 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1351 GNUNET_assert (GNUNET_OK ==
1352 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1353 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1357 put_set (struct ConsensusSession *session,
1358 struct SetEntry *set)
1360 struct GNUNET_HashCode hash;
1362 GNUNET_assert (NULL != set->h);
1364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366 debug_str_set_key (&set->key));
1368 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1369 GNUNET_assert (GNUNET_SYSERR !=
1370 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1371 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1376 put_rfn (struct ConsensusSession *session,
1377 struct ReferendumEntry *rfn)
1379 struct GNUNET_HashCode hash;
1381 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1382 GNUNET_assert (GNUNET_OK ==
1383 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1384 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1390 task_cancel_reconcile (struct TaskEntry *task)
1392 /* not implemented yet */
1398 apply_diff_to_rfn (struct DiffEntry *diff,
1399 struct ReferendumEntry *rfn,
1400 uint16_t voting_peer,
1403 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1404 struct DiffElementInfo *di;
1406 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1408 while (GNUNET_YES ==
1409 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1411 (const void **) &di))
1415 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1419 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1423 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1430 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1432 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1439 diff_compose (struct DiffEntry *diff_1,
1440 struct DiffEntry *diff_2)
1442 struct DiffEntry *diff_new;
1443 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1444 struct DiffElementInfo *di;
1446 diff_new = diff_create ();
1448 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1449 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1451 diff_insert (diff_new, di->weight, di->element);
1453 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1455 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1456 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1458 diff_insert (diff_new, di->weight, di->element);
1460 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1466 struct ReferendumEntry *
1467 rfn_create (uint16_t size)
1469 struct ReferendumEntry *rfn;
1471 rfn = GNUNET_new (struct ReferendumEntry);
1472 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1473 rfn->peer_commited = GNUNET_new_array (size, int);
1474 rfn->peer_contested = GNUNET_new_array (size, int);
1475 rfn->num_peers = size;
1483 diff_destroy (struct DiffEntry *diff)
1485 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1492 * For a given majority, count what the outcome
1493 * is (add/remove/keep), and give the number
1494 * of peers that voted for this outcome.
1497 rfn_majority (const struct ReferendumEntry *rfn,
1498 const struct RfnElementInfo *ri,
1499 uint16_t *ret_majority,
1500 enum ReferendumVote *ret_vote)
1502 uint16_t votes_yes = 0;
1503 uint16_t num_commited = 0;
1506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1507 "Computing rfn majority for element %s of rfn {%s}\n",
1508 debug_str_element (ri->element),
1509 debug_str_rfn_key (&rfn->key));
1511 for (i = 0; i < rfn->num_peers; i++)
1513 if (GNUNET_NO == rfn->peer_commited[i])
1517 if (GNUNET_YES == ri->votes[i])
1521 if (votes_yes > (num_commited) / 2)
1523 *ret_vote = ri->proposal;
1524 *ret_majority = votes_yes;
1528 *ret_vote = VOTE_STAY;
1529 *ret_majority = num_commited - votes_yes;
1536 struct TaskEntry *task;
1537 struct SetKey dst_set_key;
1542 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1544 struct SetCopyCls *scc = cls;
1545 struct TaskEntry *task = scc->task;
1546 struct SetKey dst_set_key = scc->dst_set_key;
1547 struct SetEntry *set;
1550 set = GNUNET_new (struct SetEntry);
1552 set->key = dst_set_key;
1553 put_set (task->step->session, set);
1560 * Call the start function of the given
1561 * task again after we created a copy of the given set.
1564 create_set_copy_for_task (struct TaskEntry *task,
1565 struct SetKey *src_set_key,
1566 struct SetKey *dst_set_key)
1568 struct SetEntry *src_set;
1569 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1571 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572 "Copying set {%s} to {%s} for task {%s}\n",
1573 debug_str_set_key (src_set_key),
1574 debug_str_set_key (dst_set_key),
1575 debug_str_task_key (&task->key));
1578 scc->dst_set_key = *dst_set_key;
1579 src_set = lookup_set (task->step->session, src_set_key);
1580 GNUNET_assert (NULL != src_set);
1581 GNUNET_SET_copy_lazy (src_set->h,
1587 struct SetMutationProgressCls
1591 * Task to finish once all changes are through.
1593 struct TaskEntry *task;
/**
 * Completion callback for one queued set mutation.
 * Asserts that at least one operation is still pending and, once the
 * counter reads zero, picks up the task stored in the closure.
 *
 * @param cls the struct SetMutationProgressCls shared by the batch
 *
 * NOTE(review): the decrement of num_pending and the handling of the task
 * are not visible here; presumably the task is finished and cls released.
 */
1598 set_mutation_done (void *cls)
1600 struct SetMutationProgressCls *pc = cls;
1602 GNUNET_assert (pc->num_pending > 0);
1606 if (0 == pc->num_pending)
1608 struct TaskEntry *task = pc->task;
/**
 * Attempt to finish a step without waiting for its tasks.
 * After three guard checks the step is marked finished, each subordinate
 * loses one pending prerequisite and is itself tried recursively, and
 * finally run_ready_steps is invoked for the session.
 *
 * @param step step to finish early
 *
 * NOTE(review): the action taken when a guard matches is not visible in
 * this view; presumably the function returns without doing anything.
 */
1616 try_finish_step_early (struct Step *step)
/* Guards: running, already finished, or not flagged as early finishable. */
1620 if (GNUNET_YES == step->is_running)
1622 if (GNUNET_YES == step->is_finished)
1624 if (GNUNET_NO == step->early_finishable)
1627 step->is_finished = GNUNET_YES;
1629 #ifdef GNUNET_EXTRA_LOGGING
1630 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631 "Finishing step `%s' early.\n",
/* Release the dependency this step represented for each subordinate. */
1635 for (i = 0; i < step->subordinates_len; i++)
1637 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1638 step->subordinates[i]->pending_prereq--;
1639 #ifdef GNUNET_EXTRA_LOGGING
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641 "Decreased pending_prereq to %u for step `%s'.\n",
1642 (unsigned int) step->subordinates[i]->pending_prereq,
1643 step->subordinates[i]->debug_name);
/* Propagate the early finish down the dependency graph. */
1646 try_finish_step_early (step->subordinates[i]);
1649 // XXX: maybe schedule as task to avoid recursion?
1650 run_ready_steps (step->session);
/**
 * Finish a step whose tasks have all completed.
 * Asserts that every task is finished, that the step is running and that
 * it was not finished before. Then decrements pending_prereq of every
 * subordinate, marks the step finished and calls run_ready_steps.
 *
 * @param step step to finish
 */
1655 finish_step (struct Step *step)
1659 GNUNET_assert (step->finished_tasks == step->tasks_len);
1660 GNUNET_assert (GNUNET_YES == step->is_running);
1661 GNUNET_assert (GNUNET_NO == step->is_finished);
1663 #ifdef GNUNET_EXTRA_LOGGING
1664 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665 "All tasks of step `%s' with %u subordinates finished.\n",
1667 step->subordinates_len);
/* Each subordinate has one prerequisite fewer now. */
1670 for (i = 0; i < step->subordinates_len; i++)
1672 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1673 step->subordinates[i]->pending_prereq--;
1674 #ifdef GNUNET_EXTRA_LOGGING
1675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1676 "Decreased pending_prereq to %u for step `%s'.\n",
1677 (unsigned int) step->subordinates[i]->pending_prereq,
1678 step->subordinates[i]->debug_name);
1683 step->is_finished = GNUNET_YES;
1685 // XXX: maybe schedule as task to avoid recursion?
1686 run_ready_steps (step->session);
1692 * Apply the result from one round of gradecasts (i.e. every peer
1693 * should have gradecasted) to the peer's current set.
1695 * @param task the task with context information
/*
 * Outline: make sure the output set for repetition + 1 exists (requesting a
 * copy of the current set otherwise), evaluate every element of the
 * gradecast result referendum with rfn_majority, queue the matching add or
 * remove on the output set, and use the weakest majority seen to drive the
 * early stopping state of the session.
 */
1698 task_start_apply_round (struct TaskEntry *task)
1700 struct ConsensusSession *session = task->step->session;
1701 struct SetKey sk_in;
1702 struct SetKey sk_out;
1703 struct RfnKey rk_in;
1704 struct SetEntry *set_out;
1705 struct ReferendumEntry *rfn_in;
1706 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1707 struct RfnElementInfo *ri;
1708 struct SetMutationProgressCls *progress_cls;
/* Smallest majority_num over all elements; starts at the maximum. */
1709 uint16_t worst_majority = UINT16_MAX;
/* Input: current set and gradecast result of this repetition.
   Output: current set of the next repetition. */
1711 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1712 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1713 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1715 set_out = lookup_set (session, &sk_out);
/* No output set yet: request a copy of the input set.
   NOTE(review): presumably the function returns here and is re-run once
   the copy exists; the return is not visible in this view. */
1716 if (NULL == set_out)
1718 create_set_copy_for_task (task, &sk_in, &sk_out);
1722 rfn_in = lookup_rfn (session, &rk_in);
1723 GNUNET_assert (NULL != rfn_in);
1725 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1726 progress_cls->task = task;
1728 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
/* Visit every element recorded in the referendum. */
1730 while (GNUNET_YES ==
1731 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1733 (const void **) &ri))
1735 uint16_t majority_num;
1736 enum ReferendumVote majority_vote;
1738 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1740 if (worst_majority > majority_num)
1741 worst_majority = majority_num;
/* One pending mutation is counted per queued add or remove. */
1743 switch (majority_vote)
1746 progress_cls->num_pending++;
1747 GNUNET_assert (GNUNET_OK ==
1748 GNUNET_SET_add_element (set_out->h,
1752 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1753 "P%u: apply round: adding element %s with %u-majority.\n",
1754 session->local_peer_idx,
1755 debug_str_element (ri->element), majority_num);
1758 progress_cls->num_pending++;
1759 GNUNET_assert (GNUNET_OK ==
1760 GNUNET_SET_remove_element (set_out->h,
1764 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1765 "P%u: apply round: deleting element %s with %u-majority.\n",
1766 session->local_peer_idx,
1767 debug_str_element (ri->element), majority_num);
1770 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1771 "P%u: apply round: keeping element %s with %u-majority.\n",
1772 session->local_peer_idx,
1773 debug_str_element (ri->element), majority_num);
1782 if (0 == progress_cls->num_pending)
1784 // call closure right now, no pending ops
1785 GNUNET_free (progress_cls);
/* Integer arithmetic: the division by three truncates before doubling.
   NOTE(review): with an empty referendum worst_majority is still
   UINT16_MAX and passes the comparison below; confirm this is intended. */
1790 uint16_t thresh = (session->num_peers / 3) * 2;
1792 if (worst_majority >= thresh)
1794 switch (session->early_stopping)
1796 case EARLY_STOPPING_NONE:
1797 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1798 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1799 "P%u: Stopping early (after one more superround)\n",
1800 session->local_peer_idx);
1802 case EARLY_STOPPING_ONE_MORE:
1803 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1804 session->local_peer_idx);
1805 session->early_stopping = EARLY_STOPPING_DONE;
/* Offer every step of the session the chance to finish early. */
1808 for (step = session->steps_head; NULL != step; step = step->next)
1809 try_finish_step_early (step);
1812 case EARLY_STOPPING_DONE:
1813 /* We shouldn't be here anymore after early stopping */
1821 else if (EARLY_STOPPING_NONE != session->early_stopping)
1823 // Our assumption about the number of bad peers
1825 GNUNET_break_op (0);
1829 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1830 session->local_peer_idx);
1833 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Grade the gradecast of one leader.
 * Ensures the gradecast result referendum of the repetition exists, applies
 * the proposal diff of the leader to it, casts a vote in the name of the
 * leader for each element of the echo referendum, and derives a confidence
 * from the number of noncontested peers. The confidence decides whether
 * the leader is committed in the result and whether it is blacklisted.
 *
 * @param task task whose key supplies repetition and leader
 */
1838 task_start_grade (struct TaskEntry *task)
1840 struct ConsensusSession *session = task->step->session;
1841 struct ReferendumEntry *output_rfn;
1842 struct ReferendumEntry *input_rfn;
1843 struct DiffEntry *input_diff;
1844 struct RfnKey rfn_key;
1845 struct DiffKey diff_key;
1846 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1847 struct RfnElementInfo *ri;
/* Starts at the highest value used here and is only lowered below. */
1848 unsigned int gradecast_confidence = 2;
/* The result referendum is shared per repetition; create it on first use. */
1850 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1851 output_rfn = lookup_rfn (session, &rfn_key);
1852 if (NULL == output_rfn)
1854 output_rfn = rfn_create (session->num_peers);
1855 output_rfn->key = rfn_key;
1856 put_rfn (session, output_rfn);
/* Both inputs are keyed by repetition and leader and must already exist. */
1859 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1860 input_diff = lookup_diff (session, &diff_key);
1861 GNUNET_assert (NULL != input_diff);
1863 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1864 input_rfn = lookup_rfn (session, &rfn_key);
1865 GNUNET_assert (NULL != input_rfn);
1867 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1869 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1871 while (GNUNET_YES ==
1872 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1874 (const void **) &ri))
1876 uint16_t majority_num;
1877 enum ReferendumVote majority_vote;
1879 // XXX: we need contested votes and non-contested votes here
1880 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* A majority of at most a third of the peers is overridden to VOTE_REMOVE.
   NOTE(review): task_start_eval_echo compares with < against the same
   bound while this uses <=; confirm which is intended. */
1882 if (majority_num <= session->num_peers / 3)
1883 majority_vote = VOTE_REMOVE;
1885 switch (majority_vote)
1890 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1893 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1900 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Lower the confidence in two stages based on the noncontested count. */
1903 uint16_t noncontested;
1904 noncontested = rfn_noncontested (input_rfn);
1905 if (noncontested < (session->num_peers / 3) * 2)
1907 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1909 if (noncontested < (session->num_peers / 3) + 1)
1911 gradecast_confidence = 0;
/* Confidence 1 satisfies both tests: the leader is committed and also
   blacklisted. */
1915 if (gradecast_confidence >= 1)
1916 rfn_commit (output_rfn, task->key.leader);
1918 if (gradecast_confidence <= 1)
1919 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/**
 * Start a set reconciliation task.
 * Makes sure the configured output set, referendum and diff exist, then
 * acts according to the role of the local peer in the task key: local only
 * when it is both peers, initiator when it is peer1, responder when it is
 * peer2.
 *
 * @param task task whose cls.setop describes inputs and outputs
 */
1926 task_start_reconcile (struct TaskEntry *task)
1928 struct SetEntry *input;
1929 struct SetOpCls *setop = &task->cls.setop;
1930 struct ConsensusSession *session = task->step->session;
1932 input = lookup_set (session, &setop->input_set);
1933 GNUNET_assert (NULL != input);
1934 GNUNET_assert (NULL != input->h);
1936 /* We create the outputs for the operation here
1937 (rather than in the set operation callback)
1938 because we want something valid in there, even
1939 if the other peer doesn't talk to us */
1941 if (SET_KIND_NONE != setop->output_set.set_kind)
1943 /* If we don't have an existing output set,
1944 we clone the input set. */
1945 if (NULL == lookup_set (session, &setop->output_set))
1947 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1952 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1954 if (NULL == lookup_rfn (session, &setop->output_rfn))
1956 struct ReferendumEntry *rfn;
1958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959 "P%u: output rfn <%s> missing, creating.\n",
1960 session->local_peer_idx,
1961 debug_str_rfn_key (&setop->output_rfn));
1963 rfn = rfn_create (session->num_peers);
1964 rfn->key = setop->output_rfn;
1965 put_rfn (session, rfn);
1969 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1971 if (NULL == lookup_diff (session, &setop->output_diff))
1973 struct DiffEntry *diff;
1975 diff = diff_create ();
1976 diff->key = setop->output_diff;
1977 put_diff (session, diff);
/* Task between the local peer and itself: no remote operation is involved. */
1981 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1983 /* XXX: mark the corresponding rfn as committed if necessary */
/* Local peer is peer1: it initiates and sends the task key as context. */
1988 if (task->key.peer1 == session->local_peer_idx)
1990 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993 "P%u: Looking up set {%s} to run remote union\n",
1994 session->local_peer_idx,
1995 debug_str_set_key (&setop->input_set));
1997 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1998 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
/* All key fields travel in network byte order. */
2000 rcm.kind = htons (task->key.kind);
2001 rcm.peer1 = htons (task->key.peer1);
2002 rcm.peer2 = htons (task->key.peer2);
2003 rcm.leader = htons (task->key.leader);
2004 rcm.repetition = htons (task->key.repetition);
2006 GNUNET_assert (NULL == setop->op);
2007 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2008 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2010 // XXX: maybe this should be done while
2011 // setting up tasks already?
2012 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2013 &session->global_id,
2015 GNUNET_SET_RESULT_SYMMETRIC,
2019 commit_set (session, task);
/* Local peer is peer2: commit only if the request was already accepted,
   which is when setop->op is set. */
2021 else if (task->key.peer2 == session->local_peer_idx)
2023 /* Wait for the other peer to contact us */
2024 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2025 session->local_peer_idx, task->key.peer1);
2027 if (NULL != setop->op)
2029 commit_set (session, task);
2034 /* We made an error while constructing the task graph. */
/**
 * Evaluate the echo referendum of one leader.
 * Ensures the echo result set exists (requesting a copy of the leader
 * proposal set otherwise), registers the same set handle under the
 * SET_KIND_LAST_GRADECAST key, and then applies the majority decision for
 * each referendum element to the echo result set.
 *
 * @param task task whose key supplies repetition and leader
 */
2041 task_start_eval_echo (struct TaskEntry *task)
2043 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2044 struct ReferendumEntry *input_rfn;
2045 struct RfnElementInfo *ri;
2046 struct SetEntry *output_set;
2047 struct SetMutationProgressCls *progress_cls;
2048 struct ConsensusSession *session = task->step->session;
2049 struct SetKey sk_in;
2050 struct SetKey sk_out;
2051 struct RfnKey rk_in;
2053 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2054 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2055 output_set = lookup_set (session, &sk_out);
/* NOTE(review): presumably the function returns after requesting the copy
   and is re-run later; the return is not visible in this view. */
2056 if (NULL == output_set)
2058 create_set_copy_for_task (task, &sk_in, &sk_out);
/* The new entry shares the handle of output_set instead of owning a copy. */
2064 // FIXME: should be marked as a shallow copy, so
2065 // we can destroy everything correctly
2066 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2067 last_set->h = output_set->h;
2068 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2069 put_set (session, last_set);
2072 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2073 "Evaluating referendum in Task {%s}\n",
2074 debug_str_task_key (&task->key));
2076 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2077 progress_cls->task = task;
2079 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2080 input_rfn = lookup_rfn (session, &rk_in);
2082 GNUNET_assert (NULL != input_rfn);
2084 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2085 GNUNET_assert (NULL != iter);
2087 while (GNUNET_YES ==
2088 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2090 (const void **) &ri))
2092 enum ReferendumVote majority_vote;
2093 uint16_t majority_num;
2095 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* Weak majorities mark the output set as contested.
   NOTE(review): this test uses < while task_start_grade uses <= against
   the same num_peers / 3 bound; confirm which is intended. */
2097 if (majority_num < session->num_peers / 3)
2099 /* It is not the case that all nonfaulty peers
2100 echoed the same value. Since we're doing a set reconciliation, we
2101 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2102 reconciliation as contested. Other peers might not know that the
2103 leader is faulty, thus we still re-distribute in the confirmation
2105 output_set->is_contested = GNUNET_YES;
2108 switch (majority_vote)
2111 progress_cls->num_pending++;
2112 GNUNET_assert (GNUNET_OK ==
2113 GNUNET_SET_add_element (output_set->h,
2119 progress_cls->num_pending++;
2120 GNUNET_assert (GNUNET_OK ==
2121 GNUNET_SET_remove_element (output_set->h,
2127 /* Nothing to do. */
2135 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Nothing was queued, so no mutation callback will release progress_cls. */
2137 if (0 == progress_cls->num_pending)
2139 // call closure right now, no pending ops
2140 GNUNET_free (progress_cls);
/**
 * Start the final task of a session.
 * Looks up the set named by cls.finish.input_set, which must exist, and
 * iterates over it with send_to_client_iter as the element callback.
 *
 * @param task the finish task
 */
2147 task_start_finish (struct TaskEntry *task)
2149 struct SetEntry *final_set;
2150 struct ConsensusSession *session = task->step->session;
2152 final_set = lookup_set (session, &task->cls.finish.input_set);
2154 GNUNET_assert (NULL != final_set);
2157 GNUNET_SET_iterate (final_set->h,
2158 send_to_client_iter,
/**
 * Start one task of a session.
 * Asserts that the task has neither started nor finished and that it has a
 * start function, then records it as started.
 *
 * @param session session the task belongs to
 * @param task task to start
 *
 * NOTE(review): the call of task->start is not visible in this view.
 */
2163 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2167 GNUNET_assert (GNUNET_NO == task->is_started);
2168 GNUNET_assert (GNUNET_NO == task->is_finished);
2169 GNUNET_assert (NULL != task->start);
2173 task->is_started = GNUNET_YES;
2180 * Run all steps of the session that don't have any
2181 * more dependencies.
/*
 * Walks the step list of the session from steps_head. A step is ready when
 * it is not running, not finished and has no pending prerequisites; a ready
 * step is marked running and each of its tasks is started.
 */
2184 run_ready_steps (struct ConsensusSession *session)
2188 step = session->steps_head;
2190 while (NULL != step)
2192 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2196 GNUNET_assert (0 == step->finished_tasks);
2198 #ifdef GNUNET_EXTRA_LOGGING
2199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2200 session->local_peer_idx,
2202 step->round, step->tasks_len, step->subordinates_len);
/* Mark the step running before starting its tasks. */
2205 step->is_running = GNUNET_YES;
2206 for (i = 0; i < step->tasks_len; i++)
2207 start_task (session, step->tasks[i]);
2209 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2210 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2213 /* Running the next ready steps will be triggered by task completion */
/**
 * Mark a task as finished and finish its step once all tasks are done.
 * Asserts that the task was not finished before, counts it in
 * finished_tasks of its step and calls finish_step when that count reaches
 * tasks_len.
 *
 * @param task task that has completed
 */
2225 finish_task (struct TaskEntry *task)
2227 GNUNET_assert (GNUNET_NO == task->is_finished);
2228 task->is_finished = GNUNET_YES;
2230 task->step->finished_tasks++;
2232 if (task->step->finished_tasks == task->step->tasks_len)
2233 finish_step (task->step);
2238 * Search peer in the list of peers in session.
2240 * @param peer peer to find
2241 * @param session session with peer
2242 * @return index of peer, -1 if peer is not in session
/*
 * Linear scan over session->peers comparing whole identities with memcmp.
 * NOTE(review): the return statements are not visible in this view.
 */
2245 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2248 for (i = 0; i < session->num_peers; i++)
2249 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2256 * Compute a global, (hopefully) unique consensus session id,
2257 * from the local id of the consensus session, and the identities of all participants.
2258 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2259 * exactly the same peers, the global id will be different.
2261 * @param session session to generate the global id for
2262 * @param local_session_id local id of the consensus session
/*
 * Derives session->global_id with GNUNET_CRYPTO_kdf; the derivation must
 * succeed (asserted). The visible inputs are a fixed salt string, a buffer
 * of num_peers peer identities and a buffer the size of one hash code.
 * NOTE(review): which pointers are passed for those buffers is not visible
 * here; presumably session->peers and local_session_id.
 */
2265 compute_global_id (struct ConsensusSession *session,
2266 const struct GNUNET_HashCode *local_session_id)
2268 const char *salt = "gnunet-service-consensus/session_id";
2270 GNUNET_assert (GNUNET_YES ==
2271 GNUNET_CRYPTO_kdf (&session->global_id,
2272 sizeof (struct GNUNET_HashCode),
2276 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2278 sizeof (struct GNUNET_HashCode),
2284 * Compare two peer identities.
2286 * @param h1 some peer identity
2287 * @param h2 some peer identity
2288 * @return a positive value if h1 > h2, a negative value if h1 < h2, and 0 if h1 == h2.
/*
 * Comparator over raw identity bytes; the result is whatever memcmp
 * returns, so only its sign is meaningful.
 */
2291 peer_id_cmp (const void *h1, const void *h2)
2293 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2298 * Create the sorted list of peers for the session,
2299 * add the local peer if not in the join message.
2301 * @param session session to initialize
2302 * @param join_msg join message with the list of peers participating at the end
/*
 * The peer identities follow the fixed part of the join message. The list
 * is copied into a fresh array with one extra slot when the local peer is
 * not listed, and the array is then sorted.
 */
2305 initialize_session_peer_list (struct ConsensusSession *session,
2306 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
/* Identities start right behind the message struct. */
2308 const struct GNUNET_PeerIdentity *msg_peers
2309 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2310 int local_peer_in_list;
2312 session->num_peers = ntohl (join_msg->num_peers);
2314 /* Peers in the join message, may or may not include the local peer,
2315 Add it if it is missing. */
2316 local_peer_in_list = GNUNET_NO;
2317 for (unsigned int i = 0; i < session->num_peers; i++)
2319 if (0 == memcmp (&msg_peers[i],
2321 sizeof (struct GNUNET_PeerIdentity)))
2323 local_peer_in_list = GNUNET_YES;
/* Reserve one more slot for the local peer when it was not listed. */
2327 if (GNUNET_NO == local_peer_in_list)
2328 session->num_peers++;
2330 session->peers = GNUNET_new_array (session->num_peers,
2331 struct GNUNET_PeerIdentity);
/* The local peer goes into the last slot; the copy below fills only the
   slots in front of it because it uses the count from the message. */
2332 if (GNUNET_NO == local_peer_in_list)
2333 session->peers[session->num_peers - 1] = my_peer;
2335 GNUNET_memcpy (session->peers,
2337 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
/* NOTE(review): presumably sorted with peer_id_cmp over num_peers entries;
   those arguments are not visible in this view. */
2338 qsort (session->peers,
2340 sizeof (struct GNUNET_PeerIdentity),
/**
 * Find the task registered under a task key.
 * Hashes the raw bytes of the key and looks the hash up in the task map of
 * the session.
 *
 * @param session session whose taskmap is searched
 * @param key key of the wanted task
 * @return the value returned by GNUNET_CONTAINER_multihashmap_get
 */
2345 static struct TaskEntry *
2346 lookup_task (struct ConsensusSession *session,
2347 struct TaskKey *key)
2349 struct GNUNET_HashCode hash;
/* All sizeof (struct TaskKey) bytes of the key are hashed. */
2352 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2354 GNUNET_h2s (&hash));
2355 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2360 * Called when another peer wants to do a set operation with the
2363 * @param cls closure
2364 * @param other_peer the other peer
2365 * @param context_msg message with application specific information from
2367 * @param request request from the other peer, use GNUNET_SET_accept
2368 * to accept it, otherwise the request will be refused
2369 * Note that we don't use a return value here, as it is also
2370 * necessary to specify the set we want to do the operation with,
2371 * which sometimes can be derived from the context message.
2372 * Also necessary to specify the timeout.
/*
 * Validates the context message (present, expected type, exact size),
 * rebuilds the task key from its fields, looks the task up and accepts the
 * request with GNUNET_SET_accept. The set is committed right away only if
 * the task has already been started.
 * NOTE(review): the action taken after each GNUNET_break_op is not visible
 * in this view; presumably the request is dropped by returning.
 */
2375 set_listen_cb (void *cls,
2376 const struct GNUNET_PeerIdentity *other_peer,
2377 const struct GNUNET_MessageHeader *context_msg,
2378 struct GNUNET_SET_Request *request)
2380 struct ConsensusSession *session = cls;
2382 struct TaskEntry *task;
2383 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2385 if (NULL == context_msg)
2387 GNUNET_break_op (0);
2391 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2393 GNUNET_break_op (0);
2397 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2399 GNUNET_break_op (0);
/* This cast drops the const qualifier of context_msg. */
2403 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
/* Every key field is taken from the remote message and converted from
   network byte order. */
2405 tk = ((struct TaskKey) {
2406 .kind = ntohs (cm->kind),
2407 .peer1 = ntohs (cm->peer1),
2408 .peer2 = ntohs (cm->peer2),
2409 .repetition = ntohs (cm->repetition),
2410 .leader = ntohs (cm->leader),
2413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2414 session->local_peer_idx, debug_str_task_key (&tk));
2416 task = lookup_task (session, &tk);
2420 GNUNET_break_op (0);
2424 if (GNUNET_YES == task->is_finished)
2426 GNUNET_break_op (0);
2430 if (task->key.peer2 != session->local_peer_idx)
2432 /* We're being asked, so we must be the 2nd peer. */
2433 GNUNET_break_op (0);
/* NOTE(review): the key was built from remote data. A request naming the
   local index as both peer1 and peer2 that matches an existing task would
   reach this assertion; confirm a remote peer cannot abort the service. */
2437 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2438 (task->key.peer2 == session->local_peer_idx)));
2440 task->cls.setop.op = GNUNET_SET_accept (request,
2441 GNUNET_SET_RESULT_SYMMETRIC,
2445 /* If the task hasn't been started yet,
2446 we wait for that until we commit. */
2448 if (GNUNET_YES == task->is_started)
2450 commit_set (session, task);
/**
 * Register a task with its step and with the task map.
 * The entry passed in is duplicated, so callers may reuse their own copy.
 * The duplicate is appended to the task array of the step and stored in
 * the map under the hash of its key; the key must be unique (asserted).
 *
 * @param taskmap map from hashed task key to task entry
 * @param t template of the task; t->step must be set
 */
2457 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2458 struct TaskEntry *t)
2460 struct GNUNET_HashCode round_hash;
2463 GNUNET_assert (NULL != t->step);
/* From here on t refers to the heap duplicate. */
2465 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
/* Grow the task array when it is full. */
2469 if (s->tasks_len == s->tasks_cap)
2471 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2472 GNUNET_array_grow (s->tasks,
2477 #ifdef GNUNET_EXTRA_LOGGING
2478 GNUNET_assert (NULL != s->debug_name);
2479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2480 debug_str_task_key (&t->key),
2484 s->tasks[s->tasks_len] = t;
2487 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2488 GNUNET_assert (GNUNET_OK ==
2489 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2490 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Placeholder for assigning timeouts to the steps of a session.
 * No executable statement is visible in this view; the existing comments
 * mark the function as not yet implemented.
 *
 * @param session session whose task graph has been constructed
 */
2495 install_step_timeouts (struct ConsensusSession *session)
2497 /* Given the fully constructed task graph
2498 with rounds for tasks, we can give the tasks timeouts. */
2500 // unsigned int max_round;
2502 /* XXX: implement! */
2508 * Arrange two peers in some canonical order.
/*
 * Both peer numbers must be below n (asserted). The visible test compares
 * the distance from a to b modulo n with n / 2.
 * NOTE(review): how a and b are chosen and which order is written back to
 * p1 and p2 is not visible in this view.
 */
2511 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2516 GNUNET_assert (*p1 < n);
2517 GNUNET_assert (*p2 < n);
2530 /* For uniformly random *p1, *p2,
2531 this condition is true with 50% chance */
2532 if (((b - a) + n) % n <= n / 2)
2546 * Record @a dep as a dependency of @a step.
/*
 * Appends step to the subordinates array of dep, growing the array when it
 * is full, and increments the pending prerequisite count of step.
 * Asserted preconditions: the two steps differ, neither is NULL, and the
 * round of dep is not later than the round of step.
 */
2549 step_depend_on (struct Step *step, struct Step *dep)
2551 /* We're not checking for cyclic dependencies,
2552 but this is a cheap sanity check. */
2553 GNUNET_assert (step != dep);
2554 GNUNET_assert (NULL != step);
2555 GNUNET_assert (NULL != dep);
2556 GNUNET_assert (dep->round <= step->round);
2558 #ifdef GNUNET_EXTRA_LOGGING
2559 /* Make sure we have complete debugging information.
2560 Also checks that we don't screw up too badly
2561 constructing the task graph. */
2562 GNUNET_assert (NULL != step->debug_name);
2563 GNUNET_assert (NULL != dep->debug_name);
2564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2565 "Making step `%s' depend on `%s'\n",
/* Capacity is raised to 3 * (cap + 1) / 2 when the array is full. */
2570 if (dep->subordinates_cap == dep->subordinates_len)
2572 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2573 GNUNET_array_grow (dep->subordinates,
2574 dep->subordinates_cap,
2578 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2580 dep->subordinates[dep->subordinates_len] = step;
2581 dep->subordinates_len++;
/* Matches the decrement done when dep finishes. */
2583 step->pending_prereq++;
/**
 * Allocate a step and append it to the step list of the session.
 *
 * @param session session that owns the step
 * @param round round number stored in the step
 * @param early_finishable stored in the step and consulted by
 *        try_finish_step_early
 * @return the new step (NOTE(review): the return statement is not visible
 *         in this view)
 */
2587 static struct Step *
2588 create_step (struct ConsensusSession *session, int round, int early_finishable)
2591 step = GNUNET_new (struct Step);
2592 step->session = session;
2593 step->round = round;
2594 step->early_finishable = early_finishable;
2595 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2596 session->steps_tail,
2603 * Construct the task graph for a single
/*
 * Builds the chain of steps for one gradecast between step_before and
 * step_after: disseminate, echo, echo grade, confirm and confirm grade.
 * Every step is created as early finishable and depends on its
 * predecessor; step_after depends on the last one.
 * NOTE(review): the rep and lead parameters, several local declarations
 * and the updates of round and prev_step are not visible in this view.
 */
2607 construct_task_graph_gradecast (struct ConsensusSession *session,
2610 struct Step *step_before,
2611 struct Step *step_after)
2613 uint16_t n = session->num_peers;
2614 uint16_t me = session->local_peer_idx;
2619 /* The task we're currently setting up. */
2620 struct TaskEntry task;
2623 struct Step *prev_step;
2629 round = step_before->round + 1;
2631 /* gcast step 1: leader disseminates */
2633 step = create_step (session, round, GNUNET_YES);
2635 #ifdef GNUNET_EXTRA_LOGGING
2636 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2638 step_depend_on (step, step_before);
/* Tasks keyed with the local peer as leader: reconcile the current set
   with each peer. */
2642 for (k = 0; k < n; k++)
2648 arrange_peers (&p1, &p2, n);
2649 task = ((struct TaskEntry) {
2651 .start = task_start_reconcile,
2652 .cancel = task_cancel_reconcile,
2653 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2655 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2656 put_task (session->taskmap, &task);
2658 /* We run this task to make sure that the leader
2659 has stored the SET_KIND_LEADER set of himself,
2660 so he can participate in the rest of the gradecast
2661 without the code having to handle any special cases. */
2662 task = ((struct TaskEntry) {
2664 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2665 .start = task_start_reconcile,
2666 .cancel = task_cancel_reconcile,
2668 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2669 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2670 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2671 put_task (session->taskmap, &task);
/* Task keyed with lead as leader: output is the proposal set and diff of
   that leader. */
2677 arrange_peers (&p1, &p2, n);
2678 task = ((struct TaskEntry) {
2680 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2681 .start = task_start_reconcile,
2682 .cancel = task_cancel_reconcile,
2684 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2685 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2686 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2687 put_task (session->taskmap, &task);
2690 /* gcast phase 2: echo */
2693 step = create_step (session, round, GNUNET_YES);
2694 #ifdef GNUNET_EXTRA_LOGGING
2695 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2697 step_depend_on (step, prev_step);
/* Echo tasks feed the proposal set into the echo referendum. */
2699 for (k = 0; k < n; k++)
2703 arrange_peers (&p1, &p2, n);
2704 task = ((struct TaskEntry) {
2706 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2707 .start = task_start_reconcile,
2708 .cancel = task_cancel_reconcile,
2710 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2711 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2712 put_task (session->taskmap, &task);
2716 /* Same round, since step only has local tasks */
2717 step = create_step (session, round, GNUNET_YES);
2718 #ifdef GNUNET_EXTRA_LOGGING
2719 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2721 step_depend_on (step, prev_step);
/* NOTE(review): the key below uses -1 for both peers, so the result of
   this arrange_peers call looks unused; confirm. */
2723 arrange_peers (&p1, &p2, n);
2724 task = ((struct TaskEntry) {
2725 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2727 .start = task_start_eval_echo
2729 put_task (session->taskmap, &task);
2733 step = create_step (session, round, GNUNET_YES);
2734 #ifdef GNUNET_EXTRA_LOGGING
2735 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2737 step_depend_on (step, prev_step);
2739 /* gcast phase 3: confirmation and grading */
2740 for (k = 0; k < n; k++)
2744 arrange_peers (&p1, &p2, n);
2745 task = ((struct TaskEntry) {
2747 .start = task_start_reconcile,
2748 .cancel = task_cancel_reconcile,
2749 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2751 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2752 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2753 /* If there was at least one element in the echo round that was
2754 contested (i.e. it had no n-t majority), then we let the other peers
2755 know, and other peers let us know. The contested flag for each peer is
2756 stored in the rfn. */
2757 task.cls.setop.transceive_contested = GNUNET_YES;
2758 put_task (session->taskmap, &task);
2762 /* Same round, since step only has local tasks */
2763 step = create_step (session, round, GNUNET_YES);
2764 #ifdef GNUNET_EXTRA_LOGGING
2765 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2767 step_depend_on (step, prev_step);
2769 task = ((struct TaskEntry) {
2771 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2772 .start = task_start_grade,
2774 put_task (session->taskmap, &task);
/* The step after the gradecast waits for the final grading step. */
2776 step_depend_on (step_after, step);
/**
 * Build the complete task graph of a session.
 * Creates an all-to-all step, then t + 1 sequential gradecast repetitions
 * that each run one gradecast per possible leader between a start and an
 * end step, and finally a finish step.
 *
 * @param session session to build the graph for
 *
 * NOTE(review): the definition of t, the updates of round and prev_step
 * and the step fields of several task initializers are not visible in this
 * view.
 */
2781 construct_task_graph (struct ConsensusSession *session)
2783 uint16_t n = session->num_peers;
2786 uint16_t me = session->local_peer_idx;
2788 /* The task we're currently setting up. */
2789 struct TaskEntry task;
2791 /* Current leader */
2795 struct Step *prev_step;
2797 unsigned int round = 0;
2801 // XXX: introduce first step,
2802 // where we wait for all insert acks
2803 // from the set service
2805 /* faster but brittle all-to-all */
2807 // XXX: Not implemented yet
2809 /* all-to-all step */
/* This step is created as not early finishable. */
2811 step = create_step (session, round, GNUNET_NO);
2813 #ifdef GNUNET_EXTRA_LOGGING
2814 step->debug_name = GNUNET_strdup ("all to all");
2817 for (i = 0; i < n; i++)
2824 arrange_peers (&p1, &p2, n);
2825 task = ((struct TaskEntry) {
2826 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2828 .start = task_start_reconcile,
2829 .cancel = task_cancel_reconcile,
/* Input and output are the same set, and do_not_remove is set. */
2831 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2832 task.cls.setop.output_set = task.cls.setop.input_set;
2833 task.cls.setop.do_not_remove = GNUNET_YES;
2834 put_task (session->taskmap, &task);
2842 /* Byzantine union */
2844 /* sequential repetitions of the gradecasts */
2845 for (i = 0; i < t + 1; i++)
2847 struct Step *step_rep_start;
2848 struct Step *step_rep_end;
2850 /* Every repetition is in a separate round. */
2851 step_rep_start = create_step (session, round, GNUNET_YES);
2852 #ifdef GNUNET_EXTRA_LOGGING
2853 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2856 step_depend_on (step_rep_start, prev_step);
2858 /* gradecast has three rounds */
2860 step_rep_end = create_step (session, round, GNUNET_YES);
2861 #ifdef GNUNET_EXTRA_LOGGING
2862 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2865 /* parallel gradecasts */
2866 for (lead = 0; lead < n; lead++)
2867 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* The end step of the repetition carries the apply task. */
2869 task = ((struct TaskEntry) {
2870 .step = step_rep_end,
2871 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2872 .start = task_start_apply_round,
2874 put_task (session->taskmap, &task);
2876 prev_step = step_rep_end;
2879 /* There is no next gradecast round, thus the final
2880 start step is the overall end step of the gradecasts */
2882 step = create_step (session, round, GNUNET_NO);
2883 #ifdef GNUNET_EXTRA_LOGGING
2884 GNUNET_asprintf (&step->debug_name, "finish");
2886 step_depend_on (step, prev_step);
2888 task = ((struct TaskEntry) {
2890 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2891 .start = task_start_finish,
/* The finish task reads the set registered as SET_KIND_LAST_GRADECAST. */
2893 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2895 put_task (session->taskmap, &task);
2901 * Check join message.
2903 * @param cls session of client that sent the message
2904 * @param m message sent by the client
2905 * @return #GNUNET_OK if @a m is well-formed
/*
 * The bytes following the fixed message struct must be exactly
 * listed_peers peer identities; otherwise GNUNET_SYSERR is returned.
 * NOTE(review): the subtraction involves sizeof and is therefore done in
 * an unsigned type; presumably the caller guarantees header.size is at
 * least sizeof (*m). Confirm.
 */
2908 check_client_join (void *cls,
2909 const struct GNUNET_CONSENSUS_JoinMessage *m)
2911 uint32_t listed_peers = ntohl (m->num_peers);
2913 if ( (ntohs (m->header.size) - sizeof (*m)) !=
2914 listed_peers * sizeof (struct GNUNET_PeerIdentity))
2917 return GNUNET_SYSERR;
2924 * Called when a client wants to join a consensus session.
2926 * @param cls session of client that sent the message
2927 * @param m message sent by the client
2930 handle_client_join (void *cls,
2931 const struct GNUNET_CONSENSUS_JoinMessage *m)
2933 struct ConsensusSession *session = cls;
2934 struct ConsensusSession *other_session;
2936 initialize_session_peer_list (session,
2938 compute_global_id (session,
2941 /* Check if some local client already owns the session.
2942 It is only legal to have a session with an existing global id
2943 if all other sessions with this global id are finished.*/
2944 for (other_session = sessions_head;
2945 NULL != other_session;
2946 other_session = other_session->next)
2948 if ( (other_session != session) &&
2949 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
2950 &other_session->global_id)) )
2954 session->conclude_deadline
2955 = GNUNET_TIME_absolute_ntoh (m->deadline);
2956 session->conclude_start
2957 = GNUNET_TIME_absolute_ntoh (m->start);
2958 session->local_peer_idx = get_peer_idx (&my_peer,
2960 GNUNET_assert (-1 != session->local_peer_idx);
2962 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2963 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
2964 GNUNET_h2s (&m->session_id),
2966 session->local_peer_idx,
2967 GNUNET_STRINGS_relative_time_to_string
2968 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
2969 session->conclude_deadline),
2972 session->set_listener
2973 = GNUNET_SET_listen (cfg,
2974 GNUNET_SET_OPERATION_UNION,
2975 &session->global_id,
2979 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
2981 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
2983 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
2985 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
2989 struct SetEntry *client_set;
2991 client_set = GNUNET_new (struct SetEntry);
2992 client_set->h = GNUNET_SET_create (cfg,
2993 GNUNET_SET_OPERATION_UNION);
2994 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2999 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3002 /* Just construct the task graph,
3003 but don't run anything until the client calls conclude. */
3004 construct_task_graph (session);
3005 GNUNET_SERVICE_client_continue (session->client);
/**
 * Continuation passed to GNUNET_SET_add_element() for elements
 * inserted by the client.
 *
 * @param cls closure (the session is passed at the call site)
 */
static void
client_insert_done (void *cls)
{
  /* FIXME: not implemented; in particular nothing is done here
     about the pending-insert counter that is incremented when
     the insert is issued. */
  (void) cls;
}
3017 * Called when a client performs an insert operation.
3019 * @param cls client handle
3020 * @param msg message sent by the client
3021 * @return #GNUNET_OK (always well-formed)
3024 check_client_insert (void *cls,
3025 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3032 * Called when a client performs an insert operation.
3034 * @param cls client handle
3035 * @param msg message sent by the client
3038 handle_client_insert (void *cls,
3039 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3041 struct ConsensusSession *session = cls;
3042 struct GNUNET_SET_Element *element;
3043 ssize_t element_size;
3044 struct GNUNET_SET_Handle *initial_set;
3046 if (GNUNET_YES == session->conclude_started)
3049 GNUNET_SERVICE_client_drop (session->client);
3052 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3053 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3054 element->element_type = msg->element_type;
3055 element->size = element_size;
3056 GNUNET_memcpy (&element[1], &msg[1], element_size);
3057 element->data = &element[1];
3059 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3060 struct SetEntry *entry;
3062 entry = lookup_set (session,
3064 GNUNET_assert (NULL != entry);
3065 initial_set = entry->h;
3067 session->num_client_insert_pending++;
3068 GNUNET_SET_add_element (initial_set,
3070 &client_insert_done,
3073 #ifdef GNUNET_EXTRA_LOGGING
3075 struct GNUNET_HashCode hash;
3077 GNUNET_SET_element_hash (element,
3080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081 "P%u: element %s added\n",
3082 session->local_peer_idx,
3083 GNUNET_h2s (&hash));
3086 GNUNET_free (element);
3087 GNUNET_SERVICE_client_continue (session->client);
3092 * Called when a client performs the conclude operation.
3094 * @param cls client handle
3095 * @param message message sent by the client
3098 handle_client_conclude (void *cls,
3099 const struct GNUNET_MessageHeader *message)
3101 struct ConsensusSession *session = cls;
3103 if (GNUNET_YES == session->conclude_started)
3105 /* conclude started twice */
3107 GNUNET_SERVICE_client_drop (session->client);
3110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3111 "conclude requested\n");
3112 session->conclude_started = GNUNET_YES;
3113 install_step_timeouts (session);
3114 run_ready_steps (session);
3115 GNUNET_SERVICE_client_continue (session->client);
3120 * Called to clean up, after a shutdown has been requested.
3122 * @param cls closure
3125 shutdown_task (void *cls)
3127 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3129 GNUNET_STATISTICS_destroy (statistics,
3136 * Start processing consensus requests.
3138 * @param cls closure
3139 * @param c configuration to use
3140 * @param service the initialized service
3144 const struct GNUNET_CONFIGURATION_Handle *c,
3145 struct GNUNET_SERVICE_Handle *service)
3149 GNUNET_CRYPTO_get_peer_identity (cfg,
3152 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3153 "Could not retrieve host identity\n");
3154 GNUNET_SCHEDULER_shutdown ();
3157 statistics = GNUNET_STATISTICS_create ("consensus",
3159 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3165 * Callback called when a client connects to the service.
3167 * @param cls closure for the service
3168 * @param c the new client that connected to the service
3169 * @param mq the message queue used to send messages to the client
3173 client_connect_cb (void *cls,
3174 struct GNUNET_SERVICE_Client *c,
3175 struct GNUNET_MQ_Handle *mq)
3177 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3179 session->client = c;
3180 session->client_mq = mq;
3181 GNUNET_CONTAINER_DLL_insert (sessions_head,
3189 * Callback called when a client disconnected from the service
3191 * @param cls closure for the service
3192 * @param c the client that disconnected
3193 * @param internal_cls should be equal to @a c
3196 client_disconnect_cb (void *cls,
3197 struct GNUNET_SERVICE_Client *c,
3200 struct ConsensusSession *session = internal_cls;
3202 if (NULL != session->set_listener)
3204 GNUNET_SET_listen_cancel (session->set_listener);
3205 session->set_listener = NULL;
3207 GNUNET_CONTAINER_DLL_remove (sessions_head,
3210 GNUNET_free (session);
3215 * Define "main" method using service macro.
3219 GNUNET_SERVICE_OPTION_NONE,
3222 &client_disconnect_cb,
3224 GNUNET_MQ_hd_fixed_size (client_conclude,
3225 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3226 struct GNUNET_MessageHeader,
3228 GNUNET_MQ_hd_var_size (client_insert,
3229 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3230 struct GNUNET_CONSENSUS_ElementMessage,
3232 GNUNET_MQ_hd_var_size (client_join,
3233 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3234 struct GNUNET_CONSENSUS_JoinMessage,
3236 GNUNET_MQ_handler_end ());
3238 /* end of gnunet-service-consensus.c */