X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_union.c;h=f46713c3102d25b403b3aea2ae369d6915a0e9d0;hb=c9bc0115c53e10a31ffffb6dbb1cb85e77168dda;hp=5b452cae1598fd5ef3099065a71d69fdba174db7;hpb=68e2709a38f9c481f96024138a6f9ae57a280a57;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 5b452cae1..f46713c31 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -19,11 +19,13 @@ */ /** * @file set/gnunet-service-set_union.c + * @brief two-peer set operations * @author Florian Dold */ #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" #include "gnunet-service-set.h" #include "ibf.h" #include "gnunet-service-set_union_strata_estimator.h" @@ -31,16 +33,21 @@ #include +#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) + + /** * Number of IBFs in a strata estimator. */ #define SE_STRATA_COUNT 32 + /** * Size of the IBFs in the strata estimator. */ #define SE_IBF_SIZE 80 + /** - * hash num parameter for the difference digests and strata estimators + * The hash num parameter for the difference digests and strata estimators. */ #define SE_IBF_HASH_NUM 4 @@ -54,7 +61,7 @@ * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (16) +#define MAX_IBF_ORDER (20) /** * Number of buckets used in the ibf per estimated @@ -69,7 +76,7 @@ enum UnionOperationPhase { /** - * We sent the request message, and expect a strata estimator + * We sent the request message, and expect a strata estimator. */ PHASE_EXPECT_SE, @@ -77,6 +84,9 @@ enum UnionOperationPhase * We sent the strata estimator, and expect an IBF. This phase is entered once * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. * + * XXX: could use better wording. + * XXX: repurposed to also expect a "request full set" message, should be renamed + * * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS */ PHASE_EXPECT_IBF, @@ -87,33 +97,41 @@ enum UnionOperationPhase PHASE_EXPECT_IBF_CONT, /** - * We are sending request and elements, - * and thus only expect elements from the other peer. - * - * We are currently decoding an IBF until it can no longer be decoded, - * we currently send requests and expect elements - * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS + * We are decoding an IBF. */ - PHASE_EXPECT_ELEMENTS, + PHASE_INVENTORY_ACTIVE, /** - * We are expecting elements and requests, and send - * requested elements back to the other peer. - * - * We are in this phase if we have SENT an IBF for the remote peer to decode. - * We expect requests, send elements or could receive an new IBF, which takes - * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS - * - * The remote peer is thus in: - * #PHASE_EXPECT_ELEMENTS + * The other peer is decoding the IBF we just sent. + */ + PHASE_INVENTORY_PASSIVE, + + /** + * The protocol is almost finished, but we still have to flush our message + * queue and/or expect some elements. + */ + PHASE_FINISH_CLOSING, + + /** + * In the penultimate phase, + * we wait until all our demands + * are satisfied. Then we send a done + * message, and wait for another done message. */ - PHASE_EXPECT_ELEMENTS_AND_REQUESTS, + PHASE_FINISH_WAITING, /** - * The protocol is over. - * Results may still have to be sent to the client. + * In the ultimate phase, we wait until + * our demands are satisfied and then + * quit (sending another DONE message). */ - PHASE_FINISHED + PHASE_DONE, + + /** + * After sending the full set, wait for responses with the elements + * that the local peer is missing. + */ + PHASE_FULL_SENDING, }; @@ -122,35 +140,29 @@ enum UnionOperationPhase */ struct OperationState { - /** * Copy of the set's strata estimator at the time of - * creation of this operation + * creation of this operation. */ struct StrataEstimator *se; /** - * The ibf we currently receive + * The IBF we currently receive. */ struct InvertibleBloomFilter *remote_ibf; /** - * IBF of the set's element. + * The IBF with the local set's element. */ struct InvertibleBloomFilter *local_ibf; /** - * Maps IBF-Keys (specific to the current salt) to elements. + * Maps unsalted IBF-Keys to elements. * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. * Colliding IBF-Keys are linked. */ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; - /** - * Iterator for sending elements on the key to element mapping to the client. - */ - struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; - /** * Current state of the operation. */ @@ -162,10 +174,41 @@ struct OperationState int client_done_sent; /** - * Number of ibf buckets received + * Number of ibf buckets already received into the @a remote_ibf. */ unsigned int ibf_buckets_received; + /** + * Hashes for elements that we have demanded from the other peer. + */ + struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; + + /** + * Salt that we're using for sending IBFs + */ + uint32_t salt_send; + + /** + * Salt for the IBF we've received and that we're currently decoding. + */ + uint32_t salt_receive; + + /** + * Number of elements we received from the other peer + * that were not in the local set yet. + */ + uint32_t received_fresh; + + /** + * Total number of elements received from the other peer. + */ + uint32_t received_total; + + /** + * Initial size of our set, just before + * the operation started. + */ + uint64_t initial_size; }; @@ -181,14 +224,19 @@ struct KeyEntry /** * The actual element associated with the key. + * + * Only owned by the union operation if element->operation + * is #GNUNET_YES. */ struct ElementEntry *element; /** - * Element that collides with this element - * on the ibf key. All colliding entries must have the same ibf key. + * Did we receive this element? + * Even if element->is_foreign is false, we might + * have received the element, so this indicates that + * the other peer has it. */ - struct KeyEntry *next_colliding; + int received; }; @@ -244,18 +292,13 @@ destroy_key_to_element_iter (void *cls, { struct KeyEntry *k = value; - while (NULL != k) + GNUNET_assert (NULL != k); + if (GNUNET_YES == k->element->remote) { - struct KeyEntry *k_tmp = k; - - k = k->next_colliding; - if (GNUNET_YES == k_tmp->element->remote) - { - GNUNET_free (k_tmp->element); - k_tmp->element = NULL; - } - GNUNET_free (k_tmp); + GNUNET_free (k->element); + k->element = NULL; } + GNUNET_free (k); return GNUNET_YES; } @@ -269,8 +312,8 @@ destroy_key_to_element_iter (void *cls, static void union_op_cancel (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "destroying union op\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op\n"); /* check if the op was canceled twice */ GNUNET_assert (NULL != op->state); if (NULL != op->state->remote_ibf) @@ -278,6 +321,11 @@ union_op_cancel (struct Operation *op) ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; } + if (NULL != op->state->demanded_hashes) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); + op->state->demanded_hashes = NULL; + } if (NULL != op->state->local_ibf) { ibf_destroy (op->state->local_ibf); @@ -298,8 +346,8 @@ union_op_cancel (struct Operation *op) } GNUNET_free (op->state); op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "destroying union op done\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op done\n"); } @@ -315,8 +363,8 @@ fail_union_operation (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "union operation failed\n"); + LOG (GNUNET_ERROR_TYPE_ERROR, + "union operation failed\n"); ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->request_id = htonl (op->spec->client_request_id); @@ -331,80 +379,56 @@ fail_union_operation (struct Operation *op) * a salt. * * @param src the hash code - * @param salt salt to use * @return the derived IBF key */ static struct IBF_Key -get_ibf_key (const struct GNUNET_HashCode *src, - uint16_t salt) +get_ibf_key (const struct GNUNET_HashCode *src) { struct IBF_Key key; + uint16_t salt = 0; - GNUNET_CRYPTO_hkdf (&key, sizeof (key), - GCRY_MD_SHA512, GCRY_MD_SHA256, - src, sizeof *src, - &salt, sizeof (salt), - NULL, 0); + GNUNET_CRYPTO_kdf (&key, sizeof (key), + src, sizeof *src, + &salt, sizeof (salt), + NULL, 0); return key; } /** - * Iterator to create the mapping between ibf keys - * and element entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to iterate, - * #GNUNET_NO if not. + * Context for #op_get_element_iterator */ -static int -op_register_element_iterator (void *cls, - uint32_t key, - void *value) +struct GetElementContext { - struct KeyEntry *const new_k = cls; - struct KeyEntry *old_k = value; - - GNUNET_assert (NULL != old_k); - /* check if our ibf key collides with the ibf key in the existing entry */ - if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) - { - /* insert the the new key in the collision chain */ - new_k->next_colliding = old_k->next_colliding; - old_k->next_colliding = new_k; - /* signal to the caller that we were able to insert into a colliding bucket */ - return GNUNET_NO; - } - return GNUNET_YES; -} + struct GNUNET_HashCode hash; + struct KeyEntry *k; +}; /** - * Iterator to create the mapping between ibf keys - * and element entries. + * Iterator over the mapping from IBF keys to element entries. Checks if we + * have an element with a given GNUNET_HashCode. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES (we should continue to iterate) + * @return #GNUNET_YES if we should search further, + * #GNUNET_NO if we've found the element. */ static int -op_has_element_iterator (void *cls, +op_get_element_iterator (void *cls, uint32_t key, void *value) { - struct GNUNET_HashCode *element_hash = cls; + struct GetElementContext *ctx = cls; struct KeyEntry *k = value; GNUNET_assert (NULL != k); - while (NULL != k) + if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, + &ctx->hash)) { - if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, - element_hash)) - return GNUNET_NO; - k = k->next_colliding; + ctx->k = k; + return GNUNET_NO; } return GNUNET_YES; } @@ -418,23 +442,29 @@ op_has_element_iterator (void *cls, * @param element_hash hash of the element to look for * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ -static int -op_has_element (struct Operation *op, +static struct KeyEntry * +op_get_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) { int ret; struct IBF_Key ibf_key; + struct GetElementContext ctx = {{{ 0 }} , 0}; - ibf_key = get_ibf_key (element_hash, op->spec->salt); + ctx.hash = *element_hash; + + ibf_key = get_ibf_key (element_hash); ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_has_element_iterator, - (void *) element_hash); + op_get_element_iterator, + &ctx); /* was the iteration aborted because we found the element? */ if (GNUNET_SYSERR == ret) - return GNUNET_YES; - return GNUNET_NO; + { + GNUNET_assert (NULL != ctx.k); + return ctx.k; + } + return NULL; } @@ -446,33 +476,55 @@ op_has_element (struct Operation *op, * This is done to speed up re-tried operations, if some elements * were transmitted, and then the IBF fails to decode. * + * XXX: clarify ownership, doesn't sound right. + * * @param op the union operation * @param ee the element entry + * @parem received was this element received from the remote peer? */ static void op_register_element (struct Operation *op, - struct ElementEntry *ee) + struct ElementEntry *ee, + int received) { - int ret; struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); + ibf_key = get_ibf_key (&ee->element_hash); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; - ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + k->received = received; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_register_element_iterator, - k); + k, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} - /* was the element inserted into a colliding bucket? */ - if (GNUNET_SYSERR == ret) - return; - GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, - (uint32_t) ibf_key.key_val, - k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; } @@ -488,13 +540,17 @@ prepare_ibf_iterator (void *cls, uint32_t key, void *value) { - struct InvertibleBloomFilter *ibf = cls; + struct Operation *op = cls; struct KeyEntry *ke = value; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "inserting %x into ibf\n", - ke->ibf_key.key_val); - ibf_insert (ibf, ke->ibf_key); + struct IBF_Key salted_key; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] inserting %lx (hash %s) into ibf\n", + (void *) op, + (unsigned long) ke->ibf_key.key_val, + GNUNET_h2s (&ke->element->element_hash)); + salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -524,58 +580,91 @@ init_key_to_element_iterator (void *cls, GNUNET_assert (GNUNET_NO == ee->remote); - op_register_element (op, ee); + op_register_element (op, ee, GNUNET_NO); return GNUNET_YES; } +/** + * Initialize the IBF key to element mapping local to this set + * operation. + * + * @param op the set union operation + */ +static void +initialize_key_to_element (struct Operation *op) +{ + unsigned int len; + + GNUNET_assert (NULL == op->state->key_to_element); + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); +} + + /** * Create an ibf with the operation's elements * of the specified size * * @param op the union operation * @param size size of the ibf to create + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void +static int prepare_ibf (struct Operation *op, - uint16_t size) + uint32_t size) { - if (NULL == op->state->key_to_element) - { - unsigned int len; + GNUNET_assert (NULL != op->state->key_to_element); - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); - op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, - init_key_to_element_iterator, op); - } if (NULL != op->state->local_ibf) ibf_destroy (op->state->local_ibf); op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + if (NULL == op->state->local_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate local IBF\n"); + return GNUNET_SYSERR; + } GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, &prepare_ibf_iterator, - op->state->local_ibf); + op); + return GNUNET_OK; } /** * Send an ibf of appropriate size. * + * Fragments the IBF into multiple messages if necessary. + * * @param op the union operation * @param ibf_order order of the ibf to send, size=2^order + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void +static int send_ibf (struct Operation *op, uint16_t ibf_order) { unsigned int buckets_sent = 0; struct InvertibleBloomFilter *ibf; - prepare_ibf (op, 1<state->local_ibf; @@ -593,21 +682,26 @@ send_ibf (struct Operation *op, ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); - msg->reserved = 0; + msg->reserved1 = 0; + msg->reserved2 = 0; msg->order = ibf_order; - msg->offset = htons (buckets_sent); + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ibf chunk size %u, %u/%u sent\n", - buckets_in_message, - buckets_sent, - 1<mq, ev); } - op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; + /* The other peer must decode the IBF, so + * we're passive. */ + op->state->phase = PHASE_INVENTORY_PASSIVE; + return GNUNET_OK; } @@ -619,18 +713,33 @@ send_ibf (struct Operation *op, static void send_strata_estimator (struct Operation *op) { + const struct StrataEstimator *se = op->state->se; struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *strata_msg; - - ev = GNUNET_MQ_msg_header_extra (strata_msg, - SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); - strata_estimator_write (op->state->se, &strata_msg[1]); + struct StrataEstimatorMessage *strata_msg; + char *buf; + size_t len; + uint16_t type; + + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (op->state->se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; + ev = GNUNET_MQ_msg_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); GNUNET_MQ_send (op->mq, ev); op->state->phase = PHASE_EXPECT_IBF; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sent SE, expecting IBF\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent SE, expecting IBF\n"); } @@ -652,7 +761,51 @@ get_order_from_difference (unsigned int diff) ibf_order++; if (ibf_order > MAX_IBF_ORDER) ibf_order = MAX_IBF_ORDER; - return ibf_order; + // add one for correction + return ibf_order + 1; +} + + +/** + * Send a set element. + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +send_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = value; + struct GNUNET_SET_Element *el = &ee->element; + struct GNUNET_MQ_Envelope *ev; + + + ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + emsg->element_type = htons (el->element_type); + GNUNET_memcpy (&emsg[1], el->data, el->size); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; +} + + +static void +send_full_set (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_FULL_SENDING; + + (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &send_element_iterator, op); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); } @@ -661,47 +814,125 @@ get_order_from_difference (unsigned int diff) * * @param cls the union operation * @param mh the message + * @param is_compressed #GNUNET_YES if the estimator is compressed * @return #GNUNET_SYSERR if the tunnel should be disconnected, * #GNUNET_OK otherwise */ static int handle_p2p_strata_estimator (void *cls, - const struct GNUNET_MessageHeader *mh) + const struct GNUNET_MessageHeader *mh, + int is_compressed) { struct Operation *op = cls; struct StrataEstimator *remote_se; - int diff; + struct StrataEstimatorMessage *msg = (void *) mh; + unsigned int diff; + uint64_t other_size; + size_t len; + + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (mh->size), + GNUNET_NO); if (op->state->phase != PHASE_EXPECT_SE) { - fail_union_operation (op); GNUNET_break (0); + fail_union_operation (op); return GNUNET_SYSERR; } - if (ntohs (mh->size) != - SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE + - sizeof (struct GNUNET_MessageHeader)) + len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); + if ( (GNUNET_NO == is_compressed) && + (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) { fail_union_operation (op); GNUNET_break (0); return GNUNET_SYSERR; } + other_size = GNUNET_ntohll (msg->set_size); remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); - strata_estimator_read (&mh[1], remote_se); + if (NULL == remote_se) + { + /* insufficient resources, fail */ + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + strata_estimator_read (&msg[1], + len, + is_compressed, + remote_se)) + { + /* decompression failed */ + fail_union_operation (op); + strata_estimator_destroy (remote_se); + return GNUNET_SYSERR; + } GNUNET_assert (NULL != op->state->se); diff = strata_estimator_difference (remote_se, op->state->se); + + if (diff > 200) + diff = diff * 3 / 2; + strata_estimator_destroy (remote_se); strata_estimator_destroy (op->state->se); op->state->se = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "got se diff=%d, using ibf size %d\n", - diff, - 1<spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) + { + GNUNET_break (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + + + if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) + { + LOG (GNUNET_ERROR_TYPE_INFO, + "Sending full set (diff=%d, own set=%u)\n", + diff, + op->state->initial_size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of full sends", + 1, + GNUNET_NO); + if (op->state->initial_size <= other_size) + { + send_full_set (op); + } + else + { + struct GNUNET_MQ_Envelope *ev; + op->state->phase = PHASE_EXPECT_IBF; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); + GNUNET_MQ_send (op->mq, ev); + } + } + else + { + GNUNET_STATISTICS_update (_GSS_statistics, + "# of ibf sends", + 1, + GNUNET_NO); + if (GNUNET_OK != + send_ibf (op, + get_order_from_difference (diff))) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } + } + return GNUNET_OK; } @@ -714,56 +945,44 @@ handle_p2p_strata_estimator (void *cls, * @param value the key entry */ static int -send_element_iterator (void *cls, - uint32_t key, - void *value) +send_offers_iterator (void *cls, + uint32_t key, + void *value) { struct SendElementClosure *sec = cls; - struct IBF_Key ibf_key = sec->ibf_key; struct Operation *op = sec->op; struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_MessageHeader *mh; - if (ke->ibf_key.key_val != ibf_key.key_val) + /* Detect 32-bit key collision for the 64-bit IBF keys. */ + if (ke->ibf_key.key_val != sec->ibf_key.key_val) return GNUNET_YES; - while (NULL != ke) - { - const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *mh; - GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - ev = GNUNET_MQ_msg_header_extra (mh, - element->size, - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == ev) - { - /* element too large */ - GNUNET_break (0); - continue; - } - memcpy (&mh[1], - element->data, - element->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending element (%s) to peer\n", - GNUNET_h2s (&ke->element->element_hash)); - GNUNET_MQ_send (op->mq, ev); - ke = ke->next_colliding; - } - return GNUNET_NO; + ev = GNUNET_MQ_msg_header_extra (mh, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); + + GNUNET_assert (NULL != ev); + *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] sending element offer (%s) to peer\n", + (void *) op, + GNUNET_h2s (&ke->element->element_hash)); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; } /** - * Send all elements that have the specified IBF key - * to the remote peer of the union operation + * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. * * @param op union operation * @param ibf_key IBF key of interest */ static void -send_elements_for_key (struct Operation *op, - struct IBF_Key ibf_key) +send_offers_for_key (struct Operation *op, + struct IBF_Key ibf_key) { struct SendElementClosure send_cls; @@ -771,18 +990,19 @@ send_elements_for_key (struct Operation *op, send_cls.op = op; (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - &send_element_iterator, + &send_offers_iterator, &send_cls); } /** * Decode which elements are missing on each side, and - * send the appropriate elemens and requests + * send the appropriate offers and inquiries. * * @param op union operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void +static int decode_and_send (struct Operation *op) { struct IBF_Key key; @@ -791,21 +1011,27 @@ decode_and_send (struct Operation *op) unsigned int num_decoded; struct InvertibleBloomFilter *diff_ibf; - GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); + GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); - prepare_ibf (op, op->state->remote_ibf->size); + if (GNUNET_OK != + prepare_ibf (op, op->state->remote_ibf->size)) + { + GNUNET_break (0); + /* allocation failed */ + return GNUNET_SYSERR; + } diff_ibf = ibf_dup (op->state->local_ibf); ibf_subtract (diff_ibf, op->state->remote_ibf); ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "decoding IBF (size=%u)\n", - diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding IBF (size=%u)\n", + diff_ibf->size); num_decoded = 0; - last_key.key_val = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { @@ -817,17 +1043,18 @@ decode_and_send (struct Operation *op) res = ibf_decode (diff_ibf, &side, &key); if (res == GNUNET_OK) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "decoded ibf key %lx\n", - key.key_val); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoded ibf key %lx\n", + (unsigned long) key.key_val); num_decoded += 1; if ( (num_decoded > diff_ibf->size) || - (num_decoded > 1 && last_key.key_val == key.key_val) ) + ( (num_decoded > 1) && + (last_key.key_val == key.key_val) ) ) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "detected cyclic ibf (decoded %u/%u)\n", - num_decoded, - diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, + diff_ibf->size); cycle_detected = GNUNET_YES; } } @@ -841,15 +1068,37 @@ decode_and_send (struct Operation *op) next_order++; if (next_order <= MAX_IBF_ORDER) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "decoding failed, sending larger ibf (size %u)\n", - 1<state->salt_send++; + if (GNUNET_OK != + send_ibf (op, next_order)) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "set union failed: reached ibf limit\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of failed union operations (too large)", + 1, + GNUNET_NO); + // XXX: Send the whole set, element-by-element + LOG (GNUNET_ERROR_TYPE_ERROR, + "set union failed: reached ibf limit\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; } break; } @@ -857,32 +1106,38 @@ decode_and_send (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "transmitted all values, sending DONE\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "transmitted all values, sending DONE\n"); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); GNUNET_MQ_send (op->mq, ev); + /* We now wait until we get a DONE message back + * and then wait for our MQ to be flushed and all our + * demands be delivered. */ break; } if (1 == side) { - send_elements_for_key (op, key); + struct IBF_Key unsalted_key; + unsalt_key (&key, op->state->salt_receive, &unsalted_key); + send_offers_for_key (op, unsalted_key); } else if (-1 == side) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *msg; + struct InquiryMessage *msg; - /* It may be nice to merge multiple requests, but with cadet's corking it is not worth + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth * the effort additional complexity. */ - ev = GNUNET_MQ_msg_header_extra (msg, - sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); - - memcpy (&msg[1], + ev = GNUNET_MQ_msg_extra (msg, + sizeof (struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], &key, sizeof (struct IBF_Key)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending element request\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element inquiry for IBF key %lx\n", + (unsigned long) key.key_val); GNUNET_MQ_send (op->mq, ev); } else @@ -891,12 +1146,16 @@ decode_and_send (struct Operation *op) } } ibf_destroy (diff_ibf); + return GNUNET_OK; } /** * Handle an IBF message from a remote peer. * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * * @param cls the union operation * @param mh the header of the message * @return #GNUNET_SYSERR if the tunnel should be disconnected, @@ -917,17 +1176,26 @@ handle_p2p_ibf (void *cls, return GNUNET_SYSERR; } msg = (const struct IBFMessage *) mh; - if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || + if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || (op->state->phase == PHASE_EXPECT_IBF) ) { op->state->phase = PHASE_EXPECT_IBF_CONT; GNUNET_assert (NULL == op->state->remote_ibf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new ibf of size %u\n", - 1 << msg->order); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); + if (NULL == op->state->remote_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } op->state->ibf_buckets_received = 0; - if (0 != ntohs (msg->offset)) + if (0 != ntohl (msg->offset)) { GNUNET_break_op (0); fail_union_operation (op); @@ -936,14 +1204,29 @@ handle_p2p_ibf (void *cls, } else if (op->state->phase == PHASE_EXPECT_IBF_CONT) { - if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || - (1<order != op->state->remote_ibf->size) ) + if (ntohl (msg->offset) != op->state->ibf_buckets_received) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (1<order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (ntohl (msg->salt) != op->state->salt_receive) { GNUNET_break_op (0); fail_union_operation (op); return GNUNET_SYSERR; } } + else + { + GNUNET_assert (0); + } buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; @@ -961,6 +1244,8 @@ handle_p2p_ibf (void *cls, return GNUNET_SYSERR; } + GNUNET_assert (NULL != op->state->remote_ibf); + ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, @@ -969,10 +1254,17 @@ handle_p2p_ibf (void *cls, if (op->state->ibf_buckets_received == op->state->remote_ibf->size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "received full ibf\n"); - op->state->phase = PHASE_EXPECT_ELEMENTS; - decode_and_send (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received full ibf\n"); + op->state->phase = PHASE_INVENTORY_ACTIVE; + if (GNUNET_OK != + decode_and_send (op)) + { + /* Internal error, best we can do is shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to decode IBF, closing connection\n"); + return GNUNET_SYSERR; + } } return GNUNET_OK; } @@ -984,17 +1276,19 @@ handle_p2p_ibf (void *cls, * * @param op union operation * @param element element to send + * @param status status to send with the new element */ static void send_client_element (struct Operation *op, - struct GNUNET_SET_Element *element) + struct GNUNET_SET_Element *element, + int status) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending element (size %u) to client\n", - element->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element (size %u) to client\n", + element->size); GNUNET_assert (0 != op->spec->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); if (NULL == ev) @@ -1003,15 +1297,11 @@ send_client_element (struct Operation *op, GNUNET_break (0); return; } - - if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) - rm->result_status = htons (GNUNET_SET_STATUS_OK); - else if (GNUNET_SET_RESULT_SYMMETRIC == op->spec->result_mode) - rm->result_status = htons (GNUNET_SET_STATUS_ADD_LOCAL); - + rm->result_status = htons (status); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->element_type; - memcpy (&rm[1], element->data, element->size); + rm->element_type = htons (element->element_type); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (op->spec->set->client_mq, ev); } @@ -1033,99 +1323,164 @@ send_done_and_destroy (void *cls) rm->request_id = htonl (op->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); GNUNET_MQ_send (op->spec->set->client_mq, ev); + /* Will also call the union-specific cancel function. */ _GSS_operation_destroy (op, GNUNET_YES); - op->keep--; - if (0 == op->keep) - GNUNET_free (op); } -/** - * Send all remaining elements in the full result iterator. - * - * @param cls operation - */ static void -send_remaining_elements (void *cls) +maybe_finish (struct Operation *op) { - struct Operation *op = cls; - struct KeyEntry *ke; - int res; + unsigned int num_demanded; - res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, - NULL, - (const void **) &ke); - if (GNUNET_NO == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending done and destroy because iterator ran out\n"); - send_done_and_destroy (op); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending elements from key entry\n"); - while (1) + num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); + + if (PHASE_FINISH_WAITING == op->state->phase) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - struct GNUNET_SET_Element *element; - - element = &ke->element->element; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending element (size %u) to client (full set)\n", - element->size); - GNUNET_assert (0 != op->spec->client_request_id); - ev = GNUNET_MQ_msg_extra (rm, - element->size, - GNUNET_MESSAGE_TYPE_SET_RESULT); - if (NULL == ev) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_WAITING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) { - GNUNET_MQ_discard (ev); - GNUNET_break (0); - continue; + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_DONE; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + + /* We now wait until the other peer closes the channel + * after it got all elements from us. */ } - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->element_type; - memcpy (&rm[1], element->data, element->size); - if (NULL == ke->next_colliding) + } + if (PHASE_FINISH_CLOSING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_CLOSING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) { - GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - break; + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); } - GNUNET_MQ_send (op->spec->set->client_mq, ev); - ke = ke->next_colliding; } } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Handle an element message from a remote peer. + * Sent by the other peer either because we decoded an IBF and placed a demand, + * or because the other peer switched to full set transmission. * - * @param op union operation + * @param cls the union operation + * @param mh the message */ static void -finish_and_destroy (struct Operation *op) +handle_p2p_elements (void *cls, + const struct GNUNET_MessageHeader *mh) { - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - op->keep++; - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + struct Operation *op = cls; + struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; + uint16_t element_size; + + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, + &ee->element_hash, + NULL)) + { + /* We got something we didn't demand, since it's not in our map. */ + GNUNET_break_op (0); + GNUNET_free (ee); + fail_union_operation (op); + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; + + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else { - /* prevent that the op is free'd by the tunnel end handler */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending full result set\n"); - GNUNET_assert (NULL == op->state->full_result_iter); - op->state->full_result_iter = - GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); - send_remaining_elements (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } + + if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + GNUNET_break_op (0); + fail_union_operation (op); return; } - send_done_and_destroy (op); + + maybe_finish (op); } @@ -1136,71 +1491,124 @@ finish_and_destroy (struct Operation *op) * @param mh the message */ static void -handle_p2p_elements (void *cls, - const struct GNUNET_MessageHeader *mh) +handle_p2p_full_element (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; uint16_t element_size; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got element from peer\n"); - if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && - (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) { - fail_union_operation (op); GNUNET_break_op (0); + fail_union_operation (op); return; } - element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); - memcpy (&ee[1], &mh[1], element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); ee->element.size = element_size; ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); ee->remote = GNUNET_YES; - GNUNET_CRYPTO_hash (ee->element.data, - ee->element.size, - &ee->element_hash); + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (full diff, size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; - if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "got existing element from peer\n"); + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; GNUNET_free (ee); - return; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } } - op_register_element (op, ee); - /* only send results immediately if the client wants it */ - if (GNUNET_SET_RESULT_FULL != op->spec->result_mode) - send_client_element (op, &ee->element); + if ( (GNUNET_YES == op->spec->byzantine) && + (op->state->received_total > 384 + op->state->received_fresh * 4) && + (op->state->received_fresh < op->state->received_total / 6) ) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + LOG (GNUNET_ERROR_TYPE_ERROR, + "Other peer sent only %llu/%llu fresh elements, failing operation\n", + (unsigned long long) op->state->received_fresh, + (unsigned long long) op->state->received_total); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } } - /** - * Handle an element request from a remote peer. + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). * * @param cls the union operation * @param mh the message */ static void -handle_p2p_element_requests (void *cls, - const struct GNUNET_MessageHeader *mh) +handle_p2p_inquiry (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; const struct IBF_Key *ibf_key; unsigned int num_keys; + struct InquiryMessage *msg; /* look up elements and send them */ - if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (op->state->phase != PHASE_INVENTORY_PASSIVE) { GNUNET_break_op (0); fail_union_operation (op); return; } - num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) - / sizeof (struct IBF_Key); - if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); + if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) != num_keys * sizeof (struct IBF_Key)) { GNUNET_break_op (0); @@ -1208,15 +1616,277 @@ handle_p2p_element_requests (void *cls, return; } - ibf_key = (const struct IBF_Key *) &mh[1]; + msg = (struct InquiryMessage *) mh; + + ibf_key = (const struct IBF_Key *) &msg[1]; while (0 != num_keys--) { - send_elements_for_key (op, *ibf_key); + struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); + send_offers_for_key (op, unsalted_key); ibf_key++; } } +/** + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +send_missing_elements_iter (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = ke->element; + + if (GNUNET_YES == ke->received) + return GNUNET_YES; + + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + GNUNET_MQ_send (op->mq, ev); + + return GNUNET_YES; +} + + +/** + * Handle a + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_request_full (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF != op->state->phase) + { + fail_union_operation (op); + GNUNET_break_op (0); + return; + } + + // FIXME: we need to check that our set is larger than the + // byzantine_lower_bound by some threshold + send_full_set (op); +} + + +/** + * Handle a "full done" message. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF == op->state->phase) + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); + + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_elements_iter, + op); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_DONE; + + /* we now wait until the other peer shuts the tunnel down*/ + } + else if (PHASE_FULL_SENDING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + +/** + * Handle a demand by the other peer for elements based on a list + * of GNUNET_HashCode-s. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct GNUNET_SET_ElementMessage *emsg; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + struct GNUNET_MQ_Envelope *ev; + + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); + if (NULL == ee) + { + /* Demand for non-existing element. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + /* Probably confused lazily copied sets. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", + (void *) op, + (unsigned int) ee->element.size, + GNUNET_h2s (&ee->element_hash)); + GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + /* Nothing to do. */ + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } +} + + +/** + * Handle offers (of GNUNET_HashCode-s) and + * respond with demands (of GNUNET_HashCode-s). + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + + /* look up elements and send them */ + if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_INVENTORY_ACTIVE)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + struct ElementEntry *ee; + struct GNUNET_MessageHeader *demands; + struct GNUNET_MQ_Envelope *ev; + + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + hash); + if (NULL != ee) + if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) + continue; + + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, + hash)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipped sending duplicate demand\n"); + continue; + } + + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, + hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Requesting element (hash %s)\n", + (void *) op, GNUNET_h2s (hash)); + ev = GNUNET_MQ_msg_header_extra (demands, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); + *(struct GNUNET_HashCode *) &demands[1] = *hash; + GNUNET_MQ_send (op->mq, ev); + } +} + + /** * Handle a done message from a remote peer * @@ -1228,25 +1898,40 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct GNUNET_MQ_Envelope *ev; - if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (op->state->phase == PHASE_INVENTORY_PASSIVE) { - /* we got all requests, but still have to send our elements as response */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "got DONE, sending final DONE after elements\n"); - op->state->phase = PHASE_FINISHED; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); - GNUNET_MQ_send (op->mq, ev); + /* We got all requests, but still have to send our elements in response. */ + + op->state->phase = PHASE_FINISH_WAITING; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as passive partner), waiting for our demands to be satisfied\n"); + /* The active peer is done sending offers + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave him everything. + */ + maybe_finish (op); return; } - if (op->state->phase == PHASE_EXPECT_ELEMENTS) + if (op->state->phase == PHASE_INVENTORY_ACTIVE) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "got final DONE\n"); - op->state->phase = PHASE_FINISHED; - finish_and_destroy (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as active partner), waiting to finish\n"); + /* All demands of the other peer are satisfied, + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */ + op->state->phase = PHASE_FINISH_CLOSING; + maybe_finish (op); return; } GNUNET_break_op (0); @@ -1268,13 +1953,24 @@ union_evaluate (struct Operation *op, struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; + GNUNET_assert (NULL == op->state); op->state = GNUNET_new (struct OperationState); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); /* copy the current generation's strata estimator for this operation */ op->state->se = strata_estimator_dup (op->spec->set->state->se); /* we started the operation, thus we have to send the operation request */ op->state->phase = PHASE_EXPECT_SE; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initiating union operation evaluation\n"); + op->state->salt_receive = op->state->salt_send = 42; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of initiated union operations", + 1, + GNUNET_NO); ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, opaque_context); @@ -1282,20 +1978,22 @@ union_evaluate (struct Operation *op, { /* the context message is too large */ GNUNET_break (0); - GNUNET_SERVER_client_disconnect (op->spec->set->client); + GNUNET_SERVICE_client_drop (op->spec->set->client); return; } msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = op->spec->app_id; GNUNET_MQ_send (op->mq, ev); if (NULL != opaque_context) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sent op request with context message\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request with context message\n"); else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sent op request without context message\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request without context message\n"); + + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); } @@ -1308,10 +2006,25 @@ union_evaluate (struct Operation *op, static void union_accept (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "accepting set union operation\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "accepting set union operation\n"); + GNUNET_assert (NULL == op->state); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# of accepted union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + op->state = GNUNET_new (struct OperationState); op->state->se = strata_estimator_dup (op->spec->set->state->se); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + op->state->salt_receive = op->state->salt_send = 42; + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); /* kick off the operation */ send_strata_estimator (op); } @@ -1323,18 +2036,25 @@ union_accept (struct Operation *op) * We maintain one strata estimator per set and then manipulate it over the * lifetime of the set, as recreating a strata estimator would be expensive. * - * @return the newly created set + * @return the newly created set, NULL on error */ static struct SetState * union_set_create (void) { struct SetState *set_state; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "union set created\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "union set created\n"); set_state = GNUNET_new (struct SetState); set_state->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); + if (NULL == set_state->se) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate strata estimator\n"); + GNUNET_free (set_state); + return NULL; + } return set_state; } @@ -1349,7 +2069,7 @@ static void union_add (struct SetState *set_state, struct ElementEntry *ee) { strata_estimator_insert (set_state->se, - get_ibf_key (&ee->element_hash, 0)); + get_ibf_key (&ee->element_hash)); } @@ -1364,7 +2084,7 @@ static void union_remove (struct SetState *set_state, struct ElementEntry *ee) { strata_estimator_remove (set_state->se, - get_ibf_key (&ee->element_hash, 0)); + get_ibf_key (&ee->element_hash)); } @@ -1397,42 +2117,60 @@ int union_handle_p2p_message (struct Operation *op, const struct GNUNET_MessageHeader *mh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "received p2p message (t: %u, s: %u)\n", - ntohs (mh->type), - ntohs (mh->size)); + //LOG (GNUNET_ERROR_TYPE_DEBUG, + // "received p2p message (t: %u, s: %u)\n", + // ntohs (mh->type), + // ntohs (mh->size)); switch (ntohs (mh->type)) { case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: return handle_p2p_ibf (op, mh); case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: - return handle_p2p_strata_estimator (op, mh); + return handle_p2p_strata_estimator (op, mh, GNUNET_NO); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: + return handle_p2p_strata_estimator (op, mh, GNUNET_YES); case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: handle_p2p_elements (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: - handle_p2p_element_requests (op, mh); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: + handle_p2p_full_element (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: + handle_p2p_inquiry (op, mh); break; case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: handle_p2p_done (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: + handle_p2p_offer (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: + handle_p2p_demand (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: + handle_p2p_full_done (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: + handle_p2p_request_full (op, mh); + break; default: - /* something wrong with cadet's message handlers? */ + /* Something wrong with cadet's message handlers? */ GNUNET_assert (0); } return GNUNET_OK; } + /** - * handler for peer-disconnects, notifies the client - * about the aborted operation in case the op was not concluded + * Handler for peer-disconnects, notifies the client + * about the aborted operation in case the op was not concluded. * * @param op the destroyed operation */ static void union_peer_disconnect (struct Operation *op) { - if (PHASE_FINISHED != op->state->phase) + if (PHASE_DONE != op->state->phase) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; @@ -1444,19 +2182,27 @@ union_peer_disconnect (struct Operation *op) msg->element_type = htons (0); GNUNET_MQ_send (op->spec->set->client_mq, ev); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "other peer disconnected prematurely\n"); + LOG (GNUNET_ERROR_TYPE_WARNING, + "other peer disconnected prematurely, phase %u\n", + op->state->phase); _GSS_operation_destroy (op, GNUNET_YES); return; } // else: the session has already been concluded - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "other peer disconnected (finished)\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "other peer disconnected (finished)\n"); if (GNUNET_NO == op->state->client_done_sent) - finish_and_destroy (op); + send_done_and_destroy (op); } + +/** + * Copy union-specific set state. + * + * @param set source set for copying the union state + * @return a copy of the union-specific set state + */ static struct SetState * union_copy_state (struct Set *set) { @@ -1469,6 +2215,7 @@ union_copy_state (struct Set *set) return new_state; } + /** * Get the table with implementing functions for * set union.