X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_intersection.c;h=9fe1eabe64e98753f2db24796c01d7fe8431b1d1;hb=9528dcc4b739041f51ed0c8791fe34902525fac2;hp=7e8bd9cbfa34a1d41f589a7aa2caa41f26229ff5;hpb=15389f2525da19c32e040ac1d32d3473b43456df;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 7e8bd9cbf..9fe1eabe6 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,1197 +14,1182 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ - /** * @file set/gnunet-service-set_intersection.c * @brief two-peer set intersection * @author Christian Fuchs + * @author Christian Grothoff */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet-service-set.h" -#include "ibf.h" -#include "strata_estimator.h" -#include "set_protocol.h" +#include "gnunet_block_lib.h" +#include "gnunet-service-set_protocol.h" #include /** - * Number of IBFs in a strata estimator. - */ -#define SE_STRATA_COUNT 32 -/** - * Size of the IBFs in the strata estimator. - */ -#define SE_IBF_SIZE 80 -/** - * hash num parameter for the difference digests and strata estimators - */ -#define SE_IBF_HASH_NUM 4 - -/** - * Number of buckets that can be transmitted in one message. - */ -#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) - -/** - * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). - * Choose this value so that computing the IBF is still cheaper - * than transmitting all values. - */ -#define MAX_IBF_ORDER (16) - -/** - * Number of buckets used in the ibf per estimated - * difference. - */ -#define IBF_ALPHA 4 - - -/** - * Current phase we are in for a union operation. + * Current phase we are in for a intersection operation. */ enum IntersectionOperationPhase { /** - * We sent the request message, and expect a strata estimator - */ - PHASE_EXPECT_SE, - /** - * We sent the strata estimator, and expect an IBF + * We are just starting. */ - PHASE_EXPECT_IBF, - /** - * We know what type of IBF the other peer wants to send us, - * and expect the remaining parts - */ - PHASE_EXPECT_IBF_CONT, + PHASE_INITIAL, + /** - * We are sending request and elements, - * and thus only expect elements from the other peer. + * We have send the number of our elements to the other + * peer, but did not setup our element set yet. */ - PHASE_EXPECT_ELEMENTS, + PHASE_COUNT_SENT, + /** - * We are expecting elements and requests, and send - * requested elements back to the other peer. + * We have initialized our set and are now reducing it by exchanging + * Bloom filters until one party notices the their element hashes + * are equal. */ - PHASE_EXPECT_ELEMENTS_AND_REQUESTS, + PHASE_BF_EXCHANGE, + /** - * The protocol is over. - * Results may still have to be sent to the client. + * The protocol is over. Results may still have to be sent to the + * client. */ PHASE_FINISHED + }; /** - * State of an evaluate operation - * with another peer. + * State of an evaluate operation with another peer. */ struct OperationState { /** - * Tunnel to the remote peer. + * The bf we currently receive */ - struct GNUNET_MESH_Tunnel *tunnel; + struct GNUNET_CONTAINER_BloomFilter *remote_bf; /** - * Detail information about the set operation, - * including the set to use. + * BF of the set's element. */ - struct OperationSpecification *spec; + struct GNUNET_CONTAINER_BloomFilter *local_bf; /** - * Message queue for the peer. + * Remaining elements in the intersection operation. + * Maps element-id-hashes to 'elements in our set'. */ - struct GNUNET_MQ_Handle *mq; + struct GNUNET_CONTAINER_MultiHashMap *my_elements; /** - * Number of ibf buckets received + * Iterator for sending the final set of @e my_elements to the client. */ - unsigned int ibf_buckets_received; + struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; /** - * Copy of the set's strata estimator at the time of - * creation of this operation + * Evaluate operations are held in a linked list. */ - struct StrataEstimator *se; + struct OperationState *next; /** - * The ibf we currently receive + * Evaluate operations are held in a linked list. */ - struct InvertibleBloomFilter *remote_ibf; + struct OperationState *prev; /** - * IBF of the set's element. + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. */ - struct InvertibleBloomFilter *local_ibf; + char *bf_data; /** - * Maps IBF-Keys (specific to the current salt) to elements. - * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. - * Colliding IBF-Keys are linked. + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. */ - struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; + struct GNUNET_HashCode my_xor; /** - * Current state of the operation. + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. */ - enum IntersectionOperationPhase phase; + struct GNUNET_HashCode other_xor; /** - * Generation in which the operation handle - * was created. + * How many bytes of @e bf_data are valid? */ - unsigned int generation_created; + uint32_t bf_data_offset; /** - * Set state of the set that this operation - * belongs to. + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) */ - struct Set *set; + uint32_t my_element_count; /** - * Evaluate operations are held in - * a linked list. + * size of the bloomfilter in @e bf_data. */ - struct OperationState *next; - - /** - * Evaluate operations are held in - * a linked list. - */ - struct OperationState *prev; + uint32_t bf_data_size; /** - * Did we send the client that we are done? + * size of the bloomfilter */ - int client_done_sent; -}; + uint32_t bf_bits_per_element; + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). + */ + uint32_t salt; -/** - * The key entry is used to associate an ibf key with - * an element. - */ -struct KeyEntry -{ /** - * IBF key for the entry, derived from the current salt. + * Current state of the operation. */ - struct IBF_Key ibf_key; + enum IntersectionOperationPhase phase; /** - * The actual element associated with the key + * Generation in which the operation handle + * was created. */ - struct ElementEntry *element; + unsigned int generation_created; /** - * Element that collides with this element - * on the ibf key + * Did we send the client that we are done? */ - struct KeyEntry *next_colliding; + int client_done_sent; }; /** - * Used as a closure for sending elements - * with a specific IBF key. + * Extra state required for efficient set intersection. + * Merely tracks the total number of elements. */ -struct SendElementClosure +struct SetState { /** - * The IBF key whose matching elements should be - * sent. + * Number of currently valid elements in the set which have not been + * removed. */ - struct IBF_Key ibf_key; - - /** - * Operation for which the elements - * should be sent. - */ - struct OperationState *eo; + uint32_t current_set_element_count; }; /** - * Extra state required for efficient set union. + * If applicable in the current operation mode, send a result message + * to the client indicating we removed an element. + * + * @param op intersection operation + * @param element element to send */ -struct SetState +static void +send_client_removed_element (struct Operation *op, + struct GNUNET_SET_Element *element) { - /** - * The strata estimator is only generated once for - * each set. - * The IBF keys are derived from the element hashes with - * salt=0. - */ - struct StrataEstimator *se; - - /** - * Evaluate operations are held in - * a linked list. - */ - struct OperationState *ops_head; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; - /** - * Evaluate operations are held in - * a linked list. - */ - struct OperationState *ops_tail; -}; + if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) + return; /* Wrong mode for transmitting removed elements */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending removed element (size %u) to client\n", + element->size); + GNUNET_assert (0 != op->spec->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) + { + GNUNET_break (0); + return; + } + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->spec->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); +} /** - * Iterator over hash map entries. + * Fills the "my_elements" hashmap with all relevant elements. * - * @param cls closure + * @param cls the `struct Operation *` we are performing * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @param value the `struct ElementEntry *` from the hash map + * @return #GNUNET_YES (we should continue to iterate) */ static int -destroy_key_to_element_iter (void *cls, - uint32_t key, +filtered_map_initialization (void *cls, + const struct GNUNET_HashCode *key, void *value) { - struct KeyEntry *k = value; + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; - while (NULL != k) + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FIMA called for %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) { - struct KeyEntry *k_tmp = k; - k = k->next_colliding; - if (GNUNET_YES == k_tmp->element->remote) - { - GNUNET_free (k_tmp->element); - k_tmp->element = NULL; - } - GNUNET_free (k_tmp); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u (wrong generation)\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; /* element not valid in our operation's generation */ } + + /* Test if element is in other peer's bloomfilter */ + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + /* remove this element */ + send_client_removed_element (op, + &ee->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; + } + op->state->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Filtered initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; } /** - * Destroy a union operation, and free all resources - * associated with it. + * Removes elements from our hashmap if they are not contained within the + * provided remote bloomfilter. * - * @param eo the union operation to destroy + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) */ -static void -intersection_operation_destroy (struct OperationState *eo) +static int +iterator_bf_reduce (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); - GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, - eo->set->state->ops_tail, - eo); - if (NULL != eo->mq) - { - GNUNET_MQ_destroy (eo->mq); - eo->mq = NULL; - } - if (NULL != eo->tunnel) - { - struct GNUNET_MESH_Tunnel *t = eo->tunnel; - eo->tunnel = NULL; - GNUNET_MESH_tunnel_destroy (t); - } - if (NULL != eo->remote_ibf) - { - ibf_destroy (eo->remote_ibf); - eo->remote_ibf = NULL; - } - if (NULL != eo->local_ibf) - { - ibf_destroy (eo->local_ibf); - eo->local_ibf = NULL; - } - if (NULL != eo->se) - { - strata_estimator_destroy (eo->se); - eo->se = NULL; - } - if (NULL != eo->key_to_element) + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; + + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) { - GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); - GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); - eo->key_to_element = NULL; + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); } - if (NULL != eo->spec) + else { - if (NULL != eo->spec->context_msg) - { - GNUNET_free (eo->spec->context_msg); - eo->spec->context_msg = NULL; - } - GNUNET_free (eo->spec); - eo->spec = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, keeping %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); } - GNUNET_free (eo); + return GNUNET_YES; +} + - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); +/** + * Create initial bloomfilter based on all the elements given. + * + * @param cls the `struct Operation *` + * @param key current key code + * @param value the `struct ElementEntry` to process + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +iterator_bf_create (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + struct GNUNET_HashCode mutated_hash; - /* FIXME: do a garbage collection of the set generations */ + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing BF with hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, + &mutated_hash); + return GNUNET_YES; } /** - * Inform the client that the union operation has failed, + * Inform the client that the intersection operation has failed, * and proceed to destroy the evaluate operation. * - * @param eo the union operation to fail + * @param op the intersection operation to fail */ static void -fail_intersection_operation (struct OperationState *eo) +fail_intersection_operation (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Intersection operation failed\n"); + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = htonl (eo->spec->client_request_id); + msg->request_id = htonl (op->spec->client_request_id); msg->element_type = htons (0); - GNUNET_MQ_send (eo->spec->set->client_mq, ev); - union_operation_destroy (eo); -} - - -/** - * Derive the IBF key from a hash code and - * a salt. - * - * @param src the hash code - * @param salt salt to use - * @return the derived IBF key - */ -static struct IBF_Key -get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) -{ - struct IBF_Key key; - - GNUNET_CRYPTO_hkdf (&key, sizeof (key), - GCRY_MD_SHA512, GCRY_MD_SHA256, - src, sizeof *src, - &salt, sizeof (salt), - NULL, 0); - return key; + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); } /** - * Send a request for the evaluate operation to a remote peer + * Send a bloomfilter to our peer. After the result done message has + * been sent to the client, destroy the evaluate operation. * - * @param eo operation with the other peer + * @param op intersection operation */ static void -send_operation_request (struct OperationState *eo) +send_bloomfilter (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - struct OperationRequestMessage *msg; - - ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - eo->spec->context_msg); - - if (NULL == ev) + struct BFMessage *msg; + uint32_t bf_size; + uint32_t bf_elementbits; + uint32_t chunk_size; + char *bf_data; + uint32_t offset; + + /* We consider the ratio of the set sizes to determine + the number of bits per element, as the smaller set + should use more bits to maximize its set reduction + potential and minimize overall bandwidth consumption. */ + bf_elementbits = 2 + ceil (log2((double) + (op->spec->remote_element_count / + (double) op->state->my_element_count))); + if (bf_elementbits < 1) + bf_elementbits = 1; /* make sure k is not 0 */ + /* optimize BF-size to ~50% of bits set */ + bf_size = ceil ((double) (op->state->my_element_count + * bf_elementbits / log(2))); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending Bloom filter (%u) of size %u bytes\n", + (unsigned int) bf_elementbits, + (unsigned int) bf_size); + op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_create, + op); + + /* send our Bloom filter */ + chunk_size = 60 * 1024 - sizeof (struct BFMessage); + if (bf_size <= chunk_size) { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (eo->spec->set->client); - return; + /* singlepart */ + chunk_size = bf_size; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + (char*) &msg[1], + bf_size)); + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); } - msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = eo->spec->app_id; - msg->salt = htonl (eo->spec->salt); - GNUNET_MQ_send (eo->mq, ev); - - if (NULL != eo->spec->context_msg) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); - - if (NULL != eo->spec->context_msg) { - GNUNET_free (eo->spec->context_msg); - eo->spec->context_msg = NULL; + /* multipart */ + bf_data = GNUNET_malloc (bf_size); + GNUNET_assert (GNUNET_SYSERR != + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + bf_data, + bf_size)); + offset = 0; + while (offset < bf_size) + { + if (bf_size - chunk_size < offset) + chunk_size = bf_size - offset; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_memcpy (&msg[1], + &bf_data[offset], + chunk_size); + offset += chunk_size; + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_free (bf_data); } - + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; } /** - * Iterator to create the mapping between ibf keys - * and element entries. + * Signal to the client that the operation has finished and + * destroy the operation. * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @param cls operation to destroy */ -static int -op_register_element_iterator (void *cls, - uint32_t key, - void *value) +static void +send_client_done_and_destroy (void *cls) { - struct KeyEntry *const new_k = cls; - struct KeyEntry *old_k = value; + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; - GNUNET_assert (NULL != old_k); - do - { - if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) - { - new_k->next_colliding = old_k->next_colliding; - old_k->next_colliding = new_k; - return GNUNET_NO; - } - old_k = old_k->next_colliding; - } while (NULL != old_k); - return GNUNET_YES; + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->spec->client_request_id); + rm->result_status = htons (GNUNET_SET_STATUS_DONE); + rm->element_type = htons (0); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); } /** - * Insert an element into the union operation's - * key-to-element mapping. Takes ownership of 'ee'. - * Note that this does not insert the element in the set, - * only in the operation's key-element mapping. - * This is done to speed up re-tried operations, if some elements - * were transmitted, and then the IBF fails to decode. + * Send all elements in the full result iterator. * - * @param eo the union operation - * @param ee the element entry + * @param cls the `struct Operation *` */ static void -op_register_element (struct OperationState *eo, struct ElementEntry *ee) +send_remaining_elements (void *cls) { - int ret; - struct IBF_Key ibf_key; - struct KeyEntry *k; - - ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); - k = GNUNET_new (struct KeyEntry); - k->element = ee; - k->ibf_key = ibf_key; - ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, - (uint32_t) ibf_key.key_val, - op_register_element_iterator, k); - - /* was the element inserted into a colliding bucket? */ - if (GNUNET_SYSERR == ret) - return; + struct Operation *op = cls; + const void *nxt; + const struct ElementEntry *ee; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + const struct GNUNET_SET_Element *element; + int res; - GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, + NULL, + &nxt); + if (GNUNET_NO == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending done and destroy because iterator ran out\n"); + op->keep--; + send_client_done_and_destroy (op); + return; + } + ee = nxt; + element = &ee->element; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending element %s:%u to client (full set)\n", + GNUNET_h2s (&ee->element_hash), + element->size); + GNUNET_assert (0 != op->spec->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_assert (NULL != ev); + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->spec->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (ev, + &send_remaining_elements, + op); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); } /** - * Insert a key into an ibf. + * Inform the peer that this operation is complete. * - * @param cls the ibf - * @param key unused - * @param value the key entry to get the key from + * @param op the intersection operation to fail */ -static int -prepare_ibf_iterator (void *cls, - uint32_t key, - void *value) +static void +send_peer_done (struct Operation *op) { - struct InvertibleBloomFilter *ibf = cls; - struct KeyEntry *ke = value; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val); + struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; - ibf_insert (ibf, ke->ibf_key); - return GNUNET_YES; + op->state->phase = PHASE_FINISHED; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE\n"); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, + ev); } /** - * Iterator for initializing the - * key-to-element mapping of a union operation + * Process a Bloomfilter once we got all the chunks. * - * @param cls the union operation - * @param key unised - * @param value the element entry to insert - * into the key-to-element mapping - * @return GNUNET_YES to continue iterating, - * GNUNET_NO to stop + * @param op the intersection operation */ -static int -init_key_to_element_iterator (void *cls, - const struct GNUNET_HashCode *key, - void *value) +static void +process_bf (struct Operation *op) { - struct OperationState *eo = cls; - struct ElementEntry *e = value; - - /* make sure that the element belongs to the set at the time - * of creating the operation */ - if ( (e->generation_added > eo->generation_created) || - ( (GNUNET_YES == e->removed) && - (e->generation_removed < eo->generation_created))) - return GNUNET_YES; - - GNUNET_assert (GNUNET_NO == e->remote); - - op_register_element (eo, e); - return GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", + op->state->phase, + op->spec->remote_element_count, + op->state->my_element_count, + GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + case PHASE_COUNT_SENT: + /* This is the first BF being sent, build our initial map with + filtering in place */ + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, + GNUNET_YES); + op->state->my_element_count = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &filtered_map_initialization, + op); + break; + case PHASE_BF_EXCHANGE: + /* Update our set by reduction */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &iterator_bf_reduce, + op); + break; + case PHASE_FINISHED: + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + } + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; + + if ( (0 == op->state->my_element_count) || /* fully disjoint */ + ( (op->state->my_element_count == op->spec->remote_element_count) && + (0 == memcmp (&op->state->my_xor, + &op->state->other_xor, + sizeof (struct GNUNET_HashCode))) ) ) + { + /* we are done */ + send_peer_done (op); + return; + } + op->state->phase = PHASE_BF_EXCHANGE; + send_bloomfilter (op); } /** - * Create an ibf with the operation's elements - * of the specified size + * Handle an BF message from a remote peer. * - * @param eo the union operation - * @param size size of the ibf to create + * @param cls the intersection operation + * @param mh the header of the message */ static void -prepare_ibf (struct OperationState *eo, uint16_t size) +handle_p2p_bf (void *cls, + const struct GNUNET_MessageHeader *mh) { - if (NULL == eo->key_to_element) + struct Operation *op = cls; + const struct BFMessage *msg; + uint32_t bf_size; + uint32_t chunk_size; + uint32_t bf_bits_per_element; + uint16_t msize; + + msize = htons (mh->size); + if (msize < sizeof (struct BFMessage)) { - unsigned int len; - len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); - eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, - init_key_to_element_iterator, eo); + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + msg = (const struct BFMessage *) mh; + switch (op->state->phase) + { + case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + break; + case PHASE_COUNT_SENT: + case PHASE_BF_EXCHANGE: + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = msize - sizeof (struct BFMessage); + op->state->other_xor = msg->element_xor_hash; + if (bf_size == chunk_size) + { + if (NULL != op->state->bf_data) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + /* single part, done here immediately */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], + bf_size, + bf_bits_per_element); + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + process_bf (op); + return; + } + /* multipart chunk */ + if (NULL == op->state->bf_data) + { + /* first chunk, initialize */ + op->state->bf_data = GNUNET_malloc (bf_size); + op->state->bf_data_size = bf_size; + op->state->bf_bits_per_element = bf_bits_per_element; + op->state->bf_data_offset = 0; + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + } + else + { + /* increment */ + if ( (op->state->bf_data_size != bf_size) || + (op->state->bf_bits_per_element != bf_bits_per_element) || + (op->state->bf_data_offset + chunk_size > bf_size) || + (op->state->salt != ntohl (msg->sender_mutator)) || + (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + } + GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + (const char*) &msg[1], + chunk_size); + op->state->bf_data_offset += chunk_size; + if (op->state->bf_data_offset == bf_size) + { + /* last chunk, run! */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + bf_size, + bf_bits_per_element); + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + op->state->bf_data_size = 0; + process_bf (op); + } + break; + default: + GNUNET_break_op (0); + fail_intersection_operation (op); + break; } - if (NULL != eo->local_ibf) - ibf_destroy (eo->local_ibf); - eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); - GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, - prepare_ibf_iterator, eo->local_ibf); } /** - * Send an ibf of appropriate size. + * Fills the "my_elements" hashmap with the initial set of + * (non-deleted) elements from the set of the specification. * - * @param eo the union operation - * @param ibf_order order of the ibf to send, size=2^order + * @param cls closure with the `struct Operation *` + * @param key current key code for the element + * @param value value in the hash map with the `struct ElementEntry *` + * @return #GNUNET_YES (we should continue to iterate) */ -static void -send_ibf (struct OperationState *eo, uint16_t ibf_order) +static int +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - unsigned int buckets_sent = 0; - struct InvertibleBloomFilter *ibf; - - prepare_ibf (eo, 1<local_ibf; - - while (buckets_sent < (1 << ibf_order)) - { - unsigned int buckets_in_message; - struct GNUNET_MQ_Envelope *ev; - struct IBFMessage *msg; - - buckets_in_message = (1 << ibf_order) - buckets_sent; - /* limit to maximum */ - if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) - buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - - ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, - GNUNET_MESSAGE_TYPE_SET_P2P_IBF); - msg->reserved = 0; - msg->order = ibf_order; - msg->offset = htons (buckets_sent); - ibf_write_slice (ibf, buckets_sent, - buckets_in_message, &msg[1]); - buckets_sent += buckets_in_message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", - buckets_in_message, buckets_sent, 1<mq, ev); - } - - eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; + struct ElementEntry *ee = value; + struct Operation *op = cls; + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; } /** - * Send a strata estimator to the remote peer. + * Send our element count to the peer, in case our element count is + * lower than his. * - * @param eo the union operation with the remote peer + * @param op intersection operation */ static void -send_strata_estimator (struct OperationState *eo) +send_element_count (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *strata_msg; - - ev = GNUNET_MQ_msg_header_extra (strata_msg, - SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, - GNUNET_MESSAGE_TYPE_SET_P2P_SE); - strata_estimator_write (eo->set->state->se, &strata_msg[1]); - GNUNET_MQ_send (eo->mq, ev); - eo->phase = PHASE_EXPECT_IBF; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); + struct IntersectionElementInfoMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, ev); } /** - * Compute the necessary order of an ibf - * from the size of the symmetric set difference. + * We go first, initialize our map with all elements and + * send the first Bloom filter. * - * @param diff the difference - * @return the required size of the ibf + * @param op operation to start exchange for */ -static unsigned int -get_order_from_difference (unsigned int diff) +static void +begin_bf_exchange (struct Operation *op) { - unsigned int ibf_order; - - ibf_order = 2; - while ((1< MAX_IBF_ORDER) - ibf_order = MAX_IBF_ORDER; - return ibf_order; + op->state->phase = PHASE_BF_EXCHANGE; + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, + GNUNET_YES); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); } /** - * Handle a strata estimator from a remote peer + * Handle the initial `struct IntersectionElementInfoMessage` from a + * remote peer. * - * @param cls the union operation - * @param mh the message + * @param cls the intersection operation + * @param mh the header of the message */ static void -handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_element_info (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct OperationState *eo = cls; - struct StrataEstimator *remote_se; - int diff; + struct Operation *op = cls; + const struct IntersectionElementInfoMessage *msg; - if (eo->phase != PHASE_EXPECT_SE) + if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage)) { - fail_union_operation (eo); - GNUNET_break (0); + GNUNET_break_op (0); + fail_intersection_operation(op); return; } - remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, - SE_IBF_HASH_NUM); - strata_estimator_read (&mh[1], remote_se); - GNUNET_assert (NULL != eo->se); - diff = strata_estimator_difference (remote_se, eo->se); - strata_estimator_destroy (remote_se); - strata_estimator_destroy (eo->se); - eo->se = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", - diff, 1<ibf_key; - struct OperationState *eo = sec->eo; - struct KeyEntry *ke = value; - - if (ke->ibf_key.key_val != ibf_key.key_val) - return GNUNET_YES; - while (NULL != ke) + msg = (const struct IntersectionElementInfoMessage *) mh; + op->spec->remote_element_count = ntohl (msg->sender_element_count); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received remote element count (%u), I have %u\n", + op->spec->remote_element_count, + op->state->my_element_count); + if ( ( (PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase) ) || + (op->state->my_element_count > op->spec->remote_element_count) || + (0 == op->state->my_element_count) || + (0 == op->spec->remote_element_count) ) { - const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *mh; - - GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == ev) - { - /* element too large */ - GNUNET_break (0); - continue; - } - memcpy (&mh[1], element->data, element->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", - GNUNET_h2s (&ke->element->element_hash)); - GNUNET_MQ_send (eo->mq, ev); - ke = ke->next_colliding; + GNUNET_break_op (0); + fail_intersection_operation(op); + return; } - return GNUNET_NO; + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); } + /** - * Send all elements that have the specified IBF key - * to the remote peer of the union operation + * Send a result message to the client indicating that the operation + * is over. After the result done message has been sent to the + * client, destroy the evaluate operation. * - * @param eo union operation - * @param ibf_key IBF key of interest + * @param op intersection operation */ static void -send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) +finish_and_destroy (struct Operation *op) { - struct SendElementClosure send_cls; + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - send_cls.ibf_key = ibf_key; - send_cls.eo = eo; - GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, - &send_element_iterator, &send_cls); + if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + op->keep++; + send_remaining_elements (op); + return; + } + send_client_done_and_destroy (op); } /** - * Decode which elements are missing on each side, and - * send the appropriate elemens and requests + * Remove all elements from our hashmap. * - * @param eo union operation + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) */ -static void -decode_and_send (struct OperationState *eo) +static int +filter_all (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - struct IBF_Key key; - struct IBF_Key last_key; - int side; - unsigned int num_decoded; - struct InvertibleBloomFilter *diff_ibf; - - GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); - - prepare_ibf (eo, eo->remote_ibf->size); - diff_ibf = ibf_dup (eo->local_ibf); - ibf_subtract (diff_ibf, eo->remote_ibf); - - ibf_destroy (eo->remote_ibf); - eo->remote_ibf = NULL; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); - - num_decoded = 0; - last_key.key_val = 0; - - while (1) - { - int res; - int cycle_detected = GNUNET_NO; - - last_key = key; - - res = ibf_decode (diff_ibf, &side, &key); - if (res == GNUNET_OK) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", - key.key_val); - num_decoded += 1; - if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", - num_decoded, diff_ibf->size); - cycle_detected = GNUNET_YES; - } - } - if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) - { - int next_order; - next_order = 0; - while (1<size) - next_order++; - next_order++; - if (next_order <= MAX_IBF_ORDER) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "decoding failed, sending larger ibf (size %u)\n", - 1<mq, ev); - break; - } - if (1 == side) - { - send_elements_for_key (eo, key); - } - else if (-1 == side) - { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MessageHeader *msg; - - /* FIXME: before sending the request, check if we may just have the element */ - /* FIXME: merge multiple requests */ - /* FIXME: remember somewhere that we already requested the element, - * so that we don't request it again with the next ibf if decoding fails */ - ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); - - *(struct IBF_Key *) &msg[1] = key; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); - GNUNET_MQ_send (eo->mq, ev); - } - else - { - GNUNET_assert (0); - } - } - ibf_destroy (diff_ibf); + struct Operation *op = cls; + struct ElementEntry *ee = value; + + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Final reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + return GNUNET_YES; } /** - * Handle an IBF message from a remote peer. + * Handle a done message from a remote peer * - * @param cls the union operation - * @param mh the header of the message + * @param cls the intersection operation + * @param mh the message */ static void -handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct OperationState *eo = cls; - struct IBFMessage *msg = (struct IBFMessage *) mh; - unsigned int buckets_in_message; + struct Operation *op = cls; + const struct IntersectionDoneMessage *idm; - if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || - (eo->phase == PHASE_EXPECT_IBF) ) + if (PHASE_BF_EXCHANGE != op->state->phase) { - eo->phase = PHASE_EXPECT_IBF_CONT; - GNUNET_assert (NULL == eo->remote_ibf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<order); - eo->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); - eo->ibf_buckets_received = 0; - if (0 != ntohs (msg->offset)) - { - GNUNET_break (0); - fail_union_operation (eo); - return; - } - } - else if (eo->phase == PHASE_EXPECT_IBF_CONT) - { - if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || - (1<order != eo->remote_ibf->size) ) - { - GNUNET_break (0); - fail_union_operation (eo); - return; - } + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; } - - buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; - - if (0 == buckets_in_message) + if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) { GNUNET_break_op (0); - fail_union_operation (eo); + fail_intersection_operation (op); return; } - - if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) + idm = (const struct IntersectionDoneMessage *) mh; + if (0 == ntohl (idm->final_element_count)) { - GNUNET_break (0); - fail_union_operation (eo); - return; + /* other peer determined empty set is the intersection, + remove all elements */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &filter_all, + op); } - - ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); - eo->ibf_buckets_received += buckets_in_message; - - if (eo->ibf_buckets_received == eo->remote_ibf->size) + if ( (op->state->my_element_count != ntohl (idm->final_element_count)) || + (0 != memcmp (&op->state->my_xor, + &idm->element_xor_hash, + sizeof (struct GNUNET_HashCode))) ) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); - eo->phase = PHASE_EXPECT_ELEMENTS; - decode_and_send (eo); + /* Other peer thinks we are done, but we disagree on the result! */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got IntersectionDoneMessage, have %u elements in intersection\n", + op->state->my_element_count); + op->state->phase = PHASE_FINISHED; + finish_and_destroy (op); } /** - * Send a result message to the client indicating - * that there is a new element. + * Initiate a set intersection operation with a remote peer. * - * @param eo union operation - * @param element element to send + * @param op operation that is created, should be initialized to + * begin the evaluation + * @param opaque_context message to be transmitted to the listener + * to convince him to accept, may be NULL */ static void -send_client_element (struct OperationState *eo, - struct GNUNET_SET_Element *element) +intersection_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; + struct OperationRequestMessage *msg; + + op->state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + op->state->phase = PHASE_INITIAL; + op->state->my_element_count = op->spec->set->state->current_set_element_count; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); - GNUNET_assert (0 != eo->spec->client_request_id); - ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); if (NULL == ev) { - GNUNET_MQ_discard (ev); + /* the context message is too large!? */ GNUNET_break (0); + GNUNET_SERVICE_client_drop (op->spec->set->client); return; } - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (eo->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); - GNUNET_MQ_send (eo->spec->set->client_mq, ev); + msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); + msg->element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, + ev); + op->state->phase = PHASE_COUNT_SENT; + if (NULL != opaque_context) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request with context message\n"); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request without context message\n"); } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Accept an intersection operation request from a remote peer. Only + * initializes the private operation state. * - * @param eo union operation + * @param op operation that will be accepted as an intersection operation */ static void -send_client_done_and_destroy (struct OperationState *eo) +intersection_accept (struct Operation *op) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *rm; - - GNUNET_assert (GNUNET_NO == eo->client_done_sent); - - eo->client_done_sent = GNUNET_YES; - - ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->request_id = htonl (eo->spec->client_request_id); - rm->result_status = htons (GNUNET_SET_STATUS_DONE); - rm->element_type = htons (0); - GNUNET_MQ_send (eo->spec->set->client_mq, ev); - - union_operation_destroy (eo); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting set intersection operation\n"); + op->state = GNUNET_new (struct OperationState); + op->state->phase = PHASE_INITIAL; + op->state->my_element_count + = op->spec->set->state->current_set_element_count; + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create + (GNUNET_MIN (op->state->my_element_count, + op->spec->remote_element_count), + GNUNET_YES); + if (op->spec->remote_element_count < op->state->my_element_count) + { + /* If the other peer (Alice) has fewer elements than us (Bob), + we just send the count as Alice should send the first BF */ + send_element_count (op); + op->state->phase = PHASE_COUNT_SENT; + return; + } + /* We have fewer elements, so we start with the BF */ + begin_bf_exchange (op); } /** - * Handle an element message from a remote peer. + * Dispatch messages for a intersection operation. * - * @param cls the union operation - * @param mh the message + * @param op the state of the intersection evaluate operation + * @param mh the received message + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) +static int +intersection_handle_p2p_message (struct Operation *op, + const struct GNUNET_MessageHeader *mh) { - struct OperationState *eo = cls; - struct ElementEntry *ee; - uint16_t element_size; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); - - if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && - (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received p2p message (t: %u, s: %u)\n", + ntohs (mh->type), ntohs (mh->size)); + switch (ntohs (mh->type)) { - fail_union_operation (eo); - GNUNET_break (0); - return; + /* this message handler is not active until after we received an + * operation request message, thus the ops request is not handled here + */ + case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO: + handle_p2p_element_info (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: + handle_p2p_bf (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: + handle_p2p_done (op, mh); + break; + default: + /* something wrong with cadet's message handlers? */ + GNUNET_assert (0); } - element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); - ee = GNUNET_malloc (sizeof *ee + element_size); - memcpy (&ee[1], &mh[1], element_size); - ee->element.size = element_size; - ee->element.data = &ee[1]; - ee->remote = GNUNET_YES; - GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); - - /* FIXME: see if the element has already been inserted! */ - - op_register_element (eo, ee); - send_client_element (eo, &ee->element); + return GNUNET_OK; } /** - * Handle an element request from a remote peer. + * Handler for peer-disconnects, notifies the client about the aborted + * operation. If we did not expect anything from the other peer, we + * gracefully terminate the operation. * - * @param cls the union operation - * @param mh the message + * @param op the destroyed operation */ static void -handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) +intersection_peer_disconnect (struct Operation *op) { - struct OperationState *eo = cls; - struct IBF_Key *ibf_key; - unsigned int num_keys; - - /* look up elements and send them */ - if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (PHASE_FINISHED != op->state->phase) { - GNUNET_break (0); - fail_union_operation (eo); + fail_intersection_operation (op); return; } - - num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key); - - if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) - { - GNUNET_break (0); - fail_union_operation (eo); - return; - } - - ibf_key = (struct IBF_Key *) &mh[1]; - while (0 != num_keys--) - { - send_elements_for_key (eo, *ibf_key); - ibf_key++; - } + /* the session has already been concluded */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Other peer disconnected (finished)\n"); + if (GNUNET_NO == op->state->client_done_sent) + finish_and_destroy (op); } /** - * Handle a done message from a remote peer + * Destroy the intersection operation. Only things specific to the + * intersection operation are destroyed. * - * @param cls the union operation - * @param mh the message + * @param op intersection operation to destroy */ static void -handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) +intersection_op_cancel (struct Operation *op) { - struct OperationState *eo = cls; - struct GNUNET_MQ_Envelope *ev; - - if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_bf) { - /* we got all requests, but still have to send our elements as response */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); - eo->phase = PHASE_FINISHED; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (eo->mq, ev); - return; + GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); + op->state->remote_bf = NULL; } - if (eo->phase == PHASE_EXPECT_ELEMENTS) + if (NULL != op->state->local_bf) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); - eo->phase = PHASE_FINISHED; - send_client_done_and_destroy (eo); - return; + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; } - GNUNET_break (0); - fail_union_operation (eo); -} - - -/** - * Evaluate a union operation with - * a remote peer. - * - * @param spec specification of the operation the evaluate - * @param tunnel tunnel already connected to the partner peer - * @param tc tunnel context, passed here so all new incoming - * messages are directly going to the union operations - * @return a handle to the operation - */ -static void -intersection_evaluate (struct OperationSpecification *spec, - struct GNUNET_MESH_Tunnel *tunnel, - struct TunnelContext *tc) -{ - struct OperationState *eo; - - eo = GNUNET_new (struct OperationState); - tc->vt = _GSS_union_vt (); - tc->op = eo; - eo->se = strata_estimator_dup (spec->set->state->se); - eo->generation_created = spec->set->current_generation++; - eo->set = spec->set; - eo->spec = spec; - eo->tunnel = tunnel; - eo->mq = GNUNET_MESH_mq_create (tunnel); - + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); + op->state->my_elements = NULL; + } + GNUNET_free (op->state); + op->state = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "evaluating union operation, (app %s)\n", - GNUNET_h2s (&eo->spec->app_id)); - - /* we started the operation, thus we have to send the operation request */ - eo->phase = PHASE_EXPECT_SE; - - GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, - eo->set->state->ops_tail, - eo); - - send_operation_request (eo); -} - - -/** - * Accept an union operation request from a remote peer - * - * @param spec all necessary information about the operation - * @param tunnel open tunnel to the partner's peer - * @param tc tunnel context, passed here so all new incoming - * messages are directly going to the union operations - * @return operation - */ -static void -intersection_accept (struct OperationSpecification *spec, - struct GNUNET_MESH_Tunnel *tunnel, - struct TunnelContext *tc) -{ - struct OperationState *eo; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); - - eo = GNUNET_new (struct OperationState); - tc->vt = _GSS_union_vt (); - tc->op = eo; - eo->set = spec->set; - eo->generation_created = eo->set->current_generation++; - eo->spec = spec; - eo->tunnel = tunnel; - eo->mq = GNUNET_MESH_mq_create (tunnel); - eo->se = strata_estimator_dup (eo->set->state->se); - /* transfer ownership of mq and socket from incoming to eo */ - GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, - eo->set->state->ops_tail, - eo); - /* kick off the operation */ - send_strata_estimator (eo); + "Destroying intersection op state done\n"); } /** - * Create a new set supporting the union operation + * Create a new set supporting the intersection operation. * * @return the newly created set */ static struct SetState * -intersection_set_create (void) +intersection_set_create () { struct SetState *set_state; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n"); - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection set created\n"); set_state = GNUNET_new (struct SetState); - set_state->se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, SE_IBF_HASH_NUM); + set_state->current_set_element_count = 0; + return set_state; } @@ -1216,139 +1201,59 @@ intersection_set_create (void) * @param ee the element to add to the set */ static void -intersection_add (struct SetState *set_state, struct ElementEntry *ee) +intersection_add (struct SetState *set_state, + struct ElementEntry *ee) { - strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); + set_state->current_set_element_count++; } /** - * Destroy a set that supports the union operation + * Destroy a set that supports the intersection operation * * @param set_state the set to destroy */ static void intersection_set_destroy (struct SetState *set_state) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n"); - /* important to destroy operations before the rest of the set */ - while (NULL != set_state->ops_head) - union_operation_destroy (set_state->ops_head); - if (NULL != set_state->se) - { - strata_estimator_destroy (set_state->se); - set_state->se = NULL; - } GNUNET_free (set_state); } /** * Remove the element given in the element message from the set. - * Only marks the element as removed, so that older set operations can still exchange it. * * @param set_state state of the set to remove from * @param element set element to remove */ static void -intersection_remove (struct SetState *set_state, struct ElementEntry *element) +intersection_remove (struct SetState *set_state, + struct ElementEntry *element) { - /* FIXME: remove from strata estimator */ + GNUNET_assert (0 < set_state->current_set_element_count); + set_state->current_set_element_count--; } /** - * Dispatch messages for a union operation. + * Get the table with implementing functions for set intersection. * - * @param eo the state of the union evaluate operation - * @param mh the received message - * @return GNUNET_SYSERR if the tunnel should be disconnected, - * GNUNET_OK otherwise + * @return the operation specific VTable */ -int -intersection_handle_p2p_message (struct OperationState *eo, - const struct GNUNET_MessageHeader *mh) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", - ntohs (mh->type), ntohs (mh->size)); - switch (ntohs (mh->type)) - { - case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: - handle_p2p_ibf (eo, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_SE: - handle_p2p_strata_estimator (eo, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: - handle_p2p_elements (eo, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: - handle_p2p_element_requests (eo, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: - handle_p2p_done (eo, mh); - break; - default: - /* something wrong with mesh's message handlers? */ - GNUNET_assert (0); - } - return GNUNET_OK; -} - - -static void -intersection_peer_disconnect (struct OperationState *op) -{ - /* Are we already disconnected? */ - if (NULL == op->tunnel) - return; - op->tunnel = NULL; - if (NULL != op->mq) - { - GNUNET_MQ_destroy (op->mq); - op->mq = NULL; - } - if (PHASE_FINISHED != op->phase) - { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *msg; - - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); - msg->request_id = htonl (op->spec->client_request_id); - msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); - union_operation_destroy (op); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); - if (GNUNET_NO == op->client_done_sent) - send_client_done_and_destroy (op); -} - - -static void -intersection_op_cancel (struct SetState *set_state, uint32_t op_id) -{ - /* FIXME: implement */ -} - - const struct SetVT * _GSS_intersection_vt () { - static const struct SetVT union_vt = { - .create = &union_set_create, - .msg_handler = &union_handle_p2p_message, - .add = &union_add, - .remove = &union_remove, - .destroy_set = &union_set_destroy, - .evaluate = &union_evaluate, - .accept = &union_accept, - .peer_disconnect = &union_peer_disconnect, - .cancel = &union_op_cancel, + static const struct SetVT intersection_vt = { + .create = &intersection_set_create, + .msg_handler = &intersection_handle_p2p_message, + .add = &intersection_add, + .remove = &intersection_remove, + .destroy_set = &intersection_set_destroy, + .evaluate = &intersection_evaluate, + .accept = &intersection_accept, + .peer_disconnect = &intersection_peer_disconnect, + .cancel = &intersection_op_cancel, }; - return &union_vt; + return &intersection_vt; }