X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_intersection.c;h=258ad64436494fd02d5beca0007026f9c9b12ac1;hb=5b32752cd7b02adcb8e6fec7798637638c6f63a0;hp=19d6498f5453259da9bd6ae05c779a7fe3ded706;hpb=f23525cfc0c0a8931db0b20b35c1aabbfbc5ac4e;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 19d6498f5..258ad6443 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013, 2014 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,13 +14,14 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file set/gnunet-service-set_intersection.c * @brief two-peer set intersection * @author Christian Fuchs + * @author Christian Grothoff */ #include "platform.h" #include "gnunet_util_lib.h" @@ -29,24 +30,6 @@ #include "gnunet-service-set_protocol.h" #include -#define BLOOMFILTER_SIZE GNUNET_CRYPTO_HASH_LENGTH - -/** - * Calculate the size of the bloom filter. - * - * @param A - * @param B - * @param s - * @param k - * @return - */ -#define CALCULATE_BF_SIZE(A, B, s, k) \ - do { \ - k = ceil(1 + log2((double) (2*B / (double) A)));\ - if (k<1) k=1; /* k can be calculated as 0 */\ - s = ceil((double) (A * k / log(2))); \ - } while (0) - /** * Current phase we are in for a intersection operation. @@ -54,30 +37,29 @@ enum IntersectionOperationPhase { /** - * Alices has suggested an operation to bob, - * and is waiting for a bf or session end. + * We are just starting. */ PHASE_INITIAL, /** - * Bob has accepted the operation, Bob and Alice are now exchanging bfs - * until one notices the their element count is equal + * We have send the number of our elements to the other + * peer, but did not setup our element set yet. */ - PHASE_BF_EXCHANGE, + PHASE_COUNT_SENT, /** - * if both peers have an equal peercount, they enter this state for - * one more turn, to see if they actually have agreed on a correct set. - * if a peer finds the same element count after the next iteration, - * it ends the the session + * We have initialized our set and are now reducing it by exchanging + * Bloom filters until one party notices the their element hashes + * are equal. */ - PHASE_MAYBE_FINISHED, + PHASE_BF_EXCHANGE, /** - * The protocol is over. - * Results may still have to be sent to the client. + * The protocol is over. Results may still have to be sent to the + * client. */ PHASE_FINISHED + }; @@ -118,12 +100,33 @@ struct OperationState struct OperationState *prev; /** - * for multipart msgs we have to store the bloomfilter-data until we fully sent it. + * For multipart BF transmissions, we have to store the + * bloomfilter-data until we fully received it. */ char *bf_data; /** - * Current element count contained within @e my_elements + * XOR of the keys of all of the elements (remaining) in my set. + * Always updated when elements are added or removed to + * @e my_elements. + */ + struct GNUNET_HashCode my_xor; + + /** + * XOR of the keys of all of the elements (remaining) in + * the other peer's set. Updated when we receive the + * other peer's Bloom filter. + */ + struct GNUNET_HashCode other_xor; + + /** + * How many bytes of @e bf_data are valid? + */ + uint32_t bf_data_offset; + + /** + * Current element count contained within @e my_elements. + * (May differ briefly during initialization.) */ uint32_t my_element_count; @@ -137,6 +140,12 @@ struct OperationState */ uint32_t bf_bits_per_element; + /** + * Salt currently used for BF construction (by us or the other peer, + * depending on where we are in the code). + */ + uint32_t salt; + /** * Current state of the operation. */ @@ -162,7 +171,8 @@ struct OperationState struct SetState { /** - * Number of currently valid elements in the set which have not been removed + * Number of currently valid elements in the set which have not been + * removed. */ uint32_t current_set_element_count; }; @@ -199,7 +209,7 @@ send_client_removed_element (struct Operation *op, rm->result_status = htons (GNUNET_SET_STATUS_OK); rm->request_id = htonl (op->spec->client_request_id); rm->element_type = element->element_type; - memcpy (&rm[1], + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (op->spec->set->client_mq, @@ -208,8 +218,7 @@ send_client_removed_element (struct Operation *op, /** - * Fills the "my_elements" hashmap with all relevant elements and - * adds their mutated hashes to our local bloomfilter with mutator+1. + * Fills the "my_elements" hashmap with all relevant elements. * * @param cls the `struct Operation *` we are performing * @param key current key code @@ -217,23 +226,37 @@ send_client_removed_element (struct Operation *op, * @return #GNUNET_YES (we should continue to iterate) */ static int -filtered_map_and_bf_initialization (void *cls, - const struct GNUNET_HashCode *key, - void *value) +filtered_map_initialization (void *cls, + const struct GNUNET_HashCode *key, + void *value) { struct Operation *op = cls; struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - if ( (op->generation_created < ee->generation_removed) && - (op->generation_created >= ee->generation_added) ) + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "FIMA called for %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u (wrong generation)\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); return GNUNET_YES; /* element not valid in our operation's generation */ + } - /* Test if element is in Bob's bloomfilter */ - // FIXME: where does this salt come from!? + /* Test if element is in other peer's bloomfilter */ GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->spec->salt, + op->state->salt, &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, &mutated_hash)) @@ -241,9 +264,20 @@ filtered_map_and_bf_initialization (void *cls, /* remove this element */ send_client_removed_element (op, &ee->element); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reduced initialization, not starting with %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); return GNUNET_YES; } op->state->my_element_count++; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Filtered initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, &ee->element_hash, @@ -272,15 +306,26 @@ iterator_bf_reduce (void *cls, struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - // FIXME: where does this salt come from!? GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->spec->salt, + op->state->salt, &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Testing mingled hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, &mutated_hash)) { + GNUNET_break (0 < op->state->my_element_count); op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, &ee->element_hash, @@ -288,6 +333,13 @@ iterator_bf_reduce (void *cls, send_client_removed_element (op, &ee->element); } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bloom filter reduction of my_elements, keeping %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + } return GNUNET_YES; } @@ -309,10 +361,13 @@ iterator_bf_create (void *cls, struct ElementEntry *ee = value; struct GNUNET_HashCode mutated_hash; - // FIXME: where does this salt come from!? GNUNET_BLOCK_mingle_hash (&ee->element_hash, - op->spec->salt, + op->state->salt, &mutated_hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing BF with hash %s with salt %u\n", + GNUNET_h2s (&mutated_hash), + op->state->salt); GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf, &mutated_hash); return GNUNET_YES; @@ -350,49 +405,6 @@ fail_intersection_operation (struct Operation *op) } - - - - - -/** - * - * @param op - * @param offset - */ -static void -send_bloomfilter_multipart (struct Operation *op, - uint32_t offset) -{ - struct GNUNET_MQ_Envelope *ev; - struct BFPart *msg; - uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); - uint32_t todo_size = op->state->bf_data_size - offset; - - if (todo_size < chunk_size) - chunk_size = todo_size; - - ev = GNUNET_MQ_msg_extra (msg, - chunk_size, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); - - msg->chunk_length = htonl (chunk_size); - msg->chunk_offset = htonl (offset); - memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); - - GNUNET_MQ_send (op->mq, ev); - - if (op->state->bf_data_size == offset + chunk_size) - { - // done - GNUNET_free(op->state->bf_data); - op->state->bf_data = NULL; - return; - } - send_bloomfilter_multipart (op, offset + chunk_size); -} - - /** * Send a bloomfilter to our peer. After the result done message has * been sent to the client, destroy the evaluate operation. @@ -407,65 +419,85 @@ send_bloomfilter (struct Operation *op) uint32_t bf_size; uint32_t bf_elementbits; uint32_t chunk_size; - struct GNUNET_CONTAINER_BloomFilter *local_bf; - + char *bf_data; + uint32_t offset; + + /* We consider the ratio of the set sizes to determine + the number of bits per element, as the smaller set + should use more bits to maximize its set reduction + potential and minimize overall bandwidth consumption. */ + bf_elementbits = 2 + ceil (log2((double) + (op->spec->remote_element_count / + (double) op->state->my_element_count))); + if (bf_elementbits < 1) + bf_elementbits = 1; /* make sure k is not 0 */ + /* optimize BF-size to ~50% of bits set */ + bf_size = ceil ((double) (op->state->my_element_count + * bf_elementbits / log(2))); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "sending bf of size %u\n"); - - CALCULATE_BF_SIZE(op->state->my_element_count, - op->spec->remote_element_count, - bf_size, - bf_elementbits); - - local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - bf_size, - bf_elementbits); - - op->spec->salt++; + "Sending Bloom filter (%u) of size %u bytes\n", + (unsigned int) bf_elementbits, + (unsigned int) bf_size); + op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + bf_size, + bf_elementbits); + op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, &iterator_bf_create, op); - // send our bloomfilter - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > bf_size + sizeof (struct BFMessage)) + /* send our Bloom filter */ + chunk_size = 60 * 1024 - sizeof (struct BFMessage); + if (bf_size <= chunk_size) { - // singlepart + /* singlepart */ chunk_size = bf_size; ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); GNUNET_assert (GNUNET_SYSERR != - GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - (char*)&msg[1], + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + (char*) &msg[1], bf_size)); + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); } else { - //multipart - chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); - ev = GNUNET_MQ_msg_extra (msg, - chunk_size, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); - op->state->bf_data = (char *) GNUNET_malloc (bf_size); + /* multipart */ + bf_data = GNUNET_malloc (bf_size); GNUNET_assert (GNUNET_SYSERR != - GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - op->state->bf_data, + GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf, + bf_data, bf_size)); - memcpy (&msg[1], op->state->bf_data, chunk_size); - op->state->bf_data_size = bf_size; + offset = 0; + while (offset < bf_size) + { + if (bf_size - chunk_size < offset) + chunk_size = bf_size - offset; + ev = GNUNET_MQ_msg_extra (msg, + chunk_size, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); + GNUNET_memcpy (&msg[1], + &bf_data[offset], + chunk_size); + offset += chunk_size; + msg->sender_element_count = htonl (op->state->my_element_count); + msg->bloomfilter_total_length = htonl (bf_size); + msg->bits_per_element = htonl (bf_elementbits); + msg->sender_mutator = htonl (op->state->salt); + msg->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, ev); + } + GNUNET_free (bf_data); } - GNUNET_CONTAINER_bloomfilter_free (local_bf); - - msg->sender_element_count = htonl (op->state->my_element_count); - msg->bloomfilter_total_length = htonl (bf_size); - msg->bloomfilter_length = htonl (chunk_size); - msg->bits_per_element = htonl (bf_elementbits); - msg->sender_mutator = htonl (op->spec->salt); - - GNUNET_MQ_send (op->mq, ev); - - if (op->state->bf_data) - send_bloomfilter_multipart (op, chunk_size); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; } @@ -517,13 +549,15 @@ send_remaining_elements (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending done and destroy because iterator ran out\n"); + op->keep--; send_client_done_and_destroy (op); return; } ee = nxt; element = &ee->element; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending element (size %u) to client (full set)\n", + "Sending element %s:%u to client (full set)\n", + GNUNET_h2s (&ee->element_hash), element->size); GNUNET_assert (0 != op->spec->client_request_id); ev = GNUNET_MQ_msg_extra (rm, @@ -533,7 +567,7 @@ send_remaining_elements (void *cls) rm->result_status = htons (GNUNET_SET_STATUS_OK); rm->request_id = htonl (op->spec->client_request_id); rm->element_type = element->element_type; - memcpy (&rm[1], + GNUNET_memcpy (&rm[1], element->data, element->size); GNUNET_MQ_notify_sent (ev, @@ -553,6 +587,7 @@ static void send_peer_done (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; op->state->phase = PHASE_FINISHED; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -560,8 +595,12 @@ send_peer_done (struct Operation *op) GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); op->state->local_bf = NULL; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (op->mq, ev); + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_send (op->mq, + ev); } @@ -573,106 +612,58 @@ send_peer_done (struct Operation *op) static void process_bf (struct Operation *op) { - uint32_t old_elements; - uint32_t peer_elements; - - old_elements = op->state->my_element_count; - peer_elements = op->spec->remote_element_count; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", + op->state->phase, + op->spec->remote_element_count, + op->state->my_element_count, + GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); switch (op->state->phase) { case PHASE_INITIAL: - /* This is the first BF being sent, build our - initial map with filtering in place */ + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + case PHASE_COUNT_SENT: + /* This is the first BF being sent, build our initial map with + filtering in place */ op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, GNUNET_YES); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &filtered_map_and_bf_initialization, + op->state->my_element_count = 0; + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &filtered_map_initialization, op); break; case PHASE_BF_EXCHANGE: - case PHASE_MAYBE_FINISHED: /* Update our set by reduction */ GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, &iterator_bf_reduce, op); break; - default: + case PHASE_FINISHED: GNUNET_break_op (0); fail_intersection_operation(op); + return; } - // the iterators created a new BF with salt+1 - // the peer needs this information for decoding the next BF - // this behavior can be modified at will later on. - op->spec->salt++; - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); op->state->remote_bf = NULL; - if ((0 == op->state->my_element_count) // fully disjoint - || ((op->state->phase == PHASE_MAYBE_FINISHED) // we agree on a shared set of elements - && (old_elements == op->state->my_element_count) - && (op->state->my_element_count == peer_elements))) + if ( (0 == op->state->my_element_count) || /* fully disjoint */ + ( (op->state->my_element_count == op->spec->remote_element_count) && + (0 == memcmp (&op->state->my_xor, + &op->state->other_xor, + sizeof (struct GNUNET_HashCode))) ) ) { - // In the last round we though we were finished, we now know this is correct + /* we are done */ send_peer_done (op); return; } - op->state->phase = PHASE_BF_EXCHANGE; - if (op->state->my_element_count == peer_elements) - // maybe we are finished, but we do one more round to make certain - // we don't have false positives ... - op->state->phase = PHASE_MAYBE_FINISHED; - send_bloomfilter (op); } -/** - * Handle an BF multipart message from a remote peer. - * - * @param cls the intersection operation - * @param mh the header of the message - */ -static void -handle_p2p_bf_part (void *cls, - const struct GNUNET_MessageHeader *mh) -{ - struct Operation *op = cls; - const struct BFPart *msg = (const struct BFPart *) mh; - uint32_t chunk_size; - uint32_t chunk_offset; - - chunk_size = ntohl(msg->chunk_length); - chunk_offset = ntohl(msg->chunk_offset); - - if ((NULL == op->state->bf_data) - || (op->state->bf_data_size < chunk_size + chunk_offset)) - { - // unexpected multipart chunk - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - - memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); - - if (op->state->bf_data_size != chunk_offset + chunk_size) - // wait for next chunk - return; - - op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], - op->state->bf_data_size, - op->state->bf_bits_per_element); - - GNUNET_free (op->state->bf_data); - op->state->bf_data = NULL; - - process_bf (op); -} - - /** * Handle an BF message from a remote peer. * @@ -684,44 +675,96 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - const struct BFMessage *msg = (const struct BFMessage *) mh; + const struct BFMessage *msg; uint32_t bf_size; uint32_t chunk_size; uint32_t bf_bits_per_element; + uint16_t msize; + msize = htons (mh->size); + if (msize < sizeof (struct BFMessage)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + msg = (const struct BFMessage *) mh; switch (op->state->phase) { case PHASE_INITIAL: + GNUNET_break_op (0); + fail_intersection_operation (op); + break; + case PHASE_COUNT_SENT: case PHASE_BF_EXCHANGE: - case PHASE_MAYBE_FINISHED: - if (NULL == op->state->bf_data) + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = msize - sizeof (struct BFMessage); + op->state->other_xor = msg->element_xor_hash; + if (bf_size == chunk_size) { - // no colliding multipart transaction going on currently - op->spec->salt = ntohl (msg->sender_mutator); - bf_size = ntohl (msg->bloomfilter_total_length); - bf_bits_per_element = ntohl (msg->bits_per_element); - chunk_size = ntohl (msg->bloomfilter_length); - op->spec->remote_element_count = ntohl(msg->sender_element_count); - if (bf_size == chunk_size) + if (NULL != op->state->bf_data) { - // single part, done here - op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], - bf_size, - bf_bits_per_element); - process_bf (op); + GNUNET_break_op (0); + fail_intersection_operation (op); return; } - - //first multipart chunk + /* single part, done here immediately */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], + bf_size, + bf_bits_per_element); + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + process_bf (op); + return; + } + /* multipart chunk */ + if (NULL == op->state->bf_data) + { + /* first chunk, initialize */ op->state->bf_data = GNUNET_malloc (bf_size); op->state->bf_data_size = bf_size; op->state->bf_bits_per_element = bf_bits_per_element; - memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); - return; + op->state->bf_data_offset = 0; + op->state->salt = ntohl (msg->sender_mutator); + op->spec->remote_element_count = ntohl (msg->sender_element_count); + } + else + { + /* increment */ + if ( (op->state->bf_data_size != bf_size) || + (op->state->bf_bits_per_element != bf_bits_per_element) || + (op->state->bf_data_offset + chunk_size > bf_size) || + (op->state->salt != ntohl (msg->sender_mutator)) || + (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + } + GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset], + (const char*) &msg[1], + chunk_size); + op->state->bf_data_offset += chunk_size; + if (op->state->bf_data_offset == bf_size) + { + /* last chunk, run! */ + op->state->remote_bf + = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data, + bf_size, + bf_bits_per_element); + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + op->state->bf_data_size = 0; + process_bf (op); } + break; default: GNUNET_break_op (0); fail_intersection_operation (op); + break; } } @@ -736,16 +779,22 @@ handle_p2p_bf (void *cls, * @return #GNUNET_YES (we should continue to iterate) */ static int -initialize_map (void *cls, - const struct GNUNET_HashCode *key, - void *value) +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) { struct ElementEntry *ee = value; struct Operation *op = cls; - if ( (op->generation_created < ee->generation_removed) && - (op->generation_created >= ee->generation_added) ) + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, &ee->element_hash, @@ -755,6 +804,48 @@ initialize_map (void *cls, } +/** + * Send our element count to the peer, in case our element count is + * lower than his. + * + * @param op intersection operation + */ +static void +send_element_count (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionElementInfoMessage *msg; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, ev); +} + + +/** + * We go first, initialize our map with all elements and + * send the first Bloom filter. + * + * @param op operation to start exchange for + */ +static void +begin_bf_exchange (struct Operation *op) +{ + op->state->phase = PHASE_BF_EXCHANGE; + op->state->my_elements + = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, + GNUNET_YES); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); +} + + /** * Handle the initial `struct IntersectionElementInfoMessage` from a * remote peer. @@ -777,7 +868,12 @@ handle_p2p_element_info (void *cls, } msg = (const struct IntersectionElementInfoMessage *) mh; op->spec->remote_element_count = ntohl (msg->sender_element_count); - if ( (PHASE_INITIAL != op->state->phase) || + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received remote element count (%u), I have %u\n", + op->spec->remote_element_count, + op->state->my_element_count); + if ( ( (PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase) ) || (op->state->my_element_count > op->spec->remote_element_count) || (0 == op->state->my_element_count) || (0 == op->spec->remote_element_count) ) @@ -786,41 +882,8 @@ handle_p2p_element_info (void *cls, fail_intersection_operation(op); return; } - - op->state->phase = PHASE_BF_EXCHANGE; - // FIXME... -- why a new map here!? - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (1, - GNUNET_YES); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &initialize_map, // FIXME: filtering!? - op); - GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); - op->state->remote_bf = NULL; - - if (op->state->my_element_count == ntohl (msg->sender_element_count)) - op->state->phase = PHASE_MAYBE_FINISHED; - - send_bloomfilter (op); -} - - -/** - * Send our element count to the peer, in case our element count is lower than his - * - * @param op intersection operation - */ -static void -send_element_count (struct Operation *op) -{ - struct GNUNET_MQ_Envelope *ev; - struct IntersectionElementInfoMessage *msg; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending our element count (bf_msg)\n"); - ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); - msg->sender_element_count = htonl (op->state->my_element_count); - GNUNET_MQ_send (op->mq, ev); + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); } @@ -839,9 +902,11 @@ finish_and_destroy (struct Operation *op) if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending full result set\n"); + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); op->state->full_result_iter = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + op->keep++; send_remaining_elements (op); return; } @@ -849,6 +914,41 @@ finish_and_destroy (struct Operation *op) } +/** + * Remove all elements from our hashmap. + * + * @param cls closure with the `struct Operation *` + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +filter_all (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct ElementEntry *ee = value; + + GNUNET_break (0 < op->state->my_element_count); + op->state->my_element_count--; + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Final reduction of my_elements, removing %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, + &ee->element_hash, + ee)); + send_client_removed_element (op, + &ee->element); + return GNUNET_YES; +} + + /** * Handle a done message from a remote peer * @@ -860,17 +960,46 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; + const struct IntersectionDoneMessage *idm; - if ( (op->state->phase = PHASE_FINISHED) || - (op->state->phase = PHASE_MAYBE_FINISHED) ) + if (PHASE_BF_EXCHANGE != op->state->phase) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got final DONE\n"); - finish_and_destroy (op); + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) + { + GNUNET_break_op (0); + fail_intersection_operation (op); + return; + } + idm = (const struct IntersectionDoneMessage *) mh; + if (0 == ntohl (idm->final_element_count)) + { + /* other peer determined empty set is the intersection, + remove all elements */ + GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, + &filter_all, + op); + } + if ( (op->state->my_element_count != ntohl (idm->final_element_count)) || + (0 != memcmp (&op->state->my_xor, + &idm->element_xor_hash, + sizeof (struct GNUNET_HashCode))) ) + { + /* Other peer thinks we are done, but we disagree on the result! */ + GNUNET_break_op (0); + fail_intersection_operation (op); return; } - GNUNET_break_op (0); - fail_intersection_operation (op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got IntersectionDoneMessage, have %u elements in intersection\n", + op->state->my_element_count); + op->state->phase = PHASE_FINISHED; + finish_and_destroy (op); } @@ -892,11 +1021,10 @@ intersection_evaluate (struct Operation *op, op->state = GNUNET_new (struct OperationState); /* we started the operation, thus we have to send the operation request */ op->state->phase = PHASE_INITIAL; - op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); op->state->my_element_count = op->spec->set->state->current_set_element_count; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initiating intersection operation evaluation"); + "Initiating intersection operation evaluation\n"); ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, opaque_context); @@ -908,12 +1036,10 @@ intersection_evaluate (struct Operation *op, return; } msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); - msg->app_id = op->spec->app_id; - // FIXME: where does this 'salt' come from? - msg->salt = htonl (op->spec->salt); msg->element_count = htonl (op->state->my_element_count); GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_COUNT_SENT; if (NULL != opaque_context) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent op request with context message\n"); @@ -935,29 +1061,24 @@ intersection_accept (struct Operation *op) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Accepting set intersection operation\n"); op->state = GNUNET_new (struct OperationState); + op->state->phase = PHASE_INITIAL; op->state->my_element_count = op->spec->set->state->current_set_element_count; op->state->my_elements - = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, - op->spec->remote_element_count), - GNUNET_YES); + = GNUNET_CONTAINER_multihashmap_create + (GNUNET_MIN (op->state->my_element_count, + op->spec->remote_element_count), + GNUNET_YES); if (op->spec->remote_element_count < op->state->my_element_count) { /* If the other peer (Alice) has fewer elements than us (Bob), we just send the count as Alice should send the first BF */ - op->state->phase = PHASE_INITIAL; send_element_count (op); + op->state->phase = PHASE_COUNT_SENT; return; } /* We have fewer elements, so we start with the BF */ - op->state->phase = PHASE_BF_EXCHANGE; - op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - BLOOMFILTER_SIZE, - GNUNET_CONSTANTS_BLOOMFILTER_K); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, - &initialize_map, - op); - send_bloomfilter (op); + begin_bf_exchange (op); } @@ -987,10 +1108,7 @@ intersection_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: handle_p2p_bf (op, mh); break; - case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART: - handle_p2p_bf_part (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: + case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: handle_p2p_done (op, mh); break; default: