- fix use of uninitialized memory
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index 424be7a7176f88686b4e59ce6175dae42cffdf3e..6cd0244b5679e29054cab610df07d3d099631d9a 100644 (file)
 #include "consensus.h"
 
 
+#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
+
+
+enum ReferendumVote
+{
+  /**
+   * Vote that nothing should change.
+   * This option is never voted explicitly.
+   */
+  VOTE_STAY = 0,
+  /**
+   * Vote that an element should be added.
+   */
+  VOTE_ADD = 1,
+  /**
+   * Vote that an element should be removed.
+   */
+  VOTE_REMOVE = 2,
+};
+
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
+
+struct ContestedPayload
+{
+};
+
 /**
  * Tuple of integers that together
  * identify a task uniquely.
@@ -72,14 +97,6 @@ struct TaskKey {
 };
 
 
-enum ReferendumVote
-{
-  VOTE_NONE = 0,
-  VOTE_ADD = 1,
-  VOTE_REMOVE = 2,
-  VOTE_CONTESTED = 3
-};
-
 
 struct SetKey
 {
@@ -127,41 +144,28 @@ enum PhaseKind
   PHASE_KIND_GRADECAST_ECHO_GRADE,
   PHASE_KIND_GRADECAST_CONFIRM,
   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
-  PHASE_KIND_GRADECAST_APPLY_RESULT,
-  PHASE_KIND_FINISH,
-};
-
-
-enum ActionType
-{
-  /**
-   * Do a set reconciliation with another peer (or via looback).
-   */
-  ACTION_RECONCILE,
   /**
-   * Apply a referendum with a threshold
-   * to a set and/or a diff.
+   * Apply a repetition of the all-to-all
+   * gradecast to the current set.
    */
-  ACTION_EVAL_RFN,
-  /**
-   * Apply a diff to a set.
-   */
-  ACTION_APPLY_DIFF,
-  ACTION_FINISH,
+  PHASE_KIND_APPLY_REP,
+  PHASE_KIND_FINISH,
 };
 
+
 enum SetKind
 {
   SET_KIND_NONE = 0,
   SET_KIND_CURRENT,
-  SET_KIND_LEADER,
+  SET_KIND_LEADER_PROPOSAL,
   SET_KIND_ECHO_RESULT,
 };
 
 enum DiffKind
 {
   DIFF_KIND_NONE = 0,
-  DIFF_KIND_LEADER,
+  DIFF_KIND_LEADER_PROPOSAL,
+  DIFF_KIND_LEADER_CONSENSUS,
   DIFF_KIND_GRADECAST_RESULT,
 };
 
@@ -170,9 +174,45 @@ enum RfnKind
   RFN_KIND_NONE = 0,
   RFN_KIND_ECHO,
   RFN_KIND_CONFIRM,
+  RFN_KIND_GRADECAST_RESULT
+};
+
+
+struct SetOpCls
+{
+  struct SetKey input_set;
+
+  struct SetKey output_set;
+  struct RfnKey output_rfn;
+  struct DiffKey output_diff;
+
+  int do_not_remove;
+
+  int transceive_contested;
+
+  struct GNUNET_SET_OperationHandle *op;
 };
 
 
+struct FinishCls
+{
+  struct SetKey input_set;
+};
+
+/**
+ * Closure for both @a start_task
+ * and @a cancel_task.
+ */
+union TaskFuncCls
+{
+  struct SetOpCls setop;
+  struct FinishCls finish;
+};
+
+struct TaskEntry;
+
+typedef void (*TaskFunc) (struct TaskEntry *task);
+
 /*
  * Node in the consensus task graph.
  */
@@ -182,30 +222,14 @@ struct TaskEntry
 
   struct Step *step;
 
-  int is_running;
+  int is_started;
 
   int is_finished;
 
-  enum ActionType action;
-
-  struct SetKey input_set;
-  struct DiffKey input_diff;
-  struct RfnKey input_rfn;
-  struct SetKey output_set;
-  struct DiffKey output_diff;
-  struct RfnKey output_rfn;
-
-  /**
-   * Threshold when evaluating referendums.
-   */
-  uint16_t threshold;
-
-  /**
-   * Operation that is running for this task.
-   */
-  struct GNUNET_SET_OperationHandle *op;
+  TaskFunc start;
+  TaskFunc cancel;
 
-  struct GNUNET_SET_Handle *commited_set;
+  union TaskFuncCls cls;
 };
 
 
@@ -259,19 +283,10 @@ struct Step
   unsigned int is_finished;
 
   /*
-   * Round that this step should start.
-   * If not all prerequisites have run,
-   * the task will run anyway.
-   */
-  unsigned int start_round;
-
-  /*
-   * Number of rounds this step occupies.
-   *
-   * Some steps are more expensive, and thus
-   * are allocated more rounds.
+   * Synchrony round of the task.
+   * Determines the deadline for the task.
    */
-  unsigned int num_rounds;
+  unsigned int round;
 
   /**
    * Human-readable name for
@@ -280,23 +295,21 @@ struct Step
   char *debug_name;
 };
 
-struct RfnPeerInfo
-{
-  /* Peers can propose changes,
-   * but they are only accepted once
-   * the whole set operation is done. */
-  int is_commited;
-};
 
 struct RfnElementInfo
 {
-  struct GNUNET_SET_Element *element;
+  const struct GNUNET_SET_Element *element;
 
   /*
-   * Vote (or VOTE_NONE) from every peer
-   * in the session about the element.
+   * GNUNET_YES if the peer votes for the proposal.
    */
   int *votes;
+
+  /**
+   * Proposal for this element,
+   * can only be VOTE_ADD or VOTE_REMOVE.
+   */
+  enum ReferendumVote proposal;
 };
 
 
@@ -312,6 +325,8 @@ struct ReferendumEntry
    */
   struct GNUNET_CONTAINER_MultiHashMap *rfn_elements;
 
+  unsigned int num_peers;
+
   /**
    * Stores, for every peer in the session,
    * whether the peer finished the whole referendum.
@@ -323,12 +338,20 @@ struct ReferendumEntry
    * not counted for majority votes or thresholds.
    */
   int *peer_commited;
+
+
+  /**
+   * Contestation state of the peer.  If a peer is contested, the values it
+   * contributed are still counted for applying changes, but the grading is
+   * affected.
+   */
+  int *peer_contested;
 };
 
 
 struct DiffElementInfo
 {
-  struct GNUNET_SET_Element *element;
+  const struct GNUNET_SET_Element *element;
 
   /**
    * Positive weight for 'add', negative
@@ -373,7 +396,7 @@ struct ConsensusSession
   /**
    * Array of peers with length 'num_peers'.
    */
-  int *peers_ignored;
+  int *peers_blacklisted;
 
   /*
    * Mapping from (hashed) TaskKey to TaskEntry.
@@ -467,15 +490,6 @@ static struct GNUNET_PeerIdentity my_peer;
 static void
 finish_task (struct TaskEntry *task);
 
-static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task);
-
-static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task);
-
-static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task);
-
 static void
 run_ready_steps (struct ConsensusSession *session);
 
@@ -491,7 +505,7 @@ phasename (uint16_t phase)
     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
-    case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
+    case PHASE_KIND_APPLY_REP: return "APPLY_REP";
     default: return "(unknown)";
   }
 }
@@ -503,7 +517,7 @@ setname (uint16_t kind)
   switch (kind)
   {
     case SET_KIND_CURRENT: return "CURRENT";
-    case SET_KIND_LEADER: return "LEADER";
+    case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
     case SET_KIND_NONE: return "NONE";
     default: return "(unknown)";
   }
@@ -527,12 +541,26 @@ diffname (uint16_t kind)
   switch (kind)
   {
     case DIFF_KIND_NONE: return "NONE";
-    case DIFF_KIND_LEADER: return "LEADER";
+    case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
+    case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
     default: return "(unknown)";
   }
 }
 
+#ifdef GNUNET_EXTRA_LOGGING
+
+
+static const char *
+debug_str_element (const struct GNUNET_SET_Element *el)
+{
+  struct GNUNET_HashCode hash;
+
+  GNUNET_SET_element_hash (el, &hash);
+
+  return GNUNET_h2s (&hash);
+}
+
 static const char *
 debug_str_task_key (struct TaskKey *tk)
 {
@@ -559,7 +587,7 @@ debug_str_diff_key (struct DiffKey *dk)
 }
 
 static const char *
-debug_str_set_key (struct SetKey *sk)
+debug_str_set_key (const struct SetKey *sk)
 {
   static char buf[256];
 
@@ -572,7 +600,7 @@ debug_str_set_key (struct SetKey *sk)
 
 
 static const char *
-debug_str_rfn_key (struct RfnKey *rk)
+debug_str_rfn_key (const struct RfnKey *rk)
 {
   static char buf[256];
 
@@ -583,6 +611,8 @@ debug_str_rfn_key (struct RfnKey *rk)
   return buf;
 }
 
+#endif /* GNUNET_EXTRA_LOGGING */
+
 
 /**
  * Destroy a session, free all resources associated with it.
@@ -602,6 +632,8 @@ destroy_session (struct ConsensusSession *session)
   {
     GNUNET_MQ_destroy (session->client_mq);
     session->client_mq = NULL;
+    /* The MQ cleanup will also disconnect the underlying client. */
+    session->client = NULL;
   }
   if (NULL != session->client)
   {
@@ -634,8 +666,9 @@ send_to_client_iter (void *cls,
     struct GNUNET_CONSENSUS_ElementMessage *m;
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "P%d: got element for client\n",
-                session->local_peer_idx);
+                "P%d: sending element %s to client\n",
+                session->local_peer_idx,
+                debug_str_element (element));
 
     ev = GNUNET_MQ_msg_extra (m, element->size,
                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
@@ -655,34 +688,6 @@ send_to_client_iter (void *cls,
 }
 
 
-/**
- * Callback for set operation results. Called for each element
- * in the result set.
- *
- * @param cls closure
- * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
- * @param status see enum GNUNET_SET_Status
- */
-static void
-set_result_cb_loop (void *cls,
-               const struct GNUNET_SET_Element *element,
-               enum GNUNET_SET_Status status)
-{
-  /* Nothing to do here.
-     This is the callback for looped local set operations, everything is
-     handled by the first callback */
-
-  struct TaskEntry *task = cls;
-  struct ConsensusSession *session = task->step->session;
-  
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: skipping looped set result for {%s}, status %u\n",
-              session->local_peer_idx,
-              debug_str_task_key (&task->key),
-              status);
-}
-
-
 static struct SetEntry *
 lookup_set (struct ConsensusSession *session, struct SetKey *key)
 {
@@ -736,21 +741,107 @@ diff_insert (struct DiffEntry *diff,
              int weight,
              const struct GNUNET_SET_Element *element)
 {
-  GNUNET_assert (0);
+  struct DiffElementInfo *di;
+  struct GNUNET_HashCode hash;
+
+  GNUNET_assert ( (1 == weight) || (-1 == weight));
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "diff_insert with element size %u\n",
+              element->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "hashing element\n");
+
+  GNUNET_SET_element_hash (element, &hash);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "hashed element\n");
+
+  di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
+
+  if (NULL == di)
+  {
+    di = GNUNET_new (struct DiffElementInfo);
+    di->element = GNUNET_SET_element_dup (element);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (diff->changes,
+                                                      &hash, di,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+  }
+
+  di->weight = weight;
+}
+
+
+static void
+rfn_commit (struct ReferendumEntry *rfn,
+            uint16_t commit_peer)
+{
+  GNUNET_assert (commit_peer < rfn->num_peers);
+
+  rfn->peer_commited[commit_peer] = GNUNET_YES;
 }
 
 
+static void
+rfn_contest (struct ReferendumEntry *rfn,
+             uint16_t contested_peer)
+{
+  GNUNET_assert (contested_peer < rfn->num_peers);
+
+  rfn->peer_contested[contested_peer] = GNUNET_YES;
+}
+
+
+static uint16_t
+rfn_noncontested (struct ReferendumEntry *rfn)
+{
+  uint16_t i;
+  uint16_t ret;
+
+  ret = 0;
+  for (i = 0; i < rfn->num_peers; i++)
+    if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
+      ret++;
+
+  return ret;
+}
+
 static void
 rfn_vote (struct ReferendumEntry *rfn,
           uint16_t voting_peer,
-          uint16_t num_peers,
-          int vote,
+          enum ReferendumVote vote,
           const struct GNUNET_SET_Element *element)
 {
-  GNUNET_assert (voting_peer < num_peers);
-  GNUNET_assert (0);
+  struct RfnElementInfo *ri;
+  struct GNUNET_HashCode hash;
+
+  GNUNET_assert (voting_peer < rfn->num_peers);
+
+  /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
+     since VOTE_KEEP is implicit in not voting. */
+  GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
+
+  GNUNET_SET_element_hash (element, &hash);
+  ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
+
+  if (NULL == ri)
+  {
+    ri = GNUNET_new (struct RfnElementInfo);
+    ri->element = GNUNET_SET_element_dup (element);
+    ri->votes = GNUNET_new_array (rfn->num_peers, int);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
+                                                      &hash, ri,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+  }
+
+  ri->votes[voting_peer] = GNUNET_YES;
+  ri->proposal = vote;
 }
 
+
 uint16_t
 task_other_peer (struct TaskEntry *task)
 {
@@ -760,6 +851,7 @@ task_other_peer (struct TaskEntry *task)
   return task->key.peer1;
 }
 
+
 /**
  * Callback for set operation results. Called for each element
  * in the result set.
@@ -779,6 +871,10 @@ set_result_cb (void *cls,
   struct DiffEntry *output_diff = NULL;
   struct ReferendumEntry *output_rfn = NULL;
   unsigned int other_idx;
+  struct SetOpCls *setop;
+
+  setop = &task->cls.setop;
+
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "P%u: got set result for {%s}, status %u\n",
@@ -786,7 +882,7 @@ set_result_cb (void *cls,
               debug_str_task_key (&task->key),
               status);
 
-  if (GNUNET_NO == task->is_running)
+  if (GNUNET_NO == task->is_started)
   {
     GNUNET_break_op (0);
     return;
@@ -798,37 +894,52 @@ set_result_cb (void *cls,
     return;
   }
 
-  if (task->key.peer1 == session->local_peer_idx)
-    other_idx = task->key.peer2;
-  else if (task->key.peer2 == session->local_peer_idx)
-    other_idx = task->key.peer1;
-  else
+  other_idx = task_other_peer (task);
+
+  if (SET_KIND_NONE != setop->output_set.set_kind)
   {
-    /* error in task graph construction */
-    GNUNET_assert (0);
+    output_set = lookup_set (session, &setop->output_set);
+    GNUNET_assert (NULL != output_set);
   }
 
-  if (SET_KIND_NONE != task->output_set.set_kind)
-    output_set = lookup_set (session, &task->output_set);
+  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+  {
+    output_diff = lookup_diff (session, &setop->output_diff);
+    GNUNET_assert (NULL != output_diff);
+  }
 
-  if (DIFF_KIND_NONE != task->output_diff.diff_kind)
-    output_diff = lookup_diff (session, &task->output_diff);
+  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
+  {
+    output_rfn = lookup_rfn (session, &setop->output_rfn);
+    GNUNET_assert (NULL != output_rfn);
+  }
 
-  if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
-    output_rfn = lookup_rfn (session, &task->output_rfn);
+  if (GNUNET_YES == session->peers_blacklisted[other_idx])
+  {
+    /* Peer might have been blacklisted
+       by a gradecast running in parallel, ignore elements from now */
+    if (GNUNET_SET_STATUS_ADD_LOCAL == status)
+      return;
+    if (GNUNET_SET_STATUS_ADD_REMOTE == status)
+      return;
+  }
 
-  if (GNUNET_YES == session->peers_ignored[other_idx])
+  if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
   {
-    /* We should have never started or commited to an operation
-       with an ignored peer. */
-    GNUNET_break (0);
-    return;
+    if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+    {
+      GNUNET_assert (NULL != output_rfn);
+      rfn_contest (output_rfn, task_other_peer (task));
+      return;
+    }
   }
 
   switch (status)
   {
-    // case GNUNET_SET_STATUS_MISSING_LOCAL:
-    case GNUNET_SET_STATUS_OK:
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Adding element in Task {%s}\n",
+                  debug_str_task_key (&task->key));
       if (NULL != output_set)
       {
         // FIXME: record pending adds, use callback
@@ -836,30 +947,106 @@ set_result_cb (void *cls,
                                 element,
                                 NULL,
                                 NULL);
-
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into set {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_set_key (&setop->output_set),
+                    debug_str_task_key (&task->key));
+#endif
       }
       if (NULL != output_diff)
       {
         diff_insert (output_diff, 1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into diff {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_diff_key (&setop->output_diff),
+                    debug_str_task_key (&task->key));
+#endif
       }
       if (NULL != output_rfn)
       {
-        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into rfn {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_rfn_key (&setop->output_rfn),
+                    debug_str_task_key (&task->key));
+#endif
       }
       // XXX: add result to structures in task
       break;
-    //case GNUNET_SET_STATUS_MISSING_REMOTE:
-    //  // XXX: add result to structures in task
-    //  break;
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      if (GNUNET_YES == setop->do_not_remove)
+        break;
+      if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+        break;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Removing element in Task {%s}\n",
+                  debug_str_task_key (&task->key));
+      if (NULL != output_set)
+      {
+        // FIXME: record pending adds, use callback
+        GNUNET_SET_remove_element (output_set->h,
+                                   element,
+                                   NULL,
+                                   NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from set {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_set_key (&setop->output_set),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      if (NULL != output_diff)
+      {
+        diff_insert (output_diff, -1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from diff {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_diff_key (&setop->output_diff),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      if (NULL != output_rfn)
+      {
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from rfn {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_rfn_key (&setop->output_rfn),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      break;
     case GNUNET_SET_STATUS_DONE:
       // XXX: check first if any changes to the underlying
       // set are still pending
-      // XXX: commit other peer in referendum
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Finishing setop in Task {%s}\n",
+                  debug_str_task_key (&task->key));
+      if (NULL != output_rfn)
+      {
+        rfn_commit (output_rfn, task_other_peer (task));
+      }
       finish_task (task);
       break;
     case GNUNET_SET_STATUS_FAILURE:
       // XXX: cleanup
-      GNUNET_break (0);
+      GNUNET_break_op (0);
+      finish_task (task);
       return;
     default:
       /* not reached */
@@ -867,6 +1054,84 @@ set_result_cb (void *cls,
   }
 }
 
+#ifdef EVIL
+
+enum Evilness
+{
+  EVILNESS_NONE,
+  EVILNESS_CRAM,
+  EVILNESS_SLACK,
+};
+
+static void
+get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num)
+{
+  char *evil_spec;
+  char *field;
+  char *evil_type_str = NULL;
+
+  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "P%u: no evilness\n",
+                session->local_peer_idx);
+    *ret_type = EVILNESS_NONE;
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "P%u: got evilness spec\n",
+              session->local_peer_idx);
+
+  for (field = strtok (evil_spec, "/");
+       NULL != field;
+       field = strtok (NULL, "/"))
+  {
+    unsigned int peer_num;
+    unsigned int evil_num;
+    int ret;
+
+    evil_type_str = NULL;
+
+    ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num);
+
+    if (ret != 3)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n",
+                  field); 
+      goto not_evil;
+    }
+
+    GNUNET_assert (NULL != evil_type_str);
+
+    if (peer_num == session->local_peer_idx)
+    {
+      if (0 == strcmp ("slack", evil_type_str))
+        *ret_type = EVILNESS_SLACK;
+      else if (0 == strcmp ("cram", evil_type_str))
+      {
+        *ret_type = EVILNESS_CRAM;
+        *ret_num = evil_num;
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n"); 
+        goto not_evil;
+      }
+      goto cleanup;
+    }
+    /* No GNUNET_free since memory was allocated by libc */
+    free (evil_type_str);
+    evil_type_str = NULL;
+  }
+not_evil:
+  *ret_type = EVILNESS_NONE;
+cleanup:
+  GNUNET_free (evil_spec);
+  if (NULL != evil_type_str)
+    free (evil_type_str);
+}
+
+#endif
 
 
 /**
@@ -878,11 +1143,86 @@ commit_set (struct ConsensusSession *session,
             struct TaskEntry *task)
 {
   struct SetEntry *set;
+  struct SetOpCls *setop = &task->cls.setop;
 
-  GNUNET_assert (NULL != task->op);
-  set = lookup_set (session, &task->input_set);
+  GNUNET_assert (NULL != setop->op);
+  set = lookup_set (session, &setop->input_set);
   GNUNET_assert (NULL != set);
-  GNUNET_SET_commit (task->op, set->h);
+
+#ifdef EVIL
+  {
+    unsigned int i;
+    unsigned int evil_num;
+    enum Evilness evilness;
+
+    get_evilness (session, &evilness, &evil_num);
+    switch (evilness)
+    {
+      case EVILNESS_CRAM:
+        /* We're not cramming elements in the
+           all-to-all round, since that would just
+           add more elements to the result set, but
+           wouldn't test robustness. */
+        if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
+        {
+          GNUNET_SET_commit (setop->op, set->h);
+          break;
+        }
+        for (i = 0; i < evil_num; i++)
+        {
+          struct GNUNET_HashCode hash;
+          struct GNUNET_SET_Element element;
+          element.data = &hash;
+          element.size = sizeof (struct GNUNET_HashCode);
+          element.element_type = 0;
+
+          GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+          GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                      "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
+                      session->local_peer_idx,
+                      debug_str_element (&element),
+                      debug_str_set_key (&setop->input_set),
+                      debug_str_task_key (&task->key));
+#endif
+        }
+        GNUNET_SET_commit (setop->op, set->h);
+        break;
+      case EVILNESS_SLACK:
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: evil peer: slacking\n",
+                    session->local_peer_idx,
+                    evil_num);
+        /* Do nothing. */
+        break;
+      case EVILNESS_NONE:
+        GNUNET_SET_commit (setop->op, set->h);
+        break;
+    }
+  }
+#else
+  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
+  {
+    struct GNUNET_SET_Element element;
+    struct ContestedPayload payload;
+    element.data = &payload;
+    element.size = sizeof (struct ContestedPayload);
+    element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
+    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+  }
+  if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
+  {
+    GNUNET_SET_commit (setop->op, set->h);
+  }
+  else
+  {
+    /* For our testcases, we don't want the blacklisted
+       peers to wait. */
+    GNUNET_SET_operation_cancel (setop->op);
+    setop->op = NULL;
+  }
+#endif
 }
 
 
@@ -892,6 +1232,8 @@ put_diff (struct ConsensusSession *session,
 {
   struct GNUNET_HashCode hash;
 
+  GNUNET_assert (NULL != diff);
+
   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
@@ -904,53 +1246,427 @@ put_set (struct ConsensusSession *session,
 {
   struct GNUNET_HashCode hash;
 
-  GNUNET_assert (NULL != set->h);
+  GNUNET_assert (NULL != set->h);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Putting set %s\n",
+              debug_str_set_key (&set->key));
+
+  GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
+                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+}
+
+
+static void
+put_rfn (struct ConsensusSession *session,
+         struct ReferendumEntry *rfn)
+{
+  struct GNUNET_HashCode hash;
+
+  GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
+                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+}
+
+
+
+static void
+task_cancel_reconcile (struct TaskEntry *task)
+{
+  /* not implemented yet */
+  GNUNET_assert (0);
+}
+
+
+static void
+apply_diff_to_rfn (struct DiffEntry *diff,
+                   struct ReferendumEntry *rfn,
+                   uint16_t voting_peer,
+                   uint16_t num_peers)
+{
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct DiffElementInfo *di;
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
+
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &di))
+  {
+    if (di->weight > 0)
+    {
+      rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
+    }
+    if (di->weight < 0)
+    {
+      rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
+    }
+  }
+
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+}
+
+
+struct DiffEntry *
+diff_create ()
+{
+  struct DiffEntry *d = GNUNET_new (struct DiffEntry);
+
+  d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+  
+  return d;
+}
+
+
+struct DiffEntry *
+diff_compose (struct DiffEntry *diff_1,
+              struct DiffEntry *diff_2)
+{
+  struct DiffEntry *diff_new;
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct DiffElementInfo *di;
+  diff_new = diff_create ();
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  {
+    diff_insert (diff_new, di->weight, di->element);
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  {
+    diff_insert (diff_new, di->weight, di->element);
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+  return diff_new;
+}
+
+
+struct ReferendumEntry *
+rfn_create (uint16_t size)
+{
+  struct ReferendumEntry *rfn;
+
+  rfn = GNUNET_new (struct ReferendumEntry);
+  rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+  rfn->peer_commited = GNUNET_new_array (size, int);
+  rfn->peer_contested = GNUNET_new_array (size, int);
+  rfn->num_peers = size;
+
+  return rfn;
+}
+
+
+void
+diff_destroy (struct DiffEntry *diff)
+{
+  GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
+  GNUNET_free (diff);
+}
+
+
+static void
+rfn_majority (const struct ReferendumEntry *rfn,
+              const struct RfnElementInfo *ri,
+              uint16_t *ret_majority,
+              enum ReferendumVote *ret_vote)
+{
+  uint16_t votes_yes = 0;
+  uint16_t num_commited = 0;
+  uint16_t i;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Computing rfn majority for element %s of rfn {%s}\n",
+              debug_str_element (ri->element),
+              debug_str_rfn_key (&rfn->key));
+
+  for (i = 0; i < rfn->num_peers; i++)
+  {
+    if (GNUNET_NO == rfn->peer_commited[i])
+      continue;
+    num_commited++;
+
+    if (GNUNET_YES == ri->votes[i])
+      votes_yes++;
+  }
+
+  if (votes_yes > (num_commited) / 2)
+  {
+    *ret_vote = ri->proposal;
+    *ret_majority = votes_yes;
+  }
+  else
+  {
+    *ret_vote = VOTE_STAY;
+    *ret_majority = num_commited - votes_yes;
+  }
+}
+
+
+struct SetCopyCls
+{
+  struct TaskEntry *task;
+  struct SetKey dst_set_key;
+};
+
+
+static void
+set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
+{
+  struct SetCopyCls *scc = cls;
+  struct TaskEntry *task = scc->task;
+  struct SetKey dst_set_key = scc->dst_set_key;
+  struct SetEntry *set;
+
+  GNUNET_free (scc);
+  set = GNUNET_new (struct SetEntry);
+  set->h = copy;
+  set->key = dst_set_key;
+  put_set (task->step->session, set);
+
+  task->start (task);
+}
+
+
+/**
+ * Call the start function of the given
+ * task again after we created a copy of the given set.
+ */
+static void
+create_set_copy_for_task (struct TaskEntry *task,
+                          struct SetKey *src_set_key,
+                          struct SetKey *dst_set_key)
+{
+  struct SetEntry *src_set;
+  struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Copying set {%s} to {%s} for task {%s}\n",
+              debug_str_set_key (src_set_key),
+              debug_str_set_key (dst_set_key),
+              debug_str_task_key (&task->key));
+
+  scc->task = task;
+  scc->dst_set_key = *dst_set_key;
+  src_set = lookup_set (task->step->session, src_set_key);
+  GNUNET_assert (NULL != src_set);
+  GNUNET_SET_copy_lazy (src_set->h,
+                        set_copy_cb,
+                        scc);
+}
+
+
+struct SetMutationProgressCls
+{
+  int num_pending;
+  /**
+   * Task to finish once all changes are through.
+   */
+  struct TaskEntry *task;
+};
+
+
+static void
+set_mutation_done (void *cls)
+{
+  struct SetMutationProgressCls *pc = cls;
+
+  GNUNET_assert (pc->num_pending > 0);
+
+  pc->num_pending--;
+
+  if (0 == pc->num_pending)
+  {
+    struct TaskEntry *task = pc->task;
+    GNUNET_free (pc);
+    finish_task (task);
+  }
+}
+
+static void
+task_start_apply_round (struct TaskEntry *task)
+{
+  struct ConsensusSession *session = task->step->session;
+  struct SetKey sk_in;
+  struct SetKey sk_out;
+  struct RfnKey rk_in;
+  struct SetEntry *set_out;
+  struct ReferendumEntry *rfn_in;
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct RfnElementInfo *ri;
+  struct SetMutationProgressCls *progress_cls;
+
+  sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
+  rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
+  sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
+
+  set_out = lookup_set (session, &sk_out);
+  if (NULL == set_out)
+  {
+    create_set_copy_for_task (task, &sk_in, &sk_out);
+    return;
+  }
+
+  rfn_in = lookup_rfn (session, &rk_in);
+  GNUNET_assert (NULL != rfn_in);
+
+  progress_cls = GNUNET_new (struct SetMutationProgressCls);
+  progress_cls->task = task;
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
+
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
+  {
+    uint16_t majority_num;
+    enum ReferendumVote majority_vote;
+
+    rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
+
+    switch (majority_vote)
+    {
+      case VOTE_ADD:
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_add_element (set_out->h,
+                                               ri->element,
+                                               set_mutation_done,
+                                               progress_cls));
+        break;
+      case VOTE_REMOVE:
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_remove_element (set_out->h,
+                                                  ri->element,
+                                                  set_mutation_done,
+                                                  progress_cls));
+        break;
+      case VOTE_STAY:
+        // do nothing
+        break;
+      default:
+        GNUNET_assert (0);
+        break;
+    }
+  }
+
+  if (progress_cls->num_pending == 0)
+  {
+    // call closure right now, no pending ops
+    GNUNET_free (progress_cls);
+    finish_task (task);
+  }
+}
+
+
+#define THRESH(s) (((s)->num_peers / 3))
+
+
+static void
+task_start_grade (struct TaskEntry *task)
+{
+  struct ConsensusSession *session = task->step->session;
+  struct ReferendumEntry *output_rfn;
+  struct ReferendumEntry *input_rfn;
+  struct DiffEntry *input_diff;
+  struct RfnKey rfn_key;
+  struct DiffKey diff_key;
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct RfnElementInfo *ri;
+  unsigned int gradecast_confidence = 2;
+
+  rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
+  output_rfn = lookup_rfn (session, &rfn_key);
+  if (NULL == output_rfn)
+  {
+    output_rfn = rfn_create (session->num_peers);
+    output_rfn->key = rfn_key;
+    put_rfn (session, output_rfn);
+  }
+
+  diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
+  input_diff = lookup_diff (session, &diff_key);
+  GNUNET_assert (NULL != input_diff);
+
+  rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
+  input_rfn = lookup_rfn (session, &rfn_key);
+  GNUNET_assert (NULL != input_rfn);
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
 
-  GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
-                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-}
+  apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
 
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
+  {
+    uint16_t majority_num;
+    enum ReferendumVote majority_vote;
 
-static void
-put_rfn (struct ConsensusSession *session,
-         struct ReferendumEntry *rfn)
-{
-  struct GNUNET_HashCode hash;
+    // XXX: we need contested votes and non-contested votes here
+    rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
 
-  GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
-                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-}
+    if (majority_num < (session->num_peers / 3) * 2)
+    {
+      gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+    }
+    if (majority_num < (session->num_peers / 3) + 1)
+    {
+      gradecast_confidence = 0;
+    }
 
+    switch (majority_vote)
+    {
+      case VOTE_STAY:
+        break;
+      case VOTE_ADD:
+        rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
+        break;
+      case VOTE_REMOVE:
+        rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
+        break;
+      default:
+        GNUNET_assert (0);
+        break;
+    }
+  }
 
+  {
+    uint16_t noncontested;
+    noncontested = rfn_noncontested (input_rfn);
+    if (noncontested < (session->num_peers / 3) * 2)
+    {
+      gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+    }
+    if (noncontested < (session->num_peers / 3) + 1)
+    {
+      gradecast_confidence = 0;
+    }
+  }
 
-static void
-output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
-  struct TaskEntry *task = (struct TaskEntry *) cls;
-  struct ConsensusSession *session = task->step->session;
-  struct SetEntry *set = GNUNET_new (struct SetEntry);
+  if (gradecast_confidence >= 1)
+    rfn_commit (output_rfn, task->key.leader);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: Received lazy copy, storing output set %s\n",
-              session->local_peer_idx, debug_str_set_key (&task->output_set));
+  if (gradecast_confidence <= 1)
+    session->peers_blacklisted[task->key.leader] = GNUNET_YES;
 
-  set->key = task->output_set;
-  set->h = copy;
-  put_set (task->step->session, set);
-  run_task_remote_union (task->step->session, task);
+  finish_task (task);
 }
 
 
 static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_reconcile (struct TaskEntry *task)
 {
   struct SetEntry *input;
+  struct SetOpCls *setop = &task->cls.setop;
+  struct ConsensusSession *session = task->step->session;
 
-  input = lookup_set (session, &task->input_set);
+  input = lookup_set (session, &setop->input_set);
   GNUNET_assert (NULL != input);
   GNUNET_assert (NULL != input->h);
 
@@ -959,41 +1675,53 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
      because we want something valid in there, even
      if the other peer doesn't talk to us */
 
-  if (SET_KIND_NONE != task->output_set.set_kind)
+  if (SET_KIND_NONE != setop->output_set.set_kind)
   {
     /* If we don't have an existing output set,
        we clone the input set. */
-    if (NULL == lookup_set (session, &task->output_set))
+    if (NULL == lookup_set (session, &setop->output_set))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Output set missing, copying from input set\n");
-      /* Since the cloning is asynchronous,
-         we'll retry the current function once the copy
-         has been provided by the SET service. */
-      GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
+      create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
       return;
     }
   }
 
-  if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
+  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
   {
-    if (NULL == lookup_rfn (session, &task->output_rfn))
+    if (NULL == lookup_rfn (session, &setop->output_rfn))
     {
       struct ReferendumEntry *rfn;
 
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "P%u: output rfn <%s> missing, creating.\n",
                   session->local_peer_idx,
-                  debug_str_rfn_key (&task->output_rfn));
+                  debug_str_rfn_key (&setop->output_rfn));
 
-      rfn = GNUNET_new (struct ReferendumEntry);
-      rfn->key = task->output_rfn;
-      rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
-      rfn->peer_commited = GNUNET_new_array (session->num_peers, int);
+      rfn = rfn_create (session->num_peers);
+      rfn->key = setop->output_rfn;
       put_rfn (session, rfn);
     }
   }
 
+  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+  {
+    if (NULL == lookup_diff (session, &setop->output_diff))
+    {
+      struct DiffEntry *diff;
+
+      diff = diff_create ();
+      diff->key = setop->output_diff;
+      put_diff (session, diff);
+    }
+  }
+
+  if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
+  {
+    /* XXX: mark the corresponding rfn as commited if necessary */
+    finish_task (task);
+    return;
+  }
+
   if (task->key.peer1 == session->local_peer_idx)
   {
     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
@@ -1001,7 +1729,7 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "P%u: Looking up set {%s} to run remote union\n",
                 session->local_peer_idx,
-                debug_str_set_key (&task->input_set));
+                debug_str_set_key (&setop->input_set));
 
     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
@@ -1012,23 +1740,20 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
     rcm.leader = htons (task->key.leader);
     rcm.repetition = htons (task->key.repetition);
 
-    GNUNET_assert (NULL == task->op);
+    GNUNET_assert (NULL == setop->op);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
-                session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set));
+                session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
 
     // XXX: maybe this should be done while
     // setting up tasks alreays?
-    task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
-                                   &session->global_id,
-                                   &rcm.header,
-                                   GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */
-                                   set_result_cb,
-                                   task);
-
-    /* Referendums must be materialized as a set before */
-    GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind);
-
-    if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h))
+    setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
+                                    &session->global_id,
+                                    &rcm.header,
+                                    GNUNET_SET_RESULT_SYMMETRIC,
+                                    set_result_cb,
+                                    task);
+
+    if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
     {
       GNUNET_break (0);
       /* XXX: cleanup? */
@@ -1041,9 +1766,8 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
                 session->local_peer_idx, task->key.peer1);
 
-    if (NULL != task->op)
+    if (NULL != setop->op)
     {
-      GNUNET_assert (NULL == task->commited_set);
       commit_set (session, task);
     }
   }
@@ -1055,221 +1779,83 @@ run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
 }
 
 
-static int
-rfn_majority (uint16_t num_peers,
-              struct ReferendumEntry *rfn,
-              struct RfnElementInfo *ri,
-              uint16_t threshold)
-{
-  unsigned int votes_add = 0;
-  unsigned int votes_remove = 0;
-  unsigned int num_commited = 0;
-  unsigned int maj_thresh;
-  unsigned int nv;
-  unsigned int tv;
-  unsigned int i;
-
-  for (i = 0; i < num_peers; i++)
-  {
-    if (GNUNET_NO == rfn->peer_commited[i])
-      continue;
-    num_commited++;
-    if (ri->votes[i] == VOTE_ADD)
-      votes_add++;
-    if (ri->votes[i] == VOTE_REMOVE)
-      votes_remove++;
-  }
-
-  /* Threshold to reach a majority among
-     submitted votes, may not be enough for the
-     global threshold. */
-  maj_thresh = (num_commited + 1) / 2;
-  /* Vote are relative to our local set, so it can only be
-     either all add or all remove */
-  GNUNET_assert ( (0 == votes_add) || (0 == votes_remove) );
-
-  if (votes_add > 0)
-  {
-    nv = votes_add;
-    tv = VOTE_ADD;
-  }
-  else if (votes_remove > 0)
-  {
-    nv = votes_remove;
-    tv = VOTE_REMOVE;
-  }
-  else
-  {
-    nv = 0;
-    tv = VOTE_NONE;
-  }
-
-  if ( (nv >= maj_thresh) && (nv >= threshold) )
-    return tv;
-
-  if ( ((num_commited - nv) >= maj_thresh) && ((num_commited - nv) >= threshold) )
-    return VOTE_NONE;
-
-  return VOTE_CONTESTED;
-}
-
-
-struct SetChangeProgressCls
-{
-  int num_pending;
-  struct TaskEntry *task;
-};
-
-
-static void
-eval_rfn_done (struct TaskEntry *task)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: EVAL_REFERENDUM done for task {%s}\n",
-              task->step->session->local_peer_idx, debug_str_task_key (&task->key));
-
-  finish_task (task);
-}
-
-
-static void
-eval_rfn_progress (void *cls)
-{
-  struct SetChangeProgressCls *erc = cls;
-
-  GNUNET_assert (erc->num_pending > 0);
-
-  erc->num_pending--;
-
-  if (0 == erc->num_pending)
-  {
-    struct TaskEntry *task = erc->task;
-    GNUNET_free (erc);
-    eval_rfn_done (task);
-  }
-}
-
-
-static void
-eval_rfn_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
-  struct TaskEntry *task = (struct TaskEntry *) cls;
-  struct ConsensusSession *session = task->step->session;
-  struct SetEntry *set;
-
-  set = GNUNET_new (struct SetEntry);
-  set->h = copy;
-  set->key = task->output_set;
-
-  put_set (session, set);
-
-  run_task_eval_rfn (session, task);
-}
-
-
-/**
- * Take an input set and an input referendum, 
- * apply the referendum with a threshold to the input
- * set and store the result in the output set and/or output diff.
- */
 static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_eval_echo (struct TaskEntry *task)
 {
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct ReferendumEntry *input_rfn;
   struct RfnElementInfo *ri;
-  struct SetEntry *output_set = NULL;
-  struct DiffEntry *output_diff = NULL;
-  struct SetChangeProgressCls *progress_cls;
-
-  /* Have at least one output */
-  GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) ||
-                  (task->output_diff.diff_kind != DIFF_KIND_NONE));
-
-  /* Not allowed as output */
-  GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE));
+  struct SetEntry *output_set;
+  struct SetMutationProgressCls *progress_cls;
+  struct ConsensusSession *session = task->step->session;
+  struct SetKey sk_in;
+  struct SetKey sk_out;
+  struct RfnKey rk_in;
 
-  if (SET_KIND_NONE != task->output_set.set_kind)
+  sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
+  sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
+  output_set = lookup_set (session, &sk_out);
+  if (NULL == output_set)
   {
-    /* We have a set output, thus the output set must
-       exist or copy it from the input set */
-    output_set = lookup_set (session, &task->output_set);
-    if (NULL == output_set)
-    {
-      struct SetEntry *input_set;
-
-      input_set = lookup_set (session, &task->input_set);
-      GNUNET_assert (NULL != input_set);
-      GNUNET_SET_copy_lazy (input_set->h,
-                            eval_rfn_copy_cb,
-                            task);
-      /* We'll be called again, this time with the
-         set ready. */
-      return;
-    }
+    create_set_copy_for_task (task, &sk_in, &sk_out);
+    return;
   }
 
-  if (DIFF_KIND_NONE != task->output_diff.diff_kind)
-  {
-    output_diff = lookup_diff (session, &task->output_diff);
-    if (NULL == output_diff)
-    {
-      output_diff = GNUNET_new (struct DiffEntry);
-      output_diff->key = task->output_diff;
-      output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
-      put_diff (session, output_diff);
-    }
-  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Evaluating referendum in Task {%s}\n",
+              debug_str_task_key (&task->key));
 
-  progress_cls = GNUNET_new (struct SetChangeProgressCls);
+  progress_cls = GNUNET_new (struct SetMutationProgressCls);
+  progress_cls->task = task;
 
-  input_rfn = lookup_rfn (session, &task->input_rfn);
+  rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
+  input_rfn = lookup_rfn (session, &rk_in);
 
   GNUNET_assert (NULL != input_rfn);
 
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
   GNUNET_assert (NULL != iter);
 
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &ri))
   {
-    int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, task->threshold);
+    enum ReferendumVote majority_vote;
+    uint16_t majority_num;
+
+    rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
+
+    if (majority_num < session->num_peers / 3)
+    {
+      /* It is not the case that all nonfaulty peers
+         echoed the same value.  Since we're doing a set reconciliation, we
+         can't simply send "nothing" for the value.  Thus we mark our 'confirm'
+         reconciliation as contested.  Other peers might not know that the
+         leader is faulty, thus we still re-distribute in the confirmation
+         round. */
+      output_set->is_contested = GNUNET_YES;
+    }
+
     switch (majority_vote)
     {
       case VOTE_ADD:
-        if (NULL != output_set)
-        {
-          progress_cls->num_pending++;
-          GNUNET_assert (GNUNET_OK ==
-                         GNUNET_SET_add_element (output_set->h,
-                                     ri->element,
-                                     eval_rfn_progress,
-                                     progress_cls));
-        }
-        if (NULL != output_diff)
-        {
-          diff_insert (output_diff, 1, ri->element);
-        }
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_add_element (output_set->h,
+                                               ri->element,
+                                               set_mutation_done,
+                                               progress_cls));
         break;
-      case VOTE_CONTESTED:
-        if (NULL != output_set)
-          output_set->is_contested = GNUNET_YES;
-        /* fallthrough */
       case VOTE_REMOVE:
-        if (NULL != output_set)
-        {
-          progress_cls->num_pending++;
-          GNUNET_assert (GNUNET_OK ==
-                         GNUNET_SET_remove_element (output_set->h,
-                                     ri->element,
-                                     eval_rfn_progress,
-                                     progress_cls));
-        }
-        if (NULL != output_diff)
-        {
-          diff_insert (output_diff, -1, ri->element);
-        }
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_remove_element (output_set->h,
+                                                  ri->element,
+                                                  set_mutation_done,
+                                                  progress_cls));
         break;
-      case VOTE_NONE:
+      case VOTE_STAY:
         /* Nothing to do. */
         break;
       default:
@@ -1277,119 +1863,6 @@ run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
         GNUNET_assert (0);
     }
   }
-  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
-
-  if (progress_cls->num_pending == 0)
-  {
-    // call closure right now, no pending ops
-    GNUNET_free (progress_cls);
-    eval_rfn_done (task);
-  }
-}
-
-
-static void
-apply_diff_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
-  struct TaskEntry *task = (struct TaskEntry *) cls;
-  struct ConsensusSession *session = task->step->session;
-  struct SetEntry *set;
-
-  set = GNUNET_new (struct SetEntry);
-  set->h = copy;
-  set->key = task->output_set;
-
-  put_set (session, set);
-
-  run_task_apply_diff (session, task);
-}
-
-
-static void
-apply_diff_done (struct TaskEntry *task)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: APPLY_DIFF done for task {%s}\n",
-              task->step->session->local_peer_idx, debug_str_task_key (&task->key));
-  finish_task (task);
-}
-
-
-static void
-apply_diff_progress (void *cls)
-{
-  struct SetChangeProgressCls *erc = cls;
-
-  GNUNET_assert (erc->num_pending > 0);
-
-  erc->num_pending--;
-
-  if (0 == erc->num_pending)
-  {
-    struct TaskEntry *task = erc->task;
-    GNUNET_free (erc);
-    apply_diff_done (task);
-  }
-}
-
-
-static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
-{
-  struct SetEntry *output_set;
-  struct DiffEntry *input_diff;
-  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
-  struct DiffElementInfo *di;
-  struct SetChangeProgressCls *progress_cls;
-
-  GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE);
-  GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE);
-
-  input_diff = lookup_diff (session, &task->input_diff);
-
-  GNUNET_assert (NULL != input_diff);
-
-  output_set = lookup_set (session, &task->output_set);
-
-  if (NULL == output_set)
-  {
-      struct SetEntry *input_set;
-
-      input_set = lookup_set (session, &task->input_set);
-      GNUNET_assert (NULL != input_set);
-      GNUNET_SET_copy_lazy (input_set->h,
-                            apply_diff_copy_cb,
-                            task);
-      /* We'll be called again, this time with the
-         set ready. */
-      return;
-  }
-
-  progress_cls = GNUNET_new (struct SetChangeProgressCls);
-
-  iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_diff->changes);
-
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
-  {
-    if (di->weight > 0)
-    {
-      progress_cls->num_pending++;
-      GNUNET_assert (GNUNET_OK ==
-                     GNUNET_SET_remove_element (output_set->h,
-                                 di->element,
-                                 apply_diff_progress,
-                                 progress_cls));
-    }
-    else if (di->weight < 0)
-    {
-      progress_cls->num_pending++;
-      GNUNET_assert (GNUNET_OK ==
-                     GNUNET_SET_add_element (output_set->h,
-                                 di->element,
-                                 apply_diff_progress,
-                                 progress_cls));
-    }
-  }
 
   GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
 
@@ -1397,17 +1870,18 @@ run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
   {
     // call closure right now, no pending ops
     GNUNET_free (progress_cls);
-    apply_diff_done (task);
+    finish_task (task);
   }
 }
 
 
 static void
-run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_finish (struct TaskEntry *task)
 {
   struct SetEntry *final_set;
+  struct ConsensusSession *session = task->step->session;
 
-  final_set = lookup_set (session, &task->input_set);
+  final_set = lookup_set (session, &task->cls.finish.input_set);
 
   GNUNET_assert (NULL != final_set);
 
@@ -1418,37 +1892,17 @@ run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
 }
 
 static void
-run_task (struct ConsensusSession *session, struct TaskEntry *task)
+start_task (struct ConsensusSession *session, struct TaskEntry *task)
 {
-  GNUNET_assert (GNUNET_NO == task->is_running);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
+
+  GNUNET_assert (GNUNET_NO == task->is_started);
   GNUNET_assert (GNUNET_NO == task->is_finished);
+  GNUNET_assert (NULL != task->start);
 
-  
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
+  task->start (task);
 
-  switch (task->action)
-  {
-    case ACTION_RECONCILE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE task\n", session->local_peer_idx);
-      run_task_remote_union (session, task);
-      break;
-    case ACTION_EVAL_RFN:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN task\n", session->local_peer_idx);
-      run_task_eval_rfn (session, task);
-      break;
-    case ACTION_APPLY_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF task\n", session->local_peer_idx);
-      run_task_apply_diff (session, task);
-      break;
-    case ACTION_FINISH:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH task\n", session->local_peer_idx);
-      run_task_finish (session, task);
-      break;
-    default:
-      /* not reached */
-      GNUNET_assert (0);
-  }
-  task->is_running = GNUNET_YES;
+  task->is_started = GNUNET_YES;
 }
 
 
@@ -1507,15 +1961,15 @@ run_ready_steps (struct ConsensusSession *session)
       GNUNET_assert (0 == step->finished_tasks);
 
 #ifdef GNUNET_EXTRA_LOGGING
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d:%d with %d tasks and %d subordinates\n",
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
                   session->local_peer_idx,
                   step->debug_name,
-                  step->start_round, step->num_rounds, step->tasks_len, step->subordinates_len);
+                  step->round, step->tasks_len, step->subordinates_len);
 #endif
 
       step->is_running = GNUNET_YES;
       for (i = 0; i < step->tasks_len; i++)
-        run_task (session, step->tasks[i]);
+        start_task (session, step->tasks[i]);
 
       /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
@@ -1689,7 +2143,6 @@ set_listen_cb (void *cls,
   struct TaskKey tk;
   struct TaskEntry *task;
   struct GNUNET_CONSENSUS_RoundContextMessage *cm;
-  GNUNET_SET_ResultIterator my_result_cb;
 
   if (NULL == context_msg)
   {
@@ -1730,12 +2183,6 @@ set_listen_cb (void *cls,
     return;
   }
 
-  if (ACTION_RECONCILE != task->action)
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-
   if (GNUNET_YES == task->is_finished)
   {
     GNUNET_break_op (0);
@@ -1749,20 +2196,18 @@ set_listen_cb (void *cls,
     return;
   }
 
-  if (task->key.peer1 == task->key.peer2)
-    my_result_cb = set_result_cb_loop;
-  else
-    my_result_cb = set_result_cb;
+  GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
+                    (task->key.peer2 == session->local_peer_idx)));
 
-  task->op = GNUNET_SET_accept (request,
-                                GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon */
-                                my_result_cb,
-                                task);
+  task->cls.setop.op = GNUNET_SET_accept (request,
+                                          GNUNET_SET_RESULT_SYMMETRIC,
+                                          set_result_cb,
+                                          task);
   
   /* If the task hasn't been started yet, 
      we wait for that until we commit. */
 
-  if (GNUNET_YES == task->is_running)
+  if (GNUNET_YES == task->is_started)
   {
     commit_set (session, task);
   }
@@ -1868,8 +2313,7 @@ step_depend_on (struct Step *step, struct Step *dep)
   GNUNET_assert (step != dep);
   GNUNET_assert (NULL != step);
   GNUNET_assert (NULL != dep);
-  // XXX: make rounds work
-  //GNUNET_assert (dep->start_round <= step->start_round);
+  GNUNET_assert (dep->round <= step->round);
 
 #ifdef GNUNET_EXTRA_LOGGING
   /* Make sure we have complete debugging information.
@@ -1901,13 +2345,12 @@ step_depend_on (struct Step *step, struct Step *dep)
 
 
 static struct Step *
-create_step (struct ConsensusSession *session, int start_round, int num_rounds)
+create_step (struct ConsensusSession *session, int round)
 {
   struct Step *step;
   step = GNUNET_new (struct Step);
   step->session = session;
-  step->start_round = start_round;
-  step->num_rounds = num_rounds;
+  step->round = round;
   GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
                                     session->steps_tail,
                                     step);
@@ -1927,8 +2370,6 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
                                 struct Step *step_after)
 {
   uint16_t n = session->num_peers;
-  uint16_t t = n / 3;
-
   uint16_t me = session->local_peer_idx;
 
   uint16_t p1;
@@ -1944,19 +2385,17 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
 
   unsigned int k;
 
-  round = step_before->start_round + step_before->num_rounds;
+  round = step_before->round + 1;
 
   /* gcast step 1: leader disseminates */
 
-  step = create_step (session, round, 1);
+  step = create_step (session, round);
 
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
 #endif
   step_depend_on (step, step_before);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: Considering leader %d\n", session->local_peer_idx, lead);
-
   if (lead == me)
   {
     for (k = 0; k < n; k++)
@@ -1966,14 +2405,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
       p1 = me;
       p2 = k;
       arrange_peers (&p1, &p2, n);
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
       task = ((struct TaskEntry) {
         .step = step,
-        .action = ACTION_RECONCILE,
+        .start = task_start_reconcile,
+        .cancel = task_cancel_reconcile,
         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
-        .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-        .output_set = (struct SetKey) { SET_KIND_NONE },
       });
+      task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
       put_task (session->taskmap, &task);
     }
     /* We run this task to make sure that the leader
@@ -1982,12 +2420,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
        without the code having to handle any special cases. */
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-      .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me },
-      .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+    task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
+    task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
     put_task (session->taskmap, &task);
   }
   else
@@ -1995,21 +2434,22 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
     p1 = me;
     p2 = lead;
     arrange_peers (&p1, &p2, n);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", session->local_peer_idx, p1, p2, rep, lead);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead},
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-      .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-      .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+    task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
+    task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
     put_task (session->taskmap, &task);
   }
 
   /* gcast phase 2: echo */
   prev_step = step;
-  step = create_step (session, round, 1);
+  round += 1;
+  step = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
 #endif
@@ -2022,16 +2462,18 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
     arrange_peers (&p1, &p2, n);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
-      .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-      .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
+    task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
     put_task (session->taskmap, &task);
   }
 
   prev_step = step;
-  step = create_step (session, round, 1);
+  /* Same round, since step only has local tasks */
+  step = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
 #endif
@@ -2041,16 +2483,13 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
   task = ((struct TaskEntry) {
     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
     .step = step,
-    .action = ACTION_EVAL_RFN,
-    .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-    .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
-    .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
-    .threshold = n - t,
+    .start = task_start_eval_echo
   });
   put_task (session->taskmap, &task);
 
   prev_step = step;
-  step = create_step (session, round, 1);
+  round += 1;
+  step = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
 #endif
@@ -2064,29 +2503,32 @@ construct_task_graph_gradecast (struct ConsensusSession *session,
     arrange_peers (&p1, &p2, n);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
-      .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
-      .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead },
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
+    task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
+    /* If there was at least one element in the echo round that was
+       contested (i.e. it had no n-t majority), then we let the other peers
+       know, and other peers let us know.  The contested flag for each peer is
+       stored in the rfn. */
+    task.cls.setop.transceive_contested = GNUNET_YES;
     put_task (session->taskmap, &task);
   }
 
   prev_step = step;
-  step = create_step (session, round, 1);
+  /* Same round, since step only has local tasks */
+  step = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
 #endif
   step_depend_on (step, prev_step);
 
-  // evaluate ConfirmationReferendum and
-  // apply it to the LeaderReferendum
   task = ((struct TaskEntry) {
     .step = step,
     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
-    .action = ACTION_EVAL_RFN,
-    .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
-    .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep },
+    .start = task_start_grade,
   });
   put_task (session->taskmap, &task);
 
@@ -2128,7 +2570,7 @@ construct_task_graph (struct ConsensusSession *session)
 
   /* all-to-all step */
 
-  step = create_step (session, round, 1);
+  step = create_step (session, round);
 
 #ifdef GNUNET_EXTRA_LOGGING
   step->debug_name = GNUNET_strdup ("all to all");
@@ -2142,18 +2584,20 @@ construct_task_graph (struct ConsensusSession *session)
     task = ((struct TaskEntry) {
       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
       .step = step,
-      .action = ACTION_RECONCILE,
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
-      .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
+    task.cls.setop.output_set = task.cls.setop.input_set;
+    task.cls.setop.do_not_remove = GNUNET_YES;
     put_task (session->taskmap, &task);
   }
 
-  round++;
-
   prev_step = step;
   step = NULL;
 
+  round += 1;
+
   /* Byzantine union */
 
   /* sequential repetitions of the gradecasts */
@@ -2162,33 +2606,29 @@ construct_task_graph (struct ConsensusSession *session)
     struct Step *step_rep_start;
     struct Step *step_rep_end;
 
-    step_rep_start = create_step (session, round, 1);
+    /* Every repetition is in a separate round. */
+    step_rep_start = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
-      GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
+    GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
 #endif
 
     step_depend_on (step_rep_start, prev_step);
 
-    step_rep_end = create_step (session, round, 1);
+    /* gradecast has three rounds */
+    round += 3;
+    step_rep_end = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
-      GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
+    GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
 #endif
 
     /* parallel gradecasts */
     for (lead = 0; lead < n; lead++)
       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
 
-    // TODO: add peers to ignore list,
-    //
-    // evaluate ConfirmationReferendum and
-    // apply it to the LeaderReferendum
     task = ((struct TaskEntry) {
       .step = step_rep_end,
-      .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, -1},
-      .action = ACTION_APPLY_DIFF,
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, i },
-      .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i },
-      .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 },
+      .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
+      .start = task_start_apply_round,
     });
     put_task (session->taskmap, &task);
 
@@ -2197,7 +2637,8 @@ construct_task_graph (struct ConsensusSession *session)
 
  /* There is no next gradecast round, thus the final
     start step is the overall end step of the gradecasts */
-  step = create_step (session, round, 1);
+  round += 1;
+  step = create_step (session, round);
 #ifdef GNUNET_EXTRA_LOGGING
   GNUNET_asprintf (&step->debug_name, "finish");
 #endif
@@ -2206,9 +2647,9 @@ construct_task_graph (struct ConsensusSession *session)
   task = ((struct TaskEntry) {
     .step = step,
     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
-    .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 },
-    .action = ACTION_FINISH,
+    .start = task_start_finish,
   });
+  task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
 
   put_task (session->taskmap, &task);
 }
@@ -2276,7 +2717,7 @@ initialize_session (struct ConsensusSession *session,
     put_set (session, client_set);
   }
 
-  session->peers_ignored = GNUNET_new_array (session->num_peers, int);
+  session->peers_blacklisted = GNUNET_new_array (session->num_peers, int);
 
   /* Just construct the task graph,
      but don't run anything until the client calls conclude. */
@@ -2399,10 +2840,21 @@ client_insert (void *cls,
   }
   session->num_client_insert_pending++;
   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
+
+#ifdef GNUNET_EXTRA_LOGGING
+  {
+    struct GNUNET_HashCode hash;
+
+    GNUNET_SET_element_hash (element, &hash);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
+                session->local_peer_idx,
+                GNUNET_h2s (&hash));
+  }
+#endif
+
   GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", session->local_peer_idx);
 }