3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
26 function addVideoToFriends (video) {
27 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
30 function updateVideoToFriends (video) {
31 createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, video)
34 function hasFriends (callback) {
35 db.Pod.countAll(function (err, count) {
36 if (err) return callback(err)
38 const hasFriends = (count !== 0)
39 callback(null, hasFriends)
43 function getMyCertificate (callback) {
44 fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
47 function makeFriends (hosts, callback) {
50 logger.info('Make friends!')
51 getMyCertificate(function (err, cert) {
53 logger.error('Cannot read public cert.')
57 eachSeries(hosts, function (host, callbackEach) {
58 computeForeignPodsList(host, podsScore, callbackEach)
60 if (err) return callback(err)
62 logger.debug('Pods scores computed.', { podsScore: podsScore })
63 const podsList = computeWinningPods(hosts, podsScore)
64 logger.debug('Pods that we keep.', { podsToKeep: podsList })
66 makeRequestsToWinningPods(cert, podsList, callback)
71 function quitFriends (callback) {
73 db.Request.deactivate()
76 function flushRequests (callbackAsync) {
77 db.Request.flush(callbackAsync)
80 function getPodsList (callbackAsync) {
81 return db.Pod.list(callbackAsync)
84 function announceIQuitMyFriends (pods, callbackAsync) {
85 const requestParams = {
87 path: '/api/' + constants.API_VERSION + '/pods/remove',
91 // Announce we quit them
92 // We don't care if the request fails
93 // The other pod will exclude us automatically after a while
94 eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
95 requestParams.toPod = pod
96 requests.makeSecureRequest(requestParams, callbackEach)
99 logger.error('Some errors while quitting friends.', { err: err })
100 // Don't stop the process
103 return callbackAsync(null, pods)
107 function removePodsFromDB (pods, callbackAsync) {
108 each(pods, function (pod, callbackEach) {
109 pod.destroy().asCallback(callbackEach)
113 // Don't forget to re activate the scheduler, even if there was an error
114 db.Request.activate()
116 if (err) return callback(err)
118 logger.info('Removed all remote videos.')
119 return callback(null)
123 function removeVideoToFriends (videoParams) {
124 createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
127 function sendOwnedVideosToPod (podId) {
128 db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
130 logger.error('Cannot get the list of videos we own.')
134 videosList.forEach(function (video) {
135 video.toAddRemoteJSON(function (err, remoteVideo) {
137 logger.error('Cannot convert video to remote.', { error: err })
138 // Don't break the process
142 createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
148 // ---------------------------------------------------------------------------
150 module.exports = friends
152 // ---------------------------------------------------------------------------
154 function computeForeignPodsList (host, podsScore, callback) {
155 getForeignPodsList(host, function (err, foreignPodsList) {
156 if (err) return callback(err)
158 if (!foreignPodsList) foreignPodsList = []
160 // Let's give 1 point to the pod we ask the friends list
161 foreignPodsList.push({ host })
163 foreignPodsList.forEach(function (foreignPod) {
164 const foreignPodHost = foreignPod.host
166 if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
167 else podsScore[foreignPodHost] = 1
174 function computeWinningPods (hosts, podsScore) {
175 // Build the list of pods to add
176 // Only add a pod if it exists in more than a half base pods
178 const baseScore = hosts.length / 2
179 Object.keys(podsScore).forEach(function (podHost) {
180 // If the pod is not me and with a good score we add it
181 if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
182 podsList.push({ host: podHost })
189 function getForeignPodsList (host, callback) {
190 const path = '/api/' + constants.API_VERSION + '/pods'
192 request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
193 if (err) return callback(err)
196 const json = JSON.parse(body)
197 return callback(null, json)
204 function makeRequestsToWinningPods (cert, podsList, callback) {
205 // Stop pool requests
206 db.Request.deactivate()
207 // Flush pool requests
208 db.Request.forceSend()
210 eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
212 url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
215 host: constants.CONFIG.WEBSERVER.HOST,
220 requests.makeRetryRequest(params, function (err, res, body) {
222 logger.error('Error with adding %s pod.', pod.host, { error: err })
223 // Don't break the process
224 return callbackEach()
227 if (res.statusCode === 200) {
228 const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
229 podObj.save().asCallback(function (err, podCreated) {
231 logger.error('Cannot add friend %s pod.', pod.host, { error: err })
232 return callbackEach()
235 // Add our videos to the request scheduler
236 sendOwnedVideosToPod(podCreated.id)
238 return callbackEach()
241 logger.error('Status not 200 for %s pod.', pod.host)
242 return callbackEach()
245 }, function endRequests () {
246 // Final callback, we've ended all the requests
247 // Now we made new friends, we can re activate the pool of requests
248 db.Request.activate()
250 logger.debug('makeRequestsToWinningPods finished.')
255 // Wrapper that populate "to" argument with all our friends if it is not specified
256 function createRequest (type, endpoint, data, to) {
257 if (to) return _createRequest(type, endpoint, data, to)
259 // If the "to" pods is not specified, we send the request to all our friends
260 db.Pod.listAllIds(function (err, podIds) {
262 logger.error('Cannot get pod ids', { error: err })
266 return _createRequest(type, endpoint, data, podIds)
270 function _createRequest (type, endpoint, data, to) {
273 // If there are no destination pods abort
274 if (to.length === 0) return
276 to.forEach(function (toPod) {
277 pods.push(db.Pod.build({ id: toPod }))
280 const createQuery = {
288 // We run in transaction to keep coherency between Request and RequestToPod tables
289 db.sequelize.transaction(function (t) {
290 const dbRequestOptions = {
294 return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
295 return request.setPods(pods, dbRequestOptions)
297 }).asCallback(function (err) {
298 if (err) logger.error('Error in createRequest transaction.', { error: err })
302 function isMe (host) {
303 return host === constants.CONFIG.WEBSERVER.HOST