</div>
<p-table
- [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="id"
+ [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="uniqId"
[sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)"
>
<ng-template pTemplate="header">
<th i18n style="width: 210px">Type</th>
<th i18n style="width: 130px">State</th>
<th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
- <th i18n style="width: 250px">Updated</th>
+ <th i18n style="width: 250px">Processed on</th>
+ <th i18n style="width: 250px">Finished on</th>
</tr>
</ng-template>
<td>{{ job.type }}</td>
<td>{{ job.state }}</td>
<td>{{ job.createdAt }}</td>
- <td>{{ job.updatedAt }}</td>
+ <td>{{ job.processedOn }}</td>
+ <td>{{ job.finishedOn }}</td>
</tr>
</ng-template>
<ng-template pTemplate="rowexpansion" let-job>
<tr>
- <td colspan="6">
+ <td colspan="7">
<pre>{{ job.data }}</pre>
</td>
</tr>
<tr class="job-error" *ngIf="job.error">
- <td colspan="6">
+ <td colspan="7">
<pre>{{ job.error }}</pre>
</td>
</tr>
export class JobsListComponent extends RestTable implements OnInit {
private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
- jobState: JobState = 'inactive'
- jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
+ jobState: JobState = 'waiting'
+ jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
jobs: Job[] = []
totalRecords: number
rowsPerPage = 10
return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
.pipe(
- map(res => this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'updatedAt' ])),
+ map(res => {
+ return this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'processedOn', 'finishedOn' ])
+ }),
map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)),
+ map(res => this.restExtractor.applyToResultListData(res, this.buildUniqId)),
catchError(err => this.restExtractor.handleError(err))
)
}
return Object.assign(obj, { data })
}
+
+ private buildUniqId (obj: Job) {
+ return Object.assign(obj, { uniqId: `${obj.id}-${obj.type}` })
+ }
}
rm -f "./config/local-test.json"
rm -f "./config/local-test-$i.json"
createdb -O peertube "peertube_test$i"
- redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
+ redis-cli KEYS "bull-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
done
} from '../../middlewares'
import { paginationValidator } from '../../middlewares/validators'
import { listJobsValidator } from '../../middlewares/validators/jobs'
+import { isArray } from '../../helpers/custom-validators/misc'
const jobsRouter = express.Router()
// ---------------------------------------------------------------------------
async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
- const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC'
+ const state: JobState = req.params.state
+ const asc = req.query.sort === 'createdAt'
- const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort)
- const total = await JobQueue.Instance.count(req.params.state)
+ const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
+ const total = await JobQueue.Instance.count(state)
const result: ResultList<any> = {
total,
- data: jobs.map(j => formatJob(j.toJSON()))
+ data: jobs.map(j => formatJob(j, state))
}
return res.json(result)
}
-function formatJob (job: any): Job {
+function formatJob (job: any, state: JobState): Job {
+ const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null
+
return {
id: job.id,
- state: job.state as JobState,
- type: job.type as JobType,
+ state: state,
+ type: job.queue.name as JobType,
data: job.data,
- error: job.error,
- createdAt: new Date(parseInt(job.created_at, 10)),
- updatedAt: new Date(parseInt(job.updated_at, 10))
+ error,
+ createdAt: new Date(job.timestamp),
+ finishedOn: new Date(job.finishedOn),
+ processedOn: new Date(job.processedOn)
}
}
import { JobState } from '../../../shared/models'
import { exists } from './misc'
-const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
+const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
function isValidJobState (value: JobState) {
return exists(value) && jobStates.indexOf(value) !== -1
// ---------------------------------------------------------------------------
-const LAST_MIGRATION_VERSION = 225
+const LAST_MIGRATION_VERSION = 230
// ---------------------------------------------------------------------------
--- /dev/null
+import * as Sequelize from 'sequelize'
+import { createClient } from 'redis'
+import { CONFIG } from '../constants'
+import { JobQueue } from '../../lib/job-queue'
+import { initDatabaseModels } from '../database'
+
+async function up (utils: {
+ transaction: Sequelize.Transaction
+ queryInterface: Sequelize.QueryInterface
+ sequelize: Sequelize.Sequelize
+}): Promise<any> {
+ await initDatabaseModels(false)
+
+ return new Promise((res, rej) => {
+ const client = createClient({
+ host: CONFIG.REDIS.HOSTNAME,
+ port: CONFIG.REDIS.PORT,
+ db: CONFIG.REDIS.DB
+ })
+
+ const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST
+
+ client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => {
+ if (err) return rej(err)
+
+ const jobPromises = jobStrings
+ .map(s => s.split('|'))
+ .map(([ , jobId ]) => {
+ return new Promise((res, rej) => {
+ client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
+ if (err) return rej(err)
+
+ try {
+ const parsedData = JSON.parse(job.data)
+
+ return res({ type: job.type, payload: parsedData })
+ } catch (err) {
+ console.error('Cannot parse data %s.', job.data)
+ return res(null)
+ }
+ })
+ })
+ })
+
+ JobQueue.Instance.init()
+ .then(() => Promise.all(jobPromises))
+ .then((jobs: any) => {
+ const createJobPromises = jobs
+ .filter(job => job !== null)
+ .map(job => JobQueue.Instance.createJob(job))
+
+ return Promise.all(createJobPromises)
+ })
+ .then(() => res())
+ })
+ })
+}
+
+function down (options) {
+ throw new Error('Not implemented.')
+}
+
+export { up, down }
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { getServerActor } from '../../../helpers/utils'
import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers'
host: string
}
-async function processActivityPubFollow (job: kue.Job) {
+async function processActivityPubFollow (job: Bull.Job) {
const payload = job.data as ActivitypubFollowPayload
const host = payload.host
-import * as kue from 'kue'
+import * as Bull from 'bull'
import * as Bluebird from 'bluebird'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
body: any
}
-async function processActivityPubHttpBroadcast (job: kue.Job) {
+async function processActivityPubHttpBroadcast (job: Bull.Job) {
logger.info('Processing ActivityPub broadcast in job %d.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { processActivities } from '../../activitypub/process'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
uris: string[]
}
-async function processActivityPubHttpFetcher (job: kue.Job) {
+async function processActivityPubHttpFetcher (job: Bull.Job) {
logger.info('Processing ActivityPub fetcher in job %d.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
body: any
}
-async function processActivityPubHttpUnicast (job: kue.Job) {
+async function processActivityPubHttpUnicast (job: Bull.Job) {
logger.info('Processing ActivityPub unicast in job %d.', job.id)
const payload = job.data as ActivitypubHttpUnicastPayload
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { Emailer } from '../../emailer'
text: string
}
-async function processEmail (job: kue.Job) {
+async function processEmail (job: Bull.Job) {
const payload = job.data as EmailPayload
logger.info('Processing email in job %d.', job.id)
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { VideoResolution, VideoState } from '../../../../shared'
import { logger } from '../../../helpers/logger'
import { computeResolutionsToTranscode } from '../../../helpers/utils'
import { federateVideoIfNeeded } from '../../activitypub'
import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { sequelizeTypescript } from '../../../initializers'
+import * as Bluebird from 'bluebird'
export type VideoFilePayload = {
videoUUID: string
filePath: string
}
-async function processVideoFileImport (job: kue.Job) {
+async function processVideoFileImport (job: Bull.Job) {
const payload = job.data as VideoFileImportPayload
logger.info('Processing video file import in job %d.', job.id)
return video
}
-async function processVideoFile (job: kue.Job) {
+async function processVideoFile (job: Bull.Job) {
const payload = job.data as VideoFilePayload
logger.info('Processing video file in job %d.', job.id)
)
if (resolutionsEnabled.length !== 0) {
- const tasks: Promise<any>[] = []
+ const tasks: Bluebird<any>[] = []
for (const resolution of resolutionsEnabled) {
const dataInput = {
-import * as kue from 'kue'
+import * as Bull from 'bull'
import { JobState, JobType } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
-import { Redis } from '../redis'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { EmailPayload, processEmail } from './handlers/email'
-import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file'
+import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
type CreateJobArgument =
{ type: 'video-file', payload: VideoFilePayload } |
{ type: 'email', payload: EmailPayload }
-const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
+const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'email': processEmail
}
-const jobsWithTLL: JobType[] = [
+const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
+ 'activitypub-http-broadcast': true,
+ 'activitypub-http-unicast': true,
+ 'activitypub-http-fetcher': true,
+ 'activitypub-follow': true
+}
+
+const jobTypes: JobType[] = [
+ 'activitypub-follow',
'activitypub-http-broadcast',
- 'activitypub-http-unicast',
'activitypub-http-fetcher',
- 'activitypub-follow'
+ 'activitypub-http-unicast',
+ 'email',
+ 'video-file',
+ 'video-file-import'
]
class JobQueue {
private static instance: JobQueue
- private jobQueue: kue.Queue
+ private queues: { [ id in JobType ]?: Bull.Queue } = {}
private initialized = false
private jobRedisPrefix: string
if (this.initialized === true) return
this.initialized = true
- this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST
-
- this.jobQueue = kue.createQueue({
+ this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
+ const queueOptions = {
prefix: this.jobRedisPrefix,
redis: {
host: CONFIG.REDIS.HOSTNAME,
auth: CONFIG.REDIS.AUTH,
db: CONFIG.REDIS.DB
}
- })
-
- this.jobQueue.setMaxListeners(20)
+ }
- this.jobQueue.on('error', err => {
- logger.error('Error in job queue.', { err })
- process.exit(-1)
- })
- this.jobQueue.watchStuckJobs(5000)
+ for (const handlerName of Object.keys(handlers)) {
+ const queue = new Bull(handlerName, queueOptions)
+ const handler = handlers[handlerName]
- await this.reactiveStuckJobs()
+ queue.process(JOB_CONCURRENCY[handlerName], handler)
+ .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
- for (const handlerName of Object.keys(handlers)) {
- this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
- try {
- const res = await handlers[ handlerName ](job)
- return done(null, res)
- } catch (err) {
- logger.error('Cannot execute job %d.', job.id, { err })
- return done(err)
- }
+ queue.on('error', err => {
+ logger.error('Error in job queue %s.', handlerName, { err })
+ process.exit(-1)
})
+
+ this.queues[handlerName] = queue
}
}
- createJob (obj: CreateJobArgument, priority = 'normal') {
- return new Promise((res, rej) => {
- let job = this.jobQueue
- .create(obj.type, obj.payload)
- .priority(priority)
- .attempts(JOB_ATTEMPTS[obj.type])
- .backoff({ delay: 60 * 1000, type: 'exponential' })
+ createJob (obj: CreateJobArgument) {
+ const queue = this.queues[obj.type]
+ if (queue === undefined) {
+ logger.error('Unknown queue %s: cannot create job.', obj.type)
+ return
+ }
- if (jobsWithTLL.indexOf(obj.type) !== -1) {
- job = job.ttl(JOB_REQUEST_TTL)
- }
+ const jobArgs: Bull.JobOptions = {
+ backoff: { delay: 60 * 1000, type: 'exponential' },
+ attempts: JOB_ATTEMPTS[obj.type]
+ }
- return job.save(err => {
- if (err) return rej(err)
+ if (jobsWithRequestTimeout[obj.type] === true) {
+ jobArgs.timeout = JOB_REQUEST_TTL
+ }
- return res()
- })
- })
+ return queue.add(obj.payload, jobArgs)
}
- async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> {
- const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)
+ async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
+ let results: Bull.Job[] = []
- const jobPromises = jobStrings
- .map(s => s.split('|'))
- .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))
+ // TODO: optimize
+ for (const jobType of jobTypes) {
+ const queue = this.queues[ jobType ]
+ if (queue === undefined) {
+ logger.error('Unknown queue %s to list jobs.', jobType)
+ continue
+ }
- return Promise.all(jobPromises)
- }
+ // FIXME: Bull queue typings does not have getJobs method
+ const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
+ results = results.concat(jobs)
+ }
- count (state: JobState) {
- return new Promise<number>((res, rej) => {
- this.jobQueue[state + 'Count']((err, total) => {
- if (err) return rej(err)
+ results.sort((j1: any, j2: any) => {
+ if (j1.timestamp < j2.timestamp) return -1
+ else if (j1.timestamp === j2.timestamp) return 0
- return res(total)
- })
+ return 1
})
- }
- removeOldJobs () {
- const now = new Date().getTime()
- kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
- if (err) {
- logger.error('Cannot get jobs when removing old jobs.', { err })
- return
- }
+ if (asc === false) results.reverse()
- for (const job of jobs) {
- if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
- job.remove()
- }
- }
- })
+ return results.slice(start, start + count)
}
- private reactiveStuckJobs () {
- const promises: Promise<any>[] = []
-
- this.jobQueue.active((err, ids) => {
- if (err) throw err
+ async count (state: JobState): Promise<number> {
+ let total = 0
- for (const id of ids) {
- kue.Job.get(id, (err, job) => {
- if (err) throw err
+ for (const type of jobTypes) {
+ const queue = this.queues[ type ]
+ if (queue === undefined) {
+ logger.error('Unknown queue %s to count jobs.', type)
+ continue
+ }
- const p = new Promise((res, rej) => {
- job.inactive(err => {
- if (err) return rej(err)
- return res()
- })
- })
+ const counts = await queue.getJobCounts()
- promises.push(p)
- })
- }
- })
+ total += counts[ state ]
+ }
- return Promise.all(promises)
+ return total
}
- private getJob (id: number) {
- return new Promise<kue.Job>((res, rej) => {
- kue.Job.get(id, (err, job) => {
- if (err) return rej(err)
-
- return res(job)
- })
- })
+ removeOldJobs () {
+ for (const key of Object.keys(this.queues)) {
+ const queue = this.queues[key]
+ queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+ }
}
static get Instance () {
return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
}
- listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) {
- return new Promise<string[]>((res, rej) => {
- this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => {
- if (err) return rej(err)
-
- return res(values)
- })
- })
- }
-
generateResetPasswordKey (userId: number) {
return 'reset-password-' + userId
}
import { VideoPrivacy } from '../../../../shared/models/videos'
import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils'
-
import {
- flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo,
+ flushAndRunMultipleServers,
+ getVideosList,
+ killallServers,
+ ServerInfo,
+ setAccessTokensToServers,
+ uploadVideo,
wait
} from '../../utils/index'
import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows'
import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
import {
- addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads,
+ addVideoCommentReply,
+ addVideoCommentThread,
+ getVideoCommentThreads,
getVideoThreadComments
} from '../../utils/videos/video-comments'
})
it('Should not have pending/processing jobs anymore', async function () {
- const states: JobState[] = [ 'inactive', 'active' ]
+ const states: JobState[] = [ 'waiting', 'active' ]
for (const state of states) {
const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
import * as chai from 'chai'
import 'mocha'
-import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index'
+import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index'
import { doubleFollow } from '../../utils/server/follows'
import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
import { flushAndRunMultipleServers } from '../../utils/server/servers'
})
it('Should list jobs', async function () {
- const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete')
+ const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed')
expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.length.above(2)
})
it('Should list jobs with sort and pagination', async function () {
- const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt')
+ const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt')
expect(res.body.total).to.be.above(2)
expect(res.body.data).to.have.lengthOf(1)
const job = res.body.data[0]
- expect(job.state).to.equal('complete')
+ expect(job.state).to.equal('completed')
expect(job.type).to.equal('activitypub-http-unicast')
expect(dateIsValid(job.createdAt)).to.be.true
- expect(dateIsValid(job.updatedAt)).to.be.true
+ expect(dateIsValid(job.processedOn)).to.be.true
+ expect(dateIsValid(job.finishedOn)).to.be.true
})
after(async function () {
}
async function isTherePendingRequests (servers: ServerInfo[]) {
- const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
+ const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
const tasks: Promise<any>[] = []
let pendingRequests = false
if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ]
else servers = serversArg as ServerInfo[]
- const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
+ const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
const tasks: Promise<any>[] = []
let pendingRequests: boolean
-export type JobState = 'active' | 'complete' | 'failed' | 'inactive' | 'delayed'
+export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
export type JobType = 'activitypub-http-unicast' |
'activitypub-http-broadcast' |
data: any,
error: any,
createdAt: Date
- updatedAt: Date
+ finishedOn: Date
+ processedOn: Date
}