Server: try to have a better video integrity
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16   addVideoToFriends,
17   updateVideoToFriends,
18   reportAbuseVideoToFriend,
19   hasFriends,
20   getMyCertificate,
21   makeFriends,
22   quitFriends,
23   removeVideoToFriends,
24   sendOwnedVideosToPod
25 }
26
27 function addVideoToFriends (videoData, transaction, callback) {
28   const options = {
29     type: 'add',
30     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
31     data: videoData,
32     transaction
33   }
34   createRequest(options, callback)
35 }
36
37 function updateVideoToFriends (videoData, transaction, callback) {
38   const options = {
39     type: 'update',
40     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
41     data: videoData,
42     transaction
43   }
44   createRequest(options, callback)
45 }
46
47 function removeVideoToFriends (videoParams) {
48   const options = {
49     type: 'remove',
50     endpoint: constants.REQUEST_ENDPOINTS.VIDEOS,
51     data: videoParams
52   }
53   createRequest(options)
54 }
55
56 function reportAbuseVideoToFriend (reportData, video) {
57   createRequest('report-abuse', constants.REQUEST_ENDPOINTS.VIDEOS, reportData, [ video.Author.podId ])
58 }
59
60 function hasFriends (callback) {
61   db.Pod.countAll(function (err, count) {
62     if (err) return callback(err)
63
64     const hasFriends = (count !== 0)
65     callback(null, hasFriends)
66   })
67 }
68
69 function getMyCertificate (callback) {
70   fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
71 }
72
73 function makeFriends (hosts, callback) {
74   const podsScore = {}
75
76   logger.info('Make friends!')
77   getMyCertificate(function (err, cert) {
78     if (err) {
79       logger.error('Cannot read public cert.')
80       return callback(err)
81     }
82
83     eachSeries(hosts, function (host, callbackEach) {
84       computeForeignPodsList(host, podsScore, callbackEach)
85     }, function (err) {
86       if (err) return callback(err)
87
88       logger.debug('Pods scores computed.', { podsScore: podsScore })
89       const podsList = computeWinningPods(hosts, podsScore)
90       logger.debug('Pods that we keep.', { podsToKeep: podsList })
91
92       makeRequestsToWinningPods(cert, podsList, callback)
93     })
94   })
95 }
96
97 function quitFriends (callback) {
98   // Stop pool requests
99   db.Request.deactivate()
100
101   waterfall([
102     function flushRequests (callbackAsync) {
103       db.Request.flush(callbackAsync)
104     },
105
106     function getPodsList (callbackAsync) {
107       return db.Pod.list(callbackAsync)
108     },
109
110     function announceIQuitMyFriends (pods, callbackAsync) {
111       const requestParams = {
112         method: 'POST',
113         path: '/api/' + constants.API_VERSION + '/pods/remove',
114         sign: true
115       }
116
117       // Announce we quit them
118       // We don't care if the request fails
119       // The other pod will exclude us automatically after a while
120       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
121         requestParams.toPod = pod
122         requests.makeSecureRequest(requestParams, callbackEach)
123       }, function (err) {
124         if (err) {
125           logger.error('Some errors while quitting friends.', { err: err })
126           // Don't stop the process
127         }
128
129         return callbackAsync(null, pods)
130       })
131     },
132
133     function removePodsFromDB (pods, callbackAsync) {
134       each(pods, function (pod, callbackEach) {
135         pod.destroy().asCallback(callbackEach)
136       }, callbackAsync)
137     }
138   ], function (err) {
139     // Don't forget to re activate the scheduler, even if there was an error
140     db.Request.activate()
141
142     if (err) return callback(err)
143
144     logger.info('Removed all remote videos.')
145     return callback(null)
146   })
147 }
148
149 function sendOwnedVideosToPod (podId) {
150   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
151     if (err) {
152       logger.error('Cannot get the list of videos we own.')
153       return
154     }
155
156     videosList.forEach(function (video) {
157       video.toAddRemoteJSON(function (err, remoteVideo) {
158         if (err) {
159           logger.error('Cannot convert video to remote.', { error: err })
160           // Don't break the process
161           return
162         }
163
164         createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
165       })
166     })
167   })
168 }
169
170 // ---------------------------------------------------------------------------
171
172 module.exports = friends
173
174 // ---------------------------------------------------------------------------
175
176 function computeForeignPodsList (host, podsScore, callback) {
177   getForeignPodsList(host, function (err, res) {
178     if (err) return callback(err)
179
180     const foreignPodsList = res.data
181
182     // Let's give 1 point to the pod we ask the friends list
183     foreignPodsList.push({ host })
184
185     foreignPodsList.forEach(function (foreignPod) {
186       const foreignPodHost = foreignPod.host
187
188       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
189       else podsScore[foreignPodHost] = 1
190     })
191
192     callback()
193   })
194 }
195
196 function computeWinningPods (hosts, podsScore) {
197   // Build the list of pods to add
198   // Only add a pod if it exists in more than a half base pods
199   const podsList = []
200   const baseScore = hosts.length / 2
201   Object.keys(podsScore).forEach(function (podHost) {
202     // If the pod is not me and with a good score we add it
203     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
204       podsList.push({ host: podHost })
205     }
206   })
207
208   return podsList
209 }
210
211 function getForeignPodsList (host, callback) {
212   const path = '/api/' + constants.API_VERSION + '/pods'
213
214   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
215     if (err) return callback(err)
216
217     try {
218       const json = JSON.parse(body)
219       return callback(null, json)
220     } catch (err) {
221       return callback(err)
222     }
223   })
224 }
225
226 function makeRequestsToWinningPods (cert, podsList, callback) {
227   // Stop pool requests
228   db.Request.deactivate()
229   // Flush pool requests
230   db.Request.forceSend()
231
232   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
233     const params = {
234       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
235       method: 'POST',
236       json: {
237         host: constants.CONFIG.WEBSERVER.HOST,
238         publicKey: cert
239       }
240     }
241
242     requests.makeRetryRequest(params, function (err, res, body) {
243       if (err) {
244         logger.error('Error with adding %s pod.', pod.host, { error: err })
245         // Don't break the process
246         return callbackEach()
247       }
248
249       if (res.statusCode === 200) {
250         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
251         podObj.save().asCallback(function (err, podCreated) {
252           if (err) {
253             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
254             return callbackEach()
255           }
256
257           // Add our videos to the request scheduler
258           sendOwnedVideosToPod(podCreated.id)
259
260           return callbackEach()
261         })
262       } else {
263         logger.error('Status not 200 for %s pod.', pod.host)
264         return callbackEach()
265       }
266     })
267   }, function endRequests () {
268     // Final callback, we've ended all the requests
269     // Now we made new friends, we can re activate the pool of requests
270     db.Request.activate()
271
272     logger.debug('makeRequestsToWinningPods finished.')
273     return callback()
274   })
275 }
276
277 // Wrapper that populate "toIds" argument with all our friends if it is not specified
278 // { type, endpoint, data, toIds, transaction }
279 function createRequest (options, callback) {
280   if (!callback) callback = function () {}
281   if (options.toIds) return _createRequest(options, callback)
282
283   // If the "toIds" pods is not specified, we send the request to all our friends
284   db.Pod.listAllIds(options.transaction, function (err, podIds) {
285     if (err) {
286       logger.error('Cannot get pod ids', { error: err })
287       return
288     }
289
290     const newOptions = Object.assign(options, { toIds: podIds })
291     return _createRequest(newOptions, callback)
292   })
293 }
294
295 // { type, endpoint, data, toIds, transaction }
296 function _createRequest (options, callback) {
297   const type = options.type
298   const endpoint = options.endpoint
299   const data = options.data
300   const toIds = options.toIds
301   const transaction = options.transaction
302
303   const pods = []
304
305   // If there are no destination pods abort
306   if (toIds.length === 0) return callback(null)
307
308   toIds.forEach(function (toPod) {
309     pods.push(db.Pod.build({ id: toPod }))
310   })
311
312   const createQuery = {
313     endpoint,
314     request: {
315       type: type,
316       data: data
317     }
318   }
319
320   const dbRequestOptions = {
321     transaction
322   }
323
324   return db.Request.create(createQuery, dbRequestOptions).asCallback(function (err, request) {
325     if (err) return callback(err)
326
327     return request.setPods(pods, dbRequestOptions).asCallback(callback)
328   })
329 }
330
331 function isMe (host) {
332   return host === constants.CONFIG.WEBSERVER.HOST
333 }