GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
struct GNUNET_MessageHeader,
NULL),
+ GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ NULL),
GNUNET_MQ_hd_var_size (p2p_message,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
struct GNUNET_MessageHeader,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
struct GNUNET_MessageHeader,
op),
+ GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ op),
GNUNET_MQ_hd_var_size (p2p_message,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
struct GNUNET_MessageHeader,
* upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
*
* XXX: could use better wording.
+ * XXX: repurposed to also expect a "request full set" message, should be renamed
*
* After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
*/
* Total number of elements received from the other peer.
*/
uint32_t received_total;
+
+ /**
+ * Initial size of our set, just before
+ * the operation started.
+ */
+ uint64_t initial_size;
};
{
const struct StrataEstimator *se = op->state->se;
struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_MessageHeader *strata_msg;
+ struct StrataEstimatorMessage *strata_msg;
char *buf;
size_t len;
uint16_t type;
type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
else
type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
- ev = GNUNET_MQ_msg_header_extra (strata_msg,
- len,
- type);
+ ev = GNUNET_MQ_msg_extra (strata_msg,
+ len,
+ type);
GNUNET_memcpy (&strata_msg[1],
buf,
len);
GNUNET_free (buf);
+ strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
GNUNET_MQ_send (op->mq,
ev);
op->state->phase = PHASE_EXPECT_IBF;
{
struct Operation *op = cls;
struct StrataEstimator *remote_se;
- int diff;
+ struct StrataEstimatorMessage *msg = (void *) mh;
+ unsigned int diff;
+ uint64_t other_size;
size_t len;
GNUNET_STATISTICS_update (_GSS_statistics,
if (op->state->phase != PHASE_EXPECT_SE)
{
- fail_union_operation (op);
GNUNET_break (0);
+ fail_union_operation (op);
return GNUNET_SYSERR;
}
- len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
+ len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
if ( (GNUNET_NO == is_compressed) &&
(len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
+ other_size = GNUNET_ntohll (msg->set_size);
remote_se = strata_estimator_create (SE_STRATA_COUNT,
SE_IBF_SIZE,
SE_IBF_HASH_NUM);
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
- strata_estimator_read (&mh[1],
+ strata_estimator_read (&msg[1],
len,
is_compressed,
remote_se))
diff,
1<<get_order_from_difference (diff));
- if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2)
+ if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+ {
+ GNUNET_break (0);
+ fail_union_operation (op);
+ return GNUNET_SYSERR;
+ }
+
+
+ if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 2))
{
LOG (GNUNET_ERROR_TYPE_INFO,
"Sending full set (diff=%d, own set=%u)\n",
diff,
- GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
- send_full_set (op);
+ op->state->initial_size);
+ if (op->state->initial_size <= other_size)
+ {
+ send_full_set (op);
+ }
+ else
+ {
+ struct GNUNET_MQ_Envelope *ev;
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+ GNUNET_MQ_send (op->mq, ev);
+ }
}
else
{
return GNUNET_YES;
}
+
+/**
+ * Handle a
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
+ fail_union_operation (op);
+ GNUNET_break_op (0);
+ return;
+ }
+
+ // FIXME: we need to check that our set is larger than the
+ // byzantine_lower_bound by some threshold
+ send_full_set (op);
+}
+
+
/**
* Handle a "full done" message.
*
LOG (GNUNET_ERROR_TYPE_DEBUG,
"sent op request without context message\n");
+ op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
initialize_key_to_element (op);
}
op->state->se = strata_estimator_dup (op->spec->set->state->se);
op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
op->state->salt_receive = op->state->salt_send = 42;
+ op->state->initial_size = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
initialize_key_to_element (op);
/* kick off the operation */
send_strata_estimator (op);
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
handle_p2p_full_done (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
+ handle_p2p_request_full (op, mh);
+ break;
default:
/* Something wrong with cadet's message handlers? */
GNUNET_assert (0);