- fix use of uninitialized memory
[oweals/gnunet.git] / src / consensus / gnunet-service-consensus.c
index dd619c6e49b0510233aa32c6803eb911fed4a4b8..6cd0244b5679e29054cab610df07d3d099631d9a 100644 (file)
 #include "consensus.h"
 
 
+#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
+
+
+enum ReferendumVote
+{
+  /**
+   * Vote that nothing should change.
+   * This option is never voted explicitly.
+   */
+  VOTE_STAY = 0,
+  /**
+   * Vote that an element should be added.
+   */
+  VOTE_ADD = 1,
+  /**
+   * Vote that an element should be removed.
+   */
+  VOTE_REMOVE = 2,
+};
+
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
+
+struct ContestedPayload
+{
+};
+
 /**
  * Tuple of integers that together
  * identify a task uniquely.
@@ -72,23 +97,6 @@ struct TaskKey {
 };
 
 
-enum ReferendumVote
-{
-  /**
-   * Vote that nothing should change.
-   * This option is never voted explicitly.
-   */
-  VOTE_STAY = 0,
-  /**
-   * Vote that an element should be added.
-   */
-  VOTE_ADD = 1,
-  /**
-   * Vote that an element should be removed.
-   */
-  VOTE_REMOVE = 2,
-};
-
 
 struct SetKey
 {
@@ -136,7 +144,6 @@ enum PhaseKind
   PHASE_KIND_GRADECAST_ECHO_GRADE,
   PHASE_KIND_GRADECAST_CONFIRM,
   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
-  PHASE_KIND_GRADECAST_APPLY_RESULT,
   /**
    * Apply a repetition of the all-to-all
    * gradecast to the current set.
@@ -483,9 +490,6 @@ static struct GNUNET_PeerIdentity my_peer;
 static void
 finish_task (struct TaskEntry *task);
 
-static void
-task_start_reconcile (struct TaskEntry *task);
-
 static void
 run_ready_steps (struct ConsensusSession *session);
 
@@ -501,7 +505,6 @@ phasename (uint16_t phase)
     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
-    case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
     default: return "(unknown)";
   }
@@ -771,25 +774,55 @@ diff_insert (struct DiffEntry *diff,
 }
 
 
+static void
+rfn_commit (struct ReferendumEntry *rfn,
+            uint16_t commit_peer)
+{
+  GNUNET_assert (commit_peer < rfn->num_peers);
+
+  rfn->peer_commited[commit_peer] = GNUNET_YES;
+}
+
+
+static void
+rfn_contest (struct ReferendumEntry *rfn,
+             uint16_t contested_peer)
+{
+  GNUNET_assert (contested_peer < rfn->num_peers);
+
+  rfn->peer_contested[contested_peer] = GNUNET_YES;
+}
+
+
+static uint16_t
+rfn_noncontested (struct ReferendumEntry *rfn)
+{
+  uint16_t i;
+  uint16_t ret;
+
+  ret = 0;
+  for (i = 0; i < rfn->num_peers; i++)
+    if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
+      ret++;
+
+  return ret;
+}
+
 static void
 rfn_vote (struct ReferendumEntry *rfn,
           uint16_t voting_peer,
-          uint16_t num_peers,
           enum ReferendumVote vote,
           const struct GNUNET_SET_Element *element)
 {
   struct RfnElementInfo *ri;
   struct GNUNET_HashCode hash;
 
-  GNUNET_assert (voting_peer < num_peers);
+  GNUNET_assert (voting_peer < rfn->num_peers);
 
   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
      since VOTE_KEEP is implicit in not voting. */
   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
 
-  // XXX: should happen in another place!
-  rfn->peer_commited[voting_peer] = GNUNET_YES;
-
   GNUNET_SET_element_hash (element, &hash);
   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
 
@@ -797,7 +830,7 @@ rfn_vote (struct ReferendumEntry *rfn,
   {
     ri = GNUNET_new (struct RfnElementInfo);
     ri->element = GNUNET_SET_element_dup (element);
-    ri->votes = GNUNET_new_array (num_peers, int);
+    ri->votes = GNUNET_new_array (rfn->num_peers, int);
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
                                                       &hash, ri,
@@ -861,15 +894,7 @@ set_result_cb (void *cls,
     return;
   }
 
-  if (task->key.peer1 == session->local_peer_idx)
-    other_idx = task->key.peer2;
-  else if (task->key.peer2 == session->local_peer_idx)
-    other_idx = task->key.peer1;
-  else
-  {
-    /* error in task graph construction */
-    GNUNET_assert (0);
-  }
+  other_idx = task_other_peer (task);
 
   if (SET_KIND_NONE != setop->output_set.set_kind)
   {
@@ -891,10 +916,22 @@ set_result_cb (void *cls,
 
   if (GNUNET_YES == session->peers_blacklisted[other_idx])
   {
-    /* We should have never started or commited to an operation
-       with a blacklisted peer. */
-    GNUNET_break (0);
-    return;
+    /* Peer might have been blacklisted
+       by a gradecast running in parallel, ignore elements from now */
+    if (GNUNET_SET_STATUS_ADD_LOCAL == status)
+      return;
+    if (GNUNET_SET_STATUS_ADD_REMOTE == status)
+      return;
+  }
+
+  if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
+  {
+    if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+    {
+      GNUNET_assert (NULL != output_rfn);
+      rfn_contest (output_rfn, task_other_peer (task));
+      return;
+    }
   }
 
   switch (status)
@@ -933,7 +970,7 @@ set_result_cb (void *cls,
       }
       if (NULL != output_rfn)
       {
-        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
 #ifdef GNUNET_EXTRA_LOGGING
         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
@@ -948,6 +985,8 @@ set_result_cb (void *cls,
     case GNUNET_SET_STATUS_ADD_REMOTE:
       if (GNUNET_YES == setop->do_not_remove)
         break;
+      if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+        break;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Removing element in Task {%s}\n",
                   debug_str_task_key (&task->key));
@@ -981,7 +1020,7 @@ set_result_cb (void *cls,
       }
       if (NULL != output_rfn)
       {
-        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element);
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
 #ifdef GNUNET_EXTRA_LOGGING
         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
@@ -995,15 +1034,19 @@ set_result_cb (void *cls,
     case GNUNET_SET_STATUS_DONE:
       // XXX: check first if any changes to the underlying
       // set are still pending
-      // XXX: commit other peer in referendum
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Finishing setop in Task {%s}\n",
                   debug_str_task_key (&task->key));
+      if (NULL != output_rfn)
+      {
+        rfn_commit (output_rfn, task_other_peer (task));
+      }
       finish_task (task);
       break;
     case GNUNET_SET_STATUS_FAILURE:
       // XXX: cleanup
-      GNUNET_break (0);
+      GNUNET_break_op (0);
+      finish_task (task);
       return;
     default:
       /* not reached */
@@ -1159,7 +1202,26 @@ commit_set (struct ConsensusSession *session,
     }
   }
 #else
-  GNUNET_SET_commit (setop->op, set->h);
+  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
+  {
+    struct GNUNET_SET_Element element;
+    struct ContestedPayload payload;
+    element.data = &payload;
+    element.size = sizeof (struct ContestedPayload);
+    element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
+    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+  }
+  if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
+  {
+    GNUNET_SET_commit (setop->op, set->h);
+  }
+  else
+  {
+    /* For our testcases, we don't want the blacklisted
+       peers to wait. */
+    GNUNET_SET_operation_cancel (setop->op);
+    setop->op = NULL;
+  }
 #endif
 }
 
@@ -1211,25 +1273,6 @@ put_rfn (struct ConsensusSession *session,
 
 
 
-static void
-output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
-  struct TaskEntry *task = (struct TaskEntry *) cls;
-  struct SetOpCls *setop = &task->cls.setop;
-  struct ConsensusSession *session = task->step->session;
-  struct SetEntry *set = GNUNET_new (struct SetEntry);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: Received lazy copy, storing output set %s\n",
-              session->local_peer_idx, debug_str_set_key (&setop->output_set));
-
-  set->key = setop->output_set;
-  set->h = copy;
-  put_set (task->step->session, set);
-  task_start_reconcile (task);
-}
-
-
 static void
 task_cancel_reconcile (struct TaskEntry *task)
 {
@@ -1249,15 +1292,18 @@ apply_diff_to_rfn (struct DiffEntry *diff,
 
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
 
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &di))
   {
     if (di->weight > 0)
     {
-      rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
+      rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
     }
     if (di->weight < 0)
     {
-      rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
+      rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
     }
   }
 
@@ -1312,6 +1358,7 @@ rfn_create (uint16_t size)
   rfn = GNUNET_new (struct ReferendumEntry);
   rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
   rfn->peer_commited = GNUNET_new_array (size, int);
+  rfn->peer_contested = GNUNET_new_array (size, int);
   rfn->num_peers = size;
 
   return rfn;
@@ -1401,6 +1448,12 @@ create_set_copy_for_task (struct TaskEntry *task,
   struct SetEntry *src_set;
   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Copying set {%s} to {%s} for task {%s}\n",
+              debug_str_set_key (src_set_key),
+              debug_str_set_key (dst_set_key),
+              debug_str_task_key (&task->key));
+
   scc->task = task;
   scc->dst_set_key = *dst_set_key;
   src_set = lookup_set (task->step->session, src_set_key);
@@ -1410,6 +1463,34 @@ create_set_copy_for_task (struct TaskEntry *task,
                         scc);
 }
 
+
+struct SetMutationProgressCls
+{
+  int num_pending;
+  /**
+   * Task to finish once all changes are through.
+   */
+  struct TaskEntry *task;
+};
+
+
+static void
+set_mutation_done (void *cls)
+{
+  struct SetMutationProgressCls *pc = cls;
+
+  GNUNET_assert (pc->num_pending > 0);
+
+  pc->num_pending--;
+
+  if (0 == pc->num_pending)
+  {
+    struct TaskEntry *task = pc->task;
+    GNUNET_free (pc);
+    finish_task (task);
+  }
+}
+
 static void
 task_start_apply_round (struct TaskEntry *task)
 {
@@ -1421,6 +1502,7 @@ task_start_apply_round (struct TaskEntry *task)
   struct ReferendumEntry *rfn_in;
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct RfnElementInfo *ri;
+  struct SetMutationProgressCls *progress_cls;
 
   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
@@ -1436,6 +1518,9 @@ task_start_apply_round (struct TaskEntry *task)
   rfn_in = lookup_rfn (session, &rk_in);
   GNUNET_assert (NULL != rfn_in);
 
+  progress_cls = GNUNET_new (struct SetMutationProgressCls);
+  progress_cls->task = task;
+
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
 
   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
@@ -1448,10 +1533,20 @@ task_start_apply_round (struct TaskEntry *task)
     switch (majority_vote)
     {
       case VOTE_ADD:
-        // XXX: add to set
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_add_element (set_out->h,
+                                               ri->element,
+                                               set_mutation_done,
+                                               progress_cls));
         break;
       case VOTE_REMOVE:
-        // XXX: remove from set
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_remove_element (set_out->h,
+                                                  ri->element,
+                                                  set_mutation_done,
+                                                  progress_cls));
         break;
       case VOTE_STAY:
         // do nothing
@@ -1461,9 +1556,19 @@ task_start_apply_round (struct TaskEntry *task)
         break;
     }
   }
+
+  if (progress_cls->num_pending == 0)
+  {
+    // call closure right now, no pending ops
+    GNUNET_free (progress_cls);
+    finish_task (task);
+  }
 }
 
 
+#define THRESH(s) (((s)->num_peers / 3))
+
+
 static void
 task_start_grade (struct TaskEntry *task)
 {
@@ -1475,12 +1580,14 @@ task_start_grade (struct TaskEntry *task)
   struct DiffKey diff_key;
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct RfnElementInfo *ri;
+  unsigned int gradecast_confidence = 2;
 
   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
   output_rfn = lookup_rfn (session, &rfn_key);
   if (NULL == output_rfn)
   {
     output_rfn = rfn_create (session->num_peers);
+    output_rfn->key = rfn_key;
     put_rfn (session, output_rfn);
   }
 
@@ -1492,26 +1599,63 @@ task_start_grade (struct TaskEntry *task)
   input_rfn = lookup_rfn (session, &rfn_key);
   GNUNET_assert (NULL != input_rfn);
 
-  apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
-
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
 
+  apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
+
   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
   {
     uint16_t majority_num;
     enum ReferendumVote majority_vote;
 
+    // XXX: we need contested votes and non-contested votes here
     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
 
-    
+    if (majority_num < (session->num_peers / 3) * 2)
+    {
+      gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+    }
+    if (majority_num < (session->num_peers / 3) + 1)
+    {
+      gradecast_confidence = 0;
+    }
 
     switch (majority_vote)
     {
+      case VOTE_STAY:
+        break;
+      case VOTE_ADD:
+        rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
+        break;
+      case VOTE_REMOVE:
+        rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
+        break;
       default:
         GNUNET_assert (0);
         break;
     }
   }
+
+  {
+    uint16_t noncontested;
+    noncontested = rfn_noncontested (input_rfn);
+    if (noncontested < (session->num_peers / 3) * 2)
+    {
+      gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+    }
+    if (noncontested < (session->num_peers / 3) + 1)
+    {
+      gradecast_confidence = 0;
+    }
+  }
+
+  if (gradecast_confidence >= 1)
+    rfn_commit (output_rfn, task->key.leader);
+
+  if (gradecast_confidence <= 1)
+    session->peers_blacklisted[task->key.leader] = GNUNET_YES;
+
+  finish_task (task);
 }
 
 
@@ -1537,12 +1681,7 @@ task_start_reconcile (struct TaskEntry *task)
        we clone the input set. */
     if (NULL == lookup_set (session, &setop->output_set))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Output set missing, copying from input set\n");
-      /* Since the cloning is asynchronous,
-         we'll retry the current function once the copy
-         has been provided by the SET service. */
-      GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
+      create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
       return;
     }
   }
@@ -1640,37 +1779,6 @@ task_start_reconcile (struct TaskEntry *task)
 }
 
 
-
-
-struct SetMutationProgressCls
-{
-  int num_pending;
-  /**
-   * Task to finish once all changes are through.
-   */
-  struct TaskEntry *task;
-};
-
-
-static void
-set_mutation_done (void *cls)
-{
-  struct SetMutationProgressCls *pc = cls;
-
-  GNUNET_assert (pc->num_pending > 0);
-
-  pc->num_pending--;
-
-  if (0 == pc->num_pending)
-  {
-    struct TaskEntry *task = pc->task;
-    GNUNET_free (pc);
-    finish_task (task);
-  }
-}
-
-
-
 static void
 task_start_eval_echo (struct TaskEntry *task)
 {
@@ -1708,7 +1816,10 @@ task_start_eval_echo (struct TaskEntry *task)
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
   GNUNET_assert (NULL != iter);
 
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &ri))
   {
     enum ReferendumVote majority_vote;
     uint16_t majority_num;
@@ -1717,7 +1828,12 @@ task_start_eval_echo (struct TaskEntry *task)
 
     if (majority_num < session->num_peers / 3)
     {
-      majority_vote = VOTE_REMOVE;
+      /* It is not the case that all nonfaulty peers
+         echoed the same value.  Since we're doing a set reconciliation, we
+         can't simply send "nothing" for the value.  Thus we mark our 'confirm'
+         reconciliation as contested.  Other peers might not know that the
+         leader is faulty, thus we still re-distribute in the confirmation
+         round. */
       output_set->is_contested = GNUNET_YES;
     }
 
@@ -2509,8 +2625,6 @@ construct_task_graph (struct ConsensusSession *session)
     for (lead = 0; lead < n; lead++)
       construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
 
-    // TODO: add peers to blacklisted list, either here or
-    // already in the gradecast.
     task = ((struct TaskEntry) {
       .step = step_rep_end,
       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},