Fix request schedulers stats
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const request = require('request')
7 const waterfall = require('async/waterfall')
8
9 const constants = require('../initializers/constants')
10 const db = require('../initializers/database')
11 const logger = require('../helpers/logger')
12 const peertubeCrypto = require('../helpers/peertube-crypto')
13 const requests = require('../helpers/requests')
14 const utils = require('../helpers/utils')
15 const RequestScheduler = require('./request-scheduler')
16 const RequestVideoQaduScheduler = require('./request-video-qadu-scheduler')
17 const RequestVideoEventScheduler = require('./request-video-event-scheduler')
18
19 const ENDPOINT_ACTIONS = constants.REQUEST_ENDPOINT_ACTIONS[constants.REQUEST_ENDPOINTS.VIDEOS]
20
21 const requestScheduler = new RequestScheduler()
22 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
23 const requestVideoEventScheduler = new RequestVideoEventScheduler()
24
25 const friends = {
26   activate,
27   addVideoToFriends,
28   updateVideoToFriends,
29   reportAbuseVideoToFriend,
30   quickAndDirtyUpdateVideoToFriends,
31   addEventToRemoteVideo,
32   hasFriends,
33   makeFriends,
34   quitFriends,
35   removeVideoToFriends,
36   sendOwnedVideosToPod,
37   getRequestScheduler,
38   getRequestVideoQaduScheduler,
39   getRequestVideoEventScheduler
40 }
41
42 function activate () {
43   requestScheduler.activate()
44   requestVideoQaduScheduler.activate()
45   requestVideoEventScheduler.activate()
46 }
47
48 function addVideoToFriends (videoData, transaction, callback) {
49   const options = {
50     type: ENDPOINT_ACTIONS.ADD,
51     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
52     data: videoData,
53     transaction
54   }
55   createRequest(options, callback)
56 }
57
58 function updateVideoToFriends (videoData, transaction, callback) {
59   const options = {
60     type: ENDPOINT_ACTIONS.UPDATE,
61     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
62     data: videoData,
63     transaction
64   }
65   createRequest(options, callback)
66 }
67
68 function removeVideoToFriends (videoParams) {
69   const options = {
70     type: ENDPOINT_ACTIONS.REMOVE,
71     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
72     data: videoParams
73   }
74   createRequest(options)
75 }
76
77 function reportAbuseVideoToFriend (reportData, video) {
78   const options = {
79     type: ENDPOINT_ACTIONS.REPORT_ABUSE,
80     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
81     data: reportData,
82     toIds: [ video.Author.podId ]
83   }
84   createRequest(options)
85 }
86
87 function quickAndDirtyUpdateVideoToFriends (videoId, type, transaction, callback) {
88   const options = {
89     videoId,
90     type,
91     transaction
92   }
93   return createVideoQaduRequest(options, callback)
94 }
95
96 function addEventToRemoteVideo (videoId, type, transaction, callback) {
97   const options = {
98     videoId,
99     type,
100     transaction
101   }
102   createVideoEventRequest(options, callback)
103 }
104
105 function hasFriends (callback) {
106   db.Pod.countAll(function (err, count) {
107     if (err) return callback(err)
108
109     const hasFriends = (count !== 0)
110     callback(null, hasFriends)
111   })
112 }
113
114 function makeFriends (hosts, callback) {
115   const podsScore = {}
116
117   logger.info('Make friends!')
118   peertubeCrypto.getMyPublicCert(function (err, cert) {
119     if (err) {
120       logger.error('Cannot read public cert.')
121       return callback(err)
122     }
123
124     eachSeries(hosts, function (host, callbackEach) {
125       computeForeignPodsList(host, podsScore, callbackEach)
126     }, function (err) {
127       if (err) return callback(err)
128
129       logger.debug('Pods scores computed.', { podsScore: podsScore })
130       const podsList = computeWinningPods(hosts, podsScore)
131       logger.debug('Pods that we keep.', { podsToKeep: podsList })
132
133       makeRequestsToWinningPods(cert, podsList, callback)
134     })
135   })
136 }
137
138 function quitFriends (callback) {
139   // Stop pool requests
140   requestScheduler.deactivate()
141
142   waterfall([
143     function flushRequests (callbackAsync) {
144       requestScheduler.flush(err => callbackAsync(err))
145     },
146
147     function flushVideoQaduRequests (callbackAsync) {
148       requestVideoQaduScheduler.flush(err => callbackAsync(err))
149     },
150
151     function getPodsList (callbackAsync) {
152       return db.Pod.list(callbackAsync)
153     },
154
155     function announceIQuitMyFriends (pods, callbackAsync) {
156       const requestParams = {
157         method: 'POST',
158         path: '/api/' + constants.API_VERSION + '/pods/remove',
159         sign: true
160       }
161
162       // Announce we quit them
163       // We don't care if the request fails
164       // The other pod will exclude us automatically after a while
165       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
166         requestParams.toPod = pod
167         requests.makeSecureRequest(requestParams, callbackEach)
168       }, function (err) {
169         if (err) {
170           logger.error('Some errors while quitting friends.', { err: err })
171           // Don't stop the process
172         }
173
174         return callbackAsync(null, pods)
175       })
176     },
177
178     function removePodsFromDB (pods, callbackAsync) {
179       each(pods, function (pod, callbackEach) {
180         pod.destroy().asCallback(callbackEach)
181       }, callbackAsync)
182     }
183   ], function (err) {
184     // Don't forget to re activate the scheduler, even if there was an error
185     requestScheduler.activate()
186
187     if (err) return callback(err)
188
189     logger.info('Removed all remote videos.')
190     return callback(null)
191   })
192 }
193
194 function sendOwnedVideosToPod (podId) {
195   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
196     if (err) {
197       logger.error('Cannot get the list of videos we own.')
198       return
199     }
200
201     videosList.forEach(function (video) {
202       video.toAddRemoteJSON(function (err, remoteVideo) {
203         if (err) {
204           logger.error('Cannot convert video to remote.', { error: err })
205           // Don't break the process
206           return
207         }
208
209         const options = {
210           type: 'add',
211           endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
212           data: remoteVideo,
213           toIds: [ podId ]
214         }
215         createRequest(options)
216       })
217     })
218   })
219 }
220
221 function getRequestScheduler () {
222   return requestScheduler
223 }
224
225 function getRequestVideoQaduScheduler () {
226   return requestVideoQaduScheduler
227 }
228
229 function getRequestVideoEventScheduler () {
230   return requestVideoEventScheduler
231 }
232
233 // ---------------------------------------------------------------------------
234
235 module.exports = friends
236
237 // ---------------------------------------------------------------------------
238
239 function computeForeignPodsList (host, podsScore, callback) {
240   getForeignPodsList(host, function (err, res) {
241     if (err) return callback(err)
242
243     const foreignPodsList = res.data
244
245     // Let's give 1 point to the pod we ask the friends list
246     foreignPodsList.push({ host })
247
248     foreignPodsList.forEach(function (foreignPod) {
249       const foreignPodHost = foreignPod.host
250
251       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
252       else podsScore[foreignPodHost] = 1
253     })
254
255     return callback()
256   })
257 }
258
259 function computeWinningPods (hosts, podsScore) {
260   // Build the list of pods to add
261   // Only add a pod if it exists in more than a half base pods
262   const podsList = []
263   const baseScore = hosts.length / 2
264
265   Object.keys(podsScore).forEach(function (podHost) {
266     // If the pod is not me and with a good score we add it
267     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
268       podsList.push({ host: podHost })
269     }
270   })
271
272   return podsList
273 }
274
275 function getForeignPodsList (host, callback) {
276   const path = '/api/' + constants.API_VERSION + '/pods'
277
278   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
279     if (err) return callback(err)
280
281     try {
282       const json = JSON.parse(body)
283       return callback(null, json)
284     } catch (err) {
285       return callback(err)
286     }
287   })
288 }
289
290 function makeRequestsToWinningPods (cert, podsList, callback) {
291   // Stop pool requests
292   requestScheduler.deactivate()
293   // Flush pool requests
294   requestScheduler.forceSend()
295
296   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
297     const params = {
298       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
299       method: 'POST',
300       json: {
301         host: constants.CONFIG.WEBSERVER.HOST,
302         email: constants.CONFIG.ADMIN.EMAIL,
303         publicKey: cert
304       }
305     }
306
307     requests.makeRetryRequest(params, function (err, res, body) {
308       if (err) {
309         logger.error('Error with adding %s pod.', pod.host, { error: err })
310         // Don't break the process
311         return callbackEach()
312       }
313
314       if (res.statusCode === 200) {
315         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
316         podObj.save().asCallback(function (err, podCreated) {
317           if (err) {
318             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
319             return callbackEach()
320           }
321
322           // Add our videos to the request scheduler
323           sendOwnedVideosToPod(podCreated.id)
324
325           return callbackEach()
326         })
327       } else {
328         logger.error('Status not 200 for %s pod.', pod.host)
329         return callbackEach()
330       }
331     })
332   }, function endRequests () {
333     // Final callback, we've ended all the requests
334     // Now we made new friends, we can re activate the pool of requests
335     requestScheduler.activate()
336
337     logger.debug('makeRequestsToWinningPods finished.')
338     return callback()
339   })
340 }
341
342 // Wrapper that populate "toIds" argument with all our friends if it is not specified
343 // { type, endpoint, data, toIds, transaction }
344 function createRequest (options, callback) {
345   if (!callback) callback = function () {}
346   if (options.toIds) return requestScheduler.createRequest(options, callback)
347
348   // If the "toIds" pods is not specified, we send the request to all our friends
349   db.Pod.listAllIds(options.transaction, function (err, podIds) {
350     if (err) {
351       logger.error('Cannot get pod ids', { error: err })
352       return
353     }
354
355     const newOptions = Object.assign(options, { toIds: podIds })
356     return requestScheduler.createRequest(newOptions, callback)
357   })
358 }
359
360 function createVideoQaduRequest (options, callback) {
361   if (!callback) callback = utils.createEmptyCallback()
362
363   requestVideoQaduScheduler.createRequest(options, callback)
364 }
365
366 function createVideoEventRequest (options, callback) {
367   if (!callback) callback = utils.createEmptyCallback()
368
369   requestVideoEventScheduler.createRequest(options, callback)
370 }
371
372 function isMe (host) {
373   return host === constants.CONFIG.WEBSERVER.HOST
374 }