2 This file is part of GNUnet
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "gnunet_consensus_service.h"
34 #include "consensus_protocol.h"
35 #include "consensus.h"
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
58 GNUNET_NETWORK_STRUCT_BEGIN
61 struct ContestedPayload
66 * Tuple of integers that together
67 * identify a task uniquely.
71 * A value from 'enum PhaseKind'.
73 uint16_t kind GNUNET_PACKED;
76 * Number of the first peer
79 int16_t peer1 GNUNET_PACKED;
82 * Number of the second peer in canonical order.
84 int16_t peer2 GNUNET_PACKED;
87 * Repetition of the gradecast phase.
89 int16_t repetition GNUNET_PACKED;
92 * Leader in the gradecast phase.
94 * Can be different from both peer1 and peer2.
96 int16_t leader GNUNET_PACKED;
103 int set_kind GNUNET_PACKED;
104 int k1 GNUNET_PACKED;
105 int k2 GNUNET_PACKED;
112 struct GNUNET_SET_Handle *h;
114 * GNUNET_YES if the set resulted
115 * from applying a referendum with contested
124 int diff_kind GNUNET_PACKED;
125 int k1 GNUNET_PACKED;
126 int k2 GNUNET_PACKED;
131 int rfn_kind GNUNET_PACKED;
132 int k1 GNUNET_PACKED;
133 int k2 GNUNET_PACKED;
137 GNUNET_NETWORK_STRUCT_END
141 PHASE_KIND_ALL_TO_ALL,
142 PHASE_KIND_GRADECAST_LEADER,
143 PHASE_KIND_GRADECAST_ECHO,
144 PHASE_KIND_GRADECAST_ECHO_GRADE,
145 PHASE_KIND_GRADECAST_CONFIRM,
146 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
148 * Apply a repetition of the all-to-all
149 * gradecast to the current set.
151 PHASE_KIND_APPLY_REP,
160 SET_KIND_LEADER_PROPOSAL,
161 SET_KIND_ECHO_RESULT,
167 DIFF_KIND_LEADER_PROPOSAL,
168 DIFF_KIND_LEADER_CONSENSUS,
169 DIFF_KIND_GRADECAST_RESULT,
177 RFN_KIND_GRADECAST_RESULT
183 struct SetKey input_set;
185 struct SetKey output_set;
186 struct RfnKey output_rfn;
187 struct DiffKey output_diff;
191 int transceive_contested;
193 struct GNUNET_SET_OperationHandle *op;
199 struct SetKey input_set;
203 * Closure for both @a start_task
204 * and @a cancel_task.
208 struct SetOpCls setop;
209 struct FinishCls finish;
214 typedef void (*TaskFunc) (struct TaskEntry *task);
217 * Node in the consensus task graph.
232 union TaskFuncCls cls;
239 * All steps of one session are in a
240 * linked list for easier deallocation.
245 * All steps of one session are in a
246 * linked list for easier deallocation.
250 struct ConsensusSession *session;
252 struct TaskEntry **tasks;
253 unsigned int tasks_len;
254 unsigned int tasks_cap;
256 unsigned int finished_tasks;
259 * Tasks that have this task as dependency.
261 * We store pointers to subordinates rather
262 * than to prerequisites since it makes
263 * tracking the readiness of a task easier.
265 struct Step **subordinates;
266 unsigned int subordinates_len;
267 unsigned int subordinates_cap;
270 * Counter for the prerequisites of
273 size_t pending_prereq;
276 * Task that will run this step despite
277 * any pending prerequisites.
279 struct GNUNET_SCHEDULER_Task *timeout_task;
281 unsigned int is_running;
283 unsigned int is_finished;
286 * Synchrony round of the task.
287 * Determines the deadline for the task.
292 * Human-readable name for
293 * the task, used for debugging.
299 struct RfnElementInfo
301 const struct GNUNET_SET_Element *element;
304 * GNUNET_YES if the peer votes for the proposal.
309 * Proposal for this element,
310 * can only be VOTE_ADD or VOTE_REMOVE.
312 enum ReferendumVote proposal;
316 struct ReferendumEntry
321 * Elements where there is at least one proposed change.
323 * Maps the hash of the GNUNET_SET_Element
324 * to 'struct RfnElementInfo'.
326 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
328 unsigned int num_peers;
331 * Stores, for every peer in the session,
332 * whether the peer finished the whole referendum.
334 * Votes from peers are only counted if they're
335 * marked as committed (#GNUNET_YES) in the referendum.
337 * Otherwise (#GNUNET_NO), the requested changes are
338 * not counted for majority votes or thresholds.
344 * Contestation state of the peer. If a peer is contested, the values it
345 * contributed are still counted for applying changes, but the grading is
352 struct DiffElementInfo
354 const struct GNUNET_SET_Element *element;
357 * Positive weight for 'add', negative
358 * weights for 'remove'.
370 struct GNUNET_CONTAINER_MultiHashMap *changes;
376 * A consensus session consists of one local client and the remote authorities.
378 struct ConsensusSession
381 * Consensus sessions are kept in a DLL.
383 struct ConsensusSession *next;
386 * Consensus sessions are kept in a DLL.
388 struct ConsensusSession *prev;
390 unsigned int num_client_insert_pending;
392 struct GNUNET_CONTAINER_MultiHashMap *setmap;
393 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
394 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
397 * Per-peer blacklist flags (#GNUNET_YES if blacklisted); array of length 'num_peers'.
399 int *peers_blacklisted;
402 * Mapping from (hashed) TaskKey to TaskEntry.
404 * We map the application_id for a round to the task that should be
405 * executed, so we don't have to go through all tasks whenever we get
406 * an incoming set op request.
408 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
410 struct Step *steps_head;
411 struct Step *steps_tail;
413 int conclude_started;
418 * Global consensus identification, computed
419 * from the session id and participating authorities.
421 struct GNUNET_HashCode global_id;
424 * Client that inhabits the session
426 struct GNUNET_SERVER_Client *client;
429 * Queued messages to the client.
431 struct GNUNET_MQ_Handle *client_mq;
434 * Time when the conclusion of the consensus should begin.
436 struct GNUNET_TIME_Absolute conclude_start;
439 * Timeout for all rounds together, single rounds will schedule a timeout task
440 * with a fraction of the conclude timeout.
441 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
443 struct GNUNET_TIME_Absolute conclude_deadline;
445 struct GNUNET_PeerIdentity *peers;
448 * Number of other peers in the consensus.
450 unsigned int num_peers;
453 * Index of the local peer in the peers array
455 unsigned int local_peer_idx;
458 * Listener for requests from other peers.
459 * Uses the session's global id as app id.
461 struct GNUNET_SET_ListenHandle *set_listener;
465 * Linked list of sessions this peer participates in.
467 static struct ConsensusSession *sessions_head;
470 * Linked list of sessions this peer participates in.
472 static struct ConsensusSession *sessions_tail;
475 * Configuration of the consensus service.
477 static const struct GNUNET_CONFIGURATION_Handle *cfg;
480 * Handle to the server for this service.
482 static struct GNUNET_SERVER_Handle *srv;
485 * Peer that runs this service.
487 static struct GNUNET_PeerIdentity my_peer;
492 struct GNUNET_STATISTICS_Handle *statistics;
496 finish_task (struct TaskEntry *task);
499 run_ready_steps (struct ConsensusSession *session);
502 phasename (uint16_t phase)
506 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
507 case PHASE_KIND_FINISH: return "FINISH";
508 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
509 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
510 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
511 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
512 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
513 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
514 default: return "(unknown)";
520 setname (uint16_t kind)
524 case SET_KIND_CURRENT: return "CURRENT";
525 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
526 case SET_KIND_NONE: return "NONE";
527 default: return "(unknown)";
532 rfnname (uint16_t kind)
536 case RFN_KIND_NONE: return "NONE";
537 case RFN_KIND_ECHO: return "ECHO";
538 case RFN_KIND_CONFIRM: return "CONFIRM";
539 default: return "(unknown)";
544 diffname (uint16_t kind)
548 case DIFF_KIND_NONE: return "NONE";
549 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
550 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
551 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
552 default: return "(unknown)";
556 #ifdef GNUNET_EXTRA_LOGGING
560 debug_str_element (const struct GNUNET_SET_Element *el)
562 struct GNUNET_HashCode hash;
564 GNUNET_SET_element_hash (el, &hash);
566 return GNUNET_h2s (&hash);
570 debug_str_task_key (struct TaskKey *tk)
572 static char buf[256];
574 snprintf (buf, sizeof (buf),
575 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
576 phasename (tk->kind), tk->peer1, tk->peer2,
577 tk->leader, tk->repetition);
583 debug_str_diff_key (struct DiffKey *dk)
585 static char buf[256];
587 snprintf (buf, sizeof (buf),
588 "DiffKey kind=%s, k1=%d, k2=%d",
589 diffname (dk->diff_kind), dk->k1, dk->k2);
595 debug_str_set_key (const struct SetKey *sk)
597 static char buf[256];
599 snprintf (buf, sizeof (buf),
600 "SetKey kind=%s, k1=%d, k2=%d",
601 setname (sk->set_kind), sk->k1, sk->k2);
608 debug_str_rfn_key (const struct RfnKey *rk)
610 static char buf[256];
612 snprintf (buf, sizeof (buf),
613 "RfnKey kind=%s, k1=%d, k2=%d",
614 rfnname (rk->rfn_kind), rk->k1, rk->k2);
619 #endif /* GNUNET_EXTRA_LOGGING */
623 * Destroy a session, free all resources associated with it.
625 * @param session the session to destroy
628 destroy_session (struct ConsensusSession *session)
630 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
631 if (NULL != session->set_listener)
633 GNUNET_SET_listen_cancel (session->set_listener);
634 session->set_listener = NULL;
636 if (NULL != session->client_mq)
638 GNUNET_MQ_destroy (session->client_mq);
639 session->client_mq = NULL;
640 /* The MQ cleanup will also disconnect the underlying client. */
641 session->client = NULL;
643 if (NULL != session->client)
645 GNUNET_SERVER_client_disconnect (session->client);
646 session->client = NULL;
648 GNUNET_free (session);
653 * Send the final result set of the consensus to the client, element by
657 * @param element the current element, NULL if all elements have been
659 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
662 send_to_client_iter (void *cls,
663 const struct GNUNET_SET_Element *element)
665 struct TaskEntry *task = (struct TaskEntry *) cls;
666 struct ConsensusSession *session = task->step->session;
667 struct GNUNET_MQ_Envelope *ev;
671 struct GNUNET_CONSENSUS_ElementMessage *m;
673 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674 "P%d: sending element %s to client\n",
675 session->local_peer_idx,
676 debug_str_element (element));
678 ev = GNUNET_MQ_msg_extra (m, element->size,
679 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
680 m->element_type = htons (element->element_type);
681 memcpy (&m[1], element->data, element->size);
682 GNUNET_MQ_send (session->client_mq, ev);
686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687 "P%d: finished iterating elements for client\n",
688 session->local_peer_idx);
689 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
690 GNUNET_MQ_send (session->client_mq, ev);
696 static struct SetEntry *
697 lookup_set (struct ConsensusSession *session, struct SetKey *key)
699 struct GNUNET_HashCode hash;
701 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
702 "P%u: looking up set {%s}\n",
703 session->local_peer_idx,
704 debug_str_set_key (key));
706 GNUNET_assert (SET_KIND_NONE != key->set_kind);
707 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
708 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
712 static struct DiffEntry *
713 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
715 struct GNUNET_HashCode hash;
717 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "P%u: looking up diff {%s}\n",
719 session->local_peer_idx,
720 debug_str_diff_key (key));
722 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
723 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
724 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
728 static struct ReferendumEntry *
729 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
731 struct GNUNET_HashCode hash;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "P%u: looking up rfn {%s}\n",
735 session->local_peer_idx,
736 debug_str_rfn_key (key));
738 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
739 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
740 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
745 diff_insert (struct DiffEntry *diff,
747 const struct GNUNET_SET_Element *element)
749 struct DiffElementInfo *di;
750 struct GNUNET_HashCode hash;
752 GNUNET_assert ( (1 == weight) || (-1 == weight));
754 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
755 "diff_insert with element size %u\n",
758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759 "hashing element\n");
761 GNUNET_SET_element_hash (element, &hash);
763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
770 di = GNUNET_new (struct DiffElementInfo);
771 di->element = GNUNET_SET_element_dup (element);
772 GNUNET_assert (GNUNET_OK ==
773 GNUNET_CONTAINER_multihashmap_put (diff->changes,
775 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
783 rfn_commit (struct ReferendumEntry *rfn,
784 uint16_t commit_peer)
786 GNUNET_assert (commit_peer < rfn->num_peers);
788 rfn->peer_commited[commit_peer] = GNUNET_YES;
793 rfn_contest (struct ReferendumEntry *rfn,
794 uint16_t contested_peer)
796 GNUNET_assert (contested_peer < rfn->num_peers);
798 rfn->peer_contested[contested_peer] = GNUNET_YES;
803 rfn_noncontested (struct ReferendumEntry *rfn)
809 for (i = 0; i < rfn->num_peers; i++)
810 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
818 rfn_vote (struct ReferendumEntry *rfn,
819 uint16_t voting_peer,
820 enum ReferendumVote vote,
821 const struct GNUNET_SET_Element *element)
823 struct RfnElementInfo *ri;
824 struct GNUNET_HashCode hash;
826 GNUNET_assert (voting_peer < rfn->num_peers);
828 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
829 since VOTE_KEEP is implicit in not voting. */
830 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
832 GNUNET_SET_element_hash (element, &hash);
833 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
837 ri = GNUNET_new (struct RfnElementInfo);
838 ri->element = GNUNET_SET_element_dup (element);
839 ri->votes = GNUNET_new_array (rfn->num_peers, int);
840 GNUNET_assert (GNUNET_OK ==
841 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
843 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
846 ri->votes[voting_peer] = GNUNET_YES;
852 task_other_peer (struct TaskEntry *task)
854 uint16_t me = task->step->session->local_peer_idx;
855 if (task->key.peer1 == me)
856 return task->key.peer2;
857 return task->key.peer1;
862 * Callback for set operation results. Called for each element
866 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
867 * @param status see enum GNUNET_SET_Status
870 set_result_cb (void *cls,
871 const struct GNUNET_SET_Element *element,
872 enum GNUNET_SET_Status status)
874 struct TaskEntry *task = cls;
875 struct ConsensusSession *session = task->step->session;
876 struct SetEntry *output_set = NULL;
877 struct DiffEntry *output_diff = NULL;
878 struct ReferendumEntry *output_rfn = NULL;
879 unsigned int other_idx;
880 struct SetOpCls *setop;
882 setop = &task->cls.setop;
885 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886 "P%u: got set result for {%s}, status %u\n",
887 session->local_peer_idx,
888 debug_str_task_key (&task->key),
891 if (GNUNET_NO == task->is_started)
897 if (GNUNET_YES == task->is_finished)
903 other_idx = task_other_peer (task);
905 if (SET_KIND_NONE != setop->output_set.set_kind)
907 output_set = lookup_set (session, &setop->output_set);
908 GNUNET_assert (NULL != output_set);
911 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
913 output_diff = lookup_diff (session, &setop->output_diff);
914 GNUNET_assert (NULL != output_diff);
917 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
919 output_rfn = lookup_rfn (session, &setop->output_rfn);
920 GNUNET_assert (NULL != output_rfn);
923 if (GNUNET_YES == session->peers_blacklisted[other_idx])
925 /* Peer might have been blacklisted
926 by a gradecast running in parallel, ignore elements from now */
927 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
929 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
933 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
935 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
937 GNUNET_assert (NULL != output_rfn);
938 rfn_contest (output_rfn, task_other_peer (task));
945 case GNUNET_SET_STATUS_ADD_LOCAL:
946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947 "Adding element in Task {%s}\n",
948 debug_str_task_key (&task->key));
949 if (NULL != output_set)
951 // FIXME: record pending adds, use callback
952 GNUNET_SET_add_element (output_set->h,
956 #ifdef GNUNET_EXTRA_LOGGING
957 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
958 "P%u: adding element %s into set {%s} of task {%s}\n",
959 session->local_peer_idx,
960 debug_str_element (element),
961 debug_str_set_key (&setop->output_set),
962 debug_str_task_key (&task->key));
965 if (NULL != output_diff)
967 diff_insert (output_diff, 1, element);
968 #ifdef GNUNET_EXTRA_LOGGING
969 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
970 "P%u: adding element %s into diff {%s} of task {%s}\n",
971 session->local_peer_idx,
972 debug_str_element (element),
973 debug_str_diff_key (&setop->output_diff),
974 debug_str_task_key (&task->key));
977 if (NULL != output_rfn)
979 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
980 #ifdef GNUNET_EXTRA_LOGGING
981 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
982 "P%u: adding element %s into rfn {%s} of task {%s}\n",
983 session->local_peer_idx,
984 debug_str_element (element),
985 debug_str_rfn_key (&setop->output_rfn),
986 debug_str_task_key (&task->key));
989 // XXX: add result to structures in task
991 case GNUNET_SET_STATUS_ADD_REMOTE:
992 if (GNUNET_YES == setop->do_not_remove)
994 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997 "Removing element in Task {%s}\n",
998 debug_str_task_key (&task->key));
999 if (NULL != output_set)
1001 // FIXME: record pending adds, use callback
1002 GNUNET_SET_remove_element (output_set->h,
1006 #ifdef GNUNET_EXTRA_LOGGING
1007 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1008 "P%u: removing element %s from set {%s} of task {%s}\n",
1009 session->local_peer_idx,
1010 debug_str_element (element),
1011 debug_str_set_key (&setop->output_set),
1012 debug_str_task_key (&task->key));
1015 if (NULL != output_diff)
1017 diff_insert (output_diff, -1, element);
1018 #ifdef GNUNET_EXTRA_LOGGING
1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1020 "P%u: removing element %s from diff {%s} of task {%s}\n",
1021 session->local_peer_idx,
1022 debug_str_element (element),
1023 debug_str_diff_key (&setop->output_diff),
1024 debug_str_task_key (&task->key));
1027 if (NULL != output_rfn)
1029 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1030 #ifdef GNUNET_EXTRA_LOGGING
1031 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1032 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1033 session->local_peer_idx,
1034 debug_str_element (element),
1035 debug_str_rfn_key (&setop->output_rfn),
1036 debug_str_task_key (&task->key));
1040 case GNUNET_SET_STATUS_DONE:
1041 // XXX: check first if any changes to the underlying
1042 // set are still pending
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "Finishing setop in Task {%s}\n",
1045 debug_str_task_key (&task->key));
1046 if (NULL != output_rfn)
1048 rfn_commit (output_rfn, task_other_peer (task));
1052 case GNUNET_SET_STATUS_FAILURE:
1054 GNUNET_break_op (0);
1074 enum EvilnessSubType
1077 EVILNESS_SUB_REPLACEMENT,
1078 EVILNESS_SUB_NO_REPLACEMENT,
1083 enum EvilnessType type;
1084 enum EvilnessSubType subtype;
1090 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1092 if (0 == strcmp ("replace", evil_subtype_str))
1094 evil->subtype = EVILNESS_SUB_REPLACEMENT;
1096 else if (0 == strcmp ("noreplace", evil_subtype_str))
1098 evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1102 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1103 "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1105 return GNUNET_SYSERR;
1112 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1116 char *evil_type_str = NULL;
1117 char *evil_subtype_str = NULL;
1119 GNUNET_assert (NULL != evil);
1121 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "P%u: no evilness\n",
1125 session->local_peer_idx);
1126 evil->type = EVILNESS_NONE;
1129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130 "P%u: got evilness spec\n",
1131 session->local_peer_idx);
1133 for (field = strtok (evil_spec, "/");
1135 field = strtok (NULL, "/"))
1137 unsigned int peer_num;
1138 unsigned int evil_num;
1141 evil_type_str = NULL;
1142 evil_subtype_str = NULL;
1144 ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1148 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1149 "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1155 GNUNET_assert (NULL != evil_type_str);
1156 GNUNET_assert (NULL != evil_subtype_str);
1158 if (peer_num == session->local_peer_idx)
1160 if (0 == strcmp ("slack", evil_type_str))
1162 evil->type = EVILNESS_SLACK;
1164 else if (0 == strcmp ("cram-all", evil_type_str))
1166 evil->type = EVILNESS_CRAM_ALL;
1167 evil->num = evil_num;
1168 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1171 else if (0 == strcmp ("cram-lead", evil_type_str))
1173 evil->type = EVILNESS_CRAM_LEAD;
1174 evil->num = evil_num;
1175 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1178 else if (0 == strcmp ("cram-echo", evil_type_str))
1180 evil->type = EVILNESS_CRAM_ECHO;
1181 evil->num = evil_num;
1182 if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1188 "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1194 /* No GNUNET_free since memory was allocated by libc */
1195 free (evil_type_str);
1196 evil_type_str = NULL;
1197 evil_subtype_str = NULL;
1200 evil->type = EVILNESS_NONE;
1202 GNUNET_free (evil_spec);
1203 /* no GNUNET_free_non_null since it wasn't
1204 * allocated with GNUNET_malloc */
1205 if (NULL != evil_type_str)
1206 free (evil_type_str);
1207 if (NULL != evil_subtype_str)
1208 free (evil_subtype_str);
1215 * Commit the appropriate set for a
1219 commit_set (struct ConsensusSession *session,
1220 struct TaskEntry *task)
1222 struct SetEntry *set;
1223 struct SetOpCls *setop = &task->cls.setop;
1225 GNUNET_assert (NULL != setop->op);
1226 set = lookup_set (session, &setop->input_set);
1227 GNUNET_assert (NULL != set);
1232 struct Evilness evil;
1234 get_evilness (session, &evil);
1235 if (EVILNESS_NONE != evil.type)
1237 /* Useful for evaluation */
1238 GNUNET_STATISTICS_set (statistics,
1245 case EVILNESS_CRAM_ALL:
1246 case EVILNESS_CRAM_LEAD:
1247 case EVILNESS_CRAM_ECHO:
1248 /* We're not cramming elements in the
1249 all-to-all round, since that would just
1250 add more elements to the result set, but
1251 wouldn't test robustness. */
1252 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1254 GNUNET_SET_commit (setop->op, set->h);
1257 if ((EVILNESS_CRAM_LEAD == evil.type) &&
1258 ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1260 GNUNET_SET_commit (setop->op, set->h);
1263 if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1265 GNUNET_SET_commit (setop->op, set->h);
1268 for (i = 0; i < evil.num; i++)
1270 struct GNUNET_HashCode hash;
1271 struct GNUNET_SET_Element element;
1272 element.data = &hash;
1273 element.size = sizeof (struct GNUNET_HashCode);
1274 element.element_type = 0;
1276 if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1278 /* Always generate a new element. */
1279 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
1281 else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1283 /* Always cram the same elements, derived from counter. */
1284 GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
1290 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1291 #ifdef GNUNET_EXTRA_LOGGING
1292 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1293 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1294 session->local_peer_idx,
1295 debug_str_element (&element),
1296 debug_str_set_key (&setop->input_set),
1297 debug_str_task_key (&task->key));
1300 GNUNET_STATISTICS_update (statistics,
1301 "# stuffed elements",
1304 GNUNET_SET_commit (setop->op, set->h);
1306 case EVILNESS_SLACK:
1307 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1308 "P%u: evil peer: slacking\n",
1309 session->local_peer_idx,
1314 GNUNET_SET_commit (setop->op, set->h);
1319 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1321 struct GNUNET_SET_Element element;
1322 struct ContestedPayload payload;
1323 element.data = &payload;
1324 element.size = sizeof (struct ContestedPayload);
1325 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1326 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1328 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1330 GNUNET_SET_commit (setop->op, set->h);
1334 /* For our testcases, we don't want the blacklisted
1336 GNUNET_SET_operation_cancel (setop->op);
1344 put_diff (struct ConsensusSession *session,
1345 struct DiffEntry *diff)
1347 struct GNUNET_HashCode hash;
1349 GNUNET_assert (NULL != diff);
1351 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1352 GNUNET_assert (GNUNET_OK ==
1353 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1354 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1358 put_set (struct ConsensusSession *session,
1359 struct SetEntry *set)
1361 struct GNUNET_HashCode hash;
1363 GNUNET_assert (NULL != set->h);
1365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1367 debug_str_set_key (&set->key));
1369 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1370 GNUNET_assert (GNUNET_OK ==
1371 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1372 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1377 put_rfn (struct ConsensusSession *session,
1378 struct ReferendumEntry *rfn)
1380 struct GNUNET_HashCode hash;
1382 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1383 GNUNET_assert (GNUNET_OK ==
1384 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1385 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1391 task_cancel_reconcile (struct TaskEntry *task)
1393 /* not implemented yet */
1399 apply_diff_to_rfn (struct DiffEntry *diff,
1400 struct ReferendumEntry *rfn,
1401 uint16_t voting_peer,
1404 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1405 struct DiffElementInfo *di;
1407 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1409 while (GNUNET_YES ==
1410 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1412 (const void **) &di))
1416 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1420 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1424 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1431 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1433 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1440 diff_compose (struct DiffEntry *diff_1,
1441 struct DiffEntry *diff_2)
1443 struct DiffEntry *diff_new;
1444 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1445 struct DiffElementInfo *di;
1447 diff_new = diff_create ();
1449 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1450 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1452 diff_insert (diff_new, di->weight, di->element);
1454 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1456 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1457 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1459 diff_insert (diff_new, di->weight, di->element);
1461 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1467 struct ReferendumEntry *
1468 rfn_create (uint16_t size)
1470 struct ReferendumEntry *rfn;
1472 rfn = GNUNET_new (struct ReferendumEntry);
1473 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1474 rfn->peer_commited = GNUNET_new_array (size, int);
1475 rfn->peer_contested = GNUNET_new_array (size, int);
1476 rfn->num_peers = size;
1483 diff_destroy (struct DiffEntry *diff)
1485 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1491 rfn_majority (const struct ReferendumEntry *rfn,
1492 const struct RfnElementInfo *ri,
1493 uint16_t *ret_majority,
1494 enum ReferendumVote *ret_vote)
1496 uint16_t votes_yes = 0;
1497 uint16_t num_commited = 0;
1500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1501 "Computing rfn majority for element %s of rfn {%s}\n",
1502 debug_str_element (ri->element),
1503 debug_str_rfn_key (&rfn->key));
1505 for (i = 0; i < rfn->num_peers; i++)
1507 if (GNUNET_NO == rfn->peer_commited[i])
1511 if (GNUNET_YES == ri->votes[i])
1515 if (votes_yes > (num_commited) / 2)
1517 *ret_vote = ri->proposal;
1518 *ret_majority = votes_yes;
1522 *ret_vote = VOTE_STAY;
1523 *ret_majority = num_commited - votes_yes;
1530 struct TaskEntry *task;
1531 struct SetKey dst_set_key;
1536 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1538 struct SetCopyCls *scc = cls;
1539 struct TaskEntry *task = scc->task;
1540 struct SetKey dst_set_key = scc->dst_set_key;
1541 struct SetEntry *set;
1544 set = GNUNET_new (struct SetEntry);
1546 set->key = dst_set_key;
1547 put_set (task->step->session, set);
1554 * Call the start function of the given
1555 * task again after we created a copy of the given set.
1558 create_set_copy_for_task (struct TaskEntry *task,
1559 struct SetKey *src_set_key,
1560 struct SetKey *dst_set_key)
1562 struct SetEntry *src_set;
1563 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1565 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1566 "Copying set {%s} to {%s} for task {%s}\n",
1567 debug_str_set_key (src_set_key),
1568 debug_str_set_key (dst_set_key),
1569 debug_str_task_key (&task->key));
1572 scc->dst_set_key = *dst_set_key;
1573 src_set = lookup_set (task->step->session, src_set_key);
1574 GNUNET_assert (NULL != src_set);
1575 GNUNET_SET_copy_lazy (src_set->h,
1581 struct SetMutationProgressCls
1585 * Task to finish once all changes are through.
1587 struct TaskEntry *task;
/* Continuation invoked when one set mutation has completed.
 *
 * `cls` is the SetMutationProgressCls shared by the mutations of a
 * task.  At least one mutation must still be pending (asserted).  When
 * the pending counter is zero the owning task is picked up from the
 * closure.
 *
 * NOTE(review): the counter decrement and what happens with the task
 * (presumably freeing the closure and finishing the task) are not
 * visible here -- confirm. */
1592 set_mutation_done (void *cls)
1594 struct SetMutationProgressCls *pc = cls;
1596 GNUNET_assert (pc->num_pending > 0);
1600 if (0 == pc->num_pending)
1602 struct TaskEntry *task = pc->task;
1609 task_start_apply_round (struct TaskEntry *task)
1611 struct ConsensusSession *session = task->step->session;
1612 struct SetKey sk_in;
1613 struct SetKey sk_out;
1614 struct RfnKey rk_in;
1615 struct SetEntry *set_out;
1616 struct ReferendumEntry *rfn_in;
1617 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1618 struct RfnElementInfo *ri;
1619 struct SetMutationProgressCls *progress_cls;
1621 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1622 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1623 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1625 set_out = lookup_set (session, &sk_out);
1626 if (NULL == set_out)
1628 create_set_copy_for_task (task, &sk_in, &sk_out);
1632 rfn_in = lookup_rfn (session, &rk_in);
1633 GNUNET_assert (NULL != rfn_in);
1635 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1636 progress_cls->task = task;
1638 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1640 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1642 uint16_t majority_num;
1643 enum ReferendumVote majority_vote;
1645 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1647 switch (majority_vote)
1650 progress_cls->num_pending++;
1651 GNUNET_assert (GNUNET_OK ==
1652 GNUNET_SET_add_element (set_out->h,
1658 progress_cls->num_pending++;
1659 GNUNET_assert (GNUNET_OK ==
1660 GNUNET_SET_remove_element (set_out->h,
1674 if (progress_cls->num_pending == 0)
1676 // call closure right now, no pending ops
1677 GNUNET_free (progress_cls);
/* One third of the peers of session `s` (integer division).
 * NOTE(review): the functions visible nearby spell out
 * `session->num_peers / 3` directly instead of using this macro. */
1683 #define THRESH(s) (((s)->num_peers / 3))
1687 task_start_grade (struct TaskEntry *task)
1689 struct ConsensusSession *session = task->step->session;
1690 struct ReferendumEntry *output_rfn;
1691 struct ReferendumEntry *input_rfn;
1692 struct DiffEntry *input_diff;
1693 struct RfnKey rfn_key;
1694 struct DiffKey diff_key;
1695 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1696 struct RfnElementInfo *ri;
1697 unsigned int gradecast_confidence = 2;
1699 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1700 output_rfn = lookup_rfn (session, &rfn_key);
1701 if (NULL == output_rfn)
1703 output_rfn = rfn_create (session->num_peers);
1704 output_rfn->key = rfn_key;
1705 put_rfn (session, output_rfn);
1708 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1709 input_diff = lookup_diff (session, &diff_key);
1710 GNUNET_assert (NULL != input_diff);
1712 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1713 input_rfn = lookup_rfn (session, &rfn_key);
1714 GNUNET_assert (NULL != input_rfn);
1716 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1718 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1720 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1722 uint16_t majority_num;
1723 enum ReferendumVote majority_vote;
1725 // XXX: we need contested votes and non-contested votes here
1726 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1728 if (majority_num <= session->num_peers / 3)
1729 majority_vote = VOTE_REMOVE;
1731 switch (majority_vote)
1736 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1739 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1748 uint16_t noncontested;
1749 noncontested = rfn_noncontested (input_rfn);
1750 if (noncontested < (session->num_peers / 3) * 2)
1752 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1754 if (noncontested < (session->num_peers / 3) + 1)
1756 gradecast_confidence = 0;
1760 if (gradecast_confidence >= 1)
1761 rfn_commit (output_rfn, task->key.leader);
1763 if (gradecast_confidence <= 1)
1764 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
/* Start function of a set reconciliation task between two peers.
 *
 * The input set named by the task's SetOpCls must exist and have a
 * handle (asserted).  Outputs requested by the task are created up
 * front if missing: the output set (by copying the input set), the
 * output referendum and the output diff.  Then, depending on the role
 * of the local peer:
 *  - peer1 == peer2 == local peer: purely local task;
 *  - local peer is peer1: build a round context message describing the
 *    task key, prepare the set operation towards peer2 and commit;
 *  - local peer is peer2: wait for the remote request; commit right
 *    away if the operation handle has already been set;
 *  - otherwise the task graph was constructed wrongly.
 *
 * NOTE(review): the early returns / finish calls inside the branches
 * are not visible in this excerpt -- confirm against the full file. */
1771 task_start_reconcile (struct TaskEntry *task)
1773 struct SetEntry *input;
1774 struct SetOpCls *setop = &task->cls.setop;
1775 struct ConsensusSession *session = task->step->session;
1777 input = lookup_set (session, &setop->input_set);
1778 GNUNET_assert (NULL != input);
1779 GNUNET_assert (NULL != input->h);
1781 /* We create the outputs for the operation here
1782 (rather than in the set operation callback)
1783 because we want something valid in there, even
1784 if the other peer doesn't talk to us */
1786 if (SET_KIND_NONE != setop->output_set.set_kind)
1788 /* If we don't have an existing output set,
1789 we clone the input set. */
1790 if (NULL == lookup_set (session, &setop->output_set))
1792 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
/* Output referendum: created empty, sized for all peers of the session. */
1797 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1799 if (NULL == lookup_rfn (session, &setop->output_rfn))
1801 struct ReferendumEntry *rfn;
1803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804 "P%u: output rfn <%s> missing, creating.\n",
1805 session->local_peer_idx,
1806 debug_str_rfn_key (&setop->output_rfn));
1808 rfn = rfn_create (session->num_peers);
1809 rfn->key = setop->output_rfn;
1810 put_rfn (session, rfn);
/* Output diff: created empty. */
1814 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1816 if (NULL == lookup_diff (session, &setop->output_diff))
1818 struct DiffEntry *diff;
1820 diff = diff_create ();
1821 diff->key = setop->output_diff;
1822 put_diff (session, diff);
/* Both endpoints are the local peer: nothing to exchange over the network. */
1826 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1828 /* XXX: mark the corresponding rfn as commited if necessary */
/* The local peer is peer1 and therefore initiates the set operation. */
1833 if (task->key.peer1 == session->local_peer_idx)
1835 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1837 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1838 "P%u: Looking up set {%s} to run remote union\n",
1839 session->local_peer_idx,
1840 debug_str_set_key (&setop->input_set));
1842 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1843 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
/* The task key travels in network byte order; set_listen_cb decodes it. */
1845 rcm.kind = htons (task->key.kind);
1846 rcm.peer1 = htons (task->key.peer1);
1847 rcm.peer2 = htons (task->key.peer2);
1848 rcm.leader = htons (task->key.leader);
1849 rcm.repetition = htons (task->key.repetition);
1851 GNUNET_assert (NULL == setop->op);
1852 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1853 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
1855 // XXX: maybe this should be done while
1856 // setting up tasks already?
1857 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1858 &session->global_id,
1860 GNUNET_SET_RESULT_SYMMETRIC,
1864 commit_set (session, task);
1866 else if (task->key.peer2 == session->local_peer_idx)
1868 /* Wait for the other peer to contact us */
1869 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1870 session->local_peer_idx, task->key.peer1);
/* The remote request may already have arrived before the task started. */
1872 if (NULL != setop->op)
1874 commit_set (session, task);
1879 /* We made an error while constructing the task graph. */
/* Start function of the "echo grade" task of a gradecast.
 *
 * Evaluates the echo referendum for the task's (repetition, leader)
 * and applies the result to the echo result set: for each element the
 * majority vote decides whether it is added to or removed from the
 * output set.  If the majority for an element is smaller than a third
 * of the peers, the output set is marked as contested.  If the output
 * set does not exist yet, a copy of the leader proposal set is
 * requested first.  When no set mutation is pending after the loop,
 * the progress closure is freed right away.
 *
 * NOTE(review): early return after requesting the copy and the call
 * finishing the task are not visible in this excerpt -- confirm. */
1886 task_start_eval_echo (struct TaskEntry *task)
1888 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1889 struct ReferendumEntry *input_rfn;
1890 struct RfnElementInfo *ri;
1891 struct SetEntry *output_set;
1892 struct SetMutationProgressCls *progress_cls;
1893 struct ConsensusSession *session = task->step->session;
1894 struct SetKey sk_in;
1895 struct SetKey sk_out;
1896 struct RfnKey rk_in;
1898 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1899 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
1900 output_set = lookup_set (session, &sk_out);
1901 if (NULL == output_set)
1903 create_set_copy_for_task (task, &sk_in, &sk_out);
1907 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1908 "Evaluating referendum in Task {%s}\n",
1909 debug_str_task_key (&task->key));
/* One closure tracks all mutations issued below for this task. */
1911 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1912 progress_cls->task = task;
1914 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1915 input_rfn = lookup_rfn (session, &rk_in);
1917 GNUNET_assert (NULL != input_rfn);
1919 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1920 GNUNET_assert (NULL != iter);
1922 while (GNUNET_YES ==
1923 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1925 (const void **) &ri))
1927 enum ReferendumVote majority_vote;
1928 uint16_t majority_num;
1930 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1932 if (majority_num < session->num_peers / 3)
1934 /* It is not the case that all nonfaulty peers
1935 echoed the same value. Since we're doing a set reconciliation, we
1936 can't simply send "nothing" for the value. Thus we mark our 'confirm'
1937 reconciliation as contested. Other peers might not know that the
1938 leader is faulty, thus we still re-distribute in the confirmation
1940 output_set->is_contested = GNUNET_YES;
1943 switch (majority_vote)
1946 progress_cls->num_pending++;
1947 GNUNET_assert (GNUNET_OK ==
1948 GNUNET_SET_add_element (output_set->h,
1954 progress_cls->num_pending++;
1955 GNUNET_assert (GNUNET_OK ==
1956 GNUNET_SET_remove_element (output_set->h,
1962 /* Nothing to do. */
1970 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1972 if (progress_cls->num_pending == 0)
1974 // call closure right now, no pending ops
1975 GNUNET_free (progress_cls);
/* Start function of the final task of a session.
 *
 * Looks up the set named by task->cls.finish.input_set (it must
 * exist) and iterates over its elements with send_to_client_iter.
 *
 * NOTE(review): the closure argument passed to GNUNET_SET_iterate()
 * is not visible in this excerpt. */
1982 task_start_finish (struct TaskEntry *task)
1984 struct SetEntry *final_set;
1985 struct ConsensusSession *session = task->step->session;
1987 final_set = lookup_set (session, &task->cls.finish.input_set);
1989 GNUNET_assert (NULL != final_set);
1992 GNUNET_SET_iterate (final_set->h,
1993 send_to_client_iter,
/* Start a single task of a session.
 *
 * The task must neither be started nor finished yet and must have a
 * start function (all asserted).  The task is flagged as started.
 *
 * NOTE(review): the invocation of task->start is not visible in this
 * excerpt; its position relative to setting is_started should be
 * checked in the full file. */
1998 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2002 GNUNET_assert (GNUNET_NO == task->is_started);
2003 GNUNET_assert (GNUNET_NO == task->is_finished);
2004 GNUNET_assert (NULL != task->start);
2008 task->is_started = GNUNET_YES;
/* Mark a step as finished and unblock the steps depending on it.
 *
 * Preconditions (asserted): every task of the step has finished, the
 * step is running and it has not been finished before.  Each
 * subordinate step gets its pending_prereq counter decremented, then
 * the step is flagged finished and run_ready_steps() is called for the
 * session so that newly unblocked steps get started. */
2012 static void finish_step (struct Step *step)
2016 GNUNET_assert (step->finished_tasks == step->tasks_len);
2017 GNUNET_assert (GNUNET_YES == step->is_running);
2018 GNUNET_assert (GNUNET_NO == step->is_finished);
2020 #ifdef GNUNET_EXTRA_LOGGING
2021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2022 "All tasks of step `%s' with %u subordinates finished.\n",
2024 step->subordinates_len);
/* This step was one prerequisite of each subordinate. */
2027 for (i = 0; i < step->subordinates_len; i++)
2029 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
2030 step->subordinates[i]->pending_prereq--;
2031 #ifdef GNUNET_EXTRA_LOGGING
2032 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2033 "Decreased pending_prereq to %u for step `%s'.\n",
2034 step->subordinates[i]->pending_prereq,
2035 step->subordinates[i]->debug_name);
2040 step->is_finished = GNUNET_YES;
2042 // XXX: maybe schedule as task to avoid recursion?
2043 run_ready_steps (step->session);
2048 * Run all steps of the session that don't have any
2049 * more dependencies.
/* Walk the session's step list, starting at steps_head, and start
 * every step that is not running yet and has no pending prerequisite:
 * the step is flagged running and start_task() is called for each of
 * its tasks.  A step whose tasks are all finished right after starting
 * (e.g. a step without tasks) is handled by the extra check at the end
 * of the branch.
 *
 * NOTE(review): the action taken by that check and the loop advance
 * are not visible in this excerpt -- confirm. */
2052 run_ready_steps (struct ConsensusSession *session)
2056 step = session->steps_head;
2058 while (NULL != step)
2060 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
2064 GNUNET_assert (0 == step->finished_tasks);
2066 #ifdef GNUNET_EXTRA_LOGGING
2067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2068 session->local_peer_idx,
2070 step->round, step->tasks_len, step->subordinates_len);
/* Flag the step before starting tasks, since tasks may finish synchronously. */
2073 step->is_running = GNUNET_YES;
2074 for (i = 0; i < step->tasks_len; i++)
2075 start_task (session, step->tasks[i]);
2077 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2078 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2081 /* Running the next ready steps will be triggered by task completion */
/* Mark a task as finished.
 *
 * The task must not have been finished before (asserted).  The
 * finished-task counter of the owning step is incremented; when it
 * reaches the number of tasks of the step, the step itself is
 * finished via finish_step(). */
2093 finish_task (struct TaskEntry *task)
2095 GNUNET_assert (GNUNET_NO == task->is_finished);
2096 task->is_finished = GNUNET_YES;
2098 task->step->finished_tasks++;
2100 if (task->step->finished_tasks == task->step->tasks_len)
2101 finish_step (task->step);
2106 * Search peer in the list of peers in session.
2108 * @param peer peer to find
2109 * @param session session with peer
2110 * @return index of peer, -1 if peer is not in session
/* Linear search through session->peers; identities are compared
 * bytewise with memcmp() over the full struct GNUNET_PeerIdentity. */
2113 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2116 for (i = 0; i < session->num_peers; i++)
2117 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2124 * Compute a global, (hopefully) unique consensus session id,
2125 * from the local id of the consensus session, and the identities of all participants.
2126 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2127 * exactly the same peers, the global id will be different.
2129 * @param session session to generate the global id for
2130 * @param local_session_id local id of the consensus session
/* Derive session->global_id with GNUNET_CRYPTO_kdf(); the derivation
 * must succeed (asserted).  A fixed, service-specific salt string is
 * declared for the derivation.
 *
 * NOTE(review): only the length arguments of the KDF inputs are
 * visible here (peer array size and one hash code); presumably the
 * inputs are session->peers and local_session_id -- confirm. */
2133 compute_global_id (struct ConsensusSession *session,
2134 const struct GNUNET_HashCode *local_session_id)
2136 const char *salt = "gnunet-service-consensus/session_id";
2138 GNUNET_assert (GNUNET_YES ==
2139 GNUNET_CRYPTO_kdf (&session->global_id,
2140 sizeof (struct GNUNET_HashCode),
2144 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2146 sizeof (struct GNUNET_HashCode),
2152 * Compare two peer identities.
2154 * @param h1 some peer identity
2155 * @param h2 some peer identity
2156 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2159 peer_id_cmp (const void *h1, const void *h2)
2161 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2166 * Create the sorted list of peers for the session,
2167 * add the local peer if not in the join message.
/* Build session->peers from the peer identities that follow the join
 * message.
 *
 * The number of listed peers is read (network byte order) from the
 * message.  If the local peer (my_peer) is not among the listed peers,
 * the session gets one additional slot at the end of the array for it.
 * The listed identities are copied in and the whole array is sorted
 * with peer_id_cmp, which gives every participant the same peer order.
 *
 * NOTE(review): nothing visible here checks that the message is large
 * enough to hold `num_peers` identities -- verify against the caller. */
2170 initialize_session_peer_list (struct ConsensusSession *session,
2171 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2173 unsigned int local_peer_in_list;
2174 uint32_t listed_peers;
2175 const struct GNUNET_PeerIdentity *msg_peers;
2178 GNUNET_assert (NULL != join_msg);
2180 /* peers in the join message, may or may not include the local peer */
2181 listed_peers = ntohl (join_msg->num_peers);
2183 session->num_peers = listed_peers;
/* The identities are stored directly behind the fixed-size message. */
2185 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2187 local_peer_in_list = GNUNET_NO;
2188 for (i = 0; i < listed_peers; i++)
2190 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2192 local_peer_in_list = GNUNET_YES;
/* Reserve an extra slot when we have to add ourselves. */
2197 if (GNUNET_NO == local_peer_in_list)
2198 session->num_peers++;
2200 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2202 if (GNUNET_NO == local_peer_in_list)
2203 session->peers[session->num_peers - 1] = my_peer;
2205 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2206 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
/* Find the task stored under `key` in the session's task map.
 *
 * The map is indexed by the hash over the raw bytes of the TaskKey,
 * the same hash put_task() computes when inserting.  Returns whatever
 * GNUNET_CONTAINER_multihashmap_get() yields for that hash. */
2210 static struct TaskEntry *
2211 lookup_task (struct ConsensusSession *session,
2212 struct TaskKey *key)
2214 struct GNUNET_HashCode hash;
2217 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2219 GNUNET_h2s (&hash));
2220 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2225 * Called when another peer wants to do a set operation with the
2228 * @param cls closure
2229 * @param other_peer the other peer
2230 * @param context_msg message with application specific information from
2232 * @param request request from the other peer, use GNUNET_SET_accept
2233 * to accept it, otherwise the request will be refused
2234 * Note that we don't use a return value here, as it is also
2235 * necessary to specify the set we want to do the operation with,
2236 * which sometimes can be derived from the context message.
2237 * Also necessary to specify the timeout.
/* Handle an incoming set operation request for a session (`cls`).
 *
 * The context message must be present, be of type
 * GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT and have exactly the
 * size of a round context message.  The task key is decoded from it
 * (network byte order) and the matching task is looked up.  Requests
 * for finished tasks, or for tasks in which the local peer is not
 * peer2, are flagged as protocol violations.  Otherwise the request is
 * accepted and the resulting operation stored in the task; if the
 * task has already been started the operation is commited right away.
 *
 * NOTE(review): the `return` statements after the GNUNET_break_op()
 * calls and the NULL check on the looked-up task are not visible in
 * this excerpt. */
2240 set_listen_cb (void *cls,
2241 const struct GNUNET_PeerIdentity *other_peer,
2242 const struct GNUNET_MessageHeader *context_msg,
2243 struct GNUNET_SET_Request *request)
2245 struct ConsensusSession *session = cls;
2247 struct TaskEntry *task;
2248 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2250 if (NULL == context_msg)
2252 GNUNET_break_op (0);
2256 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2258 GNUNET_break_op (0);
2262 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2264 GNUNET_break_op (0);
/* Type and size have been verified above, so the cast is safe. */
2268 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2270 tk = ((struct TaskKey) {
2271 .kind = ntohs (cm->kind),
2272 .peer1 = ntohs (cm->peer1),
2273 .peer2 = ntohs (cm->peer2),
2274 .repetition = ntohs (cm->repetition),
2275 .leader = ntohs (cm->leader),
2278 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2279 session->local_peer_idx, debug_str_task_key (&tk));
2281 task = lookup_task (session, &tk);
2285 GNUNET_break_op (0);
2289 if (GNUNET_YES == task->is_finished)
2291 GNUNET_break_op (0);
2295 if (task->key.peer2 != session->local_peer_idx)
2297 /* We're being asked, so we must be the 2nd peer. */
2298 GNUNET_break_op (0);
/* Purely local tasks never involve a remote request. */
2302 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2303 (task->key.peer2 == session->local_peer_idx)));
2305 task->cls.setop.op = GNUNET_SET_accept (request,
2306 GNUNET_SET_RESULT_SYMMETRIC,
2310 /* If the task hasn't been started yet,
2311 we wait for that until we commit. */
2313 if (GNUNET_YES == task->is_started)
2315 commit_set (session, task);
/* Register a task in its step and in the task map.
 *
 * The TaskEntry is duplicated on the heap, so callers may pass a
 * stack-allocated template; the template must already name its step
 * (asserted).  The step's task array is grown by roughly 1.5x when
 * full, the copy is appended, and the copy is inserted into `taskmap`
 * under the hash of its key.  Inserting two tasks with the same key
 * trips the assertion (UNIQUE_ONLY).
 *
 * NOTE(review): the assignment of `s` and the increment of
 * s->tasks_len are not visible in this excerpt. */
2322 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2323 struct TaskEntry *t)
2325 struct GNUNET_HashCode round_hash;
2328 GNUNET_assert (NULL != t->step);
2330 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2334 if (s->tasks_len == s->tasks_cap)
2336 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2337 GNUNET_array_grow (s->tasks,
2342 #ifdef GNUNET_EXTRA_LOGGING
2343 GNUNET_assert (NULL != s->debug_name);
2344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2345 debug_str_task_key (&t->key),
2349 s->tasks[s->tasks_len] = t;
/* Same hash as computed by lookup_task(). */
2352 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2353 GNUNET_assert (GNUNET_OK ==
2354 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2355 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* Placeholder for assigning timeouts to the steps of a session.
 * Not implemented yet: the visible body consists of comments only. */
2360 install_step_timeouts (struct ConsensusSession *session)
2362 /* Given the fully constructed task graph
2363 with rounds for tasks, we can give the tasks timeouts. */
2365 // unsigned int max_round;
2367 /* XXX: implement! */
2373 * Arrange two peers in some canonical order.
/* Both indices must be valid peer indices, i.e. smaller than n
 * (asserted).  The final order is chosen from the cyclic distance
 * (b - a + n) % n compared against n / 2.
 *
 * NOTE(review): how `a` and `b` are derived from *p1 / *p2 and which
 * order each branch writes back are not visible in this excerpt. */
2376 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2381 GNUNET_assert (*p1 < n);
2382 GNUNET_assert (*p2 < n);
2395 /* For uniformly random *p1, *p2,
2396 this condition is true with 50% chance */
2397 if (((b - a) + n) % n <= n / 2)
2411 * Record @a dep as a dependency of @a step.
/* `step` is appended to dep's list of subordinates (the array grows by
 * roughly 1.5x when full) and step's pending_prereq counter is
 * incremented; finish_step() decrements it again once `dep` is done.
 * Asserted: both steps are non-NULL and distinct, and `dep` is not in
 * a later round than `step`. */
2414 step_depend_on (struct Step *step, struct Step *dep)
2416 /* We're not checking for cyclic dependencies,
2417 but this is a cheap sanity check. */
2418 GNUNET_assert (step != dep);
2419 GNUNET_assert (NULL != step);
2420 GNUNET_assert (NULL != dep);
2421 GNUNET_assert (dep->round <= step->round);
2423 #ifdef GNUNET_EXTRA_LOGGING
2424 /* Make sure we have complete debugging information.
2425 Also checks that we don't screw up too badly
2426 constructing the task graph. */
2427 GNUNET_assert (NULL != step->debug_name);
2428 GNUNET_assert (NULL != dep->debug_name);
2429 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430 "Making step `%s' depend on `%s'\n",
/* Make room for one more subordinate if the array is full. */
2435 if (dep->subordinates_cap == dep->subordinates_len)
2437 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2438 GNUNET_array_grow (dep->subordinates,
2439 dep->subordinates_cap,
2443 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2445 dep->subordinates[dep->subordinates_len] = step;
2446 dep->subordinates_len++;
2448 step->pending_prereq++;
/* Allocate a new, zero-initialized step for `session` in the given
 * round and append it to the tail of the session's step list. */
2452 static struct Step *
2453 create_step (struct ConsensusSession *session, int round)
2456 step = GNUNET_new (struct Step);
2457 step->session = session;
2458 step->round = round;
2459 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2460 session->steps_tail,
2467 * Construct the task graph for a single
/* Adds the steps and tasks of one gradecast, identified by repetition
 * `rep` and leader `lead`, between step_before and step_after.
 *
 * The steps are created in this order, each depending on the previous:
 *   1. "disseminate": reconcile tasks distributing the leader's
 *      current set as leader proposal (set and diff);
 *   2. "echo": reconcile tasks over the leader proposal, writing into
 *      the echo referendum;
 *   3. "echo grade": local evaluation of the echo referendum
 *      (task_start_eval_echo);
 *   4. "confirm": reconcile tasks over the echo result that also
 *      exchange the contested flag;
 *   5. "confirm grade": local grading (task_start_grade).
 * Finally step_after is made to depend on the last step.
 *
 * NOTE(review): round increments, the conditions selecting which peer
 * pairs get tasks and the `.step` initializers are not visible in this
 * excerpt. */
2471 construct_task_graph_gradecast (struct ConsensusSession *session,
2474 struct Step *step_before,
2475 struct Step *step_after)
2477 uint16_t n = session->num_peers;
2478 uint16_t me = session->local_peer_idx;
2483 /* The task we're currently setting up. */
2484 struct TaskEntry task;
2487 struct Step *prev_step;
2493 round = step_before->round + 1;
2495 /* gcast step 1: leader disseminates */
2497 step = create_step (session, round);
2499 #ifdef GNUNET_EXTRA_LOGGING
2500 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2502 step_depend_on (step, step_before);
2506 for (k = 0; k < n; k++)
2512 arrange_peers (&p1, &p2, n);
2513 task = ((struct TaskEntry) {
2515 .start = task_start_reconcile,
2516 .cancel = task_cancel_reconcile,
2517 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2519 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2520 put_task (session->taskmap, &task);
2522 /* We run this task to make sure that the leader
2523 has the stored the SET_KIND_LEADER set of himself,
2524 so he can participate in the rest of the gradecast
2525 without the code having to handle any special cases. */
2526 task = ((struct TaskEntry) {
2528 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2529 .start = task_start_reconcile,
2530 .cancel = task_cancel_reconcile,
2532 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2533 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2534 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2535 put_task (session->taskmap, &task);
2541 arrange_peers (&p1, &p2, n);
2542 task = ((struct TaskEntry) {
2544 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2545 .start = task_start_reconcile,
2546 .cancel = task_cancel_reconcile,
2548 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2549 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2550 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2551 put_task (session->taskmap, &task);
2554 /* gcast phase 2: echo */
2557 step = create_step (session, round);
2558 #ifdef GNUNET_EXTRA_LOGGING
2559 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2561 step_depend_on (step, prev_step);
2563 for (k = 0; k < n; k++)
2567 arrange_peers (&p1, &p2, n);
2568 task = ((struct TaskEntry) {
2570 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2571 .start = task_start_reconcile,
2572 .cancel = task_cancel_reconcile,
2574 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2575 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2576 put_task (session->taskmap, &task);
2580 /* Same round, since step only has local tasks */
2581 step = create_step (session, round);
2582 #ifdef GNUNET_EXTRA_LOGGING
2583 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2585 step_depend_on (step, prev_step);
2587 arrange_peers (&p1, &p2, n);
2588 task = ((struct TaskEntry) {
2589 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2591 .start = task_start_eval_echo
2593 put_task (session->taskmap, &task);
2597 step = create_step (session, round);
2598 #ifdef GNUNET_EXTRA_LOGGING
2599 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2601 step_depend_on (step, prev_step);
2603 /* gcast phase 3: confirmation and grading */
2604 for (k = 0; k < n; k++)
2608 arrange_peers (&p1, &p2, n);
2609 task = ((struct TaskEntry) {
2611 .start = task_start_reconcile,
2612 .cancel = task_cancel_reconcile,
2613 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2615 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2616 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2617 /* If there was at least one element in the echo round that was
2618 contested (i.e. it had no n-t majority), then we let the other peers
2619 know, and other peers let us know. The contested flag for each peer is
2620 stored in the rfn. */
2621 task.cls.setop.transceive_contested = GNUNET_YES;
2622 put_task (session->taskmap, &task);
2626 /* Same round, since step only has local tasks */
2627 step = create_step (session, round);
2628 #ifdef GNUNET_EXTRA_LOGGING
2629 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2631 step_depend_on (step, prev_step);
2633 task = ((struct TaskEntry) {
2635 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2636 .start = task_start_grade,
2638 put_task (session->taskmap, &task);
/* The gradecast as a whole is a prerequisite of the step following it. */
2640 step_depend_on (step_after, step);
/* Build the complete task graph of a session.
 *
 * The graph consists of:
 *   - an "all to all" step with reconcile tasks on the initial set
 *     (SET_KIND_CURRENT, 0) that never remove elements;
 *   - t + 1 sequential gradecast repetitions; each repetition has a
 *     start step and an end step, with one gradecast per leader
 *     (all n peers) in between, and an "apply" task in the end step;
 *   - a final "finish" step whose task reads the set
 *     (SET_KIND_CURRENT, t + 1).
 * Nothing is started here; the function only creates steps and tasks.
 *
 * NOTE(review): the definition of `t`, the round increments and the
 * `.step` initializers of most tasks are not visible in this excerpt. */
2645 construct_task_graph (struct ConsensusSession *session)
2647 uint16_t n = session->num_peers;
2650 uint16_t me = session->local_peer_idx;
2655 /* The task we're currently setting up. */
2656 struct TaskEntry task;
2658 /* Current leader */
2662 struct Step *prev_step;
2664 unsigned int round = 0;
2668 // XXX: introduce first step,
2669 // where we wait for all insert acks
2670 // from the set service
2672 /* faster but brittle all-to-all */
2674 // XXX: Not implemented yet
2676 /* all-to-all step */
2678 step = create_step (session, round);
2680 #ifdef GNUNET_EXTRA_LOGGING
2681 step->debug_name = GNUNET_strdup ("all to all");
2684 for (i = 0; i < n; i++)
2688 arrange_peers (&p1, &p2, n);
2689 task = ((struct TaskEntry) {
2690 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2692 .start = task_start_reconcile,
2693 .cancel = task_cancel_reconcile,
/* Input and output are the same set: the union is merged in place. */
2695 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2696 task.cls.setop.output_set = task.cls.setop.input_set;
2697 task.cls.setop.do_not_remove = GNUNET_YES;
2698 put_task (session->taskmap, &task);
2706 /* Byzantine union */
2708 /* sequential repetitions of the gradecasts */
2709 for (i = 0; i < t + 1; i++)
2711 struct Step *step_rep_start;
2712 struct Step *step_rep_end;
2714 /* Every repetition is in a separate round. */
2715 step_rep_start = create_step (session, round);
2716 #ifdef GNUNET_EXTRA_LOGGING
2717 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2720 step_depend_on (step_rep_start, prev_step);
2722 /* gradecast has three rounds */
2724 step_rep_end = create_step (session, round);
2725 #ifdef GNUNET_EXTRA_LOGGING
2726 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2729 /* parallel gradecasts */
2730 for (lead = 0; lead < n; lead++)
2731 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
/* Once all gradecasts of the repetition ended, apply their result. */
2733 task = ((struct TaskEntry) {
2734 .step = step_rep_end,
2735 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2736 .start = task_start_apply_round,
2738 put_task (session->taskmap, &task);
2740 prev_step = step_rep_end;
2743 /* There is no next gradecast round, thus the final
2744 start step is the overall end step of the gradecasts */
2746 step = create_step (session, round);
2747 #ifdef GNUNET_EXTRA_LOGGING
2748 GNUNET_asprintf (&step->debug_name, "finish");
2750 step_depend_on (step, prev_step);
2752 task = ((struct TaskEntry) {
2754 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2755 .start = task_start_finish,
2757 task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
2759 put_task (session->taskmap, &task);
2764 * Initialize the session, continue receiving messages from the owning client
2766 * @param session the session to initialize
2767 * @param join_msg the join message from the client
/* Set up all state of a freshly created session from the join message:
 * the sorted peer list, the global session id, start and deadline of
 * the conclude phase, the index of the local peer (which must be part
 * of the peer list), the set listener for union operations under the
 * global id, the set/task/diff/rfn maps, the initial client set
 * (SET_KIND_CURRENT, 0), the blacklist array and finally the task
 * graph.  The task graph is only constructed, not run.
 *
 * The loop over existing sessions looks for another session with the
 * same global id; the handling of such a duplicate is commented out. */
2770 initialize_session (struct ConsensusSession *session,
2771 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2773 struct ConsensusSession *other_session;
2775 initialize_session_peer_list (session, join_msg);
2776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2777 compute_global_id (session, &join_msg->session_id);
2779 /* Check if some local client already owns the session.
2780 It is only legal to have a session with an existing global id
2781 if all other sessions with this global id are finished.*/
2782 other_session = sessions_head;
2783 while (NULL != other_session)
2785 if ((other_session != session) &&
2786 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2788 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2790 // GNUNET_break (0);
2791 // destroy_session (session);
2796 other_session = other_session->next;
2799 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2800 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2802 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2803 (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
/* initialize_session_peer_list() guarantees that we are in the list. */
2805 session->local_peer_idx = get_peer_idx (&my_peer, session);
2806 GNUNET_assert (-1 != session->local_peer_idx);
2807 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2808 &session->global_id,
2809 set_listen_cb, session);
2810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2812 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2813 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2814 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2815 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
/* The set that client_insert() adds elements to. */
2818 struct SetEntry *client_set;
2819 client_set = GNUNET_new (struct SetEntry);
2820 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2821 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2822 put_set (session, client_set);
2825 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2827 /* Just construct the task graph,
2828 but don't run anything until the client calls conclude. */
2829 construct_task_graph (session);
2831 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
/* Find the session owned by the given client by walking the global
 * session list from sessions_head and comparing client handles.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * callers treat a NULL result as "client has no session". */
2835 static struct ConsensusSession *
2836 get_session_by_client (struct GNUNET_SERVER_Client *client)
2838 struct ConsensusSession *session;
2840 session = sessions_head;
2841 while (NULL != session)
2843 if (session->client == client)
2845 session = session->next;
2852 * Called when a client wants to join a consensus session.
2855 * @param client client that sent the message
2856 * @param m message sent by the client
/* Handler for a client's join message.
 *
 * A client may own only one session: if get_session_by_client() finds
 * an existing one, the message is answered with GNUNET_SYSERR.
 * Otherwise a new session is allocated, bound to the client and its
 * message queue, added to the global session list and initialized from
 * the join message before the message is acknowledged with GNUNET_OK.
 *
 * NOTE(review): no size validation of the join message is visible
 * here before it is cast and handed to initialize_session(). */
2859 client_join (void *cls,
2860 struct GNUNET_SERVER_Client *client,
2861 const struct GNUNET_MessageHeader *m)
2863 struct ConsensusSession *session;
2865 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
2867 session = get_session_by_client (client);
2868 if (NULL != session)
2871 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2874 session = GNUNET_new (struct ConsensusSession);
2875 session->client = client;
2876 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
2877 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2878 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
2879 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2881 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
/* Continuation that client_insert() passes to GNUNET_SET_add_element()
 * with the session as closure.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * accounts for session->num_client_insert_pending -- confirm. */
2886 client_insert_done (void *cls)
2893 * Called when a client performs an insert operation.
2895 * @param cls (unused)
2896 * @param client client handle
2897 * @param m message sent by the client
/* Handler for an element insertion requested by a client.  The payload
   that follows the GNUNET_CONSENSUS_ElementMessage header is copied
   into a freshly allocated GNUNET_SET_Element, which is then added to
   the session's set stored under the key { SET_KIND_CURRENT, 0, 0 }. */
2900 client_insert (void *cls,
2901 struct GNUNET_SERVER_Client *client,
2902 const struct GNUNET_MessageHeader *m)
2904 struct ConsensusSession *session;
2905 struct GNUNET_CONSENSUS_ElementMessage *msg;
2906 struct GNUNET_SET_Element *element;
2907 ssize_t element_size;
2908 struct GNUNET_SET_Handle *initial_set;
2910 session = get_session_by_client (client);
/* a client without a session is disconnected */
2912 if (NULL == session)
2915 GNUNET_SERVER_client_disconnect (client);
/* inserting after conclude has started also gets the client disconnected */
2919 if (GNUNET_YES == session->conclude_started)
2922 GNUNET_SERVER_client_disconnect (client);
2926 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
/* payload length = announced message size minus the fixed message struct.
   NOTE(review): sizeof yields an unsigned type, so the "< 0" test below
   relies on the conversion of the difference to ssize_t -- confirm. */
2927 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2928 if (element_size < 0)
/* element struct and payload share one allocation; data points at the
   bytes placed directly behind the struct */
2934 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
2935 element->element_type = msg->element_type;
2936 element->size = element_size;
2937 memcpy (&element[1], &msg[1], element_size);
2938 element->data = &element[1];
/* the set for this key must exist, otherwise the assertion fails */
2940 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
2941 struct SetEntry *entry;
2942 entry = lookup_set (session, &key);
2943 GNUNET_assert (NULL != entry);
2944 initial_set = entry->h;
/* count the pending insertion before handing the element to the set
   service; client_insert_done is registered as the continuation */
2946 session->num_client_insert_pending++;
2947 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
/* debug output of the element hash, only with GNUNET_EXTRA_LOGGING */
2949 #ifdef GNUNET_EXTRA_LOGGING
2951 struct GNUNET_HashCode hash;
2953 GNUNET_SET_element_hash (element, &hash);
2955 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
2956 session->local_peer_idx,
2957 GNUNET_h2s (&hash));
/* NOTE(review): the element is released right after
   GNUNET_SET_add_element() returns; this assumes the set library has
   copied what it needs by then -- confirm. */
2961 GNUNET_free (element);
2962 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2967 * Called when a client performs the conclude operation.
2969 * @param cls (unused)
2970 * @param client client handle
2971 * @param message message sent by the client
/* Handler for a client's conclude request.  On the regular path the
   session is flagged as concluding, the step timeouts are installed,
   all steps that are ready are run and the message is acknowledged. */
2974 client_conclude (void *cls,
2975 struct GNUNET_SERVER_Client *client,
2976 const struct GNUNET_MessageHeader *message)
2978 struct ConsensusSession *session;
2980 session = get_session_by_client (client);
2981 if (NULL == session)
2983 /* client not found */
2985 GNUNET_SERVER_client_disconnect (client);
2989 if (GNUNET_YES == session->conclude_started)
2991 /* conclude started twice */
/* the client is disconnected first, then its session is torn down */
2993 GNUNET_SERVER_client_disconnect (client);
2994 destroy_session (session);
2998 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
/* set the flag that client_insert() checks to refuse later insertions */
3000 session->conclude_started = GNUNET_YES;
3002 install_step_timeouts (session);
3003 run_ready_steps (session);
3006 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3011 * Called to clean up, after a shutdown has been requested.
3013 * @param cls closure
3014 * @param tc context information (why was this task triggered now)
/* Scheduler task registered by run(): destroys every remaining session
   and then the statistics handle. */
3017 shutdown_task (void *cls,
3018 const struct GNUNET_SCHEDULER_TaskContext *tc)
/* NOTE(review): the loop only terminates if destroy_session() unlinks
   the session from the list headed by sessions_head -- confirm. */
3020 while (NULL != sessions_head)
3021 destroy_session (sessions_head);
/* NOTE(review): GNUNET_YES presumably asks for pending statistics to be
   synchronized before the handle is destroyed -- confirm against the
   statistics API. */
3023 GNUNET_STATISTICS_destroy (statistics, GNUNET_YES);
3024 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
3029 * Clean up after a client after it is
3030 * disconnected (either by us or by itself)
3032 * @param cls closure, unused
3033 * @param client the client to clean up after
/* Disconnect notification registered in run() through
   GNUNET_SERVER_disconnect_notify().  The session of @a client is
   looked up.
   NOTE(review): apart from the FIXME below, no statement visible here
   releases the session -- confirm whether it is destroyed elsewhere. */
3036 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
3038 struct ConsensusSession *session;
3040 session = get_session_by_client (client);
3041 if (NULL == session)
3043 // FIXME: destroy if we can
3049 * Start processing consensus requests.
3051 * @param cls closure
3052 * @param server the initialized server
3053 * @param c configuration to use
/* Service initialization: obtains the local peer identity, creates the
   statistics handle, installs the client message handlers, schedules
   shutdown_task and registers the client-disconnect notification. */
3056 run (void *cls, struct GNUNET_SERVER_Handle *server,
3057 const struct GNUNET_CONFIGURATION_Handle *c)
/* conclude messages are registered with the size of a bare message
   header; insert and join are registered with size 0.
   NOTE(review): 0 presumably marks a variable-length message, and the
   terminating table entry is not visible here -- confirm. */
3059 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
3060 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3061 sizeof (struct GNUNET_MessageHeader)},
3062 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3063 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
/* NOTE(review): the calls below use cfg rather than the parameter c;
   the assignment that sets cfg is not visible here -- confirm. */
/* without a peer identity the failure is logged and shutdown is requested */
3069 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3071 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
3073 GNUNET_SCHEDULER_shutdown ();
3076 statistics = GNUNET_STATISTICS_create ("consensus", cfg);
3077 GNUNET_SERVER_add_handlers (server, server_handlers);
/* shutdown_task is scheduled with the "forever" relative delay */
3078 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
3079 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
3080 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3085 * The main function for the consensus service.
3087 * @param argc number of arguments from the command line
3088 * @param argv command line arguments
3089 * @return 0 ok, 1 on error
/* Program entry point: runs the "consensus" service with run() as its
   initialization callback and maps the outcome to the process exit code. */
3092 main (int argc, char *const *argv)
3095 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
/* the logged number is the comparison result: 0 when ret is GNUNET_OK, 1 otherwise */
3096 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
3097 return (GNUNET_OK == ret) ? 0 : 1;