3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
18 reportAbuseVideoToFriend,
27 function addVideoToFriends (videoData, transaction, callback) {
30 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
34 createRequest(options, callback)
37 function updateVideoToFriends (videoData, transaction, callback) {
40 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
44 createRequest(options, callback)
47 function removeVideoToFriends (videoParams) {
50 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
53 createRequest(options)
56 function reportAbuseVideoToFriend (reportData, video) {
57 createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ])
60 function hasFriends (callback) {
61 db.Pod.countAll(function (err, count) {
62 if (err) return callback(err)
64 const hasFriends = (count !== 0)
65 callback(null, hasFriends)
69 function getMyCertificate (callback) {
70 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
73 function makeFriends (hosts, callback) {
76 logger.info('Make friends!')
77 getMyCertificate(function (err, cert) {
79 logger.error('Cannot read public cert.')
83 eachSeries(hosts, function (host, callbackEach) {
84 computeForeignPodsList(host, podsScore, callbackEach)
86 if (err) return callback(err)
88 logger.debug('Pods scores computed.', { podsScore: podsScore })
89 const podsList = computeWinningPods(hosts, podsScore)
90 logger.debug('Pods that we keep.', { podsToKeep: podsList })
92 makeRequestsToWinningPods(cert, podsList, callback)
97 function quitFriends (callback) {
99 db.Request.deactivate()
102 function flushRequests (callbackAsync) {
103 db.Request.flush(callbackAsync)
106 function getPodsList (callbackAsync) {
107 return db.Pod.list(callbackAsync)
110 function announceIQuitMyFriends (pods, callbackAsync) {
111 const requestParams = {
113 path: '/api/' + constants.API_VERSION + '/pods/remove',
117 // Announce we quit them
118 // We don't care if the request fails
119 // The other pod will exclude us automatically after a while
120 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
121 requestParams.toPod = pod
122 requests.makeSecureRequest(requestParams, callbackEach)
125 logger.error('Some errors while quitting friends.', { err: err })
126 // Don't stop the process
129 return callbackAsync(null, pods)
133 function removePodsFromDB (pods, callbackAsync) {
134 each(pods, function (pod, callbackEach) {
135 pod.destroy().asCallback(callbackEach)
139 // Don't forget to re activate the scheduler, even if there was an error
140 db.Request.activate()
142 if (err) return callback(err)
144 logger.info('Removed all remote videos.')
145 return callback(null)
149 function sendOwnedVideosToPod (podId) {
150 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
152 logger.error('Cannot get the list of videos we own.')
156 videosList.forEach(function (video) {
157 video.toAddRemoteJSON(function (err, remoteVideo) {
159 logger.error('Cannot convert video to remote.', { error: err })
160 // Don't break the process
164 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
170 // ---------------------------------------------------------------------------
172 module.exports = friends
174 // ---------------------------------------------------------------------------
176 function computeForeignPodsList (host, podsScore, callback) {
177 getForeignPodsList(host, function (err, res) {
178 if (err) return callback(err)
180 const foreignPodsList = res.data
182 // Let's give 1 point to the pod we ask the friends list
183 foreignPodsList.push({ host })
185 foreignPodsList.forEach(function (foreignPod) {
186 const foreignPodHost = foreignPod.host
188 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
189 else podsScore[foreignPodHost] = 1
196 function computeWinningPods (hosts, podsScore) {
197 // Build the list of pods to add
198 // Only add a pod if it exists in more than a half base pods
200 const baseScore = hosts.length / 2
201 Object.keys(podsScore).forEach(function (podHost) {
202 // If the pod is not me and with a good score we add it
203 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
204 podsList.push({ host: podHost })
211 function getForeignPodsList (host, callback) {
212 const path = '/api/' + constants.API_VERSION + '/pods'
214 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
215 if (err) return callback(err)
218 const json = JSON.parse(body)
219 return callback(null, json)
226 function makeRequestsToWinningPods (cert, podsList, callback) {
227 // Stop pool requests
228 db.Request.deactivate()
229 // Flush pool requests
230 db.Request.forceSend()
232 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
234 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
237 host: constants.CONFIG.WEBSERVER.HOST,
242 requests.makeRetryRequest(params, function (err, res, body) {
244 logger.error('Error with adding %s pod.', pod.host, { error: err })
245 // Don't break the process
246 return callbackEach()
249 if (res.statusCode === 200) {
250 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
251 podObj.save().asCallback(function (err, podCreated) {
253 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
254 return callbackEach()
257 // Add our videos to the request scheduler
258 sendOwnedVideosToPod(podCreated.id)
260 return callbackEach()
263 logger.error('Status not 200 for %s pod.', pod.host)
264 return callbackEach()
267 }, function endRequests () {
268 // Final callback, we've ended all the requests
269 // Now we made new friends, we can re activate the pool of requests
270 db.Request.activate()
272 logger.debug('makeRequestsToWinningPods finished.')
277 // Wrapper that populate "toIds" argument with all our friends if it is not specified
278 // { type, endpoint, data, toIds, transaction }
279 function createRequest (options, callback) {
280 if (!callback) callback = function () {}
281 if (options.toIds) return _createRequest(options, callback)
283 // If the "toIds" pods is not specified, we send the request to all our friends
284 db.Pod.listAllIds(options.transaction, function (err, podIds) {
286 logger.error('Cannot get pod ids', { error: err })
290 const newOptions = Object.assign(options, { toIds: podIds })
291 return _createRequest(newOptions, callback)
295 // { type, endpoint, data, toIds, transaction }
296 function _createRequest (options, callback) {
297 const type = options.type
298 const endpoint = options.endpoint
299 const data = options.data
300 const toIds = options.toIds
301 const transaction = options.transaction
305 // If there are no destination pods abort
306 if (toIds.length === 0) return callback(null)
308 toIds.forEach(function (toPod) {
309 pods.push(db.Pod.build({ id: toPod }))
312 const createQuery = {
320 const dbRequestOptions = {
324 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
325 if (err) return callback(err)
327 return request.setPods(pods, dbRequestOptions).asCallback(callback)
331 function isMe (host) {
332 return host === constants.CONFIG.WEBSERVER.HOST