X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_union.c;h=e22465fd3c0b8dac29ee8b3a3a11993d16e0671f;hb=bdd2a2f82789160f7cd1d5f6d25bdcd75a90937e;hp=6bd86c5b50878f863278af055d2072fc6e50393b;hpb=7961175bdde4e1efe2140e04caa6e600d41bcf90;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6bd86c5b5..e22465fd3 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,34 +14,40 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ - /** - * @file set/gnunet-service-set.c + * @file set/gnunet-service-set_union.c + * @brief two-peer set operations * @author Florian Dold */ #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" #include "gnunet-service-set.h" #include "ibf.h" -#include "strata_estimator.h" -#include "set_protocol.h" +#include "gnunet-service-set_union_strata_estimator.h" +#include "gnunet-service-set_protocol.h" #include +#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) + + /** * Number of IBFs in a strata estimator. */ #define SE_STRATA_COUNT 32 + /** * Size of the IBFs in the strata estimator. */ #define SE_IBF_SIZE 80 + /** - * hash num parameter for the difference digests and strata estimators + * The hash num parameter for the difference digests and strata estimators. */ #define SE_IBF_HASH_NUM 4 @@ -55,7 +61,7 @@ * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (16) +#define MAX_IBF_ORDER (20) /** * Number of buckets used in the ibf per estimated @@ -70,70 +76,74 @@ enum UnionOperationPhase { /** - * We sent the request message, and expect a strata estimator + * We sent the request message, and expect a strata estimator. */ PHASE_EXPECT_SE, + /** - * We sent the strata estimator, and expect an IBF + * We sent the strata estimator, and expect an IBF. This phase is entered once + * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. + * + * XXX: could use better wording. + * + * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS */ PHASE_EXPECT_IBF, + /** - * We know what type of IBF the other peer wants to send us, - * and expect the remaining parts + * Continuation for multi part IBFs. */ PHASE_EXPECT_IBF_CONT, + /** - * We are sending request and elements, - * and thus only expect elements from the other peer. + * We are decoding an IBF. */ - PHASE_EXPECT_ELEMENTS, + PHASE_INVENTORY_ACTIVE, + /** - * We are expecting elements and requests, and send - * requested elements back to the other peer. + * The other peer is decoding the IBF we just sent. */ - PHASE_EXPECT_ELEMENTS_AND_REQUESTS, + PHASE_INVENTORY_PASSIVE, + /** - * The protocol is over. - * Results may still have to be sent to the client. + * The protocol is almost finished, but we still have to flush our message + * queue and/or expect some elements. */ - PHASE_FINISHED + PHASE_FINISH_CLOSING, + + /** + * In the penultimate phase, + * we wait until all our demands + * are satisfied. Then we send a done + * message, and wait for another done message.*/ + PHASE_FINISH_WAITING, + + /** + * In the ultimate phase, we wait until + * our demands are satisfied and then + * quit (sending another DONE message). */ + PHASE_DONE }; /** - * State of an evaluate operation - * with another peer. + * State of an evaluate operation with another peer. */ struct OperationState { - /** - * Tunnel to the remote peer. - */ - struct GNUNET_MESH_Tunnel *tunnel; - - /** - * Message queue for the peer. - */ - struct GNUNET_MQ_Handle *mq; - - /** - * Number of ibf buckets received - */ - unsigned int ibf_buckets_received; - /** * Copy of the set's strata estimator at the time of - * creation of this operation + * creation of this operation. */ struct StrataEstimator *se; /** - * The ibf we currently receive + * The IBF we currently receive. */ struct InvertibleBloomFilter *remote_ibf; /** - * IBF of the set's element. + * The IBF with the local set's element. */ struct InvertibleBloomFilter *local_ibf; @@ -144,11 +154,6 @@ struct OperationState */ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; - /** - * Iterator for sending elements on the key to element mapping to the client. - */ - struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; - /** * Current state of the operation. */ @@ -158,12 +163,31 @@ struct OperationState * Did we send the client that we are done? */ int client_done_sent; + + /** + * Number of ibf buckets already received into the @a remote_ibf. + */ + unsigned int ibf_buckets_received; + + /** + * Hashes for elements that we have demanded from the other peer. + */ + struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; + + /** + * Salt that we're using for sending IBFs + */ + uint32_t salt_send; + + /** + * Salt for the IBF we've received and that we're currently decoding. + */ + uint32_t salt_receive; }; /** - * The key entry is used to associate an ibf key with - * an element. + * The key entry is used to associate an ibf key with an element. */ struct KeyEntry { @@ -174,14 +198,11 @@ struct KeyEntry /** * The actual element associated with the key. + * + * Only owned by the union operation if element->operation + * is #GNUNET_YES. */ struct ElementEntry *element; - - /** - * Element that collides with this element - * on the ibf key. All colliding entries must have the same ibf key. - */ - struct KeyEntry *next_colliding; }; @@ -221,13 +242,13 @@ struct SetState /** - * Iterator over hash map entries. + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, + * @return #GNUNET_YES if we should continue to iterate, * #GNUNET_NO if not. */ static int @@ -236,31 +257,29 @@ destroy_key_to_element_iter (void *cls, void *value) { struct KeyEntry *k = value; - /* destroy the linked list of colliding ibf key entries */ - while (NULL != k) + + GNUNET_assert (NULL != k); + if (GNUNET_YES == k->element->remote) { - struct KeyEntry *k_tmp = k; - k = k->next_colliding; - if (GNUNET_YES == k_tmp->element->remote) - { - GNUNET_free (k_tmp->element); - k_tmp->element = NULL; - } - GNUNET_free (k_tmp); + GNUNET_free (k->element); + k->element = NULL; } + GNUNET_free (k); return GNUNET_YES; } /** - * Destroy the union operation. Only things specific to the union operation are destroyed. - * + * Destroy the union operation. Only things specific to the union + * operation are destroyed. + * * @param op union operation to destroy */ static void union_op_cancel (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op\n"); /* check if the op was canceled twice */ GNUNET_assert (NULL != op->state); if (NULL != op->state->remote_ibf) @@ -268,6 +287,11 @@ union_op_cancel (struct Operation *op) ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; } + if (NULL != op->state->demanded_hashes) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); + op->state->demanded_hashes = NULL; + } if (NULL != op->state->local_ibf) { ibf_destroy (op->state->local_ibf); @@ -280,13 +304,16 @@ union_op_cancel (struct Operation *op) } if (NULL != op->state->key_to_element) { - GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL); + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &destroy_key_to_element_iter, + NULL); GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); op->state->key_to_element = NULL; } GNUNET_free (op->state); op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op done\n"); } @@ -302,14 +329,14 @@ fail_union_operation (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n"); - + LOG (GNUNET_ERROR_TYPE_ERROR, + "union operation failed\n"); ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->request_id = htonl (op->spec->client_request_id); msg->element_type = htons (0); GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); + _GSS_operation_destroy (op, GNUNET_YES); } @@ -318,105 +345,31 @@ fail_union_operation (struct Operation *op) * a salt. * * @param src the hash code - * @param salt salt to use * @return the derived IBF key */ static struct IBF_Key -get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt) +get_ibf_key (const struct GNUNET_HashCode *src) { struct IBF_Key key; + uint16_t salt = 0; - GNUNET_CRYPTO_hkdf (&key, sizeof (key), - GCRY_MD_SHA512, GCRY_MD_SHA256, - src, sizeof *src, - &salt, sizeof (salt), - NULL, 0); + GNUNET_CRYPTO_kdf (&key, sizeof (key), + src, sizeof *src, + &salt, sizeof (salt), + NULL, 0); return key; } /** - * Send a request for the evaluate operation to a remote peer - * - * @param op operation with the other peer - */ -static void -send_operation_request (struct Operation *op) -{ - struct GNUNET_MQ_Envelope *ev; - struct OperationRequestMessage *msg; - - ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - op->spec->context_msg); - - if (NULL == ev) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (op->spec->set->client); - return; - } - msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = op->spec->app_id; - msg->salt = htonl (op->spec->salt); - GNUNET_MQ_send (op->mq, ev); - - if (NULL != op->spec->context_msg) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); - - if (NULL != op->spec->context_msg) - { - GNUNET_free (op->spec->context_msg); - op->spec->context_msg = NULL; - } -} - - -/** - * Iterator to create the mapping between ibf keys - * and element entries. + * Iterator over the mapping from IBF keys to element entries. Checks if we + * have an element with a given GNUNET_HashCode. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. - */ -static int -op_register_element_iterator (void *cls, - uint32_t key, - void *value) -{ - struct KeyEntry *const new_k = cls; - struct KeyEntry *old_k = value; - - GNUNET_assert (NULL != old_k); - /* check if our ibf key collides with the ibf key in the existing entry */ - if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) - { - /* insert the the new key in the collision chain */ - new_k->next_colliding = old_k->next_colliding; - old_k->next_colliding = new_k; - /* signal to the caller that we were able to insert into a colliding bucket */ - return GNUNET_NO; - } - return GNUNET_YES; -} - - -/** - * Iterator to create the mapping between ibf keys - * and element entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * @return #GNUNET_YES if we should search further, + * #GNUNET_NO if we've found the element. */ static int op_has_element_iterator (void *cls, @@ -427,12 +380,9 @@ op_has_element_iterator (void *cls, struct KeyEntry *k = value; GNUNET_assert (NULL != k); - while (NULL != k) - { - if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash)) - return GNUNET_NO; - k = k->next_colliding; - } + if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, + element_hash)) + return GNUNET_NO; return GNUNET_YES; } @@ -446,15 +396,17 @@ op_has_element_iterator (void *cls, * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ static int -op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) +op_has_element (struct Operation *op, + const struct GNUNET_HashCode *element_hash) { int ret; struct IBF_Key ibf_key; - ibf_key = get_ibf_key (element_hash, op->spec->salt); + ibf_key = get_ibf_key (element_hash); ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_has_element_iterator, (void *) element_hash); + op_has_element_iterator, + (void *) element_hash); /* was the iteration aborted because we found the element? */ if (GNUNET_SYSERR == ret) @@ -471,30 +423,52 @@ op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash * This is done to speed up re-tried operations, if some elements * were transmitted, and then the IBF fails to decode. * + * XXX: clarify ownership, doesn't sound right. + * * @param op the union operation * @param ee the element entry */ static void -op_register_element (struct Operation *op, struct ElementEntry *ee) +op_register_element (struct Operation *op, + struct ElementEntry *ee) { - int ret; struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt); + ibf_key = get_ibf_key (&ee->element_hash); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; - ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_register_element_iterator, k); + k, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} - /* was the element inserted into a colliding bucket? */ - if (GNUNET_SYSERR == ret) - return; - GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; } @@ -510,12 +484,17 @@ prepare_ibf_iterator (void *cls, uint32_t key, void *value) { - struct InvertibleBloomFilter *ibf = cls; + struct Operation *op = cls; struct KeyEntry *ke = value; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val); - - ibf_insert (ibf, ke->ibf_key); + struct IBF_Key salted_key; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] inserting %lx (hash %s) into ibf\n", + (void *) op, + (unsigned long) ke->ibf_key.key_val, + GNUNET_h2s (&ke->element->element_hash)); + salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -524,12 +503,11 @@ prepare_ibf_iterator (void *cls, * Iterator for initializing the * key-to-element mapping of a union operation * - * @param cls the union operation - * @param key unised - * @param value the element entry to insert + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert * into the key-to-element mapping - * @return GNUNET_YES to continue iterating, - * GNUNET_NO to stop + * @return #GNUNET_YES (to continue iterating) */ static int init_key_to_element_iterator (void *cls, @@ -537,18 +515,16 @@ init_key_to_element_iterator (void *cls, void *value) { struct Operation *op = cls; - struct ElementEntry *e = value; + struct ElementEntry *ee = value; /* make sure that the element belongs to the set at the time * of creating the operation */ - if ( (e->generation_added > op->generation_created) || - ( (GNUNET_YES == e->removed) && - (e->generation_removed < op->generation_created))) + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) return GNUNET_YES; - GNUNET_assert (GNUNET_NO == e->remote); + GNUNET_assert (GNUNET_NO == ee->remote); - op_register_element (op, e); + op_register_element (op, ee); return GNUNET_YES; } @@ -559,41 +535,69 @@ init_key_to_element_iterator (void *cls, * * @param op the union operation * @param size size of the ibf to create + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -prepare_ibf (struct Operation *op, uint16_t size) +static int +prepare_ibf (struct Operation *op, + uint32_t size) { if (NULL == op->state->key_to_element) { unsigned int len; - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements); + + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); } if (NULL != op->state->local_ibf) ibf_destroy (op->state->local_ibf); op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + if (NULL == op->state->local_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate local IBF\n"); + return GNUNET_SYSERR; + } GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, - prepare_ibf_iterator, op->state->local_ibf); + &prepare_ibf_iterator, + op); + return GNUNET_OK; } /** * Send an ibf of appropriate size. * + * Fragments the IBF into multiple messages if necessary. + * * @param op the union operation * @param ibf_order order of the ibf to send, size=2^order + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -send_ibf (struct Operation *op, uint16_t ibf_order) +static int +send_ibf (struct Operation *op, + uint16_t ibf_order) { unsigned int buckets_sent = 0; struct InvertibleBloomFilter *ibf; - prepare_ibf (op, 1<state->local_ibf; @@ -608,20 +612,29 @@ send_ibf (struct Operation *op, uint16_t ibf_order) if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); - msg->reserved = 0; + ev = GNUNET_MQ_msg_extra (msg, + buckets_in_message * IBF_BUCKET_SIZE, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); + msg->reserved1 = 0; + msg->reserved2 = 0; msg->order = ibf_order; - msg->offset = htons (buckets_sent); + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", - buckets_in_message, buckets_sent, 1<mq, ev); } - op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; + /* The other peer must decode the IBF, so + * we're passive. */ + op->state->phase = PHASE_INVENTORY_PASSIVE; + return GNUNET_OK; } @@ -633,16 +646,32 @@ send_ibf (struct Operation *op, uint16_t ibf_order) static void send_strata_estimator (struct Operation *op) { + const struct StrataEstimator *se = op->state->se; struct GNUNET_MQ_Envelope *ev; struct GNUNET_MessageHeader *strata_msg; - + char *buf; + size_t len; + uint16_t type; + + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (op->state->se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; ev = GNUNET_MQ_msg_header_extra (strata_msg, - SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); - strata_estimator_write (op->state->se, &strata_msg[1]); - GNUNET_MQ_send (op->mq, ev); + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + GNUNET_MQ_send (op->mq, + ev); op->state->phase = PHASE_EXPECT_IBF; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent SE, expecting IBF\n"); } @@ -659,7 +688,8 @@ get_order_from_difference (unsigned int diff) unsigned int ibf_order; ibf_order = 2; - while ((1< MAX_IBF_ORDER) ibf_order = MAX_IBF_ORDER; @@ -672,35 +702,83 @@ get_order_from_difference (unsigned int diff) * * @param cls the union operation * @param mh the message + * @param is_compressed #GNUNET_YES if the estimator is compressed + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_strata_estimator (void *cls, + const struct GNUNET_MessageHeader *mh, + int is_compressed) { struct Operation *op = cls; struct StrataEstimator *remote_se; int diff; + size_t len; + + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (mh->size), + GNUNET_NO); if (op->state->phase != PHASE_EXPECT_SE) { fail_union_operation (op); GNUNET_break (0); - return; + return GNUNET_SYSERR; } - remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, + len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); + if ( (GNUNET_NO == is_compressed) && + (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) + { + fail_union_operation (op); + GNUNET_break (0); + return GNUNET_SYSERR; + } + remote_se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, SE_IBF_HASH_NUM); - strata_estimator_read (&mh[1], remote_se); + if (NULL == remote_se) + { + /* insufficient resources, fail */ + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + strata_estimator_read (&mh[1], + len, + is_compressed, + remote_se)) + { + /* decompression failed */ + fail_union_operation (op); + strata_estimator_destroy (remote_se); + return GNUNET_SYSERR; + } GNUNET_assert (NULL != op->state->se); - diff = strata_estimator_difference (remote_se, op->state->se); + diff = strata_estimator_difference (remote_se, + op->state->se); strata_estimator_destroy (remote_se); strata_estimator_destroy (op->state->se); op->state->se = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", - diff, 1<ibf_key; struct Operation *op = sec->op; struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_MessageHeader *mh; - if (ke->ibf_key.key_val != ibf_key.key_val) + /* Detect 32-bit key collision for the 64-bit IBF keys. */ + if (ke->ibf_key.key_val != sec->ibf_key.key_val) return GNUNET_YES; - while (NULL != ke) - { - const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *mh; - GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == ev) - { - /* element too large */ - GNUNET_break (0); - continue; - } - memcpy (&mh[1], element->data, element->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", - GNUNET_h2s (&ke->element->element_hash)); - GNUNET_MQ_send (op->mq, ev); - ke = ke->next_colliding; - } - return GNUNET_NO; + ev = GNUNET_MQ_msg_header_extra (mh, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); + + GNUNET_assert (NULL != ev); + *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] sending element offer (%s) to peer\n", + (void *) op, + GNUNET_h2s (&ke->element->element_hash)); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; } + /** - * Send all elements that have the specified IBF key - * to the remote peer of the union operation + * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. * * @param op union operation * @param ibf_key IBF key of interest */ static void -send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key) +send_offers_for_key (struct Operation *op, + struct IBF_Key ibf_key) { struct SendElementClosure send_cls; send_cls.ibf_key = ibf_key; send_cls.op = op; - GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - &send_element_iterator, &send_cls); + (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + (uint32_t) ibf_key.key_val, + &send_offers_iterator, + &send_cls); } /** * Decode which elements are missing on each side, and - * send the appropriate elemens and requests + * send the appropriate offers and inquiries. * * @param op union operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void +static int decode_and_send (struct Operation *op) { struct IBF_Key key; @@ -777,19 +853,27 @@ decode_and_send (struct Operation *op) unsigned int num_decoded; struct InvertibleBloomFilter *diff_ibf; - GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase); + GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); - prepare_ibf (op, op->state->remote_ibf->size); + if (GNUNET_OK != + prepare_ibf (op, op->state->remote_ibf->size)) + { + GNUNET_break (0); + /* allocation failed */ + return GNUNET_SYSERR; + } diff_ibf = ibf_dup (op->state->local_ibf); ibf_subtract (diff_ibf, op->state->remote_ibf); ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding IBF (size=%u)\n", + diff_ibf->size); num_decoded = 0; - last_key.key_val = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { @@ -801,17 +885,23 @@ decode_and_send (struct Operation *op) res = ibf_decode (diff_ibf, &side, &key); if (res == GNUNET_OK) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", - key.key_val); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoded ibf key %lx\n", + (unsigned long) key.key_val); num_decoded += 1; - if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) + if ( (num_decoded > diff_ibf->size) || + ( (num_decoded > 1) && + (last_key.key_val == key.key_val) ) ) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", - num_decoded, diff_ibf->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, + diff_ibf->size); cycle_detected = GNUNET_YES; } } - if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) + if ( (GNUNET_SYSERR == res) || + (GNUNET_YES == cycle_detected) ) { int next_order; next_order = 0; @@ -820,15 +910,37 @@ decode_and_send (struct Operation *op) next_order++; if (next_order <= MAX_IBF_ORDER) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "decoding failed, sending larger ibf (size %u)\n", - 1<state->salt_send++; + if (GNUNET_OK != + send_ibf (op, next_order)) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "set union failed: reached ibf limit\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of failed union operations (too large)", + 1, + GNUNET_NO); + // XXX: Send the whole set, element-by-element + LOG (GNUNET_ERROR_TYPE_ERROR, + "set union failed: reached ibf limit\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; } break; } @@ -836,27 +948,38 @@ decode_and_send (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "transmitted all values, sending DONE\n"); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); GNUNET_MQ_send (op->mq, ev); + /* We now wait until we get a DONE message back + * and then wait for our MQ to be flushed and all our + * demands be delivered. */ break; } if (1 == side) { - send_elements_for_key (op, key); + struct IBF_Key unsalted_key; + unsalt_key (&key, op->state->salt_receive, &unsalted_key); + send_offers_for_key (op, unsalted_key); } else if (-1 == side) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *msg; + struct InquiryMessage *msg; - /* It may be nice to merge multiple requests, but with mesh's corking it is not worth + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth * the effort additional complexity. */ - ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); - - *(struct IBF_Key *) &msg[1] = key; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); + ev = GNUNET_MQ_msg_extra (msg, + sizeof (struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], + &key, + sizeof (struct IBF_Key)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element inquiry for IBF key %lx\n", + (unsigned long) key.key_val); GNUNET_MQ_send (op->mq, ev); } else @@ -865,46 +988,86 @@ decode_and_send (struct Operation *op) } } ibf_destroy (diff_ibf); + return GNUNET_OK; } /** * Handle an IBF message from a remote peer. * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * * @param cls the union operation * @param mh the header of the message + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_ibf (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct IBFMessage *msg = (struct IBFMessage *) mh; + const struct IBFMessage *msg; unsigned int buckets_in_message; - if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || + if (ntohs (mh->size) < sizeof (struct IBFMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + msg = (const struct IBFMessage *) mh; + if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || (op->state->phase == PHASE_EXPECT_IBF) ) { op->state->phase = PHASE_EXPECT_IBF_CONT; GNUNET_assert (NULL == op->state->remote_ibf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<order); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); + if (NULL == op->state->remote_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } op->state->ibf_buckets_received = 0; - if (0 != ntohs (msg->offset)) + if (0 != ntohl (msg->offset)) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; } } else if (op->state->phase == PHASE_EXPECT_IBF_CONT) { - if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) || - (1<order != op->state->remote_ibf->size) ) + if (ntohl (msg->offset) != op->state->ibf_buckets_received) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; + } + if (1<order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; } + if (ntohl (msg->salt) != op->state->salt_receive) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + } + else + { + GNUNET_assert (0); } buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; @@ -913,25 +1076,39 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) { GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; } if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); - return; + return GNUNET_SYSERR; } - ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf); + GNUNET_assert (NULL != op->state->remote_ibf); + + ibf_read_slice (&msg[1], + op->state->ibf_buckets_received, + buckets_in_message, + op->state->remote_ibf); op->state->ibf_buckets_received += buckets_in_message; if (op->state->ibf_buckets_received == op->state->remote_ibf->size) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); - op->state->phase = PHASE_EXPECT_ELEMENTS; - decode_and_send (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received full ibf\n"); + op->state->phase = PHASE_INVENTORY_ACTIVE; + if (GNUNET_OK != + decode_and_send (op)) + { + /* Internal error, best we can do is shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to decode IBF, closing connection\n"); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } @@ -941,15 +1118,19 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) * * @param op union operation * @param element element to send + * @param status status to send with the new element */ static void send_client_element (struct Operation *op, - struct GNUNET_SET_Element *element) + struct GNUNET_SET_Element *element, + int status) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element (size %u) to client\n", + element->size); GNUNET_assert (0 != op->spec->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); if (NULL == ev) @@ -958,10 +1139,10 @@ send_client_element (struct Operation *op, GNUNET_break (0); return; } - rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->result_status = htons (status); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (op->spec->set->client_mq, ev); } @@ -978,175 +1159,346 @@ send_done_and_destroy (void *cls) struct Operation *op = cls; struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; + ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); rm->request_id = htonl (op->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); + /* Will also call the union-specific cancel function. */ + _GSS_operation_destroy (op, GNUNET_YES); +} + + +static void +maybe_finish (struct Operation *op) +{ + unsigned int num_demanded; + + num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); + + if (PHASE_FINISH_WAITING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_WAITING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_DONE; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + + /* We now wait until the other peer closes the channel + * after it got all elements from us. */ + } + } + if (PHASE_FINISH_CLOSING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_CLOSING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + } } /** - * Send all remaining elements in the full result iterator. + * Handle an element message from a remote peer. * - * @param cls operation + * @param cls the union operation + * @param mh the message */ static void -send_remaining_elements (void *cls) +handle_p2p_elements (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct KeyEntry *ke; - int res; + struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; + uint16_t element_size; - res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); - res = GNUNET_NO; - if (GNUNET_NO == res) + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); - send_done_and_destroy (op); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); + emsg = (const struct GNUNET_SET_ElementMessage *) mh; - while (1) + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, + &ee->element_hash, + NULL)) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - struct GNUNET_SET_Element *element; - element = &ke->element->element; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); - GNUNET_assert (0 != op->spec->client_request_id); - ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); - if (NULL == ev) - { - GNUNET_MQ_discard (ev); - GNUNET_break (0); - continue; - } - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); - if (ke->next_colliding == NULL) + /* We got something we didn't demand, since it's not in our map. */ + GNUNET_break_op (0); + GNUNET_free (ee); + fail_union_operation (op); + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op_register_element (op, ee); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) { - GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - break; + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; } - GNUNET_MQ_send (op->spec->set->client_mq, ev); - ke = ke->next_colliding; } + + maybe_finish (op); } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). * - * @param op union operation + * @param cls the union operation + * @param mh the message */ static void -finish_and_destroy (struct Operation *op) +handle_p2p_inquiry (void *cls, + const struct GNUNET_MessageHeader *mh) { - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + struct Operation *op = cls; + const struct IBF_Key *ibf_key; + unsigned int num_keys; + struct InquiryMessage *msg; - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + /* look up elements and send them */ + if (op->state->phase != PHASE_INVENTORY_PASSIVE) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); - GNUNET_assert (NULL == op->state->full_result_iter); - op->state->full_result_iter = - GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element); - send_remaining_elements (op); + GNUNET_break_op (0); + fail_union_operation (op); return; } - send_done_and_destroy (op); + num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); + if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) + != num_keys * sizeof (struct IBF_Key)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + msg = (struct InquiryMessage *) mh; + + ibf_key = (const struct IBF_Key *) &msg[1]; + while (0 != num_keys--) + { + struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); + send_offers_for_key (op, unsalted_key); + ibf_key++; + } } /** - * Handle an element message from a remote peer. - * - * @param cls the union operation - * @param mh the message + * FIXME */ static void -handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; struct ElementEntry *ee; - uint16_t element_size; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); + struct GNUNET_SET_ElementMessage *emsg; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + struct GNUNET_MQ_Envelope *ev; - if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) && - (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) { + GNUNET_break_op (0); fail_union_operation (op); - GNUNET_break (0); return; } - element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); - ee = GNUNET_malloc (sizeof *ee + element_size); - memcpy (&ee[1], &mh[1], element_size); - ee->element.size = element_size; - ee->element.data = &ee[1]; - ee->remote = GNUNET_YES; - GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); - if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n"); - GNUNET_free (ee); - return; - } + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); + if (NULL == ee) + { + /* Demand for non-existing element. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + /* Probably confused lazily copied sets. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", + (void *) op, + (unsigned int) ee->element.size, + GNUNET_h2s (&ee->element_hash)); + GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); - op_register_element (op, ee); - /* only send results immediately if the client wants it */ - if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode) - send_client_element (op, &ee->element); + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + /* Nothing to do. */ + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } } /** - * Handle an element request from a remote peer. + * Handle offers (of GNUNET_HashCode-s) and + * respond with demands (of GNUNET_HashCode-s). * * @param cls the union operation * @param mh the message */ static void -handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct IBF_Key *ibf_key; - unsigned int num_keys; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; /* look up elements and send them */ - if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_INVENTORY_ACTIVE)) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); return; } - - num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key); - - if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) { - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); return; } - ibf_key = (struct IBF_Key *) &mh[1]; - while (0 != num_keys--) + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) { - send_elements_for_key (op, *ibf_key); - ibf_key++; + struct ElementEntry *ee; + struct GNUNET_MessageHeader *demands; + struct GNUNET_MQ_Envelope *ev; + + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + hash); + if (NULL != ee) + if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) + continue; + + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, + hash)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipped sending duplicate demand\n"); + continue; + } + + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, + hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Requesting element (hash %s)\n", + (void *) op, GNUNET_h2s (hash)); + ev = GNUNET_MQ_msg_header_extra (demands, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); + *(struct GNUNET_HashCode *) &demands[1] = *hash; + GNUNET_MQ_send (op->mq, ev); } } @@ -1158,48 +1510,103 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) * @param mh the message */ static void -handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - struct GNUNET_MQ_Envelope *ev; - if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (op->state->phase == PHASE_INVENTORY_PASSIVE) { - /* we got all requests, but still have to send our elements as response */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); - op->state->phase = PHASE_FINISHED; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (op->mq, ev); + /* We got all requests, but still have to send our elements in response. */ + + op->state->phase = PHASE_FINISH_WAITING; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as passive partner), waiting for our demands to be satisfied\n"); + /* The active peer is done sending offers + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave him everything. + */ + maybe_finish (op); return; } - if (op->state->phase == PHASE_EXPECT_ELEMENTS) + if (op->state->phase == PHASE_INVENTORY_ACTIVE) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); - op->state->phase = PHASE_FINISHED; - finish_and_destroy (op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as active partner), waiting to finish\n"); + /* All demands of the other peer are satisfied, + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */ + op->state->phase = PHASE_FINISH_CLOSING; + maybe_finish (op); return; } - GNUNET_break (0); + GNUNET_break_op (0); fail_union_operation (op); } /** - * Evaluate a union operation with - * a remote peer. + * Initiate operation to evaluate a set union with a remote peer. * - * @param op operation to evaluate + * @param op operation to perform (to be initialized) + * @param opaque_context message to be transmitted to the listener + * to convince him to accept, may be NULL */ static void -union_evaluate (struct Operation *op) +union_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) { + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + GNUNET_assert (NULL == op->state); op->state = GNUNET_new (struct OperationState); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + /* copy the current generation's strata estimator for this operation */ op->state->se = strata_estimator_dup (op->spec->set->state->se); /* we started the operation, thus we have to send the operation request */ op->state->phase = PHASE_EXPECT_SE; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation"); - send_operation_request (op); + op->state->salt_receive = op->state->salt_send = 42; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of initiated union operations", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (op->spec->set->client); + return; + } + msg->operation = htonl (GNUNET_SET_OPERATION_UNION); + GNUNET_MQ_send (op->mq, + ev); + + if (NULL != opaque_context) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request with context message\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request without context message\n"); } @@ -1212,9 +1619,23 @@ union_evaluate (struct Operation *op) static void union_accept (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "accepting set union operation\n"); + GNUNET_assert (NULL == op->state); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# of accepted union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + op->state = GNUNET_new (struct OperationState); op->state->se = strata_estimator_dup (op->spec->set->state->se); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + op->state->salt_receive = op->state->salt_send = 42; /* kick off the operation */ send_strata_estimator (op); } @@ -1223,18 +1644,28 @@ union_accept (struct Operation *op) /** * Create a new set supporting the union operation * - * @return the newly created set + * We maintain one strata estimator per set and then manipulate it over the + * lifetime of the set, as recreating a strata estimator would be expensive. + * + * @return the newly created set, NULL on error */ static struct SetState * union_set_create (void) { struct SetState *set_state; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n"); - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "union set created\n"); set_state = GNUNET_new (struct SetState); set_state->se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, SE_IBF_HASH_NUM); + SE_IBF_SIZE, SE_IBF_HASH_NUM); + if (NULL == set_state->se) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate strata estimator\n"); + GNUNET_free (set_state); + return NULL; + } return set_state; } @@ -1248,7 +1679,8 @@ union_set_create (void) static void union_add (struct SetState *set_state, struct ElementEntry *ee) { - strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); + strata_estimator_insert (set_state->se, + get_ibf_key (&ee->element_hash)); } @@ -1262,7 +1694,8 @@ union_add (struct SetState *set_state, struct ElementEntry *ee) static void union_remove (struct SetState *set_state, struct ElementEntry *ee) { - strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0)); + strata_estimator_remove (set_state->se, + get_ibf_key (&ee->element_hash)); } @@ -1288,63 +1721,109 @@ union_set_destroy (struct SetState *set_state) * * @param op the state of the union evaluate operation * @param mh the received message - * @return GNUNET_SYSERR if the tunnel should be disconnected, - * GNUNET_OK otherwise + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ int union_handle_p2p_message (struct Operation *op, const struct GNUNET_MessageHeader *mh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", - ntohs (mh->type), ntohs (mh->size)); + //LOG (GNUNET_ERROR_TYPE_DEBUG, + // "received p2p message (t: %u, s: %u)\n", + // ntohs (mh->type), + // ntohs (mh->size)); switch (ntohs (mh->type)) { case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: - handle_p2p_ibf (op, mh); - break; + return handle_p2p_ibf (op, mh); case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: - handle_p2p_strata_estimator (op, mh); - break; + return handle_p2p_strata_estimator (op, mh, GNUNET_NO); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: + return handle_p2p_strata_estimator (op, mh, GNUNET_YES); case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: handle_p2p_elements (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: - handle_p2p_element_requests (op, mh); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: + handle_p2p_inquiry (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: handle_p2p_done (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: + handle_p2p_offer (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: + handle_p2p_demand (op, mh); + break; default: - /* something wrong with mesh's message handlers? */ + /* Something wrong with cadet's message handlers? */ GNUNET_assert (0); } return GNUNET_OK; } +/** + * Handler for peer-disconnects, notifies the client + * about the aborted operation in case the op was not concluded. + * + * @param op the destroyed operation + */ static void union_peer_disconnect (struct Operation *op) { - if (PHASE_FINISHED != op->state->phase) + if (PHASE_DONE != op->state->phase) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); msg->request_id = htonl (op->spec->client_request_id); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); - _GSS_operation_destroy (op); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + LOG (GNUNET_ERROR_TYPE_WARNING, + "other peer disconnected prematurely, phase %u\n", + op->state->phase); + _GSS_operation_destroy (op, + GNUNET_YES); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); + // else: the session has already been concluded + LOG (GNUNET_ERROR_TYPE_DEBUG, + "other peer disconnected (finished)\n"); if (GNUNET_NO == op->state->client_done_sent) - finish_and_destroy (op); + send_done_and_destroy (op); } +/** + * Copy union-specific set state. + * + * @param set source set for copying the union state + * @return a copy of the union-specific set state + */ +static struct SetState * +union_copy_state (struct Set *set) +{ + struct SetState *new_state; + + new_state = GNUNET_new (struct SetState); + GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); + new_state->se = strata_estimator_dup (set->state->se); + + return new_state; +} + + +/** + * Get the table with implementing functions for + * set union. + * + * @return the operation specific VTable + */ const struct SetVT * _GSS_union_vt () { @@ -1358,6 +1837,7 @@ _GSS_union_vt () .accept = &union_accept, .peer_disconnect = &union_peer_disconnect, .cancel = &union_op_cancel, + .copy_state = &union_copy_state, }; return &union_vt;