* upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
*
* XXX: could use better wording.
+ * XXX: repurposed to also expect a "request full set" message, should be renamed
*
* After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
*/
* Total number of elements received from the other peer.
*/
uint32_t received_total;
+
+ /**
+ * Initial size of our set, just before
+ * the operation started.
+ */
+ uint64_t initial_size;
};
{
int ret;
struct IBF_Key ibf_key;
- struct GetElementContext ctx = { 0 };
+ struct GetElementContext ctx = {{{ 0 }} , 0};
ctx.hash = *element_hash;
{
const struct StrataEstimator *se = op->state->se;
struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_MessageHeader *strata_msg;
+ struct StrataEstimatorMessage *strata_msg;
char *buf;
size_t len;
uint16_t type;
type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
else
type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
- ev = GNUNET_MQ_msg_header_extra (strata_msg,
- len,
- type);
+ ev = GNUNET_MQ_msg_extra (strata_msg,
+ len,
+ type);
GNUNET_memcpy (&strata_msg[1],
buf,
len);
GNUNET_free (buf);
+ strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
GNUNET_MQ_send (op->mq,
ev);
op->state->phase = PHASE_EXPECT_IBF;
ibf_order++;
if (ibf_order > MAX_IBF_ORDER)
ibf_order = MAX_IBF_ORDER;
- return ibf_order;
+ // add one for correction
+ return ibf_order + 1;
}
{
struct Operation *op = cls;
struct GNUNET_SET_ElementMessage *emsg;
- struct GNUNET_SET_Element *el = value;
+ struct ElementEntry *ee = value;
+ struct GNUNET_SET_Element *el = &ee->element;
struct GNUNET_MQ_Envelope *ev;
+
ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
- emsg->element_type = htonl (el->element_type);
+ emsg->element_type = htons (el->element_type);
GNUNET_memcpy (&emsg[1], el->data, el->size);
GNUNET_MQ_send (op->mq, ev);
return GNUNET_YES;
{
struct Operation *op = cls;
struct StrataEstimator *remote_se;
- int diff;
+ struct StrataEstimatorMessage *msg = (void *) mh;
+ unsigned int diff;
+ uint64_t other_size;
size_t len;
GNUNET_STATISTICS_update (_GSS_statistics,
if (op->state->phase != PHASE_EXPECT_SE)
{
- fail_union_operation (op);
GNUNET_break (0);
+ fail_union_operation (op);
return GNUNET_SYSERR;
}
- len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
+ len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
if ( (GNUNET_NO == is_compressed) &&
(len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
+ other_size = GNUNET_ntohll (msg->set_size);
remote_se = strata_estimator_create (SE_STRATA_COUNT,
SE_IBF_SIZE,
SE_IBF_HASH_NUM);
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
- strata_estimator_read (&mh[1],
+ strata_estimator_read (&msg[1],
len,
is_compressed,
remote_se))
GNUNET_assert (NULL != op->state->se);
diff = strata_estimator_difference (remote_se,
op->state->se);
+
+ if (diff > 200)
+ diff = diff * 3 / 2;
+
strata_estimator_destroy (remote_se);
strata_estimator_destroy (op->state->se);
op->state->se = NULL;
diff,
1<<get_order_from_difference (diff));
- if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2)
+ if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_SYSERR;
+ }
+
+
+ if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
{
LOG (GNUNET_ERROR_TYPE_INFO,
"Sending full set (diff=%d, own set=%u)\n",
diff,
- GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
- send_full_set (op);
+ op->state->initial_size);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of full sends",
+ 1,
+ GNUNET_NO);
+ if (op->state->initial_size <= other_size)
+ {
+ send_full_set (op);
+ }
+ else
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ op->state->phase = PHASE_EXPECT_IBF;
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+ GNUNET_MQ_send (op->mq, ev);
+ }
}
else
{
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# of ibf sends",
+ 1,
+ GNUNET_NO);
if (GNUNET_OK !=
send_ibf (op,
get_order_from_difference (diff)))
}
rm->result_status = htons (status);
rm->request_id = htonl (op->spec->client_request_id);
- rm->element_type = element->element_type;
+ rm->element_type = htons (element->element_type);
+ rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
GNUNET_memcpy (&rm[1], element->data, element->size);
GNUNET_MQ_send (op->spec->set->client_mq, ev);
}
rm->request_id = htonl (op->spec->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
rm->element_type = htons (0);
+ rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
GNUNET_MQ_send (op->spec->set->client_mq, ev);
/* Will also call the union-specific cancel function. */
_GSS_operation_destroy (op, GNUNET_YES);
}
}
- if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+ if ( (GNUNET_YES == op->spec->byzantine) &&
+ (op->state->received_total > 384 + op->state->received_fresh * 4) &&
+ (op->state->received_fresh < op->state->received_total / 6) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+ (unsigned long long) op->state->received_fresh,
+ (unsigned long long) op->state->received_total);
GNUNET_break_op (0);
fail_union_operation (op);
return;
return GNUNET_YES;
}
+
+/**
+ * Handle a
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
+ fail_union_operation (op);
+ GNUNET_break_op (0);
+ return;
+ }
+
+ // FIXME: we need to check that our set is larger than the
+ // byzantine_lower_bound by some threshold
+ send_full_set (op);
+}
+
+
/**
* Handle a "full done" message.
*
"sent op request without context message\n");
initialize_key_to_element (op);
+ op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
}
op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
op->state->salt_receive = op->state->salt_send = 42;
initialize_key_to_element (op);
+ op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
/* kick off the operation */
send_strata_estimator (op);
}
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
handle_p2p_full_done (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
+ handle_p2p_request_full (op, mh);
+ break;
default:
/* Something wrong with cadet's message handlers? */
GNUNET_assert (0);