3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14 const RequestScheduler = require('./request-scheduler')
// Action names for the VIDEOS request endpoint; ADD, UPDATE, REMOVE and
// REPORT_ABUSE are the members read further down in this file.
16 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
// One scheduler instance, shared by the functions below that activate,
// deactivate, flush, force-send or create requests on it.
17 const requestScheduler = new RequestScheduler()
// NOTE(review): entry of a list whose opening line is not shown here —
// presumably the exported `friends` object; confirm.
23 reportAbuseVideoToFriend,
// Activate the module-level request scheduler.
// Takes no arguments; simply delegates to requestScheduler.activate().
31 function activate () {
32 requestScheduler.activate()
// Create an ADD request on the VIDEOS endpoint through createRequest().
// - videoData, transaction: not referenced on the lines visible here;
//   presumably carried inside `options` — confirm.
// - callback: handed unchanged to createRequest().
35 function addVideoToFriends (videoData, transaction, callback) {
// Request descriptor fields: which action, on which endpoint.
37 type: ENDPOINT_ACTIONS.ADD,
38 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// createRequest() fills in "toIds" with all known pod ids when it is not set.
42 createRequest(options, callback)
// Create an UPDATE request on the VIDEOS endpoint through createRequest().
// - videoData, transaction: not referenced on the lines visible here;
//   presumably carried inside `options` — confirm.
// - callback: handed unchanged to createRequest().
45 function updateVideoToFriends (videoData, transaction, callback) {
// Request descriptor fields: which action, on which endpoint.
47 type: ENDPOINT_ACTIONS.UPDATE,
48 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// createRequest() fills in "toIds" with all known pod ids when it is not set.
52 createRequest(options, callback)
// Create a REMOVE request on the VIDEOS endpoint through createRequest().
// - videoParams: not referenced on the lines visible here; presumably the
//   request payload — confirm.
// Takes no callback: createRequest() substitutes a no-op when none is given,
// so the outcome is not reported back to the caller.
55 function removeVideoToFriends (videoParams) {
57 type: ENDPOINT_ACTIONS.REMOVE,
58 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
61 createRequest(options)
// Create a REPORT_ABUSE request on the VIDEOS endpoint, addressed to a single pod.
// - reportData: not referenced on the lines visible here; presumably the
//   request payload — confirm.
// - video: must expose video.Author.podId (read below).
// Takes no callback: createRequest() substitutes a no-op when none is given.
64 function reportAbuseVideoToFriend (reportData, video) {
66 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
67 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Explicit recipient list: only the pod referenced by the video's author.
// Because "toIds" is set, createRequest() skips its "all pods" lookup.
69 toIds: [ video.Author.podId ]
71 createRequest(options)
// Report whether at least one pod is stored.
// - callback(err, hasFriends): receives the error from db.Pod.countAll, or
//   null and a boolean that is true when the pod count is not 0.
74 function hasFriends (callback) {
75 db.Pod.countAll(function (err, count) {
76 if (err) return callback(err)
// Strict comparison: any count other than exactly 0 means "has friends".
78 const hasFriends = (count !== 0)
79 callback(null, hasFriends)
// Befriend pods, starting from a list of hosts.
// Steps visible here: read our public certificate, ask each host in turn for
// its pods list to build a score table, keep the winning pods, then contact them.
// - hosts: collection of host strings, iterated with eachSeries.
// - callback: receives the error of the scoring step, otherwise it is handed
//   on to makeRequestsToWinningPods().
83 function makeFriends (hosts, callback) {
86 logger.info('Make friends!')
87 peertubeCrypto.getMyPublicCert(function (err, cert) {
// Logged when the certificate cannot be read (the surrounding error guard is
// not visible on these lines).
89 logger.error('Cannot read public cert.')
// One host at a time (eachSeries); every call updates the shared `podsScore`
// table, whose declaration is not visible on these lines.
93 eachSeries(hosts, function (host, callbackEach) {
94 computeForeignPodsList(host, podsScore, callbackEach)
// An error from any host aborts the whole operation.
96 if (err) return callback(err)
98 logger.debug('Pods scores computed.', { podsScore: podsScore })
// Reduce the score table to the list of pods worth befriending.
99 const podsList = computeWinningPods(hosts, podsScore)
100 logger.debug('Pods that we keep.', { podsToKeep: podsList })
102 makeRequestsToWinningPods(cert, podsList, callback)
// Quit all friends. Visible steps, in order: stop the scheduler, flush its
// requests, list the pods, tell each pod we are leaving, destroy the pods,
// then reactivate the scheduler.
// NOTE(review): the construct chaining the named steps below is not visible on
// these lines — presumably async `waterfall` (imported at the top); confirm.
// - callback(err): receives the final error, or null on success.
107 function quitFriends (callback) {
108 // Stop pool requests
109 requestScheduler.deactivate()
// Step: flush the scheduler.
112 function flushRequests (callbackAsync) {
113 requestScheduler.flush(callbackAsync)
// Step: fetch the pods list; its result feeds the next step.
116 function getPodsList (callbackAsync) {
117 return db.Pod.list(callbackAsync)
// Step: notify every pod through its "/pods/remove" API path.
120 function announceIQuitMyFriends (pods, callbackAsync) {
121 const requestParams = {
123 path: '/api/' + constants.API_VERSION + '/pods/remove',
127 // Announce we quit them
128 // We don't care if the request fails
129 // The other pod will exclude us automatically after a while
// At most constants.REQUESTS_IN_PARALLEL requests are in flight at once.
130 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the same requestParams object is re-targeted for every pod
// while several requests may be in flight; this is only safe if
// makeSecureRequest reads `toPod` synchronously — confirm.
131 requestParams.toPod = pod
132 requests.makeSecureRequest(requestParams, callbackEach)
135 logger.error('Some errors while quitting friends.', { err: err })
136 // Don't stop the process
// Always continue with a null error, passing the pods on to the next step.
139 return callbackAsync(null, pods)
// Step: destroy every pod record.
143 function removePodsFromDB (pods, callbackAsync) {
144 each(pods, function (pod, callbackEach) {
// .asCallback bridges the value returned by destroy() to the node-style callbackEach.
145 pod.destroy().asCallback(callbackEach)
149 // Don't forget to re activate the scheduler, even if there was an error
150 requestScheduler.activate()
152 if (err) return callback(err)
// NOTE(review): the message mentions remote videos while the visible code
// destroys pods — presumably the videos are removed along with their pod; confirm.
154 logger.info('Removed all remote videos.')
155 return callback(null)
// Create one VIDEOS-endpoint request per video we own.
// - podId: not referenced on the lines visible here; presumably the single
//   recipient of each request — confirm.
// Takes no callback; the failures visible here are logged.
159 function sendOwnedVideosToPod (podId) {
160 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
// Logged when the owned videos cannot be listed.
162 logger.error('Cannot get the list of videos we own.')
166 videosList.forEach(function (video) {
// Convert each video to its remote representation before queuing it.
167 video.toAddRemoteJSON(function (err, remoteVideo) {
169 logger.error('Cannot convert video to remote.', { error: err })
170 // Don't break the process
176 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// No callback is passed: createRequest() falls back to a no-op.
180 createRequest(options)
186 // ---------------------------------------------------------------------------
// The module exports the `friends` value, which is defined outside the lines shown here.
188 module.exports = friends
// NOTE(review): the functions after this separator are presumably module-private helpers — confirm.
190 // ---------------------------------------------------------------------------
// Fetch the pods list known by `host` and add one point per listed pod to `podsScore`.
// - host: host to query through getForeignPodsList().
// - podsScore: object mapping a pod host to its score; mutated in place.
// - callback(err): receives the fetch error; the success call is not visible
//   on these lines.
192 function computeForeignPodsList (host, podsScore, callback) {
193 getForeignPodsList(host, function (err, res) {
194 if (err) return callback(err)
// The list is read from the `data` property of the parsed response.
196 const foreignPodsList = res.data
198 // Let's give 1 point to the pod we ask the friends list
// Note: push() appends to res.data itself, not to a copy.
199 foreignPodsList.push({ host })
201 foreignPodsList.forEach(function (foreignPod) {
202 const foreignPodHost = foreignPod.host
// First sighting starts at 1; later sightings increment the existing score.
204 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
205 else podsScore[foreignPodHost] = 1
// Select the pods to befriend from the score table.
// - hosts: the base hosts that were queried; only their count is used here.
// - podsScore: object mapping a pod host to its score.
// Builds `podsList` as an array of { host } objects; its declaration and the
// return statement are not visible on these lines.
212 function computeWinningPods (hosts, podsScore) {
213 // Build the list of pods to add
214 // Only add a pod if it exists in more than a half base pods
// Threshold is half the number of base hosts; it may be fractional.
216 const baseScore = hosts.length / 2
218 Object.keys(podsScore).forEach(function (podHost) {
219 // If the pod is not me and with a good score we add it
// Strictly greater than: a score equal to exactly half is rejected.
220 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
221 podsList.push({ host: podHost })
// GET the pods list of a remote host and hand back the parsed JSON body.
// - host: remote host, interpolated into the request URL.
// - callback(err, json): receives the request error, or null and the parsed body.
228 function getForeignPodsList (host, callback) {
229 const path = '/api/' + constants.API_VERSION + '/pods'
// URL is built as <REMOTE_SCHEME.HTTP>://<host><path>.
231 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
232 if (err) return callback(err)
// NOTE(review): JSON.parse throws on a malformed body; whether a guard
// surrounds this call is not visible on these lines — confirm.
235 const json = JSON.parse(body)
236 return callback(null, json)
// Contact each selected pod, store the ones that answer with status 200, and
// queue our own videos for them.
// - cert: not referenced on the lines visible here; presumably sent in the
//   request payload — confirm.
// - podsList: array of { host } objects.
// - callback: its invocation is not visible on these lines.
// Failures for a single pod are logged and never abort the loop.
243 function makeRequestsToWinningPods (cert, podsList, callback) {
244 // Stop pool requests
245 requestScheduler.deactivate()
246 // Flush pool requests
247 requestScheduler.forceSend()
// At most constants.REQUESTS_IN_PARALLEL pods are contacted at once.
249 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// Target: the "/pods/" API path of the remote pod.
251 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
// Our own identity, taken from the local configuration.
254 host: constants.CONFIG.WEBSERVER.HOST,
255 email: constants.CONFIG.ADMIN.EMAIL,
260 requests.makeRetryRequest(params, function (err, res, body) {
262 logger.error('Error with adding %s pod.', pod.host, { error: err })
263 // Don't break the process
// Called without an error so eachLimit keeps processing the other pods.
264 return callbackEach()
267 if (res.statusCode === 200) {
// The remote pod's certificate and email are read from the response body.
268 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
269 podObj.save().asCallback(function (err, podCreated) {
271 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
272 return callbackEach()
275 // Add our videos to the request scheduler
// Not awaited: callbackEach() below runs right after this call is made.
276 sendOwnedVideosToPod(podCreated.id)
278 return callbackEach()
// Any other status: log it and move on.
281 logger.error('Status not 200 for %s pod.', pod.host)
282 return callbackEach()
285 }, function endRequests () {
286 // Final callback, we've ended all the requests
287 // Now we made new friends, we can re activate the pool of requests
288 requestScheduler.activate()
290 logger.debug('makeRequestsToWinningPods finished.')
295 // Wrapper that populate "toIds" argument with all our friends if it is not specified
296 // { type, endpoint, data, toIds, transaction }
// - options: request descriptor with the fields listed above.
// - callback: optional; replaced by a no-op when omitted.
297 function createRequest (options, callback) {
298 if (!callback) callback = function () {}
// Explicit recipients: forward the request as is.
299 if (options.toIds) return requestScheduler.createRequest(options, callback)
301 // If the "toIds" pods is not specified, we send the request to all our friends
// The lookup is given options.transaction as its first argument.
302 db.Pod.listAllIds(options.transaction, function (err, podIds) {
// Logged when the lookup fails; what follows the log is not visible on these lines.
304 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes "toIds" onto `options` itself, so newOptions is the
// same object and the caller's object is mutated.
308 const newOptions = Object.assign(options, { toIds: podIds })
309 return requestScheduler.createRequest(newOptions, callback)
// Tell whether `host` designates this server.
// Returns true only when host is strictly equal to the configured
// constants.CONFIG.WEBSERVER.HOST value.
313 function isMe (host) {
314 return host === constants.CONFIG.WEBSERVER.HOST