X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set.c;h=e4e2535af94b84004865a34b4312fe335bb12b04;hb=5b32752cd7b02adcb8e6fec7798637638c6f63a0;hp=2291bd2f268212418b0836167334cb24de378900;hpb=dc0da39a4d4a086b5286ae4705a3b96695d2f5f0;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 2291bd2f2..e4e2535af 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2013, 2014 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file set/gnunet-service-set.c @@ -25,6 +25,7 @@ */ #include "gnunet-service-set.h" #include "gnunet-service-set_protocol.h" +#include "gnunet_statistics_service.h" /** * How long do we hold on to an incoming channel if there is @@ -65,6 +66,11 @@ struct Listener */ struct GNUNET_HashCode app_id; + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + /** * The type of the operation. */ @@ -72,6 +78,16 @@ struct Listener }; +struct LazyCopyRequest +{ + struct Set *source_set; + uint32_t cookie; + + struct LazyCopyRequest *prev; + struct LazyCopyRequest *next; +}; + + /** * Configuration of our local peer. */ @@ -115,6 +131,11 @@ static struct Operation *incoming_head; */ static struct Operation *incoming_tail; +static struct LazyCopyRequest *lazy_copy_head; +static struct LazyCopyRequest *lazy_copy_tail; + +static uint32_t lazy_copy_cookie = 1; + /** * Counter for allocating unique IDs for clients, used to identify * incoming operation requests from remote peers, that the client can @@ -122,6 +143,11 @@ static struct Operation *incoming_tail; */ static uint32_t suggest_id = 1; +/** + * Statistics handle. + */ +struct GNUNET_STATISTICS_Handle *_GSS_statistics; + /** * Get set that is owned by the given client, if any. @@ -208,6 +234,7 @@ listener_destroy (struct Listener *listener) GNUNET_MQ_destroy (listener->client_mq); listener->client_mq = NULL; } + GNUNET_CADET_close_port (listener->open_port); GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener); @@ -253,20 +280,20 @@ garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) { - struct GarbageContext *gc = cls; - struct ElementEntry *ee = value; - - if (GNUNET_YES != ee->removed) - return GNUNET_OK; - if ( (gc->max_op_generation < ee->generation_added) || - (ee->generation_removed > gc->min_op_generation) ) - { - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (gc->map, - key, - ee)); - GNUNET_free (ee); - } + //struct GarbageContext *gc = cls; + //struct ElementEntry *ee = value; + + //if (GNUNET_YES != ee->removed) + // return GNUNET_OK; + //if ( (gc->max_op_generation < ee->generation_added) || + // (ee->generation_removed > gc->min_op_generation) ) + //{ + // GNUNET_assert (GNUNET_YES == + // GNUNET_CONTAINER_multihashmap_remove (gc->map, + // key, + // ee)); + // GNUNET_free (ee); + //} return GNUNET_OK; } @@ -293,13 +320,120 @@ collect_generation_garbage (struct Set *set) gc.max_op_generation = GNUNET_MAX (gc.max_op_generation, op->generation_created); } - gc.map = set->elements; - GNUNET_CONTAINER_multihashmap_iterate (set->elements, + gc.map = set->content->elements; + GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, &garbage_collect_cb, &gc); } +static int +is_excluded_generation (unsigned int generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + unsigned int i; + + for (i = 0; i < excluded_size; i++) + { + if ( (generation >= excluded[i].start) && (generation < excluded[i].end) ) + return GNUNET_YES; + } + + return GNUNET_NO; +} + + +static int +is_element_of_generation (struct ElementEntry *ee, + unsigned int query_generation, + struct GenerationRange *excluded, + unsigned int excluded_size) +{ + struct MutationEvent *mut; + int is_present; + unsigned int i; + + GNUNET_assert (NULL != ee->mutations); + + if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size)) + { + GNUNET_break (0); + return GNUNET_NO; + } + + is_present = GNUNET_NO; + + /* Could be made faster with binary search, but lists + are small, so why bother. */ + for (i = 0; i < ee->mutations_size; i++) + { + mut = &ee->mutations[i]; + + if (mut->generation > query_generation) + { + /* The mutation doesn't apply to our generation + anymore. We can'b break here, since mutations aren't + sorted by generation. */ + continue; + } + + if (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) + { + /* The generation is excluded (because it belongs to another + fork via a lazy copy) and thus mutations aren't considered + for membership testing. */ + continue; + } + + /* This would be an inconsistency in how we manage mutations. */ + if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) ) + GNUNET_assert (0); + + /* Likewise. */ + if ( (GNUNET_NO == is_present) && (GNUNET_NO == mut->added) ) + GNUNET_assert (0); + + is_present = mut->added; + } + + return is_present; +} + + +int +_GSS_is_element_of_set (struct ElementEntry *ee, + struct Set *set) +{ + return is_element_of_generation (ee, + set->current_generation, + set->excluded_generations, + set->excluded_generations_size); +} + + +static int +is_element_of_iteration (struct ElementEntry *ee, + struct Set *set) +{ + return is_element_of_generation (ee, + set->iter_generation, + set->excluded_generations, + set->excluded_generations_size); +} + + +int +_GSS_is_element_of_operation (struct ElementEntry *ee, + struct Operation *op) +{ + return is_element_of_generation (ee, + op->generation_created, + op->spec->set->excluded_generations, + op->spec->set->excluded_generations_size); +} + + /** * Destroy the given operation. Call the implementation-specific * cancel function of the operation. Disconnects from the remote @@ -371,6 +505,8 @@ destroy_elements_iterator (void *cls, { struct ElementEntry *ee = value; + GNUNET_free_non_null (ee->mutations); + GNUNET_free (ee); return GNUNET_YES; } @@ -412,17 +548,62 @@ set_destroy (struct Set *set) set->iter = NULL; set->iteration_id++; } - if (NULL != set->elements) { - GNUNET_CONTAINER_multihashmap_iterate (set->elements, - &destroy_elements_iterator, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (set->elements); - set->elements = NULL; + struct SetContent *content; + struct PendingMutation *pm; + struct PendingMutation *pm_current; + + content = set->content; + + // discard any pending mutations that reference this set + pm = content->pending_mutations_head; + while (NULL != pm) + { + pm_current = pm; + pm = pm->next; + if (pm_current-> set == set) + GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, + content->pending_mutations_tail, + pm_current); + + } + + set->content = NULL; + GNUNET_assert (0 != content->refcount); + content->refcount -= 1; + if (0 == content->refcount) + { + GNUNET_assert (NULL != content->elements); + GNUNET_CONTAINER_multihashmap_iterate (content->elements, + &destroy_elements_iterator, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (content->elements); + content->elements = NULL; + GNUNET_free (content); + } } + GNUNET_free_non_null (set->excluded_generations); + set->excluded_generations = NULL; GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set); + + // remove set from pending copy requests + { + struct LazyCopyRequest *lcr; + lcr = lazy_copy_head; + while (NULL != lcr) + { + struct LazyCopyRequest *lcr_current; + lcr_current = lcr; + lcr = lcr->next; + if (lcr_current->source_set == set) + GNUNET_CONTAINER_DLL_remove (lazy_copy_head, + lazy_copy_tail, + lcr_current); + } + } + GNUNET_free (set); } @@ -437,8 +618,8 @@ static void handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { - struct Set *set; struct Listener *listener; + struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n"); @@ -475,10 +656,10 @@ incoming_destroy (struct Operation *incoming) GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); - if (GNUNET_SCHEDULER_NO_TASK != incoming->timeout_task) + if (NULL != incoming->timeout_task) { GNUNET_SCHEDULER_cancel (incoming->timeout_task); - incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; + incoming->timeout_task = NULL; } /* make sure that the tunnel end handler will not destroy us again */ incoming->vt = NULL; @@ -500,29 +681,6 @@ incoming_destroy (struct Operation *incoming) } -/** - * Find a listener that is interested in the given operation type - * and application id. - * - * @param op operation type to look for - * @param app_id application id to look for - * @return a matching listener, or NULL if no listener matches the - * given operation and application id - */ -static struct Listener * -listener_get_by_target (enum GNUNET_SET_OperationType op, - const struct GNUNET_HashCode *app_id) -{ - struct Listener *listener; - - for (listener = listeners_head; NULL != listener; listener = listener->next) - if ( (listener->operation == op) && - (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) ) - return listener; - return NULL; -} - - /** * Suggest the given request to the listener. The listening client can * then accept or reject the remote request. @@ -543,9 +701,9 @@ incoming_suggest (struct Operation *incoming, incoming->suggest_id = suggest_id++; if (0 == suggest_id) suggest_id++; - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != incoming->timeout_task); + GNUNET_assert (NULL != incoming->timeout_task); GNUNET_SCHEDULER_cancel (incoming->timeout_task); - incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; + incoming->timeout_task = NULL; mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, incoming->spec->context_msg); @@ -555,7 +713,8 @@ incoming_suggest (struct Operation *incoming, incoming->suggest_id); cmsg->accept_id = htonl (incoming->suggest_id); cmsg->peer_id = incoming->spec->peer; - GNUNET_MQ_send (listener->client_mq, mqm); + GNUNET_MQ_send (listener->client_mq, + mqm); } @@ -581,7 +740,7 @@ handle_incoming_msg (struct Operation *op, const struct GNUNET_MessageHeader *mh) { const struct OperationRequestMessage *msg; - struct Listener *listener; + struct Listener *listener = op->listener; struct OperationSpecification *spec; const struct GNUNET_MessageHeader *nested_context; @@ -613,33 +772,148 @@ handle_incoming_msg (struct Operation *op, if (NULL != nested_context) spec->context_msg = GNUNET_copy_message (nested_context); spec->operation = ntohl (msg->operation); - spec->app_id = msg->app_id; + spec->app_id = listener->app_id; spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); spec->peer = op->peer; spec->remote_element_count = ntohl (msg->element_count); op->spec = spec; - listener = listener_get_by_target (ntohl (msg->operation), - &msg->app_id); - if (NULL == listener) - { - GNUNET_break (GNUNET_SCHEDULER_NO_TASK != op->timeout_task); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No matching listener for incoming request (op %u, app %s), waiting with timeout\n", - ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - return GNUNET_OK; - } + listener = op->listener; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received P2P operation request (op %u, app %s) for active listener\n", + "Received P2P operation request (op %u, port %s) for active listener\n", ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - incoming_suggest (op, listener); + GNUNET_h2s (&listener->app_id)); + incoming_suggest (op, + listener); return GNUNET_OK; } +static void +execute_add (struct Set *set, + const struct GNUNET_MessageHeader *m) +{ + const struct GNUNET_SET_ElementMessage *msg; + struct GNUNET_SET_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (m->type)); + + msg = (const struct GNUNET_SET_ElementMessage *) m; + el.size = ntohs (m->size) - sizeof *msg; + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_SET_element_hash (&el, &hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); + + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, + &hash); + + if (NULL == ee) + { + ee = GNUNET_malloc (el.size + sizeof *ee); + ee->element.size = el.size; + GNUNET_memcpy (&ee[1], + el.data, + el.size); + ee->element.data = &ee[1]; + ee->element.element_type = el.element_type; + ee->remote = GNUNET_NO; + ee->mutations = NULL; + ee->mutations_size = 0; + ee->element_hash = hash; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_put (set->content->elements, + &ee->element_hash, + ee, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } + else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) + { + /* same element inserted twice */ + return; + } + + { + struct MutationEvent mut = { + .generation = set->current_generation, + .added = GNUNET_YES + }; + GNUNET_array_append (ee->mutations, ee->mutations_size, mut); + } + + set->vt->add (set->state, ee); +} + + +static void +execute_remove (struct Set *set, + const struct GNUNET_MessageHeader *m) +{ + const struct GNUNET_SET_ElementMessage *msg; + struct GNUNET_SET_Element el; + struct ElementEntry *ee; + struct GNUNET_HashCode hash; + + GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type)); + + msg = (const struct GNUNET_SET_ElementMessage *) m; + el.size = ntohs (m->size) - sizeof *msg; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client removes element of size %u\n", + el.size); + el.data = &msg[1]; + el.element_type = ntohs (msg->element_type); + GNUNET_SET_element_hash (&el, &hash); + ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, + &hash); + if (NULL == ee) + { + /* Client tried to remove non-existing element. */ + return; + } + if (GNUNET_NO == _GSS_is_element_of_set (ee, set)) + { + /* Client tried to remove element twice */ + return; + } + else + { + struct MutationEvent mut = { + .generation = set->current_generation, + .added = GNUNET_NO + }; + GNUNET_array_append (ee->mutations, ee->mutations_size, mut); + } + set->vt->remove (set->state, ee); +} + + + +static void +execute_mutation (struct Set *set, + const struct GNUNET_MessageHeader *m) +{ + switch (ntohs (m->type)) + { + case GNUNET_MESSAGE_TYPE_SET_ADD: + execute_add (set, m); + break; + case GNUNET_MESSAGE_TYPE_SET_REMOVE: + execute_remove (set, m); + break; + default: + GNUNET_break (0); + } +} + + + /** * Send the next element of a set to the set's client. The next element is given by * the set's current hashmap iterator. The set's iterator will be set to NULL if there @@ -662,26 +936,62 @@ send_client_element (struct Set *set) struct GNUNET_SET_IterResponseMessage *msg; GNUNET_assert (NULL != set->iter); + +again: + ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter, NULL, (const void **) &ee); if (GNUNET_NO == ret) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Iteration on %p done.\n", + (void *) set); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); set->iter = NULL; set->iteration_id++; + + GNUNET_assert (set->content->iterator_count > 0); + set->content->iterator_count -= 1; + + if (0 == set->content->iterator_count) + { + while (NULL != set->content->pending_mutations_head) + { + struct PendingMutation *pm; + + pm = set->content->pending_mutations_head; + GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing pending mutation on %p.\n", + (void *) pm->set); + execute_mutation (pm->set, pm->mutation_message); + GNUNET_free (pm->mutation_message); + GNUNET_free (pm); + } + } + } else { GNUNET_assert (NULL != ee); + + if (GNUNET_NO == is_element_of_iteration (ee, set)) + goto again; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending iteration element on %p.\n", + (void *) set); ev = GNUNET_MQ_msg_extra (msg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); - memcpy (&msg[1], + GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); - msg->element_type = ee->element.element_type; + msg->element_type = htons (ee->element.element_type); msg->iteration_id = htons (set->iteration_id); } GNUNET_MQ_send (set->client_mq, ev); @@ -721,11 +1031,15 @@ handle_client_iterate (void *cls, return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Iterating set with %u elements\n", - GNUNET_CONTAINER_multihashmap_size (set->elements)); + "Iterating set %p in gen %u with %u content elements\n", + (void *) set, + set->current_generation, + GNUNET_CONTAINER_multihashmap_size (set->content->elements)); GNUNET_SERVER_receive_done (client, GNUNET_OK); - set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements); + set->content->iterator_count += 1; + set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements); + set->iter_generation = set->current_generation; send_client_element (set); } @@ -773,8 +1087,18 @@ handle_client_create_set (void *cls, GNUNET_SERVER_client_disconnect (client); return; } + set->operation = ntohl (msg->operation); set->state = set->vt->create (); - set->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + if (NULL == set->state) + { + /* initialization failed (i.e. out of memory) */ + GNUNET_free (set); + GNUNET_SERVER_client_disconnect (client); + return; + } + set->content = GNUNET_new (struct SetContent); + set->content->refcount = 1; + set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); set->client = client; set->client_mq = GNUNET_MQ_queue_for_server_client (client); GNUNET_CONTAINER_DLL_insert (sets_head, @@ -786,74 +1110,173 @@ handle_client_create_set (void *cls, /** - * Called when a client wants to create a new listener. + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client + * @param cls channel context + * @param tc context information (why was this task triggered now) */ static void -handle_client_listen (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +incoming_timeout_cb (void *cls) { - const struct GNUNET_SET_ListenMessage *msg; - struct Listener *listener; - struct Operation *op; + struct Operation *incoming = cls; - msg = (const struct GNUNET_SET_ListenMessage *) m; - if (NULL != listener_get (client)) - { - /* max. one active listener per client! */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } - listener = GNUNET_new (struct Listener); - listener->client = client; - listener->client_mq = GNUNET_MQ_queue_for_server_client (client); - listener->app_id = msg->app_id; - listener->operation = ntohl (msg->operation); - GNUNET_CONTAINER_DLL_insert_tail (listeners_head, - listeners_tail, - listener); + incoming->timeout_task = NULL; + GNUNET_assert (GNUNET_YES == incoming->is_incoming); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New listener created (op %u, app %s)\n", - listener->operation, - GNUNET_h2s (&listener->app_id)); - - /* check for existing incoming requests the listener might be interested in */ - for (op = incoming_head; NULL != op; op = op->next) - { - if (NULL == op->spec) - continue; /* no details available yet */ - if (0 != op->suggest_id) - continue; /* this one has been already suggested to a listener */ - if (listener->operation != op->spec->operation) - continue; /* incompatible operation */ - if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, - &op->spec->app_id)) - continue; /* incompatible appliation */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found matching existing request\n"); - incoming_suggest (op, - listener); - } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + "Remote peer's incoming request timed out\n"); + incoming_destroy (incoming); } /** - * Called when the listening client rejects an operation - * request by another peer. + * Terminates an incoming operation in case we have not yet received an + * operation request. Called by the channel destruction handler. * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client + * @param op the channel context */ static void -handle_client_reject (void *cls, - struct GNUNET_SERVER_Client *client, +handle_incoming_disconnect (struct Operation *op) +{ + GNUNET_assert (GNUNET_YES == op->is_incoming); + /* channel is already dead, incoming_destroy must not + * destroy it ... */ + op->channel = NULL; + incoming_destroy (op); + op->vt = NULL; +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param initiator peer that started the channel + * @param port Port this channel is for. + * @param options Unused. + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_HashCode *port, + enum GNUNET_CADET_ChannelOption options) +{ + static const struct SetVT incoming_vt = { + .msg_handler = &handle_incoming_msg, + .peer_disconnect = &handle_incoming_disconnect + }; + struct Listener *listener = cls; + struct Operation *incoming; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); + incoming = GNUNET_new (struct Operation); + incoming->listener = listener; + incoming->is_incoming = GNUNET_YES; + incoming->peer = *initiator; + incoming->channel = channel; + incoming->mq = GNUNET_CADET_mq_create (incoming->channel); + incoming->vt = &incoming_vt; + incoming->timeout_task + = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + incoming); + GNUNET_CONTAINER_DLL_insert_tail (incoming_head, + incoming_tail, + incoming); + // incoming_suggest (incoming, + // listener); + return incoming; +} + + +/** + * Called when a client wants to create a new listener. + * + * @param cls unused + * @param client client that sent the message + * @param m message sent by the client + */ +static void +handle_client_listen (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + const struct GNUNET_SET_ListenMessage *msg; + struct Listener *listener; + struct Operation *op; + + msg = (const struct GNUNET_SET_ListenMessage *) m; + if (NULL != listener_get (client)) + { + /* max. one active listener per client! */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } + listener = GNUNET_new (struct Listener); + listener->client = client; + listener->client_mq = GNUNET_MQ_queue_for_server_client (client); + listener->app_id = msg->app_id; + listener->operation = ntohl (msg->operation); + GNUNET_CONTAINER_DLL_insert_tail (listeners_head, + listeners_tail, + listener); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New listener created (op %u, port %s)\n", + listener->operation, + GNUNET_h2s (&listener->app_id)); + + /* check for existing incoming requests the listener might be interested in */ + for (op = incoming_head; NULL != op; op = op->next) + { + if (NULL == op->spec) + continue; /* no details available yet */ + if (0 != op->suggest_id) + continue; /* this one has been already suggested to a listener */ + if (listener->operation != op->spec->operation) + continue; /* incompatible operation */ + if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, + &op->spec->app_id)) + continue; /* incompatible appliation */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found matching existing request\n"); + incoming_suggest (op, + listener); + } + GNUNET_SERVER_receive_done (client, + GNUNET_OK); +} + + +/** + * Called when the listening client rejects an operation + * request by another peer. + * + * @param cls unused + * @param client client that sent the message + * @param m message sent by the client + */ +static void +handle_client_reject (void *cls, + struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { struct Operation *incoming; @@ -879,23 +1302,20 @@ handle_client_reject (void *cls, } + /** - * Called when a client wants to add an element to a set it inhabits. + * Called when a client wants to add or remove an element to a set it inhabits. * * @param cls unused * @param client client that sent the message * @param m message sent by the client */ static void -handle_client_add (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +handle_client_mutation (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { struct Set *set; - const struct GNUNET_SET_ElementMessage *msg; - struct GNUNET_SET_Element el; - struct ElementEntry *ee; - struct ElementEntry *ee_dup; set = set_get (client); if (NULL == set) @@ -905,98 +1325,59 @@ handle_client_add (void *cls, GNUNET_SERVER_client_disconnect (client); return; } + GNUNET_SERVER_receive_done (client, GNUNET_OK); - msg = (const struct GNUNET_SET_ElementMessage *) m; - el.size = ntohs (m->size) - sizeof *msg; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client inserts element of size %u\n", - el.size); - el.data = &msg[1]; - ee = GNUNET_malloc (el.size + sizeof *ee); - ee->element.size = el.size; - memcpy (&ee[1], - el.data, - el.size); - ee->element.data = &ee[1]; - ee->generation_added = set->current_generation; - ee->remote = GNUNET_NO; - GNUNET_CRYPTO_hash (ee->element.data, - el.size, - &ee->element_hash); - ee_dup = GNUNET_CONTAINER_multihashmap_get (set->elements, - &ee->element_hash); - if (NULL != ee_dup) + + if (0 != set->content->iterator_count) { - /* same element inserted twice */ - GNUNET_break (0); - GNUNET_free (ee); + struct PendingMutation *pm; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling mutation on set\n"); + + pm = GNUNET_new (struct PendingMutation); + pm->mutation_message = GNUNET_copy_message (m); + pm->set = set; + GNUNET_CONTAINER_DLL_insert (set->content->pending_mutations_head, + set->content->pending_mutations_tail, + pm); return; } - GNUNET_break (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_put (set->elements, - &ee->element_hash, - ee, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - set->vt->add (set->state, ee); + + execute_mutation (set, m); } /** - * Called when a client wants to remove an element from a set it - * inhabits. + * Advance the current generation of a set, + * adding exclusion ranges if necessary. * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client + * @param set the set where we want to advance the generation */ static void -handle_client_remove (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +advance_generation (struct Set *set) { - struct Set *set; - const struct GNUNET_SET_ElementMessage *msg; - struct GNUNET_SET_Element el; - struct ElementEntry *ee; - struct GNUNET_HashCode hash; + struct GenerationRange r; - set = set_get (client); - if (NULL == set) - { - /* client without a set requested an operation */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } - GNUNET_SERVER_receive_done (client, - GNUNET_OK); - msg = (const struct GNUNET_SET_ElementMessage *) m; - el.size = ntohs (m->size) - sizeof *msg; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client removes element of size %u\n", - el.size); - el.data = &msg[1]; - GNUNET_CRYPTO_hash (el.data, - el.size, - &hash); - ee = GNUNET_CONTAINER_multihashmap_get (set->elements, - &hash); - if (NULL == ee) + if (set->current_generation == set->content->latest_generation) { - /* Client tried to remove non-existing element */ - GNUNET_break (0); - return; - } - if (GNUNET_YES == ee->removed) - { - /* Client tried to remove element twice */ - GNUNET_break (0); + set->content->latest_generation += 1; + set->current_generation += 1; return; } - ee->removed = GNUNET_YES; - ee->generation_removed = set->current_generation; - set->vt->remove (set->state, ee); + + GNUNET_assert (set->current_generation < set->content->latest_generation); + + r.start = set->current_generation + 1; + r.end = set->content->latest_generation + 1; + + set->content->latest_generation = r.end; + set->current_generation = r.end; + + GNUNET_array_append (set->excluded_generations, + set->excluded_generations_size, + r); } @@ -1040,15 +1421,23 @@ handle_client_evaluate (void *cls, context = GNUNET_MQ_extract_nested_mh (msg); op = GNUNET_new (struct Operation); op->spec = spec; - op->generation_created = set->current_generation++; + + // Advance generation values, so that + // mutations won't interfer with the running operation. + op->generation_created = set->current_generation; + advance_generation (set); + op->vt = set->vt; GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s\n", + GNUNET_h2s (&msg->app_id)); op->channel = GNUNET_CADET_channel_create (cadet, op, &msg->target_peer, - GNUNET_APPLICATION_TYPE_SET, + &msg->app_id, GNUNET_CADET_OPTION_RELIABLE); op->mq = GNUNET_CADET_mq_create (op->channel); set->vt->evaluate (op, @@ -1107,6 +1496,164 @@ handle_client_iter_ack (void *cls, } +/** + * Handle a request from the client to + * copy a set. + * + * @param cls unused + * @param client the client + * @param mh the message + */ +static void +handle_client_copy_lazy_prepare (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *mh) +{ + struct Set *set; + struct LazyCopyRequest *cr; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_CopyLazyResponseMessage *resp_msg; + + set = set_get (client); + if (NULL == set) + { + /* client without a set requested an operation */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } + + cr = GNUNET_new (struct LazyCopyRequest); + + cr->cookie = lazy_copy_cookie; + lazy_copy_cookie += 1; + cr->source_set = set; + + GNUNET_CONTAINER_DLL_insert (lazy_copy_head, + lazy_copy_tail, + cr); + + + ev = GNUNET_MQ_msg (resp_msg, + GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE); + resp_msg->cookie = cr->cookie; + GNUNET_MQ_send (set->client_mq, ev); + + + GNUNET_SERVER_receive_done (client, + GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client requested lazy copy\n"); +} + + +/** + * Handle a request from the client to + * connect to a copy of a set. + * + * @param cls unused + * @param client the client + * @param mh the message + */ +static void +handle_client_copy_lazy_connect (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *mh) +{ + struct LazyCopyRequest *cr; + const struct GNUNET_SET_CopyLazyConnectMessage *msg = + (const struct GNUNET_SET_CopyLazyConnectMessage *) mh; + struct Set *set; + int found; + + if (NULL != set_get (client)) + { + /* There can only be one set per client */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } + + found = GNUNET_NO; + + for (cr = lazy_copy_head; NULL != cr; cr = cr->next) + { + if (cr->cookie == msg->cookie) + { + found = GNUNET_YES; + break; + } + } + + if (GNUNET_NO == found) + { + /* client asked for copy with cookie we don't know */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (client); + return; + } + + GNUNET_CONTAINER_DLL_remove (lazy_copy_head, + lazy_copy_tail, + cr); + + set = GNUNET_new (struct Set); + + switch (cr->source_set->operation) + { + case GNUNET_SET_OPERATION_INTERSECTION: + set->vt = _GSS_intersection_vt (); + break; + case GNUNET_SET_OPERATION_UNION: + set->vt = _GSS_union_vt (); + break; + default: + GNUNET_assert (0); + return; + } + + if (NULL == set->vt->copy_state) + { + /* Lazy copy not supported for this set operation */ + GNUNET_break (0); + GNUNET_free (set); + GNUNET_free (cr); + GNUNET_SERVER_client_disconnect (client); + return; + } + + set->operation = cr->source_set->operation; + set->state = set->vt->copy_state (cr->source_set); + set->content = cr->source_set->content; + set->content->refcount += 1; + + set->current_generation = cr->source_set->current_generation; + set->excluded_generations_size = cr->source_set->excluded_generations_size; + set->excluded_generations = GNUNET_memdup (cr->source_set->excluded_generations, + set->excluded_generations_size * sizeof (struct GenerationRange)); + + /* Advance the generation of the new set, so that mutations to the + of the cloned set and the source set are independent. */ + advance_generation (set); + + + set->client = client; + set->client_mq = GNUNET_MQ_queue_for_server_client (client); + GNUNET_CONTAINER_DLL_insert (sets_head, + sets_tail, + set); + + GNUNET_free (cr); + + GNUNET_SERVER_receive_done (client, + GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client connected to lazy set\n"); +} + + /** * Handle a request from the client to * cancel a running set operation. @@ -1226,7 +1773,12 @@ handle_client_accept (void *cls, op); op->spec->client_request_id = ntohl (msg->request_id); op->spec->result_mode = ntohl (msg->result_mode); - op->generation_created = set->current_generation++; + + // Advance generation values, so that + // mutations won't interfer with the running operation. + op->generation_created = set->current_generation; + advance_generation (set); + op->vt = set->vt; op->vt->accept (op); GNUNET_SERVER_receive_done (client, @@ -1238,11 +1790,9 @@ handle_client_accept (void *cls, * Called to clean up, after a shutdown has been requested. * * @param cls closure - * @param tc context information (why was this task triggered now) */ static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { while (NULL != incoming_head) incoming_destroy (incoming_head); @@ -1258,118 +1808,19 @@ shutdown_task (void *cls, GNUNET_CADET_disconnect (cadet); cadet = NULL; } + GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); } -/** - * Timeout happens iff: - * - we suggested an operation to our listener, - * but did not receive a response in time - * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST - * - shutdown (obviously) - * - * @param cls channel context - * @param tc context information (why was this task triggered now) - */ -static void -incoming_timeout_cb (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Operation *incoming = cls; - - incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Remote peer's incoming request timed out\n"); - incoming_destroy (incoming); -} - - -/** - * Terminates an incoming operation in case we have not yet received an - * operation request. Called by the channel destruction handler. - * - * @param op the channel context - */ -static void -handle_incoming_disconnect (struct Operation *op) -{ - GNUNET_assert (GNUNET_YES == op->is_incoming); - /* channel is already dead, incoming_destroy must not - * destroy it ... */ - op->channel = NULL; - incoming_destroy (op); - op->vt = NULL; -} - - -/** - * Method called whenever another peer has added us to a channel the - * other peer initiated. Only called (once) upon reception of data - * with a message type which was subscribed to in - * GNUNET_CADET_connect(). - * - * The channel context represents the operation itself and gets added to a DLL, - * from where it gets looked up when our local listener client responds - * to a proposed/suggested operation or connects and associates with this operation. - * - * @param cls closure - * @param channel new handle to the channel - * @param initiator peer that started the channel - * @param port Port this channel is for. - * @param options Unused. - * @return initial channel context for the channel - * returns NULL on error - */ -static void * -channel_new_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - uint32_t port, - enum GNUNET_CADET_ChannelOption options) -{ - static const struct SetVT incoming_vt = { - .msg_handler = &handle_incoming_msg, - .peer_disconnect = &handle_incoming_disconnect - }; - struct Operation *incoming; - - if (GNUNET_APPLICATION_TYPE_SET != port) - { - GNUNET_break (0); - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New incoming channel\n"); - incoming = GNUNET_new (struct Operation); - incoming->is_incoming = GNUNET_YES; - incoming->peer = *initiator; - incoming->channel = channel; - incoming->mq = GNUNET_CADET_mq_create (incoming->channel); - incoming->vt = &incoming_vt; - incoming->timeout_task - = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, - &incoming_timeout_cb, - incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, - incoming_tail, - incoming); - return incoming; -} - - /** * Function called whenever a channel is destroyed. Should clean up * any associated state. It must NOT call * GNUNET_CADET_channel_destroy() on the channel. * * The peer_disconnect function is part of a a virtual table set initially either - * when a peer creates a new channel with us (#channel_new_cb()), or once we create + * when a peer creates a new channel with us, or once we create * a new channel ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1414,7 +1865,7 @@ channel_end_cb (void *cls, * received via a cadet channel. * * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us (channel_new_cb), or once we create a new channel + * creates a new channel with us, or once we create a new channel * ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1473,7 +1924,7 @@ run (void *cls, { &handle_client_iter_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, sizeof (struct GNUNET_SET_IterAckMessage) }, - { &handle_client_add, NULL, + { &handle_client_mutation, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, { &handle_client_create_set, NULL, @@ -1491,40 +1942,49 @@ run (void *cls, { &handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, sizeof (struct GNUNET_SET_RejectMessage)}, - { &handle_client_remove, NULL, + { &handle_client_mutation, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, { &handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, sizeof (struct GNUNET_SET_CancelMessage)}, + { &handle_client_copy_lazy_prepare, NULL, + GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE, + sizeof (struct GNUNET_MessageHeader)}, + { &handle_client_copy_lazy_connect, NULL, + GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT, + sizeof (struct GNUNET_SET_CopyLazyConnectMessage)}, { NULL, NULL, 0, 0} }; static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0}, {NULL, 0, 0} }; - static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; configuration = cfg; - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, NULL); + &handle_client_disconnect, + NULL); GNUNET_SERVER_add_handlers (server, server_handlers); - cadet = GNUNET_CADET_connect (cfg, NULL, - &channel_new_cb, + _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); + cadet = GNUNET_CADET_connect (cfg, + NULL, &channel_end_cb, - cadet_handlers, - cadet_ports); + cadet_handlers); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR,