Improve redundancy: add 'min_lifetime' configuration
[oweals/peertube.git] / server / lib / schedulers / videos-redundancy-scheduler.ts
1 import { AbstractScheduler } from './abstract-scheduler'
2 import { CONFIG, JOB_TTL, REDUNDANCY } from '../../initializers'
3 import { logger } from '../../helpers/logger'
4 import { VideosRedundancy } from '../../../shared/models/redundancy'
5 import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
6 import { VideoFileModel } from '../../models/video/video-file'
7 import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
8 import { join } from 'path'
9 import { rename } from 'fs-extra'
10 import { getServerActor } from '../../helpers/utils'
11 import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
12 import { VideoModel } from '../../models/video/video'
13 import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
14 import { isTestInstance } from '../../helpers/core-utils'
15 import { removeVideoRedundancy } from '../redundancy'
16
17 export class VideosRedundancyScheduler extends AbstractScheduler {
18
19   private static instance: AbstractScheduler
20   private executing = false
21
22   protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
23
24   private constructor () {
25     super()
26   }
27
28   async execute () {
29     if (this.executing) return
30
31     this.executing = true
32
33     for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
34       logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
35
36       try {
37         const videoToDuplicate = await this.findVideoToDuplicate(obj)
38         if (!videoToDuplicate) continue
39
40         const videoFiles = videoToDuplicate.VideoFiles
41         videoFiles.forEach(f => f.Video = videoToDuplicate)
42
43         await this.purgeCacheIfNeeded(obj, videoFiles)
44
45         if (await this.isTooHeavy(obj, videoFiles)) {
46           logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
47           continue
48         }
49
50         logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy)
51
52         await this.createVideoRedundancy(obj, videoFiles)
53       } catch (err) {
54         logger.error('Cannot run videos redundancy %s.', obj.strategy, { err })
55       }
56     }
57
58     await this.extendsLocalExpiration()
59
60     await this.purgeRemoteExpired()
61
62     this.executing = false
63   }
64
65   static get Instance () {
66     return this.instance || (this.instance = new this())
67   }
68
69   private async extendsLocalExpiration () {
70     const expired = await VideoRedundancyModel.listLocalExpired()
71
72     for (const redundancyModel of expired) {
73       try {
74         const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
75         await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
76       } catch (err) {
77         logger.error('Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel))
78       }
79     }
80   }
81
82   private async purgeRemoteExpired () {
83     const expired = await VideoRedundancyModel.listRemoteExpired()
84
85     for (const redundancyModel of expired) {
86       try {
87         await removeVideoRedundancy(redundancyModel)
88       } catch (err) {
89         logger.error('Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel))
90       }
91     }
92   }
93
94   private findVideoToDuplicate (cache: VideosRedundancy) {
95     if (cache.strategy === 'most-views') {
96       return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
97     }
98
99     if (cache.strategy === 'trending') {
100       return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
101     }
102
103     if (cache.strategy === 'recently-added') {
104       const minViews = cache.minViews
105       return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
106     }
107   }
108
109   private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
110     const serverActor = await getServerActor()
111
112     for (const file of filesToDuplicate) {
113       const existing = await VideoRedundancyModel.loadByFileId(file.id)
114       if (existing) {
115         await this.extendsExpirationOf(existing, redundancy.minLifetime)
116
117         continue
118       }
119
120       // We need more attributes and check if the video still exists
121       const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id)
122       if (!video) continue
123
124       logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy)
125
126       const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
127       const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)
128
129       const tmpPath = await downloadWebTorrentVideo({ magnetUri }, JOB_TTL['video-import'])
130
131       const destPath = join(CONFIG.STORAGE.VIDEOS_DIR, video.getVideoFilename(file))
132       await rename(tmpPath, destPath)
133
134       const createdModel = await VideoRedundancyModel.create({
135         expiresOn: this.buildNewExpiration(redundancy.minLifetime),
136         url: getVideoCacheFileActivityPubUrl(file),
137         fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL),
138         strategy: redundancy.strategy,
139         videoFileId: file.id,
140         actorId: serverActor.id
141       })
142       createdModel.VideoFile = file
143
144       await sendCreateCacheFile(serverActor, createdModel)
145     }
146   }
147
148   private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) {
149     logger.info('Extending expiration of %s.', redundancy.url)
150
151     const serverActor = await getServerActor()
152
153     redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
154     await redundancy.save()
155
156     await sendUpdateCacheFile(serverActor, redundancy)
157   }
158
159   private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
160     while (this.isTooHeavy(redundancy, filesToDuplicate)) {
161       const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime)
162       if (!toDelete) return
163
164       await removeVideoRedundancy(toDelete)
165     }
166   }
167
168   private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
169     const maxSize = redundancy.size - this.getTotalFileSizes(filesToDuplicate)
170
171     const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy)
172
173     return totalDuplicated > maxSize
174   }
175
176   private buildNewExpiration (expiresAfterMs: number) {
177     return new Date(Date.now() + expiresAfterMs)
178   }
179
180   private buildEntryLogId (object: VideoRedundancyModel) {
181     return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
182   }
183
184   private getTotalFileSizes (files: VideoFileModel[]) {
185     const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size
186
187     return files.reduce(fileReducer, 0)
188   }
189 }