X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fset%2Fgnunet-service-set.c;h=e4e2535af94b84004865a34b4312fe335bb12b04;hb=5b32752cd7b02adcb8e6fec7798637638c6f63a0;hp=cc172974ef979dda47cdff0925472a49ae88e294;hpb=ca2a0be297732b8cd8b2f6635bc6be6d6a717860;p=oweals%2Fgnunet.git diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index cc172974e..e4e2535af 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013, 2014 Christian Grothoff (and other contributing authors) + Copyright (C) 2013, 2014 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -66,6 +66,11 @@ struct Listener */ struct GNUNET_HashCode app_id; + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + /** * The type of the operation. */ @@ -229,6 +234,7 @@ listener_destroy (struct Listener *listener) GNUNET_MQ_destroy (listener->client_mq); listener->client_mq = NULL; } + GNUNET_CADET_close_port (listener->open_port); GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener); @@ -320,7 +326,8 @@ collect_generation_garbage (struct Set *set) &gc); } -int + +static int is_excluded_generation (unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size) @@ -337,7 +344,7 @@ is_excluded_generation (unsigned int generation, } -int +static int is_element_of_generation (struct ElementEntry *ee, unsigned int query_generation, struct GenerationRange *excluded, @@ -611,8 +618,8 @@ static void handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { - struct Set *set; struct Listener *listener; + struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n"); @@ -674,29 +681,6 @@ incoming_destroy (struct Operation *incoming) } -/** - * Find a listener that is interested in the given operation type - * and application id. - * - * @param op operation type to look for - * @param app_id application id to look for - * @return a matching listener, or NULL if no listener matches the - * given operation and application id - */ -static struct Listener * -listener_get_by_target (enum GNUNET_SET_OperationType op, - const struct GNUNET_HashCode *app_id) -{ - struct Listener *listener; - - for (listener = listeners_head; NULL != listener; listener = listener->next) - if ( (listener->operation == op) && - (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) ) - return listener; - return NULL; -} - - /** * Suggest the given request to the listener. The listening client can * then accept or reject the remote request. @@ -729,7 +713,8 @@ incoming_suggest (struct Operation *incoming, incoming->suggest_id); cmsg->accept_id = htonl (incoming->suggest_id); cmsg->peer_id = incoming->spec->peer; - GNUNET_MQ_send (listener->client_mq, mqm); + GNUNET_MQ_send (listener->client_mq, + mqm); } @@ -755,7 +740,7 @@ handle_incoming_msg (struct Operation *op, const struct GNUNET_MessageHeader *mh) { const struct OperationRequestMessage *msg; - struct Listener *listener; + struct Listener *listener = op->listener; struct OperationSpecification *spec; const struct GNUNET_MessageHeader *nested_context; @@ -787,29 +772,20 @@ handle_incoming_msg (struct Operation *op, if (NULL != nested_context) spec->context_msg = GNUNET_copy_message (nested_context); spec->operation = ntohl (msg->operation); - spec->app_id = msg->app_id; + spec->app_id = listener->app_id; spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); spec->peer = op->peer; spec->remote_element_count = ntohl (msg->element_count); op->spec = spec; - listener = listener_get_by_target (ntohl (msg->operation), - &msg->app_id); - if (NULL == listener) - { - GNUNET_break (NULL != op->timeout_task); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No matching listener for incoming request (op %u, app %s), waiting with timeout\n", - ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - return GNUNET_OK; - } + listener = op->listener; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received P2P operation request (op %u, app %s) for active listener\n", + "Received P2P operation request (op %u, port %s) for active listener\n", ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - incoming_suggest (op, listener); + GNUNET_h2s (&listener->app_id)); + incoming_suggest (op, + listener); return GNUNET_OK; } @@ -827,12 +803,13 @@ execute_add (struct Set *set, msg = (const struct GNUNET_SET_ElementMessage *) m; el.size = ntohs (m->size) - sizeof *msg; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client inserts element of size %u\n", - el.size); el.data = &msg[1]; el.element_type = ntohs (msg->element_type); GNUNET_SET_element_hash (&el, &hash); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Client inserts element %s of size %u\n", + GNUNET_h2s (&hash), + el.size); ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); @@ -841,7 +818,7 @@ execute_add (struct Set *set, { ee = GNUNET_malloc (el.size + sizeof *ee); ee->element.size = el.size; - memcpy (&ee[1], + GNUNET_memcpy (&ee[1], el.data, el.size); ee->element.data = &ee[1]; @@ -974,7 +951,7 @@ again: GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); set->iter = NULL; set->iteration_id++; - + GNUNET_assert (set->content->iterator_count > 0); set->content->iterator_count -= 1; @@ -1011,7 +988,7 @@ again: ev = GNUNET_MQ_msg_extra (msg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT); - memcpy (&msg[1], + GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size); msg->element_type = htons (ee->element.element_type); @@ -1112,6 +1089,13 @@ handle_client_create_set (void *cls, } set->operation = ntohl (msg->operation); set->state = set->vt->create (); + if (NULL == set->state) + { + /* initialization failed (i.e. out of memory) */ + GNUNET_free (set); + GNUNET_SERVER_client_disconnect (client); + return; + } set->content = GNUNET_new (struct SetContent); set->content->refcount = 1; set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); @@ -1125,6 +1109,100 @@ handle_client_create_set (void *cls, } +/** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *incoming = cls; + + incoming->timeout_task = NULL; + GNUNET_assert (GNUNET_YES == incoming->is_incoming); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (incoming); +} + + +/** + * Terminates an incoming operation in case we have not yet received an + * operation request. Called by the channel destruction handler. + * + * @param op the channel context + */ +static void +handle_incoming_disconnect (struct Operation *op) +{ + GNUNET_assert (GNUNET_YES == op->is_incoming); + /* channel is already dead, incoming_destroy must not + * destroy it ... */ + op->channel = NULL; + incoming_destroy (op); + op->vt = NULL; +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param initiator peer that started the channel + * @param port Port this channel is for. + * @param options Unused. + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_HashCode *port, + enum GNUNET_CADET_ChannelOption options) +{ + static const struct SetVT incoming_vt = { + .msg_handler = &handle_incoming_msg, + .peer_disconnect = &handle_incoming_disconnect + }; + struct Listener *listener = cls; + struct Operation *incoming; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); + incoming = GNUNET_new (struct Operation); + incoming->listener = listener; + incoming->is_incoming = GNUNET_YES; + incoming->peer = *initiator; + incoming->channel = channel; + incoming->mq = GNUNET_CADET_mq_create (incoming->channel); + incoming->vt = &incoming_vt; + incoming->timeout_task + = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + incoming); + GNUNET_CONTAINER_DLL_insert_tail (incoming_head, + incoming_tail, + incoming); + // incoming_suggest (incoming, + // listener); + return incoming; +} + + /** * Called when a client wants to create a new listener. * @@ -1157,8 +1235,12 @@ handle_client_listen (void *cls, GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New listener created (op %u, app %s)\n", + "New listener created (op %u, port %s)\n", listener->operation, GNUNET_h2s (&listener->app_id)); @@ -1179,7 +1261,8 @@ handle_client_listen (void *cls, incoming_suggest (op, listener); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); } @@ -1297,6 +1380,7 @@ advance_generation (struct Set *set) r); } + /** * Called when a client wants to initiate a set operation with another * peer. Initiates the CADET connection to the listener and sends the @@ -1347,10 +1431,13 @@ handle_client_evaluate (void *cls, GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s\n", + GNUNET_h2s (&msg->app_id)); op->channel = GNUNET_CADET_channel_create (cadet, op, &msg->target_peer, - GNUNET_APPLICATION_TYPE_SET, + &msg->app_id, GNUNET_CADET_OPTION_RELIABLE); op->mq = GNUNET_CADET_mq_create (op->channel); set->vt->evaluate (op, @@ -1496,7 +1583,7 @@ handle_client_copy_lazy_connect (void *cls, { found = GNUNET_YES; break; - } + } } if (GNUNET_NO == found) @@ -1526,7 +1613,8 @@ handle_client_copy_lazy_connect (void *cls, return; } - if (NULL == set->vt->copy_state) { + if (NULL == set->vt->copy_state) + { /* Lazy copy not supported for this set operation */ GNUNET_break (0); GNUNET_free (set); @@ -1702,11 +1790,9 @@ handle_client_accept (void *cls, * Called to clean up, after a shutdown has been requested. * * @param cls closure - * @param tc context information (why was this task triggered now) */ static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls) { while (NULL != incoming_head) incoming_destroy (incoming_head); @@ -1728,113 +1814,13 @@ shutdown_task (void *cls, } -/** - * Timeout happens iff: - * - we suggested an operation to our listener, - * but did not receive a response in time - * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST - * - shutdown (obviously) - * - * @param cls channel context - * @param tc context information (why was this task triggered now) - */ -static void -incoming_timeout_cb (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct Operation *incoming = cls; - - incoming->timeout_task = NULL; - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Remote peer's incoming request timed out\n"); - incoming_destroy (incoming); -} - - -/** - * Terminates an incoming operation in case we have not yet received an - * operation request. Called by the channel destruction handler. - * - * @param op the channel context - */ -static void -handle_incoming_disconnect (struct Operation *op) -{ - GNUNET_assert (GNUNET_YES == op->is_incoming); - /* channel is already dead, incoming_destroy must not - * destroy it ... */ - op->channel = NULL; - incoming_destroy (op); - op->vt = NULL; -} - - -/** - * Method called whenever another peer has added us to a channel the - * other peer initiated. Only called (once) upon reception of data - * with a message type which was subscribed to in - * GNUNET_CADET_connect(). - * - * The channel context represents the operation itself and gets added to a DLL, - * from where it gets looked up when our local listener client responds - * to a proposed/suggested operation or connects and associates with this operation. - * - * @param cls closure - * @param channel new handle to the channel - * @param initiator peer that started the channel - * @param port Port this channel is for. - * @param options Unused. - * @return initial channel context for the channel - * returns NULL on error - */ -static void * -channel_new_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - uint32_t port, - enum GNUNET_CADET_ChannelOption options) -{ - static const struct SetVT incoming_vt = { - .msg_handler = &handle_incoming_msg, - .peer_disconnect = &handle_incoming_disconnect - }; - struct Operation *incoming; - - if (GNUNET_APPLICATION_TYPE_SET != port) - { - GNUNET_break (0); - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New incoming channel\n"); - incoming = GNUNET_new (struct Operation); - incoming->is_incoming = GNUNET_YES; - incoming->peer = *initiator; - incoming->channel = channel; - incoming->mq = GNUNET_CADET_mq_create (incoming->channel); - incoming->vt = &incoming_vt; - incoming->timeout_task - = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, - &incoming_timeout_cb, - incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, - incoming_tail, - incoming); - return incoming; -} - - /** * Function called whenever a channel is destroyed. Should clean up * any associated state. It must NOT call * GNUNET_CADET_channel_destroy() on the channel. * * The peer_disconnect function is part of a a virtual table set initially either - * when a peer creates a new channel with us (#channel_new_cb()), or once we create + * when a peer creates a new channel with us, or once we create * a new channel ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1879,7 +1865,7 @@ channel_end_cb (void *cls, * received via a cadet channel. * * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us (channel_new_cb), or once we create a new channel + * creates a new channel with us, or once we create a new channel * ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1980,26 +1966,25 @@ run (void *cls, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, + { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0}, {NULL, 0, 0} }; - static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; configuration = cfg; - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, NULL); + &handle_client_disconnect, + NULL); GNUNET_SERVER_add_handlers (server, server_handlers); _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); - cadet = GNUNET_CADET_connect (cfg, NULL, - &channel_new_cb, + cadet = GNUNET_CADET_connect (cfg, + NULL, &channel_end_cb, - cadet_handlers, - cadet_ports); + cadet_handlers); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR,