From 2a313fb65fe21e5ac6a1ef268fdf43c2ba46a330 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 29 Jun 2016 17:20:17 +0000 Subject: [PATCH] convert fs_list_indexed to MQ API --- src/fs/fs_list_indexed.c | 170 ++++++++++++++++++++++++--------------- src/fs/fs_search.c | 25 ++++-- 2 files changed, 122 insertions(+), 73 deletions(-) diff --git a/src/fs/fs_list_indexed.c b/src/fs/fs_list_indexed.c index e10260d7b..21385d40b 100644 --- a/src/fs/fs_list_indexed.c +++ b/src/fs/fs_list_indexed.c @@ -32,19 +32,15 @@ /** - * Context for "GNUNET_FS_get_indexed_files". + * Context for #GNUNET_FS_get_indexed_files(). */ struct GNUNET_FS_GetIndexedContext { - /** - * Handle to global FS context. - */ - struct GNUNET_FS_Handle *h; /** * Connection to the FS service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Function to call for each indexed file. @@ -52,7 +48,7 @@ struct GNUNET_FS_GetIndexedContext GNUNET_FS_IndexedFileProcessor iterator; /** - * Closure for iterator. + * Closure for @e iterator. */ void *iterator_cls; @@ -62,7 +58,7 @@ struct GNUNET_FS_GetIndexedContext GNUNET_SCHEDULER_TaskCallback cont; /** - * Closure for cont. + * Closure for @e cont. */ void *cont_cls; }; @@ -72,58 +68,91 @@ struct GNUNET_FS_GetIndexedContext * Function called on each response from the FS * service with information about indexed files. * - * @param cls closure (of type "struct GNUNET_FS_GetIndexedContext*") + * @param cls closure (of type `struct GNUNET_FS_GetIndexedContext *`) * @param msg message with indexing information */ static void -handle_index_info (void *cls, const struct GNUNET_MessageHeader *msg) +handle_index_info_end (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_FS_GetIndexedContext *gic = cls; - const struct IndexInfoMessage *iim; - uint16_t msize; + + (void) gic->iterator (gic->iterator_cls, + NULL, + NULL); + GNUNET_FS_get_indexed_files_cancel (gic); +} + + +/** + * Check validity of response from the FS + * service with information about indexed files. + * + * @param cls closure (of type `struct GNUNET_FS_GetIndexedContext *`) + * @param iim message with indexing information + */ +static int +check_index_info (void *cls, + const struct IndexInfoMessage *iim) +{ + uint16_t msize = ntohs (iim->header.size) - sizeof (*iim); const char *filename; - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Failed to receive response for `%s' request from `%s' service.\n"), - "GET_INDEXED", "fs"); - (void) gic->iterator (gic->iterator_cls, NULL, NULL); - GNUNET_FS_get_indexed_files_cancel (gic); - return; - } - if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END) - { - /* normal end-of-list */ - (void) gic->iterator (gic->iterator_cls, NULL, NULL); - GNUNET_FS_get_indexed_files_cancel (gic); - return; - } - msize = ntohs (msg->size); - iim = (const struct IndexInfoMessage *) msg; filename = (const char *) &iim[1]; - if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY) || - (msize <= sizeof (struct IndexInfoMessage)) || - (filename[msize - sizeof (struct IndexInfoMessage) - 1] != '\0')) + if (filename[msize - 1] != '\0') { - /* bogus reply */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Failed to receive valid response for `%s' request from `%s' service.\n"), - "GET_INDEXED", "fs"); - (void) gic->iterator (gic->iterator_cls, NULL, NULL); - GNUNET_FS_get_indexed_files_cancel (gic); - return; + GNUNET_break (0); + return GNUNET_SYSERR; } - if (GNUNET_OK != gic->iterator (gic->iterator_cls, filename, &iim->file_id)) + return GNUNET_OK; +} + + +/** + * Function called on each response from the FS + * service with information about indexed files. + * + * @param cls closure (of type `struct GNUNET_FS_GetIndexedContext *`) + * @param iim message with indexing information + */ +static void +handle_index_info (void *cls, + const struct IndexInfoMessage *iim) +{ + struct GNUNET_FS_GetIndexedContext *gic = cls; + const char *filename; + + filename = (const char *) &iim[1]; + if (GNUNET_OK != + gic->iterator (gic->iterator_cls, + filename, + &iim->file_id)) { GNUNET_FS_get_indexed_files_cancel (gic); return; } - /* get more */ - GNUNET_CLIENT_receive (gic->client, &handle_index_info, gic, - GNUNET_CONSTANTS_SERVICE_TIMEOUT); +} + + +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_GetIndexedContent *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_GetIndexedContext *gic = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive response from `%s' service.\n"), + "fs"); + (void) gic->iterator (gic->iterator_cls, NULL, NULL); + GNUNET_FS_get_indexed_files_cancel (gic); } @@ -140,30 +169,41 @@ GNUNET_FS_get_indexed_files (struct GNUNET_FS_Handle *h, GNUNET_FS_IndexedFileProcessor iterator, void *iterator_cls) { - struct GNUNET_CLIENT_Connection *client; - struct GNUNET_FS_GetIndexedContext *gic; - struct GNUNET_MessageHeader msg; - - client = GNUNET_CLIENT_connect ("fs", h->cfg); - if (NULL == client) + GNUNET_MQ_hd_fixed_size (index_info_end, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_var_size (index_info, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY, + struct IndexInfoMessage); + struct GNUNET_FS_GetIndexedContext *gic + = GNUNET_new (struct GNUNET_FS_GetIndexedContext); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_index_info_end_handler (gic), + make_index_info_handler (gic), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_MessageHeader *msg; + + gic->mq = GNUNET_CLIENT_connecT (h->cfg, + "fs", + handlers, + &mq_error_handler, + h); + if (NULL == gic->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to not connect to `%s' service.\n"), "fs"); + _("Failed to not connect to `%s' service.\n"), + "fs"); + GNUNET_free (gic); return NULL; } - gic = GNUNET_new (struct GNUNET_FS_GetIndexedContext); - gic->h = h; - gic->client = client; gic->iterator = iterator; gic->iterator_cls = iterator_cls; - msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - msg.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET); - GNUNET_assert (GNUNET_OK == - GNUNET_CLIENT_transmit_and_get_response (client, &msg, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_YES, - &handle_index_info, - gic)); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET); + GNUNET_MQ_send (gic->mq, + env); return gic; } @@ -176,7 +216,7 @@ GNUNET_FS_get_indexed_files (struct GNUNET_FS_Handle *h, void GNUNET_FS_get_indexed_files_cancel (struct GNUNET_FS_GetIndexedContext *gic) { - GNUNET_CLIENT_disconnect (gic->client); + GNUNET_MQ_destroy (gic->mq); GNUNET_free (gic); } diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index eaabac746..f3221ac76 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c @@ -651,9 +651,12 @@ process_ksk_result (struct GNUNET_FS_SearchContext *sc, * @return context that can be used to control the search */ static struct GNUNET_FS_SearchContext * -search_start (struct GNUNET_FS_Handle *h, const struct GNUNET_FS_Uri *uri, - uint32_t anonymity, enum GNUNET_FS_SearchOptions options, - void *cctx, struct GNUNET_FS_SearchResult *psearch); +search_start (struct GNUNET_FS_Handle *h, + const struct GNUNET_FS_Uri *uri, + uint32_t anonymity, + enum GNUNET_FS_SearchOptions options, + void *cctx, + struct GNUNET_FS_SearchResult *psearch); /** @@ -957,7 +960,8 @@ try_reconnect (struct GNUNET_FS_SearchContext *sc); * @param msg message received, NULL on timeout or fatal error */ static void -receive_results (void *cls, const struct GNUNET_MessageHeader *msg) +receive_results (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_FS_SearchContext *sc = cls; const struct ClientPutMessage *cm; @@ -971,13 +975,16 @@ receive_results (void *cls, const struct GNUNET_MessageHeader *msg) } msize = ntohs (msg->size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving %u bytes of result from fs service\n", msize); + "Receiving %u bytes of result from fs service\n", + msize); cm = (const struct ClientPutMessage *) msg; process_result (sc, ntohl (cm->type), GNUNET_TIME_absolute_ntoh (cm->expiration), &cm[1], msize - sizeof (struct ClientPutMessage)); /* continue receiving */ - GNUNET_CLIENT_receive (sc->client, &receive_results, sc, + GNUNET_CLIENT_receive (sc->client, + &receive_results, + sc, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -993,7 +1000,7 @@ schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc); /** - * Closure for 'build_result_set'. + * Closure for #build_result_set(). */ struct MessageBuilderContext { @@ -1094,7 +1101,9 @@ find_result_set (void *cls, * @return number of bytes written to @a buf */ static size_t -transmit_search_request (void *cls, size_t size, void *buf) +transmit_search_request (void *cls, + size_t size, + void *buf) { struct GNUNET_FS_SearchContext *sc = cls; struct MessageBuilderContext mbc; -- 2.25.1