Server: paths refractoring
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
8
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14
15 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
16
17 const friends = {
18   addVideoToFriends,
19   updateVideoToFriends,
20   reportAbuseVideoToFriend,
21   hasFriends,
22   makeFriends,
23   quitFriends,
24   removeVideoToFriends,
25   sendOwnedVideosToPod
26 }
27
28 function addVideoToFriends (videoData, transaction, callback) {
29   const options = {
30     type: ENDPOINT_ACTIONS.ADD,
31     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
32     data: videoData,
33     transaction
34   }
35   createRequest(options, callback)
36 }
37
38 function updateVideoToFriends (videoData, transaction, callback) {
39   const options = {
40     type: ENDPOINT_ACTIONS.UPDATE,
41     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
42     data: videoData,
43     transaction
44   }
45   createRequest(options, callback)
46 }
47
48 function removeVideoToFriends (videoParams) {
49   const options = {
50     type: ENDPOINT_ACTIONS.REMOVE,
51     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
52     data: videoParams
53   }
54   createRequest(options)
55 }
56
57 function reportAbuseVideoToFriend (reportData, video) {
58   const options = {
59     type: ENDPOINT_ACTIONS.REPORT_ABUSE,
60     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
61     data: reportData,
62     toIds: [ video.Author.podId ]
63   }
64   createRequest(options)
65 }
66
67 function hasFriends (callback) {
68   db.Pod.countAll(function (err, count) {
69     if (err) return callback(err)
70
71     const hasFriends = (count !== 0)
72     callback(null, hasFriends)
73   })
74 }
75
76 function makeFriends (hosts, callback) {
77   const podsScore = {}
78
79   logger.info('Make friends!')
80   peertubeCrypto.getMyPublicCert(function (err, cert) {
81     if (err) {
82       logger.error('Cannot read public cert.')
83       return callback(err)
84     }
85
86     eachSeries(hosts, function (host, callbackEach) {
87       computeForeignPodsList(host, podsScore, callbackEach)
88     }, function (err) {
89       if (err) return callback(err)
90
91       logger.debug('Pods scores computed.', { podsScore: podsScore })
92       const podsList = computeWinningPods(hosts, podsScore)
93       logger.debug('Pods that we keep.', { podsToKeep: podsList })
94
95       makeRequestsToWinningPods(cert, podsList, callback)
96     })
97   })
98 }
99
100 function quitFriends (callback) {
101   // Stop pool requests
102   db.Request.deactivate()
103
104   waterfall([
105     function flushRequests (callbackAsync) {
106       db.Request.flush(callbackAsync)
107     },
108
109     function getPodsList (callbackAsync) {
110       return db.Pod.list(callbackAsync)
111     },
112
113     function announceIQuitMyFriends (pods, callbackAsync) {
114       const requestParams = {
115         method: 'POST',
116         path: '/api/' + constants.API_VERSION + '/pods/remove',
117         sign: true
118       }
119
120       // Announce we quit them
121       // We don't care if the request fails
122       // The other pod will exclude us automatically after a while
123       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
124         requestParams.toPod = pod
125         requests.makeSecureRequest(requestParams, callbackEach)
126       }, function (err) {
127         if (err) {
128           logger.error('Some errors while quitting friends.', { err: err })
129           // Don't stop the process
130         }
131
132         return callbackAsync(null, pods)
133       })
134     },
135
136     function removePodsFromDB (pods, callbackAsync) {
137       each(pods, function (pod, callbackEach) {
138         pod.destroy().asCallback(callbackEach)
139       }, callbackAsync)
140     }
141   ], function (err) {
142     // Don't forget to re activate the scheduler, even if there was an error
143     db.Request.activate()
144
145     if (err) return callback(err)
146
147     logger.info('Removed all remote videos.')
148     return callback(null)
149   })
150 }
151
152 function sendOwnedVideosToPod (podId) {
153   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
154     if (err) {
155       logger.error('Cannot get the list of videos we own.')
156       return
157     }
158
159     videosList.forEach(function (video) {
160       video.toAddRemoteJSON(function (err, remoteVideo) {
161         if (err) {
162           logger.error('Cannot convert video to remote.', { error: err })
163           // Don't break the process
164           return
165         }
166
167         const options = {
168           type: 'add',
169           endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
170           data: remoteVideo,
171           toIds: [ podId ]
172         }
173         createRequest(options)
174       })
175     })
176   })
177 }
178
179 // ---------------------------------------------------------------------------
180
181 module.exports = friends
182
183 // ---------------------------------------------------------------------------
184
185 function computeForeignPodsList (host, podsScore, callback) {
186   getForeignPodsList(host, function (err, res) {
187     if (err) return callback(err)
188
189     const foreignPodsList = res.data
190
191     // Let's give 1 point to the pod we ask the friends list
192     foreignPodsList.push({ host })
193
194     foreignPodsList.forEach(function (foreignPod) {
195       const foreignPodHost = foreignPod.host
196
197       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
198       else podsScore[foreignPodHost] = 1
199     })
200
201     return callback()
202   })
203 }
204
205 function computeWinningPods (hosts, podsScore) {
206   // Build the list of pods to add
207   // Only add a pod if it exists in more than a half base pods
208   const podsList = []
209   const baseScore = hosts.length / 2
210
211   Object.keys(podsScore).forEach(function (podHost) {
212     // If the pod is not me and with a good score we add it
213     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
214       podsList.push({ host: podHost })
215     }
216   })
217
218   return podsList
219 }
220
221 function getForeignPodsList (host, callback) {
222   const path = '/api/' + constants.API_VERSION + '/pods'
223
224   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
225     if (err) return callback(err)
226
227     try {
228       const json = JSON.parse(body)
229       return callback(null, json)
230     } catch (err) {
231       return callback(err)
232     }
233   })
234 }
235
236 function makeRequestsToWinningPods (cert, podsList, callback) {
237   // Stop pool requests
238   db.Request.deactivate()
239   // Flush pool requests
240   db.Request.forceSend()
241
242   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
243     const params = {
244       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
245       method: 'POST',
246       json: {
247         host: constants.CONFIG.WEBSERVER.HOST,
248         publicKey: cert
249       }
250     }
251
252     requests.makeRetryRequest(params, function (err, res, body) {
253       if (err) {
254         logger.error('Error with adding %s pod.', pod.host, { error: err })
255         // Don't break the process
256         return callbackEach()
257       }
258
259       if (res.statusCode === 200) {
260         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
261         podObj.save().asCallback(function (err, podCreated) {
262           if (err) {
263             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
264             return callbackEach()
265           }
266
267           // Add our videos to the request scheduler
268           sendOwnedVideosToPod(podCreated.id)
269
270           return callbackEach()
271         })
272       } else {
273         logger.error('Status not 200 for %s pod.', pod.host)
274         return callbackEach()
275       }
276     })
277   }, function endRequests () {
278     // Final callback, we've ended all the requests
279     // Now we made new friends, we can re activate the pool of requests
280     db.Request.activate()
281
282     logger.debug('makeRequestsToWinningPods finished.')
283     return callback()
284   })
285 }
286
287 // Wrapper that populate "toIds" argument with all our friends if it is not specified
288 // { type, endpoint, data, toIds, transaction }
289 function createRequest (options, callback) {
290   if (!callback) callback = function () {}
291   if (options.toIds) return _createRequest(options, callback)
292
293   // If the "toIds" pods is not specified, we send the request to all our friends
294   db.Pod.listAllIds(options.transaction, function (err, podIds) {
295     if (err) {
296       logger.error('Cannot get pod ids', { error: err })
297       return
298     }
299
300     const newOptions = Object.assign(options, { toIds: podIds })
301     return _createRequest(newOptions, callback)
302   })
303 }
304
305 // { type, endpoint, data, toIds, transaction }
306 function _createRequest (options, callback) {
307   const type = options.type
308   const endpoint = options.endpoint
309   const data = options.data
310   const toIds = options.toIds
311   const transaction = options.transaction
312
313   const pods = []
314
315   // If there are no destination pods abort
316   if (toIds.length === 0) return callback(null)
317
318   toIds.forEach(function (toPod) {
319     pods.push(db.Pod.build({ id: toPod }))
320   })
321
322   const createQuery = {
323     endpoint,
324     request: {
325       type: type,
326       data: data
327     }
328   }
329
330   const dbRequestOptions = {
331     transaction
332   }
333
334   return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
335     if (err) return callback(err)
336
337     return request.setPods(pods, dbRequestOptions).asCallback(callback)
338   })
339 }
340
341 function isMe (host) {
342   return host === constants.CONFIG.WEBSERVER.HOST
343 }