X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_put.c;h=e8c7f586d1992bd4ec981cd70524765737b3c2d4;hb=0d8487a744066dce7d097fb91ae0f965033c79ea;hp=ec2344b373d5a07ba158705dcd24d55600a474ce;hpb=5746309cb4be2073d550ad7a6885e918631dbc38;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index ec2344b37..e8c7f586d 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2011 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -33,6 +33,11 @@ */ #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +/** + * How many replicas do we try to create per PUT? + */ +#define DEFAULT_PUT_REPLICATION 5 + /** * Context for each zero-anonymity iterator. @@ -50,10 +55,15 @@ struct PutOperator */ enum GNUNET_BLOCK_Type dht_put_type; + /** + * Handle to PUT operation. + */ + struct GNUNET_DHT_PutHandle *dht_put; + /** * ID of task that collects blocks for DHT PUTs. */ - GNUNET_SCHEDULER_TaskIdentifier dht_task; + struct GNUNET_SCHEDULER_Task * dht_task; /** * How many entires with zero anonymity of our type do we currently @@ -62,9 +72,14 @@ struct PutOperator uint64_t zero_anonymity_count_estimate; /** - * Current offset when iterating the database. + * Count of results received from the database. */ - uint64_t current_offset; + uint64_t result_count; + + /** + * Next UID to request when iterating the database. + */ + uint64_t next_uid; }; @@ -73,39 +88,31 @@ struct PutOperator * of block that we're putting into the DHT). */ static struct PutOperator operators[] = { - {NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0}, - {NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0}, - {NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0}, + {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0}, {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0} }; /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather * @param tc scheduler context (unused) */ -static void gather_dht_put_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext - *tc); +static void +gather_dht_put_blocks (void *cls); /** - * Task that is run periodically to obtain blocks for DHT PUTs. - * - * @param cls type of blocks to gather - * @param tc scheduler context (unused) + * Calculate when to run the next PUT operation and schedule it. + * + * @param po put operator to schedule */ static void -delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +schedule_next_put (struct PutOperator *po) { - struct PutOperator *po = cls; struct GNUNET_TIME_Relative delay; - po->dht_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; if (po->zero_anonymity_count_estimate > 0) { delay = @@ -124,6 +131,41 @@ delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Continuation called after DHT PUT operation has finished. + * + * @param cls type of blocks to gather + * @param success GNUNET_OK if the PUT was transmitted, + * GNUNET_NO on timeout, + * GNUNET_SYSERR on disconnect from service + * after the PUT message was transmitted + * (so we don't know if it was received or not) + */ +static void +delay_dht_put_blocks (void *cls, int success) +{ + struct PutOperator *po = cls; + + po->dht_put = NULL; + schedule_next_put (po); +} + + +/** + * Task that is run periodically to obtain blocks for DHT PUTs. + * + * @param cls type of blocks to gather + */ +static void +delay_dht_put_task (void *cls) +{ + struct PutOperator *po = cls; + + po->dht_task = NULL; + schedule_next_put (po); +} + + /** * Store content in DHT. * @@ -134,61 +176,75 @@ delay_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content + * @param replication replication-level for the content * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available */ static void -process_dht_put_content (void *cls, const GNUNET_HashCode * key, size_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, uint64_t uid) +process_dht_put_content (void *cls, + const struct GNUNET_HashCode * key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + uint32_t replication, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { struct PutOperator *po = cls; po->dht_qe = NULL; if (key == NULL) { - po->zero_anonymity_count_estimate = po->current_offset - 1; - po->current_offset = 0; - po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po); + po->zero_anonymity_count_estimate = po->result_count; + po->result_count = 0; + po->next_uid = 0; + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); return; } + po->result_count++; + po->next_uid = uid + 1; po->zero_anonymity_count_estimate = - GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate); -#if DEBUG_FS + GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key), type); -#endif - GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION, GNUNET_DHT_RO_NONE, - type, size, data, expiration, GNUNET_TIME_UNIT_FOREVER_REL, - &delay_dht_put_blocks, po); + po->dht_put = GNUNET_DHT_put (GSF_dht, + key, + DEFAULT_PUT_REPLICATION, + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, + type, + size, + data, + expiration, + &delay_dht_put_blocks, + po); } /** * Task that is run periodically to obtain blocks for DHT PUTs. - * + * * @param cls type of blocks to gather - * @param tc scheduler context (unused) */ static void -gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_dht_put_blocks (void *cls) { struct PutOperator *po = cls; - po->dht_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; + po->dht_task = NULL; po->dht_qe = - GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0, + GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, + po->next_uid, + 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, po->dht_put_type, - &process_dht_put_content, po); + &process_dht_put_content, + po); if (NULL == po->dht_qe) - po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, po); + po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po); } @@ -222,10 +278,15 @@ GSF_put_done_ () i = 0; while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY) { - if (GNUNET_SCHEDULER_NO_TASK != po->dht_task) + if (NULL != po->dht_task) { GNUNET_SCHEDULER_cancel (po->dht_task); - po->dht_task = GNUNET_SCHEDULER_NO_TASK; + po->dht_task = NULL; + } + if (NULL != po->dht_put) + { + GNUNET_DHT_put_cancel (po->dht_put); + po->dht_put = NULL; } if (NULL != po->dht_qe) {