From e913da8373e0707ef43c38c85e457af85498b143 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 3 May 2010 19:52:40 +0000 Subject: [PATCH] download deserialization and not keeping a file handle open per active download --- TODO | 3 +- src/fs/fs.c | 229 +++++++++++++++++++++++++++++-------------- src/fs/fs.h | 17 ++-- src/fs/fs_download.c | 181 ++++++++++++++++++---------------- 4 files changed, 266 insertions(+), 164 deletions(-) diff --git a/TODO b/TODO index 9469382ce..69727e0ba 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,5 @@ 0.9.0pre1: * FS: [CG] - - resume signalling for search/download must be recursive! - - deserialization code (download) - serialization code (download) - linking of downloads to searches (expose opaque struct SearchResult; allow starting download based on search result (new API): @@ -11,6 +9,7 @@ => expose link to search result in download events (including search result's client-info pointer!) - generate SUSPEND events (publish, unindex, search, download) + - fix 'start_time' time stamps (search, publish, unindex, download) - actually call 'sync' functions (publish, unindex, search, download) - code review: => refactor fs.c to join common code segments! diff --git a/src/fs/fs.c b/src/fs/fs.c index 053fd3dd0..93990aafa 100644 --- a/src/fs/fs.c +++ b/src/fs/fs.c @@ -28,6 +28,7 @@ #include "gnunet_util_lib.h" #include "gnunet_fs_service.h" #include "fs.h" +#include "fs_tree.h" /** @@ -1766,6 +1767,71 @@ deserialize_subdownload (void *cls, } +/** + * Send the 'resume' signal to the callback; also actually + * resume the download (put it in the queue). Does this + * recursively for the top-level download and all child + * downloads. + * + * @param dc download to resume + */ +static void +signal_download_resume (struct GNUNET_FS_DownloadContext *dc) +{ + struct GNUNET_FS_DownloadContext *dcc; + struct GNUNET_FS_ProgressInfo pi; + + pi.status = GNUNET_FS_STATUS_DOWNLOAD_RESUME; + pi.value.download.specifics.resume.meta = dc->meta; + pi.value.download.specifics.resume.message = dc->emsg; + GNUNET_FS_download_make_status_ (&pi, + dc); + dcc = dc->child_head; + while (NULL != dcc) + { + signal_download_resume (dcc); + dcc = dcc->next; + } + if (dc->pending != NULL) + GNUNET_FS_download_start_downloading_ (dc); +} + + +/** + * Free this download context and all of its descendants. + * (only works during deserialization since not all possible + * state it taken care of). + * + * @param dc context to free + */ +static void +free_download_context (struct GNUNET_FS_DownloadContext *dc) +{ + struct GNUNET_FS_DownloadContext *dcc; + struct DownloadRequest *dr; + if (dc->meta != NULL) + GNUNET_CONTAINER_meta_data_destroy (dc->meta); + if (dc->uri != NULL) + GNUNET_FS_uri_destroy (dc->uri); + GNUNET_free_non_null (dc->temp_filename); + GNUNET_free_non_null (dc->emsg); + GNUNET_free_non_null (dc->filename); + while (NULL != (dcc = dc->child_head)) + { + GNUNET_CONTAINER_DLL_remove (dc->child_head, + dc->child_tail, + dcc); + free_download_context (dcc); + } + while (NULL != (dr = dc->pending)) + { + dc->pending = dr->next; + GNUNET_free (dr); + } + GNUNET_free (dc); +} + + /** * Deserialize a download. * @@ -1781,41 +1847,81 @@ deserialize_download (struct GNUNET_FS_Handle *h, const char *serialization) { struct GNUNET_FS_DownloadContext *dc; - struct GNUNET_FS_DownloadContext *dcc; + struct DownloadRequest *dr; char pbuf[32]; - struct GNUNET_FS_ProgressInfo pi; char *emsg; char *uris; char *dn; + uint32_t options; + uint32_t status; + uint32_t num_pending; uris = NULL; emsg = NULL; + dr = NULL; dc = GNUNET_malloc (sizeof (struct GNUNET_FS_DownloadContext)); dc->parent = parent; dc->h = h; dc->serialization = GNUNET_strdup (serialization); -#if 0 - /* FIXME */ if ( (GNUNET_OK != - GNUNET_BIO_read_string (rh, "-uri", &uris, 10*1024)) || - (NULL == (sc->uri = GNUNET_FS_uri_parse (uris, &emsg))) || - ( (GNUNET_YES != GNUNET_FS_uri_test_ksk (sc->uri)) && - (GNUNET_YES != GNUNET_FS_uri_test_sks (sc->uri)) ) || + GNUNET_BIO_read_string (rh, "download-uri", &uris, 10*1024)) || + (NULL == (dc->uri = GNUNET_FS_uri_parse (uris, &emsg))) || + ( (GNUNET_YES != GNUNET_FS_uri_test_chk (dc->uri)) && + (GNUNET_YES != GNUNET_FS_uri_test_loc (dc->uri)) ) || (GNUNET_OK != - GNUNET_BIO_read_int64 (rh, &sc->start_time.value)) || + GNUNET_BIO_read_meta_data (rh, "download-meta", &dc->meta)) || (GNUNET_OK != - GNUNET_BIO_read_string (rh, "search-emsg", &sc->emsg, 10*1024)) || + GNUNET_BIO_read_string (rh, "download-emsg", &dc->emsg, 10*1024)) || + (GNUNET_OK != + GNUNET_BIO_read_string (rh, "download-fn", &dc->filename, 10*1024)) || + (GNUNET_OK != + GNUNET_BIO_read_string (rh, "download-tfn", &dc->temp_filename, 10*1024)) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dc->old_file_size)) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dc->offset)) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dc->length)) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dc->completed)) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dc->start_time.value)) || + (GNUNET_OK != + GNUNET_BIO_read_int32 (rh, &dc->anonymity)) || (GNUNET_OK != GNUNET_BIO_read_int32 (rh, &options)) || (GNUNET_OK != - GNUNET_BIO_read (rh, "search-pause", &in_pause, sizeof (in_pause))) || + GNUNET_BIO_read_int32 (rh, &status)) || (GNUNET_OK != - GNUNET_BIO_read_int32 (rh, &sc->anonymity)) ) + GNUNET_BIO_read_int32 (rh, &num_pending)) ) goto cleanup; /* FIXME: adjust start_time.value */ - sc->options = (enum GNUNET_FS_SearchOptions) options; - sc->master_result_map = GNUNET_CONTAINER_multihashmap_create (16); -#endif + dc->options = (enum GNUNET_FS_DownloadOptions) options; + dc->active = GNUNET_CONTAINER_multihashmap_create (16); + dc->has_finished = (int) status; + dc->treedepth = GNUNET_FS_compute_depth (GNUNET_FS_uri_chk_get_file_size (dc->uri)); + if (GNUNET_FS_uri_test_loc (dc->uri)) + GNUNET_assert (GNUNET_OK == + GNUNET_FS_uri_loc_get_peer_identity (dc->uri, + &dc->target)); + if ( (dc->length > dc->completed) && + (num_pending == 0) ) + goto cleanup; + while (0 < num_pending--) + { + dr = GNUNET_malloc (sizeof (struct DownloadRequest)); + if ( (GNUNET_OK != + GNUNET_BIO_read (rh, "chk", &dr->chk, sizeof (struct ContentHashKey))) || + (GNUNET_OK != + GNUNET_BIO_read_int64 (rh, &dr->offset)) || + (GNUNET_OK != + GNUNET_BIO_read_int32 (rh, &dr->depth)) ) + goto cleanup; + dr->is_pending = GNUNET_YES; + dr->next = dc->pending; + dc->pending = dr; + dr = NULL; + } GNUNET_snprintf (pbuf, sizeof (pbuf), "%s%s%s", @@ -1828,50 +1934,43 @@ deserialize_download (struct GNUNET_FS_Handle *h, GNUNET_DISK_directory_scan (dn, &deserialize_subdownload, dc); GNUNET_free (dn); } -#if 0 - if ('\0' == in_pause) - { - if (GNUNET_OK != - GNUNET_FS_search_start_searching_ (sc)) - goto cleanup; - } -#endif - if (0) - goto cleanup; if (parent != NULL) GNUNET_CONTAINER_DLL_insert (parent->child_head, parent->child_tail, dc); - pi.status = GNUNET_FS_STATUS_DOWNLOAD_RESUME; -#if 0 - pi.value.search.specifics.resume.message = sc->emsg; - pi.value.search.specifics.resume.is_paused = ('\0' == in_pause) ? GNUNET_NO : GNUNET_YES; -#endif - GNUNET_FS_download_make_status_ (&pi, - dc); - dcc = dc->child_head; - while (NULL != dcc) - { - /* FIXME: wrong, need recursion! */ - pi.status = GNUNET_FS_STATUS_DOWNLOAD_RESUME; -#if 0 - pi.value.search.specifics.resume.message = scc->emsg; - pi.value.search.specifics.resume.is_paused = ('\0' == in_pause) ? GNUNET_NO : GNUNET_YES; -#endif - GNUNET_FS_download_make_status_ (&pi, - dcc); - dcc = dcc->next; - } -#if 0 + signal_download_resume (dc); GNUNET_free (uris); -#endif return; cleanup: -#if 0 - GNUNET_free_non_null (emsg); - free_search_context (sc); -#endif GNUNET_free_non_null (uris); + GNUNET_free_non_null (dr); + free_download_context (dc); +} + + +/** + * Signal resuming of a search to our clients (for the + * top level search and all sub-searches). + * + * @param sc search being resumed + */ +static void +signal_search_resume (struct GNUNET_FS_SearchContext *sc) +{ + struct GNUNET_FS_SearchContext *scc; + struct GNUNET_FS_ProgressInfo pi; + + pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; + pi.value.search.specifics.resume.message = sc->emsg; + pi.value.search.specifics.resume.is_paused = (sc->client == NULL) ? GNUNET_YES : GNUNET_NO; + sc->client_info = GNUNET_FS_search_make_status_ (&pi, + sc); + scc = sc->child_head; + while (NULL != scc) + { + signal_search_resume (scc); + scc = scc->next; + } } @@ -1893,7 +1992,6 @@ deserialize_search (struct GNUNET_FS_Handle *h, struct GNUNET_FS_SearchContext *scc; struct GNUNET_BIO_ReadHandle *rhc; char pbuf[32]; - struct GNUNET_FS_ProgressInfo pi; char *emsg; char *uris; char *child_ser; @@ -1938,11 +2036,12 @@ deserialize_search (struct GNUNET_FS_Handle *h, GNUNET_DISK_directory_scan (dn, &deserialize_search_result, sc); GNUNET_free (dn); } - if ('\0' == in_pause) + if ( ('\0' == in_pause) && + (GNUNET_OK != + GNUNET_FS_search_start_searching_ (sc)) ) { - if (GNUNET_OK != - GNUNET_FS_search_start_searching_ (sc)) - goto cleanup; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Could not resume running search, will resume as paused search\n")); } while (1) { @@ -1976,23 +2075,7 @@ deserialize_search (struct GNUNET_FS_Handle *h, GNUNET_CONTAINER_DLL_insert (parent->child_head, parent->child_tail, sc); - pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; - pi.value.search.specifics.resume.message = sc->emsg; - pi.value.search.specifics.resume.is_paused = ('\0' == in_pause) ? GNUNET_NO : GNUNET_YES; - sc->client_info = GNUNET_FS_search_make_status_ (&pi, - sc); - scc = sc->child_head; - while (NULL != scc) - { - /* FIXME: wrong, need recursion! */ - pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; - pi.value.search.specifics.resume.message = scc->emsg; - pi.value.search.specifics.resume.is_paused = ('\0' == in_pause) ? GNUNET_NO : GNUNET_YES; - scc->client_info = GNUNET_FS_search_make_status_ (&pi, - scc); - - scc = scc->next; - } + signal_search_resume (sc); GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, &signal_result_resume, sc); diff --git a/src/fs/fs.h b/src/fs/fs.h index d3960c265..9b3a3902d 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -822,6 +822,15 @@ GNUNET_FS_unindex_do_remove_ (struct GNUNET_FS_UnindexContext *uc); int GNUNET_FS_search_start_searching_ (struct GNUNET_FS_SearchContext *sc); +/** + * Start the downloading process (by entering the queue). + * + * @param dc our download context + */ +void +GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc); + + /** * Start download probes for the given search result. * @@ -1382,7 +1391,7 @@ struct DownloadRequest /** * Set if this request is currently in the linked list of pending * requests. Needed in case we get a response for a request that we - * have not yet send (due to FS bug or two blocks with identical + * have not yet send (i.e. due to two blocks with identical * content); in this case, we would need to remove the block from * the pending list (and need a fast way to check if the block is on * it). @@ -1486,12 +1495,6 @@ struct GNUNET_FS_DownloadContext */ struct DownloadRequest *pending; - /** - * The file handle, NULL if we don't create - * a file. - */ - struct GNUNET_DISK_FileHandle *handle; - /** * Non-NULL if we are currently having a request for * transmission pending with the client handle. diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 468b255ee..25ae2002a 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -273,6 +273,7 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, char block[DBLOCK_SIZE]; GNUNET_HashCode key; struct ProcessResultClosure prc; + struct GNUNET_DISK_FileHandle *fh; #if DEBUG_DOWNLOAD GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -301,15 +302,21 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, &chk->query, sm, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - + fh = NULL; if ( (dc->old_file_size > off) && - (dc->handle != NULL) && + (dc->filename != NULL) ) + { + fh = GNUNET_DISK_file_open (dc->filename, + GNUNET_DISK_OPEN_READ, + GNUNET_DISK_PERM_NONE); + } + if ( (fh != NULL) && (off == - GNUNET_DISK_file_seek (dc->handle, + GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET) ) && (len == - GNUNET_DISK_file_read (dc->handle, + GNUNET_DISK_file_read (fh, block, len)) ) { @@ -353,6 +360,8 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, return; } } + if (fh != NULL) + GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); if (depth < dc->treedepth) { // FIXME: try if we could @@ -828,6 +837,7 @@ process_result_with_request (void *cls, struct DownloadRequest *sm = value; struct DownloadRequest *ppos; struct DownloadRequest *pprev; + struct GNUNET_DISK_FileHandle *fh; struct GNUNET_FS_DownloadContext *dc = prc->dc; struct GNUNET_CRYPTO_AesSessionKey skey; struct GNUNET_CRYPTO_AesInitializationVector iv; @@ -838,13 +848,13 @@ process_result_with_request (void *cls, size_t app; int i; struct ContentHashKey *chk; - char *emsg; + fh = NULL; bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll (dc->uri->data.chk.file_length), dc->treedepth, sm->offset, sm->depth); - if (prc->size != bs) + if (prc->size != bs) { #if DEBUG_DOWNLOAD GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -853,19 +863,7 @@ process_result_with_request (void *cls, prc->size); #endif dc->emsg = GNUNET_strdup ("Internal error or bogus download URI"); - /* signal error */ - pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; - pi.value.download.specifics.error.message = dc->emsg; - GNUNET_FS_download_make_status_ (&pi, dc); - /* abort all pending requests */ - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO); - dc->client = NULL; - return GNUNET_NO; + goto signal_error; } GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (dc->active, @@ -899,55 +897,68 @@ process_result_with_request (void *cls, dc->treedepth); /* save to disk */ if ( ( GNUNET_YES == prc->do_store) && - (NULL != dc->handle) && + ( (dc->filename != NULL) || + (is_recursive_download (dc)) ) && ( (sm->depth == dc->treedepth) || (0 == (dc->options & GNUNET_FS_DOWNLOAD_NO_TEMPORARIES)) ) ) { - emsg = NULL; + fh = GNUNET_DISK_file_open (dc->filename != NULL + ? dc->filename + : dc->temp_filename, + GNUNET_DISK_OPEN_READWRITE | + GNUNET_DISK_OPEN_CREATE, + GNUNET_DISK_PERM_USER_READ | + GNUNET_DISK_PERM_USER_WRITE | + GNUNET_DISK_PERM_GROUP_READ | + GNUNET_DISK_PERM_OTHER_READ); + } + if ( (NULL == fh) && + (GNUNET_YES == prc->do_store) && + ( (dc->filename != NULL) || + (is_recursive_download (dc)) ) && + ( (sm->depth == dc->treedepth) || + (0 == (dc->options & GNUNET_FS_DOWNLOAD_NO_TEMPORARIES)) ) ) + { + GNUNET_asprintf (&dc->emsg, + _("Download failed: could not open file `%s': %s\n"), + dc->filename, + STRERROR (errno)); + goto signal_error; + } + if (fh != NULL) + { #if DEBUG_DOWNLOAD GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Saving decrypted block to disk at offset %llu\n", (unsigned long long) off); #endif if ( (off != - GNUNET_DISK_file_seek (dc->handle, + GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET) ) ) - GNUNET_asprintf (&emsg, - _("Failed to seek to offset %llu in file `%s': %s\n"), - (unsigned long long) off, - dc->filename, - STRERROR (errno)); - else if (prc->size != - GNUNET_DISK_file_write (dc->handle, - pt, - prc->size)) - GNUNET_asprintf (&emsg, - _("Failed to write block of %u bytes at offset %llu in file `%s': %s\n"), - (unsigned int) prc->size, - (unsigned long long) off, - dc->filename, - STRERROR (errno)); - if (NULL != emsg) { - dc->emsg = emsg; - // FIXME: make persistent - - /* signal error */ - pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; - pi.value.download.specifics.error.message = emsg; - GNUNET_FS_download_make_status_ (&pi, dc); - /* abort all pending requests */ - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO); - dc->client = NULL; - GNUNET_free (sm); - return GNUNET_NO; + GNUNET_asprintf (&dc->emsg, + _("Failed to seek to offset %llu in file `%s': %s\n"), + (unsigned long long) off, + dc->filename, + STRERROR (errno)); + goto signal_error; + } + if (prc->size != + GNUNET_DISK_file_write (fh, + pt, + prc->size)) + { + GNUNET_asprintf (&dc->emsg, + _("Failed to write block of %u bytes at offset %llu in file `%s': %s\n"), + (unsigned int) prc->size, + (unsigned long long) off, + dc->filename, + STRERROR (errno)); + goto signal_error; } + GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); + fh = NULL; } if (sm->depth == dc->treedepth) { @@ -994,10 +1005,8 @@ process_result_with_request (void *cls, (unsigned long long) GNUNET_ntohll (dc->uri->data.chk.file_length)); #endif /* truncate file to size (since we store IBlocks at the end) */ - if (dc->handle != NULL) + if (dc->filename != NULL) { - GNUNET_DISK_file_close (dc->handle); - dc->handle = NULL; if (0 != truncate (dc->filename, GNUNET_ntohll (dc->uri->data.chk.file_length))) GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, @@ -1050,6 +1059,23 @@ process_result_with_request (void *cls, } GNUNET_free (sm); return GNUNET_YES; + + signal_error: + if (fh != NULL) + GNUNET_DISK_file_close (fh); + pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; + pi.value.download.specifics.error.message = dc->emsg; + GNUNET_FS_download_make_status_ (&pi, dc); + /* abort all pending requests */ + if (NULL != dc->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); + dc->th = NULL; + } + GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO); + dc->client = NULL; + GNUNET_free (sm); + return GNUNET_NO; } @@ -1435,25 +1461,6 @@ GNUNET_FS_download_start (struct GNUNET_FS_Handle *h, GNUNET_DISK_file_size (filename, &dc->old_file_size, GNUNET_YES); - dc->handle = GNUNET_DISK_file_open (filename, - GNUNET_DISK_OPEN_READWRITE | - GNUNET_DISK_OPEN_CREATE, - GNUNET_DISK_PERM_USER_READ | - GNUNET_DISK_PERM_USER_WRITE | - GNUNET_DISK_PERM_GROUP_READ | - GNUNET_DISK_PERM_OTHER_READ); - if (dc->handle == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Download failed: could not open file `%s': %s\n"), - dc->filename, - STRERROR (errno)); - GNUNET_CONTAINER_meta_data_destroy (dc->meta); - GNUNET_FS_uri_destroy (dc->uri); - GNUNET_free (dc->filename); - GNUNET_free (dc); - return NULL; - } } // FIXME: set "dc->target" for LOC uris! dc->offset = offset; @@ -1484,12 +1491,24 @@ GNUNET_FS_download_start (struct GNUNET_FS_Handle *h, &dc->uri->data.chk.chk, 0, 1 /* 0 == CHK, 1 == top */); - dc->job_queue = GNUNET_FS_queue_ (h, + GNUNET_FS_download_start_downloading_ (dc); + return dc; +} + + +/** + * Start the downloading process (by entering the queue). + * + * @param dc our download context + */ +void +GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc) +{ + dc->job_queue = GNUNET_FS_queue_ (dc->h, &activate_fs_download, &deactivate_fs_download, dc, - (length + DBLOCK_SIZE-1) / DBLOCK_SIZE); - return dc; + (dc->length + DBLOCK_SIZE-1) / DBLOCK_SIZE); } @@ -1548,8 +1567,6 @@ GNUNET_FS_download_stop (struct GNUNET_FS_DownloadContext *dc, GNUNET_CONTAINER_multihashmap_destroy (dc->active); if (dc->filename != NULL) { - if (NULL != dc->handle) - GNUNET_DISK_file_close (dc->handle); if ( (dc->completed != dc->length) && (GNUNET_YES == do_delete) ) { -- 2.25.1