3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14 const utils = require('../helpers/utils')
15 const RequestScheduler = require('./request-scheduler')
16 const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
17 const RequestVideoEventScheduler = require('./request-video-event-scheduler')
// Actions registered in the constants for the VIDEOS endpoint
// (ADD, UPDATE, REMOVE and REPORT_ABUSE are the ones referenced in this file).
19 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
// Module-level scheduler instances, created once when the module is loaded:
// - requestScheduler: used by createRequest()
// - requestSchedulerVideoQadu: used by createVideoQaduRequest()
// - requestSchedulerVideoEvent: used by createVideoEventRequest()
21 const requestScheduler = new RequestScheduler()
22 const requestSchedulerVideoQadu = new RequestVideoQaduScheduler()
23 const requestSchedulerVideoEvent = new RequestVideoEventScheduler()
29 reportAbuseVideoToFriend,
30 quickAndDirtyUpdateVideoToFriends,
31 addEventToRemoteVideo,
// Activate the three module-level request schedulers, in this order:
// regular requests, video qadu requests, video event requests.
function activate () {
  const schedulers = [
    requestScheduler,
    requestSchedulerVideoQadu,
    requestSchedulerVideoEvent
  ]

  for (const scheduler of schedulers) {
    scheduler.activate()
  }
}
// Create an ADD request on the videos endpoint through createRequest().
// `callback` is forwarded as is to createRequest().
// NOTE(review): the statement building `options` is only partially visible in this excerpt;
// videoData and transaction are presumably stored in it — confirm against the full file.
45 function addVideoToFriends (videoData, transaction, callback) {
47 type: ENDPOINT_ACTIONS.ADD,
48 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
52 createRequest(options, callback)
// Create an UPDATE request on the videos endpoint through createRequest().
// `callback` is forwarded as is to createRequest().
// NOTE(review): the statement building `options` is only partially visible in this excerpt;
// videoData and transaction are presumably stored in it — confirm against the full file.
55 function updateVideoToFriends (videoData, transaction, callback) {
57 type: ENDPOINT_ACTIONS.UPDATE,
58 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
62 createRequest(options, callback)
// Create a REMOVE request on the videos endpoint through createRequest().
// No callback is given, so createRequest() falls back to its no-op callback:
// the caller is not notified of the outcome.
// NOTE(review): the statement building `options` is only partially visible in this excerpt;
// videoParams is presumably the request data — confirm against the full file.
65 function removeVideoToFriends (videoParams) {
67 type: ENDPOINT_ACTIONS.REMOVE,
68 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
71 createRequest(options)
// Create a REPORT_ABUSE request on the videos endpoint through createRequest().
// The request targets a single pod: the one referenced by video.Author.podId.
// Because `toIds` is set, createRequest() hands the options straight to the request scheduler.
// No callback is given, so the caller is not notified of the outcome.
// NOTE(review): the statement building `options` is only partially visible in this excerpt;
// reportData is presumably the request data — confirm against the full file.
74 function reportAbuseVideoToFriend (reportData, video) {
76 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
77 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// video.Author must be populated, otherwise this property access throws
79 toIds: [ video.Author.podId ]
81 createRequest(options)
// Create a video "qadu" request through createVideoQaduRequest() and return its result.
// NOTE(review): the construction of `options` is not visible in this excerpt;
// it is presumably built from videoId, type and transaction — confirm against the full file.
84 function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
90 return createVideoQaduRequest(options, callback)
// Create a video event request through createVideoEventRequest().
// NOTE(review): the construction of `options` is not visible in this excerpt;
// it is presumably built from videoId, type and transaction — confirm against the full file.
93 function addEventToRemoteVideo (videoId, type, transaction, callback) {
99 createVideoEventRequest(options, callback)
// Tell the caller whether at least one pod is stored in the database.
// callback(err) when the count fails, callback(null, boolean) otherwise.
function hasFriends (callback) {
  db.Pod.countAll((countError, podsCount) => {
    if (countError) {
      return callback(countError)
    }

    const atLeastOnePod = podsCount !== 0
    callback(null, atLeastOnePod)
  })
}
// Make friends with the pods reachable from the given hosts:
// 1. read our public certificate,
// 2. ask every host, one after the other, for its pods list and accumulate a score per pod in podsScore,
// 3. keep the winning pods (see computeWinningPods) and send them our friend request.
// `callback` receives the error of step 2, or is handed over to makeRequestsToWinningPods().
// NOTE(review): the declaration of `podsScore` and the guard around the certificate error
// are not visible in this excerpt.
111 function makeFriends (hosts, callback) {
114 logger.info('Make friends!')
115 peertubeCrypto.getMyPublicCert(function (err, cert) {
117 logger.error('Cannot read public cert.')
// Hosts are processed in series; each call updates the shared podsScore object
121 eachSeries(hosts, function (host, callbackEach) {
122 computeForeignPodsList(host, podsScore, callbackEach)
124 if (err) return callback(err)
126 logger.debug('Pods scores computed.', { podsScore: podsScore })
127 const podsList = computeWinningPods(hosts, podsScore)
128 logger.debug('Pods that we keep.', { podsToKeep: podsList })
130 makeRequestsToWinningPods(cert, podsList, callback)
// Quit all our friends: stop the request scheduler, flush pending requests, tell every pod
// that we leave, remove the pods from the database, then re-activate the request scheduler.
// `callback` receives the first error of the chain, or null on success.
// NOTE(review): the construct that chains the inner steps is not visible in this excerpt
// (presumably async `waterfall`, given the `callbackAsync` parameters) — confirm against the full file.
135 function quitFriends (callback) {
136 // Stop pool requests
137 requestScheduler.deactivate()
// Step: flush the regular request scheduler
140 function flushRequests (callbackAsync) {
141 requestScheduler.flush(err => callbackAsync(err))
// Step: flush the video qadu request scheduler
144 function flushVideoQaduRequests (callbackAsync) {
145 requestSchedulerVideoQadu.flush(err => callbackAsync(err))
// Step: load the pods; the result of db.Pod.list is given to the next step
148 function getPodsList (callbackAsync) {
149 return db.Pod.list(callbackAsync)
// Step: send a request to the "/pods/remove" API path of every pod
152 function announceIQuitMyFriends (pods, callbackAsync) {
153 const requestParams = {
155 path: '/api/' + constants.API_VERSION + '/pods/remove',
159 // Announce we quit them
160 // We don't care if the request fails
161 // The other pod will exclude us automatically after a while
// At most constants.REQUESTS_IN_PARALLEL requests run at the same time
162 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the same requestParams object is mutated by every iteration;
// this is only safe if makeSecureRequest reads `toPod` synchronously — confirm.
163 requestParams.toPod = pod
164 requests.makeSecureRequest(requestParams, callbackEach)
167 logger.error('Some errors while quitting friends.', { err: err })
168 // Don't stop the process
// Errors are only logged: the pods list is always forwarded to the next step
171 return callbackAsync(null, pods)
// Step: destroy every pod instance
175 function removePodsFromDB (pods, callbackAsync) {
176 each(pods, function (pod, callbackEach) {
177 pod.destroy().asCallback(callbackEach)
181 // Don't forget to re activate the scheduler, even if there was an error
182 requestScheduler.activate()
184 if (err) return callback(err)
186 logger.info('Removed all remote videos.')
187 return callback(null)
// Create one request on the videos endpoint for every video we own.
// Each owned video is converted with toAddRemoteJSON(); failures are logged.
// Nothing is reported back to the caller: createRequest() is called without callback.
// NOTE(review): the error guards and most of the `options` literal are not visible in this excerpt;
// podId is presumably used as the only target (`toIds`) — confirm against the full file.
191 function sendOwnedVideosToPod (podId) {
192 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
194 logger.error('Cannot get the list of videos we own.')
198 videosList.forEach(function (video) {
199 video.toAddRemoteJSON(function (err, remoteVideo) {
201 logger.error('Cannot convert video to remote.', { error: err })
202 // Don't break the process
208 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
212 createRequest(options)
218 // ---------------------------------------------------------------------------
// The `friends` object is the only value exported by this module;
// the functions declared after this line are module-private helpers unless listed in it.
220 module.exports = friends
222 // ---------------------------------------------------------------------------
// Fetch the pods list known by `host` and add one point in `podsScore` (host -> score)
// to every host of that list. `host` itself is pushed onto the list first, so it also gets one point.
// An error from getForeignPodsList() is forwarded to `callback`.
// NOTE(review): `res.data` is used without validation; a remote answer without a `data` array
// would make push() throw inside this callback — confirm the expected response shape.
// NOTE(review): the call reporting success to `callback` is not visible in this excerpt.
224 function computeForeignPodsList (host, podsScore, callback) {
225 getForeignPodsList(host, function (err, res) {
226 if (err) return callback(err)
228 const foreignPodsList = res.data
230 // Let's give 1 point to the pod we ask the friends list
231 foreignPodsList.push({ host })
233 foreignPodsList.forEach(function (foreignPod) {
234 const foreignPodHost = foreignPod.host
// First occurrence of a host starts at 1, the next ones increment the score
236 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
237 else podsScore[foreignPodHost] = 1
// Select the pods to befriend from the scores accumulated in `podsScore` (host -> score).
// A pod is kept when it is not this server (see isMe) and its score is strictly greater
// than half the number of base hosts. Kept pods are pushed to podsList as { host }.
// NOTE(review): the declaration and the return of `podsList` are not visible in this excerpt;
// makeFriends() uses the return value as the list of pods to contact.
244 function computeWinningPods (hosts, podsScore) {
245 // Build the list of pods to add
246 // Only add a pod if it exists in more than a half base pods
// Not rounded: with 3 hosts the threshold is 1.5, so a score of 2 is required
248 const baseScore = hosts.length / 2
250 Object.keys(podsScore).forEach(function (podHost) {
251 // If the pod is not me and with a good score we add it
252 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
253 podsList.push({ host: podHost })
// GET the "/api/<API_VERSION>/pods" path of `host` and give the parsed JSON body to `callback`.
// A request error is forwarded to `callback`.
// NOTE(review): JSON.parse throws on an invalid body; the lines surrounding it are not visible
// in this excerpt — confirm that the call is protected by a try/catch.
// NOTE(review): no check of the response status code is visible here.
260 function getForeignPodsList (host, callback) {
261 const path = '/api/' + constants.API_VERSION + '/pods'
263 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
264 if (err) return callback(err)
267 const json = JSON.parse(body)
268 return callback(null, json)
// Send our friend request to every pod of `podsList` (at most constants.REQUESTS_IN_PARALLEL at a time).
// For each pod answering with status 200, the pod is saved in the database with the certificate
// and email found in the response body, then our own videos are scheduled for that pod.
// A failure for one pod (request error, status other than 200, save error) is logged and skipped:
// callbackEach() is always called without error, so the other pods are still processed.
// The request scheduler is deactivated during the whole operation and re-activated at the end.
// NOTE(review): the request parameters are only partially visible in this excerpt (presumably `cert`
// is sent along with our host and email), as are the error guards and the final call to `callback`.
275 function makeRequestsToWinningPods (cert, podsList, callback) {
276 // Stop pool requests
277 requestScheduler.deactivate()
278 // Flush pool requests
279 requestScheduler.forceSend()
281 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
283 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
286 host: constants.CONFIG.WEBSERVER.HOST,
287 email: constants.CONFIG.ADMIN.EMAIL,
292 requests.makeRetryRequest(params, function (err, res, body) {
294 logger.error('Error with adding %s pod.', pod.host, { error: err })
295 // Don't break the process
296 return callbackEach()
299 if (res.statusCode === 200) {
// The remote pod answers with its own certificate and email
300 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
301 podObj.save().asCallback(function (err, podCreated) {
303 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
304 return callbackEach()
307 // Add our videos to the request scheduler
308 sendOwnedVideosToPod(podCreated.id)
310 return callbackEach()
313 logger.error('Status not 200 for %s pod.', pod.host)
314 return callbackEach()
317 }, function endRequests () {
318 // Final callback, we've ended all the requests
319 // Now we made new friends, we can re activate the pool of requests
320 requestScheduler.activate()
322 logger.debug('makeRequestsToWinningPods finished.')
327 // Wrapper that populates the "toIds" option with all our friends when it is not specified
328 // options: { type, endpoint, data, toIds, transaction }
// `callback` is optional: a no-op function is used when it is missing
// (the sibling wrappers use utils.createEmptyCallback() for the same purpose).
329 function createRequest (options, callback) {
330 if (!callback) callback = function () {}
// Explicit targets: nothing to look up, hand the request to the scheduler right away
331 if (options.toIds) return requestScheduler.createRequest(options, callback)
333 // If the "toIds" pods is not specified, we send the request to all our friends
334 db.Pod.listAllIds(options.transaction, function (err, podIds) {
// NOTE(review): the rest of the error branch is not visible in this excerpt;
// confirm that `callback` is called with the error so callers do not wait forever.
336 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes toIds into the caller's `options` object (newOptions === options)
340 const newOptions = Object.assign(options, { toIds: podIds })
341 return requestScheduler.createRequest(newOptions, callback)
// Hand the options over to the video qadu request scheduler.
// `callback` is optional: an empty callback from utils is used when it is missing.
function createVideoQaduRequest (options, callback) {
  const done = callback || utils.createEmptyCallback()

  requestSchedulerVideoQadu.createRequest(options, done)
}
// Hand the options over to the video event request scheduler.
// `callback` is optional: an empty callback from utils is used when it is missing.
function createVideoEventRequest (options, callback) {
  const onCreated = callback ? callback : utils.createEmptyCallback()

  requestSchedulerVideoEvent.createRequest(options, onCreated)
}
// Tell whether `host` is the host of this server (strict comparison with the configured webserver host).
function isMe (host) {
  const ownHost = constants.CONFIG.WEBSERVER.HOST

  return ownHost === host
}