From e74208cef672a3e2689329fdb76439b4b5db01f8 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 17 Jun 2011 13:32:17 +0000 Subject: [PATCH] cleaner datastore handling, fixing test failure --- src/fs/fs.c | 11 +- src/fs/fs_publish.c | 350 ++++++++++++++++----------- src/fs/test_fs_publish_persistence.c | 12 +- 3 files changed, 219 insertions(+), 154 deletions(-) diff --git a/src/fs/fs.c b/src/fs/fs.c index fdaa12205..046c4dbb0 100644 --- a/src/fs/fs.c +++ b/src/fs/fs.c @@ -1477,10 +1477,13 @@ deserialize_publish_file (void *cls, /* re-start publishing (if needed)... */ if (pc->all_done != GNUNET_YES) - pc->upload_task - = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, - &GNUNET_FS_publish_main_, - pc); + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); + pc->upload_task + = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, + &GNUNET_FS_publish_main_, + pc); + } if (GNUNET_OK != GNUNET_BIO_read_close (rh, &emsg)) { diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index 26d21bb0e..777450ec2 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -45,7 +45,7 @@ struct PutContCtx /** * Current publishing context. */ - struct GNUNET_FS_PublishContext *sc; + struct GNUNET_FS_PublishContext *pc; /** * Specific file with the block. @@ -69,18 +69,18 @@ struct PutContCtx * a publish event and call the callback. * * @param pi structure to fill in - * @param sc overall publishing context + * @param pc overall publishing context * @param p file information for the file being published * @param offset where in the file are we so far * @return value returned from callback */ void * GNUNET_FS_publish_make_status_ (struct GNUNET_FS_ProgressInfo *pi, - struct GNUNET_FS_PublishContext *sc, + struct GNUNET_FS_PublishContext *pc, const struct GNUNET_FS_FileInformation *p, uint64_t offset) { - pi->value.publish.pc = sc; + pi->value.publish.pc = pc; pi->value.publish.fi = p; pi->value.publish.cctx = p->client_info; @@ -96,7 +96,7 @@ GNUNET_FS_publish_make_status_ (struct GNUNET_FS_ProgressInfo *pi, pi->value.publish.completed = offset; pi->value.publish.duration = GNUNET_TIME_absolute_get_duration (p->start_time); pi->value.publish.anonymity = p->bo.anonymity_level; - return sc->h->upcb (sc->h->upcb_cls, + return pc->h->upcb (pc->h->upcb_cls, pi); } @@ -113,6 +113,10 @@ publish_cleanup (void *cls, { struct GNUNET_FS_PublishContext *pc = cls; +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cleaning up publish context (done!)\n"); +#endif if (pc->fhc != NULL) { GNUNET_CRYPTO_hash_file_cancel (pc->fhc); @@ -120,7 +124,10 @@ publish_cleanup (void *cls, } GNUNET_FS_file_information_destroy (pc->fi, NULL, NULL); if (pc->namespace != NULL) - GNUNET_FS_namespace_delete (pc->namespace, GNUNET_NO); + { + GNUNET_FS_namespace_delete (pc->namespace, GNUNET_NO); + pc->namespace = NULL; + } GNUNET_free_non_null (pc->nid); GNUNET_free_non_null (pc->nuid); GNUNET_free_non_null (pc->serialization); @@ -130,7 +137,11 @@ publish_cleanup (void *cls, pc->dsh = NULL; } if (pc->client != NULL) - GNUNET_CLIENT_disconnect (pc->client, GNUNET_NO); + { + GNUNET_CLIENT_disconnect (pc->client, GNUNET_NO); + pc->client = NULL; + } + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); GNUNET_free (pc); } @@ -151,17 +162,18 @@ ds_put_cont (void *cls, struct PutContCtx *pcc = cls; struct GNUNET_FS_ProgressInfo pi; - if (GNUNET_SYSERR == pcc->sc->in_network_wait) + pcc->pc->qre = NULL; + if (GNUNET_SYSERR == pcc->pc->in_network_wait) { /* we were aborted in the meantime, finish shutdown! */ GNUNET_SCHEDULER_add_continuation (&publish_cleanup, - pcc->sc, + pcc->pc, GNUNET_SCHEDULER_REASON_PREREQ_DONE); GNUNET_free (pcc); return; } - GNUNET_assert (GNUNET_YES == pcc->sc->in_network_wait); - pcc->sc->in_network_wait = GNUNET_NO; + GNUNET_assert (GNUNET_YES == pcc->pc->in_network_wait); + pcc->pc->in_network_wait = GNUNET_NO; if (GNUNET_SYSERR == success) { GNUNET_asprintf (&pcc->p->emsg, @@ -170,22 +182,25 @@ ds_put_cont (void *cls, pi.status = GNUNET_FS_STATUS_PUBLISH_ERROR; pi.value.publish.eta = GNUNET_TIME_UNIT_FOREVER_REL; pi.value.publish.specifics.error.message = pcc->p->emsg; - pcc->p->client_info = GNUNET_FS_publish_make_status_ (&pi, pcc->sc, pcc->p, 0); + pcc->p->client_info = GNUNET_FS_publish_make_status_ (&pi, pcc->pc, pcc->p, 0); if ( (pcc->p->is_directory == GNUNET_NO) && (pcc->p->filename != NULL) && (pcc->p->data.file.do_index == GNUNET_YES) ) { /* run unindex to clean up */ - GNUNET_FS_unindex_start (pcc->sc->h, + GNUNET_FS_unindex_start (pcc->pc->h, pcc->p->filename, NULL); } } if (NULL != pcc->cont) - pcc->sc->upload_task - = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, - pcc->cont, - pcc->cont_cls); + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pcc->pc->upload_task); + pcc->pc->upload_task + = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, + pcc->cont, + pcc->cont_cls); + } GNUNET_free (pcc); } @@ -196,18 +211,18 @@ ds_put_cont (void *cls, * published. * * @param p the completed upload - * @param sc context of the publication + * @param pc context of the publication */ static void signal_publish_completion (struct GNUNET_FS_FileInformation *p, - struct GNUNET_FS_PublishContext *sc) + struct GNUNET_FS_PublishContext *pc) { struct GNUNET_FS_ProgressInfo pi; pi.status = GNUNET_FS_STATUS_PUBLISH_COMPLETED; pi.value.publish.eta = GNUNET_TIME_UNIT_ZERO; pi.value.publish.specifics.completed.chk_uri = p->chk_uri; - p->client_info = GNUNET_FS_publish_make_status_ (&pi, sc, p, + p->client_info = GNUNET_FS_publish_make_status_ (&pi, pc, p, GNUNET_ntohll (p->chk_uri->data.chk.file_length)); } @@ -218,12 +233,12 @@ signal_publish_completion (struct GNUNET_FS_FileInformation *p, * a problem during publication. * * @param p the upload that had trouble - * @param sc context of the publication + * @param pc context of the publication * @param emsg error message */ static void signal_publish_error (struct GNUNET_FS_FileInformation *p, - struct GNUNET_FS_PublishContext *sc, + struct GNUNET_FS_PublishContext *pc, const char *emsg) { struct GNUNET_FS_ProgressInfo pi; @@ -232,13 +247,13 @@ signal_publish_error (struct GNUNET_FS_FileInformation *p, pi.status = GNUNET_FS_STATUS_PUBLISH_ERROR; pi.value.publish.eta = GNUNET_TIME_UNIT_FOREVER_REL; pi.value.publish.specifics.error.message =emsg; - p->client_info = GNUNET_FS_publish_make_status_ (&pi, sc, p, 0); + p->client_info = GNUNET_FS_publish_make_status_ (&pi, pc, p, 0); if ( (p->is_directory == GNUNET_NO) && (p->filename != NULL) && (p->data.file.do_index == GNUNET_YES) ) { /* run unindex to clean up */ - GNUNET_FS_unindex_start (sc->h, + GNUNET_FS_unindex_start (pc->h, p->filename, NULL); } @@ -261,6 +276,10 @@ finish_release_reserve (void *cls, struct GNUNET_FS_PublishContext *pc = cls; pc->qre = NULL; +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Releasing reserve done!\n"); +#endif signal_publish_completion (pc->fi, pc); pc->all_done = GNUNET_YES; GNUNET_FS_publish_sync_ (pc); @@ -312,24 +331,24 @@ publish_sblocks_cont (void *cls, * We are almost done publishing the structure, * add SBlocks (if needed). * - * @param sc overall upload data + * @param pc overall upload data */ static void -publish_sblock (struct GNUNET_FS_PublishContext *sc) +publish_sblock (struct GNUNET_FS_PublishContext *pc) { - if (NULL != sc->namespace) - GNUNET_FS_publish_sks (sc->h, - sc->namespace, - sc->nid, - sc->nuid, - sc->fi->meta, - sc->fi->chk_uri, - &sc->fi->bo, - sc->options, + if (NULL != pc->namespace) + GNUNET_FS_publish_sks (pc->h, + pc->namespace, + pc->nid, + pc->nuid, + pc->fi->meta, + pc->fi->chk_uri, + &pc->fi->bo, + pc->options, &publish_sblocks_cont, - sc); + pc); else - publish_sblocks_cont (sc, NULL, NULL); + publish_sblocks_cont (pc, NULL, NULL); } @@ -360,6 +379,7 @@ publish_kblocks_cont (void *cls, signal_publish_error (p, pc, emsg); GNUNET_FS_file_information_sync_ (p); GNUNET_FS_publish_sync_ (pc); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); pc->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, @@ -378,6 +398,7 @@ publish_kblocks_cont (void *cls, else pc->fi_pos = p->dir; GNUNET_FS_publish_sync_ (pc); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); pc->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, @@ -405,12 +426,12 @@ block_reader (void *cls, void *buf, char **emsg) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; size_t pt_size; const char *dd; - p = sc->fi_pos; + p = pc->fi_pos; if (p->is_directory) { pt_size = GNUNET_MIN(max, @@ -451,13 +472,13 @@ static void encode_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; struct GNUNET_FS_ProgressInfo pi; char *emsg; uint64_t flen; - p = sc->fi_pos; + p = pc->fi_pos; GNUNET_FS_tree_encoder_finish (p->te, &p->chk_uri, &emsg); @@ -476,7 +497,7 @@ encode_cont (void *cls, pi.status = GNUNET_FS_STATUS_PUBLISH_ERROR; pi.value.publish.eta = GNUNET_TIME_UNIT_FOREVER_REL; pi.value.publish.specifics.error.message = p->emsg; - p->client_info = GNUNET_FS_publish_make_status_ (&pi, sc, p, 0); + p->client_info = GNUNET_FS_publish_make_status_ (&pi, pc, p, 0); } #if DEBUG_PUBLISH GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -489,13 +510,14 @@ encode_cont (void *cls, pi.value.publish.specifics.progress.offset = flen; pi.value.publish.specifics.progress.data_len = 0; pi.value.publish.specifics.progress.depth = GNUNET_FS_compute_depth (flen); - p->client_info = GNUNET_FS_publish_make_status_ (&pi, sc, p, flen); + p->client_info = GNUNET_FS_publish_make_status_ (&pi, pc, p, flen); /* continue with main */ - sc->upload_task + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); + pc->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, - sc); + pc); } @@ -522,31 +544,32 @@ block_proc (void *cls, const void *block, uint16_t block_size) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; struct PutContCtx * dpc_cls; struct OnDemandBlock odb; - p = sc->fi_pos; - if (NULL == sc->dsh) + p = pc->fi_pos; + if (NULL == pc->dsh) { #if DEBUG_PUBLISH GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n"); #endif - sc->upload_task + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); + pc->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, - sc); + pc); return; } - GNUNET_assert (GNUNET_NO == sc->in_network_wait); - sc->in_network_wait = GNUNET_YES; + GNUNET_assert (GNUNET_NO == pc->in_network_wait); + pc->in_network_wait = GNUNET_YES; dpc_cls = GNUNET_malloc(sizeof(struct PutContCtx)); dpc_cls->cont = &GNUNET_FS_publish_main_; - dpc_cls->cont_cls = sc; - dpc_cls->sc = sc; + dpc_cls->cont_cls = pc; + dpc_cls->pc = pc; dpc_cls->p = p; if ( (! p->is_directory) && (GNUNET_YES == p->data.file.do_index) && @@ -561,20 +584,21 @@ block_proc (void *cls, #endif odb.offset = GNUNET_htonll (offset); odb.file_id = p->data.file.file_id; - GNUNET_DATASTORE_put (sc->dsh, - (p->is_directory) ? 0 : sc->rid, - &chk->query, - sizeof (struct OnDemandBlock), - &odb, - GNUNET_BLOCK_TYPE_FS_ONDEMAND, - p->bo.content_priority, - p->bo.anonymity_level, - p->bo.replication_level, - p->bo.expiration_time, - -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &ds_put_cont, - dpc_cls); + GNUNET_assert (pc->qre == NULL); + pc->qre = GNUNET_DATASTORE_put (pc->dsh, + (p->is_directory) ? 0 : pc->rid, + &chk->query, + sizeof (struct OnDemandBlock), + &odb, + GNUNET_BLOCK_TYPE_FS_ONDEMAND, + p->bo.content_priority, + p->bo.anonymity_level, + p->bo.replication_level, + p->bo.expiration_time, + -2, 1, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &ds_put_cont, + dpc_cls); return; } #if DEBUG_PUBLISH @@ -584,20 +608,21 @@ block_proc (void *cls, (unsigned long long) offset, (unsigned int) block_size); #endif - GNUNET_DATASTORE_put (sc->dsh, - (p->is_directory) ? 0 : sc->rid, - &chk->query, - block_size, - block, - type, - p->bo.content_priority, - p->bo.anonymity_level, - p->bo.replication_level, - p->bo.expiration_time, - -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &ds_put_cont, - dpc_cls); + GNUNET_assert (pc->qre == NULL); + pc->qre = GNUNET_DATASTORE_put (pc->dsh, + (p->is_directory) ? 0 : pc->rid, + &chk->query, + block_size, + block, + type, + p->bo.content_priority, + p->bo.anonymity_level, + p->bo.replication_level, + p->bo.expiration_time, + -2, 1, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &ds_put_cont, + dpc_cls); } @@ -618,17 +643,17 @@ progress_proc (void *cls, size_t pt_size, unsigned int depth) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; struct GNUNET_FS_ProgressInfo pi; - p = sc->fi_pos; + p = pc->fi_pos; pi.status = GNUNET_FS_STATUS_PUBLISH_PROGRESS; pi.value.publish.specifics.progress.data = pt_block; pi.value.publish.specifics.progress.offset = offset; pi.value.publish.specifics.progress.data_len = pt_size; pi.value.publish.specifics.progress.depth = depth; - p->client_info = GNUNET_FS_publish_make_status_ (&pi, sc, p, offset); + p->client_info = GNUNET_FS_publish_make_status_ (&pi, pc, p, offset); } @@ -637,10 +662,10 @@ progress_proc (void *cls, * block into memory, encrypt it and send it to the FS service. Then * continue with the main task. * - * @param sc overall upload data + * @param pc overall upload data */ static void -publish_content (struct GNUNET_FS_PublishContext *sc) +publish_content (struct GNUNET_FS_PublishContext *pc) { struct GNUNET_FS_FileInformation *p; char *emsg; @@ -649,7 +674,7 @@ publish_content (struct GNUNET_FS_PublishContext *sc) void *raw_data; uint64_t size; - p = sc->fi_pos; + p = pc->fi_pos; GNUNET_assert (p != NULL); if (NULL == p->te) { @@ -708,9 +733,9 @@ publish_content (struct GNUNET_FS_PublishContext *sc) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating tree encoder\n"); #endif - p->te = GNUNET_FS_tree_encoder_create (sc->h, + p->te = GNUNET_FS_tree_encoder_create (pc->h, size, - sc, + pc, &block_reader, &block_proc, &progress_proc, @@ -736,14 +761,14 @@ static void process_index_start_response (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; const char *emsg; uint16_t msize; - GNUNET_CLIENT_disconnect (sc->client, GNUNET_NO); - sc->client = NULL; - p = sc->fi_pos; + GNUNET_CLIENT_disconnect (pc->client, GNUNET_NO); + pc->client = NULL; + p = pc->fi_pos; if (msg == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -752,7 +777,7 @@ process_index_start_response (void *cls, _("timeout on index-start request to `fs' service")); p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); return; } if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK) @@ -768,13 +793,13 @@ process_index_start_response (void *cls, gettext (emsg)); p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); return; } p->data.file.index_start_confirmed = GNUNET_YES; /* success! continue with indexing */ GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); } @@ -790,7 +815,7 @@ hash_for_index_cb (void *cls, const GNUNET_HashCode * res) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_FileInformation *p; struct IndexStartMessage *ism; size_t slen; @@ -799,8 +824,8 @@ hash_for_index_cb (void *cls, uint64_t ino; char *fn; - sc->fhc = NULL; - p = sc->fi_pos; + pc->fhc = NULL; + p = pc->fi_pos; if (NULL == res) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -809,12 +834,12 @@ hash_for_index_cb (void *cls, _("failed to compute hash")); p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); return; } if (GNUNET_YES == p->data.file.index_start_confirmed) { - publish_content (sc); + publish_content (pc); return; } fn = GNUNET_STRINGS_filename_expand (p->filename); @@ -829,7 +854,7 @@ hash_for_index_cb (void *cls, GNUNET_free (fn); p->data.file.do_index = GNUNET_NO; GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); return; } #if DEBUG_PUBLISH @@ -838,18 +863,18 @@ hash_for_index_cb (void *cls, p->filename, GNUNET_h2s (res)); #endif - if (0 != (sc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY)) + if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY)) { p->data.file.file_id = *res; p->data.file.have_hash = GNUNET_YES; p->data.file.index_start_confirmed = GNUNET_YES; GNUNET_FS_file_information_sync_ (p); - publish_content (sc); + publish_content (pc); GNUNET_free (fn); return; } client = GNUNET_CLIENT_connect ("fs", - sc->h->cfg); + pc->h->cfg); if (NULL == client) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -857,7 +882,7 @@ hash_for_index_cb (void *cls, p->filename, _("could not connect to `fs' service")); p->data.file.do_index = GNUNET_NO; - publish_content (sc); + publish_content (pc); GNUNET_free (fn); return; } @@ -893,14 +918,14 @@ hash_for_index_cb (void *cls, fn, slen); GNUNET_free (fn); - sc->client = client; + pc->client = client; GNUNET_break (GNUNET_YES == GNUNET_CLIENT_transmit_and_get_response (client, &ism->header, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, &process_index_start_response, - sc)); + pc)); GNUNET_free (ism); } @@ -1078,42 +1103,46 @@ fip_signal_start(void *cls, int *do_index, void **client_info) { - struct GNUNET_FS_PublishContext *sc = cls; + struct GNUNET_FS_PublishContext *pc = cls; struct GNUNET_FS_ProgressInfo pi; unsigned int kc; uint64_t left; +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting publish operation\n"); +#endif if (*do_index) { /* space for on-demand blocks */ - sc->reserve_space += ((length + DBLOCK_SIZE - 1) / DBLOCK_SIZE) * sizeof (struct OnDemandBlock); + pc->reserve_space += ((length + DBLOCK_SIZE - 1) / DBLOCK_SIZE) * sizeof (struct OnDemandBlock); } else { /* space for DBlocks */ - sc->reserve_space += length; + pc->reserve_space += length; } /* entries for IBlocks and DBlocks, space for IBlocks */ left = length; while (1) { left = (left + DBLOCK_SIZE - 1) / DBLOCK_SIZE; - sc->reserve_entries += left; + pc->reserve_entries += left; if (left <= 1) break; left = left * sizeof (struct ContentHashKey); - sc->reserve_space += left; + pc->reserve_space += left; } - sc->reserve_entries++; + pc->reserve_entries++; /* entries and space for keywords */ if (NULL != *uri) { kc = GNUNET_FS_uri_ksk_get_keyword_count (*uri); - sc->reserve_entries += kc; - sc->reserve_space += GNUNET_SERVER_MAX_MESSAGE_SIZE * kc; + pc->reserve_entries += kc; + pc->reserve_space += GNUNET_SERVER_MAX_MESSAGE_SIZE * kc; } pi.status = GNUNET_FS_STATUS_PUBLISH_START; - *client_info = GNUNET_FS_publish_make_status_ (&pi, sc, fi, 0); + *client_info = GNUNET_FS_publish_make_status_ (&pi, pc, fi, 0); GNUNET_FS_file_information_sync_ (fi); return GNUNET_OK; } @@ -1143,21 +1172,31 @@ fip_signal_suspend(void *cls, int *do_index, void **client_info) { - struct GNUNET_FS_PublishContext*sc = cls; + struct GNUNET_FS_PublishContext*pc = cls; struct GNUNET_FS_ProgressInfo pi; uint64_t off; +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Suspending publish operation\n"); +#endif GNUNET_free_non_null (fi->serialization); fi->serialization = NULL; off = (fi->chk_uri == NULL) ? 0 : length; pi.status = GNUNET_FS_STATUS_PUBLISH_SUSPEND; - GNUNET_break (NULL == GNUNET_FS_publish_make_status_ (&pi, sc, fi, off)); + GNUNET_break (NULL == GNUNET_FS_publish_make_status_ (&pi, pc, fi, off)); *client_info = NULL; - if (NULL != sc->dsh) + if (NULL != pc->qre) { - GNUNET_DATASTORE_disconnect (sc->dsh, GNUNET_NO); - sc->dsh = NULL; + GNUNET_DATASTORE_cancel (pc->qre); + pc->qre = NULL; + } + if (NULL != pc->dsh) + { + GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO); + pc->dsh = NULL; } + pc->rid = 0; return GNUNET_OK; } @@ -1182,6 +1221,7 @@ GNUNET_FS_publish_signal_suspend_ (void *cls) &fip_signal_suspend, pc); GNUNET_FS_end_top (pc->h, pc->top); + pc->top = NULL; publish_cleanup (pc, NULL); } @@ -1202,6 +1242,11 @@ finish_reserve (void *cls, struct GNUNET_FS_PublishContext *pc = cls; pc->qre = NULL; +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reservation complete (%d)!\n", + success); +#endif if ( (msg != NULL) || (success <= 0) ) { @@ -1214,6 +1259,7 @@ finish_reserve (void *cls, return; } pc->rid = success; + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task); pc->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, @@ -1279,6 +1325,7 @@ GNUNET_FS_publish_start (struct GNUNET_FS_Handle *h, GNUNET_FS_publish_sync_ (ret); if (NULL != ret->dsh) { + GNUNET_assert (NULL == ret->qre); GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Reserving space for %u entries and %llu bytes for publication\n"), (unsigned int) ret->reserve_entries, @@ -1294,6 +1341,7 @@ GNUNET_FS_publish_start (struct GNUNET_FS_Handle *h, } else { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == ret->upload_task); ret->upload_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_BACKGROUND, &GNUNET_FS_publish_main_, @@ -1327,13 +1375,13 @@ fip_signal_stop(void *cls, int *do_index, void **client_info) { - struct GNUNET_FS_PublishContext*sc = cls; + struct GNUNET_FS_PublishContext*pc = cls; struct GNUNET_FS_ProgressInfo pi; uint64_t off; if (fi->serialization != NULL) { - GNUNET_FS_remove_sync_file_ (sc->h, + GNUNET_FS_remove_sync_file_ (pc->h, GNUNET_FS_SYNC_PATH_FILE_INFO, fi->serialization); GNUNET_free (fi->serialization); @@ -1341,7 +1389,7 @@ fip_signal_stop(void *cls, } off = (fi->chk_uri == NULL) ? 0 : length; pi.status = GNUNET_FS_STATUS_PUBLISH_STOPPED; - GNUNET_break (NULL == GNUNET_FS_publish_make_status_ (&pi, sc, fi, off)); + GNUNET_break (NULL == GNUNET_FS_publish_make_status_ (&pi, pc, fi, off)); *client_info = NULL; return GNUNET_OK; } @@ -1358,6 +1406,10 @@ fip_signal_stop(void *cls, void GNUNET_FS_publish_stop (struct GNUNET_FS_PublishContext *pc) { +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Publish stop called\n"); +#endif GNUNET_FS_end_top (pc->h, pc->top); if (NULL != pc->qre) { @@ -1427,6 +1479,11 @@ struct PublishKskContext */ struct GNUNET_DATASTORE_Handle *dsh; + /** + * Handle to datastore PUT request. + */ + struct GNUNET_DATASTORE_QueueEntry *qre; + /** * Function to call once we're done. */ @@ -1488,8 +1545,13 @@ kb_put_cont (void *cls, { struct PublishKskContext *pkc = cls; + pkc->qre = NULL; if (GNUNET_OK != success) { +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "KB PUT operation complete\n"); +#endif if (NULL != pkc->dsh) { GNUNET_DATASTORE_disconnect (pkc->dsh, GNUNET_NO); @@ -1533,6 +1595,10 @@ publish_ksk_cont (void *cls, if ( (pkc->i == pkc->ksk_uri->data.ksk.keywordCount) || (NULL == pkc->dsh) ) { +#if DEBUG_PUBLISH + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "KSK PUT operation complete\n"); +#endif if (NULL != pkc->dsh) { GNUNET_DATASTORE_disconnect (pkc->dsh, GNUNET_NO); @@ -1573,22 +1639,22 @@ publish_ksk_cont (void *cls, &pkc->cpy->purpose, &pkc->cpy->signature)); GNUNET_CRYPTO_rsa_key_free (pk); - GNUNET_DATASTORE_put (pkc->dsh, - 0, - &query, - pkc->mdsize + - sizeof (struct KBlock) + - pkc->slen, - pkc->cpy, - GNUNET_BLOCK_TYPE_FS_KBLOCK, - pkc->bo.content_priority, - pkc->bo.anonymity_level, - pkc->bo.replication_level, - pkc->bo.expiration_time, - -2, 1, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &kb_put_cont, - pkc); + pkc->qre = GNUNET_DATASTORE_put (pkc->dsh, + 0, + &query, + pkc->mdsize + + sizeof (struct KBlock) + + pkc->slen, + pkc->cpy, + GNUNET_BLOCK_TYPE_FS_KBLOCK, + pkc->bo.content_priority, + pkc->bo.anonymity_level, + pkc->bo.replication_level, + pkc->bo.expiration_time, + -2, 1, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &kb_put_cont, + pkc); } diff --git a/src/fs/test_fs_publish_persistence.c b/src/fs/test_fs_publish_persistence.c index fdb5beb7d..e06d5c18f 100644 --- a/src/fs/test_fs_publish_persistence.c +++ b/src/fs/test_fs_publish_persistence.c @@ -147,9 +147,8 @@ progress_cb (void *cls, (unsigned long long) (FILESIZE * 1000 / (1+GNUNET_TIME_absolute_get_duration (start).rel_value) / 1024)); if (0 == strcmp ("publish-context-dir", event->value.publish.cctx)) - GNUNET_SCHEDULER_add_continuation (&abort_publish_task, - NULL, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); + GNUNET_SCHEDULER_add_now (&abort_publish_task, + NULL); break; case GNUNET_FS_STATUS_PUBLISH_PROGRESS: consider_restart (event->status); @@ -180,11 +179,8 @@ progress_cb (void *cls, "Error publishing file: %s\n", event->value.publish.specifics.error.message); err = 1; - if (0 == strcmp ("publish-context-dir", - event->value.publish.cctx)) - GNUNET_SCHEDULER_add_continuation (&abort_publish_task, - NULL, - GNUNET_SCHEDULER_REASON_PREREQ_DONE); + GNUNET_SCHEDULER_add_now (&abort_publish_task, + NULL); break; case GNUNET_FS_STATUS_PUBLISH_START: consider_restart (event->status); -- 2.25.1