const each = require('async/each')
const eachLimit = require('async/eachLimit')
const eachSeries = require('async/eachSeries')
-const fs = require('fs')
+const series = require('async/series')
const request = require('request')
const waterfall = require('async/waterfall')
const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
+const peertubeCrypto = require('../helpers/peertube-crypto')
const requests = require('../helpers/requests')
+const utils = require('../helpers/utils')
+const RequestScheduler = require('./request-scheduler')
+const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
+const RequestVideoEventScheduler = require('./request-video-event-scheduler')
+
+const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
+
+const requestScheduler = new RequestScheduler()
+const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
+const requestVideoEventScheduler = new RequestVideoEventScheduler()
const friends = {
+ activate,
addVideoToFriends,
updateVideoToFriends,
+ reportAbuseVideoToFriend,
+ quickAndDirtyUpdateVideoToFriends,
+ quickAndDirtyUpdatesVideoToFriends,
+ addEventToRemoteVideo,
+ addEventsToRemoteVideo,
hasFriends,
- getMyCertificate,
makeFriends,
quitFriends,
removeVideoToFriends,
- sendOwnedVideosToPod
+ sendOwnedVideosToPod,
+ getRequestScheduler,
+ getRequestVideoQaduScheduler,
+ getRequestVideoEventScheduler
+}
+
+function activate () {
+ requestScheduler.activate()
+ requestVideoQaduScheduler.activate()
+ requestVideoEventScheduler.activate()
}
-function addVideoToFriends (video) {
- createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
+function addVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: ENDPOINT_ACTIONS.ADD,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
+}
+
+function updateVideoToFriends (videoData, transaction, callback) {
+ const options = {
+ type: ENDPOINT_ACTIONS.UPDATE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoData,
+ transaction
+ }
+ createRequest(options, callback)
+}
+
+function removeVideoToFriends (videoParams) {
+ const options = {
+ type: ENDPOINT_ACTIONS.REMOVE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: videoParams
+ }
+ createRequest(options)
+}
+
+function reportAbuseVideoToFriend (reportData, video) {
+ const options = {
+ type: ENDPOINT_ACTIONS.REPORT_ABUSE,
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: reportData,
+ toIds: [ video.Author.podId ]
+ }
+ createRequest(options)
}
-function updateVideoToFriends (video) {
- createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, video)
+function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction, callback) {
+ const options = {
+ videoId: qaduParams.videoId,
+ type: qaduParams.type,
+ transaction
+ }
+ return createVideoQaduRequest(options, callback)
+}
+
+function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) {
+ const tasks = []
+
+ qadusParams.forEach(function (qaduParams) {
+ const fun = function (callback) {
+ quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
+ }
+
+ tasks.push(fun)
+ })
+
+ series(tasks, finalCallback)
+}
+
+function addEventToRemoteVideo (eventParams, transaction, callback) {
+ const options = {
+ videoId: eventParams.videoId,
+ type: eventParams.type,
+ transaction
+ }
+ createVideoEventRequest(options, callback)
+}
+
+function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) {
+ const tasks = []
+
+ eventsParams.forEach(function (eventParams) {
+ const fun = function (callback) {
+ addEventToRemoteVideo(eventParams, transaction, callback)
+ }
+
+ tasks.push(fun)
+ })
+
+ series(tasks, finalCallback)
}
function hasFriends (callback) {
})
}
-function getMyCertificate (callback) {
- fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
-}
-
function makeFriends (hosts, callback) {
const podsScore = {}
logger.info('Make friends!')
- getMyCertificate(function (err, cert) {
+ peertubeCrypto.getMyPublicCert(function (err, cert) {
if (err) {
logger.error('Cannot read public cert.')
return callback(err)
function quitFriends (callback) {
// Stop pool requests
- db.Request.deactivate()
+ requestScheduler.deactivate()
waterfall([
function flushRequests (callbackAsync) {
- db.Request.flush(callbackAsync)
+ requestScheduler.flush(err => callbackAsync(err))
+ },
+
+ function flushVideoQaduRequests (callbackAsync) {
+ requestVideoQaduScheduler.flush(err => callbackAsync(err))
},
function getPodsList (callbackAsync) {
}
], function (err) {
// Don't forget to re activate the scheduler, even if there was an error
- db.Request.activate()
+ requestScheduler.activate()
if (err) return callback(err)
})
}
-function removeVideoToFriends (videoParams) {
- createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
-}
-
function sendOwnedVideosToPod (podId) {
db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
if (err) {
return
}
- createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
+ const options = {
+ type: 'add',
+ endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
+ data: remoteVideo,
+ toIds: [ podId ]
+ }
+ createRequest(options)
})
})
})
}
+function getRequestScheduler () {
+ return requestScheduler
+}
+
+function getRequestVideoQaduScheduler () {
+ return requestVideoQaduScheduler
+}
+
+function getRequestVideoEventScheduler () {
+ return requestVideoEventScheduler
+}
+
// ---------------------------------------------------------------------------
module.exports = friends
// ---------------------------------------------------------------------------
function computeForeignPodsList (host, podsScore, callback) {
- getForeignPodsList(host, function (err, foreignPodsList) {
+ getForeignPodsList(host, function (err, res) {
if (err) return callback(err)
- if (!foreignPodsList) foreignPodsList = []
+ const foreignPodsList = res.data
// Let's give 1 point to the pod we ask the friends list
foreignPodsList.push({ host })
else podsScore[foreignPodHost] = 1
})
- callback()
+ return callback()
})
}
// Only add a pod if it exists in more than a half base pods
const podsList = []
const baseScore = hosts.length / 2
+
Object.keys(podsScore).forEach(function (podHost) {
// If the pod is not me and with a good score we add it
if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
function makeRequestsToWinningPods (cert, podsList, callback) {
// Stop pool requests
- db.Request.deactivate()
+ requestScheduler.deactivate()
// Flush pool requests
- db.Request.forceSend()
+ requestScheduler.forceSend()
eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
const params = {
method: 'POST',
json: {
host: constants.CONFIG.WEBSERVER.HOST,
+ email: constants.CONFIG.ADMIN.EMAIL,
publicKey: cert
}
}
}
if (res.statusCode === 200) {
- const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
+ const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
podObj.save().asCallback(function (err, podCreated) {
if (err) {
logger.error('Cannot add friend %s pod.', pod.host, { error: err })
}, function endRequests () {
// Final callback, we've ended all the requests
// Now we made new friends, we can re activate the pool of requests
- db.Request.activate()
+ requestScheduler.activate()
logger.debug('makeRequestsToWinningPods finished.')
return callback()
})
}
-// Wrapper that populate "to" argument with all our friends if it is not specified
-function createRequest (type, endpoint, data, to) {
- if (to) return _createRequest(type, endpoint, data, to)
+// Wrapper that populate "toIds" argument with all our friends if it is not specified
+// { type, endpoint, data, toIds, transaction }
+function createRequest (options, callback) {
+ if (!callback) callback = function () {}
+ if (options.toIds) return requestScheduler.createRequest(options, callback)
- // If the "to" pods is not specified, we send the request to all our friends
- db.Pod.listAllIds(function (err, podIds) {
+ // If the "toIds" pods is not specified, we send the request to all our friends
+ db.Pod.listAllIds(options.transaction, function (err, podIds) {
if (err) {
logger.error('Cannot get pod ids', { error: err })
return
}
- return _createRequest(type, endpoint, data, podIds)
+ const newOptions = Object.assign(options, { toIds: podIds })
+ return requestScheduler.createRequest(newOptions, callback)
})
}
-function _createRequest (type, endpoint, data, to) {
- const pods = []
-
- // If there are no destination pods abort
- if (to.length === 0) return
-
- to.forEach(function (toPod) {
- pods.push(db.Pod.build({ id: toPod }))
- })
+function createVideoQaduRequest (options, callback) {
+ if (!callback) callback = utils.createEmptyCallback()
- const createQuery = {
- endpoint,
- request: {
- type: type,
- data: data
- }
- }
+ requestVideoQaduScheduler.createRequest(options, callback)
+}
- // We run in transaction to keep coherency between Request and RequestToPod tables
- db.sequelize.transaction(function (t) {
- const dbRequestOptions = {
- transaction: t
- }
+function createVideoEventRequest (options, callback) {
+ if (!callback) callback = utils.createEmptyCallback()
- return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
- return request.setPods(pods, dbRequestOptions)
- })
- }).asCallback(function (err) {
- if (err) logger.error('Error in createRequest transaction.', { error: err })
- })
+ requestVideoEventScheduler.createRequest(options, callback)
}
function isMe (host) {