X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_intersection.c;h=9fe1eabe64e98753f2db24796c01d7fe8431b1d1;hb=9528dcc4b739041f51ed0c8791fe34902525fac2;hp=6b2473e26c7cb0267b747149da8a0b59b44ac749;hpb=b5a4052779cba718ff887c6b43c2d20d5e076510;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 6b2473e26..9fe1eabe6 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,30 +14,22 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ - /** * @file set/gnunet-service-set_intersection.c * @brief two-peer set intersection * @author Christian Fuchs + * @author Christian Grothoff */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet-service-set.h" #include "gnunet_block_lib.h" -#include "set_protocol.h" +#include "gnunet-service-set_protocol.h" #include -#define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH - -#define CALCULATE_BF_SIZE(A, B, s, k) \ - do { \ - k = ceil(1 + log2((double) (2*B / (double) A)));\ - if (k<1) k=1; /* k can be calculated as 0 */\ - s = ceil((double) (A * k / log(2))); \ - } while (0) /** * Current phase we are in for a intersection operation. @@ -45,33 +37,34 @@ enum IntersectionOperationPhase { /** - * Alices has suggested an operation to bob, - * and is waiting for a bf or session end. + * We are just starting. */ PHASE_INITIAL, + /** - * Bob has accepted the operation, Bob and Alice are now exchanging bfs - * until one notices the their element count is equal + * We have send the number of our elements to the other + * peer, but did not setup our element set yet. */ - PHASE_BF_EXCHANGE, + PHASE_COUNT_SENT, + /** - * if both peers have an equal peercount, they enter this state for - * one more turn, to see if they actually have agreed on a correct set. - * if a peer finds the same element count after the next iteration, - * it ends the the session + * We have initialized our set and are now reducing it by exchanging + * Bloom filters until one party notices the their element hashes + * are equal. */ - PHASE_MAYBE_FINISHED, + PHASE_BF_EXCHANGE, + /** - * The protocol is over. - * Results may still have to be sent to the client. + * The protocol is over. Results may still have to be sent to the + * client. */ PHASE_FINISHED + }; /** - * State of an evaluate operation - * with another peer. + * State of an evaluate operation with another peer. */ struct OperationState { @@ -86,57 +79,83 @@ struct OperationState struct GNUNET_CONTAINER_BloomFilter *local_bf; /** - * for multipart msgs we have to store the bloomfilter-data until we fully sent it. + * Remaining elements in the intersection operation. + * Maps element-id-hashes to 'elements in our set'. */ - char * bf_data; + struct GNUNET_CONTAINER_MultiHashMap *my_elements; /** - * size of the bloomfilter + * Iterator for sending the final set of @e my_elements to the client. */ - uint32_t bf_data_size; + struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; /** - * size of the bloomfilter + * Evaluate operations are held in a linked list. */ - uint32_t bf_bits_per_element; + struct OperationState *next; /** - * Current state of the operation. + * Evaluate operations are held in a linked list. */ - enum IntersectionOperationPhase phase; + struct OperationState *prev; /** - * Generation in which the operation handle - * was created. + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. */ - unsigned int generation_created; + char *bf_data; /** - * Maps element-id-hashes to 'elements in our set'. + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. */ - struct GNUNET_CONTAINER_MultiHashMap *my_elements; + struct GNUNET_HashCode my_xor; /** - * Current element count contained within contained_elements + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. + */ + struct GNUNET_HashCode other_xor; + + /** + * How many bytes of @e bf_data are valid? + */ + uint32_t bf_data_offset; + + /** + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) */ uint32_t my_element_count; /** - * Iterator for sending elements on the key to element mapping to the client. + * size of the bloomfilter in @e bf_data. */ - struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter; + uint32_t bf_data_size; /** - * Evaluate operations are held in - * a linked list. + * size of the bloomfilter */ - struct OperationState *next; + uint32_t bf_bits_per_element; - /** - * Evaluate operations are held in - * a linked list. - */ - struct OperationState *prev; + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). + */ + uint32_t salt; + + /** + * Current state of the operation. + */ + enum IntersectionOperationPhase phase; + + /** + * Generation in which the operation handle + * was created. + */ + unsigned int generation_created; /** * Did we send the client that we are done? @@ -147,192 +166,216 @@ struct OperationState /** * Extra state required for efficient set intersection. + * Merely tracks the total number of elements. */ struct SetState { /** - * Number of currently valid elements in the set which have not been removed + * Number of currently valid elements in the set which have not been + * removed. */ uint32_t current_set_element_count; }; /** - * Send a result message to the client indicating - * we removed an element + * If applicable in the current operation mode, send a result message + * to the client indicating we removed an element. * - * @param op union operation + * @param op intersection operation * @param element element to send */ static void -send_client_element (struct Operation *op, - struct GNUNET_SET_Element *element) +send_client_removed_element (struct Operation *op, + struct GNUNET_SET_Element *element) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending removed element (size %u) to client\n", element->size); + if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) + return; /* Wrong mode for transmitting removed elements */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending removed element (size %u) to client\n", + element->size); GNUNET_assert (0 != op->spec->client_request_id); - ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); if (NULL == ev) { - GNUNET_MQ_discard (ev); GNUNET_break (0); return; } rm->result_status = htons (GNUNET_SET_STATUS_OK); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); - GNUNET_MQ_send (op->spec->set->client_mq, ev); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); } /** - * Alice's version: - * - * fills the contained-elements hashmap with all relevant - * elements and adds their mutated hashes to our local bloomfilter with mutator+1 + * Fills the "my_elements" hashmap with all relevant elements. * - * @param cls closure + * @param cls the `struct Operation *` we are performing * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * @param value the `struct ElementEntry *` from the hash map + * @return #GNUNET_YES (we should continue to iterate) */ static int -iterator_initialization_by_alice (void *cls, - const struct GNUNET_HashCode *key, - void *value) +filtered_map_initialization (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - struct ElementEntry *ee = value; struct Operation *op = cls; + struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - //only consider this element, if it is valid for us - if ((op->generation_created < ee->generation_removed) - && (op->generation_created >= ee->generation_added)) - return GNUNET_YES; - - // not contained according to bob's bloomfilter - GNUNET_BLOCK_mingle_hash(&ee->element_hash, - op->spec->salt, - &mutated_hash); - if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, - &mutated_hash)){ - if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode) - send_client_element (op, &ee->element); - return GNUNET_YES; - } - - op->state->my_element_count++; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, - &ee->element_hash, ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - return GNUNET_YES; -} + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FIMA called for %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); -/** - * fills the contained-elements hashmap with all relevant - * elements and adds their mutated hashes to our local bloomfilter - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. - */ -static int -iterator_initialization (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct ElementEntry *ee = value; - struct Operation *op = cls; + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u (wrong generation)\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + return GNUNET_YES; /* element not valid in our operation's generation */ + } - //only consider this element, if it is valid for us - if ((op->generation_created < ee->generation_removed) - && (op->generation_created >= ee->generation_added)) + /* Test if element is in other peer's bloomfilter */ + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, + &mutated_hash)) + { + /* remove this element */ + send_client_removed_element (op, + &ee->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); return GNUNET_YES; + } + op->state->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Filtered initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, - &ee->element_hash, ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); return GNUNET_YES; } /** - * removes element from a hashmap if it is not contained within the - * provided remote bloomfilter. Then, fill our new bloomfilter. + * Removes elements from our hashmap if they are not contained within the + * provided remote bloomfilter. * - * @param cls closure + * @param cls closure with the `struct Operation *` * @param key current key code * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * @return #GNUNET_YES (we should continue to iterate) */ static int iterator_bf_reduce (void *cls, const struct GNUNET_HashCode *key, void *value) { - struct ElementEntry *ee = value; struct Operation *op = cls; + struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash); - + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, &mutated_hash)) { + GNUNET_break (0 < op->state->my_element_count); op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, &ee->element_hash, ee)); - if (GNUNET_SET_RESULT_REMOVED == op->spec->result_mode) - send_client_element (op, &ee->element); + send_client_removed_element (op, + &ee->element); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, keeping %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); } - return GNUNET_YES; } + /** - * create a bloomfilter based on the elements given + * Create initial bloomfilter based on all the elements given. * - * @param cls closure + * @param cls the `struct Operation *` * @param key current key code - * @param value value in the hash map - * @return #GNUNET_YES if we should continue to - * iterate, - * #GNUNET_NO if not. + * @param value the `struct ElementEntry` to process + * @return #GNUNET_YES (we should continue to iterate) */ static int iterator_bf_create (void *cls, - const struct GNUNET_HashCode *key, - void *value) + const struct GNUNET_HashCode *key, + void *value) { - struct ElementEntry *ee = value; struct Operation *op = cls; + struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash); - + GNUNET_BLOCK_mingle_hash (&ee->element_hash, + op->state->salt, + &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing BF with hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, &mutated_hash); return GNUNET_YES; } + /** - * Inform the client that the union operation has failed, + * Inform the client that the intersection operation has failed, * and proceed to destroy the evaluate operation. * * @param op the intersection operation to fail @@ -343,96 +386,28 @@ fail_intersection_operation (struct Operation *op) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - if (op->state->my_elements){ - GNUNET_CONTAINER_multihashmap_destroy(op->state->my_elements); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Intersection operation failed\n"); + if (NULL != op->state->my_elements) + { + GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); op->state->my_elements = NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "intersection operation failed\n"); - - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->request_id = htonl (op->spec->client_request_id); msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); -} - - -/** - * Send a request for the evaluate operation to a remote peer - * - * @param op operation with the other peer - */ -static void -send_operation_request (struct Operation *op) -{ - struct GNUNET_MQ_Envelope *ev; - struct OperationRequestMessage *msg; - - ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - op->spec->context_msg); - - if (NULL == ev) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (op->spec->set->client); - return; - } - msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); - msg->app_id = op->spec->app_id; - msg->salt = htonl (op->spec->salt); - msg->element_count = htonl(op->state->my_element_count); - - GNUNET_MQ_send (op->mq, ev); - - if (NULL != op->spec->context_msg) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); - - if (NULL != op->spec->context_msg) - { - GNUNET_free (op->spec->context_msg); - op->spec->context_msg = NULL; - } + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); } -static void -send_bloomfilter_multipart (struct Operation *op, uint32_t offset) -{ - struct GNUNET_MQ_Envelope *ev; - struct BFPart *msg; - uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); - uint32_t todo_size = op->state->bf_data_size - offset; - - if (todo_size < chunk_size) - chunk_size = todo_size; - - ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); - - msg->chunk_length = htonl (chunk_size); - msg->chunk_offset = htonl (offset); - memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); - - GNUNET_MQ_send (op->mq, ev); - - if (op->state->bf_data_size == offset + chunk_size) - { - // done - GNUNET_free(op->state->bf_data); - op->state->bf_data = NULL; - return; - } - - send_bloomfilter_multipart (op, offset + chunk_size); -} /** - * Send a bloomfilter to our peer. - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Send a bloomfilter to our peer. After the result done message has + * been sent to the client, destroy the evaluate operation. * * @param op intersection operation */ @@ -444,58 +419,85 @@ send_bloomfilter (struct Operation *op) uint32_t bf_size; uint32_t bf_elementbits; uint32_t chunk_size; - struct GNUNET_CONTAINER_BloomFilter * local_bf; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n"); - - CALCULATE_BF_SIZE(op->state->my_element_count, - op->spec->remote_element_count, - bf_size, - bf_elementbits); - - local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - bf_size, - bf_elementbits); - - op->spec->salt++; + char *bf_data; + uint32_t offset; + + /* We consider the ratio of the set sizes to determine + the number of bits per element, as the smaller set + should use more bits to maximize its set reduction + potential and minimize overall bandwidth consumption. */ + bf_elementbits = 2 + ceil (log2((double) + (op->spec->remote_element_count / + (double) op->state->my_element_count))); + if (bf_elementbits < 1) + bf_elementbits = 1; /* make sure k is not 0 */ + /* optimize BF-size to ~50% of bits set */ + bf_size = ceil ((double) (op->state->my_element_count + * bf_elementbits / log(2))); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending Bloom filter (%u) of size %u bytes\n", + (unsigned int) bf_elementbits, + (unsigned int) bf_size); + op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, &iterator_bf_create, op); - // send our bloomfilter - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) { - // singlepart + /* send our Bloom filter */ + chunk_size = 60 * 1024 - sizeof (struct BFMessage); + if (bf_size <= chunk_size) + { + /* singlepart */ chunk_size = bf_size; - ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); GNUNET_assert (GNUNET_SYSERR != - GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - (char*)&msg[1], + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + (char*) &msg[1], bf_size)); + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); } - else { - //multipart - chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); - ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); - op->state->bf_data = (char *) GNUNET_malloc (bf_size); + else + { + /* multipart */ + bf_data = GNUNET_malloc (bf_size); GNUNET_assert (GNUNET_SYSERR != - GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - op->state->bf_data, + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + bf_data, bf_size)); - memcpy (&msg[1], op->state->bf_data, chunk_size); - op->state->bf_data_size = bf_size; + offset = 0; + while (offset < bf_size) + { + if (bf_size - chunk_size < offset) + chunk_size = bf_size - offset; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_memcpy (&msg[1], + &bf_data[offset], + chunk_size); + offset += chunk_size; + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_free (bf_data); } - GNUNET_CONTAINER_bloomfilter_free (local_bf); - - msg->sender_element_count = htonl (op->state->my_element_count); - msg->bloomfilter_total_length = htonl (bf_size); - msg->bloomfilter_length = htonl (chunk_size); - msg->bits_per_element = htonl (bf_elementbits); - msg->sender_mutator = htonl (op->spec->salt); - - GNUNET_MQ_send (op->mq, ev); - - if (op->state->bf_data) - send_bloomfilter_multipart (op, chunk_size); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; } @@ -511,51 +513,68 @@ send_client_done_and_destroy (void *cls) struct Operation *op = cls; struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); rm->request_id = htonl (op->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - _GSS_operation_destroy (op); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + _GSS_operation_destroy (op, + GNUNET_YES); } /** * Send all elements in the full result iterator. * - * @param cls operation + * @param cls the `struct Operation *` */ static void send_remaining_elements (void *cls) { struct Operation *op = cls; - struct ElementEntry *remaining; //TODO rework this, key entry does not exist here + const void *nxt; + const struct ElementEntry *ee; struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - struct GNUNET_SET_Element *element; + const struct GNUNET_SET_Element *element; int res; - res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining); - if (GNUNET_NO == res) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); + res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, + NULL, + &nxt); + if (GNUNET_NO == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending done and destroy because iterator ran out\n"); + op->keep--; send_client_done_and_destroy (op); return; } - - element = &remaining->element; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); + ee = nxt; + element = &ee->element; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending element %s:%u to client (full set)\n", + GNUNET_h2s (&ee->element_hash), + element->size); GNUNET_assert (0 != op->spec->client_request_id); - - ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg_extra (rm, + element->size, + GNUNET_MESSAGE_TYPE_SET_RESULT); GNUNET_assert (NULL != ev); - rm->result_status = htons (GNUNET_SET_STATUS_OK); rm->request_id = htonl (op->spec->client_request_id); - rm->element_type = element->type; - memcpy (&rm[1], element->data, element->size); - - GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); - GNUNET_MQ_send (op->spec->set->client_mq, ev); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_notify_sent (ev, + &send_remaining_elements, + op); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); } @@ -568,119 +587,83 @@ static void send_peer_done (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; op->state->phase = PHASE_FINISHED; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE\n"); GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); op->state->local_bf = NULL; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (op->mq, ev); + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, + ev); } /** - * Process a Bloomfilter once we got all the chunks + * Process a Bloomfilter once we got all the chunks. * * @param op the intersection operation */ static void -process_bf (struct Operation *op){ - uint32_t old_elements; - uint32_t peer_elements; - - old_elements = op->state->my_element_count; - peer_elements = op->spec->remote_element_count; +process_bf (struct Operation *op) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", + op->state->phase, + op->spec->remote_element_count, + op->state->my_element_count, + GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); switch (op->state->phase) { case PHASE_INITIAL: - // If we are ot our first msg - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &iterator_initialization_by_alice, + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + case PHASE_COUNT_SENT: + /* This is the first BF being sent, build our initial map with + filtering in place */ + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, + GNUNET_YES); + op->state->my_element_count = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &filtered_map_initialization, op); break; case PHASE_BF_EXCHANGE: - case PHASE_MAYBE_FINISHED: - // if we are bob or alice and are continuing operation + /* Update our set by reduction */ GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, &iterator_bf_reduce, op); break; - default: + case PHASE_FINISHED: GNUNET_break_op (0); fail_intersection_operation(op); + return; } - // the iterators created a new BF with salt+1 - // the peer needs this information for decoding the next BF - // this behavior can be modified at will later on. - op->spec->salt++; - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); op->state->remote_bf = NULL; - if ((0 == op->state->my_element_count) // fully disjoint - || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements - && (old_elements == op->state->my_element_count) - && (op->state->my_element_count == peer_elements))) { - // In the last round we though we were finished, we now know this is correct + if ( (0 == op->state->my_element_count) || /* fully disjoint */ + ( (op->state->my_element_count == op->spec->remote_element_count) && + (0 == memcmp (&op->state->my_xor, + &op->state->other_xor, + sizeof (struct GNUNET_HashCode))) ) ) + { + /* we are done */ send_peer_done (op); return; } - op->state->phase = PHASE_BF_EXCHANGE; - if (op->state->my_element_count == peer_elements) - // maybe we are finished, but we do one more round to make certain - // we don't have false positives ... - op->state->phase = PHASE_MAYBE_FINISHED; - send_bloomfilter (op); } -/** - * Handle an BF multipart message from a remote peer. - * - * @param cls the intersection operation - * @param mh the header of the message - */ -static void -handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) -{ - struct Operation *op = cls; - const struct BFPart *msg = (const struct BFPart *) mh; - uint32_t chunk_size; - uint32_t chunk_offset; - - chunk_size = ntohl(msg->chunk_length); - chunk_offset = ntohl(msg->chunk_offset); - - if ((NULL == op->state->bf_data) - || (op->state->bf_data_size < chunk_size + chunk_offset)){ - // unexpected multipart chunk - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - - memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); - - if (op->state->bf_data_size != chunk_offset + chunk_size) - // wait for next chunk - return; - - op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], - op->state->bf_data_size, - op->state->bf_bits_per_element); - - GNUNET_free (op->state->bf_data); - op->state->bf_data = NULL; - - process_bf (op); -} - - /** * Handle an BF message from a remote peer. * @@ -688,90 +671,142 @@ handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) * @param mh the header of the message */ static void -handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_bf (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - const struct BFMessage *msg = (const struct BFMessage *) mh; + const struct BFMessage *msg; uint32_t bf_size; uint32_t chunk_size; uint32_t bf_bits_per_element; + uint16_t msize; + msize = htons (mh->size); + if (msize < sizeof (struct BFMessage)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + msg = (const struct BFMessage *) mh; switch (op->state->phase) { case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + break; + case PHASE_COUNT_SENT: case PHASE_BF_EXCHANGE: - case PHASE_MAYBE_FINISHED: - if (NULL == op->state->bf_data) { - // no colliding multipart transaction going on currently - op->spec->salt = ntohl (msg->sender_mutator); - bf_size = ntohl (msg->bloomfilter_total_length); - bf_bits_per_element = ntohl (msg->bits_per_element); - chunk_size = ntohl (msg->bloomfilter_length); - op->spec->remote_element_count = ntohl(msg->sender_element_count); - if (bf_size == chunk_size) { - // single part, done here - op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], - bf_size, - bf_bits_per_element); - process_bf (op); + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = msize - sizeof (struct BFMessage); + op->state->other_xor = msg->element_xor_hash; + if (bf_size == chunk_size) + { + if (NULL != op->state->bf_data) + { + GNUNET_break_op (0); + fail_intersection_operation (op); return; } - - //first multipart chunk + /* single part, done here immediately */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], + bf_size, + bf_bits_per_element); + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + process_bf (op); + return; + } + /* multipart chunk */ + if (NULL == op->state->bf_data) + { + /* first chunk, initialize */ op->state->bf_data = GNUNET_malloc (bf_size); op->state->bf_data_size = bf_size; op->state->bf_bits_per_element = bf_bits_per_element; - memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); - return; + op->state->bf_data_offset = 0; + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + } + else + { + /* increment */ + if ( (op->state->bf_data_size != bf_size) || + (op->state->bf_bits_per_element != bf_bits_per_element) || + (op->state->bf_data_offset + chunk_size > bf_size) || + (op->state->salt != ntohl (msg->sender_mutator)) || + (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + } + GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + (const char*) &msg[1], + chunk_size); + op->state->bf_data_offset += chunk_size; + if (op->state->bf_data_offset == bf_size) + { + /* last chunk, run! */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + bf_size, + bf_bits_per_element); + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + op->state->bf_data_size = 0; + process_bf (op); } + break; default: GNUNET_break_op (0); fail_intersection_operation (op); + break; } } /** - * Handle an BF message from a remote peer. + * Fills the "my_elements" hashmap with the initial set of + * (non-deleted) elements from the set of the specification. * - * @param cls the intersection operation - * @param mh the header of the message + * @param cls closure with the `struct Operation *` + * @param key current key code for the element + * @param value value in the hash map with the `struct ElementEntry *` + * @return #GNUNET_YES (we should continue to iterate) */ -static void -handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh) +static int +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) { + struct ElementEntry *ee = value; struct Operation *op = cls; - struct BFMessage *msg = (struct BFMessage *) mh; - - op->spec->remote_element_count = ntohl(msg->sender_element_count); - if ((op->state->phase != PHASE_INITIAL) - || (op->state->my_element_count > op->spec->remote_element_count) - || (0 == op->state->my_element_count) - || (0 == op->spec->remote_element_count)){ - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - - op->state->phase = PHASE_BF_EXCHANGE; - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &iterator_initialization, - op); - - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); - op->state->remote_bf = NULL; - - if (op->state->my_element_count == ntohl (msg->sender_element_count)) - op->state->phase = PHASE_MAYBE_FINISHED; - send_bloomfilter (op); + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; } /** - * Send our element count to the peer, in case our element count is lower than his + * Send our element count to the peer, in case our element count is + * lower than his. * * @param op intersection operation */ @@ -779,179 +814,271 @@ static void send_element_count (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - struct BFMessage *msg; + struct IntersectionElementInfoMessage *msg; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element count (bf_msg)\n"); - - // just send our element count, as the other peer must start - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); msg->sender_element_count = htonl (op->state->my_element_count); - msg->bloomfilter_length = htonl (0); - msg->sender_mutator = htonl (0); - GNUNET_MQ_send (op->mq, ev); } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * We go first, initialize our map with all elements and + * send the first Bloom filter. * - * @param op intersection operation + * @param op operation to start exchange for */ static void -finish_and_destroy (struct Operation *op) +begin_bf_exchange (struct Operation *op) { - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); - op->state->full_result_iter = - GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); - send_remaining_elements (op); - return; - } - send_client_done_and_destroy (op); + op->state->phase = PHASE_BF_EXCHANGE; + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, + GNUNET_YES); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); } /** - * Handle a done message from a remote peer + * Handle the initial `struct IntersectionElementInfoMessage` from a + * remote peer. * - * @param cls the union operation - * @param mh the message + * @param cls the intersection operation + * @param mh the header of the message */ static void -handle_p2p_done (void *cls, - const struct GNUNET_MessageHeader *mh) +handle_p2p_element_info (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; + const struct IntersectionElementInfoMessage *msg; - if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); - - finish_and_destroy (op); + if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage)) + { + GNUNET_break_op (0); + fail_intersection_operation(op); return; } - - GNUNET_break_op (0); - fail_intersection_operation (op); -} - - -/** - * Evaluate a union operation with - * a remote peer. - * - * @param op operation to evaluate - */ -static void -intersection_evaluate (struct Operation *op) -{ - op->state = GNUNET_new (struct OperationState); - /* we started the operation, thus we have to send the operation request */ - op->state->phase = PHASE_INITIAL; - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); - op->state->my_element_count = op->spec->set->state->current_set_element_count; - + msg = (const struct IntersectionElementInfoMessage *) mh; + op->spec->remote_element_count = ntohl (msg->sender_element_count); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "evaluating intersection operation"); - send_operation_request (op); + "Received remote element count (%u), I have %u\n", + op->spec->remote_element_count, + op->state->my_element_count); + if ( ( (PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase) ) || + (op->state->my_element_count > op->spec->remote_element_count) || + (0 == op->state->my_element_count) || + (0 == op->spec->remote_element_count) ) + { + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + } + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); } /** - * Accept an union operation request from a remote peer. - * Only initializes the private operation state. + * Send a result message to the client indicating that the operation + * is over. After the result done message has been sent to the + * client, destroy the evaluate operation. * - * @param op operation that will be accepted as a union operation + * @param op intersection operation */ static void -intersection_accept (struct Operation *op) +finish_and_destroy (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); - op->state = GNUNET_new (struct OperationState); - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); - op->state->my_element_count = op->spec->set->state->current_set_element_count; + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - // if Alice (the peer) has more elements than Bob (us), she should start - if (op->spec->remote_element_count < op->state->my_element_count){ - op->state->phase = PHASE_INITIAL; - send_element_count(op); + if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + op->keep++; + send_remaining_elements (op); return; } - // create a new bloomfilter in case we have fewer elements - op->state->phase = PHASE_BF_EXCHANGE; - op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - BLOOMFILTER_SIZE, - GNUNET_CONSTANTS_BLOOMFILTER_K); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &iterator_initialization, - op); - send_bloomfilter (op); + send_client_done_and_destroy (op); } /** - * Create a new set supporting the intersection operation + * Remove all elements from our hashmap. * - * @return the newly created set + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) */ -static struct SetState * -intersection_set_create () +static int +filter_all (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - struct SetState *set_state; + struct Operation *op = cls; + struct ElementEntry *ee = value; + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "intersection set created\n"); - set_state = GNUNET_new (struct SetState); - set_state->current_set_element_count = 0; - - return set_state; + "Final reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + return GNUNET_YES; } /** - * Add the element from the given element message to the set. + * Handle a done message from a remote peer * - * @param set_state state of the set want to add to - * @param ee the element to add to the set + * @param cls the intersection operation + * @param mh the message */ static void -intersection_add (struct SetState *set_state, - struct ElementEntry *ee) +handle_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) { - set_state->current_set_element_count++; + struct Operation *op = cls; + const struct IntersectionDoneMessage *idm; + + if (PHASE_BF_EXCHANGE != op->state->phase) + { + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + idm = (const struct IntersectionDoneMessage *) mh; + if (0 == ntohl (idm->final_element_count)) + { + /* other peer determined empty set is the intersection, + remove all elements */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &filter_all, + op); + } + if ( (op->state->my_element_count != ntohl (idm->final_element_count)) || + (0 != memcmp (&op->state->my_xor, + &idm->element_xor_hash, + sizeof (struct GNUNET_HashCode))) ) + { + /* Other peer thinks we are done, but we disagree on the result! */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got IntersectionDoneMessage, have %u elements in intersection\n", + op->state->my_element_count); + op->state->phase = PHASE_FINISHED; + finish_and_destroy (op); } /** - * Destroy a set that supports the intersection operation + * Initiate a set intersection operation with a remote peer. * - * @param set_state the set to destroy + * @param op operation that is created, should be initialized to + * begin the evaluation + * @param opaque_context message to be transmitted to the listener + * to convince him to accept, may be NULL */ static void -intersection_set_destroy (struct SetState *set_state) +intersection_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) { - GNUNET_free (set_state); + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + op->state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + op->state->phase = PHASE_INITIAL; + op->state->my_element_count = op->spec->set->state->current_set_element_count; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large!? */ + GNUNET_break (0); + GNUNET_SERVICE_client_drop (op->spec->set->client); + return; + } + msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); + msg->element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, + ev); + op->state->phase = PHASE_COUNT_SENT; + if (NULL != opaque_context) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request with context message\n"); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sent op request without context message\n"); } /** - * Remove the element given in the element message from the set. + * Accept an intersection operation request from a remote peer. Only + * initializes the private operation state. * - * @param set_state state of the set to remove from - * @param element set element to remove + * @param op operation that will be accepted as an intersection operation */ static void -intersection_remove (struct SetState *set_state, - struct ElementEntry *element) +intersection_accept (struct Operation *op) { - GNUNET_assert(0 < set_state->current_set_element_count); - set_state->current_set_element_count--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting set intersection operation\n"); + op->state = GNUNET_new (struct OperationState); + op->state->phase = PHASE_INITIAL; + op->state->my_element_count + = op->spec->set->state->current_set_element_count; + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create + (GNUNET_MIN (op->state->my_element_count, + op->spec->remote_element_count), + GNUNET_YES); + if (op->spec->remote_element_count < op->state->my_element_count) + { + /* If the other peer (Alice) has fewer elements than us (Bob), + we just send the count as Alice should send the first BF */ + send_element_count (op); + op->state->phase = PHASE_COUNT_SENT; + return; + } + /* We have fewer elements, so we start with the BF */ + begin_bf_exchange (op); } @@ -967,7 +1094,8 @@ static int intersection_handle_p2p_message (struct Operation *op, const struct GNUNET_MessageHeader *mh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received p2p message (t: %u, s: %u)\n", ntohs (mh->type), ntohs (mh->size)); switch (ntohs (mh->type)) { @@ -980,10 +1108,7 @@ intersection_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: handle_p2p_bf (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART: - handle_p2p_bf_part (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: + case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: handle_p2p_done (op, mh); break; default: @@ -995,7 +1120,9 @@ intersection_handle_p2p_message (struct Operation *op, /** - * handler for peer-disconnects, notifies the client about the aborted operation + * Handler for peer-disconnects, notifies the client about the aborted + * operation. If we did not expect anything from the other peer, we + * gracefully terminate the operation. * * @param op the destroyed operation */ @@ -1004,34 +1131,26 @@ intersection_peer_disconnect (struct Operation *op) { if (PHASE_FINISHED != op->state->phase) { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *msg; - - ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); - msg->request_id = htonl (op->spec->client_request_id); - msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); - _GSS_operation_destroy (op); + fail_intersection_operation (op); return; } - // else: the session has already been concluded - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); + /* the session has already been concluded */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Other peer disconnected (finished)\n"); if (GNUNET_NO == op->state->client_done_sent) finish_and_destroy (op); } /** - * Destroy the union operation. Only things specific to the union operation are destroyed. + * Destroy the intersection operation. Only things specific to the + * intersection operation are destroyed. * - * @param op union operation to destroy + * @param op intersection operation to destroy */ static void intersection_op_cancel (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n"); /* check if the op was canceled twice */ GNUNET_assert (NULL != op->state); if (NULL != op->state->remote_bf) @@ -1044,17 +1163,83 @@ intersection_op_cancel (struct Operation *op) GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); op->state->local_bf = NULL; } -/* if (NULL != op->state->my_elements) + if (NULL != op->state->my_elements) { - // no need to free the elements, they are still part of the set GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); op->state->my_elements = NULL; - }*/ + } GNUNET_free (op->state); op->state = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying intersection op state done\n"); +} + + +/** + * Create a new set supporting the intersection operation. + * + * @return the newly created set + */ +static struct SetState * +intersection_set_create () +{ + struct SetState *set_state; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection set created\n"); + set_state = GNUNET_new (struct SetState); + set_state->current_set_element_count = 0; + + return set_state; } + +/** + * Add the element from the given element message to the set. + * + * @param set_state state of the set want to add to + * @param ee the element to add to the set + */ +static void +intersection_add (struct SetState *set_state, + struct ElementEntry *ee) +{ + set_state->current_set_element_count++; +} + + +/** + * Destroy a set that supports the intersection operation + * + * @param set_state the set to destroy + */ +static void +intersection_set_destroy (struct SetState *set_state) +{ + GNUNET_free (set_state); +} + + +/** + * Remove the element given in the element message from the set. + * + * @param set_state state of the set to remove from + * @param element set element to remove + */ +static void +intersection_remove (struct SetState *set_state, + struct ElementEntry *element) +{ + GNUNET_assert (0 < set_state->current_set_element_count); + set_state->current_set_element_count--; +} + + +/** + * Get the table with implementing functions for set intersection. + * + * @return the operation specific VTable + */ const struct SetVT * _GSS_intersection_vt () {