Merge branch 'develop' into pr/1285
[oweals/peertube.git] / server / lib / schedulers / videos-redundancy-scheduler.ts
1 import { AbstractScheduler } from './abstract-scheduler'
2 import { CONFIG, HLS_REDUNDANCY_DIRECTORY, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers'
3 import { logger } from '../../helpers/logger'
4 import { VideosRedundancy } from '../../../shared/models/redundancy'
5 import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
6 import { VideoFileModel } from '../../models/video/video-file'
7 import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
8 import { join } from 'path'
9 import { move } from 'fs-extra'
10 import { getServerActor } from '../../helpers/utils'
11 import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
12 import { getVideoCacheFileActivityPubUrl, getVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
13 import { removeVideoRedundancy } from '../redundancy'
14 import { getOrCreateVideoAndAccountAndChannel } from '../activitypub'
15 import { VideoStreamingPlaylistModel } from '../../models/video/video-streaming-playlist'
16 import { VideoModel } from '../../models/video/video'
17 import { downloadPlaylistSegments } from '../hls'
18
19 type CandidateToDuplicate = {
20   redundancy: VideosRedundancy,
21   video: VideoModel,
22   files: VideoFileModel[],
23   streamingPlaylists: VideoStreamingPlaylistModel[]
24 }
25
26 export class VideosRedundancyScheduler extends AbstractScheduler {
27
28   private static instance: AbstractScheduler
29
30   protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
31
32   private constructor () {
33     super()
34   }
35
36   protected async internalExecute () {
37     for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
38       logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy)
39
40       try {
41         const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
42         if (!videoToDuplicate) continue
43
44         const candidateToDuplicate = {
45           video: videoToDuplicate,
46           redundancy: redundancyConfig,
47           files: videoToDuplicate.VideoFiles,
48           streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
49         }
50
51         await this.purgeCacheIfNeeded(candidateToDuplicate)
52
53         if (await this.isTooHeavy(candidateToDuplicate)) {
54           logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
55           continue
56         }
57
58         logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, redundancyConfig.strategy)
59
60         await this.createVideoRedundancies(candidateToDuplicate)
61       } catch (err) {
62         logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err })
63       }
64     }
65
66     await this.extendsLocalExpiration()
67
68     await this.purgeRemoteExpired()
69   }
70
71   static get Instance () {
72     return this.instance || (this.instance = new this())
73   }
74
75   private async extendsLocalExpiration () {
76     const expired = await VideoRedundancyModel.listLocalExpired()
77
78     for (const redundancyModel of expired) {
79       try {
80         const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
81         const candidate = {
82           redundancy: redundancyConfig,
83           video: null,
84           files: [],
85           streamingPlaylists: []
86         }
87
88         // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it
89         if (!redundancyConfig || await this.isTooHeavy(candidate)) {
90           logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
91           await removeVideoRedundancy(redundancyModel)
92         } else {
93           await this.extendsRedundancy(redundancyModel)
94         }
95       } catch (err) {
96         logger.error(
97           'Cannot extend or remove expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel),
98           { err }
99         )
100       }
101     }
102   }
103
104   private async extendsRedundancy (redundancyModel: VideoRedundancyModel) {
105     const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
106     // Redundancy strategy disabled, remove our redundancy instead of extending expiration
107     if (!redundancy) await removeVideoRedundancy(redundancyModel)
108
109     await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
110   }
111
112   private async purgeRemoteExpired () {
113     const expired = await VideoRedundancyModel.listRemoteExpired()
114
115     for (const redundancyModel of expired) {
116       try {
117         await removeVideoRedundancy(redundancyModel)
118       } catch (err) {
119         logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel))
120       }
121     }
122   }
123
124   private findVideoToDuplicate (cache: VideosRedundancy) {
125     if (cache.strategy === 'most-views') {
126       return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
127     }
128
129     if (cache.strategy === 'trending') {
130       return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
131     }
132
133     if (cache.strategy === 'recently-added') {
134       const minViews = cache.minViews
135       return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
136     }
137   }
138
139   private async createVideoRedundancies (data: CandidateToDuplicate) {
140     const video = await this.loadAndRefreshVideo(data.video.url)
141
142     if (!video) {
143       logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url)
144
145       return
146     }
147
148     for (const file of data.files) {
149       const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
150       if (existingRedundancy) {
151         await this.extendsRedundancy(existingRedundancy)
152
153         continue
154       }
155
156       await this.createVideoFileRedundancy(data.redundancy, video, file)
157     }
158
159     for (const streamingPlaylist of data.streamingPlaylists) {
160       const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
161       if (existingRedundancy) {
162         await this.extendsRedundancy(existingRedundancy)
163
164         continue
165       }
166
167       await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
168     }
169   }
170
171   private async createVideoFileRedundancy (redundancy: VideosRedundancy, video: VideoModel, file: VideoFileModel) {
172     file.Video = video
173
174     const serverActor = await getServerActor()
175
176     logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy)
177
178     const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
179     const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)
180
181     const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT)
182
183     const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, video.getVideoFilename(file))
184     await move(tmpPath, destPath)
185
186     const createdModel = await VideoRedundancyModel.create({
187       expiresOn: this.buildNewExpiration(redundancy.minLifetime),
188       url: getVideoCacheFileActivityPubUrl(file),
189       fileUrl: video.getVideoRedundancyUrl(file, CONFIG.WEBSERVER.URL),
190       strategy: redundancy.strategy,
191       videoFileId: file.id,
192       actorId: serverActor.id
193     })
194
195     createdModel.VideoFile = file
196
197     await sendCreateCacheFile(serverActor, video, createdModel)
198
199     logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url)
200   }
201
202   private async createStreamingPlaylistRedundancy (redundancy: VideosRedundancy, video: VideoModel, playlist: VideoStreamingPlaylistModel) {
203     playlist.Video = video
204
205     const serverActor = await getServerActor()
206
207     logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, redundancy.strategy)
208
209     const destDirectory = join(HLS_REDUNDANCY_DIRECTORY, video.uuid)
210     await downloadPlaylistSegments(playlist.playlistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT)
211
212     const createdModel = await VideoRedundancyModel.create({
213       expiresOn: this.buildNewExpiration(redundancy.minLifetime),
214       url: getVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
215       fileUrl: playlist.getVideoRedundancyUrl(CONFIG.WEBSERVER.URL),
216       strategy: redundancy.strategy,
217       videoStreamingPlaylistId: playlist.id,
218       actorId: serverActor.id
219     })
220
221     createdModel.VideoStreamingPlaylist = playlist
222
223     await sendCreateCacheFile(serverActor, video, createdModel)
224
225     logger.info('Duplicated playlist %s -> %s.', playlist.playlistUrl, createdModel.url)
226   }
227
228   private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) {
229     logger.info('Extending expiration of %s.', redundancy.url)
230
231     const serverActor = await getServerActor()
232
233     redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
234     await redundancy.save()
235
236     await sendUpdateCacheFile(serverActor, redundancy)
237   }
238
239   private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
240     while (this.isTooHeavy(candidateToDuplicate)) {
241       const redundancy = candidateToDuplicate.redundancy
242       const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime)
243       if (!toDelete) return
244
245       await removeVideoRedundancy(toDelete)
246     }
247   }
248
249   private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
250     const maxSize = candidateToDuplicate.redundancy.size
251
252     const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(candidateToDuplicate.redundancy.strategy)
253     const totalWillDuplicate = totalDuplicated + this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)
254
255     return totalWillDuplicate > maxSize
256   }
257
258   private buildNewExpiration (expiresAfterMs: number) {
259     return new Date(Date.now() + expiresAfterMs)
260   }
261
262   private buildEntryLogId (object: VideoRedundancyModel) {
263     if (object.VideoFile) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
264
265     return `${object.VideoStreamingPlaylist.playlistUrl}`
266   }
267
268   private getTotalFileSizes (files: VideoFileModel[], playlists: VideoStreamingPlaylistModel[]) {
269     const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size
270
271     return files.reduce(fileReducer, 0) * playlists.length
272   }
273
274   private async loadAndRefreshVideo (videoUrl: string) {
275     // We need more attributes and check if the video still exists
276     const getVideoOptions = {
277       videoObject: videoUrl,
278       syncParam: { likes: false, dislikes: false, shares: false, comments: false, thumbnail: false, refreshVideo: true },
279       fetchType: 'all' as 'all'
280     }
281     const { video } = await getOrCreateVideoAndAccountAndChannel(getVideoOptions)
282
283     return video
284   }
285 }