Server: add job scheduler to transcode video files
authorChocobozzz <florian.bigard@gmail.com>
Tue, 2 May 2017 20:02:27 +0000 (22:02 +0200)
committerChocobozzz <florian.bigard@gmail.com>
Thu, 4 May 2017 19:12:32 +0000 (21:12 +0200)
config/default.yaml
config/production.yaml.example
config/test-6.yaml
server.js
server/initializers/checker.js
server/initializers/constants.js
server/lib/jobs/handlers/index.js [new file with mode: 0644]
server/lib/jobs/handlers/video-transcoder.js [new file with mode: 0644]
server/lib/jobs/job-scheduler.js [new file with mode: 0644]
server/models/job.js [new file with mode: 0644]
server/models/video.js

index a5ac157945d22990462b314ca562eef9bb530dec..27eb2a5336539afdeae1ece1ad00f25f64caceb7 100644 (file)
@@ -28,3 +28,9 @@ admin:
 
 signup:
   enabled: false
+
+# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
+# Uses a lot of CPU!
+transcoding:
+  enabled: true
+  threads: 2
index 8e9d4b32d20b5c88f1b2be5603487c12a6884b1d..c18457df608213122dcb9dc4af66b269a97b44c8 100644 (file)
@@ -29,3 +29,9 @@ admin:
 
 signup:
   enabled: false
+
+# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
+# Uses a lot of CPU!
+transcoding:
+  enabled: false
+  threads: 2
index d74d3b05214214de7d7e797fe5fe70c3dcae36ad..169af973a8b87ef8ae1eb748063d55ba3b25c139 100644 (file)
@@ -18,3 +18,6 @@ storage:
 
 admin:
   email: 'admin6@example.com'
+
+transcoding:
+  enabled: true
index 33d399786da00edd0f787ee8cba3b7819ed09ba9..54fa914bfbf5ad24dda7737d788adcfc9a681f32 100644 (file)
--- a/server.js
+++ b/server.js
@@ -40,6 +40,7 @@ const customValidators = require('./server/helpers/custom-validators')
 const friends = require('./server/lib/friends')
 const installer = require('./server/initializers/installer')
 const migrator = require('./server/initializers/migrator')
+const jobScheduler = require('./server/lib/jobs/job-scheduler')
 const routes = require('./server/controllers')
 
 // ----------- Command line -----------
@@ -133,6 +134,9 @@ function onDatabaseInitDone () {
         // Activate the communication with friends
         friends.activate()
 
+        // Activate job scheduler
+        jobScheduler.activate()
+
         logger.info('Server listening on port %d', port)
         logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL)
 
index 461a851fecf2c9f1e14414a5a0224f5815fbda2e..a3727563ac8041ff63067c1d50f81138fff7ad81 100644 (file)
@@ -29,7 +29,7 @@ function checkMissedConfig () {
     'webserver.https', 'webserver.hostname', 'webserver.port',
     'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password',
     'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews',
-    'admin.email', 'signup.enabled'
+    'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads'
   ]
   const miss = []
 
index d6da20982b29a0eded0e0c70dfebda862db334dd..87e9c80029bcd20803eaf43aa4e317566c67b375 100644 (file)
@@ -64,6 +64,10 @@ const CONFIG = {
   },
   SIGNUP: {
     ENABLED: config.get('signup.enabled')
+  },
+  TRANSCODING: {
+    ENABLED: config.get('transcoding.enabled'),
+    THREADS: config.get('transcoding.threads')
   }
 }
 CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT
@@ -223,6 +227,18 @@ const REMOTE_SCHEME = {
   WS: 'wss'
 }
 
+const JOB_STATES = {
+  PENDING: 'pending',
+  PROCESSING: 'processing',
+  ERROR: 'error',
+  SUCCESS: 'success'
+}
+// How many maximum jobs we fetch from the database per cycle
+const JOBS_FETCH_LIMIT_PER_CYCLE = 10
+const JOBS_CONCURRENCY = 1
+// 1 minutes
+let JOBS_FETCHING_INTERVAL = 60000
+
 // ---------------------------------------------------------------------------
 
 const PRIVATE_CERT_NAME = 'peertube.key.pem'
@@ -264,6 +280,7 @@ if (isTestInstance() === true) {
   CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14
   FRIEND_SCORE.BASE = 20
   REQUESTS_INTERVAL = 10000
+  JOBS_FETCHING_INTERVAL = 10000
   REMOTE_SCHEME.HTTP = 'http'
   REMOTE_SCHEME.WS = 'ws'
   STATIC_MAX_AGE = 0
@@ -277,6 +294,10 @@ module.exports = {
   CONFIG,
   CONSTRAINTS_FIELDS,
   FRIEND_SCORE,
+  JOBS_FETCHING_INTERVAL,
+  JOB_STATES,
+  JOBS_CONCURRENCY,
+  JOBS_FETCH_LIMIT_PER_CYCLE,
   LAST_MIGRATION_VERSION,
   OAUTH_LIFETIME,
   PAGINATION_COUNT_DEFAULT,
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js
new file mode 100644 (file)
index 0000000..59c1ccc
--- /dev/null
@@ -0,0 +1,7 @@
+'use strict'
+
+const videoTranscoder = require('./video-transcoder')
+
+module.exports = {
+  videoTranscoder
+}
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js
new file mode 100644 (file)
index 0000000..8524df3
--- /dev/null
@@ -0,0 +1,34 @@
+'use strict'
+
+const db = require('../../../initializers/database')
+const logger = require('../../../helpers/logger')
+
+const VideoTranscoderHandler = {
+  process,
+  onError,
+  onSuccess
+}
+
+// ---------------------------------------------------------------------------
+
+function process (data, callback) {
+  db.Video.load(data.id, function (err, video) {
+    if (err) return callback(err)
+
+    video.transcodeVideofile(callback)
+  })
+}
+
+function onError (err, jobId, callback) {
+  logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
+  return callback()
+}
+
+function onSuccess (data, jobId, callback) {
+  logger.info('Job %d is a success.', jobId)
+  return callback()
+}
+
+// ---------------------------------------------------------------------------
+
+module.exports = VideoTranscoderHandler
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
new file mode 100644 (file)
index 0000000..589a306
--- /dev/null
@@ -0,0 +1,110 @@
+'use strict'
+
+const forever = require('async/forever')
+const queue = require('async/queue')
+
+const constants = require('../../initializers/constants')
+const db = require('../../initializers/database')
+const logger = require('../../helpers/logger')
+
+const jobHandlers = require('./handlers')
+
+const jobScheduler = {
+  activate,
+  createJob
+}
+
+function activate () {
+  logger.info('Jobs scheduler activated.')
+
+  const jobsQueue = queue(processJob)
+
+  forever(
+    function (next) {
+      if (jobsQueue.length() !== 0) {
+        // Finish processing the queue first
+        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+      }
+
+      db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
+        if (err) {
+          logger.error('Cannot list pending jobs.', { error: err })
+        } else {
+          jobs.forEach(function (job) {
+            jobsQueue.push(job)
+          })
+        }
+
+        // Optimization: we could use "drain" from queue object
+        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+      })
+    }
+  )
+}
+
+// ---------------------------------------------------------------------------
+
+module.exports = jobScheduler
+
+// ---------------------------------------------------------------------------
+
+function createJob (transaction, handlerName, handlerInputData, callback) {
+  const createQuery = {
+    state: constants.JOB_STATES.PENDING,
+    handlerName,
+    handlerInputData
+  }
+  const options = { transaction }
+
+  db.Job.create(createQuery, options).asCallback(callback)
+}
+
+function processJob (job, callback) {
+  const jobHandler = jobHandlers[job.handlerName]
+
+  logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
+
+  job.state = constants.JOB_STATES.PROCESSING
+  job.save().asCallback(function (err) {
+    if (err) return cannotSaveJobError(err, callback)
+
+    if (jobHandler === undefined) {
+      logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
+      return callback()
+    }
+
+    return jobHandler.process(job.handlerInputData, function (err, result) {
+      if (err) {
+        logger.error('Error in job handler %s.', job.handlerName, { error: err })
+        return onJobError(jobHandler, job, callback)
+      }
+
+      return onJobSuccess(jobHandler, job, callback)
+    })
+  })
+}
+
+function onJobError (jobHandler, job, callback) {
+  job.state = constants.JOB_STATES.ERROR
+
+  job.save().asCallback(function (err) {
+    if (err) return cannotSaveJobError(err, callback)
+
+    return jobHandler.onError(err, job.id, callback)
+  })
+}
+
+function onJobSuccess (jobHandler, job, callback) {
+  job.state = constants.JOB_STATES.SUCCESS
+
+  job.save().asCallback(function (err) {
+    if (err) return cannotSaveJobError(err, callback)
+
+    return jobHandler.onSuccess(err, job.id, callback)
+  })
+}
+
+function cannotSaveJobError (err, callback) {
+  logger.error('Cannot save new job state.', { error: err })
+  return callback(err)
+}
diff --git a/server/models/job.js b/server/models/job.js
new file mode 100644 (file)
index 0000000..eeb50e1
--- /dev/null
@@ -0,0 +1,54 @@
+'use strict'
+
+const values = require('lodash/values')
+
+const constants = require('../initializers/constants')
+
+// ---------------------------------------------------------------------------
+
+module.exports = function (sequelize, DataTypes) {
+  const Job = sequelize.define('Job',
+    {
+      state: {
+        type: DataTypes.ENUM(values(constants.JOB_STATES)),
+        allowNull: false
+      },
+      handlerName: {
+        type: DataTypes.STRING,
+        allowNull: false
+      },
+      handlerInputData: {
+        type: DataTypes.JSON,
+        allowNull: true
+      }
+    },
+    {
+      indexes: [
+        {
+          fields: [ 'state' ]
+        }
+      ],
+      classMethods: {
+        listWithLimit
+      }
+    }
+  )
+
+  return Job
+}
+
+// ---------------------------------------------------------------------------
+
+function listWithLimit (limit, callback) {
+  const query = {
+    order: [
+      [ 'id', 'ASC' ]
+    ],
+    limit: limit,
+    where: {
+      state: constants.JOB_STATES.PENDING
+    }
+  }
+
+  return this.findAll(query).asCallback(callback)
+}
index 029cb6d7cd040fb5b7e71940153d23304a558f6a..da4ddb420520fa9f669c801a314b77b79e4e30c2 100644 (file)
@@ -7,6 +7,7 @@ const fs = require('fs')
 const magnetUtil = require('magnet-uri')
 const map = require('lodash/map')
 const parallel = require('async/parallel')
+const series = require('async/series')
 const parseTorrent = require('parse-torrent')
 const pathUtils = require('path')
 const values = require('lodash/values')
@@ -17,6 +18,7 @@ const friends = require('../lib/friends')
 const modelUtils = require('./utils')
 const customVideosValidators = require('../helpers/custom-validators').videos
 const db = require('../initializers/database')
+const jobScheduler = require('../lib/jobs/job-scheduler')
 
 // ---------------------------------------------------------------------------
 
@@ -203,6 +205,7 @@ module.exports = function (sequelize, DataTypes) {
         toFormatedJSON,
         toAddRemoteJSON,
         toUpdateRemoteJSON,
+        transcodeVideofile,
         removeFromBlacklist
       },
       hooks: {
@@ -234,38 +237,30 @@ function beforeCreate (video, options, next) {
 
     tasks.push(
       function createVideoTorrent (callback) {
-        const options = {
-          announceList: [
-            [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
-          ],
-          urlList: [
-            constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
-          ]
-        }
-
-        createTorrent(videoPath, options, function (err, torrent) {
-          if (err) return callback(err)
-
-          const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
-          fs.writeFile(filePath, torrent, function (err) {
-            if (err) return callback(err)
-
-            const parsedTorrent = parseTorrent(torrent)
-            video.set('infoHash', parsedTorrent.infoHash)
-            video.validate().asCallback(callback)
-          })
-        })
+        createTorrentFromVideo(video, videoPath, callback)
       },
 
       function createVideoThumbnail (callback) {
         createThumbnail(video, videoPath, callback)
       },
 
-      function createVIdeoPreview (callback) {
+      function createVideoPreview (callback) {
         createPreview(video, videoPath, callback)
       }
     )
 
+    if (constants.CONFIG.TRANSCODING.ENABLED === true) {
+      tasks.push(
+        function createVideoTranscoderJob (callback) {
+          const dataInput = {
+            id: video.id
+          }
+
+          jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback)
+        }
+      )
+    }
+
     return parallel(tasks, next)
   }
 
@@ -503,6 +498,59 @@ function toUpdateRemoteJSON (callback) {
   return json
 }
 
+function transcodeVideofile (finalCallback) {
+  const video = this
+
+  const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR
+  const newExtname = '.mp4'
+  const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+  const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname)
+
+  ffmpeg(videoInputPath)
+    .output(videoOutputPath)
+    .videoCodec('libx264')
+    .outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS)
+    .outputOption('-movflags faststart')
+    .on('error', finalCallback)
+    .on('end', function () {
+      series([
+        function removeOldFile (callback) {
+          fs.unlink(videoInputPath, callback)
+        },
+
+        function moveNewFile (callback) {
+          // Important to do this before getVideoFilename() to take in account the new file extension
+          video.set('extname', newExtname)
+
+          const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+          fs.rename(videoOutputPath, newVideoPath, callback)
+        },
+
+        function torrent (callback) {
+          const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+          createTorrentFromVideo(video, newVideoPath, callback)
+        },
+
+        function videoExtension (callback) {
+          video.save().asCallback(callback)
+        }
+
+      ], function (err) {
+        if (err) {
+          // Autodescruction...
+          video.destroy().asCallback(function (err) {
+            if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err })
+          })
+
+          return finalCallback(err)
+        }
+
+        return finalCallback(null)
+      })
+    })
+    .run()
+}
+
 // ------------------------------ STATICS ------------------------------
 
 function generateThumbnailFromData (video, thumbnailData, callback) {
@@ -737,6 +785,30 @@ function removePreview (video, callback) {
   fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback)
 }
 
+function createTorrentFromVideo (video, videoPath, callback) {
+  const options = {
+    announceList: [
+      [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
+    ],
+    urlList: [
+      constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
+    ]
+  }
+
+  createTorrent(videoPath, options, function (err, torrent) {
+    if (err) return callback(err)
+
+    const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
+    fs.writeFile(filePath, torrent, function (err) {
+      if (err) return callback(err)
+
+      const parsedTorrent = parseTorrent(torrent)
+      video.set('infoHash', parsedTorrent.infoHash)
+      video.validate().asCallback(callback)
+    })
+  })
+}
+
 function createPreview (video, videoPath, callback) {
   generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback)
 }