};
+enum EarlyStoppingPhase
+{
+ EARLY_STOPPING_NONE = 0,
+ EARLY_STOPPING_ONE_MORE = 1,
+ EARLY_STOPPING_DONE = 2,
+};
+
+
GNUNET_NETWORK_STRUCT_BEGIN
{
SET_KIND_NONE = 0,
SET_KIND_CURRENT,
+ /**
+ * Last result set from a gradecast
+ */
+ SET_KIND_LAST_GRADECAST,
SET_KIND_LEADER_PROPOSAL,
SET_KIND_ECHO_RESULT,
};
struct ConsensusSession *session;
+ /**
+ * Tasks that this step is composed of.
+ */
struct TaskEntry **tasks;
unsigned int tasks_len;
unsigned int tasks_cap;
* the task, used for debugging.
*/
char *debug_name;
+
+ /**
+ * When we're doing an early finish, how should this step be
+ * treated?
+ * If GNUNET_YES, the step will be marked as finished
+ * without actually running its tasks.
+ * Otherwise, the step will still be run even after
+ * an early finish.
+ *
+ * Note that a task may never be finished early if
+ * it is already running.
+ */
+ int early_finishable;
};
* Uses the session's global id as app id.
*/
struct GNUNET_SET_ListenHandle *set_listener;
+
+ /**
+ * State of our early stopping scheme.
+ */
+ int early_stopping;
};
/**
debug_str_set_key (&set->key));
GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
- GNUNET_assert (GNUNET_OK ==
+ GNUNET_assert (GNUNET_SYSERR !=
GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE));
}
}
+/**
+ * For a given majority, count what the outcome
+ * is (add/remove/keep), and give the number
+ * of peers that voted for this outcome.
+ */
static void
rfn_majority (const struct ReferendumEntry *rfn,
const struct RfnElementInfo *ri,
}
}
+
+static void
+try_finish_step_early (struct Step *step)
+{
+ unsigned int i;
+
+ if (GNUNET_YES == step->is_running)
+ return;
+ if (GNUNET_YES == step->is_finished)
+ return;
+ if (GNUNET_NO == step->early_finishable)
+ return;
+
+ step->is_finished = GNUNET_YES;
+
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Finishing step `%s' early.\n",
+ step->debug_name);
+#endif
+
+ for (i = 0; i < step->subordinates_len; i++)
+ {
+ GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
+ step->subordinates[i]->pending_prereq--;
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Decreased pending_prereq to %u for step `%s'.\n",
+ step->subordinates[i]->pending_prereq,
+ step->subordinates[i]->debug_name);
+
+#endif
+ try_finish_step_early (step->subordinates[i]);
+ }
+
+ // XXX: maybe schedule as task to avoid recursion?
+ run_ready_steps (step->session);
+}
+
+
+static void
+finish_step (struct Step *step)
+{
+ unsigned int i;
+
+ GNUNET_assert (step->finished_tasks == step->tasks_len);
+ GNUNET_assert (GNUNET_YES == step->is_running);
+ GNUNET_assert (GNUNET_NO == step->is_finished);
+
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "All tasks of step `%s' with %u subordinates finished.\n",
+ step->debug_name,
+ step->subordinates_len);
+#endif
+
+ for (i = 0; i < step->subordinates_len; i++)
+ {
+ GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
+ step->subordinates[i]->pending_prereq--;
+#ifdef GNUNET_EXTRA_LOGGING
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Decreased pending_prereq to %u for step `%s'.\n",
+ step->subordinates[i]->pending_prereq,
+ step->subordinates[i]->debug_name);
+
+#endif
+ }
+
+ step->is_finished = GNUNET_YES;
+
+ // XXX: maybe schedule as task to avoid recursion?
+ run_ready_steps (step->session);
+}
+
+
+
+/**
+ * Apply the result from one round of gradecasts (i.e. every peer
+ * should have gradecasted) to the peer's current set.
+ *
+ * @param task the task with context information
+ */
static void
task_start_apply_round (struct TaskEntry *task)
{
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
struct RfnElementInfo *ri;
struct SetMutationProgressCls *progress_cls;
+ uint16_t worst_majority = UINT16_MAX;
sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
+ if (worst_majority > majority_num)
+ worst_majority = majority_num;
+
switch (majority_vote)
{
case VOTE_ADD:
ri->element,
set_mutation_done,
progress_cls));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: apply round: adding element %s with %u-majority.\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element), majority_num);
break;
case VOTE_REMOVE:
progress_cls->num_pending++;
ri->element,
set_mutation_done,
progress_cls));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: apply round: deleting element %s with %u-majority.\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element), majority_num);
break;
case VOTE_STAY:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: apply round: keeping element %s with %u-majority.\n",
+ session->local_peer_idx,
+ debug_str_element (ri->element), majority_num);
// do nothing
break;
default:
GNUNET_free (progress_cls);
finish_task (task);
}
-}
+ {
+ uint16_t thresh = (session->num_peers / 3) * 2;
-#define THRESH(s) (((s)->num_peers / 3))
+ if (worst_majority >= thresh)
+ {
+ switch (session->early_stopping)
+ {
+ case EARLY_STOPPING_NONE:
+ session->early_stopping = EARLY_STOPPING_ONE_MORE;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: Stopping early (after one more superround)\n",
+ session->local_peer_idx);
+ break;
+ case EARLY_STOPPING_ONE_MORE:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
+ session->local_peer_idx);
+ session->early_stopping = EARLY_STOPPING_DONE;
+ {
+ struct Step *step;
+ for (step = session->steps_head; NULL != step; step = step->next)
+ try_finish_step_early (step);
+ }
+ break;
+ case EARLY_STOPPING_DONE:
+ /* We shouldn't be here anymore after early stopping */
+ GNUNET_break (0);
+ break;
+ default:
+ GNUNET_assert (0);
+ break;
+ }
+ }
+ else if (EARLY_STOPPING_NONE != session->early_stopping)
+ {
+ // Our assumption about the number of bad peers
+ // has been broken.
+ GNUNET_break_op (0);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
+ session->local_peer_idx);
+ }
+ }
+
+}
static void
return;
}
+
+ {
+ // FIXME: should be marked as a shallow copy, so
+ // we can destroy everything correctly
+ struct SetEntry *last_set = GNUNET_new (struct SetEntry);
+ last_set->h = output_set->h;
+ last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
+ put_set (session, last_set);
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Evaluating referendum in Task {%s}\n",
debug_str_task_key (&task->key));
}
-static void finish_step (struct Step *step)
-{
- unsigned int i;
-
- GNUNET_assert (step->finished_tasks == step->tasks_len);
- GNUNET_assert (GNUNET_YES == step->is_running);
- GNUNET_assert (GNUNET_NO == step->is_finished);
-
-#ifdef GNUNET_EXTRA_LOGGING
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "All tasks of step `%s' with %u subordinates finished.\n",
- step->debug_name,
- step->subordinates_len);
-#endif
-
- for (i = 0; i < step->subordinates_len; i++)
- {
- GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
- step->subordinates[i]->pending_prereq--;
-#ifdef GNUNET_EXTRA_LOGGING
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Decreased pending_prereq to %u for step `%s'.\n",
- step->subordinates[i]->pending_prereq,
- step->subordinates[i]->debug_name);
-
-#endif
- }
-
- step->is_finished = GNUNET_YES;
-
- // XXX: maybe schedule as task to avoid recursion?
- run_ready_steps (step->session);
-}
/*
while (NULL != step)
{
- if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) )
+ if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
{
size_t i;
static struct Step *
-create_step (struct ConsensusSession *session, int round)
+create_step (struct ConsensusSession *session, int round, int early_finishable)
{
struct Step *step;
step = GNUNET_new (struct Step);
step->session = session;
step->round = round;
+ step->early_finishable = early_finishable;
GNUNET_CONTAINER_DLL_insert_tail (session->steps_head,
session->steps_tail,
step);
/* gcast step 1: leader disseminates */
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
/* gcast phase 2: echo */
prev_step = step;
round += 1;
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
#endif
prev_step = step;
/* Same round, since step only has local tasks */
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
#endif
prev_step = step;
round += 1;
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
#endif
prev_step = step;
/* Same round, since step only has local tasks */
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
#endif
/* all-to-all step */
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_NO);
#ifdef GNUNET_EXTRA_LOGGING
step->debug_name = GNUNET_strdup ("all to all");
struct Step *step_rep_end;
/* Every repetition is in a separate round. */
- step_rep_start = create_step (session, round);
+ step_rep_start = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
#endif
/* gradecast has three rounds */
round += 3;
- step_rep_end = create_step (session, round);
+ step_rep_end = create_step (session, round, GNUNET_YES);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
#endif
/* There is no next gradecast round, thus the final
start step is the overall end step of the gradecasts */
round += 1;
- step = create_step (session, round);
+ step = create_step (session, round, GNUNET_NO);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_asprintf (&step->debug_name, "finish");
#endif
.key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
.start = task_start_finish,
});
- task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
+ task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
put_task (session->taskmap, &task);
}