3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
20 reportAbuseVideoToFriend,
28 function addVideoToFriends (videoData, transaction, callback) {
30 type: ENDPOINT_ACTIONS.ADD,
31 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
35 createRequest(options, callback)
38 function updateVideoToFriends (videoData, transaction, callback) {
40 type: ENDPOINT_ACTIONS.UPDATE,
41 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
45 createRequest(options, callback)
48 function removeVideoToFriends (videoParams) {
50 type: ENDPOINT_ACTIONS.REMOVE,
51 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
54 createRequest(options)
57 function reportAbuseVideoToFriend (reportData, video) {
59 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
60 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
62 toIds: [ video.Author.podId ]
64 createRequest(options)
67 function hasFriends (callback) {
68 db.Pod.countAll(function (err, count) {
69 if (err) return callback(err)
71 const hasFriends = (count !== 0)
72 callback(null, hasFriends)
76 function makeFriends (hosts, callback) {
79 logger.info('Make friends!')
80 peertubeCrypto.getMyPublicCert(function (err, cert) {
82 logger.error('Cannot read public cert.')
86 eachSeries(hosts, function (host, callbackEach) {
87 computeForeignPodsList(host, podsScore, callbackEach)
89 if (err) return callback(err)
91 logger.debug('Pods scores computed.', { podsScore: podsScore })
92 const podsList = computeWinningPods(hosts, podsScore)
93 logger.debug('Pods that we keep.', { podsToKeep: podsList })
95 makeRequestsToWinningPods(cert, podsList, callback)
100 function quitFriends (callback) {
101 // Stop pool requests
102 db.Request.deactivate()
105 function flushRequests (callbackAsync) {
106 db.Request.flush(callbackAsync)
109 function getPodsList (callbackAsync) {
110 return db.Pod.list(callbackAsync)
113 function announceIQuitMyFriends (pods, callbackAsync) {
114 const requestParams = {
116 path: '/api/' + constants.API_VERSION + '/pods/remove',
120 // Announce we quit them
121 // We don't care if the request fails
122 // The other pod will exclude us automatically after a while
123 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
124 requestParams.toPod = pod
125 requests.makeSecureRequest(requestParams, callbackEach)
128 logger.error('Some errors while quitting friends.', { err: err })
129 // Don't stop the process
132 return callbackAsync(null, pods)
136 function removePodsFromDB (pods, callbackAsync) {
137 each(pods, function (pod, callbackEach) {
138 pod.destroy().asCallback(callbackEach)
142 // Don't forget to re activate the scheduler, even if there was an error
143 db.Request.activate()
145 if (err) return callback(err)
147 logger.info('Removed all remote videos.')
148 return callback(null)
152 function sendOwnedVideosToPod (podId) {
153 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
155 logger.error('Cannot get the list of videos we own.')
159 videosList.forEach(function (video) {
160 video.toAddRemoteJSON(function (err, remoteVideo) {
162 logger.error('Cannot convert video to remote.', { error: err })
163 // Don't break the process
169 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
173 createRequest(options)
179 // ---------------------------------------------------------------------------
181 module.exports = friends
183 // ---------------------------------------------------------------------------
185 function computeForeignPodsList (host, podsScore, callback) {
186 getForeignPodsList(host, function (err, res) {
187 if (err) return callback(err)
189 const foreignPodsList = res.data
191 // Let's give 1 point to the pod we ask the friends list
192 foreignPodsList.push({ host })
194 foreignPodsList.forEach(function (foreignPod) {
195 const foreignPodHost = foreignPod.host
197 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
198 else podsScore[foreignPodHost] = 1
205 function computeWinningPods (hosts, podsScore) {
206 // Build the list of pods to add
207 // Only add a pod if it exists in more than a half base pods
209 const baseScore = hosts.length / 2
211 Object.keys(podsScore).forEach(function (podHost) {
212 // If the pod is not me and with a good score we add it
213 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
214 podsList.push({ host: podHost })
221 function getForeignPodsList (host, callback) {
222 const path = '/api/' + constants.API_VERSION + '/pods'
224 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
225 if (err) return callback(err)
228 const json = JSON.parse(body)
229 return callback(null, json)
236 function makeRequestsToWinningPods (cert, podsList, callback) {
237 // Stop pool requests
238 db.Request.deactivate()
239 // Flush pool requests
240 db.Request.forceSend()
242 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
244 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
247 host: constants.CONFIG.WEBSERVER.HOST,
248 email: constants.CONFIG.ADMIN.EMAIL,
253 requests.makeRetryRequest(params, function (err, res, body) {
255 logger.error('Error with adding %s pod.', pod.host, { error: err })
256 // Don't break the process
257 return callbackEach()
260 if (res.statusCode === 200) {
261 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
262 podObj.save().asCallback(function (err, podCreated) {
264 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
265 return callbackEach()
268 // Add our videos to the request scheduler
269 sendOwnedVideosToPod(podCreated.id)
271 return callbackEach()
274 logger.error('Status not 200 for %s pod.', pod.host)
275 return callbackEach()
278 }, function endRequests () {
279 // Final callback, we've ended all the requests
280 // Now we made new friends, we can re activate the pool of requests
281 db.Request.activate()
283 logger.debug('makeRequestsToWinningPods finished.')
288 // Wrapper that populate "toIds" argument with all our friends if it is not specified
289 // { type, endpoint, data, toIds, transaction }
290 function createRequest (options, callback) {
291 if (!callback) callback = function () {}
292 if (options.toIds) return _createRequest(options, callback)
294 // If the "toIds" pods is not specified, we send the request to all our friends
295 db.Pod.listAllIds(options.transaction, function (err, podIds) {
297 logger.error('Cannot get pod ids', { error: err })
301 const newOptions = Object.assign(options, { toIds: podIds })
302 return _createRequest(newOptions, callback)
306 // { type, endpoint, data, toIds, transaction }
307 function _createRequest (options, callback) {
308 const type = options.type
309 const endpoint = options.endpoint
310 const data = options.data
311 const toIds = options.toIds
312 const transaction = options.transaction
316 // If there are no destination pods abort
317 if (toIds.length === 0) return callback(null)
319 toIds.forEach(function (toPod) {
320 pods.push(db.Pod.build({ id: toPod }))
323 const createQuery = {
331 const dbRequestOptions = {
335 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
336 if (err) return callback(err)
338 return request.setPods(pods, dbRequestOptions).asCallback(callback)
342 function isMe (host) {
343 return host === constants.CONFIG.WEBSERVER.HOST