/*
      This file is part of GNUnet
      Copyright (C) 2012, 2013 GNUnet e.V.

      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
      by the Free Software Foundation; either version 3, or (at your
      option) any later version.

      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      General Public License for more details.

      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
      Boston, MA 02110-1301, USA.
*/
/**
 * @file consensus/gnunet-service-consensus.c
 * @brief multi-peer set reconciliation
 * @author Florian Dold
 */
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
/**
 * Element type used for the marker element that flags a set as
 * contested (see its use together with `struct ContestedPayload`).
 * Defined as one past GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX.
 */
#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
/**
 * State of the early stopping scheme of a session.
 * NOTE(review): the exact meaning of each phase is not visible in this
 * part of the file; the names suggest "not stopping", "run one more
 * round" and "done" -- confirm against the code that sets them.
 */
enum EarlyStoppingPhase
{
  EARLY_STOPPING_NONE = 0,
  EARLY_STOPPING_ONE_MORE = 1,
  EARLY_STOPPING_DONE = 2,
};
66 GNUNET_NETWORK_STRUCT_BEGIN
69 struct ContestedPayload
74 * Tuple of integers that together
75 * identify a task uniquely.
79 * A value from 'enum PhaseKind'.
81 uint16_t kind GNUNET_PACKED;
84 * Number of the first peer
87 int16_t peer1 GNUNET_PACKED;
90 * Number of the second peer in canonical order.
92 int16_t peer2 GNUNET_PACKED;
95 * Repetition of the gradecast phase.
97 int16_t repetition GNUNET_PACKED;
100 * Leader in the gradecast phase.
102 * Can be different from both peer1 and peer2.
104 int16_t leader GNUNET_PACKED;
111 int set_kind GNUNET_PACKED;
112 int k1 GNUNET_PACKED;
113 int k2 GNUNET_PACKED;
120 struct GNUNET_SET_Handle *h;
122 * GNUNET_YES if the set resulted
123 * from applying a referendum with contested
132 int diff_kind GNUNET_PACKED;
133 int k1 GNUNET_PACKED;
134 int k2 GNUNET_PACKED;
139 int rfn_kind GNUNET_PACKED;
140 int k1 GNUNET_PACKED;
141 int k2 GNUNET_PACKED;
145 GNUNET_NETWORK_STRUCT_END
149 PHASE_KIND_ALL_TO_ALL,
150 PHASE_KIND_GRADECAST_LEADER,
151 PHASE_KIND_GRADECAST_ECHO,
152 PHASE_KIND_GRADECAST_ECHO_GRADE,
153 PHASE_KIND_GRADECAST_CONFIRM,
154 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
156 * Apply a repetition of the all-to-all
157 * gradecast to the current set.
159 PHASE_KIND_APPLY_REP,
169 * Last result set from a gradecast
171 SET_KIND_LAST_GRADECAST,
172 SET_KIND_LEADER_PROPOSAL,
173 SET_KIND_ECHO_RESULT,
179 DIFF_KIND_LEADER_PROPOSAL,
180 DIFF_KIND_LEADER_CONSENSUS,
181 DIFF_KIND_GRADECAST_RESULT,
189 RFN_KIND_GRADECAST_RESULT
195 struct SetKey input_set;
197 struct SetKey output_set;
198 struct RfnKey output_rfn;
199 struct DiffKey output_diff;
203 int transceive_contested;
205 struct GNUNET_SET_OperationHandle *op;
211 struct SetKey input_set;
215 * Closure for both @a start_task
216 * and @a cancel_task.
220 struct SetOpCls setop;
221 struct FinishCls finish;
226 typedef void (*TaskFunc) (struct TaskEntry *task);
229 * Node in the consensus task graph.
244 union TaskFuncCls cls;
251 * All steps of one session are in a
252 * linked list for easier deallocation.
257 * All steps of one session are in a
258 * linked list for easier deallocation.
262 struct ConsensusSession *session;
265 * Tasks that this step is composed of.
267 struct TaskEntry **tasks;
268 unsigned int tasks_len;
269 unsigned int tasks_cap;
271 unsigned int finished_tasks;
274 * Tasks that have this task as dependency.
276 * We store pointers to subordinates rather
277 * than to prerequisites since it makes
278 * tracking the readiness of a task easier.
280 struct Step **subordinates;
281 unsigned int subordinates_len;
282 unsigned int subordinates_cap;
285 * Counter for the prerequisites of
288 size_t pending_prereq;
291 * Task that will run this step despite
292 * any pending prerequisites.
294 struct GNUNET_SCHEDULER_Task *timeout_task;
296 unsigned int is_running;
298 unsigned int is_finished;
301 * Synchrony round of the task.
302 * Determines the deadline for the task.
307 * Human-readable name for
308 * the task, used for debugging.
313 * When we're doing an early finish, how should this step be
315 * If GNUNET_YES, the step will be marked as finished
316 * without actually running its tasks.
317 * Otherwise, the step will still be run even after
320 * Note that a task may never be finished early if
321 * it is already running.
323 int early_finishable;
327 struct RfnElementInfo
329 const struct GNUNET_SET_Element *element;
332 * GNUNET_YES if the peer votes for the proposal.
337 * Proposal for this element,
338 * can only be VOTE_ADD or VOTE_REMOVE.
340 enum ReferendumVote proposal;
344 struct ReferendumEntry
349 * Elements where there is at least one proposed change.
351 * Maps the hash of the GNUNET_SET_Element
352 * to 'struct RfnElementInfo'.
354 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
356 unsigned int num_peers;
/**
 * Stores, for every peer in the session,
 * whether the peer finished the whole referendum.
 *
 * Votes from peers are only counted if they're
 * marked as committed (#GNUNET_YES) in the referendum.
 *
 * Otherwise (#GNUNET_NO), the requested changes are
 * not counted for majority votes or thresholds.
 */
372 * Contestation state of the peer. If a peer is contested, the values it
373 * contributed are still counted for applying changes, but the grading is
380 struct DiffElementInfo
382 const struct GNUNET_SET_Element *element;
385 * Positive weight for 'add', negative
386 * weights for 'remove'.
398 struct GNUNET_CONTAINER_MultiHashMap *changes;
404 * A consensus session consists of one local client and the remote authorities.
406 struct ConsensusSession
409 * Consensus sessions are kept in a DLL.
411 struct ConsensusSession *next;
414 * Consensus sessions are kept in a DLL.
416 struct ConsensusSession *prev;
418 unsigned int num_client_insert_pending;
420 struct GNUNET_CONTAINER_MultiHashMap *setmap;
421 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
425 * Array of peers with length 'num_peers'.
427 int *peers_blacklisted;
/**
 * Mapping from (hashed) TaskKey to TaskEntry.
 *
 * We map the application_id for a round to the task that should be
 * executed, so we don't have to go through all tasks whenever we get
 * an incoming set op request.
 */
436 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
438 struct Step *steps_head;
439 struct Step *steps_tail;
441 int conclude_started;
446 * Global consensus identification, computed
447 * from the session id and participating authorities.
449 struct GNUNET_HashCode global_id;
452 * Client that inhabits the session
454 struct GNUNET_SERVER_Client *client;
457 * Queued messages to the client.
459 struct GNUNET_MQ_Handle *client_mq;
462 * Time when the conclusion of the consensus should begin.
464 struct GNUNET_TIME_Absolute conclude_start;
467 * Timeout for all rounds together, single rounds will schedule a timeout task
468 * with a fraction of the conclude timeout.
469 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
471 struct GNUNET_TIME_Absolute conclude_deadline;
473 struct GNUNET_PeerIdentity *peers;
476 * Number of other peers in the consensus.
478 unsigned int num_peers;
481 * Index of the local peer in the peers array
483 unsigned int local_peer_idx;
486 * Listener for requests from other peers.
487 * Uses the session's global id as app id.
489 struct GNUNET_SET_ListenHandle *set_listener;
492 * State of our early stopping scheme.
498 * Linked list of sessions this peer participates in.
500 static struct ConsensusSession *sessions_head;
503 * Linked list of sessions this peer participates in.
505 static struct ConsensusSession *sessions_tail;
508 * Configuration of the consensus service.
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
513 * Handle to the server for this service.
515 static struct GNUNET_SERVER_Handle *srv;
518 * Peer that runs this service.
520 static struct GNUNET_PeerIdentity my_peer;
525 struct GNUNET_STATISTICS_Handle *statistics;
529 finish_task (struct TaskEntry *task);
532 run_ready_steps (struct ConsensusSession *session);
535 phasename (uint16_t phase)
539 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540 case PHASE_KIND_FINISH: return "FINISH";
541 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547 default: return "(unknown)";
553 setname (uint16_t kind)
557 case SET_KIND_CURRENT: return "CURRENT";
558 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559 case SET_KIND_NONE: return "NONE";
560 default: return "(unknown)";
565 rfnname (uint16_t kind)
569 case RFN_KIND_NONE: return "NONE";
570 case RFN_KIND_ECHO: return "ECHO";
571 case RFN_KIND_CONFIRM: return "CONFIRM";
572 default: return "(unknown)";
577 diffname (uint16_t kind)
581 case DIFF_KIND_NONE: return "NONE";
582 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585 default: return "(unknown)";
589 #ifdef GNUNET_EXTRA_LOGGING
593 debug_str_element (const struct GNUNET_SET_Element *el)
595 struct GNUNET_HashCode hash;
597 GNUNET_SET_element_hash (el, &hash);
599 return GNUNET_h2s (&hash);
603 debug_str_task_key (struct TaskKey *tk)
605 static char buf[256];
607 snprintf (buf, sizeof (buf),
608 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609 phasename (tk->kind), tk->peer1, tk->peer2,
610 tk->leader, tk->repetition);
616 debug_str_diff_key (struct DiffKey *dk)
618 static char buf[256];
620 snprintf (buf, sizeof (buf),
621 "DiffKey kind=%s, k1=%d, k2=%d",
622 diffname (dk->diff_kind), dk->k1, dk->k2);
628 debug_str_set_key (const struct SetKey *sk)
630 static char buf[256];
632 snprintf (buf, sizeof (buf),
633 "SetKey kind=%s, k1=%d, k2=%d",
634 setname (sk->set_kind), sk->k1, sk->k2);
641 debug_str_rfn_key (const struct RfnKey *rk)
643 static char buf[256];
645 snprintf (buf, sizeof (buf),
646 "RfnKey kind=%s, k1=%d, k2=%d",
647 rfnname (rk->rfn_kind), rk->k1, rk->k2);
652 #endif /* GNUNET_EXTRA_LOGGING */
656 * Destroy a session, free all resources associated with it.
658 * @param session the session to destroy
661 destroy_session (struct ConsensusSession *session)
663 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664 if (NULL != session->set_listener)
666 GNUNET_SET_listen_cancel (session->set_listener);
667 session->set_listener = NULL;
669 if (NULL != session->client_mq)
671 GNUNET_MQ_destroy (session->client_mq);
672 session->client_mq = NULL;
673 /* The MQ cleanup will also disconnect the underlying client. */
674 session->client = NULL;
676 if (NULL != session->client)
678 GNUNET_SERVER_client_disconnect (session->client);
679 session->client = NULL;
681 GNUNET_free (session);
686 * Send the final result set of the consensus to the client, element by
690 * @param element the current element, NULL if all elements have been
692 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
695 send_to_client_iter (void *cls,
696 const struct GNUNET_SET_Element *element)
698 struct TaskEntry *task = (struct TaskEntry *) cls;
699 struct ConsensusSession *session = task->step->session;
700 struct GNUNET_MQ_Envelope *ev;
704 struct GNUNET_CONSENSUS_ElementMessage *m;
706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707 "P%d: sending element %s to client\n",
708 session->local_peer_idx,
709 debug_str_element (element));
711 ev = GNUNET_MQ_msg_extra (m, element->size,
712 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713 m->element_type = htons (element->element_type);
714 GNUNET_memcpy (&m[1], element->data, element->size);
715 GNUNET_MQ_send (session->client_mq, ev);
719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720 "P%d: finished iterating elements for client\n",
721 session->local_peer_idx);
722 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723 GNUNET_MQ_send (session->client_mq, ev);
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
732 struct GNUNET_HashCode hash;
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "P%u: looking up set {%s}\n",
736 session->local_peer_idx,
737 debug_str_set_key (key));
739 GNUNET_assert (SET_KIND_NONE != key->set_kind);
740 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
748 struct GNUNET_HashCode hash;
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "P%u: looking up diff {%s}\n",
752 session->local_peer_idx,
753 debug_str_diff_key (key));
755 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
764 struct GNUNET_HashCode hash;
766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767 "P%u: looking up rfn {%s}\n",
768 session->local_peer_idx,
769 debug_str_rfn_key (key));
771 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
778 diff_insert (struct DiffEntry *diff,
780 const struct GNUNET_SET_Element *element)
782 struct DiffElementInfo *di;
783 struct GNUNET_HashCode hash;
785 GNUNET_assert ( (1 == weight) || (-1 == weight));
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "diff_insert with element size %u\n",
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "hashing element\n");
794 GNUNET_SET_element_hash (element, &hash);
796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
803 di = GNUNET_new (struct DiffElementInfo);
804 di->element = GNUNET_SET_element_dup (element);
805 GNUNET_assert (GNUNET_OK ==
806 GNUNET_CONTAINER_multihashmap_put (diff->changes,
808 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
816 rfn_commit (struct ReferendumEntry *rfn,
817 uint16_t commit_peer)
819 GNUNET_assert (commit_peer < rfn->num_peers);
821 rfn->peer_commited[commit_peer] = GNUNET_YES;
826 rfn_contest (struct ReferendumEntry *rfn,
827 uint16_t contested_peer)
829 GNUNET_assert (contested_peer < rfn->num_peers);
831 rfn->peer_contested[contested_peer] = GNUNET_YES;
836 rfn_noncontested (struct ReferendumEntry *rfn)
842 for (i = 0; i < rfn->num_peers; i++)
843 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
/**
 * Record the vote of one peer on one element in a referendum.
 *
 * Visible behaviour: the element is hashed and looked up in
 * rfn->rfn_elements; a new RfnElementInfo (with a duplicate of the
 * element and a votes array of rfn->num_peers entries) is created and
 * inserted into that map; finally the entry of the voting peer in the
 * votes array is set to GNUNET_YES.
 *
 * NOTE(review): several short lines of this function are missing from
 * this view (return type, braces, presumably the NULL check guarding
 * the creation of a new entry, the remaining arguments of the map
 * insertion and whatever follows the vote assignment) -- confirm
 * against the complete file before relying on this description.
 *
 * @param rfn referendum to record the vote in
 * @param voting_peer index of the voting peer; must be below rfn->num_peers
 * @param vote the vote; must be VOTE_ADD or VOTE_REMOVE
 * @param element element the vote is about
 */
851 rfn_vote (struct ReferendumEntry *rfn,
852 uint16_t voting_peer,
853 enum ReferendumVote vote,
854 const struct GNUNET_SET_Element *element)
856 struct RfnElementInfo *ri;
857 struct GNUNET_HashCode hash;
859 GNUNET_assert (voting_peer < rfn->num_peers);
861 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
862 since VOTE_STAY is implicit in not voting. */
863 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
865 GNUNET_SET_element_hash (element, &hash);
866 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
870 ri = GNUNET_new (struct RfnElementInfo);
871 ri->element = GNUNET_SET_element_dup (element);
872 ri->votes = GNUNET_new_array (rfn->num_peers, int);
873 GNUNET_assert (GNUNET_OK ==
874 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
876 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
879 ri->votes[voting_peer] = GNUNET_YES;
885 task_other_peer (struct TaskEntry *task)
887 uint16_t me = task->step->session->local_peer_idx;
888 if (task->key.peer1 == me)
889 return task->key.peer2;
890 return task->key.peer1;
895 * Callback for set operation results. Called for each element
899 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900 * @param status see enum GNUNET_SET_Status
903 set_result_cb (void *cls,
904 const struct GNUNET_SET_Element *element,
905 enum GNUNET_SET_Status status)
907 struct TaskEntry *task = cls;
908 struct ConsensusSession *session = task->step->session;
909 struct SetEntry *output_set = NULL;
910 struct DiffEntry *output_diff = NULL;
911 struct ReferendumEntry *output_rfn = NULL;
912 unsigned int other_idx;
913 struct SetOpCls *setop;
915 setop = &task->cls.setop;
918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919 "P%u: got set result for {%s}, status %u\n",
920 session->local_peer_idx,
921 debug_str_task_key (&task->key),
924 if (GNUNET_NO == task->is_started)
930 if (GNUNET_YES == task->is_finished)
936 other_idx = task_other_peer (task);
938 if (SET_KIND_NONE != setop->output_set.set_kind)
940 output_set = lookup_set (session, &setop->output_set);
941 GNUNET_assert (NULL != output_set);
944 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
946 output_diff = lookup_diff (session, &setop->output_diff);
947 GNUNET_assert (NULL != output_diff);
950 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
952 output_rfn = lookup_rfn (session, &setop->output_rfn);
953 GNUNET_assert (NULL != output_rfn);
956 if (GNUNET_YES == session->peers_blacklisted[other_idx])
958 /* Peer might have been blacklisted
959 by a gradecast running in parallel, ignore elements from now */
960 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
962 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
966 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
968 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
970 GNUNET_assert (NULL != output_rfn);
971 rfn_contest (output_rfn, task_other_peer (task));
978 case GNUNET_SET_STATUS_ADD_LOCAL:
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "Adding element in Task {%s}\n",
981 debug_str_task_key (&task->key));
982 if (NULL != output_set)
984 // FIXME: record pending adds, use callback
985 GNUNET_SET_add_element (output_set->h,
989 #ifdef GNUNET_EXTRA_LOGGING
990 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991 "P%u: adding element %s into set {%s} of task {%s}\n",
992 session->local_peer_idx,
993 debug_str_element (element),
994 debug_str_set_key (&setop->output_set),
995 debug_str_task_key (&task->key));
998 if (NULL != output_diff)
1000 diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003 "P%u: adding element %s into diff {%s} of task {%s}\n",
1004 session->local_peer_idx,
1005 debug_str_element (element),
1006 debug_str_diff_key (&setop->output_diff),
1007 debug_str_task_key (&task->key));
1010 if (NULL != output_rfn)
1012 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016 session->local_peer_idx,
1017 debug_str_element (element),
1018 debug_str_rfn_key (&setop->output_rfn),
1019 debug_str_task_key (&task->key));
1022 // XXX: add result to structures in task
1024 case GNUNET_SET_STATUS_ADD_REMOTE:
1025 if (GNUNET_YES == setop->do_not_remove)
1027 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "Removing element in Task {%s}\n",
1031 debug_str_task_key (&task->key));
1032 if (NULL != output_set)
1034 // FIXME: record pending adds, use callback
1035 GNUNET_SET_remove_element (output_set->h,
1039 #ifdef GNUNET_EXTRA_LOGGING
1040 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041 "P%u: removing element %s from set {%s} of task {%s}\n",
1042 session->local_peer_idx,
1043 debug_str_element (element),
1044 debug_str_set_key (&setop->output_set),
1045 debug_str_task_key (&task->key));
1048 if (NULL != output_diff)
1050 diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053 "P%u: removing element %s from diff {%s} of task {%s}\n",
1054 session->local_peer_idx,
1055 debug_str_element (element),
1056 debug_str_diff_key (&setop->output_diff),
1057 debug_str_task_key (&task->key));
1060 if (NULL != output_rfn)
1062 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066 session->local_peer_idx,
1067 debug_str_element (element),
1068 debug_str_rfn_key (&setop->output_rfn),
1069 debug_str_task_key (&task->key));
1073 case GNUNET_SET_STATUS_DONE:
1074 // XXX: check first if any changes to the underlying
1075 // set are still pending
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Finishing setop in Task {%s}\n",
1078 debug_str_task_key (&task->key));
1079 if (NULL != output_rfn)
1081 rfn_commit (output_rfn, task_other_peer (task));
1085 case GNUNET_SET_STATUS_FAILURE:
1087 GNUNET_break_op (0);
1107 enum EvilnessSubType
1110 EVILNESS_SUB_REPLACEMENT,
1111 EVILNESS_SUB_NO_REPLACEMENT,
1116 enum EvilnessType type;
1117 enum EvilnessSubType subtype;
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1125 if (0 == strcmp ("replace", evil_subtype_str))
1127 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1129 else if (0 == strcmp ("noreplace", evil_subtype_str))
1131 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1135 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1138 return GNUNET_SYSERR;
/**
 * Determine whether and how the local peer should misbehave, based on
 * the "EVIL_SPEC" option in the "consensus" configuration section.
 *
 * Visible behaviour: without the option, evil->type is set to
 * EVILNESS_NONE.  Otherwise the spec is split at "/" into fields and
 * each field is parsed with sscanf as
 * <peer number>;<type>;<subtype>;<number>.  For the field whose peer
 * number equals session->local_peer_idx the type string selects
 * EVILNESS_SLACK ("slack"), EVILNESS_CRAM_ALL ("cram-all"),
 * EVILNESS_CRAM_LEAD ("cram-lead") or EVILNESS_CRAM_ECHO ("cram-echo");
 * for the cram types evil->num is set and the subtype is parsed with
 * parse_evilness_cram_subtype().
 *
 * NOTE(review): many short lines (declarations, braces, returns and
 * jumps, labels) are missing from this view, so the exact control flow
 * between the statements below cannot be confirmed from here.
 *
 * @param session session of the local peer, provides local_peer_idx
 * @param evil result structure to fill in; must not be NULL
 */
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1149 char *evil_type_str = NULL;
1150 char *evil_subtype_str = NULL;
1152 GNUNET_assert (NULL != evil);
1154 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "P%u: no evilness\n",
1158 session->local_peer_idx);
1159 evil->type = EVILNESS_NONE;
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "P%u: got evilness spec\n",
1164 session->local_peer_idx);
/* NOTE(review): strtok modifies evil_spec in place and keeps hidden
 * static state; strtok_r would avoid interference with other users. */
1166 for (field = strtok (evil_spec, "/");
1168 field = strtok (NULL, "/"))
1170 unsigned int peer_num;
1171 unsigned int evil_num;
1174 evil_type_str = NULL;
1175 evil_subtype_str = NULL;
/* The %m conversions make sscanf allocate the two strings with the
 * libc allocator, hence plain free() is used on them below. */
1177 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1181 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1188 GNUNET_assert (NULL != evil_type_str);
1189 GNUNET_assert (NULL != evil_subtype_str);
1191 if (peer_num == session->local_peer_idx)
1193 if (0 == strcmp ("slack", evil_type_str))
1195 evil->type = EVILNESS_SLACK;
1197 else if (0 == strcmp ("cram-all", evil_type_str))
1199 evil->type = EVILNESS_CRAM_ALL;
1200 evil->num = evil_num;
1201 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1204 else if (0 == strcmp ("cram-lead", evil_type_str))
1206 evil->type = EVILNESS_CRAM_LEAD;
1207 evil->num = evil_num;
1208 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1211 else if (0 == strcmp ("cram-echo", evil_type_str))
1213 evil->type = EVILNESS_CRAM_ECHO;
1214 evil->num = evil_num;
1215 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1220 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1227 /* No GNUNET_free since memory was allocated by libc */
/* NOTE(review): only evil_type_str is freed here; evil_subtype_str is
 * reset to NULL without a matching free(), which looks like a leak of
 * the subtype string for every field that belongs to another peer. */
1228 free (evil_type_str);
1229 evil_type_str = NULL;
1230 evil_subtype_str = NULL;
1233 evil->type = EVILNESS_NONE;
1235 GNUNET_free (evil_spec);
1236 /* no GNUNET_free_non_null since it wasn't
1237 * allocated with GNUNET_malloc */
/* NOTE(review): free(NULL) is a no-op, so the NULL checks below are
 * redundant. */
1238 if (NULL != evil_type_str)
1239 free (evil_type_str);
1240 if (NULL != evil_subtype_str)
1241 free (evil_subtype_str);
1248 * Commit the appropriate set for a
1252 commit_set (struct ConsensusSession *session,
1253 struct TaskEntry *task)
1255 struct SetEntry *set;
1256 struct SetOpCls *setop = &task->cls.setop;
1258 GNUNET_assert (NULL != setop->op);
1259 set = lookup_set (session, &setop->input_set);
1260 GNUNET_assert (NULL != set);
1265 struct Evilness evil;
1267 get_evilness (session, &evil);
1268 if (EVILNESS_NONE != evil.type)
1270 /* Useful for evaluation */
1271 GNUNET_STATISTICS_set (statistics,
1278 case EVILNESS_CRAM_ALL:
1279 case EVILNESS_CRAM_LEAD:
1280 case EVILNESS_CRAM_ECHO:
1281 /* We're not cramming elements in the
1282 all-to-all round, since that would just
1283 add more elements to the result set, but
1284 wouldn't test robustness. */
1285 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1287 GNUNET_SET_commit (setop->op, set->h);
1290 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1293 GNUNET_SET_commit (setop->op, set->h);
1296 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1298 GNUNET_SET_commit (setop->op, set->h);
1301 for (i = 0; i < evil.num; i++)
1303 struct GNUNET_HashCode hash;
1304 struct GNUNET_SET_Element element;
1305 element.data = &hash;
1306 element.size = sizeof (struct GNUNET_HashCode);
1307 element.element_type = 0;
1309 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1311 /* Always generate a new element. */
1312 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1314 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1316 /* Always cram the same elements, derived from counter. */
1317 GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1323 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327 session->local_peer_idx,
1328 debug_str_element (&element),
1329 debug_str_set_key (&setop->input_set),
1330 debug_str_task_key (&task->key));
1333 GNUNET_STATISTICS_update (statistics,
1334 "# stuffed elements",
1337 GNUNET_SET_commit (setop->op, set->h);
1339 case EVILNESS_SLACK:
1340 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341 "P%u: evil peer: slacking\n",
1342 (unsigned int) session->local_peer_idx);
1346 GNUNET_SET_commit (setop->op, set->h);
1351 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1353 struct GNUNET_SET_Element element;
1354 struct ContestedPayload payload;
1355 element.data = &payload;
1356 element.size = sizeof (struct ContestedPayload);
1357 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1358 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1360 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1362 GNUNET_SET_commit (setop->op, set->h);
1366 /* For our testcases, we don't want the blacklisted
1368 GNUNET_SET_operation_cancel (setop->op);
1376 put_diff (struct ConsensusSession *session,
1377 struct DiffEntry *diff)
1379 struct GNUNET_HashCode hash;
1381 GNUNET_assert (NULL != diff);
1383 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1384 GNUNET_assert (GNUNET_OK ==
1385 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1386 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1390 put_set (struct ConsensusSession *session,
1391 struct SetEntry *set)
1393 struct GNUNET_HashCode hash;
1395 GNUNET_assert (NULL != set->h);
1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1399 debug_str_set_key (&set->key));
1401 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1402 GNUNET_assert (GNUNET_SYSERR !=
1403 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1404 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1409 put_rfn (struct ConsensusSession *session,
1410 struct ReferendumEntry *rfn)
1412 struct GNUNET_HashCode hash;
1414 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1415 GNUNET_assert (GNUNET_OK ==
1416 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1417 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Cancel a reconciliation task.  Currently a placeholder that does
 * nothing.
 *
 * @param task the task to cancel (unused)
 */
static void
task_cancel_reconcile (struct TaskEntry *task)
{
  /* not implemented yet */
}
1431 apply_diff_to_rfn (struct DiffEntry *diff,
1432 struct ReferendumEntry *rfn,
1433 uint16_t voting_peer,
1436 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1437 struct DiffElementInfo *di;
1439 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1441 while (GNUNET_YES ==
1442 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1444 (const void **) &di))
1448 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1452 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1456 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1463 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1465 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1472 diff_compose (struct DiffEntry *diff_1,
1473 struct DiffEntry *diff_2)
1475 struct DiffEntry *diff_new;
1476 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1477 struct DiffElementInfo *di;
1479 diff_new = diff_create ();
1481 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1482 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1484 diff_insert (diff_new, di->weight, di->element);
1486 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1488 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1489 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1491 diff_insert (diff_new, di->weight, di->element);
1493 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1499 struct ReferendumEntry *
1500 rfn_create (uint16_t size)
1502 struct ReferendumEntry *rfn;
1504 rfn = GNUNET_new (struct ReferendumEntry);
1505 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1506 rfn->peer_commited = GNUNET_new_array (size, int);
1507 rfn->peer_contested = GNUNET_new_array (size, int);
1508 rfn->num_peers = size;
1515 diff_destroy (struct DiffEntry *diff)
1517 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1523 * For a given majority, count what the outcome
1524 * is (add/remove/keep), and give the number
1525 * of peers that voted for this outcome.
1528 rfn_majority (const struct ReferendumEntry *rfn,
1529 const struct RfnElementInfo *ri,
1530 uint16_t *ret_majority,
1531 enum ReferendumVote *ret_vote)
1533 uint16_t votes_yes = 0;
1534 uint16_t num_commited = 0;
1537 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538 "Computing rfn majority for element %s of rfn {%s}\n",
1539 debug_str_element (ri->element),
1540 debug_str_rfn_key (&rfn->key));
1542 for (i = 0; i < rfn->num_peers; i++)
1544 if (GNUNET_NO == rfn->peer_commited[i])
1548 if (GNUNET_YES == ri->votes[i])
1552 if (votes_yes > (num_commited) / 2)
1554 *ret_vote = ri->proposal;
1555 *ret_majority = votes_yes;
1559 *ret_vote = VOTE_STAY;
1560 *ret_majority = num_commited - votes_yes;
1567 struct TaskEntry *task;
1568 struct SetKey dst_set_key;
1573 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1575 struct SetCopyCls *scc = cls;
1576 struct TaskEntry *task = scc->task;
1577 struct SetKey dst_set_key = scc->dst_set_key;
1578 struct SetEntry *set;
1581 set = GNUNET_new (struct SetEntry);
1583 set->key = dst_set_key;
1584 put_set (task->step->session, set);
1591 * Call the start function of the given
1592 * task again after we created a copy of the given set.
/*
 * task: task that needs the copy
 * src_set_key: key of the set to copy; the set must exist in the
 *              task's session (asserted)
 * dst_set_key: key the copy is to be stored under
 */
1595 create_set_copy_for_task (struct TaskEntry *task,
1596 struct SetKey *src_set_key,
1597 struct SetKey *dst_set_key)
1599 struct SetEntry *src_set;
/* Closure describing the copy; allocated on the heap. */
1600 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603 "Copying set {%s} to {%s} for task {%s}\n",
1604 debug_str_set_key (src_set_key),
1605 debug_str_set_key (dst_set_key),
1606 debug_str_task_key (&task->key));
/* The destination key is copied by value, so the caller's key
   does not need to outlive this call. */
1609 scc->dst_set_key = *dst_set_key;
1610 src_set = lookup_set (task->step->session, src_set_key);
1611 GNUNET_assert (NULL != src_set);
/* Request a lazy copy of the source set from the set service. */
1612 GNUNET_SET_copy_lazy (src_set->h,
/* Closure used by set_mutation_done to track outstanding set
   mutations on behalf of one task. */
1618 struct SetMutationProgressCls
1622 * Task to finish once all changes are through.
1624 struct TaskEntry *task;
/**
 * Bookkeeping for a set mutation tracked by a
 * `struct SetMutationProgressCls`.
 *
 * @param cls the `struct SetMutationProgressCls` of the mutation
 */
1629 set_mutation_done (void *cls)
1631 struct SetMutationProgressCls *pc = cls;
/* There must still be at least one mutation outstanding. */
1633 GNUNET_assert (pc->num_pending > 0);
/* Once nothing is pending any more, the associated task is
   picked up from the closure. */
1637 if (0 == pc->num_pending)
1639 struct TaskEntry *task = pc->task;
/**
 * Mark @a step as finished ahead of time and propagate this to its
 * subordinate steps.
 *
 * @param step the step to finish early
 */
1647 try_finish_step_early (struct Step *step)
/* The running, finished and early-finishable flags of the step
   are checked before anything is changed. */
1651 if (GNUNET_YES == step->is_running)
1653 if (GNUNET_YES == step->is_finished)
1655 if (GNUNET_NO == step->early_finishable)
1658 step->is_finished = GNUNET_YES;
1660 #ifdef GNUNET_EXTRA_LOGGING
1661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662 "Finishing step `%s' early.\n",
/* Each subordinate loses one pending prerequisite and is then
   itself tried for early finishing (recursive call). */
1666 for (i = 0; i < step->subordinates_len; i++)
1668 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1669 step->subordinates[i]->pending_prereq--;
1670 #ifdef GNUNET_EXTRA_LOGGING
1671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1672 "Decreased pending_prereq to %u for step `%s'.\n",
1673 (unsigned int) step->subordinates[i]->pending_prereq,
1674 step->subordinates[i]->debug_name);
1677 try_finish_step_early (step->subordinates[i]);
1680 // XXX: maybe schedule as task to avoid recursion?
1681 run_ready_steps (step->session);
/**
 * Finish a running step whose tasks have all completed: release
 * one prerequisite of every subordinate step, mark the step as
 * finished and run whatever steps became ready.
 *
 * @param step the step to finish; must be running, not yet finished
 *        and have all of its tasks finished (asserted)
 */
1686 finish_step (struct Step *step)
1690 GNUNET_assert (step->finished_tasks == step->tasks_len);
1691 GNUNET_assert (GNUNET_YES == step->is_running);
1692 GNUNET_assert (GNUNET_NO == step->is_finished);
1694 #ifdef GNUNET_EXTRA_LOGGING
1695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696 "All tasks of step `%s' with %u subordinates finished.\n",
1698 step->subordinates_len);
/* Every subordinate step waits for one prerequisite less. */
1701 for (i = 0; i < step->subordinates_len; i++)
1703 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1704 step->subordinates[i]->pending_prereq--;
1705 #ifdef GNUNET_EXTRA_LOGGING
1706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1707 "Decreased pending_prereq to %u for step `%s'.\n",
1708 (unsigned int) step->subordinates[i]->pending_prereq,
1709 step->subordinates[i]->debug_name);
1714 step->is_finished = GNUNET_YES;
1716 // XXX: maybe schedule as task to avoid recursion?
1717 run_ready_steps (step->session);
1723 * Apply the result from one round of gradecasts (i.e. every peer
1724 * should have gradecasted) to the peer's current set.
1726 * @param task the task with context information
1729 task_start_apply_round (struct TaskEntry *task)
1731 struct ConsensusSession *session = task->step->session;
1732 struct SetKey sk_in;
1733 struct SetKey sk_out;
1734 struct RfnKey rk_in;
1735 struct SetEntry *set_out;
1736 struct ReferendumEntry *rfn_in;
1737 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1738 struct RfnElementInfo *ri;
1739 struct SetMutationProgressCls *progress_cls;
/* Smallest majority seen over all elements; starts at the maximum
   so that any element lowers it. */
1740 uint16_t worst_majority = UINT16_MAX;
/* Input: current set and gradecast result of this repetition.
   Output: current set of the next repetition. */
1742 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1743 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1744 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1746 set_out = lookup_set (session, &sk_out);
/* Without an output set yet, a copy of the input set is requested
   for this task first. */
1747 if (NULL == set_out)
1749 create_set_copy_for_task (task, &sk_in, &sk_out);
1753 rfn_in = lookup_rfn (session, &rk_in);
1754 GNUNET_assert (NULL != rfn_in);
/* Tracks how many set mutations issued below are outstanding. */
1756 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1757 progress_cls->task = task;
1759 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
/* Decide for every element of the referendum what happens to it
   in the output set. */
1761 while (GNUNET_YES ==
1762 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1764 (const void **) &ri))
1766 uint16_t majority_num;
1767 enum ReferendumVote majority_vote;
1769 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1771 if (worst_majority > majority_num)
1772 worst_majority = majority_num;
/* Additions and removals are issued against the output set and
   counted as pending; keeping an element is only logged. */
1774 switch (majority_vote)
1777 progress_cls->num_pending++;
1778 GNUNET_assert (GNUNET_OK ==
1779 GNUNET_SET_add_element (set_out->h,
1783 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1784 "P%u: apply round: adding element %s with %u-majority.\n",
1785 session->local_peer_idx,
1786 debug_str_element (ri->element), majority_num);
1789 progress_cls->num_pending++;
1790 GNUNET_assert (GNUNET_OK ==
1791 GNUNET_SET_remove_element (set_out->h,
1795 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1796 "P%u: apply round: deleting element %s with %u-majority.\n",
1797 session->local_peer_idx,
1798 debug_str_element (ri->element), majority_num);
1801 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1802 "P%u: apply round: keeping element %s with %u-majority.\n",
1803 session->local_peer_idx,
1804 debug_str_element (ri->element), majority_num);
1813 if (0 == progress_cls->num_pending)
1815 // call closure right now, no pending ops
1816 GNUNET_free (progress_cls);
/* Early-stopping threshold: twice a third of the peers, using
   integer division. */
1821 uint16_t thresh = (session->num_peers / 3) * 2;
1823 if (worst_majority >= thresh)
/* Early stopping advances NONE -> ONE_MORE -> DONE; on the
   second transition all steps of the session are tried for
   early finishing. */
1825 switch (session->early_stopping)
1827 case EARLY_STOPPING_NONE:
1828 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1829 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1830 "P%u: Stopping early (after one more superround)\n",
1831 session->local_peer_idx);
1833 case EARLY_STOPPING_ONE_MORE:
1834 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1835 session->local_peer_idx);
1836 session->early_stopping = EARLY_STOPPING_DONE;
1839 for (step = session->steps_head; NULL != step; step = step->next)
1840 try_finish_step_early (step);
1843 case EARLY_STOPPING_DONE:
1844 /* We shouldn't be here anymore after early stopping */
1852 else if (EARLY_STOPPING_NONE != session->early_stopping)
1854 // Our assumption about the number of bad peers has been broken.
1856 GNUNET_break_op (0);
1860 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1861 session->local_peer_idx);
1864 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/**
 * Grade the gradecast of the task's leader: transfer the leader's
 * proposal and the echo majorities into the gradecast result
 * referendum of this repetition, and derive a confidence value
 * from the number of non-contested peers.
 *
 * @param task the task with context information (repetition, leader)
 */
1869 task_start_grade (struct TaskEntry *task)
1871 struct ConsensusSession *session = task->step->session;
1872 struct ReferendumEntry *output_rfn;
1873 struct ReferendumEntry *input_rfn;
1874 struct DiffEntry *input_diff;
1875 struct RfnKey rfn_key;
1876 struct DiffKey diff_key;
1877 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1878 struct RfnElementInfo *ri;
/* Starts at the highest value used here (2) and is lowered below. */
1879 unsigned int gradecast_confidence = 2;
/* The result referendum is keyed by repetition only and is
   created on demand. */
1881 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1882 output_rfn = lookup_rfn (session, &rfn_key);
1883 if (NULL == output_rfn)
1885 output_rfn = rfn_create (session->num_peers);
1886 output_rfn->key = rfn_key;
1887 put_rfn (session, output_rfn);
/* Inputs: the leader's proposal diff and the echo referendum for
   this repetition and leader; both must exist. */
1890 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1891 input_diff = lookup_diff (session, &diff_key);
1892 GNUNET_assert (NULL != input_diff);
1894 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1895 input_rfn = lookup_rfn (session, &rfn_key);
1896 GNUNET_assert (NULL != input_rfn);
1898 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1900 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1902 while (GNUNET_YES ==
1903 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1905 (const void **) &ri))
1907 uint16_t majority_num;
1908 enum ReferendumVote majority_vote;
1910 // XXX: we need contested votes and non-contested votes here
1911 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
/* A majority of at most a third of the peers is turned into a
   removal vote. */
1913 if (majority_num <= session->num_peers / 3)
1914 majority_vote = VOTE_REMOVE;
/* Votes are recorded in the result referendum in the leader's slot. */
1916 switch (majority_vote)
1921 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1924 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1931 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
/* Lower the confidence depending on how many peers did not
   contest: below 2*(n/3) it is capped at 1, below (n/3)+1 it
   becomes 0 (integer division). */
1934 uint16_t noncontested;
1935 noncontested = rfn_noncontested (input_rfn);
1936 if (noncontested < (session->num_peers / 3) * 2)
1938 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1940 if (noncontested < (session->num_peers / 3) + 1)
1942 gradecast_confidence = 0;
/* Confidence of at least 1: commit the leader's slot in the result. */
1946 if (gradecast_confidence >= 1)
1947 rfn_commit (output_rfn, task->key.leader);
/* Confidence of at most 1: the leader is blacklisted. */
1949 if (gradecast_confidence <= 1)
1950 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/**
 * Start a set reconciliation task: make sure the configured
 * outputs (set, referendum, diff) exist, then either initiate the
 * set operation (local peer is peer1) or wait for the other peer's
 * request (local peer is peer2).
 *
 * @param task the reconciliation task; its `cls.setop` is used
 */
1957 task_start_reconcile (struct TaskEntry *task)
1959 struct SetEntry *input;
1960 struct SetOpCls *setop = &task->cls.setop;
1961 struct ConsensusSession *session = task->step->session;
1963 input = lookup_set (session, &setop->input_set);
1964 GNUNET_assert (NULL != input);
1965 GNUNET_assert (NULL != input->h);
1967 /* We create the outputs for the operation here
1968 (rather than in the set operation callback)
1969 because we want something valid in there, even
1970 if the other peer doesn't talk to us */
1972 if (SET_KIND_NONE != setop->output_set.set_kind)
1974 /* If we don't have an existing output set,
1975 we clone the input set. */
1976 if (NULL == lookup_set (session, &setop->output_set))
1978 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
/* Output referendum, created on demand with one slot per peer. */
1983 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1985 if (NULL == lookup_rfn (session, &setop->output_rfn))
1987 struct ReferendumEntry *rfn;
1989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1990 "P%u: output rfn <%s> missing, creating.\n",
1991 session->local_peer_idx,
1992 debug_str_rfn_key (&setop->output_rfn));
1994 rfn = rfn_create (session->num_peers);
1995 rfn->key = setop->output_rfn;
1996 put_rfn (session, rfn);
/* Output diff, created on demand. */
2000 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2002 if (NULL == lookup_diff (session, &setop->output_diff))
2004 struct DiffEntry *diff;
2006 diff = diff_create ();
2007 diff->key = setop->output_diff;
2008 put_diff (session, diff);
/* Purely local task: both endpoints are the local peer. */
2012 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2014 /* XXX: mark the corresponding rfn as commited if necessary */
/* Local peer is peer1: prepare the set operation with peer2 and
   send the task key along as round context, in network byte order. */
2019 if (task->key.peer1 == session->local_peer_idx)
2021 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2023 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024 "P%u: Looking up set {%s} to run remote union\n",
2025 session->local_peer_idx,
2026 debug_str_set_key (&setop->input_set));
2028 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2029 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2031 rcm.kind = htons (task->key.kind);
2032 rcm.peer1 = htons (task->key.peer1);
2033 rcm.peer2 = htons (task->key.peer2);
2034 rcm.leader = htons (task->key.leader);
2035 rcm.repetition = htons (task->key.repetition);
2037 GNUNET_assert (NULL == setop->op);
2038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2039 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2041 // XXX: maybe this should be done while
2042 // setting up tasks already?
2043 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2044 &session->global_id,
2046 GNUNET_SET_RESULT_SYMMETRIC,
2050 commit_set (session, task);
/* Local peer is peer2: the operation is created when the other
   peer's request arrives; commit now only if that has already
   happened. */
2052 else if (task->key.peer2 == session->local_peer_idx)
2054 /* Wait for the other peer to contact us */
2055 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2056 session->local_peer_idx, task->key.peer1);
2058 if (NULL != setop->op)
2060 commit_set (session, task);
2065 /* We made an error while constructing the task graph. */
/**
 * Evaluate the echo referendum for the task's repetition and
 * leader, and apply the per-element outcome to the echo result
 * set, which starts out as a copy of the leader's proposal set.
 *
 * @param task the task with context information (repetition, leader)
 */
2072 task_start_eval_echo (struct TaskEntry *task)
2074 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2075 struct ReferendumEntry *input_rfn;
2076 struct RfnElementInfo *ri;
2077 struct SetEntry *output_set;
2078 struct SetMutationProgressCls *progress_cls;
2079 struct ConsensusSession *session = task->step->session;
2080 struct SetKey sk_in;
2081 struct SetKey sk_out;
2082 struct RfnKey rk_in;
2084 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2085 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2086 output_set = lookup_set (session, &sk_out);
/* Without an output set yet, a copy of the proposal set is
   requested for this task first. */
2087 if (NULL == output_set)
2089 create_set_copy_for_task (task, &sk_in, &sk_out);
/* The output set's handle is additionally registered under the
   SET_KIND_LAST_GRADECAST key; both entries share one handle. */
2095 // FIXME: should be marked as a shallow copy, so
2096 // we can destroy everything correctly
2097 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2098 last_set->h = output_set->h;
2099 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2100 put_set (session, last_set);
2103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104 "Evaluating referendum in Task {%s}\n",
2105 debug_str_task_key (&task->key));
/* Tracks how many set mutations issued below are outstanding. */
2107 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2108 progress_cls->task = task;
2110 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2111 input_rfn = lookup_rfn (session, &rk_in);
2113 GNUNET_assert (NULL != input_rfn);
2115 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2116 GNUNET_assert (NULL != iter);
/* For every element: a majority below a third of the peers marks
   the output set as contested; additions and removals are then
   issued against the output set and counted as pending. */
2118 while (GNUNET_YES ==
2119 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2121 (const void **) &ri))
2123 enum ReferendumVote majority_vote;
2124 uint16_t majority_num;
2126 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2128 if (majority_num < session->num_peers / 3)
2130 /* It is not the case that all nonfaulty peers
2131 echoed the same value. Since we're doing a set reconciliation, we
2132 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2133 reconciliation as contested. Other peers might not know that the
2134 leader is faulty, thus we still re-distribute in the confirmation
2136 output_set->is_contested = GNUNET_YES;
2139 switch (majority_vote)
2142 progress_cls->num_pending++;
2143 GNUNET_assert (GNUNET_OK ==
2144 GNUNET_SET_add_element (output_set->h,
2150 progress_cls->num_pending++;
2151 GNUNET_assert (GNUNET_OK ==
2152 GNUNET_SET_remove_element (output_set->h,
2158 /* Nothing to do. */
2166 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2168 if (0 == progress_cls->num_pending)
2170 // call closure right now, no pending ops
2171 GNUNET_free (progress_cls);
/**
 * Start the finish task: iterate over the final set, handing each
 * element to send_to_client_iter.
 *
 * @param task the finish task; `cls.finish.input_set` names the
 *        final set, which must exist (asserted)
 */
2178 task_start_finish (struct TaskEntry *task)
2180 struct SetEntry *final_set;
2181 struct ConsensusSession *session = task->step->session;
2183 final_set = lookup_set (session, &task->cls.finish.input_set);
2185 GNUNET_assert (NULL != final_set);
2188 GNUNET_SET_iterate (final_set->h,
2189 send_to_client_iter,
/**
 * Start a task of the session and mark it as started.
 *
 * @param session session the task belongs to (used for logging)
 * @param task task to start; must be neither started nor finished
 *        and must have a start function (asserted)
 */
2194 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2198 GNUNET_assert (GNUNET_NO == task->is_started);
2199 GNUNET_assert (GNUNET_NO == task->is_finished);
2200 GNUNET_assert (NULL != task->start);
2204 task->is_started = GNUNET_YES;
2211 * Run all steps of the session that don't have any
2212 * more pending dependencies.
/*
 * session: session whose step list (starting at steps_head) is
 *          scanned for steps to run
 */
2215 run_ready_steps (struct ConsensusSession *session)
2219 step = session->steps_head;
2221 while (NULL != step)
/* A step is ready when it is not running, not finished and has
   no pending prerequisites. */
2223 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2227 GNUNET_assert (0 == step->finished_tasks);
2229 #ifdef GNUNET_EXTRA_LOGGING
2230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2231 session->local_peer_idx,
2233 step->round, step->tasks_len, step->subordinates_len);
/* Mark the step as running before starting its tasks. */
2236 step->is_running = GNUNET_YES;
2237 for (i = 0; i < step->tasks_len; i++)
2238 start_task (session, step->tasks[i]);
2240 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2241 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2244 /* Running the next ready steps will be triggered by task completion */
/**
 * Mark @a task as finished and count it in its step; when it was
 * the last unfinished task of the step, the step is finished too.
 *
 * @param task the task that completed; must not be finished yet
 *        (asserted)
 */
2256 finish_task (struct TaskEntry *task)
2258 GNUNET_assert (GNUNET_NO == task->is_finished);
2259 task->is_finished = GNUNET_YES;
2261 task->step->finished_tasks++;
2263 if (task->step->finished_tasks == task->step->tasks_len)
2264 finish_step (task->step);
2269 * Search peer in the list of peers in session.
2271 * @param peer peer to find
2272 * @param session session with peer
2273 * @return index of peer, -1 if peer is not in session
2276 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
/* Linear scan over the session's peers, comparing the identities
   byte-wise. */
2279 for (i = 0; i < session->num_peers; i++)
2280 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2287 * Compute a global, (hopefully) unique consensus session id,
2288 * from the local id of the consensus session, and the identities of all participants.
2289 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2290 * exactly the same peers, the global id will be different.
2292 * @param session session to generate the global id for
2293 * @param local_session_id local id of the consensus session
2296 compute_global_id (struct ConsensusSession *session,
2297 const struct GNUNET_HashCode *local_session_id)
/* Fixed string used as salt in the key derivation below. */
2299 const char *salt = "gnunet-service-consensus/session_id";
/* The derived value is written to session->global_id; the
   derivation must succeed (asserted). */
2301 GNUNET_assert (GNUNET_YES ==
2302 GNUNET_CRYPTO_kdf (&session->global_id,
2303 sizeof (struct GNUNET_HashCode),
2307 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2309 sizeof (struct GNUNET_HashCode),
2315 * Compare two peer identities.
2317 * @param h1 some peer identity
2318 * @param h2 some peer identity
2319 * @return a value > 0 if h1 > h2, < 0 if h1 < h2 and 0 if h1 == h2 (as returned by memcmp).
2322 peer_id_cmp (const void *h1, const void *h2)
/* Byte-wise comparison; the result is whatever memcmp yields, so
   only its sign is meaningful. */
2324 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2329 * Create the sorted list of peers for the session,
2330 * add the local peer if not in the join message.
/*
 * session: session whose `peers` array and `num_peers` are set up
 * join_msg: join message; the peer identities are read from the
 *           memory directly following the message struct
 *
 * NOTE(review): `num_peers` from the message is not checked against
 * the message size in the visible code -- confirm that the caller
 * validates it.
 */
2333 initialize_session_peer_list (struct ConsensusSession *session,
2334 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2336 unsigned int local_peer_in_list;
2337 uint32_t listed_peers;
2338 const struct GNUNET_PeerIdentity *msg_peers;
2341 GNUNET_assert (NULL != join_msg);
2343 /* peers in the join message, may or may not include the local peer */
2344 listed_peers = ntohl (join_msg->num_peers);
2346 session->num_peers = listed_peers;
2348 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
/* Check whether the local peer is among the listed peers. */
2350 local_peer_in_list = GNUNET_NO;
2351 for (i = 0; i < listed_peers; i++)
2353 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2355 local_peer_in_list = GNUNET_YES;
/* Reserve one extra slot for the local peer if it was not listed. */
2360 if (GNUNET_NO == local_peer_in_list)
2361 session->num_peers++;
2363 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
/* The local peer goes into the last slot; the listed peers are
   copied to the front, then the whole array is sorted. */
2365 if (GNUNET_NO == local_peer_in_list)
2366 session->peers[session->num_peers - 1] = my_peer;
2368 GNUNET_memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2369 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
/**
 * Look up a task of the session by its key.
 *
 * The task map is indexed by the hash over the raw bytes of the
 * `struct TaskKey`.
 *
 * @param session session whose task map is searched
 * @param key key of the task
 * @return result of the map lookup for the key's hash
 */
2373 static struct TaskEntry *
2374 lookup_task (struct ConsensusSession *session,
2375 struct TaskKey *key)
2377 struct GNUNET_HashCode hash;
2380 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2382 GNUNET_h2s (&hash));
2383 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2388 * Called when another peer wants to do a set operation with the local peer.
2391 * @param cls closure
2392 * @param other_peer the other peer
2393 * @param context_msg message with application specific information from the other peer
2395 * @param request request from the other peer, use GNUNET_SET_accept
2396 * to accept it, otherwise the request will be refused
2397 * Note that we don't use a return value here, as it is also
2398 * necessary to specify the set we want to do the operation with,
2399 * which sometimes can be derived from the context message.
2400 * Also necessary to specify the timeout.
2403 set_listen_cb (void *cls,
2404 const struct GNUNET_PeerIdentity *other_peer,
2405 const struct GNUNET_MessageHeader *context_msg,
2406 struct GNUNET_SET_Request *request)
2408 struct ConsensusSession *session = cls;
2410 struct TaskEntry *task;
2411 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
/* The context message is mandatory and must be a round context
   message of exactly the expected size. */
2413 if (NULL == context_msg)
2415 GNUNET_break_op (0);
2419 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2421 GNUNET_break_op (0);
2425 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2427 GNUNET_break_op (0);
2431 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
/* Rebuild the task key from the message fields, converting each
   from network byte order. */
2433 tk = ((struct TaskKey) {
2434 .kind = ntohs (cm->kind),
2435 .peer1 = ntohs (cm->peer1),
2436 .peer2 = ntohs (cm->peer2),
2437 .repetition = ntohs (cm->repetition),
2438 .leader = ntohs (cm->leader),
2441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2442 session->local_peer_idx, debug_str_task_key (&tk));
2444 task = lookup_task (session, &tk);
2448 GNUNET_break_op (0);
/* Requests for finished tasks are protocol violations. */
2452 if (GNUNET_YES == task->is_finished)
2454 GNUNET_break_op (0);
2458 if (task->key.peer2 != session->local_peer_idx)
2460 /* We're being asked, so we must be the 2nd peer. */
2461 GNUNET_break_op (0);
/* A purely local task must never be requested over the network. */
2465 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2466 (task->key.peer2 == session->local_peer_idx)));
/* Accept the request; the resulting operation is stored in the task. */
2468 task->cls.setop.op = GNUNET_SET_accept (request,
2469 GNUNET_SET_RESULT_SYMMETRIC,
2473 /* If the task hasn't been started yet,
2474 we wait for that until we commit. */
2476 if (GNUNET_YES == task->is_started)
2478 commit_set (session, task);
/**
 * Add a copy of @a t to its step and to the task map.
 *
 * @param taskmap map the task is stored in, keyed by the hash of
 *        its `struct TaskKey`; the key must not be present yet
 *        (asserted)
 * @param t template of the task; it is duplicated, so the caller
 *        keeps ownership of @a t itself; `t->step` must be set
 */
2485 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2486 struct TaskEntry *t)
2488 struct GNUNET_HashCode round_hash;
2491 GNUNET_assert (NULL != t->step);
/* From here on `t` refers to a heap copy of the caller's task. */
2493 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
/* Grow the step's task array by about one half when it is full. */
2497 if (s->tasks_len == s->tasks_cap)
2499 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2500 GNUNET_array_grow (s->tasks,
2505 #ifdef GNUNET_EXTRA_LOGGING
2506 GNUNET_assert (NULL != s->debug_name);
2507 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2508 debug_str_task_key (&t->key),
2512 s->tasks[s->tasks_len] = t;
2515 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2516 GNUNET_assert (GNUNET_OK ==
2517 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2518 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Placeholder for assigning timeouts to the tasks of the session;
 * the visible body contains no statements yet.
 *
 * @param session session whose task graph would receive timeouts
 */
2523 install_step_timeouts (struct ConsensusSession *session)
2525 /* Given the fully constructed task graph
2526 with rounds for tasks, we can give the tasks timeouts. */
2528 // unsigned int max_round;
2530 /* XXX: implement! */
2536 * Arrange two peers in some canonical order.
/*
 * p1, p2: peer indices, passed by pointer; both must be
 *         smaller than n (asserted)
 * n: number of peers
 */
2539 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2544 GNUNET_assert (*p1 < n);
2545 GNUNET_assert (*p2 < n);
2558 /* For uniformly random *p1, *p2,
2559 this condition is true with 50% chance */
/* Tests whether the distance from a to b, taken modulo n, is at
   most n / 2. */
2560 if (((b - a) + n) % n <= n / 2)
2574 * Record @a dep as a dependency of @a step.
/*
 * step: the dependent step; gains one pending prerequisite
 * dep: the prerequisite; `step` is appended to its subordinates.
 *      Must differ from `step` and must not be in a later round
 *      (asserted).
 */
2577 step_depend_on (struct Step *step, struct Step *dep)
2579 /* We're not checking for cyclic dependencies,
2580 but this is a cheap sanity check. */
2581 GNUNET_assert (step != dep);
2582 GNUNET_assert (NULL != step);
2583 GNUNET_assert (NULL != dep);
2584 GNUNET_assert (dep->round <= step->round);
2586 #ifdef GNUNET_EXTRA_LOGGING
2587 /* Make sure we have complete debugging information.
2588 Also checks that we don't screw up too badly
2589 constructing the task graph. */
2590 GNUNET_assert (NULL != step->debug_name);
2591 GNUNET_assert (NULL != dep->debug_name);
2592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593 "Making step `%s' depend on `%s'\n",
/* Grow the subordinates array by about one half when it is full. */
2598 if (dep->subordinates_cap == dep->subordinates_len)
2600 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2601 GNUNET_array_grow (dep->subordinates,
2602 dep->subordinates_cap,
2606 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2608 dep->subordinates[dep->subordinates_len] = step;
2609 dep->subordinates_len++;
2611 step->pending_prereq++;
/**
 * Allocate a step and append it to the session's list of steps.
 *
 * @param session session the step belongs to
 * @param round round number stored in the step
 * @param early_finishable flag stored in the step; steps with
 *        #GNUNET_NO here are left alone by try_finish_step_early
 */
2615 static struct Step *
2616 create_step (struct ConsensusSession *session, int round, int early_finishable)
2619 step = GNUNET_new (struct Step);
2620 step->session = session;
2621 step->round = round;
2622 step->early_finishable = early_finishable;
2623 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2624 session->steps_tail,
2631 * Construct the task graph for a single gradecast.
/*
 * session: session the steps and tasks are added to
 * step_before: step that the first gradecast step depends on; its
 *              round number plus one is the first round used here
 * step_after: step that is made to depend on the last gradecast step
 *
 * Steps are created in this order: disseminate, echo, echo grade,
 * confirm, confirm grade.
 */
2635 construct_task_graph_gradecast (struct ConsensusSession *session,
2638 struct Step *step_before,
2639 struct Step *step_after)
2641 uint16_t n = session->num_peers;
2642 uint16_t me = session->local_peer_idx;
2647 /* The task we're currently setting up. */
2648 struct TaskEntry task;
2651 struct Step *prev_step;
2657 round = step_before->round + 1;
2659 /* gcast step 1: leader disseminates */
2661 step = create_step (session, round, GNUNET_YES);
2663 #ifdef GNUNET_EXTRA_LOGGING
2664 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2666 step_depend_on (step, step_before);
/* Reconciliation tasks keyed with the local peer as leader. */
2670 for (k = 0; k < n; k++)
2676 arrange_peers (&p1, &p2, n);
2677 task = ((struct TaskEntry) {
2679 .start = task_start_reconcile,
2680 .cancel = task_cancel_reconcile,
2681 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2683 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2684 put_task (session->taskmap, &task);
2686 /* We run this task to make sure that the leader
2687 has stored its own SET_KIND_LEADER_PROPOSAL set,
2688 so it can participate in the rest of the gradecast
2689 without the code having to handle any special cases. */
2690 task = ((struct TaskEntry) {
2692 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2693 .start = task_start_reconcile,
2694 .cancel = task_cancel_reconcile,
2696 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2697 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2698 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2699 put_task (session->taskmap, &task);
/* Reconciliation task keyed with `lead` as leader; its result is
   stored as the leader proposal set and diff for (rep, lead). */
2705 arrange_peers (&p1, &p2, n);
2706 task = ((struct TaskEntry) {
2708 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2709 .start = task_start_reconcile,
2710 .cancel = task_cancel_reconcile,
2712 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2713 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2714 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2715 put_task (session->taskmap, &task);
2718 /* gcast phase 2: echo */
2721 step = create_step (session, round, GNUNET_YES);
2722 #ifdef GNUNET_EXTRA_LOGGING
2723 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2725 step_depend_on (step, prev_step);
/* Echo tasks: reconcile the leader proposal set and record the
   outcome in the echo referendum for (rep, lead). */
2727 for (k = 0; k < n; k++)
2731 arrange_peers (&p1, &p2, n);
2732 task = ((struct TaskEntry) {
2734 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2735 .start = task_start_reconcile,
2736 .cancel = task_cancel_reconcile,
2738 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2739 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2740 put_task (session->taskmap, &task);
2744 /* Same round, since step only has local tasks */
2745 step = create_step (session, round, GNUNET_YES);
2746 #ifdef GNUNET_EXTRA_LOGGING
2747 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2749 step_depend_on (step, prev_step);
/* NOTE(review): the echo grade task key below uses -1 for both
   peers, so this arrange_peers call looks unnecessary -- confirm. */
2751 arrange_peers (&p1, &p2, n);
2752 task = ((struct TaskEntry) {
2753 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2755 .start = task_start_eval_echo
2757 put_task (session->taskmap, &task);
2761 step = create_step (session, round, GNUNET_YES);
2762 #ifdef GNUNET_EXTRA_LOGGING
2763 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2765 step_depend_on (step, prev_step);
2767 /* gcast phase 3: confirmation and grading */
2768 for (k = 0; k < n; k++)
2772 arrange_peers (&p1, &p2, n);
2773 task = ((struct TaskEntry) {
2775 .start = task_start_reconcile,
2776 .cancel = task_cancel_reconcile,
2777 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2779 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2780 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2781 /* If there was at least one element in the echo round that was
2782 contested (i.e. it had no n-t majority), then we let the other peers
2783 know, and other peers let us know. The contested flag for each peer is
2784 stored in the rfn. */
2785 task.cls.setop.transceive_contested = GNUNET_YES;
2786 put_task (session->taskmap, &task);
2790 /* Same round, since step only has local tasks */
2791 step = create_step (session, round, GNUNET_YES);
2792 #ifdef GNUNET_EXTRA_LOGGING
2793 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2795 step_depend_on (step, prev_step);
/* Local grading task for this repetition and leader. */
2797 task = ((struct TaskEntry) {
2799 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2800 .start = task_start_grade,
2802 put_task (session->taskmap, &task);
/* The step following the gradecast waits for the grading step. */
2804 step_depend_on (step_after, step);
/**
 * Build the complete task graph of the session: an all-to-all
 * step, then sequential gradecast repetitions (each with one
 * gradecast per leader and an apply task at its end), and a final
 * finish step.
 *
 * @param session session the steps and tasks are added to
 */
2809 construct_task_graph (struct ConsensusSession *session)
2811 uint16_t n = session->num_peers;
2814 uint16_t me = session->local_peer_idx;
2816 /* The task we're currently setting up. */
2817 struct TaskEntry task;
2819 /* Current leader */
2823 struct Step *prev_step;
2825 unsigned int round = 0;
2829 // XXX: introduce first step,
2830 // where we wait for all insert acks
2831 // from the set service
2833 /* faster but brittle all-to-all */
2835 // XXX: Not implemented yet
2837 /* all-to-all step */
/* This step is created as not early-finishable. */
2839 step = create_step (session, round, GNUNET_NO);
2841 #ifdef GNUNET_EXTRA_LOGGING
2842 step->debug_name = GNUNET_strdup ("all to all");
2845 for (i = 0; i < n; i++)
2852 arrange_peers (&p1, &p2, n);
2853 task = ((struct TaskEntry) {
2854 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2856 .start = task_start_reconcile,
2857 .cancel = task_cancel_reconcile,
/* Input and output are the same set (current set of repetition 0). */
2859 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2860 task.cls.setop.output_set = task.cls.setop.input_set;
2861 task.cls.setop.do_not_remove = GNUNET_YES;
2862 put_task (session->taskmap, &task);
2870 /* Byzantine union */
2872 /* sequential repetitions of the gradecasts */
2873 for (i = 0; i < t + 1; i++)
2875 struct Step *step_rep_start;
2876 struct Step *step_rep_end;
2878 /* Every repetition is in a separate round. */
2879 step_rep_start = create_step (session, round, GNUNET_YES);
2880 #ifdef GNUNET_EXTRA_LOGGING
2881 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2884 step_depend_on (step_rep_start, prev_step);
2886 /* gradecast has three rounds */
2888 step_rep_end = create_step (session, round, GNUNET_YES);
2889 #ifdef GNUNET_EXTRA_LOGGING
2890 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2893 /* parallel gradecasts */
2894 for (lead = 0; lead < n; lead++)
2895 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* The end step of the repetition carries the task that applies
   the gradecast results to the current set. */
2897 task = ((struct TaskEntry) {
2898 .step = step_rep_end,
2899 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2900 .start = task_start_apply_round,
2902 put_task (session->taskmap, &task);
2904 prev_step = step_rep_end;
2907 /* There is no next gradecast round, thus the final
2908 start step is the overall end step of the gradecasts */
/* The finish step is created as not early-finishable. */
2910 step = create_step (session, round, GNUNET_NO);
2911 #ifdef GNUNET_EXTRA_LOGGING
2912 GNUNET_asprintf (&step->debug_name, "finish");
2914 step_depend_on (step, prev_step);
2916 task = ((struct TaskEntry) {
2918 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2919 .start = task_start_finish,
2921 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2923 put_task (session->taskmap, &task);
2928 * Initialize the session, continue receiving messages from the owning client
2930 * @param session the session to initialize
2931 * @param join_msg the join message from the client
2934 initialize_session (struct ConsensusSession *session,
2935 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2937 struct ConsensusSession *other_session;
2939 initialize_session_peer_list (session, join_msg);
2940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2941 compute_global_id (session, &join_msg->session_id);
2943 /* Check if some local client already owns the session.
2944 It is only legal to have a session with an existing global id
2945 if all other sessions with this global id are finished.*/
2946 other_session = sessions_head;
2947 while (NULL != other_session)
2949 if ((other_session != session) &&
2950 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2952 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2954 // GNUNET_break (0);
2955 // destroy_session (session);
2960 other_session = other_session->next;
2963 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2964 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2966 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n",
2967 (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2969 session->local_peer_idx = get_peer_idx (&my_peer, session);
2970 GNUNET_assert (-1 != session->local_peer_idx);
2971 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2972 &session->global_id,
2973 set_listen_cb, session);
2974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2976 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2977 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2978 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2979 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2982 struct SetEntry *client_set;
2983 client_set = GNUNET_new (struct SetEntry);
2984 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2985 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2986 put_set (session, client_set);
2989 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2991 /* Just construct the task graph,
2992 but don't run anything until the client calls conclude. */
2993 construct_task_graph (session);
2995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2999 static struct ConsensusSession *
3000 get_session_by_client (struct GNUNET_SERVER_Client *client)
3002 struct ConsensusSession *session;
3004 session = sessions_head;
3005 while (NULL != session)
3007 if (session->client == client)
3009 session = session->next;
3016 * Called when a client wants to join a consensus session.
3019 * @param client client that sent the message
3020 * @param m message sent by the client
3023 client_join (void *cls,
3024 struct GNUNET_SERVER_Client *client,
3025 const struct GNUNET_MessageHeader *m)
3027 struct ConsensusSession *session;
3029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3031 session = get_session_by_client (client);
3032 if (NULL != session)
3035 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3038 session = GNUNET_new (struct ConsensusSession);
3039 session->client = client;
3040 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3041 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3042 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3043 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
/**
 * Continuation that client_insert() hands to GNUNET_SET_add_element().
 *
 * NOTE(review): client_insert() increments num_client_insert_pending
 * before scheduling this continuation and nothing here decrements it;
 * confirm whether that is intended.
 *
 * @param cls closure; client_insert() passes the session
 */
static void
client_insert_done (void *cls)
{
  // FIXME
}
3057 * Called when a client performs an insert operation.
3059 * @param cls (unused)
3060 * @param client client handle
3061 * @param m message sent by the client
3064 client_insert (void *cls,
3065 struct GNUNET_SERVER_Client *client,
3066 const struct GNUNET_MessageHeader *m)
3068 struct ConsensusSession *session;
3069 struct GNUNET_CONSENSUS_ElementMessage *msg;
3070 struct GNUNET_SET_Element *element;
3071 ssize_t element_size;
3072 struct GNUNET_SET_Handle *initial_set;
3074 session = get_session_by_client (client);
3076 if (NULL == session)
3079 GNUNET_SERVER_client_disconnect (client);
3083 if (GNUNET_YES == session->conclude_started)
3086 GNUNET_SERVER_client_disconnect (client);
3090 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3091 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3092 if (element_size < 0)
3098 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3099 element->element_type = msg->element_type;
3100 element->size = element_size;
3101 GNUNET_memcpy (&element[1], &msg[1], element_size);
3102 element->data = &element[1];
3104 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3105 struct SetEntry *entry;
3106 entry = lookup_set (session, &key);
3107 GNUNET_assert (NULL != entry);
3108 initial_set = entry->h;
3110 session->num_client_insert_pending++;
3111 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3113 #ifdef GNUNET_EXTRA_LOGGING
3115 struct GNUNET_HashCode hash;
3117 GNUNET_SET_element_hash (element, &hash);
3119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3120 session->local_peer_idx,
3121 GNUNET_h2s (&hash));
3125 GNUNET_free (element);
3126 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3131 * Called when a client performs the conclude operation.
3133 * @param cls (unused)
3134 * @param client client handle
3135 * @param message message sent by the client
3138 client_conclude (void *cls,
3139 struct GNUNET_SERVER_Client *client,
3140 const struct GNUNET_MessageHeader *message)
3142 struct ConsensusSession *session;
3144 session = get_session_by_client (client);
3145 if (NULL == session)
3147 /* client not found */
3149 GNUNET_SERVER_client_disconnect (client);
3153 if (GNUNET_YES == session->conclude_started)
3155 /* conclude started twice */
3157 GNUNET_SERVER_client_disconnect (client);
3158 destroy_session (session);
3162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3164 session->conclude_started = GNUNET_YES;
3166 install_step_timeouts (session);
3167 run_ready_steps (session);
3170 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3175 * Called to clean up, after a shutdown has been requested.
3177 * @param cls closure
3180 shutdown_task (void *cls)
3182 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n");
3183 while (NULL != sessions_head)
3184 destroy_session (sessions_head);
3186 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
/**
 * Clean up after a client after it is
 * disconnected (either by us or by itself).
 *
 * Currently this only looks up the client's session; the session is
 * not destroyed yet (see the FIXME below).
 *
 * @param cls closure, unused
 * @param client the client to clean up after
 */
static void
handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
{
  if (NULL == get_session_by_client (client))
    return;
  // FIXME: destroy if we can
}
3211 * Start processing consensus requests.
3213 * @param cls closure
3214 * @param server the initialized server
3215 * @param c configuration to use
3218 run (void *cls, struct GNUNET_SERVER_Handle *server,
3219 const struct GNUNET_CONFIGURATION_Handle *c)
3221 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3222 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3223 sizeof (struct GNUNET_MessageHeader)},
3224 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3225 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3231 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3233 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3235 GNUNET_SCHEDULER_shutdown ();
3238 statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3239 GNUNET_SERVER_add_handlers (server, server_handlers);
3240 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3241 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3242 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3247 * The main function for the consensus service.
3249 * @param argc number of arguments from the command line
3250 * @param argv command line arguments
3251 * @return 0 ok, 1 on error
3254 main (int argc, char *const *argv)
3257 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3258 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3259 return (GNUNET_OK == ret) ? 0 : 1;