3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14 const RequestScheduler = require('./request-scheduler')
15 const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
// Action identifiers registered for the videos endpoint, looked up once from the shared constants.
17 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
// Module-level scheduler instances used by the functions below.
19 const requestScheduler = new RequestScheduler()
20 const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
// NOTE(review): the two names below look like entries of a larger list
// (presumably the `friends` object exported further down); its opening line
// and its other entries are not visible in this view.
26 reportAbuseVideoToFriend,
27 quickAndDirtyUpdateVideoToFriends,
// Start the two request schedulers owned by this module, the regular one
// first and the video QADU one second.
function activate () {
  const schedulers = [ requestScheduler, requestSchedulerVideoQadu ]

  schedulers.forEach(function (scheduler) {
    scheduler.activate()
  })
}
// Build an ADD request for the videos endpoint and hand it to createRequest,
// forwarding `callback` to it.
// NOTE(review): the lines that open the `options` literal and set its other
// fields (presumably from `videoData` and `transaction`) are not visible in
// this view.
40 function addVideoToFriends (videoData, transaction, callback) {
42 type: ENDPOINT_ACTIONS.ADD,
43 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
47 createRequest(options, callback)
// Build an UPDATE request for the videos endpoint and hand it to
// createRequest, forwarding `callback` to it.
// NOTE(review): the lines that open the `options` literal and set its other
// fields (presumably from `videoData` and `transaction`) are not visible in
// this view.
50 function updateVideoToFriends (videoData, transaction, callback) {
52 type: ENDPOINT_ACTIONS.UPDATE,
53 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
57 createRequest(options, callback)
// Build a REMOVE request for the videos endpoint and hand it to createRequest.
// No callback is passed, so createRequest substitutes its own no-op.
// NOTE(review): the lines that open the `options` literal and attach
// `videoParams` to it are not visible in this view.
60 function removeVideoToFriends (videoParams) {
62 type: ENDPOINT_ACTIONS.REMOVE,
63 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
66 createRequest(options)
// Build a REPORT_ABUSE request for the videos endpoint and hand it to
// createRequest without a callback.
// NOTE(review): the lines that open the `options` literal and attach
// `reportData` to it are not visible in this view.
69 function reportAbuseVideoToFriend (reportData, video) {
71 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
72 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Explicit recipient: only the pod of the video's author. Because `toIds` is
// set, createRequest skips its "all pods" lookup.
74 toIds: [ video.Author.podId ]
76 createRequest(options)
// Hand an `options` object to createVideoQaduRequest and return whatever it
// returns; `callback` is forwarded unchanged.
// NOTE(review): the lines that build `options` (presumably from `videoId`,
// `type` and `transaction`) are not visible in this view.
79 function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
85 return createVideoQaduRequest(options, callback)
// Tell the caller whether at least one pod is stored in the database.
// Calls back with (err) when counting fails, otherwise with (null, boolean).
function hasFriends (callback) {
  db.Pod.countAll((error, podCount) => {
    if (error) {
      return callback(error)
    }

    callback(null, podCount !== 0)
  })
}
// Befriend other pods: score the pods advertised by each host in `hosts`,
// keep the winning ones and send them a friend request.
// NOTE(review): several lines of this function are not visible in this view,
// including the declaration of `podsScore` and the rest of the error branch
// around the 'Cannot read public cert.' log.
97 function makeFriends (hosts, callback) {
100 logger.info('Make friends!')
// Our public certificate is fetched first; it is later handed to
// makeRequestsToWinningPods.
101 peertubeCrypto.getMyPublicCert(function (err, cert) {
103 logger.error('Cannot read public cert.')
// Hosts are queried one at a time; every call updates the shared `podsScore`.
107 eachSeries(hosts, function (host, callbackEach) {
108 computeForeignPodsList(host, podsScore, callbackEach)
// A failure while querying any host aborts the whole operation.
110 if (err) return callback(err)
112 logger.debug('Pods scores computed.', { podsScore: podsScore })
113 const podsList = computeWinningPods(hosts, podsScore)
114 logger.debug('Pods that we keep.', { podsToKeep: podsList })
// `callback` is passed along, so completion is signalled from
// makeRequestsToWinningPods rather than from here.
116 makeRequestsToWinningPods(cert, podsList, callback)
// Quit all our friends. The visible steps are: stop the request scheduler,
// flush both schedulers, list the known pods, announce to each pod that we
// quit, delete the pods from the database, then re-activate the scheduler
// and report the outcome through `callback`.
// NOTE(review): the named step functions below are presumably run in order
// by async `waterfall`, each receiving the previous step's result; the call
// itself and several braces are not visible in this view — confirm.
121 function quitFriends (callback) {
122 // Stop pool requests
123 requestScheduler.deactivate()
// Only the error (if any) is forwarded to the next step.
126 function flushRequests (callbackAsync) {
127 requestScheduler.flush(err => callbackAsync(err))
130 function flushVideoQaduRequests (callbackAsync) {
131 requestSchedulerVideoQadu.flush(err => callbackAsync(err))
134 function getPodsList (callbackAsync) {
135 return db.Pod.list(callbackAsync)
138 function announceIQuitMyFriends (pods, callbackAsync) {
139 const requestParams = {
141 path: '/api/' + constants.API_VERSION + '/pods/remove',
145 // Announce we quit them
146 // We don't care if the request fails
147 // The other pod will exclude us automatically after a while
148 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the same `requestParams` object is reused and mutated for
// every pod while requests run in parallel; this relies on makeSecureRequest
// reading `toPod` before the next iteration overwrites it — confirm.
149 requestParams.toPod = pod
150 requests.makeSecureRequest(requestParams, callbackEach)
153 logger.error('Some errors while quitting friends.', { err: err })
154 // Don't stop the process
// The pods list is passed on with a null error even when some announcements failed.
157 return callbackAsync(null, pods)
161 function removePodsFromDB (pods, callbackAsync) {
162 each(pods, function (pod, callbackEach) {
// destroy() returns a promise; asCallback adapts it to the node-style callbackEach.
163 pod.destroy().asCallback(callbackEach)
167 // Don't forget to re activate the scheduler, even if there was an error
168 requestScheduler.activate()
170 if (err) return callback(err)
172 logger.info('Removed all remote videos.')
173 return callback(null)
// Create one request per video we own, each built from the video's
// remote-JSON form and sent through createRequest without a callback.
// Errors are only logged; nothing is reported back to the caller.
// NOTE(review): the lines that open the `options` literal and set its other
// fields (presumably including `toIds` built from `podId`) are not visible
// in this view.
177 function sendOwnedVideosToPod (podId) {
178 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
180 logger.error('Cannot get the list of videos we own.')
184 videosList.forEach(function (video) {
// Each video is converted independently, so one failed conversion does not
// affect the others.
185 video.toAddRemoteJSON(function (err, remoteVideo) {
187 logger.error('Cannot convert video to remote.', { error: err })
188 // Don't break the process
194 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
198 createRequest(options)
204 // ---------------------------------------------------------------------------
// Public interface of this module: the `friends` object.
// The functions defined after this point are not exported directly.
206 module.exports = friends
208 // ---------------------------------------------------------------------------
// Ask `host` for its pods list and add one point in `podsScore` (a map of
// host -> score, mutated in place) for every pod it reports, plus one for
// `host` itself.
// Calls back with the error when the list cannot be fetched.
// NOTE(review): the end of this function, presumably including the success
// call to `callback`, is not visible in this view.
210 function computeForeignPodsList (host, podsScore, callback) {
211 getForeignPodsList(host, function (err, res) {
212 if (err) return callback(err)
// The list is taken from the `data` field of the parsed response.
214 const foreignPodsList = res.data
216 // Let's give 1 point to the pod we ask the friends list
217 foreignPodsList.push({ host })
219 foreignPodsList.forEach(function (foreignPod) {
220 const foreignPodHost = foreignPod.host
// First sighting of a host starts at 1, later sightings increment it.
222 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
223 else podsScore[foreignPodHost] = 1
// Select the pods worth befriending from the scores gathered in `podsScore`.
// A pod is kept when it is not this server and its score is strictly greater
// than half the number of hosts we asked.
// NOTE(review): the declaration of `podsList` and the function's return are
// not visible in this view; the caller uses the return value as the list of
// kept pods.
230 function computeWinningPods (hosts, podsScore) {
231 // Build the list of pods to add
232 // Only add a pod if it exists in more than a half base pods
// The threshold is not rounded, so with an odd number of hosts it is fractional.
234 const baseScore = hosts.length / 2
236 Object.keys(podsScore).forEach(function (podHost) {
237 // If the pod is not me and with a good score we add it
238 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
239 podsList.push({ host: podHost })
// GET the pods list published by `host` and call back with the parsed JSON
// body, or with the request error.
// NOTE(review): JSON.parse throws on an invalid body; the lines around it
// (presumably a try/catch) are not visible in this view — confirm.
246 function getForeignPodsList (host, callback) {
247 const path = '/api/' + constants.API_VERSION + '/pods'
// The URL scheme comes from constants.REMOTE_SCHEME.HTTP.
249 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
250 if (err) return callback(err)
253 const json = JSON.parse(body)
254 return callback(null, json)
// Send a friend request to every pod in `podsList`, store the pods that
// answer with status 200, and queue our own videos for each stored pod.
// The request scheduler is stopped for the duration and re-activated once
// every pod has been processed.
// NOTE(review): parts of the `params` literal (where `cert` is presumably
// used) and the final invocation of `callback` are not visible in this view.
261 function makeRequestsToWinningPods (cert, podsList, callback) {
262 // Stop pool requests
263 requestScheduler.deactivate()
264 // Flush pool requests
265 requestScheduler.forceSend()
267 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
269 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
// Our own host and admin email, taken from the configuration, are sent to the remote pod.
272 host: constants.CONFIG.WEBSERVER.HOST,
273 email: constants.CONFIG.ADMIN.EMAIL,
278 requests.makeRetryRequest(params, function (err, res, body) {
// Every failure path below calls callbackEach() without an error, so a
// single failing pod never aborts the loop.
280 logger.error('Error with adding %s pod.', pod.host, { error: err })
281 // Don't break the process
282 return callbackEach()
285 if (res.statusCode === 200) {
// The remote pod's certificate and email are read from the response body.
286 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
287 podObj.save().asCallback(function (err, podCreated) {
289 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
290 return callbackEach()
293 // Add our videos to the request scheduler
294 sendOwnedVideosToPod(podCreated.id)
296 return callbackEach()
299 logger.error('Status not 200 for %s pod.', pod.host)
300 return callbackEach()
303 }, function endRequests () {
304 // Final callback, we've ended all the requests
305 // Now we made new friends, we can re activate the pool of requests
306 requestScheduler.activate()
308 logger.debug('makeRequestsToWinningPods finished.')
313 // Wrapper that populates the "toIds" argument with all our friends if it is not specified
314 // { type, endpoint, data, toIds, transaction }
// `callback` is optional; a no-op is substituted when it is missing.
// NOTE(review): the rest of the error branch after the 'Cannot get pod ids'
// log is not visible in this view, so it is unclear whether `callback` is
// invoked on that path — confirm.
315 function createRequest (options, callback) {
316 if (!callback) callback = function () {}
// Explicit recipients: forward as-is without touching the database.
317 if (options.toIds) return requestScheduler.createRequest(options, callback)
319 // If the "toIds" pods is not specified, we send the request to all our friends
// The lookup uses the caller's transaction (options.transaction).
320 db.Pod.listAllIds(options.transaction, function (err, podIds) {
322 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes `toIds` onto the caller's `options` object itself;
// `newOptions` is the same object, not a copy.
326 const newOptions = Object.assign(options, { toIds: podIds })
327 return requestScheduler.createRequest(newOptions, callback)
// Forward a video QADU request to its dedicated scheduler.
// `callback` is optional: a no-op stands in for it when it is falsy.
function createVideoQaduRequest (options, callback) {
  const noop = function () {}
  const done = callback || noop

  requestSchedulerVideoQadu.createRequest(options, done)
}
// True when `host` is strictly equal to this server's configured webserver host.
function isMe (host) {
  const ownHost = constants.CONFIG.WEBSERVER.HOST

  return ownHost === host
}