53f47d62974325e0318058d0158549fbcbeb6e2c
[oweals/peertube.git] / lib / poolRequests.js
1 ;(function () {
2   'use strict'
3
4   var async = require('async')
5   var pluck = require('lodash-node/compat/collection/pluck')
6
7   var constants = require('../initializers/constants')
8   var database = require('../initializers/database')
9   var logger = require('../helpers/logger')
10   var PodsDB = database.PodsDB
11   var PoolRequestsDB = database.PoolRequestsDB
12   var utils = require('../helpers/utils')
13   var VideosDB = database.VideosDB
14
15   var timer = null
16
17   var poolRequests = {
18     activate: activate,
19     addToPoolRequests: addToPoolRequests,
20     deactivate: deactivate,
21     forceSend: forceSend
22   }
23
24   function deactivate () {
25     logger.info('Pool requests deactivated.')
26     clearInterval(timer)
27   }
28
29   function forceSend () {
30     logger.info('Force pool requests sending.')
31     makePoolRequests()
32   }
33
34   function activate () {
35     logger.info('Pool requests activated.')
36     timer = setInterval(makePoolRequests, constants.INTERVAL)
37   }
38
39   function addToPoolRequests (id, type, request) {
40     logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
41
42     PoolRequestsDB.findOne({ id: id }, function (err, entity) {
43       if (err) logger.error(err)
44
45       if (entity) {
46         if (entity.type === type) {
47           logger.error(new Error('Cannot insert two same requests.'))
48           return
49         }
50
51         // Remove the request of the other type
52         PoolRequestsDB.remove({ id: id }, function (err) {
53           if (err) logger.error(err)
54         })
55       } else {
56         PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
57           if (err) logger.error(err)
58         })
59       }
60     })
61   }
62
63   // ---------------------------------------------------------------------------
64
65   module.exports = poolRequests
66
67   // ---------------------------------------------------------------------------
68
69   function makePoolRequest (type, requests, callback) {
70     if (!callback) callback = function () {}
71
72     PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
73       if (err) throw err
74
75       var params = {
76         encrypt: true,
77         sign: true,
78         method: 'POST',
79         path: null,
80         data: requests
81       }
82
83       if (type === 'add') {
84         params.path = '/api/' + constants.API_VERSION + '/remotevideos/add'
85       } else if (type === 'remove') {
86         params.path = '/api/' + constants.API_VERSION + '/remotevideos/remove'
87       } else {
88         throw new Error('Unkown pool request type.')
89       }
90
91       var bad_pods = []
92       var good_pods = []
93
94       utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
95
96       function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
97         if (err || (response.statusCode !== 200 && response.statusCode !== 204)) {
98           bad_pods.push(pod._id)
99           logger.error('Error sending secure request to %s pod.', url, { error: err || new Error('Status code not 20x') })
100         } else {
101           good_pods.push(pod._id)
102         }
103
104         return callback_each_pod_finished()
105       }
106
107       function callbackAllPodsFinished (err) {
108         if (err) return callback(err)
109
110         updatePodsScore(good_pods, bad_pods)
111         callback(null)
112       }
113     })
114   }
115
116   function makePoolRequests () {
117     logger.info('Making pool requests to friends.')
118
119     PoolRequestsDB.find({}, { _id: 1, type: 1, request: 1 }, function (err, pool_requests) {
120       if (err) throw err
121
122       if (pool_requests.length === 0) return
123
124       var requests = {
125         add: {
126           ids: [],
127           requests: []
128         },
129         remove: {
130           ids: [],
131           requests: []
132         }
133       }
134
135       async.each(pool_requests, function (pool_request, callback_each) {
136         if (pool_request.type === 'add') {
137           requests.add.requests.push(pool_request.request)
138           requests.add.ids.push(pool_request._id)
139         } else if (pool_request.type === 'remove') {
140           requests.remove.requests.push(pool_request.request)
141           requests.remove.ids.push(pool_request._id)
142         } else {
143           throw new Error('Unkown pool request type.')
144         }
145
146         callback_each()
147       }, function () {
148         // Send the add requests
149         if (requests.add.requests.length !== 0) {
150           makePoolRequest('add', requests.add.requests, function (err) {
151             if (err) logger.error('Errors when sent add pool requests.', { error: err })
152
153             removePoolRequestsFromDB(requests.add.ids)
154           })
155         }
156
157         // Send the remove requests
158         if (requests.remove.requests.length !== 0) {
159           makePoolRequest('remove', requests.remove.requests, function (err) {
160             if (err) logger.error('Errors when sent remove pool requests.', { error: err })
161
162             removePoolRequestsFromDB(requests.remove.ids)
163           })
164         }
165       })
166     })
167   }
168
169   function removeBadPods () {
170     PodsDB.find({ score: 0 }, { _id: 1, url: 1 }, function (err, pods) {
171       if (err) throw err
172
173       if (pods.length === 0) return
174
175       var urls = pluck(pods, 'url')
176       var ids = pluck(pods, '_id')
177
178       VideosDB.remove({ podUrl: { $in: urls } }, function (err, r) {
179         if (err) logger.error('Cannot remove videos from a pod that we removing.', { error: err })
180         var videos_removed = r.result.n
181         logger.info('Removed %d videos.', videos_removed)
182
183         PodsDB.remove({ _id: { $in: ids } }, function (err, r) {
184           if (err) logger.error('Cannot remove bad pods.', { error: err })
185
186           var pods_removed = r.result.n
187           logger.info('Removed %d pods.', pods_removed)
188         })
189       })
190     })
191   }
192
193   function removePoolRequestsFromDB (ids) {
194     PoolRequestsDB.remove({ _id: { $in: ids } }, function (err) {
195       if (err) {
196         logger.error('Cannot remove requests from the pool requests database.', { error: err })
197         return
198       }
199
200       logger.info('Pool requests flushed.')
201     })
202   }
203
204   function updatePodsScore (good_pods, bad_pods) {
205     logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
206
207     PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: constants.PODS_SCORE.BONUS } }, { multi: true }).exec()
208     PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: constants.PODS_SCORE.MALUS } }, { multi: true }, function (err) {
209       if (err) throw err
210       removeBadPods()
211     })
212   }
213 })()