From: Christian Grothoff Date: Sat, 15 May 2010 19:07:15 +0000 (+0000) Subject: towards migration X-Git-Tag: initial-import-from-subversion-38251~21692 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=0e255e78a2f74e70f749431d1ab97198c572e257;p=oweals%2Fgnunet.git towards migration --- diff --git a/src/fs/fs.h b/src/fs/fs.h index 9901cd173..3277ea340 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -36,6 +36,11 @@ */ #define MAX_DATASTORE_QUEUE 16 +/** + * Maximum number of blocks we keep in memory for migration. + */ +#define MAX_MIGRATION_QUEUE 32 + /** * Size of the individual blocks used for file-sharing. */ diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 8bec62b08..a08a041da 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -559,6 +559,44 @@ struct PendingRequest }; +/** + * Block that is ready for migration to other peers. Actual data is at the end of the block. + */ +struct MigrationReadyBlock +{ + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyBlock *next; + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyBlock *prev; + + /** + * Query for the block. + */ + GNUNET_HashCode query; + + /** + * When does this block expire? + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Size of the block. + */ + size_t size; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; +}; + + /** * Our scheduler. */ @@ -610,6 +648,37 @@ static struct ClientList *client_list; */ static struct GNUNET_CORE_Handle *core; +/** + * Head of linked list of blocks that can be migrated. + */ +static struct MigrationReadyBlock *mig_head; + +/** + * Tail of linked list of blocks that can be migrated. + */ +static struct MigrationReadyBlock *mig_tail; + +/** + * Request to datastore for migration (or NULL). + */ +static struct GNUNET_DATASTORE_QueueEntry *mig_qe; + +/** + * ID of task that collects blocks for migration. + */ +static GNUNET_SCHEDULER_TaskIdentifier mig_task; + +/** + * What is the maximum frequency at which we are allowed to + * poll the datastore for migration content? + */ +static struct GNUNET_TIME_Relative min_migration_delay; + +/** + * Size of the doubly-linked list of migration blocks. + */ +static unsigned int mig_size; + /** * Are we allowed to migrate content to this peer. */ @@ -618,6 +687,141 @@ static int active_migration; /* ******************* clean up functions ************************ */ +/** + * Delete the given migration block. + * + * @param mb block to delete + */ +static void +delete_migration_block (struct MigrationReadyBlock *mb) +{ + GNUNET_CONTAINER_DLL_remove (mig_head, + mig_tail, + mb); + mig_size--; + GNUNET_free (mb); +} + + +/** + * Consider migrating content to a given peer. + * + * @param cls not used + * @param key ID of the peer (not used) + * @param value 'struct ConnectedPeer' of the peer + * @return GNUNET_YES (always continue iteration)2 + */ +static int +consider_migration (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct ConnectedPeer *cp = value; + + if (cp->cth != NULL) + return GNUNET_YES; /* or what? */ + /* FIXME: not implemented! */ + return GNUNET_YES; +} + + + + +/** + * Task that is run periodically to obtain blocks for content + * migration + * + * @param cls unused + * @param tc scheduler context (also unused) + */ +static void +gather_migration_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Process content offered for migration. + * + * @param cls closure + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +process_migration_content (void *cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid) +{ + struct MigrationReadyBlock *mb; + struct GNUNET_TIME_Relative delay; + + if (key == NULL) + { + mig_qe = NULL; + if (mig_size < MAX_MIGRATION_QUEUE) + { + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + mig_size); + delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS, + MAX_MIGRATION_QUEUE); + delay = GNUNET_TIME_relative_max (delay, + min_migration_delay); + mig_task = GNUNET_SCHEDULER_add_delayed (sched, + delay, + &gather_migration_blocks, + NULL); + } + return; + } + mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); + mb->query = *key; + mb->expiration = expiration; + mb->size = size; + mb->type = type; + memcpy (&mb[1], data, size); + GNUNET_CONTAINER_DLL_insert_after (mig_head, + mig_tail, + mig_tail, + mb); + mig_size++; + if (mig_size == 1) + GNUNET_CONTAINER_multihashmap_iterate (connected_peers, + &consider_migration, + NULL); + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); +} + + +/** + * Task that is run periodically to obtain blocks for content + * migration + * + * @param cls unused + * @param tc scheduler context (also unused) + */ +static void +gather_migration_blocks (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + mig_task = GNUNET_SCHEDULER_NO_TASK; + mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_migration_content, NULL); +} + + /** * We're done with a particular message list entry. * Free all associated resources. @@ -782,9 +986,12 @@ peer_connect_handler (void *cls, &peer->hashPubKey, cp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (mig_size > 0) + (void) consider_migration (NULL, &peer->hashPubKey, cp); } + /** * Free (each) request made by the peer. * @@ -974,6 +1181,16 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + if (mig_qe != NULL) + { + GNUNET_DATASTORE_cancel (mig_qe); + mig_qe = NULL; + } + if (GNUNET_SCHEDULER_NO_TASK != mig_task) + { + GNUNET_SCHEDULER_cancel (sched, mig_task); + mig_task = GNUNET_SCHEDULER_NO_TASK; + } while (client_list != NULL) handle_client_disconnect (NULL, client_list->client); @@ -1001,6 +1218,9 @@ shutdown_task (void *cls, } GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); + while (mig_head != NULL) + delete_migration_block (mig_head); + GNUNET_assert (0 == mig_size); dsh = NULL; sched = NULL; cfg = NULL; @@ -1065,7 +1285,7 @@ transmit_to_peer (void *cls, } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to peer %u.\n", + "Transmitting %u bytes to peer %u\n", msize, cp->pid); #endif @@ -2930,6 +3150,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, sched = s; cfg = c; stats = GNUNET_STATISTICS_create (sched, "fs", cfg); + min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config @@ -2964,6 +3185,13 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, } return GNUNET_SYSERR; } + /* FIXME: distinguish between sending and storing in options? */ + if (active_migration) + { + mig_task = GNUNET_SCHEDULER_add_now (sched, + &gather_migration_blocks, + NULL); + } GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);