3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
// Actions registered for the videos endpoint, looked up in constants.REQUEST_ENDPOINT_ACTIONS
// under the constants.REQUEST_ENDPOINTS.VIDEOS key. The functions below read ADD, UPDATE,
// REMOVE and REPORT_ABUSE from it to set the `type` of the requests they create.
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
// NOTE(review): the next line looks like one entry of an object literal whose opening and other
// entries are not visible in this excerpt (presumably the `friends` object exported further
// down) — confirm against the full file.
20 reportAbuseVideoToFriend,
// Create a request of type ENDPOINT_ACTIONS.ADD on the videos endpoint.
// The options object is handed to createRequest together with `callback`.
// NOTE(review): the opening of the options literal and its remaining fields are not visible in
// this excerpt; presumably `videoData` and `transaction` are forwarded in it — confirm.
28 function addVideoToFriends (videoData, transaction, callback) {
30 type: ENDPOINT_ACTIONS.ADD,
31 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
35 createRequest(options, callback)
// Create a request of type ENDPOINT_ACTIONS.UPDATE on the videos endpoint.
// The options object is handed to createRequest together with `callback`.
// NOTE(review): the opening of the options literal and its remaining fields are not visible in
// this excerpt; presumably `videoData` and `transaction` are forwarded in it — confirm.
38 function updateVideoToFriends (videoData, transaction, callback) {
40 type: ENDPOINT_ACTIONS.UPDATE,
41 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
45 createRequest(options, callback)
// Create a request of type ENDPOINT_ACTIONS.REMOVE on the videos endpoint.
// No callback is passed to createRequest, which then substitutes a no-op callback.
// NOTE(review): the opening of the options literal and the field carrying `videoParams` are not
// visible in this excerpt — confirm.
48 function removeVideoToFriends (videoParams) {
50 type: ENDPOINT_ACTIONS.REMOVE,
51 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
54 createRequest(options)
// Create a request of type ENDPOINT_ACTIONS.REPORT_ABUSE on the videos endpoint.
// Unlike the add/update/remove helpers, the destination is set explicitly: `toIds` holds only
// video.Author.podId, so createRequest does not look up the list of all pods.
// No callback is passed; createRequest substitutes a no-op.
// NOTE(review): the field carrying `reportData` is not visible in this excerpt — confirm.
57 function reportAbuseVideoToFriend (reportData, video) {
59 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
60 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
62 toIds: [ video.Author.podId ]
64 createRequest(options)
// Tell the caller whether at least one pod is stored.
// Calls callback(err) if db.Pod.countAll fails, otherwise callback(null, boolean) where the
// boolean is true when the count is not 0.
67 function hasFriends (callback) {
68 db.Pod.countAll(function (err, count) {
69 if (err) return callback(err)
71 const hasFriends = (count !== 0)
72 callback(null, hasFriends)
// Make friends with pods, starting from the given list of hosts.
// Visible steps:
//   1. read our public certificate with peertubeCrypto.getMyPublicCert;
//   2. for each host, one at a time (eachSeries), accumulate scores into `podsScore` through
//      computeForeignPodsList;
//   3. pick the pods to keep with computeWinningPods;
//   4. hand them, with the certificate and `callback`, to makeRequestsToWinningPods.
// An error from step 2 is reported through callback(err).
// NOTE(review): the declaration of `podsScore` and the condition guarding the
// 'Cannot read public cert.' log are not visible in this excerpt — confirm.
76 function makeFriends (hosts, callback) {
79 logger.info('Make friends!')
80 peertubeCrypto.getMyPublicCert(function (err, cert) {
82 logger.error('Cannot read public cert.')
86 eachSeries(hosts, function (host, callbackEach) {
87 computeForeignPodsList(host, podsScore, callbackEach)
89 if (err) return callback(err)
91 logger.debug('Pods scores computed.', { podsScore: podsScore })
92 const podsList = computeWinningPods(hosts, podsScore)
93 logger.debug('Pods that we keep.', { podsToKeep: podsList })
95 makeRequestsToWinningPods(cert, podsList, callback)
// Quit all our friends.
// Deactivates db.Request first, then defines four steps: flush the stored requests, list the
// pods, announce to each pod that we quit, and destroy the pods in the database.
// Afterwards db.Request is activated again whether or not an error occurred, and `callback`
// receives the error, or null on success.
// NOTE(review): the call that chains the four steps is not visible in this excerpt; presumably
// it is the imported `waterfall`, given that the steps pass `pods` along — confirm.
100 function quitFriends (callback) {
101 // Stop pool requests
102 db.Request.deactivate()
// Step: flush the stored requests.
105 function flushRequests (callbackAsync) {
106 db.Request.flush(callbackAsync)
// Step: fetch the pods and pass them on.
109 function getPodsList (callbackAsync) {
110 return db.Pod.list(callbackAsync)
// Step: send a request to the '/pods/remove' API path of every pod, at most
// constants.REQUESTS_IN_PARALLEL at a time.
113 function announceIQuitMyFriends (pods, callbackAsync) {
114 const requestParams = {
116 path: '/api/' + constants.API_VERSION + '/pods/remove',
120 // Announce we quit them
121 // We don't care if the request fails
122 // The other pod will exclude us automatically after a while
123 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the single requestParams object is mutated for every pod while several
// requests may be in flight; this is only safe if makeSecureRequest reads `toPod`
// synchronously — confirm.
124 requestParams.toPod = pod
125 requests.makeSecureRequest(requestParams, callbackEach)
128 logger.error('Some errors while quitting friends.', { err: err })
129 // Don't stop the process
// Always continue with the same pods list, with no error passed on.
132 return callbackAsync(null, pods)
// Step: destroy every pod instance (each runs the destroys without a concurrency limit).
136 function removePodsFromDB (pods, callbackAsync) {
137 each(pods, function (pod, callbackEach) {
138 pod.destroy().asCallback(callbackEach)
142 // Don't forget to re activate the scheduler, even if there was an error
143 db.Request.activate()
145 if (err) return callback(err)
147 logger.info('Removed all remote videos.')
148 return callback(null)
// Create one request on the videos endpoint for each video returned by
// db.Video.listOwnedAndPopulateAuthorAndTags.
// Each video is converted with video.toAddRemoteJSON before its request is created.
// Failures are only logged: the function takes no callback and reports nothing to its caller.
// NOTE(review): the error guards and most of the options literal are not visible in this
// excerpt; presumably the request type is the ADD action and `toIds` targets `podId` — confirm.
152 function sendOwnedVideosToPod (podId) {
153 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
155 logger.error('Cannot get the list of videos we own.')
159 videosList.forEach(function (video) {
160 video.toAddRemoteJSON(function (err, remoteVideo) {
162 logger.error('Cannot convert video to remote.', { error: err })
163 // Don't break the process
169 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
173 createRequest(options)
179 // ---------------------------------------------------------------------------
// Public API of this module: the `friends` object.
// NOTE(review): the definition of `friends` is not fully visible in this excerpt; the functions
// after this point are presumably private helpers that are not exported — confirm.
181 module.exports = friends
183 // ---------------------------------------------------------------------------
// Fetch the pods list known by `host` and add one point in `podsScore` to every host it
// contains. `podsScore` is a map of host -> score, updated in place.
// The queried host itself is appended to the fetched list, so it also receives one point.
// An error from getForeignPodsList is reported through callback(err).
// NOTE(review): the call to `callback` on success is not visible in this excerpt — confirm.
185 function computeForeignPodsList (host, podsScore, callback) {
186 getForeignPodsList(host, function (err, res) {
187 if (err) return callback(err)
189 const foreignPodsList = res.data
191 // Let's give 1 point to the pod we ask the friends list
192 foreignPodsList.push({ host })
194 foreignPodsList.forEach(function (foreignPod) {
195 const foreignPodHost = foreignPod.host
// First sighting of a host starts at 1; later sightings increment its score.
197 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
198 else podsScore[foreignPodHost] = 1
// Select the pods to befriend from the scores map.
// A host is kept when it is not our own host (isMe) and its score is strictly greater than
// half the number of base hosts (hosts.length / 2).
// Kept hosts are pushed into `podsList` as { host } objects.
// NOTE(review): the declaration of `podsList` and the return statement are not visible in this
// excerpt; makeFriends uses the return value as the pods list — confirm.
205 function computeWinningPods (hosts, podsScore) {
206 // Build the list of pods to add
207 // Only add a pod if it exists in more than a half base pods
209 const baseScore = hosts.length / 2
211 Object.keys(podsScore).forEach(function (podHost) {
212 // If the pod is not me and with a good score we add it
213 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
214 podsList.push({ host: podHost })
// GET the pods list of a remote host from its '/api/<API_VERSION>/pods' path, using the
// scheme in constants.REMOTE_SCHEME.HTTP.
// Calls callback(err) when the request fails, otherwise callback(null, json) with the parsed
// response body.
// NOTE(review): lines around JSON.parse are missing from this excerpt, so it is not possible to
// tell here whether a malformed body is caught — confirm that parsing is guarded.
221 function getForeignPodsList (host, callback) {
222 const path = '/api/' + constants.API_VERSION + '/pods'
224 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
225 if (err) return callback(err)
228 const json = JSON.parse(body)
229 return callback(null, json)
// Contact every pod in `podsList` (at most constants.REQUESTS_IN_PARALLEL at a time) on its
// '/api/<API_VERSION>/pods/' URL.
// When a pod answers with status 200 it is saved in the database with body.cert as its public
// key, and sendOwnedVideosToPod is called with the id of the new pod.
// A failure for one pod is logged and never aborts the loop: callbackEach is always called
// without an error.
// db.Request is deactivated for the duration of the loop and activated again at the end.
// NOTE(review): the rest of the request params (presumably where `cert` is used) and the final
// call to `callback` are not visible in this excerpt — confirm.
236 function makeRequestsToWinningPods (cert, podsList, callback) {
237 // Stop pool requests
238 db.Request.deactivate()
239 // Flush pool requests
240 db.Request.forceSend()
242 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
244 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
// Our own host is part of the request params.
247 host: constants.CONFIG.WEBSERVER.HOST,
252 requests.makeRetryRequest(params, function (err, res, body) {
254 logger.error('Error with adding %s pod.', pod.host, { error: err })
255 // Don't break the process
256 return callbackEach()
259 if (res.statusCode === 200) {
260 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
261 podObj.save().asCallback(function (err, podCreated) {
263 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
264 return callbackEach()
267 // Add our videos to the request scheduler
268 sendOwnedVideosToPod(podCreated.id)
270 return callbackEach()
273 logger.error('Status not 200 for %s pod.', pod.host)
274 return callbackEach()
277 }, function endRequests () {
278 // Final callback, we've ended all the requests
279 // Now we made new friends, we can re activate the pool of requests
280 db.Request.activate()
282 logger.debug('makeRequestsToWinningPods finished.')
287 // Wrapper that populates the "toIds" argument with all our friends if it is not specified
288 // { type, endpoint, data, toIds, transaction }
// `callback` is optional: a no-op is used when it is missing.
// When options.toIds is set, the options go to _createRequest unchanged. Otherwise the ids of
// all pods are fetched with db.Pod.listAllIds, using options.transaction.
// Note that Object.assign writes `toIds` into the caller's own `options` object.
// NOTE(review): the guard around the 'Cannot get pod ids' log, and whether it stops the
// function, are not visible in this excerpt — confirm.
289 function createRequest (options, callback) {
290 if (!callback) callback = function () {}
291 if (options.toIds) return _createRequest(options, callback)
293 // If the "toIds" pods is not specified, we send the request to all our friends
294 db.Pod.listAllIds(options.transaction, function (err, podIds) {
296 logger.error('Cannot get pod ids', { error: err })
300 const newOptions = Object.assign(options, { toIds: podIds })
301 return _createRequest(newOptions, callback)
// Store a request in the database and link it to its destination pods.
// Calls callback(null) straight away when `toIds` is empty. Otherwise it creates the
// db.Request row, then attaches one db.Pod instance per destination id with request.setPods.
// Errors from the creation are reported through callback(err); the result of setPods is
// forwarded to `callback` as is.
// NOTE(review): the declaration of `pods` and the contents of `createQuery` and
// `dbRequestOptions` are not visible in this excerpt; presumably they carry type/endpoint/data
// and the transaction respectively — confirm.
305 // { type, endpoint, data, toIds, transaction }
306 function _createRequest (options, callback) {
307 const type = options.type
308 const endpoint = options.endpoint
309 const data = options.data
310 const toIds = options.toIds
311 const transaction = options.transaction
315 // If there are no destination pods abort
316 if (toIds.length === 0) return callback(null)
// Build Pod instances from the bare ids; this step does not query the database.
318 toIds.forEach(function (toPod) {
319 pods.push(db.Pod.build({ id: toPod }))
322 const createQuery = {
330 const dbRequestOptions = {
334 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
335 if (err) return callback(err)
337 return request.setPods(pods, dbRequestOptions).asCallback(callback)
// Return true when `host` is strictly equal to our own host
// (constants.CONFIG.WEBSERVER.HOST), false otherwise.
341 function isMe (host) {
342 return host === constants.CONFIG.WEBSERVER.HOST