3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
// Queues an add request for the given video on the videos endpoint.
// No destination list is passed, so createRequest resolves the targets itself
// through db.Pod.listAllIds.
// video: payload forwarded untouched as the request data.
25 function addVideoToFriends (video) {
26 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
// Reports whether at least one pod is stored.
// callback(err) is called when db.Pod.countAll fails,
// callback(null, hasFriends) otherwise, where hasFriends is a boolean.
29 function hasFriends (callback) {
30 db.Pod.countAll(function (err, count) {
31 if (err) return callback(err)
// Any count other than 0 means we have friends
33 const hasFriends = (count !== 0)
34 callback(null, hasFriends)
// Reads the peertube.pub file located in constants.CONFIG.STORAGE.CERT_DIR as utf8 text.
// callback is handed directly to fs.readFile, so it receives (err, content).
// The path is built by plain string concatenation: CERT_DIR has to end with a path separator.
38 function getMyCertificate (callback) {
39 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
// Makes friends starting from a list of base hosts.
// Visible steps: read our public certificate, score the pods reported by every host,
// keep the winning pods, then send them requests.
// hosts: base hosts to query, also used to derive the winning threshold.
// callback: receives the scoring error if any, otherwise it is passed on to makeRequestsToWinningPods.
// NOTE(review): podsScore is used below but its declaration is not in the visible lines,
// presumably a per-call score map created before the first log line. TODO confirm.
42 function makeFriends (hosts, callback) {
45 logger.info('Make friends!')
46 getMyCertificate(function (err, cert) {
// NOTE(review): presumably guarded by a check on err that is not visible here. TODO confirm.
48 logger.error('Cannot read public cert.')
// Hosts are processed one after the other, all of them feeding the same podsScore object
52 eachSeries(hosts, function (host, callbackEach) {
53 computeForeignPodsList(host, podsScore, callbackEach)
// A scoring error stops the whole operation
55 if (err) return callback(err)
57 logger.debug('Pods scores computed.', { podsScore: podsScore })
58 const podsList = computeWinningPods(hosts, podsScore)
59 logger.debug('Pods that we keep.', { podsToKeep: podsList })
// The caller callback is handed over: this function does not call it on the success path itself
61 makeRequestsToWinningPods(cert, podsList, callback)
// Quits all our friends.
// Visible steps: deactivate the request scheduler, flush the requests, list the pods,
// tell every pod we are leaving, delete the pods, then re-activate the scheduler.
// callback: called with the error of a failed step, or with null once everything is done.
// NOTE(review): the call that chains the named step functions below is not in the visible lines,
// presumably the imported waterfall helper. TODO confirm.
66 function quitFriends (callback) {
// Paused here, re-activated in the final callback whatever the outcome
68 db.Request.deactivate()
// Step: delegate to db.Request.flush
71 function flushRequests (callbackAsync) {
72 db.Request.flush(callbackAsync)
// Step: fetch the pods, db.Pod.list passes its result to the next step
75 function getPodsList (callbackAsync) {
76 return db.Pod.list(callbackAsync)
// Step: send a pods/remove request to every pod, at most constants.REQUESTS_IN_PARALLEL at a time
79 function announceIQuitMyFriends (pods, callbackAsync) {
80 const requestParams = {
82 path: '/api/' + constants.API_VERSION + '/pods/remove',
86 // Announce we quit them
87 // We don't care if the request fails
88 // The other pod will exclude us automatically after a while
89 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): the single requestParams object is shared by all iterations and only toPod changes.
// This is safe only if makeSecureRequest reads toPod before the next iteration overwrites it. TODO confirm.
90 requestParams.toPod = pod
91 requests.makeSecureRequest(requestParams, callbackEach)
94 logger.error('Some errors while quitting friends.', { err: err })
95 // Don't stop the process
// The error is deliberately replaced by null so the removal step still runs
98 return callbackAsync(null, pods)
// Step: destroy every pod, asCallback bridges the value returned by destroy to callbackEach
102 function removePodsFromDB (pods, callbackAsync) {
103 each(pods, function (pod, callbackEach) {
104 pod.destroy().asCallback(callbackEach)
108 // Don't forget to re activate the scheduler, even if there was an error
109 db.Request.activate()
111 if (err) return callback(err)
// NOTE(review): the message mentions videos while the visible steps delete pods,
// presumably the videos go away together with their pod. TODO confirm.
113 logger.info('Removed all remote videos.')
114 return callback(null)
// Queues a remove request on the videos endpoint.
// No destination list is passed, so createRequest resolves the targets itself
// through db.Pod.listAllIds.
// videoParams: payload forwarded untouched as the request data.
118 function removeVideoToFriends (videoParams) {
119 createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
// Queues one add request, addressed only to the pod podId, for every video returned by
// db.Video.listOwnedAndPopulateAuthorAndTags.
// The function takes no callback: failures are logged, nothing is reported to the caller.
// podId: id of the destination pod, wrapped in a one element array for createRequest.
122 function sendOwnedVideosToPod (podId) {
123 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
// NOTE(review): presumably guarded by a check on err that is not visible here. TODO confirm.
125 logger.error('Cannot get the list of videos we own.')
129 videosList.forEach(function (video) {
// This inner err shadows the err of the listing callback
130 video.toRemoteJSON(function (err, remoteVideo) {
132 logger.error('Cannot convert video to remote.', { error: err })
133 // Don't break the process
// An explicit destination list is given, so createRequest skips the lookup of all pod ids
137 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
143 // ---------------------------------------------------------------------------
// Public interface of the module. Functions declared after this point are not exported directly.
// NOTE(review): friends is not defined in the visible lines, presumably an object
// grouping the functions declared above. TODO confirm.
145 module.exports = friends
147 // ---------------------------------------------------------------------------
// Gives one point in podsScore to host and to every pod that host reports.
// host: the pod we ask for its list.
// podsScore: object mutated in place, keyed by pod host, each value being a count.
// callback: receives the error of getForeignPodsList if any.
// NOTE(review): the success path call of callback is not in the visible lines. TODO confirm.
149 function computeForeignPodsList (host, podsScore, callback) {
150 getForeignPodsList(host, function (err, foreignPodsList) {
151 if (err) return callback(err)
// A falsy answer is treated as an empty list
153 if (!foreignPodsList) foreignPodsList = []
155 // Let's give 1 point to the pod we ask the friends list
156 foreignPodsList.push({ host })
158 foreignPodsList.forEach(function (foreignPod) {
159 const foreignPodHost = foreignPod.host
// Increment a known host, otherwise start its score at 1
161 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
162 else podsScore[foreignPodHost] = 1
// Selects the pods worth befriending from the computed scores.
// hosts: base hosts, only their number is used (threshold = hosts.length / 2).
// podsScore: object keyed by pod host with its score as value.
// A pod is kept when it is not our own host and its score is strictly greater than the threshold.
// Kept pods are pushed onto podsList as objects with a single host property.
// NOTE(review): the declaration and the return of podsList are not in the visible lines. TODO confirm.
169 function computeWinningPods (hosts, podsScore) {
170 // Build the list of pods to add
171 // Only add a pod if it exists in more than a half base pods
// Not rounded: with an odd number of hosts the threshold is fractional
173 const baseScore = hosts.length / 2
174 Object.keys(podsScore).forEach(function (podHost) {
175 // If the pod is not me and with a good score we add it
176 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
177 podsList.push({ host: podHost })
// Fetches the pods list of a remote host with a GET on its /api/<API_VERSION>/pods route,
// using the scheme constants.REMOTE_SCHEME.HTTP.
// callback(err) on a request error, callback(null, json) with the parsed body otherwise.
184 function getForeignPodsList (host, callback) {
185 const path = '/api/' + constants.API_VERSION + '/pods'
187 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
188 if (err) return callback(err)
// NOTE(review): JSON.parse throws on a malformed body. The lines just before it are not visible,
// so check that the parse is wrapped in a try/catch. TODO confirm.
191 const json = JSON.parse(body)
192 return callback(null, json)
// Sends a request to the pods route of every selected pod, at most
// constants.REQUESTS_IN_PARALLEL at a time, and stores each pod answering with status 200.
// The pool of requests is deactivated for the duration and re-activated once all pods are processed.
// cert: our public certificate. NOTE(review): its use is not in the visible lines,
// presumably part of the params sent to the pod. TODO confirm.
// podsList: objects carrying a host property.
// callback: NOTE(review): its invocation is not in the visible lines. TODO confirm.
199 function makeRequestsToWinningPods (cert, podsList, callback) {
200 // Stop pool requests
201 db.Request.deactivate()
202 // Flush pool requests
203 db.Request.forceSend()
// Every branch below ends with callbackEach() without an error,
// so a failing pod never aborts the processing of the others
205 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
207 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
210 host: constants.CONFIG.WEBSERVER.HOST,
215 requests.makeRetryRequest(params, function (err, res, body) {
217 logger.error('Error with adding %s pod.', pod.host, { error: err })
218 // Don't break the process
219 return callbackEach()
222 if (res.statusCode === 200) {
// The remote pod answers with its own certificate in body.cert, stored as the pod public key
223 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
224 podObj.save().asCallback(function (err, podCreated) {
226 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
227 return callbackEach()
230 // Add our videos to the request scheduler
// Not waited for: sendOwnedVideosToPod takes no callback
231 sendOwnedVideosToPod(podCreated.id)
233 return callbackEach()
236 logger.error('Status not 200 for %s pod.', pod.host)
237 return callbackEach()
240 }, function endRequests () {
241 // Final callback, we've ended all the requests
242 // Now we made new friends, we can re activate the pool of requests
243 db.Request.activate()
245 logger.debug('makeRequestsToWinningPods finished.')
250 // Wrapper that populate "to" argument with all our friends if it is not specified
// type, endpoint, data: forwarded untouched to _createRequest.
// to: optional array of destination pod ids.
// Nothing is returned to the caller on the lookup path and a lookup failure is only logged.
251 function createRequest (type, endpoint, data, to) {
// An empty array is truthy, so it also takes this path and _createRequest then aborts on its length check
252 if (to) return _createRequest(type, endpoint, data, to)
254 // If the "to" pods is not specified, we send the request to all our friends
255 db.Pod.listAllIds(function (err, podIds) {
// NOTE(review): presumably guarded by a check on err that is not visible here. TODO confirm.
257 logger.error('Cannot get pod ids', { error: err })
261 return _createRequest(type, endpoint, data, podIds)
// Persists one request and links it to its destination pods inside a single transaction.
// type, endpoint, data: NOTE(review): presumably stored through createQuery,
// whose fields are not in the visible lines. TODO confirm.
// to: array of destination pod ids, nothing is done when it is empty.
// Nothing is returned and a failing transaction is only logged.
// NOTE(review): the declaration of pods is not in the visible lines. TODO confirm.
265 function _createRequest (type, endpoint, data, to) {
268 // If there are no destination pods abort
269 if (to.length === 0) return
// Pod instances are built from the bare ids, no database lookup is made here
271 to.forEach(function (toPod) {
272 pods.push(db.Pod.build({ id: toPod }))
275 const createQuery = {
283 // We run in transaction to keep coherency between Request and RequestToPod tables
284 db.sequelize.transaction(function (t) {
285 const dbRequestOptions = {
// The request parameter below shadows the request module required at the top of the file
289 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
290 return request.setPods(pods, dbRequestOptions)
292 }).asCallback(function (err) {
293 if (err) logger.error('Error in createRequest transaction.', { error: err })
// Tells whether host designates our own pod.
// Returns true only when host is strictly equal to constants.CONFIG.WEBSERVER.HOST.
297 function isMe (host) {
298 return host === constants.CONFIG.WEBSERVER.HOST