Move to promises
[oweals/peertube.git] / server / lib / friends.ts
1 import * as request from 'request'
2 import * as Sequelize from 'sequelize'
3 import * as Promise from 'bluebird'
4
5 import { database as db } from '../initializers/database'
6 import {
7   API_VERSION,
8   CONFIG,
9   REQUESTS_IN_PARALLEL,
10   REQUEST_ENDPOINTS,
11   REQUEST_ENDPOINT_ACTIONS,
12   REMOTE_SCHEME
13 } from '../initializers'
14 import {
15   logger,
16   getMyPublicCert,
17   makeSecureRequest,
18   makeRetryRequest
19 } from '../helpers'
20 import {
21   RequestScheduler,
22   RequestSchedulerOptions,
23
24   RequestVideoQaduScheduler,
25   RequestVideoQaduSchedulerOptions,
26
27   RequestVideoEventScheduler,
28   RequestVideoEventSchedulerOptions
29 } from './request'
30 import {
31   PodInstance,
32   VideoInstance
33 } from '../models'
34 import {
35   RequestEndpoint,
36   RequestVideoEventType,
37   RequestVideoQaduType
38 } from '../../shared'
39
40 type QaduParam = { videoId: string, type: RequestVideoQaduType }
41 type EventParam = { videoId: string, type: RequestVideoEventType }
42
43 const ENDPOINT_ACTIONS = REQUEST_ENDPOINT_ACTIONS[REQUEST_ENDPOINTS.VIDEOS]
44
45 const requestScheduler = new RequestScheduler()
46 const requestVideoQaduScheduler = new RequestVideoQaduScheduler()
47 const requestVideoEventScheduler = new RequestVideoEventScheduler()
48
49 function activateSchedulers () {
50   requestScheduler.activate()
51   requestVideoQaduScheduler.activate()
52   requestVideoEventScheduler.activate()
53 }
54
55 function addVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
56   const options = {
57     type: ENDPOINT_ACTIONS.ADD,
58     endpoint: REQUEST_ENDPOINTS.VIDEOS,
59     data: videoData,
60     transaction
61   }
62   return createRequest(options)
63 }
64
65 function updateVideoToFriends (videoData: Object, transaction: Sequelize.Transaction) {
66   const options = {
67     type: ENDPOINT_ACTIONS.UPDATE,
68     endpoint: REQUEST_ENDPOINTS.VIDEOS,
69     data: videoData,
70     transaction
71   }
72   return createRequest(options)
73 }
74
75 function removeVideoToFriends (videoParams: Object) {
76   const options = {
77     type: ENDPOINT_ACTIONS.REMOVE,
78     endpoint: REQUEST_ENDPOINTS.VIDEOS,
79     data: videoParams,
80     transaction: null
81   }
82   return createRequest(options)
83 }
84
85 function reportAbuseVideoToFriend (reportData: Object, video: VideoInstance, transaction: Sequelize.Transaction) {
86   const options = {
87     type: ENDPOINT_ACTIONS.REPORT_ABUSE,
88     endpoint: REQUEST_ENDPOINTS.VIDEOS,
89     data: reportData,
90     toIds: [ video.Author.podId ],
91     transaction
92   }
93   return createRequest(options)
94 }
95
96 function quickAndDirtyUpdateVideoToFriends (qaduParam: QaduParam, transaction?: Sequelize.Transaction) {
97   const options = {
98     videoId: qaduParam.videoId,
99     type: qaduParam.type,
100     transaction
101   }
102   return createVideoQaduRequest(options)
103 }
104
105 function quickAndDirtyUpdatesVideoToFriends (qadusParams: QaduParam[], transaction: Sequelize.Transaction) {
106   const tasks = []
107
108   qadusParams.forEach(function (qaduParams) {
109     tasks.push(quickAndDirtyUpdateVideoToFriends(qaduParams, transaction))
110   })
111
112   return Promise.all(tasks)
113 }
114
115 function addEventToRemoteVideo (eventParam: EventParam, transaction?: Sequelize.Transaction) {
116   const options = {
117     videoId: eventParam.videoId,
118     type: eventParam.type,
119     transaction
120   }
121   return createVideoEventRequest(options)
122 }
123
124 function addEventsToRemoteVideo (eventsParams: EventParam[], transaction: Sequelize.Transaction) {
125   const tasks = []
126
127   eventsParams.forEach(function (eventParams) {
128     tasks.push(addEventToRemoteVideo(eventParams, transaction))
129   })
130
131   return Promise.all(tasks)
132 }
133
134 function hasFriends () {
135   return db.Pod.countAll().then(count => count !== 0)
136 }
137
138 function makeFriends (hosts: string[]) {
139   const podsScore = {}
140
141   logger.info('Make friends!')
142   return getMyPublicCert()
143     .then(cert => {
144       return Promise.mapSeries(hosts, host => {
145         return computeForeignPodsList(host, podsScore)
146       }).then(() => cert)
147     })
148     .then(cert => {
149       logger.debug('Pods scores computed.', { podsScore: podsScore })
150       const podsList = computeWinningPods(hosts, podsScore)
151       logger.debug('Pods that we keep.', { podsToKeep: podsList })
152
153       return makeRequestsToWinningPods(cert, podsList)
154     })
155 }
156
157 function quitFriends () {
158   // Stop pool requests
159   requestScheduler.deactivate()
160
161   return requestScheduler.flush()
162     .then(() => {
163       return requestVideoQaduScheduler.flush()
164     })
165     .then(() => {
166       return db.Pod.list()
167     })
168     .then(pods => {
169       const requestParams = {
170         method: 'POST' as 'POST',
171         path: '/api/' + API_VERSION + '/remote/pods/remove',
172         sign: true,
173         toPod: null
174       }
175
176       // Announce we quit them
177       // We don't care if the request fails
178       // The other pod will exclude us automatically after a while
179       return Promise.map(pods, pod => {
180         requestParams.toPod = pod
181         return makeSecureRequest(requestParams)
182       }, { concurrency: REQUESTS_IN_PARALLEL })
183       .then(() => pods)
184       .catch(err => {
185         logger.error('Some errors while quitting friends.', { err: err })
186         // Don't stop the process
187       })
188     })
189     .then(pods => {
190       const tasks = []
191       pods.forEach(pod => tasks.push(pod.destroy()))
192
193       return Promise.all(pods)
194     })
195     .then(() => {
196       logger.info('Removed all remote videos.')
197       // Don't forget to re activate the scheduler, even if there was an error
198       return requestScheduler.activate()
199     })
200     .finally(() => requestScheduler.activate())
201 }
202
203 function sendOwnedVideosToPod (podId: number) {
204   db.Video.listOwnedAndPopulateAuthorAndTags()
205     .then(videosList => {
206       const tasks = []
207       videosList.forEach(video => {
208         const promise = video.toAddRemoteJSON()
209           .then(remoteVideo => {
210             const options = {
211               type: 'add',
212               endpoint: REQUEST_ENDPOINTS.VIDEOS,
213               data: remoteVideo,
214               toIds: [ podId ],
215               transaction: null
216             }
217             return createRequest(options)
218           })
219           .catch(err => {
220             logger.error('Cannot convert video to remote.', { error: err })
221             // Don't break the process
222             return undefined
223           })
224
225         tasks.push(promise)
226       })
227
228       return Promise.all(tasks)
229     })
230 }
231
232 function getRequestScheduler () {
233   return requestScheduler
234 }
235
236 function getRequestVideoQaduScheduler () {
237   return requestVideoQaduScheduler
238 }
239
240 function getRequestVideoEventScheduler () {
241   return requestVideoEventScheduler
242 }
243
244 // ---------------------------------------------------------------------------
245
246 export {
247   activateSchedulers,
248   addVideoToFriends,
249   updateVideoToFriends,
250   reportAbuseVideoToFriend,
251   quickAndDirtyUpdateVideoToFriends,
252   quickAndDirtyUpdatesVideoToFriends,
253   addEventToRemoteVideo,
254   addEventsToRemoteVideo,
255   hasFriends,
256   makeFriends,
257   quitFriends,
258   removeVideoToFriends,
259   sendOwnedVideosToPod,
260   getRequestScheduler,
261   getRequestVideoQaduScheduler,
262   getRequestVideoEventScheduler
263 }
264
265 // ---------------------------------------------------------------------------
266
267 function computeForeignPodsList (host: string, podsScore: { [ host: string ]: number }) {
268   // TODO: type res
269   return getForeignPodsList(host).then((res: any) => {
270     const foreignPodsList = res.data
271
272     // Let's give 1 point to the pod we ask the friends list
273     foreignPodsList.push({ host })
274
275     foreignPodsList.forEach(foreignPod => {
276       const foreignPodHost = foreignPod.host
277
278       if (podsScore[foreignPodHost]) podsScore[foreignPodHost]++
279       else podsScore[foreignPodHost] = 1
280     })
281
282     return undefined
283   })
284 }
285
286 function computeWinningPods (hosts: string[], podsScore: { [ host: string ]: number }) {
287   // Build the list of pods to add
288   // Only add a pod if it exists in more than a half base pods
289   const podsList = []
290   const baseScore = hosts.length / 2
291
292   Object.keys(podsScore).forEach(podHost => {
293     // If the pod is not me and with a good score we add it
294     if (isMe(podHost) === false && podsScore[podHost] > baseScore) {
295       podsList.push({ host: podHost })
296     }
297   })
298
299   return podsList
300 }
301
302 function getForeignPodsList (host: string) {
303   return new Promise((res, rej) => {
304     const path = '/api/' + API_VERSION + '/pods'
305
306     request.get(REMOTE_SCHEME.HTTP + '://' + host + path, function (err, response, body) {
307       if (err) return rej(err)
308
309       try {
310         const json = JSON.parse(body)
311         return res(json)
312       } catch (err) {
313         return rej(err)
314       }
315     })
316   })
317 }
318
319 function makeRequestsToWinningPods (cert: string, podsList: PodInstance[]) {
320   // Stop pool requests
321   requestScheduler.deactivate()
322   // Flush pool requests
323   requestScheduler.forceSend()
324
325   return Promise.map(podsList, pod => {
326     const params = {
327       url: REMOTE_SCHEME.HTTP + '://' + pod.host + '/api/' + API_VERSION + '/pods/',
328       method: 'POST' as 'POST',
329       json: {
330         host: CONFIG.WEBSERVER.HOST,
331         email: CONFIG.ADMIN.EMAIL,
332         publicKey: cert
333       }
334     }
335
336     return makeRetryRequest(params)
337       .then(({ response, body }) => {
338         body = body as { cert: string, email: string }
339
340         if (response.statusCode === 200) {
341           const podObj = db.Pod.build({ host: pod.host, publicKey: body.cert, email: body.email })
342           return podObj.save()
343             .then(podCreated => {
344
345               // Add our videos to the request scheduler
346               sendOwnedVideosToPod(podCreated.id)
347             })
348             .catch(err => {
349               logger.error('Cannot add friend %s pod.', pod.host, { error: err })
350             })
351         } else {
352           logger.error('Status not 200 for %s pod.', pod.host)
353         }
354       })
355       .catch(err => {
356         logger.error('Error with adding %s pod.', pod.host, { error: err.stack })
357         // Don't break the process
358       })
359   }, { concurrency: REQUESTS_IN_PARALLEL })
360   .then(() => logger.debug('makeRequestsToWinningPods finished.'))
361   .finally(() => {
362     // Final callback, we've ended all the requests
363     // Now we made new friends, we can re activate the pool of requests
364     requestScheduler.activate()
365   })
366 }
367
368 // Wrapper that populate "toIds" argument with all our friends if it is not specified
369 type CreateRequestOptions = {
370   type: string
371   endpoint: RequestEndpoint
372   data: Object
373   toIds?: number[]
374   transaction: Sequelize.Transaction
375 }
376 function createRequest (options: CreateRequestOptions) {
377   if (options.toIds !== undefined) return requestScheduler.createRequest(options as RequestSchedulerOptions)
378
379   // If the "toIds" pods is not specified, we send the request to all our friends
380   return db.Pod.listAllIds(options.transaction).then(podIds => {
381     const newOptions = Object.assign(options, { toIds: podIds })
382     return requestScheduler.createRequest(newOptions)
383   })
384 }
385
386 function createVideoQaduRequest (options: RequestVideoQaduSchedulerOptions) {
387   return requestVideoQaduScheduler.createRequest(options)
388 }
389
390 function createVideoEventRequest (options: RequestVideoEventSchedulerOptions) {
391   return requestVideoEventScheduler.createRequest(options)
392 }
393
394 function isMe (host: string) {
395   return host === CONFIG.WEBSERVER.HOST
396 }