signup:
enabled: false
+
+# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
+# Uses a lot of CPU!
+transcoding:
+ enabled: true
+ threads: 2
signup:
enabled: false
+
+# If enabled, the video will be transcoded to mp4 (x264) with "faststart" flag
+# Uses a lot of CPU!
+transcoding:
+ enabled: false
+ threads: 2
admin:
email: 'admin6@example.com'
+
+transcoding:
+ enabled: true
const friends = require('./server/lib/friends')
const installer = require('./server/initializers/installer')
const migrator = require('./server/initializers/migrator')
+const jobScheduler = require('./server/lib/jobs/job-scheduler')
const routes = require('./server/controllers')
// ----------- Command line -----------
// Activate the communication with friends
friends.activate()
+ // Activate job scheduler
+ jobScheduler.activate()
+
logger.info('Server listening on port %d', port)
logger.info('Webserver: %s', constants.CONFIG.WEBSERVER.URL)
'webserver.https', 'webserver.hostname', 'webserver.port',
'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password',
'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews',
- 'admin.email', 'signup.enabled'
+ 'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads'
]
const miss = []
},
SIGNUP: {
ENABLED: config.get('signup.enabled')
+ },
+ TRANSCODING: {
+ ENABLED: config.get('transcoding.enabled'),
+ THREADS: config.get('transcoding.threads')
}
}
CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT
WS: 'wss'
}
+const JOB_STATES = {
+ PENDING: 'pending',
+ PROCESSING: 'processing',
+ ERROR: 'error',
+ SUCCESS: 'success'
+}
+// How many maximum jobs we fetch from the database per cycle
+const JOBS_FETCH_LIMIT_PER_CYCLE = 10
+const JOBS_CONCURRENCY = 1
+// 1 minutes
+let JOBS_FETCHING_INTERVAL = 60000
+
// ---------------------------------------------------------------------------
const PRIVATE_CERT_NAME = 'peertube.key.pem'
CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14
FRIEND_SCORE.BASE = 20
REQUESTS_INTERVAL = 10000
+ JOBS_FETCHING_INTERVAL = 10000
REMOTE_SCHEME.HTTP = 'http'
REMOTE_SCHEME.WS = 'ws'
STATIC_MAX_AGE = 0
CONFIG,
CONSTRAINTS_FIELDS,
FRIEND_SCORE,
+ JOBS_FETCHING_INTERVAL,
+ JOB_STATES,
+ JOBS_CONCURRENCY,
+ JOBS_FETCH_LIMIT_PER_CYCLE,
LAST_MIGRATION_VERSION,
OAUTH_LIFETIME,
PAGINATION_COUNT_DEFAULT,
--- /dev/null
+'use strict'
+
+const videoTranscoder = require('./video-transcoder')
+
+module.exports = {
+ videoTranscoder
+}
--- /dev/null
+'use strict'
+
+const db = require('../../../initializers/database')
+const logger = require('../../../helpers/logger')
+
+const VideoTranscoderHandler = {
+ process,
+ onError,
+ onSuccess
+}
+
+// ---------------------------------------------------------------------------
+
+function process (data, callback) {
+ db.Video.load(data.id, function (err, video) {
+ if (err) return callback(err)
+
+ video.transcodeVideofile(callback)
+ })
+}
+
+function onError (err, jobId, callback) {
+ logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
+ return callback()
+}
+
+function onSuccess (data, jobId, callback) {
+ logger.info('Job %d is a success.', jobId)
+ return callback()
+}
+
+// ---------------------------------------------------------------------------
+
+module.exports = VideoTranscoderHandler
--- /dev/null
+'use strict'
+
+const forever = require('async/forever')
+const queue = require('async/queue')
+
+const constants = require('../../initializers/constants')
+const db = require('../../initializers/database')
+const logger = require('../../helpers/logger')
+
+const jobHandlers = require('./handlers')
+
+const jobScheduler = {
+ activate,
+ createJob
+}
+
+function activate () {
+ logger.info('Jobs scheduler activated.')
+
+ const jobsQueue = queue(processJob)
+
+ forever(
+ function (next) {
+ if (jobsQueue.length() !== 0) {
+ // Finish processing the queue first
+ return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+ }
+
+ db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
+ if (err) {
+ logger.error('Cannot list pending jobs.', { error: err })
+ } else {
+ jobs.forEach(function (job) {
+ jobsQueue.push(job)
+ })
+ }
+
+ // Optimization: we could use "drain" from queue object
+ return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+ })
+ }
+ )
+}
+
+// ---------------------------------------------------------------------------
+
+module.exports = jobScheduler
+
+// ---------------------------------------------------------------------------
+
+function createJob (transaction, handlerName, handlerInputData, callback) {
+ const createQuery = {
+ state: constants.JOB_STATES.PENDING,
+ handlerName,
+ handlerInputData
+ }
+ const options = { transaction }
+
+ db.Job.create(createQuery, options).asCallback(callback)
+}
+
+function processJob (job, callback) {
+ const jobHandler = jobHandlers[job.handlerName]
+
+ logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
+
+ job.state = constants.JOB_STATES.PROCESSING
+ job.save().asCallback(function (err) {
+ if (err) return cannotSaveJobError(err, callback)
+
+ if (jobHandler === undefined) {
+ logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
+ return callback()
+ }
+
+ return jobHandler.process(job.handlerInputData, function (err, result) {
+ if (err) {
+ logger.error('Error in job handler %s.', job.handlerName, { error: err })
+ return onJobError(jobHandler, job, callback)
+ }
+
+ return onJobSuccess(jobHandler, job, callback)
+ })
+ })
+}
+
+function onJobError (jobHandler, job, callback) {
+ job.state = constants.JOB_STATES.ERROR
+
+ job.save().asCallback(function (err) {
+ if (err) return cannotSaveJobError(err, callback)
+
+ return jobHandler.onError(err, job.id, callback)
+ })
+}
+
+function onJobSuccess (jobHandler, job, callback) {
+ job.state = constants.JOB_STATES.SUCCESS
+
+ job.save().asCallback(function (err) {
+ if (err) return cannotSaveJobError(err, callback)
+
+ return jobHandler.onSuccess(err, job.id, callback)
+ })
+}
+
+function cannotSaveJobError (err, callback) {
+ logger.error('Cannot save new job state.', { error: err })
+ return callback(err)
+}
--- /dev/null
+'use strict'
+
+const values = require('lodash/values')
+
+const constants = require('../initializers/constants')
+
+// ---------------------------------------------------------------------------
+
+module.exports = function (sequelize, DataTypes) {
+ const Job = sequelize.define('Job',
+ {
+ state: {
+ type: DataTypes.ENUM(values(constants.JOB_STATES)),
+ allowNull: false
+ },
+ handlerName: {
+ type: DataTypes.STRING,
+ allowNull: false
+ },
+ handlerInputData: {
+ type: DataTypes.JSON,
+ allowNull: true
+ }
+ },
+ {
+ indexes: [
+ {
+ fields: [ 'state' ]
+ }
+ ],
+ classMethods: {
+ listWithLimit
+ }
+ }
+ )
+
+ return Job
+}
+
+// ---------------------------------------------------------------------------
+
+function listWithLimit (limit, callback) {
+ const query = {
+ order: [
+ [ 'id', 'ASC' ]
+ ],
+ limit: limit,
+ where: {
+ state: constants.JOB_STATES.PENDING
+ }
+ }
+
+ return this.findAll(query).asCallback(callback)
+}
const magnetUtil = require('magnet-uri')
const map = require('lodash/map')
const parallel = require('async/parallel')
+const series = require('async/series')
const parseTorrent = require('parse-torrent')
const pathUtils = require('path')
const values = require('lodash/values')
const modelUtils = require('./utils')
const customVideosValidators = require('../helpers/custom-validators').videos
const db = require('../initializers/database')
+const jobScheduler = require('../lib/jobs/job-scheduler')
// ---------------------------------------------------------------------------
toFormatedJSON,
toAddRemoteJSON,
toUpdateRemoteJSON,
+ transcodeVideofile,
removeFromBlacklist
},
hooks: {
tasks.push(
function createVideoTorrent (callback) {
- const options = {
- announceList: [
- [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
- ],
- urlList: [
- constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
- ]
- }
-
- createTorrent(videoPath, options, function (err, torrent) {
- if (err) return callback(err)
-
- const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
- fs.writeFile(filePath, torrent, function (err) {
- if (err) return callback(err)
-
- const parsedTorrent = parseTorrent(torrent)
- video.set('infoHash', parsedTorrent.infoHash)
- video.validate().asCallback(callback)
- })
- })
+ createTorrentFromVideo(video, videoPath, callback)
},
function createVideoThumbnail (callback) {
createThumbnail(video, videoPath, callback)
},
- function createVIdeoPreview (callback) {
+ function createVideoPreview (callback) {
createPreview(video, videoPath, callback)
}
)
+ if (constants.CONFIG.TRANSCODING.ENABLED === true) {
+ tasks.push(
+ function createVideoTranscoderJob (callback) {
+ const dataInput = {
+ id: video.id
+ }
+
+ jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback)
+ }
+ )
+ }
+
return parallel(tasks, next)
}
return json
}
+function transcodeVideofile (finalCallback) {
+ const video = this
+
+ const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR
+ const newExtname = '.mp4'
+ const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+ const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname)
+
+ ffmpeg(videoInputPath)
+ .output(videoOutputPath)
+ .videoCodec('libx264')
+ .outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS)
+ .outputOption('-movflags faststart')
+ .on('error', finalCallback)
+ .on('end', function () {
+ series([
+ function removeOldFile (callback) {
+ fs.unlink(videoInputPath, callback)
+ },
+
+ function moveNewFile (callback) {
+ // Important to do this before getVideoFilename() to take in account the new file extension
+ video.set('extname', newExtname)
+
+ const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+ fs.rename(videoOutputPath, newVideoPath, callback)
+ },
+
+ function torrent (callback) {
+ const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
+ createTorrentFromVideo(video, newVideoPath, callback)
+ },
+
+ function videoExtension (callback) {
+ video.save().asCallback(callback)
+ }
+
+ ], function (err) {
+ if (err) {
+ // Autodescruction...
+ video.destroy().asCallback(function (err) {
+ if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err })
+ })
+
+ return finalCallback(err)
+ }
+
+ return finalCallback(null)
+ })
+ })
+ .run()
+}
+
// ------------------------------ STATICS ------------------------------
function generateThumbnailFromData (video, thumbnailData, callback) {
fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback)
}
+function createTorrentFromVideo (video, videoPath, callback) {
+ const options = {
+ announceList: [
+ [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
+ ],
+ urlList: [
+ constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
+ ]
+ }
+
+ createTorrent(videoPath, options, function (err, torrent) {
+ if (err) return callback(err)
+
+ const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
+ fs.writeFile(filePath, torrent, function (err) {
+ if (err) return callback(err)
+
+ const parsedTorrent = parseTorrent(torrent)
+ video.set('infoHash', parsedTorrent.infoHash)
+ video.validate().asCallback(callback)
+ })
+ })
+}
+
function createPreview (video, videoPath, callback) {
generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback)
}