From: Christian Grothoff Date: Thu, 31 Mar 2011 15:35:00 +0000 (+0000) Subject: fix migration support X-Git-Tag: initial-import-from-subversion-38251~18844 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=dfe8a19d9d0aebc8711992a99628732dbe674621;p=oweals%2Fgnunet.git fix migration support --- diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 3298a5216..702442bb2 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -509,6 +509,7 @@ revive_migration (void *cls, * Get a handle for a connected peer. * * @param peer peer's identity + * @return NULL if the peer is not currently connected */ struct GSF_ConnectedPeer * GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer) @@ -548,6 +549,10 @@ GSF_handle_p2p_migration_stop_ (void *cls, return GNUNET_OK; } bt = GNUNET_TIME_relative_ntoh (msm->duration); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Migration of content to peer `%s' blocked for %llu ms\n"), + GNUNET_i2s (other), + (unsigned long long) bt.rel_value); cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK) { diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 7b8cf40ab..2925dd8ae 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -110,6 +110,10 @@ struct GSF_PeerPerformanceData */ unsigned int pending_replies; + /** + * How many of the last blocks from migration were duplicates? + */ + unsigned int migration_duplication; }; @@ -175,6 +179,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, * Get a handle for a connected peer. * * @param peer peer's identity + * @return NULL if this peer is not currently connected */ struct GSF_ConnectedPeer * GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer); diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index ed156daf5..f6b31c4e2 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -759,6 +759,30 @@ process_reply (void *cls, } +/** + * Context for the 'put_migration_continuation'. + */ +struct PutMigrationContext +{ + + /** + * Start time for the operation. + */ + struct GNUNET_TIME_Absolute start; + + /** + * Request origin. + */ + struct GNUNET_PeerIdentity origin; + + /** + * GNUNET_YES if we had a matching request for this block, + * GNUNET_NO if not. + */ + int requested; +}; + + /** * Continuation called to notify client about result of the * operation. @@ -772,11 +796,37 @@ put_migration_continuation (void *cls, int success, const char *msg) { - struct GNUNET_TIME_Absolute *start = cls; + struct PutMigrationContext *pmc = cls; struct GNUNET_TIME_Relative delay; - - delay = GNUNET_TIME_absolute_get_duration (*start); - GNUNET_free (start); + struct GNUNET_TIME_Relative block_time; + struct GSF_ConnectedPeer *cp; + struct GSF_PeerPerformanceData *ppd; + + delay = GNUNET_TIME_absolute_get_duration (pmc->start); + cp = GSF_peer_get_ (&pmc->origin); + if ( (GNUNET_OK != success) && + (GNUNET_NO == pmc->requested) ) + { + /* block migration for a bit... */ + if (NULL != cp) + { + ppd = GSF_get_peer_performance_data_ (cp); + ppd->migration_duplication++; + block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 5 * ppd->migration_duplication + + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5)); + GSF_block_peer_migration_ (cp, block_time); + } + } + else + { + if (NULL != cp) + { + ppd = GSF_get_peer_performance_data_ (cp); + ppd->migration_duplication = 0; /* reset counter */ + } + } + GNUNET_free (pmc); /* FIXME: should we really update the load value on failure? */ GNUNET_LOAD_update (datastore_put_load, delay.rel_value); @@ -842,7 +892,7 @@ handle_dht_reply (void *cls, { struct GSF_PendingRequest *pr = cls; struct ProcessReplyClosure prq; - struct GNUNET_TIME_Absolute *start; + struct PutMigrationContext *pmc; memset (&prq, 0, sizeof (prq)); prq.data = data; @@ -859,8 +909,9 @@ handle_dht_reply (void *cls, GNUNET_h2s (key), prq.priority); #endif - start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); - *start = GNUNET_TIME_absolute_get (); + pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); + pmc->start = GNUNET_TIME_absolute_get (); + pmc->requested = GNUNET_YES; GNUNET_DATASTORE_put (GSF_dsh, 0, key, size, data, type, prq.priority, 1 /* anonymity */, @@ -868,7 +919,7 @@ handle_dht_reply (void *cls, 1 + prq.priority, MAX_DATASTORE_QUEUE, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, - start); + pmc); } } @@ -1124,7 +1175,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, struct ProcessReplyClosure prq; struct GNUNET_TIME_Relative block_time; double putl; - struct GNUNET_TIME_Absolute *start; + struct PutMigrationContext *pmc; msize = ntohs (message->size); if (msize < sizeof (struct PutMessage)) @@ -1178,8 +1229,11 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, GNUNET_h2s (&query), prq.priority); #endif - start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); - *start = GNUNET_TIME_absolute_get (); + pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); + pmc->start = GNUNET_TIME_absolute_get (); + pmc->requested = prq.request_found; + GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, + &pmc->origin); GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, prq.priority, 1 /* anonymity */, @@ -1187,7 +1241,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, 1 + prq.priority, MAX_DATASTORE_QUEUE, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, - start); + pmc); } else { @@ -1232,6 +1286,9 @@ GSF_pending_request_init_ () _("Configuration fails to specify `%s', assuming default value."), "MAX_PENDING_REQUESTS"); } + active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, + "FS", + "CONTENT_CACHING"); datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index b96ee3d1c..38d916832 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -525,9 +525,11 @@ process_migration_content (void *cls, } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retrieved block `%s' of type %u for migration\n", + "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", GNUNET_h2s (key), - type); + type, + mig_size + 1, + MIGRATION_LIST_SIZE); #endif mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); mb->query = *key; @@ -570,6 +572,8 @@ gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { mig_task = GNUNET_SCHEDULER_NO_TASK; + if (mig_size >= MAX_MIGRATION_QUEUE) + return; if (GSF_dsh != NULL) { mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh,