From 5dff30a84e1ac0c52f9bd8b671335b5100d37b0d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 3 Jul 2016 12:50:36 +0000 Subject: [PATCH] convert search to MQ --- src/fs/fs_api.c | 2 +- src/fs/fs_api.h | 14 +- src/fs/fs_search.c | 421 +++++++++++++++++++++++---------------------- 3 files changed, 214 insertions(+), 223 deletions(-) diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c index 0bc07183e..7ebcd093e 100644 --- a/src/fs/fs_api.c +++ b/src/fs/fs_api.c @@ -2869,7 +2869,7 @@ signal_search_resume (struct GNUNET_FS_SearchContext *sc) pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; pi.value.search.specifics.resume.message = sc->emsg; pi.value.search.specifics.resume.is_paused = - (NULL == sc->client) ? GNUNET_YES : GNUNET_NO; + (NULL == sc->mq) ? GNUNET_YES : GNUNET_NO; sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, &signal_result_resume, sc); diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h index 86219b3f8..126f5902e 100644 --- a/src/fs/fs_api.h +++ b/src/fs/fs_api.h @@ -1571,7 +1571,7 @@ struct GNUNET_FS_SearchContext /** * Connection to the FS service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Pointer we keep for the client. @@ -1620,18 +1620,6 @@ struct GNUNET_FS_SearchContext */ struct GNUNET_SCHEDULER_Task *task; - /** - * How many of the entries in the search request - * map have been passed to the service so far? - */ - unsigned int search_request_map_offset; - - /** - * How many of the keywords in the KSK - * map have been passed to the service so far? - */ - unsigned int keyword_offset; - /** * Anonymity level for the search. */ diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index 9a1b822e1..8a3652e3f 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c @@ -892,22 +892,46 @@ process_sblock (struct GNUNET_FS_SearchContext *sc, /** - * Process a search result. + * Shutdown any existing connection to the FS + * service and try to establish a fresh one + * (and then re-transmit our search request). * - * @param sc our search context - * @param type type of the result - * @param expiration when it will expire - * @param data the (encrypted) response - * @param size size of @a data + * @param sc the search to reconnec */ static void -process_result (struct GNUNET_FS_SearchContext *sc, - enum GNUNET_BLOCK_Type type, - struct GNUNET_TIME_Absolute expiration, - const void *data, - size_t size) +try_reconnect (struct GNUNET_FS_SearchContext *sc); + + +/** + * We check a result message from the service. + * + * @param cls closure + * @param msg result message received + */ +static int +check_result (void *cls, + const struct ClientPutMessage *cm) { - if (GNUNET_TIME_absolute_get_duration (expiration).rel_value_us > 0) + /* payload of any variable size is OK */ + return GNUNET_OK; +} + + +/** + * We process a search result from the service. + * + * @param cls closure + * @param msg result message received + */ +static void +handle_result (void *cls, + const struct ClientPutMessage *cm) +{ + struct GNUNET_FS_SearchContext *sc = cls; + uint16_t msize = ntohs (cm->header.size) - sizeof (*cm); + enum GNUNET_BLOCK_Type type = ntohl (cm->type); + + if (GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (cm->expiration)).rel_value_us > 0) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Result received has already expired.\n"); @@ -917,9 +941,13 @@ process_result (struct GNUNET_FS_SearchContext *sc, { case GNUNET_BLOCK_TYPE_FS_UBLOCK: if (GNUNET_FS_URI_SKS == sc->uri->type) - process_sblock (sc, data, size); + process_sblock (sc, + (const struct UBlock *) &cm[1], + msize); else - process_kblock (sc, data, size); + process_kblock (sc, + (const struct UBlock *) &cm[1], + msize); break; case GNUNET_BLOCK_TYPE_ANY: GNUNET_break (0); @@ -935,60 +963,13 @@ process_result (struct GNUNET_FS_SearchContext *sc, break; default: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Got result with unknown block type `%d', ignoring"), type); + _("Got result with unknown block type `%d', ignoring"), + type); break; } } -/** - * Shutdown any existing connection to the FS - * service and try to establish a fresh one - * (and then re-transmit our search request). - * - * @param sc the search to reconnec - */ -static void -try_reconnect (struct GNUNET_FS_SearchContext *sc); - - -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -receive_results (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_FS_SearchContext *sc = cls; - const struct ClientPutMessage *cm; - uint16_t msize; - - if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || - (ntohs (msg->size) <= sizeof (struct ClientPutMessage))) - { - try_reconnect (sc); - return; - } - msize = ntohs (msg->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving %u bytes of result from fs service\n", - msize); - cm = (const struct ClientPutMessage *) msg; - process_result (sc, ntohl (cm->type), - GNUNET_TIME_absolute_ntoh (cm->expiration), &cm[1], - msize - sizeof (struct ClientPutMessage)); - /* continue receiving */ - GNUNET_CLIENT_receive (sc->client, - &receive_results, - sc, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - /** * Schedule the transmission of the (next) search request * to the service. @@ -1058,7 +1039,6 @@ build_result_set (void *cls, } if (0 == mbc->put_cnt) return GNUNET_SYSERR; - mbc->sc->search_request_map_offset++; mbc->xoff[--mbc->put_cnt] = *key; return GNUNET_OK; @@ -1091,155 +1071,157 @@ find_result_set (void *cls, /** - * We're ready to transmit the search request to the file-sharing - * service. Do it. If the request is too large to fit into a single - * message, transmit in increments. + * Schedule the transmission of the (next) search request + * to the service. * - * @param cls closure - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf + * @param sc context for the search */ -static size_t -transmit_search_request (void *cls, - size_t size, - void *buf) +static void +schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) { - struct GNUNET_FS_SearchContext *sc = cls; struct MessageBuilderContext mbc; - size_t msize; + struct GNUNET_MQ_Envelope *env; struct SearchMessage *sm; struct GNUNET_CRYPTO_EcdsaPublicKey dpub; unsigned int total_seen_results; /* total number of result hashes to send */ - unsigned int message_size_limit; uint32_t options; + unsigned int left; + unsigned int todo; + unsigned int fit; + int first_call; + unsigned int search_request_map_offset; + unsigned int keyword_offset; - if (NULL == buf) - { - try_reconnect (sc); - return 0; - } + memset (&mbc, 0, sizeof (mbc)); mbc.sc = sc; - mbc.skip_cnt = sc->search_request_map_offset; - sm = buf; - sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH); - mbc.xoff = (struct GNUNET_HashCode *) &sm[1]; - options = SEARCH_MESSAGE_OPTION_NONE; - if (0 != (sc->options & GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY)) - options |= SEARCH_MESSAGE_OPTION_LOOPBACK_ONLY; if (GNUNET_FS_uri_test_ksk (sc->uri)) { - msize = sizeof (struct SearchMessage); - GNUNET_assert (size >= msize); - mbc.keyword_offset = sc->keyword_offset; - /* calculate total number of known results (in put_cnt => total_seen_results) */ mbc.put_cnt = 0; GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, - &find_result_set, &mbc); + &find_result_set, + &mbc); total_seen_results = mbc.put_cnt; - /* calculate how many results we can send in this message */ - message_size_limit = (size - msize) / sizeof (struct GNUNET_HashCode); - mbc.put_cnt = GNUNET_MIN (message_size_limit, - total_seen_results - mbc.skip_cnt); - if (sc->search_request_map_offset < total_seen_results) - GNUNET_assert (mbc.put_cnt > 0); - - /* now build message */ - msize += sizeof (struct GNUNET_HashCode) * mbc.put_cnt; - sm->header.size = htons (msize); - sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); - sm->anonymity_level = htonl (sc->anonymity); - memset (&sm->target, 0, sizeof (struct GNUNET_PeerIdentity)); - sm->query = sc->requests[sc->keyword_offset].uquery; - GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, - &build_result_set, &mbc); - GNUNET_assert (0 == mbc.put_cnt); - GNUNET_assert (total_seen_results >= sc->search_request_map_offset); - if (total_seen_results != sc->search_request_map_offset) - { - /* more requesting to be done... */ - sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); - schedule_transmit_search_request (sc); - return msize; - } - sm->options = htonl (options); - sc->keyword_offset++; - sc->search_request_map_offset = 0; - if (sc->uri->data.ksk.keywordCount != sc->keyword_offset) - { - /* more requesting to be done... */ - schedule_transmit_search_request (sc); - return msize; - } } else { - GNUNET_assert (GNUNET_FS_uri_test_sks (sc->uri)); - msize = sizeof (struct SearchMessage); - GNUNET_assert (size >= msize); - sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); - sm->anonymity_level = htonl (sc->anonymity); - memset (&sm->target, 0, sizeof (struct GNUNET_PeerIdentity)); - GNUNET_CRYPTO_ecdsa_public_key_derive (&sc->uri->data.sks.ns, - sc->uri->data.sks.identifier, - "fs-ublock", - &dpub); - GNUNET_CRYPTO_hash (&dpub, - sizeof (dpub), - &sm->query); - message_size_limit = (size - msize) / sizeof (struct GNUNET_HashCode); - total_seen_results = GNUNET_CONTAINER_multihashmap_size (sc->master_result_map); - mbc.put_cnt = GNUNET_MIN (message_size_limit, - total_seen_results - mbc.skip_cnt); - mbc.keyword_offset = 0; - if (sc->search_request_map_offset < total_seen_results) - GNUNET_assert (mbc.put_cnt > 0); - msize += sizeof (struct GNUNET_HashCode) * mbc.put_cnt; - GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, - &build_result_set, &mbc); - sm->header.size = htons (msize); - GNUNET_assert (total_seen_results >= sc->search_request_map_offset); - if (total_seen_results != sc->search_request_map_offset) + total_seen_results + = GNUNET_CONTAINER_multihashmap_size (sc->master_result_map); + } + search_request_map_offset = 0; + keyword_offset = 0; + + first_call = GNUNET_YES; + while ( (0 != (left = + (total_seen_results - search_request_map_offset))) || + (GNUNET_YES == first_call) ) + { + first_call = GNUNET_NO; + options = SEARCH_MESSAGE_OPTION_NONE; + if (0 != (sc->options & GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY)) + options |= SEARCH_MESSAGE_OPTION_LOOPBACK_ONLY; + + fit = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*sm)) / sizeof (struct GNUNET_HashCode); + todo = GNUNET_MIN (fit, + left); + env = GNUNET_MQ_msg_extra (sm, + sizeof (struct GNUNET_HashCode) * todo, + GNUNET_MESSAGE_TYPE_FS_START_SEARCH); + mbc.skip_cnt = search_request_map_offset; + mbc.xoff = (struct GNUNET_HashCode *) &sm[1]; + + if (GNUNET_FS_uri_test_ksk (sc->uri)) + { + mbc.keyword_offset = keyword_offset; + /* calculate how many results we can send in this message */ + mbc.put_cnt = todo; + /* now build message */ + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); + sm->anonymity_level = htonl (sc->anonymity); + memset (&sm->target, + 0, + sizeof (struct GNUNET_PeerIdentity)); + sm->query = sc->requests[keyword_offset].uquery; + GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, + &build_result_set, + &mbc); + search_request_map_offset += todo; + GNUNET_assert (0 == mbc.put_cnt); + GNUNET_assert (total_seen_results >= search_request_map_offset); + if (total_seen_results != search_request_map_offset) + { + /* more requesting to be done... */ + sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); + } + else + { + sm->options = htonl (options); + keyword_offset++; + search_request_map_offset = 0; + if (sc->uri->data.ksk.keywordCount != keyword_offset) + { + /* more keywords => more requesting to be done... */ + first_call = GNUNET_YES; + } + } + } + else { - /* more requesting to be done... */ - sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); - schedule_transmit_search_request (sc); - return msize; + GNUNET_assert (GNUNET_FS_uri_test_sks (sc->uri)); + + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); + sm->anonymity_level = htonl (sc->anonymity); + memset (&sm->target, + 0, + sizeof (struct GNUNET_PeerIdentity)); + GNUNET_CRYPTO_ecdsa_public_key_derive (&sc->uri->data.sks.ns, + sc->uri->data.sks.identifier, + "fs-ublock", + &dpub); + GNUNET_CRYPTO_hash (&dpub, + sizeof (dpub), + &sm->query); + mbc.put_cnt = todo; + mbc.keyword_offset = 0; + GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, + &build_result_set, + &mbc); + GNUNET_assert (total_seen_results >= search_request_map_offset); + if (total_seen_results != search_request_map_offset) + { + /* more requesting to be done... */ + sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); + } + else + { + sm->options = htonl (options); + } } - sm->options = htonl (options); + GNUNET_MQ_send (sc->mq, + env); } - GNUNET_CLIENT_receive (sc->client, - &receive_results, sc, - GNUNET_TIME_UNIT_FOREVER_REL); - return msize; } /** - * Schedule the transmission of the (next) search request - * to the service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param sc context for the search + * @param cls closure with the `struct GNUNET_FS_SearchContext *` + * @param error error code */ static void -schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) +search_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - size_t size; - unsigned int left; - unsigned int fit; - unsigned int request; - - size = sizeof (struct SearchMessage); - left = - GNUNET_CONTAINER_multihashmap_size (sc->master_result_map) - - sc->search_request_map_offset; - fit = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - size) / sizeof (struct GNUNET_HashCode); - request = GNUNET_MIN (fit, left); - size += sizeof (struct GNUNET_HashCode) * request; - GNUNET_CLIENT_notify_transmit_ready (sc->client, size, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_NO, - &transmit_search_request, sc); + struct GNUNET_FS_SearchContext *sc = cls; + + if (NULL != sc->mq) + { + GNUNET_MQ_destroy (sc->mq); + sc->mq = NULL; + } + try_reconnect (sc); } @@ -1252,19 +1234,26 @@ schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) static void do_reconnect (void *cls) { + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_FS_PUT, + struct ClientPutMessage); struct GNUNET_FS_SearchContext *sc = cls; - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_result_handler (sc), + GNUNET_MQ_handler_end () + }; sc->task = NULL; - client = GNUNET_CLIENT_connect ("fs", sc->h->cfg); - if (NULL == client) + sc->mq = GNUNET_CLIENT_connecT (sc->h->cfg, + "fs", + handlers, + &search_mq_error_handler, + sc); + if (NULL == sc->mq) { try_reconnect (sc); return; } - sc->client = client; - sc->search_request_map_offset = 0; - sc->keyword_offset = 0; schedule_transmit_search_request (sc); } @@ -1279,10 +1268,10 @@ do_reconnect (void *cls) static void try_reconnect (struct GNUNET_FS_SearchContext *sc) { - if (NULL != sc->client) + if (NULL != sc->mq) { - GNUNET_CLIENT_disconnect (sc->client); - sc->client = NULL; + GNUNET_MQ_destroy (sc->mq); + sc->mq = NULL; } sc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (sc->reconnect_backoff); sc->task = @@ -1388,7 +1377,7 @@ GNUNET_FS_search_start_searching_ (struct GNUNET_FS_SearchContext *sc) struct GNUNET_CRYPTO_EcdsaPublicKey anon_pub; struct SearchRequestEntry *sre; - GNUNET_assert (NULL == sc->client); + GNUNET_assert (NULL == sc->mq); if (GNUNET_FS_uri_test_ksk (sc->uri)) { GNUNET_assert (0 != sc->uri->data.ksk.keywordCount); @@ -1418,11 +1407,14 @@ GNUNET_FS_search_start_searching_ (struct GNUNET_FS_SearchContext *sc) &update_sre_result_maps, sc); } - sc->client = GNUNET_CLIENT_connect ("fs", sc->h->cfg); - if (NULL == sc->client) + GNUNET_assert (NULL == sc->task); + do_reconnect (sc); + if (NULL == sc->mq) + { + GNUNET_SCHEDULER_cancel (sc->task); + sc->task = NULL; return GNUNET_SYSERR; - sc->search_request_map_offset = 0; - schedule_transmit_search_request (sc); + } return GNUNET_OK; } @@ -1552,10 +1544,10 @@ GNUNET_FS_search_signal_suspend_ (void *cls) GNUNET_SCHEDULER_cancel (sc->task); sc->task = NULL; } - if (NULL != sc->client) + if (NULL != sc->mq) { - GNUNET_CLIENT_disconnect (sc->client); - sc->client = NULL; + GNUNET_MQ_destroy (sc->mq); + sc->mq = NULL; } GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); if (NULL != sc->requests) @@ -1616,14 +1608,19 @@ GNUNET_FS_search_pause (struct GNUNET_FS_SearchContext *sc) GNUNET_SCHEDULER_cancel (sc->task); sc->task = NULL; } - if (NULL != sc->client) - GNUNET_CLIENT_disconnect (sc->client); - sc->client = NULL; + if (NULL != sc->mq) + { + GNUNET_MQ_destroy (sc->mq); + sc->mq = NULL; + } GNUNET_FS_search_sync_ (sc); GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, - &search_result_freeze_probes, sc); + &search_result_freeze_probes, + sc); pi.status = GNUNET_FS_STATUS_SEARCH_PAUSED; - sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); + sc->client_info = GNUNET_FS_search_make_status_ (&pi, + sc->h, + sc); } @@ -1637,7 +1634,7 @@ GNUNET_FS_search_continue (struct GNUNET_FS_SearchContext *sc) { struct GNUNET_FS_ProgressInfo pi; - GNUNET_assert (NULL == sc->client); + GNUNET_assert (NULL == sc->mq); GNUNET_assert (NULL == sc->task); do_reconnect (sc); GNUNET_FS_search_sync_ (sc); @@ -1769,9 +1766,15 @@ GNUNET_FS_search_stop (struct GNUNET_FS_SearchContext *sc) sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); GNUNET_break (NULL == sc->client_info); if (NULL != sc->task) + { GNUNET_SCHEDULER_cancel (sc->task); - if (NULL != sc->client) - GNUNET_CLIENT_disconnect (sc->client); + sc->task = NULL; + } + if (NULL != sc->mq) + { + GNUNET_MQ_destroy (sc->mq); + sc->mq = NULL; + } GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, &search_result_free, sc); GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); -- 2.25.1