Type functions
[oweals/peertube.git] / server / lib / friends.ts
1 import { each, eachLimit, eachSeries, series, waterfall } from 'async'
2 import * as request from 'request'
3 import * as Sequelize from 'sequelize'
4
5 import { database as db } from '../initializers/database'
6 import {
7   API_VERSION,
8   CONFIG,
9   REQUESTS_IN_PARALLEL,
10   REQUEST_ENDPOINTS,
11   REQUEST_ENDPOINT_ACTIONS,
12   REMOTE_SCHEME
13 } from '../initializers'
14 import {
15   logger,
16   getMyPublicCert,
17   makeSecureRequest,
18   makeRetryRequest,
19   createEmptyCallback
20 } from '../helpers'
21 import {
22   RequestScheduler,
23   RequestSchedulerOptions,
24
25   RequestVideoQaduScheduler,
26   RequestVideoQaduSchedulerOptions,
27
28   RequestVideoEventScheduler,
29   RequestVideoEventSchedulerOptions
30 } from './request'
31 import { PodInstance, VideoInstance } from '../models'
32
33 type QaduParam = { videoId: string, type: string }
34 type EventParam = { videoId: string, type: string }
35
36 const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
37
38 const requestScheduler = new RequestScheduler()
39 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
40 const requestVideoEventScheduler = new RequestVideoEventScheduler()
41
42 function activateSchedulers () {
43   requestScheduler.activate()
44   requestVideoQaduScheduler.activate()
45   requestVideoEventScheduler.activate()
46 }
47
48 function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
49   const options = {
50     type: ENDPOINT_ACTIONS.ADD,
51     endpoint: REQUEST_ENDPOINTS.VIDEOS,
52     data: videoData,
53     transaction
54   }
55   createRequest(options, callback)
56 }
57
58 function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction, callback: (err: Error) => void) {
59   const options = {
60     type: ENDPOINT_ACTIONS.UPDATE,
61     endpoint: REQUEST_ENDPOINTS.VIDEOS,
62     data: videoData,
63     transaction
64   }
65   createRequest(options, callback)
66 }
67
68 function removeVideoToFriends (videoParams: Object) {
69   const options = {
70     type: ENDPOINT_ACTIONS.REMOVE,
71     endpoint: REQUEST_ENDPOINTS.VIDEOS,
72     data: videoParams,
73     transaction: null
74   }
75   createRequest(options)
76 }
77
78 function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance) {
79   const options = {
80     type: ENDPOINT_ACTIONS.REPORT_ABUSE,
81     endpoint: REQUEST_ENDPOINTS.VIDEOS,
82     data: reportData,
83     toIds: [ video.Author.podId ],
84     transaction: null
85   }
86   createRequest(options)
87 }
88
89 function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
90   const options = {
91     videoId: qaduParam.videoId,
92     type: qaduParam.type,
93     transaction
94   }
95   return createVideoQaduRequest(options, callback)
96 }
97
98 function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
99   const tasks = []
100
101   qadusParams.forEach(function (qaduParams) {
102     const fun = function (callback) {
103       quickAndDirtyUpdateVideoToFriends(qaduParams, transaction, callback)
104     }
105
106     tasks.push(fun)
107   })
108
109   series(tasks, finalCallback)
110 }
111
112 function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction, callback?: (err: Error) => void) {
113   const options = {
114     videoId: eventParam.videoId,
115     type: eventParam.type,
116     transaction
117   }
118   createVideoEventRequest(options, callback)
119 }
120
121 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction, finalCallback: (err: Error) => void) {
122   const tasks = []
123
124   eventsParams.forEach(function (eventParams) {
125     const fun = function (callback) {
126       addEventToRemoteVideo(eventParams, transaction, callback)
127     }
128
129     tasks.push(fun)
130   })
131
132   series(tasks, finalCallback)
133 }
134
135 function hasFriends (callback: (err: Error, hasFriends?: boolean) => void) {
136   db.Pod.countAll(function (err, count) {
137     if (err) return callback(err)
138
139     const hasFriends = (count !== 0)
140     callback(null, hasFriends)
141   })
142 }
143
144 function makeFriends (hosts: string[], callback: (err: Error) => void) {
145   const podsScore = {}
146
147   logger.info('Make friends!')
148   getMyPublicCert(function (err, cert) {
149     if (err) {
150       logger.error('Cannot read public cert.')
151       return callback(err)
152     }
153
154     eachSeries(hosts, function (host, callbackEach) {
155       computeForeignPodsList(host, podsScore, callbackEach)
156     }, function (err: Error) {
157       if (err) return callback(err)
158
159       logger.debug('Pods scores computed.', { podsScore: podsScore })
160       const podsList = computeWinningPods(hosts, podsScore)
161       logger.debug('Pods that we keep.', { podsToKeep: podsList })
162
163       makeRequestsToWinningPods(cert, podsList, callback)
164     })
165   })
166 }
167
168 function quitFriends (callback: (err: Error) => void) {
169   // Stop pool requests
170   requestScheduler.deactivate()
171
172   waterfall([
173     function flushRequests (callbackAsync) {
174       requestScheduler.flush(err => callbackAsync(err))
175     },
176
177     function flushVideoQaduRequests (callbackAsync) {
178       requestVideoQaduScheduler.flush(err => callbackAsync(err))
179     },
180
181     function getPodsList (callbackAsync) {
182       return db.Pod.list(callbackAsync)
183     },
184
185     function announceIQuitMyFriends (pods, callbackAsync) {
186       const requestParams = {
187         method: 'POST' as 'POST',
188         path: '/api/' + API_VERSION + '/remote/pods/remove',
189         sign: true,
190         toPod: null
191       }
192
193       // Announce we quit them
194       // We don't care if the request fails
195       // The other pod will exclude us automatically after a while
196       eachLimit(pods, REQUESTS_IN_PARALLEL, function (pod, callbackEach) {
197         requestParams.toPod = pod
198         makeSecureRequest(requestParams, callbackEach)
199       }, function (err) {
200         if (err) {
201           logger.error('Some errors while quitting friends.', { err: err })
202           // Don't stop the process
203         }
204
205         return callbackAsync(null, pods)
206       })
207     },
208
209     function removePodsFromDB (pods, callbackAsync) {
210       each(pods, function (pod: any, callbackEach) {
211         pod.destroy().asCallback(callbackEach)
212       }, callbackAsync)
213     }
214   ], function (err: Error) {
215     // Don't forget to re activate the scheduler, even if there was an error
216     requestScheduler.activate()
217
218     if (err) return callback(err)
219
220     logger.info('Removed all remote videos.')
221     return callback(null)
222   })
223 }
224
225 function sendOwnedVideosToPod (podId: number) {
226   db.Video.listOwnedAndPopulateAuthorAndTags(function (err, videosList) {
227     if (err) {
228       logger.error('Cannot get the list of videos we own.')
229       return
230     }
231
232     videosList.forEach(function (video) {
233       video.toAddRemoteJSON(function (err, remoteVideo) {
234         if (err) {
235           logger.error('Cannot convert video to remote.', { error: err })
236           // Don't break the process
237           return
238         }
239
240         const options = {
241           type: 'add',
242           endpoint: REQUEST_ENDPOINTS.VIDEOS,
243           data: remoteVideo,
244           toIds: [ podId ],
245           transaction: null
246         }
247         createRequest(options)
248       })
249     })
250   })
251 }
252
253 function getRequestScheduler () {
254   return requestScheduler
255 }
256
257 function getRequestVideoQaduScheduler () {
258   return requestVideoQaduScheduler
259 }
260
261 function getRequestVideoEventScheduler () {
262   return requestVideoEventScheduler
263 }
264
265 // ---------------------------------------------------------------------------
266
267 export {
268   activateSchedulers,
269   addVideoToFriends,
270   updateVideoToFriends,
271   reportAbuseVideoToFriend,
272   quickAndDirtyUpdateVideoToFriends,
273   quickAndDirtyUpdatesVideoToFriends,
274   addEventToRemoteVideo,
275   addEventsToRemoteVideo,
276   hasFriends,
277   makeFriends,
278   quitFriends,
279   removeVideoToFriends,
280   sendOwnedVideosToPod,
281   getRequestScheduler,
282   getRequestVideoQaduScheduler,
283   getRequestVideoEventScheduler
284 }
285
286 // ---------------------------------------------------------------------------
287
288 function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }, callback: (err: Error) => void) {
289   getForeignPodsList(host, function (err, res) {
290     if (err) return callback(err)
291
292     const foreignPodsList = res.data
293
294     // Let's give 1 point to the pod we ask the friends list
295     foreignPodsList.push({ host })
296
297     foreignPodsList.forEach(function (foreignPod) {
298       const foreignPodHost = foreignPod.host
299
300       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
301       else podsScore[foreignPodHost] = 1
302     })
303
304     return callback(null)
305   })
306 }
307
308 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
309   // Build the list of pods to add
310   // Only add a pod if it exists in more than a half base pods
311   const podsList = []
312   const baseScore = hosts.length / 2
313
314   Object.keys(podsScore).forEach(function (podHost) {
315     // If the pod is not me and with a good score we add it
316     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
317       podsList.push({ host: podHost })
318     }
319   })
320
321   return podsList
322 }
323
324 function getForeignPodsList (host: string, callback: (err: Error, foreignPodsList?: any) => void) {
325   const path = '/api/' + API_VERSION + '/pods'
326
327   request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
328     if (err) return callback(err)
329
330     try {
331       const json = JSON.parse(body)
332       return callback(null, json)
333     } catch (err) {
334       return callback(err)
335     }
336   })
337 }
338
339 function makeRequestsToWinningPods (cert: string, podsList: PodInstance[], callback: (err: Error) => void) {
340   // Stop pool requests
341   requestScheduler.deactivate()
342   // Flush pool requests
343   requestScheduler.forceSend()
344
345   eachLimit(podsList, REQUESTS_IN_PARALLEL, function (pod: PodInstance, callbackEach) {
346     const params = {
347       url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
348       method: 'POST' as 'POST',
349       json: {
350         host: CONFIG.WEBSERVER.HOST,
351         email: CONFIG.ADMIN.EMAIL,
352         publicKey: cert
353       }
354     }
355
356     makeRetryRequest(params, function (err, res, body: { cert: string, email: string }) {
357       if (err) {
358         logger.error('Error with adding %s pod.', pod.host, { error: err })
359         // Don't break the process
360         return callbackEach()
361       }
362
363       if (res.statusCode === 200) {
364         const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
365         podObj.save().asCallback(function (err, podCreated) {
366           if (err) {
367             logger.error('Cannot add friend %s pod.', pod.host, { error: err })
368             return callbackEach()
369           }
370
371           // Add our videos to the request scheduler
372           sendOwnedVideosToPod(podCreated.id)
373
374           return callbackEach()
375         })
376       } else {
377         logger.error('Status not 200 for %s pod.', pod.host)
378         return callbackEach()
379       }
380     })
381   }, function endRequests () {
382     // Final callback, we've ended all the requests
383     // Now we made new friends, we can re activate the pool of requests
384     requestScheduler.activate()
385
386     logger.debug('makeRequestsToWinningPods finished.')
387     return callback(null)
388   })
389 }
390
391 // Wrapper that populate "toIds" argument with all our friends if it is not specified
392 type CreateRequestOptions = {
393   type: string
394   endpoint: string
395   data: Object
396   toIds?: number[]
397   transaction: Sequelize.Transaction
398 }
399 function createRequest (options: CreateRequestOptions, callback?: (err: Error) => void) {
400   if (!callback) callback = function () { /* empty */ }
401
402   if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions, callback)
403
404   // If the "toIds" pods is not specified, we send the request to all our friends
405   db.Pod.listAllIds(options.transaction, function (err, podIds) {
406     if (err) {
407       logger.error('Cannot get pod ids', { error: err })
408       return
409     }
410
411     const newOptions = Object.assign(options, { toIds: podIds })
412     return requestScheduler.createRequest(newOptions, callback)
413   })
414 }
415
416 function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions, callback: (err: Error) => void) {
417   if (!callback) callback = createEmptyCallback()
418
419   requestVideoQaduScheduler.createRequest(options, callback)
420 }
421
422 function createVideoEventRequest (options: RequestVideoEventSchedulerOptions, callback: (err: Error) => void) {
423   if (!callback) callback = createEmptyCallback()
424
425   requestVideoEventScheduler.createRequest(options, callback)
426 }
427
428 function isMe (host: string) {
429   return host === CONFIG.WEBSERVER.HOST
430 }