import { Injectable } from '@angular/core'
import { SortMeta } from 'primeng/api'
import { Observable } from 'rxjs'
-import { ResultList } from '../../../../../../shared'
+import { JobType, ResultList } from '../../../../../../shared'
import { JobState } from '../../../../../../shared/models'
import { Job } from '../../../../../../shared/models/server/job.model'
import { environment } from '../../../../environments/environment'
import { RestExtractor, RestPagination, RestService } from '../../../shared'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
@Injectable()
export class JobService {
private restExtractor: RestExtractor
) {}
- getJobs (state: JobState, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
+ getJobs (state: JobState, jobType: JobTypeClient, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
let params = new HttpParams()
params = this.restService.addRestGetParams(params, pagination, sort)
+ if (jobType !== 'all') params = params.append('jobType', jobType)
+
return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
.pipe(
map(res => {
<div class="admin-sub-header">
<div i18n class="form-sub-title">Jobs list</div>
- <div class="peertube-select-container">
- <select [(ngModel)]="jobState" (ngModelChange)="onJobStateChanged()">
- <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
- </select>
+ <div class="select-filter-block">
+ <label for="jobType">Job type</label>
+ <div class="peertube-select-container">
+ <select id="jobType" name="jobType" [(ngModel)]="jobType" (ngModelChange)="onJobStateOrTypeChanged()">
+ <option *ngFor="let jobType of jobTypes" [value]="jobType">{{ jobType }}</option>
+ </select>
+ </div>
+ </div>
+
+ <div class="select-filter-block">
+ <label for="jobState">Job state</label>
+ <div class="peertube-select-container">
+ <select id="jobState" name="jobState" [(ngModel)]="jobState" (ngModelChange)="onJobStateOrTypeChanged()">
+ <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
+ </select>
+ </div>
</div>
</div>
@import '_variables';
@import '_mixins';
-.peertube-select-container {
- @include peertube-select-container(auto);
+.admin-sub-header {
+ align-items: flex-end;
+
+ .select-filter-block {
+ &:not(:last-child) {
+ margin-right: 10px;
+ }
+
+ label {
+ margin-bottom: 2px;
+ }
+
+ .peertube-select-container {
+ @include peertube-select-container(auto);
+ }
+ }
}
pre {
import { peertubeLocalStorage } from '@app/shared/misc/peertube-local-storage'
import { Notifier } from '@app/core'
import { SortMeta } from 'primeng/api'
-import { Job } from '../../../../../../shared/index'
+import { Job, JobType } from '../../../../../../shared/index'
import { JobState } from '../../../../../../shared/models'
import { RestPagination, RestTable } from '../../../shared'
import { JobService } from './job.service'
import { I18n } from '@ngx-translate/i18n-polyfill'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
@Component({
selector: 'my-jobs',
})
export class JobsComponent extends RestTable implements OnInit {
private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
+ private static JOB_STATE_LOCAL_STORAGE_TYPE = 'jobs-list-type'
jobState: JobState = 'waiting'
jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
+
+ jobType: JobTypeClient = 'all'
+ jobTypes: JobTypeClient[] = [
+ 'all',
+ 'activitypub-follow',
+ 'activitypub-http-broadcast',
+ 'activitypub-http-fetcher',
+ 'activitypub-http-unicast',
+ 'email',
+ 'video-transcoding',
+ 'video-file-import',
+ 'video-import',
+ 'videos-views',
+ 'activitypub-refresher'
+ ]
+
jobs: Job[] = []
totalRecords: number
rowsPerPage = 10
}
ngOnInit () {
- this.loadJobState()
+ this.loadJobStateAndType()
this.initialize()
}
- onJobStateChanged () {
+ onJobStateOrTypeChanged () {
this.pagination.start = 0
this.loadData()
- this.saveJobState()
+ this.saveJobStateAndType()
}
protected loadData () {
this.jobsService
- .getJobs(this.jobState, this.pagination, this.sort)
+ .getJobs(this.jobState, this.jobType, this.pagination, this.sort)
.subscribe(
resultList => {
this.jobs = resultList.data
)
}
- private loadJobState () {
- const result = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+ private loadJobStateAndType () {
+ const state = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+ if (state) this.jobState = state as JobState
- if (result) this.jobState = result as JobState
+ const type = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE)
+ if (type) this.jobType = type as JobType
}
- private saveJobState () {
+ private saveJobStateAndType () {
peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE, this.jobState)
+ peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE, this.jobType)
}
}
--- /dev/null
+import { JobType } from '@shared/models'
+
+export type JobTypeClient = 'all' | JobType
jobsSortValidator,
setDefaultSort,
setDefaultPagination,
- asyncMiddleware(listJobsValidator),
+ listJobsValidator,
asyncMiddleware(listJobs)
)
async function listJobs (req: express.Request, res: express.Response) {
const state = req.params.state as JobState
const asc = req.query.sort === 'createdAt'
+ const jobType = req.query.jobType
- const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
+ const jobs = await JobQueue.Instance.listForApi({
+ state,
+ start: req.query.start,
+ count: req.query.count,
+ asc,
+ jobType
+ })
const total = await JobQueue.Instance.count(state)
const result: ResultList<any> = {
import { JobState } from '../../../shared/models'
import { exists } from './misc'
+import { jobTypes } from '@server/lib/job-queue/job-queue'
const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
function isValidJobState (value: JobState) {
- return exists(value) && jobStates.indexOf(value) !== -1
+ return exists(value) && jobStates.includes(value)
+}
+
+function isValidJobType (value: any) {
+ return exists(value) && jobTypes.includes(value)
}
// ---------------------------------------------------------------------------
export {
- isValidJobState
+ isValidJobState,
+ isValidJobType
}
return queue.add(obj.payload, jobArgs)
}
- async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
+ async listForApi (options: {
+ state: JobState,
+ start: number,
+ count: number,
+ asc?: boolean,
+ jobType: JobType
+ }): Promise<Bull.Job[]> {
+ const { state, start, count, asc, jobType } = options
let results: Bull.Job[] = []
+ const filteredJobTypes = this.filterJobTypes(jobType)
+
// TODO: optimize
- for (const jobType of jobTypes) {
+ for (const jobType of filteredJobTypes) {
const queue = this.queues[ jobType ]
if (queue === undefined) {
logger.error('Unknown queue %s to list jobs.', jobType)
return results.slice(start, start + count)
}
- async count (state: JobState): Promise<number> {
+ async count (state: JobState, jobType?: JobType): Promise<number> {
let total = 0
- for (const type of jobTypes) {
+ const filteredJobTypes = this.filterJobTypes(jobType)
+
+ for (const type of filteredJobTypes) {
const queue = this.queues[ type ]
if (queue === undefined) {
logger.error('Unknown queue %s to count jobs.', type)
})
}
+ private filterJobTypes (jobType?: JobType) {
+ if (!jobType) return jobTypes
+
+ return jobTypes.filter(t => t === jobType)
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}
// ---------------------------------------------------------------------------
export {
+ jobTypes,
JobQueue
}
import * as express from 'express'
-import { param } from 'express-validator'
-import { isValidJobState } from '../../helpers/custom-validators/jobs'
+import { param, query } from 'express-validator'
+import { isValidJobState, isValidJobType } from '../../helpers/custom-validators/jobs'
import { logger } from '../../helpers/logger'
import { areValidationErrors } from './utils'
const listJobsValidator = [
- param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+ param('state')
+ .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+ query('jobType')
+ .optional()
+ .custom(isValidJobType).withMessage('Should have a valid job state'),
- async (req: express.Request, res: express.Response, next: express.NextFunction) => {
+ (req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
if (areValidationErrors(req, res)) return
})
})
+ it('Should fail with an incorrect job type', async function () {
+ await makeGetRequest({
+ url: server.url,
+ token: server.accessToken,
+ path,
+ query: {
+ jobType: 'toto'
+ }
+ })
+ })
+
it('Should fail with a bad start pagination', async function () {
await checkBadStartPagination(server.url, path, server.accessToken)
})
statusCodeExpected: 403
})
})
+
})
after(async function () {
const states: JobState[] = [ 'waiting', 'active' ]
for (const state of states) {
- const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
+ const res = await getJobsListPaginationAndSort({
+ url: servers[ 0 ].url,
+ accessToken: servers[ 0 ].accessToken,
+ state: state,
+ start: 0,
+ count: 50,
+ sort: '-createdAt'
+ })
expect(res.body.data).to.have.length(0)
}
})
expect(res.body.data).to.have.length.above(2)
})
- it('Should list jobs with sort and pagination', async function () {
- const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 2, 'createdAt')
- expect(res.body.total).to.be.above(2)
- expect(res.body.data).to.have.lengthOf(2)
+ it('Should list jobs with sort, pagination and job type', async function () {
+ {
+ const res = await getJobsListPaginationAndSort({
+ url: servers[ 1 ].url,
+ accessToken: servers[ 1 ].accessToken,
+ state: 'completed',
+ start: 1,
+ count: 2,
+ sort: 'createdAt'
+ })
+ expect(res.body.total).to.be.above(2)
+ expect(res.body.data).to.have.lengthOf(2)
+
+ let job: Job = res.body.data[ 0 ]
+ // Skip repeat jobs
+ if (job.type === 'videos-views') job = res.body.data[ 1 ]
+
+ expect(job.state).to.equal('completed')
+ expect(job.type.startsWith('activitypub-')).to.be.true
+ expect(dateIsValid(job.createdAt as string)).to.be.true
+ expect(dateIsValid(job.processedOn as string)).to.be.true
+ expect(dateIsValid(job.finishedOn as string)).to.be.true
+ }
- let job = res.body.data[0]
- // Skip repeat jobs
- if (job.type === 'videos-views') job = res.body.data[1]
+ {
+ const res = await getJobsListPaginationAndSort({
+ url: servers[ 1 ].url,
+ accessToken: servers[ 1 ].accessToken,
+ state: 'completed',
+ start: 0,
+ count: 100,
+ sort: 'createdAt',
+ jobType: 'activitypub-http-broadcast'
+ })
+ expect(res.body.total).to.be.above(2)
- expect(job.state).to.equal('completed')
- expect(job.type.startsWith('activitypub-')).to.be.true
- expect(dateIsValid(job.createdAt)).to.be.true
- expect(dateIsValid(job.processedOn)).to.be.true
- expect(dateIsValid(job.finishedOn)).to.be.true
+ for (const j of res.body.data as Job[]) {
+ expect(j.type).to.equal('activitypub-http-broadcast')
+ }
+ }
})
after(async function () {
// Check if each server has pending request
for (const server of servers) {
for (const state of states) {
- const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+ const p = getJobsListPaginationAndSort({
+ url: server.url,
+ accessToken: server.accessToken,
+ state: state,
+ start: 0,
+ count: 10,
+ sort: '-createdAt'
+ })
.then(res => {
if (res.body.total > 0) pendingRequests = true
})
import * as request from 'supertest'
-import { Job, JobState } from '../../models'
+import { Job, JobState, JobType } from '../../models'
import { wait } from '../miscs/miscs'
import { ServerInfo } from './servers'
+import { makeGetRequest } from '@shared/extra-utils'
function getJobsList (url: string, accessToken: string, state: JobState) {
const path = '/api/v1/jobs/' + state
.expect('Content-Type', /json/)
}
-function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) {
+function getJobsListPaginationAndSort (options: {
+ url: string,
+ accessToken: string,
+ state: JobState,
+ start: number,
+ count: number,
+ sort: string,
+ jobType?: JobType
+}) {
+ const { url, accessToken, state, start, count, sort, jobType } = options
const path = '/api/v1/jobs/' + state
- return request(url)
- .get(path)
- .query({ start })
- .query({ count })
- .query({ sort })
- .set('Accept', 'application/json')
- .set('Authorization', 'Bearer ' + accessToken)
- .expect(200)
- .expect('Content-Type', /json/)
+ const query = {
+ start,
+ count,
+ sort,
+ jobType
+ }
+
+ return makeGetRequest({
+ url,
+ path,
+ token: accessToken,
+ statusCodeExpected: 200,
+ query
+ })
}
async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
// Check if each server has pending request
for (const server of servers) {
for (const state of states) {
- const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+ const p = getJobsListPaginationAndSort({
+ url: server.url,
+ accessToken: server.accessToken,
+ state: state,
+ start: 0,
+ count: 10,
+ sort: '-createdAt'
+ })
.then(res => res.body.data)
.then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
.then(jobs => {
type: JobType
data: any,
error: any,
- createdAt: Date
- finishedOn: Date
- processedOn: Date
+ createdAt: Date | string
+ finishedOn: Date | string
+ processedOn: Date | string
}