From: Chocobozzz Date: Mon, 2 May 2016 15:25:05 +0000 (+0200) Subject: Rename pool requests --> requests scheduler X-Git-Tag: v0.0.1-alpha~963 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e3647ae226d19ed1401d4c617d35a68de1c4657a;p=oweals%2Fpeertube.git Rename pool requests --> requests scheduler --- diff --git a/server.js b/server.js index 9772ce92b..204cc146f 100644 --- a/server.js +++ b/server.js @@ -28,7 +28,7 @@ const customValidators = require('./server/helpers/customValidators') const database = require('./server/initializers/database') const installer = require('./server/initializers/installer') const logger = require('./server/helpers/logger') -const poolRequests = require('./server/lib/poolRequests') +const poolRequests = require('./server/lib/requestsScheduler') const routes = require('./server/controllers') const utils = require('./server/helpers/utils') const videos = require('./server/lib/videos') diff --git a/server/lib/friends.js b/server/lib/friends.js index c3c231604..9a2c5c06e 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -9,7 +9,7 @@ const constants = require('../initializers/constants') const logger = require('../helpers/logger') const peertubeCrypto = require('../helpers/peertubeCrypto') const Pods = require('../models/pods') -const poolRequests = require('../lib/poolRequests') +const requestsScheduler = require('../lib/requestsScheduler') const requests = require('../helpers/requests') const Videos = require('../models/videos') @@ -30,7 +30,7 @@ function addVideoToFriends (video) { const id = video.name + video.magnetUri // ensure namePath is null video.namePath = null - poolRequests.addRequest(id, 'add', video) + requestsScheduler.addRequest(id, 'add', video) } function hasFriends (callback) { @@ -70,9 +70,9 @@ function makeFriends (callback) { function quitFriends (callback) { // Stop pool requests - poolRequests.deactivate() + requestsScheduler.deactivate() // Flush pool requests - poolRequests.forceSend() + requestsScheduler.forceSend() Pods.list(function (err, pods) { if (err) return callback(err) @@ -90,7 +90,7 @@ function quitFriends (callback) { // Announce we quit them requests.makeMultipleRetryRequest(request, pods, function () { Pods.removeAll(function (err) { - poolRequests.activate() + requestsScheduler.activate() if (err) return callback(err) @@ -110,7 +110,7 @@ function quitFriends (callback) { function removeVideoToFriends (video) { // To avoid duplicates const id = video.name + video.magnetUri - poolRequests.addRequest(id, 'remove', video) + requestsScheduler.addRequest(id, 'remove', video) } // --------------------------------------------------------------------------- @@ -164,9 +164,9 @@ function getForeignPodsList (url, callback) { function makeRequestsToWinningPods (cert, pods_list, callback) { // Stop pool requests - poolRequests.deactivate() + requestsScheduler.deactivate() // Flush pool requests - poolRequests.forceSend() + requestsScheduler.forceSend() // Get the list of our videos to send to our new friends Videos.listOwned(function (err, videos_list) { @@ -213,7 +213,7 @@ function makeRequestsToWinningPods (cert, pods_list, callback) { function endRequests (err) { // Now we made new friends, we can re activate the pool of requests - poolRequests.activate() + requestsScheduler.activate() if (err) { logger.error('There was some errors when we wanted to make friends.') diff --git a/server/lib/poolRequests.js b/server/lib/poolRequests.js deleted file mode 100644 index 49e61700c..000000000 --- a/server/lib/poolRequests.js +++ /dev/null @@ -1,221 +0,0 @@ -'use strict' - -const async = require('async') -const map = require('lodash/map') - -const constants = require('../initializers/constants') -const logger = require('../helpers/logger') -const Pods = require('../models/pods') -const PoolRequests = require('../models/poolRequests') -const requests = require('../helpers/requests') -const Videos = require('../models/videos') - -let timer = null - -const poolRequests = { - activate: activate, - addRequest: addRequest, - deactivate: deactivate, - forceSend: forceSend -} - -function activate () { - logger.info('Pool requests activated.') - timer = setInterval(makePoolRequests, constants.INTERVAL) -} - -function addRequest (id, type, request) { - logger.debug('Add request to the pool requests.', { id: id, type: type, request: request }) - - PoolRequests.findById(id, function (err, entity) { - if (err) { - logger.error('Cannot find one pool request.', { error: err }) - return // Abort - } - - if (entity) { - if (entity.type === type) { - logger.error('Cannot insert two same requests.') - return // Abort - } - - // Remove the request of the other type - PoolRequests.removeRequestById(id, function (err) { - if (err) { - logger.error('Cannot remove a pool request.', { error: err }) - return // Abort - } - }) - } else { - PoolRequests.create(id, type, request, function (err) { - if (err) logger.error('Cannot create a pool request.', { error: err }) - return // Abort - }) - } - }) -} - -function deactivate () { - logger.info('Pool requests deactivated.') - clearInterval(timer) -} - -function forceSend () { - logger.info('Force pool requests sending.') - makePoolRequests() -} - -// --------------------------------------------------------------------------- - -module.exports = poolRequests - -// --------------------------------------------------------------------------- - -function makePoolRequest (type, requests_to_make, callback) { - if (!callback) callback = function () {} - - Pods.list(function (err, pods) { - if (err) return callback(err) - - const params = { - encrypt: true, - sign: true, - method: 'POST', - path: null, - data: requests_to_make - } - - if (type === 'add') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' - } else if (type === 'remove') { - params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' - } else { - return callback(new Error('Unkown pool request type.')) - } - - const bad_pods = [] - const good_pods = [] - - requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) - - function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { - if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { - bad_pods.push(pod._id) - logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) - } else { - good_pods.push(pod._id) - } - - return callback_each_pod_finished() - } - - function callbackAllPodsFinished (err) { - if (err) return callback(err) - - updatePodsScore(good_pods, bad_pods) - callback(null) - } - }) -} - -function makePoolRequests () { - logger.info('Making pool requests to friends.') - - PoolRequests.list(function (err, pool_requests) { - if (err) { - logger.error('Cannot get the list of pool requests.', { err: err }) - return // Abort - } - - if (pool_requests.length === 0) return - - const requests_to_make = { - add: { - ids: [], - requests: [] - }, - remove: { - ids: [], - requests: [] - } - } - - async.each(pool_requests, function (pool_request, callback_each) { - if (pool_request.type === 'add') { - requests_to_make.add.requests.push(pool_request.request) - requests_to_make.add.ids.push(pool_request._id) - } else if (pool_request.type === 'remove') { - requests_to_make.remove.requests.push(pool_request.request) - requests_to_make.remove.ids.push(pool_request._id) - } else { - logger.error('Unkown pool request type.', { request_type: pool_request.type }) - return // abort - } - - callback_each() - }, function () { - // Send the add requests - if (requests_to_make.add.requests.length !== 0) { - makePoolRequest('add', requests_to_make.add.requests, function (err) { - if (err) logger.error('Errors when sent add pool requests.', { error: err }) - - PoolRequests.removeRequests(requests_to_make.add.ids) - }) - } - - // Send the remove requests - if (requests_to_make.remove.requests.length !== 0) { - makePoolRequest('remove', requests_to_make.remove.requests, function (err) { - if (err) logger.error('Errors when sent remove pool requests.', { error: err }) - - PoolRequests.removeRequests(requests_to_make.remove.ids) - }) - } - }) - }) -} - -function removeBadPods () { - Pods.findBadPods(function (err, pods) { - if (err) { - logger.error('Cannot find bad pods.', { error: err }) - return // abort - } - - if (pods.length === 0) return - - const urls = map(pods, 'url') - const ids = map(pods, '_id') - - Videos.removeAllRemotesOf(urls, function (err, r) { - if (err) { - logger.error('Cannot remove videos from a pod that we removing.', { error: err }) - } else { - const videos_removed = r.result.n - logger.info('Removed %d videos.', videos_removed) - } - - Pods.removeAllByIds(ids, function (err, r) { - if (err) { - logger.error('Cannot remove bad pods.', { error: err }) - } else { - const pods_removed = r.result.n - logger.info('Removed %d pods.', pods_removed) - } - }) - }) - }) -} - -function updatePodsScore (good_pods, bad_pods) { - logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) - - Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { - if (err) logger.error('Cannot increment scores of good pods.') - }) - - Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { - if (err) logger.error('Cannot increment scores of bad pods.') - removeBadPods() - }) -} diff --git a/server/lib/requestsScheduler.js b/server/lib/requestsScheduler.js new file mode 100644 index 000000000..2c5474e51 --- /dev/null +++ b/server/lib/requestsScheduler.js @@ -0,0 +1,221 @@ +'use strict' + +const async = require('async') +const map = require('lodash/map') + +const constants = require('../initializers/constants') +const logger = require('../helpers/logger') +const Pods = require('../models/pods') +const Requests = require('../models/requests') +const requests = require('../helpers/requests') +const Videos = require('../models/videos') + +let timer = null + +const requestsScheduler = { + activate: activate, + addRequest: addRequest, + deactivate: deactivate, + forceSend: forceSend +} + +function activate () { + logger.info('Requests scheduler activated.') + timer = setInterval(makeRequests, constants.INTERVAL) +} + +function addRequest (id, type, request) { + logger.debug('Add request to the requests scheduler.', { id: id, type: type, request: request }) + + Requests.findById(id, function (err, entity) { + if (err) { + logger.error('Cannot find one request.', { error: err }) + return // Abort + } + + if (entity) { + if (entity.type === type) { + logger.error('Cannot insert two same requests.') + return // Abort + } + + // Remove the request of the other type + Requests.removeRequestById(id, function (err) { + if (err) { + logger.error('Cannot remove a request.', { error: err }) + return // Abort + } + }) + } else { + Requests.create(id, type, request, function (err) { + if (err) logger.error('Cannot create a request.', { error: err }) + return // Abort + }) + } + }) +} + +function deactivate () { + logger.info('Requests scheduler deactivated.') + clearInterval(timer) +} + +function forceSend () { + logger.info('Force requests scheduler sending.') + makeRequests() +} + +// --------------------------------------------------------------------------- + +module.exports = requestsScheduler + +// --------------------------------------------------------------------------- + +function makeRequest (type, requests_to_make, callback) { + if (!callback) callback = function () {} + + Pods.list(function (err, pods) { + if (err) return callback(err) + + const params = { + encrypt: true, + sign: true, + method: 'POST', + path: null, + data: requests_to_make + } + + if (type === 'add') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/add' + } else if (type === 'remove') { + params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove' + } else { + return callback(new Error('Unkown pool request type.')) + } + + const bad_pods = [] + const good_pods = [] + + requests.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished) + + function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) { + if (err || (response.statusCode !== 200 && response.statusCode !== 204)) { + bad_pods.push(pod._id) + logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') }) + } else { + good_pods.push(pod._id) + } + + return callback_each_pod_finished() + } + + function callbackAllPodsFinished (err) { + if (err) return callback(err) + + updatePodsScore(good_pods, bad_pods) + callback(null) + } + }) +} + +function makeRequests () { + logger.info('Making requests to friends.') + + Requests.list(function (err, requests) { + if (err) { + logger.error('Cannot get the list of requests.', { err: err }) + return // Abort + } + + if (requests.length === 0) return + + const requests_to_make = { + add: { + ids: [], + requests: [] + }, + remove: { + ids: [], + requests: [] + } + } + + async.each(requests, function (pool_request, callback_each) { + if (pool_request.type === 'add') { + requests_to_make.add.requests.push(pool_request.request) + requests_to_make.add.ids.push(pool_request._id) + } else if (pool_request.type === 'remove') { + requests_to_make.remove.requests.push(pool_request.request) + requests_to_make.remove.ids.push(pool_request._id) + } else { + logger.error('Unkown request type.', { request_type: pool_request.type }) + return // abort + } + + callback_each() + }, function () { + // Send the add requests + if (requests_to_make.add.requests.length !== 0) { + makeRequest('add', requests_to_make.add.requests, function (err) { + if (err) logger.error('Errors when sent add requests.', { error: err }) + + Requests.removeRequests(requests_to_make.add.ids) + }) + } + + // Send the remove requests + if (requests_to_make.remove.requests.length !== 0) { + makeRequest('remove', requests_to_make.remove.requests, function (err) { + if (err) logger.error('Errors when sent remove pool requests.', { error: err }) + + Requests.removeRequests(requests_to_make.remove.ids) + }) + } + }) + }) +} + +function removeBadPods () { + Pods.findBadPods(function (err, pods) { + if (err) { + logger.error('Cannot find bad pods.', { error: err }) + return // abort + } + + if (pods.length === 0) return + + const urls = map(pods, 'url') + const ids = map(pods, '_id') + + Videos.removeAllRemotesOf(urls, function (err, r) { + if (err) { + logger.error('Cannot remove videos from a pod that we removing.', { error: err }) + } else { + const videos_removed = r.result.n + logger.info('Removed %d videos.', videos_removed) + } + + Pods.removeAllByIds(ids, function (err, r) { + if (err) { + logger.error('Cannot remove bad pods.', { error: err }) + } else { + const pods_removed = r.result.n + logger.info('Removed %d pods.', pods_removed) + } + }) + }) + }) +} + +function updatePodsScore (good_pods, bad_pods) { + logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length) + + Pods.incrementScores(good_pods, constants.PODS_SCORE.BONUS, function (err) { + if (err) logger.error('Cannot increment scores of good pods.') + }) + + Pods.incrementScores(bad_pods, constants.PODS_SCORE.MALUS, function (err) { + if (err) logger.error('Cannot increment scores of bad pods.') + removeBadPods() + }) +} diff --git a/server/models/poolRequests.js b/server/models/poolRequests.js deleted file mode 100644 index 28093a94c..000000000 --- a/server/models/poolRequests.js +++ /dev/null @@ -1,55 +0,0 @@ -'use strict' - -const mongoose = require('mongoose') - -const logger = require('../helpers/logger') - -// --------------------------------------------------------------------------- - -const poolRequestsSchema = mongoose.Schema({ - type: String, - id: String, // Special id to find duplicates (video created we want to remove...) - request: mongoose.Schema.Types.Mixed -}) -const PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema) - -// --------------------------------------------------------------------------- - -const PoolRequests = { - create: create, - findById: findById, - list: list, - removeRequestById: removeRequestById, - removeRequests: removeRequests -} - -function create (id, type, request, callback) { - PoolRequestsDB.create({ id: id, type: type, request: request }, callback) -} - -function findById (id, callback) { - PoolRequestsDB.findOne({ id: id }, callback) -} - -function list (callback) { - PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback) -} - -function removeRequestById (id, callback) { - PoolRequestsDB.remove({ id: id }, callback) -} - -function removeRequests (ids) { - PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) { - if (err) { - logger.error('Cannot remove requests from the pool requests database.', { error: err }) - return // Abort - } - - logger.info('Pool requests flushed.') - }) -} - -// --------------------------------------------------------------------------- - -module.exports = PoolRequests diff --git a/server/models/requests.js b/server/models/requests.js new file mode 100644 index 000000000..2152ae0e9 --- /dev/null +++ b/server/models/requests.js @@ -0,0 +1,55 @@ +'use strict' + +const mongoose = require('mongoose') + +const logger = require('../helpers/logger') + +// --------------------------------------------------------------------------- + +const requestsSchema = mongoose.Schema({ + type: String, + id: String, // Special id to find duplicates (video created we want to remove...) + request: mongoose.Schema.Types.Mixed +}) +const RequestsDB = mongoose.model('requests', requestsSchema) + +// --------------------------------------------------------------------------- + +const Requests = { + create: create, + findById: findById, + list: list, + removeRequestById: removeRequestById, + removeRequests: removeRequests +} + +function create (id, type, request, callback) { + RequestsDB.create({ id: id, type: type, request: request }, callback) +} + +function findById (id, callback) { + RequestsDB.findOne({ id: id }, callback) +} + +function list (callback) { + RequestsDB.find({}, { _id: 1, type: 1, request: 1 }, callback) +} + +function removeRequestById (id, callback) { + RequestsDB.remove({ id: id }, callback) +} + +function removeRequests (ids) { + RequestsDB.remove({ _id: { $in: ids } }, function (err) { + if (err) { + logger.error('Cannot remove requests from the requests database.', { error: err }) + return // Abort + } + + logger.info('Pool requests flushed.') + }) +} + +// --------------------------------------------------------------------------- + +module.exports = Requests