2 This file is part of GNUnet
3 Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file consensus/gnunet-service-consensus.c
23 * @brief multi-peer set reconciliation
24 * @author Florian Dold
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_applications.h"
31 #include "gnunet_set_service.h"
32 #include "gnunet_consensus_service.h"
33 #include "consensus_protocol.h"
34 #include "consensus.h"
37 #define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
43 * Vote that nothing should change.
44 * This option is never voted explicitly.
48 * Vote that an element should be added.
52 * Vote that an element should be removed.
58 GNUNET_NETWORK_STRUCT_BEGIN
61 struct ContestedPayload
66 * Tuple of integers that together
67 * identify a task uniquely.
71 * A value from 'enum PhaseKind'.
73 uint16_t kind GNUNET_PACKED;
76 * Number of the first peer
79 int16_t peer1 GNUNET_PACKED;
82 * Number of the second peer in canonical order.
84 int16_t peer2 GNUNET_PACKED;
87 * Repetition of the gradecast phase.
89 int16_t repetition GNUNET_PACKED;
92 * Leader in the gradecast phase.
94 * Can be different from both peer1 and peer2.
96 int16_t leader GNUNET_PACKED;
103 int set_kind GNUNET_PACKED;
104 int k1 GNUNET_PACKED;
105 int k2 GNUNET_PACKED;
112 struct GNUNET_SET_Handle *h;
114 * GNUNET_YES if the set resulted
115 * from applying a referendum with contested
124 int diff_kind GNUNET_PACKED;
125 int k1 GNUNET_PACKED;
126 int k2 GNUNET_PACKED;
131 int rfn_kind GNUNET_PACKED;
132 int k1 GNUNET_PACKED;
133 int k2 GNUNET_PACKED;
137 GNUNET_NETWORK_STRUCT_END
141 PHASE_KIND_ALL_TO_ALL,
142 PHASE_KIND_GRADECAST_LEADER,
143 PHASE_KIND_GRADECAST_ECHO,
144 PHASE_KIND_GRADECAST_ECHO_GRADE,
145 PHASE_KIND_GRADECAST_CONFIRM,
146 PHASE_KIND_GRADECAST_CONFIRM_GRADE,
148 * Apply a repetition of the all-to-all
149 * gradecast to the current set.
151 PHASE_KIND_APPLY_REP,
160 SET_KIND_LEADER_PROPOSAL,
161 SET_KIND_ECHO_RESULT,
167 DIFF_KIND_LEADER_PROPOSAL,
168 DIFF_KIND_LEADER_CONSENSUS,
169 DIFF_KIND_GRADECAST_RESULT,
177 RFN_KIND_GRADECAST_RESULT
183 struct SetKey input_set;
185 struct SetKey output_set;
186 struct RfnKey output_rfn;
187 struct DiffKey output_diff;
191 int transceive_contested;
193 struct GNUNET_SET_OperationHandle *op;
199 struct SetKey input_set;
203 * Closure for both @a start_task
204 * and @a cancel_task.
208 struct SetOpCls setop;
209 struct FinishCls finish;
214 typedef void (*TaskFunc) (struct TaskEntry *task);
217 * Node in the consensus task graph.
232 union TaskFuncCls cls;
239 * All steps of one session are in a
240 * linked list for easier deallocation.
245 * All steps of one session are in a
246 * linked list for easier deallocation.
250 struct ConsensusSession *session;
252 struct TaskEntry **tasks;
253 unsigned int tasks_len;
254 unsigned int tasks_cap;
256 unsigned int finished_tasks;
259 * Tasks that have this task as dependency.
261 * We store pointers to subordinates rather
262 * than to prerequisites since it makes
263 * tracking the readiness of a task easier.
265 struct Step **subordinates;
266 unsigned int subordinates_len;
267 unsigned int subordinates_cap;
270 * Counter for the prerequisites of
273 size_t pending_prereq;
276 * Task that will run this step despite
277 * any pending prerequisites.
279 struct GNUNET_SCHEDULER_Task *timeout_task;
281 unsigned int is_running;
283 unsigned int is_finished;
286 * Synchrony round of the task.
287 * Determines the deadline for the task.
292 * Human-readable name for
293 * the task, used for debugging.
299 struct RfnElementInfo
301 const struct GNUNET_SET_Element *element;
304 * GNUNET_YES if the peer votes for the proposal.
309 * Proposal for this element,
310 * can only be VOTE_ADD or VOTE_REMOVE.
312 enum ReferendumVote proposal;
316 struct ReferendumEntry
321 * Elements where there is at least one proposed change.
323 * Maps the hash of the GNUNET_SET_Element
324 * to 'struct RfnElementInfo'.
326 struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
328 unsigned int num_peers;
331 * Stores, for every peer in the session,
332 * whether the peer finished the whole referendum.
334 * Votes from peers are only counted if they're
335 * marked as commited (#GNUNET_YES) in the referendum.
337 * Otherwise (#GNUNET_NO), the requested changes are
338 * not counted for majority votes or thresholds.
344 * Contestation state of the peer. If a peer is contested, the values it
345 * contributed are still counted for applying changes, but the grading is
352 struct DiffElementInfo
354 const struct GNUNET_SET_Element *element;
357 * Positive weight for 'add', negative
358 * weights for 'remove'.
370 struct GNUNET_CONTAINER_MultiHashMap *changes;
376 * A consensus session consists of one local client and the remote authorities.
378 struct ConsensusSession
381 * Consensus sessions are kept in a DLL.
383 struct ConsensusSession *next;
386 * Consensus sessions are kept in a DLL.
388 struct ConsensusSession *prev;
390 unsigned int num_client_insert_pending;
392 struct GNUNET_CONTAINER_MultiHashMap *setmap;
393 struct GNUNET_CONTAINER_MultiHashMap *rfnmap;
394 struct GNUNET_CONTAINER_MultiHashMap *diffmap;
397 * Array of peers with length 'num_peers'.
399 int *peers_blacklisted;
402 * Mapping from (hashed) TaskKey to TaskEntry.
404 * We map the application_id for a round to the task that should be
405 * executed, so we don't have to go through all task whenever we get
406 * an incoming set op request.
408 struct GNUNET_CONTAINER_MultiHashMap *taskmap;
410 struct Step *steps_head;
411 struct Step *steps_tail;
413 int conclude_started;
418 * Global consensus identification, computed
419 * from the session id and participating authorities.
421 struct GNUNET_HashCode global_id;
424 * Client that inhabits the session
426 struct GNUNET_SERVER_Client *client;
429 * Queued messages to the client.
431 struct GNUNET_MQ_Handle *client_mq;
434 * Time when the conclusion of the consensus should begin.
436 struct GNUNET_TIME_Absolute conclude_start;
439 * Timeout for all rounds together, single rounds will schedule a timeout task
440 * with a fraction of the conclude timeout.
441 * Only valid once the current round is not CONSENSUS_ROUND_BEGIN.
443 struct GNUNET_TIME_Absolute conclude_deadline;
445 struct GNUNET_PeerIdentity *peers;
448 * Number of other peers in the consensus.
450 unsigned int num_peers;
453 * Index of the local peer in the peers array
455 unsigned int local_peer_idx;
458 * Listener for requests from other peers.
459 * Uses the session's global id as app id.
461 struct GNUNET_SET_ListenHandle *set_listener;
465 * Linked list of sessions this peer participates in.
467 static struct ConsensusSession *sessions_head;
470 * Linked list of sessions this peer participates in.
472 static struct ConsensusSession *sessions_tail;
475 * Configuration of the consensus service.
477 static const struct GNUNET_CONFIGURATION_Handle *cfg;
480 * Handle to the server for this service.
482 static struct GNUNET_SERVER_Handle *srv;
485 * Peer that runs this service.
487 static struct GNUNET_PeerIdentity my_peer;
491 finish_task (struct TaskEntry *task);
494 run_ready_steps (struct ConsensusSession *session);
497 phasename (uint16_t phase)
501 case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
502 case PHASE_KIND_FINISH: return "FINISH";
503 case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
504 case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
505 case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
506 case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
507 case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
508 case PHASE_KIND_APPLY_REP: return "APPLY_REP";
509 default: return "(unknown)";
515 setname (uint16_t kind)
519 case SET_KIND_CURRENT: return "CURRENT";
520 case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
521 case SET_KIND_NONE: return "NONE";
522 default: return "(unknown)";
527 rfnname (uint16_t kind)
531 case RFN_KIND_NONE: return "NONE";
532 case RFN_KIND_ECHO: return "ECHO";
533 case RFN_KIND_CONFIRM: return "CONFIRM";
534 default: return "(unknown)";
539 diffname (uint16_t kind)
543 case DIFF_KIND_NONE: return "NONE";
544 case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
545 case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
546 case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
547 default: return "(unknown)";
551 #ifdef GNUNET_EXTRA_LOGGING
555 debug_str_element (const struct GNUNET_SET_Element *el)
557 struct GNUNET_HashCode hash;
559 GNUNET_SET_element_hash (el, &hash);
561 return GNUNET_h2s (&hash);
565 debug_str_task_key (struct TaskKey *tk)
567 static char buf[256];
569 snprintf (buf, sizeof (buf),
570 "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
571 phasename (tk->kind), tk->peer1, tk->peer2,
572 tk->leader, tk->repetition);
578 debug_str_diff_key (struct DiffKey *dk)
580 static char buf[256];
582 snprintf (buf, sizeof (buf),
583 "DiffKey kind=%s, k1=%d, k2=%d",
584 diffname (dk->diff_kind), dk->k1, dk->k2);
590 debug_str_set_key (const struct SetKey *sk)
592 static char buf[256];
594 snprintf (buf, sizeof (buf),
595 "SetKey kind=%s, k1=%d, k2=%d",
596 setname (sk->set_kind), sk->k1, sk->k2);
603 debug_str_rfn_key (const struct RfnKey *rk)
605 static char buf[256];
607 snprintf (buf, sizeof (buf),
608 "RfnKey kind=%s, k1=%d, k2=%d",
609 rfnname (rk->rfn_kind), rk->k1, rk->k2);
614 #endif /* GNUNET_EXTRA_LOGGING */
618 * Destroy a session, free all resources associated with it.
620 * @param session the session to destroy
623 destroy_session (struct ConsensusSession *session)
625 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
626 if (NULL != session->set_listener)
628 GNUNET_SET_listen_cancel (session->set_listener);
629 session->set_listener = NULL;
631 if (NULL != session->client_mq)
633 GNUNET_MQ_destroy (session->client_mq);
634 session->client_mq = NULL;
635 /* The MQ cleanup will also disconnect the underlying client. */
636 session->client = NULL;
638 if (NULL != session->client)
640 GNUNET_SERVER_client_disconnect (session->client);
641 session->client = NULL;
643 GNUNET_free (session);
648 * Send the final result set of the consensus to the client, element by
652 * @param element the current element, NULL if all elements have been
654 * @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop.
657 send_to_client_iter (void *cls,
658 const struct GNUNET_SET_Element *element)
660 struct TaskEntry *task = (struct TaskEntry *) cls;
661 struct ConsensusSession *session = task->step->session;
662 struct GNUNET_MQ_Envelope *ev;
666 struct GNUNET_CONSENSUS_ElementMessage *m;
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "P%d: sending element %s to client\n",
670 session->local_peer_idx,
671 debug_str_element (element));
673 ev = GNUNET_MQ_msg_extra (m, element->size,
674 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
675 m->element_type = htons (element->element_type);
676 memcpy (&m[1], element->data, element->size);
677 GNUNET_MQ_send (session->client_mq, ev);
681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682 "P%d: finished iterating elements for client\n",
683 session->local_peer_idx);
684 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
685 GNUNET_MQ_send (session->client_mq, ev);
691 static struct SetEntry *
692 lookup_set (struct ConsensusSession *session, struct SetKey *key)
694 struct GNUNET_HashCode hash;
696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697 "P%u: looking up set {%s}\n",
698 session->local_peer_idx,
699 debug_str_set_key (key));
701 GNUNET_assert (SET_KIND_NONE != key->set_kind);
702 GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
703 return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
707 static struct DiffEntry *
708 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
710 struct GNUNET_HashCode hash;
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "P%u: looking up diff {%s}\n",
714 session->local_peer_idx,
715 debug_str_diff_key (key));
717 GNUNET_assert (DIFF_KIND_NONE != key->diff_kind);
718 GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
719 return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
723 static struct ReferendumEntry *
724 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
726 struct GNUNET_HashCode hash;
728 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729 "P%u: looking up rfn {%s}\n",
730 session->local_peer_idx,
731 debug_str_rfn_key (key));
733 GNUNET_assert (RFN_KIND_NONE != key->rfn_kind);
734 GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
735 return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
740 diff_insert (struct DiffEntry *diff,
742 const struct GNUNET_SET_Element *element)
744 struct DiffElementInfo *di;
745 struct GNUNET_HashCode hash;
747 GNUNET_assert ( (1 == weight) || (-1 == weight));
749 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
750 "diff_insert with element size %u\n",
753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
754 "hashing element\n");
756 GNUNET_SET_element_hash (element, &hash);
758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
761 di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
765 di = GNUNET_new (struct DiffElementInfo);
766 di->element = GNUNET_SET_element_dup (element);
767 GNUNET_assert (GNUNET_OK ==
768 GNUNET_CONTAINER_multihashmap_put (diff->changes,
770 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
778 rfn_commit (struct ReferendumEntry *rfn,
779 uint16_t commit_peer)
781 GNUNET_assert (commit_peer < rfn->num_peers);
783 rfn->peer_commited[commit_peer] = GNUNET_YES;
788 rfn_contest (struct ReferendumEntry *rfn,
789 uint16_t contested_peer)
791 GNUNET_assert (contested_peer < rfn->num_peers);
793 rfn->peer_contested[contested_peer] = GNUNET_YES;
798 rfn_noncontested (struct ReferendumEntry *rfn)
804 for (i = 0; i < rfn->num_peers; i++)
805 if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
812 rfn_vote (struct ReferendumEntry *rfn,
813 uint16_t voting_peer,
814 enum ReferendumVote vote,
815 const struct GNUNET_SET_Element *element)
817 struct RfnElementInfo *ri;
818 struct GNUNET_HashCode hash;
820 GNUNET_assert (voting_peer < rfn->num_peers);
822 /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
823 since VOTE_KEEP is implicit in not voting. */
824 GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
826 GNUNET_SET_element_hash (element, &hash);
827 ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
831 ri = GNUNET_new (struct RfnElementInfo);
832 ri->element = GNUNET_SET_element_dup (element);
833 ri->votes = GNUNET_new_array (rfn->num_peers, int);
834 GNUNET_assert (GNUNET_OK ==
835 GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
837 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
840 ri->votes[voting_peer] = GNUNET_YES;
846 task_other_peer (struct TaskEntry *task)
848 uint16_t me = task->step->session->local_peer_idx;
849 if (task->key.peer1 == me)
850 return task->key.peer2;
851 return task->key.peer1;
856 * Callback for set operation results. Called for each element
860 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
861 * @param status see enum GNUNET_SET_Status
864 set_result_cb (void *cls,
865 const struct GNUNET_SET_Element *element,
866 enum GNUNET_SET_Status status)
868 struct TaskEntry *task = cls;
869 struct ConsensusSession *session = task->step->session;
870 struct SetEntry *output_set = NULL;
871 struct DiffEntry *output_diff = NULL;
872 struct ReferendumEntry *output_rfn = NULL;
873 unsigned int other_idx;
874 struct SetOpCls *setop;
876 setop = &task->cls.setop;
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "P%u: got set result for {%s}, status %u\n",
881 session->local_peer_idx,
882 debug_str_task_key (&task->key),
885 if (GNUNET_NO == task->is_started)
891 if (GNUNET_YES == task->is_finished)
897 other_idx = task_other_peer (task);
899 if (SET_KIND_NONE != setop->output_set.set_kind)
901 output_set = lookup_set (session, &setop->output_set);
902 GNUNET_assert (NULL != output_set);
905 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
907 output_diff = lookup_diff (session, &setop->output_diff);
908 GNUNET_assert (NULL != output_diff);
911 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
913 output_rfn = lookup_rfn (session, &setop->output_rfn);
914 GNUNET_assert (NULL != output_rfn);
917 if (GNUNET_YES == session->peers_blacklisted[other_idx])
919 /* Peer might have been blacklisted
920 by a gradecast running in parallel, ignore elements from now */
921 if (GNUNET_SET_STATUS_ADD_LOCAL == status)
923 if (GNUNET_SET_STATUS_ADD_REMOTE == status)
927 if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
929 if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
931 GNUNET_assert (NULL != output_rfn);
932 rfn_contest (output_rfn, task_other_peer (task));
939 case GNUNET_SET_STATUS_ADD_LOCAL:
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Adding element in Task {%s}\n",
942 debug_str_task_key (&task->key));
943 if (NULL != output_set)
945 // FIXME: record pending adds, use callback
946 GNUNET_SET_add_element (output_set->h,
950 #ifdef GNUNET_EXTRA_LOGGING
951 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
952 "P%u: adding element %s into set {%s} of task {%s}\n",
953 session->local_peer_idx,
954 debug_str_element (element),
955 debug_str_set_key (&setop->output_set),
956 debug_str_task_key (&task->key));
959 if (NULL != output_diff)
961 diff_insert (output_diff, 1, element);
962 #ifdef GNUNET_EXTRA_LOGGING
963 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
964 "P%u: adding element %s into diff {%s} of task {%s}\n",
965 session->local_peer_idx,
966 debug_str_element (element),
967 debug_str_diff_key (&setop->output_diff),
968 debug_str_task_key (&task->key));
971 if (NULL != output_rfn)
973 rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
974 #ifdef GNUNET_EXTRA_LOGGING
975 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
976 "P%u: adding element %s into rfn {%s} of task {%s}\n",
977 session->local_peer_idx,
978 debug_str_element (element),
979 debug_str_rfn_key (&setop->output_rfn),
980 debug_str_task_key (&task->key));
983 // XXX: add result to structures in task
985 case GNUNET_SET_STATUS_ADD_REMOTE:
986 if (GNUNET_YES == setop->do_not_remove)
988 if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
990 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991 "Removing element in Task {%s}\n",
992 debug_str_task_key (&task->key));
993 if (NULL != output_set)
995 // FIXME: record pending adds, use callback
996 GNUNET_SET_remove_element (output_set->h,
1000 #ifdef GNUNET_EXTRA_LOGGING
1001 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1002 "P%u: removing element %s from set {%s} of task {%s}\n",
1003 session->local_peer_idx,
1004 debug_str_element (element),
1005 debug_str_set_key (&setop->output_set),
1006 debug_str_task_key (&task->key));
1009 if (NULL != output_diff)
1011 diff_insert (output_diff, -1, element);
1012 #ifdef GNUNET_EXTRA_LOGGING
1013 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1014 "P%u: removing element %s from diff {%s} of task {%s}\n",
1015 session->local_peer_idx,
1016 debug_str_element (element),
1017 debug_str_diff_key (&setop->output_diff),
1018 debug_str_task_key (&task->key));
1021 if (NULL != output_rfn)
1023 rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1024 #ifdef GNUNET_EXTRA_LOGGING
1025 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1026 "P%u: removing element %s from rfn {%s} of task {%s}\n",
1027 session->local_peer_idx,
1028 debug_str_element (element),
1029 debug_str_rfn_key (&setop->output_rfn),
1030 debug_str_task_key (&task->key));
1034 case GNUNET_SET_STATUS_DONE:
1035 // XXX: check first if any changes to the underlying
1036 // set are still pending
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Finishing setop in Task {%s}\n",
1039 debug_str_task_key (&task->key));
1040 if (NULL != output_rfn)
1042 rfn_commit (output_rfn, task_other_peer (task));
1046 case GNUNET_SET_STATUS_FAILURE:
1048 GNUNET_break_op (0);
1067 get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num)
1071 char *evil_type_str = NULL;
1073 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1075 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076 "P%u: no evilness\n",
1077 session->local_peer_idx);
1078 *ret_type = EVILNESS_NONE;
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "P%u: got evilness spec\n",
1083 session->local_peer_idx);
1085 for (field = strtok (evil_spec, "/");
1087 field = strtok (NULL, "/"))
1089 unsigned int peer_num;
1090 unsigned int evil_num;
1093 evil_type_str = NULL;
1095 ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num);
1099 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n",
1104 GNUNET_assert (NULL != evil_type_str);
1106 if (peer_num == session->local_peer_idx)
1108 if (0 == strcmp ("slack", evil_type_str))
1109 *ret_type = EVILNESS_SLACK;
1110 else if (0 == strcmp ("cram", evil_type_str))
1112 *ret_type = EVILNESS_CRAM;
1113 *ret_num = evil_num;
1117 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n");
1122 /* No GNUNET_free since memory was allocated by libc */
1123 free (evil_type_str);
1124 evil_type_str = NULL;
1127 *ret_type = EVILNESS_NONE;
1129 GNUNET_free (evil_spec);
1130 if (NULL != evil_type_str)
1131 free (evil_type_str);
1138 * Commit the appropriate set for a
1142 commit_set (struct ConsensusSession *session,
1143 struct TaskEntry *task)
1145 struct SetEntry *set;
1146 struct SetOpCls *setop = &task->cls.setop;
1148 GNUNET_assert (NULL != setop->op);
1149 set = lookup_set (session, &setop->input_set);
1150 GNUNET_assert (NULL != set);
1155 unsigned int evil_num;
1156 enum Evilness evilness;
1158 get_evilness (session, &evilness, &evil_num);
1162 /* We're not cramming elements in the
1163 all-to-all round, since that would just
1164 add more elements to the result set, but
1165 wouldn't test robustness. */
1166 if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1168 GNUNET_SET_commit (setop->op, set->h);
1171 for (i = 0; i < evil_num; i++)
1173 struct GNUNET_HashCode hash;
1174 struct GNUNET_SET_Element element;
1175 element.data = &hash;
1176 element.size = sizeof (struct GNUNET_HashCode);
1177 element.element_type = 0;
1179 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
1180 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1181 #ifdef GNUNET_EXTRA_LOGGING
1182 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1183 "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1184 session->local_peer_idx,
1185 debug_str_element (&element),
1186 debug_str_set_key (&setop->input_set),
1187 debug_str_task_key (&task->key));
1190 GNUNET_SET_commit (setop->op, set->h);
1192 case EVILNESS_SLACK:
1193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1194 "P%u: evil peer: slacking\n",
1195 session->local_peer_idx,
1200 GNUNET_SET_commit (setop->op, set->h);
1205 if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1207 struct GNUNET_SET_Element element;
1208 struct ContestedPayload payload;
1209 element.data = &payload;
1210 element.size = sizeof (struct ContestedPayload);
1211 element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
1212 GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1214 if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1216 GNUNET_SET_commit (setop->op, set->h);
1220 /* For our testcases, we don't want the blacklisted
1222 GNUNET_SET_operation_cancel (setop->op);
1230 put_diff (struct ConsensusSession *session,
1231 struct DiffEntry *diff)
1233 struct GNUNET_HashCode hash;
1235 GNUNET_assert (NULL != diff);
1237 GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1238 GNUNET_assert (GNUNET_OK ==
1239 GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1240 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1244 put_set (struct ConsensusSession *session,
1245 struct SetEntry *set)
1247 struct GNUNET_HashCode hash;
1249 GNUNET_assert (NULL != set->h);
1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253 debug_str_set_key (&set->key));
1255 GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1256 GNUNET_assert (GNUNET_OK ==
1257 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1263 put_rfn (struct ConsensusSession *session,
1264 struct ReferendumEntry *rfn)
1266 struct GNUNET_HashCode hash;
1268 GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1269 GNUNET_assert (GNUNET_OK ==
1270 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1271 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1277 task_cancel_reconcile (struct TaskEntry *task)
1279 /* not implemented yet */
1285 apply_diff_to_rfn (struct DiffEntry *diff,
1286 struct ReferendumEntry *rfn,
1287 uint16_t voting_peer,
1290 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1291 struct DiffElementInfo *di;
1293 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
1295 while (GNUNET_YES ==
1296 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1298 (const void **) &di))
1302 rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1306 rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1310 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1317 struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1319 d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1326 diff_compose (struct DiffEntry *diff_1,
1327 struct DiffEntry *diff_2)
1329 struct DiffEntry *diff_new;
1330 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1331 struct DiffElementInfo *di;
1333 diff_new = diff_create ();
1335 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
1336 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1338 diff_insert (diff_new, di->weight, di->element);
1340 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1342 iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
1343 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1345 diff_insert (diff_new, di->weight, di->element);
1347 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1353 struct ReferendumEntry *
1354 rfn_create (uint16_t size)
1356 struct ReferendumEntry *rfn;
1358 rfn = GNUNET_new (struct ReferendumEntry);
1359 rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
1360 rfn->peer_commited = GNUNET_new_array (size, int);
1361 rfn->peer_contested = GNUNET_new_array (size, int);
1362 rfn->num_peers = size;
1369 diff_destroy (struct DiffEntry *diff)
1371 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1377 rfn_majority (const struct ReferendumEntry *rfn,
1378 const struct RfnElementInfo *ri,
1379 uint16_t *ret_majority,
1380 enum ReferendumVote *ret_vote)
1382 uint16_t votes_yes = 0;
1383 uint16_t num_commited = 0;
1386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387 "Computing rfn majority for element %s of rfn {%s}\n",
1388 debug_str_element (ri->element),
1389 debug_str_rfn_key (&rfn->key));
1391 for (i = 0; i < rfn->num_peers; i++)
1393 if (GNUNET_NO == rfn->peer_commited[i])
1397 if (GNUNET_YES == ri->votes[i])
1401 if (votes_yes > (num_commited) / 2)
1403 *ret_vote = ri->proposal;
1404 *ret_majority = votes_yes;
1408 *ret_vote = VOTE_STAY;
1409 *ret_majority = num_commited - votes_yes;
1416 struct TaskEntry *task;
1417 struct SetKey dst_set_key;
1422 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1424 struct SetCopyCls *scc = cls;
1425 struct TaskEntry *task = scc->task;
1426 struct SetKey dst_set_key = scc->dst_set_key;
1427 struct SetEntry *set;
1430 set = GNUNET_new (struct SetEntry);
1432 set->key = dst_set_key;
1433 put_set (task->step->session, set);
1440 * Call the start function of the given
1441 * task again after we created a copy of the given set.
1444 create_set_copy_for_task (struct TaskEntry *task,
1445 struct SetKey *src_set_key,
1446 struct SetKey *dst_set_key)
1448 struct SetEntry *src_set;
1449 struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452 "Copying set {%s} to {%s} for task {%s}\n",
1453 debug_str_set_key (src_set_key),
1454 debug_str_set_key (dst_set_key),
1455 debug_str_task_key (&task->key));
1458 scc->dst_set_key = *dst_set_key;
1459 src_set = lookup_set (task->step->session, src_set_key);
1460 GNUNET_assert (NULL != src_set);
1461 GNUNET_SET_copy_lazy (src_set->h,
1467 struct SetMutationProgressCls
1471 * Task to finish once all changes are through.
1473 struct TaskEntry *task;
1478 set_mutation_done (void *cls)
1480 struct SetMutationProgressCls *pc = cls;
1482 GNUNET_assert (pc->num_pending > 0);
1486 if (0 == pc->num_pending)
1488 struct TaskEntry *task = pc->task;
1495 task_start_apply_round (struct TaskEntry *task)
1497 struct ConsensusSession *session = task->step->session;
1498 struct SetKey sk_in;
1499 struct SetKey sk_out;
1500 struct RfnKey rk_in;
1501 struct SetEntry *set_out;
1502 struct ReferendumEntry *rfn_in;
1503 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1504 struct RfnElementInfo *ri;
1505 struct SetMutationProgressCls *progress_cls;
1507 sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1508 rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1509 sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1511 set_out = lookup_set (session, &sk_out);
1512 if (NULL == set_out)
1514 create_set_copy_for_task (task, &sk_in, &sk_out);
1518 rfn_in = lookup_rfn (session, &rk_in);
1519 GNUNET_assert (NULL != rfn_in);
1521 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1522 progress_cls->task = task;
1524 iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
1526 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1528 uint16_t majority_num;
1529 enum ReferendumVote majority_vote;
1531 rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1533 switch (majority_vote)
1536 progress_cls->num_pending++;
1537 GNUNET_assert (GNUNET_OK ==
1538 GNUNET_SET_add_element (set_out->h,
1544 progress_cls->num_pending++;
1545 GNUNET_assert (GNUNET_OK ==
1546 GNUNET_SET_remove_element (set_out->h,
1560 if (progress_cls->num_pending == 0)
1562 // call closure right now, no pending ops
1563 GNUNET_free (progress_cls);
1569 #define THRESH(s) (((s)->num_peers / 3))
1573 task_start_grade (struct TaskEntry *task)
1575 struct ConsensusSession *session = task->step->session;
1576 struct ReferendumEntry *output_rfn;
1577 struct ReferendumEntry *input_rfn;
1578 struct DiffEntry *input_diff;
1579 struct RfnKey rfn_key;
1580 struct DiffKey diff_key;
1581 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1582 struct RfnElementInfo *ri;
1583 unsigned int gradecast_confidence = 2;
1585 rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1586 output_rfn = lookup_rfn (session, &rfn_key);
1587 if (NULL == output_rfn)
1589 output_rfn = rfn_create (session->num_peers);
1590 output_rfn->key = rfn_key;
1591 put_rfn (session, output_rfn);
1594 diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1595 input_diff = lookup_diff (session, &diff_key);
1596 GNUNET_assert (NULL != input_diff);
1598 rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1599 input_rfn = lookup_rfn (session, &rfn_key);
1600 GNUNET_assert (NULL != input_rfn);
1602 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1604 apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
1606 while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
1608 uint16_t majority_num;
1609 enum ReferendumVote majority_vote;
1611 // XXX: we need contested votes and non-contested votes here
1612 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1614 if (majority_num < (session->num_peers / 3) * 2)
1616 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1618 if (majority_num < (session->num_peers / 3) + 1)
1620 gradecast_confidence = 0;
1623 switch (majority_vote)
1628 rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
1631 rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
1640 uint16_t noncontested;
1641 noncontested = rfn_noncontested (input_rfn);
1642 if (noncontested < (session->num_peers / 3) * 2)
1644 gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
1646 if (noncontested < (session->num_peers / 3) + 1)
1648 gradecast_confidence = 0;
1652 if (gradecast_confidence >= 1)
1653 rfn_commit (output_rfn, task->key.leader);
1655 if (gradecast_confidence <= 1)
1656 session->peers_blacklisted[task->key.leader] = GNUNET_YES;
1663 task_start_reconcile (struct TaskEntry *task)
1665 struct SetEntry *input;
1666 struct SetOpCls *setop = &task->cls.setop;
1667 struct ConsensusSession *session = task->step->session;
1669 input = lookup_set (session, &setop->input_set);
1670 GNUNET_assert (NULL != input);
1671 GNUNET_assert (NULL != input->h);
1673 /* We create the outputs for the operation here
1674 (rather than in the set operation callback)
1675 because we want something valid in there, even
1676 if the other peer doesn't talk to us */
1678 if (SET_KIND_NONE != setop->output_set.set_kind)
1680 /* If we don't have an existing output set,
1681 we clone the input set. */
1682 if (NULL == lookup_set (session, &setop->output_set))
1684 create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
1689 if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1691 if (NULL == lookup_rfn (session, &setop->output_rfn))
1693 struct ReferendumEntry *rfn;
1695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696 "P%u: output rfn <%s> missing, creating.\n",
1697 session->local_peer_idx,
1698 debug_str_rfn_key (&setop->output_rfn));
1700 rfn = rfn_create (session->num_peers);
1701 rfn->key = setop->output_rfn;
1702 put_rfn (session, rfn);
1706 if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1708 if (NULL == lookup_diff (session, &setop->output_diff))
1710 struct DiffEntry *diff;
1712 diff = diff_create ();
1713 diff->key = setop->output_diff;
1714 put_diff (session, diff);
1718 if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
1720 /* XXX: mark the corresponding rfn as commited if necessary */
1725 if (task->key.peer1 == session->local_peer_idx)
1727 struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
1729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1730 "P%u: Looking up set {%s} to run remote union\n",
1731 session->local_peer_idx,
1732 debug_str_set_key (&setop->input_set));
1734 rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
1735 rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
1737 rcm.kind = htons (task->key.kind);
1738 rcm.peer1 = htons (task->key.peer1);
1739 rcm.peer2 = htons (task->key.peer2);
1740 rcm.leader = htons (task->key.leader);
1741 rcm.repetition = htons (task->key.repetition);
1743 GNUNET_assert (NULL == setop->op);
1744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
1745 session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
1747 // XXX: maybe this should be done while
1748 // setting up tasks alreays?
1749 setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
1750 &session->global_id,
1752 GNUNET_SET_RESULT_SYMMETRIC,
1756 if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
1763 else if (task->key.peer2 == session->local_peer_idx)
1765 /* Wait for the other peer to contact us */
1766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
1767 session->local_peer_idx, task->key.peer1);
1769 if (NULL != setop->op)
1771 commit_set (session, task);
1776 /* We made an error while constructing the task graph. */
1783 task_start_eval_echo (struct TaskEntry *task)
1785 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
1786 struct ReferendumEntry *input_rfn;
1787 struct RfnElementInfo *ri;
1788 struct SetEntry *output_set;
1789 struct SetMutationProgressCls *progress_cls;
1790 struct ConsensusSession *session = task->step->session;
1791 struct SetKey sk_in;
1792 struct SetKey sk_out;
1793 struct RfnKey rk_in;
1795 sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
1796 sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
1797 output_set = lookup_set (session, &sk_out);
1798 if (NULL == output_set)
1800 create_set_copy_for_task (task, &sk_in, &sk_out);
1804 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1805 "Evaluating referendum in Task {%s}\n",
1806 debug_str_task_key (&task->key));
1808 progress_cls = GNUNET_new (struct SetMutationProgressCls);
1809 progress_cls->task = task;
1811 rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
1812 input_rfn = lookup_rfn (session, &rk_in);
1814 GNUNET_assert (NULL != input_rfn);
1816 iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
1817 GNUNET_assert (NULL != iter);
1819 while (GNUNET_YES ==
1820 GNUNET_CONTAINER_multihashmap_iterator_next (iter,
1822 (const void **) &ri))
1824 enum ReferendumVote majority_vote;
1825 uint16_t majority_num;
1827 rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
1829 if (majority_num < session->num_peers / 3)
1831 /* It is not the case that all nonfaulty peers
1832 echoed the same value. Since we're doing a set reconciliation, we
1833 can't simply send "nothing" for the value. Thus we mark our 'confirm'
1834 reconciliation as contested. Other peers might not know that the
1835 leader is faulty, thus we still re-distribute in the confirmation
1837 output_set->is_contested = GNUNET_YES;
1840 switch (majority_vote)
1843 progress_cls->num_pending++;
1844 GNUNET_assert (GNUNET_OK ==
1845 GNUNET_SET_add_element (output_set->h,
1851 progress_cls->num_pending++;
1852 GNUNET_assert (GNUNET_OK ==
1853 GNUNET_SET_remove_element (output_set->h,
1859 /* Nothing to do. */
1867 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
1869 if (progress_cls->num_pending == 0)
1871 // call closure right now, no pending ops
1872 GNUNET_free (progress_cls);
1879 task_start_finish (struct TaskEntry *task)
1881 struct SetEntry *final_set;
1882 struct ConsensusSession *session = task->step->session;
1884 final_set = lookup_set (session, &task->cls.finish.input_set);
1886 GNUNET_assert (NULL != final_set);
1889 GNUNET_SET_iterate (final_set->h,
1890 send_to_client_iter,
1895 start_task (struct ConsensusSession *session, struct TaskEntry *task)
1897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
1899 GNUNET_assert (GNUNET_NO == task->is_started);
1900 GNUNET_assert (GNUNET_NO == task->is_finished);
1901 GNUNET_assert (NULL != task->start);
1905 task->is_started = GNUNET_YES;
1909 static void finish_step (struct Step *step)
1913 GNUNET_assert (step->finished_tasks == step->tasks_len);
1914 GNUNET_assert (GNUNET_YES == step->is_running);
1915 GNUNET_assert (GNUNET_NO == step->is_finished);
1917 #ifdef GNUNET_EXTRA_LOGGING
1918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919 "All tasks of step `%s' with %u subordinates finished.\n",
1921 step->subordinates_len);
1924 for (i = 0; i < step->subordinates_len; i++)
1926 GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1927 step->subordinates[i]->pending_prereq--;
1928 #ifdef GNUNET_EXTRA_LOGGING
1929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1930 "Decreased pending_prereq to %u for step `%s'.\n",
1931 step->subordinates[i]->pending_prereq,
1932 step->subordinates[i]->debug_name);
1937 step->is_finished = GNUNET_YES;
1939 // XXX: maybe schedule as task to avoid recursion?
1940 run_ready_steps (step->session);
1945 * Run all steps of the session that don't any
1946 * more dependencies.
1949 run_ready_steps (struct ConsensusSession *session)
1953 step = session->steps_head;
1955 while (NULL != step)
1957 if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
1961 GNUNET_assert (0 == step->finished_tasks);
1963 #ifdef GNUNET_EXTRA_LOGGING
1964 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
1965 session->local_peer_idx,
1967 step->round, step->tasks_len, step->subordinates_len);
1970 step->is_running = GNUNET_YES;
1971 for (i = 0; i < step->tasks_len; i++)
1972 start_task (session, step->tasks[i]);
1974 /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
1975 if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
1978 /* Running the next ready steps will be triggered by task completion */
1990 finish_task (struct TaskEntry *task)
1992 GNUNET_assert (GNUNET_NO == task->is_finished);
1993 task->is_finished = GNUNET_YES;
1995 task->step->finished_tasks++;
1997 if (task->step->finished_tasks == task->step->tasks_len)
1998 finish_step (task->step);
2003 * Search peer in the list of peers in session.
2005 * @param peer peer to find
2006 * @param session session with peer
2007 * @return index of peer, -1 if peer is not in session
2010 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2013 for (i = 0; i < session->num_peers; i++)
2014 if (0 == memcmp (peer, &session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
2021 * Compute a global, (hopefully) unique consensus session id,
2022 * from the local id of the consensus session, and the identities of all participants.
2023 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
2024 * exactly the same peers, the global id will be different.
2026 * @param session session to generate the global id for
2027 * @param local_session_id local id of the consensus session
2030 compute_global_id (struct ConsensusSession *session,
2031 const struct GNUNET_HashCode *local_session_id)
2033 const char *salt = "gnunet-service-consensus/session_id";
2035 GNUNET_assert (GNUNET_YES ==
2036 GNUNET_CRYPTO_kdf (&session->global_id,
2037 sizeof (struct GNUNET_HashCode),
2041 session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2043 sizeof (struct GNUNET_HashCode),
2049 * Compare two peer identities.
2051 * @param h1 some peer identity
2052 * @param h2 some peer identity
2053 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
2056 peer_id_cmp (const void *h1, const void *h2)
2058 return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2063 * Create the sorted list of peers for the session,
2064 * add the local peer if not in the join message.
2067 initialize_session_peer_list (struct ConsensusSession *session,
2068 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2070 unsigned int local_peer_in_list;
2071 uint32_t listed_peers;
2072 const struct GNUNET_PeerIdentity *msg_peers;
2075 GNUNET_assert (NULL != join_msg);
2077 /* peers in the join message, may or may not include the local peer */
2078 listed_peers = ntohl (join_msg->num_peers);
2080 session->num_peers = listed_peers;
2082 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1];
2084 local_peer_in_list = GNUNET_NO;
2085 for (i = 0; i < listed_peers; i++)
2087 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity)))
2089 local_peer_in_list = GNUNET_YES;
2094 if (GNUNET_NO == local_peer_in_list)
2095 session->num_peers++;
2097 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
2099 if (GNUNET_NO == local_peer_in_list)
2100 session->peers[session->num_peers - 1] = my_peer;
2102 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
2103 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp);
2107 static struct TaskEntry *
2108 lookup_task (struct ConsensusSession *session,
2109 struct TaskKey *key)
2111 struct GNUNET_HashCode hash;
2114 GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2116 GNUNET_h2s (&hash));
2117 return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2122 * Called when another peer wants to do a set operation with the
2125 * @param cls closure
2126 * @param other_peer the other peer
2127 * @param context_msg message with application specific information from
2129 * @param request request from the other peer, use GNUNET_SET_accept
2130 * to accept it, otherwise the request will be refused
2131 * Note that we don't use a return value here, as it is also
2132 * necessary to specify the set we want to do the operation with,
2133 * whith sometimes can be derived from the context message.
2134 * Also necessary to specify the timeout.
2137 set_listen_cb (void *cls,
2138 const struct GNUNET_PeerIdentity *other_peer,
2139 const struct GNUNET_MessageHeader *context_msg,
2140 struct GNUNET_SET_Request *request)
2142 struct ConsensusSession *session = cls;
2144 struct TaskEntry *task;
2145 struct GNUNET_CONSENSUS_RoundContextMessage *cm;
2147 if (NULL == context_msg)
2149 GNUNET_break_op (0);
2153 if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2155 GNUNET_break_op (0);
2159 if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2161 GNUNET_break_op (0);
2165 cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2167 tk = ((struct TaskKey) {
2168 .kind = ntohs (cm->kind),
2169 .peer1 = ntohs (cm->peer1),
2170 .peer2 = ntohs (cm->peer2),
2171 .repetition = ntohs (cm->repetition),
2172 .leader = ntohs (cm->leader),
2175 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2176 session->local_peer_idx, debug_str_task_key (&tk));
2178 task = lookup_task (session, &tk);
2182 GNUNET_break_op (0);
2186 if (GNUNET_YES == task->is_finished)
2188 GNUNET_break_op (0);
2192 if (task->key.peer2 != session->local_peer_idx)
2194 /* We're being asked, so we must be thne 2nd peer. */
2195 GNUNET_break_op (0);
2199 GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2200 (task->key.peer2 == session->local_peer_idx)));
2202 task->cls.setop.op = GNUNET_SET_accept (request,
2203 GNUNET_SET_RESULT_SYMMETRIC,
2207 /* If the task hasn't been started yet,
2208 we wait for that until we commit. */
2210 if (GNUNET_YES == task->is_started)
2212 commit_set (session, task);
2219 put_task (struct GNUNET_CONTAINER_MultiHashMap *taskmap,
2220 struct TaskEntry *t)
2222 struct GNUNET_HashCode round_hash;
2225 GNUNET_assert (NULL != t->step);
2227 t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2231 if (s->tasks_len == s->tasks_cap)
2233 unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2234 GNUNET_array_grow (s->tasks,
2239 #ifdef GNUNET_EXTRA_LOGGING
2240 GNUNET_assert (NULL != s->debug_name);
2241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2242 debug_str_task_key (&t->key),
2246 s->tasks[s->tasks_len] = t;
2249 GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2250 GNUNET_assert (GNUNET_OK ==
2251 GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2252 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2257 install_step_timeouts (struct ConsensusSession *session)
2259 /* Given the fully constructed task graph
2260 with rounds for tasks, we can give the tasks timeouts. */
2262 /* XXX: implement! */
2268 * Arrange two peers in some canonical order.
2271 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2276 GNUNET_assert (*p1 < n);
2277 GNUNET_assert (*p2 < n);
2290 /* For uniformly random *p1, *p2,
2291 this condition is true with 50% chance */
2292 if (((b - a) + n) % n <= n / 2)
2306 * Record @a dep as a dependency of @step.
2309 step_depend_on (struct Step *step, struct Step *dep)
2311 /* We're not checking for cyclic dependencies,
2312 but this is a cheap sanity check. */
2313 GNUNET_assert (step != dep);
2314 GNUNET_assert (NULL != step);
2315 GNUNET_assert (NULL != dep);
2316 GNUNET_assert (dep->round <= step->round);
2318 #ifdef GNUNET_EXTRA_LOGGING
2319 /* Make sure we have complete debugging information.
2320 Also checks that we don't screw up too badly
2321 constructing the task graph. */
2322 GNUNET_assert (NULL != step->debug_name);
2323 GNUNET_assert (NULL != dep->debug_name);
2324 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2325 "Making step `%s' depend on `%s'\n",
2330 if (dep->subordinates_cap == dep->subordinates_len)
2332 unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2333 GNUNET_array_grow (dep->subordinates,
2334 dep->subordinates_cap,
2338 GNUNET_assert (dep->subordinates_len <= dep->subordinates_cap);
2340 dep->subordinates[dep->subordinates_len] = step;
2341 dep->subordinates_len++;
2343 step->pending_prereq++;
2347 static struct Step *
2348 create_step (struct ConsensusSession *session, int round)
2351 step = GNUNET_new (struct Step);
2352 step->session = session;
2353 step->round = round;
2354 GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
2355 session->steps_tail,
2362 * Construct the task graph for a single
2366 construct_task_graph_gradecast (struct ConsensusSession *session,
2369 struct Step *step_before,
2370 struct Step *step_after)
2372 uint16_t n = session->num_peers;
2373 uint16_t me = session->local_peer_idx;
2378 /* The task we're currently setting up. */
2379 struct TaskEntry task;
2382 struct Step *prev_step;
2388 round = step_before->round + 1;
2390 /* gcast step 1: leader disseminates */
2392 step = create_step (session, round);
2394 #ifdef GNUNET_EXTRA_LOGGING
2395 GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2397 step_depend_on (step, step_before);
2401 for (k = 0; k < n; k++)
2407 arrange_peers (&p1, &p2, n);
2408 task = ((struct TaskEntry) {
2410 .start = task_start_reconcile,
2411 .cancel = task_cancel_reconcile,
2412 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2414 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2415 put_task (session->taskmap, &task);
2417 /* We run this task to make sure that the leader
2418 has the stored the SET_KIND_LEADER set of himself,
2419 so he can participate in the rest of the gradecast
2420 without the code having to handle any special cases. */
2421 task = ((struct TaskEntry) {
2423 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2424 .start = task_start_reconcile,
2425 .cancel = task_cancel_reconcile,
2427 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2428 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2429 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2430 put_task (session->taskmap, &task);
2436 arrange_peers (&p1, &p2, n);
2437 task = ((struct TaskEntry) {
2439 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
2440 .start = task_start_reconcile,
2441 .cancel = task_cancel_reconcile,
2443 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2444 task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2445 task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2446 put_task (session->taskmap, &task);
2449 /* gcast phase 2: echo */
2452 step = create_step (session, round);
2453 #ifdef GNUNET_EXTRA_LOGGING
2454 GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2456 step_depend_on (step, prev_step);
2458 for (k = 0; k < n; k++)
2462 arrange_peers (&p1, &p2, n);
2463 task = ((struct TaskEntry) {
2465 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2466 .start = task_start_reconcile,
2467 .cancel = task_cancel_reconcile,
2469 task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2470 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2471 put_task (session->taskmap, &task);
2475 /* Same round, since step only has local tasks */
2476 step = create_step (session, round);
2477 #ifdef GNUNET_EXTRA_LOGGING
2478 GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2480 step_depend_on (step, prev_step);
2482 arrange_peers (&p1, &p2, n);
2483 task = ((struct TaskEntry) {
2484 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2486 .start = task_start_eval_echo
2488 put_task (session->taskmap, &task);
2492 step = create_step (session, round);
2493 #ifdef GNUNET_EXTRA_LOGGING
2494 GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2496 step_depend_on (step, prev_step);
2498 /* gcast phase 3: confirmation and grading */
2499 for (k = 0; k < n; k++)
2503 arrange_peers (&p1, &p2, n);
2504 task = ((struct TaskEntry) {
2506 .start = task_start_reconcile,
2507 .cancel = task_cancel_reconcile,
2508 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2510 task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2511 task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2512 /* If there was at least one element in the echo round that was
2513 contested (i.e. it had no n-t majority), then we let the other peers
2514 know, and other peers let us know. The contested flag for each peer is
2515 stored in the rfn. */
2516 task.cls.setop.transceive_contested = GNUNET_YES;
2517 put_task (session->taskmap, &task);
2521 /* Same round, since step only has local tasks */
2522 step = create_step (session, round);
2523 #ifdef GNUNET_EXTRA_LOGGING
2524 GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2526 step_depend_on (step, prev_step);
2528 task = ((struct TaskEntry) {
2530 .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2531 .start = task_start_grade,
2533 put_task (session->taskmap, &task);
2535 step_depend_on (step_after, step);
2540 construct_task_graph (struct ConsensusSession *session)
2542 uint16_t n = session->num_peers;
2545 uint16_t me = session->local_peer_idx;
2550 /* The task we're currently setting up. */
2551 struct TaskEntry task;
2553 /* Current leader */
2557 struct Step *prev_step;
2559 unsigned int round = 0;
2563 // XXX: introduce first step,
2564 // where we wait for all insert acks
2565 // from the set service
2567 /* faster but brittle all-to-all */
2569 // XXX: Not implemented yet
2571 /* all-to-all step */
2573 step = create_step (session, round);
2575 #ifdef GNUNET_EXTRA_LOGGING
2576 step->debug_name = GNUNET_strdup ("all to all");
2579 for (i = 0; i < n; i++)
2583 arrange_peers (&p1, &p2, n);
2584 task = ((struct TaskEntry) {
2585 .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2587 .start = task_start_reconcile,
2588 .cancel = task_cancel_reconcile,
2590 task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2591 task.cls.setop.output_set = task.cls.setop.input_set;
2592 task.cls.setop.do_not_remove = GNUNET_YES;
2593 put_task (session->taskmap, &task);
2601 /* Byzantine union */
2603 /* sequential repetitions of the gradecasts */
2604 for (i = 0; i < t + 1; i++)
2606 struct Step *step_rep_start;
2607 struct Step *step_rep_end;
2609 /* Every repetition is in a separate round. */
2610 step_rep_start = create_step (session, round);
2611 #ifdef GNUNET_EXTRA_LOGGING
2612 GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
2615 step_depend_on (step_rep_start, prev_step);
2617 /* gradecast has three rounds */
2619 step_rep_end = create_step (session, round);
2620 #ifdef GNUNET_EXTRA_LOGGING
2621 GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
2624 /* parallel gradecasts */
2625 for (lead = 0; lead < n; lead++)
2626 construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
2628 task = ((struct TaskEntry) {
2629 .step = step_rep_end,
2630 .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
2631 .start = task_start_apply_round,
2633 put_task (session->taskmap, &task);
2635 prev_step = step_rep_end;
2638 /* There is no next gradecast round, thus the final
2639 start step is the overall end step of the gradecasts */
2641 step = create_step (session, round);
2642 #ifdef GNUNET_EXTRA_LOGGING
2643 GNUNET_asprintf (&step->debug_name, "finish");
2645 step_depend_on (step, prev_step);
2647 task = ((struct TaskEntry) {
2649 .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
2650 .start = task_start_finish,
2652 task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
2654 put_task (session->taskmap, &task);
2659 * Initialize the session, continue receiving messages from the owning client
2661 * @param session the session to initialize
2662 * @param join_msg the join message from the client
2665 initialize_session (struct ConsensusSession *session,
2666 struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2668 struct ConsensusSession *other_session;
2670 initialize_session_peer_list (session, join_msg);
2671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers);
2672 compute_global_id (session, &join_msg->session_id);
2674 /* Check if some local client already owns the session.
2675 It is only legal to have a session with an existing global id
2676 if all other sessions with this global id are finished.*/
2677 other_session = sessions_head;
2678 while (NULL != other_session)
2680 if ((other_session != session) &&
2681 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id)))
2683 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2685 // GNUNET_break (0);
2686 // destroy_session (session);
2691 other_session = other_session->next;
2694 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline);
2695 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start);
2697 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %ums created\n",
2698 (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000);
2700 session->local_peer_idx = get_peer_idx (&my_peer, session);
2701 GNUNET_assert (-1 != session->local_peer_idx);
2702 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2703 &session->global_id,
2704 set_listen_cb, session);
2705 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2707 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2708 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2709 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2710 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2713 struct SetEntry *client_set;
2714 client_set = GNUNET_new (struct SetEntry);
2715 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
2716 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2717 put_set (session, client_set);
2720 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
2722 /* Just construct the task graph,
2723 but don't run anything until the client calls conclude. */
2724 construct_task_graph (session);
2726 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2730 static struct ConsensusSession *
2731 get_session_by_client (struct GNUNET_SERVER_Client *client)
2733 struct ConsensusSession *session;
2735 session = sessions_head;
2736 while (NULL != session)
2738 if (session->client == client)
2740 session = session->next;
2747 * Called when a client wants to join a consensus session.
2750 * @param client client that sent the message
2751 * @param m message sent by the client
2754 client_join (void *cls,
2755 struct GNUNET_SERVER_Client *client,
2756 const struct GNUNET_MessageHeader *m)
2758 struct ConsensusSession *session;
2760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
2762 session = get_session_by_client (client);
2763 if (NULL != session)
2766 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2769 session = GNUNET_new (struct ConsensusSession);
2770 session->client = client;
2771 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
2772 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
2773 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
2774 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2776 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
2781 client_insert_done (void *cls)
2788 * Called when a client performs an insert operation.
2790 * @param cls (unused)
2791 * @param client client handle
2792 * @param m message sent by the client
2795 client_insert (void *cls,
2796 struct GNUNET_SERVER_Client *client,
2797 const struct GNUNET_MessageHeader *m)
2799 struct ConsensusSession *session;
2800 struct GNUNET_CONSENSUS_ElementMessage *msg;
2801 struct GNUNET_SET_Element *element;
2802 ssize_t element_size;
2803 struct GNUNET_SET_Handle *initial_set;
2805 session = get_session_by_client (client);
2807 if (NULL == session)
2810 GNUNET_SERVER_client_disconnect (client);
2814 if (GNUNET_YES == session->conclude_started)
2817 GNUNET_SERVER_client_disconnect (client);
2821 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
2822 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
2823 if (element_size < 0)
2829 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
2830 element->element_type = msg->element_type;
2831 element->size = element_size;
2832 memcpy (&element[1], &msg[1], element_size);
2833 element->data = &element[1];
2835 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
2836 struct SetEntry *entry;
2837 entry = lookup_set (session, &key);
2838 GNUNET_assert (NULL != entry);
2839 initial_set = entry->h;
2841 session->num_client_insert_pending++;
2842 GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
2844 #ifdef GNUNET_EXTRA_LOGGING
2846 struct GNUNET_HashCode hash;
2848 GNUNET_SET_element_hash (element, &hash);
2850 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
2851 session->local_peer_idx,
2852 GNUNET_h2s (&hash));
2856 GNUNET_free (element);
2857 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2862 * Called when a client performs the conclude operation.
2864 * @param cls (unused)
2865 * @param client client handle
2866 * @param message message sent by the client
2869 client_conclude (void *cls,
2870 struct GNUNET_SERVER_Client *client,
2871 const struct GNUNET_MessageHeader *message)
2873 struct ConsensusSession *session;
2875 session = get_session_by_client (client);
2876 if (NULL == session)
2878 /* client not found */
2880 GNUNET_SERVER_client_disconnect (client);
2884 if (GNUNET_YES == session->conclude_started)
2886 /* conclude started twice */
2888 GNUNET_SERVER_client_disconnect (client);
2889 destroy_session (session);
2893 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n");
2895 session->conclude_started = GNUNET_YES;
2897 install_step_timeouts (session);
2898 run_ready_steps (session);
2901 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2906 * Called to clean up, after a shutdown has been requested.
2908 * @param cls closure
2909 * @param tc context information (why was this task triggered now)
2912 shutdown_task (void *cls,
2913 const struct GNUNET_SCHEDULER_TaskContext *tc)
2915 while (NULL != sessions_head)
2916 destroy_session (sessions_head);
2918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
2923 * Clean up after a client after it is
2924 * disconnected (either by us or by itself)
2926 * @param cls closure, unused
2927 * @param client the client to clean up after
2930 handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
2932 struct ConsensusSession *session;
2934 session = get_session_by_client (client);
2935 if (NULL == session)
2937 // FIXME: destroy if we can
2943 * Start processing consensus requests.
2945 * @param cls closure
2946 * @param server the initialized server
2947 * @param c configuration to use
2950 run (void *cls, struct GNUNET_SERVER_Handle *server,
2951 const struct GNUNET_CONFIGURATION_Handle *c)
2953 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2954 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
2955 sizeof (struct GNUNET_MessageHeader)},
2956 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
2957 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
2963 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
2965 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
2967 GNUNET_SCHEDULER_shutdown ();
2970 GNUNET_SERVER_add_handlers (server, server_handlers);
2971 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
2972 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
2973 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
2978 * The main function for the consensus service.
2980 * @param argc number of arguments from the command line
2981 * @param argv command line arguments
2982 * @return 0 ok, 1 on error
2985 main (int argc, char *const *argv)
2988 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
2989 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
2990 return (GNUNET_OK == ret) ? 0 : 1;