First version with PostgreSQL
[oweals/peertube.git] / server / models / request.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const waterfall = require('async/waterfall')
6
7 const constants = require('../initializers/constants')
8 const logger = require('../helpers/logger')
9 const requests = require('../helpers/requests')
10
11 let timer = null
12 let lastRequestTimestamp = 0
13
14 // ---------------------------------------------------------------------------
15
16 module.exports = function (sequelize, DataTypes) {
17   const Request = sequelize.define('Request',
18     {
19       request: {
20         type: DataTypes.JSON
21       },
22       endpoint: {
23         // TODO: enum?
24         type: DataTypes.STRING
25       }
26     },
27     {
28       classMethods: {
29         associate,
30
31         activate,
32         countTotalRequests,
33         deactivate,
34         flush,
35         forceSend,
36         remainingMilliSeconds
37       }
38     }
39   )
40
41   return Request
42 }
43
44 // ------------------------------ STATICS ------------------------------
45
46 function associate (models) {
47   this.belongsToMany(models.Pod, {
48     foreignKey: {
49       name: 'requestId',
50       allowNull: false
51     },
52     through: models.RequestToPod,
53     onDelete: 'CASCADE'
54   })
55 }
56
57 function activate () {
58   logger.info('Requests scheduler activated.')
59   lastRequestTimestamp = Date.now()
60
61   const self = this
62   timer = setInterval(function () {
63     lastRequestTimestamp = Date.now()
64     makeRequests.call(self)
65   }, constants.REQUESTS_INTERVAL)
66 }
67
68 function countTotalRequests (callback) {
69   const query = {
70     include: [ this.sequelize.models.Pod ]
71   }
72
73   return this.count(query).asCallback(callback)
74 }
75
76 function deactivate () {
77   logger.info('Requests scheduler deactivated.')
78   clearInterval(timer)
79   timer = null
80 }
81
82 function flush () {
83   removeAll.call(this, function (err) {
84     if (err) logger.error('Cannot flush the requests.', { error: err })
85   })
86 }
87
88 function forceSend () {
89   logger.info('Force requests scheduler sending.')
90   makeRequests.call(this)
91 }
92
93 function remainingMilliSeconds () {
94   if (timer === null) return -1
95
96   return constants.REQUESTS_INTERVAL - (Date.now() - lastRequestTimestamp)
97 }
98
99 // ---------------------------------------------------------------------------
100
101 // Make a requests to friends of a certain type
102 function makeRequest (toPod, requestEndpoint, requestsToMake, callback) {
103   if (!callback) callback = function () {}
104
105   const params = {
106     toPod: toPod,
107     sign: true, // Prove our identity
108     method: 'POST',
109     path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint,
110     data: requestsToMake // Requests we need to make
111   }
112
113   // Make multiple retry requests to all of pods
114   // The function fire some useful callbacks
115   requests.makeSecureRequest(params, function (err, res) {
116     if (err || (res.statusCode !== 200 && res.statusCode !== 201 && res.statusCode !== 204)) {
117       logger.error(
118         'Error sending secure request to %s pod.',
119         toPod.host,
120         {
121           error: err || new Error('Status code not 20x : ' + res.statusCode)
122         }
123       )
124
125       return callback(false)
126     }
127
128     return callback(true)
129   })
130 }
131
132 // Make all the requests of the scheduler
133 function makeRequests () {
134   const self = this
135   const RequestToPod = this.sequelize.models.RequestToPod
136
137   // We limit the size of the requests (REQUESTS_LIMIT)
138   // We don't want to stuck with the same failing requests so we get a random list
139   listWithLimitAndRandom.call(self, constants.REQUESTS_LIMIT, function (err, requests) {
140     if (err) {
141       logger.error('Cannot get the list of requests.', { err: err })
142       return // Abort
143     }
144
145     // If there are no requests, abort
146     if (requests.length === 0) {
147       logger.info('No requests to make.')
148       return
149     }
150
151     logger.info('Making requests to friends.')
152
153     // We want to group requests by destinations pod and endpoint
154     const requestsToMakeGrouped = {}
155
156     requests.forEach(function (request) {
157       request.Pods.forEach(function (toPod) {
158         const hashKey = toPod.id + request.endpoint
159         if (!requestsToMakeGrouped[hashKey]) {
160           requestsToMakeGrouped[hashKey] = {
161             toPodId: toPod.id,
162             endpoint: request.endpoint,
163             ids: [], // request ids, to delete them from the DB in the future
164             datas: [] // requests data,
165           }
166         }
167
168         requestsToMakeGrouped[hashKey].ids.push(request.id)
169         requestsToMakeGrouped[hashKey].datas.push(request.request)
170       })
171     })
172
173     const goodPods = []
174     const badPods = []
175
176     eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) {
177       const requestToMake = requestsToMakeGrouped[hashKey]
178
179       // FIXME: SQL request inside a loop :/
180       self.sequelize.models.Pod.load(requestToMake.toPodId, function (err, toPod) {
181         if (err) {
182           logger.error('Error finding pod by id.', { err: err })
183           return callbackEach()
184         }
185
186         // Maybe the pod is not our friend anymore so simply remove it
187         if (!toPod) {
188           const requestIdsToDelete = requestToMake.ids
189
190           logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId)
191           RequestToPod.removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId)
192           return callbackEach()
193         }
194
195         makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) {
196           if (success === true) {
197             logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids })
198
199             goodPods.push(requestToMake.toPodId)
200
201             // Remove the pod id of these request ids
202             RequestToPod.removePodOf(requestToMake.ids, requestToMake.toPodId, callbackEach)
203           } else {
204             badPods.push(requestToMake.toPodId)
205             callbackEach()
206           }
207         })
208       })
209     }, function () {
210       // All the requests were made, we update the pods score
211       updatePodsScore.call(self, goodPods, badPods)
212       // Flush requests with no pod
213       removeWithEmptyTo.call(self, function (err) {
214         if (err) logger.error('Error when removing requests with no pods.', { error: err })
215       })
216     })
217   })
218 }
219
220 // Remove pods with a score of 0 (too many requests where they were unreachable)
221 function removeBadPods () {
222   const self = this
223
224   waterfall([
225     function findBadPods (callback) {
226       self.sequelize.models.Pod.listBadPods(function (err, pods) {
227         if (err) {
228           logger.error('Cannot find bad pods.', { error: err })
229           return callback(err)
230         }
231
232         return callback(null, pods)
233       })
234     },
235
236     function removeTheseBadPods (pods, callback) {
237       each(pods, function (pod, callbackEach) {
238         pod.destroy().asCallback(callbackEach)
239       }, function (err) {
240         return callback(err, pods.length)
241       })
242     }
243   ], function (err, numberOfPodsRemoved) {
244     if (err) {
245       logger.error('Cannot remove bad pods.', { error: err })
246     } else if (numberOfPodsRemoved) {
247       logger.info('Removed %d pods.', numberOfPodsRemoved)
248     } else {
249       logger.info('No need to remove bad pods.')
250     }
251   })
252 }
253
254 function updatePodsScore (goodPods, badPods) {
255   const self = this
256   const Pod = this.sequelize.models.Pod
257
258   logger.info('Updating %d good pods and %d bad pods scores.', goodPods.length, badPods.length)
259
260   if (goodPods.length !== 0) {
261     Pod.incrementScores(goodPods, constants.PODS_SCORE.BONUS, function (err) {
262       if (err) logger.error('Cannot increment scores of good pods.')
263     })
264   }
265
266   if (badPods.length !== 0) {
267     Pod.incrementScores(badPods, constants.PODS_SCORE.MALUS, function (err) {
268       if (err) logger.error('Cannot decrement scores of bad pods.')
269       removeBadPods.call(self)
270     })
271   }
272 }
273
274 function listWithLimitAndRandom (limit, callback) {
275   const self = this
276
277   self.count().asCallback(function (err, count) {
278     if (err) return callback(err)
279
280     // Optimization...
281     if (count === 0) return callback(null, [])
282
283     let start = Math.floor(Math.random() * count) - limit
284     if (start < 0) start = 0
285
286     const query = {
287       order: [
288         [ 'id', 'ASC' ]
289       ],
290       offset: start,
291       limit: limit,
292       include: [ this.sequelize.models.Pod ]
293     }
294
295     self.findAll(query).asCallback(callback)
296   })
297 }
298
299 function removeAll (callback) {
300   // Delete all requests
301   this.destroy({ truncate: true }).asCallback(callback)
302 }
303
304 function removeWithEmptyTo (callback) {
305   if (!callback) callback = function () {}
306
307   const query = {
308     where: {
309       id: {
310         $notIn: [
311           this.sequelize.literal('SELECT "requestId" FROM "RequestToPods"')
312         ]
313       }
314     }
315   }
316
317   this.destroy(query).asCallback(callback)
318 }