3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
18 reportAbuseVideoToFriend,
// Hands a request targeting the VIDEOS endpoint to createRequest and forwards
// `callback` to it, so the caller is notified through createRequest's callback.
// NOTE(review): the remaining fields of `options` are not visible here; presumably
// they carry `videoData` and `transaction` — confirm.
27 function addVideoToFriends (videoData, transaction, callback) {
30 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
34 createRequest(options, callback)
// Hands a request targeting the VIDEOS endpoint to createRequest and forwards
// `callback` to it.
// NOTE(review): what distinguishes this from addVideoToFriends (presumably the
// request type in `options`) is not visible here — confirm.
37 function updateVideoToFriends (videoData, transaction, callback) {
40 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
44 createRequest(options, callback)
// Hands a request targeting the VIDEOS endpoint to createRequest.
// No callback is passed, so createRequest falls back to its no-op callback and
// the caller is never told whether the request could be created.
// NOTE(review): presumably `videoParams` is the request payload — confirm.
47 function removeVideoToFriends (videoParams) {
50 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
53 createRequest(options)
// Hands a request targeting the VIDEOS endpoint to createRequest, addressed to a
// single pod. No callback is passed (fire and forget).
// `video` must expose `Author.podId`; a missing `Author` makes this throw.
// NOTE(review): presumably `reportData` is the request payload — confirm.
56 function reportAbuseVideoToFriend (reportData, video) {
59 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Explicit recipient list: because "toIds" is set, createRequest skips its
// "all friends" lookup.
61 toIds: [ video.Author.podId ]
63 createRequest(options)
// Reports whether at least one pod is stored.
// callback(err) when counting fails, otherwise callback(null, boolean) where the
// boolean is true if the count returned by db.Pod.countAll is not 0.
66 function hasFriends (callback) {
67 db.Pod.countAll(function (err, count) {
68 if (err) return callback(err)
70 const hasFriends = (count !== 0)
71 callback(null, hasFriends)
// Reads the file 'peertube.pub' located under constants.CONFIG.STORAGE.CERT_DIR
// as UTF-8 text. `callback` is handed straight to fs.readFile, so it receives
// (err, contents) with `contents` as a string.
// The path is built by plain concatenation, so CERT_DIR must end with a path
// separator.
75 function getMyCertificate (callback) {
76 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
// Befriends pods starting from a list of seed hosts:
//   1. reads our own public certificate,
//   2. asks each seed host, one after the other, for the pods it knows, tallying
//      the answers in `podsScore`,
//   3. keeps the pods selected by computeWinningPods,
//   4. hands the certificate and the kept pods to makeRequestsToWinningPods,
//      which also receives `callback`.
// NOTE(review): the declaration of `podsScore` and the error guard around the
// 'Cannot read public cert.' log are not visible here — confirm.
79 function makeFriends (hosts, callback) {
82 logger.info('Make friends!')
83 getMyCertificate(function (err, cert) {
85 logger.error('Cannot read public cert.')
// Hosts are queried sequentially (eachSeries), all writing into the same
// `podsScore` accumulator.
89 eachSeries(hosts, function (host, callbackEach) {
90 computeForeignPodsList(host, podsScore, callbackEach)
// A failure while querying any host aborts the whole operation.
92 if (err) return callback(err)
94 logger.debug('Pods scores computed.', { podsScore: podsScore })
95 const podsList = computeWinningPods(hosts, podsScore)
96 logger.debug('Pods that we keep.', { podsToKeep: podsList })
98 makeRequestsToWinningPods(cert, podsList, callback)
// Leaves the network of friends. Visible steps: deactivate the request
// scheduler, flush pending requests, list our pods, tell each of them we quit,
// delete the pods from the database, then reactivate the scheduler.
// callback(err) if a step failed, callback(null) otherwise.
// NOTE(review): the step functions take `callbackAsync` and pass results along,
// so they are presumably chained with `waterfall` — the call is not visible here.
103 function quitFriends (callback) {
104 // Stop pool requests
105 db.Request.deactivate()
108 function flushRequests (callbackAsync) {
109 db.Request.flush(callbackAsync)
112 function getPodsList (callbackAsync) {
113 return db.Pod.list(callbackAsync)
116 function announceIQuitMyFriends (pods, callbackAsync) {
117 const requestParams = {
119 path: '/api/' + constants.API_VERSION + '/pods/remove',
123 // Announce we quit them
124 // We don't care if the request fails
125 // The other pod will exclude us automatically after a while
126 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): one `requestParams` object is shared and mutated by every
// iteration while up to REQUESTS_IN_PARALLEL run concurrently; this is only
// safe if makeSecureRequest reads `toPod` synchronously — confirm.
127 requestParams.toPod = pod
128 requests.makeSecureRequest(requestParams, callbackEach)
131 logger.error('Some errors while quitting friends.', { err: err })
132 // Don't stop the process
// Always continue with a null error so the pods are still removed locally.
135 return callbackAsync(null, pods)
139 function removePodsFromDB (pods, callbackAsync) {
140 each(pods, function (pod, callbackEach) {
141 pod.destroy().asCallback(callbackEach)
145 // Don't forget to re activate the scheduler, even if there was an error
146 db.Request.activate()
148 if (err) return callback(err)
// NOTE(review): the message mentions remote videos while the visible code
// destroys pods; whether videos are removed as a consequence is not visible here.
150 logger.info('Removed all remote videos.')
151 return callback(null)
// Lists the videos we own, converts each one with video.toAddRemoteJSON and
// hands one request per video, targeting the VIDEOS endpoint, to createRequest.
// Nothing is returned and no callback is taken: failures are only logged.
// NOTE(review): presumably `podId` ends up in the request's "toIds" so only that
// pod receives the videos — the options literal is not fully visible, confirm.
155 function sendOwnedVideosToPod (podId) {
156 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
158 logger.error('Cannot get the list of videos we own.')
162 videosList.forEach(function (video) {
163 video.toAddRemoteJSON(function (err, remoteVideo) {
165 logger.error('Cannot convert video to remote.', { error: err })
166 // Don't break the process
172 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// No callback: createRequest falls back to its no-op callback.
176 createRequest(options)
182 // ---------------------------------------------------------------------------
// Public API of this module: the `friends` object.
184 module.exports = friends
186 // ---------------------------------------------------------------------------
// Asks `host` for the pods it knows and adds one point in `podsScore` (an object
// keyed by pod host) for every pod returned, plus one point for `host` itself.
// `podsScore` is mutated in place. callback(err) if the list cannot be fetched.
// NOTE(review): the success-path call to `callback` is not visible here.
188 function computeForeignPodsList (host, podsScore, callback) {
189 getForeignPodsList(host, function (err, res) {
190 if (err) return callback(err)
// NOTE(review): no visible guard for a response without a `data` array; the
// push below also mutates the array held by `res` — confirm both are intended.
192 const foreignPodsList = res.data
194 // Let's give 1 point to the pod we ask the friends list
195 foreignPodsList.push({ host })
197 foreignPodsList.forEach(function (foreignPod) {
198 const foreignPodHost = foreignPod.host
// The truthiness test is enough here: a stored score is always >= 1.
200 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
201 else podsScore[foreignPodHost] = 1
// Selects the pods worth befriending: every host in `podsScore` that is not our
// own host and whose score is strictly greater than half the number of seed
// `hosts`. Selected pods are collected as `{ host }` objects in `podsList`.
// NOTE(review): the declaration and return of `podsList` are not visible here;
// makeFriends uses this function's return value as the list — confirm.
208 function computeWinningPods (hosts, podsScore) {
209 // Build the list of pods to add
210 // Only add a pod if it exists in more than a half base pods
// Not rounded: with 3 hosts the threshold is 1.5, so a score of 2 is required.
212 const baseScore = hosts.length / 2
213 Object.keys(podsScore).forEach(function (podHost) {
214 // If the pod is not me and with a good score we add it
215 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
216 podsList.push({ host: podHost })
// Fetches the pods list of a remote pod with a GET on
// <REMOTE_SCHEME.HTTP>://<host>/api/<API_VERSION>/pods and parses the body as JSON.
// callback(err) on a transport error, callback(null, parsedBody) otherwise.
// The HTTP status code of the response is not inspected in the visible lines.
223 function getForeignPodsList (host, callback) {
224 const path = '/api/' + constants.API_VERSION + '/pods'
226 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
227 if (err) return callback(err)
// NOTE(review): JSON.parse throws on an invalid body. Whether a try/catch
// surrounds these lines is not visible; if it also wraps the callback call,
// an exception thrown by the callback would be caught too — confirm.
230 const json = JSON.parse(body)
231 return callback(null, json)
// Contacts every pod of `podsList` (at most REQUESTS_IN_PARALLEL at a time) on
// its /api/<API_VERSION>/pods/ URL. For each pod answering with status 200, a Pod
// row is stored with the pod's host and the certificate found in the response
// body, then our own videos are queued for that pod.
// A failure for one pod is logged and never aborts the others: every branch
// calls callbackEach() without an error.
// The request scheduler is deactivated for the duration and reactivated at the end.
// NOTE(review): the request parameters are only partly visible; presumably `cert`
// is sent along with our host — confirm. No visible line invokes `callback`.
238 function makeRequestsToWinningPods (cert, podsList, callback) {
239 // Stop pool requests
240 db.Request.deactivate()
241 // Flush pool requests
242 db.Request.forceSend()
244 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
246 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
249 host: constants.CONFIG.WEBSERVER.HOST,
254 requests.makeRetryRequest(params, function (err, res, body) {
256 logger.error('Error with adding %s pod.', pod.host, { error: err })
257 // Don't break the process
258 return callbackEach()
261 if (res.statusCode === 200) {
// The remote pod's public key is taken from `body.cert`.
262 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
263 podObj.save().asCallback(function (err, podCreated) {
265 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
266 return callbackEach()
269 // Add our videos to the request scheduler
// Not awaited: sendOwnedVideosToPod takes no callback.
270 sendOwnedVideosToPod(podCreated.id)
272 return callbackEach()
275 logger.error('Status not 200 for %s pod.', pod.host)
276 return callbackEach()
279 }, function endRequests () {
280 // Final callback, we've ended all the requests
281 // Now we made new friends, we can re activate the pool of requests
282 db.Request.activate()
284 logger.debug('makeRequestsToWinningPods finished.')
289 // Wrapper that populates the "toIds" option with all our friends if it is not specified
290 // { type, endpoint, data, toIds, transaction }
// `callback` is optional and defaults to a no-op.
291 function createRequest (options, callback) {
292 if (!callback) callback = function () {}
293 if (options.toIds) return _createRequest(options, callback)
295 // If the "toIds" pods is not specified, we send the request to all our friends
// The lookup receives options.transaction.
296 db.Pod.listAllIds(options.transaction, function (err, podIds) {
298 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes "toIds" into the caller's `options` object itself:
// `newOptions` is the same reference, not a copy.
302 const newOptions = Object.assign(options, { toIds: podIds })
303 return _createRequest(newOptions, callback)
// Stores one Request row and associates it with the destination pods.
// callback(null) straight away when "toIds" is empty; otherwise callback(err) if
// the row cannot be created, or the outcome of request.setPods.
// NOTE(review): the contents of `createQuery` and `dbRequestOptions`, and the
// declaration of `pods`, are not visible here; presumably they carry
// type/endpoint/data and the transaction respectively — confirm.
307 // { type, endpoint, data, toIds, transaction }
308 function _createRequest (options, callback) {
309 const type = options.type
310 const endpoint = options.endpoint
311 const data = options.data
312 const toIds = options.toIds
313 const transaction = options.transaction
317 // If there are no destination pods abort
318 if (toIds.length === 0) return callback(null)
// Pod instances are built from their id only; no database lookup happens here.
320 toIds.forEach(function (toPod) {
321 pods.push(db.Pod.build({ id: toPod }))
324 const createQuery = {
332 const dbRequestOptions = {
// The `request` parameter below shadows the module-level `request` import
// inside this callback.
336 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
337 if (err) return callback(err)
339 return request.setPods(pods, dbRequestOptions).asCallback(callback)
// Tells whether `host` is our own host: strict comparison with
// constants.CONFIG.WEBSERVER.HOST.
343 function isMe (host) {
344 return host === constants.CONFIG.WEBSERVER.HOST