First version with PostgreSQL
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16   addVideoToFriends,
17   hasFriends,
18   getMyCertificate,
19   makeFriends,
20   quitFriends,
21   removeVideoToFriends,
22   sendOwnedVideosToPod
23 }
24
25 function addVideoToFriends (video) {
26   createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
27 }
28
29 function hasFriends (callback) {
30   db.Pod.countAll(function (err, count) {
31     if (err) return callback(err)
32
33     const hasFriends = (count !== 0)
34     callback(null, hasFriends)
35   })
36 }
37
38 function getMyCertificate (callback) {
39   fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
40 }
41
42 function makeFriends (hosts, callback) {
43   const podsScore = {}
44
45   logger.info('Make friends!')
46   getMyCertificate(function (err, cert) {
47     if (err) {
48       logger.error('Cannot read public cert.')
49       return callback(err)
50     }
51
52     eachSeries(hosts, function (host, callbackEach) {
53       computeForeignPodsList(host, podsScore, callbackEach)
54     }, function (err) {
55       if (err) return callback(err)
56
57       logger.debug('Pods scores computed.', { podsScore: podsScore })
58       const podsList = computeWinningPods(hosts, podsScore)
59       logger.debug('Pods that we keep.', { podsToKeep: podsList })
60
61       makeRequestsToWinningPods(cert, podsList, callback)
62     })
63   })
64 }
65
66 function quitFriends (callback) {
67   // Stop pool requests
68   db.Request.deactivate()
69   // Flush pool requests
70   db.Request.flush()
71
72   waterfall([
73     function getPodsList (callbackAsync) {
74       return db.Pod.list(callbackAsync)
75     },
76
77     function announceIQuitMyFriends (pods, callbackAsync) {
78       const requestParams = {
79         method: 'POST',
80         path: '/api/' + constants.API_VERSION + '/pods/remove',
81         sign: true
82       }
83
84       // Announce we quit them
85       // We don't care if the request fails
86       // The other pod will exclude us automatically after a while
87       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
88         requestParams.toPod = pod
89         requests.makeSecureRequest(requestParams, callbackEach)
90       }, function (err) {
91         if (err) {
92           logger.error('Some errors while quitting friends.', { err: err })
93           // Don't stop the process
94         }
95
96         return callbackAsync(null, pods)
97       })
98     },
99
100     function removePodsFromDB (pods, callbackAsync) {
101       each(pods, function (pod, callbackEach) {
102         pod.destroy().asCallback(callbackEach)
103       }, callbackAsync)
104     }
105   ], function (err) {
106     // Don't forget to re activate the scheduler, even if there was an error
107     db.Request.activate()
108
109     if (err) return callback(err)
110
111     logger.info('Removed all remote videos.')
112     return callback(null)
113   })
114 }
115
116 function removeVideoToFriends (videoParams) {
117   createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
118 }
119
120 function sendOwnedVideosToPod (podId) {
121   db.Video.listOwnedAndPopulateAuthor(function (err, videosList) {
122     if (err) {
123       logger.error('Cannot get the list of videos we own.')
124       return
125     }
126
127     videosList.forEach(function (video) {
128       video.toRemoteJSON(function (err, remoteVideo) {
129         if (err) {
130           logger.error('Cannot convert video to remote.', { error: err })
131           // Don't break the process
132           return
133         }
134
135         createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
136       })
137     })
138   })
139 }
140
141 // ---------------------------------------------------------------------------
142
143 module.exports = friends
144
145 // ---------------------------------------------------------------------------
146
147 function computeForeignPodsList (host, podsScore, callback) {
148   getForeignPodsList(host, function (err, foreignPodsList) {
149     if (err) return callback(err)
150
151     if (!foreignPodsList) foreignPodsList = []
152
153     // Let's give 1 point to the pod we ask the friends list
154     foreignPodsList.push({ host })
155
156     foreignPodsList.forEach(function (foreignPod) {
157       const foreignPodHost = foreignPod.host
158
159       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
160       else podsScore[foreignPodHost] = 1
161     })
162
163     callback()
164   })
165 }
166
167 function computeWinningPods (hosts, podsScore) {
168   // Build the list of pods to add
169   // Only add a pod if it exists in more than a half base pods
170   const podsList = []
171   const baseScore = hosts.length / 2
172   Object.keys(podsScore).forEach(function (podHost) {
173     // If the pod is not me and with a good score we add it
174     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
175       podsList.push({ host: podHost })
176     }
177   })
178
179   return podsList
180 }
181
182 function getForeignPodsList (host, callback) {
183   const path = '/api/' + constants.API_VERSION + '/pods'
184
185   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
186     if (err) return callback(err)
187
188     try {
189       const json = JSON.parse(body)
190       return callback(null, json)
191     } catch (err) {
192       return callback(err)
193     }
194   })
195 }
196
197 function makeRequestsToWinningPods (cert, podsList, callback) {
198   // Stop pool requests
199   db.Request.deactivate()
200   // Flush pool requests
201   db.Request.forceSend()
202
203   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
204     const params = {
205       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
206       method: 'POST',
207       json: {
208         host: constants.CONFIG.WEBSERVER.HOST,
209         publicKey: cert
210       }
211     }
212
213     requests.makeRetryRequest(params, function (err, res, body) {
214       if (err) {
215         logger.error('Error with adding %s pod.', pod.host, { error: err })
216         // Don't break the process
217         return callbackEach()
218       }
219
220       if (res.statusCode === 200) {
221         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
222         podObj.save().asCallback(function (err, podCreated) {
223           if (err) {
224             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
225             return callbackEach()
226           }
227
228           // Add our videos to the request scheduler
229           sendOwnedVideosToPod(podCreated._id)
230
231           return callbackEach()
232         })
233       } else {
234         logger.error('Status not 200 for %s pod.', pod.host)
235         return callbackEach()
236       }
237     })
238   }, function endRequests () {
239     // Final callback, we've ended all the requests
240     // Now we made new friends, we can re activate the pool of requests
241     db.Request.activate()
242
243     logger.debug('makeRequestsToWinningPods finished.')
244     return callback()
245   })
246 }
247
248 // Wrapper that populate "to" argument with all our friends if it is not specified
249 function createRequest (type, endpoint, data, to) {
250   if (to) return _createRequest(type, endpoint, data, to)
251
252   // If the "to" pods is not specified, we send the request to all our friends
253   db.Pod.listAllIds(function (err, podIds) {
254     if (err) {
255       logger.error('Cannot get pod ids', { error: err })
256       return
257     }
258
259     return _createRequest(type, endpoint, data, podIds)
260   })
261 }
262
263 function _createRequest (type, endpoint, data, to) {
264   const pods = []
265
266   // If there are no destination pods abort
267   if (to.length === 0) return
268
269   to.forEach(function (toPod) {
270     pods.push(db.Pod.build({ id: toPod }))
271   })
272
273   const createQuery = {
274     endpoint,
275     request: {
276       type: type,
277       data: data
278     }
279   }
280
281   // We run in transaction to keep coherency between Request and RequestToPod tables
282   db.sequelize.transaction(function (t) {
283     const dbRequestOptions = {
284       transaction: t
285     }
286
287     return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
288       return request.setPods(pods, dbRequestOptions)
289     })
290   }).asCallback(function (err) {
291     if (err) logger.error('Error in createRequest transaction.', { error: err })
292   })
293 }
294
295 function isMe (host) {
296   return host === constants.CONFIG.WEBSERVER.HOST
297 }