X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Ffs_publish.c;h=75eaef6f1c97b317898243148758301b27670ece;hb=4e29ecde9b3ad3e34af359f18b6679c06b17ce78;hp=138c0e96c1c803ec95ab9f72112a9e4337c0f36e;hpb=f27338992f0a5915ee974faea05f764c2df6f584;p=oweals%2Fgnunet.git diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index 138c0e96c..75eaef6f1 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -1,21 +1,16 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010 Christian Grothoff (and other contributing authors) + Copyright (C) 2009, 2010 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. + Affero General Public License for more details. */ /** * @file fs/fs_publish.c @@ -92,10 +87,10 @@ publish_cleanup (struct GNUNET_FS_PublishContext *pc) GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO); pc->dsh = NULL; } - if (NULL != pc->client) + if (NULL != pc->mq) { - GNUNET_CLIENT_disconnect (pc->client); - pc->client = NULL; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; } GNUNET_assert (NULL == pc->upload_task); GNUNET_free (pc); @@ -143,8 +138,6 @@ ds_put_cont (void *cls, } return; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "SOME DONE!\n"); pc->any_done = GNUNET_YES; GNUNET_assert (NULL == pc->upload_task); pc->upload_task = @@ -268,7 +261,6 @@ publish_sblocks_cont (void *cls, { pc->qre = GNUNET_DATASTORE_release_reserve (pc->dsh, pc->rid, UINT_MAX, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, &finish_release_reserve, pc); } else @@ -383,7 +375,7 @@ block_reader (void *cls, { pt_size = GNUNET_MIN (max, p->data.dir.dir_size - offset); dd = p->data.dir.dir_data; - memcpy (buf, &dd[offset], pt_size); + GNUNET_memcpy (buf, &dd[offset], pt_size); } else { @@ -415,11 +407,9 @@ block_reader (void *cls, * the final result. * * @param cls our publishing context - * @param tc scheduler's task context (not used) */ static void -encode_cont (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +encode_cont (void *cls) { struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; @@ -498,7 +488,8 @@ block_proc (void *cls, p = pc->fi_pos; if (NULL == pc->dsh) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Waiting for datastore connection\n"); GNUNET_assert (NULL == pc->upload_task); pc->upload_task = GNUNET_SCHEDULER_add_with_priority @@ -512,8 +503,9 @@ block_proc (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Indexing block `%s' for offset %llu with index size %u\n", - GNUNET_h2s (&chk->query), (unsigned long long) offset, - sizeof (struct OnDemandBlock)); + GNUNET_h2s (&chk->query), + (unsigned long long) offset, + (unsigned int) sizeof (struct OnDemandBlock)); odb.offset = GNUNET_htonll (offset); odb.file_id = p->data.file.file_id; GNUNET_assert (pc->qre == NULL); @@ -529,7 +521,6 @@ block_proc (void *cls, p->bo.replication_level, p->bo.expiration_time, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, pc); return; } @@ -550,7 +541,6 @@ block_proc (void *cls, p->bo.replication_level, p->bo.expiration_time, -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, pc); } @@ -685,53 +675,105 @@ publish_content (struct GNUNET_FS_PublishContext *pc) /** - * Process the response (or lack thereof) from - * the "fs" service to our 'start index' request. + * Check the response from the "fs" service to our 'start index' + * request. + * + * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) + * @param msg the response we got + */ +static int +check_index_start_failed (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + size_t msize = ntohs (msg->size) - sizeof (*msg); + const char *emsg = (const char *) &msg[1]; + + if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0') + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Process the response from the "fs" service to our 'start index' + * request. * * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) * @param msg the response we got */ static void -process_index_start_response (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_index_start_failed (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; - const char *emsg; - uint16_t msize; + const char *emsg = (const char *) &msg[1]; - GNUNET_CLIENT_disconnect (pc->client); - pc->client = NULL; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; p = pc->fi_pos; - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Can not index file `%s': %s. Will try to insert instead.\n"), - p->filename, - _("timeout on index-start request to `fs' service")); - p->data.file.do_index = GNUNET_NO; - GNUNET_FS_file_information_sync_ (p); - publish_content (pc); - return; - } - if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not index file `%s': %s. Will try to insert instead.\n"), + p->filename, + gettext (emsg)); + p->data.file.do_index = GNUNET_NO; + GNUNET_FS_file_information_sync_ (p); + publish_content (pc); +} + + +/** + * Process the response from the "fs" service to our 'start index' + * request. + * + * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) + * @param msg the response we got + */ +static void +handle_index_start_ok (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_FS_FileInformation *p; + + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; + p = pc->fi_pos; + p->data.file.index_start_confirmed = GNUNET_YES; + GNUNET_FS_file_information_sync_ (p); + publish_content (pc); +} + + +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_PublishContext *` + * @param error error code + */ +static void +index_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_FS_FileInformation *p; + + if (NULL != pc->mq) { - msize = ntohs (msg->size); - emsg = (const char *) &msg[1]; - if ((msize <= sizeof (struct GNUNET_MessageHeader)) || - (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')) - emsg = gettext_noop ("unknown error"); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Can not index file `%s': %s. Will try to insert instead.\n"), - p->filename, gettext (emsg)); - p->data.file.do_index = GNUNET_NO; - GNUNET_FS_file_information_sync_ (p); - publish_content (pc); - return; + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; } - p->data.file.index_start_confirmed = GNUNET_YES; - /* success! continue with indexing */ + p = pc->fi_pos; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not index file `%s': %s. Will try to insert instead.\n"), + p->filename, + _("error on index-start request to `fs' service")); + p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); publish_content (pc); } @@ -749,10 +791,21 @@ hash_for_index_cb (void *cls, const struct GNUNET_HashCode *res) { struct GNUNET_FS_PublishContext *pc = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (index_start_ok, + GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK, + struct GNUNET_MessageHeader, + pc), + GNUNET_MQ_hd_var_size (index_start_failed, + GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED, + struct GNUNET_MessageHeader, + pc), + GNUNET_MQ_handler_end () + }; struct GNUNET_FS_FileInformation *p; + struct GNUNET_MQ_Envelope *env; struct IndexStartMessage *ism; size_t slen; - struct GNUNET_CLIENT_Connection *client; uint64_t dev; uint64_t ino; char *fn; @@ -779,7 +832,7 @@ hash_for_index_cb (void *cls, GNUNET_assert (fn != NULL); slen = strlen (fn) + 1; if (slen >= - GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct IndexStartMessage)) + GNUNET_MAX_MESSAGE_SIZE - sizeof (struct IndexStartMessage)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _ @@ -791,8 +844,10 @@ hash_for_index_cb (void *cls, publish_content (pc); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n", - p->filename, GNUNET_h2s (res)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Hash of indexed file `%s' is `%s'\n", + p->filename, + GNUNET_h2s (res)); if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY)) { p->data.file.file_id = *res; @@ -803,8 +858,12 @@ hash_for_index_cb (void *cls, GNUNET_free (fn); return; } - client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); - if (NULL == client) + pc->mq = GNUNET_CLIENT_connect (pc->h->cfg, + "fs", + handlers, + &index_mq_error_handler, + pc); + if (NULL == pc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Can not index file `%s': %s. Will try to insert instead.\n"), @@ -821,10 +880,13 @@ hash_for_index_cb (void *cls, p->data.file.have_hash = GNUNET_YES; GNUNET_FS_file_information_sync_ (p); } - ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen); - ism->header.size = htons (sizeof (struct IndexStartMessage) + slen); - ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START); - if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino)) + env = GNUNET_MQ_msg_extra (ism, + slen, + GNUNET_MESSAGE_TYPE_FS_INDEX_START); + if (GNUNET_OK == + GNUNET_DISK_file_get_identifiers (p->filename, + &dev, + &ino)) { ism->device = GNUNET_htonll (dev); ism->inode = GNUNET_htonll (ino); @@ -832,19 +894,16 @@ hash_for_index_cb (void *cls, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to get file identifiers for `%s'\n"), p->filename); + _("Failed to get file identifiers for `%s'\n"), + p->filename); } ism->file_id = *res; - memcpy (&ism[1], fn, slen); + GNUNET_memcpy (&ism[1], + fn, + slen); GNUNET_free (fn); - pc->client = client; - GNUNET_break (GNUNET_YES == - GNUNET_CLIENT_transmit_and_get_response (client, &ism->header, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &process_index_start_response, - pc)); - GNUNET_free (ism); + GNUNET_MQ_send (pc->mq, + env); } @@ -868,7 +927,8 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc) p->chk_uri, &p->bo, pc->options, - &publish_kblocks_cont, pc); + &publish_kblocks_cont, + pc); } else { @@ -878,40 +938,24 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc) /** - * Process the response (or lack thereof) from - * the "fs" service to our LOC sign request. + * Process the response from the "fs" service to our LOC sign request. * * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) - * @param msg the response we got + * @param sig the response we got */ static void -process_signature_response (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_signature_response (void *cls, + const struct ResponseLocSignatureMessage *sig) { struct GNUNET_FS_PublishContext *pc = cls; - const struct ResponseLocSignatureMessage *sig; struct GNUNET_FS_FileInformation *p; p = pc->fi_pos; - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Can not create LOC URI. Will continue with CHK instead.\n")); - publish_kblocks (pc); - return; - } - if (sizeof (struct ResponseLocSignatureMessage) != - ntohs (msg->size)) - { - GNUNET_break (0); - publish_kblocks (pc); - return; - } - sig = (const struct ResponseLocSignatureMessage *) msg; p->chk_uri->type = GNUNET_FS_URI_LOC; /* p->data.loc.fi kept from CHK before */ p->chk_uri->data.loc.peer = sig->peer; - p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time); + p->chk_uri->data.loc.expirationTime + = GNUNET_TIME_absolute_ntoh (sig->expiration_time); p->chk_uri->data.loc.contentSignature = sig->signature; GNUNET_FS_file_information_sync_ (p); GNUNET_FS_publish_sync_ (pc); @@ -919,6 +963,31 @@ process_signature_response (void *cls, } +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_PublishContext *` + * @param error error code + */ +static void +loc_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_PublishContext *pc = cls; + + if (NULL != pc->mq) + { + GNUNET_MQ_destroy (pc->mq); + pc->mq = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Can not create LOC URI. Will continue with CHK instead.\n")); + publish_kblocks (pc); +} + + /** * We're publishing without anonymity. Contact the FS service * to create a signed LOC URI for further processing, then @@ -929,12 +998,25 @@ process_signature_response (void *cls, static void create_loc_uri (struct GNUNET_FS_PublishContext *pc) { - struct RequestLocSignatureMessage req; + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (signature_response, + GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE, + struct ResponseLocSignatureMessage, + pc), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct RequestLocSignatureMessage *req; struct GNUNET_FS_FileInformation *p; - if (NULL == pc->client) - pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); - if (NULL == pc->client) + if (NULL != pc->mq) + GNUNET_MQ_destroy (pc->mq); + pc->mq = GNUNET_CLIENT_connect (pc->h->cfg, + "fs", + handlers, + &loc_mq_error_handler, + pc); + if (NULL == pc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Can not create LOC URI. Will continue with CHK instead.\n")); @@ -942,19 +1024,14 @@ create_loc_uri (struct GNUNET_FS_PublishContext *pc) return; } p = pc->fi_pos; - req.header.size = htons (sizeof (struct RequestLocSignatureMessage)); - req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN); - req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT); - req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time); - req.chk = p->chk_uri->data.chk.chk; - req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length); - GNUNET_break (GNUNET_YES == - GNUNET_CLIENT_transmit_and_get_response (pc->client, - &req.header, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &process_signature_response, - pc)); + env = GNUNET_MQ_msg (req, + GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN); + req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT); + req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time); + req->chk = p->chk_uri->data.chk.chk; + req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length); + GNUNET_MQ_send (pc->mq, + env); } @@ -962,11 +1039,9 @@ create_loc_uri (struct GNUNET_FS_PublishContext *pc) * Main function that performs the upload. * * @param cls `struct GNUNET_FS_PublishContext *` identifies the upload - * @param tc task context */ void -GNUNET_FS_publish_main_ (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +GNUNET_FS_publish_main_ (void *cls) { struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_ProgressInfo pi; @@ -1146,7 +1221,7 @@ fip_signal_start (void *cls, { kc = GNUNET_FS_uri_ksk_get_keyword_count (*uri); pc->reserve_entries += kc; - pc->reserve_space += GNUNET_SERVER_MAX_MESSAGE_SIZE * kc; + pc->reserve_space += GNUNET_MAX_MESSAGE_SIZE * kc; } pi.status = GNUNET_FS_STATUS_PUBLISH_START; *client_info = GNUNET_FS_publish_make_status_ (&pi, pc, fi, 0);