From 9c50ed21bb57e53eaa0b0011b1523f1984c857f3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 27 Apr 2010 13:40:04 +0000 Subject: [PATCH] bounded parallelism --- TODO | 13 ++- src/fs/fs.c | 136 ++++++++++++++++++++++---------- src/fs/fs.h | 54 +++++++------ src/fs/fs_download.c | 114 +++++++++++++++++++------- src/fs/fs_search.c | 15 +--- src/fs/fs_test_lib.c | 3 + src/fs/gnunet-download.c | 3 + src/include/gnunet_fs_service.h | 10 ++- 8 files changed, 233 insertions(+), 115 deletions(-) diff --git a/TODO b/TODO index a4393324c..3db19b785 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,9 @@ 0.9.0pre1: * FS: [CG] - - bound parallelism (# fs downloads) - - distinguish in performance tracking and event signalling between - downloads that are actually running and those that are merely in the queue + - search: availability probes [needed for full persistence support...] + - Allow checking of presence of search results and/or content via command-line tools + (add options to gnunet-search / gnunet-download to limit search to local peer) + [needed for full persistence support...] - persistence support (publish, unindex, search, download) - gnunet-service-fs (hot-path routing, load-based routing, nitpicks) - [gnunet-service-fs.c:208]: member 'LocalGetContext::results_bf_size' is never used @@ -70,7 +71,6 @@ - shutdown sequence? * FS: [CG] - datastore reservation (publishing) - - search: availability probes - location URIs (publish, search, download) - non-anonymous FS service (needs DHT) + DHT integration for search @@ -116,8 +116,7 @@ - convert documentation pages to books - update books (especially for developers) - create good Drupal theme for GNUnet - - make a NICE download page and figure out how to - enable developers to publish TGZs nicely + - make a NICE download page and figure out how to enable developers to publish TGZs nicely - port "contact" page - add content type for "todo" items? 
* POSTGRES database backends: [CG] @@ -151,8 +150,6 @@ (Note: build library always, build service when libxml2/etc. are available) * FS: [CG] - Remove KBlocks in gnunet-unindex (see discussion with Kenneth Almquist on gnunet-devs in 9/2009) - - Allow checking of presence of search results and/or content via command-line tools - (add options to gnunet-search / gnunet-download to limit search to local peer) * PEERINFO: [CG] - expire 'ancient' HELLOs (those without valid addresses AND that we have not 'used' (for their public keys) in a while; need a way diff --git a/src/fs/fs.c b/src/fs/fs.c index 2888abe93..6dbc182b7 100644 --- a/src/fs/fs.c +++ b/src/fs/fs.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009 Christian Grothoff (and other contributing authors) + (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -45,20 +45,13 @@ start_job (struct GNUNET_FS_QueueEntry *qe) return; } qe->start (qe->cls, qe->client); - switch (qe->category) - { - case GNUNET_FS_QC_DOWNLOAD: - qe->h->active_downloads++; - break; - case GNUNET_FS_QC_PROBE: - qe->h->active_probes++; - break; - } + qe->start_times++; + qe->h->active_blocks += qe->blocks; qe->start_time = GNUNET_TIME_absolute_get (); GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe); - GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, + GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail, qe->h->running_tail, qe); @@ -76,15 +69,8 @@ stop_job (struct GNUNET_FS_QueueEntry *qe) { qe->client = NULL; qe->stop (qe->cls); - switch (qe->category) - { - case GNUNET_FS_QC_DOWNLOAD: - qe->h->active_downloads--; - break; - case GNUNET_FS_QC_PROBE: - qe->h->active_probes--; - break; - } + qe->h->active_downloads--; + qe->h->active_blocks -= 
qe->blocks; qe->run_time = GNUNET_TIME_relative_add (qe->run_time, GNUNET_TIME_absolute_get_duration (qe->start_time)); GNUNET_CONTAINER_DLL_remove (qe->h->running_head, @@ -109,13 +95,54 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_FS_Handle *h = cls; + struct GNUNET_FS_QueueEntry *qe; + struct GNUNET_FS_QueueEntry *next; + struct GNUNET_TIME_Relative run_time; + struct GNUNET_TIME_Relative restart_at; + struct GNUNET_TIME_Relative rst; + struct GNUNET_TIME_Absolute end_time; h->queue_job = GNUNET_SCHEDULER_NO_TASK; - /* FIXME: stupid implementation that just starts everything follows... */ - while (NULL != h->pending_head) - start_job (h->pending_head); - - /* FIXME: possibly re-schedule queue-job! */ + next = h->pending_head; + while (NULL != (qe = next)) + { + next = qe->next; + if (h->running_head == NULL) + { + start_job (qe); + continue; + } + if ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) && + (h->active_downloads + 1 <= h->max_parallel_downloads) ) + { + start_job (qe); + continue; + } + } + if (h->pending_head == NULL) + return; /* no need to stop anything */ + restart_at = GNUNET_TIME_UNIT_FOREVER_REL; + next = h->running_head; + while (NULL != (qe = next)) + { + next = qe->next; + /* FIXME: might be faster/simpler to do this calculation only once + when we start a job (OTOH, this would allow us to dynamically + and easily adjust qe->blocks over time, given the right API...) 
*/ + run_time = GNUNET_TIME_relative_multiply (h->avg_block_latency, + qe->blocks * qe->start_times); + end_time = GNUNET_TIME_absolute_add (qe->start_time, + run_time); + rst = GNUNET_TIME_absolute_get_remaining (end_time); + restart_at = GNUNET_TIME_relative_min (rst, restart_at); + if (rst.value > 0) + continue; + stop_job (qe); + } + h->queue_job = GNUNET_SCHEDULER_add_delayed (h->sched, + restart_at, + &process_job_queue, + h); } /** @@ -125,7 +152,7 @@ process_job_queue (void *cls, * @param start function to call to begin the job * @param stop function to call to pause the job, or on dequeue (if the job was running) * @param cls closure for start and stop - * @param cat category of the job + * @param blocks number of blocks this jobs uses * @return queue handle */ struct GNUNET_FS_QueueEntry * @@ -133,7 +160,7 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, GNUNET_FS_QueueStart start, GNUNET_FS_QueueStop stop, void *cls, - enum GNUNET_FS_QueueCategory cat) + unsigned int blocks) { struct GNUNET_FS_QueueEntry *qe; @@ -143,7 +170,7 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, qe->stop = stop; qe->cls = cls; qe->queue_time = GNUNET_TIME_absolute_get (); - qe->category = cat; + qe->blocks = blocks; GNUNET_CONTAINER_DLL_insert_after (h->pending_head, h->pending_tail, h->pending_tail, @@ -166,21 +193,22 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, void GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh) { + struct GNUNET_FS_Handle *h; + + h = qh->h; if (qh->client != NULL) - { - if (qh->h->queue_job != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (qh->h->sched, - qh->h->queue_job); - qh->h->queue_job - = GNUNET_SCHEDULER_add_now (qh->h->sched, - &process_job_queue, - qh->h); - stop_job (qh); - } - GNUNET_CONTAINER_DLL_remove (qh->h->pending_head, - qh->h->pending_tail, + stop_job (qh); + GNUNET_CONTAINER_DLL_remove (h->pending_head, + h->pending_tail, qh); GNUNET_free (qh); + if (h->queue_job != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel 
(h->sched, + h->queue_job); + h->queue_job + = GNUNET_SCHEDULER_add_now (h->sched, + &process_job_queue, + h); } @@ -207,7 +235,9 @@ GNUNET_FS_start (struct GNUNET_SCHEDULER_Handle *sched, { struct GNUNET_FS_Handle *ret; struct GNUNET_CLIENT_Connection *client; - + enum GNUNET_FS_OPTIONS opt; + va_list ap; + client = GNUNET_CLIENT_connect (sched, "fs", cfg); @@ -221,7 +251,29 @@ GNUNET_FS_start (struct GNUNET_SCHEDULER_Handle *sched, ret->upcb_cls = upcb_cls; ret->client = client; ret->flags = flags; - // FIXME: process varargs! + ret->max_parallel_downloads = 1; + ret->max_parallel_requests = 1; + ret->avg_block_latency = GNUNET_TIME_UNIT_MINUTES; /* conservative starting point */ + va_start (ap, flags); + while (GNUNET_FS_OPTIONS_END != (opt = va_arg (ap, enum GNUNET_FS_OPTIONS))) + { + switch (opt) + { + case GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM: + ret->max_parallel_downloads = va_arg (ap, unsigned int); + break; + case GNUNET_FS_OPTIONS_REQUEST_PARALLELISM: + ret->max_parallel_requests = va_arg (ap, unsigned int); + break; + default: + GNUNET_break (0); + GNUNET_free (ret->client_name); + GNUNET_free (ret); + va_end (ap); + return NULL; + } + } + va_end (ap); // FIXME: setup receive-loop with client // FIXME: deserialize state; use client-name to find master-directory! diff --git a/src/fs/fs.h b/src/fs/fs.h index b10add77c..e11b0aa74 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -462,22 +462,6 @@ typedef void (*GNUNET_FS_QueueStart)(void *cls, */ typedef void (*GNUNET_FS_QueueStop)(void *cls); -/** - * Categories of jobs in the FS queue. - */ -enum GNUNET_FS_QueueCategory - { - /** - * File download. - */ - GNUNET_FS_QC_DOWNLOAD, - - /** - * Availability probe (related to search). - */ - GNUNET_FS_QC_PROBE - - }; /** * Entry in the job queue. @@ -536,9 +520,14 @@ struct GNUNET_FS_QueueEntry struct GNUNET_TIME_Relative run_time; /** - * What type of job is this? + * How many blocks does this job have?
+ */ + unsigned int blocks; + + /** + * How often have we (re)started this download? */ - enum GNUNET_FS_QueueCategory category; + unsigned int start_times; }; @@ -552,6 +541,7 @@ struct GNUNET_FS_QueueEntry * @param start function to call to begin the job * @param stop function to call to pause the job, or on dequeue (if the job was running) * @param cls closure for start and stop + * @param blocks number of blocks this download has * @return queue handle */ struct GNUNET_FS_QueueEntry * @@ -559,7 +549,7 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, GNUNET_FS_QueueStart start, GNUNET_FS_QueueStop stop, void *cls, - enum GNUNET_FS_QueueCategory cat); + unsigned int blocks); /** @@ -632,21 +622,36 @@ struct GNUNET_FS_Handle GNUNET_SCHEDULER_TaskIdentifier queue_job; /** - * How many downloads probing availability of search results do we - * have running right now? + * Average time we take for a single request to be satisfied. + * FIXME: not yet calculated properly... */ - unsigned int active_probes; + struct GNUNET_TIME_Relative avg_block_latency; /** * How many actual downloads do we have running right now? */ unsigned int active_downloads; + /** + * How many blocks do the active downloads have? + */ + unsigned int active_blocks; + /** * General flags. */ enum GNUNET_FS_Flags flags; + /** + * Maximum number of parallel downloads. + */ + unsigned int max_parallel_downloads; + + /** + * Maximum number of parallel requests. + */ + unsigned int max_parallel_requests; + }; @@ -1188,6 +1193,11 @@ struct GNUNET_FS_DownloadContext */ struct GNUNET_CLIENT_TransmitHandle *th; + /** + * Our entry in the job queue. + */ + struct GNUNET_FS_QueueEntry *job_queue; + /** * Identity of the peer having the content, or all-zeros * if we don't know of such a peer.
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 1f03082e8..424aaf5d4 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -23,12 +23,9 @@ * @author Christian Grothoff * * TODO: - * - handle recursive downloads (need directory & - * fs-level download-parallelism management) * - location URI suppport (can wait, easy) - * - check if blocks exist already (can wait, easy) - * - check if iblocks can be computed from existing blocks (can wait, hard) * - persistence (can wait) + * - check if iblocks can be computed from existing blocks (can wait, hard) */ #include "platform.h" #include "gnunet_constants.h" @@ -990,7 +987,11 @@ process_result_with_request (void *cls, "truncate", dc->filename); } - + if (dc->job_queue != NULL) + { + GNUNET_FS_dequeue_ (dc->job_queue); + dc->job_queue = NULL; + } if (is_recursive_download (dc)) full_recursive_download (dc); if (dc->child_head == NULL) @@ -1273,6 +1274,73 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc) } + +/** + * We're allowed to ask the FS service for our blocks. Start the download. + * + * @param cls the 'struct GNUNET_FS_DownloadContext' + * @param client handle to use for communication with FS (we must destroy it!)
+ */ +static void +activate_fs_download (void *cls, + struct GNUNET_CLIENT_Connection *client) +{ + struct GNUNET_FS_DownloadContext *dc = cls; + struct GNUNET_FS_ProgressInfo pi; + + GNUNET_assert (NULL != client); + dc->client = client; + GNUNET_CLIENT_receive (client, + &receive_results, + dc, + GNUNET_TIME_UNIT_FOREVER_REL); + pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; + make_download_status (&pi, dc); + dc->client_info = dc->h->upcb (dc->h->upcb_cls, + &pi); + GNUNET_CONTAINER_multihashmap_iterate (dc->active, + &retry_entry, + dc); + if ( (dc->th == NULL) && + (dc->client != NULL) ) + dc->th = GNUNET_CLIENT_notify_transmit_ready (dc->client, + sizeof (struct SearchMessage), + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + GNUNET_NO, + &transmit_download_request, + dc); +} + + +/** + * We must stop asking the FS service for our blocks. Pause the download: + * cancel any pending transmission and disconnect from the FS service. + * + * @param cls the 'struct GNUNET_FS_DownloadContext' + */ +static void +deactivate_fs_download (void *cls) +{ + struct GNUNET_FS_DownloadContext *dc = cls; + struct GNUNET_FS_ProgressInfo pi; + + if (NULL != dc->th) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); + dc->th = NULL; + } + if (NULL != dc->client) + { + GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO); + dc->client = NULL; + } + pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; + make_download_status (&pi, dc); + dc->client_info = dc->h->upcb (dc->h->upcb_cls, + &pi); +} + + /** * Download parts of a file. Note that this will store * the blocks at the respective offset in the given file.
Also, the @@ -1318,7 +1386,6 @@ GNUNET_FS_download_start (struct GNUNET_FS_Handle *h, { struct GNUNET_FS_ProgressInfo pi; struct GNUNET_FS_DownloadContext *dc; - struct GNUNET_CLIENT_Connection *client; GNUNET_assert (GNUNET_FS_uri_test_chk (uri)); if ( (offset + length < offset) || @@ -1401,26 +1468,15 @@ GNUNET_FS_download_start (struct GNUNET_FS_Handle *h, pi.value.download.specifics.start.meta = meta; dc->client_info = dc->h->upcb (dc->h->upcb_cls, &pi); - - - // FIXME: bound parallelism here - client = GNUNET_CLIENT_connect (h->sched, - "fs", - h->cfg); - GNUNET_assert (NULL != client); - dc->client = client; - GNUNET_CLIENT_receive (client, - &receive_results, - dc, - GNUNET_TIME_UNIT_FOREVER_REL); - pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; - make_download_status (&pi, dc); - dc->client_info = dc->h->upcb (dc->h->upcb_cls, - &pi); schedule_block_download (dc, &dc->uri->data.chk.chk, 0, - 1 /* 0 == CHK, 1 == top */); + 1 /* 0 == CHK, 1 == top */); + dc->job_queue = GNUNET_FS_queue_ (h, + &activate_fs_download, + &deactivate_fs_download, + dc, + (length + DBLOCK_SIZE-1) / DBLOCK_SIZE); return dc; } @@ -1455,6 +1511,11 @@ GNUNET_FS_download_stop (struct GNUNET_FS_DownloadContext *dc, { struct GNUNET_FS_ProgressInfo pi; + if (dc->job_queue != NULL) + { + GNUNET_FS_dequeue_ (dc->job_queue); + dc->job_queue = NULL; + } while (NULL != dc->child_head) GNUNET_FS_download_stop (dc->child_head, do_delete); @@ -1472,13 +1533,6 @@ GNUNET_FS_download_stop (struct GNUNET_FS_DownloadContext *dc, if (GNUNET_SCHEDULER_NO_TASK != dc->task) GNUNET_SCHEDULER_cancel (dc->h->sched, dc->task); - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - if (NULL != dc->client) - GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO); GNUNET_CONTAINER_multihashmap_iterate (dc->active, &free_entry, NULL); diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index 294d3d454..c75b21668 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c @@ 
-1022,19 +1022,10 @@ search_result_free (void *cls, GNUNET_FS_uri_destroy (sr->uri); GNUNET_CONTAINER_meta_data_destroy (sr->meta); if (sr->probe_ctx != NULL) - { - GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES); - h->active_probes--; - /* FIXME: trigger starting of new - probes here!? Maybe not -- could - cause new probes to be immediately - stopped again... */ - } + GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES); if (sr->probe_cancel_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, - sr->probe_cancel_task); - } + GNUNET_SCHEDULER_cancel (h->sched, + sr->probe_cancel_task); GNUNET_free (sr); return GNUNET_OK; } diff --git a/src/fs/fs_test_lib.c b/src/fs/fs_test_lib.c index 6f36e20cb..dd783f406 100644 --- a/src/fs/fs_test_lib.c +++ b/src/fs/fs_test_lib.c @@ -208,6 +208,9 @@ progress_cb (void *cls, daemon, GNUNET_SCHEDULER_REASON_PREREQ_DONE); break; + case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE: + case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE: + break; /* FIXME: monitor data correctness during download progress */ /* FIXME: do performance reports given sufficient verbosity */ /* FIXME: advance timeout task to "immediate" on error */ diff --git a/src/fs/gnunet-download.c b/src/fs/gnunet-download.c index d9ab7543d..bbc750026 100644 --- a/src/fs/gnunet-download.c +++ b/src/fs/gnunet-download.c @@ -138,6 +138,9 @@ progress_cb (void *cls, NULL, GNUNET_SCHEDULER_REASON_PREREQ_DONE); break; + case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE: + case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE: + break; default: fprintf (stderr, _("Unexpected status: %d\n"), diff --git a/src/include/gnunet_fs_service.h b/src/include/gnunet_fs_service.h index 64bfda3ba..a228d0263 100644 --- a/src/include/gnunet_fs_service.h +++ b/src/include/gnunet_fs_service.h @@ -1476,7 +1476,15 @@ enum GNUNET_FS_OPTIONS * followed by an "unsigned int" giving the desired maximum number * of parallel downloads). 
*/ - GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1 + GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1, + + /** + * Maximum number of requests that should be pending at a given + * point in time (individual downloads may go above this, but + * if we are above this threshold, we should not activate any + * additional downloads). + */ + GNUNET_FS_OPTIONS_REQUEST_PARALLELISM = 2 }; -- 2.25.1