remoteVideos
)
+router.post('/qadu',
+ signatureValidators.signature,
+ secureMiddleware.checkSignature,
+ videosValidators.remoteQaduVideos,
+ remoteVideosQadu
+)
+
// ---------------------------------------------------------------------------
module.exports = router
return res.type('json').status(204).end()
}
+function remoteVideosQadu (req, res, next) {
+ const requests = req.body.data
+ const fromPod = res.locals.secure.pod
+
+ eachSeries(requests, function (request, callbackEach) {
+ const videoData = request.data
+
+ quickAndDirtyUpdateVideoRetryWrapper(videoData, fromPod, callbackEach)
+ }, function (err) {
+ if (err) logger.error('Error managing remote videos.', { error: err })
+ })
+
+ return res.type('json').status(204).end()
+}
+
+function quickAndDirtyUpdateVideoRetryWrapper (videoData, fromPod, finalCallback) {
+ const options = {
+ arguments: [ videoData, fromPod ],
+ errorMessage: 'Cannot update quick and dirty the remote video with many retries.'
+ }
+
+ databaseUtils.retryTransactionWrapper(quickAndDirtyUpdateVideo, options, finalCallback)
+}
+
+function quickAndDirtyUpdateVideo (videoData, fromPod, finalCallback) {
+ waterfall([
+ databaseUtils.startSerializableTransaction,
+
+ function findVideo (t, callback) {
+ fetchVideo(fromPod.host, videoData.remoteId, function (err, videoInstance) {
+ return callback(err, t, videoInstance)
+ })
+ },
+
+ function updateVideoIntoDB (t, videoInstance, callback) {
+ const options = { transaction: t }
+
+ if (videoData.views) {
+ videoInstance.set('views', videoData.views)
+ }
+
+ if (videoData.likes) {
+ videoInstance.set('likes', videoData.likes)
+ }
+
+ if (videoData.dislikes) {
+ videoInstance.set('dislikes', videoData.dislikes)
+ }
+
+ videoInstance.save(options).asCallback(function (err) {
+ return callback(err, t)
+ })
+ },
+
+ databaseUtils.commitTransaction
+
+ ], function (err, t) {
+ if (err) {
+ logger.debug('Cannot quick and dirty update the remote video.', { error: err })
+ return databaseUtils.rollbackTransaction(err, t, finalCallback)
+ }
+
+ logger.info('Remote video %s quick and dirty updated', videoData.name)
+ return finalCallback(null)
+ })
+}
+
// Handle retries on fail
function addRemoteVideoRetryWrapper (videoToCreateData, fromPod, finalCallback) {
const options = {
function getVideo (req, res, next) {
const videoInstance = res.locals.video
+
+ if (videoInstance.isOwned()) {
+ // The increment is done directly in the database, not using the instance value
+ videoInstance.increment('views').asCallback(function (err) {
+ if (err) {
+ logger.error('Cannot add view to video %d.', videoInstance.id)
+ return
+ }
+
+ // FIXME: make a real view system
+ // For example, only add a view when a user watch a video during 30s etc
+ friends.quickAndDirtyUpdateVideoToFriends(videoInstance.id, constants.REQUEST_VIDEO_QADU_TYPES.VIEWS)
+ })
+ }
+
+ // Do not wait the view system
res.json(videoInstance.toFormatedJSON())
}
'use strict'
+const has = require('lodash/has')
+
const constants = require('../../../initializers/constants')
const videosValidators = require('../videos')
const miscValidators = require('../misc')
const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
const remoteVideosValidators = {
- isEachRemoteRequestVideosValid
+ isEachRemoteRequestVideosValid,
+ isEachRemoteRequestVideosQaduValid
}
function isEachRemoteRequestVideosValid (requests) {
const video = request.data
return (
isRequestTypeAddValid(request.type) &&
- isCommonVideoAttrbiutesValid(video) &&
+ isCommonVideoAttributesValid(video) &&
videosValidators.isVideoAuthorValid(video.author) &&
videosValidators.isVideoThumbnailDataValid(video.thumbnailData)
) ||
(
isRequestTypeUpdateValid(request.type) &&
- isCommonVideoAttrbiutesValid(video)
+ isCommonVideoAttributesValid(video)
) ||
(
isRequestTypeRemoveValid(request.type) &&
})
}
+function isEachRemoteRequestVideosQaduValid (requests) {
+ return miscValidators.isArray(requests) &&
+ requests.every(function (request) {
+ const video = request.data
+
+ return (
+ videosValidators.isVideoRemoteIdValid(video.remoteId) &&
+ (has(video, 'views') === false || videosValidators.isVideoViewsValid) &&
+ (has(video, 'likes') === false || videosValidators.isVideoLikesValid) &&
+ (has(video, 'dislikes') === false || videosValidators.isVideoDislikesValid)
+ )
+ })
+}
+
// ---------------------------------------------------------------------------
module.exports = remoteVideosValidators
// ---------------------------------------------------------------------------
-function isCommonVideoAttrbiutesValid (video) {
+function isCommonVideoAttributesValid (video) {
return videosValidators.isVideoDateValid(video.createdAt) &&
videosValidators.isVideoDateValid(video.updatedAt) &&
videosValidators.isVideoDescriptionValid(video.description) &&
isVideoRemoteIdValid,
isVideoAbuseReasonValid,
isVideoAbuseReporterUsernameValid,
- isVideoFile
+ isVideoFile,
+ isVideoViewsValid,
+ isVideoLikesValid,
+ isVideoDislikesValid
}
function isVideoAuthorValid (value) {
return usersValidators.isUserUsernameValid(value)
}
+function isVideoViewsValid (value) {
+ return validator.isInt(value, { min: 0 })
+}
+
+function isVideoLikesValid (value) {
+ return validator.isInt(value, { min: 0 })
+}
+
+function isVideoDislikesValid (value) {
+ return validator.isInt(value, { min: 0 })
+}
+
function isVideoFile (value, files) {
// Should have files
if (!files) return false
requestParams.json.data = params.data
}
+ console.log(requestParams.json.data)
+
request.post(requestParams, callback)
}
// ---------------------------------------------------------------------------
-const LAST_MIGRATION_VERSION = 10
+const LAST_MIGRATION_VERSION = 15
// ---------------------------------------------------------------------------
const SORTABLE_COLUMNS = {
USERS: [ 'id', '-id', 'username', '-username', 'createdAt', '-createdAt' ],
VIDEO_ABUSES: [ 'id', '-id', 'createdAt', '-createdAt' ],
- VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt' ]
+ VIDEOS: [ 'name', '-name', 'duration', '-duration', 'createdAt', '-createdAt', 'views', '-views' ]
}
const OAUTH_LIFETIME = {
// How many requests we send to a pod per interval
const REQUESTS_LIMIT_PER_POD = 5
+const REQUESTS_VIDEO_QADU_LIMIT_PODS = 10
+// The QADU requests are not big
+const REQUESTS_VIDEO_QADU_LIMIT_PER_POD = 50
+
// Number of requests to retry for replay requests module
const RETRY_REQUESTS = 5
const REQUEST_ENDPOINTS = {
- VIDEOS: 'videos'
+ VIDEOS: 'videos',
+ QADU: 'videos/qadu'
}
const REQUEST_ENDPOINT_ACTIONS = {}
REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS] = {
REPORT_ABUSE: 'report-abuse'
}
+const REQUEST_VIDEO_QADU_TYPES = {
+ LIKES: 'likes',
+ DISLIKES: 'dislikes',
+ VIEWS: 'views'
+}
+
const REMOTE_SCHEME = {
HTTP: 'https',
WS: 'wss'
REMOTE_SCHEME,
REQUEST_ENDPOINT_ACTIONS,
REQUEST_ENDPOINTS,
+ REQUEST_VIDEO_QADU_TYPES,
REQUESTS_IN_PARALLEL,
REQUESTS_INTERVAL,
REQUESTS_LIMIT_PER_POD,
REQUESTS_LIMIT_PODS,
+ REQUESTS_VIDEO_QADU_LIMIT_PER_POD,
+ REQUESTS_VIDEO_QADU_LIMIT_PODS,
RETRY_REQUESTS,
SEARCHABLE_COLUMNS,
SIGNATURE_ALGORITHM,
--- /dev/null
+'use strict'
+
+// utils = { transaction, queryInterface, sequelize, Sequelize }
+exports.up = function (utils, finalCallback) {
+ const q = utils.queryInterface
+ const Sequelize = utils.Sequelize
+
+ const data = {
+ type: Sequelize.INTEGER,
+ allowNull: false,
+ defaultValue: 0
+ }
+
+ q.addColumn('Videos', 'views', data, { transaction: utils.transaction }).asCallback(finalCallback)
+}
+
+exports.down = function (options, callback) {
+ throw new Error('Not implemented.')
+}
--- /dev/null
+'use strict'
+
+const eachLimit = require('async/eachLimit')
+
+const constants = require('../initializers/constants')
+const db = require('../initializers/database')
+const logger = require('../helpers/logger')
+const requests = require('../helpers/requests')
+
+module.exports = class BaseRequestScheduler {
+
+ constructor (options) {
+ this.lastRequestTimestamp = 0
+ this.timer = null
+ }
+
+ activate () {
+ logger.info('Requests scheduler activated.')
+ this.lastRequestTimestamp = Date.now()
+
+ this.timer = setInterval(() => {
+ this.lastRequestTimestamp = Date.now()
+ this.makeRequests()
+ }, constants.REQUESTS_INTERVAL)
+ }
+
+ deactivate () {
+ logger.info('Requests scheduler deactivated.')
+ clearInterval(this.timer)
+ this.timer = null
+ }
+
+ forceSend () {
+ logger.info('Force requests scheduler sending.')
+ this.makeRequests()
+ }
+
+ remainingMilliSeconds () {
+ if (this.timer === null) return -1
+
+ return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
+ }
+
+ // ---------------------------------------------------------------------------
+
+ // Make a requests to friends of a certain type
+ makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
+ if (!callback) callback = function () {}
+
+ const params = {
+ toPod: toPod,
+ sign: true, // Prove our identity
+ method: 'POST',
+ path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
+ data: requestsToMake // Requests we need to make
+ }
+
+ // Make multiple retry requests to all of pods
+ // The function fire some useful callbacks
+ requests.makeSecureRequest(params, (err, res) => {
+ if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
+ err = err ? err.message : 'Status code not 20x : ' + res.statusCode
+ logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
+
+ return callback(false)
+ }
+
+ return callback(true)
+ })
+ }
+
+ // Make all the requests of the scheduler
+ makeRequests () {
+ this.getRequestModel().listWithLimitAndRandom(this.limitPods, this.limitPerPod, (err, requests) => {
+ if (err) {
+ logger.error('Cannot get the list of "%s".', this.description, { err: err })
+ return // Abort
+ }
+
+ // If there are no requests, abort
+ if (requests.length === 0) {
+ logger.info('No "%s" to make.', this.description)
+ return
+ }
+
+ // We want to group requests by destinations pod and endpoint
+ const requestsToMakeGrouped = this.buildRequestObjects(requests)
+
+ logger.info('Making "%s" to friends.', this.description)
+
+ const goodPods = []
+ const badPods = []
+
+ eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
+ const requestToMake = requestsToMakeGrouped[hashKey]
+ const toPod = requestToMake.toPod
+
+ // Maybe the pod is not our friend anymore so simply remove it
+ if (!toPod) {
+ const requestIdsToDelete = requestToMake.ids
+
+ logger.info('Removing %d "%s" of unexisting pod %s.', requestIdsToDelete.length, this.description, requestToMake.toPod.id)
+ return this.getRequestToPodModel().removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
+ }
+
+ this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
+ if (success === false) {
+ badPods.push(requestToMake.toPod.id)
+ return callbackEach()
+ }
+
+ logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
+ goodPods.push(requestToMake.toPod.id)
+
+ // Remove the pod id of these request ids
+ this.getRequestToPodModel().removeByRequestIdsAndPod(requestToMake.ids, requestToMake.toPod.id, callbackEach)
+
+ this.afterRequestHook()
+ })
+ }, () => {
+ // All the requests were made, we update the pods score
+ db.Pod.updatePodsScore(goodPods, badPods)
+
+ this.afterRequestsHook()
+ })
+ })
+ }
+
+ flush (callback) {
+ this.getRequestModel().removeAll(callback)
+ }
+
+ afterRequestHook () {
+ // Nothing to do, let children reimplement it
+ }
+
+ afterRequestsHook () {
+ // Nothing to do, let children reimplement it
+ }
+}
const peertubeCrypto = require('../helpers/peertube-crypto')
const requests = require('../helpers/requests')
const RequestScheduler = require('./request-scheduler')
+const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
+
const requestScheduler = new RequestScheduler()
+const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
const friends = {
activate,
addVideoToFriends,
updateVideoToFriends,
reportAbuseVideoToFriend,
+ quickAndDirtyUpdateVideoToFriends,
hasFriends,
makeFriends,
quitFriends,
function activate () {
requestScheduler.activate()
+ requestSchedulerVideoQadu.activate()
}
function addVideoToFriends (videoData, transaction, callback) {
createRequest(options)
}
+function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
+ const options = {
+ videoId,
+ type,
+ transaction
+ }
+ return createVideoQaduRequest(options, callback)
+}
+
function hasFriends (callback) {
db.Pod.countAll(function (err, count) {
if (err) return callback(err)
waterfall([
function flushRequests (callbackAsync) {
- requestScheduler.flush(callbackAsync)
+ requestScheduler.flush(err => callbackAsync(err))
+ },
+
+ function flushVideoQaduRequests (callbackAsync) {
+ requestSchedulerVideoQadu.flush(err => callbackAsync(err))
},
function getPodsList (callbackAsync) {
})
}
+function createVideoQaduRequest (options, callback) {
+ if (!callback) callback = function () {}
+
+ requestSchedulerVideoQadu.createRequest(options, callback)
+}
+
function isMe (host) {
return host === constants.CONFIG.WEBSERVER.HOST
}
'use strict'
-const eachLimit = require('async/eachLimit')
-
const constants = require('../initializers/constants')
+const BaseRequestScheduler = require('./base-request-scheduler')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
-const requests = require('../helpers/requests')
-module.exports = class RequestScheduler {
+module.exports = class RequestScheduler extends BaseRequestScheduler {
constructor () {
- this.lastRequestTimestamp = 0
- this.timer = null
- }
+ super()
- activate () {
- logger.info('Requests scheduler activated.')
- this.lastRequestTimestamp = Date.now()
+ // We limit the size of the requests
+ this.limitPods = constants.REQUESTS_LIMIT_PODS
+ this.limitPerPod = constants.REQUESTS_LIMIT_PER_POD
- this.timer = setInterval(() => {
- this.lastRequestTimestamp = Date.now()
- this.makeRequests()
- }, constants.REQUESTS_INTERVAL)
+ this.description = 'requests'
}
- deactivate () {
- logger.info('Requests scheduler deactivated.')
- clearInterval(this.timer)
- this.timer = null
+ getRequestModel () {
+ return db.Request
}
- forceSend () {
- logger.info('Force requests scheduler sending.')
- this.makeRequests()
+ getRequestToPodModel () {
+ return db.RequestToPod
}
- remainingMilliSeconds () {
- if (this.timer === null) return -1
+ buildRequestObjects (requests) {
+ const requestsToMakeGrouped = {}
+
+ Object.keys(requests).forEach(toPodId => {
+ requests[toPodId].forEach(data => {
+ const request = data.request
+ const pod = data.pod
+ const hashKey = toPodId + request.endpoint
+
+ if (!requestsToMakeGrouped[hashKey]) {
+ requestsToMakeGrouped[hashKey] = {
+ toPod: pod,
+ endpoint: request.endpoint,
+ ids: [], // request ids, to delete them from the DB in the future
+ datas: [] // requests data,
+ }
+ }
+
+ requestsToMakeGrouped[hashKey].ids.push(request.id)
+ requestsToMakeGrouped[hashKey].datas.push(request.request)
+ })
+ })
- return constants.REQUESTS_INTERVAL - (Date.now() - this.lastRequestTimestamp)
+ return requestsToMakeGrouped
}
// { type, endpoint, data, toIds, transaction }
// ---------------------------------------------------------------------------
- // Make all the requests of the scheduler
- makeRequests () {
- // We limit the size of the requests
- // We don't want to stuck with the same failing requests so we get a random list
- db.Request.listWithLimitAndRandom(constants.REQUESTS_LIMIT_PODS, constants.REQUESTS_LIMIT_PER_POD, (err, requests) => {
- if (err) {
- logger.error('Cannot get the list of requests.', { err: err })
- return // Abort
- }
-
- // If there are no requests, abort
- if (requests.length === 0) {
- logger.info('No requests to make.')
- return
- }
-
- // We want to group requests by destinations pod and endpoint
- const requestsToMakeGrouped = this.buildRequestObjects(requests)
-
- logger.info('Making requests to friends.')
-
- const goodPods = []
- const badPods = []
-
- eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, (hashKey, callbackEach) => {
- const requestToMake = requestsToMakeGrouped[hashKey]
- const toPod = requestToMake.toPod
-
- // Maybe the pod is not our friend anymore so simply remove it
- if (!toPod) {
- const requestIdsToDelete = requestToMake.ids
-
- logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPod.id)
- return db.RequestToPod.removePodOf(requestIdsToDelete, requestToMake.toPod.id, callbackEach)
- }
-
- this.makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, (success) => {
- if (success === false) {
- badPods.push(requestToMake.toPod.id)
- return callbackEach()
- }
-
- logger.debug('Removing requests for pod %s.', requestToMake.toPod.id, { requestsIds: requestToMake.ids })
- goodPods.push(requestToMake.toPod.id)
-
- // Remove the pod id of these request ids
- db.RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPod.id, callbackEach)
- })
- }, () => {
- // All the requests were made, we update the pods score
- db.Request.updatePodsScore(goodPods, badPods)
- // Flush requests with no pod
- db.Request.removeWithEmptyTo(err => {
- if (err) logger.error('Error when removing requests with no pods.', { error: err })
- })
- })
- })
- }
-
- // Make a requests to friends of a certain type
- makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
- if (!callback) callback = function () {}
-
- const params = {
- toPod: toPod,
- sign: true, // Prove our identity
- method: 'POST',
- path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
- data: requestsToMake // Requests we need to make
- }
-
- // Make multiple retry requests to all of pods
- // The function fire some useful callbacks
- requests.makeSecureRequest(params, (err, res) => {
- if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
- err = err ? err.message : 'Status code not 20x : ' + res.statusCode
- logger.error('Error sending secure request to %s pod.', toPod.host, { error: err })
-
- return callback(false)
- }
-
- return callback(true)
- })
- }
-
- buildRequestObjects (requests) {
- const requestsToMakeGrouped = {}
-
- Object.keys(requests).forEach(toPodId => {
- requests[toPodId].forEach(data => {
- const request = data.request
- const pod = data.pod
- const hashKey = toPodId + request.endpoint
-
- if (!requestsToMakeGrouped[hashKey]) {
- requestsToMakeGrouped[hashKey] = {
- toPod: pod,
- endpoint: request.endpoint,
- ids: [], // request ids, to delete them from the DB in the future
- datas: [] // requests data,
- }
- }
-
- requestsToMakeGrouped[hashKey].ids.push(request.id)
- requestsToMakeGrouped[hashKey].datas.push(request.request)
- })
- })
-
- return requestsToMakeGrouped
- }
-
- flush (callback) {
- db.Request.removeAll(err => {
- if (err) logger.error('Cannot flush the requests.', { error: err })
-
- return callback(err)
+ afterRequestsHook () {
+ // Flush requests with no pod
+ this.getRequestModel().removeWithEmptyTo(err => {
+ if (err) logger.error('Error when removing requests with no pods.', { error: err })
})
}
}
--- /dev/null
+'use strict'
+
+const BaseRequestScheduler = require('./base-request-scheduler')
+const constants = require('../initializers/constants')
+const db = require('../initializers/database')
+const logger = require('../helpers/logger')
+
+module.exports = class RequestVideoQaduScheduler extends BaseRequestScheduler {
+
+ constructor () {
+ super()
+
+ // We limit the size of the requests
+ this.limitPods = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
+ this.limitPerPod = constants.REQUESTS_VIDEO_QADU_LIMIT_PODS
+
+ this.description = 'video QADU requests'
+ }
+
+ getRequestModel () {
+ return db.RequestVideoQadu
+ }
+
+ getRequestToPodModel () {
+ return db.RequestVideoQadu
+ }
+
+ buildRequestObjects (requests) {
+ const requestsToMakeGrouped = {}
+
+ Object.keys(requests).forEach(toPodId => {
+ requests[toPodId].forEach(data => {
+ const request = data.request
+ const video = data.video
+ const pod = data.pod
+ const hashKey = toPodId
+
+ if (!requestsToMakeGrouped[hashKey]) {
+ requestsToMakeGrouped[hashKey] = {
+ toPod: pod,
+ endpoint: constants.REQUEST_ENDPOINTS.QADU,
+ ids: [], // request ids, to delete them from the DB in the future
+ datas: [], // requests data
+ videos: {}
+ }
+ }
+
+ if (!requestsToMakeGrouped[hashKey].videos[video.id]) {
+ requestsToMakeGrouped[hashKey].videos[video.id] = {}
+ }
+
+ const videoData = requestsToMakeGrouped[hashKey].videos[video.id]
+
+ switch (request.type) {
+ case constants.REQUEST_VIDEO_QADU_TYPES.LIKES:
+ videoData.likes = video.likes
+ break
+
+ case constants.REQUEST_VIDEO_QADU_TYPES.DISLIKES:
+ videoData.likes = video.dislikes
+ break
+
+ case constants.REQUEST_VIDEO_QADU_TYPES.VIEWS:
+ videoData.views = video.views
+ break
+
+ default:
+ logger.error('Unknown request video QADU type %s.', request.type)
+ return
+ }
+
+ // Do not forget the remoteId so the remote pod can identify the video
+ videoData.remoteId = video.id
+ requestsToMakeGrouped[hashKey].ids.push(request.id)
+ requestsToMakeGrouped[hashKey].videos[video.id] = videoData
+ })
+ })
+
+ Object.keys(requestsToMakeGrouped).forEach(hashKey => {
+ Object.keys(requestsToMakeGrouped[hashKey].videos).forEach(videoId => {
+ const videoData = requestsToMakeGrouped[hashKey].videos[videoId]
+
+ requestsToMakeGrouped[hashKey].datas.push({
+ data: videoData
+ })
+ })
+
+ // We don't need it anymore, it was just to build our datas array
+ delete requestsToMakeGrouped[hashKey].videos
+ })
+
+ return requestsToMakeGrouped
+ }
+
+ // { type, videoId, transaction? }
+ createRequest (options, callback) {
+ const type = options.type
+ const videoId = options.videoId
+ const transaction = options.transaction
+
+ const dbRequestOptions = {}
+ if (transaction) dbRequestOptions.transaction = transaction
+
+ // Send the update to all our friends
+ db.Pod.listAllIds(options.transaction, function (err, podIds) {
+ if (err) return callback(err)
+
+ const queries = []
+ podIds.forEach(podId => {
+ queries.push({ type, videoId, podId })
+ })
+
+ return db.RequestVideoQadu.bulkCreate(queries, dbRequestOptions).asCallback(callback)
+ })
+ }
+}
const logger = require('../../../helpers/logger')
const validatorsRemoteVideos = {
- remoteVideos
+ remoteVideos,
+ remoteQaduVideos
}
function remoteVideos (req, res, next) {
checkErrors(req, res, next)
}
+function remoteQaduVideos (req, res, next) {
+ req.checkBody('data').isEachRemoteRequestVideosQaduValid()
+
+ logger.debug('Checking remoteVideosQadu parameters', { parameters: req.body })
+
+ checkErrors(req, res, next)
+}
+
// ---------------------------------------------------------------------------
module.exports = validatorsRemoteVideos
'use strict'
+const each = require('async/each')
const map = require('lodash/map')
+const waterfall = require('async/waterfall')
const constants = require('../initializers/constants')
+const logger = require('../helpers/logger')
const customPodsValidators = require('../helpers/custom-validators').pods
// ---------------------------------------------------------------------------
listBadPods,
load,
loadByHost,
+ updatePodsScore,
removeAll
},
instanceMethods: {
})
}
-function listRandomPodIdsWithRequest (limit, callback) {
+function listRandomPodIdsWithRequest (limit, tableRequestPod, callback) {
const self = this
self.count().asCallback(function (err, count) {
where: {
id: {
$in: [
- this.sequelize.literal('SELECT "podId" FROM "RequestToPods"')
+ this.sequelize.literal('SELECT "podId" FROM "' + tableRequestPod + '"')
]
}
}
function removeAll (callback) {
return this.destroy().asCallback(callback)
}
+
+function updatePodsScore (goodPods, badPods) {
+ const self = this
+
+ logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
+
+ if (goodPods.length !== 0) {
+ this.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
+ if (err) logger.error('Cannot increment scores of good pods.', { error: err })
+ })
+ }
+
+ if (badPods.length !== 0) {
+ this.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
+ if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
+ removeBadPods.call(self)
+ })
+ }
+}
+
+// ---------------------------------------------------------------------------
+
+// Remove pods with a score of 0 (too many requests where they were unreachable)
+function removeBadPods () {
+ const self = this
+
+ waterfall([
+ function findBadPods (callback) {
+ self.sequelize.models.Pod.listBadPods(function (err, pods) {
+ if (err) {
+ logger.error('Cannot find bad pods.', { error: err })
+ return callback(err)
+ }
+
+ return callback(null, pods)
+ })
+ },
+
+ function removeTheseBadPods (pods, callback) {
+ each(pods, function (pod, callbackEach) {
+ pod.destroy().asCallback(callbackEach)
+ }, function (err) {
+ return callback(err, pods.length)
+ })
+ }
+ ], function (err, numberOfPodsRemoved) {
+ if (err) {
+ logger.error('Cannot remove bad pods.', { error: err })
+ } else if (numberOfPodsRemoved) {
+ logger.info('Removed %d pods.', numberOfPodsRemoved)
+ } else {
+ logger.info('No need to remove bad pods.')
+ }
+ })
+}
}
],
classMethods: {
- removePodOf
+ removeByRequestIdsAndPod
}
})
// ---------------------------------------------------------------------------
-function removePodOf (requestsIds, podId, callback) {
+function removeByRequestIdsAndPod (requestsIds, podId, callback) {
if (!callback) callback = function () {}
const query = {
--- /dev/null
+'use strict'
+
+/*
+ Request Video for Quick And Dirty Updates like:
+ - views
+ - likes
+ - dislikes
+
+ We can't put it in the same system than basic requests for efficiency.
+ Moreover we don't want to slow down the basic requests with a lot of views/likes/dislikes requests.
+ So we put it an independant request scheduler.
+*/
+
+const values = require('lodash/values')
+
+const constants = require('../initializers/constants')
+
+// ---------------------------------------------------------------------------
+
+module.exports = function (sequelize, DataTypes) {
+ const RequestVideoQadu = sequelize.define('RequestVideoQadu',
+ {
+ type: {
+ type: DataTypes.ENUM(values(constants.REQUEST_VIDEO_QADU_TYPES)),
+ allowNull: false
+ }
+ },
+ {
+ timestamps: false,
+ indexes: [
+ {
+ fields: [ 'podId' ]
+ },
+ {
+ fields: [ 'videoId' ]
+ }
+ ],
+ classMethods: {
+ associate,
+
+ listWithLimitAndRandom,
+
+ countTotalRequests,
+ removeAll,
+ removeByRequestIdsAndPod
+ }
+ }
+ )
+
+ return RequestVideoQadu
+}
+
+// ------------------------------ STATICS ------------------------------
+
+function associate (models) {
+ this.belongsTo(models.Pod, {
+ foreignKey: {
+ name: 'podId',
+ allowNull: false
+ },
+ onDelete: 'CASCADE'
+ })
+
+ this.belongsTo(models.Video, {
+ foreignKey: {
+ name: 'videoId',
+ allowNull: false
+ },
+ onDelete: 'CASCADE'
+ })
+}
+
+function countTotalRequests (callback) {
+ const query = {
+ include: [ this.sequelize.models.Pod ]
+ }
+
+ return this.count(query).asCallback(callback)
+}
+
+function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
+ const self = this
+ const Pod = this.sequelize.models.Pod
+
+ Pod.listRandomPodIdsWithRequest(limitPods, 'RequestVideoQadus', function (err, podIds) {
+ if (err) return callback(err)
+
+ // We don't have friends that have requests
+ if (podIds.length === 0) return callback(null, [])
+
+ const query = {
+ include: [
+ {
+ model: self.sequelize.models.Pod,
+ where: {
+ id: {
+ $in: podIds
+ }
+ }
+ },
+ {
+ model: self.sequelize.models.Video
+ }
+ ]
+ }
+
+ self.findAll(query).asCallback(function (err, requests) {
+ if (err) return callback(err)
+
+ const requestsGrouped = groupAndTruncateRequests(requests, limitRequestsPerPod)
+ return callback(err, requestsGrouped)
+ })
+ })
+}
+
+function removeByRequestIdsAndPod (ids, podId, callback) {
+ const query = {
+ where: {
+ id: {
+ $in: ids
+ },
+ podId
+ }
+ }
+
+ this.destroy(query).asCallback(callback)
+}
+
+function removeAll (callback) {
+ // Delete all requests
+ this.truncate({ cascade: true }).asCallback(callback)
+}
+
+// ---------------------------------------------------------------------------
+
+function groupAndTruncateRequests (requests, limitRequestsPerPod) {
+ const requestsGrouped = {}
+
+ requests.forEach(function (request) {
+ const pod = request.Pod
+
+ if (!requestsGrouped[pod.id]) requestsGrouped[pod.id] = []
+
+ if (requestsGrouped[pod.id].length < limitRequestsPerPod) {
+ requestsGrouped[pod.id].push({
+ request: request,
+ video: request.Video,
+ pod
+ })
+ }
+ })
+
+ return requestsGrouped
+}
'use strict'
-const each = require('async/each')
-const waterfall = require('async/waterfall')
const values = require('lodash/values')
const constants = require('../initializers/constants')
-const logger = require('../helpers/logger')
// ---------------------------------------------------------------------------
listWithLimitAndRandom,
countTotalRequests,
- removeBadPods,
- updatePodsScore,
removeAll,
removeWithEmptyTo
}
return this.count(query).asCallback(callback)
}
-// Remove pods with a score of 0 (too many requests where they were unreachable)
-function removeBadPods () {
- const self = this
-
- waterfall([
- function findBadPods (callback) {
- self.sequelize.models.Pod.listBadPods(function (err, pods) {
- if (err) {
- logger.error('Cannot find bad pods.', { error: err })
- return callback(err)
- }
-
- return callback(null, pods)
- })
- },
-
- function removeTheseBadPods (pods, callback) {
- each(pods, function (pod, callbackEach) {
- pod.destroy().asCallback(callbackEach)
- }, function (err) {
- return callback(err, pods.length)
- })
- }
- ], function (err, numberOfPodsRemoved) {
- if (err) {
- logger.error('Cannot remove bad pods.', { error: err })
- } else if (numberOfPodsRemoved) {
- logger.info('Removed %d pods.', numberOfPodsRemoved)
- } else {
- logger.info('No need to remove bad pods.')
- }
- })
-}
-
-function updatePodsScore (goodPods, badPods) {
- const self = this
- const Pod = this.sequelize.models.Pod
-
- logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
-
- if (goodPods.length !== 0) {
- Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
- if (err) logger.error('Cannot increment scores of good pods.', { error: err })
- })
- }
-
- if (badPods.length !== 0) {
- Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
- if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
- removeBadPods.call(self)
- })
- }
-}
-
function listWithLimitAndRandom (limitPods, limitRequestsPerPod, callback) {
const self = this
const Pod = this.sequelize.models.Pod
- Pod.listRandomPodIdsWithRequest(limitPods, function (err, podIds) {
+ Pod.listRandomPodIdsWithRequest(limitPods, 'RequestToPods', function (err, podIds) {
if (err) return callback(err)
// We don't have friends that have requests
if (podIds.length === 0) return callback(null, [])
- // The the first x requests of these pods
+ // The first x requests of these pods
// It is very important to sort by id ASC to keep the requests order!
const query = {
order: [
if (res === false) throw new Error('Video duration is not valid.')
}
}
+ },
+ views: {
+ type: DataTypes.INTEGER,
+ allowNull: false,
+ defaultValue: 0,
+ validate: {
+ min: 0,
+ isInt: true
+ }
}
},
{
},
{
fields: [ 'infoHash' ]
+ },
+ {
+ fields: [ 'views' ]
}
],
classMethods: {
magnetUri: this.generateMagnetUri(),
author: this.Author.name,
duration: this.duration,
+ views: this.views,
tags: map(this.Tags, 'name'),
thumbnailPath: pathUtils.join(constants.STATIC_PATHS.THUMBNAILS, this.getThumbnailName()),
createdAt: this.createdAt,
const chai = require('chai')
const each = require('async/each')
const expect = chai.expect
+const parallel = require('async/parallel')
const series = require('async/series')
const WebTorrent = require('webtorrent')
const webtorrent = new WebTorrent()
})
})
+ describe('Should update video views', function () {
+ let videoId1
+ let videoId2
+
+ before(function (done) {
+ videosUtils.getVideosList(servers[2].url, function (err, res) {
+ if (err) throw err
+
+ const videos = res.body.data.filter(video => video.isLocal === true)
+ videoId1 = videos[0].id
+ videoId2 = videos[1].id
+
+ done()
+ })
+ })
+
+ it('Should views multiple videos on owned servers', function (done) {
+ this.timeout(30000)
+
+ parallel([
+ function (callback) {
+ videosUtils.getVideo(servers[2].url, videoId1, callback)
+ },
+
+ function (callback) {
+ videosUtils.getVideo(servers[2].url, videoId1, callback)
+ },
+
+ function (callback) {
+ videosUtils.getVideo(servers[2].url, videoId1, callback)
+ },
+
+ function (callback) {
+ videosUtils.getVideo(servers[2].url, videoId2, callback)
+ }
+ ], function (err) {
+ if (err) throw err
+
+ setTimeout(done, 22000)
+ })
+ })
+
+ it('Should have views updated on each pod', function (done) {
+ each(servers, function (server, callback) {
+ videosUtils.getVideosList(server.url, function (err, res) {
+ if (err) throw err
+
+ const videos = res.body.data
+ expect(videos.find(video => video.views === 3)).to.be.exist
+ expect(videos.find(video => video.views === 1)).to.be.exist
+
+ callback()
+ })
+ }, done)
+ })
+ })
+/*
describe('Should manipulate these videos', function () {
it('Should update the video 3 by asking pod 3', function (done) {
this.timeout(15000)
}, done)
})
})
-
+*/
after(function (done) {
servers.forEach(function (server) {
process.kill(-server.app.pid)
})
})
+ it('Should have the views updated', function (done) {
+ videosUtils.getVideo(server.url, videoId, function (err, res) {
+ if (err) throw err
+
+ const video = res.body
+ expect(video.views).to.equal(1)
+
+ done()
+ })
+ })
+
it('Should search the video by name by default', function (done) {
videosUtils.searchVideo(server.url, 'my', function (err, res) {
if (err) throw err