Rename video-file job to video-transcoding
authorChocobozzz <me@florianbigard.com>
Tue, 19 Mar 2019 16:00:08 +0000 (17:00 +0100)
committerChocobozzz <me@florianbigard.com>
Tue, 19 Mar 2019 16:00:08 +0000 (17:00 +0100)
scripts/create-transcoding-job.ts
server/controllers/api/videos/index.ts
server/initializers/constants.ts
server/lib/job-queue/handlers/video-file.ts [deleted file]
server/lib/job-queue/handlers/video-import.ts
server/lib/job-queue/handlers/video-transcoding.ts [new file with mode: 0644]
server/lib/job-queue/job-queue.ts
shared/models/server/job.model.ts

index 7e5b687bbc2c22520fa5969d79ea29d2c71e5b32..4a677eacb1e1b2e937920146eb2725c5bf4765df 100755 (executable)
@@ -42,6 +42,6 @@ async function run () {
   }
 
   await JobQueue.Instance.init()
-  await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
+  await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
   console.log('Transcoding job for video %s created.', video.uuid)
 }
index db4f4c96ffc99078fb6e4db0a73fcbed442ba9ff..08bee97d3d6cf4684eb98e40b2d2b669f3867c5d 100644 (file)
@@ -283,7 +283,7 @@ async function addVideo (req: express.Request, res: express.Response) {
       isNewVideo: true
     }
 
-    await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
+    await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
   }
 
   return res.json({
index 54cd57619a2d03fd2b67a0c868cf0771ec405cf1..ff0ade17a7bb24869f32c8782ef5c85962bd32e1 100644 (file)
@@ -100,36 +100,40 @@ const REMOTE_SCHEME = {
   WS: 'wss'
 }
 
-const JOB_ATTEMPTS: { [ id in JobType ]: number } = {
+// TODO: remove 'video-file'
+const JOB_ATTEMPTS: { [ id in (JobType | 'video-file') ]: number } = {
   'activitypub-http-broadcast': 5,
   'activitypub-http-unicast': 5,
   'activitypub-http-fetcher': 5,
   'activitypub-follow': 5,
   'video-file-import': 1,
+  'video-transcoding': 1,
   'video-file': 1,
   'video-import': 1,
   'email': 5,
   'videos-views': 1,
   'activitypub-refresher': 1
 }
-const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
+const JOB_CONCURRENCY: { [ id in (JobType | 'video-file') ]: number } = {
   'activitypub-http-broadcast': 1,
   'activitypub-http-unicast': 5,
   'activitypub-http-fetcher': 1,
   'activitypub-follow': 3,
   'video-file-import': 1,
+  'video-transcoding': 1,
   'video-file': 1,
   'video-import': 1,
   'email': 5,
   'videos-views': 1,
   'activitypub-refresher': 1
 }
-const JOB_TTL: { [ id in JobType ]: number } = {
+const JOB_TTL: { [ id in (JobType | 'video-file') ]: number } = {
   'activitypub-http-broadcast': 60000 * 10, // 10 minutes
   'activitypub-http-unicast': 60000 * 10, // 10 minutes
   'activitypub-http-fetcher': 60000 * 10, // 10 minutes
   'activitypub-follow': 60000 * 10, // 10 minutes
   'video-file-import': 1000 * 3600, // 1 hour
+  'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long
   'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long
   'video-import': 1000 * 3600 * 2, //  hours
   'email': 60000 * 10, // 10 minutes
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
deleted file mode 100644 (file)
index 3a867b7..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-import * as Bull from 'bull'
-import { VideoResolution, VideoState } from '../../../../shared'
-import { logger } from '../../../helpers/logger'
-import { VideoModel } from '../../../models/video/video'
-import { JobQueue } from '../job-queue'
-import { federateVideoIfNeeded } from '../../activitypub'
-import { retryTransactionWrapper } from '../../../helpers/database-utils'
-import { sequelizeTypescript, CONFIG } from '../../../initializers'
-import * as Bluebird from 'bluebird'
-import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
-import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
-import { Notifier } from '../../notifier'
-
-export type VideoFilePayload = {
-  videoUUID: string
-  resolution?: VideoResolution
-  isNewVideo?: boolean
-  isPortraitMode?: boolean
-  generateHlsPlaylist?: boolean
-}
-
-export type VideoFileImportPayload = {
-  videoUUID: string,
-  filePath: string
-}
-
-async function processVideoFileImport (job: Bull.Job) {
-  const payload = job.data as VideoFileImportPayload
-  logger.info('Processing video file import in job %d.', job.id)
-
-  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
-  // No video, maybe deleted?
-  if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
-    return undefined
-  }
-
-  await importVideoFile(video, payload.filePath)
-
-  await onVideoFileTranscoderOrImportSuccess(video)
-  return video
-}
-
-async function processVideoFile (job: Bull.Job) {
-  const payload = job.data as VideoFilePayload
-  logger.info('Processing video file in job %d.', job.id)
-
-  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
-  // No video, maybe deleted?
-  if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
-    return undefined
-  }
-
-  if (payload.generateHlsPlaylist) {
-    await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
-
-    await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
-  } else if (payload.resolution) { // Transcoding in other resolution
-    await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
-
-    await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload)
-  } else {
-    await optimizeVideofile(video)
-
-    await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
-  }
-
-  return video
-}
-
-async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
-  if (video === undefined) return undefined
-
-  await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, false, t)
-  })
-}
-
-async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoFilePayload) {
-  if (video === undefined) return undefined
-
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    let videoPublished = false
-
-    // We transcoded the video file in another format, now we can publish it
-    if (videoDatabase.state !== VideoState.PUBLISHED) {
-      videoPublished = true
-
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase.publishedAt = new Date()
-      videoDatabase = await videoDatabase.save({ transaction: t })
-    }
-
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
-
-    return { videoDatabase, videoPublished }
-  })
-
-  // don't notify prior to scheduled video update
-  if (videoPublished && !videoDatabase.ScheduleVideoUpdate) {
-    Notifier.Instance.notifyOnNewVideo(videoDatabase)
-    Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
-  }
-
-  await createHlsJobIfEnabled(payload)
-}
-
-async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoFilePayload) {
-  if (videoArg === undefined) return undefined
-
-  // Outside the transaction (IO on disk)
-  const { videoFileResolution } = await videoArg.getOriginalFileResolution()
-
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    // Create transcoding jobs if there are enabled resolutions
-    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
-    logger.info(
-      'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
-      { resolutions: resolutionsEnabled }
-    )
-
-    let videoPublished = false
-
-    if (resolutionsEnabled.length !== 0) {
-      const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
-
-      for (const resolution of resolutionsEnabled) {
-        const dataInput = {
-          videoUUID: videoDatabase.uuid,
-          resolution
-        }
-
-        const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
-        tasks.push(p)
-      }
-
-      await Promise.all(tasks)
-
-      logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
-    } else {
-      videoPublished = true
-
-      // No transcoding to do, it's now published
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase = await videoDatabase.save({ transaction: t })
-
-      logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
-    }
-
-    await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
-
-    return { videoDatabase, videoPublished }
-  })
-
-  // don't notify prior to scheduled video update
-  if (!videoDatabase.ScheduleVideoUpdate) {
-    if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
-    if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
-  }
-
-  await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
-}
-
-// ---------------------------------------------------------------------------
-
-export {
-  processVideoFile,
-  processVideoFileImport
-}
-
-// ---------------------------------------------------------------------------
-
-function createHlsJobIfEnabled (payload?: VideoFilePayload) {
-  // Generate HLS playlist?
-  if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
-    const hlsTranscodingPayload = {
-      videoUUID: payload.videoUUID,
-      resolution: payload.resolution,
-      isPortraitMode: payload.isPortraitMode,
-
-      generateHlsPlaylist: true
-    }
-
-    return JobQueue.Instance.createJob({ type: 'video-file', payload: hlsTranscodingPayload })
-  }
-}
index 12004dcd7ef5fb7af316bb2c23da7163f92343df..d96bfdf43cd1022454da6bf45b2630497d2a86f1 100644 (file)
@@ -207,7 +207,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: Vide
         isNewVideo: true
       }
 
-      await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
+      await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
     }
 
   } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
new file mode 100644 (file)
index 0000000..ceee83f
--- /dev/null
@@ -0,0 +1,204 @@
+import * as Bull from 'bull'
+import { VideoResolution, VideoState } from '../../../../shared'
+import { logger } from '../../../helpers/logger'
+import { VideoModel } from '../../../models/video/video'
+import { JobQueue } from '../job-queue'
+import { federateVideoIfNeeded } from '../../activitypub'
+import { retryTransactionWrapper } from '../../../helpers/database-utils'
+import { sequelizeTypescript, CONFIG } from '../../../initializers'
+import * as Bluebird from 'bluebird'
+import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
+import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
+import { Notifier } from '../../notifier'
+
+export type VideoTranscodingPayload = {
+  videoUUID: string
+  resolution?: VideoResolution
+  isNewVideo?: boolean
+  isPortraitMode?: boolean
+  generateHlsPlaylist?: boolean
+}
+
+export type VideoFileImportPayload = {
+  videoUUID: string,
+  filePath: string
+}
+
+async function processVideoFileImport (job: Bull.Job) {
+  const payload = job.data as VideoFileImportPayload
+  logger.info('Processing video file import in job %d.', job.id)
+
+  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
+  // No video, maybe deleted?
+  if (!video) {
+    logger.info('Do not process job %d, video does not exist.', job.id)
+    return undefined
+  }
+
+  await importVideoFile(video, payload.filePath)
+
+  await onVideoFileTranscoderOrImportSuccess(video)
+  return video
+}
+
+async function processVideoTranscoding (job: Bull.Job) {
+  const payload = job.data as VideoTranscodingPayload
+  logger.info('Processing video file in job %d.', job.id)
+
+  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
+  // No video, maybe deleted?
+  if (!video) {
+    logger.info('Do not process job %d, video does not exist.', job.id)
+    return undefined
+  }
+
+  if (payload.generateHlsPlaylist) {
+    await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
+
+    await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
+  } else if (payload.resolution) { // Transcoding in other resolution
+    await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
+
+    await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload)
+  } else {
+    await optimizeVideofile(video)
+
+    await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
+  }
+
+  return video
+}
+
+async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
+  if (video === undefined) return undefined
+
+  await sequelizeTypescript.transaction(async t => {
+    // Maybe the video changed in database, refresh it
+    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
+    // Video does not exist anymore
+    if (!videoDatabase) return undefined
+
+    // If the video was not published, we consider it is a new one for other instances
+    await federateVideoIfNeeded(videoDatabase, false, t)
+  })
+}
+
+async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoTranscodingPayload) {
+  if (video === undefined) return undefined
+
+  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
+    // Maybe the video changed in database, refresh it
+    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
+    // Video does not exist anymore
+    if (!videoDatabase) return undefined
+
+    let videoPublished = false
+
+    // We transcoded the video file in another format, now we can publish it
+    if (videoDatabase.state !== VideoState.PUBLISHED) {
+      videoPublished = true
+
+      videoDatabase.state = VideoState.PUBLISHED
+      videoDatabase.publishedAt = new Date()
+      videoDatabase = await videoDatabase.save({ transaction: t })
+    }
+
+    // If the video was not published, we consider it is a new one for other instances
+    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
+
+    return { videoDatabase, videoPublished }
+  })
+
+  // don't notify prior to scheduled video update
+  if (videoPublished && !videoDatabase.ScheduleVideoUpdate) {
+    Notifier.Instance.notifyOnNewVideo(videoDatabase)
+    Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
+  }
+
+  await createHlsJobIfEnabled(payload)
+}
+
+async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoTranscodingPayload) {
+  if (videoArg === undefined) return undefined
+
+  // Outside the transaction (IO on disk)
+  const { videoFileResolution } = await videoArg.getOriginalFileResolution()
+
+  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
+    // Maybe the video changed in database, refresh it
+    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
+    // Video does not exist anymore
+    if (!videoDatabase) return undefined
+
+    // Create transcoding jobs if there are enabled resolutions
+    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
+    logger.info(
+      'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
+      { resolutions: resolutionsEnabled }
+    )
+
+    let videoPublished = false
+
+    if (resolutionsEnabled.length !== 0) {
+      const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
+
+      for (const resolution of resolutionsEnabled) {
+        const dataInput = {
+          videoUUID: videoDatabase.uuid,
+          resolution
+        }
+
+        const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
+        tasks.push(p)
+      }
+
+      await Promise.all(tasks)
+
+      logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
+    } else {
+      videoPublished = true
+
+      // No transcoding to do, it's now published
+      videoDatabase.state = VideoState.PUBLISHED
+      videoDatabase = await videoDatabase.save({ transaction: t })
+
+      logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
+    }
+
+    await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
+
+    return { videoDatabase, videoPublished }
+  })
+
+  // don't notify prior to scheduled video update
+  if (!videoDatabase.ScheduleVideoUpdate) {
+    if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
+    if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
+  }
+
+  await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processVideoTranscoding,
+  processVideoFileImport
+}
+
+// ---------------------------------------------------------------------------
+
+function createHlsJobIfEnabled (payload?: VideoTranscodingPayload) {
+  // Generate HLS playlist?
+  if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
+    const hlsTranscodingPayload = {
+      videoUUID: payload.videoUUID,
+      resolution: payload.resolution,
+      isPortraitMode: payload.isPortraitMode,
+
+      generateHlsPlaylist: true
+    }
+
+    return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
+  }
+}
index ba9cbe0d99d64b9c0e3f10a579acfec291510180..e73042163a50e09e125e1a208bab3f38de5a9431 100644 (file)
@@ -7,7 +7,12 @@ import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from
 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
 import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
 import { EmailPayload, processEmail } from './handlers/email'
-import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
+import {
+  processVideoFileImport,
+  processVideoTranscoding,
+  VideoFileImportPayload,
+  VideoTranscodingPayload
+} from './handlers/video-transcoding'
 import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
 import { processVideoImport, VideoImportPayload } from './handlers/video-import'
 import { processVideosViews } from './handlers/video-views'
@@ -19,19 +24,20 @@ type CreateJobArgument =
   { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
   { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
   { type: 'video-file-import', payload: VideoFileImportPayload } |
-  { type: 'video-file', payload: VideoFilePayload } |
+  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
   { type: 'email', payload: EmailPayload } |
   { type: 'video-import', payload: VideoImportPayload } |
   { type: 'activitypub-refresher', payload: RefreshPayload } |
   { type: 'videos-views', payload: {} }
 
-const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
+const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise<any>} = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
   'activitypub-follow': processActivityPubFollow,
   'video-file-import': processVideoFileImport,
-  'video-file': processVideoFile,
+  'video-transcoding': processVideoTranscoding,
+  'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3)
   'email': processEmail,
   'video-import': processVideoImport,
   'videos-views': processVideosViews,
@@ -44,7 +50,7 @@ const jobTypes: JobType[] = [
   'activitypub-http-fetcher',
   'activitypub-http-unicast',
   'email',
-  'video-file',
+  'video-transcoding',
   'video-file-import',
   'video-import',
   'videos-views',
index 85bc9541b673eda5eddec677396a917e25c7bf45..1b9aa8a0718d123465758296d39b7a68e74ec241 100644 (file)
@@ -5,7 +5,7 @@ export type JobType = 'activitypub-http-unicast' |
   'activitypub-http-fetcher' |
   'activitypub-follow' |
   'video-file-import' |
-  'video-file' |
+  'video-transcoding' |
   'email' |
   'video-import' |
   'videos-views' |