3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14 const utils = require('../helpers/utils')
15 const RequestScheduler = require('./request-scheduler')
16 const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
17 const RequestVideoEventScheduler = require('./request-video-event-scheduler')
19 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
21 const requestScheduler = new RequestScheduler()
22 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
23 const requestVideoEventScheduler = new RequestVideoEventScheduler()
29 reportAbuseVideoToFriend,
30 quickAndDirtyUpdateVideoToFriends,
31 addEventToRemoteVideo,
38 getRequestVideoQaduScheduler,
39 getRequestVideoEventScheduler
42 function activate () {
43 requestScheduler.activate()
44 requestVideoQaduScheduler.activate()
45 requestVideoEventScheduler.activate()
48 function addVideoToFriends (videoData, transaction, callback) {
50 type: ENDPOINT_ACTIONS.ADD,
51 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
55 createRequest(options, callback)
58 function updateVideoToFriends (videoData, transaction, callback) {
60 type: ENDPOINT_ACTIONS.UPDATE,
61 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
65 createRequest(options, callback)
68 function removeVideoToFriends (videoParams) {
70 type: ENDPOINT_ACTIONS.REMOVE,
71 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
74 createRequest(options)
77 function reportAbuseVideoToFriend (reportData, video) {
79 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
80 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
82 toIds: [ video.Author.podId ]
84 createRequest(options)
87 function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
93 return createVideoQaduRequest(options, callback)
96 function addEventToRemoteVideo (videoId, type, transaction, callback) {
102 createVideoEventRequest(options, callback)
105 function hasFriends (callback) {
106 db.Pod.countAll(function (err, count) {
107 if (err) return callback(err)
109 const hasFriends = (count !== 0)
110 callback(null, hasFriends)
114 function makeFriends (hosts, callback) {
117 logger.info('Make friends!')
118 peertubeCrypto.getMyPublicCert(function (err, cert) {
120 logger.error('Cannot read public cert.')
124 eachSeries(hosts, function (host, callbackEach) {
125 computeForeignPodsList(host, podsScore, callbackEach)
127 if (err) return callback(err)
129 logger.debug('Pods scores computed.', { podsScore: podsScore })
130 const podsList = computeWinningPods(hosts, podsScore)
131 logger.debug('Pods that we keep.', { podsToKeep: podsList })
133 makeRequestsToWinningPods(cert, podsList, callback)
138 function quitFriends (callback) {
139 // Stop pool requests
140 requestScheduler.deactivate()
143 function flushRequests (callbackAsync) {
144 requestScheduler.flush(err => callbackAsync(err))
147 function flushVideoQaduRequests (callbackAsync) {
148 requestVideoQaduScheduler.flush(err => callbackAsync(err))
151 function getPodsList (callbackAsync) {
152 return db.Pod.list(callbackAsync)
155 function announceIQuitMyFriends (pods, callbackAsync) {
156 const requestParams = {
158 path: '/api/' + constants.API_VERSION + '/pods/remove',
162 // Announce we quit them
163 // We don't care if the request fails
164 // The other pod will exclude us automatically after a while
165 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
166 requestParams.toPod = pod
167 requests.makeSecureRequest(requestParams, callbackEach)
170 logger.error('Some errors while quitting friends.', { err: err })
171 // Don't stop the process
174 return callbackAsync(null, pods)
178 function removePodsFromDB (pods, callbackAsync) {
179 each(pods, function (pod, callbackEach) {
180 pod.destroy().asCallback(callbackEach)
184 // Don't forget to re activate the scheduler, even if there was an error
185 requestScheduler.activate()
187 if (err) return callback(err)
189 logger.info('Removed all remote videos.')
190 return callback(null)
194 function sendOwnedVideosToPod (podId) {
195 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
197 logger.error('Cannot get the list of videos we own.')
201 videosList.forEach(function (video) {
202 video.toAddRemoteJSON(function (err, remoteVideo) {
204 logger.error('Cannot convert video to remote.', { error: err })
205 // Don't break the process
211 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
215 createRequest(options)
221 function getRequestScheduler () {
222 return requestScheduler
225 function getRequestVideoQaduScheduler () {
226 return requestVideoQaduScheduler
229 function getRequestVideoEventScheduler () {
230 return requestVideoEventScheduler
233 // ---------------------------------------------------------------------------
235 module.exports = friends
237 // ---------------------------------------------------------------------------
239 function computeForeignPodsList (host, podsScore, callback) {
240 getForeignPodsList(host, function (err, res) {
241 if (err) return callback(err)
243 const foreignPodsList = res.data
245 // Let's give 1 point to the pod we ask the friends list
246 foreignPodsList.push({ host })
248 foreignPodsList.forEach(function (foreignPod) {
249 const foreignPodHost = foreignPod.host
251 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
252 else podsScore[foreignPodHost] = 1
259 function computeWinningPods (hosts, podsScore) {
260 // Build the list of pods to add
261 // Only add a pod if it exists in more than a half base pods
263 const baseScore = hosts.length / 2
265 Object.keys(podsScore).forEach(function (podHost) {
266 // If the pod is not me and with a good score we add it
267 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
268 podsList.push({ host: podHost })
275 function getForeignPodsList (host, callback) {
276 const path = '/api/' + constants.API_VERSION + '/pods'
278 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
279 if (err) return callback(err)
282 const json = JSON.parse(body)
283 return callback(null, json)
290 function makeRequestsToWinningPods (cert, podsList, callback) {
291 // Stop pool requests
292 requestScheduler.deactivate()
293 // Flush pool requests
294 requestScheduler.forceSend()
296 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
298 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
301 host: constants.CONFIG.WEBSERVER.HOST,
302 email: constants.CONFIG.ADMIN.EMAIL,
307 requests.makeRetryRequest(params, function (err, res, body) {
309 logger.error('Error with adding %s pod.', pod.host, { error: err })
310 // Don't break the process
311 return callbackEach()
314 if (res.statusCode === 200) {
315 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
316 podObj.save().asCallback(function (err, podCreated) {
318 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
319 return callbackEach()
322 // Add our videos to the request scheduler
323 sendOwnedVideosToPod(podCreated.id)
325 return callbackEach()
328 logger.error('Status not 200 for %s pod.', pod.host)
329 return callbackEach()
332 }, function endRequests () {
333 // Final callback, we've ended all the requests
334 // Now we made new friends, we can re activate the pool of requests
335 requestScheduler.activate()
337 logger.debug('makeRequestsToWinningPods finished.')
342 // Wrapper that populate "toIds" argument with all our friends if it is not specified
343 // { type, endpoint, data, toIds, transaction }
344 function createRequest (options, callback) {
345 if (!callback) callback = function () {}
346 if (options.toIds) return requestScheduler.createRequest(options, callback)
348 // If the "toIds" pods is not specified, we send the request to all our friends
349 db.Pod.listAllIds(options.transaction, function (err, podIds) {
351 logger.error('Cannot get pod ids', { error: err })
355 const newOptions = Object.assign(options, { toIds: podIds })
356 return requestScheduler.createRequest(newOptions, callback)
360 function createVideoQaduRequest (options, callback) {
361 if (!callback) callback = utils.createEmptyCallback()
363 requestVideoQaduScheduler.createRequest(options, callback)
366 function createVideoEventRequest (options, callback) {
367 if (!callback) callback = utils.createEmptyCallback()
369 requestVideoEventScheduler.createRequest(options, callback)
372 function isMe (host) {
373 return host === constants.CONFIG.WEBSERVER.HOST