X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs_push.c;h=22a76f332d2c51696a0aaa0343b271da9cb6fbf2;hb=99c66999a525b2783982ffa7590c988529184482;hp=c08b5702028802d8aaaf7f8703ce666095cff174;hpb=e8f35bb025c25839a52fb502e452393831e4e6f0;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index c08b57020..22a76f332 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -32,7 +32,18 @@ /** - * How long must content remain valid for us to consider it for migration? + * Maximum number of blocks we keep in memory for migration. + */ +#define MAX_MIGRATION_QUEUE 8 + +/** + * Blocks are at most migrated to this number of peers + * plus one, each time they are fetched from the database. + */ +#define MIGRATION_LIST_SIZE 2 + +/** + * How long must content remain valid for us to consider it for migration? * If content will expire too soon, there is clearly no point in pushing * it to other peers. This value gives the threshold for migration. Note * that if this value is increased, the migration testcase may need to be @@ -63,7 +74,7 @@ struct MigrationReadyBlock GNUNET_HashCode query; /** - * When does this block expire? + * When does this block expire? */ struct GNUNET_TIME_Absolute expiration; @@ -110,7 +121,7 @@ struct MigrationReadyPeer * Handle to peer. */ struct GSF_ConnectedPeer *peer; - + /** * Handle for current transmission request, * or NULL for none. @@ -179,11 +190,8 @@ static int enabled; static void delete_migration_block (struct MigrationReadyBlock *mb) { - GNUNET_CONTAINER_DLL_remove (mig_head, - mig_tail, - mb); - GNUNET_PEER_decrement_rcs (mb->target_list, - MIGRATION_LIST_SIZE); + GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); + GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } @@ -191,7 +199,7 @@ delete_migration_block (struct MigrationReadyBlock *mb) /** * Find content for migration to this peer. - */ + */ static void find_content (struct MigrationReadyPeer *mrp); @@ -206,9 +214,7 @@ find_content (struct MigrationReadyPeer *mrp); * @return number of bytes copied to 'buf', can be 0 (without indicating an error) */ static size_t -transmit_message (void *cls, - size_t buf_size, - void *buf) +transmit_message (void *cls, size_t buf_size, void *buf) { struct MigrationReadyPeer *peer = cls; struct PutMessage *msg; @@ -218,23 +224,18 @@ transmit_message (void *cls, msg = peer->msg; peer->msg = NULL; if (buf == NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to migrate content to another peer (disconnect)\n"); -#endif - GNUNET_free (msg); - return 0; - } + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to migrate content to another peer (disconnect)\n"); + GNUNET_free (msg); + return 0; + } msize = ntohs (msg->header.size); GNUNET_assert (msize <= buf_size); memcpy (buf, msg, msize); GNUNET_free (msg); -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pushing %u bytes to another peer\n", - msize); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to another peer\n", + msize); find_content (peer); return msize; } @@ -249,11 +250,11 @@ transmit_message (void *cls, */ static int transmit_content (struct MigrationReadyPeer *peer, - struct MigrationReadyBlock *block) + struct MigrationReadyBlock *block) { size_t msize; struct PutMessage *msg; - unsigned int i; + unsigned int i; struct GSF_PeerPerformanceData *ppd; int ret; @@ -265,40 +266,31 @@ transmit_content (struct MigrationReadyPeer *peer, msg->header.size = htons (msize); msg->type = htonl (block->type); msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); - memcpy (&msg[1], - &block[1], - block->size); + memcpy (&msg[1], &block[1], block->size); peer->msg = msg; - for (i=0;itarget_list[i] == 0) { - if (block->target_list[i] == 0) - { - block->target_list[i] = ppd->pid; - GNUNET_PEER_change_rc (block->target_list[i], 1); - break; - } + block->target_list[i] = ppd->pid; + GNUNET_PEER_change_rc (block->target_list[i], 1); + break; } + } if (MIGRATION_LIST_SIZE == i) - { - delete_migration_block (block); - ret = GNUNET_YES; - } + { + delete_migration_block (block); + ret = GNUNET_YES; + } else - { - ret = GNUNET_NO; - } -#if DEBUG_FS + { + ret = GNUNET_NO; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking for transmission of %u bytes for migration\n", - msize); -#endif - peer->th = GSF_peer_transmit_ (peer->peer, - GNUNET_NO, - 0 /* priority */, - GNUNET_TIME_UNIT_FOREVER_REL, - msize, - &transmit_message, - peer); + "Asking for transmission of %u bytes for migration\n", msize); + peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ , + GNUNET_TIME_UNIT_FOREVER_REL, msize, + &transmit_message, peer); return ret; } @@ -315,7 +307,7 @@ count_targets (struct MigrationReadyBlock *block) { unsigned int i; - for (i=0;itarget_list[i] == 0) return i; return i; @@ -324,7 +316,7 @@ count_targets (struct MigrationReadyBlock *block) /** * Check if sending this block to this peer would - * be a good idea. + * be a good idea. * * @param peer target peer * @param block the block @@ -332,7 +324,7 @@ count_targets (struct MigrationReadyBlock *block) */ static long score_content (struct MigrationReadyPeer *peer, - struct MigrationReadyBlock *block) + struct MigrationReadyBlock *block) { unsigned int i; struct GSF_PeerPerformanceData *ppd; @@ -340,13 +332,12 @@ score_content (struct MigrationReadyPeer *peer, uint32_t dist; ppd = GSF_get_peer_performance_data_ (peer->peer); - for (i=0;itarget_list[i] == ppd->pid) return -1; - GNUNET_PEER_resolve (ppd->pid, - &id); - dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, - &id.hashPubKey); + GNUNET_assert (0 != ppd->pid); + GNUNET_PEER_resolve (ppd->pid, &id); + dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &id.hashPubKey); /* closer distance, higher score: */ return UINT32_MAX - dist; } @@ -364,7 +355,7 @@ consider_gathering (void); * Find content for migration to this peer. * * @param mrp peer to find content for - */ + */ static void find_content (struct MigrationReadyPeer *mrp) { @@ -378,52 +369,46 @@ find_content (struct MigrationReadyPeer *mrp) best_score = -1; pos = mig_head; while (NULL != pos) + { + score = score_content (mrp, pos); + if (score > best_score) { - score = score_content (mrp, pos); - if (score > best_score) - { - best_score = score; - best = pos; - } - pos = pos->next; + best_score = score; + best = pos; } - if (NULL == best) + pos = pos->next; + } + if (NULL == best) + { + if (mig_size < MAX_MIGRATION_QUEUE) { - if (mig_size < MAX_MIGRATION_QUEUE) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No content found for pushing, waiting for queue to fill\n"); -#endif - return; /* will fill up eventually... */ - } -#if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No suitable content found, purging content from full queue\n"); -#endif - /* failed to find migration target AND - queue is full, purge most-forwarded - block from queue to make room for more */ - pos = mig_head; - while (NULL != pos) - { - score = count_targets (pos); - if (score >= best_score) - { - best_score = score; - best = pos; - } - pos = pos->next; - } - GNUNET_assert (NULL != best); - delete_migration_block (best); - consider_gathering (); - return; + "No content found for pushing, waiting for queue to fill\n"); + return; /* will fill up eventually... */ } -#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No suitable content found, purging content from full queue\n"); + /* failed to find migration target AND + * queue is full, purge most-forwarded + * block from queue to make room for more */ + pos = mig_head; + while (NULL != pos) + { + score = count_targets (pos); + if (score >= best_score) + { + best_score = score; + best = pos; + } + pos = pos->next; + } + GNUNET_assert (NULL != best); + delete_migration_block (best); + consider_gathering (); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to push best content to peer\n"); -#endif + "Preparing to push best content to peer\n"); transmit_content (mrp, best); } @@ -431,13 +416,13 @@ find_content (struct MigrationReadyPeer *mrp) /** * Task that is run periodically to obtain blocks for content * migration - * + * * @param cls unused * @param tc scheduler context (also unused) */ static void gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc); + const struct GNUNET_SCHEDULER_TaskContext *tc); /** @@ -455,17 +440,15 @@ consider_gathering () return; if (mig_task != GNUNET_SCHEDULER_NO_TASK) return; - if (mig_size >= MAX_MIGRATION_QUEUE) + if (mig_size >= MAX_MIGRATION_QUEUE) return; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - mig_size); - delay = GNUNET_TIME_relative_divide (delay, - MAX_MIGRATION_QUEUE); - delay = GNUNET_TIME_relative_max (delay, - min_migration_delay); - mig_task = GNUNET_SCHEDULER_add_delayed (delay, - &gather_migration_blocks, - NULL); + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); + delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, min_migration_delay); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling gathering task (queue size: %u)\n", mig_size); + mig_task = + GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); } @@ -484,104 +467,90 @@ consider_gathering () * maybe 0 if no unique identifier is available */ static void -process_migration_content (void *cls, - const GNUNET_HashCode * key, - size_t size, - const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute - expiration, uint64_t uid) +process_migration_content (void *cls, const GNUNET_HashCode * key, size_t size, + const void *data, enum GNUNET_BLOCK_Type type, + uint32_t priority, uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, uint64_t uid) { struct MigrationReadyBlock *mb; struct MigrationReadyPeer *pos; - + + mig_qe = NULL; if (key == NULL) - { - mig_qe = NULL; - consider_gathering (); - return; - } - if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n"); + consider_gathering (); + return; + } + if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < MIN_MIGRATION_CONTENT_LIFETIME.rel_value) - { - /* content will expire soon, don't bother */ - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); - return; - } + { + /* content will expire soon, don't bother */ + consider_gathering (); + return; + } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) - { - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, size, data, - type, priority, anonymity, - expiration, uid, - &process_migration_content, - NULL)) - { - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); - } - return; - } -#if DEBUG_FS + { + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, + anonymity, expiration, uid, + &process_migration_content, NULL)) + consider_gathering (); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", - GNUNET_h2s (key), - type, - mig_size + 1, - MIGRATION_LIST_SIZE); -#endif + "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", + GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE); mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); mb->query = *key; mb->expiration = expiration; mb->size = size; mb->type = type; memcpy (&mb[1], data, size); - GNUNET_CONTAINER_DLL_insert_after (mig_head, - mig_tail, - mig_tail, - mb); + GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb); mig_size++; pos = peer_head; while (pos != NULL) + { + if (NULL == pos->th) { - if (NULL == pos->th) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to push best content to peer\n"); -#endif - if (GNUNET_YES == transmit_content (pos, mb)) - break; /* 'mb' was freed! */ - } - pos = pos->next; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to push best content to peer\n"); + if (GNUNET_YES == transmit_content (pos, mb)) + break; /* 'mb' was freed! */ } - GNUNET_DATASTORE_iterate_get_next (GSF_dsh); + pos = pos->next; + } + consider_gathering (); } /** * Task that is run periodically to obtain blocks for content * migration - * + * * @param cls unused * @param tc scheduler context (also unused) */ static void gather_migration_blocks (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { mig_task = GNUNET_SCHEDULER_NO_TASK; - if (mig_size >= MAX_MIGRATION_QUEUE) + if (mig_size >= MAX_MIGRATION_QUEUE) return; if (GSF_dsh != NULL) - { - mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, - 0, UINT_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_migration_content, NULL); - GNUNET_assert (mig_qe != NULL); - } + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asking datastore for content for replication (queue size: %u)\n", + mig_size); + mig_qe = + GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_migration_content, NULL); + if (NULL == mig_qe) + consider_gathering (); + } } @@ -601,9 +570,7 @@ GSF_push_start_ (struct GSF_ConnectedPeer *peer) mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer)); mrp->peer = peer; find_content (mrp); - GNUNET_CONTAINER_DLL_insert (peer_head, - peer_tail, - mrp); + GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp); } @@ -620,19 +587,25 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) pos = peer_head; while (pos != NULL) + { + if (pos->peer == peer) { - if (pos->peer == peer) - { - GNUNET_CONTAINER_DLL_remove (peer_head, - peer_tail, - pos); - if (NULL != pos->th) - GSF_peer_transmit_cancel_ (pos->th); - GNUNET_free (pos); - return; - } - pos = pos->next; + GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); + if (NULL != pos->th) + { + GSF_peer_transmit_cancel_ (pos->th); + pos->th = NULL; + } + if (NULL != pos->msg) + { + GNUNET_free (pos->msg); + pos->msg = NULL; + } + GNUNET_free (pos); + return; } + pos = pos->next; + } } @@ -642,24 +615,21 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer) void GSF_push_init_ () { - enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, - "FS", - "CONTENT_PUSHING"); + enabled = + GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); if (GNUNET_YES != enabled) return; - - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_time (GSF_cfg, - "fs", - "MIN_MIGRATION_DELAY", - &min_migration_delay)) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"), - "MIN_MIGRATION_DELAY", - "fs"); - return; - } + + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", + &min_migration_delay)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _ + ("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"), + "MIN_MIGRATION_DELAY", "fs"); + return; + } consider_gathering (); } @@ -671,15 +641,15 @@ void GSF_push_done_ () { if (GNUNET_SCHEDULER_NO_TASK != mig_task) - { - GNUNET_SCHEDULER_cancel (mig_task); - mig_task = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (mig_task); + mig_task = GNUNET_SCHEDULER_NO_TASK; + } if (NULL != mig_qe) - { - GNUNET_DATASTORE_cancel (mig_qe); - mig_qe = NULL; - } + { + GNUNET_DATASTORE_cancel (mig_qe); + mig_qe = NULL; + } while (NULL != mig_head) delete_migration_block (mig_head); GNUNET_assert (0 == mig_size);