From 07a59d7b5b66e8b59029894b9ee42069abb7a187 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 3 Jul 2016 13:18:48 +0000 Subject: [PATCH] convert fs publish to MQ --- src/fs/fs_api.h | 7 +- src/fs/fs_publish.c | 284 +++++++++++++++++++++++++++++--------------- 2 files changed, 187 insertions(+), 104 deletions(-) diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h index 126f5902e..e85de94a7 100644 --- a/src/fs/fs_api.h +++ b/src/fs/fs_api.h @@ -1202,11 +1202,6 @@ struct GNUNET_FS_PublishContext */ struct GNUNET_FS_Handle *h; - /** - * Connection to FS service (only used for LOC URI signing). - */ - struct GNUNET_CLIENT_Connection *fs_client; - /** * Our top-level activity entry (if we are top-level, otherwise NULL). */ @@ -1242,7 +1237,7 @@ struct GNUNET_FS_PublishContext * Our own message queue for the FS service; only briefly used when * we start to index a file, otherwise NULL. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Current position in the file-tree for the upload. diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index 89cc2714c..7cf8b4815 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -92,10 +92,10 @@ publish_cleanup (struct GNUNET_FS_PublishContext *pc) GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO); pc->dsh = NULL; } - if (NULL != pc->client) + if (NULL != pc->mq) { - GNUNET_CLIENT_disconnect (pc->client); - pc->client = NULL; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; } GNUNET_assert (NULL == pc->upload_task); GNUNET_free (pc); @@ -493,7 +493,8 @@ block_proc (void *cls, p = pc->fi_pos; if (NULL == pc->dsh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Waiting for datastore connection\n"); GNUNET_assert (NULL == pc->upload_task); pc->upload_task = GNUNET_SCHEDULER_add_with_priority @@ -679,53 +680,105 @@ publish_content (struct GNUNET_FS_PublishContext *pc) /** - * Process the response (or lack thereof) from - * the "fs" service to our 'start index' request. + * Check the response from the "fs" service to our 'start index' + * request. + * + * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) + * @param msg the response we got + */ +static int +check_index_start_failed (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + size_t msize = ntohs (msg->size) - sizeof (*msg); + const char *emsg = (const char *) &msg[1]; + + if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0') + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Process the response from the "fs" service to our 'start index' + * request. * * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) * @param msg the response we got */ static void -process_index_start_response (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_index_start_failed (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; - const char *emsg; - uint16_t msize; + const char *emsg = (const char *) &msg[1]; - GNUNET_CLIENT_disconnect (pc->client); - pc->client = NULL; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; p = pc->fi_pos; - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Can not index file `%s': %s. Will try to insert instead.\n"), - p->filename, - _("timeout on index-start request to `fs' service")); - p->data.file.do_index = GNUNET_NO; - GNUNET_FS_file_information_sync_ (p); - publish_content (pc); - return; - } - if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not index file `%s': %s. Will try to insert instead.\n"), + p->filename, + gettext (emsg)); + p->data.file.do_index = GNUNET_NO; + GNUNET_FS_file_information_sync_ (p); + publish_content (pc); +} + + +/** + * Process the response from the "fs" service to our 'start index' + * request. + * + * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) + * @param msg the response we got + */ +static void +handle_index_start_ok (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_FS_FileInformation *p; + + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; + p = pc->fi_pos; + p->data.file.index_start_confirmed = GNUNET_YES; + GNUNET_FS_file_information_sync_ (p); + publish_content (pc); +} + + +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_PublishContext *` + * @param error error code + */ +static void +index_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_FS_FileInformation *p; + + if (NULL != pc->mq) { - msize = ntohs (msg->size); - emsg = (const char *) &msg[1]; - if ((msize <= sizeof (struct GNUNET_MessageHeader)) || - (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')) - emsg = gettext_noop ("unknown error"); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Can not index file `%s': %s. Will try to insert instead.\n"), - p->filename, gettext (emsg)); - p->data.file.do_index = GNUNET_NO; - GNUNET_FS_file_information_sync_ (p); - publish_content (pc); - return; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; } - p->data.file.index_start_confirmed = GNUNET_YES; - /* success! continue with indexing */ + p = pc->fi_pos; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not index file `%s': %s. Will try to insert instead.\n"), + p->filename, + _("error on index-start request to `fs' service")); + p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); publish_content (pc); } @@ -742,11 +795,22 @@ static void hash_for_index_cb (void *cls, const struct GNUNET_HashCode *res) { + GNUNET_MQ_hd_fixed_size (index_start_ok, + GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_var_size (index_start_failed, + GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED, + struct GNUNET_MessageHeader); struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_index_start_ok_handler (pc), + make_index_start_failed_handler (pc), + GNUNET_MQ_handler_end () + }; struct GNUNET_FS_FileInformation *p; + struct GNUNET_MQ_Envelope *env; struct IndexStartMessage *ism; size_t slen; - struct GNUNET_CLIENT_Connection *client; uint64_t dev; uint64_t ino; char *fn; @@ -785,8 +849,10 @@ hash_for_index_cb (void *cls, publish_content (pc); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n", - p->filename, GNUNET_h2s (res)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Hash of indexed file `%s' is `%s'\n", + p->filename, + GNUNET_h2s (res)); if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY)) { p->data.file.file_id = *res; @@ -797,8 +863,12 @@ hash_for_index_cb (void *cls, GNUNET_free (fn); return; } - client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); - if (NULL == client) + pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg, + "fs", + handlers, + &index_mq_error_handler, + pc); + if (NULL == pc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Can not index file `%s': %s. Will try to insert instead.\n"), @@ -815,10 +885,13 @@ hash_for_index_cb (void *cls, p->data.file.have_hash = GNUNET_YES; GNUNET_FS_file_information_sync_ (p); } - ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen); - ism->header.size = htons (sizeof (struct IndexStartMessage) + slen); - ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START); - if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino)) + env = GNUNET_MQ_msg_extra (ism, + slen, + GNUNET_MESSAGE_TYPE_FS_INDEX_START); + if (GNUNET_OK == + GNUNET_DISK_file_get_identifiers (p->filename, + &dev, + &ino)) { ism->device = GNUNET_htonll (dev); ism->inode = GNUNET_htonll (ino); @@ -826,19 +899,16 @@ hash_for_index_cb (void *cls, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to get file identifiers for `%s'\n"), p->filename); + _("Failed to get file identifiers for `%s'\n"), + p->filename); } ism->file_id = *res; - memcpy (&ism[1], fn, slen); + memcpy (&ism[1], + fn, + slen); GNUNET_free (fn); - pc->client = client; - GNUNET_break (GNUNET_YES == - GNUNET_CLIENT_transmit_and_get_response (client, &ism->header, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &process_index_start_response, - pc)); - GNUNET_free (ism); + GNUNET_MQ_send (pc->mq, + env); } @@ -862,7 +932,8 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc) p->chk_uri, &p->bo, pc->options, - &publish_kblocks_cont, pc); + &publish_kblocks_cont, + pc); } else { @@ -872,40 +943,24 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc) /** - * Process the response (or lack thereof) from - * the "fs" service to our LOC sign request. + * Process the response from the "fs" service to our LOC sign request. * * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) - * @param msg the response we got + * @param sig the response we got */ static void -process_signature_response (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_signature_response (void *cls, + const struct ResponseLocSignatureMessage *sig) { struct GNUNET_FS_PublishContext *pc = cls; - const struct ResponseLocSignatureMessage *sig; struct GNUNET_FS_FileInformation *p; p = pc->fi_pos; - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Can not create LOC URI. Will continue with CHK instead.\n")); - publish_kblocks (pc); - return; - } - if (sizeof (struct ResponseLocSignatureMessage) != - ntohs (msg->size)) - { - GNUNET_break (0); - publish_kblocks (pc); - return; - } - sig = (const struct ResponseLocSignatureMessage *) msg; p->chk_uri->type = GNUNET_FS_URI_LOC; /* p->data.loc.fi kept from CHK before */ p->chk_uri->data.loc.peer = sig->peer; - p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time); + p->chk_uri->data.loc.expirationTime + = GNUNET_TIME_absolute_ntoh (sig->expiration_time); p->chk_uri->data.loc.contentSignature = sig->signature; GNUNET_FS_file_information_sync_ (p); GNUNET_FS_publish_sync_ (pc); @@ -913,6 +968,31 @@ process_signature_response (void *cls, } +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_PublishContext *` + * @param error error code + */ +static void +loc_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_PublishContext *pc = cls; + + if (NULL != pc->mq) + { + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not create LOC URI. Will continue with CHK instead.\n")); + publish_kblocks (pc); +} + + /** * We're publishing without anonymity. Contact the FS service * to create a signed LOC URI for further processing, then @@ -923,12 +1003,25 @@ process_signature_response (void *cls, static void create_loc_uri (struct GNUNET_FS_PublishContext *pc) { - struct RequestLocSignatureMessage req; + GNUNET_MQ_hd_fixed_size (signature_response, + GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE, + struct ResponseLocSignatureMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_signature_response_handler (pc), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct RequestLocSignatureMessage *req; struct GNUNET_FS_FileInformation *p; - if (NULL == pc->client) - pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); - if (NULL == pc->client) + if (NULL != pc->mq) + GNUNET_MQ_destroy (pc->mq); + pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg, + "fs", + handlers, + &loc_mq_error_handler, + pc); + if (NULL == pc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Can not create LOC URI. Will continue with CHK instead.\n")); @@ -936,19 +1029,14 @@ create_loc_uri (struct GNUNET_FS_PublishContext *pc) return; } p = pc->fi_pos; - req.header.size = htons (sizeof (struct RequestLocSignatureMessage)); - req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN); - req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT); - req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time); - req.chk = p->chk_uri->data.chk.chk; - req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length); - GNUNET_break (GNUNET_YES == - GNUNET_CLIENT_transmit_and_get_response (pc->client, - &req.header, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &process_signature_response, - pc)); + env = GNUNET_MQ_msg (req, + GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN); + req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT); + req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time); + req->chk = p->chk_uri->data.chk.chk; + req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length); + GNUNET_MQ_send (pc->mq, + env); } -- 2.25.1