0.9.0pre1:
* FS: [CG]
- - bound parallelism (# fs downloads)
- - distinguish in performance tracking and event signalling between
- downloads that are actually running and those that are merely in the queue
+ - search: availability probes [needed for full persistence support...]
+ - Allow checking of presence of search results and/or content via command-line tools
+ (add options to gnunet-search / gnunet-download to limit search to local peer)
+ [needed for full persistence support...]
- persistence support (publish, unindex, search, download)
- gnunet-service-fs (hot-path routing, load-based routing, nitpicks)
- [gnunet-service-fs.c:208]: member 'LocalGetContext::results_bf_size' is never used
- shutdown sequence?
* FS: [CG]
- datastore reservation (publishing)
- - search: availability probes
- location URIs (publish, search, download)
- non-anonymous FS service (needs DHT)
+ DHT integration for search
- convert documentation pages to books
- update books (especially for developers)
- create good Drupal theme for GNUnet
- - make a NICE download page and figure out how to
- enable developers to publish TGZs nicely
+ - make a NICE download page and figure out how to enable developers to publish TGZs nicely
- port "contact" page
- add content type for "todo" items?
* POSTGRES database backends: [CG]
(Note: build library always, build service when libxml2/etc. are available)
* FS: [CG]
- Remove KBlocks in gnunet-unindex (see discussion with Kenneth Almquist on gnunet-devs in 9/2009)
- - Allow checking of presence of search results and/or content via command-line tools
- (add options to gnunet-search / gnunet-download to limit search to local peer)
* PEERINFO: [CG]
- expire 'ancient' HELLOs (those without valid addresses AND that
we have not 'used' (for their public keys) in a while; need a way
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2001, 2002, 2003, 2004, 2005, 2006, 2008, 2009, 2010 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
return;
}
qe->start (qe->cls, qe->client);
- switch (qe->category)
- {
- case GNUNET_FS_QC_DOWNLOAD:
- qe->h->active_downloads++;
- break;
- case GNUNET_FS_QC_PROBE:
- qe->h->active_probes++;
- break;
- }
+ qe->start_times++;
+ qe->h->active_blocks += qe->blocks;
qe->start_time = GNUNET_TIME_absolute_get ();
GNUNET_CONTAINER_DLL_remove (qe->h->pending_head,
qe->h->pending_tail,
qe);
- GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+ GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head,
qe->h->running_tail,
qe->h->running_tail,
qe);
{
qe->client = NULL;
qe->stop (qe->cls);
- switch (qe->category)
- {
- case GNUNET_FS_QC_DOWNLOAD:
- qe->h->active_downloads--;
- break;
- case GNUNET_FS_QC_PROBE:
- qe->h->active_probes--;
- break;
- }
+ qe->h->active_downloads--;
+ qe->h->active_blocks -= qe->blocks;
qe->run_time = GNUNET_TIME_relative_add (qe->run_time,
GNUNET_TIME_absolute_get_duration (qe->start_time));
GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_FS_Handle *h = cls;
+ struct GNUNET_FS_QueueEntry *qe;
+ struct GNUNET_FS_QueueEntry *next;
+ struct GNUNET_TIME_Relative run_time;
+ struct GNUNET_TIME_Relative restart_at;
+ struct GNUNET_TIME_Relative rst;
+ struct GNUNET_TIME_Absolute end_time;
h->queue_job = GNUNET_SCHEDULER_NO_TASK;
- /* FIXME: stupid implementation that just starts everything follows... */
- while (NULL != h->pending_head)
- start_job (h->pending_head);
-
- /* FIXME: possibly re-schedule queue-job! */
+ next = h->pending_head;
+ while (NULL != (qe = next))
+ {
+ next = qe->next;
+ if (h->running_head == NULL)
+ {
+ start_job (qe);
+ continue;
+ }
+ if ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) &&
+ (h->active_downloads + 1 <= h->max_parallel_downloads) )
+ {
+ start_job (qe);
+ continue;
+ }
+ }
+ if (h->pending_head == NULL)
+ return; /* no need to stop anything */
+ restart_at = GNUNET_TIME_UNIT_FOREVER_REL;
+ next = h->running_head;
+ while (NULL != (qe = next))
+ {
+ next = qe->next;
+ /* FIXME: might be faster/simpler to do this calculation only once
+ when we start a job (OTOH, this would allow us to dynamically
+ and easily adjust qe->blocks over time, given the right API...) */
+ run_time = GNUNET_TIME_relative_multiply (h->avg_block_latency,
+ qe->blocks * qe->start_times);
+ end_time = GNUNET_TIME_absolute_add (qe->start_time,
+ run_time);
+ rst = GNUNET_TIME_absolute_get_remaining (end_time);
+ restart_at = GNUNET_TIME_relative_min (rst, restart_at);
+ if (rst.value > 0)
+ continue;
+ stop_job (qe);
+ }
+ h->queue_job = GNUNET_SCHEDULER_add_delayed (h->sched,
+ restart_at,
+ &process_job_queue,
+ h);
}
/**
* @param start function to call to begin the job
* @param stop function to call to pause the job, or on dequeue (if the job was running)
* @param cls closure for start and stop
- * @param cat category of the job
+ * @param blocks number of blocks this jobs uses
* @return queue handle
*/
struct GNUNET_FS_QueueEntry *
GNUNET_FS_QueueStart start,
GNUNET_FS_QueueStop stop,
void *cls,
- enum GNUNET_FS_QueueCategory cat)
+ unsigned int blocks)
{
struct GNUNET_FS_QueueEntry *qe;
qe->stop = stop;
qe->cls = cls;
qe->queue_time = GNUNET_TIME_absolute_get ();
- qe->category = cat;
+ qe->blocks = blocks;
GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
h->pending_tail,
h->pending_tail,
void
GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh)
{
+ struct GNUNET_FS_Handle *h;
+
+ h = qh->h;
if (qh->client != NULL)
- {
- if (qh->h->queue_job != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (qh->h->sched,
- qh->h->queue_job);
- qh->h->queue_job
- = GNUNET_SCHEDULER_add_now (qh->h->sched,
- &process_job_queue,
- qh->h);
- stop_job (qh);
- }
- GNUNET_CONTAINER_DLL_remove (qh->h->pending_head,
- qh->h->pending_tail,
+ stop_job (qh);
+ GNUNET_CONTAINER_DLL_remove (h->pending_head,
+ h->pending_tail,
qh);
GNUNET_free (qh);
+ if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (h->sched,
+ h->queue_job);
+ h->queue_job
+ = GNUNET_SCHEDULER_add_now (h->sched,
+ &process_job_queue,
+ h);
}
{
struct GNUNET_FS_Handle *ret;
struct GNUNET_CLIENT_Connection *client;
-
+ enum GNUNET_FS_OPTIONS opt;
+ va_list ap;
+
client = GNUNET_CLIENT_connect (sched,
"fs",
cfg);
ret->upcb_cls = upcb_cls;
ret->client = client;
ret->flags = flags;
- // FIXME: process varargs!
+ ret->max_parallel_downloads = 1;
+ ret->max_parallel_requests = 1;
+ ret->avg_block_latency = GNUNET_TIME_UNIT_MINUTES; /* conservative starting point */
+ va_start (ap, flags);
+ while (GNUNET_FS_OPTIONS_END != (opt = va_arg (ap, enum GNUNET_FS_OPTIONS)))
+ {
+ switch (opt)
+ {
+ case GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM:
+ ret->max_parallel_downloads = va_arg (ap, unsigned int);
+ break;
+ case GNUNET_FS_OPTIONS_REQUEST_PARALLELISM:
+ ret->max_parallel_requests = va_arg (ap, unsigned int);
+ break;
+ default:
+ GNUNET_break (0);
+ GNUNET_free (ret->client_name);
+ GNUNET_free (ret);
+ va_end (ap);
+ return NULL;
+ }
+ }
+ va_end (ap);
// FIXME: setup receive-loop with client
// FIXME: deserialize state; use client-name to find master-directory!
*/
typedef void (*GNUNET_FS_QueueStop)(void *cls);
-/**
- * Categories of jobs in the FS queue.
- */
-enum GNUNET_FS_QueueCategory
- {
- /**
- * File download.
- */
- GNUNET_FS_QC_DOWNLOAD,
-
- /**
- * Availability probe (related to search).
- */
- GNUNET_FS_QC_PROBE
-
- };
/**
* Entry in the job queue.
struct GNUNET_TIME_Relative run_time;
/**
- * What type of job is this?
+ * How many blocks do the active downloads have?
+ */
+ unsigned int blocks;
+
+ /**
+ * How often have we (re)started this download?
*/
- enum GNUNET_FS_QueueCategory category;
+ unsigned int start_times;
};
* @param start function to call to begin the job
* @param stop function to call to pause the job, or on dequeue (if the job was running)
* @param cls closure for start and stop
+ * @param blocks number of blocks this download has
* @return queue handle
*/
struct GNUNET_FS_QueueEntry *
GNUNET_FS_QueueStart start,
GNUNET_FS_QueueStop stop,
void *cls,
- enum GNUNET_FS_QueueCategory cat);
+ unsigned int blocks);
/**
GNUNET_SCHEDULER_TaskIdentifier queue_job;
/**
- * How many downloads probing availability of search results do we
- * have running right now?
+ * Average time we take for a single request to be satisfied.
+ * FIXME: not yet calcualted properly...
*/
- unsigned int active_probes;
+ struct GNUNET_TIME_Relative avg_block_latency;
/**
* How many actual downloads do we have running right now?
*/
unsigned int active_downloads;
+ /**
+ * How many blocks do the active downloads have?
+ */
+ unsigned int active_blocks;
+
/**
* General flags.
*/
enum GNUNET_FS_Flags flags;
+ /**
+ * Maximum number of parallel downloads.
+ */
+ unsigned int max_parallel_downloads;
+
+ /**
+ * Maximum number of parallel requests.
+ */
+ unsigned int max_parallel_requests;
+
};
*/
struct GNUNET_CLIENT_TransmitHandle *th;
+ /**
+ * Our entry in the job queue.
+ */
+ struct GNUNET_FS_QueueEntry *job_queue;
+
/**
* Identity of the peer having the content, or all-zeros
* if we don't know of such a peer.
* @author Christian Grothoff
*
* TODO:
- * - handle recursive downloads (need directory &
- * fs-level download-parallelism management)
* - location URI suppport (can wait, easy)
- * - check if blocks exist already (can wait, easy)
- * - check if iblocks can be computed from existing blocks (can wait, hard)
* - persistence (can wait)
+ * - check if iblocks can be computed from existing blocks (can wait, hard)
*/
#include "platform.h"
#include "gnunet_constants.h"
"truncate",
dc->filename);
}
-
+ if (dc->job_queue != NULL)
+ {
+ GNUNET_FS_dequeue_ (dc->job_queue);
+ dc->job_queue = NULL;
+ }
if (is_recursive_download (dc))
full_recursive_download (dc);
if (dc->child_head == NULL)
}
+
+/**
+ * We're allowed to ask the FS service for our blocks. Start the download.
+ *
+ * @param cls the 'struct GNUNET_FS_DownloadContext'
+ * @param client handle to use for communcation with FS (we must destroy it!)
+ */
+static void
+activate_fs_download (void *cls,
+ struct GNUNET_CLIENT_Connection *client)
+{
+ struct GNUNET_FS_DownloadContext *dc = cls;
+ struct GNUNET_FS_ProgressInfo pi;
+
+ GNUNET_assert (NULL != client);
+ dc->client = client;
+ GNUNET_CLIENT_receive (client,
+ &receive_results,
+ dc,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
+ make_download_status (&pi, dc);
+ dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+ &pi);
+ GNUNET_CONTAINER_multihashmap_iterate (dc->active,
+ &retry_entry,
+ dc);
+ if ( (dc->th == NULL) &&
+ (dc->client != NULL) )
+ dc->th = GNUNET_CLIENT_notify_transmit_ready (dc->client,
+ sizeof (struct SearchMessage),
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ GNUNET_NO,
+ &transmit_download_request,
+ dc);
+}
+
+
+/**
+ * We must stop to ask the FS service for our blocks. Pause the download.
+ *
+ * @param cls the 'struct GNUNET_FS_DownloadContext'
+ * @param client handle to use for communcation with FS (we must destroy it!)
+ */
+static void
+deactivate_fs_download (void *cls)
+{
+ struct GNUNET_FS_DownloadContext *dc = cls;
+ struct GNUNET_FS_ProgressInfo pi;
+
+ if (NULL != dc->th)
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
+ dc->th = NULL;
+ }
+ if (NULL != dc->client)
+ {
+ GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO);
+ dc->client = NULL;
+ }
+ pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE;
+ make_download_status (&pi, dc);
+ dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+ &pi);
+}
+
+
/**
* Download parts of a file. Note that this will store
* the blocks at the respective offset in the given file. Also, the
{
struct GNUNET_FS_ProgressInfo pi;
struct GNUNET_FS_DownloadContext *dc;
- struct GNUNET_CLIENT_Connection *client;
GNUNET_assert (GNUNET_FS_uri_test_chk (uri));
if ( (offset + length < offset) ||
pi.value.download.specifics.start.meta = meta;
dc->client_info = dc->h->upcb (dc->h->upcb_cls,
&pi);
-
-
- // FIXME: bound parallelism here
- client = GNUNET_CLIENT_connect (h->sched,
- "fs",
- h->cfg);
- GNUNET_assert (NULL != client);
- dc->client = client;
- GNUNET_CLIENT_receive (client,
- &receive_results,
- dc,
- GNUNET_TIME_UNIT_FOREVER_REL);
- pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
- make_download_status (&pi, dc);
- dc->client_info = dc->h->upcb (dc->h->upcb_cls,
- &pi);
schedule_block_download (dc,
&dc->uri->data.chk.chk,
0,
- 1 /* 0 == CHK, 1 == top */);
+ 1 /* 0 == CHK, 1 == top */);
+ dc->job_queue = GNUNET_FS_queue_ (h,
+ &activate_fs_download,
+ &deactivate_fs_download,
+ dc,
+ (length + DBLOCK_SIZE-1) / DBLOCK_SIZE);
return dc;
}
{
struct GNUNET_FS_ProgressInfo pi;
+ if (dc->job_queue != NULL)
+ {
+ GNUNET_FS_dequeue_ (dc->job_queue);
+ dc->job_queue = NULL;
+ }
while (NULL != dc->child_head)
GNUNET_FS_download_stop (dc->child_head,
do_delete);
if (GNUNET_SCHEDULER_NO_TASK != dc->task)
GNUNET_SCHEDULER_cancel (dc->h->sched,
dc->task);
- if (NULL != dc->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
- dc->th = NULL;
- }
- if (NULL != dc->client)
- GNUNET_CLIENT_disconnect (dc->client, GNUNET_NO);
GNUNET_CONTAINER_multihashmap_iterate (dc->active,
&free_entry,
NULL);
GNUNET_FS_uri_destroy (sr->uri);
GNUNET_CONTAINER_meta_data_destroy (sr->meta);
if (sr->probe_ctx != NULL)
- {
- GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES);
- h->active_probes--;
- /* FIXME: trigger starting of new
- probes here!? Maybe not -- could
- cause new probes to be immediately
- stopped again... */
- }
+ GNUNET_FS_download_stop (sr->probe_ctx, GNUNET_YES);
if (sr->probe_cancel_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched,
- sr->probe_cancel_task);
- }
+ GNUNET_SCHEDULER_cancel (h->sched,
+ sr->probe_cancel_task);
GNUNET_free (sr);
return GNUNET_OK;
}
daemon,
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
break;
+ case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE:
+ case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE:
+ break;
/* FIXME: monitor data correctness during download progress */
/* FIXME: do performance reports given sufficient verbosity */
/* FIXME: advance timeout task to "immediate" on error */
NULL,
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
break;
+ case GNUNET_FS_STATUS_DOWNLOAD_ACTIVE:
+ case GNUNET_FS_STATUS_DOWNLOAD_INACTIVE:
+ break;
default:
fprintf (stderr,
_("Unexpected status: %d\n"),
* followed by an "unsigned int" giving the desired maximum number
* of parallel downloads).
*/
- GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1
+ GNUNET_FS_OPTIONS_DOWNLOAD_PARALLELISM = 1,
+
+ /**
+ * Maximum number of requests that should be pending at a given
+ * point in time (invidivual downloads may go above this, but
+ * if we are above this threshold, we should not activate any
+ * additional downloads.
+ */
+ GNUNET_FS_OPTIONS_REQUEST_PARALLELISM = 2
};