3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
20 reportAbuseVideoToFriend,
29 function addVideoToFriends (videoData, transaction, callback) {
31 type: ENDPOINT_ACTIONS.ADD,
32 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
36 createRequest(options, callback)
39 function updateVideoToFriends (videoData, transaction, callback) {
41 type: ENDPOINT_ACTIONS.UPDATE,
42 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
46 createRequest(options, callback)
49 function removeVideoToFriends (videoParams) {
51 type: ENDPOINT_ACTIONS.REMOVE,
52 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
55 createRequest(options)
58 function reportAbuseVideoToFriend (reportData, video) {
60 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
61 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
63 toIds: [ video.Author.podId ]
65 createRequest(options)
68 function hasFriends (callback) {
69 db.Pod.countAll(function (err, count) {
70 if (err) return callback(err)
72 const hasFriends = (count !== 0)
73 callback(null, hasFriends)
77 function getMyCertificate (callback) {
78 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
81 function makeFriends (hosts, callback) {
84 logger.info('Make friends!')
85 getMyCertificate(function (err, cert) {
87 logger.error('Cannot read public cert.')
91 eachSeries(hosts, function (host, callbackEach) {
92 computeForeignPodsList(host, podsScore, callbackEach)
94 if (err) return callback(err)
96 logger.debug('Pods scores computed.', { podsScore: podsScore })
97 const podsList = computeWinningPods(hosts, podsScore)
98 logger.debug('Pods that we keep.', { podsToKeep: podsList })
100 makeRequestsToWinningPods(cert, podsList, callback)
105 function quitFriends (callback) {
106 // Stop pool requests
107 db.Request.deactivate()
110 function flushRequests (callbackAsync) {
111 db.Request.flush(callbackAsync)
114 function getPodsList (callbackAsync) {
115 return db.Pod.list(callbackAsync)
118 function announceIQuitMyFriends (pods, callbackAsync) {
119 const requestParams = {
121 path: '/api/' + constants.API_VERSION + '/pods/remove',
125 // Announce we quit them
126 // We don't care if the request fails
127 // The other pod will exclude us automatically after a while
128 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
129 requestParams.toPod = pod
130 requests.makeSecureRequest(requestParams, callbackEach)
133 logger.error('Some errors while quitting friends.', { err: err })
134 // Don't stop the process
137 return callbackAsync(null, pods)
141 function removePodsFromDB (pods, callbackAsync) {
142 each(pods, function (pod, callbackEach) {
143 pod.destroy().asCallback(callbackEach)
147 // Don't forget to re activate the scheduler, even if there was an error
148 db.Request.activate()
150 if (err) return callback(err)
152 logger.info('Removed all remote videos.')
153 return callback(null)
157 function sendOwnedVideosToPod (podId) {
158 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
160 logger.error('Cannot get the list of videos we own.')
164 videosList.forEach(function (video) {
165 video.toAddRemoteJSON(function (err, remoteVideo) {
167 logger.error('Cannot convert video to remote.', { error: err })
168 // Don't break the process
174 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
178 createRequest(options)
184 // ---------------------------------------------------------------------------
186 module.exports = friends
188 // ---------------------------------------------------------------------------
190 function computeForeignPodsList (host, podsScore, callback) {
191 getForeignPodsList(host, function (err, res) {
192 if (err) return callback(err)
194 const foreignPodsList = res.data
196 // Let's give 1 point to the pod we ask the friends list
197 foreignPodsList.push({ host })
199 foreignPodsList.forEach(function (foreignPod) {
200 const foreignPodHost = foreignPod.host
202 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
203 else podsScore[foreignPodHost] = 1
210 function computeWinningPods (hosts, podsScore) {
211 // Build the list of pods to add
212 // Only add a pod if it exists in more than a half base pods
214 const baseScore = hosts.length / 2
216 Object.keys(podsScore).forEach(function (podHost) {
217 // If the pod is not me and with a good score we add it
218 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
219 podsList.push({ host: podHost })
226 function getForeignPodsList (host, callback) {
227 const path = '/api/' + constants.API_VERSION + '/pods'
229 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
230 if (err) return callback(err)
233 const json = JSON.parse(body)
234 return callback(null, json)
241 function makeRequestsToWinningPods (cert, podsList, callback) {
242 // Stop pool requests
243 db.Request.deactivate()
244 // Flush pool requests
245 db.Request.forceSend()
247 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
249 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
252 host: constants.CONFIG.WEBSERVER.HOST,
257 requests.makeRetryRequest(params, function (err, res, body) {
259 logger.error('Error with adding %s pod.', pod.host, { error: err })
260 // Don't break the process
261 return callbackEach()
264 if (res.statusCode === 200) {
265 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
266 podObj.save().asCallback(function (err, podCreated) {
268 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
269 return callbackEach()
272 // Add our videos to the request scheduler
273 sendOwnedVideosToPod(podCreated.id)
275 return callbackEach()
278 logger.error('Status not 200 for %s pod.', pod.host)
279 return callbackEach()
282 }, function endRequests () {
283 // Final callback, we've ended all the requests
284 // Now we made new friends, we can re activate the pool of requests
285 db.Request.activate()
287 logger.debug('makeRequestsToWinningPods finished.')
292 // Wrapper that populate "toIds" argument with all our friends if it is not specified
293 // { type, endpoint, data, toIds, transaction }
294 function createRequest (options, callback) {
295 if (!callback) callback = function () {}
296 if (options.toIds) return _createRequest(options, callback)
298 // If the "toIds" pods is not specified, we send the request to all our friends
299 db.Pod.listAllIds(options.transaction, function (err, podIds) {
301 logger.error('Cannot get pod ids', { error: err })
305 const newOptions = Object.assign(options, { toIds: podIds })
306 return _createRequest(newOptions, callback)
310 // { type, endpoint, data, toIds, transaction }
311 function _createRequest (options, callback) {
312 const type = options.type
313 const endpoint = options.endpoint
314 const data = options.data
315 const toIds = options.toIds
316 const transaction = options.transaction
320 // If there are no destination pods abort
321 if (toIds.length === 0) return callback(null)
323 toIds.forEach(function (toPod) {
324 pods.push(db.Pod.build({ id: toPod }))
327 const createQuery = {
335 const dbRequestOptions = {
339 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
340 if (err) return callback(err)
342 return request.setPods(pods, dbRequestOptions).asCallback(callback)
346 function isMe (host) {
347 return host === constants.CONFIG.WEBSERVER.HOST