X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_push.c;h=71a8e81e4f0293ae5d1433121e4e847a5e2e3e8a;hb=3140154d46212e08e0d73ed891a66213a6813075;hp=9f515e2ee10129c874d3a1d31f8f0aac56dd6e53;hpb=f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 9f515e2ee..71a8e81e4 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2011 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,10 +25,32 @@ * @author Christian Grothoff */ #include "platform.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_cp.h" +#include "gnunet-service-fs_indexing.h" #include "gnunet-service-fs_push.h" -/* FIXME: below are only old code fragments to use... */ +/** + * Maximum number of blocks we keep in memory for migration. + */ +#define MAX_MIGRATION_QUEUE 8 + +/** + * Blocks are at most migrated to this number of peers + * plus one, each time they are fetched from the database. + */ +#define MIGRATION_LIST_SIZE 2 + +/** + * How long must content remain valid for us to consider it for migration? + * If content will expire too soon, there is clearly no point in pushing + * it to other peers. This value gives the threshold for migration. Note + * that if this value is increased, the migration testcase may need to be + * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c). + */ +#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30) + /** * Block that is ready for migration to other peers. Actual data is at the end of the block. @@ -49,15 +71,15 @@ struct MigrationReadyBlock /** * Query for the block. */ - GNUNET_HashCode query; + struct GNUNET_HashCode query; /** - * When does this block expire? + * When does this block expire? */ struct GNUNET_TIME_Absolute expiration; /** - * Peers we would consider forwarding this + * Peers we already forwarded this * block to. Zero for empty entries. */ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; @@ -79,6 +101,40 @@ struct MigrationReadyBlock }; +/** + * Information about a peer waiting for + * migratable data. + */ +struct MigrationReadyPeer +{ + /** + * This is a doubly-linked list. + */ + struct MigrationReadyPeer *next; + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyPeer *prev; + + /** + * Handle to peer. + */ + struct GSF_ConnectedPeer *peer; + + /** + * Handle for current transmission request, + * or NULL for none. + */ + struct GSF_PeerTransmitHandle *th; + + /** + * Message we are trying to push right now (or NULL) + */ + struct PutMessage *msg; +}; + + /** * Head of linked list of blocks that can be migrated. */ @@ -89,6 +145,16 @@ static struct MigrationReadyBlock *mig_head; */ static struct MigrationReadyBlock *mig_tail; +/** + * Head of linked list of peers. + */ +static struct MigrationReadyPeer *peer_head; + +/** + * Tail of linked list of peers. + */ +static struct MigrationReadyPeer *peer_tail; + /** * Request to datastore for migration (or NULL). */ @@ -97,7 +163,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; /** * ID of task that collects blocks for migration. */ -static GNUNET_SCHEDULER_TaskIdentifier mig_task; +static struct GNUNET_SCHEDULER_Task * mig_task; /** * What is the maximum frequency at which we are allowed to @@ -106,14 +172,19 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task; static struct GNUNET_TIME_Relative min_migration_delay; /** - * Are we allowed to push out content from this peer. + * Size of the doubly-linked list of migration blocks. */ -static int active_from_migration; +static unsigned int mig_size; /** - * Size of the doubly-linked list of migration blocks. + * Is this module enabled? */ -static unsigned int mig_size; +static int enabled; + +/** + * Did we find anything in the datastore? + */ +static int value_found; /** @@ -124,160 +195,246 @@ static unsigned int mig_size; static void delete_migration_block (struct MigrationReadyBlock *mb) { - GNUNET_CONTAINER_DLL_remove (mig_head, - mig_tail, - mb); - GNUNET_PEER_decrement_rcs (mb->target_list, - MIGRATION_LIST_SIZE); + GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); + GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } /** - * Compare the distance of two peers to a key. + * Find content for migration to this peer. + */ +static void +find_content (struct MigrationReadyPeer *mrp); + + +/** + * Transmit the message currently scheduled for transmission. * - * @param key key - * @param p1 first peer - * @param p2 second peer - * @return GNUNET_YES if P1 is closer to key than P2 + * @param cls the `struct MigrationReadyPeer` + * @param buf_size number of bytes available in @a buf + * @param buf where to copy the message, NULL on error (peer disconnect) + * @return number of bytes copied to @a buf, can be 0 (without indicating an error) */ -static int -is_closer (const GNUNET_HashCode *key, - const struct GNUNET_PeerIdentity *p1, - const struct GNUNET_PeerIdentity *p2) +static size_t +transmit_message (void *cls, + size_t buf_size, + void *buf) { - return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, - &p2->hashPubKey, - key); + struct MigrationReadyPeer *peer = cls; + struct PutMessage *msg; + uint16_t msize; + + peer->th = NULL; + msg = peer->msg; + peer->msg = NULL; + if (NULL == buf) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to migrate content to another peer (disconnect)\n"); + GNUNET_free (msg); + return 0; + } + msize = ntohs (msg->header.size); + GNUNET_assert (msize <= buf_size); + memcpy (buf, msg, msize); + GNUNET_free (msg); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Pushing %u bytes to %s\n", + msize, + GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); + find_content (peer); + return msize; } /** - * Consider migrating content to a given peer. + * Send the given block to the given peer. * - * @param cls 'struct MigrationReadyBlock*' to select - * targets for (or NULL for none) - * @param key ID of the peer - * @param value 'struct ConnectedPeer' of the peer - * @return GNUNET_YES (always continue iteration) + * @param peer target peer + * @param block the block + * @return #GNUNET_YES if the block was deleted (!) */ static int -consider_migration (void *cls, - const GNUNET_HashCode *key, - void *value) +transmit_content (struct MigrationReadyPeer *peer, + struct MigrationReadyBlock *block) { - struct MigrationReadyBlock *mb = cls; - struct ConnectedPeer *cp = value; - struct MigrationReadyBlock *pos; - struct GNUNET_PeerIdentity cppid; - struct GNUNET_PeerIdentity otherpid; - struct GNUNET_PeerIdentity worstpid; size_t msize; + struct PutMessage *msg; unsigned int i; - unsigned int repl; - - /* consider 'cp' as a migration target for mb */ - if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) - return GNUNET_YES; /* peer has requested no migration! */ - if (mb != NULL) + struct GSF_PeerPerformanceData *ppd; + int ret; + + ppd = GSF_get_peer_performance_data_ (peer->peer); + GNUNET_assert (NULL == peer->th); + msize = sizeof (struct PutMessage) + block->size; + msg = GNUNET_malloc (msize); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + msg->header.size = htons (msize); + msg->type = htonl (block->type); + msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); + memcpy (&msg[1], &block[1], block->size); + peer->msg = msg; + for (i = 0; i < MIGRATION_LIST_SIZE; i++) + { + if (block->target_list[i] == 0) { - GNUNET_PEER_resolve (cp->pid, - &cppid); - repl = MIGRATION_LIST_SIZE; - for (i=0;itarget_list[i] == 0) - { - mb->target_list[i] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[i], 1); - repl = MIGRATION_LIST_SIZE; - break; - } - GNUNET_PEER_resolve (mb->target_list[i], - &otherpid); - if ( (repl == MIGRATION_LIST_SIZE) && - is_closer (&mb->query, - &cppid, - &otherpid)) - { - repl = i; - worstpid = otherpid; - } - else if ( (repl != MIGRATION_LIST_SIZE) && - (is_closer (&mb->query, - &worstpid, - &otherpid) ) ) - { - repl = i; - worstpid = otherpid; - } - } - if (repl != MIGRATION_LIST_SIZE) - { - GNUNET_PEER_change_rc (mb->target_list[repl], -1); - mb->target_list[repl] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[repl], 1); - } + block->target_list[i] = ppd->pid; + GNUNET_PEER_change_rc (block->target_list[i], 1); + break; } + } + if (MIGRATION_LIST_SIZE == i) + { + delete_migration_block (block); + ret = GNUNET_YES; + } + else + { + ret = GNUNET_NO; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking for transmission of %u bytes to %s for migration\n", + (unsigned int) msize, + GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); + peer->th = GSF_peer_transmit_ (peer->peer, + GNUNET_NO, 0 /* priority */ , + GNUNET_TIME_UNIT_FOREVER_REL, + msize, + &transmit_message, peer); + return ret; +} + + +/** + * Count the number of peers this block has + * already been forwarded to. + * + * @param block the block + * @return number of times block was forwarded + */ +static unsigned int +count_targets (struct MigrationReadyBlock *block) +{ + unsigned int i; + + for (i = 0; i < MIGRATION_LIST_SIZE; i++) + if (block->target_list[i] == 0) + return i; + return i; +} + + +/** + * Check if sending this block to this peer would + * be a good idea. + * + * @param peer target peer + * @param block the block + * @return score (>= 0: feasible, negative: infeasible) + */ +static long +score_content (struct MigrationReadyPeer *peer, + struct MigrationReadyBlock *block) +{ + unsigned int i; + struct GSF_PeerPerformanceData *ppd; + struct GNUNET_PeerIdentity id; + struct GNUNET_HashCode hc; + uint32_t dist; + + ppd = GSF_get_peer_performance_data_ (peer->peer); + for (i = 0; i < MIGRATION_LIST_SIZE; i++) + if (block->target_list[i] == ppd->pid) + return -1; + GNUNET_assert (0 != ppd->pid); + GNUNET_PEER_resolve (ppd->pid, &id); + GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); + dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); + /* closer distance, higher score: */ + return UINT32_MAX - dist; +} + + +/** + * If the migration task is not currently running, consider + * (re)scheduling it with the appropriate delay. + */ +static void +consider_gathering (void); - /* consider scheduling transmission to cp for content migration */ - if (cp->cth != NULL) - return GNUNET_YES; - msize = 0; + +/** + * Find content for migration to this peer. + * + * @param mrp peer to find content for + */ +static void +find_content (struct MigrationReadyPeer *mrp) +{ + struct MigrationReadyBlock *pos; + long score; + long best_score; + struct MigrationReadyBlock *best; + + GNUNET_assert (NULL == mrp->th); + best = NULL; + best_score = -1; pos = mig_head; - while (pos != NULL) + while (NULL != pos) + { + score = score_content (mrp, pos); + if (score > best_score) { - for (i=0;ipid == pos->target_list[i]) - { - if (msize == 0) - msize = pos->size; - else - msize = GNUNET_MIN (msize, - pos->size); - break; - } - } - pos = pos->next; + best_score = score; + best = pos; } - if (msize == 0) - return GNUNET_YES; /* no content available */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Trying to migrate at least %u bytes to peer `%s'\n", - msize, - GNUNET_h2s (key)); -#endif - if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + pos = pos->next; + } + if (NULL == best) + { + if (mig_size < MAX_MIGRATION_QUEUE) { - GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); - cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No content found for pushing, waiting for queue to fill\n"); + return; /* will fill up eventually... */ } - cp->cth - = GNUNET_CORE_notify_transmit_ready (core, - 0, GNUNET_TIME_UNIT_FOREVER_REL, - (const struct GNUNET_PeerIdentity*) key, - msize + sizeof (struct PutMessage), - &transmit_to_peer, - cp); - return GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No suitable content found, purging content from full queue\n"); + /* failed to find migration target AND + * queue is full, purge most-forwarded + * block from queue to make room for more */ + pos = mig_head; + while (NULL != pos) + { + score = count_targets (pos); + if (score >= best_score) + { + best_score = score; + best = pos; + } + pos = pos->next; + } + GNUNET_assert (NULL != best); + delete_migration_block (best); + consider_gathering (); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to push best content to peer\n"); + transmit_content (mrp, best); } /** * Task that is run periodically to obtain blocks for content * migration - * + * * @param cls unused - * @param tc scheduler context (also unused) */ static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); - - +gather_migration_blocks (void *cls); /** @@ -285,30 +442,36 @@ gather_migration_blocks (void *cls, * (re)scheduling it with the appropriate delay. */ static void -consider_migration_gathering () +consider_gathering () { struct GNUNET_TIME_Relative delay; - if (dsh == NULL) + if (NULL == GSF_dsh) + return; + if (NULL != mig_qe) return; - if (mig_qe != NULL) + if (NULL != mig_task) return; - if (mig_task != GNUNET_SCHEDULER_NO_TASK) + if (mig_size >= MAX_MIGRATION_QUEUE) return; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - mig_size); - delay = GNUNET_TIME_relative_divide (delay, - MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, - min_migration_delay); - mig_task = GNUNET_SCHEDULER_add_delayed (delay, - &gather_migration_blocks, - NULL); + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); + delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, min_migration_delay); + if (GNUNET_NO == value_found) + { + /* wait at least 5s if the datastore is empty */ + delay = GNUNET_TIME_relative_max (delay, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 5)); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling gathering task (queue size: %u)\n", + mig_size); + mig_task = + GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); } - - /** * Process content offered for migration. * @@ -325,50 +488,54 @@ consider_migration_gathering () */ static void process_migration_content (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) { struct MigrationReadyBlock *mb; - - if (key == NULL) - { - mig_qe = NULL; - if (mig_size < MAX_MIGRATION_QUEUE) - consider_migration_gathering (); - return; - } - if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < - MIN_MIGRATION_CONTENT_LIFETIME.rel_value) - { - /* content will expire soon, don't bother */ - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; - } + struct MigrationReadyPeer *pos; + + mig_qe = NULL; + if (NULL == key) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No content found for migration...\n"); + consider_gathering (); + return; + } + value_found = GNUNET_YES; + if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < + MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) + { + /* content will expire soon, don't bother */ + consider_gathering (); + return; + } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - { - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, - type, priority, anonymity, - expiration, uid, - &process_migration_content, - NULL)) - { - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - } - return; - } -#if DEBUG_FS + { + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, + size, + data, + type, + priority, + anonymity, + expiration, + uid, + &process_migration_content, NULL)) + consider_gathering (); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for migration\n", - GNUNET_h2s (key), - type); -#endif + "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", + GNUNET_h2s (key), + type, mig_size + 1, + MAX_MIGRATION_QUEUE); mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); mb->query = *key; mb->expiration = expiration; @@ -376,100 +543,163 @@ process_migration_content (void *cls, mb->type = type; memcpy (&mb[1], data, size); GNUNET_CONTAINER_DLL_insert_after (mig_head, - mig_tail, - mig_tail, - mb); + mig_tail, + mig_tail, + mb); mig_size++; - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &consider_migration, - mb); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + for (pos = peer_head; NULL != pos; pos = pos->next) + { + if (NULL == pos->th) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to push best content to peer %s\n", + GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); + if (GNUNET_YES == transmit_content (pos, mb)) + break; /* 'mb' was freed! */ + } + } + consider_gathering (); } - /** * Task that is run periodically to obtain blocks for content * migration - * + * * @param cls unused - * @param tc scheduler context (also unused) */ static void -gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +gather_migration_blocks (void *cls) { - mig_task = GNUNET_SCHEDULER_NO_TASK; - if (dsh != NULL) - { - mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_migration_content, NULL); - GNUNET_assert (mig_qe != NULL); - } + mig_task = NULL; + if (mig_size >= MAX_MIGRATION_QUEUE) + return; + if (NULL == GSF_dsh) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking datastore for content for replication (queue size: %u)\n", + mig_size); + value_found = GNUNET_NO; + mig_qe = + GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, + &process_migration_content, NULL); + if (NULL == mig_qe) + consider_gathering (); +} + + +/** + * A peer connected to us. Start pushing content + * to this peer. + * + * @param peer handle for the peer that connected + */ +void +GSF_push_start_ (struct GSF_ConnectedPeer *peer) +{ + struct MigrationReadyPeer *mrp; + + if (GNUNET_YES != enabled) + return; + for (mrp = peer_head; NULL != mrp; mrp = mrp->next) + if (mrp->peer == peer) + break; + if (NULL != mrp) + { + /* same peer added twice, must not happen */ + GNUNET_break (0); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding peer %s to list for pushing\n", + GNUNET_i2s (GSF_connected_peer_get_identity2_(peer))); + + mrp = GNUNET_new (struct MigrationReadyPeer); + mrp->peer = peer; + find_content (mrp); + GNUNET_CONTAINER_DLL_insert (peer_head, + peer_tail, + mrp); } +/** + * A peer disconnected from us. Stop pushing content + * to this peer. + * + * @param peer handle for the peer that disconnected + */ +void +GSF_push_stop_ (struct GSF_ConnectedPeer *peer) +{ + struct MigrationReadyPeer *pos; + + for (pos = peer_head; NULL != pos; pos = pos->next) + if (pos->peer == peer) + break; + if (NULL == pos) + return; + GNUNET_CONTAINER_DLL_remove (peer_head, + peer_tail, + pos); + if (NULL != pos->th) + { + GSF_peer_transmit_cancel_ (pos->th); + pos->th = NULL; + } + if (NULL != pos->msg) + { + GNUNET_free (pos->msg); + pos->msg = NULL; + } + GNUNET_free (pos); +} + -size_t -API_ (void *cls, - size_t size, void *buf) +/** + * Setup the module. + */ +void +GSF_push_init_ () { - next = mig_head; - while (NULL != (mb = next)) - { - next = mb->next; - for (i=0;ipid == mb->target_list[i]) && - (mb->size + sizeof (migm) <= size) ) - { - GNUNET_PEER_change_rc (mb->target_list[i], -1); - mb->target_list[i] = 0; - mb->used_targets++; - memset (&migm, 0, sizeof (migm)); - migm.header.size = htons (sizeof (migm) + mb->size); - migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - migm.type = htonl (mb->type); - migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); - memcpy (&cbuf[msize], &migm, sizeof (migm)); - msize += sizeof (migm); - size -= sizeof (migm); - memcpy (&cbuf[msize], &mb[1], mb->size); - msize += mb->size; - size -= mb->size; -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pushing migration block `%s' (%u bytes) to `%s'\n", - GNUNET_h2s (&mb->query), - (unsigned int) mb->size, - GNUNET_i2s (&pid)); -#endif - break; - } - else - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", - GNUNET_h2s (&mb->query), - (unsigned int) mb->size, - GNUNET_i2s (&pid)); -#endif - } - } - if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || - (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) - { - delete_migration_block (mb); - consider_migration_gathering (); - } - } - consider_migration (NULL, - &pid.hashPubKey, - cp); + enabled = + GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); + if (GNUNET_YES != enabled) + return; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", + &min_migration_delay)) + { + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, + "fs", "MIN_MIGRATION_DELAY", + _("time required, content pushing disabled")); + return; + } + consider_gathering (); } +/** + * Shutdown the module. + */ +void +GSF_push_done_ () +{ + if (NULL != mig_task) + { + GNUNET_SCHEDULER_cancel (mig_task); + mig_task = NULL; + } + if (NULL != mig_qe) + { + GNUNET_DATASTORE_cancel (mig_qe); + mig_qe = NULL; + } + while (NULL != mig_head) + delete_migration_block (mig_head); + GNUNET_assert (0 == mig_size); +} +/* end of gnunet-service-fs_push.c */