Server: try to have a better video integrity
[oweals/peertube.git] / server / models / request.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const waterfall = require('async/waterfall')
6 const values = require('lodash/values')
7
8 const constants = require('../initializers/constants')
9 const logger = require('../helpers/logger')
10 const requests = require('../helpers/requests')
11
12 let timer = null
13 let lastRequestTimestamp = 0
14
15 // ---------------------------------------------------------------------------
16
17 module.exports = function (sequelize, DataTypes) {
18   const Request = sequelize.define('Request',
19     {
20       request: {
21         type: DataTypes.JSON,
22         allowNull: false
23       },
24       endpoint: {
25         type: DataTypes.ENUM(values(constants.REQUEST_ENDPOINTS)),
26         allowNull: false
27       }
28     },
29     {
30       classMethods: {
31         associate,
32
33         activate,
34         countTotalRequests,
35         deactivate,
36         flush,
37         forceSend,
38         remainingMilliSeconds
39       }
40     }
41   )
42
43   return Request
44 }
45
46 // ------------------------------ STATICS ------------------------------
47
48 function associate (models) {
49   this.belongsToMany(models.Pod, {
50     foreignKey: {
51       name: 'requestId',
52       allowNull: false
53     },
54     through: models.RequestToPod,
55     onDelete: 'CASCADE'
56   })
57 }
58
59 function activate () {
60   logger.info('Requests scheduler activated.')
61   lastRequestTimestamp = Date.now()
62
63   const self = this
64   timer = setInterval(function () {
65     lastRequestTimestamp = Date.now()
66     makeRequests.call(self)
67   }, constants.REQUESTS_INTERVAL)
68 }
69
70 function countTotalRequests (callback) {
71   const query = {
72     include: [ this.sequelize.models.Pod ]
73   }
74
75   return this.count(query).asCallback(callback)
76 }
77
78 function deactivate () {
79   logger.info('Requests scheduler deactivated.')
80   clearInterval(timer)
81   timer = null
82 }
83
84 function flush (callback) {
85   removeAll.call(this, function (err) {
86     if (err) logger.error('Cannot flush the requests.', { error: err })
87
88     return callback(err)
89   })
90 }
91
92 function forceSend () {
93   logger.info('Force requests scheduler sending.')
94   makeRequests.call(this)
95 }
96
97 function remainingMilliSeconds () {
98   if (timer === null) return -1
99
100   return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp)
101 }
102
103 // ---------------------------------------------------------------------------
104
105 // Make a requests to friends of a certain type
106 function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
107   if (!callback) callback = function () {}
108
109   const params = {
110     toPod: toPod,
111     sign: true, // Prove our identity
112     method: 'POST',
113     path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
114     data: requestsToMake // Requests we need to make
115   }
116
117   // Make multiple retry requests to all of pods
118   // The function fire some useful callbacks
119   requests.makeSecureRequest(params, function (err, res) {
120     if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
121       logger.error(
122         'Error sending secure request to %s pod.',
123         toPod.host,
124         {
125           error: err ? err.message : 'Status code not 20x : ' + res.statusCode
126         }
127       )
128
129       return callback(false)
130     }
131
132     return callback(true)
133   })
134 }
135
136 // Make all the requests of the scheduler
137 function makeRequests () {
138   const self = this
139   const RequestToPod = this.sequelize.models.RequestToPod
140
141   // We limit the size of the requests (REQUESTS_LIMIT)
142   // We don't want to stuck with the same failing requests so we get a random list
143   listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
144     if (err) {
145       logger.error('Cannot get the list of requests.', { err: err })
146       return // Abort
147     }
148
149     // If there are no requests, abort
150     if (requests.length === 0) {
151       logger.info('No requests to make.')
152       return
153     }
154
155     logger.info('Making requests to friends.')
156
157     // We want to group requests by destinations pod and endpoint
158     const requestsToMakeGrouped = {}
159
160     requests.forEach(function (request) {
161       request.Pods.forEach(function (toPod) {
162         const hashKey = toPod.id + request.endpoint
163         if (!requestsToMakeGrouped[hashKey]) {
164           requestsToMakeGrouped[hashKey] = {
165             toPodId: toPod.id,
166             endpoint: request.endpoint,
167             ids: [], // request ids, to delete them from the DB in the future
168             datas: [] // requests data,
169           }
170         }
171
172         requestsToMakeGrouped[hashKey].ids.push(request.id)
173         requestsToMakeGrouped[hashKey].datas.push(request.request)
174       })
175     })
176
177     const goodPods = []
178     const badPods = []
179
180     eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
181       const requestToMake = requestsToMakeGrouped[hashKey]
182
183       // FIXME: SQL request inside a loop :/
184       self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
185         if (err) {
186           logger.error('Error finding pod by id.', { err: err })
187           return callbackEach()
188         }
189
190         // Maybe the pod is not our friend anymore so simply remove it
191         if (!toPod) {
192           const requestIdsToDelete = requestToMake.ids
193
194           logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
195           RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
196           return callbackEach()
197         }
198
199         makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
200           if (success === true) {
201             logger.debug('Removing requests for pod %s.', requestToMake.toPodId, { requestsIds: requestToMake.ids })
202
203             goodPods.push(requestToMake.toPodId)
204
205             // Remove the pod id of these request ids
206             RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
207           } else {
208             badPods.push(requestToMake.toPodId)
209             callbackEach()
210           }
211         })
212       })
213     }, function () {
214       // All the requests were made, we update the pods score
215       updatePodsScore.call(self, goodPods, badPods)
216       // Flush requests with no pod
217       removeWithEmptyTo.call(self, function (err) {
218         if (err) logger.error('Error when removing requests with no pods.', { error: err })
219       })
220     })
221   })
222 }
223
224 // Remove pods with a score of 0 (too many requests where they were unreachable)
225 function removeBadPods () {
226   const self = this
227
228   waterfall([
229     function findBadPods (callback) {
230       self.sequelize.models.Pod.listBadPods(function (err, pods) {
231         if (err) {
232           logger.error('Cannot find bad pods.', { error: err })
233           return callback(err)
234         }
235
236         return callback(null, pods)
237       })
238     },
239
240     function removeTheseBadPods (pods, callback) {
241       each(pods, function (pod, callbackEach) {
242         pod.destroy().asCallback(callbackEach)
243       }, function (err) {
244         return callback(err, pods.length)
245       })
246     }
247   ], function (err, numberOfPodsRemoved) {
248     if (err) {
249       logger.error('Cannot remove bad pods.', { error: err })
250     } else if (numberOfPodsRemoved) {
251       logger.info('Removed %d pods.', numberOfPodsRemoved)
252     } else {
253       logger.info('No need to remove bad pods.')
254     }
255   })
256 }
257
258 function updatePodsScore (goodPods, badPods) {
259   const self = this
260   const Pod = this.sequelize.models.Pod
261
262   logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
263
264   if (goodPods.length !== 0) {
265     Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
266       if (err) logger.error('Cannot increment scores of good pods.', { error: err })
267     })
268   }
269
270   if (badPods.length !== 0) {
271     Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
272       if (err) logger.error('Cannot decrement scores of bad pods.', { error: err })
273       removeBadPods.call(self)
274     })
275   }
276 }
277
278 function listWithLimitAndRandom (limit, callback) {
279   const self = this
280
281   self.count().asCallback(function (err, count) {
282     if (err) return callback(err)
283
284     // Optimization...
285     if (count === 0) return callback(null, [])
286
287     let start = Math.floor(Math.random() * count) - limit
288     if (start < 0) start = 0
289
290     const query = {
291       order: [
292         [ 'id', 'ASC' ]
293       ],
294       // offset: start,
295       // limit: limit,
296       include: [ this.sequelize.models.Pod ]
297     }
298
299     self.findAll(query).asCallback(callback)
300   })
301 }
302
303 function removeAll (callback) {
304   // Delete all requests
305   this.truncate({ cascade: true }).asCallback(callback)
306 }
307
308 function removeWithEmptyTo (callback) {
309   if (!callback) callback = function () {}
310
311   const query = {
312     where: {
313       id: {
314         $notIn: [
315           this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
316         ]
317       }
318     }
319   }
320
321   this.destroy(query).asCallback(callback)
322 }