3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
// Queue an 'add' request for `video` on the videos endpoint.
// No destination is passed, so createRequest looks up the pod ids itself
// and addresses the request to all our friends.
// Takes no callback: the caller is not told whether the request was stored.
25 function addVideoToFriends (video) {
26 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
// Tell whether db.Pod.countAll reports at least one pod.
// callback(err) when the count fails, callback(null, boolean) otherwise.
29 function hasFriends (callback) {
30 db.Pod.countAll(function (err, count) {
31 if (err) return callback(err)
// Any non-zero count means we have friends
33 const hasFriends = (count !== 0)
34 callback(null, hasFriends)
// Read the 'peertube.pub' file of the configured certificates directory as a UTF-8 string.
// `callback` is handed straight to fs.readFile, so it receives (err, content).
// NOTE(review): CERT_DIR is concatenated as-is, so it presumably ends with a path separator — confirm in the config.
38 function getMyCertificate (callback) {
39 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
// Make friends, starting from a list of base hosts.
// Each host is asked, one after the other, for its own pods list; the answers are tallied
// in podsScore, computeWinningPods picks the pods to keep and makeRequestsToWinningPods contacts them.
// `hosts`: collection of pod hosts (iterated by eachSeries, its length is used for the score threshold).
// `callback`: called with the error if the scoring fails, otherwise passed on to makeRequestsToWinningPods.
// NOTE(review): podsScore is not declared in the visible lines — presumably initialized just above the first log.
42 function makeFriends (hosts, callback) {
45 logger.info('Make friends!')
46 getMyCertificate(function (err, cert) {
// NOTE(review): the condition guarding this log is not visible here — presumably `if (err)` on the certificate read
48 logger.error('Cannot read public cert.')
// Hosts are processed in series; every call adds its points to the shared podsScore
52 eachSeries(hosts, function (host, callbackEach) {
53 computeForeignPodsList(host, podsScore, callbackEach)
// A single failing host aborts the whole operation
55 if (err) return callback(err)
57 logger.debug('Pods scores computed.', { podsScore: podsScore })
58 const podsList = computeWinningPods(hosts, podsScore)
59 logger.debug('Pods that we keep.', { podsToKeep: podsList })
// Our certificate read above is forwarded together with the selected pods
61 makeRequestsToWinningPods(cert, podsList, callback)
// Quit all our friends: tell every pod we know that we remove it, then destroy the pods in the database.
// The request scheduler is deactivated first and re-activated at the end, even if a step failed.
// `callback`: receives the error of the failing step, or null on success.
// NOTE(review): the named steps below take a `callbackAsync`; they are presumably the tasks of the
// imported `waterfall`, but the call itself is not visible in this excerpt.
66 function quitFriends (callback) {
// Stop the request scheduler while we are quitting
68 db.Request.deactivate()
69 // Flush pool requests
// NOTE(review): the flush call itself is not visible in this excerpt
// Step: list the pods through db.Pod.list
73 function getPodsList (callbackAsync) {
74 return db.Pod.list(callbackAsync)
// Step: notify each pod, at most constants.REQUESTS_IN_PARALLEL requests at a time
77 function announceIQuitMyFriends (pods, callbackAsync) {
78 const requestParams = {
80 path: '/api/' + constants.API_VERSION + '/pods/remove',
84 // Announce we quit them
85 // We don't care if the request fails
86 // The other pod will exclude us automatically after a while
87 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// The same requestParams object is reused for every pod, only toPod changes.
// NOTE(review): safe with parallel requests only if makeSecureRequest reads toPod synchronously — confirm
88 requestParams.toPod = pod
89 requests.makeSecureRequest(requestParams, callbackEach)
92 logger.error('Some errors while quitting friends.', { err: err })
93 // Don't stop the process
// The pods list is always forwarded: errors above are only logged
96 return callbackAsync(null, pods)
// Step: destroy every pod instance
100 function removePodsFromDB (pods, callbackAsync) {
101 each(pods, function (pod, callbackEach) {
102 pod.destroy().asCallback(callbackEach)
106 // Don't forget to re-activate the scheduler, even if there was an error
107 db.Request.activate()
109 if (err) return callback(err)
// NOTE(review): the message mentions remote videos while the visible step destroys pods —
// presumably the videos are removed along with their pod; confirm
111 logger.info('Removed all remote videos.')
112 return callback(null)
// Queue a 'remove' request carrying `videoParams` on the videos endpoint.
// No destination is passed, so createRequest addresses the request to all our friends.
// Takes no callback: the caller is not told whether the request was stored.
116 function removeVideoToFriends (videoParams) {
117 createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
// Queue one 'add' request per video we own, addressed to a single pod.
// `podId`: id of the destination pod, wrapped in an array to form the `to` argument of createRequest.
// Takes no callback: failures are only logged.
120 function sendOwnedVideosToPod (podId) {
121 db.Video.listOwnedAndPopulateAuthor(function (err, videosList) {
// NOTE(review): the guard around this log is not visible here — presumably `if (err)` followed by an early exit
123 logger.error('Cannot get the list of videos we own.')
127 videosList.forEach(function (video) {
// The remote representation of each video is built asynchronously
128 video.toRemoteJSON(function (err, remoteVideo) {
// NOTE(review): the guard around this log is not visible here — presumably `if (err)`
130 logger.error('Cannot convert video to remote.', { error: err })
131 // Don't break the process
// Explicit destination: only the given pod receives the video
135 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
141 // ---------------------------------------------------------------------------
// Public API of this module.
// NOTE(review): `friends` is not defined in the visible lines — presumably an object listing the functions above.
143 module.exports = friends
145 // ---------------------------------------------------------------------------
// The functions below are not attached to the export in the visible lines
// Ask `host` for its pods list and give one point to every pod of that list, plus one to `host` itself.
// `podsScore`: object mutated in place; keys are pod hosts, values are the points collected so far.
// `callback`: called with the error when the list cannot be fetched.
// NOTE(review): the success-path call of `callback` is not visible in this excerpt.
147 function computeForeignPodsList (host, podsScore, callback) {
148 getForeignPodsList(host, function (err, foreignPodsList) {
149 if (err) return callback(err)
// A falsy answer is treated as an empty list
151 if (!foreignPodsList) foreignPodsList = []
153 // Let's give 1 point to the pod we ask the friends list
154 foreignPodsList.push({ host })
156 foreignPodsList.forEach(function (foreignPod) {
157 const foreignPodHost = foreignPod.host
// Increment an existing score, or start at 1 on first sight
159 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
160 else podsScore[foreignPodHost] = 1
// Select the pods to befriend from the scores collected by computeForeignPodsList.
// `hosts`: the base hosts that were asked; `podsScore`: object mapping a pod host to its points.
// NOTE(review): the declaration and the return of podsList are not visible here — presumably
// an array of { host } objects that is returned to the caller.
167 function computeWinningPods (hosts, podsScore) {
168 // Build the list of pods to add
169 // Only add a pod if it exists in more than a half base pods
171 const baseScore = hosts.length / 2
172 Object.keys(podsScore).forEach(function (podHost) {
173 // If the pod is not me and with a good score we add it
// The comparison is strict: a pod with exactly half of the points is not kept
174 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
175 podsList.push({ host: podHost })
// Fetch the pods list of `host` with a GET on its '/api/<version>/pods' route.
// callback(err) when the request fails, callback(null, json) with the parsed body otherwise.
// The response status code is not checked in the visible lines.
182 function getForeignPodsList (host, callback) {
183 const path = '/api/' + constants.API_VERSION + '/pods'
185 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
186 if (err) return callback(err)
// NOTE(review): JSON.parse throws on a malformed body; any guard around it is not visible in this excerpt — confirm
189 const json = JSON.parse(body)
190 return callback(null, json)
// Contact every selected pod and store in the database the ones that answer with a 200 status.
// The request pool is deactivated and force-sent before, and re-activated once all the requests ended.
// A failure on one pod is logged and skipped: it never interrupts the other pods.
// `podsList`: array of { host } objects, requested at most constants.REQUESTS_IN_PARALLEL at a time.
// NOTE(review): `cert` and `callback` are not referenced in the visible lines — presumably `cert` is part
// of the request params and `callback` is called in the final step; confirm.
197 function makeRequestsToWinningPods (cert, podsList, callback) {
198 // Stop pool requests
199 db.Request.deactivate()
200 // Flush pool requests
201 db.Request.forceSend()
203 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
// NOTE(review): only part of the params object is visible in this excerpt
205 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
// Our own host, so the remote pod knows who is asking
208 host: constants.CONFIG.WEBSERVER.HOST,
213 requests.makeRetryRequest(params, function (err, res, body) {
// NOTE(review): the guard around this log is not visible here — presumably `if (err)`
215 logger.error('Error with adding %s pod.', pod.host, { error: err })
216 // Don't break the process
217 return callbackEach()
220 if (res.statusCode === 200) {
// The certificate returned by the remote pod is stored as its public key
221 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
222 podObj.save().asCallback(function (err, podCreated) {
// NOTE(review): the guard around this log is not visible here — presumably `if (err)`
224 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
225 return callbackEach()
228 // Add our videos to the request scheduler
// NOTE(review): `_id` is read here while _createRequest builds pods with `{ id: ... }` — confirm the model exposes `_id`
229 sendOwnedVideosToPod(podCreated._id)
231 return callbackEach()
234 logger.error('Status not 200 for %s pod.', pod.host)
235 return callbackEach()
238 }, function endRequests () {
239 // Final callback, we've ended all the requests
240 // Now we made new friends, we can re-activate the pool of requests
241 db.Request.activate()
243 logger.debug('makeRequestsToWinningPods finished.')
248 // Wrapper that populates the "to" argument with all our friends if it is not specified
// `type`, `endpoint` and `data` are forwarded untouched to _createRequest.
// `to`: optional array of destination pod ids.
249 function createRequest (type, endpoint, data, to) {
250 if (to) return _createRequest(type, endpoint, data, to)
252 // If the "to" pods is not specified, we send the request to all our friends
253 db.Pod.listAllIds(function (err, podIds) {
// NOTE(review): the guard around this log is not visible here — presumably `if (err)` followed by an early exit
255 logger.error('Cannot get pod ids', { error: err })
259 return _createRequest(type, endpoint, data, podIds)
// Create a Request and link it to its destination pods inside a single transaction.
// `to`: array of pod ids; nothing is done when it is empty.
// Errors are only logged: this function takes no callback.
// NOTE(review): the declaration of `pods` and the contents of createQuery / dbRequestOptions are not
// visible in this excerpt — presumably they carry type, endpoint, data and the transaction `t`.
263 function _createRequest (type, endpoint, data, to) {
266 // If there are no destination pods abort
267 if (to.length === 0) return
269 to.forEach(function (toPod) {
// Pod instances are built from their id only
270 pods.push(db.Pod.build({ id: toPod }))
273 const createQuery = {
281 // We run in transaction to keep coherency between Request and RequestToPod tables
282 db.sequelize.transaction(function (t) {
283 const dbRequestOptions = {
// The same options object is used for the creation and for the association
287 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
288 return request.setPods(pods, dbRequestOptions)
290 }).asCallback(function (err) {
291 if (err) logger.error('Error in createRequest transaction.', { error: err })
// Tell whether `host` is strictly equal to our own configured webserver host.
295 function isMe (host) {
296 return host === constants.CONFIG.WEBSERVER.HOST