Basic api documentation #7 (#220)
[oweals/peertube.git] / server / lib / jobs / job-scheduler.ts
1 import { AsyncQueue, forever, queue } from 'async'
2 import * as Sequelize from 'sequelize'
3 import { JobCategory } from '../../../shared'
4 import { logger } from '../../helpers/logger'
5 import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6 import { JobModel } from '../../models/job/job'
7
8 export interface JobHandler<P, T> {
9   process (data: object, jobId: number): Promise<T>
10   onError (err: Error, jobId: number)
11   onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any>
12 }
13 type JobQueueCallback = (err: Error) => void
14
15 class JobScheduler<P, T> {
16
17   constructor (
18     private jobCategory: JobCategory,
19     private jobHandlers: { [ id: string ]: JobHandler<P, T> }
20   ) {}
21
22   async activate () {
23     const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
24
25     logger.info('Jobs scheduler %s activated.', this.jobCategory)
26
27     const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28
29     // Finish processing jobs from a previous start
30     const state = JOB_STATES.PROCESSING
31     try {
32       const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33
34       this.enqueueJobs(jobsQueue, jobs)
35     } catch (err) {
36       logger.error('Cannot list pending jobs.', err)
37     }
38
39     forever(
40       async next => {
41         if (jobsQueue.length() !== 0) {
42           // Finish processing the queue first
43           return setTimeout(next, JOBS_FETCHING_INTERVAL)
44         }
45
46         const state = JOB_STATES.PENDING
47         try {
48           const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49
50           this.enqueueJobs(jobsQueue, jobs)
51         } catch (err) {
52           logger.error('Cannot list pending jobs.', err)
53         }
54
55         // Optimization: we could use "drain" from queue object
56         return setTimeout(next, JOBS_FETCHING_INTERVAL)
57       },
58
59       err => logger.error('Error in job scheduler queue.', err)
60     )
61   }
62
63   createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
64     const createQuery = {
65       state: JOB_STATES.PENDING,
66       category: this.jobCategory,
67       handlerName,
68       handlerInputData
69     }
70
71     const options = { transaction }
72
73     return JobModel.create(createQuery, options)
74   }
75
76   private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77     jobs.forEach(job => jobsQueue.push(job))
78   }
79
80   private async processJob (job: JobModel, callback: (err: Error) => void) {
81     const jobHandler = this.jobHandlers[job.handlerName]
82     if (jobHandler === undefined) {
83       const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
84       logger.error(errorString)
85
86       const error = new Error(errorString)
87       await this.onJobError(jobHandler, job, error)
88       return callback(error)
89     }
90
91     logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
92
93     job.state = JOB_STATES.PROCESSING
94     await job.save()
95
96     try {
97       const result: T = await jobHandler.process(job.handlerInputData, job.id)
98       await this.onJobSuccess(jobHandler, job, result)
99     } catch (err) {
100       logger.error('Error in job handler %s.', job.handlerName, err)
101
102       try {
103         await this.onJobError(jobHandler, job, err)
104       } catch (innerErr) {
105         this.cannotSaveJobError(innerErr)
106         return callback(innerErr)
107       }
108     }
109
110     return callback(null)
111   }
112
113   private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114     job.state = JOB_STATES.ERROR
115
116     try {
117       await job.save()
118       if (jobHandler) await jobHandler.onError(err, job.id)
119     } catch (err) {
120       this.cannotSaveJobError(err)
121     }
122   }
123
124   private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125     job.state = JOB_STATES.SUCCESS
126
127     try {
128       await job.save()
129       await jobHandler.onSuccess(job.id, jobResult, this)
130     } catch (err) {
131       this.cannotSaveJobError(err)
132     }
133   }
134
135   private cannotSaveJobError (err: Error) {
136     logger.error('Cannot save new job state.', err)
137   }
138 }
139
140 // ---------------------------------------------------------------------------
141
142 export {
143   JobScheduler
144 }