#include "platform.h"
#include "gnunet_util_lib.h"
+#include "gnunet_block_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_applications.h"
#include "gnunet_set_service.h"
#include "consensus_protocol.h"
#include "consensus.h"
-#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
-
enum ReferendumVote
{
GNUNET_NETWORK_STRUCT_BEGIN
-
-struct ContestedPayload
-{
-};
-
/**
* Tuple of integers that together
* identify a task uniquely.
enum PhaseKind
{
PHASE_KIND_ALL_TO_ALL,
+ PHASE_KIND_ALL_TO_ALL_2,
PHASE_KIND_GRADECAST_LEADER,
PHASE_KIND_GRADECAST_ECHO,
PHASE_KIND_GRADECAST_ECHO_GRADE,
* State of our early stopping scheme.
*/
int early_stopping;
+
+ /**
+ * Our set size from the first round.
+ */
+ uint64_t first_size;
+
+ uint64_t *first_sizes_received;
+
+ /**
+ * Bounded Eppstein lower bound.
+ */
+ uint64_t lower_bound;
};
/**
switch (phase)
{
case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
+ case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
case PHASE_KIND_FINISH: return "FINISH";
case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
if (NULL != element)
{
struct GNUNET_CONSENSUS_ElementMessage *m;
+ const struct ConsensusElement *ce;
+
+ GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
+ ce = element->data;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker);
+
+ if (0 != ce->marker)
+ return GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%d: sending element %s to client\n",
session->local_peer_idx,
debug_str_element (element));
- ev = GNUNET_MQ_msg_extra (m, element->size,
+ ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
- m->element_type = htons (element->element_type);
- GNUNET_memcpy (&m[1], element->data, element->size);
+ m->element_type = ce->payload_type;
+ GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
GNUNET_MQ_send (session->client_mq, ev);
}
else
}
-uint16_t
+static uint16_t
task_other_peer (struct TaskEntry *task)
{
uint16_t me = task->step->session->local_peer_idx;
}
+static int
+cmp_uint64_t (const void *pa, const void *pb)
+{
+ uint64_t a = *(uint64_t *) pa;
+ uint64_t b = *(uint64_t *) pb;
+
+ if (a == b)
+ return 0;
+ if (a < b)
+ return -1;
+ return 1;
+}
+
+
/**
* Callback for set operation results. Called for each element
* in the result set.
*
* @param cls closure
* @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
+ * @param current_size current set size
* @param status see enum GNUNET_SET_Status
*/
static void
set_result_cb (void *cls,
const struct GNUNET_SET_Element *element,
+ uint64_t current_size,
enum GNUNET_SET_Status status)
{
struct TaskEntry *task = cls;
struct ReferendumEntry *output_rfn = NULL;
unsigned int other_idx;
struct SetOpCls *setop;
+ const struct ConsensusElement *consensus_element = NULL;
+
+ if (NULL != element)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "P%u: got element of type %u, status %u\n",
+ session->local_peer_idx,
+ (unsigned) element->element_type,
+ (unsigned) status);
+ GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type);
+ consensus_element = element->data;
+ }
setop = &task->cls.setop;
return;
}
- if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
+ if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
{
- if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: got some marker\n",
+ session->local_peer_idx);
+ if ( (GNUNET_YES == setop->transceive_contested) &&
+ (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
{
GNUNET_assert (NULL != output_rfn);
rfn_contest (output_rfn, task_other_peer (task));
return;
}
+
+ if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
+ {
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: got size marker\n",
+ session->local_peer_idx);
+
+
+ struct ConsensusSizeElement *cse = (void *) consensus_element;
+
+ if (cse->sender_index == other_idx)
+ {
+ if (NULL == session->first_sizes_received)
+ session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
+ session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
+
+ uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
+ qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
+ session->lower_bound = copy[session->num_peers / 3 + 1];
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "P%u: lower bound %llu\n",
+ session->local_peer_idx,
+ (long long) session->lower_bound);
+ }
+ return;
+ }
+
+ return;
}
switch (status)
{
case GNUNET_SET_STATUS_ADD_LOCAL:
+ GNUNET_assert (NULL != consensus_element);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding element in Task {%s}\n",
debug_str_task_key (&task->key));
// XXX: add result to structures in task
break;
case GNUNET_SET_STATUS_ADD_REMOTE:
+ GNUNET_assert (NULL != consensus_element);
if (GNUNET_YES == setop->do_not_remove)
break;
- if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+ if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
break;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing element in Task {%s}\n",
{
rfn_commit (output_rfn, task_other_peer (task));
}
+ if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
+ {
+ session->first_size = current_size;
+ }
finish_task (task);
break;
case GNUNET_SET_STATUS_FAILURE:
EVILNESS_CRAM_LEAD,
EVILNESS_CRAM_ECHO,
EVILNESS_SLACK,
+ EVILNESS_SLACK_A2A,
};
enum EvilnessSubType
{
evil->type = EVILNESS_SLACK;
}
+ if (0 == strcmp ("slack-a2a", evil_type_str))
+ {
+ evil->type = EVILNESS_SLACK_A2A;
+ }
else if (0 == strcmp ("cram-all", evil_type_str))
{
evil->type = EVILNESS_CRAM_ALL;
set = lookup_set (session, &setop->input_set);
GNUNET_assert (NULL != set);
+ if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
+ {
+ struct GNUNET_SET_Element element;
+ struct ConsensusElement ce = { 0 };
+ ce.marker = CONSENSUS_MARKER_CONTESTED;
+ element.data = &ce;
+ element.size = sizeof (struct ConsensusElement);
+ element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
+ GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+ }
+
+ if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
+ {
+ struct GNUNET_SET_Element element;
+ struct ConsensusSizeElement cse = { 0 };
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n");
+ cse.ce.marker = CONSENSUS_MARKER_SIZE;
+ cse.size = GNUNET_htonll (session->first_size);
+ cse.sender_index = session->local_peer_idx;
+ element.data = &cse;
+ element.size = sizeof (struct ConsensusSizeElement);
+ element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
+ GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+ }
+
#ifdef EVIL
{
unsigned int i;
}
for (i = 0; i < evil.num; i++)
{
- struct GNUNET_HashCode hash;
struct GNUNET_SET_Element element;
- element.data = &hash;
- element.size = sizeof (struct GNUNET_HashCode);
- element.element_type = 0;
+ struct ConsensusStuffedElement se = { 0 };
+ element.data = &se;
+ element.size = sizeof (struct ConsensusStuffedElement);
+ element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT;
if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
{
/* Always generate a new element. */
- GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &hash);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &se.rand);
}
else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
{
/* Always cram the same elements, derived from counter. */
- GNUNET_CRYPTO_hash (&i, sizeof (i), &hash);
+ GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
}
else
{
"P%u: evil peer: slacking\n",
(unsigned int) session->local_peer_idx);
/* Do nothing. */
+ case EVILNESS_SLACK_A2A:
+ if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
+ (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
+ {
+ struct GNUNET_SET_Handle *empty_set;
+ empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ GNUNET_SET_commit (setop->op, empty_set);
+ GNUNET_SET_destroy (empty_set);
+ }
+ else
+ {
+ GNUNET_SET_commit (setop->op, set->h);
+ }
break;
case EVILNESS_NONE:
GNUNET_SET_commit (setop->op, set->h);
}
}
#else
- if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
- {
- struct GNUNET_SET_Element element;
- struct ContestedPayload payload;
- element.data = &payload;
- element.size = sizeof (struct ContestedPayload);
- element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
- GNUNET_SET_add_element (set->h, &element, NULL, NULL);
- }
if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
{
GNUNET_SET_commit (setop->op, set->h);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
+ struct GNUNET_SET_Option opts[] = {
+ { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
+ { GNUNET_SET_OPTION_END },
+ };
+
// XXX: maybe this should be done while
// setting up tasks alreays?
setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
&session->global_id,
&rcm.header,
GNUNET_SET_RESULT_SYMMETRIC,
+ opts,
set_result_cb,
task);
GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
(task->key.peer2 == session->local_peer_idx)));
+ struct GNUNET_SET_Option opts[] = {
+ { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
+ { GNUNET_SET_OPTION_END },
+ };
+
task->cls.setop.op = GNUNET_SET_accept (request,
GNUNET_SET_RESULT_SYMMETRIC,
+ opts,
set_result_cb,
task);
put_task (session->taskmap, &task);
}
+ round += 1;
prev_step = step;
- step = NULL;
+ step = create_step (session, round, GNUNET_NO);;
+#ifdef GNUNET_EXTRA_LOGGING
+ step->debug_name = GNUNET_strdup ("all to all 2");
+#endif
+ step_depend_on (step, prev_step);
+
+
+ for (i = 0; i < n; i++)
+ {
+ uint16_t p1;
+ uint16_t p2;
+
+ p1 = me;
+ p2 = i;
+ arrange_peers (&p1, &p2, n);
+ task = ((struct TaskEntry) {
+ .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
+ .step = step,
+ .start = task_start_reconcile,
+ .cancel = task_cancel_reconcile,
+ });
+ task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
+ task.cls.setop.output_set = task.cls.setop.input_set;
+ task.cls.setop.do_not_remove = GNUNET_YES;
+ put_task (session->taskmap, &task);
+ }
round += 1;
+ prev_step = step;
+ step = NULL;
+
+
+
/* Byzantine union */
/* sequential repetitions of the gradecasts */
const struct GNUNET_CONSENSUS_ElementMessage *msg)
{
struct ConsensusSession *session = cls;
- struct GNUNET_SET_Element *element;
ssize_t element_size;
struct GNUNET_SET_Handle *initial_set;
+ struct ConsensusElement *ce;
if (GNUNET_YES == session->conclude_started)
{
GNUNET_SERVICE_client_drop (session->client);
return;
}
+
element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
- element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
- element->element_type = msg->element_type;
- element->size = element_size;
- GNUNET_memcpy (&element[1], &msg[1], element_size);
- element->data = &element[1];
+ ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
+ GNUNET_memcpy (&ce[1], &msg[1], element_size);
+ ce->payload_type = msg->element_type;
+
+ struct GNUNET_SET_Element element = {
+ .element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT,
+ .size = sizeof (struct ConsensusElement) + element_size,
+ .data = ce,
+ };
+
{
struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
struct SetEntry *entry;
GNUNET_assert (NULL != entry);
initial_set = entry->h;
}
+
session->num_client_insert_pending++;
GNUNET_SET_add_element (initial_set,
- element,
+ &element,
&client_insert_done,
session);
#ifdef GNUNET_EXTRA_LOGGING
{
- struct GNUNET_HashCode hash;
-
- GNUNET_SET_element_hash (element,
- &hash);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"P%u: element %s added\n",
session->local_peer_idx,
- GNUNET_h2s (&hash));
+ debug_str_element (&element));
}
#endif
- GNUNET_free (element);
+ GNUNET_free (ce);
GNUNET_SERVICE_client_continue (session->client);
}