X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_union.c;h=f46713c3102d25b403b3aea2ae369d6915a0e9d0;hb=c9bc0115c53e10a31ffffb6dbb1cb85e77168dda;hp=6bb28471a7fc0460e1d4b11f5cdfc6abcff6e550;hpb=e9a2778efa6e4ee9940cdb56face621dc319787f;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6bb28471a..f46713c31 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,34 +14,40 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ - /** - * @file set/gnunet-service-set.c + * @file set/gnunet-service-set_union.c + * @brief two-peer set operations * @author Florian Dold */ #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" #include "gnunet-service-set.h" #include "ibf.h" -#include "strata_estimator.h" -#include "set_protocol.h" +#include "gnunet-service-set_union_strata_estimator.h" +#include "gnunet-service-set_protocol.h" #include +#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) + + /** * Number of IBFs in a strata estimator. */ #define SE_STRATA_COUNT 32 + /** * Size of the IBFs in the strata estimator. */ #define SE_IBF_SIZE 80 + /** - * hash num parameter for the difference digests and strata estimators + * The hash num parameter for the difference digests and strata estimators. */ #define SE_IBF_HASH_NUM 4 @@ -55,7 +61,7 @@ * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (16) +#define MAX_IBF_ORDER (20) /** * Number of buckets used in the ibf per estimated @@ -70,88 +76,93 @@ enum UnionOperationPhase { /** - * We sent the request message, and expect a strata estimator + * We sent the request message, and expect a strata estimator. */ PHASE_EXPECT_SE, + /** * We sent the strata estimator, and expect an IBF. This phase is entered once - * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS. + * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. + * + * XXX: could use better wording. + * XXX: repurposed to also expect a "request full set" message, should be renamed * - * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS + * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS */ PHASE_EXPECT_IBF, + /** * Continuation for multi part IBFs. */ PHASE_EXPECT_IBF_CONT, + /** - * We are sending request and elements, - * and thus only expect elements from the other peer. - * - * We are currently decoding an IBF until it can no longer be decoded, - * we currently send requests and expect elements - * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS + * We are decoding an IBF. */ - PHASE_EXPECT_ELEMENTS, + PHASE_INVENTORY_ACTIVE, + /** - * We are expecting elements and requests, and send - * requested elements back to the other peer. - * - * We are in this phase if we have SENT an IBF for the remote peer to decode. - * We expect requests, send elements or could receive an new IBF, which takes - * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS - * - * The remote peer is thus in: - * PHASE_EXPECT_ELEMENTS + * The other peer is decoding the IBF we just sent. + */ + PHASE_INVENTORY_PASSIVE, + + /** + * The protocol is almost finished, but we still have to flush our message + * queue and/or expect some elements. + */ + PHASE_FINISH_CLOSING, + + /** + * In the penultimate phase, + * we wait until all our demands + * are satisfied. Then we send a done + * message, and wait for another done message. */ - PHASE_EXPECT_ELEMENTS_AND_REQUESTS, + PHASE_FINISH_WAITING, + + /** + * In the ultimate phase, we wait until + * our demands are satisfied and then + * quit (sending another DONE message). + */ + PHASE_DONE, + /** - * The protocol is over. - * Results may still have to be sent to the client. + * After sending the full set, wait for responses with the elements + * that the local peer is missing. */ - PHASE_FINISHED + PHASE_FULL_SENDING, }; /** - * State of an evaluate operation - * with another peer. + * State of an evaluate operation with another peer. */ struct OperationState { - /** - * Number of ibf buckets received - */ - unsigned int ibf_buckets_received; - /** * Copy of the set's strata estimator at the time of - * creation of this operation + * creation of this operation. */ struct StrataEstimator *se; /** - * The ibf we currently receive + * The IBF we currently receive. */ struct InvertibleBloomFilter *remote_ibf; /** - * IBF of the set's element. + * The IBF with the local set's element. */ struct InvertibleBloomFilter *local_ibf; /** - * Maps IBF-Keys (specific to the current salt) to elements. + * Maps unsalted IBF-Keys to elements. * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. * Colliding IBF-Keys are linked. */ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; - /** - * Iterator for sending elements on the key to element mapping to the client. - */ - struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; - /** * Current state of the operation. */ @@ -161,12 +172,48 @@ struct OperationState * Did we send the client that we are done? */ int client_done_sent; + + /** + * Number of ibf buckets already received into the @a remote_ibf. + */ + unsigned int ibf_buckets_received; + + /** + * Hashes for elements that we have demanded from the other peer. + */ + struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; + + /** + * Salt that we're using for sending IBFs + */ + uint32_t salt_send; + + /** + * Salt for the IBF we've received and that we're currently decoding. + */ + uint32_t salt_receive; + + /** + * Number of elements we received from the other peer + * that were not in the local set yet. + */ + uint32_t received_fresh; + + /** + * Total number of elements received from the other peer. + */ + uint32_t received_total; + + /** + * Initial size of our set, just before + * the operation started. + */ + uint64_t initial_size; }; /** - * The key entry is used to associate an ibf key with - * an element. + * The key entry is used to associate an ibf key with an element. */ struct KeyEntry { @@ -177,14 +224,19 @@ struct KeyEntry /** * The actual element associated with the key. + * + * Only owned by the union operation if element->operation + * is #GNUNET_YES. */ struct ElementEntry *element; /** - * Element that collides with this element - * on the ibf key. All colliding entries must have the same ibf key. + * Did we receive this element? + * Even if element->is_foreign is false, we might + * have received the element, so this indicates that + * the other peer has it. */ - struct KeyEntry *next_colliding; + int received; }; @@ -224,13 +276,13 @@ struct SetState /** - * Iterator over hash map entries. + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, + * @return #GNUNET_YES if we should continue to iterate, * #GNUNET_NO if not. */ static int @@ -239,31 +291,29 @@ destroy_key_to_element_iter (void *cls, void *value) { struct KeyEntry *k = value; - /* destroy the linked list of colliding ibf key entries */ - while (NULL != k) + + GNUNET_assert (NULL != k); + if (GNUNET_YES == k->element->remote) { - struct KeyEntry *k_tmp = k; - k = k->next_colliding; - if (GNUNET_YES == k_tmp->element->remote) - { - GNUNET_free (k_tmp->element); - k_tmp->element = NULL; - } - GNUNET_free (k_tmp); + GNUNET_free (k->element); + k->element = NULL; } + GNUNET_free (k); return GNUNET_YES; } /** - * Destroy the union operation. Only things specific to the union operation are destroyed. + * Destroy the union operation. Only things specific to the union + * operation are destroyed. * * @param op union operation to destroy */ static void union_op_cancel (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op\n"); /* check if the op was canceled twice */ GNUNET_assert (NULL != op->state); if (NULL != op->state->remote_ibf) @@ -271,6 +321,11 @@ union_op_cancel (struct Operation *op) ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; } + if (NULL != op->state->demanded_hashes) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); + op->state->demanded_hashes = NULL; + } if (NULL != op->state->local_ibf) { ibf_destroy (op->state->local_ibf); @@ -283,13 +338,16 @@ union_op_cancel (struct Operation *op) } if (NULL != op->state->key_to_element) { - GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL); + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &destroy_key_to_element_iter, + NULL); GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); op->state->key_to_element = NULL; } GNUNET_free (op->state); op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op done\n"); } @@ -305,14 +363,14 @@ fail_union_operation (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); - + LOG (GNUNET_ERROR_TYPE_ERROR, + "union operation failed\n"); ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->request_id = htonl (op->spec->client_request_id); msg->element_type = htons (0); GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); + _GSS_operation_destroy (op, GNUNET_YES); } @@ -321,120 +379,56 @@ fail_union_operation (struct Operation *op) * a salt. * * @param src the hash code - * @param salt salt to use * @return the derived IBF key */ static struct IBF_Key -get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt) +get_ibf_key (const struct GNUNET_HashCode *src) { struct IBF_Key key; + uint16_t salt = 0; - GNUNET_CRYPTO_hkdf (&key, sizeof (key), - GCRY_MD_SHA512, GCRY_MD_SHA256, - src, sizeof *src, - &salt, sizeof (salt), - NULL, 0); + GNUNET_CRYPTO_kdf (&key, sizeof (key), + src, sizeof *src, + &salt, sizeof (salt), + NULL, 0); return key; } /** - * Send a request for the evaluate operation to a remote peer - * - * @param op operation with the other peer - */ -static void -send_operation_request (struct Operation *op) -{ - struct GNUNET_MQ_Envelope *ev; - struct OperationRequestMessage *msg; - - ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - op->spec->context_msg); - - if (NULL == ev) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (op->spec->set->client); - return; - } - msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = op->spec->app_id; - msg->salt = htonl (op->spec->salt); - GNUNET_MQ_send (op->mq, ev); - - if (NULL != op->spec->context_msg) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); - - if (NULL != op->spec->context_msg) - { - GNUNET_free (op->spec->context_msg); - op->spec->context_msg = NULL; - } -} - - -/** - * Iterator to create the mapping between ibf keys - * and element entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * Context for #op_get_element_iterator */ -static int -op_register_element_iterator (void *cls, - uint32_t key, - void *value) +struct GetElementContext { - struct KeyEntry *const new_k = cls; - struct KeyEntry *old_k = value; - - GNUNET_assert (NULL != old_k); - /* check if our ibf key collides with the ibf key in the existing entry */ - if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) - { - /* insert the the new key in the collision chain */ - new_k->next_colliding = old_k->next_colliding; - old_k->next_colliding = new_k; - /* signal to the caller that we were able to insert into a colliding bucket */ - return GNUNET_NO; - } - return GNUNET_YES; -} + struct GNUNET_HashCode hash; + struct KeyEntry *k; +}; /** - * Iterator to create the mapping between ibf keys - * and element entries. + * Iterator over the mapping from IBF keys to element entries. Checks if we + * have an element with a given GNUNET_HashCode. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * @return #GNUNET_YES if we should search further, + * #GNUNET_NO if we've found the element. */ static int -op_has_element_iterator (void *cls, +op_get_element_iterator (void *cls, uint32_t key, void *value) { - struct GNUNET_HashCode *element_hash = cls; + struct GetElementContext *ctx = cls; struct KeyEntry *k = value; GNUNET_assert (NULL != k); - while (NULL != k) + if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, + &ctx->hash)) { - if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) - return GNUNET_NO; - k = k->next_colliding; + ctx->k = k; + return GNUNET_NO; } return GNUNET_YES; } @@ -448,21 +442,29 @@ op_has_element_iterator (void *cls, * @param element_hash hash of the element to look for * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ -static int -op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) +static struct KeyEntry * +op_get_element (struct Operation *op, + const struct GNUNET_HashCode *element_hash) { int ret; struct IBF_Key ibf_key; + struct GetElementContext ctx = {{{ 0 }} , 0}; - ibf_key = get_ibf_key (element_hash, op->spec->salt); + ctx.hash = *element_hash; + + ibf_key = get_ibf_key (element_hash); ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_has_element_iterator, (void *) element_hash); + op_get_element_iterator, + &ctx); /* was the iteration aborted because we found the element? */ if (GNUNET_SYSERR == ret) - return GNUNET_YES; - return GNUNET_NO; + { + GNUNET_assert (NULL != ctx.k); + return ctx.k; + } + return NULL; } @@ -474,30 +476,55 @@ op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash * This is done to speed up re-tried operations, if some elements * were transmitted, and then the IBF fails to decode. * + * XXX: clarify ownership, doesn't sound right. + * * @param op the union operation * @param ee the element entry + * @parem received was this element received from the remote peer? */ static void -op_register_element (struct Operation *op, struct ElementEntry *ee) +op_register_element (struct Operation *op, + struct ElementEntry *ee, + int received) { - int ret; struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); + ibf_key = get_ibf_key (&ee->element_hash); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; - ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + k->received = received; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_register_element_iterator, k); + k, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} - /* was the element inserted into a colliding bucket? */ - if (GNUNET_SYSERR == ret) - return; - GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; } @@ -513,12 +540,17 @@ prepare_ibf_iterator (void *cls, uint32_t key, void *value) { - struct InvertibleBloomFilter *ibf = cls; + struct Operation *op = cls; struct KeyEntry *ke = value; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val); - - ibf_insert (ibf, ke->ibf_key); + struct IBF_Key salted_key; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] inserting %lx (hash %s) into ibf\n", + (void *) op, + (unsigned long) ke->ibf_key.key_val, + GNUNET_h2s (&ke->element->element_hash)); + salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -527,12 +559,11 @@ prepare_ibf_iterator (void *cls, * Iterator for initializing the * key-to-element mapping of a union operation * - * @param cls the union operation - * @param key unised - * @param value the element entry to insert + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert * into the key-to-element mapping - * @return GNUNET_YES to continue iterating, - * GNUNET_NO to stop + * @return #GNUNET_YES (to continue iterating) */ static int init_key_to_element_iterator (void *cls, @@ -540,63 +571,100 @@ init_key_to_element_iterator (void *cls, void *value) { struct Operation *op = cls; - struct ElementEntry *e = value; + struct ElementEntry *ee = value; /* make sure that the element belongs to the set at the time * of creating the operation */ - if ( (e->generation_added > op->generation_created) || - ( (GNUNET_YES == e->removed) && - (e->generation_removed < op->generation_created))) + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) return GNUNET_YES; - GNUNET_assert (GNUNET_NO == e->remote); + GNUNET_assert (GNUNET_NO == ee->remote); - op_register_element (op, e); + op_register_element (op, ee, GNUNET_NO); return GNUNET_YES; } +/** + * Initialize the IBF key to element mapping local to this set + * operation. + * + * @param op the set union operation + */ +static void +initialize_key_to_element (struct Operation *op) +{ + unsigned int len; + + GNUNET_assert (NULL == op->state->key_to_element); + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); +} + + /** * Create an ibf with the operation's elements * of the specified size * * @param op the union operation * @param size size of the ibf to create + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -prepare_ibf (struct Operation *op, uint16_t size) +static int +prepare_ibf (struct Operation *op, + uint32_t size) { - if (NULL == op->state->key_to_element) - { - unsigned int len; - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements); - op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - init_key_to_element_iterator, op); - } + GNUNET_assert (NULL != op->state->key_to_element); + if (NULL != op->state->local_ibf) ibf_destroy (op->state->local_ibf); op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + if (NULL == op->state->local_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate local IBF\n"); + return GNUNET_SYSERR; + } GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, - prepare_ibf_iterator, op->state->local_ibf); + &prepare_ibf_iterator, + op); + return GNUNET_OK; } /** * Send an ibf of appropriate size. * + * Fragments the IBF into multiple messages if necessary. + * * @param op the union operation * @param ibf_order order of the ibf to send, size=2^order + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -send_ibf (struct Operation *op, uint16_t ibf_order) +static int +send_ibf (struct Operation *op, + uint16_t ibf_order) { unsigned int buckets_sent = 0; struct InvertibleBloomFilter *ibf; - prepare_ibf (op, 1<state->local_ibf; @@ -611,20 +679,29 @@ send_ibf (struct Operation *op, uint16_t ibf_order) if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); - msg->reserved = 0; + ev = GNUNET_MQ_msg_extra (msg, + buckets_in_message * IBF_BUCKET_SIZE, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); + msg->reserved1 = 0; + msg->reserved2 = 0; msg->order = ibf_order; - msg->offset = htons (buckets_sent); + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", - buckets_in_message, buckets_sent, 1<mq, ev); } - op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; + /* The other peer must decode the IBF, so + * we're passive. */ + op->state->phase = PHASE_INVENTORY_PASSIVE; + return GNUNET_OK; } @@ -636,16 +713,33 @@ send_ibf (struct Operation *op, uint16_t ibf_order) static void send_strata_estimator (struct Operation *op) { + const struct StrataEstimator *se = op->state->se; struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *strata_msg; - - ev = GNUNET_MQ_msg_header_extra (strata_msg, - SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); - strata_estimator_write (op->state->se, &strata_msg[1]); - GNUNET_MQ_send (op->mq, ev); + struct StrataEstimatorMessage *strata_msg; + char *buf; + size_t len; + uint16_t type; + + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (op->state->se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; + ev = GNUNET_MQ_msg_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); + GNUNET_MQ_send (op->mq, + ev); op->state->phase = PHASE_EXPECT_IBF; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent SE, expecting IBF\n"); } @@ -662,11 +756,56 @@ get_order_from_difference (unsigned int diff) unsigned int ibf_order; ibf_order = 2; - while ((1< MAX_IBF_ORDER) ibf_order = MAX_IBF_ORDER; - return ibf_order; + // add one for correction + return ibf_order + 1; +} + + +/** + * Send a set element. + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +send_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = value; + struct GNUNET_SET_Element *el = &ee->element; + struct GNUNET_MQ_Envelope *ev; + + + ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + emsg->element_type = htons (el->element_type); + GNUNET_memcpy (&emsg[1], el->data, el->size); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; +} + + +static void +send_full_set (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_FULL_SENDING; + + (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &send_element_iterator, op); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); } @@ -675,33 +814,127 @@ get_order_from_difference (unsigned int diff) * * @param cls the union operation * @param mh the message + * @param is_compressed #GNUNET_YES if the estimator is compressed + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_strata_estimator (void *cls, + const struct GNUNET_MessageHeader *mh, + int is_compressed) { struct Operation *op = cls; struct StrataEstimator *remote_se; - int diff; + struct StrataEstimatorMessage *msg = (void *) mh; + unsigned int diff; + uint64_t other_size; + size_t len; + + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (mh->size), + GNUNET_NO); if (op->state->phase != PHASE_EXPECT_SE) + { + GNUNET_break (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); + if ( (GNUNET_NO == is_compressed) && + (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) { fail_union_operation (op); GNUNET_break (0); - return; + return GNUNET_SYSERR; } - remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, + other_size = GNUNET_ntohll (msg->set_size); + remote_se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, SE_IBF_HASH_NUM); - strata_estimator_read (&mh[1], remote_se); + if (NULL == remote_se) + { + /* insufficient resources, fail */ + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + strata_estimator_read (&msg[1], + len, + is_compressed, + remote_se)) + { + /* decompression failed */ + fail_union_operation (op); + strata_estimator_destroy (remote_se); + return GNUNET_SYSERR; + } GNUNET_assert (NULL != op->state->se); - diff = strata_estimator_difference (remote_se, op->state->se); + diff = strata_estimator_difference (remote_se, + op->state->se); + + if (diff > 200) + diff = diff * 3 / 2; + strata_estimator_destroy (remote_se); strata_estimator_destroy (op->state->se); op->state->se = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", - diff, 1<spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) + { + GNUNET_break (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + + + if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) + { + LOG (GNUNET_ERROR_TYPE_INFO, + "Sending full set (diff=%d, own set=%u)\n", + diff, + op->state->initial_size); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of full sends", + 1, + GNUNET_NO); + if (op->state->initial_size <= other_size) + { + send_full_set (op); + } + else + { + struct GNUNET_MQ_Envelope *ev; + op->state->phase = PHASE_EXPECT_IBF; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); + GNUNET_MQ_send (op->mq, ev); + } + } + else + { + GNUNET_STATISTICS_update (_GSS_statistics, + "# of ibf sends", + 1, + GNUNET_NO); + if (GNUNET_OK != + send_ibf (op, + get_order_from_difference (diff))) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } + } + return GNUNET_OK; +} /** @@ -712,49 +945,44 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) * @param value the key entry */ static int -send_element_iterator (void *cls, - uint32_t key, - void *value) +send_offers_iterator (void *cls, + uint32_t key, + void *value) { struct SendElementClosure *sec = cls; - struct IBF_Key ibf_key = sec->ibf_key; struct Operation *op = sec->op; struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_MessageHeader *mh; - if (ke->ibf_key.key_val != ibf_key.key_val) + /* Detect 32-bit key collision for the 64-bit IBF keys. */ + if (ke->ibf_key.key_val != sec->ibf_key.key_val) return GNUNET_YES; - while (NULL != ke) - { - const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *mh; - GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == ev) - { - /* element too large */ - GNUNET_break (0); - continue; - } - memcpy (&mh[1], element->data, element->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", - GNUNET_h2s (&ke->element->element_hash)); - GNUNET_MQ_send (op->mq, ev); - ke = ke->next_colliding; - } - return GNUNET_NO; + ev = GNUNET_MQ_msg_header_extra (mh, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); + + GNUNET_assert (NULL != ev); + *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] sending element offer (%s) to peer\n", + (void *) op, + GNUNET_h2s (&ke->element->element_hash)); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; } + /** - * Send all elements that have the specified IBF key - * to the remote peer of the union operation + * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. * * @param op union operation * @param ibf_key IBF key of interest */ static void -send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key) +send_offers_for_key (struct Operation *op, + struct IBF_Key ibf_key) { struct SendElementClosure send_cls; @@ -762,17 +990,19 @@ send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key) send_cls.op = op; (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - &send_element_iterator, &send_cls); + &send_offers_iterator, + &send_cls); } /** * Decode which elements are missing on each side, and - * send the appropriate elemens and requests + * send the appropriate offers and inquiries. * * @param op union operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void +static int decode_and_send (struct Operation *op) { struct IBF_Key key; @@ -781,19 +1011,27 @@ decode_and_send (struct Operation *op) unsigned int num_decoded; struct InvertibleBloomFilter *diff_ibf; - GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); + GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); - prepare_ibf (op, op->state->remote_ibf->size); + if (GNUNET_OK != + prepare_ibf (op, op->state->remote_ibf->size)) + { + GNUNET_break (0); + /* allocation failed */ + return GNUNET_SYSERR; + } diff_ibf = ibf_dup (op->state->local_ibf); ibf_subtract (diff_ibf, op->state->remote_ibf); ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding IBF (size=%u)\n", + diff_ibf->size); num_decoded = 0; - last_key.key_val = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { @@ -805,17 +1043,23 @@ decode_and_send (struct Operation *op) res = ibf_decode (diff_ibf, &side, &key); if (res == GNUNET_OK) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", - key.key_val); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoded ibf key %lx\n", + (unsigned long) key.key_val); num_decoded += 1; - if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) + if ( (num_decoded > diff_ibf->size) || + ( (num_decoded > 1) && + (last_key.key_val == key.key_val) ) ) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", - num_decoded, diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, + diff_ibf->size); cycle_detected = GNUNET_YES; } } - if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) + if ( (GNUNET_SYSERR == res) || + (GNUNET_YES == cycle_detected) ) { int next_order; next_order = 0; @@ -824,15 +1068,37 @@ decode_and_send (struct Operation *op) next_order++; if (next_order <= MAX_IBF_ORDER) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "decoding failed, sending larger ibf (size %u)\n", - 1<state->salt_send++; + if (GNUNET_OK != + send_ibf (op, next_order)) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "set union failed: reached ibf limit\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of failed union operations (too large)", + 1, + GNUNET_NO); + // XXX: Send the whole set, element-by-element + LOG (GNUNET_ERROR_TYPE_ERROR, + "set union failed: reached ibf limit\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; } break; } @@ -840,27 +1106,38 @@ decode_and_send (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "transmitted all values, sending DONE\n"); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); GNUNET_MQ_send (op->mq, ev); + /* We now wait until we get a DONE message back + * and then wait for our MQ to be flushed and all our + * demands be delivered. */ break; } if (1 == side) { - send_elements_for_key (op, key); + struct IBF_Key unsalted_key; + unsalt_key (&key, op->state->salt_receive, &unsalted_key); + send_offers_for_key (op, unsalted_key); } else if (-1 == side) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *msg; + struct InquiryMessage *msg; - /* It may be nice to merge multiple requests, but with mesh's corking it is not worth + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth * the effort additional complexity. */ - ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); - - *(struct IBF_Key *) &msg[1] = key; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); + ev = GNUNET_MQ_msg_extra (msg, + sizeof (struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], + &key, + sizeof (struct IBF_Key)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element inquiry for IBF key %lx\n", + (unsigned long) key.key_val); GNUNET_MQ_send (op->mq, ev); } else @@ -869,47 +1146,87 @@ decode_and_send (struct Operation *op) } } ibf_destroy (diff_ibf); + return GNUNET_OK; } /** * Handle an IBF message from a remote peer. * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * * @param cls the union operation * @param mh the header of the message + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_ibf (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct IBFMessage *msg = (struct IBFMessage *) mh; + const struct IBFMessage *msg; unsigned int buckets_in_message; - if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || + if (ntohs (mh->size) < sizeof (struct IBFMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + msg = (const struct IBFMessage *) mh; + if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || (op->state->phase == PHASE_EXPECT_IBF) ) { op->state->phase = PHASE_EXPECT_IBF_CONT; GNUNET_assert (NULL == op->state->remote_ibf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<order); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); - op->state->ibf_buckets_received = 0; - if (0 != ntohs (msg->offset)) + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); + if (NULL == op->state->remote_ibf) { - GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); fail_union_operation (op); - return; + return GNUNET_SYSERR; } - } + op->state->ibf_buckets_received = 0; + if (0 != ntohl (msg->offset)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + } else if (op->state->phase == PHASE_EXPECT_IBF_CONT) { - if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || - (1<order != op->state->remote_ibf->size) ) + if (ntohl (msg->offset) != op->state->ibf_buckets_received) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; + } + if (1<order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (ntohl (msg->salt) != op->state->salt_receive) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; } } + else + { + GNUNET_assert (0); + } buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; @@ -917,25 +1234,39 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) { GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; } if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; } - ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf); + GNUNET_assert (NULL != op->state->remote_ibf); + + ibf_read_slice (&msg[1], + op->state->ibf_buckets_received, + buckets_in_message, + op->state->remote_ibf); op->state->ibf_buckets_received += buckets_in_message; if (op->state->ibf_buckets_received == op->state->remote_ibf->size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); - op->state->phase = PHASE_EXPECT_ELEMENTS; - decode_and_send (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received full ibf\n"); + op->state->phase = PHASE_INVENTORY_ACTIVE; + if (GNUNET_OK != + decode_and_send (op)) + { + /* Internal error, best we can do is shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to decode IBF, closing connection\n"); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } @@ -945,15 +1276,19 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) * * @param op union operation * @param element element to send + * @param status status to send with the new element */ static void send_client_element (struct Operation *op, - struct GNUNET_SET_Element *element) + struct GNUNET_SET_Element *element, + int status) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element (size %u) to client\n", + element->size); GNUNET_assert (0 != op->spec->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); if (NULL == ev) @@ -962,10 +1297,11 @@ send_client_element (struct Operation *op, GNUNET_break (0); return; } - rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->result_status = htons (status); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); + rm->element_type = htons (element->element_type); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (op->spec->set->client_mq, ev); } @@ -982,97 +1318,169 @@ send_done_and_destroy (void *cls) struct Operation *op = cls; struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - int keep = op->keep; + ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); rm->request_id = htonl (op->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); + rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); - if (GNUNET_YES == keep) - GNUNET_free (op); + /* Will also call the union-specific cancel function. */ + _GSS_operation_destroy (op, GNUNET_YES); } -/** - * Send all remaining elements in the full result iterator. - * - * @param cls operation - */ static void -send_remaining_elements (void *cls) +maybe_finish (struct Operation *op) { - struct Operation *op = cls; - struct KeyEntry *ke; - int res; + unsigned int num_demanded; - res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); - if (GNUNET_NO == res) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); - send_done_and_destroy (op); - return; - } + num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); - - while (1) + if (PHASE_FINISH_WAITING == op->state->phase) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - struct GNUNET_SET_Element *element; - element = &ke->element->element; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); - GNUNET_assert (0 != op->spec->client_request_id); - ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); - if (NULL == ev) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_WAITING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) { - GNUNET_MQ_discard (ev); - GNUNET_break (0); - continue; + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_DONE; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + + /* We now wait until the other peer closes the channel + * after it got all elements from us. */ } - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); - if (ke->next_colliding == NULL) + } + if (PHASE_FINISH_CLOSING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_CLOSING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) { - GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - break; + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); } - GNUNET_MQ_send (op->spec->set->client_mq, ev); - ke = ke->next_colliding; } } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Handle an element message from a remote peer. + * Sent by the other peer either because we decoded an IBF and placed a demand, + * or because the other peer switched to full set transmission. * - * @param op union operation + * @param cls the union operation + * @param mh the message */ static void -finish_and_destroy (struct Operation *op) +handle_p2p_elements (void *cls, + const struct GNUNET_MessageHeader *mh) { - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + struct Operation *op = cls; + struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; + uint16_t element_size; + + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, + &ee->element_hash, + NULL)) { - /* prevent that the op is free'd by the tunnel end handler */ - op->keep = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); - GNUNET_assert (NULL == op->state->full_result_iter); - op->state->full_result_iter = - GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); - send_remaining_elements (op); + /* We got something we didn't demand, since it's not in our map. */ + GNUNET_break_op (0); + GNUNET_free (ee); + fail_union_operation (op); return; } - send_done_and_destroy (op); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; + + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } + + if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + maybe_finish (op); } @@ -1083,132 +1491,509 @@ finish_and_destroy (struct Operation *op) * @param mh the message */ static void -handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_full_element (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; uint16_t element_size; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); - - if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && - (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) { + GNUNET_break_op (0); fail_union_operation (op); - GNUNET_break (0); return; } - element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); - ee = GNUNET_malloc (sizeof *ee + element_size); - memcpy (&ee[1], &mh[1], element_size); + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); ee->element.size = element_size; ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); ee->remote = GNUNET_YES; - GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (full diff, size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; + + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); - if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + if (NULL != ke) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n"); + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; GNUNET_free (ee); - return; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } } - op_register_element (op, ee); - /* only send results immediately if the client wants it */ - if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) - send_client_element (op, &ee->element); + if ( (GNUNET_YES == op->spec->byzantine) && + (op->state->received_total > 384 + op->state->received_fresh * 4) && + (op->state->received_fresh < op->state->received_total / 6) ) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + LOG (GNUNET_ERROR_TYPE_ERROR, + "Other peer sent only %llu/%llu fresh elements, failing operation\n", + (unsigned long long) op->state->received_fresh, + (unsigned long long) op->state->received_total); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } } - /** - * Handle an element request from a remote peer. + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). * * @param cls the union operation * @param mh the message */ static void -handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_inquiry (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct IBF_Key *ibf_key; + const struct IBF_Key *ibf_key; unsigned int num_keys; + struct InquiryMessage *msg; /* look up elements and send them */ - if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (op->state->phase != PHASE_INVENTORY_PASSIVE) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); return; } - - num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key); - - if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) + num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); + if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) + != num_keys * sizeof (struct IBF_Key)) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); return; } - ibf_key = (struct IBF_Key *) &mh[1]; + msg = (struct InquiryMessage *) mh; + + ibf_key = (const struct IBF_Key *) &msg[1]; while (0 != num_keys--) { - send_elements_for_key (op, *ibf_key); + struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); + send_offers_for_key (op, unsalted_key); ibf_key++; } } /** - * Handle a done message from a remote peer + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +send_missing_elements_iter (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = ke->element; + + if (GNUNET_YES == ke->received) + return GNUNET_YES; + + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + GNUNET_MQ_send (op->mq, ev); + + return GNUNET_YES; +} + + +/** + * Handle a + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_request_full (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF != op->state->phase) + { + fail_union_operation (op); + GNUNET_break_op (0); + return; + } + + // FIXME: we need to check that our set is larger than the + // byzantine_lower_bound by some threshold + send_full_set (op); +} + + +/** + * Handle a "full done" message. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF == op->state->phase) + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); + + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_elements_iter, + op); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_DONE; + + /* we now wait until the other peer shuts the tunnel down*/ + } + else if (PHASE_FULL_SENDING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + +/** + * Handle a demand by the other peer for elements based on a list + * of GNUNET_HashCode-s. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct GNUNET_SET_ElementMessage *emsg; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + struct GNUNET_MQ_Envelope *ev; + + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); + if (NULL == ee) + { + /* Demand for non-existing element. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + /* Probably confused lazily copied sets. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", + (void *) op, + (unsigned int) ee->element.size, + GNUNET_h2s (&ee->element_hash)); + GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + /* Nothing to do. */ + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } +} + + +/** + * Handle offers (of GNUNET_HashCode-s) and + * respond with demands (of GNUNET_HashCode-s). * * @param cls the union operation * @param mh the message */ static void -handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct GNUNET_MQ_Envelope *ev; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + + /* look up elements and send them */ + if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_INVENTORY_ACTIVE)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } - if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) { - /* we got all requests, but still have to send our elements as response */ + struct ElementEntry *ee; + struct GNUNET_MessageHeader *demands; + struct GNUNET_MQ_Envelope *ev; + + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + hash); + if (NULL != ee) + if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) + continue; + + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, + hash)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipped sending duplicate demand\n"); + continue; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); - op->state->phase = PHASE_FINISHED; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, + hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Requesting element (hash %s)\n", + (void *) op, GNUNET_h2s (hash)); + ev = GNUNET_MQ_msg_header_extra (demands, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); + *(struct GNUNET_HashCode *) &demands[1] = *hash; GNUNET_MQ_send (op->mq, ev); + } +} + + +/** + * Handle a done message from a remote peer + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (op->state->phase == PHASE_INVENTORY_PASSIVE) + { + /* We got all requests, but still have to send our elements in response. */ + + op->state->phase = PHASE_FINISH_WAITING; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as passive partner), waiting for our demands to be satisfied\n"); + /* The active peer is done sending offers + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave him everything. + */ + maybe_finish (op); return; } - if (op->state->phase == PHASE_EXPECT_ELEMENTS) + if (op->state->phase == PHASE_INVENTORY_ACTIVE) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); - op->state->phase = PHASE_FINISHED; - finish_and_destroy (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as active partner), waiting to finish\n"); + /* All demands of the other peer are satisfied, + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */ + op->state->phase = PHASE_FINISH_CLOSING; + maybe_finish (op); return; } - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); } /** - * Evaluate a union operation with - * a remote peer. + * Initiate operation to evaluate a set union with a remote peer. * - * @param op operation to evaluate + * @param op operation to perform (to be initialized) + * @param opaque_context message to be transmitted to the listener + * to convince him to accept, may be NULL */ static void -union_evaluate (struct Operation *op) +union_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) { + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + GNUNET_assert (NULL == op->state); op->state = GNUNET_new (struct OperationState); - // copy the current generation's strata estimator for this operation + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + /* copy the current generation's strata estimator for this operation */ op->state->se = strata_estimator_dup (op->spec->set->state->se); /* we started the operation, thus we have to send the operation request */ op->state->phase = PHASE_EXPECT_SE; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation\n"); - send_operation_request (op); + op->state->salt_receive = op->state->salt_send = 42; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of initiated union operations", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (op->spec->set->client); + return; + } + msg->operation = htonl (GNUNET_SET_OPERATION_UNION); + GNUNET_MQ_send (op->mq, + ev); + + if (NULL != opaque_context) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request with context message\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request without context message\n"); + + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); } @@ -1221,9 +2006,25 @@ union_evaluate (struct Operation *op) static void union_accept (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "accepting set union operation\n"); + GNUNET_assert (NULL == op->state); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# of accepted union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + op->state = GNUNET_new (struct OperationState); op->state->se = strata_estimator_dup (op->spec->set->state->se); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + op->state->salt_receive = op->state->salt_send = 42; + initialize_key_to_element (op); + op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); /* kick off the operation */ send_strata_estimator (op); } @@ -1235,18 +2036,25 @@ union_accept (struct Operation *op) * We maintain one strata estimator per set and then manipulate it over the * lifetime of the set, as recreating a strata estimator would be expensive. * - * @return the newly created set + * @return the newly created set, NULL on error */ static struct SetState * union_set_create (void) { struct SetState *set_state; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n"); - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "union set created\n"); set_state = GNUNET_new (struct SetState); set_state->se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, SE_IBF_HASH_NUM); + SE_IBF_SIZE, SE_IBF_HASH_NUM); + if (NULL == set_state->se) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate strata estimator\n"); + GNUNET_free (set_state); + return NULL; + } return set_state; } @@ -1260,7 +2068,8 @@ union_set_create (void) static void union_add (struct SetState *set_state, struct ElementEntry *ee) { - strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); + strata_estimator_insert (set_state->se, + get_ibf_key (&ee->element_hash)); } @@ -1274,7 +2083,8 @@ union_add (struct SetState *set_state, struct ElementEntry *ee) static void union_remove (struct SetState *set_state, struct ElementEntry *ee) { - strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0)); + strata_estimator_remove (set_state->se, + get_ibf_key (&ee->element_hash)); } @@ -1300,66 +2110,109 @@ union_set_destroy (struct SetState *set_state) * * @param op the state of the union evaluate operation * @param mh the received message - * @return GNUNET_SYSERR if the tunnel should be disconnected, - * GNUNET_OK otherwise + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ int union_handle_p2p_message (struct Operation *op, const struct GNUNET_MessageHeader *mh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", - ntohs (mh->type), ntohs (mh->size)); + //LOG (GNUNET_ERROR_TYPE_DEBUG, + // "received p2p message (t: %u, s: %u)\n", + // ntohs (mh->type), + // ntohs (mh->size)); switch (ntohs (mh->type)) { case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: - handle_p2p_ibf (op, mh); - break; + return handle_p2p_ibf (op, mh); case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: - handle_p2p_strata_estimator (op, mh); - break; + return handle_p2p_strata_estimator (op, mh, GNUNET_NO); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: + return handle_p2p_strata_estimator (op, mh, GNUNET_YES); case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: handle_p2p_elements (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: - handle_p2p_element_requests (op, mh); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: + handle_p2p_full_element (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: + handle_p2p_inquiry (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: handle_p2p_done (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: + handle_p2p_offer (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: + handle_p2p_demand (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: + handle_p2p_full_done (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: + handle_p2p_request_full (op, mh); + break; default: - /* something wrong with mesh's message handlers? */ + /* Something wrong with cadet's message handlers? */ GNUNET_assert (0); } return GNUNET_OK; } + /** - * handler for peer-disconnects, notifies the client - * about the aborted operation in case the op was not concluded + * Handler for peer-disconnects, notifies the client + * about the aborted operation in case the op was not concluded. * * @param op the destroyed operation */ static void union_peer_disconnect (struct Operation *op) { - if (PHASE_FINISHED != op->state->phase) + if (PHASE_DONE != op->state->phase) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); msg->request_id = htonl (op->spec->client_request_id); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); - _GSS_operation_destroy (op); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + LOG (GNUNET_ERROR_TYPE_WARNING, + "other peer disconnected prematurely, phase %u\n", + op->state->phase); + _GSS_operation_destroy (op, + GNUNET_YES); return; } // else: the session has already been concluded - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "other peer disconnected (finished)\n"); if (GNUNET_NO == op->state->client_done_sent) - finish_and_destroy (op); + send_done_and_destroy (op); +} + + +/** + * Copy union-specific set state. + * + * @param set source set for copying the union state + * @return a copy of the union-specific set state + */ +static struct SetState * +union_copy_state (struct Set *set) +{ + struct SetState *new_state; + + new_state = GNUNET_new (struct SetState); + GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); + new_state->se = strata_estimator_dup (set->state->se); + + return new_state; } @@ -1382,6 +2235,7 @@ _GSS_union_vt () .accept = &union_accept, .peer_disconnect = &union_peer_disconnect, .cancel = &union_op_cancel, + .copy_state = &union_copy_state, }; return &union_vt;