}
function activate () {
+ const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
+
logger.info('Jobs scheduler activated.')
const jobsQueue = queue(processJob)
- forever(
- function (next) {
- if (jobsQueue.length() !== 0) {
- // Finish processing the queue first
- return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
- }
+ // Finish processing jobs from a previous start
+ const state = constants.JOB_STATES.PROCESSING
+ db.Job.listWithLimit(limit, state, function (err, jobs) {
+ enqueueJobs(err, jobsQueue, jobs)
- db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
- if (err) {
- logger.error('Cannot list pending jobs.', { error: err })
- } else {
- jobs.forEach(function (job) {
- jobsQueue.push(job)
- })
+ forever(
+ function (next) {
+ if (jobsQueue.length() !== 0) {
+ // Finish processing the queue first
+ return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
}
- // Optimization: we could use "drain" from queue object
- return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
- })
- }
- )
+ const state = constants.JOB_STATES.PENDING
+ db.Job.listWithLimit(limit, state, function (err, jobs) {
+ if (err) {
+ logger.error('Cannot list pending jobs.', { error: err })
+ } else {
+ jobs.forEach(function (job) {
+ jobsQueue.push(job)
+ })
+ }
+
+ // Optimization: we could use "drain" from queue object
+ return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+ })
+ }
+ )
+ })
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
+function enqueueJobs (err, jobsQueue, jobs) {
+ if (err) {
+ logger.error('Cannot list pending jobs.', { error: err })
+ } else {
+ jobs.forEach(function (job) {
+ jobsQueue.push(job)
+ })
+ }
+}
+
function createJob (transaction, handlerName, handlerInputData, callback) {
const createQuery = {
state: constants.JOB_STATES.PENDING,