3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
// Action table registered for the videos endpoint; this file reads ADD, UPDATE, REMOVE and
// REPORT_ABUSE from it.
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
// NOTE(review): the entry below looks like one member of an object literal whose opening line and
// other members are not visible in this excerpt - presumably the `friends` object exported
// further down; confirm against the full file.
20 reportAbuseVideoToFriend,
// Hands an ADD-type request for the videos endpoint to createRequest, forwarding `callback`.
// NOTE(review): the lines declaring `options` are not visible in this excerpt; presumably they
// also attach `videoData` and `transaction` - confirm against the full file.
29 function addVideoToFriends (videoData, transaction, callback) {
31 type: ENDPOINT_ACTIONS.ADD,
32 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// No `toIds` entry is visible here; createRequest falls back to every known pod id when
// `options.toIds` is missing.
36 createRequest(options, callback)
// Hands an UPDATE-type request for the videos endpoint to createRequest, forwarding `callback`.
// NOTE(review): the lines declaring `options` are not visible in this excerpt; presumably they
// also attach `videoData` and `transaction` - confirm against the full file.
39 function updateVideoToFriends (videoData, transaction, callback) {
41 type: ENDPOINT_ACTIONS.UPDATE,
42 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// No `toIds` entry is visible here; createRequest falls back to every known pod id when
// `options.toIds` is missing.
46 createRequest(options, callback)
// Hands a REMOVE-type request for the videos endpoint to createRequest.
// Takes no callback: the outcome is not reported back to the caller.
// NOTE(review): the lines declaring `options` are not visible in this excerpt; presumably
// `videoParams` becomes the request data - confirm against the full file.
49 function removeVideoToFriends (videoParams) {
51 type: ENDPOINT_ACTIONS.REMOVE,
52 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Called without a callback; createRequest substitutes a no-op in that case.
55 createRequest(options)
// Hands a REPORT_ABUSE-type request for the videos endpoint to createRequest.
// Unlike the add/update/remove helpers, the destination is set explicitly.
// NOTE(review): the lines declaring `options` are not visible in this excerpt; presumably
// `reportData` becomes the request data - confirm against the full file.
58 function reportAbuseVideoToFriend (reportData, video) {
60 type: ENDPOINT_ACTIONS.REPORT_ABUSE,
61 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Single destination: the pod id read from the video's Author. Requires `video.Author` to be
// populated, otherwise this property access throws.
63 toIds: [ video.Author.podId ]
// Called without a callback; createRequest substitutes a no-op in that case.
65 createRequest(options)
// Reports whether at least one pod is stored.
// callback(err) when db.Pod.countAll fails, otherwise callback(null, boolean).
68 function hasFriends (callback) {
69 db.Pod.countAll(function (err, count) {
70 if (err) return callback(err)
// Strict comparison against 0: any other count value yields true.
72 const hasFriends = (count !== 0)
73 callback(null, hasFriends)
// Reads the file 'peertube.pub' from the configured certificate directory as a UTF-8 string.
// `callback` is passed straight to fs.readFile, so it receives (err, contents).
77 function getMyCertificate (callback) {
// The path is built by plain concatenation - assumes CERT_DIR already ends with a path
// separator; TODO confirm in the configuration.
78 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
// Befriends pods starting from a list of seed hosts:
//   1. read our public certificate,
//   2. ask each host, one after another, for its pods list and accumulate scores,
//   3. keep the pods selected by computeWinningPods,
//   4. hand the certificate and the kept pods to makeRequestsToWinningPods together with `callback`.
// NOTE(review): the declaration of `podsScore` and the guard around the certificate error are
// not visible in this excerpt - confirm against the full file.
81 function makeFriends (hosts, callback) {
84 logger.info('Make friends!')
85 getMyCertificate(function (err, cert) {
// Logged when the certificate cannot be read (enclosing `if` not visible here).
87 logger.error('Cannot read public cert.')
// eachSeries: hosts are queried sequentially, all writing into the same `podsScore` object.
91 eachSeries(hosts, function (host, callbackEach) {
92 computeForeignPodsList(host, podsScore, callbackEach)
// An error from any host aborts the whole operation.
94 if (err) return callback(err)
96 logger.debug('Pods scores computed.', { podsScore: podsScore })
97 const podsList = computeWinningPods(hosts, podsScore)
98 logger.debug('Pods that we keep.', { podsToKeep: podsList })
100 makeRequestsToWinningPods(cert, podsList, callback)
// Leaves the network of friends. The visible steps are: deactivate the request scheduler,
// flush stored requests, list the pods, tell each pod we are leaving, destroy the pod rows,
// then reactivate the scheduler and report through `callback`.
// NOTE(review): the call that chains the nested step functions is not visible in this excerpt -
// presumably async `waterfall`, given the (value, callbackAsync) signatures; confirm.
105 function quitFriends (callback) {
106 // Stop pool requests
107 db.Request.deactivate()
// Step: flush the stored requests.
110 function flushRequests (callbackAsync) {
111 db.Request.flush(callbackAsync)
// Step: fetch the pods; db.Pod.list reports to callbackAsync.
114 function getPodsList (callbackAsync) {
115 return db.Pod.list(callbackAsync)
// Step: send the "remove" API call to every pod, then pass `pods` on unchanged.
118 function announceIQuitMyFriends (pods, callbackAsync) {
119 const requestParams = {
121 path: '/api/' + constants.API_VERSION + '/pods/remove',
125 // Announce we quit them
126 // We don't care if the request fails
127 // The other pod will exclude us automatically after a while
128 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): one `requestParams` object is shared by all iterations and `toPod` is
// reassigned on each of them; this is only safe if makeSecureRequest reads `toPod`
// synchronously - confirm.
129 requestParams.toPod = pod
130 requests.makeSecureRequest(requestParams, callbackEach)
133 logger.error('Some errors while quitting friends.', { err: err })
134 // Don't stop the process
// Always continues with a null error, so a failed announcement never aborts the quit.
137 return callbackAsync(null, pods)
// Step: destroy every pod row; the first error reported by `each` ends the iteration.
141 function removePodsFromDB (pods, callbackAsync) {
142 each(pods, function (pod, callbackEach) {
143 pod.destroy().asCallback(callbackEach)
147 // Don't forget to re activate the scheduler, even if there was an error
148 db.Request.activate()
150 if (err) return callback(err)
// NOTE(review): the message mentions remote videos while the visible code only destroys pod
// rows - presumably the videos are removed along with them; confirm.
152 logger.info('Removed all remote videos.')
153 return callback(null)
// Lists the videos owned by this instance, converts each one with toAddRemoteJSON and hands a
// videos-endpoint request to createRequest for it.
// Fire-and-forget: takes no callback, failures are only logged.
// NOTE(review): the construction of `options` is not visible in this excerpt; presumably it
// targets `podId` through `toIds` and uses the ADD action - confirm against the full file.
157 function sendOwnedVideosToPod (podId) {
158 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
// Logged when the listing fails (enclosing guard not visible here).
160 logger.error('Cannot get the list of videos we own.')
164 videosList.forEach(function (video) {
165 video.toAddRemoteJSON(function (err, remoteVideo) {
167 logger.error('Cannot convert video to remote.', { error: err })
168 // Don't break the process
174 endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
// Called without a callback; createRequest substitutes a no-op in that case.
178 createRequest(options)
184 // ---------------------------------------------------------------------------
// Public API of this module.
// NOTE(review): the definition of `friends` is not visible in this excerpt; the functions that
// follow this export are presumably module-private helpers - confirm.
186 module.exports = friends
188 // ---------------------------------------------------------------------------
// Fetches the pods list published by `host` and adds one point in `podsScore` for every host
// found in it, plus one point for `host` itself.
// `podsScore` (map host -> score) is mutated in place; a fetch error goes to callback(err).
// NOTE(review): the completion call after the loop is not visible in this excerpt.
190 function computeForeignPodsList (host, podsScore, callback) {
191 getForeignPodsList(host, function (err, res) {
192 if (err) return callback(err)
// Expects the parsed response to carry the pods in `data`.
// NOTE(review): no visible check that `res.data` is an array before push/forEach.
194 const foreignPodsList = res.data
196 // Let's give 1 point to the pod we ask the friends list
// The push appends to the array taken from `res.data` itself (no copy is made).
197 foreignPodsList.push({ host })
199 foreignPodsList.forEach(function (foreignPod) {
200 const foreignPodHost = foreignPod.host
// Increment an existing score, or start at 1 for a host seen for the first time.
202 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
203 else podsScore[foreignPodHost] = 1
// Selects the hosts of `podsScore` that are not this instance and whose score is strictly
// greater than half the number of seed `hosts`; each one is pushed as `{ host }`.
// NOTE(review): the declaration of `podsList` and the return statement are not visible in this
// excerpt; the caller uses the return value as the list of pods to keep.
210 function computeWinningPods (hosts, podsScore) {
211 // Build the list of pods to add
212 // Only add a pod if it exists in more than a half base pods
// Not rounded: with 3 hosts the threshold is 1.5, so a score of 2 is required.
214 const baseScore = hosts.length / 2
215 Object.keys(podsScore).forEach(function (podHost) {
216 // If the pod is not me and with a good score we add it
217 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
218 podsList.push({ host: podHost })
// GETs the pods list of a remote host at <scheme>://<host>/api/<version>/pods and passes the
// parsed JSON body to callback(null, json); a transport error goes to callback(err).
225 function getForeignPodsList (host, callback) {
226 const path = '/api/' + constants.API_VERSION + '/pods'
228 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
229 if (err) return callback(err)
// JSON.parse throws on a malformed body.
// NOTE(review): any try/catch around it is not visible in this excerpt - confirm that the
// exception is routed to `callback`. The HTTP status code is not inspected in the visible lines.
232 const json = JSON.parse(body)
233 return callback(null, json)
// Sends a request to the pods API of every pod in `podsList` (at most
// constants.REQUESTS_IN_PARALLEL at a time). On a 200 answer the pod is saved locally with the
// certificate from the response body, and our own videos are scheduled for it.
// Per-pod failures are logged and skipped.
// NOTE(review): the construction of `params` (presumably carrying `cert`), the error guards and
// the final call to `callback` are not visible in this excerpt - confirm against the full file.
240 function makeRequestsToWinningPods (cert, podsList, callback) {
241 // Stop pool requests
242 db.Request.deactivate()
243 // Flush pool requests
244 db.Request.forceSend()
246 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
248 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
// Our own host, sent as part of the request.
251 host: constants.CONFIG.WEBSERVER.HOST,
256 requests.makeRetryRequest(params, function (err, res, body) {
258 logger.error('Error with adding %s pod.', pod.host, { error: err })
259 // Don't break the process
// callbackEach is always called without an error, so eachLimit keeps going.
260 return callbackEach()
263 if (res.statusCode === 200) {
// The remote pod answers with its certificate in `body.cert`; it is stored as publicKey.
264 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
265 podObj.save().asCallback(function (err, podCreated) {
267 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
268 return callbackEach()
271 // Add our videos to the request scheduler
// Not awaited: sendOwnedVideosToPod takes no callback.
272 sendOwnedVideosToPod(podCreated.id)
274 return callbackEach()
277 logger.error('Status not 200 for %s pod.', pod.host)
278 return callbackEach()
281 }, function endRequests () {
282 // Final callback, we've ended all the requests
283 // Now we made new friends, we can re activate the pool of requests
284 db.Request.activate()
286 logger.debug('makeRequestsToWinningPods finished.')
291 // Wrapper that populate "toIds" argument with all our friends if it is not specified
292 // { type, endpoint, data, toIds, transaction }
// `callback` is optional; a no-op is used when it is omitted.
293 function createRequest (options, callback) {
294 if (!callback) callback = function () {}
// Destinations supplied by the caller: delegate as is.
295 if (options.toIds) return _createRequest(options, callback)
297 // If the "toIds" pods is not specified, we send the request to all our friends
// The lookup receives options.transaction.
298 db.Pod.listAllIds(options.transaction, function (err, podIds) {
// NOTE(review): the guard and exit of this error branch are not visible in this excerpt -
// confirm whether `callback` is invoked with the error.
300 logger.error('Cannot get pod ids', { error: err })
// Object.assign writes into its first argument, so `newOptions` is the caller's `options`
// object itself, now carrying `toIds`.
304 const newOptions = Object.assign(options, { toIds: podIds })
305 return _createRequest(newOptions, callback)
// Creates a db.Request row and links it to the destination pods listed in `toIds`.
// Calls callback(null) straight away when `toIds` is empty, callback(err) when the creation
// fails; otherwise `callback` is driven by the result of setPods.
// NOTE(review): the declaration of `pods` and the contents of `createQuery` and
// `dbRequestOptions` are not visible in this excerpt; presumably they carry type/endpoint/data
// and the transaction respectively - confirm against the full file.
309 // { type, endpoint, data, toIds, transaction }
310 function _createRequest (options, callback) {
311 const type = options.type
312 const endpoint = options.endpoint
313 const data = options.data
314 const toIds = options.toIds
315 const transaction = options.transaction
319 // If there are no destination pods abort
320 if (toIds.length === 0) return callback(null)
// Build one Pod instance per id with db.Pod.build, without loading the rows.
322 toIds.forEach(function (toPod) {
323 pods.push(db.Pod.build({ id: toPod }))
326 const createQuery = {
334 const dbRequestOptions = {
// Inside this callback `request` is the created row; it shadows the module-level `request`
// (the HTTP client required at the top of the file).
338 return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
339 if (err) return callback(err)
// setPods is given the same options object as the create call.
341 return request.setPods(pods, dbRequestOptions).asCallback(callback)
// Returns true when `host` is strictly equal to this instance's configured webserver host
// (exact string match, no normalisation).
345 function isMe (host) {
346 return host === constants.CONFIG.WEBSERVER.HOST