const each = require('async/each')
const eachLimit = require('async/eachLimit')
const eachSeries = require('async/eachSeries')
const fs = require('fs')
const path = require('path')
const request = require('request')
const waterfall = require('async/waterfall')

const constants = require('../initializers/constants')
const db = require('../initializers/database')
const logger = require('../helpers/logger')
const requests = require('../helpers/requests')
// Object assigned to module.exports further down in this file.
// NOTE(review): only the `reportAbuseVideoToFriend` entry survived in the
// reviewed copy of this file; the other entries were restored from the
// functions declared before the `module.exports` line - confirm the exact
// list against the callers of this module.
const friends = {
  addVideoToFriends,
  updateVideoToFriends,
  reportAbuseVideoToFriend,
  hasFriends,
  getMyCertificate,
  makeFriends,
  quitFriends,
  removeVideoToFriends,
  sendOwnedVideosToPod
}
// Create an 'add' request on the videos endpoint.
// No destination list is given to createRequest, so the request targets
// all our friends.
// - videoData: payload handed as-is to createRequest
function addVideoToFriends (videoData) {
  createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
}
// Create an 'update' request on the videos endpoint.
// No destination list is given to createRequest, so the request targets
// all our friends.
// - videoData: payload handed as-is to createRequest
function updateVideoToFriends (videoData) {
  createRequest('update', constants.REQUEST_ENDPOINTS.VIDEOS, videoData)
}
// Create a 'remove' request on the videos endpoint.
// No destination list is given to createRequest, so the request targets
// all our friends.
// - videoParams: payload handed as-is to createRequest
function removeVideoToFriends (videoParams) {
  createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
}
// Create a 'report-abuse' request on the videos endpoint, addressed only to
// the pod referenced by the video author.
// - reportData: payload handed as-is to createRequest
// - video: object exposing `Author.podId`, used as the single destination id
function reportAbuseVideoToFriend (reportData, video) {
  const destinationPodIds = [ video.Author.podId ]

  createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, destinationPodIds)
}
// Tell whether at least one pod is stored in the database.
// - callback(err, hasFriends): `hasFriends` is true when db.Pod.countAll
//   reports a count different from 0; the count error is forwarded as-is
function hasFriends (callback) {
  db.Pod.countAll(function (err, count) {
    if (err) return callback(err)

    return callback(null, count !== 0)
  })
}
// Read the 'peertube.pub' file from the configured certificates directory.
// - callback(err, cert): standard fs.readFile callback, `cert` is a utf8 string
function getMyCertificate (callback) {
  // path.join instead of string concatenation: the lookup keeps working
  // whether or not CERT_DIR ends with a path separator
  const certPath = path.join(constants.CONFIG.STORAGE.CERT_DIR, 'peertube.pub')

  fs.readFile(certPath, 'utf8', callback)
}
// Compute which pods deserve to become our friends, then send them a
// friendship request.
// - hosts: hosts whose friends lists are fetched, one after the other
// - callback: receives the error if our certificate cannot be read or if a
//   friends list cannot be computed; otherwise it is handed over to
//   makeRequestsToWinningPods
function makeFriends (hosts, callback) {
  // host -> score, filled in by computeForeignPodsList for every asked host
  const podsScore = {}

  logger.info('Make friends!')
  getMyCertificate(function (err, cert) {
    if (err) {
      logger.error('Cannot read public cert.')
      return callback(err)
    }

    eachSeries(hosts, function (host, callbackEach) {
      computeForeignPodsList(host, podsScore, callbackEach)
    }, function (err) {
      if (err) return callback(err)

      logger.debug('Pods scores computed.', { podsScore: podsScore })
      const podsList = computeWinningPods(hosts, podsScore)
      logger.debug('Pods that we keep.', { podsToKeep: podsList })

      makeRequestsToWinningPods(cert, podsList, callback)
    })
  })
}
// Quit all our friends: flush the pending requests, tell every pod that we
// leave, then remove the pods from the database.
// - callback(err): called with null on success, or with the first error
//   raised by the flush / list / removal steps
function quitFriends (callback) {
  // Stop pool requests
  db.Request.deactivate()

  waterfall([
    function flushRequests (callbackAsync) {
      db.Request.flush(callbackAsync)
    },

    function getPodsList (callbackAsync) {
      return db.Pod.list(callbackAsync)
    },

    function announceIQuitMyFriends (pods, callbackAsync) {
      // NOTE(review): only `path` was visible in the reviewed copy of this
      // file; `method` and `sign` were restored from what a signed POST to a
      // remote pod is expected to need - confirm against
      // requests.makeSecureRequest.
      const baseRequestParams = {
        method: 'POST',
        path: '/api/' + constants.API_VERSION + '/pods/remove',
        sign: true
      }

      // Announce we quit them
      // We don't care if the request fails
      // The other pod will exclude us automatically after a while
      eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
        // One params object per request: several requests run in parallel,
        // so a shared object would have its `toPod` overwritten
        const requestParams = Object.assign({}, baseRequestParams, { toPod: pod })

        requests.makeSecureRequest(requestParams, function (err) {
          if (err) {
            logger.error('Some errors while quitting friends.', { err: err })
            // Don't stop the process: forwarding the error to callbackEach
            // would prevent the remaining pods from being notified
          }

          return callbackEach()
        })
      }, function () {
        return callbackAsync(null, pods)
      })
    },

    function removePodsFromDB (pods, callbackAsync) {
      each(pods, function (pod, callbackEach) {
        pod.destroy().asCallback(callbackEach)
      }, callbackAsync)
    }
  ], function (err) {
    // Don't forget to re activate the scheduler, even if there was an error
    db.Request.activate()

    if (err) return callback(err)

    logger.info('Removed all remote videos.')
    return callback(null)
  })
}
// Create an 'add' request, addressed to a single pod, for each video we own.
// - podId: id of the destination pod
// Errors are only logged: this function has no callback to report them to.
function sendOwnedVideosToPod (podId) {
  db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
    if (err) {
      logger.error('Cannot get the list of videos we own.')
      return
    }

    videosList.forEach(function (video) {
      video.toAddRemoteJSON(function (err, remoteVideo) {
        if (err) {
          logger.error('Cannot convert video to remote.', { error: err })
          // Don't break the process
          return
        }

        createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
      })
    })
  })
}
153 // ---------------------------------------------------------------------------
155 module.exports = friends
157 // ---------------------------------------------------------------------------
// Fetch the friends list of `host` and credit the listed pods in `podsScore`.
// - host: host of the pod we ask the friends list to
// - podsScore: object (host -> score) updated in place
// - callback(err): called without argument on success, with an error if the
//   list cannot be fetched or if its `data` field is not an array
function computeForeignPodsList (host, podsScore, callback) {
  getForeignPodsList(host, function (err, res) {
    if (err) return callback(err)

    // The list comes from a remote pod: check its shape before using it
    if (!res || Array.isArray(res.data) === false) {
      return callback(new Error('Invalid friends list received from ' + host))
    }

    // A Set so that one asked pod gives at most 1 point to a given host, even
    // if its list contains duplicates. It also leaves `res.data` untouched.
    const votedHosts = new Set()
    res.data.forEach(function (foreignPod) {
      if (foreignPod && typeof foreignPod.host === 'string') votedHosts.add(foreignPod.host)
    })

    // Let's give 1 point to the pod we ask the friends list
    votedHosts.add(host)

    votedHosts.forEach(function (foreignPodHost) {
      // Own-property check: a host named like an Object.prototype member
      // (for example "constructor") must start from a clean score
      if (Object.prototype.hasOwnProperty.call(podsScore, foreignPodHost)) podsScore[foreignPodHost]++
      else podsScore[foreignPodHost] = 1
    })

    return callback()
  })
}
// Build the list of pods to add.
// Only add a pod if it exists in more than a half base pods.
// - hosts: the base hosts that were asked for their friends list
// - podsScore: object mapping a host to its score
// Returns an array of `{ host }` objects.
function computeWinningPods (hosts, podsScore) {
  const baseScore = hosts.length / 2

  return Object.keys(podsScore)
    .filter(function (podHost) {
      // If the pod is not me and with a good score we add it
      return isMe(podHost) === false && podsScore[podHost] > baseScore
    })
    .map(function (podHost) {
      return { host: podHost }
    })
}
// Ask a remote pod for its list of pods.
// - host: host of the pod to query
// - callback(err, json): `json` is the parsed response body; `err` is the
//   request error or the error raised while parsing the body
function getForeignPodsList (host, callback) {
  const podsPath = '/api/' + constants.API_VERSION + '/pods'

  request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + podsPath, function (err, response, body) {
    if (err) return callback(err)

    // Only the parsing is protected: calling `callback` inside the try block
    // would invoke it a second time if the callback itself throws
    let json
    try {
      json = JSON.parse(body)
    } catch (parseErr) {
      return callback(parseErr)
    }

    return callback(null, json)
  })
}
// Send a friendship request to every pod of `podsList`, store the pods that
// accept it and schedule the sending of our videos to them.
// - cert: our public certificate, sent in the request
// - podsList: array of `{ host }` objects
// - callback(): called without argument once every pod has been processed;
//   per-pod failures are only logged
function makeRequestsToWinningPods (cert, podsList, callback) {
  // Stop pool requests
  db.Request.deactivate()
  // Flush pool requests
  db.Request.forceSend()

  eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
    // NOTE(review): only `url` and `host` were visible in the reviewed copy
    // of this file; `method`, the `json` wrapper and the `publicKey` field
    // name were restored - confirm against the remote "add pod" endpoint.
    const params = {
      url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
      method: 'POST',
      json: {
        host: constants.CONFIG.WEBSERVER.HOST,
        publicKey: cert
      }
    }

    requests.makeRetryRequest(params, function (err, res, body) {
      if (err) {
        logger.error('Error with adding %s pod.', pod.host, { error: err })
        // Don't break the process
        return callbackEach()
      }

      if (res.statusCode !== 200) {
        logger.error('Status not 200 for %s pod.', pod.host)
        return callbackEach()
      }

      // The body comes from a remote pod: reading `body.cert` on an empty
      // body would throw and the request pool would never be re-activated
      if (!body || !body.cert) {
        logger.error('No certificate returned by %s pod.', pod.host)
        return callbackEach()
      }

      const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
      podObj.save().asCallback(function (err, podCreated) {
        if (err) {
          logger.error('Cannot add friend %s pod.', pod.host, { error: err })
          return callbackEach()
        }

        // Add our videos to the request scheduler
        sendOwnedVideosToPod(podCreated.id)

        return callbackEach()
      })
    })
  }, function endRequests () {
    // Final callback, we've ended all the requests
    // Now we made new friends, we can re activate the pool of requests
    db.Request.activate()

    logger.debug('makeRequestsToWinningPods finished.')
    return callback()
  })
}
// Wrapper that populates the "toIds" argument with all our friends if it is
// not specified.
// - type, endpoint, data: forwarded untouched to _createRequest
// - toIds: optional array of destination pod ids
function createRequest (type, endpoint, data, toIds) {
  if (toIds) return _createRequest(type, endpoint, data, toIds)

  // If the "toIds" pods is not specified, we send the request to all our friends
  db.Pod.listAllIds(function (err, podIds) {
    if (err) {
      // No callback to report to: log and give up
      logger.error('Cannot get pod ids', { error: err })
      return
    }

    return _createRequest(type, endpoint, data, podIds)
  })
}
// Store a request and link it to its destination pods.
// - type, endpoint, data: content of the request to store
// - toIds: array of destination pod ids; nothing is stored when it is empty
// Failures are only logged: this function has no callback to report them to.
function _createRequest (type, endpoint, data, toIds) {
  // If there are no destination pods abort
  if (toIds.length === 0) return

  const pods = toIds.map(function (toPod) {
    return db.Pod.build({ id: toPod })
  })

  // NOTE(review): the fields of this literal were not visible in the reviewed
  // copy of this file; the layout below was restored - confirm it against the
  // Request model.
  const createQuery = {
    endpoint,
    request: {
      type: type,
      data: data
    }
  }

  // We run in transaction to keep coherency between Request and RequestToPod tables
  db.sequelize.transaction(function (t) {
    // NOTE(review): the content of this literal was not visible either;
    // passing the transaction handle is assumed - confirm.
    const dbRequestOptions = {
      transaction: t
    }

    return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
      return request.setPods(pods, dbRequestOptions)
    })
  }).asCallback(function (err) {
    if (err) logger.error('Error in createRequest transaction.', { error: err })
  })
}
// Tell whether `host` is strictly equal to our own configured webserver host.
function isMe (host) {
  return host === constants.CONFIG.WEBSERVER.HOST
}