Server: try to have a better video integrity
authorChocobozzz <florian.bigard@gmail.com>
Fri, 6 Jan 2017 22:24:47 +0000 (23:24 +0100)
committerChocobozzz <florian.bigard@gmail.com>
Fri, 6 Jan 2017 22:46:36 +0000 (23:46 +0100)
server/controllers/api/remote/videos.js
server/controllers/api/videos.js
server/helpers/utils.js
server/lib/friends.js
server/models/pod.js
server/models/request.js

index d02da44639a9e3946d46652b60a07d9ddd094653..6d768eae885a54d62b6fb306308ba56a090d1fe4 100644 (file)
@@ -10,6 +10,7 @@ const secureMiddleware = middlewares.secure
 const videosValidators = middlewares.validators.remote.videos
 const signatureValidators = middlewares.validators.remote.signature
 const logger = require('../../../helpers/logger')
+const utils = require('../../../helpers/utils')
 
 const router = express.Router()
 
@@ -37,11 +38,11 @@ function remoteVideos (req, res, next) {
 
     switch (request.type) {
       case 'add':
-        addRemoteVideo(data, fromPod, callbackEach)
+        addRemoteVideoRetryWrapper(data, fromPod, callbackEach)
         break
 
       case 'update':
-        updateRemoteVideo(data, fromPod, callbackEach)
+        updateRemoteVideoRetryWrapper(data, fromPod, callbackEach)
         break
 
       case 'remove':
@@ -63,13 +64,30 @@ function remoteVideos (req, res, next) {
   return res.type('json').status(204).end()
 }
 
+// Handle retries on fail
+function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) {
+  utils.transactionRetryer(
+    function (callback) {
+      return addRemoteVideo(videoToCreateData, fromPod, callback)
+    },
+    function (err) {
+      if (err) {
+        logger.error('Cannot insert the remote video with many retries.', { error: err })
+        return finalCallback(err)
+      }
+
+      return finalCallback()
+    }
+  )
+}
+
 function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
-  logger.debug('Adding remote video "%s".', videoToCreateData.name)
+  logger.debug('Adding remote video "%s".', videoToCreateData.remoteId)
 
   waterfall([
 
     function startTransaction (callback) {
-      db.sequelize.transaction().asCallback(function (err, t) {
+      db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
         return callback(err, t)
       })
     },
@@ -103,6 +121,7 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
         authorId: author.id,
         duration: videoToCreateData.duration,
         createdAt: videoToCreateData.createdAt,
+        // FIXME: updatedAt does not seems to be considered by Sequelize
         updatedAt: videoToCreateData.updatedAt
       }
 
@@ -142,7 +161,8 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
 
   ], function (err, t) {
     if (err) {
-      logger.error('Cannot insert the remote video.')
+      // This is just a debug because we will retry the insert
+      logger.debug('Cannot insert the remote video.', { error: err })
 
       // Abort transaction?
       if (t) t.rollback()
@@ -157,8 +177,25 @@ function addRemoteVideo (videoToCreateData, fromPod, finalCallback) {
   })
 }
 
+// Handle retries on fail
+function updateRemoteVideoRetryWrapper (videoAttributesToUpdate, fromPod, finalCallback) {
+  utils.transactionRetryer(
+    function (callback) {
+      return updateRemoteVideo(videoAttributesToUpdate, fromPod, callback)
+    },
+    function (err) {
+      if (err) {
+        logger.error('Cannot update the remote video with many retries.', { error: err })
+        return finalCallback(err)
+      }
+
+      return finalCallback()
+    }
+  )
+}
+
 function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
-  logger.debug('Updating remote video "%s".', videoAttributesToUpdate.name)
+  logger.debug('Updating remote video "%s".', videoAttributesToUpdate.remoteId)
 
   waterfall([
 
@@ -208,7 +245,8 @@ function updateRemoteVideo (videoAttributesToUpdate, fromPod, finalCallback) {
 
   ], function (err, t) {
     if (err) {
-      logger.error('Cannot update the remote video.')
+      // This is just a debug because we will retry the insert
+      logger.debug('Cannot update the remote video.', { error: err })
 
       // Abort transaction?
       if (t) t.rollback()
@@ -238,7 +276,7 @@ function reportAbuseRemoteVideo (reportData, fromPod, callback) {
     if (err || !video) {
       if (!err) err = new Error('video not found')
 
-      logger.error('Cannot load video from host and remote id.', { error: err })
+      logger.error('Cannot load video from id.', { error: err, id: reportData.videoRemoteId })
       return callback(err)
     }
 
@@ -260,7 +298,7 @@ function fetchVideo (podHost, remoteId, callback) {
     if (err || !video) {
       if (!err) err = new Error('video not found')
 
-      logger.error('Cannot load video from host and remote id.', { error: err })
+      logger.error('Cannot load video from host and remote id.', { error: err, podHost, remoteId })
       return callback(err)
     }
 
index 6829804ecd82746dab55a30b722e26160516d9d1..4d45c11c0334fe585c31ad4ff5756dc8d03c0844 100644 (file)
@@ -70,13 +70,13 @@ router.put('/:id',
   oAuth.authenticate,
   reqFiles,
   validatorsVideos.videosUpdate,
-  updateVideo
+  updateVideoRetryWrapper
 )
 router.post('/',
   oAuth.authenticate,
   reqFiles,
   validatorsVideos.videosAdd,
-  addVideo
+  addVideoRetryWrapper
 )
 router.get('/:id',
   validatorsVideos.videosGet,
@@ -103,19 +103,37 @@ module.exports = router
 
 // ---------------------------------------------------------------------------
 
-function addVideo (req, res, next) {
-  const videoFile = req.files.videofile[0]
+// Wrapper to video add that retry the function if there is a database error
+// We need this because we run the transaction in SERIALIZABLE isolation that can fail
+function addVideoRetryWrapper (req, res, next) {
+  utils.transactionRetryer(
+    function (callback) {
+      return addVideo(req, res, req.files.videofile[0], callback)
+    },
+    function (err) {
+      if (err) {
+        logger.error('Cannot insert the video with many retries.', { error: err })
+        return next(err)
+      }
+
+      // TODO : include Location of the new video -> 201
+      return res.type('json').status(204).end()
+    }
+  )
+}
+
+function addVideo (req, res, videoFile, callback) {
   const videoInfos = req.body
 
   waterfall([
 
-    function startTransaction (callback) {
-      db.sequelize.transaction().asCallback(function (err, t) {
-        return callback(err, t)
+    function startTransaction (callbackWaterfall) {
+      db.sequelize.transaction({ isolationLevel: 'SERIALIZABLE' }).asCallback(function (err, t) {
+        return callbackWaterfall(err, t)
       })
     },
 
-    function findOrCreateAuthor (t, callback) {
+    function findOrCreateAuthor (t, callbackWaterfall) {
       const user = res.locals.oauth.token.User
 
       const name = user.username
@@ -124,19 +142,19 @@ function addVideo (req, res, next) {
       const userId = user.id
 
       db.Author.findOrCreateAuthor(name, podId, userId, t, function (err, authorInstance) {
-        return callback(err, t, authorInstance)
+        return callbackWaterfall(err, t, authorInstance)
       })
     },
 
-    function findOrCreateTags (t, author, callback) {
+    function findOrCreateTags (t, author, callbackWaterfall) {
       const tags = videoInfos.tags
 
       db.Tag.findOrCreateTags(tags, t, function (err, tagInstances) {
-        return callback(err, t, author, tagInstances)
+        return callbackWaterfall(err, t, author, tagInstances)
       })
     },
 
-    function createVideoObject (t, author, tagInstances, callback) {
+    function createVideoObject (t, author, tagInstances, callbackWaterfall) {
       const videoData = {
         name: videoInfos.name,
         remoteId: null,
@@ -148,74 +166,97 @@ function addVideo (req, res, next) {
 
       const video = db.Video.build(videoData)
 
-      return callback(null, t, author, tagInstances, video)
+      return callbackWaterfall(null, t, author, tagInstances, video)
     },
 
      // Set the videoname the same as the id
-    function renameVideoFile (t, author, tagInstances, video, callback) {
+    function renameVideoFile (t, author, tagInstances, video, callbackWaterfall) {
       const videoDir = constants.CONFIG.STORAGE.VIDEOS_DIR
       const source = path.join(videoDir, videoFile.filename)
       const destination = path.join(videoDir, video.getVideoFilename())
 
       fs.rename(source, destination, function (err) {
-        return callback(err, t, author, tagInstances, video)
+        if (err) return callbackWaterfall(err)
+
+        // This is important in case if there is another attempt
+        videoFile.filename = video.getVideoFilename()
+        return callbackWaterfall(null, t, author, tagInstances, video)
       })
     },
 
-    function insertVideoIntoDB (t, author, tagInstances, video, callback) {
+    function insertVideoIntoDB (t, author, tagInstances, video, callbackWaterfall) {
       const options = { transaction: t }
 
       // Add tags association
       video.save(options).asCallback(function (err, videoCreated) {
-        if (err) return callback(err)
+        if (err) return callbackWaterfall(err)
 
         // Do not forget to add Author informations to the created video
         videoCreated.Author = author
 
-        return callback(err, t, tagInstances, videoCreated)
+        return callbackWaterfall(err, t, tagInstances, videoCreated)
       })
     },
 
-    function associateTagsToVideo (t, tagInstances, video, callback) {
+    function associateTagsToVideo (t, tagInstances, video, callbackWaterfall) {
       const options = { transaction: t }
 
       video.setTags(tagInstances, options).asCallback(function (err) {
         video.Tags = tagInstances
 
-        return callback(err, t, video)
+        return callbackWaterfall(err, t, video)
       })
     },
 
-    function sendToFriends (t, video, callback) {
+    function sendToFriends (t, video, callbackWaterfall) {
       video.toAddRemoteJSON(function (err, remoteVideo) {
-        if (err) return callback(err)
+        if (err) return callbackWaterfall(err)
 
         // Now we'll add the video's meta data to our friends
-        friends.addVideoToFriends(remoteVideo)
-
-        return callback(null, t)
+        friends.addVideoToFriends(remoteVideo, t, function (err) {
+          return callbackWaterfall(err, t)
+        })
       })
     }
 
   ], function andFinally (err, t) {
     if (err) {
-      logger.error('Cannot insert the video.')
+      // This is just a debug because we will retry the insert
+      logger.debug('Cannot insert the video.', { error: err })
 
       // Abort transaction?
       if (t) t.rollback()
 
-      return next(err)
+      return callback(err)
     }
 
     // Commit transaction
     t.commit()
 
-    // TODO : include Location of the new video -> 201
-    return res.type('json').status(204).end()
+    logger.info('Video with name %s created.', videoInfos.name)
+
+    return callback(null)
   })
 }
 
-function updateVideo (req, res, next) {
+function updateVideoRetryWrapper (req, res, next) {
+  utils.transactionRetryer(
+    function (callback) {
+      return updateVideo(req, res, callback)
+    },
+    function (err) {
+      if (err) {
+        logger.error('Cannot update the video with many retries.', { error: err })
+        return next(err)
+      }
+
+      // TODO : include Location of the new video -> 201
+      return res.type('json').status(204).end()
+    }
+  )
+}
+
+function updateVideo (req, res, finalCallback) {
   const videoInstance = res.locals.video
   const videoInfosToUpdate = req.body
 
@@ -267,26 +308,25 @@ function updateVideo (req, res, next) {
       const json = videoInstance.toUpdateRemoteJSON()
 
       // Now we'll update the video's meta data to our friends
-      friends.updateVideoToFriends(json)
-
-      return callback(null, t)
+      friends.updateVideoToFriends(json, t, function (err) {
+        return callback(err, t)
+      })
     }
 
   ], function andFinally (err, t) {
     if (err) {
-      logger.error('Cannot insert the video.')
+      logger.debug('Cannot update the video.', { error: err })
 
       // Abort transaction?
       if (t) t.rollback()
 
-      return next(err)
+      return finalCallback(err)
     }
 
     // Commit transaction
     t.commit()
 
-    // TODO : include Location of the new video -> 201
-    return res.type('json').status(204).end()
+    return finalCallback(null)
   })
 }
 
index 9f4b145825758c304c4ef1aeb18402d16f90e364..a902850cdd044026247350d479b8de07dd45e51b 100644 (file)
@@ -1,6 +1,7 @@
 'use strict'
 
 const crypto = require('crypto')
+const retry = require('async/retry')
 
 const logger = require('./logger')
 
@@ -9,7 +10,8 @@ const utils = {
   cleanForExit,
   generateRandomString,
   isTestInstance,
-  getFormatedObjects
+  getFormatedObjects,
+  transactionRetryer
 }
 
 function badRequest (req, res, next) {
@@ -46,6 +48,18 @@ function getFormatedObjects (objects, objectsTotal) {
   }
 }
 
+function transactionRetryer (func, callback) {
+  retry({
+    times: 5,
+
+    errorFilter: function (err) {
+      const willRetry = (err.name === 'SequelizeDatabaseError')
+      logger.debug('Maybe retrying the transaction function.', { willRetry })
+      return willRetry
+    }
+  }, func, callback)
+}
+
 // ---------------------------------------------------------------------------
 
 module.exports = utils
index 4afb91b8bf23512d978ce4f3befbfde9d4054796..3d3d0fdeed1da06bb6f0cc37147a5b5f3a32f0a8 100644 (file)
@@ -24,16 +24,33 @@ const friends = {
   sendOwnedVideosToPod
 }
 
-function addVideoToFriends (videoData) {
-  createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
+function addVideoToFriends (videoData, transaction, callback) {
+  const options = {
+    type: 'add',
+    endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+    data: videoData,
+    transaction
+  }
+  createRequest(options, callback)
 }
 
-function updateVideoToFriends (videoData) {
-  createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
+function updateVideoToFriends (videoData, transaction, callback) {
+  const options = {
+    type: 'update',
+    endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+    data: videoData,
+    transaction
+  }
+  createRequest(options, callback)
 }
 
 function removeVideoToFriends (videoParams) {
-  createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
+  const options = {
+    type: 'remove',
+    endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+    data: videoParams
+  }
+  createRequest(options)
 }
 
 function reportAbuseVideoToFriend (reportData, video) {
@@ -258,25 +275,35 @@ function makeRequestsToWinningPods (cert, podsList, callback) {
 }
 
 // Wrapper that populate "toIds" argument with all our friends if it is not specified
-function createRequest (type, endpoint, data, toIds) {
-  if (toIds) return _createRequest(type, endpoint, data, toIds)
+// { type, endpoint, data, toIds, transaction }
+function createRequest (options, callback) {
+  if (!callback) callback = function () {}
+  if (options.toIds) return _createRequest(options, callback)
 
   // If the "toIds" pods is not specified, we send the request to all our friends
-  db.Pod.listAllIds(function (err, podIds) {
+  db.Pod.listAllIds(options.transaction, function (err, podIds) {
     if (err) {
       logger.error('Cannot get pod ids', { error: err })
       return
     }
 
-    return _createRequest(type, endpoint, data, podIds)
+    const newOptions = Object.assign(options, { toIds: podIds })
+    return _createRequest(newOptions, callback)
   })
 }
 
-function _createRequest (type, endpoint, data, toIds) {
+// { type, endpoint, data, toIds, transaction }
+function _createRequest (options, callback) {
+  const type = options.type
+  const endpoint = options.endpoint
+  const data = options.data
+  const toIds = options.toIds
+  const transaction = options.transaction
+
   const pods = []
 
   // If there are no destination pods abort
-  if (toIds.length === 0) return
+  if (toIds.length === 0) return callback(null)
 
   toIds.forEach(function (toPod) {
     pods.push(db.Pod.build({ id: toPod }))
@@ -290,17 +317,14 @@ function _createRequest (type, endpoint, data, toIds) {
     }
   }
 
-  // We run in transaction to keep coherency between Request and RequestToPod tables
-  db.sequelize.transaction(function (t) {
-    const dbRequestOptions = {
-      transaction: t
-    }
+  const dbRequestOptions = {
+    transaction
+  }
 
-    return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
-      return request.setPods(pods, dbRequestOptions)
-    })
-  }).asCallback(function (err) {
-    if (err) logger.error('Error in createRequest transaction.', { error: err })
+  return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
+    if (err) return callback(err)
+
+    return request.setPods(pods, dbRequestOptions).asCallback(callback)
   })
 }
 
index 83ecd732e0a1502873e9a8cb62020eaee48337d5..8e7dd1fd8d7f71a805ff087fd98072ef4f3df9a7 100644 (file)
@@ -115,11 +115,18 @@ function list (callback) {
   return this.findAll().asCallback(callback)
 }
 
-function listAllIds (callback) {
+function listAllIds (transaction, callback) {
+  if (!callback) {
+    callback = transaction
+    transaction = null
+  }
+
   const query = {
     attributes: [ 'id' ]
   }
 
+  if (transaction) query.transaction = transaction
+
   return this.findAll(query).asCallback(function (err, pods) {
     if (err) return callback(err)
 
index bae227c058aaa902ecdba0acc682a5f743bbb64a..1d60380449014dbdd5a1f65358ae614569077adc 100644 (file)
@@ -291,8 +291,8 @@ function listWithLimitAndRandom (limit, callback) {
       order: [
         [ 'id', 'ASC' ]
       ],
-      offset: start,
-      limit: limit,
+      // offset: start,
+      // limit: limit,
       include: [ this.sequelize.models.Pod ]
     }