8da9d52b50b5ad85af628ba912a8594c3a47a656
[oweals/peertube.git] / server / lib / schedulers / videos-redundancy-scheduler.ts
1 import { AbstractScheduler } from './abstract-scheduler'
2 import { HLS_REDUNDANCY_DIRECTORY, REDUNDANCY, VIDEO_IMPORT_TIMEOUT, WEBSERVER } from '../../initializers/constants'
3 import { logger } from '../../helpers/logger'
4 import { VideosRedundancyStrategy } from '../../../shared/models/redundancy'
5 import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
6 import { downloadWebTorrentVideo, generateMagnetUri } from '../../helpers/webtorrent'
7 import { join } from 'path'
8 import { move } from 'fs-extra'
9 import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
10 import { getVideoCacheFileActivityPubUrl, getVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
11 import { removeVideoRedundancy } from '../redundancy'
12 import { getOrCreateVideoAndAccountAndChannel } from '../activitypub/videos'
13 import { downloadPlaylistSegments } from '../hls'
14 import { CONFIG } from '../../initializers/config'
15 import {
16   MStreamingPlaylist, MStreamingPlaylistFiles,
17   MStreamingPlaylistVideo,
18   MVideoAccountLight,
19   MVideoFile,
20   MVideoFileVideo,
21   MVideoRedundancyFileVideo,
22   MVideoRedundancyStreamingPlaylistVideo,
23   MVideoRedundancyVideo,
24   MVideoWithAllFiles
25 } from '@server/typings/models'
26 import { getVideoFilename } from '../video-paths'
27 import { VideoModel } from '@server/models/video/video'
28 import { getServerActor } from '@server/models/application/application'
29
30 type CandidateToDuplicate = {
31   redundancy: VideosRedundancyStrategy
32   video: MVideoWithAllFiles
33   files: MVideoFile[]
34   streamingPlaylists: MStreamingPlaylistFiles[]
35 }
36
37 function isMVideoRedundancyFileVideo (
38   o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo
39 ): o is MVideoRedundancyFileVideo {
40   return !!(o as MVideoRedundancyFileVideo).VideoFile
41 }
42
43 export class VideosRedundancyScheduler extends AbstractScheduler {
44
45   private static instance: VideosRedundancyScheduler
46
47   protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
48
49   private constructor () {
50     super()
51   }
52
53   async createManualRedundancy (videoId: number) {
54     const videoToDuplicate = await VideoModel.loadWithFiles(videoId)
55
56     if (!videoToDuplicate) {
57       logger.warn('Video to manually duplicate %d does not exist anymore.', videoId)
58       return
59     }
60
61     return this.createVideoRedundancies({
62       video: videoToDuplicate,
63       redundancy: null,
64       files: videoToDuplicate.VideoFiles,
65       streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
66     })
67   }
68
69   protected async internalExecute () {
70     for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
71       logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy)
72
73       try {
74         const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
75         if (!videoToDuplicate) continue
76
77         const candidateToDuplicate = {
78           video: videoToDuplicate,
79           redundancy: redundancyConfig,
80           files: videoToDuplicate.VideoFiles,
81           streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
82         }
83
84         await this.purgeCacheIfNeeded(candidateToDuplicate)
85
86         if (await this.isTooHeavy(candidateToDuplicate)) {
87           logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
88           continue
89         }
90
91         logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, redundancyConfig.strategy)
92
93         await this.createVideoRedundancies(candidateToDuplicate)
94       } catch (err) {
95         logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err })
96       }
97     }
98
99     await this.extendsLocalExpiration()
100
101     await this.purgeRemoteExpired()
102   }
103
104   static get Instance () {
105     return this.instance || (this.instance = new this())
106   }
107
108   private async extendsLocalExpiration () {
109     const expired = await VideoRedundancyModel.listLocalExpired()
110
111     for (const redundancyModel of expired) {
112       try {
113         const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
114         const candidate: CandidateToDuplicate = {
115           redundancy: redundancyConfig,
116           video: null,
117           files: [],
118           streamingPlaylists: []
119         }
120
121         // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it
122         if (!redundancyConfig || await this.isTooHeavy(candidate)) {
123           logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
124           await removeVideoRedundancy(redundancyModel)
125         } else {
126           await this.extendsRedundancy(redundancyModel)
127         }
128       } catch (err) {
129         logger.error(
130           'Cannot extend or remove expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel),
131           { err }
132         )
133       }
134     }
135   }
136
137   private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) {
138     const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
139     // Redundancy strategy disabled, remove our redundancy instead of extending expiration
140     if (!redundancy) {
141       await removeVideoRedundancy(redundancyModel)
142       return
143     }
144
145     await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
146   }
147
148   private async purgeRemoteExpired () {
149     const expired = await VideoRedundancyModel.listRemoteExpired()
150
151     for (const redundancyModel of expired) {
152       try {
153         await removeVideoRedundancy(redundancyModel)
154       } catch (err) {
155         logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel))
156       }
157     }
158   }
159
160   private findVideoToDuplicate (cache: VideosRedundancyStrategy) {
161     if (cache.strategy === 'most-views') {
162       return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
163     }
164
165     if (cache.strategy === 'trending') {
166       return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
167     }
168
169     if (cache.strategy === 'recently-added') {
170       const minViews = cache.minViews
171       return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
172     }
173   }
174
175   private async createVideoRedundancies (data: CandidateToDuplicate) {
176     const video = await this.loadAndRefreshVideo(data.video.url)
177
178     if (!video) {
179       logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url)
180
181       return
182     }
183
184     for (const file of data.files) {
185       const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
186       if (existingRedundancy) {
187         await this.extendsRedundancy(existingRedundancy)
188
189         continue
190       }
191
192       await this.createVideoFileRedundancy(data.redundancy, video, file)
193     }
194
195     for (const streamingPlaylist of data.streamingPlaylists) {
196       const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
197       if (existingRedundancy) {
198         await this.extendsRedundancy(existingRedundancy)
199
200         continue
201       }
202
203       await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
204     }
205   }
206
207   private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) {
208     let strategy = 'manual'
209     let expiresOn: Date = null
210
211     if (redundancy) {
212       strategy = redundancy.strategy
213       expiresOn = this.buildNewExpiration(redundancy.minLifetime)
214     }
215
216     const file = fileArg as MVideoFileVideo
217     file.Video = video
218
219     const serverActor = await getServerActor()
220
221     logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy)
222
223     const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
224     const magnetUri = generateMagnetUri(video, file, baseUrlHttp, baseUrlWs)
225
226     const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT)
227
228     const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, getVideoFilename(video, file))
229     await move(tmpPath, destPath, { overwrite: true })
230
231     const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({
232       expiresOn,
233       url: getVideoCacheFileActivityPubUrl(file),
234       fileUrl: video.getVideoRedundancyUrl(file, WEBSERVER.URL),
235       strategy,
236       videoFileId: file.id,
237       actorId: serverActor.id
238     })
239
240     createdModel.VideoFile = file
241
242     await sendCreateCacheFile(serverActor, video, createdModel)
243
244     logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url)
245   }
246
247   private async createStreamingPlaylistRedundancy (
248     redundancy: VideosRedundancyStrategy,
249     video: MVideoAccountLight,
250     playlistArg: MStreamingPlaylist
251   ) {
252     let strategy = 'manual'
253     let expiresOn: Date = null
254
255     if (redundancy) {
256       strategy = redundancy.strategy
257       expiresOn = this.buildNewExpiration(redundancy.minLifetime)
258     }
259
260     const playlist = playlistArg as MStreamingPlaylistVideo
261     playlist.Video = video
262
263     const serverActor = await getServerActor()
264
265     logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy)
266
267     const destDirectory = join(HLS_REDUNDANCY_DIRECTORY, video.uuid)
268     await downloadPlaylistSegments(playlist.playlistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT)
269
270     const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({
271       expiresOn,
272       url: getVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
273       fileUrl: playlist.getVideoRedundancyUrl(WEBSERVER.URL),
274       strategy,
275       videoStreamingPlaylistId: playlist.id,
276       actorId: serverActor.id
277     })
278
279     createdModel.VideoStreamingPlaylist = playlist
280
281     await sendCreateCacheFile(serverActor, video, createdModel)
282
283     logger.info('Duplicated playlist %s -> %s.', playlist.playlistUrl, createdModel.url)
284   }
285
286   private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) {
287     logger.info('Extending expiration of %s.', redundancy.url)
288
289     const serverActor = await getServerActor()
290
291     redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
292     await redundancy.save()
293
294     await sendUpdateCacheFile(serverActor, redundancy)
295   }
296
297   private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
298     while (await this.isTooHeavy(candidateToDuplicate)) {
299       const redundancy = candidateToDuplicate.redundancy
300       const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime)
301       if (!toDelete) return
302
303       await removeVideoRedundancy(toDelete)
304     }
305   }
306
307   private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
308     const maxSize = candidateToDuplicate.redundancy.size
309
310     const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(candidateToDuplicate.redundancy.strategy)
311     const totalWillDuplicate = totalDuplicated + this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)
312
313     return totalWillDuplicate > maxSize
314   }
315
316   private buildNewExpiration (expiresAfterMs: number) {
317     return new Date(Date.now() + expiresAfterMs)
318   }
319
320   private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) {
321     if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
322
323     return `${object.VideoStreamingPlaylist.playlistUrl}`
324   }
325
326   private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]) {
327     const fileReducer = (previous: number, current: MVideoFile) => previous + current.size
328
329     let allFiles = files
330     for (const p of playlists) {
331       allFiles = allFiles.concat(p.VideoFiles)
332     }
333
334     return allFiles.reduce(fileReducer, 0)
335   }
336
337   private async loadAndRefreshVideo (videoUrl: string) {
338     // We need more attributes and check if the video still exists
339     const getVideoOptions = {
340       videoObject: videoUrl,
341       syncParam: { likes: false, dislikes: false, shares: false, comments: false, thumbnail: false, refreshVideo: true },
342       fetchType: 'all' as 'all'
343     }
344     const { video } = await getOrCreateVideoAndAccountAndChannel(getVideoOptions)
345
346     return video
347   }
348 }