Move tags in another table
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const request = require('request')
8 const waterfall = require('async/waterfall')
9
10 const constants = require('../initializers/constants')
11 const db = require('../initializers/database')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const friends = {
16   addVideoToFriends,
17   hasFriends,
18   getMyCertificate,
19   makeFriends,
20   quitFriends,
21   removeVideoToFriends,
22   sendOwnedVideosToPod
23 }
24
25 function addVideoToFriends (video) {
26   createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video)
27 }
28
29 function hasFriends (callback) {
30   db.Pod.countAll(function (err, count) {
31     if (err) return callback(err)
32
33     const hasFriends = (count !== 0)
34     callback(null, hasFriends)
35   })
36 }
37
38 function getMyCertificate (callback) {
39   fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
40 }
41
42 function makeFriends (hosts, callback) {
43   const podsScore = {}
44
45   logger.info('Make friends!')
46   getMyCertificate(function (err, cert) {
47     if (err) {
48       logger.error('Cannot read public cert.')
49       return callback(err)
50     }
51
52     eachSeries(hosts, function (host, callbackEach) {
53       computeForeignPodsList(host, podsScore, callbackEach)
54     }, function (err) {
55       if (err) return callback(err)
56
57       logger.debug('Pods scores computed.', { podsScore: podsScore })
58       const podsList = computeWinningPods(hosts, podsScore)
59       logger.debug('Pods that we keep.', { podsToKeep: podsList })
60
61       makeRequestsToWinningPods(cert, podsList, callback)
62     })
63   })
64 }
65
66 function quitFriends (callback) {
67   // Stop pool requests
68   db.Request.deactivate()
69
70   waterfall([
71     function flushRequests (callbackAsync) {
72       db.Request.flush(callbackAsync)
73     },
74
75     function getPodsList (callbackAsync) {
76       return db.Pod.list(callbackAsync)
77     },
78
79     function announceIQuitMyFriends (pods, callbackAsync) {
80       const requestParams = {
81         method: 'POST',
82         path: '/api/' + constants.API_VERSION + '/pods/remove',
83         sign: true
84       }
85
86       // Announce we quit them
87       // We don't care if the request fails
88       // The other pod will exclude us automatically after a while
89       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
90         requestParams.toPod = pod
91         requests.makeSecureRequest(requestParams, callbackEach)
92       }, function (err) {
93         if (err) {
94           logger.error('Some errors while quitting friends.', { err: err })
95           // Don't stop the process
96         }
97
98         return callbackAsync(null, pods)
99       })
100     },
101
102     function removePodsFromDB (pods, callbackAsync) {
103       each(pods, function (pod, callbackEach) {
104         pod.destroy().asCallback(callbackEach)
105       }, callbackAsync)
106     }
107   ], function (err) {
108     // Don't forget to re activate the scheduler, even if there was an error
109     db.Request.activate()
110
111     if (err) return callback(err)
112
113     logger.info('Removed all remote videos.')
114     return callback(null)
115   })
116 }
117
118 function removeVideoToFriends (videoParams) {
119   createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams)
120 }
121
122 function sendOwnedVideosToPod (podId) {
123   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
124     if (err) {
125       logger.error('Cannot get the list of videos we own.')
126       return
127     }
128
129     videosList.forEach(function (video) {
130       video.toRemoteJSON(function (err, remoteVideo) {
131         if (err) {
132           logger.error('Cannot convert video to remote.', { error: err })
133           // Don't break the process
134           return
135         }
136
137         createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ])
138       })
139     })
140   })
141 }
142
143 // ---------------------------------------------------------------------------
144
145 module.exports = friends
146
147 // ---------------------------------------------------------------------------
148
149 function computeForeignPodsList (host, podsScore, callback) {
150   getForeignPodsList(host, function (err, foreignPodsList) {
151     if (err) return callback(err)
152
153     if (!foreignPodsList) foreignPodsList = []
154
155     // Let's give 1 point to the pod we ask the friends list
156     foreignPodsList.push({ host })
157
158     foreignPodsList.forEach(function (foreignPod) {
159       const foreignPodHost = foreignPod.host
160
161       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
162       else podsScore[foreignPodHost] = 1
163     })
164
165     callback()
166   })
167 }
168
169 function computeWinningPods (hosts, podsScore) {
170   // Build the list of pods to add
171   // Only add a pod if it exists in more than a half base pods
172   const podsList = []
173   const baseScore = hosts.length / 2
174   Object.keys(podsScore).forEach(function (podHost) {
175     // If the pod is not me and with a good score we add it
176     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
177       podsList.push({ host: podHost })
178     }
179   })
180
181   return podsList
182 }
183
184 function getForeignPodsList (host, callback) {
185   const path = '/api/' + constants.API_VERSION + '/pods'
186
187   request.get(constants.REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
188     if (err) return callback(err)
189
190     try {
191       const json = JSON.parse(body)
192       return callback(null, json)
193     } catch (err) {
194       return callback(err)
195     }
196   })
197 }
198
199 function makeRequestsToWinningPods (cert, podsList, callback) {
200   // Stop pool requests
201   db.Request.deactivate()
202   // Flush pool requests
203   db.Request.forceSend()
204
205   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
206     const params = {
207       url: constants.REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + constants.API_VERSION + '/pods/',
208       method: 'POST',
209       json: {
210         host: constants.CONFIG.WEBSERVER.HOST,
211         publicKey: cert
212       }
213     }
214
215     requests.makeRetryRequest(params, function (err, res, body) {
216       if (err) {
217         logger.error('Error with adding %s pod.', pod.host, { error: err })
218         // Don't break the process
219         return callbackEach()
220       }
221
222       if (res.statusCode === 200) {
223         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert })
224         podObj.save().asCallback(function (err, podCreated) {
225           if (err) {
226             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
227             return callbackEach()
228           }
229
230           // Add our videos to the request scheduler
231           sendOwnedVideosToPod(podCreated.id)
232
233           return callbackEach()
234         })
235       } else {
236         logger.error('Status not 200 for %s pod.', pod.host)
237         return callbackEach()
238       }
239     })
240   }, function endRequests () {
241     // Final callback, we've ended all the requests
242     // Now we made new friends, we can re activate the pool of requests
243     db.Request.activate()
244
245     logger.debug('makeRequestsToWinningPods finished.')
246     return callback()
247   })
248 }
249
250 // Wrapper that populate "to" argument with all our friends if it is not specified
251 function createRequest (type, endpoint, data, to) {
252   if (to) return _createRequest(type, endpoint, data, to)
253
254   // If the "to" pods is not specified, we send the request to all our friends
255   db.Pod.listAllIds(function (err, podIds) {
256     if (err) {
257       logger.error('Cannot get pod ids', { error: err })
258       return
259     }
260
261     return _createRequest(type, endpoint, data, podIds)
262   })
263 }
264
265 function _createRequest (type, endpoint, data, to) {
266   const pods = []
267
268   // If there are no destination pods abort
269   if (to.length === 0) return
270
271   to.forEach(function (toPod) {
272     pods.push(db.Pod.build({ id: toPod }))
273   })
274
275   const createQuery = {
276     endpoint,
277     request: {
278       type: type,
279       data: data
280     }
281   }
282
283   // We run in transaction to keep coherency between Request and RequestToPod tables
284   db.sequelize.transaction(function (t) {
285     const dbRequestOptions = {
286       transaction: t
287     }
288
289     return db.Request.create(createQuery, dbRequestOptions).then(function (request) {
290       return request.setPods(pods, dbRequestOptions)
291     })
292   }).asCallback(function (err) {
293     if (err) logger.error('Error in createRequest transaction.', { error: err })
294   })
295 }
296
297 function isMe (host) {
298   return host === constants.CONFIG.WEBSERVER.HOST
299 }