#include "fs.h"
+/**
+ * Start the given job (send signal, remove from pending queue, update
+ * counters and state).
+ *
+ * @param qe job to start
+ */
+static void
+start_job (struct GNUNET_FS_QueueEntry *qe)
+{
+ qe->client = GNUNET_CLIENT_connect (qe->h->sched, "fs", qe->h->cfg);
+ if (qe->client == NULL)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ qe->start (qe->cls, qe->client);
+ switch (qe->category)
+ {
+ case GNUNET_FS_QC_DOWNLOAD:
+ qe->h->active_downloads++;
+ break;
+ case GNUNET_FS_QC_PROBE:
+ qe->h->active_probes++;
+ break;
+ }
+ qe->start_time = GNUNET_TIME_absolute_get ();
+ GNUNET_CONTAINER_DLL_remove (qe->h->pending_head,
+ qe->h->pending_tail,
+ qe);
+ GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+ qe->h->running_tail,
+ qe->h->running_tail,
+ qe);
+}
+
+
+/**
+ * Stop the given job (send signal, remove from active queue, update
+ * counters and state).
+ *
+ * @param qe job to stop
+ */
+static void
+stop_job (struct GNUNET_FS_QueueEntry *qe)
+{
+ qe->client = NULL;
+ qe->stop (qe->cls);
+ switch (qe->category)
+ {
+ case GNUNET_FS_QC_DOWNLOAD:
+ qe->h->active_downloads--;
+ break;
+ case GNUNET_FS_QC_PROBE:
+ qe->h->active_probes--;
+ break;
+ }
+ qe->run_time = GNUNET_TIME_relative_add (qe->run_time,
+ GNUNET_TIME_absolute_get_duration (qe->start_time));
+ GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
+ qe->h->running_tail,
+ qe);
+ GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+ qe->h->pending_tail,
+ qe->h->pending_tail,
+ qe);
+}
+
+
+/**
+ * Process the jobs in the job queue, possibly starting some
+ * and stopping others.
+ *
+ * @param cls the 'struct GNUNET_FS_Handle'
+ * @param tc scheduler context
+ */
+static void
+process_job_queue (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_FS_Handle *h = cls;
+
+ h->queue_job = GNUNET_SCHEDULER_NO_TASK;
+ /* FIXME: stupid implementation that just starts everything follows... */
+ while (NULL != h->pending_head)
+ start_job (h->pending_head);
+
+ /* FIXME: possibly re-schedule queue-job! */
+}
+
+/**
+ * Add a job to the queue.
+ *
+ * @param h handle to the overall FS state
+ * @param start function to call to begin the job
+ * @param stop function to call to pause the job, or on dequeue (if the job was running)
+ * @param cls closure for start and stop
+ * @param cat category of the job
+ * @return queue handle
+ */
+struct GNUNET_FS_QueueEntry *
+GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
+ GNUNET_FS_QueueStart start,
+ GNUNET_FS_QueueStop stop,
+ void *cls,
+ enum GNUNET_FS_QueueCategory cat)
+{
+ struct GNUNET_FS_QueueEntry *qe;
+
+ qe = GNUNET_malloc (sizeof (struct GNUNET_FS_QueueEntry));
+ qe->h = h;
+ qe->start = start;
+ qe->stop = stop;
+ qe->cls = cls;
+ qe->queue_time = GNUNET_TIME_absolute_get ();
+ qe->category = cat;
+ GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
+ h->pending_tail,
+ h->pending_tail,
+ qe);
+ if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (h->sched,
+ h->queue_job);
+ h->queue_job
+ = GNUNET_SCHEDULER_add_now (h->sched,
+ &process_job_queue,
+ h);
+ return qe;
+}
+
+
+/**
+ * Dequeue a job from the queue.
+ * @param qh handle for the job
+ */
+void
+GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh)
+{
+ if (qh->client != NULL)
+ {
+ if (qh->h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (qh->h->sched,
+ qh->h->queue_job);
+ qh->h->queue_job
+ = GNUNET_SCHEDULER_add_now (qh->h->sched,
+ &process_job_queue,
+ qh->h);
+ stop_job (qh);
+ }
+ GNUNET_CONTAINER_DLL_remove (qh->h->pending_head,
+ qh->h->pending_tail,
+ qh);
+ GNUNET_free (qh);
+}
+
+
/**
* Setup a connection to the file-sharing service.
*
{
// FIXME: serialize state!? (or is it always serialized???)
// FIXME: terminate receive-loop with client
+ if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (h->sched,
+ h->queue_job);
GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
GNUNET_free (h->client_name);
GNUNET_free (h);
};
+/**
+ * The job is now ready to run and should use the given client
+ * handle to communicate with the FS service.
+ *
+ * @param cls closure
+ * @param client handle to use for FS communication
+ */
+typedef void (*GNUNET_FS_QueueStart)(void *cls,
+ struct GNUNET_CLIENT_Connection *client);
+
+
+/**
+ * The job must now stop to run and should destry the client handle as
+ * soon as possible (ideally prior to returning).
+ */
+typedef void (*GNUNET_FS_QueueStop)(void *cls);
+
+/**
+ * Categories of jobs in the FS queue.
+ */
+enum GNUNET_FS_QueueCategory
+ {
+ /**
+ * File download.
+ */
+ GNUNET_FS_QC_DOWNLOAD,
+
+ /**
+ * Availability probe (related to search).
+ */
+ GNUNET_FS_QC_PROBE
+
+ };
+
+/**
+ * Entry in the job queue.
+ */
+struct GNUNET_FS_QueueEntry
+{
+ /**
+ * This is a linked list.
+ */
+ struct GNUNET_FS_QueueEntry *next;
+
+ /**
+ * This is a linked list.
+ */
+ struct GNUNET_FS_QueueEntry *prev;
+
+ /**
+ * Function to call when the job is started.
+ */
+ GNUNET_FS_QueueStart start;
+
+ /**
+ * Function to call when the job needs to stop (or is done / dequeued).
+ */
+ GNUNET_FS_QueueStop stop;
+
+ /**
+ * Closure for start and stop.
+ */
+ void *cls;
+
+ /**
+ * Handle to FS primary context.
+ */
+ struct GNUNET_FS_Handle *h;
+
+ /**
+ * Client handle, or NULL if job is not running.
+ */
+ struct GNUNET_CLIENT_Connection *client;
+
+ /**
+ * Time the job was originally queued.
+ */
+ struct GNUNET_TIME_Absolute queue_time;
+
+ /**
+ * Time the job was started last.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Total amount of time the job has been running (except for the
+ * current run).
+ */
+ struct GNUNET_TIME_Relative run_time;
+
+ /**
+ * What type of job is this?
+ */
+ enum GNUNET_FS_QueueCategory category;
+
+};
+
+
+
+
+/**
+ * Add a job to the queue.
+ *
+ * @param h handle to the overall FS state
+ * @param start function to call to begin the job
+ * @param stop function to call to pause the job, or on dequeue (if the job was running)
+ * @param cls closure for start and stop
+ * @return queue handle
+ */
+struct GNUNET_FS_QueueEntry *
+GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
+ GNUNET_FS_QueueStart start,
+ GNUNET_FS_QueueStop stop,
+ void *cls,
+ enum GNUNET_FS_QueueCategory cat);
+
+
+/**
+ * Dequeue a job from the queue.
+ * @param qh handle for the job
+ */
+void
+GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh);
+
+
/**
* Master context for most FS operations.
*/
struct GNUNET_CLIENT_Connection *client;
/**
- * How many downloads probing availability
- * of search results do we have running
- * right now?
+ * Head of DLL of running jobs.
+ */
+ struct GNUNET_FS_QueueEntry *running_head;
+
+ /**
+ * Tail of DLL of running jobs.
+ */
+ struct GNUNET_FS_QueueEntry *running_tail;
+
+ /**
+ * Head of DLL of pending jobs.
+ */
+ struct GNUNET_FS_QueueEntry *pending_head;
+
+ /**
+ * Tail of DLL of pending jobs.
+ */
+ struct GNUNET_FS_QueueEntry *pending_tail;
+
+ /**
+ * Task that processes the jobs in the running and pending queues
+ * (and moves jobs around as needed).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier queue_job;
+
+ /**
+ * How many downloads probing availability of search results do we
+ * have running right now?
*/
unsigned int active_probes;
+ /**
+ * How many actual downloads do we have running right now?
+ */
+ unsigned int active_downloads;
+
/**
* General flags.
*/
dc->treedepth);
#endif
// FIXME: make persistent
+ pi.status = GNUNET_FS_STATUS_DOWNLOAD_START;
+ make_download_status (&pi, dc);
+ pi.value.download.specifics.start.meta = meta;
+ dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+ &pi);
+
- // FIXME: bound parallelism here!
+ // FIXME: bound parallelism here
client = GNUNET_CLIENT_connect (h->sched,
"fs",
h->cfg);
+ GNUNET_assert (NULL != client);
dc->client = client;
- schedule_block_download (dc,
- &dc->uri->data.chk.chk,
- 0,
- 1 /* 0 == CHK, 1 == top */);
GNUNET_CLIENT_receive (client,
&receive_results,
dc,
GNUNET_TIME_UNIT_FOREVER_REL);
- pi.status = GNUNET_FS_STATUS_DOWNLOAD_START;
+ pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
make_download_status (&pi, dc);
- pi.value.download.specifics.start.meta = meta;
dc->client_info = dc->h->upcb (dc->h->upcb_cls,
&pi);
-
+ schedule_block_download (dc,
+ &dc->uri->data.chk.chk,
+ 0,
+ 1 /* 0 == CHK, 1 == top */);
return dc;
}
*/
GNUNET_FS_STATUS_DOWNLOAD_STOPPED,
+ /**
+ * Notification that this download is now actively being
+ * pursued (as opposed to waiting in the queue).
+ */
+ GNUNET_FS_STATUS_DOWNLOAD_ACTIVE,
+
+ /**
+ * Notification that this download is no longer actively
+ * being pursued (back in the queue).
+ */
+ GNUNET_FS_STATUS_DOWNLOAD_INACTIVE,
+
/**
* First event generated when a client requests
* a search to begin or when a namespace result