2 This file is part of GNUnet
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
/**
 * Phases of the early stopping scheme of a session.
 *
 * NOTE(review): the precise meaning of each phase is not visible in
 * this part of the file; confirm against the code that updates it.
 */
enum EarlyStoppingPhase
{
  /* numeric value 0 */
  EARLY_STOPPING_NONE,
  /* numeric value 1 */
  EARLY_STOPPING_ONE_MORE,
  /* numeric value 2 */
  EARLY_STOPPING_DONE
};
66 GNUNET_NETWORK_STRUCT_BEGIN
69 struct ContestedPayload
74 * Tuple of integers that together
75 * identify a task uniquely.
79 * A value from 'enum PhaseKind'.
81 uint16_t kind GNUNET_PACKED;
84 * Number of the first peer
87 int16_t peer1 GNUNET_PACKED;
90 * Number of the second peer in canonical order.
92 int16_t peer2 GNUNET_PACKED;
95 * Repetition of the gradecast phase.
97 int16_t repetition GNUNET_PACKED;
100 * Leader in the gradecast phase.
102 * Can be different from both peer1 and peer2.
104 int16_t leader GNUNET_PACKED;
111 int set_kind GNUNET_PACKED;
112 int k1 GNUNET_PACKED;
113 int k2 GNUNET_PACKED;
120 struct GNUNET_SET_Handle *h;
122 * GNUNET_YES if the set resulted
123 * from applying a referendum with contested
132 int diff_kind GNUNET_PACKED;
133 int k1 GNUNET_PACKED;
134 int k2 GNUNET_PACKED;
139 int rfn_kind GNUNET_PACKED;
140 int k1 GNUNET_PACKED;
141 int k2 GNUNET_PACKED;
145 GNUNET_NETWORK_STRUCT_END
149 PHASE_KIND_ALL_TO_ALL,
150 PHASE_KIND_GRADECAST_LEADER,
151 PHASE_KIND_GRADECAST_ECHO,
152 PHASE_KIND_GRADECAST_ECHO_GRADE,
153 PHASE_KIND_GRADECAST_CONFIRM,
154 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
156 * Apply a repetition of the all-to-all
157 * gradecast to the current set.
159 PHASE_KIND_APPLY_REP,
169 * Last result set from a gradecast
171 SET_KIND_LAST_GRADECAST,
172 SET_KIND_LEADER_PROPOSAL,
173 SET_KIND_ECHO_RESULT,
179 DIFF_KIND_LEADER_PROPOSAL,
180 DIFF_KIND_LEADER_CONSENSUS,
181 DIFF_KIND_GRADECAST_RESULT,
189 RFN_KIND_GRADECAST_RESULT
195 struct SetKey input_set;
197 struct SetKey output_set;
198 struct RfnKey output_rfn;
199 struct DiffKey output_diff;
203 int transceive_contested;
205 struct GNUNET_SET_OperationHandle *op;
211 struct SetKey input_set;
215 * Closure for both @a start_task
216 * and @a cancel_task.
220 struct SetOpCls setop;
221 struct FinishCls finish;
226 typedef void (*TaskFunc) (struct TaskEntry *task);
229 * Node in the consensus task graph.
244 union TaskFuncCls cls;
251 * All steps of one session are in a
252 * linked list for easier deallocation.
257 * All steps of one session are in a
258 * linked list for easier deallocation.
262 struct ConsensusSession *session;
265 * Tasks that this step is composed of.
267 struct TaskEntry **tasks;
268 unsigned int tasks_len;
269 unsigned int tasks_cap;
271 unsigned int finished_tasks;
274 * Tasks that have this task as dependency.
276 * We store pointers to subordinates rather
277 * than to prerequisites since it makes
278 * tracking the readiness of a task easier.
280 struct Step **subordinates;
281 unsigned int subordinates_len;
282 unsigned int subordinates_cap;
285 * Counter for the prerequisites of
288 size_t pending_prereq;
291 * Task that will run this step despite
292 * any pending prerequisites.
294 struct GNUNET_SCHEDULER_Task *timeout_task;
296 unsigned int is_running;
298 unsigned int is_finished;
301 * Synchrony round of the task.
302 * Determines the deadline for the task.
307 * Human-readable name for
308 * the task, used for debugging.
313 * When we're doing an early finish, how should this step be
315 * If GNUNET_YES, the step will be marked as finished
316 * without actually running its tasks.
317 * Otherwise, the step will still be run even after
320 * Note that a task may never be finished early if
321 * it is already running.
323 int early_finishable;
327 struct RfnElementInfo
329 const struct GNUNET_SET_Element *element;
332 * GNUNET_YES if the peer votes for the proposal.
337 * Proposal for this element,
338 * can only be VOTE_ADD or VOTE_REMOVE.
340 enum ReferendumVote proposal;
344 struct ReferendumEntry
349 * Elements where there is at least one proposed change.
351 * Maps the hash of the GNUNET_SET_Element
352 * to 'struct RfnElementInfo'.
354 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
356 unsigned int num_peers;
359 * Stores, for every peer in the session,
360 * whether the peer finished the whole referendum.
362 * Votes from peers are only counted if they're
363 * marked as committed (#GNUNET_YES) in the referendum.
365 * Otherwise (#GNUNET_NO), the requested changes are
366 * not counted for majority votes or thresholds.
372 * Contestation state of the peer. If a peer is contested, the values it
373 * contributed are still counted for applying changes, but the grading is
380 struct DiffElementInfo
382 const struct GNUNET_SET_Element *element;
385 * Positive weight for 'add', negative
386 * weights for 'remove'.
398 struct GNUNET_CONTAINER_MultiHashMap *changes;
404 * A consensus session consists of one local client and the remote authorities.
406 struct ConsensusSession
409 * Consensus sessions are kept in a DLL.
411 struct ConsensusSession *next;
414 * Consensus sessions are kept in a DLL.
416 struct ConsensusSession *prev;
418 unsigned int num_client_insert_pending;
420 struct GNUNET_CONTAINER_MultiHashMap *setmap;
421 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
422 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
425 * Array of peers with length 'num_peers'.
427 int *peers_blacklisted;
430 * Mapping from (hashed) TaskKey to TaskEntry.
432 * We map the application_id for a round to the task that should be
433 * executed, so we don't have to go through all task whenever we get
434 * an incoming set op request.
436 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
438 struct Step *steps_head;
439 struct Step *steps_tail;
441 int conclude_started;
446 * Global consensus identification, computed
447 * from the session id and participating authorities.
449 struct GNUNET_HashCode global_id;
452 * Client that inhabits the session
454 struct GNUNET_SERVER_Client *client;
457 * Queued messages to the client.
459 struct GNUNET_MQ_Handle *client_mq;
462 * Time when the conclusion of the consensus should begin.
464 struct GNUNET_TIME_Absolute conclude_start;
467 * Timeout for all rounds together, single rounds will schedule a timeout task
468 * with a fraction of the conclude timeout.
469 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
471 struct GNUNET_TIME_Absolute conclude_deadline;
473 struct GNUNET_PeerIdentity *peers;
476 * Number of other peers in the consensus.
478 unsigned int num_peers;
481 * Index of the local peer in the peers array
483 unsigned int local_peer_idx;
486 * Listener for requests from other peers.
487 * Uses the session's global id as app id.
489 struct GNUNET_SET_ListenHandle *set_listener;
492 * State of our early stopping scheme.
498 * Linked list of sessions this peer participates in.
500 static struct ConsensusSession *sessions_head;
503 * Linked list of sessions this peer participates in.
505 static struct ConsensusSession *sessions_tail;
508 * Configuration of the consensus service.
510 static const struct GNUNET_CONFIGURATION_Handle *cfg;
513 * Handle to the server for this service.
515 static struct GNUNET_SERVER_Handle *srv;
518 * Peer that runs this service.
520 static struct GNUNET_PeerIdentity my_peer;
525 struct GNUNET_STATISTICS_Handle *statistics;
529 finish_task (struct TaskEntry *task);
532 run_ready_steps (struct ConsensusSession *session);
535 phasename (uint16_t phase)
539 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
540 case PHASE_KIND_FINISH: return "FINISH";
541 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
542 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
543 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
545 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
546 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
547 default: return "(unknown)";
553 setname (uint16_t kind)
557 case SET_KIND_CURRENT: return "CURRENT";
558 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
559 case SET_KIND_NONE: return "NONE";
560 default: return "(unknown)";
565 rfnname (uint16_t kind)
569 case RFN_KIND_NONE: return "NONE";
570 case RFN_KIND_ECHO: return "ECHO";
571 case RFN_KIND_CONFIRM: return "CONFIRM";
572 default: return "(unknown)";
577 diffname (uint16_t kind)
581 case DIFF_KIND_NONE: return "NONE";
582 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
583 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
584 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
585 default: return "(unknown)";
589 #ifdef GNUNET_EXTRA_LOGGING
593 debug_str_element (const struct GNUNET_SET_Element *el)
595 struct GNUNET_HashCode hash;
597 GNUNET_SET_element_hash (el, &hash);
599 return GNUNET_h2s (&hash);
603 debug_str_task_key (struct TaskKey *tk)
605 static char buf[256];
607 snprintf (buf, sizeof (buf),
608 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
609 phasename (tk->kind), tk->peer1, tk->peer2,
610 tk->leader, tk->repetition);
616 debug_str_diff_key (struct DiffKey *dk)
618 static char buf[256];
620 snprintf (buf, sizeof (buf),
621 "DiffKey kind=%s, k1=%d, k2=%d",
622 diffname (dk->diff_kind), dk->k1, dk->k2);
628 debug_str_set_key (const struct SetKey *sk)
630 static char buf[256];
632 snprintf (buf, sizeof (buf),
633 "SetKey kind=%s, k1=%d, k2=%d",
634 setname (sk->set_kind), sk->k1, sk->k2);
641 debug_str_rfn_key (const struct RfnKey *rk)
643 static char buf[256];
645 snprintf (buf, sizeof (buf),
646 "RfnKey kind=%s, k1=%d, k2=%d",
647 rfnname (rk->rfn_kind), rk->k1, rk->k2);
652 #endif /* GNUNET_EXTRA_LOGGING */
656 * Destroy a session, free all resources associated with it.
658 * @param session the session to destroy
661 destroy_session (struct ConsensusSession *session)
663 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664 if (NULL != session->set_listener)
666 GNUNET_SET_listen_cancel (session->set_listener);
667 session->set_listener = NULL;
669 if (NULL != session->client_mq)
671 GNUNET_MQ_destroy (session->client_mq);
672 session->client_mq = NULL;
673 /* The MQ cleanup will also disconnect the underlying client. */
674 session->client = NULL;
676 if (NULL != session->client)
678 GNUNET_SERVER_client_disconnect (session->client);
679 session->client = NULL;
681 GNUNET_free (session);
686 * Send the final result set of the consensus to the client, element by
690 * @param element the current element, NULL if all elements have been
692 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
695 send_to_client_iter (void *cls,
696 const struct GNUNET_SET_Element *element)
698 struct TaskEntry *task = (struct TaskEntry *) cls;
699 struct ConsensusSession *session = task->step->session;
700 struct GNUNET_MQ_Envelope *ev;
704 struct GNUNET_CONSENSUS_ElementMessage *m;
706 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707 "P%d: sending element %s to client\n",
708 session->local_peer_idx,
709 debug_str_element (element));
711 ev = GNUNET_MQ_msg_extra (m, element->size,
712 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
713 m->element_type = htons (element->element_type);
714 memcpy (&m[1], element->data, element->size);
715 GNUNET_MQ_send (session->client_mq, ev);
719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720 "P%d: finished iterating elements for client\n",
721 session->local_peer_idx);
722 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
723 GNUNET_MQ_send (session->client_mq, ev);
729 static struct SetEntry *
730 lookup_set (struct ConsensusSession *session, struct SetKey *key)
732 struct GNUNET_HashCode hash;
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735 "P%u: looking up set {%s}\n",
736 session->local_peer_idx,
737 debug_str_set_key (key));
739 GNUNET_assert (SET_KIND_NONE != key->set_kind);
740 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
741 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
745 static struct DiffEntry *
746 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
748 struct GNUNET_HashCode hash;
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "P%u: looking up diff {%s}\n",
752 session->local_peer_idx,
753 debug_str_diff_key (key));
755 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
756 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
757 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
761 static struct ReferendumEntry *
762 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
764 struct GNUNET_HashCode hash;
766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767 "P%u: looking up rfn {%s}\n",
768 session->local_peer_idx,
769 debug_str_rfn_key (key));
771 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
772 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
773 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
778 diff_insert (struct DiffEntry *diff,
780 const struct GNUNET_SET_Element *element)
782 struct DiffElementInfo *di;
783 struct GNUNET_HashCode hash;
785 GNUNET_assert ( (1 == weight) || (-1 == weight));
787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788 "diff_insert with element size %u\n",
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "hashing element\n");
794 GNUNET_SET_element_hash (element, &hash);
796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
803 di = GNUNET_new (struct DiffElementInfo);
804 di->element = GNUNET_SET_element_dup (element);
805 GNUNET_assert (GNUNET_OK ==
806 GNUNET_CONTAINER_multihashmap_put (diff->changes,
808 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
816 rfn_commit (struct ReferendumEntry *rfn,
817 uint16_t commit_peer)
819 GNUNET_assert (commit_peer < rfn->num_peers);
821 rfn->peer_commited[commit_peer] = GNUNET_YES;
826 rfn_contest (struct ReferendumEntry *rfn,
827 uint16_t contested_peer)
829 GNUNET_assert (contested_peer < rfn->num_peers);
831 rfn->peer_contested[contested_peer] = GNUNET_YES;
836 rfn_noncontested (struct ReferendumEntry *rfn)
842 for (i = 0; i < rfn->num_peers; i++)
843 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
851 rfn_vote (struct ReferendumEntry *rfn,
852 uint16_t voting_peer,
853 enum ReferendumVote vote,
854 const struct GNUNET_SET_Element *element)
856 struct RfnElementInfo *ri;
857 struct GNUNET_HashCode hash;
859 GNUNET_assert (voting_peer < rfn->num_peers);
861 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
862 since VOTE_STAY is implicit in not voting. */
863 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
865 GNUNET_SET_element_hash (element, &hash);
866 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
870 ri = GNUNET_new (struct RfnElementInfo);
871 ri->element = GNUNET_SET_element_dup (element);
872 ri->votes = GNUNET_new_array (rfn->num_peers, int);
873 GNUNET_assert (GNUNET_OK ==
874 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
876 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
879 ri->votes[voting_peer] = GNUNET_YES;
885 task_other_peer (struct TaskEntry *task)
887 uint16_t me = task->step->session->local_peer_idx;
888 if (task->key.peer1 == me)
889 return task->key.peer2;
890 return task->key.peer1;
895 * Callback for set operation results. Called for each element
899 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
900 * @param status see enum GNUNET_SET_Status
903 set_result_cb (void *cls,
904 const struct GNUNET_SET_Element *element,
905 enum GNUNET_SET_Status status)
907 struct TaskEntry *task = cls;
908 struct ConsensusSession *session = task->step->session;
909 struct SetEntry *output_set = NULL;
910 struct DiffEntry *output_diff = NULL;
911 struct ReferendumEntry *output_rfn = NULL;
912 unsigned int other_idx;
913 struct SetOpCls *setop;
915 setop = &task->cls.setop;
918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919 "P%u: got set result for {%s}, status %u\n",
920 session->local_peer_idx,
921 debug_str_task_key (&task->key),
924 if (GNUNET_NO == task->is_started)
930 if (GNUNET_YES == task->is_finished)
936 other_idx = task_other_peer (task);
938 if (SET_KIND_NONE != setop->output_set.set_kind)
940 output_set = lookup_set (session, &setop->output_set);
941 GNUNET_assert (NULL != output_set);
944 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
946 output_diff = lookup_diff (session, &setop->output_diff);
947 GNUNET_assert (NULL != output_diff);
950 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
952 output_rfn = lookup_rfn (session, &setop->output_rfn);
953 GNUNET_assert (NULL != output_rfn);
956 if (GNUNET_YES == session->peers_blacklisted[other_idx])
958 /* Peer might have been blacklisted
959 by a gradecast running in parallel, ignore elements from now */
960 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
962 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
966 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
968 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
970 GNUNET_assert (NULL != output_rfn);
971 rfn_contest (output_rfn, task_other_peer (task));
978 case GNUNET_SET_STATUS_ADD_LOCAL:
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "Adding element in Task {%s}\n",
981 debug_str_task_key (&task->key));
982 if (NULL != output_set)
984 // FIXME: record pending adds, use callback
985 GNUNET_SET_add_element (output_set->h,
989 #ifdef GNUNET_EXTRA_LOGGING
990 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
991 "P%u: adding element %s into set {%s} of task {%s}\n",
992 session->local_peer_idx,
993 debug_str_element (element),
994 debug_str_set_key (&setop->output_set),
995 debug_str_task_key (&task->key));
998 if (NULL != output_diff)
1000 diff_insert (output_diff, 1, element);
1001 #ifdef GNUNET_EXTRA_LOGGING
1002 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1003 "P%u: adding element %s into diff {%s} of task {%s}\n",
1004 session->local_peer_idx,
1005 debug_str_element (element),
1006 debug_str_diff_key (&setop->output_diff),
1007 debug_str_task_key (&task->key));
1010 if (NULL != output_rfn)
1012 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1013 #ifdef GNUNET_EXTRA_LOGGING
1014 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1015 "P%u: adding element %s into rfn {%s} of task {%s}\n",
1016 session->local_peer_idx,
1017 debug_str_element (element),
1018 debug_str_rfn_key (&setop->output_rfn),
1019 debug_str_task_key (&task->key));
1022 // XXX: add result to structures in task
1024 case GNUNET_SET_STATUS_ADD_REMOTE:
1025 if (GNUNET_YES == setop->do_not_remove)
1027 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "Removing element in Task {%s}\n",
1031 debug_str_task_key (&task->key));
1032 if (NULL != output_set)
1034 // FIXME: record pending adds, use callback
1035 GNUNET_SET_remove_element (output_set->h,
1039 #ifdef GNUNET_EXTRA_LOGGING
1040 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1041 "P%u: removing element %s from set {%s} of task {%s}\n",
1042 session->local_peer_idx,
1043 debug_str_element (element),
1044 debug_str_set_key (&setop->output_set),
1045 debug_str_task_key (&task->key));
1048 if (NULL != output_diff)
1050 diff_insert (output_diff, -1, element);
1051 #ifdef GNUNET_EXTRA_LOGGING
1052 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1053 "P%u: removing element %s from diff {%s} of task {%s}\n",
1054 session->local_peer_idx,
1055 debug_str_element (element),
1056 debug_str_diff_key (&setop->output_diff),
1057 debug_str_task_key (&task->key));
1060 if (NULL != output_rfn)
1062 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1063 #ifdef GNUNET_EXTRA_LOGGING
1064 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1065 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1066 session->local_peer_idx,
1067 debug_str_element (element),
1068 debug_str_rfn_key (&setop->output_rfn),
1069 debug_str_task_key (&task->key));
1073 case GNUNET_SET_STATUS_DONE:
1074 // XXX: check first if any changes to the underlying
1075 // set are still pending
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Finishing setop in Task {%s}\n",
1078 debug_str_task_key (&task->key));
1079 if (NULL != output_rfn)
1081 rfn_commit (output_rfn, task_other_peer (task));
1085 case GNUNET_SET_STATUS_FAILURE:
1087 GNUNET_break_op (0);
1107 enum EvilnessSubType
1110 EVILNESS_SUB_REPLACEMENT,
1111 EVILNESS_SUB_NO_REPLACEMENT,
1116 enum EvilnessType type;
1117 enum EvilnessSubType subtype;
1123 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1125 if (0 == strcmp ("replace", evil_subtype_str))
1127 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1129 else if (0 == strcmp ("noreplace", evil_subtype_str))
1131 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1135 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1136 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1138 return GNUNET_SYSERR;
1145 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1149 char *evil_type_str = NULL;
1150 char *evil_subtype_str = NULL;
1152 GNUNET_assert (NULL != evil);
1154 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1157 "P%u: no evilness\n",
1158 session->local_peer_idx);
1159 evil->type = EVILNESS_NONE;
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "P%u: got evilness spec\n",
1164 session->local_peer_idx);
1166 for (field = strtok (evil_spec, "/");
1168 field = strtok (NULL, "/"))
1170 unsigned int peer_num;
1171 unsigned int evil_num;
1174 evil_type_str = NULL;
1175 evil_subtype_str = NULL;
1177 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1181 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1182 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1188 GNUNET_assert (NULL != evil_type_str);
1189 GNUNET_assert (NULL != evil_subtype_str);
1191 if (peer_num == session->local_peer_idx)
1193 if (0 == strcmp ("slack", evil_type_str))
1195 evil->type = EVILNESS_SLACK;
1197 else if (0 == strcmp ("cram-all", evil_type_str))
1199 evil->type = EVILNESS_CRAM_ALL;
1200 evil->num = evil_num;
1201 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1204 else if (0 == strcmp ("cram-lead", evil_type_str))
1206 evil->type = EVILNESS_CRAM_LEAD;
1207 evil->num = evil_num;
1208 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1211 else if (0 == strcmp ("cram-echo", evil_type_str))
1213 evil->type = EVILNESS_CRAM_ECHO;
1214 evil->num = evil_num;
1215 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1220 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1221 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1227 /* No GNUNET_free since memory was allocated by libc */
1228 free (evil_type_str);
1229 evil_type_str = NULL;
1230 evil_subtype_str = NULL;
1233 evil->type = EVILNESS_NONE;
1235 GNUNET_free (evil_spec);
1236 /* no GNUNET_free_non_null since it wasn't
1237 * allocated with GNUNET_malloc */
1238 if (NULL != evil_type_str)
1239 free (evil_type_str);
1240 if (NULL != evil_subtype_str)
1241 free (evil_subtype_str);
1248 * Commit the appropriate set for a
1252 commit_set (struct ConsensusSession *session,
1253 struct TaskEntry *task)
1255 struct SetEntry *set;
1256 struct SetOpCls *setop = &task->cls.setop;
1258 GNUNET_assert (NULL != setop->op);
1259 set = lookup_set (session, &setop->input_set);
1260 GNUNET_assert (NULL != set);
1265 struct Evilness evil;
1267 get_evilness (session, &evil);
1268 if (EVILNESS_NONE != evil.type)
1270 /* Useful for evaluation */
1271 GNUNET_STATISTICS_set (statistics,
1278 case EVILNESS_CRAM_ALL:
1279 case EVILNESS_CRAM_LEAD:
1280 case EVILNESS_CRAM_ECHO:
1281 /* We're not cramming elements in the
1282 all-to-all round, since that would just
1283 add more elements to the result set, but
1284 wouldn't test robustness. */
1285 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1287 GNUNET_SET_commit (setop->op, set->h);
1290 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1291 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1293 GNUNET_SET_commit (setop->op, set->h);
1296 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1298 GNUNET_SET_commit (setop->op, set->h);
1301 for (i = 0; i < evil.num; i++)
1303 struct GNUNET_HashCode hash;
1304 struct GNUNET_SET_Element element;
1305 element.data = &hash;
1306 element.size = sizeof (struct GNUNET_HashCode);
1307 element.element_type = 0;
1309 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1311 /* Always generate a new element. */
1312 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1314 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1316 /* Always cram the same elements, derived from counter. */
1317 GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1323 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1324 #ifdef GNUNET_EXTRA_LOGGING
1325 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1326 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1327 session->local_peer_idx,
1328 debug_str_element (&element),
1329 debug_str_set_key (&setop->input_set),
1330 debug_str_task_key (&task->key));
1333 GNUNET_STATISTICS_update (statistics,
1334 "# stuffed elements",
1337 GNUNET_SET_commit (setop->op, set->h);
1339 case EVILNESS_SLACK:
1340 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1341 "P%u: evil peer: slacking\n",
1342 session->local_peer_idx,
1347 GNUNET_SET_commit (setop->op, set->h);
1352 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1354 struct GNUNET_SET_Element element;
1355 struct ContestedPayload payload;
1356 element.data = &payload;
1357 element.size = sizeof (struct ContestedPayload);
1358 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1359 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1361 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1363 GNUNET_SET_commit (setop->op, set->h);
1367 /* For our testcases, we don't want the blacklisted
1369 GNUNET_SET_operation_cancel (setop->op);
1377 put_diff (struct ConsensusSession *session,
1378 struct DiffEntry *diff)
1380 struct GNUNET_HashCode hash;
1382 GNUNET_assert (NULL != diff);
1384 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1385 GNUNET_assert (GNUNET_OK ==
1386 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1387 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1391 put_set (struct ConsensusSession *session,
1392 struct SetEntry *set)
1394 struct GNUNET_HashCode hash;
1396 GNUNET_assert (NULL != set->h);
1398 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1400 debug_str_set_key (&set->key));
1402 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1403 GNUNET_assert (GNUNET_SYSERR !=
1404 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1405 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
1410 put_rfn (struct ConsensusSession *session,
1411 struct ReferendumEntry *rfn)
1413 struct GNUNET_HashCode hash;
1415 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1416 GNUNET_assert (GNUNET_OK ==
1417 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1418 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1424 task_cancel_reconcile (struct TaskEntry *task)
1426 /* not implemented yet */
1432 apply_diff_to_rfn (struct DiffEntry *diff,
1433 struct ReferendumEntry *rfn,
1434 uint16_t voting_peer,
1437 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1438 struct DiffElementInfo *di;
1440 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1442 while (GNUNET_YES ==
1443 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1445 (const void **) &di))
1449 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1453 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1457 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1464 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1466 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1473 diff_compose (struct DiffEntry *diff_1,
1474 struct DiffEntry *diff_2)
1476 struct DiffEntry *diff_new;
1477 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1478 struct DiffElementInfo *di;
1480 diff_new = diff_create ();
1482 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1483 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1485 diff_insert (diff_new, di->weight, di->element);
1487 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1489 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1490 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1492 diff_insert (diff_new, di->weight, di->element);
1494 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1500 struct ReferendumEntry *
1501 rfn_create (uint16_t size)
1503 struct ReferendumEntry *rfn;
1505 rfn = GNUNET_new (struct ReferendumEntry);
1506 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1507 rfn->peer_commited = GNUNET_new_array (size, int);
1508 rfn->peer_contested = GNUNET_new_array (size, int);
1509 rfn->num_peers = size;
1516 diff_destroy (struct DiffEntry *diff)
1518 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1524 * For a given majority, count what the outcome
1525 * is (add/remove/keep), and give the number
1526 * of peers that voted for this outcome.
1529 rfn_majority (const struct ReferendumEntry *rfn,
1530 const struct RfnElementInfo *ri,
1531 uint16_t *ret_majority,
1532 enum ReferendumVote *ret_vote)
1534 uint16_t votes_yes = 0;
1535 uint16_t num_commited = 0;
1538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1539 "Computing rfn majority for element %s of rfn {%s}\n",
1540 debug_str_element (ri->element),
1541 debug_str_rfn_key (&rfn->key));
1543 for (i = 0; i < rfn->num_peers; i++)
1545 if (GNUNET_NO == rfn->peer_commited[i])
1549 if (GNUNET_YES == ri->votes[i])
1553 if (votes_yes > (num_commited) / 2)
1555 *ret_vote = ri->proposal;
1556 *ret_majority = votes_yes;
1560 *ret_vote = VOTE_STAY;
1561 *ret_majority = num_commited - votes_yes;
1568 struct TaskEntry *task;
1569 struct SetKey dst_set_key;
1574 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1576 struct SetCopyCls *scc = cls;
1577 struct TaskEntry *task = scc->task;
1578 struct SetKey dst_set_key = scc->dst_set_key;
1579 struct SetEntry *set;
1582 set = GNUNET_new (struct SetEntry);
1584 set->key = dst_set_key;
1585 put_set (task->step->session, set);
1592 * Call the start function of the given
1593 * task again after we created a copy of the given set.
1596 create_set_copy_for_task (struct TaskEntry *task,
1597 struct SetKey *src_set_key,
1598 struct SetKey *dst_set_key)
1600 struct SetEntry *src_set;
1601 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604 "Copying set {%s} to {%s} for task {%s}\n",
1605 debug_str_set_key (src_set_key),
1606 debug_str_set_key (dst_set_key),
1607 debug_str_task_key (&task->key));
1610 scc->dst_set_key = *dst_set_key;
1611 src_set = lookup_set (task->step->session, src_set_key);
1612 GNUNET_assert (NULL != src_set);
1613 GNUNET_SET_copy_lazy (src_set->h,
/**
 * Closure for #set_mutation_done(): tracks the set mutations that
 * were issued on behalf of one task.
 */
struct SetMutationProgressCls
{
  /**
   * Number of issued add/remove operations that have not
   * reported completion yet.
   */
  int num_pending;

  /**
   * Task to finish once all changes are through.
   */
  struct TaskEntry *task;
};
1630 set_mutation_done (void *cls)
1632 struct SetMutationProgressCls *pc = cls;
1634 GNUNET_assert (pc->num_pending > 0);
1638 if (0 == pc->num_pending)
1640 struct TaskEntry *task = pc->task;
1648 try_finish_step_early (struct Step *step)
1652 if (GNUNET_YES == step->is_running)
1654 if (GNUNET_YES == step->is_finished)
1656 if (GNUNET_NO == step->early_finishable)
1659 step->is_finished = GNUNET_YES;
1661 #ifdef GNUNET_EXTRA_LOGGING
1662 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1663 "Finishing step `%s' early.\n",
1667 for (i = 0; i < step->subordinates_len; i++)
1669 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1670 step->subordinates[i]->pending_prereq--;
1671 #ifdef GNUNET_EXTRA_LOGGING
1672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673 "Decreased pending_prereq to %u for step `%s'.\n",
1674 step->subordinates[i]->pending_prereq,
1675 step->subordinates[i]->debug_name);
1678 try_finish_step_early (step->subordinates[i]);
1681 // XXX: maybe schedule as task to avoid recursion?
1682 run_ready_steps (step->session);
1687 finish_step (struct Step *step)
1691 GNUNET_assert (step->finished_tasks == step->tasks_len);
1692 GNUNET_assert (GNUNET_YES == step->is_running);
1693 GNUNET_assert (GNUNET_NO == step->is_finished);
1695 #ifdef GNUNET_EXTRA_LOGGING
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697 "All tasks of step `%s' with %u subordinates finished.\n",
1699 step->subordinates_len);
1702 for (i = 0; i < step->subordinates_len; i++)
1704 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1705 step->subordinates[i]->pending_prereq--;
1706 #ifdef GNUNET_EXTRA_LOGGING
1707 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708 "Decreased pending_prereq to %u for step `%s'.\n",
1709 step->subordinates[i]->pending_prereq,
1710 step->subordinates[i]->debug_name);
1715 step->is_finished = GNUNET_YES;
1717 // XXX: maybe schedule as task to avoid recursion?
1718 run_ready_steps (step->session);
1724 * Apply the result from one round of gradecasts (i.e. every peer
1725 * should have gradecasted) to the peer's current set.
1727 * @param task the task with context information
1730 task_start_apply_round (struct TaskEntry *task)
1732 struct ConsensusSession *session = task->step->session;
1733 struct SetKey sk_in;
1734 struct SetKey sk_out;
1735 struct RfnKey rk_in;
1736 struct SetEntry *set_out;
1737 struct ReferendumEntry *rfn_in;
1738 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1739 struct RfnElementInfo *ri;
1740 struct SetMutationProgressCls *progress_cls;
1741 uint16_t worst_majority = UINT16_MAX;
1743 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1744 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1745 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1747 set_out = lookup_set (session, &sk_out);
1748 if (NULL == set_out)
1750 create_set_copy_for_task (task, &sk_in, &sk_out);
1754 rfn_in = lookup_rfn (session, &rk_in);
1755 GNUNET_assert (NULL != rfn_in);
1757 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1758 progress_cls->task = task;
1760 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1762 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1764 uint16_t majority_num;
1765 enum ReferendumVote majority_vote;
1767 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1769 if (worst_majority > majority_num)
1770 worst_majority = majority_num;
1772 switch (majority_vote)
1775 progress_cls->num_pending++;
1776 GNUNET_assert (GNUNET_OK ==
1777 GNUNET_SET_add_element (set_out->h,
1781 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1782 "P%u: apply round: adding element %s with %u-majority.\n",
1783 session->local_peer_idx,
1784 debug_str_element (ri->element), majority_num);
1787 progress_cls->num_pending++;
1788 GNUNET_assert (GNUNET_OK ==
1789 GNUNET_SET_remove_element (set_out->h,
1793 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1794 "P%u: apply round: deleting element %s with %u-majority.\n",
1795 session->local_peer_idx,
1796 debug_str_element (ri->element), majority_num);
1799 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1800 "P%u: apply round: keeping element %s with %u-majority.\n",
1801 session->local_peer_idx,
1802 debug_str_element (ri->element), majority_num);
1811 if (progress_cls->num_pending == 0)
1813 // call closure right now, no pending ops
1814 GNUNET_free (progress_cls);
1819 uint16_t thresh = (session->num_peers / 3) * 2;
1821 if (worst_majority >= thresh)
1823 switch (session->early_stopping)
1825 case EARLY_STOPPING_NONE:
1826 session->early_stopping = EARLY_STOPPING_ONE_MORE;
1827 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1828 "P%u: Stopping early (after one more superround)\n",
1829 session->local_peer_idx);
1831 case EARLY_STOPPING_ONE_MORE:
1832 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1833 session->local_peer_idx);
1834 session->early_stopping = EARLY_STOPPING_DONE;
1837 for (step = session->steps_head; NULL != step; step = step->next)
1838 try_finish_step_early (step);
1841 case EARLY_STOPPING_DONE:
1842 /* We shouldn't be here anymore after early stopping */
1850 else if (EARLY_STOPPING_NONE != session->early_stopping)
1852 // Our assumption about the number of bad peers
1854 GNUNET_break_op (0);
1858 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1859 session->local_peer_idx);
1867 task_start_grade (struct TaskEntry *task)
1869 struct ConsensusSession *session = task->step->session;
1870 struct ReferendumEntry *output_rfn;
1871 struct ReferendumEntry *input_rfn;
1872 struct DiffEntry *input_diff;
1873 struct RfnKey rfn_key;
1874 struct DiffKey diff_key;
1875 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1876 struct RfnElementInfo *ri;
1877 unsigned int gradecast_confidence = 2;
1879 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1880 output_rfn = lookup_rfn (session, &rfn_key);
1881 if (NULL == output_rfn)
1883 output_rfn = rfn_create (session->num_peers);
1884 output_rfn->key = rfn_key;
1885 put_rfn (session, output_rfn);
1888 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1889 input_diff = lookup_diff (session, &diff_key);
1890 GNUNET_assert (NULL != input_diff);
1892 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1893 input_rfn = lookup_rfn (session, &rfn_key);
1894 GNUNET_assert (NULL != input_rfn);
1896 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1898 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1900 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1902 uint16_t majority_num;
1903 enum ReferendumVote majority_vote;
1905 // XXX: we need contested votes and non-contested votes here
1906 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1908 if (majority_num <= session->num_peers / 3)
1909 majority_vote = VOTE_REMOVE;
1911 switch (majority_vote)
1916 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1919 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1928 uint16_t noncontested;
1929 noncontested = rfn_noncontested (input_rfn);
1930 if (noncontested < (session->num_peers / 3) * 2)
1932 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1934 if (noncontested < (session->num_peers / 3) + 1)
1936 gradecast_confidence = 0;
1940 if (gradecast_confidence >= 1)
1941 rfn_commit (output_rfn, task->key.leader);
1943 if (gradecast_confidence <= 1)
1944 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1951 task_start_reconcile (struct TaskEntry *task)
1953 struct SetEntry *input;
1954 struct SetOpCls *setop = &task->cls.setop;
1955 struct ConsensusSession *session = task->step->session;
1957 input = lookup_set (session, &setop->input_set);
1958 GNUNET_assert (NULL != input);
1959 GNUNET_assert (NULL != input->h);
1961 /* We create the outputs for the operation here
1962 (rather than in the set operation callback)
1963 because we want something valid in there, even
1964 if the other peer doesn't talk to us */
1966 if (SET_KIND_NONE != setop->output_set.set_kind)
1968 /* If we don't have an existing output set,
1969 we clone the input set. */
1970 if (NULL == lookup_set (session, &setop->output_set))
1972 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1977 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1979 if (NULL == lookup_rfn (session, &setop->output_rfn))
1981 struct ReferendumEntry *rfn;
1983 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984 "P%u: output rfn <%s> missing, creating.\n",
1985 session->local_peer_idx,
1986 debug_str_rfn_key (&setop->output_rfn));
1988 rfn = rfn_create (session->num_peers);
1989 rfn->key = setop->output_rfn;
1990 put_rfn (session, rfn);
1994 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1996 if (NULL == lookup_diff (session, &setop->output_diff))
1998 struct DiffEntry *diff;
2000 diff = diff_create ();
2001 diff->key = setop->output_diff;
2002 put_diff (session, diff);
2006 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2008 /* XXX: mark the corresponding rfn as commited if necessary */
2013 if (task->key.peer1 == session->local_peer_idx)
2015 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
2017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2018 "P%u: Looking up set {%s} to run remote union\n",
2019 session->local_peer_idx,
2020 debug_str_set_key (&setop->input_set));
2022 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
2023 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2025 rcm.kind = htons (task->key.kind);
2026 rcm.peer1 = htons (task->key.peer1);
2027 rcm.peer2 = htons (task->key.peer2);
2028 rcm.leader = htons (task->key.leader);
2029 rcm.repetition = htons (task->key.repetition);
2031 GNUNET_assert (NULL == setop->op);
2032 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2033 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2035 // XXX: maybe this should be done while
2036 // setting up tasks alreays?
2037 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2038 &session->global_id,
2040 GNUNET_SET_RESULT_SYMMETRIC,
2044 commit_set (session, task);
2046 else if (task->key.peer2 == session->local_peer_idx)
2048 /* Wait for the other peer to contact us */
2049 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2050 session->local_peer_idx, task->key.peer1);
2052 if (NULL != setop->op)
2054 commit_set (session, task);
2059 /* We made an error while constructing the task graph. */
2066 task_start_eval_echo (struct TaskEntry *task)
2068 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
2069 struct ReferendumEntry *input_rfn;
2070 struct RfnElementInfo *ri;
2071 struct SetEntry *output_set;
2072 struct SetMutationProgressCls *progress_cls;
2073 struct ConsensusSession *session = task->step->session;
2074 struct SetKey sk_in;
2075 struct SetKey sk_out;
2076 struct RfnKey rk_in;
2078 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2079 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2080 output_set = lookup_set (session, &sk_out);
2081 if (NULL == output_set)
2083 create_set_copy_for_task (task, &sk_in, &sk_out);
2089 // FIXME: should be marked as a shallow copy, so
2090 // we can destroy everything correctly
2091 struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2092 last_set->h = output_set->h;
2093 last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2094 put_set (session, last_set);
2097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2098 "Evaluating referendum in Task {%s}\n",
2099 debug_str_task_key (&task->key));
2101 progress_cls = GNUNET_new (struct SetMutationProgressCls);
2102 progress_cls->task = task;
2104 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2105 input_rfn = lookup_rfn (session, &rk_in);
2107 GNUNET_assert (NULL != input_rfn);
2109 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
2110 GNUNET_assert (NULL != iter);
2112 while (GNUNET_YES ==
2113 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
2115 (const void **) &ri))
2117 enum ReferendumVote majority_vote;
2118 uint16_t majority_num;
2120 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2122 if (majority_num < session->num_peers / 3)
2124 /* It is not the case that all nonfaulty peers
2125 echoed the same value. Since we're doing a set reconciliation, we
2126 can't simply send "nothing" for the value. Thus we mark our 'confirm'
2127 reconciliation as contested. Other peers might not know that the
2128 leader is faulty, thus we still re-distribute in the confirmation
2130 output_set->is_contested = GNUNET_YES;
2133 switch (majority_vote)
2136 progress_cls->num_pending++;
2137 GNUNET_assert (GNUNET_OK ==
2138 GNUNET_SET_add_element (output_set->h,
2144 progress_cls->num_pending++;
2145 GNUNET_assert (GNUNET_OK ==
2146 GNUNET_SET_remove_element (output_set->h,
2152 /* Nothing to do. */
2160 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
2162 if (progress_cls->num_pending == 0)
2164 // call closure right now, no pending ops
2165 GNUNET_free (progress_cls);
2172 task_start_finish (struct TaskEntry *task)
2174 struct SetEntry *final_set;
2175 struct ConsensusSession *session = task->step->session;
2177 final_set = lookup_set (session, &task->cls.finish.input_set);
2179 GNUNET_assert (NULL != final_set);
2182 GNUNET_SET_iterate (final_set->h,
2183 send_to_client_iter,
2188 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2192 GNUNET_assert (GNUNET_NO == task->is_started);
2193 GNUNET_assert (GNUNET_NO == task->is_finished);
2194 GNUNET_assert (NULL != task->start);
2198 task->is_started = GNUNET_YES;
2205 * Run all steps of the session that don't any
2206 * more dependencies.
2209 run_ready_steps (struct ConsensusSession *session)
2213 step = session->steps_head;
2215 while (NULL != step)
2217 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2221 GNUNET_assert (0 == step->finished_tasks);
2223 #ifdef GNUNET_EXTRA_LOGGING
2224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2225 session->local_peer_idx,
2227 step->round, step->tasks_len, step->subordinates_len);
2230 step->is_running = GNUNET_YES;
2231 for (i = 0; i < step->tasks_len; i++)
2232 start_task (session, step->tasks[i]);
2234 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2235 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2238 /* Running the next ready steps will be triggered by task completion */
2250 finish_task (struct TaskEntry *task)
2252 GNUNET_assert (GNUNET_NO == task->is_finished);
2253 task->is_finished = GNUNET_YES;
2255 task->step->finished_tasks++;
2257 if (task->step->finished_tasks == task->step->tasks_len)
2258 finish_step (task->step);
2263 * Search peer in the list of peers in session.
2265 * @param peer peer to find
2266 * @param session session with peer
2267 * @return index of peer, -1 if peer is not in session
2270 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2273 for (i = 0; i < session->num_peers; i++)
2274 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2281 * Compute a global, (hopefully) unique consensus session id,
2282 * from the local id of the consensus session, and the identities of all participants.
2283 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2284 * exactly the same peers, the global id will be different.
2286 * @param session session to generate the global id for
2287 * @param local_session_id local id of the consensus session
2290 compute_global_id (struct ConsensusSession *session,
2291 const struct GNUNET_HashCode *local_session_id)
2293 const char *salt = "gnunet-service-consensus/session_id";
2295 GNUNET_assert (GNUNET_YES ==
2296 GNUNET_CRYPTO_kdf (&session->global_id,
2297 sizeof (struct GNUNET_HashCode),
2301 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2303 sizeof (struct GNUNET_HashCode),
2309 * Compare two peer identities.
2311 * @param h1 some peer identity
2312 * @param h2 some peer identity
2313 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2316 peer_id_cmp (const void *h1, const void *h2)
2318 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2323 * Create the sorted list of peers for the session,
2324 * add the local peer if not in the join message.
2327 initialize_session_peer_list (struct ConsensusSession *session,
2328 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2330 unsigned int local_peer_in_list;
2331 uint32_t listed_peers;
2332 const struct GNUNET_PeerIdentity *msg_peers;
2335 GNUNET_assert (NULL != join_msg);
2337 /* peers in the join message, may or may not include the local peer */
2338 listed_peers = ntohl (join_msg->num_peers);
2340 session->num_peers = listed_peers;
2342 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2344 local_peer_in_list = GNUNET_NO;
2345 for (i = 0; i < listed_peers; i++)
2347 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2349 local_peer_in_list = GNUNET_YES;
2354 if (GNUNET_NO == local_peer_in_list)
2355 session->num_peers++;
2357 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2359 if (GNUNET_NO == local_peer_in_list)
2360 session->peers[session->num_peers - 1] = my_peer;
2362 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2363 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2367 static struct TaskEntry *
2368 lookup_task (struct ConsensusSession *session,
2369 struct TaskKey *key)
2371 struct GNUNET_HashCode hash;
2374 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2376 GNUNET_h2s (&hash));
2377 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2382 * Called when another peer wants to do a set operation with the
2385 * @param cls closure
2386 * @param other_peer the other peer
2387 * @param context_msg message with application specific information from
2389 * @param request request from the other peer, use GNUNET_SET_accept
2390 * to accept it, otherwise the request will be refused
2391 * Note that we don't use a return value here, as it is also
2392 * necessary to specify the set we want to do the operation with,
2393 * whith sometimes can be derived from the context message.
2394 * Also necessary to specify the timeout.
2397 set_listen_cb (void *cls,
2398 const struct GNUNET_PeerIdentity *other_peer,
2399 const struct GNUNET_MessageHeader *context_msg,
2400 struct GNUNET_SET_Request *request)
2402 struct ConsensusSession *session = cls;
2404 struct TaskEntry *task;
2405 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2407 if (NULL == context_msg)
2409 GNUNET_break_op (0);
2413 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2415 GNUNET_break_op (0);
2419 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2421 GNUNET_break_op (0);
2425 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2427 tk = ((struct TaskKey) {
2428 .kind = ntohs (cm->kind),
2429 .peer1 = ntohs (cm->peer1),
2430 .peer2 = ntohs (cm->peer2),
2431 .repetition = ntohs (cm->repetition),
2432 .leader = ntohs (cm->leader),
2435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2436 session->local_peer_idx, debug_str_task_key (&tk));
2438 task = lookup_task (session, &tk);
2442 GNUNET_break_op (0);
2446 if (GNUNET_YES == task->is_finished)
2448 GNUNET_break_op (0);
2452 if (task->key.peer2 != session->local_peer_idx)
2454 /* We're being asked, so we must be thne 2nd peer. */
2455 GNUNET_break_op (0);
2459 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2460 (task->key.peer2 == session->local_peer_idx)));
2462 task->cls.setop.op = GNUNET_SET_accept (request,
2463 GNUNET_SET_RESULT_SYMMETRIC,
2467 /* If the task hasn't been started yet,
2468 we wait for that until we commit. */
2470 if (GNUNET_YES == task->is_started)
2472 commit_set (session, task);
2479 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2480 struct TaskEntry *t)
2482 struct GNUNET_HashCode round_hash;
2485 GNUNET_assert (NULL != t->step);
2487 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2491 if (s->tasks_len == s->tasks_cap)
2493 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2494 GNUNET_array_grow (s->tasks,
2499 #ifdef GNUNET_EXTRA_LOGGING
2500 GNUNET_assert (NULL != s->debug_name);
2501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2502 debug_str_task_key (&t->key),
2506 s->tasks[s->tasks_len] = t;
2509 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2510 GNUNET_assert (GNUNET_OK ==
2511 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/**
 * Placeholder for assigning timeouts to the steps of a session;
 * currently does nothing.
 *
 * @param session session with a fully constructed task graph
 */
static void
install_step_timeouts (struct ConsensusSession *session)
{
  /* Given the fully constructed task graph
     with rounds for tasks, we can give the tasks timeouts. */

  // unsigned int max_round;

  /* XXX: implement! */
}
/*
 * Arrange two peers in some canonical order.
 *
 * The result depends only on the unordered pair: the smaller index
 * comes first if the distance between the two is at most n / 2,
 * otherwise the larger index comes first.
 *
 * @param p1 in/out: first peer index, must be below @a n
 * @param p2 in/out: second peer index, must be below @a n
 * @param n total number of peers
 */
static void
arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
{
  uint16_t lo;
  uint16_t hi;

  GNUNET_assert (*p1 < n);
  GNUNET_assert (*p2 < n);

  lo = (*p1 < *p2) ? *p1 : *p2;
  hi = (*p1 < *p2) ? *p2 : *p1;

  /* For uniformly random *p1, *p2,
     this condition is true with 50% chance */
  if (((hi - lo) + n) % n <= n / 2)
  {
    *p1 = lo;
    *p2 = hi;
    return;
  }
  *p1 = hi;
  *p2 = lo;
}
2568 * Record @a dep as a dependency of @a step.
2571 step_depend_on (struct Step *step, struct Step *dep)
2573 /* We're not checking for cyclic dependencies,
2574 but this is a cheap sanity check. */
2575 GNUNET_assert (step != dep);
2576 GNUNET_assert (NULL != step);
2577 GNUNET_assert (NULL != dep);
2578 GNUNET_assert (dep->round <= step->round);
2580 #ifdef GNUNET_EXTRA_LOGGING
2581 /* Make sure we have complete debugging information.
2582 Also checks that we don't screw up too badly
2583 constructing the task graph. */
2584 GNUNET_assert (NULL != step->debug_name);
2585 GNUNET_assert (NULL != dep->debug_name);
2586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587 "Making step `%s' depend on `%s'\n",
2592 if (dep->subordinates_cap == dep->subordinates_len)
2594 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2595 GNUNET_array_grow (dep->subordinates,
2596 dep->subordinates_cap,
2600 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2602 dep->subordinates[dep->subordinates_len] = step;
2603 dep->subordinates_len++;
2605 step->pending_prereq++;
2609 static struct Step *
2610 create_step (struct ConsensusSession *session, int round, int early_finishable)
2613 step = GNUNET_new (struct Step);
2614 step->session = session;
2615 step->round = round;
2616 step->early_finishable = early_finishable;
2617 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2618 session->steps_tail,
2625 * Construct the task graph for a single
2629 construct_task_graph_gradecast (struct ConsensusSession *session,
2632 struct Step *step_before,
2633 struct Step *step_after)
2635 uint16_t n = session->num_peers;
2636 uint16_t me = session->local_peer_idx;
2641 /* The task we're currently setting up. */
2642 struct TaskEntry task;
2645 struct Step *prev_step;
2651 round = step_before->round + 1;
2653 /* gcast step 1: leader disseminates */
2655 step = create_step (session, round, GNUNET_YES);
2657 #ifdef GNUNET_EXTRA_LOGGING
2658 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2660 step_depend_on (step, step_before);
2664 for (k = 0; k < n; k++)
2670 arrange_peers (&p1, &p2, n);
2671 task = ((struct TaskEntry) {
2673 .start = task_start_reconcile,
2674 .cancel = task_cancel_reconcile,
2675 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2677 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2678 put_task (session->taskmap, &task);
2680 /* We run this task to make sure that the leader
2681 has the stored the SET_KIND_LEADER set of himself,
2682 so he can participate in the rest of the gradecast
2683 without the code having to handle any special cases. */
2684 task = ((struct TaskEntry) {
2686 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2687 .start = task_start_reconcile,
2688 .cancel = task_cancel_reconcile,
2690 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2691 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2692 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2693 put_task (session->taskmap, &task);
2699 arrange_peers (&p1, &p2, n);
2700 task = ((struct TaskEntry) {
2702 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2703 .start = task_start_reconcile,
2704 .cancel = task_cancel_reconcile,
2706 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2707 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2708 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2709 put_task (session->taskmap, &task);
2712 /* gcast phase 2: echo */
2715 step = create_step (session, round, GNUNET_YES);
2716 #ifdef GNUNET_EXTRA_LOGGING
2717 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2719 step_depend_on (step, prev_step);
2721 for (k = 0; k < n; k++)
2725 arrange_peers (&p1, &p2, n);
2726 task = ((struct TaskEntry) {
2728 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2729 .start = task_start_reconcile,
2730 .cancel = task_cancel_reconcile,
2732 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2733 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2734 put_task (session->taskmap, &task);
2738 /* Same round, since step only has local tasks */
2739 step = create_step (session, round, GNUNET_YES);
2740 #ifdef GNUNET_EXTRA_LOGGING
2741 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2743 step_depend_on (step, prev_step);
2745 arrange_peers (&p1, &p2, n);
2746 task = ((struct TaskEntry) {
2747 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2749 .start = task_start_eval_echo
2751 put_task (session->taskmap, &task);
2755 step = create_step (session, round, GNUNET_YES);
2756 #ifdef GNUNET_EXTRA_LOGGING
2757 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2759 step_depend_on (step, prev_step);
2761 /* gcast phase 3: confirmation and grading */
2762 for (k = 0; k < n; k++)
2766 arrange_peers (&p1, &p2, n);
2767 task = ((struct TaskEntry) {
2769 .start = task_start_reconcile,
2770 .cancel = task_cancel_reconcile,
2771 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2773 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2774 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2775 /* If there was at least one element in the echo round that was
2776 contested (i.e. it had no n-t majority), then we let the other peers
2777 know, and other peers let us know. The contested flag for each peer is
2778 stored in the rfn. */
2779 task.cls.setop.transceive_contested = GNUNET_YES;
2780 put_task (session->taskmap, &task);
2784 /* Same round, since step only has local tasks */
2785 step = create_step (session, round, GNUNET_YES);
2786 #ifdef GNUNET_EXTRA_LOGGING
2787 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2789 step_depend_on (step, prev_step);
2791 task = ((struct TaskEntry) {
2793 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2794 .start = task_start_grade,
2796 put_task (session->taskmap, &task);
2798 step_depend_on (step_after, step);
2803 construct_task_graph (struct ConsensusSession *session)
2805 uint16_t n = session->num_peers;
2808 uint16_t me = session->local_peer_idx;
2813 /* The task we're currently setting up. */
2814 struct TaskEntry task;
2816 /* Current leader */
2820 struct Step *prev_step;
2822 unsigned int round = 0;
2826 // XXX: introduce first step,
2827 // where we wait for all insert acks
2828 // from the set service
2830 /* faster but brittle all-to-all */
2832 // XXX: Not implemented yet
2834 /* all-to-all step */
2836 step = create_step (session, round, GNUNET_NO);
2838 #ifdef GNUNET_EXTRA_LOGGING
2839 step->debug_name = GNUNET_strdup ("all to all");
2842 for (i = 0; i < n; i++)
2846 arrange_peers (&p1, &p2, n);
2847 task = ((struct TaskEntry) {
2848 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2850 .start = task_start_reconcile,
2851 .cancel = task_cancel_reconcile,
2853 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2854 task.cls.setop.output_set = task.cls.setop.input_set;
2855 task.cls.setop.do_not_remove = GNUNET_YES;
2856 put_task (session->taskmap, &task);
2864 /* Byzantine union */
2866 /* sequential repetitions of the gradecasts */
2867 for (i = 0; i < t + 1; i++)
2869 struct Step *step_rep_start;
2870 struct Step *step_rep_end;
2872 /* Every repetition is in a separate round. */
2873 step_rep_start = create_step (session, round, GNUNET_YES);
2874 #ifdef GNUNET_EXTRA_LOGGING
2875 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2878 step_depend_on (step_rep_start, prev_step);
2880 /* gradecast has three rounds */
2882 step_rep_end = create_step (session, round, GNUNET_YES);
2883 #ifdef GNUNET_EXTRA_LOGGING
2884 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2887 /* parallel gradecasts */
2888 for (lead = 0; lead < n; lead++)
2889 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2891 task = ((struct TaskEntry) {
2892 .step = step_rep_end,
2893 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2894 .start = task_start_apply_round,
2896 put_task (session->taskmap, &task);
2898 prev_step = step_rep_end;
2901 /* There is no next gradecast round, thus the final
2902 start step is the overall end step of the gradecasts */
2904 step = create_step (session, round, GNUNET_NO);
2905 #ifdef GNUNET_EXTRA_LOGGING
2906 GNUNET_asprintf (&step->debug_name, "finish");
2908 step_depend_on (step, prev_step);
2910 task = ((struct TaskEntry) {
2912 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2913 .start = task_start_finish,
2915 task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2917 put_task (session->taskmap, &task);
2922 * Initialize the session, continue receiving messages from the owning client
2924 * @param session the session to initialize
2925 * @param join_msg the join message from the client
2928 initialize_session (struct ConsensusSession *session,
2929 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2931 struct ConsensusSession *other_session;
2933 initialize_session_peer_list (session, join_msg);
2934 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2935 compute_global_id (session, &join_msg->session_id);
2937 /* Check if some local client already owns the session.
2938 It is only legal to have a session with an existing global id
2939 if all other sessions with this global id are finished.*/
2940 other_session = sessions_head;
2941 while (NULL != other_session)
2943 if ((other_session != session) &&
2944 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2946 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2948 // GNUNET_break (0);
2949 // destroy_session (session);
2954 other_session = other_session->next;
2957 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2958 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2960 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2961 (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2963 session->local_peer_idx = get_peer_idx (&my_peer, session);
2964 GNUNET_assert (-1 != session->local_peer_idx);
2965 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2966 &session->global_id,
2967 set_listen_cb, session);
2968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2970 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2971 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2972 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2973 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2976 struct SetEntry *client_set;
2977 client_set = GNUNET_new (struct SetEntry);
2978 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2979 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2980 put_set (session, client_set);
2983 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2985 /* Just construct the task graph,
2986 but don't run anything until the client calls conclude. */
2987 construct_task_graph (session);
2989 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2993 static struct ConsensusSession *
2994 get_session_by_client (struct GNUNET_SERVER_Client *client)
2996 struct ConsensusSession *session;
2998 session = sessions_head;
2999 while (NULL != session)
3001 if (session->client == client)
3003 session = session->next;
3010 * Called when a client wants to join a consensus session.
3013 * @param client client that sent the message
3014 * @param m message sent by the client
3017 client_join (void *cls,
3018 struct GNUNET_SERVER_Client *client,
3019 const struct GNUNET_MessageHeader *m)
3021 struct ConsensusSession *session;
3023 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3025 session = get_session_by_client (client);
3026 if (NULL != session)
3029 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3032 session = GNUNET_new (struct ConsensusSession);
3033 session->client = client;
3034 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3035 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3036 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3037 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3039 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3044 client_insert_done (void *cls)
3051 * Called when a client performs an insert operation.
3053 * @param cls (unused)
3054 * @param client client handle
3055 * @param m message sent by the client
3058 client_insert (void *cls,
3059 struct GNUNET_SERVER_Client *client,
3060 const struct GNUNET_MessageHeader *m)
3062 struct ConsensusSession *session;
3063 struct GNUNET_CONSENSUS_ElementMessage *msg;
3064 struct GNUNET_SET_Element *element;
3065 ssize_t element_size;
3066 struct GNUNET_SET_Handle *initial_set;
3068 session = get_session_by_client (client);
3070 if (NULL == session)
3073 GNUNET_SERVER_client_disconnect (client);
3077 if (GNUNET_YES == session->conclude_started)
3080 GNUNET_SERVER_client_disconnect (client);
3084 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3085 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3086 if (element_size < 0)
3092 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3093 element->element_type = msg->element_type;
3094 element->size = element_size;
3095 memcpy (&element[1], &msg[1], element_size);
3096 element->data = &element[1];
3098 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3099 struct SetEntry *entry;
3100 entry = lookup_set (session, &key);
3101 GNUNET_assert (NULL != entry);
3102 initial_set = entry->h;
3104 session->num_client_insert_pending++;
3105 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
3107 #ifdef GNUNET_EXTRA_LOGGING
3109 struct GNUNET_HashCode hash;
3111 GNUNET_SET_element_hash (element, &hash);
3113 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
3114 session->local_peer_idx,
3115 GNUNET_h2s (&hash));
3119 GNUNET_free (element);
3120 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3125 * Called when a client performs the conclude operation.
3127 * @param cls (unused)
3128 * @param client client handle
3129 * @param message message sent by the client
3132 client_conclude (void *cls,
3133 struct GNUNET_SERVER_Client *client,
3134 const struct GNUNET_MessageHeader *message)
3136 struct ConsensusSession *session;
3138 session = get_session_by_client (client);
3139 if (NULL == session)
3141 /* client not found */
3143 GNUNET_SERVER_client_disconnect (client);
3147 if (GNUNET_YES == session->conclude_started)
3149 /* conclude started twice */
3151 GNUNET_SERVER_client_disconnect (client);
3152 destroy_session (session);
3156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
3158 session->conclude_started = GNUNET_YES;
3160 install_step_timeouts (session);
3161 run_ready_steps (session);
3164 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3169 * Called to clean up, after a shutdown has been requested.
3171 * @param cls closure
3174 shutdown_task (void *cls)
3176 while (NULL != sessions_head)
3177 destroy_session (sessions_head);
3179 GNUNET_STATISTICS_destroy (statistics, GNUNET_YES);
3180 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
/**
 * Clean up after a client after it is
 * disconnected (either by us or by itself)
 *
 * Currently this only looks up the client's session;
 * nothing is destroyed yet (see the FIXME).
 *
 * @param cls closure, unused
 * @param client the client to clean up after
 */
static void
handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
{
  struct ConsensusSession *owned = get_session_by_client (client);

  if (NULL == owned)
    return;
  // FIXME: destroy if we can
}
3205 * Start processing consensus requests.
3207 * @param cls closure
3208 * @param server the initialized server
3209 * @param c configuration to use
3212 run (void *cls, struct GNUNET_SERVER_Handle *server,
3213 const struct GNUNET_CONFIGURATION_Handle *c)
3215 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3216 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3217 sizeof (struct GNUNET_MessageHeader)},
3218 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3219 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3225 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3227 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3229 GNUNET_SCHEDULER_shutdown ();
3232 statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3233 GNUNET_SERVER_add_handlers (server, server_handlers);
3234 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3235 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3236 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3241 * The main function for the consensus service.
3243 * @param argc number of arguments from the command line
3244 * @param argv command line arguments
3245 * @return 0 ok, 1 on error
3248 main (int argc, char *const *argv)
3251 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
3252 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3253 return (GNUNET_OK == ret) ? 0 : 1;