X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Ffs%2Fgnunet-service-fs_push.c;h=02990a63709a301c57581a87be8377c0aed4f13d;hb=c4e9ba925ffd758aaa3feee2ccfc0b76f26fe207;hp=4fde823a1d168046812dcb41ba891734ddd65b66;hpb=0443654040fb277df95aae28a7b2a54a4f0a73bf;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 4fde823a1..02990a637 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -1,22 +1,22 @@ /* This file is part of GNUnet. - (C) 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2011, 2016 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + Affero General Public License for more details. - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later + */ /** * @file fs/gnunet-service-fs_push.c @@ -31,8 +31,6 @@ #include "gnunet-service-fs_push.h" -#define DEBUG_FS_MIGRATION GNUNET_EXTRA_LOGGING - /** * Maximum number of blocks we keep in memory for migration. */ @@ -51,7 +49,8 @@ * that if this value is increased, the migration testcase may need to be * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c). */ -#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30) +#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply ( \ + GNUNET_TIME_UNIT_MINUTES, 30) /** @@ -59,7 +58,6 @@ */ struct MigrationReadyBlock { - /** * This is a doubly-linked list. */ @@ -73,7 +71,7 @@ struct MigrationReadyBlock /** * Query for the block. */ - GNUNET_HashCode query; + struct GNUNET_HashCode query; /** * When does this block expire? @@ -104,8 +102,7 @@ struct MigrationReadyBlock /** - * Information about a peer waiting for - * migratable data. + * Information about a peer waiting for migratable data. */ struct MigrationReadyPeer { @@ -125,15 +122,9 @@ struct MigrationReadyPeer struct GSF_ConnectedPeer *peer; /** - * Handle for current transmission request, - * or NULL for none. + * Envelope of the currently pushed message. */ - struct GSF_PeerTransmitHandle *th; - - /** - * Message we are trying to push right now (or NULL) - */ - struct PutMessage *msg; + struct GNUNET_MQ_Envelope *env; }; @@ -165,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; /** * ID of task that collects blocks for migration. */ -static GNUNET_SCHEDULER_TaskIdentifier mig_task; +static struct GNUNET_SCHEDULER_Task *mig_task; /** * What is the maximum frequency at which we are allowed to @@ -183,6 +174,11 @@ static unsigned int mig_size; */ static int enabled; +/** + * Did we find anything in the datastore? + */ +static int value_found; + /** * Delete the given migration block. @@ -192,8 +188,11 @@ static int enabled; static void delete_migration_block (struct MigrationReadyBlock *mb) { - GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); - GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); + GNUNET_CONTAINER_DLL_remove (mig_head, + mig_tail, + mb); + GNUNET_PEER_decrement_rcs (mb->target_list, + MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } @@ -201,50 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb) /** * Find content for migration to this peer. - */ -static void -find_content (struct MigrationReadyPeer *mrp); - - -/** - * Transmit the message currently scheduled for - * transmission. * - * @param cls the 'struct MigrationReadyPeer' - * @param buf_size number of bytes available in buf - * @param buf where to copy the message, NULL on error (peer disconnect) - * @return number of bytes copied to 'buf', can be 0 (without indicating an error) + * @param cls a `struct MigrationReadyPeer *` */ -static size_t -transmit_message (void *cls, size_t buf_size, void *buf) -{ - struct MigrationReadyPeer *peer = cls; - struct PutMessage *msg; - uint16_t msize; - - peer->th = NULL; - msg = peer->msg; - peer->msg = NULL; - if (buf == NULL) - { -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to migrate content to another peer (disconnect)\n"); -#endif - GNUNET_free (msg); - return 0; - } - msize = ntohs (msg->header.size); - GNUNET_assert (msize <= buf_size); - memcpy (buf, msg, msize); - GNUNET_free (msg); -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n", - msize); -#endif - find_content (peer); - return msize; -} +static void +find_content (void *cls); /** @@ -252,34 +212,34 @@ transmit_message (void *cls, size_t buf_size, void *buf) * * @param peer target peer * @param block the block - * @return GNUNET_YES if the block was deleted (!) + * @return #GNUNET_YES if the block was deleted (!) */ static int -transmit_content (struct MigrationReadyPeer *peer, +transmit_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block) { - size_t msize; struct PutMessage *msg; unsigned int i; struct GSF_PeerPerformanceData *ppd; int ret; - ppd = GSF_get_peer_performance_data_ (peer->peer); - GNUNET_assert (NULL == peer->th); - msize = sizeof (struct PutMessage) + block->size; - msg = GNUNET_malloc (msize); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - msg->header.size = htons (msize); + ppd = GSF_get_peer_performance_data_ (mrp->peer); + GNUNET_assert (NULL == mrp->env); + mrp->env = GNUNET_MQ_msg_extra (msg, + block->size, + GNUNET_MESSAGE_TYPE_FS_PUT); msg->type = htonl (block->type); msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); - memcpy (&msg[1], &block[1], block->size); - peer->msg = msg; + GNUNET_memcpy (&msg[1], + &block[1], + block->size); for (i = 0; i < MIGRATION_LIST_SIZE; i++) { if (block->target_list[i] == 0) { block->target_list[i] = ppd->pid; - GNUNET_PEER_change_rc (block->target_list[i], 1); + GNUNET_PEER_change_rc (block->target_list[i], + 1); break; } } @@ -292,13 +252,13 @@ transmit_content (struct MigrationReadyPeer *peer, { ret = GNUNET_NO; } -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking for transmission of %u bytes for migration\n", msize); -#endif - peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ , - GNUNET_TIME_UNIT_FOREVER_REL, msize, - &transmit_message, peer); + GNUNET_MQ_notify_sent (mrp->env, + &find_content, + mrp); + GSF_peer_transmit_ (mrp->peer, + GNUNET_NO, + 0 /* priority */, + mrp->env); return ret; } @@ -326,26 +286,32 @@ count_targets (struct MigrationReadyBlock *block) * Check if sending this block to this peer would * be a good idea. * - * @param peer target peer + * @param mrp target peer * @param block the block * @return score (>= 0: feasible, negative: infeasible) */ static long -score_content (struct MigrationReadyPeer *peer, +score_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block) { unsigned int i; struct GSF_PeerPerformanceData *ppd; struct GNUNET_PeerIdentity id; + struct GNUNET_HashCode hc; uint32_t dist; - ppd = GSF_get_peer_performance_data_ (peer->peer); + ppd = GSF_get_peer_performance_data_ (mrp->peer); for (i = 0; i < MIGRATION_LIST_SIZE; i++) if (block->target_list[i] == ppd->pid) return -1; GNUNET_assert (0 != ppd->pid); - GNUNET_PEER_resolve (ppd->pid, &id); - dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey); + GNUNET_PEER_resolve (ppd->pid, + &id); + GNUNET_CRYPTO_hash (&id, + sizeof(struct GNUNET_PeerIdentity), + &hc); + dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, + &hc); /* closer distance, higher score: */ return UINT32_MAX - dist; } @@ -362,17 +328,18 @@ consider_gathering (void); /** * Find content for migration to this peer. * - * @param mrp peer to find content for + * @param cls peer to find content for */ static void -find_content (struct MigrationReadyPeer *mrp) +find_content (void *cls) { + struct MigrationReadyPeer *mrp = cls; struct MigrationReadyBlock *pos; long score; long best_score; struct MigrationReadyBlock *best; - GNUNET_assert (NULL == mrp->th); + mrp->env = NULL; best = NULL; best_score = -1; pos = mig_head; @@ -390,16 +357,12 @@ find_content (struct MigrationReadyPeer *mrp) { if (mig_size < MAX_MIGRATION_QUEUE) { -#if DEBUG_FS_MIGRATION GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for pushing, waiting for queue to fill\n"); -#endif return; /* will fill up eventually... */ } -#if DEBUG_FS_MIGRATION GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No suitable content found, purging content from full queue\n"); -#endif /* failed to find migration target AND * queue is full, purge most-forwarded * block from queue to make room for more */ @@ -419,11 +382,10 @@ find_content (struct MigrationReadyPeer *mrp) consider_gathering (); return; } -#if DEBUG_FS_MIGRATION GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Preparing to push best content to peer\n"); -#endif - transmit_content (mrp, best); + transmit_content (mrp, + best); } @@ -432,11 +394,9 @@ find_content (struct MigrationReadyPeer *mrp) * migration * * @param cls unused - * @param tc scheduler context (also unused) */ static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); +gather_migration_blocks (void *cls); /** @@ -448,23 +408,34 @@ consider_gathering () { struct GNUNET_TIME_Relative delay; - if (GSF_dsh == NULL) + if (NULL == GSF_dsh) return; - if (mig_qe != NULL) + if (NULL != mig_qe) return; - if (mig_task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != mig_task) return; if (mig_size >= MAX_MIGRATION_QUEUE) return; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); - delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, min_migration_delay); -#if DEBUG_FS_MIGRATION + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + mig_size); + delay = GNUNET_TIME_relative_divide (delay, + MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, + min_migration_delay); + if (GNUNET_NO == value_found) + { + /* wait at least 5s if the datastore is empty */ + delay = GNUNET_TIME_relative_max (delay, + GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_SECONDS, + 5)); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling gathering task (queue size: %u)\n", mig_size); -#endif - mig_task = - GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); + "Scheduling gathering task (queue size: %u)\n", + mig_size); + mig_task = GNUNET_SCHEDULER_add_delayed (delay, + &gather_migration_blocks, + NULL); } @@ -478,30 +449,37 @@ consider_gathering () * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content + * @param replication replication-level for the content * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available */ static void -process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size, - const void *data, enum GNUNET_BLOCK_Type type, - uint32_t priority, uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, uint64_t uid) +process_migration_content (void *cls, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + uint32_t replication, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { struct MigrationReadyBlock *mb; struct MigrationReadyPeer *pos; mig_qe = NULL; - if (key == NULL) + if (NULL == key) { -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n"); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No content found for migration...\n"); consider_gathering (); return; } - if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < - MIN_MIGRATION_CONTENT_LIFETIME.rel_value) + value_found = GNUNET_YES; + if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < + MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) { /* content will expire soon, don't bother */ consider_gathering (); @@ -510,38 +488,47 @@ process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size, if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) { if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, - anonymity, expiration, uid, - &process_migration_content, NULL)) + GNUNET_FS_handle_on_demand_block (key, + size, + data, + type, + priority, + anonymity, + replication, + expiration, + uid, + &process_migration_content, + NULL)) consider_gathering (); return; } -#if DEBUG_FS_MIGRATION GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", - GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE); -#endif - mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); + GNUNET_h2s (key), + type, mig_size + 1, + MAX_MIGRATION_QUEUE); + mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size); mb->query = *key; mb->expiration = expiration; mb->size = size; mb->type = type; - memcpy (&mb[1], data, size); - GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb); + GNUNET_memcpy (&mb[1], data, size); + GNUNET_CONTAINER_DLL_insert_after (mig_head, + mig_tail, + mig_tail, + mb); mig_size++; - pos = peer_head; - while (pos != NULL) + for (pos = peer_head; NULL != pos; pos = pos->next) { - if (NULL == pos->th) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to push best content to peer %s\n", + GNUNET_i2s (GSF_connected_peer_get_identity2_ (pos->peer))); + if ((NULL == pos->env) && + (GNUNET_YES == transmit_content (pos, + mb))) { -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to push best content to peer\n"); -#endif - if (GNUNET_YES == transmit_content (pos, mb)) - break; /* 'mb' was freed! */ + break; /* 'mb' was freed! */ } - pos = pos->next; } consider_gathering (); } @@ -552,29 +539,26 @@ process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size, * migration * * @param cls unused - * @param tc scheduler context (also unused) */ static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_migration_blocks (void *cls) { - mig_task = GNUNET_SCHEDULER_NO_TASK; + mig_task = NULL; if (mig_size >= MAX_MIGRATION_QUEUE) return; - if (GSF_dsh != NULL) - { -#if DEBUG_FS_MIGRATION - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking datastore for content for replication (queue size: %u)\n", - mig_size); -#endif - mig_qe = - GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_migration_content, NULL); - if (NULL == mig_qe) - consider_gathering (); - } + if (NULL == GSF_dsh) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking datastore for content for replication (queue size: %u)\n", + mig_size); + value_found = GNUNET_NO; + mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, + 0, + UINT_MAX, + &process_migration_content, + NULL); + if (NULL == mig_qe) + consider_gathering (); } @@ -591,10 +575,26 @@ GSF_push_start_ (struct GSF_ConnectedPeer *peer) if (GNUNET_YES != enabled) return; - mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer)); + for (mrp = peer_head; NULL != mrp; mrp = mrp->next) + if (mrp->peer == peer) + break; + if (NULL != mrp) + { + /* same peer added twice, must not happen */ + GNUNET_break (0); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding peer %s to list for pushing\n", + GNUNET_i2s (GSF_connected_peer_get_identity2_ (peer))); + + mrp = GNUNET_new (struct MigrationReadyPeer); mrp->peer = peer; find_content (mrp); - GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp); + GNUNET_CONTAINER_DLL_insert (peer_head, + peer_tail, + mrp); } @@ -609,27 +609,17 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) { struct MigrationReadyPeer *pos; - pos = peer_head; - while (pos != NULL) - { + for (pos = peer_head; NULL != pos; pos = pos->next) if (pos->peer == peer) - { - GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); - if (NULL != pos->th) - { - GSF_peer_transmit_cancel_ (pos->th); - pos->th = NULL; - } - if (NULL != pos->msg) - { - GNUNET_free (pos->msg); - pos->msg = NULL; - } - GNUNET_free (pos); - return; - } - pos = pos->next; - } + break; + if (NULL == pos) + return; + if (NULL != pos->env) + GNUNET_MQ_send_cancel (pos->env); + GNUNET_CONTAINER_DLL_remove (peer_head, + peer_tail, + pos); + GNUNET_free (pos); } @@ -640,18 +630,22 @@ void GSF_push_init_ () { enabled = - GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); + GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, + "FS", + "CONTENT_PUSHING"); if (GNUNET_YES != enabled) return; if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", + GNUNET_CONFIGURATION_get_value_time (GSF_cfg, + "fs", + "MIN_MIGRATION_DELAY", &min_migration_delay)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _ - ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"), - "MIN_MIGRATION_DELAY", "fs"); + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, + "fs", + "MIN_MIGRATION_DELAY", + _ ("time required, content pushing disabled")); return; } consider_gathering (); @@ -664,10 +658,10 @@ GSF_push_init_ () void GSF_push_done_ () { - if (GNUNET_SCHEDULER_NO_TASK != mig_task) + if (NULL != mig_task) { GNUNET_SCHEDULER_cancel (mig_task); - mig_task = GNUNET_SCHEDULER_NO_TASK; + mig_task = NULL; } if (NULL != mig_qe) {