X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_put.c;h=0a97bcb222121c49276812dba05a3f4336d3b3d9;hb=6da7b6a2dc5ec3645d5f8bdbd4ab77d6090b823f;hp=5fd2ce81c4a551e266aee9a54b5f10cfa271dc9d;hpb=390a68296dd89f61461bdca02060d36e2e02af2b;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 5fd2ce81c..0a97bcb22 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -33,83 +33,133 @@ */ #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) - /** - * Request to datastore for DHT PUTs (or NULL). + * How many replicas do we try to create per PUT? */ -static struct GNUNET_DATASTORE_QueueEntry *dht_qe; +#define DEFAULT_PUT_REPLICATION 5 -/** - * Type we will request for the next DHT PUT round from the datastore. - */ -static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; /** - * ID of task that collects blocks for DHT PUTs. + * Context for each zero-anonymity iterator. */ -static GNUNET_SCHEDULER_TaskIdentifier dht_task; +struct PutOperator +{ + + /** + * Request to datastore for DHT PUTs (or NULL). + */ + struct GNUNET_DATASTORE_QueueEntry *dht_qe; + + /** + * Type we request from the datastore. + */ + enum GNUNET_BLOCK_Type dht_put_type; + + /** + * Handle to PUT operation. + */ + struct GNUNET_DHT_PutHandle *dht_put; + + /** + * ID of task that collects blocks for DHT PUTs. + */ + struct GNUNET_SCHEDULER_Task * dht_task; + + /** + * How many entires with zero anonymity of our type do we currently + * estimate to have in the database? + */ + uint64_t zero_anonymity_count_estimate; + + /** + * Current offset when iterating the database. + */ + uint64_t current_offset; +}; + /** - * How many entires with zero anonymity do we currently estimate - * to have in the database? + * ANY-terminated list of our operators (one per type + * of block that we're putting into the DHT). */ -static unsigned int zero_anonymity_count_estimate; +static struct PutOperator operators[] = { + {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0}, + {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0} +}; /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather * @param tc scheduler context (unused) */ static void gather_dht_put_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - + const struct GNUNET_SCHEDULER_TaskContext *tc); /** - * If the DHT PUT gathering task is not currently running, consider - * (re)scheduling it with the appropriate delay. + * Calculate when to run the next PUT operation and schedule it. + * + * @param po put operator to schedule */ static void -consider_dht_put_gathering (void *cls) +schedule_next_put (struct PutOperator *po) { struct GNUNET_TIME_Relative delay; - if (GSF_dsh == NULL) - return; - if (dht_qe != NULL) - return; - if (dht_task != GNUNET_SCHEDULER_NO_TASK) - return; - if (zero_anonymity_count_estimate > 0) - { - delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, - zero_anonymity_count_estimate); - delay = GNUNET_TIME_relative_min (delay, - MAX_DHT_PUT_FREQ); - } + if (po->zero_anonymity_count_estimate > 0) + { + delay = + GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, + po->zero_anonymity_count_estimate); + delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ); + } else - { - /* if we have NO zero-anonymity content yet, wait 5 minutes for some to - (hopefully) appear */ - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); - } - dht_task = GNUNET_SCHEDULER_add_delayed (delay, - &gather_dht_put_blocks, - cls); + { + /* if we have NO zero-anonymity content yet, wait 5 minutes for some to + * (hopefully) appear */ + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); + } + po->dht_task = + GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po); +} + + +/** + * Continuation called after DHT PUT operation has finished. + * + * @param cls type of blocks to gather + * @param success GNUNET_OK if the PUT was transmitted, + * GNUNET_NO on timeout, + * GNUNET_SYSERR on disconnect from service + * after the PUT message was transmitted + * (so we don't know if it was received or not) + */ +static void +delay_dht_put_blocks (void *cls, int success) +{ + struct PutOperator *po = cls; + + po->dht_put = NULL; + schedule_next_put (po); } /** - * Function called upon completion of the DHT PUT operation. + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + * @param tc scheduler context */ static void -dht_put_continuation (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - GNUNET_DATASTORE_get_next (GSF_dsh); + struct PutOperator *po = cls; + + po->dht_task = NULL; + schedule_next_put (po); } @@ -129,81 +179,57 @@ dht_put_continuation (void *cls, */ static void process_dht_put_content (void *cls, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, size_t size, - const void *data, + const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) -{ - static unsigned int counter; - static GNUNET_HashCode last_vhash; - static GNUNET_HashCode vhash; + uint32_t priority, uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, uint64_t uid) +{ + struct PutOperator *po = cls; + po->dht_qe = NULL; if (key == NULL) - { - dht_qe = NULL; - consider_dht_put_gathering (cls); - return; - } - /* slightly funky code to estimate the total number of values with zero - anonymity from the maximum observed length of a monotonically increasing - sequence of hashes over the contents */ - GNUNET_CRYPTO_hash (data, size, &vhash); - if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) - { - if (zero_anonymity_count_estimate > 0) - zero_anonymity_count_estimate /= 2; - counter = 0; - } - last_vhash = vhash; - if (counter < 31) - counter++; - if (zero_anonymity_count_estimate < (1 << counter)) - zero_anonymity_count_estimate = (1 << counter); -#if DEBUG_FS + { + po->zero_anonymity_count_estimate = po->current_offset - 1; + po->current_offset = 0; + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); + return; + } + po->zero_anonymity_count_estimate = + GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for DHT PUT\n", - GNUNET_h2s (key), - type); -#endif - GNUNET_DHT_put (GSF_dht, - key, - DEFAULT_PUT_REPLICATION, - GNUNET_DHT_RO_NONE, - type, - size, - data, - expiration, - GNUNET_TIME_UNIT_FOREVER_REL, - &dht_put_continuation, - cls); + "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), + type); + po->dht_put = GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION, + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data, + expiration, GNUNET_TIME_UNIT_FOREVER_REL, + &delay_dht_put_blocks, po); } /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather * @param tc scheduler context (unused) */ static void -gather_dht_put_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - dht_task = GNUNET_SCHEDULER_NO_TASK; - if (GSF_dsh == NULL) + struct PutOperator *po = cls; + + po->dht_task = NULL; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; - dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, - 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - dht_put_type++, - &process_dht_put_content, NULL); - GNUNET_assert (dht_qe != NULL); + po->dht_qe = + GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, + UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + po->dht_put_type, + &process_dht_put_content, po); + if (NULL == po->dht_qe) + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); } @@ -213,7 +239,15 @@ gather_dht_put_blocks (void *cls, void GSF_put_init_ () { - dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); + unsigned int i; + + i = 0; + while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY) + { + operators[i].dht_task = + GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]); + i++; + } } @@ -223,16 +257,29 @@ GSF_put_init_ () void GSF_put_done_ () { - if (GNUNET_SCHEDULER_NO_TASK != dht_task) + struct PutOperator *po; + unsigned int i; + + i = 0; + while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY) + { + if (NULL != po->dht_task) + { + GNUNET_SCHEDULER_cancel (po->dht_task); + po->dht_task = NULL; + } + if (NULL != po->dht_put) { - GNUNET_SCHEDULER_cancel (dht_task); - dht_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_DHT_put_cancel (po->dht_put); + po->dht_put = NULL; } - if (NULL != dht_qe) + if (NULL != po->dht_qe) { - GNUNET_DATASTORE_cancel (dht_qe); - dht_qe = NULL; + GNUNET_DATASTORE_cancel (po->dht_qe); + po->dht_qe = NULL; } + i++; + } } /* end of gnunet-service-fs_put.c */