Merge branch 'postgresql'
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16   addVideoToFriends,
17   updateVideoToFriends,
18   reportAbuseVideoToFriend,
19   hasFriends,
20   getMyCertificate,
21   makeFriends,
22   quitFriends,
23   removeVideoToFriends,
24   sendOwnedVideosToPod
25 }
26
27 function addVideoToFriends (videoData, transaction, callback) {
28   const options = {
29     type: 'add',
30     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
31     data: videoData,
32     transaction
33   }
34   createRequest(options, callback)
35 }
36
37 function updateVideoToFriends (videoData, transaction, callback) {
38   const options = {
39     type: 'update',
40     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
41     data: videoData,
42     transaction
43   }
44   createRequest(options, callback)
45 }
46
47 function removeVideoToFriends (videoParams) {
48   const options = {
49     type: 'remove',
50     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
51     data: videoParams
52   }
53   createRequest(options)
54 }
55
56 function reportAbuseVideoToFriend (reportData, video) {
57   const options = {
58     type: 'report-abuse',
59     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
60     data: reportData,
61     toIds: [ video.Author.podId ]
62   }
63   createRequest(options)
64 }
65
66 function hasFriends (callback) {
67   db.Pod.countAll(function (err, count) {
68     if (err) return callback(err)
69
70     const hasFriends = (count !== 0)
71     callback(null, hasFriends)
72   })
73 }
74
75 function getMyCertificate (callback) {
76   fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
77 }
78
79 function makeFriends (hosts, callback) {
80   const podsScore = {}
81
82   logger.info('Make friends!')
83   getMyCertificate(function (err, cert) {
84     if (err) {
85       logger.error('Cannot read public cert.')
86       return callback(err)
87     }
88
89     eachSeries(hosts, function (host, callbackEach) {
90       computeForeignPodsList(host, podsScore, callbackEach)
91     }, function (err) {
92       if (err) return callback(err)
93
94       logger.debug('Pods scores computed.', { podsScore: podsScore })
95       const podsList = computeWinningPods(hosts, podsScore)
96       logger.debug('Pods that we keep.', { podsToKeep: podsList })
97
98       makeRequestsToWinningPods(cert, podsList, callback)
99     })
100   })
101 }
102
103 function quitFriends (callback) {
104   // Stop pool requests
105   db.Request.deactivate()
106
107   waterfall([
108     function flushRequests (callbackAsync) {
109       db.Request.flush(callbackAsync)
110     },
111
112     function getPodsList (callbackAsync) {
113       return db.Pod.list(callbackAsync)
114     },
115
116     function announceIQuitMyFriends (pods, callbackAsync) {
117       const requestParams = {
118         method: 'POST',
119         path: '/api/' + constants.API_VERSION + '/pods/remove',
120         sign: true
121       }
122
123       // Announce we quit them
124       // We don't care if the request fails
125       // The other pod will exclude us automatically after a while
126       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
127         requestParams.toPod = pod
128         requests.makeSecureRequest(requestParams, callbackEach)
129       }, function (err) {
130         if (err) {
131           logger.error('Some errors while quitting friends.', { err: err })
132           // Don't stop the process
133         }
134
135         return callbackAsync(null, pods)
136       })
137     },
138
139     function removePodsFromDB (pods, callbackAsync) {
140       each(pods, function (pod, callbackEach) {
141         pod.destroy().asCallback(callbackEach)
142       }, callbackAsync)
143     }
144   ], function (err) {
145     // Don't forget to re activate the scheduler, even if there was an error
146     db.Request.activate()
147
148     if (err) return callback(err)
149
150     logger.info('Removed all remote videos.')
151     return callback(null)
152   })
153 }
154
155 function sendOwnedVideosToPod (podId) {
156   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
157     if (err) {
158       logger.error('Cannot get the list of videos we own.')
159       return
160     }
161
162     videosList.forEach(function (video) {
163       video.toAddRemoteJSON(function (err, remoteVideo) {
164         if (err) {
165           logger.error('Cannot convert video to remote.', { error: err })
166           // Don't break the process
167           return
168         }
169
170         const options = {
171           type: 'add',
172           endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
173           data: remoteVideo,
174           toIds: [ podId ]
175         }
176         createRequest(options)
177       })
178     })
179   })
180 }
181
182 // ---------------------------------------------------------------------------
183
184 module.exports = friends
185
186 // ---------------------------------------------------------------------------
187
188 function computeForeignPodsList (host, podsScore, callback) {
189   getForeignPodsList(host, function (err, res) {
190     if (err) return callback(err)
191
192     const foreignPodsList = res.data
193
194     // Let's give 1 point to the pod we ask the friends list
195     foreignPodsList.push({ host })
196
197     foreignPodsList.forEach(function (foreignPod) {
198       const foreignPodHost = foreignPod.host
199
200       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
201       else podsScore[foreignPodHost] = 1
202     })
203
204     callback()
205   })
206 }
207
208 function computeWinningPods (hosts, podsScore) {
209   // Build the list of pods to add
210   // Only add a pod if it exists in more than a half base pods
211   const podsList = []
212   const baseScore = hosts.length / 2
213   Object.keys(podsScore).forEach(function (podHost) {
214     // If the pod is not me and with a good score we add it
215     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
216       podsList.push({ host: podHost })
217     }
218   })
219
220   return podsList
221 }
222
223 function getForeignPodsList (host, callback) {
224   const path = '/api/' + constants.API_VERSION + '/pods'
225
226   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
227     if (err) return callback(err)
228
229     try {
230       const json = JSON.parse(body)
231       return callback(null, json)
232     } catch (err) {
233       return callback(err)
234     }
235   })
236 }
237
238 function makeRequestsToWinningPods (cert, podsList, callback) {
239   // Stop pool requests
240   db.Request.deactivate()
241   // Flush pool requests
242   db.Request.forceSend()
243
244   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
245     const params = {
246       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
247       method: 'POST',
248       json: {
249         host: constants.CONFIG.WEBSERVER.HOST,
250         publicKey: cert
251       }
252     }
253
254     requests.makeRetryRequest(params, function (err, res, body) {
255       if (err) {
256         logger.error('Error with adding %s pod.', pod.host, { error: err })
257         // Don't break the process
258         return callbackEach()
259       }
260
261       if (res.statusCode === 200) {
262         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
263         podObj.save().asCallback(function (err, podCreated) {
264           if (err) {
265             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
266             return callbackEach()
267           }
268
269           // Add our videos to the request scheduler
270           sendOwnedVideosToPod(podCreated.id)
271
272           return callbackEach()
273         })
274       } else {
275         logger.error('Status not 200 for %s pod.', pod.host)
276         return callbackEach()
277       }
278     })
279   }, function endRequests () {
280     // Final callback, we've ended all the requests
281     // Now we made new friends, we can re activate the pool of requests
282     db.Request.activate()
283
284     logger.debug('makeRequestsToWinningPods finished.')
285     return callback()
286   })
287 }
288
289 // Wrapper that populate "toIds" argument with all our friends if it is not specified
290 // { type, endpoint, data, toIds, transaction }
291 function createRequest (options, callback) {
292   if (!callback) callback = function () {}
293   if (options.toIds) return _createRequest(options, callback)
294
295   // If the "toIds" pods is not specified, we send the request to all our friends
296   db.Pod.listAllIds(options.transaction, function (err, podIds) {
297     if (err) {
298       logger.error('Cannot get pod ids', { error: err })
299       return
300     }
301
302     const newOptions = Object.assign(options, { toIds: podIds })
303     return _createRequest(newOptions, callback)
304   })
305 }
306
307 // { type, endpoint, data, toIds, transaction }
308 function _createRequest (options, callback) {
309   const type = options.type
310   const endpoint = options.endpoint
311   const data = options.data
312   const toIds = options.toIds
313   const transaction = options.transaction
314
315   const pods = []
316
317   // If there are no destination pods abort
318   if (toIds.length === 0) return callback(null)
319
320   toIds.forEach(function (toPod) {
321     pods.push(db.Pod.build({ id: toPod }))
322   })
323
324   const createQuery = {
325     endpoint,
326     request: {
327       type: type,
328       data: data
329     }
330   }
331
332   const dbRequestOptions = {
333     transaction
334   }
335
336   return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
337     if (err) return callback(err)
338
339     return request.setPods(pods, dbRequestOptions).asCallback(callback)
340   })
341 }
342
343 function isMe (host) {
344   return host === constants.CONFIG.WEBSERVER.HOST
345 }