From: Christian Grothoff Date: Mon, 13 Mar 2017 00:24:22 +0000 (+0100) Subject: major clean up and bugfixes of SET X-Git-Tag: gnunet-0.11.0rc0~284^2~6^2 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=bf6f552fdefe75425635f66343f98995e2f602f6;p=oweals%2Fgnunet.git major clean up and bugfixes of SET --- diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index b80c1f2fd..752253411 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013, 2014, 2017 GNUnet e.V. + Copyright (C) 2013-2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -35,6 +35,35 @@ */ #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES + +/** + * Lazy copy requests made by a client. + */ +struct LazyCopyRequest +{ + /** + * Kept in a DLL. + */ + struct LazyCopyRequest *prev; + + /** + * Kept in a DLL. + */ + struct LazyCopyRequest *next; + + /** + * Which set are we supposed to copy? + */ + struct Set *source_set; + + /** + * Cookie identifying the request. + */ + uint32_t cookie; + +}; + + /** * A listener is inhabited by a client, and waits for evaluation * requests from remote peers. @@ -52,27 +81,36 @@ struct Listener struct Listener *prev; /** - * Client that owns the listener. - * Only one client may own a listener. + * Head of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. */ - struct GNUNET_SERVICE_Client *client; + struct Operation *op_head; /** - * Message queue for the client + * Tail of DLL of operations this listener is responsible for. + * Once the client has accepted/declined the operation, the + * operation is moved to the respective set's operation DLLS. */ - struct GNUNET_MQ_Handle *client_mq; + struct Operation *op_tail; /** - * Application ID for the operation, used to distinguish - * multiple operations of the same type with the same peer. + * Client that owns the listener. + * Only one client may own a listener. */ - struct GNUNET_HashCode app_id; + struct ClientState *cs; /** * The port we are listening on with CADET. */ struct GNUNET_CADET_Port *open_port; + /** + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. + */ + struct GNUNET_HashCode app_id; + /** * The type of the operation. */ @@ -80,21 +118,6 @@ struct Listener }; -struct LazyCopyRequest -{ - struct Set *source_set; - uint32_t cookie; - - struct LazyCopyRequest *prev; - struct LazyCopyRequest *next; -}; - - -/** - * Configuration of our local peer. - */ -static const struct GNUNET_CONFIGURATION_Handle *configuration; - /** * Handle to the cadet service, used to listen for and connect to * remote peers. @@ -102,94 +125,48 @@ static const struct GNUNET_CONFIGURATION_Handle *configuration; static struct GNUNET_CADET_Handle *cadet; /** - * Sets are held in a doubly linked list. + * DLL of lazy copy requests by this client. */ -static struct Set *sets_head; +static struct LazyCopyRequest *lazy_copy_head; /** - * Sets are held in a doubly linked list. + * DLL of lazy copy requests by this client. */ -static struct Set *sets_tail; +static struct LazyCopyRequest *lazy_copy_tail; /** - * Listeners are held in a doubly linked list. + * Generator for unique cookie we set per lazy copy request. */ -static struct Listener *listeners_head; +static uint32_t lazy_copy_cookie; /** - * Listeners are held in a doubly linked list. + * Statistics handle. */ -static struct Listener *listeners_tail; +struct GNUNET_STATISTICS_Handle *_GSS_statistics; /** - * Incoming sockets from remote peers are held in a doubly linked - * list. + * Listeners are held in a doubly linked list. */ -static struct Operation *incoming_head; +static struct Listener *listener_head; /** - * Incoming sockets from remote peers are held in a doubly linked - * list. + * Listeners are held in a doubly linked list. */ -static struct Operation *incoming_tail; - -static struct LazyCopyRequest *lazy_copy_head; -static struct LazyCopyRequest *lazy_copy_tail; - -static uint32_t lazy_copy_cookie = 1; +static struct Listener *listener_tail; /** * Counter for allocating unique IDs for clients, used to identify * incoming operation requests from remote peers, that the client can - * choose to accept or refuse. - */ -static uint32_t suggest_id = 1; - -/** - * Statistics handle. + * choose to accept or refuse. 0 must not be used (reserved for + * uninitialized). */ -struct GNUNET_STATISTICS_Handle *_GSS_statistics; - - -/** - * Get set that is owned by the given client, if any. - * - * @param client client to look for - * @return set that the client owns, NULL if the client - * does not own a set - */ -static struct Set * -set_get (struct GNUNET_SERVICE_Client *client) -{ - for (struct Set *set = sets_head; NULL != set; set = set->next) - if (set->client == client) - return set; - return NULL; -} - - -/** - * Get the listener associated with the given client, if any. - * - * @param client the client - * @return listener associated with the client, NULL - * if there isn't any - */ -static struct Listener * -listener_get (struct GNUNET_SERVICE_Client *client) -{ - for (struct Listener *listener = listeners_head; - NULL != listener; - listener = listener->next) - if (listener->client == client) - return listener; - return NULL; -} +static uint32_t suggest_id; /** * Get the incoming socket associated with the given id. * + * @param listener the listener to look in * @param id id to look for * @return the incoming socket associated with the id, * or NULL if there is none @@ -197,44 +174,49 @@ listener_get (struct GNUNET_SERVICE_Client *client) static struct Operation * get_incoming (uint32_t id) { - for (struct Operation *op = incoming_head; NULL != op; op = op->next) - if (op->suggest_id == id) - { - GNUNET_assert (GNUNET_YES == op->is_incoming); - return op; - } + for (struct Listener *listener = listener_head; + NULL != listener; + listener = listener->next) + { + for (struct Operation *op = listener->op_head; NULL != op; op = op->next) + if (op->suggest_id == id) + return op; + } return NULL; } /** - * Destroy a listener, free all resources associated with it. + * Destroy an incoming request from a remote peer * - * @param listener listener to destroy + * @param op remote request to destroy */ static void -listener_destroy (struct Listener *listener) +incoming_destroy (struct Operation *op) { - /* If the client is not dead yet, destroy it. - * The client's destroy callback will destroy the listener again. */ - if (NULL != listener->client) - { - struct GNUNET_SERVICE_Client *client = listener->client; - - GNUNET_MQ_destroy (listener->client_mq); - listener->client_mq = NULL; + struct Listener *listener; + struct GNUNET_CADET_Channel *channel; - listener->client = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting listener client\n"); - GNUNET_SERVICE_client_drop (client); - return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying incoming operation %p\n", + op); + if (NULL != (listener = op->listener)) + { + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, + op); + op->listener = NULL; + } + if (NULL != op->timeout_task) + { + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + } + if (NULL != (channel = op->channel)) + { + op->channel = NULL; + GNUNET_CADET_channel_destroy (channel); } - GNUNET_CADET_close_port (listener->open_port); - GNUNET_CONTAINER_DLL_remove (listeners_head, - listeners_tail, - listener); - GNUNET_free (listener); } @@ -304,12 +286,11 @@ garbage_collect_cb (void *cls, static void collect_generation_garbage (struct Set *set) { - struct Operation *op; struct GarbageContext gc; gc.min_op_generation = UINT_MAX; gc.max_op_generation = 0; - for (op = set->ops_head; NULL != op; op = op->next) + for (struct Operation *op = set->ops_head; NULL != op; op = op->next) { gc.min_op_generation = GNUNET_MIN (gc.min_op_generation, op->generation_created); @@ -323,23 +304,36 @@ collect_generation_garbage (struct Set *set) } +/** + * Is @a generation in the range of exclusions? + * + * @param generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if @a generation is in any of the ranges + */ static int is_excluded_generation (unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size) { - unsigned int i; - - for (i = 0; i < excluded_size; i++) - { - if ( (generation >= excluded[i].start) && (generation < excluded[i].end) ) + for (unsigned int i = 0; i < excluded_size; i++) + if ( (generation >= excluded[i].start) && + (generation < excluded[i].end) ) return GNUNET_YES; - } - return GNUNET_NO; } +/** + * Is element @a ee part of the set during @a query_generation? + * + * @param ee element to test + * @param query_generation generation to query + * @param excluded array of generations where the element is excluded + * @param excluded_size length of the @a excluded array + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ static int is_element_of_generation (struct ElementEntry *ee, unsigned int query_generation, @@ -348,11 +342,12 @@ is_element_of_generation (struct ElementEntry *ee, { struct MutationEvent *mut; int is_present; - unsigned int i; GNUNET_assert (NULL != ee->mutations); - - if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) + if (GNUNET_YES == + is_excluded_generation (query_generation, + excluded, + excluded_size)) { GNUNET_break (0); return GNUNET_NO; @@ -362,7 +357,7 @@ is_element_of_generation (struct ElementEntry *ee, /* Could be made faster with binary search, but lists are small, so why bother. */ - for (i = 0; i < ee->mutations_size; i++) + for (unsigned int i = 0; i < ee->mutations_size; i++) { mut = &ee->mutations[i]; @@ -374,7 +369,10 @@ is_element_of_generation (struct ElementEntry *ee, continue; } - if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) + if (GNUNET_YES == + is_excluded_generation (mut->generation, + excluded, + excluded_size)) { /* The generation is excluded (because it belongs to another fork via a lazy copy) and thus mutations aren't considered @@ -383,11 +381,12 @@ is_element_of_generation (struct ElementEntry *ee, } /* This would be an inconsistency in how we manage mutations. */ - if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) + if ( (GNUNET_YES == is_present) && + (GNUNET_YES == mut->added) ) GNUNET_assert (0); - /* Likewise. */ - if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) + if ( (GNUNET_NO == is_present) && + (GNUNET_NO == mut->added) ) GNUNET_assert (0); is_present = mut->added; @@ -397,44 +396,33 @@ is_element_of_generation (struct ElementEntry *ee, } -int -_GSS_is_element_of_set (struct ElementEntry *ee, - struct Set *set) -{ - return is_element_of_generation (ee, - set->current_generation, - set->excluded_generations, - set->excluded_generations_size); -} - - -static int -is_element_of_iteration (struct ElementEntry *ee, - struct Set *set) -{ - return is_element_of_generation (ee, - set->iter_generation, - set->excluded_generations, - set->excluded_generations_size); -} - - +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ int _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) { return is_element_of_generation (ee, op->generation_created, - op->spec->set->excluded_generations, - op->spec->set->excluded_generations_size); + op->set->excluded_generations, + op->set->excluded_generations_size); } /** - * Destroy the given operation. Call the implementation-specific - * cancel function of the operation. Disconnects from the remote - * peer. Does not disconnect the client, as there may be multiple - * operations per set. + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. * * @param op operation to destroy * @param gc #GNUNET_YES to perform garbage collection on the set @@ -443,45 +431,67 @@ void _GSS_operation_destroy (struct Operation *op, int gc) { - struct Set *set; + struct Set *set = op->set; struct GNUNET_CADET_Channel *channel; - if (NULL == op->vt) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying operation %p\n", + op); + GNUNET_assert (NULL == op->listener); + if (NULL != op->state) { - /* already in #_GSS_operation_destroy() */ - return; + set->vt->cancel (op); + op->state = NULL; } - GNUNET_assert (GNUNET_NO == op->is_incoming); - GNUNET_assert (NULL != op->spec); - set = op->spec->set; - GNUNET_CONTAINER_DLL_remove (set->ops_head, - set->ops_tail, - op); - op->vt->cancel (op); - op->vt = NULL; - if (NULL != op->spec) + if (NULL != set) { - if (NULL != op->spec->context_msg) - { - GNUNET_free (op->spec->context_msg); - op->spec->context_msg = NULL; - } - GNUNET_free (op->spec); - op->spec = NULL; + GNUNET_CONTAINER_DLL_remove (set->ops_head, + set->ops_tail, + op); + op->set = NULL; + } + if (NULL != op->context_msg) + { + GNUNET_free (op->context_msg); + op->context_msg = NULL; } if (NULL != (channel = op->channel)) { + /* This will free op; called conditionally as this helper function + is also called from within the channel disconnect handler. */ op->channel = NULL; GNUNET_CADET_channel_destroy (channel); } - - if (GNUNET_YES == gc) + if ( (NULL != set) && + (GNUNET_YES == gc) ) collect_generation_garbage (set); /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, * there was a channel end handler that will free 'op' on the call stack. */ } +/** + * Callback called when a client connects to the service. + * + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a `struct ClientState` + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + struct ClientState *cs; + + cs = GNUNET_new (struct ClientState); + cs->client = c; + cs->mq = mq; + return cs; +} + + /** * Iterator over hash map entries to free element entries. * @@ -498,66 +508,76 @@ destroy_elements_iterator (void *cls, struct ElementEntry *ee = value; GNUNET_free_non_null (ee->mutations); - GNUNET_free (ee); return GNUNET_YES; } /** - * Destroy a set, and free all resources and operations associated with it. + * Clean up after a client has disconnected * - * @param set the set to destroy + * @param cls closure, unused + * @param client the client to clean up after + * @param internal_cls the `struct ClientState` */ static void -set_destroy (struct Set *set) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *client, + void *internal_cls) { - if (NULL != set->client) - { - /* If the client is not dead yet, destroy it. The client's destroy - * callback will call `set_destroy()` again in this case. We do - * this so that the channel end handler still has a valid set handle - * to destroy. */ - struct GNUNET_SERVICE_Client *client = set->client; - - set->client = NULL; - GNUNET_SERVICE_client_drop (client); - return; - } - GNUNET_assert (NULL != set->state); - while (NULL != set->ops_head) - _GSS_operation_destroy (set->ops_head, GNUNET_NO); - set->vt->destroy_set (set->state); - set->state = NULL; - if (NULL != set->iter) - { - GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); - set->iter = NULL; - set->iteration_id++; - } + struct ClientState *cs = internal_cls; + struct Operation *op; + struct Listener *listener; + struct Set *set; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client disconnected, cleaning up\n"); + if (NULL != (set = cs->set)) { - struct SetContent *content; + struct SetContent *content = set->content; struct PendingMutation *pm; struct PendingMutation *pm_current; + struct LazyCopyRequest *lcr; - content = set->content; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying client's set\n"); + /* Destroy pending set operations */ + while (NULL != set->ops_head) + _GSS_operation_destroy (set->ops_head, + GNUNET_NO); + + /* Destroy operation-specific state */ + GNUNET_assert (NULL != set->state); + set->vt->destroy_set (set->state); + set->state = NULL; + + /* Clean up ongoing iterations */ + if (NULL != set->iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + } - // discard any pending mutations that reference this set + /* discard any pending mutations that reference this set */ pm = content->pending_mutations_head; while (NULL != pm) { pm_current = pm; pm = pm->next; - if (pm_current-> set == set) + if (pm_current->set == set) + { GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, content->pending_mutations_tail, pm_current); - + GNUNET_free (pm_current); + } } + /* free set content (or at least decrement RC) */ set->content = NULL; GNUNET_assert (0 != content->refcount); - content->refcount -= 1; + content->refcount--; if (0 == content->refcount) { GNUNET_assert (NULL != content->elements); @@ -568,166 +588,41 @@ set_destroy (struct Set *set) content->elements = NULL; GNUNET_free (content); } - } - GNUNET_free_non_null (set->excluded_generations); - set->excluded_generations = NULL; - GNUNET_CONTAINER_DLL_remove (sets_head, - sets_tail, - set); + GNUNET_free_non_null (set->excluded_generations); + set->excluded_generations = NULL; - // remove set from pending copy requests - { - struct LazyCopyRequest *lcr; + /* remove set from pending copy requests */ lcr = lazy_copy_head; while (NULL != lcr) { - struct LazyCopyRequest *lcr_current; - lcr_current = lcr; + struct LazyCopyRequest *lcr_current = lcr; + lcr = lcr->next; if (lcr_current->source_set == set) + { GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, lcr_current); + GNUNET_free (lcr_current); + } } + GNUNET_free (set); } - GNUNET_free (set); -} - - -/** - * Callback called when a client connects to the service. - * - * @param cls closure for the service - * @param c the new client that connected to the service - * @param mq the message queue used to send messages to the client - * @return @a c - */ -static void * -client_connect_cb (void *cls, - struct GNUNET_SERVICE_Client *c, - struct GNUNET_MQ_Handle *mq) -{ - return c; -} - - -/** - * Destroy an incoming request from a remote peer - * - * @param incoming remote request to destroy - */ -static void -incoming_destroy (struct Operation *incoming) -{ - struct GNUNET_CADET_Channel *channel; - - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - GNUNET_CONTAINER_DLL_remove (incoming_head, - incoming_tail, - incoming); - if (NULL != incoming->timeout_task) - { - GNUNET_SCHEDULER_cancel (incoming->timeout_task); - incoming->timeout_task = NULL; - } - /* make sure that the tunnel end handler will not destroy us again */ - incoming->vt = NULL; - if (NULL != incoming->spec) - { - GNUNET_free (incoming->spec); - incoming->spec = NULL; - } - if (NULL != (channel = incoming->channel)) - { - incoming->channel = NULL; - GNUNET_CADET_channel_destroy (channel); - } -} - - -/** - * Clean up after a client has disconnected - * - * @param cls closure, unused - * @param client the client to clean up after - * @param internal_cls our client-specific internal data structure - */ -static void -client_disconnect_cb (void *cls, - struct GNUNET_SERVICE_Client *client, - void *internal_cls) -{ - struct Set *set; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "client disconnected, cleaning up\n"); - set = set_get (client); - if (NULL != set) + if (NULL != (listener = cs->listener)) { - set->client = NULL; - set_destroy (set); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client's set destroyed\n"); - } - struct Listener *listener = listener_get (client); - if (NULL != listener) - { - /* destroy all incoming operations whose client just - * got destroyed */ - //struct Operation *op = incoming_head; - /* - while (NULL != op) - { - struct Operation *curr = op; - op = op->next; - if ( (GNUNET_YES == curr->is_incoming) && - (curr->listener == listener) ) - incoming_destroy (curr); - } - */ - listener->client = NULL; - listener_destroy (listener); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client's listener destroyed\n"); + "Destroying client's listener\n"); + GNUNET_CADET_close_port (listener->open_port); + listener->open_port = NULL; + while (NULL != (op = listener->op_head)) + incoming_destroy (op); + GNUNET_CONTAINER_DLL_remove (listener_head, + listener_tail, + listener); + GNUNET_free (listener); } -} - - -/** - * Suggest the given request to the listener. The listening client can - * then accept or reject the remote request. - * - * @param incoming the incoming peer with the request to suggest - * @param listener the listener to suggest the request to - */ -static void -incoming_suggest (struct Operation *incoming, - struct Listener *listener) -{ - struct GNUNET_MQ_Envelope *mqm; - struct GNUNET_SET_RequestMessage *cmsg; - - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - GNUNET_assert (NULL != incoming->spec); - GNUNET_assert (0 == incoming->suggest_id); - incoming->suggest_id = suggest_id++; - if (0 == suggest_id) - suggest_id++; - GNUNET_assert (NULL != incoming->timeout_task); - GNUNET_SCHEDULER_cancel (incoming->timeout_task); - incoming->timeout_task = NULL; - mqm = GNUNET_MQ_msg_nested_mh (cmsg, - GNUNET_MESSAGE_TYPE_SET_REQUEST, - incoming->spec->context_msg); - GNUNET_assert (NULL != mqm); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Suggesting incoming request with accept id %u to listener\n", - incoming->suggest_id); - cmsg->accept_id = htonl (incoming->suggest_id); - cmsg->peer_id = incoming->spec->peer; - GNUNET_MQ_send (listener->client_mq, - mqm); + GNUNET_free (cs); } @@ -744,10 +639,22 @@ check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) { struct Operation *op = cls; + struct Listener *listener = op->listener; const struct GNUNET_MessageHeader *nested_context; /* double operation request */ - if (NULL != op->spec) + if (0 != op->suggest_id) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + /* This should be equivalent to the previous condition, but can't hurt to check twice */ + if (NULL == op->listener) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation)) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -786,61 +693,74 @@ handle_incoming_msg (void *cls, { struct Operation *op = cls; struct Listener *listener = op->listener; - struct OperationSpecification *spec; const struct GNUNET_MessageHeader *nested_context; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_SET_RequestMessage *cmsg; - GNUNET_assert (GNUNET_YES == op->is_incoming); - spec = GNUNET_new (struct OperationSpecification); nested_context = GNUNET_MQ_extract_nested_mh (msg); /* Make a copy of the nested_context (application-specific context information that is opaque to set) so we can pass it to the listener later on */ if (NULL != nested_context) - spec->context_msg = GNUNET_copy_message (nested_context); - spec->operation = ntohl (msg->operation); - spec->app_id = listener->app_id; - spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, - UINT32_MAX); - spec->peer = op->peer; - spec->remote_element_count = ntohl (msg->element_count); - op->spec = spec; - listener = op->listener; + op->context_msg = GNUNET_copy_message (nested_context); + op->remote_element_count = ntohl (msg->element_count); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received P2P operation request (op %u, port %s) for active listener\n", (uint32_t) ntohl (msg->operation), - GNUNET_h2s (&listener->app_id)); - incoming_suggest (op, - listener); + GNUNET_h2s (&op->listener->app_id)); + GNUNET_assert (0 == op->suggest_id); + if (0 == suggest_id) + suggest_id++; + op->suggest_id = suggest_id++; + GNUNET_assert (NULL != op->timeout_task); + GNUNET_SCHEDULER_cancel (op->timeout_task); + op->timeout_task = NULL; + env = GNUNET_MQ_msg_nested_mh (cmsg, + GNUNET_MESSAGE_TYPE_SET_REQUEST, + op->context_msg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Suggesting incoming request with accept id %u to listener %p of client %p\n", + op->suggest_id, + listener, + listener->cs); + cmsg->accept_id = htonl (op->suggest_id); + cmsg->peer_id = op->peer; + GNUNET_MQ_send (listener->cs->mq, + env); + /* NOTE: GNUNET_CADET_receive_done() will be called in + #handle_client_accept() */ } +/** + * Add an element to @a set as specified by @a msg + * + * @param set set to manipulate + * @param msg message specifying the change + */ static void execute_add (struct Set *set, - const struct GNUNET_MessageHeader *m) + const struct GNUNET_SET_ElementMessage *msg) { - const struct GNUNET_SET_ElementMessage *msg; struct GNUNET_SET_Element el; struct ElementEntry *ee; struct GNUNET_HashCode hash; - GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); - - msg = (const struct GNUNET_SET_ElementMessage *) m; - el.size = ntohs (m->size) - sizeof *msg; + GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type)); + el.size = ntohs (msg->header.size) - sizeof (*msg); el.data = &msg[1]; el.element_type = ntohs (msg->element_type); - GNUNET_SET_element_hash (&el, &hash); - + GNUNET_SET_element_hash (&el, + &hash); ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); - if (NULL == ee) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client inserts element %s of size %u\n", GNUNET_h2s (&hash), el.size); - ee = GNUNET_malloc (el.size + sizeof *ee); + ee = GNUNET_malloc (el.size + sizeof (*ee)); ee->element.size = el.size; GNUNET_memcpy (&ee[1], el.data, @@ -857,7 +777,11 @@ execute_add (struct Set *set, ee, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); } - else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) + else if (GNUNET_YES == + is_element_of_generation (ee, + set->current_generation, + set->excluded_generations, + set->excluded_generations_size)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client inserted element %s of size %u twice (ignored)\n", @@ -877,24 +801,27 @@ execute_add (struct Set *set, ee->mutations_size, mut); } - - set->vt->add (set->state, ee); + set->vt->add (set->state, + ee); } +/** + * Remove an element from @a set as specified by @a msg + * + * @param set set to manipulate + * @param msg message specifying the change + */ static void execute_remove (struct Set *set, - const struct GNUNET_MessageHeader *m) + const struct GNUNET_SET_ElementMessage *msg) { - const struct GNUNET_SET_ElementMessage *msg; struct GNUNET_SET_Element el; struct ElementEntry *ee; struct GNUNET_HashCode hash; - GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); - - msg = (const struct GNUNET_SET_ElementMessage *) m; - el.size = ntohs (m->size) - sizeof *msg; + GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type)); + el.size = ntohs (msg->header.size) - sizeof (*msg); el.data = &msg[1]; el.element_type = ntohs (msg->element_type); GNUNET_SET_element_hash (&el, &hash); @@ -908,7 +835,11 @@ execute_remove (struct Set *set, el.size); return; } - if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) + if (GNUNET_NO == + is_element_of_generation (ee, + set->current_generation, + set->excluded_generations, + set->excluded_generations_size)) { /* Client tried to remove element twice */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -931,22 +862,28 @@ execute_remove (struct Set *set, ee->mutations_size, mut); } - set->vt->remove (set->state, ee); + set->vt->remove (set->state, + ee); } - +/** + * Perform a mutation on a set as specified by the @a msg + * + * @param set the set to mutate + * @param msg specification of what to change + */ static void execute_mutation (struct Set *set, - const struct GNUNET_MessageHeader *m) + const struct GNUNET_SET_ElementMessage *msg) { - switch (ntohs (m->type)) + switch (ntohs (msg->header.type)) { case GNUNET_MESSAGE_TYPE_SET_ADD: - execute_add (set, m); + execute_add (set, msg); break; case GNUNET_MESSAGE_TYPE_SET_REMOVE: - execute_remove (set, m); + execute_remove (set, msg); break; default: GNUNET_break (0); @@ -954,6 +891,34 @@ execute_mutation (struct Set *set, } +/** + * Execute mutations that were delayed on a set because of + * pending operations. + * + * @param set the set to execute mutations on + */ +static void +execute_delayed_mutations (struct Set *set) +{ + struct PendingMutation *pm; + + if (0 != set->content->iterator_count) + return; /* still cannot do this */ + while (NULL != (pm = set->content->pending_mutations_head)) + { + GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing pending mutation on %p.\n", + pm->set); + execute_mutation (pm->set, + pm->msg); + GNUNET_free (pm->msg); + GNUNET_free (pm); + } +} + /** * Send the next element of a set to the set's client. The next element is given by @@ -977,65 +942,45 @@ send_client_element (struct Set *set) struct GNUNET_SET_IterResponseMessage *msg; GNUNET_assert (NULL != set->iter); - -again: - - ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, - NULL, - (const void **) &ee); - if (GNUNET_NO == ret) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Iteration on %p done.\n", - (void *) set); - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); - GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); - set->iter = NULL; - set->iteration_id++; - - GNUNET_assert (set->content->iterator_count > 0); - set->content->iterator_count -= 1; - - if (0 == set->content->iterator_count) + do { + ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, + NULL, + (const void **) &ee); + if (GNUNET_NO == ret) { - while (NULL != set->content->pending_mutations_head) - { - struct PendingMutation *pm; - - pm = set->content->pending_mutations_head; - GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, - set->content->pending_mutations_tail, - pm); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Executing pending mutation on %p.\n", - (void *) pm->set); - execute_mutation (pm->set, pm->mutation_message); - GNUNET_free (pm->mutation_message); - GNUNET_free (pm); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Iteration on %p done.\n", + set); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); + GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); + set->iter = NULL; + set->iteration_id++; + GNUNET_assert (set->content->iterator_count > 0); + set->content->iterator_count--; + execute_delayed_mutations (set); + GNUNET_MQ_send (set->cs->mq, + ev); + return; } - - } - else - { GNUNET_assert (NULL != ee); - - if (GNUNET_NO == is_element_of_iteration (ee, set)) - goto again; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending iteration element on %p.\n", - (void *) set); - ev = GNUNET_MQ_msg_extra (msg, - ee->element.size, - GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); - GNUNET_memcpy (&msg[1], - ee->element.data, - ee->element.size); - msg->element_type = htons (ee->element.element_type); - msg->iteration_id = htons (set->iteration_id); - } - GNUNET_MQ_send (set->client_mq, ev); + } while (GNUNET_NO == + is_element_of_generation (ee, + set->iter_generation, + set->excluded_generations, + set->excluded_generations_size)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending iteration element on %p.\n", + set); + ev = GNUNET_MQ_msg_extra (msg, + ee->element.size, + GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); + GNUNET_memcpy (&msg[1], + ee->element.data, + ee->element.size); + msg->element_type = htons (ee->element.element_type); + msg->iteration_id = htons (set->iteration_id); + GNUNET_MQ_send (set->cs->mq, + ev); } @@ -1052,22 +997,21 @@ static void handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* attempt to iterate over a non existing set */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } if (NULL != set->iter) { /* Only one concurrent iterate-action allowed per set */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1075,8 +1019,8 @@ handle_client_iterate (void *cls, (void *) set, set->current_generation, GNUNET_CONTAINER_multihashmap_size (set->content->elements)); - GNUNET_SERVICE_client_continue (client); - set->content->iterator_count += 1; + GNUNET_SERVICE_client_continue (cs->client); + set->content->iterator_count++; set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); set->iter_generation = set->current_generation; send_client_element (set); @@ -1095,17 +1039,17 @@ static void handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client created new set (operation %u)\n", (uint32_t) ntohl (msg->operation)); - if (NULL != set_get (client)) + if (NULL != cs->set) { /* There can only be one set per client */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } set = GNUNET_new (struct Set); @@ -1120,7 +1064,7 @@ handle_client_create_set (void *cls, default: GNUNET_free (set); GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); @@ -1129,18 +1073,16 @@ handle_client_create_set (void *cls, { /* initialization failed (i.e. out of memory) */ GNUNET_free (set); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } set->content = GNUNET_new (struct SetContent); set->content->refcount = 1; - set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - set->client = client; - set->client_mq = GNUNET_SERVICE_client_get_mq (client); - GNUNET_CONTAINER_DLL_insert (sets_head, - sets_tail, - set); - GNUNET_SERVICE_client_continue (client); + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_YES); + set->cs = cs; + cs->set = set; + GNUNET_SERVICE_client_continue (cs->client); } @@ -1156,31 +1098,12 @@ handle_client_create_set (void *cls, static void incoming_timeout_cb (void *cls) { - struct Operation *incoming = cls; + struct Operation *op = cls; - incoming->timeout_task = NULL; - GNUNET_assert (GNUNET_YES == incoming->is_incoming); + op->timeout_task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Remote peer's incoming request timed out\n"); - incoming_destroy (incoming); -} - - -/** - * Terminates an incoming operation in case we have not yet received an - * operation request. Called by the channel destruction handler. - * - * @param op the channel context - */ -static void -handle_incoming_disconnect (struct Operation *op) -{ - GNUNET_assert (GNUNET_YES == op->is_incoming); - /* channel is already dead, incoming_destroy must not - * destroy it ... */ - op->channel = NULL; incoming_destroy (op); - op->vt = NULL; } @@ -1205,31 +1128,26 @@ channel_new_cb (void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source) { - static const struct SetVT incoming_vt = { - .peer_disconnect = &handle_incoming_disconnect - }; struct Listener *listener = cls; - struct Operation *incoming; + struct Operation *op; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); - incoming = GNUNET_new (struct Operation); - incoming->listener = listener; - incoming->is_incoming = GNUNET_YES; - incoming->peer = *source; - incoming->channel = channel; - incoming->mq = GNUNET_CADET_get_mq (incoming->channel); - incoming->vt = &incoming_vt; - incoming->timeout_task + op = GNUNET_new (struct Operation); + op->listener = listener; + op->peer = *source; + op->channel = channel; + op->mq = GNUNET_CADET_get_mq (op->channel); + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, &incoming_timeout_cb, - incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, - incoming_tail, - incoming); - // incoming_suggest (incoming, - // listener); - return incoming; + op); + GNUNET_CONTAINER_DLL_insert (listener->op_head, + listener->op_tail, + op); + return op; } @@ -1258,22 +1176,14 @@ channel_end_cb (void *channel_ctx, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); op->channel = NULL; - op->keep++; - /* the vt can be null if a client already requested canceling op. */ - if (NULL != op->vt) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "calling peer disconnect due to channel end\n"); - op->vt->peer_disconnect (op); - } - op->keep--; - if (0 == op->keep) - { - /* cadet will never call us with the context again! */ - GNUNET_free (op); - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "channel_end_cb finished\n"); + if (NULL != op->listener) + incoming_destroy (op); + else if (NULL != op->set) + op->set->vt->channel_death (op); + else + _GSS_operation_destroy (op, + GNUNET_YES); + GNUNET_free (op); } @@ -1310,7 +1220,7 @@ static void handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct GNUNET_MQ_MessageHandler cadet_handlers[] = { GNUNET_MQ_hd_var_size (incoming_msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, @@ -1376,50 +1286,33 @@ handle_client_listen (void *cls, }; struct Listener *listener; - if (NULL != listener_get (client)) + if (NULL != cs->listener) { /* max. one active listener per client! */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } listener = GNUNET_new (struct Listener); - listener->client = client; - listener->client_mq = GNUNET_SERVICE_client_get_mq (client); + listener->cs = cs; listener->app_id = msg->app_id; - listener->operation = ntohl (msg->operation); - GNUNET_CONTAINER_DLL_insert_tail (listeners_head, - listeners_tail, - listener); + listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); + GNUNET_CONTAINER_DLL_insert (listener_head, + listener_tail, + listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New listener created (op %u, port %s)\n", listener->operation, GNUNET_h2s (&listener->app_id)); - listener->open_port = GNUNET_CADET_open_porT (cadet, - &msg->app_id, - &channel_new_cb, - listener, - &channel_window_cb, - &channel_end_cb, - cadet_handlers); - /* check for existing incoming requests the listener might be interested in */ - for (struct Operation *op = incoming_head; NULL != op; op = op->next) - { - if (NULL == op->spec) - continue; /* no details available yet */ - if (0 != op->suggest_id) - continue; /* this one has been already suggested to a listener */ - if (listener->operation != op->spec->operation) - continue; /* incompatible operation */ - if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, - &op->spec->app_id)) - continue; /* incompatible appliation */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found matching existing request\n"); - incoming_suggest (op, - listener); - } - GNUNET_SERVICE_client_continue (client); + listener->open_port + = GNUNET_CADET_open_porT (cadet, + &msg->app_id, + &channel_new_cb, + listener, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); + GNUNET_SERVICE_client_continue (cs->client); } @@ -1434,26 +1327,26 @@ static void handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; - struct Operation *incoming; + struct ClientState *cs = cls; + struct Operation *op; - incoming = get_incoming (ntohl (msg->accept_reject_id)); - if (NULL == incoming) + op = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == op) { /* no matching incoming operation for this reject; could be that the other peer already disconnected... */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client rejected unknown operation %u\n", (unsigned int) ntohl (msg->accept_reject_id)); - GNUNET_SERVICE_client_continue (client); + GNUNET_SERVICE_client_continue (cs->client); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer request (op %u, app %s) rejected by client\n", - incoming->spec->operation, - GNUNET_h2s (&incoming->spec->app_id)); - GNUNET_CADET_channel_destroy (incoming->channel); - GNUNET_SERVICE_client_continue (client); + op->listener->operation, + GNUNET_h2s (&cs->listener->app_id)); + GNUNET_CADET_channel_destroy (op->channel); + GNUNET_SERVICE_client_continue (cs->client); } @@ -1461,13 +1354,14 @@ handle_client_reject (void *cls, * Called when a client wants to add or remove an element to a set it inhabits. * * @param cls client that sent the message - * @param m message sent by the client + * @param msg message sent by the client */ static int check_client_mutation (void *cls, - const struct GNUNET_MessageHeader *m) + const struct GNUNET_SET_ElementMessage *msg) { - /* FIXME: any check we might want to do here? */ + /* NOTE: Technically, we should probably check with the + block library whether the element we are given is well-formed */ return GNUNET_OK; } @@ -1476,24 +1370,23 @@ check_client_mutation (void *cls, * Called when a client wants to add or remove an element to a set it inhabits. * * @param cls client that sent the message - * @param m message sent by the client + * @param msg message sent by the client */ static void handle_client_mutation (void *cls, - const struct GNUNET_MessageHeader *m) + const struct GNUNET_SET_ElementMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* client without a set requested an operation */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - GNUNET_SERVICE_client_continue (client); + GNUNET_SERVICE_client_continue (cs->client); if (0 != set->content->iterator_count) { @@ -1502,7 +1395,7 @@ handle_client_mutation (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); pm = GNUNET_new (struct PendingMutation); - pm->mutation_message = GNUNET_copy_message (m); + pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); pm->set = set; GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, set->content->pending_mutations_tail, @@ -1512,7 +1405,7 @@ handle_client_mutation (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); execute_mutation (set, - m); + msg); } @@ -1577,7 +1470,7 @@ static void handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Operation *op = GNUNET_new (struct Operation); const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { GNUNET_MQ_hd_var_size (incoming_msg, @@ -1643,45 +1536,38 @@ handle_client_evaluate (void *cls, GNUNET_MQ_handler_end () }; struct Set *set; - struct OperationSpecification *spec; const struct GNUNET_MessageHeader *context; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { GNUNET_break (0); GNUNET_free (op); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - spec = GNUNET_new (struct OperationSpecification); - spec->operation = set->operation; - spec->app_id = msg->app_id; - spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, - UINT32_MAX); - spec->peer = msg->target_peer; - spec->set = set; - spec->result_mode = ntohl (msg->result_mode); - spec->client_request_id = ntohl (msg->request_id); - spec->byzantine = msg->byzantine; - spec->byzantine_lower_bound = msg->byzantine_lower_bound; - spec->force_full = msg->force_full; - spec->force_delta = msg->force_delta; + op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, + UINT32_MAX); + op->peer = msg->target_peer; + op->result_mode = ntohl (msg->result_mode); + op->client_request_id = ntohl (msg->request_id); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; context = GNUNET_MQ_extract_nested_mh (msg); - op->spec = spec; - // Advance generation values, so that - // mutations won't interfer with the running operation. + /* Advance generation values, so that + mutations won't interfer with the running operation. */ + op->set = set; op->generation_created = set->current_generation; advance_generation (set); - op->operation = set->operation; - op->vt = set->vt; GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating new CADET channel to port %s\n", - GNUNET_h2s (&msg->app_id)); + "Creating new CADET channel to port %s for set operation type %u\n", + GNUNET_h2s (&msg->app_id), + set->operation); op->channel = GNUNET_CADET_channel_creatE (cadet, op, &msg->target_peer, @@ -1691,9 +1577,15 @@ handle_client_evaluate (void *cls, &channel_end_cb, cadet_handlers); op->mq = GNUNET_CADET_get_mq (op->channel); - set->vt->evaluate (op, - context); - GNUNET_SERVICE_client_continue (client); + op->state = set->vt->evaluate (op, + context); + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + GNUNET_SERVICE_client_continue (cs->client); } @@ -1709,15 +1601,14 @@ static void handle_client_iter_ack (void *cls, const struct GNUNET_SET_IterAckMessage *ack) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* client without a set acknowledged receiving a value */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } if (NULL == set->iter) @@ -1725,10 +1616,10 @@ handle_client_iter_ack (void *cls, /* client sent an ack, but we were not expecting one (as set iteration has finished) */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - GNUNET_SERVICE_client_continue (client); + GNUNET_SERVICE_client_continue (cs->client); if (ntohl (ack->send_more)) { send_client_element (set); @@ -1752,42 +1643,33 @@ static void handle_client_copy_lazy_prepare (void *cls, const struct GNUNET_MessageHeader *mh) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; struct LazyCopyRequest *cr; struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* client without a set requested an operation */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested creation of lazy copy\n"); cr = GNUNET_new (struct LazyCopyRequest); - - cr->cookie = lazy_copy_cookie; - lazy_copy_cookie += 1; + cr->cookie = ++lazy_copy_cookie; cr->source_set = set; - GNUNET_CONTAINER_DLL_insert (lazy_copy_head, lazy_copy_tail, cr); - - ev = GNUNET_MQ_msg (resp_msg, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); resp_msg->cookie = cr->cookie; - GNUNET_MQ_send (set->client_mq, ev); - - - GNUNET_SERVICE_client_continue (client); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client requested lazy copy\n"); + GNUNET_MQ_send (set->cs->mq, + ev); + GNUNET_SERVICE_client_continue (cs->client); } @@ -1801,21 +1683,19 @@ static void handle_client_copy_lazy_connect (void *cls, const struct GNUNET_SET_CopyLazyConnectMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct LazyCopyRequest *cr; struct Set *set; int found; - if (NULL != set_get (client)) + if (NULL != cs->set) { /* There can only be one set per client */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - found = GNUNET_NO; - for (cr = lazy_copy_head; NULL != cr; cr = cr->next) { if (cr->cookie == msg->cookie) @@ -1824,21 +1704,20 @@ handle_client_copy_lazy_connect (void *cls, break; } } - if (GNUNET_NO == found) { /* client asked for copy with cookie we don't know */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - GNUNET_CONTAINER_DLL_remove (lazy_copy_head, lazy_copy_tail, cr); - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client %p requested use of lazy copy\n", + cs); set = GNUNET_new (struct Set); - switch (cr->source_set->operation) { case GNUNET_SET_OPERATION_INTERSECTION: @@ -1858,37 +1737,28 @@ handle_client_copy_lazy_connect (void *cls, GNUNET_break (0); GNUNET_free (set); GNUNET_free (cr); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } set->operation = cr->source_set->operation; - set->state = set->vt->copy_state (cr->source_set); + set->state = set->vt->copy_state (cr->source_set->state); set->content = cr->source_set->content; - set->content->refcount += 1; + set->content->refcount++; set->current_generation = cr->source_set->current_generation; set->excluded_generations_size = cr->source_set->excluded_generations_size; - set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, - set->excluded_generations_size * sizeof (struct GenerationRange)); + set->excluded_generations + = GNUNET_memdup (cr->source_set->excluded_generations, + set->excluded_generations_size * sizeof (struct GenerationRange)); /* Advance the generation of the new set, so that mutations to the of the cloned set and the source set are independent. */ advance_generation (set); - - - set->client = client; - set->client_mq = GNUNET_SERVICE_client_get_mq (client); - GNUNET_CONTAINER_DLL_insert (sets_head, - sets_tail, - set); - + set->cs = cs; + cs->set = set; GNUNET_free (cr); - - GNUNET_SERVICE_client_continue (client); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client connected to lazy set\n"); + GNUNET_SERVICE_client_continue (cs->client); } @@ -1902,26 +1772,22 @@ static void handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; struct Operation *op; int found; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* client without a set requested an operation */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client requested cancel for op %u\n", - (uint32_t) ntohl (msg->request_id)); found = GNUNET_NO; for (op = set->ops_head; NULL != op; op = op->next) { - if (op->spec->client_request_id == ntohl (msg->request_id)) + if (op->client_request_id == ntohl (msg->request_id)) { found = GNUNET_YES; break; @@ -1934,15 +1800,19 @@ handle_client_cancel (void *cls, * yet and try to cancel the (just barely non-existent) operation. * So this is not a hard error. */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client canceled non-existent op\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client canceled non-existent op %u\n", + (uint32_t) ntohl (msg->request_id)); } else { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested cancel for op %u\n", + (uint32_t) ntohl (msg->request_id)); _GSS_operation_destroy (op, GNUNET_YES); } - GNUNET_SERVICE_client_continue (client); + GNUNET_SERVICE_client_continue (cs->client); } @@ -1958,18 +1828,18 @@ static void handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) { - struct GNUNET_SERVICE_Client *client = cls; + struct ClientState *cs = cls; struct Set *set; struct Operation *op; struct GNUNET_SET_ResultMessage *result_message; struct GNUNET_MQ_Envelope *ev; + struct Listener *listener; - set = set_get (client); - if (NULL == set) + if (NULL == (set = cs->set)) { /* client without a set requested to accept */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (client); + GNUNET_SERVICE_client_drop (cs->client); return; } op = get_incoming (ntohl (msg->accept_reject_id)); @@ -1977,72 +1847,75 @@ handle_client_accept (void *cls, { /* It is not an error if the set op does not exist -- it may * have been destroyed when the partner peer disconnected. */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client accepted request that is no longer active\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Client %p accepted request %u of listener %p that is no longer active\n", + cs, + ntohl (msg->accept_reject_id), + cs->listener); ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT); result_message->request_id = msg->request_id; - result_message->element_type = 0; result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); - GNUNET_MQ_send (set->client_mq, ev); - GNUNET_SERVICE_client_continue (client); + GNUNET_MQ_send (set->cs->mq, + ev); + GNUNET_SERVICE_client_continue (cs->client); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client accepting request %u\n", (uint32_t) ntohl (msg->accept_reject_id)); - GNUNET_assert (GNUNET_YES == op->is_incoming); - op->is_incoming = GNUNET_NO; - GNUNET_CONTAINER_DLL_remove (incoming_head, - incoming_tail, + listener = op->listener; + op->listener = NULL; + GNUNET_CONTAINER_DLL_remove (listener->op_head, + listener->op_tail, op); - op->spec->set = set; + op->set = set; GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); - op->spec->client_request_id = ntohl (msg->request_id); - op->spec->result_mode = ntohl (msg->result_mode); - op->spec->byzantine = msg->byzantine; - op->spec->byzantine_lower_bound = msg->byzantine_lower_bound; - op->spec->force_full = msg->force_full; - op->spec->force_delta = msg->force_delta; - - // Advance generation values, so that - // mutations won't interfer with the running operation. + op->client_request_id = ntohl (msg->request_id); + op->result_mode = ntohl (msg->result_mode); + op->byzantine = msg->byzantine; + op->byzantine_lower_bound = msg->byzantine_lower_bound; + op->force_full = msg->force_full; + op->force_delta = msg->force_delta; + + /* Advance generation values, so that future mutations do not + interfer with the running operation. */ op->generation_created = set->current_generation; advance_generation (set); - - op->vt = set->vt; - op->operation = set->operation; - op->vt->accept (op); - GNUNET_SERVICE_client_continue (client); + GNUNET_assert (NULL == op->state); + op->state = set->vt->accept (op); + if (NULL == op->state) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (cs->client); + return; + } + /* Now allow CADET to continue, as we did not do this in + #handle_incoming_msg (as we wanted to first see if the + local client would accept the request). */ + GNUNET_CADET_receive_done (op->channel); + GNUNET_SERVICE_client_continue (cs->client); } /** * Called to clean up, after a shutdown has been requested. * - * @param cls closure + * @param cls closure, NULL */ static void shutdown_task (void *cls) { - while (NULL != incoming_head) - incoming_destroy (incoming_head); - while (NULL != listeners_head) - listener_destroy (listeners_head); - while (NULL != sets_head) - set_destroy (sets_head); - - /* it's important to destroy cadet at the end, as all channels - * must be destroyed before the cadet handle! */ + /* Delay actual shutdown to allow service to disconnect clients */ if (NULL != cadet) { GNUNET_CADET_disconnect (cadet); cadet = NULL; } - GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); + GNUNET_STATISTICS_destroy (_GSS_statistics, + GNUNET_YES); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); } @@ -2061,15 +1934,19 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service) { - configuration = cfg; + /* FIXME: need to modify SERVICE (!) API to allow + us to run a shutdown task *after* clients were + forcefully disconnected! */ GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); - _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); + _GSS_statistics = GNUNET_STATISTICS_create ("set", + cfg); cadet = GNUNET_CADET_connecT (cfg); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Could not connect to CADET service\n")); + GNUNET_SCHEDULER_shutdown (); return; } } @@ -2095,7 +1972,7 @@ GNUNET_SERVICE_MAIN NULL), GNUNET_MQ_hd_var_size (client_mutation, GNUNET_MESSAGE_TYPE_SET_ADD, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size (client_create_set, GNUNET_MESSAGE_TYPE_SET_CREATE, @@ -2119,7 +1996,7 @@ GNUNET_SERVICE_MAIN NULL), GNUNET_MQ_hd_var_size (client_mutation, GNUNET_MESSAGE_TYPE_SET_REMOVE, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size (client_cancel, GNUNET_MESSAGE_TYPE_SET_CANCEL, diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 86313d179..19413fd30 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013, 2014 GNUnet e.V. + Copyright (C) 2013-2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -67,85 +67,6 @@ struct ElementEntry; struct Operation; -/** - * Detail information about an operation. - */ -struct OperationSpecification -{ - - /** - * The remove peer we evaluate the operation with. - */ - struct GNUNET_PeerIdentity peer; - - /** - * Application ID for the operation, used to distinguish - * multiple operations of the same type with the same peer. - */ - struct GNUNET_HashCode app_id; - - /** - * Context message, may be NULL. - */ - struct GNUNET_MessageHeader *context_msg; - - /** - * Set associated with the operation, NULL until the spec has been - * associated with a set. - */ - struct Set *set; - - /** - * Salt to use for the operation. - */ - uint32_t salt; - - /** - * Remote peers element count - */ - uint32_t remote_element_count; - - /** - * ID used to identify an operation between service and client - */ - uint32_t client_request_id; - - /** - * The type of the operation. - */ - enum GNUNET_SET_OperationType operation; - - /** - * When are elements sent to the client, and which elements are sent? - */ - enum GNUNET_SET_ResultMode result_mode; - - /** - * Always use delta operation instead of sending full sets, - * even it it's less efficient. - */ - int force_delta; - - /** - * Always send full sets, even if delta operations would - * be more efficient. - */ - int force_full; - - /** - * #GNUNET_YES to fail operations where Byzantine faults - * are suspected - */ - int byzantine; - - /** - * Lower bound for the set size, used only when - * byzantine mode is enabled. - */ - int byzantine_lower_bound; -}; - - /** * Signature of functions that create the implementation-specific * state for a set supporting a specific operation. @@ -153,7 +74,7 @@ struct OperationSpecification * @return a set state specific to the supported operation, NULL on error */ typedef struct SetState * -(*CreateImpl) (void); +(*SetCreateImpl) (void); /** @@ -164,18 +85,18 @@ typedef struct SetState * * @param ee element message from the client */ typedef void -(*AddRemoveImpl) (struct SetState *state, +(*SetAddRemoveImpl) (struct SetState *state, struct ElementEntry *ee); /** - * Signature of functions that handle disconnection of the remote - * peer. + * Make a copy of a set's internal state. * - * @param op the set operation, contains implementation-specific data + * @param state set state to copy + * @return copy of the internal state */ -typedef void -(*PeerDisconnectImpl) (struct Operation *op); +typedef struct SetState * +(*SetCopyStateImpl) (struct SetState *state); /** @@ -185,7 +106,7 @@ typedef void * @param state the set state, contains implementation-specific data */ typedef void -(*DestroySetImpl) (struct SetState *state); +(*SetDestroyImpl) (struct SetState *state); /** @@ -193,8 +114,9 @@ typedef void * * @param op operation that is created by accepting the operation, * should be initialized by the implementation + * @return operation-specific state to keep in @a op */ -typedef void +typedef struct OperationState * (*OpAcceptImpl) (struct Operation *op); @@ -206,23 +128,31 @@ typedef void * begin the evaluation * @param opaque_context message to be transmitted to the listener * to convince him to accept, may be NULL + * @return operation-specific state to keep in @a op */ -typedef void +typedef struct OperationState * (*OpEvaluateImpl) (struct Operation *op, const struct GNUNET_MessageHeader *opaque_context); - /** - * Signature of functions that implement operation cancellation + * Signature of functions that implement operation cancelation. + * This includes notifying the client about the operation's final + * state. * * @param op operation state */ typedef void -(*CancelImpl) (struct Operation *op); +(*OpCancelImpl) (struct Operation *op); -typedef struct SetState * -(*CopyStateImpl) (struct Set *op); +/** + * Signature of functions called when the CADET channel died. + * + * @param op operation state + */ +typedef void +(*OpChannelDeathImpl) (struct Operation *op); + /** @@ -234,17 +164,27 @@ struct SetVT /** * Callback for the set creation. */ - CreateImpl create; + SetCreateImpl create; /** * Callback for element insertion */ - AddRemoveImpl add; + SetAddRemoveImpl add; /** * Callback for element removal. */ - AddRemoveImpl remove; + SetAddRemoveImpl remove; + + /** + * Callback for making a copy of a set's internal state. + */ + SetCopyStateImpl copy_state; + + /** + * Callback for destruction of the set state. + */ + SetDestroyImpl destroy_set; /** * Callback for accepting a set operation request @@ -257,21 +197,15 @@ struct SetVT OpEvaluateImpl evaluate; /** - * Callback for destruction of the set state. - */ - DestroySetImpl destroy_set; - - /** - * Callback for handling the remote peer's disconnect. + * Callback for canceling an operation. */ - PeerDisconnectImpl peer_disconnect; + OpCancelImpl cancel; /** - * Callback for canceling an operation by its ID. + * Callback called in case the CADET channel died. */ - CancelImpl cancel; + OpChannelDeathImpl channel_death; - CopyStateImpl copy_state; }; @@ -341,20 +275,56 @@ struct ElementEntry }; +/** + * A listener is inhabited by a client, and waits for evaluation + * requests from remote peers. + */ struct Listener; +/** + * State we keep per client. + */ +struct ClientState +{ + /** + * Set, if associated with the client, otherwise NULL. + */ + struct Set *set; + + /** + * Listener, if associated with the client, otherwise NULL. + */ + struct Listener *listener; + + /** + * Client handle. + */ + struct GNUNET_SERVICE_Client *client; + + /** + * Message queue. + */ + struct GNUNET_MQ_Handle *mq; + +}; + + /** * Operation context used to execute a set operation. */ struct Operation { + /** - * V-Table for the operation belonging to the tunnel contest. - * - * Used for all operation specific operations after receiving the ops request + * Kept in a DLL of the listener, if @e listener is non-NULL. */ - const struct SetVT *vt; + struct Operation *next; + + /** + * Kept in a DLL of the listener, if @e listener is non-NULL. + */ + struct Operation *prev; /** * Channel to the peer. @@ -372,11 +342,15 @@ struct Operation struct GNUNET_MQ_Handle *mq; /** - * Detail information about the set operation, including the set to - * use. When 'spec' is NULL, the operation is not yet entirely - * initialized. + * Context message, may be NULL. + */ + struct GNUNET_MessageHeader *context_msg; + + /** + * Set associated with the operation, NULL until the spec has been + * associated with a set. */ - struct OperationSpecification *spec; + struct Set *set; /** * Operation-specific operation state. Note that the exact @@ -385,16 +359,6 @@ struct Operation */ struct OperationState *state; - /** - * Evaluate operations are held in a linked list. - */ - struct Operation *next; - - /** - * Evaluate operations are held in a linked list. - */ - struct Operation *prev; - /** * The identity of the requesting peer. Needs to * be stored here as the op spec might not have been created yet. @@ -408,9 +372,48 @@ struct Operation struct GNUNET_SCHEDULER_Task *timeout_task; /** - * The type of the operation. + * Salt to use for the operation. */ - enum GNUNET_SET_OperationType operation; + uint32_t salt; + + /** + * Remote peers element count + */ + uint32_t remote_element_count; + + /** + * ID used to identify an operation between service and client + */ + uint32_t client_request_id; + + /** + * When are elements sent to the client, and which elements are sent? + */ + enum GNUNET_SET_ResultMode result_mode; + + /** + * Always use delta operation instead of sending full sets, + * even it it's less efficient. + */ + int force_delta; + + /** + * Always send full sets, even if delta operations would + * be more efficient. + */ + int force_full; + + /** + * #GNUNET_YES to fail operations where Byzantine faults + * are suspected + */ + int byzantine; + + /** + * Lower bound for the set size, used only when + * byzantine mode is enabled. + */ + int byzantine_lower_bound; /** * Unique request id for the request from a remote peer, sent to the @@ -419,46 +422,27 @@ struct Operation */ uint32_t suggest_id; - /** - * #GNUNET_YES if this is not a "real" set operation yet, and we still - * need to wait for the other peer to give us more details. - */ - int is_incoming; - /** * Generation in which the operation handle * was created. */ unsigned int generation_created; - /** - * Incremented whenever (during shutdown) some component still - * needs to do something with this before the operation is freed. - * (Used as a reference counter, but only during termination.) - */ - unsigned int keep; }; /** - * SetContent stores the actual set elements, - * which may be shared by multiple generations derived - * from one set. + * SetContent stores the actual set elements, which may be shared by + * multiple generations derived from one set. */ struct SetContent { - /** - * Number of references to the content. - */ - unsigned int refcount; /** * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`. */ struct GNUNET_CONTAINER_MultiHashMap *elements; - unsigned int latest_generation; - /** * Mutations requested by the client that we're * unable to execute right now because we're iterating @@ -473,6 +457,16 @@ struct SetContent */ struct PendingMutation *pending_mutations_tail; + /** + * Number of references to the content. + */ + unsigned int refcount; + + /** + * FIXME: document! + */ + unsigned int latest_generation; + /** * Number of concurrently active iterators. */ @@ -494,11 +488,24 @@ struct GenerationRange }; +/** + * Information about a mutation to apply to a set. + */ struct PendingMutation { + /** + * Mutations are kept in a DLL. + */ struct PendingMutation *prev; + + /** + * Mutations are kept in a DLL. + */ struct PendingMutation *next; + /** + * Set this mutation is about. + */ struct Set *set; /** @@ -506,7 +513,7 @@ struct PendingMutation * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or * #GNUNET_MESSAGE_TYPE_SET_REMOVE. */ - struct GNUNET_MessageHeader *mutation_message; + struct GNUNET_SET_ElementMessage *msg; }; @@ -530,12 +537,13 @@ struct Set * Client that owns the set. Only one client may own a set, * and there can only be one set per client. */ - struct GNUNET_SERVICE_Client *client; + struct ClientState *cs; /** - * Message queue for the client. + * Content, possibly shared by multiple sets, + * and thus reference counted. */ - struct GNUNET_MQ_Handle *client_mq; + struct SetContent *content; /** * Virtual table for this set. Determined by the operation type of @@ -568,15 +576,15 @@ struct Set struct Operation *ops_tail; /** - * Current generation, that is, number of previously executed - * operations and lazy copies on the underlying set content. + * List of generations we have to exclude, due to lazy copies. */ - unsigned int current_generation; + struct GenerationRange *excluded_generations; /** - * List of generations we have to exclude, due to lazy copies. + * Current generation, that is, number of previously executed + * operations and lazy copies on the underlying set content. */ - struct GenerationRange *excluded_generations; + unsigned int current_generation; /** * Number of elements in array @a excluded_generations. @@ -588,22 +596,17 @@ struct Set */ enum GNUNET_SET_OperationType operation; - /** - * Each @e iter is assigned a unique number, so that the client - * can distinguish iterations. - */ - uint16_t iteration_id; - /** * Generation we're currently iteration over. */ unsigned int iter_generation; /** - * Content, possibly shared by multiple sets, - * and thus reference counted. + * Each @e iter is assigned a unique number, so that the client + * can distinguish iterations. */ - struct SetContent *content; + uint16_t iteration_id; + }; @@ -611,10 +614,14 @@ extern struct GNUNET_STATISTICS_Handle *_GSS_statistics; /** - * Destroy the given operation. Call the implementation-specific - * cancel function of the operation. Disconnects from the remote - * peer. Does not disconnect the client, as there may be multiple - * operations per set. + * Destroy the given operation. Used for any operation where both + * peers were known and that thus actually had a vt and channel. Must + * not be used for operations where 'listener' is still set and we do + * not know the other peer. + * + * Call the implementation-specific cancel function of the operation. + * Disconnects from the remote peer. Does not disconnect the client, + * as there may be multiple operations per set. * * @param op operation to destroy * @param gc #GNUNET_YES to perform garbage collection on the set @@ -642,10 +649,13 @@ const struct SetVT * _GSS_intersection_vt (void); -int -_GSS_is_element_of_set (struct ElementEntry *ee, - struct Set *set); - +/** + * Is element @a ee part of the set used by @a op? + * + * @param ee element to test + * @param op operation the defines the set and its generation + * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not + */ int _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op); diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 8307672b9..9dc421792 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -55,6 +55,18 @@ enum IntersectionOperationPhase */ PHASE_BF_EXCHANGE, + /** + * We must next send the P2P DONE message (after finishing mostly + * with the local client). Then we will wait for the channel to close. + */ + PHASE_MUST_SEND_DONE, + + /** + * We have received the P2P DONE message, and must finish with the + * local client before terminating the channel. + */ + PHASE_DONE_RECEIVED, + /** * The protocol is over. Results may still have to be sent to the * client. @@ -162,6 +174,13 @@ struct OperationState * Did we send the client that we are done? */ int client_done_sent; + + /** + * Set whenever we reach the state where the death of the + * channel is perfectly find and should NOT result in the + * operation being cancelled. + */ + int channel_death_expected; }; @@ -193,12 +212,12 @@ send_client_removed_element (struct Operation *op, struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode) + if (GNUNET_SET_RESULT_REMOVED != op->result_mode) return; /* Wrong mode for transmitting removed elements */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending removed element (size %u) to client\n", element->size); - GNUNET_assert (0 != op->spec->client_request_id); + GNUNET_assert (0 != op->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); @@ -208,12 +227,12 @@ send_client_removed_element (struct Operation *op, return; } rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (op->spec->client_request_id); + rm->request_id = htonl (op->client_request_id); rm->element_type = element->element_type; GNUNET_memcpy (&rm[1], - element->data, - element->size); - GNUNET_MQ_send (op->spec->set->client_mq, + element->data, + element->size); + GNUNET_MQ_send (op->set->cs->mq, ev); } @@ -397,9 +416,9 @@ fail_intersection_operation (struct Operation *op) ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = htonl (op->spec->client_request_id); + msg->request_id = htonl (op->client_request_id); msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, + GNUNET_MQ_send (op->set->cs->mq, ev); _GSS_operation_destroy (op, GNUNET_YES); @@ -428,8 +447,8 @@ send_bloomfilter (struct Operation *op) should use more bits to maximize its set reduction potential and minimize overall bandwidth consumption. */ bf_elementbits = 2 + ceil (log2((double) - (op->spec->remote_element_count / - (double) op->state->my_element_count))); + (op->remote_element_count / + (double) op->state->my_element_count))); if (bf_elementbits < 1) bf_elementbits = 1; /* make sure k is not 0 */ /* optimize BF-size to ~50% of bits set */ @@ -515,18 +534,67 @@ send_client_done_and_destroy (void *cls) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to local client\n"); ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->request_id = htonl (op->spec->client_request_id); + rm->request_id = htonl (op->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, + GNUNET_MQ_send (op->set->cs->mq, ev); _GSS_operation_destroy (op, GNUNET_YES); } +/** + * Remember that we are done dealing with the local client + * AND have sent the other peer our message that we are done, + * so we are not just waiting for the channel to die before + * telling the local client that we are done as our last act. + * + * @param cls the `struct Operation`. + */ +static void +finished_local_operations (void *cls) +{ + struct Operation *op = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "DONE sent to other peer, now waiting for other end to close the channel\n"); + op->state->phase = PHASE_FINISHED; + op->state->channel_death_expected = GNUNET_YES; +} + + +/** + * Notify the other peer that we are done. Once this message + * is out, we still need to notify the local client that we + * are done. + * + * @param op operation to notify for. + */ +static void +send_p2p_done (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + struct IntersectionDoneMessage *idm; + + GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); + GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); + ev = GNUNET_MQ_msg (idm, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); + idm->final_element_count = htonl (op->state->my_element_count); + idm->element_xor_hash = op->state->my_xor; + GNUNET_MQ_notify_sent (ev, + &finished_local_operations, + op); + GNUNET_MQ_send (op->mq, + ev); +} + + /** * Send all elements in the full result iterator. * @@ -550,10 +618,21 @@ send_remaining_elements (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending done and destroy because iterator ran out\n"); - op->keep--; GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); op->state->full_result_iter = NULL; - send_client_done_and_destroy (op); + if (PHASE_DONE_RECEIVED == op->state->phase) + { + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); + } + else if (PHASE_MUST_SEND_DONE == op->state->phase) + { + send_p2p_done (op); + } + else + { + GNUNET_assert (0); + } return; } ee = nxt; @@ -562,48 +641,136 @@ send_remaining_elements (void *cls) "Sending element %s:%u to client (full set)\n", GNUNET_h2s (&ee->element_hash), element->size); - GNUNET_assert (0 != op->spec->client_request_id); + GNUNET_assert (0 != op->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); GNUNET_assert (NULL != ev); rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (op->spec->client_request_id); + rm->request_id = htonl (op->client_request_id); rm->element_type = element->element_type; GNUNET_memcpy (&rm[1], - element->data, - element->size); + element->data, + element->size); GNUNET_MQ_notify_sent (ev, &send_remaining_elements, op); - GNUNET_MQ_send (op->spec->set->client_mq, + GNUNET_MQ_send (op->set->cs->mq, ev); } /** - * Inform the peer that this operation is complete. + * Fills the "my_elements" hashmap with the initial set of + * (non-deleted) elements from the set of the specification. * - * @param op the intersection operation to fail + * @param cls closure with the `struct Operation *` + * @param key current key code for the element + * @param value value in the hash map with the `struct ElementEntry *` + * @return #GNUNET_YES (we should continue to iterate) + */ +static int +initialize_map_unfiltered (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ElementEntry *ee = value; + struct Operation *op = cls; + + if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + return GNUNET_YES; /* element not live in operation's generation */ + GNUNET_CRYPTO_hash_xor (&op->state->my_xor, + &ee->element_hash, + &op->state->my_xor); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initial full initialization of my_elements, adding %s:%u\n", + GNUNET_h2s (&ee->element_hash), + ee->element.size); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return GNUNET_YES; +} + + +/** + * Send our element count to the peer, in case our element count is + * lower than his. + * + * @param op intersection operation */ static void -send_peer_done (struct Operation *op) +send_element_count (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; - struct IntersectionDoneMessage *idm; + struct IntersectionElementInfoMessage *msg; - op->state->phase = PHASE_FINISHED; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Intersection succeeded, sending DONE\n"); - GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); - op->state->local_bf = NULL; + "Sending our element count (%u)\n", + op->state->my_element_count); + ev = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); + msg->sender_element_count = htonl (op->state->my_element_count); + GNUNET_MQ_send (op->mq, ev); +} - ev = GNUNET_MQ_msg (idm, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); - idm->final_element_count = htonl (op->state->my_element_count); - idm->element_xor_hash = op->state->my_xor; - GNUNET_MQ_send (op->mq, - ev); + +/** + * We go first, initialize our map with all elements and + * send the first Bloom filter. + * + * @param op operation to start exchange for + */ +static void +begin_bf_exchange (struct Operation *op) +{ + op->state->phase = PHASE_BF_EXCHANGE; + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &initialize_map_unfiltered, + op); + send_bloomfilter (op); +} + + +/** + * Handle the initial `struct IntersectionElementInfoMessage` from a + * remote peer. + * + * @param cls the intersection operation + * @param mh the header of the message + */ +void +handle_intersection_p2p_element_info (void *cls, + const struct IntersectionElementInfoMessage *msg) +{ + struct Operation *op = cls; + + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) + { + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + } + op->remote_element_count = ntohl (msg->sender_element_count); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received remote element count (%u), I have %u\n", + op->remote_element_count, + op->state->my_element_count); + if ( ( (PHASE_INITIAL != op->state->phase) && + (PHASE_COUNT_SENT != op->state->phase) ) || + (op->state->my_element_count > op->remote_element_count) || + (0 == op->state->my_element_count) || + (0 == op->remote_element_count) ) + { + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + } + GNUNET_break (NULL == op->state->remote_bf); + begin_bf_exchange (op); + GNUNET_CADET_receive_done (op->channel); } @@ -618,9 +785,9 @@ process_bf (struct Operation *op) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n", op->state->phase, - op->spec->remote_element_count, + op->remote_element_count, op->state->my_element_count, - GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); + GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); switch (op->state->phase) { case PHASE_INITIAL: @@ -631,7 +798,7 @@ process_bf (struct Operation *op) /* This is the first BF being sent, build our initial map with filtering in place */ op->state->my_element_count = 0; - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, &filtered_map_initialization, op); break; @@ -641,6 +808,14 @@ process_bf (struct Operation *op) &iterator_bf_reduce, op); break; + case PHASE_MUST_SEND_DONE: + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + case PHASE_DONE_RECEIVED: + GNUNET_break_op (0); + fail_intersection_operation(op); + return; case PHASE_FINISHED: GNUNET_break_op (0); fail_intersection_operation(op); @@ -650,13 +825,28 @@ process_bf (struct Operation *op) op->state->remote_bf = NULL; if ( (0 == op->state->my_element_count) || /* fully disjoint */ - ( (op->state->my_element_count == op->spec->remote_element_count) && + ( (op->state->my_element_count == op->remote_element_count) && (0 == memcmp (&op->state->my_xor, &op->state->other_xor, sizeof (struct GNUNET_HashCode))) ) ) { /* we are done */ - send_peer_done (op); + op->state->phase = PHASE_MUST_SEND_DONE; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Intersection succeeded, sending DONE to other peer\n"); + GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); + op->state->local_bf = NULL; + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + send_remaining_elements (op); + return; + } + send_p2p_done (op); return; } op->state->phase = PHASE_BF_EXCHANGE; @@ -677,7 +867,7 @@ check_intersection_p2p_bf (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -727,7 +917,7 @@ handle_intersection_p2p_bf (void *cls, bf_size, bf_bits_per_element); op->state->salt = ntohl (msg->sender_mutator); - op->spec->remote_element_count = ntohl (msg->sender_element_count); + op->remote_element_count = ntohl (msg->sender_element_count); process_bf (op); break; } @@ -740,7 +930,7 @@ handle_intersection_p2p_bf (void *cls, op->state->bf_bits_per_element = bf_bits_per_element; op->state->bf_data_offset = 0; op->state->salt = ntohl (msg->sender_mutator); - op->spec->remote_element_count = ntohl (msg->sender_element_count); + op->remote_element_count = ntohl (msg->sender_element_count); } else { @@ -749,7 +939,7 @@ handle_intersection_p2p_bf (void *cls, (op->state->bf_bits_per_element != bf_bits_per_element) || (op->state->bf_data_offset + chunk_size > bf_size) || (op->state->salt != ntohl (msg->sender_mutator)) || - (op->spec->remote_element_count != ntohl (msg->sender_element_count)) ) + (op->remote_element_count != ntohl (msg->sender_element_count)) ) { GNUNET_break_op (0); fail_intersection_operation (op); @@ -782,147 +972,6 @@ handle_intersection_p2p_bf (void *cls, } -/** - * Fills the "my_elements" hashmap with the initial set of - * (non-deleted) elements from the set of the specification. - * - * @param cls closure with the `struct Operation *` - * @param key current key code for the element - * @param value value in the hash map with the `struct ElementEntry *` - * @return #GNUNET_YES (we should continue to iterate) - */ -static int -initialize_map_unfiltered (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct ElementEntry *ee = value; - struct Operation *op = cls; - - if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) - return GNUNET_YES; /* element not live in operation's generation */ - GNUNET_CRYPTO_hash_xor (&op->state->my_xor, - &ee->element_hash, - &op->state->my_xor); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initial full initialization of my_elements, adding %s:%u\n", - GNUNET_h2s (&ee->element_hash), - ee->element.size); - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, - &ee->element_hash, - ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - return GNUNET_YES; -} - - -/** - * Send our element count to the peer, in case our element count is - * lower than his. - * - * @param op intersection operation - */ -static void -send_element_count (struct Operation *op) -{ - struct GNUNET_MQ_Envelope *ev; - struct IntersectionElementInfoMessage *msg; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending our element count (%u)\n", - op->state->my_element_count); - ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); - msg->sender_element_count = htonl (op->state->my_element_count); - GNUNET_MQ_send (op->mq, ev); -} - - -/** - * We go first, initialize our map with all elements and - * send the first Bloom filter. - * - * @param op operation to start exchange for - */ -static void -begin_bf_exchange (struct Operation *op) -{ - op->state->phase = PHASE_BF_EXCHANGE; - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, - &initialize_map_unfiltered, - op); - send_bloomfilter (op); -} - - -/** - * Handle the initial `struct IntersectionElementInfoMessage` from a - * remote peer. - * - * @param cls the intersection operation - * @param mh the header of the message - */ -void -handle_intersection_p2p_element_info (void *cls, - const struct IntersectionElementInfoMessage *msg) -{ - struct Operation *op = cls; - - if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) - { - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - op->spec->remote_element_count = ntohl (msg->sender_element_count); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received remote element count (%u), I have %u\n", - op->spec->remote_element_count, - op->state->my_element_count); - if ( ( (PHASE_INITIAL != op->state->phase) && - (PHASE_COUNT_SENT != op->state->phase) ) || - (op->state->my_element_count > op->spec->remote_element_count) || - (0 == op->state->my_element_count) || - (0 == op->spec->remote_element_count) ) - { - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - GNUNET_break (NULL == op->state->remote_bf); - begin_bf_exchange (op); - GNUNET_CADET_receive_done (op->channel); -} - - -/** - * Send a result message to the client indicating that the operation - * is over. After the result done message has been sent to the - * client, destroy the evaluate operation. - * - * @param op intersection operation - */ -static void -finish_and_destroy (struct Operation *op) -{ - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); - - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending full result set (%u elements)\n", - GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); - op->state->full_result_iter - = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); - op->keep++; - send_remaining_elements (op); - return; - } - send_client_done_and_destroy (op); -} - - /** * Remove all elements from our hashmap. * @@ -970,10 +1019,10 @@ handle_intersection_p2p_done (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_INTERSECTION != op->operation) + if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation) { GNUNET_break_op (0); - fail_intersection_operation(op); + fail_intersection_operation (op); return; } if (PHASE_BF_EXCHANGE != op->state->phase) @@ -1005,9 +1054,22 @@ handle_intersection_p2p_done (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got IntersectionDoneMessage, have %u elements in intersection\n", op->state->my_element_count); - op->state->phase = PHASE_FINISHED; - finish_and_destroy (op); + op->state->phase = PHASE_DONE_RECEIVED; GNUNET_CADET_receive_done (op->channel); + + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + if (GNUNET_SET_RESULT_FULL == op->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending full result set to client (%u elements)\n", + GNUNET_CONTAINER_multihashmap_size (op->state->my_elements)); + op->state->full_result_iter + = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements); + send_remaining_elements (op); + return; + } + op->state->phase = PHASE_FINISHED; + send_client_done_and_destroy (op); } @@ -1018,24 +1080,16 @@ handle_intersection_p2p_done (void *cls, * begin the evaluation * @param opaque_context message to be transmitted to the listener * to convince him to accept, may be NULL + * @return operation-specific state to keep in @a op */ -static void +static struct OperationState * intersection_evaluate (struct Operation *op, const struct GNUNET_MessageHeader *opaque_context) { + struct OperationState *state; struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; - op->state = GNUNET_new (struct OperationState); - /* we started the operation, thus we have to send the operation request */ - op->state->phase = PHASE_INITIAL; - op->state->my_element_count = op->spec->set->state->current_set_element_count; - op->state->my_elements - = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, - GNUNET_YES); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initiating intersection operation evaluation\n"); ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, opaque_context); @@ -1043,20 +1097,30 @@ intersection_evaluate (struct Operation *op, { /* the context message is too large!? */ GNUNET_break (0); - GNUNET_SERVICE_client_drop (op->spec->set->client); - return; + return NULL; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initiating intersection operation evaluation\n"); + state = GNUNET_new (struct OperationState); + /* we started the operation, thus we have to send the operation request */ + state->phase = PHASE_INITIAL; + state->my_element_count = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, + GNUNET_YES); + msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); - msg->element_count = htonl (op->state->my_element_count); + msg->element_count = htonl (state->my_element_count); GNUNET_MQ_send (op->mq, ev); - op->state->phase = PHASE_COUNT_SENT; + state->phase = PHASE_COUNT_SENT; if (NULL != opaque_context) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent op request with context message\n"); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent op request without context message\n"); + return state; } @@ -1066,53 +1130,33 @@ intersection_evaluate (struct Operation *op, * * @param op operation that will be accepted as an intersection operation */ -static void +static struct OperationState * intersection_accept (struct Operation *op) { + struct OperationState *state; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Accepting set intersection operation\n"); - op->state = GNUNET_new (struct OperationState); - op->state->phase = PHASE_INITIAL; - op->state->my_element_count - = op->spec->set->state->current_set_element_count; - GNUNET_assert (NULL == op->state->my_elements); - op->state->my_elements - = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, - op->spec->remote_element_count), + state = GNUNET_new (struct OperationState); + state->phase = PHASE_INITIAL; + state->my_element_count + = op->set->state->current_set_element_count; + state->my_elements + = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, + op->remote_element_count), GNUNET_YES); - if (op->spec->remote_element_count < op->state->my_element_count) + op->state = state; + if (op->remote_element_count < state->my_element_count) { /* If the other peer (Alice) has fewer elements than us (Bob), we just send the count as Alice should send the first BF */ send_element_count (op); - op->state->phase = PHASE_COUNT_SENT; - return; + state->phase = PHASE_COUNT_SENT; + return state; } /* We have fewer elements, so we start with the BF */ begin_bf_exchange (op); -} - - -/** - * Handler for peer-disconnects, notifies the client about the aborted - * operation. If we did not expect anything from the other peer, we - * gracefully terminate the operation. - * - * @param op the destroyed operation - */ -static void -intersection_peer_disconnect (struct Operation *op) -{ - if (PHASE_FINISHED != op->state->phase) - { - fail_intersection_operation (op); - return; - } - /* the session has already been concluded */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Other peer disconnected (finished)\n"); - if (GNUNET_NO == op->state->client_done_sent) - finish_and_destroy (op); + return state; } @@ -1214,6 +1258,28 @@ intersection_remove (struct SetState *set_state, } +/** + * Callback for channel death for the intersection operation. + * + * @param op operation that lost the channel + */ +static void +intersection_channel_death (struct Operation *op) +{ + if (GNUNET_YES == op->state->channel_death_expected) + { + /* oh goodie, we are done! */ + send_client_done_and_destroy (op); + } + else + { + /* sorry, channel went down early, too bad. */ + _GSS_operation_destroy (op, + GNUNET_YES); + } +} + + /** * Get the table with implementing functions for set intersection. * @@ -1229,8 +1295,8 @@ _GSS_intersection_vt () .destroy_set = &intersection_set_destroy, .evaluate = &intersection_evaluate, .accept = &intersection_accept, - .peer_disconnect = &intersection_peer_disconnect, .cancel = &intersection_op_cancel, + .channel_death = &intersection_channel_death, }; return &intersection_vt; diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 9eaf12fef..fc7e578e6 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -368,9 +368,10 @@ fail_union_operation (struct Operation *op) "union operation failed\n"); ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = htonl (op->spec->client_request_id); + msg->request_id = htonl (op->client_request_id); msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, ev); + GNUNET_MQ_send (op->set->cs->mq, + ev); _GSS_operation_destroy (op, GNUNET_YES); } @@ -401,7 +402,14 @@ get_ibf_key (const struct GNUNET_HashCode *src) */ struct GetElementContext { + /** + * FIXME. + */ struct GNUNET_HashCode hash; + + /** + * FIXME. + */ struct KeyEntry *k; }; @@ -504,6 +512,9 @@ op_register_element (struct Operation *op, } +/** + * FIXME. + */ static void salt_key (const struct IBF_Key *k_in, uint32_t salt, @@ -517,6 +528,9 @@ salt_key (const struct IBF_Key *k_in, } +/** + * FIXME. + */ static void unsalt_key (const struct IBF_Key *k_in, uint32_t salt, @@ -550,7 +564,9 @@ prepare_ibf_iterator (void *cls, (void *) op, (unsigned long) ke->ibf_key.key_val, GNUNET_h2s (&ke->element->element_hash)); - salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); + salt_key (&ke->ibf_key, + op->state->salt_send, + &salted_key); ibf_insert (op->state->local_ibf, salted_key); return GNUNET_YES; } @@ -576,12 +592,14 @@ init_key_to_element_iterator (void *cls, /* make sure that the element belongs to the set at the time * of creating the operation */ - if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) + if (GNUNET_NO == + _GSS_is_element_of_operation (ee, + op)) return GNUNET_YES; - GNUNET_assert (GNUNET_NO == ee->remote); - - op_register_element (op, ee, GNUNET_NO); + op_register_element (op, + ee, + GNUNET_NO); return GNUNET_YES; } @@ -598,9 +616,11 @@ initialize_key_to_element (struct Operation *op) unsigned int len; GNUNET_assert (NULL == op->state->key_to_element); - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements); op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); + GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &init_key_to_element_iterator, + op); } @@ -706,44 +726,6 @@ send_ibf (struct Operation *op, } -/** - * Send a strata estimator to the remote peer. - * - * @param op the union operation with the remote peer - */ -static void -send_strata_estimator (struct Operation *op) -{ - const struct StrataEstimator *se = op->state->se; - struct GNUNET_MQ_Envelope *ev; - struct StrataEstimatorMessage *strata_msg; - char *buf; - size_t len; - uint16_t type; - - buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); - len = strata_estimator_write (op->state->se, - buf); - if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) - type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; - else - type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; - ev = GNUNET_MQ_msg_extra (strata_msg, - len, - type); - GNUNET_memcpy (&strata_msg[1], - buf, - len); - GNUNET_free (buf); - strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); - GNUNET_MQ_send (op->mq, - ev); - op->state->phase = PHASE_EXPECT_IBF; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "sent SE, expecting IBF\n"); -} - - /** * Compute the necessary order of an ibf * from the size of the symmetric set difference. @@ -777,7 +759,7 @@ get_order_from_difference (unsigned int diff) * @return #GNUNET_YES (to continue iterating) */ static int -send_element_iterator (void *cls, +send_full_element_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { @@ -803,16 +785,23 @@ send_element_iterator (void *cls, } +/** + * Switch to full set transmission for @a op. + * + * @param op operation to switch to full set transmission. + */ static void send_full_set (struct Operation *op) { struct GNUNET_MQ_Envelope *ev; op->state->phase = PHASE_FULL_SENDING; + LOG (GNUNET_ERROR_TYPE_INFO, + "Dedicing to transmit the full set\n"); /* FIXME: use a more memory-friendly way of doing this with an iterator, just as we do in the non-full case! */ - (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, - &send_element_iterator, + (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements, + &send_full_element_iterator, op); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); GNUNET_MQ_send (op->mq, @@ -923,15 +912,15 @@ handle_union_p2p_strata_estimator (void *cls, } } - if ( (GNUNET_YES == op->spec->byzantine) && - (other_size < op->spec->byzantine_lower_bound) ) + if ( (GNUNET_YES == op->byzantine) && + (other_size < op->byzantine_lower_bound) ) { GNUNET_break (0); fail_union_operation (op); return; } - if ( (GNUNET_YES == op->spec->force_full) || + if ( (GNUNET_YES == op->force_full) || (diff > op->state->initial_size / 4) || (0 == other_size) ) { @@ -1058,14 +1047,16 @@ decode_and_send (struct Operation *op) GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); if (GNUNET_OK != - prepare_ibf (op, op->state->remote_ibf->size)) + prepare_ibf (op, + op->state->remote_ibf->size)) { GNUNET_break (0); /* allocation failed */ return GNUNET_SYSERR; } diff_ibf = ibf_dup (op->state->local_ibf); - ibf_subtract (diff_ibf, op->state->remote_ibf); + ibf_subtract (diff_ibf, + op->state->remote_ibf); ibf_destroy (op->state->remote_ibf); op->state->remote_ibf = NULL; @@ -1162,8 +1153,12 @@ decode_and_send (struct Operation *op) if (1 == side) { struct IBF_Key unsalted_key; - unsalt_key (&key, op->state->salt_receive, &unsalted_key); - send_offers_for_key (op, unsalted_key); + + unsalt_key (&key, + op->state->salt_receive, + &unsalted_key); + send_offers_for_key (op, + unsalted_key); } else if (-1 == side) { @@ -1211,7 +1206,7 @@ check_union_p2p_ibf (void *cls, struct Operation *op = cls; unsigned int buckets_in_message; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1304,6 +1299,8 @@ handle_union_p2p_ibf (void *cls, else { GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); + LOG (GNUNET_ERROR_TYPE_INFO, + "Received more of IBF\n"); } GNUNET_assert (NULL != op->state->remote_ibf); @@ -1351,7 +1348,7 @@ send_client_element (struct Operation *op, LOG (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); - GNUNET_assert (0 != op->spec->client_request_id); + GNUNET_assert (0 != op->client_request_id); ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); if (NULL == ev) { @@ -1360,11 +1357,14 @@ send_client_element (struct Operation *op, return; } rm->result_status = htons (status); - rm->request_id = htonl (op->spec->client_request_id); + rm->request_id = htonl (op->client_request_id); rm->element_type = htons (element->element_type); rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); - GNUNET_memcpy (&rm[1], element->data, element->size); - GNUNET_MQ_send (op->spec->set->client_mq, ev); + GNUNET_memcpy (&rm[1], + element->data, + element->size); + GNUNET_MQ_send (op->set->cs->mq, + ev); } @@ -1381,14 +1381,19 @@ send_done_and_destroy (void *cls) struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->request_id = htonl (op->spec->client_request_id); + LOG (GNUNET_ERROR_TYPE_INFO, + "Signalling client that union operation is done\n"); + ev = GNUNET_MQ_msg (rm, + GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (op->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); rm->element_type = htons (0); rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); - GNUNET_MQ_send (op->spec->set->client_mq, ev); + GNUNET_MQ_send (op->set->cs->mq, + ev); /* Will also call the union-specific cancel function. */ - _GSS_operation_destroy (op, GNUNET_YES); + _GSS_operation_destroy (op, + GNUNET_YES); } @@ -1415,8 +1420,8 @@ maybe_finish (struct Operation *op) op->state->phase = PHASE_DONE; ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); - GNUNET_MQ_send (op->mq, ev); - + GNUNET_MQ_send (op->mq, + ev); /* We now wait until the other peer closes the channel * after it got all elements from us. */ } @@ -1447,7 +1452,7 @@ check_union_p2p_elements (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1535,7 +1540,7 @@ handle_union_p2p_elements (void *cls, op->state->received_fresh++; op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ - switch (op->spec->result_mode) + switch (op->result_mode) { case GNUNET_SET_RESULT_ADDED: send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); @@ -1575,7 +1580,7 @@ check_union_p2p_full_element (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1644,7 +1649,7 @@ handle_union_p2p_full_element (void *cls, op->state->received_fresh++; op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ - switch (op->spec->result_mode) + switch (op->result_mode) { case GNUNET_SET_RESULT_ADDED: send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); @@ -1659,7 +1664,7 @@ handle_union_p2p_full_element (void *cls, } } - if ( (GNUNET_YES == op->spec->byzantine) && + if ( (GNUNET_YES == op->byzantine) && (op->state->received_total > 384 + op->state->received_fresh * 4) && (op->state->received_fresh < op->state->received_total / 6) ) { @@ -1690,7 +1695,7 @@ check_union_p2p_inquiry (void *cls, struct Operation *op = cls; unsigned int num_keys; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1727,6 +1732,8 @@ handle_union_p2p_inquiry (void *cls, const struct IBF_Key *ibf_key; unsigned int num_keys; + LOG (GNUNET_ERROR_TYPE_INFO, + "Received union inquiry\n"); num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) / sizeof (struct IBF_Key); ibf_key = (const struct IBF_Key *) &msg[1]; @@ -1734,8 +1741,11 @@ handle_union_p2p_inquiry (void *cls, { struct IBF_Key unsalted_key; - unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); - send_offers_for_key (op, unsalted_key); + unsalt_key (ibf_key, + ntohl (msg->salt), + &unsalted_key); + send_offers_for_key (op, + unsalted_key); ibf_key++; } GNUNET_CADET_receive_done (op->channel); @@ -1753,9 +1763,9 @@ handle_union_p2p_inquiry (void *cls, * #GNUNET_NO if not. */ static int -send_missing_elements_iter (void *cls, - uint32_t key, - void *value) +send_missing_full_elements_iter (void *cls, + uint32_t key, + void *value) { struct Operation *op = cls; struct KeyEntry *ke = value; @@ -1765,13 +1775,15 @@ send_missing_elements_iter (void *cls, if (GNUNET_YES == ke->received) return GNUNET_YES; - - ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); - GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); - emsg->reserved = htons (0); + ev = GNUNET_MQ_msg_extra (emsg, + ee->element.size, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], + ee->element.data, + ee->element.size); emsg->element_type = htons (ee->element.element_type); - GNUNET_MQ_send (op->mq, ev); - + GNUNET_MQ_send (op->mq, + ev); return GNUNET_YES; } @@ -1790,7 +1802,7 @@ handle_union_p2p_request_full (void *cls, LOG (GNUNET_ERROR_TYPE_INFO, "Received request for full set transmission\n"); - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); fail_union_operation (op); @@ -1833,11 +1845,15 @@ handle_union_p2p_full_done (void *cls, /* send all the elements that did not come from the remote peer */ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, - &send_missing_elements_iter, + &send_missing_full_elements_iter, op); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); - GNUNET_MQ_send (op->mq, ev); + GNUNET_MQ_notify_sent (ev, + &send_done_and_destroy, + op); + GNUNET_MQ_send (op->mq, + ev); op->state->phase = PHASE_DONE; /* we now wait until the other peer shuts the tunnel down*/ } @@ -1880,7 +1896,7 @@ check_union_p2p_demand (void *cls, struct Operation *op = cls; unsigned int num_hashes; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1921,7 +1937,7 @@ handle_union_p2p_demand (void *cls, num_hashes > 0; hash++, num_hashes--) { - ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, hash); if (NULL == ee) { @@ -1952,7 +1968,7 @@ handle_union_p2p_demand (void *cls, 1, GNUNET_NO); - switch (op->spec->result_mode) + switch (op->result_mode) { case GNUNET_SET_RESULT_ADDED: /* Nothing to do. */ @@ -1984,7 +2000,7 @@ check_union_p2p_offer (void *cls, struct Operation *op = cls; unsigned int num_hashes; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -1998,8 +2014,8 @@ check_union_p2p_offer (void *cls, } num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) / sizeof (struct GNUNET_HashCode); - if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) - != num_hashes * sizeof (struct GNUNET_HashCode)) + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) != + num_hashes * sizeof (struct GNUNET_HashCode)) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -2033,7 +2049,7 @@ handle_union_p2p_offer (void *cls, struct GNUNET_MessageHeader *demands; struct GNUNET_MQ_Envelope *ev; - ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements, hash); if (NULL != ee) if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) @@ -2060,7 +2076,9 @@ handle_union_p2p_offer (void *cls, ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); - *(struct GNUNET_HashCode *) &demands[1] = *hash; + GNUNET_memcpy (&demands[1], + hash, + sizeof (struct GNUNET_HashCode)); GNUNET_MQ_send (op->mq, ev); } GNUNET_CADET_receive_done (op->channel); @@ -2079,7 +2097,7 @@ handle_union_p2p_done (void *cls, { struct Operation *op = cls; - if (GNUNET_SET_OPERATION_UNION != op->operation) + if (GNUNET_SET_OPERATION_UNION != op->set->operation) { GNUNET_break_op (0); fail_union_operation (op); @@ -2134,21 +2152,31 @@ handle_union_p2p_done (void *cls, * @param opaque_context message to be transmitted to the listener * to convince him to accept, may be NULL */ -static void +static struct OperationState * union_evaluate (struct Operation *op, const struct GNUNET_MessageHeader *opaque_context) { + struct OperationState *state; struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; - GNUNET_assert (NULL == op->state); - op->state = GNUNET_new (struct OperationState); - op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + ev = GNUNET_MQ_msg_nested_mh (msg, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + opaque_context); + if (NULL == ev) + { + /* the context message is too large */ + GNUNET_break (0); + return NULL; + } + state = GNUNET_new (struct OperationState); + state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); /* copy the current generation's strata estimator for this operation */ - op->state->se = strata_estimator_dup (op->spec->set->state->se); + state->se = strata_estimator_dup (op->set->state->se); /* we started the operation, thus we have to send the operation request */ - op->state->phase = PHASE_EXPECT_SE; - op->state->salt_receive = op->state->salt_send = 42; + state->phase = PHASE_EXPECT_SE; + state->salt_receive = state->salt_send = 42; // FIXME????? LOG (GNUNET_ERROR_TYPE_DEBUG, "Initiating union operation evaluation\n"); GNUNET_STATISTICS_update (_GSS_statistics, @@ -2159,16 +2187,6 @@ union_evaluate (struct Operation *op, "# of initiated union operations", 1, GNUNET_NO); - ev = GNUNET_MQ_msg_nested_mh (msg, - GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - opaque_context); - if (NULL == ev) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVICE_client_drop (op->spec->set->client); - return; - } msg->operation = htonl (GNUNET_SET_OPERATION_UNION); GNUNET_MQ_send (op->mq, ev); @@ -2180,8 +2198,10 @@ union_evaluate (struct Operation *op, LOG (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); + op->state = state; initialize_key_to_element (op); - op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); + state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); + return state; } @@ -2191,13 +2211,19 @@ union_evaluate (struct Operation *op, * * @param op operation that will be accepted as a union operation */ -static void +static struct OperationState * union_accept (struct Operation *op) { + struct OperationState *state; + const struct StrataEstimator *se; + struct GNUNET_MQ_Envelope *ev; + struct StrataEstimatorMessage *strata_msg; + char *buf; + size_t len; + uint16_t type; + LOG (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); - GNUNET_assert (NULL == op->state); - GNUNET_STATISTICS_update (_GSS_statistics, "# of accepted union operations", 1, @@ -2207,14 +2233,37 @@ union_accept (struct Operation *op) 1, GNUNET_NO); - op->state = GNUNET_new (struct OperationState); - op->state->se = strata_estimator_dup (op->spec->set->state->se); - op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); - op->state->salt_receive = op->state->salt_send = 42; + state = GNUNET_new (struct OperationState); + state->se = strata_estimator_dup (op->set->state->se); + state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, + GNUNET_NO); + state->salt_receive = state->salt_send = 42; // FIXME????? + op->state = state; initialize_key_to_element (op); - op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); + state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element); + /* kick off the operation */ - send_strata_estimator (op); + se = state->se; + buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size); + len = strata_estimator_write (se, + buf); + if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size) + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; + else + type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; + ev = GNUNET_MQ_msg_extra (strata_msg, + len, + type); + GNUNET_memcpy (&strata_msg[1], + buf, + len); + GNUNET_free (buf); + strata_msg->set_size + = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements)); + GNUNET_MQ_send (op->mq, + ev); + state->phase = PHASE_EXPECT_IBF; + return state; } @@ -2254,7 +2303,8 @@ union_set_create (void) * @param ee the element to add to the set */ static void -union_add (struct SetState *set_state, struct ElementEntry *ee) +union_add (struct SetState *set_state, + struct ElementEntry *ee) { strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash)); @@ -2269,7 +2319,8 @@ union_add (struct SetState *set_state, struct ElementEntry *ee) * @param ee set element to remove */ static void -union_remove (struct SetState *set_state, struct ElementEntry *ee) +union_remove (struct SetState *set_state, + struct ElementEntry *ee) { strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash)); @@ -2293,61 +2344,39 @@ union_set_destroy (struct SetState *set_state) } -/** - * Handler for peer-disconnects, notifies the client - * about the aborted operation in case the op was not concluded. - * - * @param op the destroyed operation - */ -static void -union_peer_disconnect (struct Operation *op) -{ - if (PHASE_DONE != op->state->phase) - { - struct GNUNET_MQ_Envelope *ev; - struct GNUNET_SET_ResultMessage *msg; - - ev = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_SET_RESULT); - msg->request_id = htonl (op->spec->client_request_id); - msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->element_type = htons (0); - GNUNET_MQ_send (op->spec->set->client_mq, - ev); - LOG (GNUNET_ERROR_TYPE_WARNING, - "other peer disconnected prematurely, phase %u\n", - op->state->phase); - _GSS_operation_destroy (op, - GNUNET_YES); - return; - } - // else: the session has already been concluded - LOG (GNUNET_ERROR_TYPE_DEBUG, - "other peer disconnected (finished)\n"); - if (GNUNET_NO == op->state->client_done_sent) - send_done_and_destroy (op); -} - - /** * Copy union-specific set state. * - * @param set source set for copying the union state + * @param state source state for copying the union state * @return a copy of the union-specific set state */ static struct SetState * -union_copy_state (struct Set *set) +union_copy_state (struct SetState *state) { struct SetState *new_state; + GNUNET_assert ( (NULL != state) && + (NULL != state->se) ); new_state = GNUNET_new (struct SetState); - GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); - new_state->se = strata_estimator_dup (set->state->se); + new_state->se = strata_estimator_dup (state->se); return new_state; } +/** + * Handle case where channel went down for an operation. + * + * @param op operation that lost the channel + */ +static void +union_channel_death (struct Operation *op) +{ + _GSS_operation_destroy (op, + GNUNET_YES); +} + + /** * Get the table with implementing functions for * set union. @@ -2364,9 +2393,9 @@ _GSS_union_vt () .destroy_set = &union_set_destroy, .evaluate = &union_evaluate, .accept = &union_accept, - .peer_disconnect = &union_peer_disconnect, .cancel = &union_op_cancel, .copy_state = &union_copy_state, + .channel_death = &union_channel_death }; return &union_vt;