const videosValidators = middlewares.validators.remote.videos
const signatureValidators = middlewares.validators.remote.signature
const logger = require('../../../helpers/logger')
+const utils = require('../../../helpers/utils')
const router = express.Router()
switch (request.type) {
case 'add':
- addRemoteVideo(data, fromPod, callbackEach)
+ addRemoteVideoRetryWrapper(data, fromPod, callbackEach)
break
case 'update':
- updateRemoteVideo(data, fromPod, callbackEach)
+ updateRemoteVideoRetryWrapper(data, fromPod, callbackEach)
break
case 'remove':
return res.type('json').status(204).end()
}
+// Handle retries on fail
+function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) {
+ utils.transactionRetryer(
+ function (callback) {
+ return addRemoteVideo(videoToCreateData, fromPod, callback)
+ },
+ function (err) {
+ if (err) {
+ logger.error('Cannot insert the remote video with many retries.', { error: err })
+ return finalCallback(err)
+ }
+
+ return finalCallback()
+ }
+ )
+}
+
function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
- logger.debug('Adding remote video "%s".', videoToCreateData.name)
+ logger.debug('Adding remote video "%s".', videoToCreateData.remoteId)
waterfall([
function startTransaction (callback) {
- db.sequelize.transaction().asCallback(function (err, t) {
+ db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
return callback(err, t)
})
},
authorId: author.id,
duration: videoToCreateData.duration,
createdAt: videoToCreateData.createdAt,
+ // FIXME: updatedAt does not seems to be considered by Sequelize
updatedAt: videoToCreateData.updatedAt
}
], function (err, t) {
if (err) {
- logger.error('Cannot insert the remote video.')
+ // This is just a debug because we will retry the insert
+ logger.debug('Cannot insert the remote video.', { error: err })
// Abort transaction?
if (t) t.rollback()
})
}
+// Handle retries on fail
+function updateRemoteVideoRetryWrapper (videoAttributesToUpdate, fromPod, finalCallback) {
+ utils.transactionRetryer(
+ function (callback) {
+ return updateRemoteVideo(videoAttributesToUpdate, fromPod, callback)
+ },
+ function (err) {
+ if (err) {
+ logger.error('Cannot update the remote video with many retries.', { error: err })
+ return finalCallback(err)
+ }
+
+ return finalCallback()
+ }
+ )
+}
+
function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
- logger.debug('Updating remote video "%s".', videoAttributesToUpdate.name)
+ logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId)
waterfall([
], function (err, t) {
if (err) {
- logger.error('Cannot update the remote video.')
+ // This is just a debug because we will retry the insert
+ logger.debug('Cannot update the remote video.', { error: err })
// Abort transaction?
if (t) t.rollback()
if (err || !video) {
if (!err) err = new Error('video not found')
- logger.error('Cannot load video from host and remote id.', { error: err })
+ logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId })
return callback(err)
}
if (err || !video) {
if (!err) err = new Error('video not found')
- logger.error('Cannot load video from host and remote id.', { error: err })
+ logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId })
return callback(err)
}
oAuth.authenticate,
reqFiles,
validatorsVideos.videosUpdate,
- updateVideo
+ updateVideoRetryWrapper
)
router.post('/',
oAuth.authenticate,
reqFiles,
validatorsVideos.videosAdd,
- addVideo
+ addVideoRetryWrapper
)
router.get('/:id',
validatorsVideos.videosGet,
// ---------------------------------------------------------------------------
-function addVideo (req, res, next) {
- const videoFile = req.files.videofile[0]
+// Wrapper to video add that retry the function if there is a database error
+// We need this because we run the transaction in SERIALIZABLE isolation that can fail
+function addVideoRetryWrapper (req, res, next) {
+ utils.transactionRetryer(
+ function (callback) {
+ return addVideo(req, res, req.files.videofile[0], callback)
+ },
+ function (err) {
+ if (err) {
+ logger.error('Cannot insert the video with many retries.', { error: err })
+ return next(err)
+ }
+
+ // TODO : include Location of the new video -> 201
+ return res.type('json').status(204).end()
+ }
+ )
+}
+
+function addVideo (req, res, videoFile, callback) {
const videoInfos = req.body
waterfall([
- function startTransaction (callback) {
- db.sequelize.transaction().asCallback(function (err, t) {
- return callback(err, t)
+ function startTransaction (callbackWaterfall) {
+ db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
+ return callbackWaterfall(err, t)
})
},
- function findOrCreateAuthor (t, callback) {
+ function findOrCreateAuthor (t, callbackWaterfall) {
const user = res.locals.oauth.token.User
const name = user.username
const userId = user.id
db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
- return callback(err, t, authorInstance)
+ return callbackWaterfall(err, t, authorInstance)
})
},
- function findOrCreateTags (t, author, callback) {
+ function findOrCreateTags (t, author, callbackWaterfall) {
const tags = videoInfos.tags
db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
- return callback(err, t, author, tagInstances)
+ return callbackWaterfall(err, t, author, tagInstances)
})
},
- function createVideoObject (t, author, tagInstances, callback) {
+ function createVideoObject (t, author, tagInstances, callbackWaterfall) {
const videoData = {
name: videoInfos.name,
remoteId: null,
const video = db.Video.build(videoData)
- return callback(null, t, author, tagInstances, video)
+ return callbackWaterfall(null, t, author, tagInstances, video)
},
// Set the videoname the same as the id
- function renameVideoFile (t, author, tagInstances, video, callback) {
+ function renameVideoFile (t, author, tagInstances, video, callbackWaterfall) {
const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR
const source = path.join(videoDir, videoFile.filename)
const destination = path.join(videoDir, video.getVideoFilename())
fs.rename(source, destination, function (err) {
- return callback(err, t, author, tagInstances, video)
+ if (err) return callbackWaterfall(err)
+
+ // This is important in case if there is another attempt
+ videoFile.filename = video.getVideoFilename()
+ return callbackWaterfall(null, t, author, tagInstances, video)
})
},
- function insertVideoIntoDB (t, author, tagInstances, video, callback) {
+ function insertVideoIntoDB (t, author, tagInstances, video, callbackWaterfall) {
const options = { transaction: t }
// Add tags association
video.save(options).asCallback(function (err, videoCreated) {
- if (err) return callback(err)
+ if (err) return callbackWaterfall(err)
// Do not forget to add Author informations to the created video
videoCreated.Author = author
- return callback(err, t, tagInstances, videoCreated)
+ return callbackWaterfall(err, t, tagInstances, videoCreated)
})
},
- function associateTagsToVideo (t, tagInstances, video, callback) {
+ function associateTagsToVideo (t, tagInstances, video, callbackWaterfall) {
const options = { transaction: t }
video.setTags(tagInstances, options).asCallback(function (err) {
video.Tags = tagInstances
- return callback(err, t, video)
+ return callbackWaterfall(err, t, video)
})
},
- function sendToFriends (t, video, callback) {
+ function sendToFriends (t, video, callbackWaterfall) {
video.toAddRemoteJSON(function (err, remoteVideo) {
- if (err) return callback(err)
+ if (err) return callbackWaterfall(err)
// Now we'll add the video's meta data to our friends
- friends.addVideoToFriends(remoteVideo)
-
- return callback(null, t)
+ friends.addVideoToFriends(remoteVideo, t, function (err) {
+ return callbackWaterfall(err, t)
+ })
})
}
], function andFinally (err, t) {
if (err) {
- logger.error('Cannot insert the video.')
+ // This is just a debug because we will retry the insert
+ logger.debug('Cannot insert the video.', { error: err })
// Abort transaction?
if (t) t.rollback()
- return next(err)
+ return callback(err)
}
// Commit transaction
t.commit()
- // TODO : include Location of the new video -> 201
- return res.type('json').status(204).end()
+ logger.info('Video with name %s created.', videoInfos.name)
+
+ return callback(null)
})
}
-function updateVideo (req, res, next) {
+function updateVideoRetryWrapper (req, res, next) {
+ utils.transactionRetryer(
+ function (callback) {
+ return updateVideo(req, res, callback)
+ },
+ function (err) {
+ if (err) {
+ logger.error('Cannot update the video with many retries.', { error: err })
+ return next(err)
+ }
+
+ // TODO : include Location of the new video -> 201
+ return res.type('json').status(204).end()
+ }
+ )
+}
+
+function updateVideo (req, res, finalCallback) {
const videoInstance = res.locals.video
const videoInfosToUpdate = req.body
const json = videoInstance.toUpdateRemoteJSON()
// Now we'll update the video's meta data to our friends
- friends.updateVideoToFriends(json)
-
- return callback(null, t)
+ friends.updateVideoToFriends(json, t, function (err) {
+ return callback(err, t)
+ })
}
], function andFinally (err, t) {
if (err) {
- logger.error('Cannot insert the video.')
+ logger.debug('Cannot update the video.', { error: err })
// Abort transaction?
if (t) t.rollback()
- return next(err)
+ return finalCallback(err)
}
// Commit transaction
t.commit()
- // TODO : include Location of the new video -> 201
- return res.type('json').status(204).end()
+ return finalCallback(null)
})
}
sendOwnedVideosToPod
}
-function addVideoToFriends (videoData) {
- createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
+function addVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: 'add',
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
}
-function updateVideoToFriends (videoData) {
- createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
+function updateVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: 'update',
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
}
function removeVideoToFriends (videoParams) {
- createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
+ const options = {
+ type: 'remove',
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoParams
+ }
+ createRequest(options)
}
function reportAbuseVideoToFriend (reportData, video) {
}
// Wrapper that populate "toIds" argument with all our friends if it is not specified
-function createRequest (type, endpoint, data, toIds) {
- if (toIds) return _createRequest(type, endpoint, data, toIds)
+// { type, endpoint, data, toIds, transaction }
+function createRequest (options, callback) {
+ if (!callback) callback = function () {}
+ if (options.toIds) return _createRequest(options, callback)
// If the "toIds" pods is not specified, we send the request to all our friends
- db.Pod.listAllIds(function (err, podIds) {
+ db.Pod.listAllIds(options.transaction, function (err, podIds) {
if (err) {
logger.error('Cannot get pod ids', { error: err })
return
}
- return _createRequest(type, endpoint, data, podIds)
+ const newOptions = Object.assign(options, { toIds: podIds })
+ return _createRequest(newOptions, callback)
})
}
-function _createRequest (type, endpoint, data, toIds) {
+// { type, endpoint, data, toIds, transaction }
+function _createRequest (options, callback) {
+ const type = options.type
+ const endpoint = options.endpoint
+ const data = options.data
+ const toIds = options.toIds
+ const transaction = options.transaction
+
const pods = []
// If there are no destination pods abort
- if (toIds.length === 0) return
+ if (toIds.length === 0) return callback(null)
toIds.forEach(function (toPod) {
pods.push(db.Pod.build({ id: toPod }))
}
}
- // We run in transaction to keep coherency between Request and RequestToPod tables
- db.sequelize.transaction(function (t) {
- const dbRequestOptions = {
- transaction: t
- }
+ const dbRequestOptions = {
+ transaction
+ }
- return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
- return request.setPods(pods, dbRequestOptions)
- })
- }).asCallback(function (err) {
- if (err) logger.error('Error in createRequest transaction.', { error: err })
+ return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
+ if (err) return callback(err)
+
+ return request.setPods(pods, dbRequestOptions).asCallback(callback)
})
}