Add ability to filter per job type
authorChocobozzz <me@florianbigard.com>
Wed, 4 Dec 2019 13:49:59 +0000 (14:49 +0100)
committerChocobozzz <me@florianbigard.com>
Wed, 4 Dec 2019 13:49:59 +0000 (14:49 +0100)
15 files changed:
client/src/app/+admin/system/jobs/job.service.ts
client/src/app/+admin/system/jobs/jobs.component.html
client/src/app/+admin/system/jobs/jobs.component.scss
client/src/app/+admin/system/jobs/jobs.component.ts
client/src/types/job-type-client.type.ts [new file with mode: 0644]
server/controllers/api/jobs.ts
server/helpers/custom-validators/jobs.ts
server/lib/job-queue/job-queue.ts
server/middlewares/validators/jobs.ts
server/tests/api/check-params/jobs.ts
server/tests/api/server/handle-down.ts
server/tests/api/server/jobs.ts
server/tests/real-world/real-world.ts
shared/extra-utils/server/jobs.ts
shared/models/server/job.model.ts

index 1daae8f030537568dd615336a79cf14f0aedd7cb..120144dff5a4872fcaa6d339ba2ad36b9a8c7bcf 100644 (file)
@@ -3,11 +3,12 @@ import { HttpClient, HttpParams } from '@angular/common/http'
 import { Injectable } from '@angular/core'
 import { SortMeta } from 'primeng/api'
 import { Observable } from 'rxjs'
-import { ResultList } from '../../../../../../shared'
+import { JobType, ResultList } from '../../../../../../shared'
 import { JobState } from '../../../../../../shared/models'
 import { Job } from '../../../../../../shared/models/server/job.model'
 import { environment } from '../../../../environments/environment'
 import { RestExtractor, RestPagination, RestService } from '../../../shared'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
 
 @Injectable()
 export class JobService {
@@ -19,10 +20,12 @@ export class JobService {
     private restExtractor: RestExtractor
   ) {}
 
-  getJobs (state: JobState, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
+  getJobs (state: JobState, jobType: JobTypeClient, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
     let params = new HttpParams()
     params = this.restService.addRestGetParams(params, pagination, sort)
 
+    if (jobType !== 'all') params = params.append('jobType', jobType)
+
     return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
                .pipe(
                  map(res => {
index 7ed1888e2b2a2b0be0173f3706d38690bb0d6d29..cd26257dd2ea8000ee794ab270bc1d84436b42f3 100644 (file)
@@ -1,10 +1,22 @@
 <div class="admin-sub-header">
   <div i18n class="form-sub-title">Jobs list</div>
 
-  <div class="peertube-select-container">
-    <select [(ngModel)]="jobState" (ngModelChange)="onJobStateChanged()">
-      <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
-    </select>
+  <div class="select-filter-block">
+    <label for="jobType">Job type</label>
+    <div class="peertube-select-container">
+      <select id="jobType" name="jobType" [(ngModel)]="jobType" (ngModelChange)="onJobStateOrTypeChanged()">
+        <option *ngFor="let jobType of jobTypes" [value]="jobType">{{ jobType }}</option>
+      </select>
+    </div>
+  </div>
+
+  <div class="select-filter-block">
+    <label for="jobState">Job state</label>
+    <div class="peertube-select-container">
+      <select id="jobState" name="jobState" [(ngModel)]="jobState" (ngModelChange)="onJobStateOrTypeChanged()">
+        <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
+      </select>
+    </div>
   </div>
 </div>
 
index ab05f1982599d9ee59a7f01ceae8aadf2c279c83..ccc0b35ca0ae2ca42db386e66253854b98736f19 100644 (file)
@@ -1,8 +1,22 @@
 @import '_variables';
 @import '_mixins';
 
-.peertube-select-container {
-  @include peertube-select-container(auto);
+.admin-sub-header {
+  align-items: flex-end;
+
+  .select-filter-block {
+    &:not(:last-child) {
+      margin-right: 10px;
+    }
+
+    label {
+      margin-bottom: 2px;
+    }
+
+    .peertube-select-container {
+      @include peertube-select-container(auto);
+    }
+  }
 }
 
 pre {
index b24353ca68000d60e869a3f9dad8408e54303749..95ee17023ee5f6041b4db0050d46ada2fba41f0d 100644 (file)
@@ -2,11 +2,12 @@ import { Component, OnInit } from '@angular/core'
 import { peertubeLocalStorage } from '@app/shared/misc/peertube-local-storage'
 import { Notifier } from '@app/core'
 import { SortMeta } from 'primeng/api'
-import { Job } from '../../../../../../shared/index'
+import { Job, JobType } from '../../../../../../shared/index'
 import { JobState } from '../../../../../../shared/models'
 import { RestPagination, RestTable } from '../../../shared'
 import { JobService } from './job.service'
 import { I18n } from '@ngx-translate/i18n-polyfill'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
 
 @Component({
   selector: 'my-jobs',
@@ -15,9 +16,26 @@ import { I18n } from '@ngx-translate/i18n-polyfill'
 })
 export class JobsComponent extends RestTable implements OnInit {
   private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
+  private static JOB_STATE_LOCAL_STORAGE_TYPE = 'jobs-list-type'
 
   jobState: JobState = 'waiting'
   jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
+
+  jobType: JobTypeClient = 'all'
+  jobTypes: JobTypeClient[] = [
+    'all',
+    'activitypub-follow',
+    'activitypub-http-broadcast',
+    'activitypub-http-fetcher',
+    'activitypub-http-unicast',
+    'email',
+    'video-transcoding',
+    'video-file-import',
+    'video-import',
+    'videos-views',
+    'activitypub-refresher'
+  ]
+
   jobs: Job[] = []
   totalRecords: number
   rowsPerPage = 10
@@ -33,20 +51,20 @@ export class JobsComponent extends RestTable implements OnInit {
   }
 
   ngOnInit () {
-    this.loadJobState()
+    this.loadJobStateAndType()
     this.initialize()
   }
 
-  onJobStateChanged () {
+  onJobStateOrTypeChanged () {
     this.pagination.start = 0
 
     this.loadData()
-    this.saveJobState()
+    this.saveJobStateAndType()
   }
 
   protected loadData () {
     this.jobsService
-      .getJobs(this.jobState, this.pagination, this.sort)
+      .getJobs(this.jobState, this.jobType, this.pagination, this.sort)
       .subscribe(
         resultList => {
           this.jobs = resultList.data
@@ -57,13 +75,16 @@ export class JobsComponent extends RestTable implements OnInit {
       )
   }
 
-  private loadJobState () {
-    const result = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+  private loadJobStateAndType () {
+    const state = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+    if (state) this.jobState = state as JobState
 
-    if (result) this.jobState = result as JobState
+    const type = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE)
+    if (type) this.jobType = type as JobType
   }
 
-  private saveJobState () {
+  private saveJobStateAndType () {
     peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE, this.jobState)
+    peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE, this.jobType)
   }
 }
diff --git a/client/src/types/job-type-client.type.ts b/client/src/types/job-type-client.type.ts
new file mode 100644 (file)
index 0000000..7d51f1d
--- /dev/null
@@ -0,0 +1,3 @@
+import { JobType } from '@shared/models'
+
+export type JobTypeClient = 'all' | JobType
index 1fa662349a958d1c96c5272bd1054619093d9302..05320311e98e00e21ebd4e52689496c31bf7db06 100644 (file)
@@ -24,7 +24,7 @@ jobsRouter.get('/:state',
   jobsSortValidator,
   setDefaultSort,
   setDefaultPagination,
-  asyncMiddleware(listJobsValidator),
+  listJobsValidator,
   asyncMiddleware(listJobs)
 )
 
@@ -39,8 +39,15 @@ export {
 async function listJobs (req: express.Request, res: express.Response) {
   const state = req.params.state as JobState
   const asc = req.query.sort === 'createdAt'
+  const jobType = req.query.jobType
 
-  const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
+  const jobs = await JobQueue.Instance.listForApi({
+    state,
+    start: req.query.start,
+    count: req.query.count,
+    asc,
+    jobType
+  })
   const total = await JobQueue.Instance.count(state)
 
   const result: ResultList<any> = {
index 1cc6e6912b3668e02d9495329e7e8076506ae0d0..dd33e85a39a4d28c64054ec9996ac157dec8942b 100644 (file)
@@ -1,14 +1,20 @@
 import { JobState } from '../../../shared/models'
 import { exists } from './misc'
+import { jobTypes } from '@server/lib/job-queue/job-queue'
 
 const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
 
 function isValidJobState (value: JobState) {
-  return exists(value) && jobStates.indexOf(value) !== -1
+  return exists(value) && jobStates.includes(value)
+}
+
+function isValidJobType (value: any) {
+  return exists(value) && jobTypes.includes(value)
 }
 
 // ---------------------------------------------------------------------------
 
 export {
-  isValidJobState
+  isValidJobState,
+  isValidJobType
 }
index 3c810da98e6d9b3fb4a889cd935aec8d16cabcbd..ec601e9eadd3cf761580b4a9dad94c036b25f1eb 100644 (file)
@@ -121,11 +121,20 @@ class JobQueue {
     return queue.add(obj.payload, jobArgs)
   }
 
-  async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
+  async listForApi (options: {
+    state: JobState,
+    start: number,
+    count: number,
+    asc?: boolean,
+    jobType: JobType
+  }): Promise<Bull.Job[]> {
+    const { state, start, count, asc, jobType } = options
     let results: Bull.Job[] = []
 
+    const filteredJobTypes = this.filterJobTypes(jobType)
+
     // TODO: optimize
-    for (const jobType of jobTypes) {
+    for (const jobType of filteredJobTypes) {
       const queue = this.queues[ jobType ]
       if (queue === undefined) {
         logger.error('Unknown queue %s to list jobs.', jobType)
@@ -149,10 +158,12 @@ class JobQueue {
     return results.slice(start, start + count)
   }
 
-  async count (state: JobState): Promise<number> {
+  async count (state: JobState, jobType?: JobType): Promise<number> {
     let total = 0
 
-    for (const type of jobTypes) {
+    const filteredJobTypes = this.filterJobTypes(jobType)
+
+    for (const type of filteredJobTypes) {
       const queue = this.queues[ type ]
       if (queue === undefined) {
         logger.error('Unknown queue %s to count jobs.', type)
@@ -180,6 +191,12 @@ class JobQueue {
     })
   }
 
+  private filterJobTypes (jobType?: JobType) {
+    if (!jobType) return jobTypes
+
+    return jobTypes.filter(t => t === jobType)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }
@@ -188,5 +205,6 @@ class JobQueue {
 // ---------------------------------------------------------------------------
 
 export {
+  jobTypes,
   JobQueue
 }
index 41a8d689962e6bcf6674d0f0c4c9a59ebce3fd36..b57615dbcb48e1261513e40fa372819b0bbd02bc 100644 (file)
@@ -1,13 +1,17 @@
 import * as express from 'express'
-import { param } from 'express-validator'
-import { isValidJobState } from '../../helpers/custom-validators/jobs'
+import { param, query } from 'express-validator'
+import { isValidJobState, isValidJobType } from '../../helpers/custom-validators/jobs'
 import { logger } from '../../helpers/logger'
 import { areValidationErrors } from './utils'
 
 const listJobsValidator = [
-  param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+  param('state')
+    .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+  query('jobType')
+    .optional()
+    .custom(isValidJobType).withMessage('Should have a valid job state'),
 
-  async (req: express.Request, res: express.Response, next: express.NextFunction) => {
+  (req: express.Request, res: express.Response, next: express.NextFunction) => {
     logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
 
     if (areValidationErrors(req, res)) return
index c7013951424d69a43012600acdfb63e9c41a7740..22e23796477b99f4d6a92acd5fc5fcda2d0d2760 100644 (file)
@@ -51,6 +51,17 @@ describe('Test jobs API validators', function () {
       })
     })
 
+    it('Should fail with an incorrect job type', async function () {
+      await makeGetRequest({
+        url: server.url,
+        token: server.accessToken,
+        path,
+        query: {
+          jobType: 'toto'
+        }
+      })
+    })
+
     it('Should fail with a bad start pagination', async function () {
       await checkBadStartPagination(server.url, path, server.accessToken)
     })
@@ -79,6 +90,7 @@ describe('Test jobs API validators', function () {
         statusCodeExpected: 403
       })
     })
+
   })
 
   after(async function () {
index a0f5054748aa266e72a62d3380ca545f59e24366..7e36067f156e27594634b6abb4875a6a5b1b414f 100644 (file)
@@ -184,7 +184,14 @@ describe('Test handle downs', function () {
     const states: JobState[] = [ 'waiting', 'active' ]
 
     for (const state of states) {
-      const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 0 ].url,
+        accessToken: servers[ 0 ].accessToken,
+        state: state,
+        start: 0,
+        count: 50,
+        sort: '-createdAt'
+      })
       expect(res.body.data).to.have.length(0)
     }
   })
index ceea47a854a77b3377b1e1cd3bd067fee7ed8787..58d8c8c105fd4b36fd47cce5f6baae4ea2bb8104 100644 (file)
@@ -41,20 +41,46 @@ describe('Test jobs', function () {
     expect(res.body.data).to.have.length.above(2)
   })
 
-  it('Should list jobs with sort and pagination', async function () {
-    const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 2, 'createdAt')
-    expect(res.body.total).to.be.above(2)
-    expect(res.body.data).to.have.lengthOf(2)
+  it('Should list jobs with sort, pagination and job type', async function () {
+    {
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 1 ].url,
+        accessToken: servers[ 1 ].accessToken,
+        state: 'completed',
+        start: 1,
+        count: 2,
+        sort: 'createdAt'
+      })
+      expect(res.body.total).to.be.above(2)
+      expect(res.body.data).to.have.lengthOf(2)
+
+      let job: Job = res.body.data[ 0 ]
+      // Skip repeat jobs
+      if (job.type === 'videos-views') job = res.body.data[ 1 ]
+
+      expect(job.state).to.equal('completed')
+      expect(job.type.startsWith('activitypub-')).to.be.true
+      expect(dateIsValid(job.createdAt as string)).to.be.true
+      expect(dateIsValid(job.processedOn as string)).to.be.true
+      expect(dateIsValid(job.finishedOn as string)).to.be.true
+    }
 
-    let job = res.body.data[0]
-    // Skip repeat jobs
-    if (job.type === 'videos-views') job = res.body.data[1]
+    {
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 1 ].url,
+        accessToken: servers[ 1 ].accessToken,
+        state: 'completed',
+        start: 0,
+        count: 100,
+        sort: 'createdAt',
+        jobType: 'activitypub-http-broadcast'
+      })
+      expect(res.body.total).to.be.above(2)
 
-    expect(job.state).to.equal('completed')
-    expect(job.type.startsWith('activitypub-')).to.be.true
-    expect(dateIsValid(job.createdAt)).to.be.true
-    expect(dateIsValid(job.processedOn)).to.be.true
-    expect(dateIsValid(job.finishedOn)).to.be.true
+      for (const j of res.body.data as Job[]) {
+        expect(j.type).to.equal('activitypub-http-broadcast')
+      }
+    }
   })
 
   after(async function () {
index 8b070004d7ac06fedcd880c4c224ea253f4c9679..cba5ac3119ee501c2eeff6e8e51c685deeec2a44 100644 (file)
@@ -354,7 +354,14 @@ async function isTherePendingRequests (servers: ServerInfo[]) {
   // Check if each server has pending request
   for (const server of servers) {
     for (const state of states) {
-      const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+      const p = getJobsListPaginationAndSort({
+        url: server.url,
+        accessToken: server.accessToken,
+        state: state,
+        start: 0,
+        count: 10,
+        sort: '-createdAt'
+      })
         .then(res => {
           if (res.body.total > 0) pendingRequests = true
         })
index b3db885e85d3db8fe5d9771abe4baa87f09f13be..cc1352e14f152d18ee8f98feee18269314496129 100644 (file)
@@ -1,7 +1,8 @@
 import * as request from 'supertest'
-import { Job, JobState } from '../../models'
+import { Job, JobState, JobType } from '../../models'
 import { wait } from '../miscs/miscs'
 import { ServerInfo } from './servers'
+import { makeGetRequest } from '@shared/extra-utils'
 
 function getJobsList (url: string, accessToken: string, state: JobState) {
   const path = '/api/v1/jobs/' + state
@@ -14,18 +15,32 @@ function getJobsList (url: string, accessToken: string, state: JobState) {
           .expect('Content-Type', /json/)
 }
 
-function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) {
+function getJobsListPaginationAndSort (options: {
+  url: string,
+  accessToken: string,
+  state: JobState,
+  start: number,
+  count: number,
+  sort: string,
+  jobType?: JobType
+}) {
+  const { url, accessToken, state, start, count, sort, jobType } = options
   const path = '/api/v1/jobs/' + state
 
-  return request(url)
-          .get(path)
-          .query({ start })
-          .query({ count })
-          .query({ sort })
-          .set('Accept', 'application/json')
-          .set('Authorization', 'Bearer ' + accessToken)
-          .expect(200)
-          .expect('Content-Type', /json/)
+  const query = {
+    start,
+    count,
+    sort,
+    jobType
+  }
+
+  return makeGetRequest({
+    url,
+    path,
+    token: accessToken,
+    statusCodeExpected: 200,
+    query
+  })
 }
 
 async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
@@ -44,7 +59,14 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
     // Check if each server has pending request
     for (const server of servers) {
       for (const state of states) {
-        const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+        const p = getJobsListPaginationAndSort({
+          url: server.url,
+          accessToken: server.accessToken,
+          state: state,
+          start: 0,
+          count: 10,
+          sort: '-createdAt'
+        })
           .then(res => res.body.data)
           .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
           .then(jobs => {
index 1b9aa8a0718d123465758296d39b7a68e74ec241..b82a633b2917e3767fb812beff7decebfbf15efb 100644 (file)
@@ -17,7 +17,7 @@ export interface Job {
   type: JobType
   data: any,
   error: any,
-  createdAt: Date
-  finishedOn: Date
-  processedOn: Date
+  createdAt: Date | string
+  finishedOn: Date | string
+  processedOn: Date | string
 }