X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_put.c;h=07d32ef31bd6d9d540249c5680576fefb447a695;hb=20e86c5bb520dadff4354ab8a0728b914ed82e3f;hp=121a90bcd3c25804ff990ecf6b8160ac4d2d9c39;hpb=e8f35bb025c25839a52fb502e452393831e4e6f0;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 121a90bcd..07d32ef31 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -35,81 +35,92 @@ /** - * Request to datastore for DHT PUTs (or NULL). + * Context for each zero-anonymity iterator. */ -static struct GNUNET_DATASTORE_QueueEntry *dht_qe; +struct PutOperator +{ -/** - * Type we will request for the next DHT PUT round from the datastore. - */ -static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; + /** + * Request to datastore for DHT PUTs (or NULL). + */ + struct GNUNET_DATASTORE_QueueEntry *dht_qe; + + /** + * Type we request from the datastore. + */ + enum GNUNET_BLOCK_Type dht_put_type; + + /** + * ID of task that collects blocks for DHT PUTs. + */ + GNUNET_SCHEDULER_TaskIdentifier dht_task; + + /** + * How many entires with zero anonymity of our type do we currently + * estimate to have in the database? + */ + uint64_t zero_anonymity_count_estimate; + + /** + * Current offset when iterating the database. + */ + uint64_t current_offset; +}; -/** - * ID of task that collects blocks for DHT PUTs. - */ -static GNUNET_SCHEDULER_TaskIdentifier dht_task; /** - * How many entires with zero anonymity do we currently estimate - * to have in the database? + * ANY-terminated list of our operators (one per type + * of block that we're putting into the DHT). */ -static unsigned int zero_anonymity_count_estimate; +static struct PutOperator operators[] = { + {NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0}, + {NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0}, + {NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0}, + {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0} +}; /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather * @param tc scheduler context (unused) */ static void gather_dht_put_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - + const struct GNUNET_SCHEDULER_TaskContext *tc); /** - * If the DHT PUT gathering task is not currently running, consider - * (re)scheduling it with the appropriate delay. + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + * @param tc scheduler context (unused) */ static void -consider_dht_put_gathering (void *cls) +delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct PutOperator *po = cls; struct GNUNET_TIME_Relative delay; - if (GSF_dsh == NULL) + po->dht_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (dht_qe != NULL) - return; - if (dht_task != GNUNET_SCHEDULER_NO_TASK) - return; - if (zero_anonymity_count_estimate > 0) - { - delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, - zero_anonymity_count_estimate); - delay = GNUNET_TIME_relative_min (delay, - MAX_DHT_PUT_FREQ); - } + if (po->zero_anonymity_count_estimate > 0) + { + delay = + GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, + po->zero_anonymity_count_estimate); + delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ); + } else - { - /* if we have NO zero-anonymity content yet, wait 5 minutes for some to - (hopefully) appear */ - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); - } - dht_task = GNUNET_SCHEDULER_add_delayed (delay, - &gather_dht_put_blocks, - cls); -} - - -/** - * Function called upon completion of the DHT PUT operation. - */ -static void -dht_put_continuation (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + { + /* if we have NO zero-anonymity content yet, wait 5 minutes for some to + * (hopefully) appear */ + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); + } + po->dht_task = + GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po); } @@ -128,82 +139,57 @@ dht_put_continuation (void *cls, * maybe 0 if no unique identifier is available */ static void -process_dht_put_content (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) -{ - static unsigned int counter; - static GNUNET_HashCode last_vhash; - static GNUNET_HashCode vhash; +process_dht_put_content (void *cls, const GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, uint64_t uid) +{ + struct PutOperator *po = cls; + po->dht_qe = NULL; if (key == NULL) - { - dht_qe = NULL; - consider_dht_put_gathering (cls); - return; - } - /* slightly funky code to estimate the total number of values with zero - anonymity from the maximum observed length of a monotonically increasing - sequence of hashes over the contents */ - GNUNET_CRYPTO_hash (data, size, &vhash); - if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) - { - if (zero_anonymity_count_estimate > 0) - zero_anonymity_count_estimate /= 2; - counter = 0; - } - last_vhash = vhash; - if (counter < 31) - counter++; - if (zero_anonymity_count_estimate < (1 << counter)) - zero_anonymity_count_estimate = (1 << counter); + { + po->zero_anonymity_count_estimate = po->current_offset - 1; + po->current_offset = 0; + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po); + return; + } + po->zero_anonymity_count_estimate = + GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for DHT PUT\n", - GNUNET_h2s (key), - type); + "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), + type); #endif - GNUNET_DHT_put (GSF_dht, - key, - DEFAULT_PUT_REPLICATION, - GNUNET_DHT_RO_NONE, - type, - size, - data, - expiration, - GNUNET_TIME_UNIT_FOREVER_REL, - &dht_put_continuation, - cls); + GNUNET_DHT_put (GSF_dht, key, 5 /* DEFAULT_PUT_REPLICATION */ , + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data, + expiration, GNUNET_TIME_UNIT_FOREVER_REL, + &delay_dht_put_blocks, po); } /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather * @param tc scheduler context (unused) */ static void -gather_dht_put_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - dht_task = GNUNET_SCHEDULER_NO_TASK; - if (GSF_dsh == NULL) + struct PutOperator *po = cls; + + po->dht_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; - dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh, - 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - dht_put_type++, - &process_dht_put_content, NULL); - GNUNET_assert (dht_qe != NULL); + po->dht_qe = + GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, + UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + po->dht_put_type, + &process_dht_put_content, po); + if (NULL == po->dht_qe) + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po); } @@ -213,7 +199,15 @@ gather_dht_put_blocks (void *cls, void GSF_put_init_ () { - dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); + unsigned int i; + + i = 0; + while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY) + { + operators[i].dht_task = + GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]); + i++; + } } @@ -223,16 +217,24 @@ GSF_put_init_ () void GSF_put_done_ () { - if (GNUNET_SCHEDULER_NO_TASK != dht_task) + struct PutOperator *po; + unsigned int i; + + i = 0; + while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY) + { + if (GNUNET_SCHEDULER_NO_TASK != po->dht_task) { - GNUNET_SCHEDULER_cancel (dht_task); - dht_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (po->dht_task); + po->dht_task = GNUNET_SCHEDULER_NO_TASK; } - if (NULL != dht_qe) + if (NULL != po->dht_qe) { - GNUNET_DATASTORE_cancel (dht_qe); - dht_qe = NULL; + GNUNET_DATASTORE_cancel (po->dht_qe); + po->dht_qe = NULL; } + i++; + } } /* end of gnunet-service-fs_put.c */