X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set_union.c;h=e22465fd3c0b8dac29ee8b3a3a11993d16e0671f;hb=bdd2a2f82789160f7cd1d5f6d25bdcd75a90937e;hp=6d9658ee5823bfedf2a61a8e413a8d405133a185;hpb=68403fa780bf94ace2ebc13c2c09463cbbc0b57c;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6d9658ee5..e22465fd3 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet - (C) 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2013-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -14,38 +14,42 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ - /** - * @file set/gnunet-service-set.c + * @file set/gnunet-service-set_union.c + * @brief two-peer set operations * @author Florian Dold */ - - +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_statistics_service.h" #include "gnunet-service-set.h" -#include "gnunet_container_lib.h" -#include "gnunet_crypto_lib.h" #include "ibf.h" -#include "strata_estimator.h" -#include "set_protocol.h" +#include "gnunet-service-set_union_strata_estimator.h" +#include "gnunet-service-set_protocol.h" #include +#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__) + + /** * Number of IBFs in a strata estimator. */ #define SE_STRATA_COUNT 32 + /** * Size of the IBFs in the strata estimator. */ #define SE_IBF_SIZE 80 + /** - * hash num parameter for the difference digests and strata estimators + * The hash num parameter for the difference digests and strata estimators. */ -#define SE_IBF_HASH_NUM 3 +#define SE_IBF_HASH_NUM 4 /** * Number of buckets that can be transmitted in one message. @@ -57,112 +61,96 @@ * Choose this value so that computing the IBF is still cheaper * than transmitting all values. */ -#define MAX_IBF_ORDER (16) - +#define MAX_IBF_ORDER (20) /** - * Current phase we are in for a union operation + * Number of buckets used in the ibf per estimated + * difference. */ -enum UnionOperationPhase -{ - /** - * We sent the request message, and expect a strata estimator - */ - PHASE_EXPECT_SE, - /** - * We sent the strata estimator, and expect an IBF - */ - PHASE_EXPECT_IBF, - /** - * We know what type of IBF the other peer wants to send us, - * and expect the remaining parts - */ - PHASE_EXPECT_IBF_CONT, - /** - * We are sending request and elements, - * and thus only expect elements from the other peer. - */ - PHASE_EXPECT_ELEMENTS, - /** - * We are expecting elements and requests, and send - * requested elements back to the other peer. - */ - PHASE_EXPECT_ELEMENTS_AND_REQUESTS, - /** - * The protocol is over. - * Results may still have to be sent to the client. - */ - PHASE_FINISHED -}; +#define IBF_ALPHA 4 /** - * State of an evaluate operation - * with another peer. + * Current phase we are in for a union operation. */ -struct UnionEvaluateOperation +enum UnionOperationPhase { /** - * Local set the operation is evaluated on. + * We sent the request message, and expect a strata estimator. */ - struct Set *set; + PHASE_EXPECT_SE, /** - * Peer with the remote set + * We sent the strata estimator, and expect an IBF. This phase is entered once + * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. + * + * XXX: could use better wording. + * + * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS */ - struct GNUNET_PeerIdentity peer; + PHASE_EXPECT_IBF, /** - * Application-specific identifier + * Continuation for multi part IBFs. */ - struct GNUNET_HashCode app_id; + PHASE_EXPECT_IBF_CONT, /** - * Context message, given to us - * by the client, may be NULL. + * We are decoding an IBF. */ - struct GNUNET_MessageHeader *context_msg; + PHASE_INVENTORY_ACTIVE, /** - * Stream socket connected to the other peer + * The other peer is decoding the IBF we just sent. */ - struct GNUNET_STREAM_Socket *socket; + PHASE_INVENTORY_PASSIVE, /** - * Message queue for the peer on the other - * end + * The protocol is almost finished, but we still have to flush our message + * queue and/or expect some elements. */ - struct GNUNET_MQ_MessageQueue *mq; + PHASE_FINISH_CLOSING, /** - * Request ID to multiplex set operations to - * the client inhabiting the set. - */ - uint32_t request_id; + * In the penultimate phase, + * we wait until all our demands + * are satisfied. Then we send a done + * message, and wait for another done message.*/ + PHASE_FINISH_WAITING, /** - * Number of ibf buckets received - */ - unsigned int ibf_buckets_received; + * In the ultimate phase, we wait until + * our demands are satisfied and then + * quit (sending another DONE message). */ + PHASE_DONE +}; + +/** + * State of an evaluate operation with another peer. + */ +struct OperationState +{ /** * Copy of the set's strata estimator at the time of - * creation of this operation + * creation of this operation. */ struct StrataEstimator *se; /** - * The ibf we currently receive + * The IBF we currently receive. */ struct InvertibleBloomFilter *remote_ibf; /** - * IBF of the set's element. + * The IBF with the local set's element. */ struct InvertibleBloomFilter *local_ibf; /** * Maps IBF-Keys (specific to the current salt) to elements. + * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. + * Colliding IBF-Keys are linked. */ struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; @@ -172,80 +160,34 @@ struct UnionEvaluateOperation enum UnionOperationPhase phase; /** - * Salt to use for this operation. - */ - uint16_t salt; - - /** - * Generation in which the operation handle - * was created. - */ - unsigned int generation_created; - - /** - * Evaluate operations are held in - * a linked list. + * Did we send the client that we are done? */ - struct UnionEvaluateOperation *next; - - /** - * Evaluate operations are held in - * a linked list. - */ - struct UnionEvaluateOperation *prev; -}; - + int client_done_sent; -/** - * Information about the element in a set. - * All elements are stored in a hash-table - * from their hash-code to their 'struct Element', - * so that the remove and add operations are reasonably - * fast. - */ -struct ElementEntry -{ /** - * The actual element. The data for the element - * should be allocated at the end of this struct. + * Number of ibf buckets already received into the @a remote_ibf. */ - struct GNUNET_SET_Element element; - - /** - * Hash of the element. - * Will be used to derive the different IBF keys - * for different salts. - */ - struct GNUNET_HashCode element_hash; - - /** - * Generation the element was added by the client. - * Operations of earlier generations will not consider the element. - */ - unsigned int generation_added; + unsigned int ibf_buckets_received; /** - * GNUNET_YES if the element has been removed in some generation. + * Hashes for elements that we have demanded from the other peer. */ - int removed; + struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes; /** - * Generation the element was removed by the client. - * Operations of later generations will not consider the element. - * Only valid if is_removed is GNUNET_YES. + * Salt that we're using for sending IBFs */ - unsigned int generation_removed; + uint32_t salt_send; /** - * GNUNET_YES if the element is a remote element, and does not belong - * to the operation's set. + * Salt for the IBF we've received and that we're currently decoding. */ - int remote; + uint32_t salt_receive; }; /** - * Entries in the key-to-element map of the union set. + * The key entry is used to associate an ibf key with an element. */ struct KeyEntry { @@ -255,17 +197,15 @@ struct KeyEntry struct IBF_Key ibf_key; /** - * The actual element associated with the key + * The actual element associated with the key. + * + * Only owned by the union operation if element->operation + * is #GNUNET_YES. */ struct ElementEntry *element; - - /** - * Element that collides with this element - * on the ibf key - */ - struct KeyEntry *next_colliding; }; + /** * Used as a closure for sending elements * with a specific IBF key. @@ -282,14 +222,14 @@ struct SendElementClosure * Operation for which the elements * should be sent. */ - struct UnionEvaluateOperation *eo; + struct Operation *op; }; /** * Extra state required for efficient set union. */ -struct UnionState +struct SetState { /** * The strata estimator is only generated once for @@ -298,81 +238,18 @@ struct UnionState * salt=0. */ struct StrataEstimator *se; - - /** - * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'. - */ - struct GNUNET_CONTAINER_MultiHashMap *elements; - - /** - * Evaluate operations are held in - * a linked list. - */ - struct UnionEvaluateOperation *ops_head; - - /** - * Evaluate operations are held in - * a linked list. - */ - struct UnionEvaluateOperation *ops_tail; - - /** - * Current generation, that is, number of - * previously executed operations on this set - */ - unsigned int current_generation; }; - /** - * Iterator over hash map entries. + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. * * @param cls closure * @param key current key code * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -destroy_elements_iterator (void *cls, - const struct GNUNET_HashCode * key, - void *value) -{ - struct ElementEntry *ee = value; - - GNUNET_free (ee); - return GNUNET_YES; -} - - -/** - * Destroy the elements belonging to a union set. - * - * @param us union state that contains the elements - */ -static void -destroy_elements (struct UnionState *us) -{ - if (NULL == us->elements) - return; - GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL); - GNUNET_CONTAINER_multihashmap_destroy (us->elements); - us->elements = NULL; -} - - - -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. */ static int destroy_key_to_element_iter (void *cls, @@ -380,71 +257,63 @@ destroy_key_to_element_iter (void *cls, void *value) { struct KeyEntry *k = value; - - while (NULL != k) + + GNUNET_assert (NULL != k); + if (GNUNET_YES == k->element->remote) { - struct KeyEntry *k_tmp = k; - k = k->next_colliding; - GNUNET_free (k_tmp); + GNUNET_free (k->element); + k->element = NULL; } + GNUNET_free (k); return GNUNET_YES; } /** - * Destroy a union operation, and free all resources - * associated with it. + * Destroy the union operation. Only things specific to the union + * operation are destroyed. * - * @param eo the union operation to destroy + * @param op union operation to destroy */ static void -destroy_union_operation (struct UnionEvaluateOperation *eo) +union_op_cancel (struct Operation *op) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); - - if (NULL != eo->mq) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op\n"); + /* check if the op was canceled twice */ + GNUNET_assert (NULL != op->state); + if (NULL != op->state->remote_ibf) { - GNUNET_MQ_destroy (eo->mq); - eo->mq = NULL; + ibf_destroy (op->state->remote_ibf); + op->state->remote_ibf = NULL; } - - if (NULL != eo->socket) - { - GNUNET_STREAM_close (eo->socket); - eo->socket = NULL; - } - if (NULL != eo->remote_ibf) + if (NULL != op->state->demanded_hashes) { - ibf_destroy (eo->remote_ibf); - eo->remote_ibf = NULL; + GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes); + op->state->demanded_hashes = NULL; } - if (NULL != eo->local_ibf) + if (NULL != op->state->local_ibf) { - ibf_destroy (eo->local_ibf); - eo->local_ibf = NULL; + ibf_destroy (op->state->local_ibf); + op->state->local_ibf = NULL; } - if (NULL != eo->se) + if (NULL != op->state->se) { - strata_estimator_destroy (eo->se); - eo->se = NULL; + strata_estimator_destroy (op->state->se); + op->state->se = NULL; } - if (NULL != eo->key_to_element) + if (NULL != op->state->key_to_element) { - GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); - GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); - eo->key_to_element = NULL; + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &destroy_key_to_element_iter, + NULL); + GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element); + op->state->key_to_element = NULL; } - - GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, - eo); - GNUNET_free (eo); - - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); - - - /* FIXME: do a garbage collection of the set generations */ + GNUNET_free (op->state); + op->state = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying union op done\n"); } @@ -452,140 +321,154 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) * Inform the client that the union operation has failed, * and proceed to destroy the evaluate operation. * - * @param eo the union operation to fail + * @param op the union operation to fail */ static void -fail_union_operation (struct UnionEvaluateOperation *eo) +fail_union_operation (struct Operation *op) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + LOG (GNUNET_ERROR_TYPE_ERROR, + "union operation failed\n"); + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = htonl (eo->request_id); - GNUNET_MQ_send (eo->set->client_mq, mqm); - destroy_union_operation (eo); + msg->request_id = htonl (op->spec->client_request_id); + msg->element_type = htons (0); + GNUNET_MQ_send (op->spec->set->client_mq, ev); + _GSS_operation_destroy (op, GNUNET_YES); } /** - * Derive the IBF key from a hash code and + * Derive the IBF key from a hash code and * a salt. * * @param src the hash code - * @param salt salt to use * @return the derived IBF key */ static struct IBF_Key -get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) +get_ibf_key (const struct GNUNET_HashCode *src) { struct IBF_Key key; + uint16_t salt = 0; - GNUNET_CRYPTO_hkdf (&key, sizeof (key), - GCRY_MD_SHA512, GCRY_MD_SHA256, - src, sizeof *src, - &salt, sizeof (salt), - NULL, 0); + GNUNET_CRYPTO_kdf (&key, sizeof (key), + src, sizeof *src, + &salt, sizeof (salt), + NULL, 0); return key; } /** - * Send a request for the evaluate operation to a remote peer + * Iterator over the mapping from IBF keys to element entries. Checks if we + * have an element with a given GNUNET_HashCode. * - * @param eo operation with the other peer + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should search further, + * #GNUNET_NO if we've found the element. */ -static void -send_operation_request (struct UnionEvaluateOperation *eo) +static int +op_has_element_iterator (void *cls, + uint32_t key, + void *value) { - struct GNUNET_MQ_Message *mqm; - struct OperationRequestMessage *msg; - - mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); - - if (NULL == mqm) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (eo->set->client); - return; - } - msg->operation = htons (GNUNET_SET_OPERATION_UNION); - msg->app_id = eo->app_id; - GNUNET_MQ_send (eo->mq, mqm); - - if (NULL != eo->context_msg) - { - GNUNET_free (eo->context_msg); - eo->context_msg = NULL; - } + struct GNUNET_HashCode *element_hash = cls; + struct KeyEntry *k = value; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); + GNUNET_assert (NULL != k); + if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, + element_hash)) + return GNUNET_NO; + return GNUNET_YES; } /** - * Iterator to create the mapping between ibf keys - * and element entries. + * Determine whether the given element is already in the operation's element + * set. * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. + * @param op operation that should be tested for 'element_hash' + * @param element_hash hash of the element to look for + * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ static int -insert_element_iterator (void *cls, - uint32_t key, - void *value) +op_has_element (struct Operation *op, + const struct GNUNET_HashCode *element_hash) { - struct KeyEntry *const new_k = cls; - struct KeyEntry *old_k = value; + int ret; + struct IBF_Key ibf_key; - GNUNET_assert (NULL != old_k); - do - { - if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) - { - new_k->next_colliding = old_k->next_colliding; - old_k->next_colliding = new_k; - return GNUNET_NO; - } - old_k = old_k->next_colliding; - } while (NULL != old_k); - return GNUNET_YES; + ibf_key = get_ibf_key (element_hash); + ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + (uint32_t) ibf_key.key_val, + op_has_element_iterator, + (void *) element_hash); + + /* was the iteration aborted because we found the element? */ + if (GNUNET_SYSERR == ret) + return GNUNET_YES; + return GNUNET_NO; } /** * Insert an element into the union operation's - * key-to-element mapping + * key-to-element mapping. Takes ownership of 'ee'. + * Note that this does not insert the element in the set, + * only in the operation's key-element mapping. + * This is done to speed up re-tried operations, if some elements + * were transmitted, and then the IBF fails to decode. + * + * XXX: clarify ownership, doesn't sound right. * - * @param the union operation + * @param op the union operation * @param ee the element entry */ static void -insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) +op_register_element (struct Operation *op, + struct ElementEntry *ee) { - int ret; struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, eo->salt); + ibf_key = get_ibf_key (&ee->element_hash); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; - ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, - insert_element_iterator, k); + k, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); +} - /* was the element inserted into a colliding bucket? */ - if (GNUNET_SYSERR == ret) - return; - GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); +static void +salt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + /* rotate ibf key */ + x = (x >> s) | (x << (64 - s)); + k_out->key_val = x; +} + + +static void +unsalt_key (const struct IBF_Key *k_in, + uint32_t salt, + struct IBF_Key *k_out) +{ + int s = salt % 64; + uint64_t x = k_in->key_val; + x = (x << s) | (x >> (64 - s)); + k_out->key_val = x; } @@ -601,10 +484,17 @@ prepare_ibf_iterator (void *cls, uint32_t key, void *value) { - struct InvertibleBloomFilter *ibf = cls; + struct Operation *op = cls; struct KeyEntry *ke = value; - - ibf_insert (ibf, ke->ibf_key); + struct IBF_Key salted_key; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] inserting %lx (hash %s) into ibf\n", + (void *) op, + (unsigned long) ke->ibf_key.key_val, + GNUNET_h2s (&ke->element->element_hash)); + salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -613,27 +503,28 @@ prepare_ibf_iterator (void *cls, * Iterator for initializing the * key-to-element mapping of a union operation * - * @param cls the union operation - * @param key unised - * @param value the element entry to insert + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) */ static int init_key_to_element_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { - struct UnionEvaluateOperation *eo = cls; - struct ElementEntry *e = value; + struct Operation *op = cls; + struct ElementEntry *ee = value; /* make sure that the element belongs to the set at the time * of creating the operation */ - if ( (e->generation_added > eo->generation_created) || - ( (GNUNET_YES == e->removed) && - (e->generation_removed < eo->generation_created))) + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) return GNUNET_YES; - insert_element (eo, e); + GNUNET_assert (GNUNET_NO == ee->remote); + + op_register_element (op, ee); return GNUNET_YES; } @@ -642,50 +533,78 @@ init_key_to_element_iterator (void *cls, * Create an ibf with the operation's elements * of the specified size * - * @param eo the union operation + * @param op the union operation * @param size size of the ibf to create + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) +static int +prepare_ibf (struct Operation *op, + uint32_t size) { - if (NULL == eo->key_to_element) + if (NULL == op->state->key_to_element) { unsigned int len; - len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); - eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, - init_key_to_element_iterator, eo); + + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + init_key_to_element_iterator, op); + } + if (NULL != op->state->local_ibf) + ibf_destroy (op->state->local_ibf); + op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); + if (NULL == op->state->local_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate local IBF\n"); + return GNUNET_SYSERR; } - if (NULL != eo->local_ibf) - ibf_destroy (eo->local_ibf); - eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); - GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, - prepare_ibf_iterator, eo->local_ibf); + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &prepare_ibf_iterator, + op); + return GNUNET_OK; } /** * Send an ibf of appropriate size. * - * @param eo the union operation + * Fragments the IBF into multiple messages if necessary. + * + * @param op the union operation * @param ibf_order order of the ibf to send, size=2^order + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) +static int +send_ibf (struct Operation *op, + uint16_t ibf_order) { unsigned int buckets_sent = 0; struct InvertibleBloomFilter *ibf; - prepare_ibf (eo, 1<local_ibf; + ibf = op->state->local_ibf; while (buckets_sent < (1 << ibf_order)) { unsigned int buckets_in_message; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *ev; struct IBFMessage *msg; buckets_in_message = (1 << ibf_order) - buckets_sent; @@ -693,37 +612,66 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, - GNUNET_MESSAGE_TYPE_SET_P2P_IBF); + ev = GNUNET_MQ_msg_extra (msg, + buckets_in_message * IBF_BUCKET_SIZE, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); + msg->reserved1 = 0; + msg->reserved2 = 0; msg->order = ibf_order; - msg->offset = htons (buckets_sent); + msg->offset = htonl (buckets_sent); + msg->salt = htonl (op->state->salt_send); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_MQ_send (eo->mq, mqm); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "ibf chunk size %u, %u/%u sent\n", + buckets_in_message, + buckets_sent, + 1<mq, ev); } - eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; + /* The other peer must decode the IBF, so + * we're passive. */ + op->state->phase = PHASE_INVENTORY_PASSIVE; + return GNUNET_OK; } /** * Send a strata estimator to the remote peer. * - * @param eo the union operation with the remote peer + * @param op the union operation with the remote peer */ static void -send_strata_estimator (struct UnionEvaluateOperation *eo) +send_strata_estimator (struct Operation *op) { - struct GNUNET_MQ_Message *mqm; + const struct StrataEstimator *se = op->state->se; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_MessageHeader *strata_msg; - - mqm = GNUNET_MQ_msg_header_extra (strata_msg, - SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, - GNUNET_MESSAGE_TYPE_SET_P2P_SE); - strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); - GNUNET_MQ_send (eo->mq, mqm); - eo->phase = PHASE_EXPECT_IBF; + char *buf; + size_t len; + uint16_t type; + + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (op->state->se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; + ev = GNUNET_MQ_msg_header_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + GNUNET_MQ_send (op->mq, + ev); + op->state->phase = PHASE_EXPECT_IBF; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent SE, expecting IBF\n"); } @@ -740,7 +688,8 @@ get_order_from_difference (unsigned int diff) unsigned int ibf_order; ibf_order = 2; - while ((1< MAX_IBF_ORDER) ibf_order = MAX_IBF_ORDER; @@ -751,37 +700,85 @@ get_order_from_difference (unsigned int diff) /** * Handle a strata estimator from a remote peer * - * @param the union operation + * @param cls the union operation * @param mh the message + * @param is_compressed #GNUNET_YES if the estimator is compressed + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_strata_estimator (void *cls, + const struct GNUNET_MessageHeader *mh, + int is_compressed) { - struct UnionEvaluateOperation *eo = cls; + struct Operation *op = cls; struct StrataEstimator *remote_se; int diff; + size_t len; + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (mh->size), + GNUNET_NO); - if (eo->phase != PHASE_EXPECT_SE) + if (op->state->phase != PHASE_EXPECT_SE) { - fail_union_operation (eo); + fail_union_operation (op); GNUNET_break (0); - return; + return GNUNET_SYSERR; + } + len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); + if ( (GNUNET_NO == is_compressed) && + (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) + { + fail_union_operation (op); + GNUNET_break (0); + return GNUNET_SYSERR; } - remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, + remote_se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, SE_IBF_HASH_NUM); - strata_estimator_read (&mh[1], remote_se); - GNUNET_assert (NULL != eo->se); - diff = strata_estimator_difference (remote_se, eo->se); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); + if (NULL == remote_se) + { + /* insufficient resources, fail */ + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + strata_estimator_read (&mh[1], + len, + is_compressed, + remote_se)) + { + /* decompression failed */ + fail_union_operation (op); + strata_estimator_destroy (remote_se); + return GNUNET_SYSERR; + } + GNUNET_assert (NULL != op->state->se); + diff = strata_estimator_difference (remote_se, + op->state->se); strata_estimator_destroy (remote_se); - strata_estimator_destroy (eo->se); - eo->se = NULL; - send_ibf (eo, get_order_from_difference (diff)); + strata_estimator_destroy (op->state->se); + op->state->se = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got se diff=%d, using ibf size %d\n", + diff, + 1<ibf_key; - struct UnionEvaluateOperation *eo = sec->eo; + struct Operation *op = sec->op; struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_MessageHeader *mh; - if (ke->ibf_key.key_val != ibf_key.key_val) + /* Detect 32-bit key collision for the 64-bit IBF keys. */ + if (ke->ibf_key.key_val != sec->ibf_key.key_val) return GNUNET_YES; - while (NULL != ke) - { - const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Message *mqm; - struct GNUNET_MessageHeader *mh; - GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == mqm) - { - /* element too large */ - GNUNET_break (0); - continue; - } - memcpy (&mh[1], element->data, element->size); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); - GNUNET_MQ_send (eo->mq, mqm); - ke = ke->next_colliding; - } - return GNUNET_NO; + ev = GNUNET_MQ_msg_header_extra (mh, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER); + + GNUNET_assert (NULL != ev); + *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] sending element offer (%s) to peer\n", + (void *) op, + GNUNET_h2s (&ke->element->element_hash)); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; } + /** - * Send all elements that have the specified IBF key - * to the remote peer of the union operation + * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key. * - * @param eo union operation + * @param op union operation * @param ibf_key IBF key of interest */ static void -send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key) +send_offers_for_key (struct Operation *op, + struct IBF_Key ibf_key) { struct SendElementClosure send_cls; send_cls.ibf_key = ibf_key; - send_cls.eo = eo; - GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, - &send_element_iterator, &send_cls); + send_cls.op = op; + (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, + (uint32_t) ibf_key.key_val, + &send_offers_iterator, + &send_cls); } /** * Decode which elements are missing on each side, and - * send the appropriate elemens and requests + * send the appropriate offers and inquiries. * - * @param eo union operation + * @param op union operation + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ -static void -decode_and_send (struct UnionEvaluateOperation *eo) +static int +decode_and_send (struct Operation *op) { struct IBF_Key key; + struct IBF_Key last_key; int side; + unsigned int num_decoded; struct InvertibleBloomFilter *diff_ibf; - GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); + GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); + + if (GNUNET_OK != + prepare_ibf (op, op->state->remote_ibf->size)) + { + GNUNET_break (0); + /* allocation failed */ + return GNUNET_SYSERR; + } + diff_ibf = ibf_dup (op->state->local_ibf); + ibf_subtract (diff_ibf, op->state->remote_ibf); - prepare_ibf (eo, eo->remote_ibf->size); - diff_ibf = ibf_dup (eo->local_ibf); - ibf_subtract (diff_ibf, eo->remote_ibf); + ibf_destroy (op->state->remote_ibf); + op->state->remote_ibf = NULL; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoding IBF (size=%u)\n", + diff_ibf->size); + + num_decoded = 0; + key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */ while (1) { int res; + int cycle_detected = GNUNET_NO; + + last_key = key; res = ibf_decode (diff_ibf, &side, &key); - if (GNUNET_SYSERR == res) + if (res == GNUNET_OK) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "decoded ibf key %lx\n", + (unsigned long) key.key_val); + num_decoded += 1; + if ( (num_decoded > diff_ibf->size) || + ( (num_decoded > 1) && + (last_key.key_val == key.key_val) ) ) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "detected cyclic ibf (decoded %u/%u)\n", + num_decoded, + diff_ibf->size); + cycle_detected = GNUNET_YES; + } + } + if ( (GNUNET_SYSERR == res) || + (GNUNET_YES == cycle_detected) ) { int next_order; next_order = 0; @@ -875,104 +910,205 @@ decode_and_send (struct UnionEvaluateOperation *eo) next_order++; if (next_order <= MAX_IBF_ORDER) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "decoding failed, sending larger ibf (size %u)\n", - 1<state->salt_send++; + if (GNUNET_OK != + send_ibf (op, next_order)) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; + } } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "set union failed: reached ibf limit\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of failed union operations (too large)", + 1, + GNUNET_NO); + // XXX: Send the whole set, element-by-element + LOG (GNUNET_ERROR_TYPE_ERROR, + "set union failed: reached ibf limit\n"); + fail_union_operation (op); + ibf_destroy (diff_ibf); + return GNUNET_SYSERR; } break; } if (GNUNET_NO == res) { - struct GNUNET_MQ_Message *mqm; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (eo->mq, mqm); + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "transmitted all values, sending DONE\n"); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + /* We now wait until we get a DONE message back + * and then wait for our MQ to be flushed and all our + * demands be delivered. */ break; } if (1 == side) { - send_elements_for_key (eo, key); + struct IBF_Key unsalted_key; + unsalt_key (&key, op->state->salt_receive, &unsalted_key); + send_offers_for_key (op, unsalted_key); + } + else if (-1 == side) + { + struct GNUNET_MQ_Envelope *ev; + struct InquiryMessage *msg; + + /* It may be nice to merge multiple requests, but with CADET's corking it is not worth + * the effort additional complexity. */ + ev = GNUNET_MQ_msg_extra (msg, + sizeof (struct IBF_Key), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY); + msg->salt = htonl (op->state->salt_receive); + GNUNET_memcpy (&msg[1], + &key, + sizeof (struct IBF_Key)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element inquiry for IBF key %lx\n", + (unsigned long) key.key_val); + GNUNET_MQ_send (op->mq, ev); } else { - struct GNUNET_MQ_Message *mqm; - struct GNUNET_MessageHeader *msg; - - /* FIXME: before sending the request, check if we may just have the element */ - /* FIXME: merge multiple requests */ - mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), - GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); - *(struct IBF_Key *) &msg[1] = key; - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_assert (0); } } ibf_destroy (diff_ibf); + return GNUNET_OK; } /** * Handle an IBF message from a remote peer. * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * * @param cls the union operation * @param mh the header of the message + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise */ -static void -handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_ibf (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; - struct IBFMessage *msg = (struct IBFMessage *) mh; + struct Operation *op = cls; + const struct IBFMessage *msg; unsigned int buckets_in_message; - if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || - (eo->phase == PHASE_EXPECT_IBF) ) + if (ntohs (mh->size) < sizeof (struct IBFMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + msg = (const struct IBFMessage *) mh; + if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || + (op->state->phase == PHASE_EXPECT_IBF) ) { - eo->phase = PHASE_EXPECT_IBF_CONT; - GNUNET_assert (NULL == eo->remote_ibf); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<order); - eo->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); - if (0 != ntohs (msg->offset)) + op->state->phase = PHASE_EXPECT_IBF_CONT; + GNUNET_assert (NULL == op->state->remote_ibf); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); + op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); + if (NULL == op->state->remote_ibf) { - GNUNET_break (0); - fail_union_operation (eo); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } + op->state->ibf_buckets_received = 0; + if (0 != ntohl (msg->offset)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; } } - else if (eo->phase == PHASE_EXPECT_IBF_CONT) + else if (op->state->phase == PHASE_EXPECT_IBF_CONT) { - if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || - (1<order != eo->remote_ibf->size) ) + if (ntohl (msg->offset) != op->state->ibf_buckets_received) { - GNUNET_break (0); - fail_union_operation (eo); - return; + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (1<order != op->state->remote_ibf->size) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + if (ntohl (msg->salt) != op->state->salt_receive) + { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; } } + else + { + GNUNET_assert (0); + } buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; - if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) + if (0 == buckets_in_message) { - GNUNET_break (0); - fail_union_operation (eo); - return; + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; } - - ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); - eo->ibf_buckets_received += buckets_in_message; - if (eo->ibf_buckets_received == eo->remote_ibf->size) + if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) { + GNUNET_break_op (0); + fail_union_operation (op); + return GNUNET_SYSERR; + } + + GNUNET_assert (NULL != op->state->remote_ibf); + + ibf_read_slice (&msg[1], + op->state->ibf_buckets_received, + buckets_in_message, + op->state->remote_ibf); + op->state->ibf_buckets_received += buckets_in_message; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); - eo->phase = PHASE_EXPECT_ELEMENTS; - decode_and_send (eo); + if (op->state->ibf_buckets_received == op->state->remote_ibf->size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "received full ibf\n"); + op->state->phase = PHASE_INVENTORY_ACTIVE; + if (GNUNET_OK != + decode_and_send (op)) + { + /* Internal error, best we can do is shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to decode IBF, closing connection\n"); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } @@ -980,74 +1116,95 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) * Send a result message to the client indicating * that there is a new element. * - * @param eo union operation + * @param op union operation * @param element element to send + * @param status status to send with the new element */ static void -send_client_element (struct UnionEvaluateOperation *eo, - struct GNUNET_SET_Element *element) +send_client_element (struct Operation *op, + struct GNUNET_SET_Element *element, + int status) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_assert (0 != eo->request_id); - mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); - if (NULL == mqm) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sending element (size %u) to client\n", + element->size); + GNUNET_assert (0 != op->spec->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) { - GNUNET_MQ_discard (mqm); + GNUNET_MQ_discard (ev); GNUNET_break (0); return; } - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (eo->request_id); - memcpy (&rm[1], element->data, element->size); - GNUNET_MQ_send (eo->set->client_mq, mqm); + rm->result_status = htons (status); + rm->request_id = htonl (op->spec->client_request_id); + rm->element_type = element->element_type; + GNUNET_memcpy (&rm[1], element->data, element->size); + GNUNET_MQ_send (op->spec->set->client_mq, ev); } /** - * Completion callback for shutdown + * Signal to the client that the operation has finished and + * destroy the operation. * - * @param cls the closure from GNUNET_STREAM_shutdown call - * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, - * SHUT_RDWR) + * @param cls operation to destroy */ -/* -static void -stream_shutdown_cb (void *cls, - int operation) +static void +send_done_and_destroy (void *cls) { - //struct UnionEvaluateOperation *eo = cls; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); + struct Operation *op = cls; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; - // destroy_union_operation (eo); + ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->spec->client_request_id); + rm->result_status = htons (GNUNET_SET_STATUS_DONE); + rm->element_type = htons (0); + GNUNET_MQ_send (op->spec->set->client_mq, ev); + /* Will also call the union-specific cancel function. */ + _GSS_operation_destroy (op, GNUNET_YES); } -*/ -/** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. - * - * @param eo union operation - * @param element element to send - */ static void -send_client_done_and_destroy (struct UnionEvaluateOperation *eo) +maybe_finish (struct Operation *op) { - struct GNUNET_MQ_Message *mqm; - struct GNUNET_SET_ResultMessage *rm; + unsigned int num_demanded; - GNUNET_assert (0 != eo->request_id); - mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->request_id = htonl (eo->request_id); - rm->result_status = htons (GNUNET_SET_STATUS_DONE); - GNUNET_MQ_send (eo->set->client_mq, mqm); + num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes); + + if (PHASE_FINISH_WAITING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_WAITING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + struct GNUNET_MQ_Envelope *ev; - // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); + op->state->phase = PHASE_DONE; + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); + GNUNET_MQ_send (op->mq, ev); + + /* We now wait until the other peer closes the channel + * after it got all elements from us. */ + } + } + if (PHASE_FINISH_CLOSING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "In PHASE_FINISH_CLOSING, pending %u demands\n", + num_demanded); + if (0 == num_demanded) + { + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + } } @@ -1058,363 +1215,630 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) * @param mh the message */ static void -handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_elements (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; + struct Operation *op = cls; struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; uint16_t element_size; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); - - if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && - (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) { - fail_union_operation (eo); - GNUNET_break (0); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); return; } - element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); - ee = GNUNET_malloc (sizeof *eo + element_size); - memcpy (&ee[1], &mh[1], element_size); + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); - insert_element (eo, ee); - send_client_element (eo, &ee->element); + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, + &ee->element_hash, + NULL)) + { + /* We got something we didn't demand, since it's not in our map. */ + GNUNET_break_op (0); + GNUNET_free (ee); + fail_union_operation (op); + return; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op_register_element (op, ee); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } - GNUNET_free (ee); + maybe_finish (op); } /** - * Handle an element request from a remote peer. + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). * * @param cls the union operation * @param mh the message */ static void -handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_inquiry (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; - struct IBF_Key *ibf_key; + struct Operation *op = cls; + const struct IBF_Key *ibf_key; unsigned int num_keys; + struct InquiryMessage *msg; /* look up elements and send them */ - if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + if (op->state->phase != PHASE_INVENTORY_PASSIVE) { - GNUNET_break (0); - fail_union_operation (eo); + GNUNET_break_op (0); + fail_union_operation (op); return; } - - num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key); - - if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) + num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); + if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) + != num_keys * sizeof (struct IBF_Key)) { - GNUNET_break (0); - fail_union_operation (eo); + GNUNET_break_op (0); + fail_union_operation (op); return; } - ibf_key = (struct IBF_Key *) &mh[1]; + msg = (struct InquiryMessage *) mh; + + ibf_key = (const struct IBF_Key *) &msg[1]; while (0 != num_keys--) { - send_elements_for_key (eo, *ibf_key); + struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); + send_offers_for_key (op, unsalted_key); ibf_key++; } } /** - * Callback used for notifications - * - * @param cls closure + * FIXME */ static void -peer_done_sent_cb (void *cls) +handle_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; + struct Operation *op = cls; + struct ElementEntry *ee; + struct GNUNET_SET_ElementMessage *emsg; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + struct GNUNET_MQ_Envelope *ev; + + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } - send_client_done_and_destroy (eo); + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); + if (NULL == ee) + { + /* Demand for non-existing element. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + { + /* Probably confused lazily copied sets. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Sending demanded element (size %u, hash %s) to peer\n", + (void *) op, + (unsigned int) ee->element.size, + GNUNET_h2s (&ee->element_hash)); + GNUNET_MQ_send (op->mq, ev); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + /* Nothing to do. */ + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } } /** - * Handle a done message from a remote peer - * + * Handle offers (of GNUNET_HashCode-s) and + * respond with demands (of GNUNET_HashCode-s). + * * @param cls the union operation * @param mh the message */ static void -handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) +handle_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; + struct Operation *op = cls; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; - if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) + /* look up elements and send them */ + if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_INVENTORY_ACTIVE)) { - /* we got all requests, but still have to send our elements as response */ - struct GNUNET_MQ_Message *mqm; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); - eo->phase = PHASE_FINISHED; - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_break_op (0); + fail_union_operation (op); return; } - if (eo->phase == PHASE_EXPECT_ELEMENTS) + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); - eo->phase = PHASE_FINISHED; - send_client_done_and_destroy (eo); + GNUNET_break_op (0); + fail_union_operation (op); return; } - GNUNET_break (0); - fail_union_operation (eo); -} + for (hash = (const struct GNUNET_HashCode *) &mh[1]; + num_hashes > 0; + hash++, num_hashes--) + { + struct ElementEntry *ee; + struct GNUNET_MessageHeader *demands; + struct GNUNET_MQ_Envelope *ev; + + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + hash); + if (NULL != ee) + if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) + continue; + + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, + hash)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Skipped sending duplicate demand\n"); + continue; + } -/** - * The handlers array, used for both evaluate and accept - */ -static const struct GNUNET_MQ_Handler union_handlers[] = { - {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS}, - {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE}, - {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF}, - {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS}, - {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE}, - GNUNET_MQ_HANDLERS_END -}; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes, + hash, + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "[OP %x] Requesting element (hash %s)\n", + (void *) op, GNUNET_h2s (hash)); + ev = GNUNET_MQ_msg_header_extra (demands, + sizeof (struct GNUNET_HashCode), + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); + *(struct GNUNET_HashCode *) &demands[1] = *hash; + GNUNET_MQ_send (op->mq, ev); + } +} /** - * Functions of this type will be called when a stream is established - * - * @param cls the closure from GNUNET_STREAM_open - * @param socket socket to use to communicate with the - * other side (read/write) + * Handle a done message from a remote peer + * + * @param cls the union operation + * @param mh the message */ static void -stream_open_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) +handle_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) { - struct UnionEvaluateOperation *eo = cls; + struct Operation *op = cls; - GNUNET_assert (NULL == eo->mq); - GNUNET_assert (socket == eo->socket); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "open cb successful\n"); - eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo); - /* we started the operation, thus we have to send the operation request */ - send_operation_request (eo); - eo->phase = PHASE_EXPECT_SE; + if (op->state->phase == PHASE_INVENTORY_PASSIVE) + { + /* We got all requests, but still have to send our elements in response. */ + + op->state->phase = PHASE_FINISH_WAITING; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as passive partner), waiting for our demands to be satisfied\n"); + /* The active peer is done sending offers + * and inquiries. This means that all + * our responses to that (demands and offers) + * must be in flight (queued or in mesh). + * + * We should notify the active peer once + * all our demands are satisfied, so that the active + * peer can quit if we gave him everything. + */ + maybe_finish (op); + return; + } + if (op->state->phase == PHASE_INVENTORY_ACTIVE) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got DONE (as active partner), waiting to finish\n"); + /* All demands of the other peer are satisfied, + * and we processed all offers, thus we know + * exactly what our demands must be. + * + * We'll close the channel + * to the other peer once our demands are met. + */ + op->state->phase = PHASE_FINISH_CLOSING; + maybe_finish (op); + return; + } + GNUNET_break_op (0); + fail_union_operation (op); } /** - * Evaluate a union operation with - * a remote peer. + * Initiate operation to evaluate a set union with a remote peer. * - * @param m the evaluate request message from the client - * @parem set the set to evaluate the operation with + * @param op operation to perform (to be initialized) + * @param opaque_context message to be transmitted to the listener + * to convince him to accept, may be NULL */ -void -_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) +static void +union_evaluate (struct Operation *op, + const struct GNUNET_MessageHeader *opaque_context) { - struct UnionEvaluateOperation *eo; - struct GNUNET_MessageHeader *context_msg; - - eo = GNUNET_new (struct UnionEvaluateOperation); - eo->peer = m->target_peer; - eo->set = set; - eo->request_id = htonl (m->request_id); - GNUNET_assert (0 != eo->request_id); - eo->se = strata_estimator_dup (set->state.u->se); - eo->salt = ntohs (m->salt); - eo->app_id = m->app_id; - - context_msg = GNUNET_MQ_extract_nested_mh (m); - if (NULL != context_msg) + struct GNUNET_MQ_Envelope *ev; + struct OperationRequestMessage *msg; + + GNUNET_assert (NULL == op->state); + op->state = GNUNET_new (struct OperationState); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + /* copy the current generation's strata estimator for this operation */ + op->state->se = strata_estimator_dup (op->spec->set->state->se); + /* we started the operation, thus we have to send the operation request */ + op->state->phase = PHASE_EXPECT_SE; + op->state->salt_receive = op->state->salt_send = 42; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Initiating union operation evaluation\n"); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of initiated union operations", + 1, + GNUNET_NO); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) { - eo->context_msg = GNUNET_copy_message (context_msg); + /* the context message is too large */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (op->spec->set->client); + return; } - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "evaluating union operation, (app %s)\n", - GNUNET_h2s (&eo->app_id)); - - eo->socket = - GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, - &stream_open_cb, eo, - GNUNET_STREAM_OPTION_END); - GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, - eo); - /* the stream open callback will kick off the operation */ + msg->operation = htonl (GNUNET_SET_OPERATION_UNION); + GNUNET_MQ_send (op->mq, + ev); + + if (NULL != opaque_context) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request with context message\n"); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "sent op request without context message\n"); } /** - * Accept an union operation request from a remote peer + * Accept an union operation request from a remote peer. + * Only initializes the private operation state. * - * @param m the accept message from the client - * @param set the set of the client - * @param incoming information about the requesting remote peer + * @param op operation that will be accepted as a union operation */ -void -_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, - struct Incoming *incoming) +static void +union_accept (struct Operation *op) { - struct UnionEvaluateOperation *eo; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); - - eo = GNUNET_new (struct UnionEvaluateOperation); - eo->generation_created = set->state.u->current_generation++; - eo->set = set; - eo->peer = incoming->peer; - eo->salt = ntohs (incoming->salt); - GNUNET_assert (0 != ntohl (m->request_id)); - eo->request_id = ntohl (m->request_id); - eo->se = strata_estimator_dup (set->state.u->se); - eo->mq = incoming->mq; - /* transfer ownership of mq and socket from incoming to eo */ - incoming->mq = NULL; - eo->socket = incoming->socket; - incoming->socket = NULL; - /* the peer's socket is now ours, we'll receive all messages */ - GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); - - GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, - eo); - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "accepting set union operation\n"); + GNUNET_assert (NULL == op->state); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# of accepted union operations", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# of total union operations", + 1, + GNUNET_NO); + + op->state = GNUNET_new (struct OperationState); + op->state->se = strata_estimator_dup (op->spec->set->state->se); + op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + op->state->salt_receive = op->state->salt_send = 42; /* kick off the operation */ - send_strata_estimator (eo); + send_strata_estimator (op); } /** * Create a new set supporting the union operation * - * @return the newly created set + * We maintain one strata estimator per set and then manipulate it over the + * lifetime of the set, as recreating a strata estimator would be expensive. + * + * @return the newly created set, NULL on error */ -struct Set * -_GSS_union_set_create (void) +static struct SetState * +union_set_create (void) { - struct Set *set; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n"); - - set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); - set->state.u = (struct UnionState *) &set[1]; - set->operation = GNUNET_SET_OPERATION_UNION; - /* keys of the hash map are stored in the element entrys, thus we do not - * want the hash map to copy them */ - set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - set->state.u->se = strata_estimator_create (SE_STRATA_COUNT, - SE_IBF_SIZE, SE_IBF_HASH_NUM); - return set; + struct SetState *set_state; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "union set created\n"); + set_state = GNUNET_new (struct SetState); + set_state->se = strata_estimator_create (SE_STRATA_COUNT, + SE_IBF_SIZE, SE_IBF_HASH_NUM); + if (NULL == set_state->se) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to allocate strata estimator\n"); + GNUNET_free (set_state); + return NULL; + } + return set_state; } /** * Add the element from the given element message to the set. * - * @param m message with the element - * @param set set to add the element to + * @param set_state state of the set want to add to + * @param ee the element to add to the set */ -void -_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) +static void +union_add (struct SetState *set_state, struct ElementEntry *ee) { - struct ElementEntry *ee; - struct ElementEntry *ee_dup; - uint16_t element_size; + strata_estimator_insert (set_state->se, + get_ibf_key (&ee->element_hash)); +} - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); - GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); - element_size = ntohs (m->header.size) - sizeof *m; - ee = GNUNET_malloc (element_size + sizeof *ee); - ee->element.size = element_size; - memcpy (&ee[1], &m[1], element_size); - ee->element.data = &ee[1]; - ee->generation_added = set->state.u->current_generation; - GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); - ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); - if (NULL != ee_dup) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n"); - GNUNET_free (ee); - return; - } - GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0)); +/** + * Remove the element given in the element message from the set. + * Only marks the element as removed, so that older set operations can still exchange it. + * + * @param set_state state of the set to remove from + * @param ee set element to remove + */ +static void +union_remove (struct SetState *set_state, struct ElementEntry *ee) +{ + strata_estimator_remove (set_state->se, + get_ibf_key (&ee->element_hash)); } /** - * Destroy a set that supports the union operation + * Destroy a set that supports the union operation. * - * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION + * @param set_state the set to destroy */ -void -_GSS_union_set_destroy (struct Set *set) +static void +union_set_destroy (struct SetState *set_state) { - GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); - if (NULL != set->client) + if (NULL != set_state->se) { - GNUNET_SERVER_client_drop (set->client); - set->client = NULL; - } - if (NULL != set->client_mq) - { - GNUNET_MQ_destroy (set->client_mq); - set->client_mq = NULL; + strata_estimator_destroy (set_state->se); + set_state->se = NULL; } + GNUNET_free (set_state); +} + - if (NULL != set->state.u->se) +/** + * Dispatch messages for a union operation. + * + * @param op the state of the union evaluate operation + * @param mh the received message + * @return #GNUNET_SYSERR if the tunnel should be disconnected, + * #GNUNET_OK otherwise + */ +int +union_handle_p2p_message (struct Operation *op, + const struct GNUNET_MessageHeader *mh) +{ + //LOG (GNUNET_ERROR_TYPE_DEBUG, + // "received p2p message (t: %u, s: %u)\n", + // ntohs (mh->type), + // ntohs (mh->size)); + switch (ntohs (mh->type)) { - strata_estimator_destroy (set->state.u->se); - set->state.u->se = NULL; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: + return handle_p2p_ibf (op, mh); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: + return handle_p2p_strata_estimator (op, mh, GNUNET_NO); + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: + return handle_p2p_strata_estimator (op, mh, GNUNET_YES); + case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: + handle_p2p_elements (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: + handle_p2p_inquiry (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: + handle_p2p_done (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: + handle_p2p_offer (op, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: + handle_p2p_demand (op, mh); + break; + default: + /* Something wrong with cadet's message handlers? */ + GNUNET_assert (0); } + return GNUNET_OK; +} - destroy_elements (set->state.u); - while (NULL != set->state.u->ops_head) +/** + * Handler for peer-disconnects, notifies the client + * about the aborted operation in case the op was not concluded. + * + * @param op the destroyed operation + */ +static void +union_peer_disconnect (struct Operation *op) +{ + if (PHASE_DONE != op->state->phase) { - destroy_union_operation (set->state.u->ops_head); + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *msg; + + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_RESULT); + msg->request_id = htonl (op->spec->client_request_id); + msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); + msg->element_type = htons (0); + GNUNET_MQ_send (op->spec->set->client_mq, + ev); + LOG (GNUNET_ERROR_TYPE_WARNING, + "other peer disconnected prematurely, phase %u\n", + op->state->phase); + _GSS_operation_destroy (op, + GNUNET_YES); + return; } + // else: the session has already been concluded + LOG (GNUNET_ERROR_TYPE_DEBUG, + "other peer disconnected (finished)\n"); + if (GNUNET_NO == op->state->client_done_sent) + send_done_and_destroy (op); } + /** - * Remove the element given in the element message from the set. - * Only marks the element as removed, so that older set operations can still exchange it. + * Copy union-specific set state. * - * @param m message with the element - * @param set set to remove the element from + * @param set source set for copying the union state + * @return a copy of the union-specific set state */ -void -_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) +static struct SetState * +union_copy_state (struct Set *set) { - struct GNUNET_HashCode hash; - struct ElementEntry *ee; + struct SetState *new_state; - GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); - GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash); - ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash); - if (NULL == ee) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n"); - return; - } - if (GNUNET_YES == ee->removed) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n"); - return; - } - ee->removed = GNUNET_YES; - ee->generation_removed = set->state.u->current_generation; + new_state = GNUNET_new (struct SetState); + GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); + new_state->se = strata_estimator_dup (set->state->se); + + return new_state; } + +/** + * Get the table with implementing functions for + * set union. + * + * @return the operation specific VTable + */ +const struct SetVT * +_GSS_union_vt () +{ + static const struct SetVT union_vt = { + .create = &union_set_create, + .msg_handler = &union_handle_p2p_message, + .add = &union_add, + .remove = &union_remove, + .destroy_set = &union_set_destroy, + .evaluate = &union_evaluate, + .accept = &union_accept, + .peer_disconnect = &union_peer_disconnect, + .cancel = &union_op_cancel, + .copy_state = &union_copy_state, + }; + + return &union_vt; +}