3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const series = require('async/series')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const peertubeCrypto = require('../helpers/peertube-crypto')
14 const requests = require('../helpers/requests')
15 const utils = require('../helpers/utils')
16 const RequestScheduler = require('./request-scheduler')
17 const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
18 const RequestVideoEventScheduler = require('./request-video-event-scheduler')
20 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
22 const requestScheduler = new RequestScheduler()
23 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
24 const requestVideoEventScheduler = new RequestVideoEventScheduler()
30 reportAbuseVideoToFriend,
31 quickAndDirtyUpdateVideoToFriends,
32 quickAndDirtyUpdatesVideoToFriends,
33 addEventToRemoteVideo,
34 addEventsToRemoteVideo,
41 getRequestVideoQaduScheduler,
42 getRequestVideoEventScheduler
45 function activate () {
46 requestScheduler.activate()
47 requestVideoQaduScheduler.activate()
48 requestVideoEventScheduler.activate()
51 function addVideoToFriends (videoData, transaction, callback) {
53 type: ENDPOINT_ACTIONS.ADD,
54 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
58 createRequest(options, callback)
61 function updateVideoToFriends (videoData, transaction, callback) {
63 type: ENDPOINT_ACTIONS.UPDATE,
64 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
68 createRequest(options, callback)
71 function removeVideoToFriends (videoParams) {
73 type: ENDPOINT_ACTIONS.REMOVE,
74 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
77 createRequest(options)
80 function reportAbuseVideoToFriend (reportData, video) {
82 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
83 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
85 toIds: [ video.Author.podId ]
87 createRequest(options)
90 function quickAndDirtyUpdateVideoToFriends (qaduParams, transaction, callback) {
92 videoId: qaduParams.videoId,
93 type: qaduParams.type,
96 return createVideoQaduRequest(options, callback)
99 function quickAndDirtyUpdatesVideoToFriends (qadusParams, transaction, finalCallback) {
102 qadusParams.forEach(function (qaduParams) {
103 const fun = function (callback) {
104 quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
110 series(tasks, finalCallback)
113 function addEventToRemoteVideo (eventParams, transaction, callback) {
115 videoId: eventParams.videoId,
116 type: eventParams.type,
119 createVideoEventRequest(options, callback)
122 function addEventsToRemoteVideo (eventsParams, transaction, finalCallback) {
125 eventsParams.forEach(function (eventParams) {
126 const fun = function (callback) {
127 addEventToRemoteVideo(eventParams, transaction, callback)
133 series(tasks, finalCallback)
136 function hasFriends (callback) {
137 db.Pod.countAll(function (err, count) {
138 if (err) return callback(err)
140 const hasFriends = (count !== 0)
141 callback(null, hasFriends)
145 function makeFriends (hosts, callback) {
148 logger.info('Make friends!')
149 peertubeCrypto.getMyPublicCert(function (err, cert) {
151 logger.error('Cannot read public cert.')
155 eachSeries(hosts, function (host, callbackEach) {
156 computeForeignPodsList(host, podsScore, callbackEach)
158 if (err) return callback(err)
160 logger.debug('Pods scores computed.', { podsScore: podsScore })
161 const podsList = computeWinningPods(hosts, podsScore)
162 logger.debug('Pods that we keep.', { podsToKeep: podsList })
164 makeRequestsToWinningPods(cert, podsList, callback)
169 function quitFriends (callback) {
170 // Stop pool requests
171 requestScheduler.deactivate()
174 function flushRequests (callbackAsync) {
175 requestScheduler.flush(err => callbackAsync(err))
178 function flushVideoQaduRequests (callbackAsync) {
179 requestVideoQaduScheduler.flush(err => callbackAsync(err))
182 function getPodsList (callbackAsync) {
183 return db.Pod.list(callbackAsync)
186 function announceIQuitMyFriends (pods, callbackAsync) {
187 const requestParams = {
189 path: '/api/' + constants.API_VERSION + '/pods/remove',
193 // Announce we quit them
194 // We don't care if the request fails
195 // The other pod will exclude us automatically after a while
196 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
197 requestParams.toPod = pod
198 requests.makeSecureRequest(requestParams, callbackEach)
201 logger.error('Some errors while quitting friends.', { err: err })
202 // Don't stop the process
205 return callbackAsync(null, pods)
209 function removePodsFromDB (pods, callbackAsync) {
210 each(pods, function (pod, callbackEach) {
211 pod.destroy().asCallback(callbackEach)
215 // Don't forget to re activate the scheduler, even if there was an error
216 requestScheduler.activate()
218 if (err) return callback(err)
220 logger.info('Removed all remote videos.')
221 return callback(null)
225 function sendOwnedVideosToPod (podId) {
226 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
228 logger.error('Cannot get the list of videos we own.')
232 videosList.forEach(function (video) {
233 video.toAddRemoteJSON(function (err, remoteVideo) {
235 logger.error('Cannot convert video to remote.', { error: err })
236 // Don't break the process
242 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
246 createRequest(options)
252 function getRequestScheduler () {
253 return requestScheduler
256 function getRequestVideoQaduScheduler () {
257 return requestVideoQaduScheduler
260 function getRequestVideoEventScheduler () {
261 return requestVideoEventScheduler
264 // ---------------------------------------------------------------------------
266 module.exports = friends
268 // ---------------------------------------------------------------------------
270 function computeForeignPodsList (host, podsScore, callback) {
271 getForeignPodsList(host, function (err, res) {
272 if (err) return callback(err)
274 const foreignPodsList = res.data
276 // Let's give 1 point to the pod we ask the friends list
277 foreignPodsList.push({ host })
279 foreignPodsList.forEach(function (foreignPod) {
280 const foreignPodHost = foreignPod.host
282 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
283 else podsScore[foreignPodHost] = 1
290 function computeWinningPods (hosts, podsScore) {
291 // Build the list of pods to add
292 // Only add a pod if it exists in more than a half base pods
294 const baseScore = hosts.length / 2
296 Object.keys(podsScore).forEach(function (podHost) {
297 // If the pod is not me and with a good score we add it
298 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
299 podsList.push({ host: podHost })
306 function getForeignPodsList (host, callback) {
307 const path = '/api/' + constants.API_VERSION + '/pods'
309 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
310 if (err) return callback(err)
313 const json = JSON.parse(body)
314 return callback(null, json)
321 function makeRequestsToWinningPods (cert, podsList, callback) {
322 // Stop pool requests
323 requestScheduler.deactivate()
324 // Flush pool requests
325 requestScheduler.forceSend()
327 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
329 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
332 host: constants.CONFIG.WEBSERVER.HOST,
333 email: constants.CONFIG.ADMIN.EMAIL,
338 requests.makeRetryRequest(params, function (err, res, body) {
340 logger.error('Error with adding %s pod.', pod.host, { error: err })
341 // Don't break the process
342 return callbackEach()
345 if (res.statusCode === 200) {
346 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
347 podObj.save().asCallback(function (err, podCreated) {
349 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
350 return callbackEach()
353 // Add our videos to the request scheduler
354 sendOwnedVideosToPod(podCreated.id)
356 return callbackEach()
359 logger.error('Status not 200 for %s pod.', pod.host)
360 return callbackEach()
363 }, function endRequests () {
364 // Final callback, we've ended all the requests
365 // Now we made new friends, we can re activate the pool of requests
366 requestScheduler.activate()
368 logger.debug('makeRequestsToWinningPods finished.')
373 // Wrapper that populate "toIds" argument with all our friends if it is not specified
374 // { type, endpoint, data, toIds, transaction }
375 function createRequest (options, callback) {
376 if (!callback) callback = function () {}
377 if (options.toIds) return requestScheduler.createRequest(options, callback)
379 // If the "toIds" pods is not specified, we send the request to all our friends
380 db.Pod.listAllIds(options.transaction, function (err, podIds) {
382 logger.error('Cannot get pod ids', { error: err })
386 const newOptions = Object.assign(options, { toIds: podIds })
387 return requestScheduler.createRequest(newOptions, callback)
391 function createVideoQaduRequest (options, callback) {
392 if (!callback) callback = utils.createEmptyCallback()
394 requestVideoQaduScheduler.createRequest(options, callback)
397 function createVideoEventRequest (options, callback) {
398 if (!callback) callback = utils.createEmptyCallback()
400 requestVideoEventScheduler.createRequest(options, callback)
403 function isMe (host) {
404 return host === constants.CONFIG.WEBSERVER.HOST