X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_pr.c;h=c16f94464e5fd9f54d363de1ef005b366cd0a81d;hb=eab56fb5d5d6d5585a7ad7d665adc7c5fb5ec249;hp=1bd16c9f157a1db6089facce1ad9c210c4fc705d;hpb=71ea5bd2d05058008e604ffd42993be9c7250e04;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 1bd16c9f1..c16f94464 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -345,7 +345,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, } GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (0 != (options & GSF_PRO_REQUEST_EXPIRES)) + if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES)) { pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr, @@ -358,9 +358,10 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, GNUNET_assert (dpr != NULL); if (pr == dpr) break; /* let the request live briefly... */ - dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, - NULL, 0); + if (NULL != dpr->rh) + dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_ABS, + GNUNET_BLOCK_TYPE_ANY, NULL, 0); GSF_pending_request_cancel_ (dpr, GNUNET_YES); } } @@ -370,7 +371,6 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, return pr; } - /** * Obtain the public data associated with a pending request * @@ -774,6 +774,7 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) struct ProcessReplyClosure *prq = cls; struct GSF_PendingRequest *pr = value; GNUNET_HashCode chash; + struct GNUNET_TIME_Absolute last_transmission; if (NULL == pr->rh) return GNUNET_YES; @@ -803,9 +804,11 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_LOAD_update (GSF_rt_entry_lifetime, GNUNET_TIME_absolute_get_duration (pr-> public_data.start_time).rel_value); + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; /* pass on to other peers / local clients */ pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, - prq->type, prq->data, prq->size); + last_transmission, prq->type, prq->data, prq->size); return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: GNUNET_STATISTICS_update (GSF_stats, @@ -854,8 +857,10 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) pr->public_data.results_found++; prq->request_found = GNUNET_YES; /* finally, pass on to other peer / local client */ + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, - prq->type, prq->data, prq->size); + last_transmission, prq->type, prq->data, prq->size); return GNUNET_YES; } @@ -890,48 +895,76 @@ struct PutMigrationContext * * @param cls closure * @param success GNUNET_SYSERR on failure + * @param min_expiration minimum expiration time required for content to be stored * @param msg NULL on success, otherwise an error message */ static void -put_migration_continuation (void *cls, int success, const char *msg) +put_migration_continuation (void *cls, int success, + struct GNUNET_TIME_Absolute min_expiration, + const char *msg) { struct PutMigrationContext *pmc = cls; - struct GNUNET_TIME_Relative delay; - struct GNUNET_TIME_Relative block_time; struct GSF_ConnectedPeer *cp; + struct GNUNET_TIME_Relative mig_pause; struct GSF_PeerPerformanceData *ppd; - delay = GNUNET_TIME_absolute_get_duration (pmc->start); + if (NULL != datastore_put_load) + { + if (GNUNET_SYSERR != success) + { + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_absolute_get_duration (pmc->start).rel_value); + } + else + { + /* on queue failure / timeout, increase the put load dramatically */ + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_UNIT_MINUTES.rel_value); + } + } cp = GSF_peer_get_ (&pmc->origin); - if ((GNUNET_OK != success) && (GNUNET_NO == pmc->requested)) + if (GNUNET_OK == success) { - /* block migration for a bit... */ if (NULL != cp) { ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication++; - block_time = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 5 * ppd->migration_duplication + - GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_WEAK, 5)); - GSF_block_peer_migration_ (cp, block_time); + ppd->migration_delay.rel_value /= 2; } + GNUNET_free (pmc); + return; } - else + if ( (GNUNET_NO == success) && + (GNUNET_NO == pmc->requested) && + (NULL != cp) ) { - if (NULL != cp) + ppd = GSF_get_peer_performance_data_ (cp); + if (min_expiration.abs_value > 0) { - ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication = 0; /* reset counter */ +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because datastore is full\n", + (unsigned long long) GNUNET_TIME_absolute_get_remaining (min_expiration).rel_value); +#endif + GSF_block_peer_migration_ (cp, min_expiration); + } + else + { + ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS, + ppd->migration_delay); + ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, + ppd->migration_delay); + mig_pause.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + ppd->migration_delay.rel_value); + ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicated content already exists locally, asking to stop migration for %llu ms\n", + (unsigned long long) mig_pause.rel_value); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (mig_pause)); } } GNUNET_free (pmc); - /* FIXME: should we really update the load value on failure? */ - if (NULL != datastore_put_load) - GNUNET_LOAD_update (datastore_put_load, delay.rel_value); - if (GNUNET_OK == success) - return; GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# Datastore `PUT' failures"), 1, GNUNET_NO); @@ -1000,6 +1033,9 @@ handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp, memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = exp; + /* do not allow migrated content to live longer than 1 year */ + prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + prq.expiration); prq.size = size; prq.type = type; process_reply (&prq, key, pr); @@ -1022,7 +1058,7 @@ handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { - put_migration_continuation (pmc, GNUNET_NO, NULL); + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } } } @@ -1185,7 +1221,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size, if (NULL == key) { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "No further local responses available.\n"); #endif if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || @@ -1483,6 +1519,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, dsize = msize - sizeof (struct PutMessage); type = ntohl (put->type); expiration = GNUNET_TIME_absolute_ntoh (put->expiration); + /* do not allow migrated content to live longer than 1 year */ + expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + expiration); if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) return GNUNET_SYSERR; if (GNUNET_OK != @@ -1537,7 +1576,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { - put_migration_continuation (pmc, GNUNET_NO, NULL); + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } } else @@ -1562,7 +1601,15 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) (60000 * putl * putl))); - GSF_block_peer_migration_ (cp, block_time); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because of load %f and events %d/%d\n", + (unsigned long long) block_time.rel_value, + putl, + active_to_migration, + (GNUNET_NO == prq.request_found)); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); } return GNUNET_OK; }