From 5f925941b0954eee3c93bd6fe72eeb092106cea2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 14 Aug 2016 15:25:12 +0000 Subject: [PATCH] migrating set to new CADET port API - tests now fail due to CADET issues --- src/set/gnunet-service-set.c | 283 ++++++++++------------ src/set/gnunet-service-set.h | 8 + src/set/gnunet-service-set_intersection.c | 1 - src/set/gnunet-service-set_protocol.h | 2 +- src/set/gnunet-service-set_union.c | 1 - src/set/test_set.conf | 2 +- src/set/test_set_api.c | 2 +- 7 files changed, 144 insertions(+), 155 deletions(-) diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 16f330a4d..e4e2535af 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -66,6 +66,11 @@ struct Listener */ struct GNUNET_HashCode app_id; + /** + * The port we are listening on with CADET. + */ + struct GNUNET_CADET_Port *open_port; + /** * The type of the operation. */ @@ -229,6 +234,7 @@ listener_destroy (struct Listener *listener) GNUNET_MQ_destroy (listener->client_mq); listener->client_mq = NULL; } + GNUNET_CADET_close_port (listener->open_port); GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener); @@ -320,7 +326,8 @@ collect_generation_garbage (struct Set *set) &gc); } -int + +static int is_excluded_generation (unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size) @@ -337,7 +344,7 @@ is_excluded_generation (unsigned int generation, } -int +static int is_element_of_generation (struct ElementEntry *ee, unsigned int query_generation, struct GenerationRange *excluded, @@ -611,8 +618,8 @@ static void handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { - struct Set *set; struct Listener *listener; + struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n"); @@ -674,29 +681,6 @@ incoming_destroy (struct Operation *incoming) } -/** - * Find a listener that is interested in the given operation type - * and application id. - * - * @param op operation type to look for - * @param app_id application id to look for - * @return a matching listener, or NULL if no listener matches the - * given operation and application id - */ -static struct Listener * -listener_get_by_target (enum GNUNET_SET_OperationType op, - const struct GNUNET_HashCode *app_id) -{ - struct Listener *listener; - - for (listener = listeners_head; NULL != listener; listener = listener->next) - if ( (listener->operation == op) && - (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) ) - return listener; - return NULL; -} - - /** * Suggest the given request to the listener. The listening client can * then accept or reject the remote request. @@ -729,7 +713,8 @@ incoming_suggest (struct Operation *incoming, incoming->suggest_id); cmsg->accept_id = htonl (incoming->suggest_id); cmsg->peer_id = incoming->spec->peer; - GNUNET_MQ_send (listener->client_mq, mqm); + GNUNET_MQ_send (listener->client_mq, + mqm); } @@ -755,7 +740,7 @@ handle_incoming_msg (struct Operation *op, const struct GNUNET_MessageHeader *mh) { const struct OperationRequestMessage *msg; - struct Listener *listener; + struct Listener *listener = op->listener; struct OperationSpecification *spec; const struct GNUNET_MessageHeader *nested_context; @@ -787,29 +772,20 @@ handle_incoming_msg (struct Operation *op, if (NULL != nested_context) spec->context_msg = GNUNET_copy_message (nested_context); spec->operation = ntohl (msg->operation); - spec->app_id = msg->app_id; + spec->app_id = listener->app_id; spec->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); spec->peer = op->peer; spec->remote_element_count = ntohl (msg->element_count); op->spec = spec; - listener = listener_get_by_target (ntohl (msg->operation), - &msg->app_id); - if (NULL == listener) - { - GNUNET_break (NULL != op->timeout_task); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No matching listener for incoming request (op %u, app %s), waiting with timeout\n", - ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - return GNUNET_OK; - } + listener = op->listener; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received P2P operation request (op %u, app %s) for active listener\n", + "Received P2P operation request (op %u, port %s) for active listener\n", ntohl (msg->operation), - GNUNET_h2s (&msg->app_id)); - incoming_suggest (op, listener); + GNUNET_h2s (&listener->app_id)); + incoming_suggest (op, + listener); return GNUNET_OK; } @@ -1133,6 +1109,100 @@ handle_client_create_set (void *cls, } +/** + * Timeout happens iff: + * - we suggested an operation to our listener, + * but did not receive a response in time + * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST + * + * @param cls channel context + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls) +{ + struct Operation *incoming = cls; + + incoming->timeout_task = NULL; + GNUNET_assert (GNUNET_YES == incoming->is_incoming); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Remote peer's incoming request timed out\n"); + incoming_destroy (incoming); +} + + +/** + * Terminates an incoming operation in case we have not yet received an + * operation request. Called by the channel destruction handler. + * + * @param op the channel context + */ +static void +handle_incoming_disconnect (struct Operation *op) +{ + GNUNET_assert (GNUNET_YES == op->is_incoming); + /* channel is already dead, incoming_destroy must not + * destroy it ... */ + op->channel = NULL; + incoming_destroy (op); + op->vt = NULL; +} + + +/** + * Method called whenever another peer has added us to a channel the + * other peer initiated. Only called (once) upon reception of data + * from a channel we listen on. + * + * The channel context represents the operation itself and gets added + * to a DLL, from where it gets looked up when our local listener + * client responds to a proposed/suggested operation or connects and + * associates with this operation. + * + * @param cls closure + * @param channel new handle to the channel + * @param initiator peer that started the channel + * @param port Port this channel is for. + * @param options Unused. + * @return initial channel context for the channel + * returns NULL on error + */ +static void * +channel_new_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_HashCode *port, + enum GNUNET_CADET_ChannelOption options) +{ + static const struct SetVT incoming_vt = { + .msg_handler = &handle_incoming_msg, + .peer_disconnect = &handle_incoming_disconnect + }; + struct Listener *listener = cls; + struct Operation *incoming; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New incoming channel\n"); + incoming = GNUNET_new (struct Operation); + incoming->listener = listener; + incoming->is_incoming = GNUNET_YES; + incoming->peer = *initiator; + incoming->channel = channel; + incoming->mq = GNUNET_CADET_mq_create (incoming->channel); + incoming->vt = &incoming_vt; + incoming->timeout_task + = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, + &incoming_timeout_cb, + incoming); + GNUNET_CONTAINER_DLL_insert_tail (incoming_head, + incoming_tail, + incoming); + // incoming_suggest (incoming, + // listener); + return incoming; +} + + /** * Called when a client wants to create a new listener. * @@ -1165,8 +1235,12 @@ handle_client_listen (void *cls, GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); + listener->open_port = GNUNET_CADET_open_port (cadet, + &msg->app_id, + &channel_new_cb, + listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New listener created (op %u, app %s)\n", + "New listener created (op %u, port %s)\n", listener->operation, GNUNET_h2s (&listener->app_id)); @@ -1187,7 +1261,8 @@ handle_client_listen (void *cls, incoming_suggest (op, listener); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); } @@ -1305,6 +1380,7 @@ advance_generation (struct Set *set) r); } + /** * Called when a client wants to initiate a set operation with another * peer. Initiates the CADET connection to the listener and sends the @@ -1355,10 +1431,13 @@ handle_client_evaluate (void *cls, GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new CADET channel to port %s\n", + GNUNET_h2s (&msg->app_id)); op->channel = GNUNET_CADET_channel_create (cadet, op, &msg->target_peer, - GC_u2h (GNUNET_APPLICATION_TYPE_SET), + &msg->app_id, GNUNET_CADET_OPTION_RELIABLE); op->mq = GNUNET_CADET_mq_create (op->channel); set->vt->evaluate (op, @@ -1534,7 +1613,8 @@ handle_client_copy_lazy_connect (void *cls, return; } - if (NULL == set->vt->copy_state) { + if (NULL == set->vt->copy_state) + { /* Lazy copy not supported for this set operation */ GNUNET_break (0); GNUNET_free (set); @@ -1734,109 +1814,13 @@ shutdown_task (void *cls) } -/** - * Timeout happens iff: - * - we suggested an operation to our listener, - * but did not receive a response in time - * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST - * - * @param cls channel context - * @param tc context information (why was this task triggered now) - */ -static void -incoming_timeout_cb (void *cls) -{ - struct Operation *incoming = cls; - - incoming->timeout_task = NULL; - GNUNET_assert (GNUNET_YES == incoming->is_incoming); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Remote peer's incoming request timed out\n"); - incoming_destroy (incoming); -} - - -/** - * Terminates an incoming operation in case we have not yet received an - * operation request. Called by the channel destruction handler. - * - * @param op the channel context - */ -static void -handle_incoming_disconnect (struct Operation *op) -{ - GNUNET_assert (GNUNET_YES == op->is_incoming); - /* channel is already dead, incoming_destroy must not - * destroy it ... */ - op->channel = NULL; - incoming_destroy (op); - op->vt = NULL; -} - - -/** - * Method called whenever another peer has added us to a channel the - * other peer initiated. Only called (once) upon reception of data - * with a message type which was subscribed to in - * GNUNET_CADET_connect(). - * - * The channel context represents the operation itself and gets added to a DLL, - * from where it gets looked up when our local listener client responds - * to a proposed/suggested operation or connects and associates with this operation. - * - * @param cls closure - * @param channel new handle to the channel - * @param initiator peer that started the channel - * @param port Port this channel is for. - * @param options Unused. - * @return initial channel context for the channel - * returns NULL on error - */ -static void * -channel_new_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) -{ - static const struct SetVT incoming_vt = { - .msg_handler = &handle_incoming_msg, - .peer_disconnect = &handle_incoming_disconnect - }; - struct Operation *incoming; - - if (0 != memcmp (GC_u2h (GNUNET_APPLICATION_TYPE_SET), port, sizeof (*port))) - { - GNUNET_break (0); - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "New incoming channel\n"); - incoming = GNUNET_new (struct Operation); - incoming->is_incoming = GNUNET_YES; - incoming->peer = *initiator; - incoming->channel = channel; - incoming->mq = GNUNET_CADET_mq_create (incoming->channel); - incoming->vt = &incoming_vt; - incoming->timeout_task - = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, - &incoming_timeout_cb, - incoming); - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, - incoming_tail, - incoming); - return incoming; -} - - /** * Function called whenever a channel is destroyed. Should clean up * any associated state. It must NOT call * GNUNET_CADET_channel_destroy() on the channel. * * The peer_disconnect function is part of a a virtual table set initially either - * when a peer creates a new channel with us (#channel_new_cb()), or once we create + * when a peer creates a new channel with us, or once we create * a new channel ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1881,7 +1865,7 @@ channel_end_cb (void *cls, * received via a cadet channel. * * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us (channel_new_cb), or once we create a new channel + * creates a new channel with us, or once we create a new channel * ourselves (evaluate). * * Once we know the exact type of operation (union/intersection), the vt is @@ -1992,16 +1976,15 @@ run (void *cls, configuration = cfg; GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_SERVER_disconnect_notify (server, - &handle_client_disconnect, NULL); + &handle_client_disconnect, + NULL); GNUNET_SERVER_add_handlers (server, server_handlers); _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); - cadet = GNUNET_CADET_connect (cfg, NULL, + cadet = GNUNET_CADET_connect (cfg, + NULL, &channel_end_cb, cadet_handlers); - GNUNET_CADET_open_port (cadet, - GC_u2h (GNUNET_APPLICATION_TYPE_SET), - &channel_new_cb, NULL); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index de9dfbded..9e1ffd01a 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -336,6 +336,9 @@ struct ElementEntry }; +struct Listener; + + /** * Operation context used to execute a set operation. */ @@ -353,6 +356,11 @@ struct Operation */ struct GNUNET_CADET_Channel *channel; + /** + * Port this operation runs on. + */ + struct Listener *listener; + /** * Message queue for the channel. */ diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index e9d97f6a8..258ad6443 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -1036,7 +1036,6 @@ intersection_evaluate (struct Operation *op, return; } msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); - msg->app_id = op->spec->app_id; msg->element_count = htonl (op->state->my_element_count); GNUNET_MQ_send (op->mq, ev); diff --git a/src/set/gnunet-service-set_protocol.h b/src/set/gnunet-service-set_protocol.h index 97cf40a63..748da15fc 100644 --- a/src/set/gnunet-service-set_protocol.h +++ b/src/set/gnunet-service-set_protocol.h @@ -52,7 +52,7 @@ struct OperationRequestMessage /** * Application-specific identifier of the request. */ - struct GNUNET_HashCode app_id; + struct GNUNET_HashCode app_idX; /* rest: optional message */ }; diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 669e34c8f..6a1072a30 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1597,7 +1597,6 @@ union_evaluate (struct Operation *op, return; } msg->operation = htonl (GNUNET_SET_OPERATION_UNION); - msg->app_id = op->spec->app_id; GNUNET_MQ_send (op->mq, ev); diff --git a/src/set/test_set.conf b/src/set/test_set.conf index 30ccbde55..69e7f5c52 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf @@ -5,7 +5,7 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-set/ [set] AUTOSTART = YES -PREFIX = valgrind +# PREFIX = valgrind #PREFIX = valgrind --leak-check=full #PREFIX = gdbserver :1234 OPTIONS = -L INFO diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index 19d63d00b..eea47f57d 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c @@ -151,7 +151,7 @@ listen_cb (void *cls, GNUNET_SET_RESULT_ADDED, &result_cb_set2, NULL); - GNUNET_SET_commit (oh, + GNUNET_SET_commit (oh2, set2); } -- 2.25.1