X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_pr.c;h=c16f94464e5fd9f54d363de1ef005b366cd0a81d;hb=eab56fb5d5d6d5585a7ad7d665adc7c5fb5ec249;hp=ac183a98053d84f284f6c67f4ec212dc4ff363c0;hpb=83b19539f4d322b43683f5838b72e9ec2c8e6073;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index ac183a980..c16f94464 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -31,6 +31,21 @@ #include "gnunet-service-fs_pe.h" #include "gnunet-service-fs_pr.h" +/** + * Maximum size of the datastore queue for P2P operations. Needs to + * be large enough to queue MAX_QUEUE_PER_PEER operations for roughly + * the number of active (connected) peers. + */ +#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER) + +/** + * Bandwidth value of a 0-priority content (must be fairly high + * compared to query since content is typically significantly larger + * -- and more valueable since it can take many queries to get one + * piece of content). + */ +#define CONTENT_BANDWIDTH_VALUE 800 + /** * Hard limit on the number of results we may get from the datastore per query. */ @@ -330,7 +345,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, } GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (0 != (options & GSF_PRO_REQUEST_EXPIRES)) + if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES)) { pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr, @@ -343,9 +358,10 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, GNUNET_assert (dpr != NULL); if (pr == dpr) break; /* let the request live briefly... */ - dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, - NULL, 0); + if (NULL != dpr->rh) + dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_ABS, + GNUNET_BLOCK_TYPE_ANY, NULL, 0); GSF_pending_request_cancel_ (dpr, GNUNET_YES); } } @@ -355,7 +371,6 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, return pr; } - /** * Obtain the public data associated with a pending request * @@ -759,6 +774,7 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) struct ProcessReplyClosure *prq = cls; struct GSF_PendingRequest *pr = value; GNUNET_HashCode chash; + struct GNUNET_TIME_Absolute last_transmission; if (NULL == pr->rh) return GNUNET_YES; @@ -788,9 +804,11 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_LOAD_update (GSF_rt_entry_lifetime, GNUNET_TIME_absolute_get_duration (pr-> public_data.start_time).rel_value); + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; /* pass on to other peers / local clients */ pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, - prq->type, prq->data, prq->size); + last_transmission, prq->type, prq->data, prq->size); return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: GNUNET_STATISTICS_update (GSF_stats, @@ -839,8 +857,10 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) pr->public_data.results_found++; prq->request_found = GNUNET_YES; /* finally, pass on to other peer / local client */ + if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission)) + last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value; pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration, - prq->type, prq->data, prq->size); + last_transmission, prq->type, prq->data, prq->size); return GNUNET_YES; } @@ -875,48 +895,76 @@ struct PutMigrationContext * * @param cls closure * @param success GNUNET_SYSERR on failure + * @param min_expiration minimum expiration time required for content to be stored * @param msg NULL on success, otherwise an error message */ static void -put_migration_continuation (void *cls, int success, const char *msg) +put_migration_continuation (void *cls, int success, + struct GNUNET_TIME_Absolute min_expiration, + const char *msg) { struct PutMigrationContext *pmc = cls; - struct GNUNET_TIME_Relative delay; - struct GNUNET_TIME_Relative block_time; struct GSF_ConnectedPeer *cp; + struct GNUNET_TIME_Relative mig_pause; struct GSF_PeerPerformanceData *ppd; - delay = GNUNET_TIME_absolute_get_duration (pmc->start); + if (NULL != datastore_put_load) + { + if (GNUNET_SYSERR != success) + { + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_absolute_get_duration (pmc->start).rel_value); + } + else + { + /* on queue failure / timeout, increase the put load dramatically */ + GNUNET_LOAD_update (datastore_put_load, + GNUNET_TIME_UNIT_MINUTES.rel_value); + } + } cp = GSF_peer_get_ (&pmc->origin); - if ((GNUNET_OK != success) && (GNUNET_NO == pmc->requested)) + if (GNUNET_OK == success) { - /* block migration for a bit... */ if (NULL != cp) { ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication++; - block_time = - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 5 * ppd->migration_duplication + - GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_WEAK, 5)); - GSF_block_peer_migration_ (cp, block_time); + ppd->migration_delay.rel_value /= 2; } + GNUNET_free (pmc); + return; } - else + if ( (GNUNET_NO == success) && + (GNUNET_NO == pmc->requested) && + (NULL != cp) ) { - if (NULL != cp) + ppd = GSF_get_peer_performance_data_ (cp); + if (min_expiration.abs_value > 0) { - ppd = GSF_get_peer_performance_data_ (cp); - ppd->migration_duplication = 0; /* reset counter */ +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because datastore is full\n", + (unsigned long long) GNUNET_TIME_absolute_get_remaining (min_expiration).rel_value); +#endif + GSF_block_peer_migration_ (cp, min_expiration); + } + else + { + ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS, + ppd->migration_delay); + ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, + ppd->migration_delay); + mig_pause.rel_value = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + ppd->migration_delay.rel_value); + ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Replicated content already exists locally, asking to stop migration for %llu ms\n", + (unsigned long long) mig_pause.rel_value); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (mig_pause)); } } GNUNET_free (pmc); - /* FIXME: should we really update the load value on failure? */ - if (NULL != datastore_put_load) - GNUNET_LOAD_update (datastore_put_load, delay.rel_value); - if (GNUNET_OK == success) - return; GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# Datastore `PUT' failures"), 1, GNUNET_NO); @@ -985,6 +1033,9 @@ handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp, memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = exp; + /* do not allow migrated content to live longer than 1 year */ + prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + prq.expiration); prq.size = size; prq.type = type; process_reply (&prq, key, pr); @@ -1007,7 +1058,7 @@ handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { - put_migration_continuation (pmc, GNUNET_NO, NULL); + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } } } @@ -1170,7 +1221,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size, if (NULL == key) { #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "No further local responses available.\n"); #endif if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || @@ -1468,6 +1519,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, dsize = msize - sizeof (struct PutMessage); type = ntohl (put->type); expiration = GNUNET_TIME_absolute_ntoh (put->expiration); + /* do not allow migrated content to live longer than 1 year */ + expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), + expiration); if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) return GNUNET_SYSERR; if (GNUNET_OK != @@ -1522,7 +1576,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, pmc)) { - put_migration_continuation (pmc, GNUNET_NO, NULL); + put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL); } } else @@ -1547,7 +1601,15 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) (60000 * putl * putl))); - GSF_block_peer_migration_ (cp, block_time); +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking to stop migration for %llu ms because of load %f and events %d/%d\n", + (unsigned long long) block_time.rel_value, + putl, + active_to_migration, + (GNUNET_NO == prq.request_found)); +#endif + GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); } return GNUNET_OK; } @@ -1572,13 +1634,13 @@ GSF_pending_request_init_ () "MAX_PENDING_REQUESTS"); } if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "core", "TOTAL_QUOTA_OUT", - &bps)) + GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "ats", "WAN_QUOTA_OUT", + &bps)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Configuration fails to specify `%s', assuming default value."), - "TOTAL_QUOTA_IN"); + "WAN_QUOTA_OUT"); bps = 65536; } /* queue size should be #queries we can have pending and satisfy within