From 5abac271a452d04a5fbea2e4333a9606d435cf94 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 27 Feb 2017 03:43:00 +0100 Subject: [PATCH] implement lower bound agreement --- src/consensus/consensus_protocol.h | 13 ++- src/consensus/gnunet-service-consensus.c | 119 ++++++++++++++++++----- 2 files changed, 107 insertions(+), 25 deletions(-) diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index 43b6a9632..fa445dc2e 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h @@ -90,8 +90,8 @@ struct GNUNET_CONSENSUS_RoundContextMessage enum { - CONSENSUS_MARKER_CONTESTED, - CONSENSUS_MARKER_SIZE, + CONSENSUS_MARKER_CONTESTED = 1, + CONSENSUS_MARKER_SIZE = 2, }; @@ -115,6 +115,15 @@ struct ConsensusElement }; +struct ConsensusSizeElement +{ + struct ConsensusElement ce GNUNET_PACKED; + + uint64_t size GNUNET_PACKED; + uint8_t sender_index; +}; + + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index bfb14996a..8b02031fd 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -492,6 +492,13 @@ struct ConsensusSession * Our set size from the first round. */ uint64_t first_size; + + uint64_t *first_sizes_received; + + /** + * Bounded Eppstein lower bound. + */ + uint64_t lower_bound; }; /** @@ -675,7 +682,10 @@ send_to_client_iter (void *cls, GNUNET_assert (GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT == element->element_type); ce = element->data; - GNUNET_assert (0 == ce->marker); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "marker is %u\n", (unsigned) ce->marker); + + if (0 != ce->marker) + return GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%d: sending element %s to client\n", @@ -855,7 +865,7 @@ rfn_vote (struct ReferendumEntry *rfn, } -uint16_t +static uint16_t task_other_peer (struct TaskEntry *task) { uint16_t me = task->step->session->local_peer_idx; @@ -865,6 +875,20 @@ task_other_peer (struct TaskEntry *task) } +static int +cmp_uint64_t (const void *pa, const void *pb) +{ + uint64_t a = *(uint64_t *) pa; + uint64_t b = *(uint64_t *) pb; + + if (a == b) + return 0; + if (a < b) + return -1; + return 1; +} + + /** * Callback for set operation results. Called for each element * in the result set. @@ -946,8 +970,11 @@ set_result_cb (void *cls, return; } - if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) ) + if ( (NULL != consensus_element) && (0 != consensus_element->marker) ) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: got some marker\n", + session->local_peer_idx); if ( (GNUNET_YES == setop->transceive_contested) && (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) ) { @@ -955,6 +982,35 @@ set_result_cb (void *cls, rfn_contest (output_rfn, task_other_peer (task)); return; } + + if (CONSENSUS_MARKER_SIZE == consensus_element->marker) + { + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: got size marker\n", + session->local_peer_idx); + + + struct ConsensusSizeElement *cse = (void *) consensus_element; + + if (cse->sender_index == other_idx) + { + if (NULL == session->first_sizes_received) + session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t); + session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size); + + uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers); + qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t); + session->lower_bound = copy[session->num_peers / 3 + 1]; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "P%u: lower bound %llu\n", + session->local_peer_idx, + (long long) session->lower_bound); + } + return; + } + + return; } switch (status) @@ -1249,6 +1305,31 @@ commit_set (struct ConsensusSession *session, set = lookup_set (session, &setop->input_set); GNUNET_assert (NULL != set); + if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) ) + { + struct GNUNET_SET_Element element; + struct ConsensusElement ce = { 0 }; + ce.marker = CONSENSUS_MARKER_CONTESTED; + element.data = &ce; + element.size = sizeof (struct ConsensusElement); + element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT; + GNUNET_SET_add_element (set->h, &element, NULL, NULL); + } + + if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) + { + struct GNUNET_SET_Element element; + struct ConsensusSizeElement cse = { 0 }; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting size marker\n"); + cse.ce.marker = CONSENSUS_MARKER_SIZE; + cse.size = GNUNET_htonll (session->first_size); + cse.sender_index = session->local_peer_idx; + element.data = &cse; + element.size = sizeof (struct ConsensusSizeElement); + element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT; + GNUNET_SET_add_element (set->h, &element, NULL, NULL); + } + #ifdef EVIL { unsigned int i; @@ -1338,24 +1419,6 @@ commit_set (struct ConsensusSession *session, } } #else - - //if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) - //{ - // struct GNUNET_SET_Element element; - // struct ConsensusElement ce = { 0 }; - //} - - - if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) ) - { - struct GNUNET_SET_Element element; - struct ConsensusElement ce = { 0 }; - ce.marker = CONSENSUS_MARKER_CONTESTED; - element.data = &ce; - element.size = sizeof (struct ConsensusElement); - element.element_type = GNUNET_BLOCK_TYPE_CONSENSUS_ELEMENT; - GNUNET_SET_add_element (set->h, &element, NULL, NULL); - } if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)]) { GNUNET_SET_commit (setop->op, set->h); @@ -2039,13 +2102,18 @@ task_start_reconcile (struct TaskEntry *task) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n", session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set)); + struct GNUNET_SET_Option opts[] = { + {GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } }, + {0}, + }; + // XXX: maybe this should be done while // setting up tasks alreays? setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2], &session->global_id, &rcm.header, GNUNET_SET_RESULT_SYMMETRIC, - (struct GNUNET_SET_Option[]) { 0 }, + opts, set_result_cb, task); @@ -2470,9 +2538,14 @@ set_listen_cb (void *cls, GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx))); + struct GNUNET_SET_Option opts[] = { + {GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } }, + {0}, + }; + task->cls.setop.op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_SYMMETRIC, - (struct GNUNET_SET_Option[]) { 0 }, + opts, set_result_cb, task); -- 2.25.1