From: Christian Grothoff Date: Thu, 21 Jun 2012 23:05:54 +0000 (+0000) Subject: -work on #2437, plus misc minor bugfixes X-Git-Tag: initial-import-from-subversion-38251~12890 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=4e7a535cc04df4eef21dc1db39d782a3365e9df1;p=oweals%2Fgnunet.git -work on #2437, plus misc minor bugfixes --- diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c index 6d9af1187..5b4bfa020 100644 --- a/src/fs/fs_api.c +++ b/src/fs/fs_api.c @@ -59,6 +59,7 @@ start_job (struct GNUNET_FS_QueueEntry *qe) qe->start (qe->cls, qe->client); qe->start_times++; qe->h->active_blocks += qe->blocks; + qe->h->active_downloads++; qe->start_time = GNUNET_TIME_absolute_get (); GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe); GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail, @@ -77,6 +78,7 @@ stop_job (struct GNUNET_FS_QueueEntry *qe) { qe->client = NULL; qe->stop (qe->cls); + GNUNET_assert (0 < qe->h->active_downloads); qe->h->active_downloads--; qe->h->active_blocks -= qe->blocks; qe->run_time = @@ -106,20 +108,24 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_TIME_Relative restart_at; struct GNUNET_TIME_Relative rst; struct GNUNET_TIME_Absolute end_time; - unsigned int num_download_waiting; - unsigned int num_download_active; - unsigned int num_download_expired; + unsigned int num_downloads_waiting; + unsigned int num_downloads_active; + unsigned int num_downloads_expired; unsigned int num_probes_active; unsigned int num_probes_waiting; unsigned int num_probes_expired; int num_probes_change; - int num_download_change; + int num_downloads_change; + int block_limit_hit; h->queue_job = GNUNET_SCHEDULER_NO_TASK; + /* restart_at will be set to the time when it makes sense to + re-evaluate the job queue (unless, of course, jobs complete + or are added, then we'll be triggered immediately */ restart_at = GNUNET_TIME_UNIT_FOREVER_REL; - /* first, see if we can start all the jobs */ + /* first, calculate some basic statistics on pending jobs */ num_probes_waiting = 0; - num_download_waiting = 0; + num_downloads_waiting = 0; for (qe = h->pending_head; NULL != qe; qe = qe->next) { switch (qe->priority) @@ -128,17 +134,18 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) num_probes_waiting++; break; case GNUNET_FS_QUEUE_PRIORITY_NORMAL: - num_download_waiting++; + num_downloads_waiting++; break; default: GNUNET_break (0); break; } } + /* now, calculate some basic statistics on running jobs */ num_probes_active = 0; num_probes_expired = 0; - num_download_active = 0; - num_download_expired = 0; + num_downloads_active = 0; + num_downloads_expired = 0; for (qe = h->running_head; NULL != qe; qe = qe->next) { run_time = @@ -146,48 +153,60 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) qe->blocks * qe->start_times); switch (qe->priority) { - case GNUNET_FS_QUEUE_PRIORITY_PROBE: - num_probes_active++; - /* run probes for at most 1s * number-of-restarts; note that - as the total runtime of a probe is limited to 2m, we don't - need to additionally limit the total time of a probe to - strictly limit its lifetime. */ - run_time = GNUNET_TIME_relative_min (run_time, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 1 + qe->start_times)); - end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); - rst = GNUNET_TIME_absolute_get_remaining (end_time); - restart_at = GNUNET_TIME_relative_min (rst, restart_at); - if (0 == rst.rel_value) - num_probes_expired++; - break; - case GNUNET_FS_QUEUE_PRIORITY_NORMAL: - num_download_active++; - end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); - rst = GNUNET_TIME_absolute_get_remaining (end_time); - restart_at = GNUNET_TIME_relative_min (rst, restart_at); - if (0 == rst.rel_value) - num_download_expired++; - break; - default: - GNUNET_break (0); - break; + case GNUNET_FS_QUEUE_PRIORITY_PROBE: + num_probes_active++; + /* run probes for at most 1s * number-of-restarts; note that + as the total runtime of a probe is limited to 2m, we don't + need to additionally limit the total time of a probe to + strictly limit its lifetime. */ + run_time = GNUNET_TIME_relative_min (run_time, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 1 + qe->start_times)); + end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); + rst = GNUNET_TIME_absolute_get_remaining (end_time); + restart_at = GNUNET_TIME_relative_min (rst, restart_at); + if (0 == rst.rel_value) + num_probes_expired++; + break; + case GNUNET_FS_QUEUE_PRIORITY_NORMAL: + num_downloads_active++; + end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); + rst = GNUNET_TIME_absolute_get_remaining (end_time); + restart_at = GNUNET_TIME_relative_min (rst, restart_at); + if (0 == rst.rel_value) + num_downloads_expired++; + break; + default: + GNUNET_break (0); + break; } } - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "PA: %u, PE: %u, PW: %u; DA: %u, DE: %u, DW: %u\n", + num_probes_active, + num_probes_expired, + num_probes_waiting, + num_downloads_active, + num_downloads_expired, + num_downloads_waiting); + /* calculate stop decisions */ num_probes_change = 0; - num_download_change = 0; - if (h->active_downloads + num_download_waiting > h->max_parallel_requests) + num_downloads_change = 0; + if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests) { if (num_probes_active > 0) num_probes_change = - GNUNET_MIN (num_probes_active, - h->max_parallel_requests - (h->active_downloads + num_download_waiting)); - else if (h->active_downloads + num_download_waiting > h->max_parallel_requests) - num_download_change = - GNUNET_MIN (num_download_expired, - h->max_parallel_requests - (h->active_downloads + num_download_waiting)); + h->max_parallel_requests - (h->active_downloads + num_downloads_waiting)); + else if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests) + num_downloads_change = - GNUNET_MIN (num_downloads_expired, + h->max_parallel_requests - (h->active_downloads + num_downloads_waiting)); } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Stopping %d probes and %d downloads\n", + num_probes_change, + num_downloads_change); /* then, check if we should stop some jobs */ next = h->running_head; while (NULL != (qe = next)) @@ -197,121 +216,113 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_TIME_relative_multiply (h->avg_block_latency, qe->blocks * qe->start_times); switch (qe->priority) + { + case GNUNET_FS_QUEUE_PRIORITY_PROBE: + /* run probes for at most 1s * number-of-restarts; note that + as the total runtime of a probe is limited to 2m, we don't + need to additionally limit the total time of a probe to + strictly limit its lifetime. */ + run_time = GNUNET_TIME_relative_min (run_time, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 1 + qe->start_times)); + end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); + rst = GNUNET_TIME_absolute_get_remaining (end_time); + restart_at = GNUNET_TIME_relative_min (rst, restart_at); + if ( (num_probes_change < 0) && + ( (num_probes_expired < - num_probes_change) || + (0 == rst.rel_value) ) ) { - case GNUNET_FS_QUEUE_PRIORITY_PROBE: - /* run probes for at most 1s * number-of-restarts; note that - as the total runtime of a probe is limited to 2m, we don't - need to additionally limit the total time of a probe to - strictly limit its lifetime. */ - run_time = GNUNET_TIME_relative_min (run_time, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 1 + qe->start_times)); - end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); - rst = GNUNET_TIME_absolute_get_remaining (end_time); - restart_at = GNUNET_TIME_relative_min (rst, restart_at); - if ( (num_probes_change < 0) && - ( (num_probes_expired < - num_probes_change) || - (0 == rst.rel_value) ) ) - { - stop_job (qe); - num_probes_change++; - if (0 == rst.rel_value) - num_probes_expired--; - } - break; - case GNUNET_FS_QUEUE_PRIORITY_NORMAL: - end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); - rst = GNUNET_TIME_absolute_get_remaining (end_time); - restart_at = GNUNET_TIME_relative_min (rst, restart_at); - if ( (num_download_change < 0) && - ( (num_download_expired < - num_download_change) || - (0 == rst.rel_value) ) ) - { - stop_job (qe); - num_download_change++; - if (0 == rst.rel_value) - num_download_expired--; - } - break; - default: - GNUNET_break (0); - break; + stop_job (qe); + num_probes_change++; + if (0 == rst.rel_value) + num_probes_expired--; } + break; + case GNUNET_FS_QUEUE_PRIORITY_NORMAL: + end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); + rst = GNUNET_TIME_absolute_get_remaining (end_time); + restart_at = GNUNET_TIME_relative_min (rst, restart_at); + if ( (num_downloads_change < 0) && + ( (num_downloads_expired < - num_downloads_change) || + (0 == rst.rel_value) ) ) + { + stop_job (qe); + num_downloads_change++; + if (0 == rst.rel_value) + num_downloads_expired--; + } + break; + default: + GNUNET_break (0); + break; + } } + GNUNET_break (0 == num_downloads_change); + GNUNET_break (0 == num_probes_change); - /* FIXME: calculate start decisions */ - num_probes_change = 0; - num_download_change = 0; - if (h->active_downloads + num_download_waiting < h->max_parallel_requests) - { - num_download_change = num_download_waiting; - num_probes_change = GNUNET_MIN (num_probes_waiting, - h->max_parallel_requests - (h->active_downloads + num_download_waiting)); - } - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "AD: %u, MP: %u\n", + h->active_downloads, + h->max_parallel_requests); - next = h->pending_head; - while (NULL != (qe = next)) + /* calculate start decisions */ + if (h->active_downloads + num_downloads_waiting < h->max_parallel_requests) { - next = qe->next; - if (NULL == h->running_head) - { - start_job (qe); - continue; - } - if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) && - (h->active_downloads < h->max_parallel_downloads)) - { - start_job (qe); - continue; - } + /* can start all downloads, fill rest with probes */ + num_downloads_change = num_downloads_waiting; + num_probes_change = GNUNET_MIN (num_probes_waiting, + h->max_parallel_requests - (h->active_downloads + num_downloads_waiting)); } - if (NULL == h->pending_head) - return; /* no need to stop anything */ - /* then, check if we should stop some jobs */ - next = h->running_head; - while (NULL != (qe = next)) + else { - next = qe->next; - run_time = - GNUNET_TIME_relative_multiply (h->avg_block_latency, - qe->blocks * qe->start_times); - switch (qe->priority) - { - case GNUNET_FS_QUEUE_PRIORITY_PROBE: - /* run probes for at most 1s * number-of-restarts; note that - as the total runtime of a probe is limited to 2m, we don't - need to additionally limit the total time of a probe to - strictly limit its lifetime. */ - run_time = GNUNET_TIME_relative_min (run_time, - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - 1 + qe->start_times)); - break; - case GNUNET_FS_QUEUE_PRIORITY_NORMAL: - break; - default: - GNUNET_break (0); - break; - } - end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); - rst = GNUNET_TIME_absolute_get_remaining (end_time); - restart_at = GNUNET_TIME_relative_min (rst, restart_at); - if (rst.rel_value > 0) - continue; - stop_job (qe); + num_downloads_change = h->max_parallel_requests - h->active_downloads; + num_probes_change = 0; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting %d probes and %d downloads\n", + num_probes_change, + num_downloads_change); + /* finally, start some more tasks if we now have empty slots */ + block_limit_hit = GNUNET_NO; next = h->pending_head; while (NULL != (qe = next)) { next = qe->next; - if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) && - (h->active_downloads < h->max_parallel_downloads)) + switch (qe->priority) { - start_job (qe); - continue; + case GNUNET_FS_QUEUE_PRIORITY_PROBE: + if (num_probes_change > 0) + { + start_job (qe); + num_probes_change--; + } + break; + case GNUNET_FS_QUEUE_PRIORITY_NORMAL: + if ( (num_downloads_change > 0) && + ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) || + ( (qe->blocks > h->max_parallel_requests) && + (0 == h->active_downloads) ) ) ) + { + start_job (qe); + num_downloads_change--; + } + else if (num_downloads_change > 0) + block_limit_hit = GNUNET_YES; + break; + default: + GNUNET_break (0); + break; } } + GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) ); + GNUNET_break (0 == num_probes_change); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Left with %d probes and %d downloads to start\n", + num_probes_change, + num_downloads_change); + + /* make sure we run again */ h->queue_job = GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h); } diff --git a/src/fs/gnunet-download.c b/src/fs/gnunet-download.c index 43b306def..2293cedd7 100644 --- a/src/fs/gnunet-download.c +++ b/src/fs/gnunet-download.c @@ -52,6 +52,7 @@ static char *filename; static int local_only; + static void cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { @@ -63,13 +64,10 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_FS_DownloadContext *d; - - if (dc != NULL) + if (NULL != dc) { - d = dc; + GNUNET_FS_download_stop (dc, delete_incomplete); dc = NULL; - GNUNET_FS_download_stop (d, delete_incomplete); } } @@ -170,6 +168,13 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *info) } break; case GNUNET_FS_STATUS_DOWNLOAD_ERROR: +#if !WINDOWS + if (0 != isatty (1)) + fprintf (stdout, "\n"); +#else + if (FILE_TYPE_CHAR == GetFileType (GetStdHandle (STD_OUTPUT_HANDLE))) + fprintf (stdout, "\n"); +#endif FPRINTF (stderr, _("Error downloading: %s.\n"), info->value.download.specifics.error.message); GNUNET_SCHEDULER_shutdown (); @@ -178,6 +183,13 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *info) s = GNUNET_STRINGS_byte_size_fancy (info->value.download.completed * 1000 / (info->value.download. duration.rel_value + 1)); +#if !WINDOWS + if (0 != isatty (1)) + fprintf (stdout, "\n"); +#else + if (FILE_TYPE_CHAR == GetFileType (GetStdHandle (STD_OUTPUT_HANDLE))) + fprintf (stdout, "\n"); +#endif FPRINTF (stdout, _("Downloading `%s' done (%s/s).\n"), info->value.download.filename, s); GNUNET_free (s);