X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_union.c;h=f46713c3102d25b403b3aea2ae369d6915a0e9d0;hb=c9bc0115c53e10a31ffffb6dbb1cb85e77168dda;hp=78975749a5c337851df4f133b07e952b2af3cd5d;hpb=9b6eafa38e02ace11cc6217efee78e95b82eb19e;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 78975749a..f46713c31 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors) + Copyright (C) 2013-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -19,6 +19,7 @@ */ /** * @file set/gnunet-service-set_union.c + * @brief two-peer set operations * @author Florian Dold */ @@ -60,7 +61,7 @@ * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (16) +#define MAX_IBF_ORDER (20) /** * Number of buckets used in the ibf per estimated @@ -84,6 +85,7 @@ enum UnionOperationPhase * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. * * XXX: could use better wording. + * XXX: repurposed to also expect a "request full set" message, should be renamed * * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS */ @@ -114,14 +116,22 @@ enum UnionOperationPhase * In the penultimate phase, * we wait until all our demands * are satisfied. Then we send a done - * message, and wait for another done message.*/ + * message, and wait for another done message. + */ PHASE_FINISH_WAITING, /** * In the ultimate phase, we wait until * our demands are satisfied and then - * quit (sending another DONE message). */ - PHASE_DONE + * quit (sending another DONE message). + */ + PHASE_DONE, + + /** + * After sending the full set, wait for responses with the elements + * that the local peer is missing. + */ + PHASE_FULL_SENDING, }; @@ -147,7 +157,7 @@ struct OperationState struct InvertibleBloomFilter *local_ibf; /** - * Maps IBF-Keys (specific to the current salt) to elements. + * Maps unsalted IBF-Keys to elements. * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. * Colliding IBF-Keys are linked. */ @@ -172,6 +182,33 @@ struct OperationState * Hashes for elements that we have demanded from the other peer. */ struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; + + /** + * Salt that we're using for sending IBFs + */ + uint32_t salt_send; + + /** + * Salt for the IBF we've received and that we're currently decoding. + */ + uint32_t salt_receive; + + /** + * Number of elements we received from the other peer + * that were not in the local set yet. + */ + uint32_t received_fresh; + + /** + * Total number of elements received from the other peer. + */ + uint32_t received_total; + + /** + * Initial size of our set, just before + * the operation started. + */ + uint64_t initial_size; }; @@ -192,6 +229,14 @@ struct KeyEntry * is #GNUNET_YES. */ struct ElementEntry *element; + + /** + * Did we receive this element? + * Even if element->is_foreign is false, we might + * have received the element, so this indicates that + * the other peer has it. + */ + int received; }; @@ -334,18 +379,13 @@ fail_union_operation (struct Operation *op) * a salt. * * @param src the hash code - * @param salt salt to use * @return the derived IBF key */ static struct IBF_Key -get_ibf_key (const struct GNUNET_HashCode *src, - uint16_t salt) +get_ibf_key (const struct GNUNET_HashCode *src) { struct IBF_Key key; - - /* FIXME: Ensure that the salt is handled correctly. - This is a quick fix so that consensus works for now. */ - salt = 0; + uint16_t salt = 0; GNUNET_CRYPTO_kdf (&key, sizeof (key), src, sizeof *src, @@ -355,6 +395,16 @@ get_ibf_key (const struct GNUNET_HashCode *src, } +/** + * Context for #op_get_element_iterator + */ +struct GetElementContext +{ + struct GNUNET_HashCode hash; + struct KeyEntry *k; +}; + + /** * Iterator over the mapping from IBF keys to element entries. Checks if we * have an element with a given GNUNET_HashCode. @@ -366,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src, * #GNUNET_NO if we've found the element. */ static int -op_has_element_iterator (void *cls, +op_get_element_iterator (void *cls, uint32_t key, void *value) { - struct GNUNET_HashCode *element_hash = cls; + struct GetElementContext *ctx = cls; struct KeyEntry *k = value; GNUNET_assert (NULL != k); if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, - element_hash)) + &ctx->hash)) + { + ctx->k = k; return GNUNET_NO; + } return GNUNET_YES; } @@ -389,23 +442,29 @@ op_has_element_iterator (void *cls, * @param element_hash hash of the element to look for * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ -static int -op_has_element (struct Operation *op, +static struct KeyEntry * +op_get_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) { int ret; struct IBF_Key ibf_key; + struct GetElementContext ctx = {{{ 0 }} , 0}; - ibf_key = get_ibf_key (element_hash, op->spec->salt); + ctx.hash = *element_hash; + + ibf_key = get_ibf_key (element_hash); ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_has_element_iterator, - (void *) element_hash); + op_get_element_iterator, + &ctx); /* was the iteration aborted because we found the element? */ if (GNUNET_SYSERR == ret) - return GNUNET_YES; - return GNUNET_NO; + { + GNUNET_assert (NULL != ctx.k); + return ctx.k; + } + return NULL; } @@ -421,18 +480,21 @@ op_has_element (struct Operation *op, * * @param op the union operation * @param ee the element entry + * @parem received was this element received from the remote peer? */ static void op_register_element (struct Operation *op, - struct ElementEntry *ee) + struct ElementEntry *ee, + int received) { struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); + ibf_key = get_ibf_key (&ee->element_hash); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; + k->received = received; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, @@ -441,6 +503,31 @@ op_register_element (struct Operation *op, } +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; +} + + /** * Insert a key into an ibf. * @@ -455,13 +542,15 @@ prepare_ibf_iterator (void *cls, { struct Operation *op = cls; struct KeyEntry *ke = value; + struct IBF_Key salted_key; LOG (GNUNET_ERROR_TYPE_DEBUG, "[OP %x] inserting %lx (hash %s) into ibf\n", (void *) op, (unsigned long) ke->ibf_key.key_val, GNUNET_h2s (&ke->element->element_hash)); - ibf_insert (op->state->local_ibf, ke->ibf_key); + salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -491,11 +580,29 @@ init_key_to_element_iterator (void *cls, GNUNET_assert (GNUNET_NO == ee->remote); - op_register_element (op, ee); + op_register_element (op, ee, GNUNET_NO); return GNUNET_YES; } +/** + * Initialize the IBF key to element mapping local to this set + * operation. + * + * @param op the set union operation + */ +static void +initialize_key_to_element (struct Operation *op) +{ + unsigned int len; + + GNUNET_assert (NULL == op->state->key_to_element); + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); +} + + /** * Create an ibf with the operation's elements * of the specified size @@ -508,15 +615,8 @@ static int prepare_ibf (struct Operation *op, uint32_t size) { - if (NULL == op->state->key_to_element) - { - unsigned int len; + GNUNET_assert (NULL != op->state->key_to_element); - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); - op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, - init_key_to_element_iterator, op); - } if (NULL != op->state->local_ibf) ibf_destroy (op->state->local_ibf); op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); @@ -582,9 +682,11 @@ send_ibf (struct Operation *op, ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); - msg->reserved = 0; + msg->reserved1 = 0; + msg->reserved2 = 0; msg->order = ibf_order; - msg->offset = htons (buckets_sent); + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; @@ -613,7 +715,7 @@ send_strata_estimator (struct Operation *op) { const struct StrataEstimator *se = op->state->se; struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *strata_msg; + struct StrataEstimatorMessage *strata_msg; char *buf; size_t len; uint16_t type; @@ -625,13 +727,14 @@ send_strata_estimator (struct Operation *op) type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; else type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; - ev = GNUNET_MQ_msg_header_extra (strata_msg, - len, - type); - memcpy (&strata_msg[1], + ev = GNUNET_MQ_msg_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], buf, len); GNUNET_free (buf); + strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); GNUNET_MQ_send (op->mq, ev); op->state->phase = PHASE_EXPECT_IBF; @@ -658,7 +761,51 @@ get_order_from_difference (unsigned int diff) ibf_order++; if (ibf_order > MAX_IBF_ORDER) ibf_order = MAX_IBF_ORDER; - return ibf_order; + // add one for correction + return ibf_order + 1; +} + + +/** + * Send a set element. + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +send_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = value; + struct GNUNET_SET_Element *el = &ee->element; + struct GNUNET_MQ_Envelope *ev; + + + ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + emsg->element_type = htons (el->element_type); + GNUNET_memcpy (&emsg[1], el->data, el->size); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; +} + + +static void +send_full_set (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_FULL_SENDING; + + (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &send_element_iterator, op); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); } @@ -678,16 +825,23 @@ handle_p2p_strata_estimator (void *cls, { struct Operation *op = cls; struct StrataEstimator *remote_se; - int diff; + struct StrataEstimatorMessage *msg = (void *) mh; + unsigned int diff; + uint64_t other_size; size_t len; + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (mh->size), + GNUNET_NO); + if (op->state->phase != PHASE_EXPECT_SE) { - fail_union_operation (op); GNUNET_break (0); + fail_union_operation (op); return GNUNET_SYSERR; } - len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); + len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); if ( (GNUNET_NO == is_compressed) && (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) { @@ -695,6 +849,7 @@ handle_p2p_strata_estimator (void *cls, GNUNET_break (0); return GNUNET_SYSERR; } + other_size = GNUNET_ntohll (msg->set_size); remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); @@ -705,18 +860,23 @@ handle_p2p_strata_estimator (void *cls, return GNUNET_SYSERR; } if (GNUNET_OK != - strata_estimator_read (&mh[1], + strata_estimator_read (&msg[1], len, is_compressed, remote_se)) { /* decompression failed */ fail_union_operation (op); + strata_estimator_destroy (remote_se); return GNUNET_SYSERR; } GNUNET_assert (NULL != op->state->se); diff = strata_estimator_difference (remote_se, op->state->se); + + if (diff > 200) + diff = diff * 3 / 2; + strata_estimator_destroy (remote_se); strata_estimator_destroy (op->state->se); op->state->se = NULL; @@ -724,16 +884,55 @@ handle_p2p_strata_estimator (void *cls, "got se diff=%d, using ibf size %d\n", diff, 1<spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) { - /* Internal error, best we can do is shut the connection */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to send IBF, closing connection\n"); + GNUNET_break (0); fail_union_operation (op); return GNUNET_SYSERR; } + + + if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) + { + LOG (GNUNET_ERROR_TYPE_INFO, + "Sending full set (diff=%d, own set=%u)\n", + diff, + op->state->initial_size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of full sends", + 1, + GNUNET_NO); + if (op->state->initial_size <= other_size) + { + send_full_set (op); + } + else + { + struct GNUNET_MQ_Envelope *ev; + op->state->phase = PHASE_EXPECT_IBF; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); + GNUNET_MQ_send (op->mq, ev); + } + } + else + { + GNUNET_STATISTICS_update (_GSS_statistics, + "# of ibf sends", + 1, + GNUNET_NO); + if (GNUNET_OK != + send_ibf (op, + get_order_from_difference (diff))) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } + } + return GNUNET_OK; } @@ -817,6 +1016,7 @@ decode_and_send (struct Operation *op) if (GNUNET_OK != prepare_ibf (op, op->state->remote_ibf->size)) { + GNUNET_break (0); /* allocation failed */ return GNUNET_SYSERR; } @@ -831,7 +1031,7 @@ decode_and_send (struct Operation *op) diff_ibf->size); num_decoded = 0; - last_key.key_val = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { @@ -848,7 +1048,8 @@ decode_and_send (struct Operation *op) (unsigned long) key.key_val); num_decoded += 1; if ( (num_decoded > diff_ibf->size) || - (num_decoded > 1 && last_key.key_val == key.key_val) ) + ( (num_decoded > 1) && + (last_key.key_val == key.key_val) ) ) { LOG (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", @@ -874,6 +1075,7 @@ decode_and_send (struct Operation *op) "# of IBF retries", 1, GNUNET_NO); + op->state->salt_send++; if (GNUNET_OK != send_ibf (op, next_order)) { @@ -915,20 +1117,22 @@ decode_and_send (struct Operation *op) } if (1 == side) { - send_offers_for_key (op, key); + struct IBF_Key unsalted_key; + unsalt_key (&key, op->state->salt_receive, &unsalted_key); + send_offers_for_key (op, unsalted_key); } else if (-1 == side) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *msg; + struct InquiryMessage *msg; /* It may be nice to merge multiple requests, but with CADET's corking it is not worth * the effort additional complexity. */ - ev = GNUNET_MQ_msg_header_extra (msg, - sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); - - memcpy (&msg[1], + ev = GNUNET_MQ_msg_extra (msg, + sizeof (struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], &key, sizeof (struct IBF_Key)); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -981,6 +1185,8 @@ handle_p2p_ibf (void *cls, "Creating new ibf of size %u\n", 1 << msg->order); op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); if (NULL == op->state->remote_ibf) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -989,7 +1195,7 @@ handle_p2p_ibf (void *cls, return GNUNET_SYSERR; } op->state->ibf_buckets_received = 0; - if (0 != ntohs (msg->offset)) + if (0 != ntohl (msg->offset)) { GNUNET_break_op (0); fail_union_operation (op); @@ -998,8 +1204,19 @@ handle_p2p_ibf (void *cls, } else if (op->state->phase == PHASE_EXPECT_IBF_CONT) { - if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || - (1<order != op->state->remote_ibf->size) ) + if (ntohl (msg->offset) != op->state->ibf_buckets_received) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (1<order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (ntohl (msg->salt) != op->state->salt_receive) { GNUNET_break_op (0); fail_union_operation (op); @@ -1082,8 +1299,9 @@ send_client_element (struct Operation *op, } rm->result_status = htons (status); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->element_type; - memcpy (&rm[1], element->data, element->size); + rm->element_type = htons (element->element_type); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (op->spec->set->client_mq, ev); } @@ -1105,6 +1323,7 @@ send_done_and_destroy (void *cls) rm->request_id = htonl (op->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); GNUNET_MQ_send (op->spec->set->client_mq, ev); /* Will also call the union-specific cancel function. */ _GSS_operation_destroy (op, GNUNET_YES); @@ -1151,6 +1370,8 @@ maybe_finish (struct Operation *op) /** * Handle an element message from a remote peer. + * Sent by the other peer either because we decoded an IBF and placed a demand, + * or because the other peer switched to full set transmission. * * @param cls the union operation * @param mh the message @@ -1181,7 +1402,7 @@ handle_p2p_elements (void *cls, element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); - memcpy (&ee[1], &emsg[1], element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); ee->element.size = element_size; ee->element.data = &ee[1]; ee->element.element_type = ntohs (emsg->element_type); @@ -1209,8 +1430,16 @@ handle_p2p_elements (void *cls, "# received elements", 1, GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; - if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) { /* Got repeated element. Should not happen since * we track demands. */ @@ -1218,13 +1447,15 @@ handle_p2p_elements (void *cls, "# repeated elements", 1, GNUNET_NO); + ke->received = GNUNET_YES; GNUNET_free (ee); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering new element from remote peer\n"); - op_register_element (op, ee); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ switch (op->spec->result_mode) { @@ -1241,10 +1472,117 @@ handle_p2p_elements (void *cls, } } + if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + maybe_finish (op); } +/** + * Handle an element message from a remote peer. + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_p2p_full_element (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; + uint16_t element_size; + + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (full diff, size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; + + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } + + if ( (GNUNET_YES == op->spec->byzantine) && + (op->state->received_total > 384 + op->state->received_fresh * 4) && + (op->state->received_fresh < op->state->received_total / 6) ) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + LOG (GNUNET_ERROR_TYPE_ERROR, + "Other peer sent only %llu/%llu fresh elements, failing operation\n", + (unsigned long long) op->state->received_fresh, + (unsigned long long) op->state->received_total); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + /** * Send offers (for GNUNET_Hash-es) in response * to inquiries (for IBF_Key-s). @@ -1259,6 +1597,7 @@ handle_p2p_inquiry (void *cls, struct Operation *op = cls; const struct IBF_Key *ibf_key; unsigned int num_keys; + struct InquiryMessage *msg; /* look up elements and send them */ if (op->state->phase != PHASE_INVENTORY_PASSIVE) @@ -1267,9 +1606,9 @@ handle_p2p_inquiry (void *cls, fail_union_operation (op); return; } - num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) / sizeof (struct IBF_Key); - if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) != num_keys * sizeof (struct IBF_Key)) { GNUNET_break_op (0); @@ -1277,17 +1616,130 @@ handle_p2p_inquiry (void *cls, return; } - ibf_key = (const struct IBF_Key *) &mh[1]; + msg = (struct InquiryMessage *) mh; + + ibf_key = (const struct IBF_Key *) &msg[1]; while (0 != num_keys--) { - send_offers_for_key (op, *ibf_key); + struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); + send_offers_for_key (op, unsalted_key); ibf_key++; } } /** - * FIXME + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +send_missing_elements_iter (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = ke->element; + + if (GNUNET_YES == ke->received) + return GNUNET_YES; + + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + GNUNET_MQ_send (op->mq, ev); + + return GNUNET_YES; +} + + +/** + * Handle a + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_request_full (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF != op->state->phase) + { + fail_union_operation (op); + GNUNET_break_op (0); + return; + } + + // FIXME: we need to check that our set is larger than the + // byzantine_lower_bound by some threshold + send_full_set (op); +} + + +/** + * Handle a "full done" message. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF == op->state->phase) + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); + + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_elements_iter, + op); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_DONE; + + /* we now wait until the other peer shuts the tunnel down*/ + } + else if (PHASE_FULL_SENDING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + +/** + * Handle a demand by the other peer for elements based on a list + * of GNUNET_HashCode-s. + * + * @parem cls closure, a set union operation + * @param mh the demand message */ static void handle_p2p_demand (void *cls, @@ -1330,7 +1782,7 @@ handle_p2p_demand (void *cls, return; } ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - memcpy (&emsg[1], ee->element.data, ee->element.size); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); emsg->reserved = htons (0); emsg->element_type = htons (ee->element.element_type); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1339,6 +1791,10 @@ handle_p2p_demand (void *cls, (unsigned int) ee->element.size, GNUNET_h2s (&ee->element_hash)); GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); switch (op->spec->result_mode) { @@ -1504,8 +1960,13 @@ union_evaluate (struct Operation *op, op->state->se = strata_estimator_dup (op->spec->set->state->se); /* we started the operation, thus we have to send the operation request */ op->state->phase = PHASE_EXPECT_SE; + op->state->salt_receive = op->state->salt_send = 42; LOG (GNUNET_ERROR_TYPE_DEBUG, "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (_GSS_statistics, "# of initiated union operations", 1, @@ -1517,11 +1978,10 @@ union_evaluate (struct Operation *op, { /* the context message is too large */ GNUNET_break (0); - GNUNET_SERVER_client_disconnect (op->spec->set->client); + GNUNET_SERVICE_client_drop (op->spec->set->client); return; } msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = op->spec->app_id; GNUNET_MQ_send (op->mq, ev); @@ -1531,6 +1991,9 @@ union_evaluate (struct Operation *op, else LOG (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); + + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); } @@ -1551,10 +2014,17 @@ union_accept (struct Operation *op) "# of accepted union operations", 1, GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); op->state = GNUNET_new (struct OperationState); op->state->se = strata_estimator_dup (op->spec->set->state->se); op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + op->state->salt_receive = op->state->salt_send = 42; + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); /* kick off the operation */ send_strata_estimator (op); } @@ -1599,7 +2069,7 @@ static void union_add (struct SetState *set_state, struct ElementEntry *ee) { strata_estimator_insert (set_state->se, - get_ibf_key (&ee->element_hash, 0)); + get_ibf_key (&ee->element_hash)); } @@ -1614,7 +2084,7 @@ static void union_remove (struct SetState *set_state, struct ElementEntry *ee) { strata_estimator_remove (set_state->se, - get_ibf_key (&ee->element_hash, 0)); + get_ibf_key (&ee->element_hash)); } @@ -1662,6 +2132,9 @@ union_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: handle_p2p_elements (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: + handle_p2p_full_element (op, mh); + break; case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: handle_p2p_inquiry (op, mh); break; @@ -1674,6 +2147,12 @@ union_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: handle_p2p_demand (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: + handle_p2p_full_done (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: + handle_p2p_request_full (op, mh); + break; default: /* Something wrong with cadet's message handlers? */ GNUNET_assert (0);