2 This file is part of GNUnet
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
58 enum EarlyStoppingPhase
60 EARLY_STOPPING_NONE = 0,
61 EARLY_STOPPING_ONE_MORE = 1,
62 EARLY_STOPPING_DONE = 2,
66 GNUNET_NETWORK_STRUCT_BEGIN
69 struct ContestedPayload
74 * Tuple of integers that together
75 * identify a task uniquely.
79 * A value from 'enum PhaseKind'.
81 uint16_t kind GNUNET_PACKED;
84 * Number of the first peer
87 int16_t peer1 GNUNET_PACKED;
90 * Number of the second peer in canonical order.
92 int16_t peer2 GNUNET_PACKED;
95 * Repetition of the gradecast phase.
97 int16_t repetition GNUNET_PACKED;
100 * Leader in the gradecast phase.
102 * Can be different from both peer1 and peer2.
104 int16_t leader GNUNET_PACKED;
111 int set_kind GNUNET_PACKED;
112 int k1 GNUNET_PACKED;
113 int k2 GNUNET_PACKED;
120 struct GNUNET_SET_Handle *h;
122 * GNUNET_YES if the set resulted
123 * from applying a referendum with contested
132 int diff_kind GNUNET_PACKED;
133 int k1 GNUNET_PACKED;
134 int k2 GNUNET_PACKED;
139 int rfn_kind GNUNET_PACKED;
140 int k1 GNUNET_PACKED;
141 int k2 GNUNET_PACKED;
145 GNUNET_NETWORK_STRUCT_END
149 PHASE_KIND_ALL_TO_ALL,
150 PHASE_KIND_GRADECAST_LEADER,
151 PHASE_KIND_GRADECAST_ECHO,
152 PHASE_KIND_GRADECAST_ECHO_GRADE,
153 PHASE_KIND_GRADECAST_CONFIRM,
154 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
156 * Apply a repetition of the all-to-all
157 * gradecast to the current set.
159 PHASE_KIND_APPLY_REP,
169 * Last result set from a gradecast
171 SET_KIND_LAST_GRADECAST,
172 SET_KIND_LEADER_PROPOSAL,
173 SET_KIND_ECHO_RESULT,
179 DIFF_KIND_LEADER_PROPOSAL,
180 DIFF_KIND_LEADER_CONSENSUS,
181 DIFF_KIND_GRADECAST_RESULT,
189 RFN_KIND_GRADECAST_RESULT
195 struct SetKey input_set;
197 struct SetKey output_set;
198 struct RfnKey output_rfn;
199 struct DiffKey output_diff;
203 int transceive_contested;
205 struct GNUNET_SET_OperationHandle *op;
211 struct SetKey input_set;
215 * Closure for both @a start_task
216 * and @a cancel_task.
220 struct SetOpCls setop;
221 struct FinishCls finish;
226 typedef void (*TaskFunc) (struct TaskEntry *task);
229 * Node in the consensus task graph.
244 union TaskFuncCls cls;
251 * All steps of one session are in a
252 * linked list for easier deallocation.
257 * All steps of one session are in a
258 * linked list for easier deallocation.
262 struct ConsensusSession *session;
265 * Tasks that this step is composed of.
267 struct TaskEntry **tasks;
268 unsigned int tasks_len;
269 unsigned int tasks_cap;
271 unsigned int finished_tasks;
274 * Tasks that have this task as dependency.
276 * We store pointers to subordinates rather
277 * than to prerequisites since it makes
278 * tracking the readiness of a task easier.
280 struct Step **subordinates;
281 unsigned int subordinates_len;
282 unsigned int subordinates_cap;
285 * Counter for the prerequisites of
288 size_t pending_prereq;
291 * Task that will run this step despite
292 * any pending prerequisites.
294 struct GNUNET_SCHEDULER_Task *timeout_task;
296 unsigned int is_running;
298 unsigned int is_finished;
301 * Synchrony round of the task.
302 * Determines the deadline for the task.
307 * Human-readable name for
308 * the task, used for debugging.
313 * When we're doing an early finish, how should this step be
315 * If GNUNET_YES, the step will be marked as finished
316 * without actually running its tasks.
317 * Otherwise, the step will still be run even after
320 * Note that a task may never be finished early if
321 * it is already running.
323 int early_finishable;
327 struct RfnElementInfo
329 const struct GNUNET_SET_Element *element;
332 * GNUNET_YES if the peer votes for the proposal.
337 * Proposal for this element,
338 * can only be VOTE_ADD or VOTE_REMOVE.
340 enum ReferendumVote proposal;
344 struct ReferendumEntry
349 * Elements where there is at least one proposed change.
351 * Maps the hash of the GNUNET_SET_Element
352 * to 'struct RfnElementInfo'.
354 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
356 unsigned int num_peers;
359 * Stores, for every peer in the session,
360 * whether the peer finished the whole referendum.
362 * Votes from peers are only counted if they're
363 * marked as commited (#GNUNET_YES) in the referendum.
365 * Otherwise (#GNUNET_NO), the requested changes are
366 * not counted for majority votes or thresholds.
372 * Contestation state of the peer. If a peer is contested, the values it
373 * contributed are still counted for applying changes, but the grading is
380 struct DiffElementInfo
382 const struct GNUNET_SET_Element *element;
385 * Positive weight for 'add', negative
386 * weights for 'remove'.
398 struct GNUNET_CONTAINER_MultiHashMap *changes;
404 * A consensus session consists of one local client and the remote authorities.
406 struct ConsensusSession
409 * Consensus sessions are kept in a DLL.
411 struct ConsensusSession *next;
414 * Consensus sessions are kept in a DLL.
416 struct ConsensusSession *prev;
418 unsigned int num_client_insert_pending;
420 struct GNUNET_CONTAINER_MultiHashMap *setmap;
421 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
425 * Array of peers with length 'num_peers'.
427 int *peers_blacklisted;
430 * Mapping from (hashed) TaskKey to TaskEntry.
432 * We map the application_id for a round to the task that should be
433 * executed, so we don't have to go through all task whenever we get
434 * an incoming set op request.
436 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
438 struct Step *steps_head;
439 struct Step *steps_tail;
441 int conclude_started;
446 * Global consensus identification, computed
447 * from the session id and participating authorities.
449 struct GNUNET_HashCode global_id;
452 * Client that inhabits the session
454 struct GNUNET_SERVER_Client *client;
457 * Queued messages to the client.
459 struct GNUNET_MQ_Handle *client_mq;
462 * Time when the conclusion of the consensus should begin.
464 struct GNUNET_TIME_Absolute conclude_start;
467 * Timeout for all rounds together, single rounds will schedule a timeout task
468 * with a fraction of the conclude timeout.
469 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
471 struct GNUNET_TIME_Absolute conclude_deadline;
473 struct GNUNET_PeerIdentity *peers;
476 * Number of other peers in the consensus.
478 unsigned int num_peers;
481 * Index of the local peer in the peers array
483 unsigned int local_peer_idx;
486 * Listener for requests from other peers.
487 * Uses the session's global id as app id.
489 struct GNUNET_SET_ListenHandle *set_listener;
492 * State of our early stopping scheme.
498 * Linked list of sessions this peer participates in.
500 static struct ConsensusSession *sessions_head;
503 * Linked list of sessions this peer participates in.
505 static struct ConsensusSession *sessions_tail;
508 * Configuration of the consensus service.
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
513 * Handle to the server for this service.
515 static struct GNUNET_SERVER_Handle *srv;
518 * Peer that runs this service.
520 static struct GNUNET_PeerIdentity my_peer;
525 struct GNUNET_STATISTICS_Handle *statistics;
529 finish_task (struct TaskEntry *task);
532 run_ready_steps (struct ConsensusSession *session);
535 phasename (uint16_t phase)
539 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540 case PHASE_KIND_FINISH: return "FINISH";
541 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547 default: return "(unknown)";
553 setname (uint16_t kind)
557 case SET_KIND_CURRENT: return "CURRENT";
558 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559 case SET_KIND_NONE: return "NONE";
560 default: return "(unknown)";
565 rfnname (uint16_t kind)
569 case RFN_KIND_NONE: return "NONE";
570 case RFN_KIND_ECHO: return "ECHO";
571 case RFN_KIND_CONFIRM: return "CONFIRM";
572 default: return "(unknown)";
577 diffname (uint16_t kind)
581 case DIFF_KIND_NONE: return "NONE";
582 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585 default: return "(unknown)";
589 #ifdef GNUNET_EXTRA_LOGGING
593 debug_str_element (const struct GNUNET_SET_Element *el)
595 struct GNUNET_HashCode hash;
597 GNUNET_SET_element_hash (el, &hash);
599 return GNUNET_h2s (&hash);
603 debug_str_task_key (struct TaskKey *tk)
605 static char buf[256];
607 snprintf (buf, sizeof (buf),
608 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609 phasename (tk->kind), tk->peer1, tk->peer2,
610 tk->leader, tk->repetition);
616 debug_str_diff_key (struct DiffKey *dk)
618 static char buf[256];
620 snprintf (buf, sizeof (buf),
621 "DiffKey kind=%s, k1=%d, k2=%d",
622 diffname (dk->diff_kind), dk->k1, dk->k2);
628 debug_str_set_key (const struct SetKey *sk)
630 static char buf[256];
632 snprintf (buf, sizeof (buf),
633 "SetKey kind=%s, k1=%d, k2=%d",
634 setname (sk->set_kind), sk->k1, sk->k2);
641 debug_str_rfn_key (const struct RfnKey *rk)
643 static char buf[256];
645 snprintf (buf, sizeof (buf),
646 "RfnKey kind=%s, k1=%d, k2=%d",
647 rfnname (rk->rfn_kind), rk->k1, rk->k2);
652 #endif /* GNUNET_EXTRA_LOGGING */
656 * Destroy a session, free all resources associated with it.
658 * @param session the session to destroy
661 destroy_session (struct ConsensusSession *session)
663 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664 if (NULL != session->set_listener)
666 GNUNET_SET_listen_cancel (session->set_listener);
667 session->set_listener = NULL;
669 if (NULL != session->client_mq)
671 GNUNET_MQ_destroy (session->client_mq);
672 session->client_mq = NULL;
673 /* The MQ cleanup will also disconnect the underlying client. */
674 session->client = NULL;
676 if (NULL != session->client)
678 GNUNET_SERVER_client_disconnect (session->client);
679 session->client = NULL;
681 GNUNET_free (session);
686 * Send the final result set of the consensus to the client, element by
690 * @param element the current element, NULL if all elements have been
692 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
695 send_to_client_iter (void *cls,
696 const struct GNUNET_SET_Element *element)
698 struct TaskEntry *task = (struct TaskEntry *) cls;
699 struct ConsensusSession *session = task->step->session;
700 struct GNUNET_MQ_Envelope *ev;
704 struct GNUNET_CONSENSUS_ElementMessage *m;
706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707 "P%d: sending element %s to client\n",
708 session->local_peer_idx,
709 debug_str_element (element));
711 ev = GNUNET_MQ_msg_extra (m, element->size,
712 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713 m->element_type = htons (element->element_type);
714 memcpy (&m[1], element->data, element->size);
715 GNUNET_MQ_send (session->client_mq, ev);
719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720 "P%d: finished iterating elements for client\n",
721 session->local_peer_idx);
722 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723 GNUNET_MQ_send (session->client_mq, ev);
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
732 struct GNUNET_HashCode hash;
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "P%u: looking up set {%s}\n",
736 session->local_peer_idx,
737 debug_str_set_key (key));
739 GNUNET_assert (SET_KIND_NONE != key->set_kind);
740 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
748 struct GNUNET_HashCode hash;
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "P%u: looking up diff {%s}\n",
752 session->local_peer_idx,
753 debug_str_diff_key (key));
755 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
764 struct GNUNET_HashCode hash;
766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767 "P%u: looking up rfn {%s}\n",
768 session->local_peer_idx,
769 debug_str_rfn_key (key));
771 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
778 diff_insert (struct DiffEntry *diff,
780 const struct GNUNET_SET_Element *element)
782 struct DiffElementInfo *di;
783 struct GNUNET_HashCode hash;
785 GNUNET_assert ( (1 == weight) || (-1 == weight));
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "diff_insert with element size %u\n",
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "hashing element\n");
794 GNUNET_SET_element_hash (element, &hash);
796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
803 di = GNUNET_new (struct DiffElementInfo);
804 di->element = GNUNET_SET_element_dup (element);
805 GNUNET_assert (GNUNET_OK ==
806 GNUNET_CONTAINER_multihashmap_put (diff->changes,
808 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
816 rfn_commit (struct ReferendumEntry *rfn,
817 uint16_t commit_peer)
819 GNUNET_assert (commit_peer < rfn->num_peers);
821 rfn->peer_commited[commit_peer] = GNUNET_YES;
826 rfn_contest (struct ReferendumEntry *rfn,
827 uint16_t contested_peer)
829 GNUNET_assert (contested_peer < rfn->num_peers);
831 rfn->peer_contested[contested_peer] = GNUNET_YES;
836 rfn_noncontested (struct ReferendumEntry *rfn)
842 for (i = 0; i < rfn->num_peers; i++)
843 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
851 rfn_vote (struct ReferendumEntry *rfn,
852 uint16_t voting_peer,
853 enum ReferendumVote vote,
854 const struct GNUNET_SET_Element *element)
856 struct RfnElementInfo *ri;
857 struct GNUNET_HashCode hash;
859 GNUNET_assert (voting_peer < rfn->num_peers);
861 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
862 since VOTE_KEEP is implicit in not voting. */
863 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
865 GNUNET_SET_element_hash (element, &hash);
866 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
870 ri = GNUNET_new (struct RfnElementInfo);
871 ri->element = GNUNET_SET_element_dup (element);
872 ri->votes = GNUNET_new_array (rfn->num_peers, int);
873 GNUNET_assert (GNUNET_OK ==
874 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
876 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
879 ri->votes[voting_peer] = GNUNET_YES;
885 task_other_peer (struct TaskEntry *task)
887 uint16_t me = task->step->session->local_peer_idx;
888 if (task->key.peer1 == me)
889 return task->key.peer2;
890 return task->key.peer1;
895 * Callback for set operation results. Called for each element
899 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900 * @param status see enum GNUNET_SET_Status
903 set_result_cb (void *cls,
904 const struct GNUNET_SET_Element *element,
905 enum GNUNET_SET_Status status)
907 struct TaskEntry *task = cls;
908 struct ConsensusSession *session = task->step->session;
909 struct SetEntry *output_set = NULL;
910 struct DiffEntry *output_diff = NULL;
911 struct ReferendumEntry *output_rfn = NULL;
912 unsigned int other_idx;
913 struct SetOpCls *setop;
915 setop = &task->cls.setop;
918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919 "P%u: got set result for {%s}, status %u\n",
920 session->local_peer_idx,
921 debug_str_task_key (&task->key),
924 if (GNUNET_NO == task->is_started)
930 if (GNUNET_YES == task->is_finished)
936 other_idx = task_other_peer (task);
938 if (SET_KIND_NONE != setop->output_set.set_kind)
940 output_set = lookup_set (session, &setop->output_set);
941 GNUNET_assert (NULL != output_set);
944 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
946 output_diff = lookup_diff (session, &setop->output_diff);
947 GNUNET_assert (NULL != output_diff);
950 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
952 output_rfn = lookup_rfn (session, &setop->output_rfn);
953 GNUNET_assert (NULL != output_rfn);
956 if (GNUNET_YES == session->peers_blacklisted[other_idx])
958 /* Peer might have been blacklisted
959 by a gradecast running in parallel, ignore elements from now */
960 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
962 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
966 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
968 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
970 GNUNET_assert (NULL != output_rfn);
971 rfn_contest (output_rfn, task_other_peer (task));
978 case GNUNET_SET_STATUS_ADD_LOCAL:
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "Adding element in Task {%s}\n",
981 debug_str_task_key (&task->key));
982 if (NULL != output_set)
984 // FIXME: record pending adds, use callback
985 GNUNET_SET_add_element (output_set->h,
989 #ifdef GNUNET_EXTRA_LOGGING
990 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991 "P%u: adding element %s into set {%s} of task {%s}\n",
992 session->local_peer_idx,
993 debug_str_element (element),
994 debug_str_set_key (&setop->output_set),
995 debug_str_task_key (&task->key));
998 if (NULL != output_diff)
1000 diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003 "P%u: adding element %s into diff {%s} of task {%s}\n",
1004 session->local_peer_idx,
1005 debug_str_element (element),
1006 debug_str_diff_key (&setop->output_diff),
1007 debug_str_task_key (&task->key));
1010 if (NULL != output_rfn)
1012 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016 session->local_peer_idx,
1017 debug_str_element (element),
1018 debug_str_rfn_key (&setop->output_rfn),
1019 debug_str_task_key (&task->key));
1022 // XXX: add result to structures in task
1024 case GNUNET_SET_STATUS_ADD_REMOTE:
1025 if (GNUNET_YES == setop->do_not_remove)
1027 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "Removing element in Task {%s}\n",
1031 debug_str_task_key (&task->key));
1032 if (NULL != output_set)
1034 // FIXME: record pending adds, use callback
1035 GNUNET_SET_remove_element (output_set->h,
1039 #ifdef GNUNET_EXTRA_LOGGING
1040 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041 "P%u: removing element %s from set {%s} of task {%s}\n",
1042 session->local_peer_idx,
1043 debug_str_element (element),
1044 debug_str_set_key (&setop->output_set),
1045 debug_str_task_key (&task->key));
1048 if (NULL != output_diff)
1050 diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053 "P%u: removing element %s from diff {%s} of task {%s}\n",
1054 session->local_peer_idx,
1055 debug_str_element (element),
1056 debug_str_diff_key (&setop->output_diff),
1057 debug_str_task_key (&task->key));
1060 if (NULL != output_rfn)
1062 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066 session->local_peer_idx,
1067 debug_str_element (element),
1068 debug_str_rfn_key (&setop->output_rfn),
1069 debug_str_task_key (&task->key));
1073 case GNUNET_SET_STATUS_DONE:
1074 // XXX: check first if any changes to the underlying
1075 // set are still pending
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Finishing setop in Task {%s}\n",
1078 debug_str_task_key (&task->key));
1079 if (NULL != output_rfn)
1081 rfn_commit (output_rfn, task_other_peer (task));
1085 case GNUNET_SET_STATUS_FAILURE:
1087 GNUNET_break_op (0);
1107 enum EvilnessSubType
1110 EVILNESS_SUB_REPLACEMENT,
1111 EVILNESS_SUB_NO_REPLACEMENT,
1116 enum EvilnessType type;
1117 enum EvilnessSubType subtype;
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1125 if (0 == strcmp ("replace", evil_subtype_str))
1127 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1129 else if (0 == strcmp ("noreplace", evil_subtype_str))
1131 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1135 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1138 return GNUNET_SYSERR;
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1149 char *evil_type_str = NULL;
1150 char *evil_subtype_str = NULL;
1152 GNUNET_assert (NULL != evil);
1154 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "P%u: no evilness\n",
1158 session->local_peer_idx);
1159 evil->type = EVILNESS_NONE;
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "P%u: got evilness spec\n",
1164 session->local_peer_idx);
1166 for (field = strtok (evil_spec, "/");
1168 field = strtok (NULL, "/"))
1170 unsigned int peer_num;
1171 unsigned int evil_num;
1174 evil_type_str = NULL;
1175 evil_subtype_str = NULL;
1177 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1181 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1188 GNUNET_assert (NULL != evil_type_str);
1189 GNUNET_assert (NULL != evil_subtype_str);
1191 if (peer_num == session->local_peer_idx)
1193 if (0 == strcmp ("slack", evil_type_str))
1195 evil->type = EVILNESS_SLACK;
1197 else if (0 == strcmp ("cram-all", evil_type_str))
1199 evil->type = EVILNESS_CRAM_ALL;
1200 evil->num = evil_num;
1201 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1204 else if (0 == strcmp ("cram-lead", evil_type_str))
1206 evil->type = EVILNESS_CRAM_LEAD;
1207 evil->num = evil_num;
1208 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1211 else if (0 == strcmp ("cram-echo", evil_type_str))
1213 evil->type = EVILNESS_CRAM_ECHO;
1214 evil->num = evil_num;
1215 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1220 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1227 /* No GNUNET_free since memory was allocated by libc */
1228 free (evil_type_str);
1229 evil_type_str = NULL;
1230 evil_subtype_str = NULL;
1233 evil->type = EVILNESS_NONE;
1235 GNUNET_free (evil_spec);
1236 /* no GNUNET_free_non_null since it wasn't
1237 * allocated with GNUNET_malloc */
1238 if (NULL != evil_type_str)
1239 free (evil_type_str);
1240 if (NULL != evil_subtype_str)
1241 free (evil_subtype_str);
1248 * Commit the appropriate set for a
1252 commit_set (struct ConsensusSession *session,
1253 struct TaskEntry *task)
1255 struct SetEntry *set;
1256 struct SetOpCls *setop = &task->cls.setop;
1258 GNUNET_assert (NULL != setop->op);
1259 set = lookup_set (session, &setop->input_set);
1260 GNUNET_assert (NULL != set);
1265 struct Evilness evil;
1267 get_evilness (session, &evil);
1268 if (EVILNESS_NONE != evil.type)
1270 /* Useful for evaluation */
1271 GNUNET_STATISTICS_set (statistics,
1278 case EVILNESS_CRAM_ALL:
1279 case EVILNESS_CRAM_LEAD:
1280 case EVILNESS_CRAM_ECHO:
1281 /* We're not cramming elements in the
1282 all-to-all round, since that would just
1283 add more elements to the result set, but
1284 wouldn't test robustness. */
1285 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1287 GNUNET_SET_commit (setop->op, set->h);
1290 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1293 GNUNET_SET_commit (setop->op, set->h);
1296 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1298 GNUNET_SET_commit (setop->op, set->h);
1301 for (i = 0; i < evil.num; i++)
1303 struct GNUNET_HashCode hash;
1304 struct GNUNET_SET_Element element;
1305 element.data = &hash;
1306 element.size = sizeof (struct GNUNET_HashCode);
1307 element.element_type = 0;
1309 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1311 /* Always generate a new element. */
1312 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1314 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1316 /* Always cram the same elements, derived from counter. */
1317 GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1323 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327 session->local_peer_idx,
1328 debug_str_element (&element),
1329 debug_str_set_key (&setop->input_set),
1330 debug_str_task_key (&task->key));
1333 GNUNET_STATISTICS_update (statistics,
1334 "# stuffed elements",
1337 GNUNET_SET_commit (setop->op, set->h);
1339 case EVILNESS_SLACK:
1340 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341 "P%u: evil peer: slacking\n",
1342 (unsigned int) session->local_peer_idx);
1346 GNUNET_SET_commit (setop->op, set->h);
1351 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1353 struct GNUNET_SET_Element element;
1354 struct ContestedPayload payload;
1355 element.data = &payload;
1356 element.size = sizeof (struct ContestedPayload);
1357 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1358 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1360 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1362 GNUNET_SET_commit (setop->op, set->h);
1366 /* For our testcases, we don't want the blacklisted
1368 GNUNET_SET_operation_cancel (setop->op);
1376 put_diff (struct ConsensusSession *session,
1377 struct DiffEntry *diff)
1379 struct GNUNET_HashCode hash;
1381 GNUNET_assert (NULL != diff);
1383 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1384 GNUNET_assert (GNUNET_OK ==
1385 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1386 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1390 put_set (struct ConsensusSession *session,
1391 struct SetEntry *set)
1393 struct GNUNET_HashCode hash;
1395 GNUNET_assert (NULL != set->h);
1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399 debug_str_set_key (&set->key));
1401 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1402 GNUNET_assert (GNUNET_SYSERR !=
1403 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1404 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1409 put_rfn (struct ConsensusSession *session,
1410 struct ReferendumEntry *rfn)
1412 struct GNUNET_HashCode hash;
1414 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1415 GNUNET_assert (GNUNET_OK ==
1416 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1417 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1423 task_cancel_reconcile (struct TaskEntry *task)
1425 /* not implemented yet */
1431 apply_diff_to_rfn (struct DiffEntry *diff,
1432 struct ReferendumEntry *rfn,
1433 uint16_t voting_peer,
1436 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1437 struct DiffElementInfo *di;
1439 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1441 while (GNUNET_YES ==
1442 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1444 (const void **) &di))
1448 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1452 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1456 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1463 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1465 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1472 diff_compose (struct DiffEntry *diff_1,
1473 struct DiffEntry *diff_2)
1475 struct DiffEntry *diff_new;
1476 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477 struct DiffElementInfo *di;
1479 diff_new = diff_create ();
1481 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1482 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1484 diff_insert (diff_new, di->weight, di->element);
1486 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1488 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1489 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1491 diff_insert (diff_new, di->weight, di->element);
1493 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1499 struct ReferendumEntry *
1500 rfn_create (uint16_t size)
1502 struct ReferendumEntry *rfn;
1504 rfn = GNUNET_new (struct ReferendumEntry);
1505 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1506 rfn->peer_commited = GNUNET_new_array (size, int);
1507 rfn->peer_contested = GNUNET_new_array (size, int);
1508 rfn->num_peers = size;
1515 diff_destroy (struct DiffEntry *diff)
1517 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1523 * For a given majority, count what the outcome
1524 * is (add/remove/keep), and give the number
1525 * of peers that voted for this outcome.
1528 rfn_majority (const struct ReferendumEntry *rfn,
1529 const struct RfnElementInfo *ri,
1530 uint16_t *ret_majority,
1531 enum ReferendumVote *ret_vote)
1533 uint16_t votes_yes = 0;
1534 uint16_t num_commited = 0;
1537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538 "Computing rfn majority for element %s of rfn {%s}\n",
1539 debug_str_element (ri->element),
1540 debug_str_rfn_key (&rfn->key));
1542 for (i = 0; i < rfn->num_peers; i++)
1544 if (GNUNET_NO == rfn->peer_commited[i])
1548 if (GNUNET_YES == ri->votes[i])
1552 if (votes_yes > (num_commited) / 2)
1554 *ret_vote = ri->proposal;
1555 *ret_majority = votes_yes;
1559 *ret_vote = VOTE_STAY;
1560 *ret_majority = num_commited - votes_yes;
1567 struct TaskEntry *task;
1568 struct SetKey dst_set_key;
1573 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1575 struct SetCopyCls *scc = cls;
1576 struct TaskEntry *task = scc->task;
1577 struct SetKey dst_set_key = scc->dst_set_key;
1578 struct SetEntry *set;
1581 set = GNUNET_new (struct SetEntry);
1583 set->key = dst_set_key;
1584 put_set (task->step->session, set);
1591 * Call the start function of the given
1592 * task again after we created a copy of the given set.
1595 create_set_copy_for_task (struct TaskEntry *task,
1596 struct SetKey *src_set_key,
1597 struct SetKey *dst_set_key)
1599 struct SetEntry *src_set;
1600 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603 "Copying set {%s} to {%s} for task {%s}\n",
1604 debug_str_set_key (src_set_key),
1605 debug_str_set_key (dst_set_key),
1606 debug_str_task_key (&task->key));
1609 scc->dst_set_key = *dst_set_key;
1610 src_set = lookup_set (task->step->session, src_set_key);
1611 GNUNET_assert (NULL != src_set);
1612 GNUNET_SET_copy_lazy (src_set->h,
1618 struct SetMutationProgressCls
1622 * Task to finish once all changes are through.
1624 struct TaskEntry *task;
1629 set_mutation_done (void *cls)
1631 struct SetMutationProgressCls *pc = cls;
1633 GNUNET_assert (pc->num_pending > 0);
1637 if (0 == pc->num_pending)
1639 struct TaskEntry *task = pc->task;
1647 try_finish_step_early (struct Step *step)
1651 if (GNUNET_YES == step->is_running)
1653 if (GNUNET_YES == step->is_finished)
1655 if (GNUNET_NO == step->early_finishable)
1658 step->is_finished = GNUNET_YES;
1660 #ifdef GNUNET_EXTRA_LOGGING
1661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662 "Finishing step `%s' early.\n",
1666 for (i = 0; i < step->subordinates_len; i++)
1668 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1669 step->subordinates[i]->pending_prereq--;
1670 #ifdef GNUNET_EXTRA_LOGGING
1671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672 "Decreased pending_prereq to %u for step `%s'.\n",
1673 (unsigned int) step->subordinates[i]->pending_prereq,
1674 step->subordinates[i]->debug_name);
1677 try_finish_step_early (step->subordinates[i]);
1680 // XXX: maybe schedule as task to avoid recursion?
1681 run_ready_steps (step->session);
1686 finish_step (struct Step *step)
1690 GNUNET_assert (step->finished_tasks == step->tasks_len);
1691 GNUNET_assert (GNUNET_YES == step->is_running);
1692 GNUNET_assert (GNUNET_NO == step->is_finished);
1694 #ifdef GNUNET_EXTRA_LOGGING
1695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696 "All tasks of step `%s' with %u subordinates finished.\n",
1698 step->subordinates_len);
1701 for (i = 0; i < step->subordinates_len; i++)
1703 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1704 step->subordinates[i]->pending_prereq--;
1705 #ifdef GNUNET_EXTRA_LOGGING
1706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707 "Decreased pending_prereq to %u for step `%s'.\n",
1708 (unsigned int) step->subordinates[i]->pending_prereq,
1709 step->subordinates[i]->debug_name);
1714 step->is_finished = GNUNET_YES;
1716 // XXX: maybe schedule as task to avoid recursion?
1717 run_ready_steps (step->session);
1723 * Apply the result from one round of gradecasts (i.e. every peer
1724 * should have gradecasted) to the peer's current set.
1726 * @param task the task with context information
1729 task_start_apply_round (struct TaskEntry *task)
1731 struct ConsensusSession *session = task->step->session;
1732 struct SetKey sk_in;
1733 struct SetKey sk_out;
1734 struct RfnKey rk_in;
1735 struct SetEntry *set_out;
1736 struct ReferendumEntry *rfn_in;
1737 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1738 struct RfnElementInfo *ri;
1739 struct SetMutationProgressCls *progress_cls;
1740 uint16_t worst_majority = UINT16_MAX;
1742 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1743 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1744 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1746 set_out = lookup_set (session, &sk_out);
1747 if (NULL == set_out)
1749 create_set_copy_for_task (task, &sk_in, &sk_out);
1753 rfn_in = lookup_rfn (session, &rk_in);
1754 GNUNET_assert (NULL != rfn_in);
1756 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1757 progress_cls->task = task;
1759 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1761 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1763 uint16_t majority_num;
1764 enum ReferendumVote majority_vote;
1766 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1768 if (worst_majority > majority_num)
1769 worst_majority = majority_num;
1771 switch (majority_vote)
1774 progress_cls->num_pending++;
1775 GNUNET_assert (GNUNET_OK ==
1776 GNUNET_SET_add_element (set_out->h,
1780 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1781 "P%u: apply round: adding element %s with %u-majority.\n",
1782 session->local_peer_idx,
1783 debug_str_element (ri->element), majority_num);
1786 progress_cls->num_pending++;
1787 GNUNET_assert (GNUNET_OK ==
1788 GNUNET_SET_remove_element (set_out->h,
1792 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1793 "P%u: apply round: deleting element %s with %u-majority.\n",
1794 session->local_peer_idx,
1795 debug_str_element (ri->element), majority_num);
1798 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1799 "P%u: apply round: keeping element %s with %u-majority.\n",
1800 session->local_peer_idx,
1801 debug_str_element (ri->element), majority_num);
1810 if (progress_cls->num_pending == 0)
1812 // call closure right now, no pending ops
1813 GNUNET_free (progress_cls);
1818 uint16_t thresh = (session->num_peers / 3) * 2;
1820 if (worst_majority >= thresh)
1822 switch (session->early_stopping)
1824 case EARLY_STOPPING_NONE:
1825 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1826 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1827 "P%u: Stopping early (after one more superround)\n",
1828 session->local_peer_idx);
1830 case EARLY_STOPPING_ONE_MORE:
1831 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1832 session->local_peer_idx);
1833 session->early_stopping = EARLY_STOPPING_DONE;
1836 for (step = session->steps_head; NULL != step; step = step->next)
1837 try_finish_step_early (step);
1840 case EARLY_STOPPING_DONE:
1841 /* We shouldn't be here anymore after early stopping */
1849 else if (EARLY_STOPPING_NONE != session->early_stopping)
1851 // Our assumption about the number of bad peers
1853 GNUNET_break_op (0);
1857 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1858 session->local_peer_idx);
1866 task_start_grade (struct TaskEntry *task)
1868 struct ConsensusSession *session = task->step->session;
1869 struct ReferendumEntry *output_rfn;
1870 struct ReferendumEntry *input_rfn;
1871 struct DiffEntry *input_diff;
1872 struct RfnKey rfn_key;
1873 struct DiffKey diff_key;
1874 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1875 struct RfnElementInfo *ri;
1876 unsigned int gradecast_confidence = 2;
1878 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1879 output_rfn = lookup_rfn (session, &rfn_key);
1880 if (NULL == output_rfn)
1882 output_rfn = rfn_create (session->num_peers);
1883 output_rfn->key = rfn_key;
1884 put_rfn (session, output_rfn);
1887 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1888 input_diff = lookup_diff (session, &diff_key);
1889 GNUNET_assert (NULL != input_diff);
1891 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1892 input_rfn = lookup_rfn (session, &rfn_key);
1893 GNUNET_assert (NULL != input_rfn);
1895 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1897 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1899 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1901 uint16_t majority_num;
1902 enum ReferendumVote majority_vote;
1904 // XXX: we need contested votes and non-contested votes here
1905 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1907 if (majority_num <= session->num_peers / 3)
1908 majority_vote = VOTE_REMOVE;
1910 switch (majority_vote)
1915 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1918 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1927 uint16_t noncontested;
1928 noncontested = rfn_noncontested (input_rfn);
1929 if (noncontested < (session->num_peers / 3) * 2)
1931 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1933 if (noncontested < (session->num_peers / 3) + 1)
1935 gradecast_confidence = 0;
1939 if (gradecast_confidence >= 1)
1940 rfn_commit (output_rfn, task->key.leader);
1942 if (gradecast_confidence <= 1)
1943 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1950 task_start_reconcile (struct TaskEntry *task)
1952 struct SetEntry *input;
1953 struct SetOpCls *setop = &task->cls.setop;
1954 struct ConsensusSession *session = task->step->session;
1956 input = lookup_set (session, &setop->input_set);
1957 GNUNET_assert (NULL != input);
1958 GNUNET_assert (NULL != input->h);
1960 /* We create the outputs for the operation here
1961 (rather than in the set operation callback)
1962 because we want something valid in there, even
1963 if the other peer doesn't talk to us */
1965 if (SET_KIND_NONE != setop->output_set.set_kind)
1967 /* If we don't have an existing output set,
1968 we clone the input set. */
1969 if (NULL == lookup_set (session, &setop->output_set))
1971 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1976 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1978 if (NULL == lookup_rfn (session, &setop->output_rfn))
1980 struct ReferendumEntry *rfn;
1982 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1983 "P%u: output rfn <%s> missing, creating.\n",
1984 session->local_peer_idx,
1985 debug_str_rfn_key (&setop->output_rfn));
1987 rfn = rfn_create (session->num_peers);
1988 rfn->key = setop->output_rfn;
1989 put_rfn (session, rfn);
1993 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1995 if (NULL == lookup_diff (session, &setop->output_diff))
1997 struct DiffEntry *diff;
1999 diff = diff_create ();
2000 diff->key = setop->output_diff;
2001 put_diff (session, diff);
2005 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2007 /* XXX: mark the corresponding rfn as commited if necessary */
2012 if (task->key.peer1 == session->local_peer_idx)
2014 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2017 "P%u: Looking up set {%s} to run remote union\n",
2018 session->local_peer_idx,
2019 debug_str_set_key (&setop->input_set));
2021 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2022 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2024 rcm.kind = htons (task->key.kind);
2025 rcm.peer1 = htons (task->key.peer1);
2026 rcm.peer2 = htons (task->key.peer2);
2027 rcm.leader = htons (task->key.leader);
2028 rcm.repetition = htons (task->key.repetition);
2030 GNUNET_assert (NULL == setop->op);
2031 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2032 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2034 // XXX: maybe this should be done while
2035 // setting up tasks alreays?
2036 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2037 &session->global_id,
2039 GNUNET_SET_RESULT_SYMMETRIC,
2043 commit_set (session, task);
2045 else if (task->key.peer2 == session->local_peer_idx)
2047 /* Wait for the other peer to contact us */
2048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2049 session->local_peer_idx, task->key.peer1);
2051 if (NULL != setop->op)
2053 commit_set (session, task);
2058 /* We made an error while constructing the task graph. */
2065 task_start_eval_echo (struct TaskEntry *task)
2067 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2068 struct ReferendumEntry *input_rfn;
2069 struct RfnElementInfo *ri;
2070 struct SetEntry *output_set;
2071 struct SetMutationProgressCls *progress_cls;
2072 struct ConsensusSession *session = task->step->session;
2073 struct SetKey sk_in;
2074 struct SetKey sk_out;
2075 struct RfnKey rk_in;
2077 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2078 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2079 output_set = lookup_set (session, &sk_out);
2080 if (NULL == output_set)
2082 create_set_copy_for_task (task, &sk_in, &sk_out);
2088 // FIXME: should be marked as a shallow copy, so
2089 // we can destroy everything correctly
2090 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2091 last_set->h = output_set->h;
2092 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2093 put_set (session, last_set);
2096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097 "Evaluating referendum in Task {%s}\n",
2098 debug_str_task_key (&task->key));
2100 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2101 progress_cls->task = task;
2103 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2104 input_rfn = lookup_rfn (session, &rk_in);
2106 GNUNET_assert (NULL != input_rfn);
2108 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2109 GNUNET_assert (NULL != iter);
2111 while (GNUNET_YES ==
2112 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2114 (const void **) &ri))
2116 enum ReferendumVote majority_vote;
2117 uint16_t majority_num;
2119 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2121 if (majority_num < session->num_peers / 3)
2123 /* It is not the case that all nonfaulty peers
2124 echoed the same value. Since we're doing a set reconciliation, we
2125 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2126 reconciliation as contested. Other peers might not know that the
2127 leader is faulty, thus we still re-distribute in the confirmation
2129 output_set->is_contested = GNUNET_YES;
2132 switch (majority_vote)
2135 progress_cls->num_pending++;
2136 GNUNET_assert (GNUNET_OK ==
2137 GNUNET_SET_add_element (output_set->h,
2143 progress_cls->num_pending++;
2144 GNUNET_assert (GNUNET_OK ==
2145 GNUNET_SET_remove_element (output_set->h,
2151 /* Nothing to do. */
2159 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2161 if (progress_cls->num_pending == 0)
2163 // call closure right now, no pending ops
2164 GNUNET_free (progress_cls);
2171 task_start_finish (struct TaskEntry *task)
2173 struct SetEntry *final_set;
2174 struct ConsensusSession *session = task->step->session;
2176 final_set = lookup_set (session, &task->cls.finish.input_set);
2178 GNUNET_assert (NULL != final_set);
2181 GNUNET_SET_iterate (final_set->h,
2182 send_to_client_iter,
2187 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2189 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2191 GNUNET_assert (GNUNET_NO == task->is_started);
2192 GNUNET_assert (GNUNET_NO == task->is_finished);
2193 GNUNET_assert (NULL != task->start);
2197 task->is_started = GNUNET_YES;
2204 * Run all steps of the session that don't any
2205 * more dependencies.
2208 run_ready_steps (struct ConsensusSession *session)
2212 step = session->steps_head;
2214 while (NULL != step)
2216 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2220 GNUNET_assert (0 == step->finished_tasks);
2222 #ifdef GNUNET_EXTRA_LOGGING
2223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2224 session->local_peer_idx,
2226 step->round, step->tasks_len, step->subordinates_len);
2229 step->is_running = GNUNET_YES;
2230 for (i = 0; i < step->tasks_len; i++)
2231 start_task (session, step->tasks[i]);
2233 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2234 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2237 /* Running the next ready steps will be triggered by task completion */
2249 finish_task (struct TaskEntry *task)
2251 GNUNET_assert (GNUNET_NO == task->is_finished);
2252 task->is_finished = GNUNET_YES;
2254 task->step->finished_tasks++;
2256 if (task->step->finished_tasks == task->step->tasks_len)
2257 finish_step (task->step);
2262 * Search peer in the list of peers in session.
2264 * @param peer peer to find
2265 * @param session session with peer
2266 * @return index of peer, -1 if peer is not in session
2269 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2272 for (i = 0; i < session->num_peers; i++)
2273 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2280 * Compute a global, (hopefully) unique consensus session id,
2281 * from the local id of the consensus session, and the identities of all participants.
2282 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2283 * exactly the same peers, the global id will be different.
2285 * @param session session to generate the global id for
2286 * @param local_session_id local id of the consensus session
2289 compute_global_id (struct ConsensusSession *session,
2290 const struct GNUNET_HashCode *local_session_id)
2292 const char *salt = "gnunet-service-consensus/session_id";
2294 GNUNET_assert (GNUNET_YES ==
2295 GNUNET_CRYPTO_kdf (&session->global_id,
2296 sizeof (struct GNUNET_HashCode),
2300 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2302 sizeof (struct GNUNET_HashCode),
2308 * Compare two peer identities.
2310 * @param h1 some peer identity
2311 * @param h2 some peer identity
2312 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2315 peer_id_cmp (const void *h1, const void *h2)
2317 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2322 * Create the sorted list of peers for the session,
2323 * add the local peer if not in the join message.
2326 initialize_session_peer_list (struct ConsensusSession *session,
2327 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2329 unsigned int local_peer_in_list;
2330 uint32_t listed_peers;
2331 const struct GNUNET_PeerIdentity *msg_peers;
2334 GNUNET_assert (NULL != join_msg);
2336 /* peers in the join message, may or may not include the local peer */
2337 listed_peers = ntohl (join_msg->num_peers);
2339 session->num_peers = listed_peers;
2341 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2343 local_peer_in_list = GNUNET_NO;
2344 for (i = 0; i < listed_peers; i++)
2346 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2348 local_peer_in_list = GNUNET_YES;
2353 if (GNUNET_NO == local_peer_in_list)
2354 session->num_peers++;
2356 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2358 if (GNUNET_NO == local_peer_in_list)
2359 session->peers[session->num_peers - 1] = my_peer;
2361 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2362 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2366 static struct TaskEntry *
2367 lookup_task (struct ConsensusSession *session,
2368 struct TaskKey *key)
2370 struct GNUNET_HashCode hash;
2373 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2375 GNUNET_h2s (&hash));
2376 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2381 * Called when another peer wants to do a set operation with the
2384 * @param cls closure
2385 * @param other_peer the other peer
2386 * @param context_msg message with application specific information from
2388 * @param request request from the other peer, use GNUNET_SET_accept
2389 * to accept it, otherwise the request will be refused
2390 * Note that we don't use a return value here, as it is also
2391 * necessary to specify the set we want to do the operation with,
2392 * whith sometimes can be derived from the context message.
2393 * Also necessary to specify the timeout.
2396 set_listen_cb (void *cls,
2397 const struct GNUNET_PeerIdentity *other_peer,
2398 const struct GNUNET_MessageHeader *context_msg,
2399 struct GNUNET_SET_Request *request)
2401 struct ConsensusSession *session = cls;
2403 struct TaskEntry *task;
2404 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2406 if (NULL == context_msg)
2408 GNUNET_break_op (0);
2412 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2414 GNUNET_break_op (0);
2418 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2420 GNUNET_break_op (0);
2424 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2426 tk = ((struct TaskKey) {
2427 .kind = ntohs (cm->kind),
2428 .peer1 = ntohs (cm->peer1),
2429 .peer2 = ntohs (cm->peer2),
2430 .repetition = ntohs (cm->repetition),
2431 .leader = ntohs (cm->leader),
2434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2435 session->local_peer_idx, debug_str_task_key (&tk));
2437 task = lookup_task (session, &tk);
2441 GNUNET_break_op (0);
2445 if (GNUNET_YES == task->is_finished)
2447 GNUNET_break_op (0);
2451 if (task->key.peer2 != session->local_peer_idx)
2453 /* We're being asked, so we must be thne 2nd peer. */
2454 GNUNET_break_op (0);
2458 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2459 (task->key.peer2 == session->local_peer_idx)));
2461 task->cls.setop.op = GNUNET_SET_accept (request,
2462 GNUNET_SET_RESULT_SYMMETRIC,
2466 /* If the task hasn't been started yet,
2467 we wait for that until we commit. */
2469 if (GNUNET_YES == task->is_started)
2471 commit_set (session, task);
2478 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2479 struct TaskEntry *t)
2481 struct GNUNET_HashCode round_hash;
2484 GNUNET_assert (NULL != t->step);
2486 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2490 if (s->tasks_len == s->tasks_cap)
2492 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2493 GNUNET_array_grow (s->tasks,
2498 #ifdef GNUNET_EXTRA_LOGGING
2499 GNUNET_assert (NULL != s->debug_name);
2500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2501 debug_str_task_key (&t->key),
2505 s->tasks[s->tasks_len] = t;
2508 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2509 GNUNET_assert (GNUNET_OK ==
2510 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2511 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2516 install_step_timeouts (struct ConsensusSession *session)
2518 /* Given the fully constructed task graph
2519 with rounds for tasks, we can give the tasks timeouts. */
2521 // unsigned int max_round;
2523 /* XXX: implement! */
2529 * Arrange two peers in some canonical order.
2532 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2537 GNUNET_assert (*p1 < n);
2538 GNUNET_assert (*p2 < n);
2551 /* For uniformly random *p1, *p2,
2552 this condition is true with 50% chance */
2553 if (((b - a) + n) % n <= n / 2)
2567 * Record @a dep as a dependency of @a step.
2570 step_depend_on (struct Step *step, struct Step *dep)
2572 /* We're not checking for cyclic dependencies,
2573 but this is a cheap sanity check. */
2574 GNUNET_assert (step != dep);
2575 GNUNET_assert (NULL != step);
2576 GNUNET_assert (NULL != dep);
2577 GNUNET_assert (dep->round <= step->round);
2579 #ifdef GNUNET_EXTRA_LOGGING
2580 /* Make sure we have complete debugging information.
2581 Also checks that we don't screw up too badly
2582 constructing the task graph. */
2583 GNUNET_assert (NULL != step->debug_name);
2584 GNUNET_assert (NULL != dep->debug_name);
2585 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2586 "Making step `%s' depend on `%s'\n",
2591 if (dep->subordinates_cap == dep->subordinates_len)
2593 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2594 GNUNET_array_grow (dep->subordinates,
2595 dep->subordinates_cap,
2599 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2601 dep->subordinates[dep->subordinates_len] = step;
2602 dep->subordinates_len++;
2604 step->pending_prereq++;
2608 static struct Step *
2609 create_step (struct ConsensusSession *session, int round, int early_finishable)
2612 step = GNUNET_new (struct Step);
2613 step->session = session;
2614 step->round = round;
2615 step->early_finishable = early_finishable;
2616 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2617 session->steps_tail,
2624 * Construct the task graph for a single
2628 construct_task_graph_gradecast (struct ConsensusSession *session,
2631 struct Step *step_before,
2632 struct Step *step_after)
2634 uint16_t n = session->num_peers;
2635 uint16_t me = session->local_peer_idx;
2640 /* The task we're currently setting up. */
2641 struct TaskEntry task;
2644 struct Step *prev_step;
2650 round = step_before->round + 1;
2652 /* gcast step 1: leader disseminates */
2654 step = create_step (session, round, GNUNET_YES);
2656 #ifdef GNUNET_EXTRA_LOGGING
2657 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2659 step_depend_on (step, step_before);
2663 for (k = 0; k < n; k++)
2669 arrange_peers (&p1, &p2, n);
2670 task = ((struct TaskEntry) {
2672 .start = task_start_reconcile,
2673 .cancel = task_cancel_reconcile,
2674 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2676 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2677 put_task (session->taskmap, &task);
2679 /* We run this task to make sure that the leader
2680 has the stored the SET_KIND_LEADER set of himself,
2681 so he can participate in the rest of the gradecast
2682 without the code having to handle any special cases. */
2683 task = ((struct TaskEntry) {
2685 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2686 .start = task_start_reconcile,
2687 .cancel = task_cancel_reconcile,
2689 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2690 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2691 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2692 put_task (session->taskmap, &task);
2698 arrange_peers (&p1, &p2, n);
2699 task = ((struct TaskEntry) {
2701 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2702 .start = task_start_reconcile,
2703 .cancel = task_cancel_reconcile,
2705 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2706 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2707 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2708 put_task (session->taskmap, &task);
2711 /* gcast phase 2: echo */
2714 step = create_step (session, round, GNUNET_YES);
2715 #ifdef GNUNET_EXTRA_LOGGING
2716 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2718 step_depend_on (step, prev_step);
2720 for (k = 0; k < n; k++)
2724 arrange_peers (&p1, &p2, n);
2725 task = ((struct TaskEntry) {
2727 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2728 .start = task_start_reconcile,
2729 .cancel = task_cancel_reconcile,
2731 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2732 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2733 put_task (session->taskmap, &task);
2737 /* Same round, since step only has local tasks */
2738 step = create_step (session, round, GNUNET_YES);
2739 #ifdef GNUNET_EXTRA_LOGGING
2740 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2742 step_depend_on (step, prev_step);
2744 arrange_peers (&p1, &p2, n);
2745 task = ((struct TaskEntry) {
2746 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2748 .start = task_start_eval_echo
2750 put_task (session->taskmap, &task);
2754 step = create_step (session, round, GNUNET_YES);
2755 #ifdef GNUNET_EXTRA_LOGGING
2756 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2758 step_depend_on (step, prev_step);
2760 /* gcast phase 3: confirmation and grading */
2761 for (k = 0; k < n; k++)
2765 arrange_peers (&p1, &p2, n);
2766 task = ((struct TaskEntry) {
2768 .start = task_start_reconcile,
2769 .cancel = task_cancel_reconcile,
2770 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2772 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2773 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2774 /* If there was at least one element in the echo round that was
2775 contested (i.e. it had no n-t majority), then we let the other peers
2776 know, and other peers let us know. The contested flag for each peer is
2777 stored in the rfn. */
2778 task.cls.setop.transceive_contested = GNUNET_YES;
2779 put_task (session->taskmap, &task);
2783 /* Same round, since step only has local tasks */
2784 step = create_step (session, round, GNUNET_YES);
2785 #ifdef GNUNET_EXTRA_LOGGING
2786 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2788 step_depend_on (step, prev_step);
2790 task = ((struct TaskEntry) {
2792 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2793 .start = task_start_grade,
2795 put_task (session->taskmap, &task);
2797 step_depend_on (step_after, step);
2802 construct_task_graph (struct ConsensusSession *session)
2804 uint16_t n = session->num_peers;
2807 uint16_t me = session->local_peer_idx;
2809 /* The task we're currently setting up. */
2810 struct TaskEntry task;
2812 /* Current leader */
2816 struct Step *prev_step;
2818 unsigned int round = 0;
2822 // XXX: introduce first step,
2823 // where we wait for all insert acks
2824 // from the set service
2826 /* faster but brittle all-to-all */
2828 // XXX: Not implemented yet
2830 /* all-to-all step */
2832 step = create_step (session, round, GNUNET_NO);
2834 #ifdef GNUNET_EXTRA_LOGGING
2835 step->debug_name = GNUNET_strdup ("all to all");
2838 for (i = 0; i < n; i++)
2845 arrange_peers (&p1, &p2, n);
2846 task = ((struct TaskEntry) {
2847 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2849 .start = task_start_reconcile,
2850 .cancel = task_cancel_reconcile,
2852 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2853 task.cls.setop.output_set = task.cls.setop.input_set;
2854 task.cls.setop.do_not_remove = GNUNET_YES;
2855 put_task (session->taskmap, &task);
2863 /* Byzantine union */
2865 /* sequential repetitions of the gradecasts */
2866 for (i = 0; i < t + 1; i++)
2868 struct Step *step_rep_start;
2869 struct Step *step_rep_end;
2871 /* Every repetition is in a separate round. */
2872 step_rep_start = create_step (session, round, GNUNET_YES);
2873 #ifdef GNUNET_EXTRA_LOGGING
2874 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2877 step_depend_on (step_rep_start, prev_step);
2879 /* gradecast has three rounds */
2881 step_rep_end = create_step (session, round, GNUNET_YES);
2882 #ifdef GNUNET_EXTRA_LOGGING
2883 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2886 /* parallel gradecasts */
2887 for (lead = 0; lead < n; lead++)
2888 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2890 task = ((struct TaskEntry) {
2891 .step = step_rep_end,
2892 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2893 .start = task_start_apply_round,
2895 put_task (session->taskmap, &task);
2897 prev_step = step_rep_end;
2900 /* There is no next gradecast round, thus the final
2901 start step is the overall end step of the gradecasts */
2903 step = create_step (session, round, GNUNET_NO);
2904 #ifdef GNUNET_EXTRA_LOGGING
2905 GNUNET_asprintf (&step->debug_name, "finish");
2907 step_depend_on (step, prev_step);
2909 task = ((struct TaskEntry) {
2911 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2912 .start = task_start_finish,
2914 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2916 put_task (session->taskmap, &task);
2921 * Initialize the session, continue receiving messages from the owning client
2923 * @param session the session to initialize
2924 * @param join_msg the join message from the client
2927 initialize_session (struct ConsensusSession *session,
2928 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2930 struct ConsensusSession *other_session;
2932 initialize_session_peer_list (session, join_msg);
2933 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2934 compute_global_id (session, &join_msg->session_id);
2936 /* Check if some local client already owns the session.
2937 It is only legal to have a session with an existing global id
2938 if all other sessions with this global id are finished.*/
2939 other_session = sessions_head;
2940 while (NULL != other_session)
2942 if ((other_session != session) &&
2943 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2945 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2947 // GNUNET_break (0);
2948 // destroy_session (session);
2953 other_session = other_session->next;
2956 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2957 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2959 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n",
2960 (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2962 session->local_peer_idx = get_peer_idx (&my_peer, session);
2963 GNUNET_assert (-1 != session->local_peer_idx);
2964 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2965 &session->global_id,
2966 set_listen_cb, session);
2967 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2969 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2970 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2971 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2972 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2975 struct SetEntry *client_set;
2976 client_set = GNUNET_new (struct SetEntry);
2977 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2978 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2979 put_set (session, client_set);
2982 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2984 /* Just construct the task graph,
2985 but don't run anything until the client calls conclude. */
2986 construct_task_graph (session);
2988 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2992 static struct ConsensusSession *
2993 get_session_by_client (struct GNUNET_SERVER_Client *client)
2995 struct ConsensusSession *session;
2997 session = sessions_head;
2998 while (NULL != session)
3000 if (session->client == client)
3002 session = session->next;
3009 * Called when a client wants to join a consensus session.
3012 * @param client client that sent the message
3013 * @param m message sent by the client
3016 client_join (void *cls,
3017 struct GNUNET_SERVER_Client *client,
3018 const struct GNUNET_MessageHeader *m)
3020 struct ConsensusSession *session;
3022 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3024 session = get_session_by_client (client);
3025 if (NULL != session)
3028 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3031 session = GNUNET_new (struct ConsensusSession);
3032 session->client = client;
3033 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3034 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3035 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3036 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3043 client_insert_done (void *cls)
3050 * Called when a client performs an insert operation.
3052 * @param cls (unused)
3053 * @param client client handle
3054 * @param m message sent by the client
3057 client_insert (void *cls,
3058 struct GNUNET_SERVER_Client *client,
3059 const struct GNUNET_MessageHeader *m)
3061 struct ConsensusSession *session;
3062 struct GNUNET_CONSENSUS_ElementMessage *msg;
3063 struct GNUNET_SET_Element *element;
3064 ssize_t element_size;
3065 struct GNUNET_SET_Handle *initial_set;
3067 session = get_session_by_client (client);
3069 if (NULL == session)
3072 GNUNET_SERVER_client_disconnect (client);
3076 if (GNUNET_YES == session->conclude_started)
3079 GNUNET_SERVER_client_disconnect (client);
3083 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3084 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3085 if (element_size < 0)
3091 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3092 element->element_type = msg->element_type;
3093 element->size = element_size;
3094 memcpy (&element[1], &msg[1], element_size);
3095 element->data = &element[1];
3097 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3098 struct SetEntry *entry;
3099 entry = lookup_set (session, &key);
3100 GNUNET_assert (NULL != entry);
3101 initial_set = entry->h;
3103 session->num_client_insert_pending++;
3104 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3106 #ifdef GNUNET_EXTRA_LOGGING
3108 struct GNUNET_HashCode hash;
3110 GNUNET_SET_element_hash (element, &hash);
3112 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3113 session->local_peer_idx,
3114 GNUNET_h2s (&hash));
3118 GNUNET_free (element);
3119 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3124 * Called when a client performs the conclude operation.
3126 * @param cls (unused)
3127 * @param client client handle
3128 * @param message message sent by the client
3131 client_conclude (void *cls,
3132 struct GNUNET_SERVER_Client *client,
3133 const struct GNUNET_MessageHeader *message)
3135 struct ConsensusSession *session;
3137 session = get_session_by_client (client);
3138 if (NULL == session)
3140 /* client not found */
3142 GNUNET_SERVER_client_disconnect (client);
3146 if (GNUNET_YES == session->conclude_started)
3148 /* conclude started twice */
3150 GNUNET_SERVER_client_disconnect (client);
3151 destroy_session (session);
3155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3157 session->conclude_started = GNUNET_YES;
3159 install_step_timeouts (session);
3160 run_ready_steps (session);
3163 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3168 * Called to clean up, after a shutdown has been requested.
3170 * @param cls closure
3173 shutdown_task (void *cls)
3175 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
3176 while (NULL != sessions_head)
3177 destroy_session (sessions_head);
3179 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
3184 * Clean up after a client after it is
3185 * disconnected (either by us or by itself)
3187 * @param cls closure, unused
3188 * @param client the client to clean up after
3191 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3193 struct ConsensusSession *session;
3195 session = get_session_by_client (client);
3196 if (NULL == session)
3198 // FIXME: destroy if we can
3204 * Start processing consensus requests.
3206 * @param cls closure
3207 * @param server the initialized server
3208 * @param c configuration to use
3211 run (void *cls, struct GNUNET_SERVER_Handle *server,
3212 const struct GNUNET_CONFIGURATION_Handle *c)
3214 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3215 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3216 sizeof (struct GNUNET_MessageHeader)},
3217 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3218 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3224 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3226 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3228 GNUNET_SCHEDULER_shutdown ();
3231 statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3232 GNUNET_SERVER_add_handlers (server, server_handlers);
3233 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3234 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3235 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3240 * The main function for the consensus service.
3242 * @param argc number of arguments from the command line
3243 * @param argv command line arguments
3244 * @return 0 ok, 1 on error
3247 main (int argc, char *const *argv)
3250 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3251 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3252 return (GNUNET_OK == ret) ? 0 : 1;