From 9a1a09225ad61aeda6da0db15b2f5dc8b3c874f9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 26 Apr 2010 16:06:20 +0000 Subject: [PATCH] towards job queuing --- src/fs/fs.c | 158 +++++++++++++++++++++++++++++++ src/fs/fs.h | 161 +++++++++++++++++++++++++++++++- src/fs/fs_download.c | 21 +++-- src/include/gnunet_fs_service.h | 12 +++ 4 files changed, 341 insertions(+), 11 deletions(-) diff --git a/src/fs/fs.c b/src/fs/fs.c index 2d080bfa6..2888abe93 100644 --- a/src/fs/fs.c +++ b/src/fs/fs.c @@ -29,6 +29,161 @@ #include "fs.h" +/** + * Start the given job (send signal, remove from pending queue, update + * counters and state). + * + * @param qe job to start + */ +static void +start_job (struct GNUNET_FS_QueueEntry *qe) +{ + qe->client = GNUNET_CLIENT_connect (qe->h->sched, "fs", qe->h->cfg); + if (qe->client == NULL) + { + GNUNET_break (0); + return; + } + qe->start (qe->cls, qe->client); + switch (qe->category) + { + case GNUNET_FS_QC_DOWNLOAD: + qe->h->active_downloads++; + break; + case GNUNET_FS_QC_PROBE: + qe->h->active_probes++; + break; + } + qe->start_time = GNUNET_TIME_absolute_get (); + GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, + qe->h->pending_tail, + qe); + GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, + qe->h->running_tail, + qe->h->running_tail, + qe); +} + + +/** + * Stop the given job (send signal, remove from active queue, update + * counters and state). + * + * @param qe job to stop + */ +static void +stop_job (struct GNUNET_FS_QueueEntry *qe) +{ + qe->client = NULL; + qe->stop (qe->cls); + switch (qe->category) + { + case GNUNET_FS_QC_DOWNLOAD: + qe->h->active_downloads--; + break; + case GNUNET_FS_QC_PROBE: + qe->h->active_probes--; + break; + } + qe->run_time = GNUNET_TIME_relative_add (qe->run_time, + GNUNET_TIME_absolute_get_duration (qe->start_time)); + GNUNET_CONTAINER_DLL_remove (qe->h->running_head, + qe->h->running_tail, + qe); + GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, + qe->h->pending_tail, + qe->h->pending_tail, + qe); +} + + +/** + * Process the jobs in the job queue, possibly starting some + * and stopping others. + * + * @param cls the 'struct GNUNET_FS_Handle' + * @param tc scheduler context + */ +static void +process_job_queue (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_FS_Handle *h = cls; + + h->queue_job = GNUNET_SCHEDULER_NO_TASK; + /* FIXME: stupid implementation that just starts everything follows... */ + while (NULL != h->pending_head) + start_job (h->pending_head); + + /* FIXME: possibly re-schedule queue-job! */ +} + +/** + * Add a job to the queue. + * + * @param h handle to the overall FS state + * @param start function to call to begin the job + * @param stop function to call to pause the job, or on dequeue (if the job was running) + * @param cls closure for start and stop + * @param cat category of the job + * @return queue handle + */ +struct GNUNET_FS_QueueEntry * +GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, + GNUNET_FS_QueueStart start, + GNUNET_FS_QueueStop stop, + void *cls, + enum GNUNET_FS_QueueCategory cat) +{ + struct GNUNET_FS_QueueEntry *qe; + + qe = GNUNET_malloc (sizeof (struct GNUNET_FS_QueueEntry)); + qe->h = h; + qe->start = start; + qe->stop = stop; + qe->cls = cls; + qe->queue_time = GNUNET_TIME_absolute_get (); + qe->category = cat; + GNUNET_CONTAINER_DLL_insert_after (h->pending_head, + h->pending_tail, + h->pending_tail, + qe); + if (h->queue_job != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (h->sched, + h->queue_job); + h->queue_job + = GNUNET_SCHEDULER_add_now (h->sched, + &process_job_queue, + h); + return qe; +} + + +/** + * Dequeue a job from the queue. + * @param qh handle for the job + */ +void +GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh) +{ + if (qh->client != NULL) + { + if (qh->h->queue_job != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (qh->h->sched, + qh->h->queue_job); + qh->h->queue_job + = GNUNET_SCHEDULER_add_now (qh->h->sched, + &process_job_queue, + qh->h); + stop_job (qh); + } + GNUNET_CONTAINER_DLL_remove (qh->h->pending_head, + qh->h->pending_tail, + qh); + GNUNET_free (qh); +} + + /** * Setup a connection to the file-sharing service. * @@ -97,6 +252,9 @@ GNUNET_FS_stop (struct GNUNET_FS_Handle *h) { // FIXME: serialize state!? (or is it always serialized???) // FIXME: terminate receive-loop with client + if (h->queue_job != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (h->sched, + h->queue_job); GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); GNUNET_free (h->client_name); GNUNET_free (h); diff --git a/src/fs/fs.h b/src/fs/fs.h index fdf501b44..b10add77c 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -445,6 +445,131 @@ struct GNUNET_FS_FileInformation }; +/** + * The job is now ready to run and should use the given client + * handle to communicate with the FS service. + * + * @param cls closure + * @param client handle to use for FS communication + */ +typedef void (*GNUNET_FS_QueueStart)(void *cls, + struct GNUNET_CLIENT_Connection *client); + + +/** + * The job must now stop to run and should destry the client handle as + * soon as possible (ideally prior to returning). + */ +typedef void (*GNUNET_FS_QueueStop)(void *cls); + +/** + * Categories of jobs in the FS queue. + */ +enum GNUNET_FS_QueueCategory + { + /** + * File download. + */ + GNUNET_FS_QC_DOWNLOAD, + + /** + * Availability probe (related to search). + */ + GNUNET_FS_QC_PROBE + + }; + +/** + * Entry in the job queue. + */ +struct GNUNET_FS_QueueEntry +{ + /** + * This is a linked list. + */ + struct GNUNET_FS_QueueEntry *next; + + /** + * This is a linked list. + */ + struct GNUNET_FS_QueueEntry *prev; + + /** + * Function to call when the job is started. + */ + GNUNET_FS_QueueStart start; + + /** + * Function to call when the job needs to stop (or is done / dequeued). + */ + GNUNET_FS_QueueStop stop; + + /** + * Closure for start and stop. + */ + void *cls; + + /** + * Handle to FS primary context. + */ + struct GNUNET_FS_Handle *h; + + /** + * Client handle, or NULL if job is not running. + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Time the job was originally queued. + */ + struct GNUNET_TIME_Absolute queue_time; + + /** + * Time the job was started last. + */ + struct GNUNET_TIME_Absolute start_time; + + /** + * Total amount of time the job has been running (except for the + * current run). + */ + struct GNUNET_TIME_Relative run_time; + + /** + * What type of job is this? + */ + enum GNUNET_FS_QueueCategory category; + +}; + + + + +/** + * Add a job to the queue. + * + * @param h handle to the overall FS state + * @param start function to call to begin the job + * @param stop function to call to pause the job, or on dequeue (if the job was running) + * @param cls closure for start and stop + * @return queue handle + */ +struct GNUNET_FS_QueueEntry * +GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, + GNUNET_FS_QueueStart start, + GNUNET_FS_QueueStop stop, + void *cls, + enum GNUNET_FS_QueueCategory cat); + + +/** + * Dequeue a job from the queue. + * @param qh handle for the job + */ +void +GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh); + + /** * Master context for most FS operations. */ @@ -481,12 +606,42 @@ struct GNUNET_FS_Handle struct GNUNET_CLIENT_Connection *client; /** - * How many downloads probing availability - * of search results do we have running - * right now? + * Head of DLL of running jobs. + */ + struct GNUNET_FS_QueueEntry *running_head; + + /** + * Tail of DLL of running jobs. + */ + struct GNUNET_FS_QueueEntry *running_tail; + + /** + * Head of DLL of pending jobs. + */ + struct GNUNET_FS_QueueEntry *pending_head; + + /** + * Tail of DLL of pending jobs. + */ + struct GNUNET_FS_QueueEntry *pending_tail; + + /** + * Task that processes the jobs in the running and pending queues + * (and moves jobs around as needed). + */ + GNUNET_SCHEDULER_TaskIdentifier queue_job; + + /** + * How many downloads probing availability of search results do we + * have running right now? */ unsigned int active_probes; + /** + * How many actual downloads do we have running right now? + */ + unsigned int active_downloads; + /** * General flags. */ diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 8194ad9f5..1f03082e8 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -1396,26 +1396,31 @@ GNUNET_FS_download_start (struct GNUNET_FS_Handle *h, dc->treedepth); #endif // FIXME: make persistent + pi.status = GNUNET_FS_STATUS_DOWNLOAD_START; + make_download_status (&pi, dc); + pi.value.download.specifics.start.meta = meta; + dc->client_info = dc->h->upcb (dc->h->upcb_cls, + &pi); + - // FIXME: bound parallelism here! + // FIXME: bound parallelism here client = GNUNET_CLIENT_connect (h->sched, "fs", h->cfg); + GNUNET_assert (NULL != client); dc->client = client; - schedule_block_download (dc, - &dc->uri->data.chk.chk, - 0, - 1 /* 0 == CHK, 1 == top */); GNUNET_CLIENT_receive (client, &receive_results, dc, GNUNET_TIME_UNIT_FOREVER_REL); - pi.status = GNUNET_FS_STATUS_DOWNLOAD_START; + pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; make_download_status (&pi, dc); - pi.value.download.specifics.start.meta = meta; dc->client_info = dc->h->upcb (dc->h->upcb_cls, &pi); - + schedule_block_download (dc, + &dc->uri->data.chk.chk, + 0, + 1 /* 0 == CHK, 1 == top */); return dc; } diff --git a/src/include/gnunet_fs_service.h b/src/include/gnunet_fs_service.h index 6926b537e..64bfda3ba 100644 --- a/src/include/gnunet_fs_service.h +++ b/src/include/gnunet_fs_service.h @@ -556,6 +556,18 @@ enum GNUNET_FS_Status */ GNUNET_FS_STATUS_DOWNLOAD_STOPPED, + /** + * Notification that this download is now actively being + * pursued (as opposed to waiting in the queue). + */ + GNUNET_FS_STATUS_DOWNLOAD_ACTIVE, + + /** + * Notification that this download is no longer actively + * being pursued (back in the queue). + */ + GNUNET_FS_STATUS_DOWNLOAD_INACTIVE, + /** * First event generated when a client requests * a search to begin or when a namespace result -- 2.25.1