Merge branch 'master' into webseed-merged
[oweals/peertube.git] / server / lib / friends.js
1 'use strict'
2
3 const each = require('async/each')
4 const eachLimit = require('async/eachLimit')
5 const eachSeries = require('async/eachSeries')
6 const fs = require('fs')
7 const mongoose = require('mongoose')
8 const request = require('request')
9 const waterfall = require('async/waterfall')
10
11 const constants = require('../initializers/constants')
12 const logger = require('../helpers/logger')
13 const requests = require('../helpers/requests')
14
15 const Pod = mongoose.model('Pod')
16 const Request = mongoose.model('Request')
17 const Video = mongoose.model('Video')
18
19 const friends = {
20   addVideoToFriends,
21   hasFriends,
22   getMyCertificate,
23   makeFriends,
24   quitFriends,
25   removeVideoToFriends,
26   sendOwnedVideosToPod
27 }
28
29 function addVideoToFriends (video) {
30   createRequest('add', video)
31 }
32
33 function hasFriends (callback) {
34   Pod.countAll(function (err, count) {
35     if (err) return callback(err)
36
37     const hasFriends = (count !== 0)
38     callback(null, hasFriends)
39   })
40 }
41
42 function getMyCertificate (callback) {
43   fs.readFile(constants.CONFIG.STORAGE.CERT_DIR + 'peertube.pub', 'utf8', callback)
44 }
45
46 function makeFriends (urls, callback) {
47   const podsScore = {}
48
49   logger.info('Make friends!')
50   getMyCertificate(function (err, cert) {
51     if (err) {
52       logger.error('Cannot read public cert.')
53       return callback(err)
54     }
55
56     eachSeries(urls, function (url, callbackEach) {
57       computeForeignPodsList(url, podsScore, callbackEach)
58     }, function (err) {
59       if (err) return callback(err)
60
61       logger.debug('Pods scores computed.', { podsScore: podsScore })
62       const podsList = computeWinningPods(urls, podsScore)
63       logger.debug('Pods that we keep.', { podsToKeep: podsList })
64
65       makeRequestsToWinningPods(cert, podsList, callback)
66     })
67   })
68 }
69
70 function quitFriends (callback) {
71   // Stop pool requests
72   Request.deactivate()
73   // Flush pool requests
74   Request.flush()
75
76   waterfall([
77     function getPodsList (callbackAsync) {
78       return Pod.list(callbackAsync)
79     },
80
81     function announceIQuitMyFriends (pods, callbackAsync) {
82       const requestParams = {
83         method: 'POST',
84         path: '/api/' + constants.API_VERSION + '/pods/remove',
85         sign: true
86       }
87
88       // Announce we quit them
89       // We don't care if the request fails
90       // The other pod will exclude us automatically after a while
91       eachLimit(pods, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
92         requestParams.toPod = pod
93         requests.makeSecureRequest(requestParams, callbackEach)
94       }, function (err) {
95         if (err) {
96           logger.error('Some errors while quitting friends.', { err: err })
97           // Don't stop the process
98         }
99
100         return callbackAsync()
101       })
102     },
103
104     function removePodsFromDB (callbackAsync) {
105       Pod.removeAll(function (err) {
106         return callbackAsync(err)
107       })
108     },
109
110     function listRemoteVideos (callbackAsync) {
111       logger.info('Broke friends, so sad :(')
112
113       Video.listRemotes(callbackAsync)
114     },
115
116     function removeTheRemoteVideos (videosList, callbackAsync) {
117       each(videosList, function (video, callbackEach) {
118         video.remove(callbackEach)
119       }, callbackAsync)
120     }
121   ], function (err) {
122     // Don't forget to re activate the scheduler, even if there was an error
123     Request.activate()
124
125     if (err) return callback(err)
126
127     logger.info('Removed all remote videos.')
128     return callback(null)
129   })
130 }
131
132 function removeVideoToFriends (videoParams) {
133   createRequest('remove', videoParams)
134 }
135
136 function sendOwnedVideosToPod (podId) {
137   Video.listOwned(function (err, videosList) {
138     if (err) {
139       logger.error('Cannot get the list of videos we own.')
140       return
141     }
142
143     videosList.forEach(function (video) {
144       video.toRemoteJSON(function (err, remoteVideo) {
145         if (err) {
146           logger.error('Cannot convert video to remote.', { error: err })
147           // Don't break the process
148           return
149         }
150
151         createRequest('add', remoteVideo, [ podId ])
152       })
153     })
154   })
155 }
156
157 // ---------------------------------------------------------------------------
158
159 module.exports = friends
160
161 // ---------------------------------------------------------------------------
162
163 function computeForeignPodsList (url, podsScore, callback) {
164   getForeignPodsList(url, function (err, foreignPodsList) {
165     if (err) return callback(err)
166
167     if (!foreignPodsList) foreignPodsList = []
168
169     // Let's give 1 point to the pod we ask the friends list
170     foreignPodsList.push({ url: url })
171
172     foreignPodsList.forEach(function (foreignPod) {
173       const foreignPodUrl = foreignPod.url
174
175       if (podsScore[foreignPodUrl]) podsScore[foreignPodUrl]++
176       else podsScore[foreignPodUrl] = 1
177     })
178
179     callback()
180   })
181 }
182
183 function computeWinningPods (urls, podsScore) {
184   // Build the list of pods to add
185   // Only add a pod if it exists in more than a half base pods
186   const podsList = []
187   const baseScore = urls.length / 2
188   Object.keys(podsScore).forEach(function (pod) {
189     if (podsScore[pod] > baseScore) podsList.push({ url: pod })
190   })
191
192   return podsList
193 }
194
195 function getForeignPodsList (url, callback) {
196   const path = '/api/' + constants.API_VERSION + '/pods'
197
198   request.get(url + path, function (err, response, body) {
199     if (err) return callback(err)
200
201     try {
202       const json = JSON.parse(body)
203       return callback(null, json)
204     } catch (err) {
205       return callback(err)
206     }
207   })
208 }
209
210 function makeRequestsToWinningPods (cert, podsList, callback) {
211   // Stop pool requests
212   Request.deactivate()
213   // Flush pool requests
214   Request.forceSend()
215
216   eachLimit(podsList, constants.REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
217     const params = {
218       url: pod.url + '/api/' + constants.API_VERSION + '/pods/',
219       method: 'POST',
220       json: {
221         url: constants.CONFIG.WEBSERVER.URL,
222         publicKey: cert
223       }
224     }
225
226     requests.makeRetryRequest(params, function (err, res, body) {
227       if (err) {
228         logger.error('Error with adding %s pod.', pod.url, { error: err })
229         // Don't break the process
230         return callbackEach()
231       }
232
233       if (res.statusCode === 200) {
234         const podObj = new Pod({ url: pod.url, publicKey: body.cert })
235         podObj.save(function (err, podCreated) {
236           if (err) {
237             logger.error('Cannot add friend %s pod.', pod.url, { error: err })
238             return callbackEach()
239           }
240
241           // Add our videos to the request scheduler
242           sendOwnedVideosToPod(podCreated._id)
243
244           return callbackEach()
245         })
246       } else {
247         logger.error('Status not 200 for %s pod.', pod.url)
248         return callbackEach()
249       }
250     })
251   }, function endRequests () {
252     // Final callback, we've ended all the requests
253     // Now we made new friends, we can re activate the pool of requests
254     Request.activate()
255
256     logger.debug('makeRequestsToWinningPods finished.')
257     return callback()
258   })
259 }
260
261 function createRequest (type, data, to) {
262   const req = new Request({
263     request: {
264       type: type,
265       data: data
266     }
267   })
268
269   if (to) {
270     req.to = to
271   }
272
273   req.save(function (err) {
274     if (err) logger.error('Cannot save the request.', { error: err })
275   })
276 }